#!/usr/bin/env python
#
# neolog - read a NEO log
#
# Copyright (C) 2012-2017  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import bz2, gzip, errno, optparse, os, signal, sqlite3, sys, time
from bisect import insort
from logging import getLevelName
from zlib import decompress

# Supported compressed-log extensions, mapped to their decompressor:
# a file-like class for bz2/gz, or an external command name for xz.
comp_dict = {
    'bz2': bz2.BZ2File,
    'gz': gzip.GzipFile,
    'xz': 'xzcat',
}

class Log(object):
    """Chronological reader for a NEO log database (SQLite).

    The database file may be compressed (see comp_dict), in which case it
    is first unpacked to a temporary file.  Iterating an instance merges
    the 'log' and 'packet' tables by date and yields
    (date, name, levelname, msg_list) tuples that _emit knows how to print.
    """

    # Ids of the last rows read from the 'log' and 'packet' tables, so that
    # a new iteration (--follow mode) resumes after the last emitted row.
    _log_id = _packet_id = -1
    # Date of the currently loaded protocol description (see _reload).
    _protocol_date = None

    def __init__(self, db_path, decode=0, date_format=None,
                       filter_from=None, node_column=True, node_list=None):
        """Open the log database at db_path.

        decode: 0 = don't decode packet bodies, 1 = decode them,
                2 = also replace embedded raw data fields (see _decompress)
        date_format: strftime(3) format for timestamps ('%F %T' if None)
        filter_from: only return records with date >= this timestamp
        node_column: whether _emit prints the node name column
        node_list: if non-empty, _emit only prints records of these nodes
        """
        self._date_format = '%F %T' if date_format is None else date_format
        self._decode = decode
        self._filter_from = filter_from
        self._node_column = node_column
        self._node_list = node_list
        name = os.path.basename(db_path)
        try:
            name, ext = name.rsplit(os.extsep, 1)
            ZipFile = comp_dict[ext]
        except (KeyError, ValueError):
            # Unknown/no extension: open the file in place.
            # WKRD: Python does not support URI so we can't open in read-only
            #       mode. See http://bugs.python.org/issue13773
            os.stat(db_path) # do not create empty DB if file is missing
            self._db = sqlite3.connect(db_path)
        else:
            # Compressed file: extract to a temporary file first.  ZipFile
            # is either a file-like class (bz2/gz) or a command name (xz).
            import shutil, subprocess, tempfile
            with tempfile.NamedTemporaryFile() as f:
                if type(ZipFile) is str:
                    subprocess.check_call((ZipFile, db_path), stdout=f)
                else:
                    shutil.copyfileobj(ZipFile(db_path), f)
                # NOTE(review): the temporary file is deleted when leaving
                # this block while the connection is still open — appears
                # to rely on POSIX unlink semantics; confirm on other OSes.
                self._db = sqlite3.connect(f.name)
            name = name.rsplit(os.extsep, 1)[0]
        self._default_name = name

    def __iter__(self):
        """Yield None once after beginning a transaction, then yield all
        new records from the 'log' and 'packet' tables, merged by date.

        The initial bare yield lets the caller start transactions on
        several logs at (almost) the same time before any row is fetched
        (see emit_many).  The transaction is rolled back at the end so a
        later iteration sees rows appended in the meantime.
        """
        db =  self._db
        try:
            db.execute("BEGIN")
            yield
            nl = "SELECT * FROM log WHERE id>?"
            np = "SELECT * FROM packet WHERE id>?"
            date = self._filter_from
            if date:
                # Restrict both queries to records at or after filter_from.
                date = " AND date>=%f" % date
                nl += date
                np += date
            nl = db.execute(nl, (self._log_id,))
            np = db.execute(np, (self._packet_id,))
            try:
                p = np.next()
                # Load the protocol that was in use at the first packet.
                self._reload(p[1])
            except StopIteration:
                p = None
            for self._log_id, date, name, level, pathname, lineno, msg in nl:
                # First flush packets strictly older than this log line.
                while p and p[1] < date:
                    yield self._packet(*p)
                    p = np.fetchone()
                yield date, name, getLevelName(level), msg.splitlines()
            if p:
                # No more log lines: flush the remaining packets.
                yield self._packet(*p)
                for p in np:
                    yield self._packet(*p)
        finally:
            db.rollback()

    def _reload(self, date):
        """Load the protocol description effective at ``date``.

        The 'protocol' table stores bz2-compressed Python source defining
        the packet classes; the most recent version not newer than
        ``date`` is executed and its definitions are kept for decoding.
        """
        q = self._db.execute
        date, text = q("SELECT * FROM protocol WHERE date<=?"
                       " ORDER BY date DESC", (date,)).next()
        if self._protocol_date == date:
            return # already loaded
        self._protocol_date = date
        g = {}
        # NOTE: executes code stored in the log database being read.
        exec bz2.decompress(text) in g
        for x in 'uuid_str', 'Packets', 'PacketMalformedError':
            setattr(self, x, g[x])
        # Map packet codes to the index path of their embedded
        # (compression, _, data) triplet, for _decompress (--decompress).
        x = {}
        if self._decode > 1:
            PStruct = g['PStruct']
            PBoolean = g['PBoolean']
            def hasData(item):
                # Return the index path to a (compression, _, data)
                # triplet inside a packet format, or None.
                items = item._items
                for i, item in enumerate(items):
                    if isinstance(item, PStruct):
                        j = hasData(item)
                        if j:
                            return (i,) + j
                    elif (isinstance(item, PBoolean)
                          and item._name == 'compression'
                          and i + 2 < len(items)
                          and items[i+2]._name == 'data'):
                        return i,
            for p in self.Packets.itervalues():
                if p._fmt is not None:
                    path = hasData(p._fmt)
                    if path:
                        assert not hasattr(p, '_neolog'), p
                        x[p._code] = path
        self._getDataPath = x.get

        # Date at which the next protocol version takes effect (inf if
        # this is the latest one).
        try:
            self._next_protocol, = q("SELECT date FROM protocol WHERE date>?",
                                     (date,)).next()
        except StopIteration:
            self._next_protocol = float('inf')

    def _emit(self, date, name, levelname, msg_list):
        """Print one record, one output line per message line, with an
        optional 'date level [node]' prefix.  Records from nodes not in
        self._node_list (when set) are dropped."""
        if not name:
            name = self._default_name
        if self._node_list and name not in self._node_list:
            return
        prefix = self._date_format
        if prefix:
            d = int(date)
            # 4 digits of sub-second precision (1/10000 s).
            prefix = '%s.%04u ' % (time.strftime(prefix, time.localtime(d)),
                                   int((date - d) * 10000))
        prefix += ('%-9s %-10s ' % (levelname, name) if self._node_column else
                   '%-9s ' % levelname)
        for msg in msg_list:
            print prefix + msg

    def _packet(self, id, date, name, msg_id, code, peer, body):
        """Format a row of the 'packet' table into a
        (date, name, 'PACKET', msg_list) record for _emit."""
        self._packet_id = id
        if self._next_protocol <= date:
            # A newer protocol version took effect: switch to it.
            self._reload(date)
        try:
            p = self.Packets[code]
            msg = p.__name__
        except KeyError:
            msg = 'UnknownPacket[%u]' % code
            body = None
        msg = ['#0x%04x %-30s %s' % (msg_id, msg, peer)]
        if body is not None:
            log = getattr(p, '_neolog', None)
            if log or self._decode:
                p = p()
                p._id = msg_id
                p._body = body
                try:
                    args = p.decode()
                except self.PacketMalformedError:
                    msg.append("Can't decode packet")
                else:
                    if log:
                        # The packet class provides its own formatter.
                        args, extra = log(*args)
                        msg += extra
                    else:
                        path = self._getDataPath(code)
                        if path:
                            args = self._decompress(args, path)
                    if args and self._decode:
                        msg[0] += ' \t| ' + repr(args)
        return date, name, 'PACKET', msg

    def _decompress(self, args, path):
        """Return ``args`` with the (compression, _, data) triplet at
        index path ``path`` (nested indices, from _reload's hasData)
        replaced by a single (length, data) pair, decompressing ``data``
        when the compression flag is set."""
        if args:
            args = list(args)
            i = path[0]
            path = path[1:]
            if path:
                # Recurse into the nested structure.
                args[i] = self._decompress(args[i], path)
            else:
                data = args[i+2]
                if args[i]:
                    # args[i] is the compression flag.
                    data = decompress(data)
                args[i:i+3] = (len(data), data),
            return tuple(args)

def emit_many(log_list):
    """Print the records of all logs in ``log_list``, merged by date.

    N-way merge: pending events are kept in a list sorted by negated
    date, so that list.pop() always returns the oldest one.  Exits with
    status 1 on EPIPE (e.g. output piped to a pager that quit).
    """
    log_list = [(log, iter(log).next) for log in log_list]
    for x in log_list: # try to start all transactions at the same time
        x[1]()
    event_list = []
    # Prime the merge with the first event of each non-empty log.
    for log, next in log_list:
        try:
            event = next()
        except StopIteration:
            continue
        event_list.append((-event[0], next, log._emit, event))
    if event_list:
        event_list.sort()
        while True:
            # Pop the oldest pending event (largest negated date).
            key, next, emit, event = event_list.pop()
            try:
                next_date = - event_list[-1][0]
            except IndexError:
                next_date = float('inf')
            try:
                # Keep emitting from this log while its events are not
                # newer than the oldest pending event of any other log.
                while event[0] <= next_date:
                    emit(*event)
                    event = next()
            except IOError, e:
                if e.errno == errno.EPIPE:
                    sys.exit(1)
                raise
            except StopIteration:
                # This log is exhausted; stop only when all are.
                if not event_list:
                    break
            else:
                # This log's next event is newer: reinsert and switch.
                insort(event_list, (-event[0], next, emit, event))

def main():
    """Command-line entry point: parse options and dump the given logs.

    Positional arguments are log database paths; records of all of them
    are merged by date (see emit_many).  With --follow, keeps polling
    the files and optionally asks producer processes to flush their logs
    by sending SIGRTMIN.
    """
    parser = optparse.OptionParser()
    parser.add_option('-a', '--all', action="store_true",
        help='decode body of packets')
    parser.add_option('-A', '--decompress', action="store_true",
        help='decompress data when decode body of packets (implies --all)')
    parser.add_option('-d', '--date', metavar='FORMAT',
        help='custom date format, according to strftime(3)')
    parser.add_option('-f', '--follow', action="store_true",
        help='output appended data as the file grows')
    parser.add_option('-F', '--flush', action="append", type="int",
        help='with -f, tell process PID to flush logs approximately N'
              ' seconds (see -s)', metavar='PID')
    parser.add_option('-n', '--node', action="append",
        help='only show log entries from the given node'
             ' (only useful for logs produced by threaded tests),'
             " special value '-' hides the column")
    parser.add_option('-s', '--sleep-interval', type="float", default=1,
        help='with -f, sleep for approximately N seconds (default 1.0)'
              ' between iterations', metavar='N')
    parser.add_option('--from', dest='filter_from',
        # Fixed typo in help text: "that" -> "than".
        help='show records more recent than timestamp N if N > 0,'
             ' or now+N if N < 0; N can also be a string that is'
             ' parseable by dateutil ', metavar='N')
    options, args = parser.parse_args()
    if options.sleep_interval <= 0:
        parser.error("sleep_interval must be positive")
    if not args:
        parser.error("no log specified")
    # Normalize --from to a POSIX timestamp (float).
    filter_from = options.filter_from
    if filter_from:
        try:
            filter_from = float(filter_from)
        except ValueError:
            # Not a raw timestamp: parse a human-readable date.
            from dateutil.parser import parse
            x = parse(filter_from)
            if x.tzinfo:
                # Aware datetime: compute seconds since the epoch directly.
                filter_from = (x - x.fromtimestamp(0, x.tzinfo)).total_seconds()
            else:
                # Naive datetime: interpret in the local timezone.
                filter_from = time.mktime(x.timetuple()) + x.microsecond * 1e-6
        else:
            if filter_from < 0:
                # Negative values are relative to now.
                filter_from += time.time()
    # The special node name '-' only hides the node column.
    node_list = options.node or []
    try:
        node_list.remove('-')
        node_column = False
    except ValueError:
        node_column = True
    log_list = [Log(db_path,
                    2 if options.decompress else 1 if options.all else 0,
                    options.date, filter_from, node_column, node_list)
                for db_path in args]
    if options.follow:
        try:
            pid_list = options.flush or ()
            while True:
                emit_many(log_list)
                for pid in pid_list:
                    # Ask each producer to flush its log buffers.
                    os.kill(pid, signal.SIGRTMIN)
                time.sleep(options.sleep_interval)
        except KeyboardInterrupt:
            pass
    else:
        emit_many(log_list)

# Script entry point.
if __name__ == "__main__":
    main()