neolog.py 7.79 KB
Newer Older
1 2
#!/usr/bin/env python
#
3
# neolog - read a NEO log
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#
# Copyright (C) 2012  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

20
import bz2, optparse, os, signal, sqlite3, time
21
from bisect import insort
22
from logging import getLevelName
23

24 25

class Log(object):
26

27 28 29
    _log_id = _packet_id = -1
    _protocol_date = None

30 31
    def __init__(self, db_path, decode_all=False, date_format=None,
                                filter_from=None):
32 33
        self._date_format = '%F %T' if date_format is None else date_format
        self._decode_all = decode_all
34
        self._default_name = os.path.splitext(os.path.basename(db_path))[0]
35 36 37
        # WKRD: Python does not support URI so we can't open in read-only mode
        #       See http://bugs.python.org/issue13773
        os.stat(db_path) # do not create empty DB if file is missing
38
        self._db = sqlite3.connect(db_path)
39
        self._filter_from = filter_from
40

41
    def __iter__(self):
42
        db =  self._db
43
        try:
44
            db.execute("BEGIN")
45
            yield
46 47 48 49 50 51 52 53 54
            nl = "SELECT * FROM log WHERE id>?"
            np = "SELECT * FROM packet WHERE id>?"
            date = self._filter_from
            if date:
                date = " AND date>=%f" % date
                nl += date
                np += date
            nl = db.execute(nl, (self._log_id,))
            np = db.execute(np, (self._packet_id,))
55 56 57 58 59 60 61
            try:
                p = np.next()
                self._reload(p[1])
            except StopIteration:
                p = None
            for self._log_id, date, name, level, pathname, lineno, msg in nl:
                while p and p[1] < date:
62
                    yield self._packet(*p)
63
                    p = np.fetchone()
64
                yield date, name, getLevelName(level), msg.splitlines()
65
            if p:
66
                yield self._packet(*p)
67
                for p in np:
68
                    yield self._packet(*p)
69 70
        finally:
            db.rollback()
71

72 73
    def _reload(self, date):
        q = self._db.execute
74 75 76 77 78
        date, text = q("SELECT * FROM protocol WHERE date<=?"
                       " ORDER BY date DESC", (date,)).next()
        if self._protocol_date == date:
            return
        self._protocol_date = date
79
        g = {}
80
        exec bz2.decompress(text) in g
81 82
        for x in 'uuid_str', 'Packets', 'PacketMalformedError':
            setattr(self, x, g[x])
83
        try:
84
            self._next_protocol, = q("SELECT date FROM protocol WHERE date>?",
85 86 87 88 89
                                     (date,)).next()
        except StopIteration:
            self._next_protocol = float('inf')

    def _emit(self, date, name, levelname, msg_list):
90 91 92 93 94 95
        prefix = self._date_format
        if prefix:
            d = int(date)
            prefix = '%s.%04u ' % (time.strftime(prefix, time.localtime(d)),
                                   int((date - d) * 10000))
        prefix += '%-9s %-10s ' % (levelname, name or self._default_name)
96 97 98
        for msg in msg_list:
            print prefix + msg

99 100
    def _packet(self, id, date, name, msg_id, code, peer, body):
        self._packet_id = id
101 102 103 104
        if self._next_protocol <= date:
            self._reload(date)
        try:
            p = self.Packets[code]
105 106 107 108
        except KeyError:
            Packets[code] = p = type('UnknownPacket[%u]' % code, (object,), {})
        msg = ['#0x%04x %-30s %s' % (msg_id, p.__name__, peer)]
        if body is not None:
109 110
            logger = getattr(self, p.handler_method_name, None)
            if logger or self._decode_all:
111 112 113 114 115
                p = p()
                p._id = msg_id
                p._body = body
                try:
                    args = p.decode()
116
                except self.PacketMalformedError:
117 118
                    msg.append("Can't decode packet")
                else:
119 120 121 122
                    if logger:
                        msg += logger(*args)
                    elif args:
                        msg = '%s \t| %r' % (msg[0], args),
123
        return date, name, 'PACKET', msg
124

125
    def error(self, code, message):
126 127
        return "%s (%s)" % (code, message),

128
    def notifyNodeInformation(self, node_list):
129 130 131 132 133 134 135 136 137 138 139
        node_list.sort(key=lambda x: x[2])
        node_list = [(self.uuid_str(uuid), str(node_type),
                      '%s:%u' % address if address else '?', state)
                     for node_type, address, uuid, state in node_list]
        if node_list:
            t = ' ! %%%us | %%%us | %%%us | %%s' % (
                max(len(x[0]) for x in node_list),
                max(len(x[1]) for x in node_list),
                max(len(x[2]) for x in node_list))
            return map(t.__mod__, node_list)
        return ()
140 141


142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
def emit_many(log_list):
    log_list = [(log, iter(log).next) for log in log_list]
    for x in log_list: # try to start all transactions at the same time
        x[1]()
    event_list = []
    for log, next in log_list:
        try:
            event = next()
        except StopIteration:
            continue
        event_list.append((-event[0], next, log._emit, event))
    if event_list:
        event_list.sort()
        while True:
            key, next, emit, event = event_list.pop()
            try:
                next_date = - event_list[-1][0]
            except IndexError:
                next_date = float('inf')
            try:
                while event[0] <= next_date:
                    emit(*event)
                    event = next()
            except StopIteration:
                if not event_list:
                    break
            else:
                insort(event_list, (-event[0], next, emit, event))

def main():
    parser = optparse.OptionParser()
173 174 175 176
    parser.add_option('-a', '--all', action="store_true",
        help='decode all packets')
    parser.add_option('-d', '--date', metavar='FORMAT',
        help='custom date format, according to strftime(3)')
177 178 179 180 181 182 183 184
    parser.add_option('-f', '--follow', action="store_true",
        help='output appended data as the file grows')
    parser.add_option('-F', '--flush', action="append", type="int",
        help='with -f, tell process PID to flush logs approximately N'
              ' seconds (see -s)', metavar='PID')
    parser.add_option('-s', '--sleep-interval', type="float", default=1,
        help='with -f, sleep for approximately N seconds (default 1.0)'
              ' between iterations', metavar='N')
185 186 187
    parser.add_option('--from', dest='filter_from', type="float",
        help='show records more recent that timestamp N if N > 0,'
             ' or now+N if N < 0', metavar='N')
188 189 190 191 192
    options, args = parser.parse_args()
    if options.sleep_interval <= 0:
        parser.error("sleep_interval must be positive")
    if not args:
        parser.error("no log specified")
193 194 195 196 197
    filter_from = options.filter_from
    if filter_from and filter_from < 0:
        filter_from += time.time()
    log_list = [Log(db_path, options.all, options.date, filter_from)
                for db_path in args]
198 199 200 201 202 203 204 205 206 207 208 209 210
    if options.follow:
        try:
            pid_list = options.flush or ()
            while True:
                emit_many(log_list)
                for pid in pid_list:
                    os.kill(pid, signal.SIGRTMIN)
                time.sleep(options.sleep_interval)
        except KeyboardInterrupt:
            pass
    else:
        emit_many(log_list)

211
if __name__ == "__main__":
212
    main()