Commit 1843f1a8 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Emit sync events periodically

So that xlog stream becomes self-synchronized and could be used even if
we start reading it from some intermediate point instead of only from
the beginning.

We will need this in general - to be able to start reading long log not
only from its beginning, and also in particular for Wendelin systems
where logs are uploaded by Fluentd in chunks and some chunks could be
potentially lost.

Sync events are emitted always unconditionally with default sync
interval being 10x the longest specified period. We also provide users a
way to control sync periodicity via explicitly specifying
"meta.sync/period" query in the logspec.

See !3 (comment 175796) and
further for related discussion.

This is change of xlog protocol. But it is early days and the only
direct consumer of xlog is amari.kpi which we adjust accordingly. So it
should be ok.
parent 271fad82
......@@ -47,7 +47,7 @@ class LogMeasure:
# ._rlog IO reader for enb.log
#
# ._estats \/ last xlog.Message with read stats result
# \/ last xlog.Event | LogError
# \/ last xlog.Event\sync | LogError
# \/ None
# ._m kpi.Measurement being prepared covering [_estats_prev, _estats) | None
# ._m_next kpi.Measurement being prepared covering [_estats, _estats_next) | None
......@@ -153,6 +153,10 @@ def _read(logm):
if x is None:
x = LogError.EOF # represent EOF as LogError
# ignore sync events
if isinstance(x, xlog.Event) and x.event == "sync":
continue
# handle messages that update current Measurement
if isinstance(x, xlog.Message):
if x.message == "x.drb_stats":
......@@ -162,7 +166,7 @@ def _read(logm):
continue # ignore other messages
# it is an error, event or stats.
# it is an error, event\sync or stats.
# if it is an event or stats -> finalize timestamp for _m_next.
# start building next _m_next covering [x, x_next).
# shift m <- ._m <- ._m_next <- (new Measurement | None for LogError)
......
......@@ -536,6 +536,35 @@ def test_LogMeasure_cc_wraparound():
readok(4, 10) # 4-5
# verify that LogMeasure ignores syncs in xlog stream.
@func
def test_LogMeasure_sync():
t = tLogMeasure()
defer(t.close)
_ = t.expect1
cc = 'rrc_connection_request'
CC = 'RRC.ConnEstabAtt.sum'
t.xlog( jstats(1, {}) )
t.xlog( jstats(2, {cc: 4}) )
t.xlog( '{"meta": {"event": "sync", "time": 2.5, "state": "attached", "reason": "periodic", "generator": "xlog ws://localhost:9001 stats[]/30.0s"}}' )
t.xlog( jstats(3, {cc: 7}) )
def readok(τ, CC_value):
_('X.Tstart', τ)
_('X.δT', int(τ+1)-τ)
if CC_value is not None:
_(CC, CC_value)
else:
t.expect_nodata()
t.read()
readok(0.02, None) # attach-1
readok(1, 4) # 1-2
readok(2, 3) # 2-3 jumping over sync
# jstats returns json-encoded stats message corresponding to counters dict.
# τ goes directly to stats['utc'] as is.
def jstats(τ, counters): # -> str
......
......@@ -45,6 +45,8 @@
# - "service detach" when xlog disconnects from monitored LTE service
# - "service connect failure" when xlog tries to connect to monitored LTE service
# with unsuccessful result.
# - "sync" emitted periodically with current state of
# connection to LTE service and xlog setup
# - "xlog failure" on internal xlog error
......@@ -127,7 +129,22 @@ class LogSpec:
# xlog queries service @wsuri periodically according to queries specified by
# logspecv and logs the result.
def xlog(ctx, wsuri, logspecv):
xl = _XLogger(wsuri, logspecv)
# make sure we always have meta.sync - either the caller specifies it
# explicitly, or we add it automatically to come first with default
# 10x·longest periodicity.
lsync = None
pmax = 1
for (i,l) in enumerate(logspecv):
pmax = max(pmax, l.period)
if l.query == "meta.sync":
lsync = l
logspecv = logspecv[:]
if lsync is None:
lsync = LogSpec("meta.sync", [], pmax*10)
logspecv.insert(0, lsync)
xl = _XLogger(wsuri, logspecv, lsync.period)
slogspecv = ' '.join(['%s' % _ for _ in logspecv])
xl.jemit("start", {"generator": "xlog %s %s" % (wsuri, slogspecv)})
......@@ -146,18 +163,21 @@ def xlog(ctx, wsuri, logspecv):
# e.g. disk full in xl.jemit itself
log.exception('xlog failure (second level):')
δt_reconnect = min(3, lsync.period)
_, _rx = select(
ctx.done().recv, # 0
time.after(3).recv, # 1
time.after(δt_reconnect).recv, # 1
)
if _ == 0:
raise ctx.err()
# _XLogger serves xlog implementation.
class _XLogger:
def __init__(xl, wsuri, logspecv):
def __init__(xl, wsuri, logspecv, δt_sync):
xl.wsuri = wsuri
xl.logspecv = logspecv
xl.δt_sync = δt_sync # = logspecv.get("meta.sync").period
xl.tsync = time.now() # first `start` serves as sync
# emit saves line to the log.
def emit(xl, line):
......@@ -172,9 +192,24 @@ class _XLogger:
d = {"meta": d}
xl.emit(json.dumps(d))
# jemit_sync emits line with sync event to the log.
# TODO logrotate at this point
def jemit_sync(xl, state, args_dict):
tnow = time.now()
d = {"state": state,
"generator": "xlog %s %s" % (xl.wsuri, ' '.join(['%s' % _ for _ in xl.logspecv]))}
d.update(args_dict)
xl.jemit("sync", d)
xl.tsync = tnow
# xlog1 performs one cycle of attach/log,log,log.../detach.
@func
def xlog1(xl, ctx):
# emit sync periodically even in detached state
# this is useful to still know e.g. intended logspec if the service is stopped for a long time
if time.now() - xl.tsync >= xl.δt_sync:
xl.jemit_sync("detached", {})
# connect to the service
try:
conn = amari.connect(ctx, xl.wsuri)
......@@ -215,10 +250,10 @@ class _XLogger:
xsrv_ready.recv()
# spawn main logger
wg.go(xl._xlog1, conn, xmsgsrv_dict)
wg.go(xl._xlog1, conn, xmsgsrv_dict, srv_info)
def _xlog1(xl, ctx, conn, xmsgsrv_dict):
def _xlog1(xl, ctx, conn, xmsgsrv_dict, srv_info):
# req_ queries either amari service directly, or an extra message service.
def req_(ctx, query, opts): # -> resp_raw
if query in xmsgsrv_dict:
......@@ -273,6 +308,9 @@ class _XLogger:
if _ == 0:
raise ctx.err()
if logspec.query == 'meta.sync':
xl.jemit_sync("attached", srv_info)
else:
resp_raw = req_(ctx, logspec.query, opts)
xl.emit(resp_raw)
......@@ -539,6 +577,11 @@ following synthetic queries is also provided:
%s
Additionally the following queries are used to control xlog itself:
meta.sync specify how often synchronization events are emitted
default is 10x the longest period
Options:
-h --help show this help
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment