neolog.py 6.5 KB
Newer Older
1 2
#!/usr/bin/env python
#
3
# neolog - read a NEO log
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#
# Copyright (C) 2012  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

20
import bz2, logging, optparse, os, signal, sqlite3, sys, time
21
from bisect import insort
22
from logging import getLevelName
23

24 25

class Log(object):
26

27 28 29
    _log_id = _packet_id = -1
    _protocol_date = None

30 31
    def __init__(self, db_path):
        self._default_name = os.path.splitext(os.path.basename(db_path))[0]
32 33
        self._db = sqlite3.connect(db_path)

34
    def __iter__(self):
35
        db =  self._db
36
        try:
37
            db.execute("BEGIN")
38
            yield
39 40 41 42 43 44 45 46 47 48 49
            nl = db.execute("SELECT * FROM log WHERE id>?",
                            (self._log_id,))
            np = db.execute("SELECT * FROM packet WHERE id>?",
                            (self._packet_id,))
            try:
                p = np.next()
                self._reload(p[1])
            except StopIteration:
                p = None
            for self._log_id, date, name, level, pathname, lineno, msg in nl:
                while p and p[1] < date:
50
                    yield self._packet(*p)
51
                    p = np.fetchone()
52
                yield date, name, getLevelName(level), msg.splitlines()
53
            if p:
54
                yield self._packet(*p)
55
                for p in np:
56
                    yield self._packet(*p)
57 58
        finally:
            db.rollback()
59

60 61
    def _reload(self, date):
        q = self._db.execute
62 63 64 65 66
        date, text = q("SELECT * FROM protocol WHERE date<=?"
                       " ORDER BY date DESC", (date,)).next()
        if self._protocol_date == date:
            return
        self._protocol_date = date
67
        g = {}
68
        exec bz2.decompress(text) in g
69 70
        for x in 'uuid_str', 'Packets', 'PacketMalformedError':
            setattr(self, x, g[x])
71
        try:
72
            self._next_protocol, = q("SELECT date FROM protocol WHERE date>?",
73 74 75 76 77 78 79 80 81 82 83 84 85
                                     (date,)).next()
        except StopIteration:
            self._next_protocol = float('inf')

    def _emit(self, date, name, levelname, msg_list):
        d = int(date)
        prefix = '%s.%04u %-9s %-10s ' % (
            time.strftime('%F %T', time.localtime(d)),
            int((date - d) * 10000), levelname,
            name or self._default_name)
        for msg in msg_list:
            print prefix + msg

86 87
    def _packet(self, id, date, name, msg_id, code, peer, body):
        self._packet_id = id
88 89 90 91
        if self._next_protocol <= date:
            self._reload(date)
        try:
            p = self.Packets[code]
92 93 94 95 96
        except KeyError:
            Packets[code] = p = type('UnknownPacket[%u]' % code, (object,), {})
        msg = ['#0x%04x %-30s %s' % (msg_id, p.__name__, peer)]
        if body is not None:
            try:
97
                logger = getattr(self, p.handler_method_name)
98 99 100 101 102 103 104 105
            except AttributeError:
                pass
            else:
                p = p()
                p._id = msg_id
                p._body = body
                try:
                    args = p.decode()
106
                except self.PacketMalformedError:
107 108 109
                    msg.append("Can't decode packet")
                else:
                    msg += logger(*args)
110
        return date, name, 'PACKET', msg
111

112
    def error(self, code, message):
113 114
        return "%s (%s)" % (code, message),

115
    def notifyNodeInformation(self, node_list):
116 117 118 119 120 121 122 123 124 125 126
        node_list.sort(key=lambda x: x[2])
        node_list = [(self.uuid_str(uuid), str(node_type),
                      '%s:%u' % address if address else '?', state)
                     for node_type, address, uuid, state in node_list]
        if node_list:
            t = ' ! %%%us | %%%us | %%%us | %%s' % (
                max(len(x[0]) for x in node_list),
                max(len(x[1]) for x in node_list),
                max(len(x[2]) for x in node_list))
            return map(t.__mod__, node_list)
        return ()
127 128


129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
def emit_many(log_list):
    log_list = [(log, iter(log).next) for log in log_list]
    for x in log_list: # try to start all transactions at the same time
        x[1]()
    event_list = []
    for log, next in log_list:
        try:
            event = next()
        except StopIteration:
            continue
        event_list.append((-event[0], next, log._emit, event))
    if event_list:
        event_list.sort()
        while True:
            key, next, emit, event = event_list.pop()
            try:
                next_date = - event_list[-1][0]
            except IndexError:
                next_date = float('inf')
            try:
                while event[0] <= next_date:
                    emit(*event)
                    event = next()
            except StopIteration:
                if not event_list:
                    break
            else:
                insort(event_list, (-event[0], next, emit, event))

def main():
    parser = optparse.OptionParser()
    parser.add_option('-f', '--follow', action="store_true",
        help='output appended data as the file grows')
    parser.add_option('-F', '--flush', action="append", type="int",
        help='with -f, tell process PID to flush logs approximately N'
              ' seconds (see -s)', metavar='PID')
    parser.add_option('-s', '--sleep-interval', type="float", default=1,
        help='with -f, sleep for approximately N seconds (default 1.0)'
              ' between iterations', metavar='N')
    options, args = parser.parse_args()
    if options.sleep_interval <= 0:
        parser.error("sleep_interval must be positive")
    if not args:
        parser.error("no log specified")
    log_list = map(Log, args)
    if options.follow:
        try:
            pid_list = options.flush or ()
            while True:
                emit_many(log_list)
                for pid in pid_list:
                    os.kill(pid, signal.SIGRTMIN)
                time.sleep(options.sleep_interval)
        except KeyboardInterrupt:
            pass
    else:
        emit_many(log_list)

187
if __name__ == "__main__":
188
    main()