#
# Copyright (C) 2006-2015  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# WARNING: Log rotating should not be implemented here.
#          SQLite does not access database only by file descriptor,
#          and an OperationalError exception would be raised if a log is emitted
#          between a rename and a reopen.
#          Fortunately, SQLite allows multiple processes to access the same DB,
#          so an external tool should be able to dump and empty tables.
from collections import deque
from functools import wraps
from logging import getLogger, Formatter, Logger, StreamHandler, \
    DEBUG, WARNING
from time import time
from traceback import format_exception
import bz2, inspect, neo, os, signal, sqlite3, sys, threading

# Stats for storage node of matrix test (py2.7:SQLite)
# Estimated per-record memory overhead of a queued log record, derived from
# measurements of a real run: (total extra memory - raw 'msg' data) / records.
RECORD_SIZE = ( 234360832 # extra memory used
              - 16777264  # sum of raw data ('msg' attribute)
              ) // 187509 # number of records

# Format used by the console (stderr) handler; the database stores the raw
# record fields instead of a formatted line.
FMT = ('%(asctime)s %(levelname)-9s %(name)-10s'
       ' [%(module)14s:%(lineno)3d] \n%(message)s')

class _Formatter(Formatter):

    def formatTime(self, record, datefmt=None):
        return Formatter.formatTime(self, record,
           '%Y-%m-%d %H:%M:%S') + '.%04d' % (record.msecs * 10)

    def format(self, record):
        lines = iter(Formatter.format(self, record).splitlines())
        prefix = lines.next()
        return '\n'.join(prefix + line for line in lines)


class PacketRecord(object):
    """Lightweight record describing a network packet to be logged.

    Built with arbitrary keyword arguments (created, msg_id, code,
    outgoing, uuid, addr, msg, ...), each stored as an instance
    attribute. Class-level defaults make it quack enough like a
    logging.LogRecord for the queueing code.
    """

    # Defaults shared with logging.LogRecord's interface.
    args = None
    levelno = DEBUG

    def __init__(self, **kw):
        # Store every given keyword directly as an instance attribute.
        self.__dict__.update(kw)


