diff --git a/amari/xlog.py b/amari/xlog.py index 656ac27472c3c7ea504aa3260263a34ec1eab853..ce4732888f19ed201eb6f7d91af1c8b0c3b70f68 100644 --- a/amari/xlog.py +++ b/amari/xlog.py @@ -48,6 +48,9 @@ # stops (TODO and rotate logs). Comes with current state of # connection to LTE service and xlog setup # - "xlog failure" on internal xlog error +# +# it is guaranteed that valid xlog stream has a sync event at least every LOS_window entries. +LOS_window = 1000 # TODO log file + rotate @@ -153,7 +156,15 @@ def xlog(ctx, wsuri, logspecv): if lconfig_get is None: logspecv.insert(isync+1, LogSpec("config_get", [], lsync.period)) + # verify that sync will come at least every LOS_window records + ns = 0 + for l in logspecv: + ns += (lsync.period / l.period) + if ns > LOS_window: + raise ValueError("meta.sync asked to come ~ every %d entries, " + "which is > LOS_window (%d)" % (ns, LOS_window)) + # ready to start logging xl = _XLogger(wsuri, logspecv, lsync.period) # emit sync at start/stop @@ -407,10 +418,18 @@ _xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bea # # The reader must provide .readline() method. # The ownership of wrapped reader is transferred to the Reader. -class ParseError(RuntimeError): pass +class ParseError(RuntimeError): pass # an entry could not be parsed +class LOSError(RuntimeError): pass # loss of synchronization class Reader: # ._r underlying IO reader # ._lineno current line number + # ._sync sync(attached) covering current message(s) | None + # for a message M sync S covering it can come in the log both before and after M + # S covers M if there is no other event/error E in between S and M + # ._n_nosync for how long we have not seen a sync + # ._emsgq [](Message|Event|Exception) + # queue for messages/events/... while we are reading ahead to look for sync + # non-message could be only at tail pass # xdict represents dict loaded from xlog entry. @@ -447,6 +466,9 @@ class SyncEvent(Event): def __init__(xr, r): xr._r = r xr._lineno = 0 + xr._sync = None + xr._n_nosync = 0 + xr._emsgq = [] # close release resources associated with the Reader. @func(Reader) @@ -456,6 +478,75 @@ def close(xr): # read returns next xlog entry or None at EOF. @func(Reader) def read(xr): # -> Event|Message|None + while 1: + # flush what we queued during readahead + if len(xr._emsgq) > 0: + x = xr._emsgq.pop(0) + + # event/error + if not isinstance(x, Message): + for _ in xr._emsgq: # non-message could be only at tail + assert not isinstance(_, Message), _ + if isinstance(x, SyncEvent) and x.state == "attached": + assert xr._sync is x # readahead should have set it + else: + # attach/detach/sync(detached)/error separate sync from other messages + xr._sync = None + if isinstance(x, Exception): + raise x + return x + + # message + assert isinstance(x, Message) + + # TODO verify messages we get/got against their schedule in covering sync. + # Raise LOSError (loss of synchronization) if what we actually see + # does not match what sync says it should be. + + return x + assert len(xr._emsgq) == 0 + + # read next message/event/... potentially reading ahead while looking for covering sync + while 1: + try: + x = xr._read1() + except Exception as e: + x = e + + # if we see EOF - we return it to outside only if the queue is empty + # otherwise it might be that readahead reaches EOF early, but at + # the time when queue flush would want to yield it to the user, the + # stream might have more data. + if x is None: + if len(xr._emsgq) == 0: + return None + else: + break # flush the queue + + xr._emsgq.append(x) + + # if we see sync(attached) - it will cover future messages till next + # event, and messages that are already queued + if isinstance(x, SyncEvent): + xr._n_nosync = 0 + if x.state == "attached": + xr._sync = x + else: + xr._n_nosync += 1 + if xr._n_nosync > LOS_window: + xr._emsgq.append(LOSError("no sync for %d entries" % xr._n_nosync)) + + if isinstance(x, Message): + if xr._sync is None: # have message and no sync - + continue # - continue to read ahead to find it + + # message with sync or any event - flush the queue + break + +# _read1 serves read by reading one next raw entry from the log. +# it does not detect loss of synchronization. +@func(Reader) +def _read1(xr): x = xr._jread1() if x is None: return None diff --git a/amari/xlog_test.py b/amari/xlog_test.py index 4539d2059b32a3ff123be829bd767b7af326fc72..5871452a8e9bb053e536a9a5bfdb8cb55ec25d2d 100644 --- a/amari/xlog_test.py +++ b/amari/xlog_test.py @@ -19,7 +19,7 @@ # See https://www.nexedi.com/licensing for rationale and options. from xlte.amari import xlog -from golang import func, defer +from golang import func, defer, b import io from pytest import raises @@ -86,6 +86,50 @@ zzzqqqrrrr assert _ is None +# verify that EOF is not returned prematurely due to readahead pre-hitting it +# sooner on the live stream. +@func +def test_Reader_readahead_vs_eof(): + fxlog = io.BytesIO(b'') + def logit(line): + line = b(line) + assert b'\n' not in line + pos = fxlog.tell() + fxlog.seek(0, io.SEEK_END) + fxlog.write(b'%s\n' % line) + fxlog.seek(pos, io.SEEK_SET) + + xr = xlog.Reader(fxlog) + def expect_msg(Ï„, msg): + _ = xr.read() + assert type(_) is xlog.Message + assert _.timestamp == Ï„ + assert _.message == msg + + logit('{"message": "aaa", "utc": 1}') + logit('{"message": "bbb", "utc": 2}') + expect_msg(1, "aaa") + expect_msg(2, "bbb") + + # ^^^ readahead hit EOF internally, but at the time next .read() is called, + # the stream has more data + logit('{"message": "ccc", "utc": 3}') + expect_msg(3, "ccc") + + # now, when read is called, the stream has no more data + # -> EOF is reported to the caller + _ = xr.read() + assert _ is None + + # now the stream has more data again + logit('{"message": "ddd", "utc": 4}') + logit('{"message": "eee", "utc": 5}') + expect_msg(4, "ddd") + expect_msg(5, "eee") + _ = xr.read() + assert _ is None + + def test_LogSpec(): logspec = "stats[samples,rf]/60s" spec = xlog.LogSpec.parse(logspec)