Commit f5626f73 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Emit config_get after every sync(attached) instead of only after every attach

We emit config_get after every attach from the beginning of xlog in
e0cc8a38 (amari.xlog: Initial draft). The reasoning here is that it is
useful by default to know configuration of a service.

In the previous patch we added sync events so that xlog stream becomes
self-synchronizing. To continue that line it is now useful to have that
config_get emitted after every such synchronization point instead of
only after attaching to the service. That's what hereby patch does.

As a bonus the code is reworked in a way that config_get setup is not
hardcoded anymore and config_get periodicity now can be controlled by
users via explicitly specifying config_get in the logspec.
parent 1843f1a8
...@@ -131,17 +131,26 @@ class LogSpec: ...@@ -131,17 +131,26 @@ class LogSpec:
def xlog(ctx, wsuri, logspecv): def xlog(ctx, wsuri, logspecv):
# make sure we always have meta.sync - either the caller specifies it # make sure we always have meta.sync - either the caller specifies it
# explicitly, or we add it automatically to come first with default # explicitly, or we add it automatically to come first with default
# 10x·longest periodicity. # 10x·longest periodicity. Do the same about config_get - by default we
# want it to be present after every sync.
lsync = None lsync = None
isync = None
lconfig_get = None
pmax = 1 pmax = 1
for (i,l) in enumerate(logspecv): for (i,l) in enumerate(logspecv):
pmax = max(pmax, l.period) pmax = max(pmax, l.period)
if l.query == "meta.sync": if l.query == "meta.sync":
isync = i
lsync = l lsync = l
if l.query == "config_get":
lconfig_get = l
logspecv = logspecv[:] logspecv = logspecv[:]
if lsync is None: if lsync is None:
isync = 0
lsync = LogSpec("meta.sync", [], pmax*10) lsync = LogSpec("meta.sync", [], pmax*10)
logspecv.insert(0, lsync) logspecv.insert(0, lsync)
if lconfig_get is None:
logspecv.insert(isync+1, LogSpec("config_get", [], lsync.period))
xl = _XLogger(wsuri, logspecv, lsync.period) xl = _XLogger(wsuri, logspecv, lsync.period)
...@@ -263,10 +272,6 @@ class _XLogger: ...@@ -263,10 +272,6 @@ class _XLogger:
_, resp_raw = conn.req_(ctx, query, opts) _, resp_raw = conn.req_(ctx, query, opts)
return resp_raw return resp_raw
# emit config_get after attach
cfg_raw = req_(ctx, 'config_get', {})
xl.emit(cfg_raw)
# loop emitting requested logspecs # loop emitting requested logspecs
t0 = time.now() t0 = time.now()
tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0 tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment