db.py 5.33 KB
Newer Older
1
import logging, sqlite3, socket, subprocess, xmlrpclib, time
2
from urllib import splittype, splithost, splitport
Guillaume Bury's avatar
Guillaume Bury committed
3
import utils
4

5

6
class PeerDB(object):
    """SQLite-backed cache of peer addresses.

    Two tables live in the database file at ``db_path``:

    - ``peer(prefix, address)``: known peers; ``address`` is a
      ';'-separated list of addresses.
    - ``config(name, value)``: currently only the cached 'registry' address.

    A third table, ``volatile.stat(peer, try)``, lives in an attached
    in-memory database and is rebuilt on every start. A non-zero ``try``
    hides the peer from getAddress() and from the default getPeerList() /
    getPeerCount() results.
    """

    # internal ip = temp arg/attribute
    def __init__(self, db_path, registry, key_path, prefix, db_size=200):
        """Open the cache and determine the registry address.

        db_path  -- path of the SQLite database file
        registry -- URL given to xmlrpclib.ServerProxy
        key_path -- private key file passed to ``openssl rsautl -inkey``
        prefix   -- our own prefix, excluded from peer listings
        db_size  -- number of cached peers kept when a new one is added

        Blocks, retrying with exponential backoff, as long as the registry
        address is not cached and the registry can not be reached.
        """
        self._prefix = prefix
        self._db_size = db_size
        self._key_path = key_path
        self._proxy = xmlrpclib.ServerProxy(registry)

        logging.info('Initialize cache ...')
        self._db = self._openCache(db_path)
        a = self._getRegistryAddress()
        self.registry_ip = utils.binFromIp(a)
        logging.info("Cache initialized. Registry IP is %s", a)

    @staticmethod
    def _openCache(db_path):
        """Open db_path in autocommit mode, create missing tables and
        return the connection with the in-memory 'volatile' DB attached."""
        db = sqlite3.connect(db_path, isolation_level=None)
        q = db.execute
        q("PRAGMA synchronous = OFF")
        q("PRAGMA journal_mode = MEMORY")
        q("""CREATE TABLE IF NOT EXISTS peer (
            prefix TEXT PRIMARY KEY,
            address TEXT NOT NULL)""")
        q("""CREATE TABLE IF NOT EXISTS config (
            name text primary key,
            value text)""")
        q('ATTACH DATABASE ":memory:" AS volatile')
        # No foreign key to peer(prefix) here: 'peer' is in the main database
        # whereas 'stat' is in the attached one, and SQLite looks up the
        # parent table of a foreign key in the schema of the child table.
        # addPeer() keeps both tables consistent explicitly instead.
        q("""CREATE TABLE volatile.stat (
            peer TEXT PRIMARY KEY,
            try INTEGER NOT NULL DEFAULT 0)""")
        q("CREATE INDEX volatile.stat_try ON stat(try)")
        q("INSERT INTO volatile.stat (peer) SELECT prefix FROM peer")
        return db

    def _getRegistryAddress(self):
        """Return the cached registry address, asking the registry for it
        (and caching the answer) if it is not in the 'config' table yet."""
        q = self._db.execute
        row = q("SELECT value FROM config WHERE name='registry'").fetchone()
        if row is not None:
            return row[0]
        logging.info("Private IP of registry not in cache."
                     " Asking registry via its public IP ...")
        retry = 1
        while True:
            try:
                a = self._proxy.getPrivateAddress()
                break
            except socket.error as e:
                logging.warning(e)
                # Exponential backoff, capped at one minute.
                time.sleep(retry)
                retry = min(60, retry * 2)
        q("INSERT INTO config VALUES ('registry',?)", (a,))
        return a

    def log(self):
        """Dump the whole cache to the log if level 5 is enabled."""
        if logging.getLogger().isEnabledFor(5):
            # NOTE(review): logging.trace is not part of the standard library;
            # presumably installed by another module of the project - confirm.
            logging.trace("Cache:")
            for prefix, address, _try in self._db.execute(
                    "SELECT peer.*, try FROM peer, volatile.stat"
                    " WHERE prefix=peer ORDER BY prefix"):
                logging.trace("- %s: %s%s", prefix, address,
                              ' (blacklisted)' if _try else '')

    def connecting(self, prefix, connecting):
        """Store 'connecting' as the 'try' value of the given peer."""
        self._db.execute("UPDATE volatile.stat SET try=? WHERE peer=?",
                         (connecting, prefix))

    def resetConnecting(self):
        """Reset the 'try' value of all peers to 0."""
        self._db.execute("UPDATE volatile.stat SET try=0")

    def getAddress(self, prefix):
        """Return the address of the given peer, or None if the peer is
        unknown or has a non-zero 'try' value."""
        r = self._db.execute("SELECT address FROM peer, volatile.stat"
                             " WHERE prefix=? AND prefix=peer AND try=0",
                             (prefix,)).fetchone()
        return r[0] if r else None

    # Exclude our own address from results in case it is there, which may
    # happen if a node changes its certificate without clearing the cache.
    # IOW, one should probably always put our own address there.
    _get_peer_sql = "SELECT %s FROM peer, volatile.stat" \
                    " WHERE prefix=peer AND prefix!=? AND try=?"

    # The name-mangled '__sql' default arguments below are not meant to be
    # passed by callers: they only build the SQL text once, at class
    # definition time.
    def getPeerList(self, failed=0, __sql=_get_peer_sql % "prefix, address"
                                                        + " ORDER BY RANDOM()"):
        """Return a cursor over (prefix, address) rows, in random order, of
        peers other than ourselves whose 'try' value equals 'failed'."""
        return self._db.execute(__sql, (self._prefix, failed))

    def getPeerCount(self, failed=0, __sql=_get_peer_sql % "COUNT(*)"):
        """Return the number of peers other than ourselves whose 'try'
        value equals 'failed'."""
        return next(self._db.execute(__sql, (self._prefix, failed)))[0]

    def getBootstrapPeer(self):
        """Ask the registry for a bootstrap peer and add it to the cache.

        The answer of the registry is decrypted with our private key by
        running ``openssl rsautl -decrypt``. Return the [prefix, address]
        list on success, or None if the registry can not be reached, if its
        answer can not be decrypted/parsed, or if it sent our own prefix.
        """
        logging.info('Getting Boot peer...')
        try:
            bootpeer = self._proxy.getBootstrapPeer(self._prefix).data
        except (socket.error, xmlrpclib.Fault) as e:
            logging.warning('Failed to bootstrap (%s)', e)
            return
        p = subprocess.Popen(
            ('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        bootpeer = p.communicate(bootpeer)[0].split()
        # Do not index or unpack the answer blindly: a failed decryption or
        # a garbled answer must not crash the caller.
        if p.returncode or len(bootpeer) != 2:
            logging.warning('Failed to bootstrap (invalid answer from registry)')
            return
        if bootpeer[0] != self._prefix:
            self.addPeer(*bootpeer)
            return bootpeer
        logging.warning('Buggy registry sent us our own address')

    def addPeer(self, prefix, address, set_preferred=False):
        """Add a peer to the cache or update its address list.

        address is a ';'-separated list of addresses.

        - Unknown peer: every peer beyond the first db_size ones (ordered by
          'try', then randomly) is evicted, then the new peer is stored.
        - Known peer, set_preferred false: the given addresses are stored,
          those already known first and in their previous order.
        - Known peer, set_preferred true: the stored addresses are kept and
          only reordered so that the given ones come first.

        In all cases, the 'try' value of the peer is reset to 0.
        """
        logging.debug('Adding peer %s: %s', prefix, address)
        with self._db:
            q = self._db.execute
            row = q("SELECT address FROM peer WHERE prefix=?",
                    (prefix,)).fetchone()
            if row is None:
                a = None
                evicted = q("SELECT peer FROM volatile.stat"
                            " ORDER BY try, RANDOM() LIMIT ?,-1",
                            (self._db_size,)).fetchall()
                if evicted:
                    # Delete from both tables explicitly: they are in
                    # different databases so nothing cascades between them.
                    qq = self._db.executemany
                    qq("DELETE FROM peer WHERE prefix=?", evicted)
                    qq("DELETE FROM volatile.stat WHERE peer=?", evicted)
            else:
                a = row[0]
                if set_preferred:
                    preferred = address.split(';')
                    address = a
                else:
                    preferred = a.split(';')

                def rank(item):
                    # Position in the preferred list; others sort last
                    # (sorted() is stable so they keep their order).
                    try:
                        return preferred.index(item)
                    except ValueError:
                        return len(preferred)
                address = ';'.join(sorted(address.split(';'), key=rank))
            if a != address:
                q("INSERT OR REPLACE INTO peer VALUES (?,?)", (prefix, address))
            q("INSERT OR REPLACE INTO volatile.stat VALUES (?,0)", (prefix,))