Commit 8f94b47d authored by Kirill Smelkov

amari: Fix Conn rx timeout handling

Conn multiplexes many requests/responses over a single WebSocket
connection. To do so it organizes a dedicated receive thread that
continuously receives messages from the underlying websocket connection and
dispatches received replies back to the threads that issued the corresponding requests.

An rx timeout in that receive thread is thus not something unexpected -
it can happen e.g. if there are simply no requests sent. But I missed
that in 61ad9032 (amari: Add functionality to interoperate with an
Amarisoft LTE service via WebSocket) and implicitly did not ignore such
a global rx timeout. As a result `amari xlog` does not work properly if
the period of requests is greater than the timeout value, for example:

        $ xamari xlog ws://localhost:9001 ue_get/30s
        {"meta": {"event": "start", "time": 1670588996.0623107, "generator": "xlog ws://localhost:9001 ue_get[]/30.0s"}}
        {"meta": {"event": "service attach", "time": 1670588996.1852894, "srv_name": "ENB", "srv_type": "ENB", "srv_version": "2022-12-01"}}
        {"message":"config_get", ...}
  note  {"message":"ue_get","ue_list":[],"message_id":2,"time":3045.323,"utc":1670588996.423}
  ----> {"meta": {"event": "service detach", "time": 1670589026.3569217, "srv_name": "ENB", "srv_type": "ENB", "srv_version": "2022-12-01", "reason": "timed out"}}
        {"meta": {"event": "service attach", "time": 1670589029.485363, "srv_name": "ENB", "srv_type": "ENB", "srv_version": "2022-12-01"}}
        {"message":"config_get", ...}
        {"message":"ue_get","ue_list":[],"message_id":2,"time":3078.606,"utc":1670589029.706}
        ...

-> Fix it by ignoring global rx timeout.

NOTE: we must also add manual handling of the per-request timeout when
waiting for the corresponding reply. If we don't do that, a situation where
a particular reply does not come back, while replies for other requests are
coming back ok, will never be detected.

Here is how fixed version works now:

        $ xamari xlog ws://localhost:9001 ue_get/30s
        {"meta": {"event": "start", "time": 1670589223.0339117, "generator": "xlog ws://localhost:9001 ue_get[]/30.0s"}}
        {"meta": {"event": "service attach", "time": 1670589223.1970558, "srv_name": "ENB", "srv_type": "ENB", "srv_version": "2022-12-01"}}
        {"message":"config_get", ...}
        {"message":"ue_get","ue_list":[],"message_id":2,"time":3272.292,"utc":1670589223.391}
        {"message":"ue_get","ue_list":[],"message_id":3,"time":3302.274,"utc":1670589253.373}
        {"message":"ue_get","ue_list":[],"message_id":4,"time":3332.266,"utc":1670589283.365}
        ...

Note that ue_get messages are coming sequentially and there is no
"service detach" event, which was artificially popping up due to wrong
timeout handling.
parent 134f3a1e
...@@ -24,9 +24,8 @@ ...@@ -24,9 +24,8 @@
import websocket import websocket
import json import json
from golang import chan, panic from golang import chan, select, nilchan, func, defer, panic
from golang import context, sync from golang import context, sync, time
# ConnError represents an error happened during Conn IO operation. # ConnError represents an error happened during Conn IO operation.
...@@ -132,7 +131,15 @@ class Conn: ...@@ -132,7 +131,15 @@ class Conn:
def __serve_recv(conn, ctx): def __serve_recv(conn, ctx):
while 1: while 1:
try:
rx_raw = conn._ws.recv() rx_raw = conn._ws.recv()
except websocket.WebSocketTimeoutException:
# ignore global rx timeout. Because Conn is multiplexed .req()
# handles "wait for response" timeout individually for each
# request. We still want to enable global ._ws timeout so that
# ._sendmsg is not blocked forever.
continue
if len(rx_raw) == 0: if len(rx_raw) == 0:
raise ConnError("connection closed by peer") raise ConnError("connection closed by peer")
rx = json.loads(rx_raw) rx = json.loads(rx_raw)
...@@ -164,9 +171,27 @@ class Conn: ...@@ -164,9 +171,27 @@ class Conn:
rx, _ = conn.req_(msg, args_dict) rx, _ = conn.req_(msg, args_dict)
return rx return rx
@func
def req_(conn, msg, args_dict): # -> response, raw_response def req_(conn, msg, args_dict): # -> response, raw_response
rxq = conn._send_msg(msg, args_dict) rxq = conn._send_msg(msg, args_dict)
_, ok = rxq.recv_()
# handle rx timeout ourselves. We cannot rely on global rx timeout
# since e.g. other replies might be coming in again and again.
δt = conn._ws.gettimeout()
rxt = nilchan
if δt is not None:
_ = time.Timer(δt)
defer(_.stop)
rxt = _.c
_, _rx = select(
rxt.recv, # 0
rxq.recv_, # 1
)
if _ == 0:
raise websocket.WebSocketTimeoutException("timed out waiting for response")
_, ok = _rx
if not ok: if not ok:
# NOTE no need to lock - rxq is closed after ._down_err is set # NOTE no need to lock - rxq is closed after ._down_err is set
raise ConnError("recv") from conn._down_err raise ConnError("recv") from conn._down_err
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment