Commit aa4d621d authored by Julien Muchembled's avatar Julien Muchembled

New log format to show node id (and optionally cluster name) in node column

neolog has new options: -N for old behaviour, and -C to show the cluster name.
parent 8ef1ddba
...@@ -101,3 +101,8 @@ class BaseApplication(object): ...@@ -101,3 +101,8 @@ class BaseApplication(object):
self.nm.close() self.nm.close()
self.em.close() self.em.close()
self.__dict__.clear() self.__dict__.clear()
def setUUID(self, uuid):
    """Set this application's node id, registering it with the logger.

    Does nothing when *uuid* is already the current id, so the node
    mapping in the log database is only touched on a real change.
    """
    if uuid != self.uuid:
        self.uuid = uuid
        logging.node(self.name, uuid)
...@@ -175,9 +175,7 @@ class EventHandler(object): ...@@ -175,9 +175,7 @@ class EventHandler(object):
if your_uuid is None: if your_uuid is None:
raise ProtocolError('No UUID supplied') raise ProtocolError('No UUID supplied')
logging.info('connected to a primary master node') logging.info('connected to a primary master node')
if app.uuid != your_uuid: app.setUUID(your_uuid)
app.uuid = your_uuid
logging.info('Got a new UUID: %s', uuid_str(your_uuid))
app.id_timestamp = None app.id_timestamp = None
elif node.getUUID() != uuid or app.uuid != your_uuid != None: elif node.getUUID() != uuid or app.uuid != your_uuid != None:
raise ProtocolError('invalid uuids') raise ProtocolError('invalid uuids')
......
...@@ -22,6 +22,9 @@ from time import time ...@@ -22,6 +22,9 @@ from time import time
from traceback import format_exception from traceback import format_exception
import bz2, inspect, neo, os, signal, sqlite3, sys, threading import bz2, inspect, neo, os, signal, sqlite3, sys, threading
from .util import nextafter
INF = float('inf')
# Stats for storage node of matrix test (py2.7:SQLite) # Stats for storage node of matrix test (py2.7:SQLite)
RECORD_SIZE = ( 234360832 # extra memory used RECORD_SIZE = ( 234360832 # extra memory used
- 16777264 # sum of raw data ('msg' attribute) - 16777264 # sum of raw data ('msg' attribute)
...@@ -59,9 +62,8 @@ class NEOLogger(Logger): ...@@ -59,9 +62,8 @@ class NEOLogger(Logger):
self.parent = root = getLogger() self.parent = root = getLogger()
if not root.handlers: if not root.handlers:
root.addHandler(self.default_root_handler) root.addHandler(self.default_root_handler)
self._db = None self.__reset()
self._record_queue = deque() self._nid_dict = {}
self._record_size = 0
self._async = set() self._async = set()
l = threading.Lock() l = threading.Lock()
self._acquire = l.acquire self._acquire = l.acquire
...@@ -75,6 +77,12 @@ class NEOLogger(Logger): ...@@ -75,6 +77,12 @@ class NEOLogger(Logger):
self._release = _release self._release = _release
self.backlog() self.backlog()
def __reset(self):
    """Drop the database handle and every piece of buffered logging state.

    Called at construction time and whenever logging to a file is
    disabled, so that a later setup() starts from a clean slate.
    """
    # The assignments are independent; order does not matter.
    self._record_size = 0
    self._record_queue = deque()
    self._node = {}
    self._db = None
def __enter__(self): def __enter__(self):
self._acquire() self._acquire()
return self._db return self._db
...@@ -149,9 +157,7 @@ class NEOLogger(Logger): ...@@ -149,9 +157,7 @@ class NEOLogger(Logger):
if self._db is not None: if self._db is not None:
self._db.close() self._db.close()
if not filename: if not filename:
self._db = None self.__reset()
self._record_queue.clear()
self._record_size = 0
return return
if filename: if filename:
self._db = sqlite3.connect(filename, check_same_thread=False) self._db = sqlite3.connect(filename, check_same_thread=False)
...@@ -161,45 +167,52 @@ class NEOLogger(Logger): ...@@ -161,45 +167,52 @@ class NEOLogger(Logger):
if 1: # Not only when logging everything, if 1: # Not only when logging everything,
# but also for interoperability with logrotate. # but also for interoperability with logrotate.
q("PRAGMA journal_mode = MEMORY") q("PRAGMA journal_mode = MEMORY")
if reset: for t, columns in (('log', (
for t in 'log', 'packet': "level INTEGER NOT NULL",
"pathname TEXT",
"lineno INTEGER",
"msg TEXT",
)),
('packet', (
"msg_id INTEGER NOT NULL",
"code INTEGER NOT NULL",
"peer TEXT NOT NULL",
"body BLOB",
))):
if reset:
q('DROP TABLE IF EXISTS ' + t) q('DROP TABLE IF EXISTS ' + t)
q("""CREATE TABLE IF NOT EXISTS log ( q('DROP TABLE IF EXISTS %s1' % t)
id INTEGER PRIMARY KEY AUTOINCREMENT, elif (2, 'name', 'TEXT', 0, None, 0) in q(
date REAL NOT NULL, "PRAGMA table_info(%s)" % t):
name TEXT, q("ALTER TABLE %s RENAME TO %s1" % (t, t))
level INTEGER NOT NULL, columns = (
pathname TEXT, "date REAL PRIMARY KEY",
lineno INTEGER, "node INTEGER",
msg TEXT) ) + columns
q("CREATE TABLE IF NOT EXISTS %s (\n %s) WITHOUT ROWID"
% (t, ',\n '.join(columns)))
q("""CREATE TABLE IF NOT EXISTS protocol (
date REAL PRIMARY KEY,
text BLOB NOT NULL) WITHOUT ROWID
""") """)
q("""CREATE INDEX IF NOT EXISTS _log_i1 ON log(date)""") q("""CREATE TABLE IF NOT EXISTS node (
q("""CREATE TABLE IF NOT EXISTS packet ( id INTEGER PRIMARY KEY,
id INTEGER PRIMARY KEY AUTOINCREMENT,
date REAL NOT NULL,
name TEXT, name TEXT,
msg_id INTEGER NOT NULL, cluster TEXT,
code INTEGER NOT NULL, nid INTEGER)
peer TEXT NOT NULL,
body BLOB)
""")
q("""CREATE INDEX IF NOT EXISTS _packet_i1 ON packet(date)""")
q("""CREATE TABLE IF NOT EXISTS protocol (
date REAL PRIMARY KEY NOT NULL,
text BLOB NOT NULL)
""") """)
with open(inspect.getsourcefile(p)) as p: with open(inspect.getsourcefile(p)) as p:
p = buffer(bz2.compress(p.read())) p = buffer(bz2.compress(p.read()))
for t, in q("SELECT text FROM protocol ORDER BY date DESC"): x = q("SELECT text FROM protocol ORDER BY date DESC LIMIT 1"
if p == t: ).fetchone()
break if (x and x[0]) != p:
else:
try: try:
t = self._record_queue[0].created x = self._record_queue[0].created
except IndexError: except IndexError:
t = time() x = time()
with self._db: q("INSERT INTO protocol VALUES (?,?)", (x, p))
q("INSERT INTO protocol VALUES (?,?)", (t, p)) self._db.commit()
self._node = {x[1:]: x[0] for x in q("SELECT * FROM node")}
def setup(self, filename=None, reset=False): def setup(self, filename=None, reset=False):
with self: with self:
...@@ -217,6 +230,20 @@ class NEOLogger(Logger): ...@@ -217,6 +230,20 @@ class NEOLogger(Logger):
return True return True
def _emit(self, r): def _emit(self, r):
try:
nid = self._node[r._node]
except KeyError:
if r._node == (None, None, None):
nid = None
else:
try:
nid = 1 + max(x for x in self._node.itervalues()
if x is not None)
except ValueError:
nid = 0
self._db.execute("INSERT INTO node VALUES (?,?,?,?)",
(nid,) + r._node)
self._node[r._node] = nid
if type(r) is PacketRecord: if type(r) is PacketRecord:
ip, port = r.addr ip, port = r.addr
peer = ('%s %s ([%s]:%s)' if ':' in ip else '%s %s (%s:%s)') % ( peer = ('%s %s ([%s]:%s)' if ':' in ip else '%s %s (%s:%s)') % (
...@@ -224,15 +251,22 @@ class NEOLogger(Logger): ...@@ -224,15 +251,22 @@ class NEOLogger(Logger):
msg = r.msg msg = r.msg
if msg is not None: if msg is not None:
msg = buffer(msg) msg = buffer(msg)
self._db.execute("INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)", q = "INSERT INTO packet VALUES (?,?,?,?,?,?)"
(r.created, r._name, r.msg_id, r.code, peer, msg)) x = [r.created, nid, r.msg_id, r.code, peer, msg]
else: else:
pathname = os.path.relpath(r.pathname, *neo.__path__) pathname = os.path.relpath(r.pathname, *neo.__path__)
self._db.execute("INSERT INTO log VALUES (NULL,?,?,?,?,?,?)", q = "INSERT INTO log VALUES (?,?,?,?,?,?)"
(r.created, r._name, r.levelno, pathname, r.lineno, r.msg)) x = [r.created, nid, r.levelno, pathname, r.lineno, r.msg]
while 1:
try:
self._db.execute(q, x)
break
except sqlite3.IntegrityError:
x[0] = nextafter(x[0], INF)
def _queue(self, record): def _queue(self, record):
record._name = self.name and str(self.name) name = self.name and str(self.name)
record._node = (name,) + self._nid_dict.get(name, (None, None))
self._acquire() self._acquire()
try: try:
if self._max_size is None: if self._max_size is None:
...@@ -277,6 +311,14 @@ class NEOLogger(Logger): ...@@ -277,6 +311,14 @@ class NEOLogger(Logger):
addr=connection.getAddress(), addr=connection.getAddress(),
msg=body)) msg=body))
def node(self, *cluster_nid):
    """Associate the current logger name with a (cluster, nid) identity.

    Logs the node id (once per change) and remembers the mapping so
    that _queue() can tag subsequent records with it.
    """
    name = self.name and str(self.name)
    if self._nid_dict.get(name) != cluster_nid:
        # Imported lazily to avoid a circular import at module load.
        from .protocol import uuid_str
        self.info('Node ID: %s', uuid_str(cluster_nid[1]))
        self._nid_dict[name] = cluster_nid
logging = NEOLogger() logging = NEOLogger()
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush()) signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
......
...@@ -23,6 +23,20 @@ from Queue import deque ...@@ -23,6 +23,20 @@ from Queue import deque
from struct import pack, unpack, Struct from struct import pack, unpack, Struct
from time import gmtime from time import gmtime
# https://stackoverflow.com/a/6163157
def nextafter():
    """Self-replacing stub: on first call, bind libm's nextafter(3).

    Rebinds the module-level name to the ctypes-wrapped C function,
    then sanity-checks that it really returns the adjacent double.
    """
    global nextafter
    from ctypes import CDLL, c_double, util as ctypes_util
    from time import time
    libm = CDLL(ctypes_util.find_library('m'))
    f = libm.nextafter
    f.restype = c_double
    f.argtypes = c_double, c_double
    nextafter = f
    # y must be the very next representable float above x:
    # their midpoint rounds back to one of the two endpoints.
    x = time()
    y = f(x, float('inf'))
    assert x < y and (x + y) / 2 in (x, y), (x, y)
nextafter()
TID_LOW_OVERFLOW = 2**32 TID_LOW_OVERFLOW = 2**32
TID_LOW_MAX = TID_LOW_OVERFLOW - 1 TID_LOW_MAX = TID_LOW_OVERFLOW - 1
SECOND_PER_TID_LOW = 60.0 / TID_LOW_OVERFLOW SECOND_PER_TID_LOW = 60.0 / TID_LOW_OVERFLOW
......
...@@ -21,7 +21,7 @@ from time import time ...@@ -21,7 +21,7 @@ from time import time
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.app import BaseApplication, buildOptionParser from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID from neo.lib.protocol import UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.connection import ListeningConnection, ClientConnection from neo.lib.connection import ListeningConnection, ClientConnection
...@@ -58,7 +58,7 @@ class Application(BaseApplication): ...@@ -58,7 +58,7 @@ class Application(BaseApplication):
backup_app = None backup_app = None
truncate_tid = None truncate_tid = None
def uuid(self, uuid): def setUUID(self, uuid):
node = self.nm.getByUUID(uuid) node = self.nm.getByUUID(uuid)
if node is not self._node: if node is not self._node:
if node: if node:
...@@ -66,7 +66,8 @@ class Application(BaseApplication): ...@@ -66,7 +66,8 @@ class Application(BaseApplication):
if node.isConnected(True): if node.isConnected(True):
node.getConnection().close() node.getConnection().close()
self._node.setUUID(uuid) self._node.setUUID(uuid)
uuid = property(lambda self: self._node.getUUID(), uuid) logging.node(self.name, uuid)
uuid = property(lambda self: self._node.getUUID(), setUUID)
@property @property
def election(self): def election(self):
...@@ -264,7 +265,6 @@ class Application(BaseApplication): ...@@ -264,7 +265,6 @@ class Application(BaseApplication):
if self.uuid is None: if self.uuid is None:
self.uuid = self.getNewUUID(None, self.server, NodeTypes.MASTER) self.uuid = self.getNewUUID(None, self.server, NodeTypes.MASTER)
logging.info('My UUID: ' + uuid_str(self.uuid))
self._node.setRunning() self._node.setRunning()
self._node.id_timestamp = None self._node.id_timestamp = None
self.primary = monotonic_time() self.primary = monotonic_time()
......
...@@ -81,6 +81,11 @@ class BackupApplication(object): ...@@ -81,6 +81,11 @@ class BackupApplication(object):
self.nm.close() self.nm.close()
del self.__dict__ del self.__dict__
def setUUID(self, uuid):
    """Record the upstream master's node id, logging only on change."""
    if uuid != self.uuid:
        self.uuid = uuid
        logging.info('Upstream Node ID: %s', uuid_str(uuid))
def log(self): def log(self):
self.nm.log() self.nm.log()
if self.pt is not None: if self.pt is not None:
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
import argparse, bz2, gzip, errno, os, signal, sqlite3, sys, time import argparse, bz2, gzip, errno, os, signal, sqlite3, sys, time
from bisect import insort from bisect import insort
from itertools import chain
from logging import getLevelName from logging import getLevelName
from zlib import decompress from zlib import decompress
...@@ -26,16 +27,20 @@ comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile, xz='xzcat') ...@@ -26,16 +27,20 @@ comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile, xz='xzcat')
class Log(object): class Log(object):
_log_id = _packet_id = -1 _log_date = _packet_date = 0
_protocol_date = None _protocol_date = None
def __init__(self, db_path, decode=0, date_format=None, def __init__(self, db_path, decode=0, date_format=None,
filter_from=None, node_column=True, node_list=None): filter_from=None, show_cluster=False, no_nid=False,
node_column=True, node_list=None):
self._date_format = '%F %T' if date_format is None else date_format self._date_format = '%F %T' if date_format is None else date_format
self._decode = decode self._decode = decode
self._filter_from = filter_from self._filter_from = filter_from
self._no_nid = no_nid
self._node_column = node_column self._node_column = node_column
self._node_list = node_list self._node_list = node_list
self._node_dict = {}
self._show_cluster = show_cluster
name = os.path.basename(db_path) name = os.path.basename(db_path)
try: try:
name, ext = name.rsplit(os.extsep, 1) name, ext = name.rsplit(os.extsep, 1)
...@@ -57,35 +62,61 @@ class Log(object): ...@@ -57,35 +62,61 @@ class Log(object):
self._default_name = name self._default_name = name
def __iter__(self): def __iter__(self):
db = self._db q = self._db.execute
try: try:
db.execute("BEGIN") q("BEGIN")
yield yield
nl = "SELECT * FROM log WHERE id>?"
np = "SELECT * FROM packet WHERE id>?"
date = self._filter_from date = self._filter_from
if date: if date and max(self._log_date, self._packet_date) < date:
date = " AND date>=%f" % date log_args = packet_args = date,
nl += date date = " WHERE date>=?"
np += date else:
nl = db.execute(nl, (self._log_id,)) self._filter_from = None
np = db.execute(np, (self._packet_id,)) log_args = self._log_date,
packet_args = self._packet_date,
date = " WHERE date>?"
old = "SELECT date, name, NULL, NULL, %s FROM %s" + date
new = ("SELECT date, name, cluster, nid, %s"
" FROM %s JOIN node ON node=id" + date)
log = 'level, pathname, lineno, msg'
pkt = 'msg_id, code, peer, body'
try:
nl = q(new % (log, 'log'), log_args)
except sqlite3.OperationalError:
nl = q(old % (log, 'log'), log_args)
np = q(old % (pkt, 'packet'), packet_args)
else:
np = q(new % (pkt, 'packet'), packet_args)
try:
nl = chain(q(old % (log, 'log1'), log_args), nl)
np = chain(q(old % (pkt, 'packet1'), packet_args), np)
except sqlite3.OperationalError:
pass
try: try:
p = np.next() p = np.next()
self._reload(p[1]) self._reload(p[0])
except StopIteration: except StopIteration:
p = None p = None
for self._log_id, date, name, level, pathname, lineno, msg in nl: for date, name, cluster, nid, level, pathname, lineno, msg in nl:
while p and p[1] < date: while p and p[0] < date:
yield self._packet(*p) yield self._packet(*p)
p = np.fetchone() p = next(np, None)
yield date, name, getLevelName(level), msg.splitlines() self._log_date = date
yield (date, self._node(name, cluster, nid),
getLevelName(level), msg.splitlines())
if p: if p:
yield self._packet(*p) yield self._packet(*p)
for p in np: for p in np:
yield self._packet(*p) yield self._packet(*p)
finally: finally:
db.rollback() self._db.rollback()
def _node(self, name, cluster, nid):
    """Render the node column for one record.

    Shows the NID instead of the node name unless --no-nid was given
    (or no nid is known), and prefixes the cluster name with -C.
    """
    if nid and not self._no_nid:
        name = self.uuid_str(nid)
    return cluster + '/' + name if self._show_cluster else name
def _reload(self, date): def _reload(self, date):
q = self._db.execute q = self._db.execute
...@@ -143,8 +174,8 @@ class Log(object): ...@@ -143,8 +174,8 @@ class Log(object):
for msg in msg_list: for msg in msg_list:
print prefix + msg print prefix + msg
def _packet(self, id, date, name, msg_id, code, peer, body): def _packet(self, date, name, cluster, nid, msg_id, code, peer, body):
self._packet_id = id self._packet_date = date
if self._next_protocol <= date: if self._next_protocol <= date:
self._reload(date) self._reload(date)
try: try:
...@@ -174,7 +205,7 @@ class Log(object): ...@@ -174,7 +205,7 @@ class Log(object):
args = self._decompress(args, path) args = self._decompress(args, path)
if args and self._decode: if args and self._decode:
msg[0] += ' \t| ' + repr(args) msg[0] += ' \t| ' + repr(args)
return date, name, 'PACKET', msg return date, self._node(name, cluster, nid), 'PACKET', msg
def _decompress(self, args, path): def _decompress(self, args, path):
if args: if args:
...@@ -250,6 +281,11 @@ def main(): ...@@ -250,6 +281,11 @@ def main():
' if N < 0; N can also be a string that is parseable by dateutil') ' if N < 0; N can also be a string that is parseable by dateutil')
_('file', nargs='+', _('file', nargs='+',
help='log file, compressed (gz, bz2 or xz) or not') help='log file, compressed (gz, bz2 or xz) or not')
_ = parser.add_mutually_exclusive_group().add_argument
_('-C', '--cluster', action="store_true",
help='show cluster name in node column')
_('-N', '--no-nid', action="store_true",
help='always show node name (instead of NID) in node column')
args = parser.parse_args() args = parser.parse_args()
if args.sleep_interval <= 0: if args.sleep_interval <= 0:
parser.error("sleep_interval must be positive") parser.error("sleep_interval must be positive")
...@@ -275,7 +311,8 @@ def main(): ...@@ -275,7 +311,8 @@ def main():
node_column = True node_column = True
log_list = [Log(db_path, log_list = [Log(db_path,
2 if args.decompress else 1 if args.all else 0, 2 if args.decompress else 1 if args.all else 0,
args.date, filter_from, node_column, node_list) args.date, filter_from, args.cluster, args.no_nid,
node_column, node_list)
for db_path in args.file] for db_path in args.file]
if args.follow: if args.follow:
try: try:
......
...@@ -19,8 +19,7 @@ from collections import deque ...@@ -19,8 +19,7 @@ from collections import deque
from neo.lib import logging from neo.lib import logging
from neo.lib.app import BaseApplication, buildOptionParser from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.protocol import uuid_str, \ from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, Packets
CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.connection import ListeningConnection from neo.lib.connection import ListeningConnection
from neo.lib.exception import StoppedOperation, PrimaryFailure from neo.lib.exception import StoppedOperation, PrimaryFailure
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
...@@ -122,6 +121,7 @@ class Application(BaseApplication): ...@@ -122,6 +121,7 @@ class Application(BaseApplication):
# force node uuid from command line argument, for testing purpose only # force node uuid from command line argument, for testing purpose only
if 'uuid' in config: if 'uuid' in config:
self.uuid = config['uuid'] self.uuid = config['uuid']
logging.node(self.name, self.uuid)
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
...@@ -157,6 +157,7 @@ class Application(BaseApplication): ...@@ -157,6 +157,7 @@ class Application(BaseApplication):
# load configuration # load configuration
self.uuid = dm.getUUID() self.uuid = dm.getUUID()
logging.node(self.name, self.uuid)
num_partitions = dm.getNumPartitions() num_partitions = dm.getNumPartitions()
num_replicas = dm.getNumReplicas() num_replicas = dm.getNumReplicas()
ptid = dm.getPTID() ptid = dm.getPTID()
...@@ -169,7 +170,6 @@ class Application(BaseApplication): ...@@ -169,7 +170,6 @@ class Application(BaseApplication):
self.pt = PartitionTable(num_partitions, num_replicas) self.pt = PartitionTable(num_partitions, num_replicas)
logging.info('Configuration loaded:') logging.info('Configuration loaded:')
logging.info('UUID : %s', uuid_str(self.uuid))
logging.info('PTID : %s', dump(ptid)) logging.info('PTID : %s', dump(ptid))
logging.info('Name : %s', self.name) logging.info('Name : %s', self.name)
logging.info('Partitions: %s', num_partitions) logging.info('Partitions: %s', num_partitions)
...@@ -254,9 +254,7 @@ class Application(BaseApplication): ...@@ -254,9 +254,7 @@ class Application(BaseApplication):
self.devpath) self.devpath)
self.master_node, self.master_conn, num_partitions, num_replicas = \ self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection() bootstrap.getPrimaryConnection()
uuid = self.uuid self.dm.setUUID(self.uuid)
logging.info('I am %s', uuid_str(uuid))
self.dm.setUUID(uuid)
# Reload a partition table from the database. This is necessary # Reload a partition table from the database. This is necessary
# when a previous primary master died while sending a partition # when a previous primary master died while sending a partition
......
...@@ -1027,6 +1027,7 @@ class NEOThreadedTest(NeoTestBase): ...@@ -1027,6 +1027,7 @@ class NEOThreadedTest(NeoTestBase):
self.__run_count[test_id] = 1 + i self.__run_count[test_id] = 1 + i
if i: if i:
test_id += '-%s' % i test_id += '-%s' % i
logging._nid_dict.clear()
logging.setup(os.path.join(getTempDirectory(), test_id + '.log')) logging.setup(os.path.join(getTempDirectory(), test_id + '.log'))
return LoggerThreadName() return LoggerThreadName()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment