Commit c4c04f4b authored by Julien Muchembled's avatar Julien Muchembled

Speed up bootstrap and reconnection

The way peer addresses were exchanged polluted caches with information about
dead nodes. In particular, bootstrapping often took a long time because the
cache of the primary node was mostly useless.

This also fixes bootstrap of registry.
parent d3ff0e56
...@@ -5,12 +5,6 @@ ...@@ -5,12 +5,6 @@
- the ip address of the network being built - the ip address of the network being built
- the creator of the network ( add option in registry ? ) - the creator of the network ( add option in registry ? )
- Fix bootstrap problem:
registry & --private option ( see re6stnet man page HOW TO ).
one have to start the registry twice, the first time without
the --private option
- Babel limitations: - Babel limitations:
- The metric does not take latency into account. - The metric does not take latency into account.
......
ca ca.crt ca ca.crt
key registry/ca.key key registry/ca.key
private 2001:db8:42::1
logfile registry/registry.log logfile registry/registry.log
...@@ -117,25 +117,20 @@ certificates, as follows: translate the significant part to hexadecimal ...@@ -117,25 +117,20 @@ certificates, as follows: translate the significant part to hexadecimal
-days 365 -out ca.crt -days 365 -out ca.crt
The CA email will be used as sender for mails containing tokens. The CA email will be used as sender for mails containing tokens.
The registry can now be started::
Now start the registry in order to setup the main re6st node, which should be
on the same machine::
re6st-registry --ca ca.crt --key ca.key --mailhost smtp.example.com re6st-registry --ca ca.crt --key ca.key --mailhost smtp.example.com
re6st-conf --registry http://localhost/
If `re6st-conf` is run in the directory containing CA files, ca.crt will be
overridden without harm.
Note that the registry was started without specifying the re6st IP of the main Like the registry, the first registered node should be always up because its
node, because it was not known yet. For your network to work, it has to be presence is used by all other nodes to garantee they are connected to the
restarted with appropriate --private option. network. It is therefore recommended to run it on the same machine as the
registry::
Let's suppose your first node is allocated subnet 2001:db8:42::/64. re6st-conf --registry http://localhost/
Its IP is the first unicast address::
re6st-registry --private 2001:db8:42::1 ... If `re6st-conf` is run in the directory containing CA files, ca.crt will be
re6stnet --registry http://localhost/ --ip re6st.example.com ... overridden without harm. See previous section for more information to create
a node.
TROUBLESHOOTING TROUBLESHOOTING
=============== ===============
......
...@@ -75,9 +75,6 @@ def main(): ...@@ -75,9 +75,6 @@ def main():
help="SMTP host to send confirmation emails. For debugging" help="SMTP host to send confirmation emails. For debugging"
" purpose, it can also be an absolute or existing path to" " purpose, it can also be an absolute or existing path to"
" a mailbox file") " a mailbox file")
_('--private',
help="re6stnet IP of the node on which runs the registry."
" Required for normal operation.")
_('--prefix-length', default=16, type=int, _('--prefix-length', default=16, type=int,
help="Default length of allocated prefixes.") help="Default length of allocated prefixes.")
_('--anonymous-prefix-length', type=int, _('--anonymous-prefix-length', type=int,
......
...@@ -31,29 +31,24 @@ class PeerDB(object): ...@@ -31,29 +31,24 @@ class PeerDB(object):
q("INSERT INTO volatile.stat (peer) SELECT prefix FROM peer") q("INSERT INTO volatile.stat (peer) SELECT prefix FROM peer")
try: try:
a = q("SELECT value FROM config WHERE name='registry'").next()[0] a = q("SELECT value FROM config WHERE name='registry'").next()[0]
except StopIteration: int(a, 2)
a = self._updateRegistryIP() except (StopIteration, ValueError):
else:
self.registry_ip = utils.binFromIp(a)
if not self.registry_ip.startswith(network):
a = self._updateRegistryIP()
logging.info("Cache initialized. Registry IP is %s", a)
def _updateRegistryIP(self):
logging.info("Asking registry its private IP...") logging.info("Asking registry its private IP...")
retry = 1 retry = 1
while True: while True:
try: try:
a = self._registry.getPrivateAddress(self._prefix) a = self._registry.getPrefix(self._prefix)
int(a, 2)
break break
except socket.error, e: except socket.error, e:
logging.warning(e) logging.warning(e)
time.sleep(retry) time.sleep(retry)
retry = min(60, retry * 2) retry = min(60, retry * 2)
self._db.execute("INSERT OR REPLACE INTO config VALUES ('registry',?)", q("INSERT OR REPLACE INTO config VALUES ('registry',?)", (a,))
(a,)) self._db.commit()
self.registry_ip = utils.binFromIp(a) self.registry_prefix = a
return a logging.info("Cache initialized. Prefix of registry node is %s/%u",
int(a, 2), len(a))
def log(self): def log(self):
if logging.getLogger().isEnabledFor(5): if logging.getLogger().isEnabledFor(5):
...@@ -64,6 +59,19 @@ class PeerDB(object): ...@@ -64,6 +59,19 @@ class PeerDB(object):
logging.trace("- %s: %s%s", prefix, address, logging.trace("- %s: %s%s", prefix, address,
' (blacklisted)' if _try else '') ' (blacklisted)' if _try else '')
def cacheMinimize(self, size):
with self._db:
self._cacheMinimize(size)
def _cacheMinimize(self, size):
a = self._db.execute(
"SELECT peer FROM volatile.stat ORDER BY try, RANDOM() LIMIT ?,-1",
(size,)).fetchall()
if a:
q = self._db.executemany
q("DELETE FROM peer WHERE prefix IN (?)", a)
q("DELETE FROM volatile.stat WHERE peer IN (?)", a)
def connecting(self, prefix, connecting): def connecting(self, prefix, connecting):
self._db.execute("UPDATE volatile.stat SET try=? WHERE peer=?", self._db.execute("UPDATE volatile.stat SET try=? WHERE peer=?",
(connecting, prefix)) (connecting, prefix))
...@@ -119,13 +127,8 @@ class PeerDB(object): ...@@ -119,13 +127,8 @@ class PeerDB(object):
return len(preferred) return len(preferred)
address = ';'.join(sorted(address.split(';'), key=key)) address = ';'.join(sorted(address.split(';'), key=key))
except ValueError: except ValueError:
a = q("SELECT peer FROM volatile.stat ORDER BY try, RANDOM()" self._cacheMinimize(self._db_size)
" LIMIT ?,-1", (self._db_size,)).fetchall() a = None
if a:
qq = self._db.executemany
qq("DELETE FROM peer WHERE prefix IN (?)", a)
qq("DELETE FROM volatile.stat WHERE peer IN (?)", a)
# 'a != address' will evaluate to True because types differs
if a != address: if a != address:
q("INSERT OR REPLACE INTO peer VALUES (?,?)", (prefix, address)) q("INSERT OR REPLACE INTO peer VALUES (?,?)", (prefix, address))
q("INSERT OR REPLACE INTO volatile.stat VALUES (?,0)", (prefix,)) q("INSERT OR REPLACE INTO volatile.stat VALUES (?,0)", (prefix,))
...@@ -30,24 +30,27 @@ class getcallargs(type): ...@@ -30,24 +30,27 @@ class getcallargs(type):
class RegistryServer(object): class RegistryServer(object):
__metaclass__ = getcallargs __metaclass__ = getcallargs
_peers = 0, ()
def __init__(self, config): def __init__(self, config):
self.config = config self.config = config
self.cert_duration = 365 * 86400 self.cert_duration = 365 * 86400
self.lock = threading.Lock() self.lock = threading.Lock()
self.sessions = {} self.sessions = {}
if self.config.private:
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
else:
logging.warning('You have declared no private address'
', either this is the first start, or you should'
'check you configuration')
# Database initializing # Database initializing
utils.makedirs(os.path.dirname(self.config.db)) utils.makedirs(os.path.dirname(self.config.db))
self.db = sqlite3.connect(self.config.db, isolation_level=None, self.db = sqlite3.connect(self.config.db, isolation_level=None,
check_same_thread=False) check_same_thread=False)
self.db.execute("""CREATE TABLE IF NOT EXISTS config (
name text primary key,
value text)""")
try:
(self.prefix,), = self.db.execute(
"SELECT value FROM config WHERE name='prefix'")
except ValueError:
self.prefix = None
self.db.execute("""CREATE TABLE IF NOT EXISTS token ( self.db.execute("""CREATE TABLE IF NOT EXISTS token (
token text primary key not null, token text primary key not null,
email text not null, email text not null,
...@@ -157,7 +160,7 @@ class RegistryServer(object): ...@@ -157,7 +160,7 @@ class RegistryServer(object):
s.sendmail(self._email, email, msg.as_string()) s.sendmail(self._email, email, msg.as_string())
s.quit() s.quit()
def _getPrefix(self, prefix_len): def _newPrefix(self, prefix_len):
max_len = 128 - len(self.network) max_len = 128 - len(self.network)
assert 0 < prefix_len <= max_len assert 0 < prefix_len <= max_len
try: try:
...@@ -173,7 +176,7 @@ class RegistryServer(object): ...@@ -173,7 +176,7 @@ class RegistryServer(object):
if len(prefix) < max_len or '1' in prefix: if len(prefix) < max_len or '1' in prefix:
return prefix return prefix
self.db.execute("UPDATE cert SET cert = 'reserved' WHERE prefix = ?", (prefix,)) self.db.execute("UPDATE cert SET cert = 'reserved' WHERE prefix = ?", (prefix,))
return self._getPrefix(prefix_len) return self._newPrefix(prefix_len)
def requestCertificate(self, token, req): def requestCertificate(self, token, req):
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, req) req = crypto.load_certificate_request(crypto.FILETYPE_PEM, req)
...@@ -193,9 +196,13 @@ class RegistryServer(object): ...@@ -193,9 +196,13 @@ class RegistryServer(object):
if not prefix_len: if not prefix_len:
return return
email = None email = None
prefix = self._getPrefix(prefix_len) prefix = self._newPrefix(prefix_len)
self.db.execute("UPDATE cert SET email = ? WHERE prefix = ?", self.db.execute("UPDATE cert SET email = ? WHERE prefix = ?",
(email, prefix)) (email, prefix))
if self.prefix is None:
self.prefix = prefix
self.db.execute(
"INSERT INTO config VALUES ('prefix',?)", (prefix,))
return self._createCertificate(prefix, req.get_subject(), return self._createCertificate(prefix, req.get_subject(),
req.get_pubkey()) req.get_pubkey())
...@@ -227,37 +234,51 @@ class RegistryServer(object): ...@@ -227,37 +234,51 @@ class RegistryServer(object):
def getCa(self): def getCa(self):
return crypto.dump_certificate(crypto.FILETYPE_PEM, self.ca) return crypto.dump_certificate(crypto.FILETYPE_PEM, self.ca)
def getPrefix(self, cn):
return self.prefix
def getPrivateAddress(self, cn): def getPrivateAddress(self, cn):
return self.config.private # BBB: Deprecated by getPrefix.
return utils.ipFromBin(self.network + self.prefix)
def getBootstrapPeer(self, cn): def getBootstrapPeer(self, cn):
with self.lock: with self.lock:
cert = self._getCert(cn) cert = self._getCert(cn)
address = self.config.private, tunnel.PORT age, peers = self._peers
if time.time() < age or not peers:
peers = [x[1] for x in utils.iterRoutes(self.network)]
random.shuffle(peers)
self._peers = time.time() + 60, peers
peer = peers.pop()
if peer == cn:
# Very unlikely (e.g. peer restarted with empty cache),
# so don't bother looping over above code
# (in case 'peers' is empty).
peer = self.prefix
address = utils.ipFromBin(self.network + peer), tunnel.PORT
self.sock.sendto('\2', address) self.sock.sendto('\2', address)
peer = None start = time.time()
while select.select([self.sock], [], [], peer is None)[0]: timeout = 1
# Loop because there may be answers from previous requests.
while select.select([self.sock], [], [], timeout)[0]:
msg = self.sock.recv(1<<16) msg = self.sock.recv(1<<16)
if msg[0] == '\1': if msg[0] == '\1':
try: try:
peer = msg[1:].split('\n')[-2] msg = msg[1:msg.index('\n')]
except IndexError: except ValueError:
peer = '' continue
if peer is None: if msg.split()[0] == peer:
raise EnvironmentError("Timeout while querying [%s]:%u" % address) break
if not peer or peer.split()[0] == cn: timeout = max(0, time.time() - start)
raise LookupError("No bootstrap peer found") else:
logging.info("Sending bootstrap peer: %s", peer) raise EnvironmentError("Timeout while querying [%s]:%u"
return utils.encrypt(cert, peer) % address)
logging.info("Sending bootstrap peer: %s", msg)
return utils.encrypt(cert, msg)
def topology(self): def topology(self):
with self.lock: with self.lock:
is_registry = utils.binFromIp(self.config.private peers = deque(('%u/%u' % (int(self.prefix, 2), len(self.prefix)),))
)[len(self.network):].startswith
peers = deque('%u/%u' % (int(x, 2), len(x))
for x, in self.db.execute("SELECT prefix FROM cert")
if is_registry(x))
assert len(peers) == 1
cookie = hex(random.randint(0, 1<<32))[2:] cookie = hex(random.randint(0, 1<<32))[2:]
graph = dict.fromkeys(peers) graph = dict.fromkeys(peers)
asked = 0 asked = 0
......
import logging, random, socket, subprocess, time import logging, random, socket, subprocess, time
from collections import deque from collections import deque
from itertools import chain
from . import plib, utils from . import plib, utils
PORT = 326 PORT = 326
RTF_CACHE = 0x01000000 # cache entry
# Be careful the refresh interval should let the routes be established # Be careful the refresh interval should let the routes be established
...@@ -278,8 +276,6 @@ class TunnelManager(object): ...@@ -278,8 +276,6 @@ class TunnelManager(object):
count -= self._makeTunnel(peer, address) count -= self._makeTunnel(peer, address)
else: else:
ip = utils.ipFromBin(self._network + peer) ip = utils.ipFromBin(self._network + peer)
# TODO: Send at least 1 address. This helps the registry
# node filling its cache when building a new network.
try: try:
self.sock.sendto('\2', (ip, PORT)) self.sock.sendto('\2', (ip, PORT))
except socket.error, e: except socket.error, e:
...@@ -317,23 +313,9 @@ class TunnelManager(object): ...@@ -317,23 +313,9 @@ class TunnelManager(object):
del self._distant_peers[:] del self._distant_peers[:]
for conn in self._connection_dict.itervalues(): for conn in self._connection_dict.itervalues():
conn.routes = 0 conn.routes = 0
a = len(self._network)
b = a + len(self._prefix)
other = [] other = []
with open('/proc/net/ipv6_route') as f: for iface, prefix in utils.iterRoutes(self._network, self._prefix):
self._last_routing_table = f.read() assert iface != 'lo', (iface, prefix)
for line in self._last_routing_table.splitlines():
line = line.split()
iface = line[-1]
if iface == 'lo' or int(line[-2], 16) & RTF_CACHE:
continue
ip = bin(int(line[0], 16))[2:].rjust(128, '0')
if ip[:a] != self._network or ip[a:b] == self._prefix:
continue
prefix_len = int(line[1], 16)
prefix = ip[a:prefix_len]
logging.trace('Route on iface %s detected to %s/%u',
iface, utils.ipFromBin(ip), prefix_len)
nexthop = self._iface_to_prefix.get(iface) nexthop = self._iface_to_prefix.get(iface)
if nexthop: if nexthop:
self._connection_dict[nexthop].routes += 1 self._connection_dict[nexthop].routes += 1
...@@ -343,10 +325,9 @@ class TunnelManager(object): ...@@ -343,10 +325,9 @@ class TunnelManager(object):
other.append(prefix) other.append(prefix)
else: else:
self._distant_peers.append(prefix) self._distant_peers.append(prefix)
is_registry = self._peer_db.registry_ip[a:].startswith registry = self._peer_db.registry_prefix
if is_registry(self._prefix) or any(is_registry(peer) if registry == self._prefix or any(registry in x for x in (
for peer in chain(self._distant_peers, other, self._distant_peers, other, self._served, self._connection_dict)):
self._served, self._connection_dict)):
self._disconnected = None self._disconnected = None
# XXX: When there is no new peer to connect when looking at routes # XXX: When there is no new peer to connect when looking at routes
# coming from tunnels, we'd like to consider those discovered # coming from tunnels, we'd like to consider those discovered
...@@ -410,37 +391,34 @@ class TunnelManager(object): ...@@ -410,37 +391,34 @@ class TunnelManager(object):
return return
code = ord(msg[0]) code = ord(msg[0])
if code == 1: # answer if code == 1: # answer
# We parse the message in a way to discard a truncated line. # Old versions may send additional and obsolete addresses.
for peer in msg[1:].split('\n')[:-1]: # Ignore them, as well as truncated lines.
try: try:
prefix, address = peer.split() prefix, address = msg[1:msg.index('\n')].split()
int(prefix, 2) int(prefix, 2)
except ValueError: except ValueError:
break pass
else:
if prefix != self._prefix: if prefix != self._prefix:
self._peer_db.addPeer(prefix, address) self._peer_db.addPeer(prefix, address)
try: try:
self._connecting.remove(prefix) self._connecting.remove(prefix)
except KeyError: except KeyError:
continue pass
else:
self._makeTunnel(prefix, address) self._makeTunnel(prefix, address)
elif code == 2: # request elif code == 2: # request
encode = '%s %s\n'.__mod__
if self._address: if self._address:
msg = [encode((self._prefix, self._address))] msg = '\1%s %s\n' % (self._prefix, self._address)
else: # I don't know my IP yet!
msg = []
# Add an extra random peer, mainly for the registry.
if random.randint(0, self._peer_db.getPeerCount()):
msg.append(encode(self._peer_db.getPeerList().next()))
if msg:
try: try:
self.sock.sendto('\1' + ''.join(msg), address[:2]) self.sock.sendto(msg, address[:2])
except socket.error, e: except socket.error, e:
logging.info('Failed to reply to %s (%s)', address, e) logging.info('Failed to reply to %s (%s)', address, e)
#else: # I don't know my IP yet!
elif code == 255: elif code == 255:
# the registry wants to know the topology for debugging purpose # the registry wants to know the topology for debugging purpose
if utils.binFromIp(address[0]) == self._peer_db.registry_ip: if utils.binFromIp(address[0])[len(self._network):].startswith(
self._peer_db.registry_prefix):
msg = ['\xfe%s%u/%u\n%u\n' % (msg[1:], msg = ['\xfe%s%u/%u\n%u\n' % (msg[1:],
int(self._prefix, 2), len(self._prefix), int(self._prefix, 2), len(self._prefix),
len(self._connection_dict))] len(self._connection_dict))]
......
...@@ -155,6 +155,33 @@ def binFromSubnet(subnet): ...@@ -155,6 +155,33 @@ def binFromSubnet(subnet):
p, l = subnet.split('/') p, l = subnet.split('/')
return bin(int(p))[2:].rjust(int(l), '0') return bin(int(p))[2:].rjust(int(l), '0')
if 1:
def _iterRoutes():
with open('/proc/net/ipv6_route') as f:
routing_table = f.read()
for line in routing_table.splitlines():
line = line.split()
iface = line[-1]
if 0 < int(line[5], 16) < 1 << 31: # positive metric
yield (iface, bin(int(line[0], 16))[2:].rjust(128, '0'),
int(line[1], 16))
_iterRoutes.__doc__ = """Iterates over all routes
Amongst all returned routes starting with re6st prefix:
- one is the local one with our prefix
- any route with null prefix will be ignored
- other are reachable routes installed by babeld
"""
def iterRoutes(network, exclude_prefix=None):
a = len(network)
for iface, ip, prefix_len in _iterRoutes():
if ip[:a] == network:
prefix = ip[a:prefix_len]
if prefix and prefix != exclude_prefix:
yield iface, prefix
def decrypt(key_path, data): def decrypt(key_path, data):
p = subprocess.Popen( p = subprocess.Popen(
('openssl', 'rsautl', '-decrypt', '-inkey', key_path), ('openssl', 'rsautl', '-decrypt', '-inkey', key_path),
......
...@@ -278,6 +278,7 @@ def main(): ...@@ -278,6 +278,7 @@ def main():
r_pipe, write_pipe = os.pipe() r_pipe, write_pipe = os.pipe()
read_pipe = os.fdopen(r_pipe) read_pipe = os.fdopen(r_pipe)
peer_db = db.PeerDB(db_path, registry, config.key, network, prefix) peer_db = db.PeerDB(db_path, registry, config.key, network, prefix)
cleanup.append(lambda: peer_db.cacheMinimize(config.client_count))
tunnel_manager = tunnel.TunnelManager(write_pipe, peer_db, tunnel_manager = tunnel.TunnelManager(write_pipe, peer_db,
config.openvpn_args, timeout, config.tunnel_refresh, config.openvpn_args, timeout, config.tunnel_refresh,
config.client_count, config.iface_list, network, prefix, config.client_count, config.iface_list, network, prefix,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment