Commit a2c3afaa authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Implement log rotation

Rotate output enb.xlog ourselves at sync points so that nothing is lost
in the output (hello `logrotate copytruncate`) and so that we can emit
pre- and post- logrotate syncs.

Reuse logging's RotatingFileHandler and TimedRotatingFileHandler to
implement actual rotation, but carefully wrap them in our writer
classes so that we emit exactly the output we prepared explicitly
without any headers prepended by logging, and that we explicitly control
when rotation happens.

/proposed-for-review-at !5
parent 58220253
...@@ -47,7 +47,7 @@ from __future__ import print_function, division, absolute_import ...@@ -47,7 +47,7 @@ from __future__ import print_function, division, absolute_import
# - "service connect failure" when xlog tries to connect to monitored LTE service # - "service connect failure" when xlog tries to connect to monitored LTE service
# with unsuccessful result. # with unsuccessful result.
# - "sync" emitted periodically and when xlogs starts, # - "sync" emitted periodically and when xlogs starts,
# stops (TODO and rotate logs). Comes with current state of # stops and rotates logs. Comes with current state of
# connection to LTE service and xlog setup # connection to LTE service and xlog setup
# - "xlog failure" on internal xlog error # - "xlog failure" on internal xlog error
# #
...@@ -55,14 +55,14 @@ from __future__ import print_function, division, absolute_import ...@@ -55,14 +55,14 @@ from __future__ import print_function, division, absolute_import
LOS_window = 1000 LOS_window = 1000
# TODO log file + rotate # Note about log rotation: we rotate output ourselves at sync points.
# #
# Rejected alternative: automatic rotation by an external tool, e.g. log_proxy: # Rejected alternative: automatic rotation by an external tool, e.g. log_proxy:
# see https://github.com/metwork-framework/log_proxy # see https://github.com/metwork-framework/log_proxy
# and https://superuser.com/questions/291368/log-rotation-of-stdout # and https://superuser.com/questions/291368/log-rotation-of-stdout
# #
# reason for rejection: on every rotation we want to emit "end of file" # reason for rejection: on every rotation we want to emit "pre-logrotate"
# entries to old file + header to new file. # sync to old file + "post-logrotate" sync to new file.
from xlte import amari from xlte import amari
...@@ -71,11 +71,14 @@ from xlte.amari import drb ...@@ -71,11 +71,14 @@ from xlte.amari import drb
import json import json
import traceback import traceback
import io import io
import re
from golang import func, defer, chan, select from golang import func, defer, chan, select
from golang import context, sync, time from golang import context, sync, time
from golang.gcompat import qq from golang.gcompat import qq
import logging; log = logging.getLogger('xlte.amari.xlog') import logging
import logging.handlers
log = logging.getLogger('xlte.amari.xlog')
# LogSpec represents one specification of what to log. # LogSpec represents one specification of what to log.
...@@ -132,10 +135,19 @@ class LogSpec: ...@@ -132,10 +135,19 @@ class LogSpec:
return LogSpec(query, optv, period) return LogSpec(query, optv, period)
# IWriter represents output to where xlog writes its data.
# it is created by _openwriter.
class IWriter:
def writeline(line: str): "writeline emits and flushes line to destination"
def need_rotate() -> bool: "need_rotate returns True when it is time to rotate"
def rotate(): "rotate performs rotation of destination"
rotatespec = "rotatespec indicates rotate specification of the writer"
# xlog queries service @wsuri periodically according to queries specified by # xlog queries service @wsuri periodically according to queries specified by
# logspecv and logs the result. # logspecv and logs the result.
@func @func
def xlog(ctx, wsuri, logspecv): def xlog(ctx, wsuri, w: IWriter, logspecv):
# make sure we always have meta.sync - either the caller specifies it # make sure we always have meta.sync - either the caller specifies it
# explicitly, or we add it automatically to come first with default # explicitly, or we add it automatically to come first with default
# 10x·longest periodicity. Do the same about config_get - by default we # 10x·longest periodicity. Do the same about config_get - by default we
...@@ -168,7 +180,7 @@ def xlog(ctx, wsuri, logspecv): ...@@ -168,7 +180,7 @@ def xlog(ctx, wsuri, logspecv):
"which is > LOS_window (%d)" % (ns, LOS_window)) "which is > LOS_window (%d)" % (ns, LOS_window))
# ready to start logging # ready to start logging
xl = _XLogger(wsuri, logspecv, lsync.period) xl = _XLogger(wsuri, w, logspecv, lsync.period)
# emit sync at start/stop # emit sync at start/stop
xl.jemit_sync("detached", "start", {}) xl.jemit_sync("detached", "start", {})
...@@ -200,8 +212,9 @@ def xlog(ctx, wsuri, logspecv): ...@@ -200,8 +212,9 @@ def xlog(ctx, wsuri, logspecv):
# _XLogger serves xlog implementation. # _XLogger serves xlog implementation.
class _XLogger: class _XLogger:
def __init__(xl, wsuri, logspecv, δt_sync): def __init__(xl, wsuri, w, logspecv, δt_sync):
xl.wsuri = wsuri xl.wsuri = wsuri
xl.w = w
xl.logspecv = logspecv xl.logspecv = logspecv
xl.δt_sync = δt_sync # = logspecv.get("meta.sync").period xl.δt_sync = δt_sync # = logspecv.get("meta.sync").period
xl.tsync = float('-inf') # never yet xl.tsync = float('-inf') # never yet
...@@ -210,7 +223,7 @@ class _XLogger: ...@@ -210,7 +223,7 @@ class _XLogger:
def emit(xl, line): def emit(xl, line):
assert isinstance(line, str) assert isinstance(line, str)
assert '\n' not in line, line assert '\n' not in line, line
print(line, flush=True) xl.w.writeline(line)
# jemit emits line corresponding to event to the log. # jemit emits line corresponding to event to the log.
def jemit(xl, event, args_dict): def jemit(xl, event, args_dict):
...@@ -220,15 +233,28 @@ class _XLogger: ...@@ -220,15 +233,28 @@ class _XLogger:
xl.emit(json.dumps(d)) xl.emit(json.dumps(d))
# jemit_sync emits line with sync event to the log. # jemit_sync emits line with sync event to the log.
# TODO logrotate at this point # the output is rotated at sync point if it is time to rotate.
def jemit_sync(xl, state, reason, args_dict): def jemit_sync(xl, state, reason, args_dict):
tnow = time.now() tnow = time.now()
d = {"state": state, d = {"state": state,
"reason": reason, "reason": reason,
"generator": "xlog %s %s" % (xl.wsuri, ' '.join(['%s' % _ for _ in xl.logspecv]))} "flags": "",
"generator": "xlog %s%s %s" % (
'--rotate %s ' % xl.w.rotatespec if xl.w.rotatespec else '',
xl.wsuri,
' '.join(['%s' % _ for _ in xl.logspecv]))}
d.update(args_dict) d.update(args_dict)
rotate = xl.w.need_rotate()
if rotate:
d["flags"] = "pre-logrotate"
xl.jemit("sync", d) xl.jemit("sync", d)
xl.tsync = tnow xl.tsync = tnow
if rotate:
xl.w.rotate()
# emit "post-logrotate" sync right after rotation so that new log
# chunk starts afresh with sync.
d["flags"] = "post-logrotate"
xl.jemit("sync", d)
# xlog1 performs one cycle of attach/log,log,log.../detach. # xlog1 performs one cycle of attach/log,log,log.../detach.
@func @func
...@@ -434,6 +460,65 @@ def _xmsg(name, f, doc1): ...@@ -434,6 +460,65 @@ def _xmsg(name, f, doc1):
_xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bearers") _xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bearers")
# _openwriter opens destination log file for writing.
# the file is configured to be logrotated according to rotatespec.
def _openwriter(path: str, rotatespec: str|None) -> IWriter:
if rotatespec is None:
return _PlainWriter(path)
# parse rotatespec
# <X>(KB|MB|GB|sec|min|hour|day)[.nbackup]
m = re.match(r"(?P<X>[0-9]+)((?P<size>[KMG]B)|(?P<time>(sec|min|hour|day)))"
r"\.(?P<nbackup>[0-9]+)$", rotatespec)
if m is None:
raise ValueError("invalid rotatespec %s" % qq(rotatespec))
x = int(m.group("X"))
nbackup = int(m.group("nbackup"))
size = m.group("size")
time = m.group("time")
kw = {}
kw["backupCount"] = nbackup
if size is not None:
kw["maxBytes"] = x * {'KB':1<<10, 'MB':1<<20, 'GB':1<<30}[size]
logh = logging.handlers.RotatingFileHandler(path, **kw)
else:
assert time is not None
kw["interval"] = x
kw["when"] = {'sec':'S', 'min':'M', 'hour':'H', 'day':'D'}[time]
logh = logging.handlers.TimedRotatingFileHandler(path, utc=True, **kw)
return _RotatingWriter(logh, rotatespec)
# _PlainWriter implements writer that emits data to plain file without rotation.
class _PlainWriter(IWriter):
def __init__(w, path):
w.f = open(path, "w")
def writeline(w, line: str):
w.f.write(line+'\n')
w.f.flush()
def need_rotate(w): return False
def rotate(w): pass
rotatespec = None
# _RotatingWriter implements writer on top logging's RotatingFileHandler or TimedRotatingFileHandler.
class _RotatingWriter(IWriter):
def __init__(w, logh: logging.handlers.BaseRotatingHandler, rotatespec: str):
w.logh = logh
w.rotatespec = rotatespec
logh.format = lambda line: line # tune logging not to add its headers
def writeline(w, line: str):
# go directly to underlying FileHandler.emit to skip automatic rollover
# in BaseRotatingHandler.emit . Note: emit adds '\n' and does flush.
logging.FileHandler.emit(w.logh, line)
def need_rotate(w): return w.logh.shouldRollover('')
def rotate(w): w.logh.doRollover()
# ---------------------------------------- # ----------------------------------------
...@@ -792,13 +877,14 @@ summary = "maintain extra log for a service" ...@@ -792,13 +877,14 @@ summary = "maintain extra log for a service"
def usage(out): def usage(out):
print("""\ print("""\
Usage: xamari xlog [OPTIONS] <wsuri> <logspec>+ Usage: xamari xlog [OPTIONS] <wsuri> <output> <logspec>+
Maintain extra log for a service. Maintain extra log for a service.
The service is queried periodically according to logspec and results are saved The service is queried periodically according to logspec and results are saved
in JSON format to a file (see 'xamari help jsonlog'). in JSON format to output file (see 'xamari help jsonlog').
<wsuri> is URI (see 'xamari help websock') of an Amarisoft-service. <wsuri> is URI (see 'xamari help websock') of an Amarisoft-service.
<output> is path to output file.
<logspec> is specification of what to log. It has the following parts: <logspec> is specification of what to log. It has the following parts:
<query>[<options>]/<period> <query>[<options>]/<period>
...@@ -827,7 +913,9 @@ Additionally the following queries are used to control xlog itself: ...@@ -827,7 +913,9 @@ Additionally the following queries are used to control xlog itself:
Options: Options:
-h --help show this help --rotate <rotatespec> rotate output approximately according to rotatespec
rotatespec is <X>(KB|MB|GB|sec|min|hour|day)[.nbackup]
-h --help show this help
""" % (LogSpec.DEFAULT_PERIOD, """ % (LogSpec.DEFAULT_PERIOD,
'\n'.join(" %-14s %s" % (q, f.xlog_doc1) '\n'.join(" %-14s %s" % (q, f.xlog_doc1)
for q, f in sorted(_xmsg_registry.items()))), for q, f in sorted(_xmsg_registry.items()))),
...@@ -836,25 +924,30 @@ file=out) ...@@ -836,25 +924,30 @@ file=out)
def main(ctx, argv): def main(ctx, argv):
try: try:
optv, argv = getopt.getopt(argv[1:], "h", ["help"]) optv, argv = getopt.getopt(argv[1:], "h", ["rotate=", "help"])
except getopt.GetoptError as e: except getopt.GetoptError as e:
print(e, file=sys.stderr) print(e, file=sys.stderr)
usage(sys.stderr) usage(sys.stderr)
sys.exit(2) sys.exit(2)
rotatespec = None
for opt, arg in optv: for opt, arg in optv:
if opt in ( "--rotate"):
rotatespec = arg
if opt in ("-h", "--help"): if opt in ("-h", "--help"):
usage(sys.stdout) usage(sys.stdout)
sys.exit(0) sys.exit(0)
if len(argv) < 2: if len(argv) < 3:
usage(sys.stderr) usage(sys.stderr)
sys.exit(2) sys.exit(2)
wsuri = argv[0] wsuri = argv[0]
output = argv[1]
logspecv = [] logspecv = []
for arg in argv[1:]: for arg in argv[2:]:
logspecv.append( LogSpec.parse(arg) ) logspecv.append( LogSpec.parse(arg) )
xlog(ctx, wsuri, logspecv) w = _openwriter(output, rotatespec)
xlog(ctx, wsuri, w, logspecv)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment