neolog.py 8.76 KB
Newer Older
1 2
#!/usr/bin/env python
#
3
# neolog - read a NEO log
4
#
5
# Copyright (C) 2012-2015  Nexedi SA
6 7 8 9 10 11 12 13 14 15 16 17 18 19
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

20
import bz2, gzip, errno, optparse, os, signal, sqlite3, sys, time
21
from bisect import insort
22
from logging import getLevelName
23

24
# Map a compressed-file extension to the file-like class able to open it.
comp_dict = {'bz2': bz2.BZ2File, 'gz': gzip.GzipFile}

class Log(object):
    """Reader for a NEO log database (SQLite, optionally bz2/gz compressed).

    Iterating over an instance first opens a transaction and yields None,
    then yields one (date, name, levelname, msg_list) tuple per new record,
    merging the 'log' and 'packet' tables in chronological order.  Iteration
    can be repeated to fetch only the records appended since the last pass.
    """

    # ids of the last rows read from the 'log' and 'packet' tables, so that
    # a subsequent iteration only returns newer records
    _log_id = _packet_id = -1
    # date of the currently loaded protocol description (see _reload)
    _protocol_date = None

    def __init__(self, db_path, decode_all=False, date_format=None,
                                filter_from=None, node_list=None):
        """Open the log database.

        db_path     -- path to the SQLite file, possibly .bz2/.gz compressed
        decode_all  -- also decode packets that have no dedicated logger
        date_format -- strftime(3) format for dates ('' disables the prefix)
        filter_from -- only return records with date >= this timestamp
        node_list   -- if given, only emit entries from these node names
        """
        self._date_format = '%F %T' if date_format is None else date_format
        self._decode_all = decode_all
        self._filter_from = filter_from
        self._node_list = node_list
        name = os.path.basename(db_path)
        try:
            name, ext = name.rsplit(os.extsep, 1)
            ZipFile = comp_dict[ext]
        except (KeyError, ValueError):
            # WKRD: Python does not support URI so we can't open in read-only
            #       mode. See http://bugs.python.org/issue13773
            os.stat(db_path) # do not create empty DB if file is missing
            self._db = sqlite3.connect(db_path)
        else:
            # Compressed DB: decompress into a temporary file.  sqlite3 keeps
            # its own descriptor on f.name, so the file may be deleted as
            # soon as the 'with' block exits.
            import shutil, tempfile
            with tempfile.NamedTemporaryFile() as f:
                shutil.copyfileobj(ZipFile(db_path), f)
                self._db = sqlite3.connect(f.name)
            name = name.rsplit(os.extsep, 1)[0]
        self._default_name = name

    def __iter__(self):
        """Yield once after BEGIN, then all new records in date order."""
        db = self._db
        try:
            # Snapshot both tables consistently.
            db.execute("BEGIN")
            yield
            nl = "SELECT * FROM log WHERE id>?"
            np = "SELECT * FROM packet WHERE id>?"
            date = self._filter_from
            if date:
                # dates are float timestamps, %f interpolation is enough
                date = " AND date>=%f" % date
                nl += date
                np += date
            nl = db.execute(nl, (self._log_id,))
            np = db.execute(np, (self._packet_id,))
            try:
                p = next(np)
                self._reload(p[1]) # load the protocol in use at that date
            except StopIteration:
                p = None
            # Merge the 2 cursors by date; 'log' rows win on equal dates.
            # pathname & lineno are part of the schema but not displayed.
            for self._log_id, date, name, level, pathname, lineno, msg in nl:
                while p and p[1] < date:
                    yield self._packet(*p)
                    p = np.fetchone()
                yield date, name, getLevelName(level), msg.splitlines()
            if p:
                yield self._packet(*p)
                for p in np:
                    yield self._packet(*p)
        finally:
            db.rollback()

    def _reload(self, date):
        """(Re)load the protocol description effective at the given date.

        The protocol module is stored bz2-compressed in the DB and executed
        to extract 'uuid_str', 'Packets' and 'PacketMalformedError'.
        """
        q = self._db.execute
        date, text = next(q("SELECT * FROM protocol WHERE date<=?"
                            " ORDER BY date DESC", (date,)))
        if self._protocol_date == date:
            return # already loaded
        self._protocol_date = date
        g = {}
        # NOTE(review): this executes code coming from the log DB;
        # only process trusted log files.
        exec(bz2.decompress(text), g)
        for x in 'uuid_str', 'Packets', 'PacketMalformedError':
            setattr(self, x, g[x])
        try:
            # Date at which the next protocol (if any) takes effect.
            self._next_protocol, = next(q(
                "SELECT date FROM protocol WHERE date>?", (date,)))
        except StopIteration:
            self._next_protocol = float('inf')

    def _emit(self, date, name, levelname, msg_list):
        """Print one record, one output line per message line."""
        if not name:
            name = self._default_name
        if self._node_list and name not in self._node_list:
            return # filtered out by --node
        prefix = self._date_format
        if prefix:
            d = int(date)
            # 4 decimal digits of sub-second precision
            prefix = '%s.%04u ' % (time.strftime(prefix, time.localtime(d)),
                                   int((date - d) * 10000))
        prefix += '%-9s %-10s ' % (levelname, name)
        for msg in msg_list:
            print(prefix + msg)

    def _packet(self, id, date, name, msg_id, code, peer, body):
        """Format a row of the 'packet' table as a (date, name, levelname,
        msg_list) record, decoding the body when a logger exists or -a."""
        self._packet_id = id
        if self._next_protocol <= date:
            self._reload(date)
        try:
            p = self.Packets[code]
        except KeyError:
            # BUG fix: 'Packets' was referenced as a bare (undefined) name,
            # raising NameError on the first unknown packet code.
            self.Packets[code] = p = type('UnknownPacket[%u]' % code,
                                          (object,), {})
        msg = ['#0x%04x %-30s %s' % (msg_id, p.__name__, peer)]
        if body is not None:
            # Unknown packet classes (synthesized above) don't define
            # handler_method_name, hence the inner getattr default.
            logger = getattr(self, getattr(p, 'handler_method_name', ''),
                             None)
            if logger or self._decode_all:
                p = p()
                p._id = msg_id
                p._body = body
                try:
                    args = p.decode()
                except self.PacketMalformedError:
                    msg.append("Can't decode packet")
                else:
                    if logger:
                        # A specialized logger returns extra message lines.
                        msg += logger(*args)
                    elif args:
                        msg = '%s \t| %r' % (msg[0], args),
        return date, name, 'PACKET', msg

    def error(self, code, message):
        """Logger for error packets: single "code (message)" line."""
        return "%s (%s)" % (code, message),

    def notifyNodeInformation(self, node_list):
        """Logger for node-information packets: one aligned row per node."""
        node_list.sort(key=lambda x: x[2]) # sort by uuid
        node_list = [(self.uuid_str(uuid), str(node_type),
                      '%s:%u' % address if address else '?', state)
                     for node_type, address, uuid, state in node_list]
        if node_list:
            # Pad each column to the width of its longest value.
            t = ' ! %%%us | %%%us | %%%us | %%s' % (
                max(len(x[0]) for x in node_list),
                max(len(x[1]) for x in node_list),
                max(len(x[2]) for x in node_list))
            # list() so the result is a real list on both Python 2 & 3
            return list(map(t.__mod__, node_list))
        return ()
157 158


159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
def emit_many(log_list):
    """Merge the pending events of several logs and print them in
    chronological order via each log's _emit method.

    Each log is an object whose iterator yields once after opening a
    transaction, then yields (date, ...) event tuples.
    """
    # Try to start all transactions at the same time, so that the logs
    # are snapshotted as close together as possible.
    source_list = [(log, iter(log)) for log in log_list]
    for log, it in source_list:
        next(it)
    # Priority queue keyed on (-date, per-log index).  The integer index
    # breaks ties between equal dates: the original compared the bound
    # methods themselves, which fails with TypeError on Python 3 and was
    # non-deterministic on Python 2.
    event_list = []
    for idx, (log, it) in enumerate(source_list):
        try:
            event = next(it)
        except StopIteration:
            continue
        event_list.append((-event[0], idx, it, log._emit, event))
    if not event_list:
        return
    event_list.sort()
    while True:
        # The last element holds the oldest pending event.
        key, idx, it, emit, event = event_list.pop()
        try:
            next_date = -event_list[-1][0]
        except IndexError:
            next_date = float('inf')
        try:
            # Drain this log while it stays ahead of every other one.
            while event[0] <= next_date:
                emit(*event)
                event = next(it)
        except IOError as e:
            # Broken pipe (e.g. piped to head) is a normal way to stop.
            if e.errno == errno.EPIPE:
                sys.exit(1)
            raise
        except StopIteration:
            if not event_list:
                break
        else:
            insort(event_list, (-event[0], idx, it, emit, event))

def main():
    """Command-line entry point: parse options and dump the given logs."""
    parser = optparse.OptionParser()
    parser.add_option('-a', '--all', action="store_true",
        help='decode all packets')
    parser.add_option('-d', '--date', metavar='FORMAT',
        help='custom date format, according to strftime(3)')
    parser.add_option('-f', '--follow', action="store_true",
        help='output appended data as the file grows')
    parser.add_option('-F', '--flush', action="append", type="int",
        help='with -f, tell process PID to flush logs approximately N'
              ' seconds (see -s)', metavar='PID')
    parser.add_option('-n', '--node', action="append",
        help='only show log entries from the given node'
             ' (only useful for logs produced by threaded tests)')
    parser.add_option('-s', '--sleep-interval', type="float", default=1,
        help='with -f, sleep for approximately N seconds (default 1.0)'
              ' between iterations', metavar='N')
    parser.add_option('--from', dest='filter_from', type="float",
        # typo fix: "more recent that" -> "more recent than"
        help='show records more recent than timestamp N if N > 0,'
             ' or now+N if N < 0', metavar='N')
    options, args = parser.parse_args()
    if options.sleep_interval <= 0:
        parser.error("sleep_interval must be positive")
    if not args:
        parser.error("no log specified")
    # A negative --from is relative to now.
    filter_from = options.filter_from
    if filter_from and filter_from < 0:
        filter_from += time.time()
    log_list = [Log(db_path, options.all, options.date, filter_from,
                    options.node)
                for db_path in args]
    if not options.follow:
        emit_many(log_list)
        return
    # -f: poll forever, optionally asking the producers to flush first.
    pid_list = options.flush or ()
    try:
        while True:
            emit_many(log_list)
            for pid in pid_list:
                os.kill(pid, signal.SIGRTMIN)
            time.sleep(options.sleep_interval)
    except KeyboardInterrupt:
        pass

236
# Run only when executed as a script, not on import.
if __name__ == "__main__":
    main()