db.py 8.07 KB
Newer Older
1
import logging, sqlite3, socket, subprocess, xmlrpclib, time
Guillaume Bury's avatar
Guillaume Bury committed
2
import utils
3

4

Guillaume Bury's avatar
Guillaume Bury committed
5
class PeerManager:
Guillaume Bury's avatar
Guillaume Bury committed
6

7
    # internal ip = temp arg/attribute
Guillaume Bury's avatar
Guillaume Bury committed
8
    def __init__(self, db_path, registry, key_path, refresh_time, address,
9
                       internal_ip, prefix, manual, pp, db_size):
10
        self._refresh_time = refresh_time
11
        self.address = address
12
        self._internal_ip = internal_ip
13
        self._prefix = prefix
14
        self._db_size = db_size
15 16
        self._registry = registry
        self._key_path = key_path
17
        self._pp = pp
18
        self._manual = manual
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
19
        self.tunnel_manager = None
20

Guillaume Bury's avatar
Guillaume Bury committed
21
        logging.info('Connecting to peers database...')
Guillaume Bury's avatar
Guillaume Bury committed
22
        self._db = sqlite3.connect(db_path, isolation_level=None)
Guillaume Bury's avatar
Guillaume Bury committed
23
        logging.debug('Database opened')
24

Guillaume Bury's avatar
Guillaume Bury committed
25
        logging.info('Preparing peers database...')
26 27 28 29 30 31
        self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
                            prefix TEXT PRIMARY KEY,
                            address TEXT NOT NULL,
                            used INTEGER NOT NULL DEFAULT 0,
                            date INTEGER DEFAULT (strftime('%s', 'now')))""")
        self._db.execute("UPDATE peers SET used = 0")
32 33
        self._db.execute("""CREATE INDEX IF NOT EXISTS
                          _peers_used ON peers(used)""")
34 35 36
        self._db.execute("""CREATE TABLE IF NOT EXISTS config (
                            name text primary key,
                            value text)""")
37 38 39 40 41 42 43
        self._db.execute('ATTACH DATABASE ":memory:" AS blacklist')
        self._db.execute("""CREATE TABLE blacklist.flag (
                            prefix TEXT PRIMARY KEY,
                            flag INTEGER NOT NULL)""")
        self._db.execute("""CREATE INDEX blacklist.blacklist_flag
                            ON flag(flag)""")
        self._db.execute("INSERT INTO blacklist.flag VALUES (?,?)", (prefix, 1))
44
        try:
45
            a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
46 47 48
        except StopIteration:
            proxy = xmlrpclib.ServerProxy(registry)
            a = proxy.getPrivateAddress()
49
            self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
50
        self._proxy = xmlrpclib.ServerProxy(a)
Guillaume Bury's avatar
Guillaume Bury committed
51
        logging.debug('Database prepared')
52

53
        self.next_refresh = time.time()
54

55
    def clear_blacklist(self, flag):
Guillaume Bury's avatar
Guillaume Bury committed
56
        logging.info('Clearing blacklist from flag %u' % flag)
57
        self._db.execute("DELETE FROM blacklist.flag WHERE flag = ?",
58
                          (flag,))
Guillaume Bury's avatar
Guillaume Bury committed
59
        logging.info('Blacklist cleared')
Guillaume Bury's avatar
Guillaume Bury committed
60

61
    def blacklist(self, prefix, flag):
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
62
        logging.info('Blacklisting %s' % prefix)
63
        self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
64
        self._db.execute("INSERT OR REPLACE INTO blacklist.flag VALUES (?,?)",
65
                          (prefix, flag))
Guillaume Bury's avatar
Guillaume Bury committed
66
        logging.debug('%s blacklisted' % prefix)
67 68

    def whitelist(self, prefix):
Guillaume Bury's avatar
Guillaume Bury committed
69
        logging.info('Unblacklisting %s' % prefix)
70
        self._db.execute("DELETE FROM blacklist.flag WHERE prefix = ?", (prefix,))
Guillaume Bury's avatar
Guillaume Bury committed
71
        logging.debug('%s whitelisted' % prefix)
72

73
    def refresh(self):
Guillaume Bury's avatar
Guillaume Bury committed
74
        logging.info('Refreshing the peers DB...')
Guillaume Bury's avatar
Guillaume Bury committed
75 76 77
        try:
            self._declare()
            self.next_refresh = time.time() + self._refresh_time
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
78 79 80 81
        except socket.error, e:
            logging.info('Connection to server failed, re-bootstraping')
        try:
            self._bootstrap()
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
82
            self.next_refresh = time.time() + self._refresh_time
Guillaume Bury's avatar
Guillaume Bury committed
83
        except socket.error, e:
Guillaume Bury's avatar
Guillaume Bury committed
84 85
            logging.debug('socket.error : %s' % e)
            logging.info('Connection to server failed, retrying in 30s')
Guillaume Bury's avatar
Guillaume Bury committed
86
            self.next_refresh = time.time() + 30
87

88
    def _declare(self):
89
        if self.address != None:
Guillaume Bury's avatar
Guillaume Bury committed
90
            logging.info('Sending connection info to server...')
91
            self._proxy.declare(utils.address_str(self.address))
Guillaume Bury's avatar
Guillaume Bury committed
92
            logging.debug('Info sent')
93
        else:
Guillaume Bury's avatar
Guillaume Bury committed
94
            logging.warning("Warning : couldn't send ip, unknown external config")
95

96
    def getUnusedPeers(self, peer_count):
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
97
        for populate in self._bootstrap, bool:
98 99 100
            peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
                                            <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
                                         (peer_count,)).fetchall()
101
            if peer_list:
102
                return peer_list
103 104 105
            populate()
        logging.warning('Cannot find any new peers')
        return []
106 107

    def _bootstrap(self):
Guillaume Bury's avatar
Guillaume Bury committed
108
        logging.info('Getting Boot peer...')
109 110 111
        proxy = xmlrpclib.ServerProxy(self._registry)
        try:
            bootpeer = proxy.getBootstrapPeer(self._prefix).data
Guillaume Bury's avatar
Guillaume Bury committed
112
            logging.debug('Boot peer received from server')
113 114
            p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE)
Guillaume Bury's avatar
Guillaume Bury committed
115
            bootpeer = p.communicate(bootpeer)[0].split()
116
            return self._addPeer(bootpeer)
117 118 119
        except socket.error:
            pass
        except sqlite3.IntegrityError, e:
Guillaume Bury's avatar
Guillaume Bury committed
120
            if e.args[0] != 'column prefix is not unique':
121
                raise
122 123
        except Exception, e:
            logging.info('Unable to bootstrap : %s' % e)
124
        return False
125

126
    def usePeer(self, prefix):
Guillaume Bury's avatar
Guillaume Bury committed
127 128
        logging.trace('Updating peers database : using peer %s' % prefix)
        self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
129
                (prefix,))
Guillaume Bury's avatar
Guillaume Bury committed
130
        logging.debug('DB updated')
131

132
    def unusePeer(self, prefix):
Guillaume Bury's avatar
Guillaume Bury committed
133 134
        logging.trace('Updating peers database : unusing peer %s' % prefix)
        self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
135
                (prefix,))
Guillaume Bury's avatar
Guillaume Bury committed
136
        logging.debug('DB updated')
Guillaume Bury's avatar
Guillaume Bury committed
137

138
    def flagPeer(self, prefix):
Guillaume Bury's avatar
Guillaume Bury committed
139
        logging.trace('Updating peers database : flagging peer %s' % prefix)
140 141
        self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
                (prefix,))
Guillaume Bury's avatar
Guillaume Bury committed
142
        logging.debug('DB updated')
143

Guillaume Bury's avatar
Guillaume Bury committed
144
    def handle_message(self, msg):
Guillaume Bury's avatar
Guillaume Bury committed
145 146
        script_type, arg = msg.split()
        if script_type == 'client-connect':
Julien Muchembled's avatar
typo  
Julien Muchembled committed
147
            logging.info('Incoming connection from %s' % (arg,))
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
148
            prefix = utils.binFromSubnet(arg)
Julien Muchembled's avatar
typo  
Julien Muchembled committed
149
            if self.tunnel_manager.checkIncomingTunnel(prefix):
150
                self.blacklist(prefix, 2)
Guillaume Bury's avatar
Guillaume Bury committed
151
        elif script_type == 'client-disconnect':
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
152
            self.whitelist(utils.binFromSubnet(arg))
Guillaume Bury's avatar
Guillaume Bury committed
153
            logging.info('%s has disconnected' % (arg,))
Guillaume Bury's avatar
Guillaume Bury committed
154
        elif script_type == 'route-up':
155
            if not self._manual:
156
                external_ip = arg
157
                new_address = list([external_ip, port, proto]
Guillaume Bury's avatar
Guillaume Bury committed
158
                                   for port, proto, _ in self._pp)
159 160
                if self.address != new_address:
                    self.address = new_address
Guillaume Bury's avatar
Guillaume Bury committed
161 162
                    logging.info('Received new external ip : %s'
                              % (external_ip,))
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
163 164 165 166 167 168
                    try:
                        self._declare()
                    except socket.error, e:
                        logging.debug('socket.error : %s' % e)
                        logging.info('''Connection to server failed while
                            declaring external infos''')
Guillaume Bury's avatar
Guillaume Bury committed
169
        else:
Guillaume Bury's avatar
Guillaume Bury committed
170 171
            logging.debug('Unknow message recieved from the openvpn pipe : %s'
                    % msg)
172

173
    def readSocket(self, msg):
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
174
        peer = msg.replace('\n', '').split(' ')
175 176 177 178 179 180 181 182 183 184
        if len(peer) != 2:
            logging.debug('Invalid package recieved : %s' % msg)
            return
        self._addPeer(peer)

    def _addPeer(self, peer):
        logging.debug('Adding peer %s' % peer)
        if int(self._db.execute("""SELECT COUNT(*) FROM blacklist.flag WHERE prefix = ?""", (peer[0],)).next()[0]) > 0:
            logging.info('Peer is blacklisted')
            return False
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
185 186
        self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
            RANDOM() LIMIT MAX(0, (SELECT COUNT(*) FROM peers
Ulysse Beaugnon's avatar
Ulysse Beaugnon committed
187
            WHERE used <= 0) - ?)""", (str(self._db_size),))
188
        self._db.execute("INSERT OR IGNORE INTO peers (prefix, address) VALUES (?,?)", peer)
189 190
        logging.debug('Peer added')
        return True