#!/usr/bin/env python
#
# neolog - read a NEO log
#
# Copyright (C) 2012  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

20
import bz2, gzip, optparse, os, signal, sqlite3, time
21
from bisect import insort
22
from logging import getLevelName
23

24
# Map a compressed-file extension (without the separator) to the class used
# to open such a file for reading.
comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile)
25 26

class Log(object):
    """Reader for one NEO log database.

    A log is an SQLite file, optionally compressed (see ``comp_dict``), with
    ``log``, ``packet`` and ``protocol`` tables. Iterating an instance yields
    the records whose row id is greater than the last one already yielded,
    so the same object can be iterated again and again to follow a file that
    is still growing.
    """

    # Highest row ids already yielded from the `log` and `packet` tables;
    # -1 means that nothing has been read yet.
    _log_id = _packet_id = -1
    # Date of the protocol definition currently loaded by _reload().
    _protocol_date = None

    def __init__(self, db_path, decode_all=False, date_format=None,
                                filter_from=None):
        """Open the log stored at `db_path`.

        db_path     -- path to the SQLite file; if its extension is a key of
                       ``comp_dict``, the file is first decompressed to a
                       temporary file
        decode_all  -- if true, packets without a dedicated formatting
                       method are decoded too and shown with a generic repr
        date_format -- strftime(3) format of the date prefix; None selects
                       '%F %T' and an empty string disables the prefix
        filter_from -- if true (non-zero), only records whose date is
                       greater than or equal to this timestamp are read

        Raises OSError if an uncompressed `db_path` does not exist.
        """
        self._date_format = '%F %T' if date_format is None else date_format
        self._decode_all = decode_all
        self._filter_from = filter_from
        name = os.path.basename(db_path)
        try:
            name, ext = name.rsplit(os.extsep, 1)
            ZipFile = comp_dict[ext]
        except (KeyError, ValueError):
            # WKRD: Python does not support URI so we can't open in read-only
            #       mode. See http://bugs.python.org/issue13773
            os.stat(db_path) # do not create empty DB if file is missing
            self._db = sqlite3.connect(db_path)
        else:
            import shutil, tempfile
            with tempfile.NamedTemporaryFile() as f:
                src = ZipFile(db_path)
                try:
                    shutil.copyfileobj(src, f)
                finally:
                    # Do not leak the handle on the compressed file.
                    src.close()
                f.flush()
                # NOTE(review): the temporary file is removed when this
                # `with` block exits; this presumably relies on the
                # connection keeping the already opened file alive.
                self._db = sqlite3.connect(f.name)
            # Also drop the inner extension ("x.log.gz" -> "x").
            name = name.rsplit(os.extsep, 1)[0]
        self._default_name = name

    def __iter__(self):
        """Yield the records added since the previous iteration.

        The first value yielded is always None: it is produced right after
        the transaction is opened, so that a caller reading several logs can
        open all transactions before fetching any record. Every following
        value is a ``(date, name, levelname, msg_list)`` tuple, i.e. the
        arguments of _emit(). Packets dated strictly before a log record are
        yielded before that record.
        """
        db = self._db
        try:
            db.execute("BEGIN")
            yield
            log_sql = "SELECT * FROM log WHERE id>?"
            packet_sql = "SELECT * FROM packet WHERE id>?"
            extra = ()
            since = self._filter_from
            if since:
                # Bind the timestamp instead of formatting it into the SQL.
                log_sql += " AND date>=?"
                packet_sql += " AND date>=?"
                extra = since,
            logs = db.execute(log_sql, (self._log_id,) + extra)
            packets = db.execute(packet_sql, (self._packet_id,) + extra)
            try:
                p = next(packets)
                # Load the protocol in effect for the first packet.
                self._reload(p[1])
            except StopIteration:
                p = None
            for row in logs:
                self._log_id, date, name, level, pathname, lineno, msg = row
                while p and p[1] < date:
                    yield self._packet(*p)
                    p = packets.fetchone()
                yield date, name, getLevelName(level), msg.splitlines()
            if p:
                yield self._packet(*p)
                for p in packets:
                    yield self._packet(*p)
        finally:
            # Nothing is written: just release the transaction.
            db.rollback()

    def _reload(self, date):
        """Load the protocol definition that was in effect at `date`.

        The most recent row of the `protocol` table whose date is not after
        `date` is decompressed and executed as Python code; the resulting
        `uuid_str`, `Packets` and `PacketMalformedError` names are stored as
        attributes of this object. `_next_protocol` is set to the date of
        the following protocol row, or to infinity if there is none.

        Raises StopIteration if no protocol row matches.
        """
        q = self._db.execute
        date, text = next(q("SELECT * FROM protocol WHERE date<=?"
                            " ORDER BY date DESC", (date,)))
        if self._protocol_date == date:
            return
        self._protocol_date = date
        g = {}
        # NOTE: this executes code stored in the log file, so only trusted
        #       logs must be opened.
        exec(bz2.decompress(text), g)
        for x in 'uuid_str', 'Packets', 'PacketMalformedError':
            setattr(self, x, g[x])
        # ORDER BY makes sure the closest following protocol is picked.
        row = q("SELECT date FROM protocol WHERE date>? ORDER BY date",
                (date,)).fetchone()
        self._next_protocol = float('inf') if row is None else row[0]

    def _emit(self, date, name, levelname, msg_list):
        """Print one record, one output line per item of `msg_list`.

        Each line is prefixed with the formatted date (unless the date
        format is empty), the level name and the logger name; the name of
        the file, without extensions, is used when `name` is empty.
        """
        prefix = self._date_format
        if prefix:
            d = int(date)
            # Fractional part of the timestamp, as 4 digits.
            prefix = '%s.%04u ' % (time.strftime(prefix, time.localtime(d)),
                                   int((date - d) * 10000))
        prefix += '%-9s %-10s ' % (levelname, name or self._default_name)
        for msg in msg_list:
            print(prefix + msg)

    def _packet(self, id, date, name, msg_id, code, peer, body):
        """Convert a row of the `packet` table into a 'PACKET' record.

        The arguments are the columns of the row. The returned value is a
        ``(date, name, 'PACKET', msg)`` tuple where the first item of `msg`
        is a header line (message id, packet name, peer), possibly followed
        by lines describing the decoded body.

        A code that is missing from the loaded protocol is reported as
        ``UnknownPacket[<code>]`` and its body is not decoded.
        """
        self._packet_id = id
        if self._next_protocol <= date:
            self._reload(date)
        try:
            p = self.Packets[code]
        except KeyError:
            # Nothing is known about this packet, so there is neither a
            # formatting method to look up nor a way to decode the body.
            p = None
            packet_name = 'UnknownPacket[%u]' % code
        else:
            packet_name = p.__name__
        msg = ['#0x%04x %-30s %s' % (msg_id, packet_name, peer)]
        if p is not None and body is not None:
            # A method of this class named like the packet handler, if any,
            # formats the decoded arguments.
            logger = getattr(self, p.handler_method_name, None)
            if logger or self._decode_all:
                p = p()
                p._id = msg_id
                p._body = body
                try:
                    args = p.decode()
                except self.PacketMalformedError:
                    msg.append("Can't decode packet")
                else:
                    if logger:
                        msg += logger(*args)
                    elif args:
                        msg = '%s \t| %r' % (msg[0], args),
        return date, name, 'PACKET', msg

    def error(self, code, message):
        """Format the arguments of an 'error' packet as a single line."""
        return "%s (%s)" % (code, message),

    def notifyNodeInformation(self, node_list):
        """Format the node list of a 'notifyNodeInformation' packet.

        `node_list` is a sequence of ``(node_type, address, uuid, state)``
        tuples. One aligned line per node is returned, ordered by uuid, or
        an empty tuple if there is no node. The given list is not modified.
        """
        rows = [(self.uuid_str(uuid), str(node_type),
                 '%s:%u' % address if address else '?', state)
                for node_type, address, uuid, state
                in sorted(node_list, key=lambda x: x[2])]
        if not rows:
            return ()
        # Size the first 3 columns according to their widest value.
        t = ' ! %%%us | %%%us | %%%us | %%s' % (
            max(len(x[0]) for x in rows),
            max(len(x[1]) for x in rows),
            max(len(x[2]) for x in rows))
        return [t % x for x in rows]
152 153


154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
def emit_many(log_list):
    """Emit the pending records of several logs, merged by date.

    Each item of `log_list` must be iterable and have an `_emit` method
    accepting the items of the records it yields; the first item of a record
    is its date. The first value produced by each iterator is discarded: it
    only serves to start reading all logs at the same time.

    Records are emitted in date order. When records of different logs have
    the same date, the log that is being emitted keeps going, otherwise the
    log that comes first in `log_list` wins.
    """
    sources = [(index, iter(log), log._emit)
               for index, log in enumerate(log_list)]
    for source in sources: # try to start all transactions at the same time
        next(source[1])
    # Sorted list of (-date, -index, iterator, emit, event): the last item is
    # always the oldest pending event. The index is unique, which guarantees
    # that the comparison never reaches the iterators or the methods, whose
    # ordering is arbitrary (Python 2) or undefined (Python 3).
    event_list = []
    for index, events, emit in sources:
        try:
            event = next(events)
        except StopIteration:
            continue
        event_list.append((-event[0], -index, events, emit, event))
    if not event_list:
        return
    event_list.sort()
    while True:
        key, rank, events, emit, event = event_list.pop()
        try:
            # Date of the oldest event among the other logs.
            next_date = - event_list[-1][0]
        except IndexError:
            next_date = float('inf')
        try:
            # Stay on the same log as long as it is not overtaken, to avoid
            # going through the sorted list for every event.
            while event[0] <= next_date:
                emit(*event)
                event = next(events)
        except StopIteration:
            if not event_list:
                break
        else:
            insort(event_list, (-event[0], rank, events, emit, event))

def main():
    """Command-line entry point: print the logs given as arguments.

    All logs are merged in a single output. With -f, they are polled forever
    (until interrupted from the keyboard), optionally signalling the
    processes given with -F after each pass.
    """
    parser = optparse.OptionParser()
    add = parser.add_option
    add('-a', '--all', action="store_true", help='decode all packets')
    add('-d', '--date', metavar='FORMAT',
        help='custom date format, according to strftime(3)')
    add('-f', '--follow', action="store_true",
        help='output appended data as the file grows')
    add('-F', '--flush', metavar='PID', action="append", type="int",
        help='with -f, tell process PID to flush logs'
             ' approximately N seconds (see -s)')
    add('-s', '--sleep-interval', metavar='N', type="float", default=1,
        help='with -f, sleep for approximately N seconds'
             ' (default 1.0) between iterations')
    add('--from', dest='filter_from', metavar='N', type="float",
        help='show records more recent that timestamp N'
             ' if N > 0, or now+N if N < 0')
    options, paths = parser.parse_args()
    if options.sleep_interval <= 0:
        parser.error("sleep_interval must be positive")
    if not paths:
        parser.error("no log specified")
    # A negative value is relative to the current time.
    since = options.filter_from
    if since and since < 0:
        since += time.time()
    logs = [Log(path, options.all, options.date, since) for path in paths]
    if not options.follow:
        emit_many(logs)
        return
    try:
        pids = options.flush or ()
        while True:
            emit_many(logs)
            for pid in pids:
                os.kill(pid, signal.SIGRTMIN)
            time.sleep(options.sleep_interval)
    except KeyboardInterrupt:
        pass

223
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()