Commit 1fce5cc4 authored by Julien Muchembled's avatar Julien Muchembled

Review logging to keep all debugging information in RAM and flush only if useful

The main goal of this patch is to keep all DEBUG logs and packet logger enabled
without exploding disk usage.
This is done by keeping the last 16 MB (by default) of debugging information in
a RAM buffer, and to emit it to an SQLite DB upon RTMIN signal or in case of
warning and more severe log.

Implementation is also cleaned up for better integration within a framework
or if run standalone. NEO logger is now a direct child of root handler.
Only warnings and more severe logs are forwarded to root handler.

A new script 'neolog' is added to pretty-print the contents of the SQLite log.

In unit tests, logging events are not buffered but emitted immediately.
When a test passes, payloads of all exchanged packets are discarded to reduce
disk usage on test bots.

This slows down performance tests by about 15 % because even if nothing is
written to disk, debug and packet log records are now always rendered.
parent 4df3c4e1
......@@ -20,7 +20,6 @@ import ZODB.interfaces
import neo.lib
from functools import wraps
from neo.lib import setupLog
from neo.lib.util import add64
from neo.lib.protocol import ZERO_TID
from .app import Application
......@@ -63,7 +62,7 @@ class Storage(BaseStorage.BaseStorage,
)))
def __init__(self, master_nodes, name, read_only=False,
compress=None, logfile=None, verbose=False, _app=None,
compress=None, logfile=None, _app=None,
dynamic_master_list=None, **kw):
"""
Do not pass those parameters (used internally):
......@@ -71,7 +70,8 @@ class Storage(BaseStorage.BaseStorage,
"""
if compress is None:
compress = True
setupLog('CLIENT', filename=logfile, verbose=verbose)
if logfile:
neo.lib.logging.setup(logfile)
BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
# Warning: _is_read_only is used in BaseStorage, do not rename it.
self._is_read_only = read_only
......@@ -84,8 +84,6 @@ class Storage(BaseStorage.BaseStorage,
self._init_kw = {
'read_only': read_only,
'compress': compress,
'logfile': logfile,
'verbose': verbose,
'dynamic_master_list': dynamic_master_list,
'_app': _app,
}
......
......@@ -27,9 +27,9 @@
and is still allowed on a read-only neostorage.
</description>
</key>
<key name="verbose" datatype="boolean">
<key name="logfile" datatype="path">
<description>
Log debugging information
Log debugging information to specified SQLite DB.
</description>
</key>
<key name="dynamic_master_list" datatype="path">
......
#
# Copyright (C) 2006-2011 Nexedi SA
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -14,49 +14,4 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as logging_std
FMT = ('%(asctime)s %(levelname)-9s %(name)-10s'
' [%(module)14s:%(lineno)3d] \n%(message)s')
class Formatter(logging_std.Formatter):
def formatTime(self, record, datefmt=None):
return logging_std.Formatter.formatTime(self, record,
'%Y-%m-%d %H:%M:%S') + '.%04d' % (record.msecs * 10)
def format(self, record):
lines = iter(logging_std.Formatter.format(self, record).splitlines())
prefix = lines.next()
return '\n'.join(prefix + line for line in lines)
def setupLog(name='NEO', filename=None, verbose=False):
global logging
if verbose:
level = logging_std.DEBUG
else:
level = logging_std.INFO
if logging is not None:
for handler in logging.handlers:
handler.acquire()
try:
handler.close()
finally:
handler.release()
del logging.manager.loggerDict[logging.name]
logging = logging_std.getLogger(name)
for handler in logging.handlers[:]:
logging.removeHandler(handler)
if filename is None:
handler = logging_std.StreamHandler()
else:
handler = logging_std.FileHandler(filename)
handler.setFormatter(Formatter(FMT))
logging.setLevel(level)
logging.addHandler(handler)
logging.propagate = 0
# Create default logger
logging = None
setupLog()
from .logger import logging
......@@ -72,9 +72,6 @@ class ConfigurationManager(object):
assert cluster != '', "Cluster name must be non-empty"
return cluster
def getName(self):
return self.__get('name')
def getReplicas(self):
return int(self.__get('replicas'))
......
......@@ -23,7 +23,6 @@ from .connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException, \
ConnectorConnectionClosedException
from .locking import RLock
from .logger import PACKET_LOGGER
from .profiling import profiler_decorator
from .protocol import Errors, PacketMalformedError, Packets, ParserState
from .util import dump, ReadBuffer
......@@ -166,7 +165,7 @@ class HandlerSwitcher(object):
@profiler_decorator
def _handle(self, connection, packet):
assert len(self._pending) == 1 or self._pending[0][0]
PACKET_LOGGER.dispatch(connection, packet, False)
neo.lib.logging.packet(connection, packet, False)
if connection.isClosed() and packet.ignoreOnClosedConnection():
neo.lib.logging.debug('Ignoring packet %r on closed connection %r',
packet, connection)
......@@ -638,7 +637,7 @@ class Connection(BaseConnection):
if was_empty:
# enable polling for writing.
self.em.addWriter(self)
PACKET_LOGGER.dispatch(self, packet, True)
neo.lib.logging.packet(self, packet, True)
@not_closed
def notify(self, packet):
......
#
# Copyright (C) 2010 Nexedi SA
# Copyright (C) 2010-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -22,34 +22,23 @@ import sys
from functools import wraps
import neo
# WARNING: This module should only be used for live application debugging.
# It - by purpose - allows code injection in a running neo process.
# You don't want to enable it in a production environment. Really.
ENABLED = False
# kill -RTMIN+1 <pid>
# Dump information to logs.
# kill -RTMIN+2 <pid>
# Loads (or reloads) neo.debug module.
# The content is up to you (it's only imported). It can be a breakpoint.
# How to include in python code:
# from neo.debug import register
# register()
#
# How to trigger it:
# Kill python process with:
# SIGUSR1:
# Loads (or reloads) neo.debug module.
# The content is up to you (it's only imported).
# SIGUSR2:
# Triggers a pdb prompt on process' controlling TTY.
def decorate(func):
def decorator(sig, frame):
def safe_handler(func):
def wrapper(sig, frame):
try:
func(sig, frame)
except:
# Prevent exception from exiting signal handler, so mistakes in
# "debug" module don't kill process.
traceback.print_exc()
return wraps(func)(decorator)
return wraps(func)(wrapper)
@decorate
@safe_handler
def debugHandler(sig, frame):
file, filename, (suffix, mode, type) = imp.find_module('debug',
neo.__path__)
......@@ -95,24 +84,10 @@ def winpdb(depth=0):
finally:
os.abort()
@decorate
def pdbHandler(sig, frame):
try:
winpdb(2) # depth is 2, because of the decorator
except ImportError:
global _debugger
if _debugger is None:
_debugger = getPdb()
debugger.set_trace(frame)
def register(on_log=None):
if ENABLED:
signal.signal(signal.SIGUSR1, debugHandler)
signal.signal(signal.SIGUSR2, pdbHandler)
if on_log is not None:
# use 'kill -RTMIN <pid>
@decorate
def on_log_signal(signum, signal):
on_log()
signal.signal(signal.SIGRTMIN, on_log_signal)
if on_log is not None:
@safe_handler
def on_log_signal(signum, signal):
on_log()
signal.signal(signal.SIGRTMIN+1, on_log_signal)
signal.signal(signal.SIGRTMIN+2, debugHandler)
#
# Copyright (C) 2006-2011 Nexedi SA
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -14,57 +14,203 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from base64 import b64encode
import neo
from .protocol import PacketMalformedError
from .util import dump
from .handler import EventHandler
from .profiling import profiler_decorator
# WARNING: Log rotating should not be implemented here.
# SQLite does not access database only by file descriptor,
# and an OperationalError exception would be raised if a log is emitted
# between a rename and a reopen.
# Fortunately, SQLite allow multiple process to access the same DB,
# so an external tool should be able to dump and empty tables.
LOGGER_ENABLED = False
from binascii import b2a_hex
from collections import deque
from functools import wraps
from logging import getLogger, Formatter, Logger, LogRecord, StreamHandler, \
DEBUG, WARNING
from time import time
from traceback import format_exception
import neo, os, signal, sqlite3, threading
class PacketLogger(object):
""" Logger at packet level (for debugging purpose) """
# Stats for storage node of matrix test (py2.7:SQLite)
RECORD_SIZE = ( 234360832 # extra memory used
- 16777264 # sum of raw data ('msg' attribute)
) // 187509 # number of records
FMT = ('%(asctime)s %(levelname)-9s %(name)-10s'
' [%(module)14s:%(lineno)3d] \n%(message)s')
class _Formatter(Formatter):
def formatTime(self, record, datefmt=None):
return Formatter.formatTime(self, record,
'%Y-%m-%d %H:%M:%S') + '.%04d' % (record.msecs * 10)
def format(self, record):
lines = iter(Formatter.format(self, record).splitlines())
prefix = lines.next()
return '\n'.join(prefix + line for line in lines)
class PacketRecord(object):
args = None
levelno = DEBUG
__init__ = property(lambda self: self.__dict__.update)
class logging(Logger):
default_root_handler = StreamHandler()
default_root_handler.setFormatter(_Formatter(FMT))
def __init__(self):
self.enable(LOGGER_ENABLED)
def enable(self, enabled):
self.dispatch = enabled and self._dispatch or (lambda *args, **kw: None)
def _dispatch(self, conn, packet, outgoing):
"""This is a helper method to handle various packet types."""
# default log message
uuid = dump(conn.getUUID())
ip, port = conn.getAddress()
packet_name = packet.__class__.__name__
neo.lib.logging.debug('#0x%04x %-30s %s %s (%s:%d) %s', packet.getId(),
packet_name, outgoing and '>' or '<', uuid, ip, port,
b64encode(packet._body[:96]))
# look for custom packet logger
logger = getattr(self, packet.handler_method_name, None)
if logger is None:
Logger.__init__(self, None)
self.parent = root = getLogger()
if not root.handlers:
root.addHandler(self.default_root_handler)
self.db = None
self._record_queue = deque()
self._record_size = 0
self._async = set()
l = threading.Lock()
self._acquire = l.acquire
release = l.release
def _release():
try:
while self._async:
self._async.pop()(self)
finally:
release()
self._release = _release
self.backlog()
def __async(wrapped):
def wrapper(self):
self._async.add(wrapped)
if self._acquire(0):
self._release()
return wraps(wrapped)(wrapper)
@__async
def flush(self):
if self.db is None:
return
# enhanced log
self.db.execute("BEGIN")
for r in self._record_queue:
self._emit(r)
self.db.commit()
self._record_queue.clear()
self._record_size = 0
def backlog(self, max_size=1<<24):
self._acquire()
try:
args = packet.decode() or ()
except PacketMalformedError:
neo.lib.logging.warning("Can't decode packet for logging")
return
log_message = logger(conn, *args)
if log_message is not None:
neo.lib.logging.debug('#0x%04x %s', packet.getId(), log_message)
self._max_size = max_size
if max_size is None:
self.flush()
else:
q = self._record_queue
while max_size < self._record_size:
self._record_size -= RECORD_SIZE + len(q.popleft().msg)
finally:
self._release()
def setup(self, filename=None, reset=False):
self._acquire()
try:
if self.db is not None:
self.db.close()
if not filename:
self.db = None
self._record_queue.clear()
self._record_size = 0
return
if filename:
self.db = sqlite3.connect(filename, isolation_level=None,
check_same_thread=False)
q = self.db.execute
if reset:
for t in 'log', 'packet':
q('DROP TABLE IF EXISTS ' + t)
q("""CREATE TABLE IF NOT EXISTS log (
date REAL NOT NULL,
name TEXT,
level INTEGER NOT NULL,
pathname TEXT,
lineno INTEGER,
msg TEXT)
""")
q("""CREATE INDEX IF NOT EXISTS _log_i1 ON log(date)""")
q("""CREATE TABLE IF NOT EXISTS packet (
date REAL NOT NULL,
name TEXT,
msg_id INTEGER NOT NULL,
code INTEGER NOT NULL,
peer TEXT NOT NULL,
body BLOB)
""")
q("""CREATE INDEX IF NOT EXISTS _packet_i1 ON packet(date)""")
finally:
self._release()
__del__ = setup
def error(self, conn, code, message):
return "%s (%s)" % (code, message)
def isEnabledFor(self, level):
return True
def notifyNodeInformation(self, conn, node_list):
for node_type, address, uuid, state in node_list:
if address is not None:
address = '%s:%d' % address
def _emit(self, r):
if type(r) is PacketRecord:
ip, port = r.addr
peer = '%s %s (%s:%u)' % ('>' if r.outgoing else '<',
r.uuid and b2a_hex(r.uuid), ip, port)
self.db.execute("INSERT INTO packet VALUES (?,?,?,?,?,?)",
(r.created, r._name, r.msg_id, r.code, peer, buffer(r.msg)))
else:
pathname = os.path.relpath(r.pathname, *neo.__path__)
self.db.execute("INSERT INTO log VALUES (?,?,?,?,?,?)",
(r.created, r._name, r.levelno, pathname, r.lineno, r.msg))
def _queue(self, record):
record._name = self.name and str(self.name)
self._acquire()
try:
if self._max_size is None:
self._emit(record)
else:
address = '?'
node = (dump(uuid), node_type, address, state)
neo.lib.logging.debug(' ! %s | %8s | %22s | %s' % node)
self._record_size += RECORD_SIZE + len(record.msg)
q = self._record_queue
q.append(record)
if record.levelno < WARNING:
while self._max_size < self._record_size:
self._record_size -= RECORD_SIZE + len(q.popleft().msg)
else:
self.flush()
finally:
self._release()
def callHandlers(self, record):
if self.db is not None:
record.msg = record.getMessage()
record.args = None
if record.exc_info:
record.msg += '\n' + ''.join(
format_exception(*record.exc_info)).strip()
record.exc_info = None
self._queue(record)
if Logger.isEnabledFor(self, record.levelno):
record.name = self.name or 'NEO'
self.parent.callHandlers(record)
def packet(self, connection, packet, outgoing):
if self.db is not None:
ip, port = connection.getAddress()
self._queue(PacketRecord(
created=time(),
msg_id=packet._id,
code=packet._code,
outgoing=outgoing,
uuid=connection.getUUID(),
addr=connection.getAddress(),
msg=packet._body))
PACKET_LOGGER = PacketLogger()
logging = logging()
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
......@@ -2,7 +2,7 @@
#
# neoadmin - run an administrator node of NEO
#
# Copyright (C) 2009 Nexedi SA
# Copyright (C) 2009-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,26 +18,22 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from optparse import OptionParser
from neo.lib import setupLog
from neo.lib import logging
from neo.lib.config import ConfigurationManager
parser = OptionParser()
parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \
'process')
parser.add_option('-v', '--verbose', action = 'store_true',
help = 'print verbose messages')
parser.add_option('-f', '--file', help = 'specify a configuration file')
parser.add_option('-s', '--section', help = 'specify a configuration section')
parser.add_option('-l', '--logfile', help = 'specify a logging file')
parser.add_option('-c', '--cluster', help = 'the cluster name')
parser.add_option('-m', '--masters', help = 'master node list')
parser.add_option('-b', '--bind', help = 'the local address to bind to')
parser.add_option('-n', '--name', help = 'the node name (improve logging)')
parser.add_option('-D', '--dynamic-master-list', help='path of the file '
'containing dynamic master node list')
defaults = dict(
name = 'admin',
bind = '127.0.0.1:9999',
masters = '127.0.0.1:10000',
)
......@@ -47,7 +43,6 @@ def main(args=None):
(options, args) = parser.parse_args(args=args)
arguments = dict(
uuid = options.uuid,
name = options.name or options.section,
cluster = options.cluster,
masters = options.masters,
bind = options.bind,
......@@ -61,7 +56,7 @@ def main(args=None):
)
# setup custom logging
setupLog(config.getName().upper(), options.logfile or None, options.verbose)
logging.setup(options.logfile)
# and then, load and run the application
from neo.admin.app import Application
......
......@@ -2,7 +2,7 @@
#
# neoadmin - run an administrator node of NEO
#
# Copyright (C) 2009 Nexedi SA
# Copyright (C) 2009-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -19,15 +19,14 @@
import sys
from optparse import OptionParser
from neo.lib import setupLog
from neo.lib import logging
from neo.lib.util import parseNodeAddress
parser = OptionParser()
parser.add_option('-v', '--verbose', action = 'store_true',
help = 'print verbose messages')
parser.add_option('-a', '--address', help = 'specify the address (ip:port) ' \
'of an admin node', default = '127.0.0.1:9999')
parser.add_option('--handler', help = 'specify the connection handler')
parser.add_option('-l', '--logfile', help = 'specify a logging file')
def main(args=None):
(options, args) = parser.parse_args(args=args)
......@@ -36,7 +35,7 @@ def main(args=None):
else:
address = ('127.0.0.1', 9999)
setupLog('NEOCTL', options.verbose)
logging.setup(options.logfile)
from neo.neoctl.app import Application
print Application(address).execute(args)
......
#!/usr/bin/env python
#
# neostorage - run a storage node of NEO
#
# Copyright (C) 2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging, os, sqlite3, sys, time
from neo.lib.protocol import Packets, PacketMalformedError
from neo.lib.util import dump
def emit(date, name, levelname, msg_list):
d = int(date)
prefix = '%s.%04u %-9s %-10s ' % (time.strftime('%F %T', time.localtime(d)),
int((date - d) * 10000), levelname,
name or default_name)
for msg in msg_list:
print prefix + msg
class packet(object):
def __new__(cls, date, name, msg_id, code, peer, body):
try:
p = Packets[code]
except KeyError:
Packets[code] = p = type('UnknownPacket[%u]' % code, (object,), {})
msg = ['#0x%04x %-30s %s' % (msg_id, p.__name__, peer)]
if body is not None:
try:
logger = getattr(cls, p.handler_method_name)
except AttributeError:
pass
else:
p = p()
p._id = msg_id
p._body = body
try:
args = p.decode()
except PacketMalformedError:
msg.append("Can't decode packet")
else:
msg += logger(*args)
emit(date, name, 'PACKET', msg)
@staticmethod
def error(code, message):
return "%s (%s)" % (code, message),
@staticmethod
def notifyNodeInformation(node_list):
for node_type, address, uuid, state in node_list:
address = '%s:%u' % address if address else '?'
yield ' ! %s | %8s | %22s | %s' % (
dump(uuid), node_type, address, state)
def main():
global default_name
db_path = sys.argv[1]
default_name, _ = os.path.splitext(os.path.basename(db_path))
db = sqlite3.connect(db_path)
nl = db.execute('select * from log')
np = db.execute('select * from packet')
try:
p = np.next()
except StopIteration:
p = None
for date, name, level, pathname, lineno, msg in nl:
try:
while p and p[0] < date:
packet(*p)
p = np.next()
except StopIteration:
p = None
emit(date, name, logging.getLevelName(level), msg.splitlines())
if p:
packet(*p)
for p in np:
packet(*p)
if __name__ == "__main__":
sys.exit(main())
......@@ -2,7 +2,7 @@
#
# neomaster - run a master node of NEO
#
# Copyright (C) 2006 Nexedi SA
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,16 +18,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from optparse import OptionParser
from neo.lib import setupLog
from neo.lib import logging
from neo.lib.config import ConfigurationManager
parser = OptionParser()
parser.add_option('-v', '--verbose', action = 'store_true',
help = 'print verbose messages')
parser.add_option('-f', '--file', help = 'specify a configuration file')
parser.add_option('-s', '--section', help = 'specify a configuration section')
parser.add_option('-u', '--uuid', help='the node UUID (testing purpose)')
parser.add_option('-n', '--name', help = 'the node name (impove logging)')
parser.add_option('-b', '--bind', help = 'the local address to bind to')
parser.add_option('-c', '--cluster', help = 'the cluster name')
parser.add_option('-m', '--masters', help = 'master node list')
......@@ -38,7 +35,6 @@ parser.add_option('-D', '--dynamic-master-list', help='path of the file '
'containing dynamic master node list')
defaults = dict(
name = 'master',
bind = '127.0.0.1:10000',
masters = '',
replicas = 0,
......@@ -51,7 +47,6 @@ def main(args=None):
arguments = dict(
uuid = options.uuid or None,
bind = options.bind,
name = options.name or options.section,
cluster = options.cluster,
masters = options.masters,
replicas = options.replicas,
......@@ -65,7 +60,7 @@ def main(args=None):
)
# setup custom logging
setupLog(config.getName().upper(), options.logfile or None, options.verbose)
logging.setup(options.logfile)
# and then, load and run the application
from neo.master.app import Application
......
......@@ -2,7 +2,7 @@
#
# neostorage - run a storage node of NEO
#
# Copyright (C) 2006 Nexedi SA
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,13 +18,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from optparse import OptionParser
from neo.lib import setupLog
from neo.lib import logging
from neo.lib.config import ConfigurationManager
parser = OptionParser()
parser.add_option('-v', '--verbose', action = 'store_true',
help = 'print verbose messages')
parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \
'process. Previously assigned UUID takes precedence (ie ' \
'you should always use -R with this switch)')
......@@ -33,7 +31,6 @@ parser.add_option('-s', '--section', help = 'specify a configuration section')
parser.add_option('-l', '--logfile', help = 'specify a logging file')
parser.add_option('-R', '--reset', action = 'store_true',
help = 'remove an existing database if any')
parser.add_option('-n', '--name', help = 'the node name (impove logging)')
parser.add_option('-b', '--bind', help = 'the local address to bind to')
parser.add_option('-c', '--cluster', help = 'the cluster name')
parser.add_option('-m', '--masters', help = 'master node list')
......@@ -45,7 +42,6 @@ parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
'available, before erroring-out (-1 = infinite)', type='float', default=0)
defaults = dict(
name = 'storage',
bind = '127.0.0.1',
masters = '127.0.0.1:10000',
adapter = 'MySQL',
......@@ -60,7 +56,6 @@ def main(args=None):
arguments = dict(
uuid = options.uuid,
bind = options.bind,
name = options.name or options.section,
cluster = options.cluster,
masters = options.masters,
database = options.database,
......@@ -76,7 +71,7 @@ def main(args=None):
)
# setup custom logging
setupLog(config.getName().upper(), options.logfile or None, options.verbose)
logging.setup(options.logfile)
# and then, load and run the application
from neo.storage.app import Application
......
#!/usr/bin/env python
#
# Copyright (C) 2009 Nexedi SA
# Copyright (C) 2009-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#!/usr/bin/env python
#
# Copyright (C) 2011 Nexedi SA
# Copyright (C) 2011-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -17,8 +17,10 @@
import inspect, random, signal, sys
from optparse import OptionParser
from neo.lib import logger, logging
from neo.lib import logging
from neo.tests import functional
logging.backlog()
del logging.default_root_handler.handle
def main():
args, _, _, defaults = inspect.getargspec(functional.NEOCluster.__init__)
......@@ -40,8 +42,6 @@ def main():
parser.add_option('--' + option, **kw)
parser.set_defaults(**defaults)
options, args = parser.parse_args()
if options.verbose:
logger.PACKET_LOGGER.enable(True)
if options.seed:
functional.random = random.Random(options.seed)
cluster = functional.NEOCluster(args, **dict((x, getattr(options, x))
......
......@@ -22,11 +22,10 @@ import sys
import tempfile
import unittest
import MySQLdb
import neo
import transaction
from mock import Mock
from neo.lib import debug, logger, protocol, setupLog
from neo.lib import debug, logging, protocol
from neo.lib.protocol import Packets
from neo.lib.util import getAddressType
from time import time, gmtime
......@@ -44,10 +43,12 @@ IP_VERSION_FORMAT_DICT = {
ADDRESS_TYPE = socket.AF_INET
debug.ENABLED = True
logging.default_root_handler.handle = lambda record: None
logging.backlog(None)
debug.register()
# prevent "signal only works in main thread" errors in subprocesses
debug.ENABLED = False
debug.register = lambda on_log=None: None
def mockDefaultValue(name, function):
def method(self, *args, **kw):
......@@ -109,30 +110,31 @@ def setupMySQLdb(db_list, user=DB_USER, password='', clear_databases=True):
conn.close()
class NeoTestBase(unittest.TestCase):
def setUp(self):
logger.PACKET_LOGGER.enable(True)
sys.stdout.write(' * %s ' % (self.id(), ))
sys.stdout.flush()
self.setupLog()
logging.name = self.setupLog()
unittest.TestCase.setUp(self)
def setupLog(self):
test_case, test_method = self.id().rsplit('.', 1)
log_file = os.path.join(getTempDirectory(), test_case + '.log')
setupLog(test_method, log_file, True)
test_case, logging.name = self.id().rsplit('.', 1)
logging.setup(os.path.join(getTempDirectory(), test_case + '.log'))
def tearDown(self):
def tearDown(self, success='ok' if sys.version_info < (2,7) else 'success'):
assert self.tearDown.im_func is NeoTestBase.tearDown.im_func
self._tearDown(sys._getframe(1).f_locals[success])
def _tearDown(self, success):
# Kill all unfinished transactions for next test.
# Note we don't even abort them because it may require a valid
# connection to a master node (see Storage.sync()).
transaction.manager.__init__()
unittest.TestCase.tearDown(self)
sys.stdout.write('\n')
sys.stdout.flush()
print
class failureException(AssertionError):
def __init__(self, msg=None):
neo.lib.logging.error(msg)
logging.error(msg)
AssertionError.__init__(self, msg)
failIfEqual = failUnlessEqual = assertEquals = assertNotEquals = None
......@@ -180,7 +182,6 @@ class NeoUnitTestBase(NeoTestBase):
database = '%s@%s%s' % (DB_USER, prefix, index)
return Mock({
'getCluster': cluster,
'getName': 'storage',
'getBind': (masters[0], 10020 + index),
'getMasters': (masters, getAddressType((
self.local_ip, 0))),
......@@ -510,7 +511,7 @@ connector_cpt = 0
class DoNothingConnector(Mock):
def __init__(self, s=None):
neo.lib.logging.info("initializing connector")
logging.info("initializing connector")
global connector_cpt
self.desc = connector_cpt
connector_cpt += 1
......
......@@ -8,10 +8,11 @@ import datetime
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
from neo.lib import logger
MAIL_SERVER = '127.0.0.1:25'
from neo.lib import logging
logging.backlog()
class AttributeDict(dict):
def __getattr__(self, item):
......@@ -28,7 +29,6 @@ class BenchmarkRunner(object):
parser = optparse.OptionParser()
# register common options
parser.add_option('', '--title')
parser.add_option('-v', '--verbose', action='store_true')
parser.add_option('', '--mail-to', action='append')
parser.add_option('', '--mail-from')
parser.add_option('', '--mail-server')
......@@ -44,7 +44,6 @@ class BenchmarkRunner(object):
self._config.update(self.load_options(options, self._args))
self._config.update(
title = options.title or self.__class__.__name__,
verbose = bool(options.verbose),
mail_from = options.mail_from,
mail_to = options.mail_to,
mail_server = mail_server.split(':'),
......@@ -93,7 +92,6 @@ class BenchmarkRunner(object):
s.close()
def run(self):
logger.PACKET_LOGGER.enable(self._config.verbose)
subject, report = self.start()
report = self.build_report(report)
if self._config.mail_to:
......
......@@ -77,7 +77,7 @@ class ClientApplicationTests(NeoUnitTestBase):
Application.getPartitionTable = getPartitionTable
self._to_stop_list = []
def tearDown(self):
def _tearDown(self, success):
# stop threads
for app in self._to_stop_list:
app.close()
......@@ -85,7 +85,7 @@ class ClientApplicationTests(NeoUnitTestBase):
Application._getMasterConnection = self._getMasterConnection
Application._ask = self._ask
Application.getPartitionTable = self.getPartitionTable
NeoUnitTestBase.tearDown(self)
NeoUnitTestBase._tearDown(self, success)
# some helpers
......
......@@ -288,4 +288,5 @@ class ClusterPdb(object):
__builtin__.pdb = ClusterPdb()
signal.signal(signal.SIGUSR2, debug.decorate(lambda sig, frame: pdb(depth=2)))
signal.signal(signal.SIGUSR1, debug.safe_handler(
lambda sig, frame: pdb(depth=2)))
......@@ -33,7 +33,6 @@ import psutil
import neo.scripts
from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.lib import setupLog
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
from neo.lib.util import dump
from .. import DB_USER, setupMySQLdb, NeoTestBase, buildUrlFromString, \
......@@ -171,6 +170,8 @@ class NEOProcess(object):
# prevent child from killing anything
del self.__class__.__del__
try:
# release SQLite debug log
neo.lib.logging.setup()
# release system-wide lock
for allocator in PortAllocator.allocator_set.copy():
allocator.reset()
......@@ -243,7 +244,6 @@ class NEOCluster(object):
db_user=DB_USER, db_password='',
cleanup_on_delete=False, temp_dir=None, clear_databases=True,
adapter=os.getenv('NEO_TESTS_ADAPTER'),
verbose=True,
address_type=ADDRESS_TYPE,
):
if not adapter:
......@@ -251,7 +251,6 @@ class NEOCluster(object):
self.adapter = adapter
self.zodb_storage_list = []
self.cleanup_on_delete = cleanup_on_delete
self.verbose = verbose
self.uuid_set = set()
self.db_list = db_list
if temp_dir is None:
......@@ -282,16 +281,16 @@ class NEOCluster(object):
# create admin node
self.__newProcess(NEO_ADMIN, {
'--cluster': self.cluster_name,
'--name': 'admin',
'--logfile': os.path.join(self.temp_dir, 'admin.log'),
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip), admin_port, ),
'--masters': self.master_nodes,
})
# create master nodes
for index, port in enumerate(master_node_list):
for i, port in enumerate(master_node_list):
self.__newProcess(NEO_MASTER, {
'--cluster': self.cluster_name,
'--name': 'master_%d' % index,
'--logfile': os.path.join(self.temp_dir, 'master_%u.log' % i),
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip), port, ),
'--masters': self.master_nodes,
......@@ -299,10 +298,10 @@ class NEOCluster(object):
'--partitions': partitions,
})
# create storage nodes
for index, db in enumerate(db_list):
for i, db in enumerate(db_list):
self.__newProcess(NEO_STORAGE, {
'--cluster': self.cluster_name,
'--name': 'storage_%d' % index,
'--logfile': os.path.join(self.temp_dir, 'storage_%u.log' % i),
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip),
0 ),
......@@ -316,11 +315,6 @@ class NEOCluster(object):
def __newProcess(self, command, arguments):
uuid = self.__allocateUUID()
arguments['--uuid'] = uuid
if self.verbose:
arguments['--verbose'] = True
logfile = arguments['--name']
arguments['--logfile'] = os.path.join(self.temp_dir, '%s.log' % (logfile, ))
self.process_dict.setdefault(command, []).append(
NEOProcess(command, uuid, arguments))
......@@ -419,8 +413,6 @@ class NEOCluster(object):
result = Storage(
master_nodes=master_nodes,
name=self.cluster_name,
logfile=os.path.join(self.temp_dir, 'client.log'),
verbose=self.verbose,
**kw)
self.zodb_storage_list.append(result)
return result
......@@ -649,8 +641,7 @@ class NEOCluster(object):
class NEOFunctionalTest(NeoTestBase):
def setupLog(self):
log_file = os.path.join(self.getTempDirectory(), 'test.log')
setupLog('TEST', log_file, True)
neo.lib.logging.setup(os.path.join(self.getTempDirectory(), 'test.log'))
def getTempDirectory(self):
# build the full path based on test case and current test method
......
......@@ -76,10 +76,10 @@ class ClientTests(NEOFunctionalTest):
temp_dir=self.getTempDirectory()
)
def tearDown(self):
def _tearDown(self, success):
if self.neo is not None:
self.neo.stop()
NEOFunctionalTest.tearDown(self)
NEOFunctionalTest._tearDown(self, success)
def __setup(self):
# start cluster
......
......@@ -27,10 +27,10 @@ class ClusterTests(NEOFunctionalTest):
NEOFunctionalTest.setUp(self)
self.neo = None
def tearDown(self):
def _tearDown(self, success):
if self.neo is not None:
self.neo.stop()
NEOFunctionalTest.tearDown(self)
NEOFunctionalTest._tearDown(self, success)
def testClusterStartup(self):
neo = NEOCluster(['test_neo1', 'test_neo2'], replicas=1,
......
......@@ -32,9 +32,9 @@ class MasterTests(NEOFunctionalTest):
self.storage = self.neo.getZODBStorage()
self.neoctl = self.neo.getNEOCTL()
def tearDown(self):
def _tearDown(self, success):
self.neo.stop()
NEOFunctionalTest.tearDown(self)
NEOFunctionalTest._tearDown(self, success)
def testStoppingSecondaryMaster(self):
# Wait for masters to stabilize
......
......@@ -39,10 +39,10 @@ class StorageTests(NEOFunctionalTest):
NEOFunctionalTest.setUp(self)
self.neo = None
def tearDown(self):
def _tearDown(self, success):
if self.neo is not None:
self.neo.stop()
NEOFunctionalTest.tearDown(self)
NEOFunctionalTest._tearDown(self, success)
def queryCount(self, db, query):
try:
......
......@@ -70,10 +70,10 @@ class MasterClientElectionTests(MasterClientElectionTestBase):
ClientConnection._addPacket = _addPacket
super(MasterClientElectionTests, self).setUp()
def tearDown(self):
def _tearDown(self, success):
# restore patched methods
ClientConnection._addPacket = self._addPacket
NeoUnitTestBase.tearDown(self)
NeoUnitTestBase._tearDown(self, success)
def _checkUnconnected(self, node):
addr = node.getAddress()
......@@ -233,8 +233,8 @@ class MasterServerElectionTests(MasterClientElectionTestBase):
ClientConnection._addPacket = _addPacket
super(MasterServerElectionTests, self).setUp()
def tearDown(self):
NeoUnitTestBase.tearDown(self)
def _tearDown(self, success):
NeoUnitTestBase._tearDown(self, success)
# restore environnement
ClientConnection._addPacket = self._addPacket
......
......@@ -55,10 +55,10 @@ class StorageClientHandlerTests(NeoUnitTestBase):
self.app.primary_master_node = pmn
self.master_port = 10010
def tearDown(self):
def _tearDown(self, success):
self.app.close()
del self.app
super(StorageClientHandlerTests, self).tearDown()
super(StorageClientHandlerTests, self)._tearDown(success)
def _getConnection(self, uuid=None):
return self.getFakeConnection(uuid=uuid, address=('127.0.0.1', 1000))
......
......@@ -34,10 +34,10 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase):
self.app.pt = PartitionTable(4, 1)
self.identification = IdentificationHandler(self.app)
def tearDown(self):
def _tearDown(self, success):
self.app.close()
del self.app
super(StorageIdentificationHandlerTests, self).tearDown()
super(StorageIdentificationHandlerTests, self)._tearDown(success)
def test_requestIdentification1(self):
""" nodes are rejected during election or if unknown storage """
......
......@@ -42,10 +42,10 @@ class StorageInitializationHandlerTests(NeoUnitTestBase):
self.app.load_lock_dict = {}
self.app.pt = PartitionTable(self.num_partitions, self.num_replicas)
def tearDown(self):
def _tearDown(self, success):
self.app.close()
del self.app
super(StorageInitializationHandlerTests, self).tearDown()
super(StorageInitializationHandlerTests, self)._tearDown(success)
# Common methods
def getLastUUID(self):
......
......@@ -52,10 +52,10 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
self.app.primary_master_node = pmn
self.master_port = 10010
def tearDown(self):
def _tearDown(self, success):
self.app.close()
del self.app
super(StorageMasterHandlerTests, self).tearDown()
super(StorageMasterHandlerTests, self)._tearDown(success)
def getMasterConnection(self):
address = ("127.0.0.1", self.master_port)
......
......@@ -35,10 +35,10 @@ class StorageAppTests(NeoUnitTestBase):
self.app.event_queue = deque()
self.app.event_queue_dict = {}
def tearDown(self):
def _tearDown(self, success):
self.app.close()
del self.app
super(StorageAppTests, self).tearDown()
super(StorageAppTests, self)._tearDown(success)
def test_01_loadPartitionTable(self):
self.app.dm = Mock({
......
......@@ -36,12 +36,12 @@ class StorageDBTests(NeoUnitTestBase):
self.setNumPartitions(1)
return self._db
def tearDown(self):
def _tearDown(self, success):
try:
self.__dict__.pop('_db', None).close()
except AttributeError:
pass
NeoUnitTestBase.tearDown(self)
NeoUnitTestBase._tearDown(self, success)
def getDB(self):
raise NotImplementedError
......
......@@ -43,10 +43,10 @@ class StorageVerificationHandlerTests(NeoUnitTestBase):
self.app.load_lock_dict = {}
self.app.pt = PartitionTable(self.num_partitions, self.num_replicas)
def tearDown(self):
def _tearDown(self, success):
self.app.close()
del self.app
super(StorageVerificationHandlerTests, self).tearDown()
super(StorageVerificationHandlerTests, self)._tearDown(success)
# Common methods
def getLastUUID(self):
......
......@@ -36,10 +36,10 @@ class BootstrapManagerTests(NeoUnitTestBase):
self.num_partitions = 1009
self.num_replicas = 2
def tearDown(self):
def _tearDown(self, success):
self.app.close()
del self.app
super(BootstrapManagerTests, self).tearDown()
super(BootstrapManagerTests, self)._tearDown(success)
# Common methods
def getLastUUID(self):
......
......@@ -25,7 +25,7 @@ from . import DoNothingConnector
from neo.lib.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException
from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets, ParserState
from neo.lib.protocol import Packets, ParserState, PACKET_HEADER_FORMAT
from . import NeoUnitTestBase
from neo.lib.util import ReadBuffer
from neo.lib.locking import Queue
......@@ -406,13 +406,12 @@ class ConnectionTests(NeoUnitTestBase):
def test_07_Connection_addPacket(self):
# new packet
p = Mock({"encode" : "testdata", "getId": 0})
p._body = ''
p.handler_method_name = 'testmethod'
p = Packets.Ping()
p._id = 0
bc = self._makeConnection()
self._checkWriteBuf(bc, '')
bc._addPacket(p)
self._checkWriteBuf(bc, 'testdata')
self._checkWriteBuf(bc, PACKET_HEADER_FORMAT.pack(0, p._code, 10))
self._checkWriterAdded(1)
def test_Connection_analyse1(self):
......
......@@ -25,7 +25,7 @@ import transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app
from neo.client import Storage
from neo.lib import bootstrap, setupLog
from neo.lib import bootstrap
from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \
ConnectorConnectionRefusedException, ConnectorTryAgainException
......@@ -518,7 +518,6 @@ class NEOCluster(object):
cls.SocketConnector_makeListeningConnection(self, BIND)
SocketConnector.receive = receive
SocketConnector.send = send
Storage.setupLog = lambda *args, **kw: None
Serialized.init()
@staticmethod
......@@ -536,19 +535,11 @@ class NEOCluster(object):
cls.SocketConnector_makeListeningConnection
SocketConnector.receive = cls.SocketConnector_receive
SocketConnector.send = cls.SocketConnector_send
Storage.setupLog = setupLog
def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
storage_count=None, db_list=None, clear_databases=True,
db_user=DB_USER, db_password='', verbose=None):
if verbose is not None:
temp_dir = os.getenv('TEMP') or \
os.path.join(tempfile.gettempdir(), 'neo_tests')
os.path.exists(temp_dir) or os.makedirs(temp_dir)
log_file = tempfile.mkstemp('.log', '', temp_dir)[1]
print 'Logging to %r' % log_file
setupLog(LoggerThreadName(), log_file, verbose)
db_user=DB_USER, db_password=''):
self.name = 'neo_%s' % self._allocate('name',
lambda: random.randint(0, 100))
master_list = [MasterApplication.newAddress()
......@@ -751,11 +742,16 @@ class NEOThreadedTest(NeoTestBase):
def setupLog(self):
log_file = os.path.join(getTempDirectory(), self.id() + '.log')
setupLog(LoggerThreadName(), log_file, True)
neo.lib.logging.setup(log_file)
return LoggerThreadName()
def tearDown(self):
super(NEOThreadedTest, self).tearDown()
def _tearDown(self, success):
super(NEOThreadedTest, self)._tearDown(success)
ServerNode.resetPorts()
if success:
q = neo.lib.logging.db.execute
q("UPDATE packet SET body=NULL")
q("VACUUM")
def getUnpickler(self, conn):
reader = conn._reader
......
......@@ -41,11 +41,11 @@ class ZODBTestCase(TestCase):
self.neo.start()
self._storage = self.neo.getZODBStorage()
def tearDown(self):
def _tearDown(self, success):
self._storage.cleanup()
self.neo.stop()
del self.neo, self._storage
super(ZODBTestCase, self).tearDown()
super(ZODBTestCase, self)._tearDown(success)
assertEquals = failUnlessEqual = TestCase.assertEqual
assertNotEquals = failIfEqual = TestCase.assertNotEqual
......
......@@ -39,8 +39,8 @@ class RecoveryTests(ZODBTestCase, StorageTestBase, RecoveryStorage):
self._dst = self.neo.getZODBStorage()
self._dst_db = ZODB.DB(self._dst)
def tearDown(self):
super(RecoveryTests, self).tearDown()
def _tearDown(self, success):
super(RecoveryTests, self)._tearDown(success)
self._dst_db.close()
self._dst.cleanup()
self.neo_dst.stop()
......
......@@ -26,9 +26,9 @@ class NEOZODBTests(ZODBTestCase, testZODB.ZODBTests):
super(NEOZODBTests, self).setUp()
self._db = ZODB.DB(self._storage)
def tearDown(self):
def _tearDown(self, success):
self._db.close()
super(NEOZODBTests, self).tearDown()
super(NEOZODBTests, self)._tearDown(success)
def checkMultipleUndoInOneTransaction(self):
# XXX: Upstream test accesses a persistent object outside a transaction
......
......@@ -55,6 +55,7 @@ setup(
# (eg. we don't want neotestrunner if nothing depends on tests)
'neoadmin=neo.scripts.neoadmin:main',
'neoctl=neo.scripts.neoctl:main',
'neolog=neo.scripts.neolog:main',
'neomaster=neo.scripts.neomaster:main',
'neomigrate=neo.scripts.neomigrate:main',
'neostorage=neo.scripts.neostorage:main',
......
......@@ -91,7 +91,6 @@ class MatrixImportBenchmark(BenchmarkRunner):
master_count=masters,
partitions=partitions,
replicas=replicas,
verbose=self._config.verbose,
)
try:
neo.start()
......
......@@ -45,7 +45,6 @@ class ImportBenchmark(BenchmarkRunner):
partitions=config.partitions,
replicas=config.replicas,
master_count=config.masters,
verbose=False,
)
# import datafs
try:
......
......@@ -74,7 +74,6 @@ class ReplicationBenchmark(BenchmarkRunner):
partitions=config.partitions,
replicas=1,
master_count=1,
verbose=False,
)
neo.start()
p_time = r_time = None
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment