#!/usr/bin/env python
#
# neolog - read a NEO log
#
# Copyright (C) 2012  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

20
import bz2, logging, optparse, os, signal, sqlite3, sys, time
21
from bisect import insort
22
from logging import getLevelName
23

24 25

class Log(object):
    """Reader for one NEO log stored in an SQLite database.

    Iterating over an instance yields the rows of the ``log`` and ``packet``
    tables added since the previous iteration, merged by date, as
    ``(date, name, levelname, msg_list)`` tuples suitable for ``_emit``.
    Packets are described using the protocol definition stored in the
    ``protocol`` table of the same database.

    Methods named after a packet's ``handler_method_name`` (``error``,
    ``notifyNodeInformation``) format the decoded arguments of that packet
    and return an iterable of extra output lines.
    """

    # Highest row ids already consumed from the `log` and `packet` tables;
    # each iteration only selects rows with a greater id.
    _log_id = _packet_id = -1
    # Date of the protocol definition currently loaded by _reload().
    _protocol_date = None

    def __init__(self, db_path, decode_all, date_format):
        """Open the log database.

        db_path     -- path of the SQLite file; its base name without
                       extension is shown for events that carry no name
        decode_all  -- if true, also decode packets that have no dedicated
                       formatting method
        date_format -- strftime(3) format of the date prefix; None selects
                       '%F %T', an empty string disables the prefix
        """
        self._date_format = '%F %T' if date_format is None else date_format
        self._decode_all = decode_all
        self._default_name = os.path.splitext(os.path.basename(db_path))[0]
        self._db = sqlite3.connect(db_path)

    def __iter__(self):
        """Yield the events appended since the previous iteration.

        The very first value yielded is a bare None, produced right after
        the transaction is opened: it lets a caller reading several logs
        start all transactions before fetching any row.
        """
        db = self._db
        try:
            db.execute("BEGIN")
            yield
            log_rows = db.execute("SELECT * FROM log WHERE id>?",
                                  (self._log_id,))
            packet_rows = db.execute("SELECT * FROM packet WHERE id>?",
                                     (self._packet_id,))
            try:
                p = next(packet_rows)
                # Load the protocol in use at the date of the first packet.
                # NOTE(review): a StopIteration raised by _reload (no
                # protocol recorded at or before that date) is swallowed
                # here too, in which case packets are skipped.
                self._reload(p[1])
            except StopIteration:
                p = None
            for row in log_rows:
                self._log_id, date, name, level, pathname, lineno, msg = row
                # Emit first the packets that are older than this log row.
                while p and p[1] < date:
                    yield self._packet(*p)
                    p = packet_rows.fetchone()
                yield date, name, getLevelName(level), msg.splitlines()
            if p:
                yield self._packet(*p)
                for p in packet_rows:
                    yield self._packet(*p)
        finally:
            # Nothing is written: just release the read transaction.
            db.rollback()

    def _reload(self, date):
        """Load the most recent protocol definition not newer than `date`.

        Sets `uuid_str`, `Packets` and `PacketMalformedError` on the
        instance, and `_next_protocol` to the date of a newer protocol
        definition (infinity if there is none).

        Raises StopIteration if no protocol is recorded at or before `date`.
        """
        q = self._db.execute
        date, text = next(q("SELECT * FROM protocol WHERE date<=?"
                            " ORDER BY date DESC", (date,)))
        if self._protocol_date == date:
            return
        self._protocol_date = date
        g = {}
        # NOTE(review): this executes Python source stored in the log
        # database; only open logs coming from a trusted source.
        exec(bz2.decompress(text), g)
        for x in 'uuid_str', 'Packets', 'PacketMalformedError':
            setattr(self, x, g[x])
        try:
            self._next_protocol, = next(
                q("SELECT date FROM protocol WHERE date>?", (date,)))
        except StopIteration:
            self._next_protocol = float('inf')

    def _emit(self, date, name, levelname, msg_list):
        """Print each line of `msg_list`, prefixed by date, level and name."""
        prefix = self._date_format
        if prefix:
            d = int(date)
            # Append the fractional part of the date, as 4 digits.
            prefix = '%s.%04u ' % (time.strftime(prefix, time.localtime(d)),
                                   int((date - d) * 10000))
        prefix += '%-9s %-10s ' % (levelname, name or self._default_name)
        for msg in msg_list:
            print(prefix + msg)

    def _packet(self, id, date, name, msg_id, code, peer, body):
        """Convert a row of the `packet` table into an event.

        Returns `(date, name, 'PACKET', msg)` where the first item of `msg`
        gives the message id, the packet name and the peer, optionally
        followed by the decoded content of the packet.
        """
        self._packet_id = id
        if self._next_protocol <= date:
            self._reload(date)
        try:
            p = self.Packets[code]
        except KeyError:
            # Unknown code: register a placeholder, only used for its name.
            self.Packets[code] = p = type('UnknownPacket[%u]' % code,
                                          (object,), {})
        msg = ['#0x%04x %-30s %s' % (msg_id, p.__name__, peer)]
        if body is not None:
            # Placeholders for unknown packets have neither a handler name
            # nor a decoder: only their header line can be displayed.
            handler = getattr(p, 'handler_method_name', None)
            logger = getattr(self, handler, None) if handler else None
            if logger or (self._decode_all and hasattr(p, 'decode')):
                p = p()
                p._id = msg_id
                p._body = body
                try:
                    args = p.decode()
                except self.PacketMalformedError:
                    msg.append("Can't decode packet")
                else:
                    if logger:
                        msg += logger(*args)
                    elif args:
                        msg = ['%s \t| %r' % (msg[0], args)]
        return date, name, 'PACKET', msg

    def error(self, code, message):
        """Format the arguments of an `error` packet as a single line."""
        return "%s (%s)" % (code, message),

    def notifyNodeInformation(self, node_list):
        """Format a list of nodes as table rows, sorted by uuid.

        `node_list` is an iterable of `(node_type, address, uuid, state)`;
        a false `address` is displayed as '?'.
        """
        # Sort a copy rather than the caller's list. The first key item
        # keeps None uuids first without comparing None to other values.
        ordered = sorted(node_list, key=lambda x: (x[2] is not None, x[2]))
        row_list = [(self.uuid_str(uuid), str(node_type),
                     ('%s:%u' % address) if address else '?', state)
                    for node_type, address, uuid, state in ordered]
        if not row_list:
            return ()
        # Right-align the first 3 columns on their widest value.
        t = ' ! %%%us | %%%us | %%%us | %%s' % (
            max(len(x[0]) for x in row_list),
            max(len(x[1]) for x in row_list),
            max(len(x[2]) for x in row_list))
        return [t % row for row in row_list]
130 131


132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
def emit_many(log_list):
    """Print the pending events of several logs, merged by date.

    Each item of `log_list` must be iterable, yielding first a bare value
    (discarded) and then events whose first field is their date, and must
    provide an `_emit` method that is called with the fields of each of its
    events. When the oldest pending events of several logs have the same
    date, the log that comes first in `log_list` is served first.
    """
    stream_list = [(log, iter(log)) for log in log_list]
    for _, stream in stream_list:  # try to start all transactions at the same time
        next(stream)
    # Sorted list of (-date, -rank, stream, emit, event): the oldest pending
    # event is the last item. `rank` is unique, which makes the order
    # deterministic and prevents comparisons from reaching the other fields.
    event_list = []
    for rank, (log, stream) in enumerate(stream_list):
        try:
            event = next(stream)
        except StopIteration:
            continue
        event_list.append((-event[0], -rank, stream, log._emit, event))
    event_list.sort()
    while event_list:
        _, order, stream, emit, event = event_list.pop()
        # Date until which this log can be emitted without interleaving
        # with another one.
        next_date = -event_list[-1][0] if event_list else float('inf')
        try:
            while event[0] <= next_date:
                emit(*event)
                event = next(stream)
        except StopIteration:
            continue  # this log is exhausted
        insort(event_list, (-event[0], order, stream, emit, event))

def main():
    """Command-line entry point: print the logs given as arguments.

    Without -f, the events of all logs are printed once. With -f, new events
    are printed repeatedly, sleeping between iterations and sending
    signal.SIGRTMIN to every PID given with -F, until interrupted from the
    keyboard.
    """
    parser = optparse.OptionParser()
    parser.add_option('-a', '--all', action="store_true",
        help='decode all packets')
    parser.add_option('-d', '--date', metavar='FORMAT',
        help='custom date format, according to strftime(3)')
    parser.add_option('-f', '--follow', action="store_true",
        help='output appended data as the file grows')
    parser.add_option('-F', '--flush', action="append", type="int",
        help='with -f, tell process PID to flush logs approximately every N'
              ' seconds (see -s)', metavar='PID')
    parser.add_option('-s', '--sleep-interval', type="float", default=1,
        help='with -f, sleep for approximately N seconds (default 1.0)'
              ' between iterations', metavar='N')
    options, args = parser.parse_args()
    if options.sleep_interval <= 0:
        parser.error("sleep_interval must be positive")
    if not args:
        parser.error("no log specified")
    log_list = [Log(db_path, options.all, options.date) for db_path in args]
    if not options.follow:
        emit_many(log_list)
        return
    pid_list = options.flush or ()
    try:
        while True:
            emit_many(log_list)
            # Ask the followed processes to flush, so that the next
            # iteration can see their latest events.
            for pid in pid_list:
                os.kill(pid, signal.SIGRTMIN)
            time.sleep(options.sleep_interval)
    except KeyboardInterrupt:
        # Normal way to stop following.
        pass

194
if __name__ == "__main__":
    # Run the command-line interface only when executed as a script.
    main()