class NEOLogger(Logger):
    """Logger that also mirrors every record into a SQLite database.

    Records are kept in a bounded in-memory backlog (see backlog()) and
    written to the database only when a WARNING-or-higher record is
    emitted, or on explicit flush(). The instance is also a context
    manager guarding access to the underlying connection.
    """

    # Handler installed on the root logger when it has none, so records
    # are still printed somewhere even without external configuration.
    default_root_handler = StreamHandler()
    default_root_handler.setFormatter(_Formatter(FMT))

    def __init__(self):
        Logger.__init__(self, None)
        self.parent = root = getLogger()
        if not root.handlers:
            root.addHandler(self.default_root_handler)
        self._db = None               # SQLite connection; None until setup()
        self._record_queue = deque()  # in-memory backlog of pending records
        self._record_size = 0         # estimated memory used by the backlog
        self._async = set()           # methods deferred until lock release
        l = threading.Lock()
        self._acquire = l.acquire
        release = l.release
        def _release():
            # Before releasing the lock, run any operation (flush/reopen)
            # that was requested while the lock was held (see __async).
            try:
                while self._async:
                    self._async.pop()(self)
            finally:
                release()
        self._release = _release
        self.backlog()

    def __enter__(self):
        # Acquire the logger lock and expose the database connection.
        self._acquire()
        return self._db

    def __exit__(self, t, v, tb):
        self._release()

    def __async(wrapped):
        """Decorator: run 'wrapped' immediately if the lock is free,
        otherwise record it to be run by whoever releases the lock.
        This makes the method safe to call from a signal handler that
        may interrupt a thread already holding the lock."""
        def wrapper(self):
            self._async.add(wrapped)
            if self._acquire(0):
                self._release()
        return wraps(wrapped)(wrapper)

    @__async
    def reopen(self):
        # Close and reopen the database file (used after an external tool
        # rotated/emptied it — see the WARNING at the top of this file).
        if self._db is None:
            return
        q = self._db.execute
        if not q("SELECT id FROM packet LIMIT 1").fetchone():
            # No packet left: the stored protocol dump is presumably only
            # useful for decoding packets, so drop it as well.
            q("DROP TABLE protocol")
            # DROP TABLE already replaced previous data with zeros,
            # so VACUUM is not really useful. But here, it should be free.
            q("VACUUM")
        self._setup(q("PRAGMA database_list").fetchone()[2])

    @__async
    def flush(self):
        """Write all queued records to the database (possibly deferred
        until the logger lock is released, see __async)."""
        if self._db is None:
            return
        try:
            for r in self._record_queue:
                self._emit(r)
        finally:
            # Always commit, to not lose any record that we could emit.
            self.commit()
        self._record_queue.clear()
        self._record_size = 0

    def commit(self):
        # Commit, retrying for as long as the failure is "database is
        # locked" (another process accessing the same DB, e.g. a dump tool).
        try:
            self._db.commit()
        except sqlite3.OperationalError, e:
            x = e.args[0]
            if x == 'database is locked':
                sys.stderr.write('%s: retrying to emit log...' % x)
                while e.args[0] == x:
                    try:
                        self._db.commit()
                    except sqlite3.OperationalError, e:
                        continue
                    sys.stderr.write(' ok\n')
                    return
            raise

    def backlog(self, max_size=1<<24, max_packet=None):
        """Configure the in-memory backlog.

        max_size: maximum estimated memory used by queued records;
                  None disables buffering (every record is flushed).
        max_packet: packets with a body larger than this are logged
                    without their body; None/0 means no limit.
        """
        with self:
            self._max_packet = max_packet
            self._max_size = max_size
            if max_size is None:
                self.flush()
            else:
                # Drop oldest records until the backlog fits the new limit.
                q = self._record_queue
                while max_size < self._record_size:
                    self._record_size -= RECORD_SIZE + len(q.popleft().msg)

    def _setup(self, filename=None, reset=False):
        """(Re)open the database and create the schema if needed.

        Must be called with the logger lock held. Without a filename,
        database logging is disabled and the backlog is dropped.
        """
        from . import protocol as p
        global uuid_str
        uuid_str = p.uuid_str
        if self._db is not None:
            self._db.close()
            if not filename:
                self._db = None
                self._record_queue.clear()
                self._record_size = 0
                return
        if filename:
            self._db = sqlite3.connect(filename, check_same_thread=False)
            q = self._db.execute
            if self._max_size is None:
                # Unbuffered mode commits on every record: trade durability
                # for speed.
                q("PRAGMA synchronous = OFF")
            if 1: # Not only when logging everything,
                  # but also for interoperability with logrotate.
                q("PRAGMA journal_mode = MEMORY")
            if reset:
                for t in 'log', 'packet':
                    q('DROP TABLE IF EXISTS ' + t)
            q("""CREATE TABLE IF NOT EXISTS log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    date REAL NOT NULL,
                    name TEXT,
                    level INTEGER NOT NULL,
                    pathname TEXT,
                    lineno INTEGER,
                    msg TEXT)
              """)
            q("""CREATE INDEX IF NOT EXISTS _log_i1 ON log(date)""")
            q("""CREATE TABLE IF NOT EXISTS packet (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    date REAL NOT NULL,
                    name TEXT,
                    msg_id INTEGER NOT NULL,
                    code INTEGER NOT NULL,
                    peer TEXT NOT NULL,
                    body BLOB)
              """)
            q("""CREATE INDEX IF NOT EXISTS _packet_i1 ON packet(date)""")
            q("""CREATE TABLE IF NOT EXISTS protocol (
                    date REAL PRIMARY KEY NOT NULL,
                    text BLOB NOT NULL)
              """)
            # Store a bz2-compressed copy of the protocol module, unless the
            # latest stored version is already identical.
            with open(inspect.getsourcefile(p)) as p:
                p = buffer(bz2.compress(p.read()))
            for t, in q("SELECT text FROM protocol ORDER BY date DESC"):
                if p == t:
                    break
            else:
                # Timestamp it no later than the oldest queued record, so
                # that it sorts before the records it applies to.
                try:
                    t = self._record_queue[0].created
                except IndexError:
                    t = time()
                with self._db:
                    q("INSERT INTO protocol VALUES (?,?)", (t, p))

    def setup(self, filename=None, reset=False):
        """Thread-safe wrapper around _setup()."""
        with self:
            self._setup(filename, reset)
    # On garbage collection, setup() without arguments closes the database.
    __del__ = setup

    def isEnabledFor(self, level):
        # All levels are recorded in the database, so never filter here;
        # callHandlers() applies the usual level check for console output.
        return True

    def _emit(self, r):
        # Write a single record (packet or plain log line) to the database.
        # Caller is responsible for committing.
        if type(r) is PacketRecord:
            ip, port = r.addr
            peer = '%s %s (%s:%u)' % ('>' if r.outgoing else '<',
                                      uuid_str(r.uuid), ip, port)
            msg = r.msg
            if msg is not None:
                msg = buffer(msg)
            self._db.execute("INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)",
                (r.created, r._name, r.msg_id, r.code, peer, msg))
        else:
            pathname = os.path.relpath(r.pathname, *neo.__path__)
            self._db.execute("INSERT INTO log VALUES (NULL,?,?,?,?,?,?)",
                (r.created, r._name, r.levelno, pathname, r.lineno, r.msg))

    def _queue(self, record):
        """Queue a record, flushing the whole backlog for WARNING+."""
        record._name = self.name and str(self.name)
        self._acquire()
        try:
            if self._max_size is None:
                # Unbuffered mode: write and commit immediately.
                self._emit(record)
                self.commit()
            else:
                self._record_size += RECORD_SIZE + len(record.msg)
                q = self._record_queue
                q.append(record)
                if record.levelno < WARNING:
                    # Low severity: just trim oldest records if over limit.
                    while self._max_size < self._record_size:
                        self._record_size -= RECORD_SIZE + len(q.popleft().msg)
                else:
                    # Important record: persist everything queued so far.
                    self.flush()
        finally:
            self._release()

    def callHandlers(self, record):
        # Logger hook: store the record in the database in addition to
        # dispatching it to the root logger's handlers.
        if self._db is not None:
            # Render the message once (and inline any traceback) so the
            # stored row is self-contained.
            record.msg = record.getMessage()
            record.args = None
            if record.exc_info:
                record.msg += '\n' + ''.join(
                    format_exception(*record.exc_info)).strip()
                record.exc_info = None
            self._queue(record)
        if Logger.isEnabledFor(self, record.levelno):
            record.name = self.name or 'NEO'
            self.parent.callHandlers(record)

    def packet(self, connection, packet, outgoing):
        """Record a network packet sent ('outgoing') or received on
        'connection'."""
        if self._db is not None:
            body = packet._body
            if self._max_packet and self._max_packet < len(body):
                body = None  # body too large: keep metadata only
            self._queue(PacketRecord(
                created=time(),
                msg_id=packet._id,
                code=packet._code,
                outgoing=outgoing,
                uuid=connection.getUUID(),
                addr=connection.getAddress(),
                msg=body))


logging = NEOLogger()
281
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
282
signal.signal(signal.SIGRTMIN+1, lambda signum, frame: logging.reopen())