Commit 2f49dae1 authored by Julien Muchembled's avatar Julien Muchembled

Use new control socket of babeld to get routes

parent f9991e58
...@@ -161,7 +161,8 @@ if 1: ...@@ -161,7 +161,8 @@ if 1:
" -set_serial 0x120010db80042 -days %u" % CA_DAYS, shell=True) " -set_serial 0x120010db80042 -days %u" % CA_DAYS, shell=True)
db_path = 'registry/registry.db' db_path = 'registry/registry.db'
registry.screen('../re6st-registry @registry/re6st-registry.conf --db %s' registry.screen('../re6st-registry @registry/re6st-registry.conf --db %s'
' --mailhost %s -v%u' % (db_path, os.path.abspath('mbox'), VERBOSE)) ' --mailhost %s -v%u --control-socket registry/babeld.socket'
% (db_path, os.path.abspath('mbox'), VERBOSE))
registry_url = 'http://%s/' % REGISTRY registry_url = 'http://%s/' % REGISTRY
registry.Popen(('python', '-c', """if 1: registry.Popen(('python', '-c', """if 1:
import socket, time import socket, time
...@@ -194,8 +195,9 @@ if 1: ...@@ -194,8 +195,9 @@ if 1:
p.communicate(str(token[0])) p.communicate(str(token[0]))
os.remove(dh_path) os.remove(dh_path)
os.remove(folder + '/ca.crt') os.remove(folder + '/ca.crt')
node.screen('../re6stnet @%s/re6stnet.conf -v%u --registry %s %s' node.screen('../re6stnet @%s/re6stnet.conf -v%u --registry %s'
% (folder, VERBOSE, registry, args)) ' --control-socket %s/babeld.socket'
' %s' % (folder, VERBOSE, registry, folder, args))
re6stnet(registry, 'registry', '--ip ' + REGISTRY, registry='http://localhost/') re6stnet(registry, 'registry', '--ip ' + REGISTRY, registry='http://localhost/')
re6stnet(machine1, 'm1', '-I%s' % m1_if_0.name) re6stnet(machine1, 'm1', '-I%s' % m1_if_0.name)
re6stnet(machine2, 'm2', '--remote-gateway 10.1.1.1', prefix_len=80) re6stnet(machine2, 'm2', '--remote-gateway 10.1.1.1', prefix_len=80)
......
...@@ -3,7 +3,7 @@ import httplib, logging, socket ...@@ -3,7 +3,7 @@ import httplib, logging, socket
from BaseHTTPServer import BaseHTTPRequestHandler from BaseHTTPServer import BaseHTTPRequestHandler
from SocketServer import ThreadingTCPServer from SocketServer import ThreadingTCPServer
from urlparse import parse_qsl from urlparse import parse_qsl
from re6st import registry, utils from re6st import ctl, registry, utils
# To generate server ca and key with serial for 2001:db8:42::/48 # To generate server ca and key with serial for 2001:db8:42::/48
# openssl req -nodes -new -x509 -key ca.key -set_serial 0x120010db80042 -days 3650 -out ca.crt # openssl req -nodes -new -x509 -key ca.key -set_serial 0x120010db80042 -days 3650 -out ca.crt
...@@ -80,6 +80,9 @@ def main(): ...@@ -80,6 +80,9 @@ def main():
_('--anonymous-prefix-length', type=int, _('--anonymous-prefix-length', type=int,
help="Length of allocated anonymous prefixes." help="Length of allocated anonymous prefixes."
" If 0 or unset, registration by email is required") " If 0 or unset, registration by email is required")
_('--control-socket', metavar='CTL_SOCK', default=ctl.SOCK_PATH,
help="Socket path to use for communication between re6stnet and babeld"
" (option -R of Babel).")
_('-l', '--logfile', default='/var/log/re6stnet/registry.log', _('-l', '--logfile', default='/var/log/re6stnet/registry.log',
help="Path to logging file.") help="Path to logging file.")
_('-v', '--verbose', default=1, type=int, _('-v', '--verbose', default=1, type=int,
......
import logging, socket, struct
from collections import namedtuple
from . import utils
SOCK_PATH = '/var/run/re6st-babeld.sock'
uint16 = struct.Struct("!H")
header = struct.Struct("!HI")
class Struct(object):
def __init__(self, format, *args):
if args:
t = namedtuple(*args)
if isinstance(format, str):
s = struct.Struct("!" + format)
def encode(buffer, value):
buffer += s.pack(*value)
def decode(buffer, offset=0):
return offset + s.size, t(*s.unpack_from(buffer, offset))
else:
def encode(buffer, value):
for f, value in zip(format, value):
f.encode(buffer, value)
def decode(buffer, offset=0):
r = []
for f in format:
offset, x = f.decode(buffer, offset)
r.append(x)
return offset, t(*r)
self.encode = encode
self.decode = decode
class Array(object):
def __init__(self, item):
self._item = item
def encode(self, buffer, value):
buffer += uint16.pack(len(value))
encode = self._item.encode
for value in value:
encode(buffer, value)
def decode(self, buffer, offset=0):
r = []
o = offset + 2
decode = self._item.decode
for i in xrange(*uint16.unpack_from(buffer, offset)):
o, x = decode(buffer, o)
r.append(x)
return o, r
class String(object):
@staticmethod
def encode(buffer, value):
buffer += value + "\0"
@staticmethod
def decode(buffer, offset=0):
i = buffer.index("\0", offset)
return i + 1, buffer[offset:i]
class Buffer(object):
def __init__(self):
self._buf = bytearray()
self._r = self._w = 0
def __iadd__(self, value):
self._buf += value
return self
def __len__(self):
return len(self._buf)
def _seek(self, r):
n = len(self._buf)
if r < n:
self._r = r
else:
self._w -= n
del self._buf[:]
self._r = 0
# reading
@property
def ready(self):
return self._w <= len(self._buf)
def want(self, n):
self._w = self._r + n
def unpack_from(self, struct):
r = self._r
value = struct.unpack_from(self._buf, r)
self._seek(r + struct.size)
return value
def decode(self, decode):
r, value = decode(self._buf, self._r)
self._seek(r)
return value
try: # BBB: Python < 2.7.4 (http://bugs.python.org/issue10212)
uint16.unpack_from(bytearray(uint16.size))
except TypeError:
def unpack_from(self, struct):
r = self._r
x = r + struct.size
value = struct.unpack(buffer(self._buf)[r:x])
self._seek(x)
return value
def decode(self, decode):
r = self._r
size, value = decode(buffer(self._buf)[r:])
self._seek(r + size)
return value
# writing
def send(self, socket, *args):
r = self._r
self._seek(r + socket.send(self._buf[r:], *args))
def pack_into(self, struct, offset, *args):
struct.pack_into(self._buf, offset, *args)
class Packet(object):
response_dict = {}
def __new__(cls, id, request, response=None):
if response:
cls.response_dict[id] = response.decode
if request:
def packet(*args):
self = object.__new__(cls)
self.id = id
self.args = args
self.request = request
return self
return packet
def write(self, buffer):
logging.trace('send %s%r', self.__class__.__name__,
(self.id,) + self.args)
offset = len(buffer)
buffer += '\0' * header.size
r = self.request
if isinstance(r, Struct):
r.encode(buffer, self.args)
else:
r.encode(buffer, *self.args)
buffer.pack_into(header, offset, self.id,
len(buffer) - header.size - offset)
Dump = Packet(1,
Struct("B"),
Struct((
Array(Struct((Struct("I", "index", "index"), String), "interface", "index name")),
Array(Struct("16sIHHHHHiHH", "neighbour", "address ifindex reach rxcost txcost rtt rttcost channel if_up")),
Array(Struct("16sBH", "xroute", "prefix plen metric")),
Array(Struct("16sBHHH8siiI16s16sB", "route", "prefix plen metric smoothed_metric refmetric id seqno age ifindex neigh_address nexthop flags")),
), "dump", "interfaces neighbours xroutes routes"))
class Babel(object):
_decode = None
def __init__(self, socket_path, handler, network):
self.network = network
self.write_buffer = Buffer()
self.read_buffer = Buffer()
self.read_buffer.want(header.size)
self.handler = handler
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
def select(*args):
try:
s.connect(socket_path)
except socket.error:
return
s.send("\1")
s.setblocking(0)
del self.request_dump, self.select
self.socket = s
return self.select(*args)
self.select = select
self.request_dump = lambda: self.handle_dump((), (), (), ())
def send(self, packet):
packet.write(self.write_buffer)
def select(self, r, w, t):
s = self.socket
r[s] = self._read
if self.write_buffer:
w[s] = self._write
def _read(self):
d = self.socket.recv(65536)
if not d:
raise RuntimeError("connection to babeld closed")
b = self.read_buffer
b += d
while b.ready:
if self._decode:
packet = b.decode(self._decode)
self._decode = None
b.want(header.size)
name = packet.__class__.__name__
logging.trace('recv %r', packet)
try:
h = getattr(self, "handle_" + name)
except AttributeError:
h = getattr(self.handler, "babel_" + name)
h(*packet)
else:
packet_type, size = b.unpack_from(header)
self._decode = Packet.response_dict[packet_type]
b.want(size)
def _write(self):
self.write_buffer.send(self.socket)
def request_dump(self):
self.send(Dump(11)) # interfaces + neighbours + installed routes
def handle_dump(self, interfaces, neighbours, xroutes, routes):
# neighbours = {neigh_prefix: (neighbour, {dst_prefix: route})}
n = dict(((n.address, n.ifindex), (n, {})) for n in neighbours)
unidentified = set(n)
self.neighbours = neighbours = {}
a = len(self.network)
for route in routes:
assert route.flags & 1, route # installed
assert route.neigh_address == route.nexthop, route
address = route.neigh_address, route.ifindex
neigh_routes = n[address]
ip = utils.binFromRawIp(route.prefix)
if ip[:a] == self.network:
prefix = ip[a:route.plen]
if prefix and not route.refmetric:
neighbours[prefix] = neigh_routes
unidentified.remove(address)
else:
prefix = None
neigh_routes[1][prefix] = route
if unidentified:
routes = {}
for address in unidentified:
routes.update(n[address][1])
if routes:
neighbours[None] = None, routes
logging.trace("Routes via unidentified neighbours. %r",
neighbours)
self.interfaces = dict((i.index, name) for i, name in interfaces)
self.handler.babel_dump()
...@@ -62,7 +62,7 @@ def client(iface, address_list, encrypt, *args, **kw): ...@@ -62,7 +62,7 @@ def client(iface, address_list, encrypt, *args, **kw):
def router(subnet, hello_interval, table, log_path, state_path, pidfile, def router(subnet, hello_interval, table, log_path, state_path, pidfile,
tunnel_interfaces, *args, **kw): tunnel_interfaces, control_socket, *args, **kw):
s = utils.ipFromBin(subnet) s = utils.ipFromBin(subnet)
n = len(subnet) n = len(subnet)
cmd = ['babeld', cmd = ['babeld',
...@@ -80,6 +80,8 @@ def router(subnet, hello_interval, table, log_path, state_path, pidfile, ...@@ -80,6 +80,8 @@ def router(subnet, hello_interval, table, log_path, state_path, pidfile,
cmd += '-t%u' % table, '-T%u' % table cmd += '-t%u' % table, '-T%u' % table
else: else:
cmd[-2:-2] = '-C', 'redistribute ip ::/0 eq 0' cmd[-2:-2] = '-C', 'redistribute ip ::/0 eq 0'
if control_socket:
cmd += '-R', '%s' % control_socket
for iface in tunnel_interfaces: for iface in tunnel_interfaces:
cmd += '-C', 'interface %s legacy-rxcost 5120' % iface cmd += '-C', 'interface %s legacy-rxcost 5120' % iface
cmd += args cmd += args
......
...@@ -18,15 +18,16 @@ Authenticated communication: ...@@ -18,15 +18,16 @@ Authenticated communication:
- the last one that was really used by the client (!hello) - the last one that was really used by the client (!hello)
- the one of the last handshake (hello) - the one of the last handshake (hello)
""" """
import base64, hmac, hashlib, httplib, inspect, logging, mailbox, os, random import base64, hmac, hashlib, httplib, inspect, logging
import select, smtplib, socket, sqlite3, string, struct, sys, threading, time import mailbox, os, random, select, smtplib, socket, sqlite3
import string, struct, sys, threading, time, weakref
from collections import deque from collections import deque
from datetime import datetime from datetime import datetime
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from email.mime.text import MIMEText from email.mime.text import MIMEText
from OpenSSL import crypto from OpenSSL import crypto
from urllib import splittype, splithost, splitport, urlencode from urllib import splittype, splithost, splitport, urlencode
from . import tunnel, utils from . import ctl, tunnel, utils
HMAC_HEADER = "Re6stHMAC" HMAC_HEADER = "Re6stHMAC"
RENEW_PERIOD = 30 * 86400 RENEW_PERIOD = 30 * 86400
...@@ -88,11 +89,21 @@ class RegistryServer(object): ...@@ -88,11 +89,21 @@ class RegistryServer(object):
logging.info("Network: %s/%u", utils.ipFromBin(self.network), logging.info("Network: %s/%u", utils.ipFromBin(self.network),
len(self.network)) len(self.network))
self.email = self.ca.get_subject().emailAddress self.email = self.ca.get_subject().emailAddress
self.peers_lock = threading.Lock()
l = threading.Lock()
l.acquire()
self.wait_dump = l.acquire
self.babel_dump = l.release
self.ctl = ctl.Babel(config.control_socket,
weakref.proxy(self), self.network)
self.onTimeout() self.onTimeout()
def select(self, r, w, t): def select(self, r, w, t):
if self.timeout: if self.timeout:
t.append((self.timeout, self.onTimeout)) t.append((self.timeout, self.onTimeout))
self.ctl.select(r, w, t)
def onTimeout(self): def onTimeout(self):
# XXX: Because we use threads to process requests, the statements # XXX: Because we use threads to process requests, the statements
...@@ -306,10 +317,16 @@ class RegistryServer(object): ...@@ -306,10 +317,16 @@ class RegistryServer(object):
@rpc @rpc
def getBootstrapPeer(self, cn): def getBootstrapPeer(self, cn):
with self.lock: with self.peers_lock:
age, peers = self.peers age, peers = self.peers
if age < time.time() or not peers: if age < time.time() or not peers:
peers = [x[1] for x in utils.iterRoutes(self.network)] self.ctl.request_dump()
self.wait_dump()
peers = [prefix
for neigh_routes in self.ctl.neighbours.itervalues()
for prefix in neigh_routes[1]
if prefix]
peers.append(self.prefix)
random.shuffle(peers) random.shuffle(peers)
self.peers = time.time() + 60, peers self.peers = time.time() + 60, peers
peer = peers.pop() peer = peers.pop()
...@@ -318,6 +335,7 @@ class RegistryServer(object): ...@@ -318,6 +335,7 @@ class RegistryServer(object):
# so don't bother looping over above code # so don't bother looping over above code
# (in case 'peers' is empty). # (in case 'peers' is empty).
peer = self.prefix peer = self.prefix
with self.lock:
address = utils.ipFromBin(self.network + peer), tunnel.PORT address = utils.ipFromBin(self.network + peer), tunnel.PORT
self.sock.sendto('\2', address) self.sock.sendto('\2', address)
start = time.time() start = time.time()
......
import logging, os, random, socket, subprocess, time import logging, os, random, socket, subprocess, time, weakref
from collections import defaultdict, deque from collections import defaultdict, deque
from . import plib, utils, version from . import ctl, plib, utils, version
PORT = 326 PORT = 326
...@@ -39,7 +39,7 @@ class MultiGatewayManager(dict): ...@@ -39,7 +39,7 @@ class MultiGatewayManager(dict):
class Connection(object): class Connection(object):
_retry = routes = 0 _retry = 0
time = float('inf') time = float('inf')
def __init__(self, tunnel_manager, address_list, iface, prefix): def __init__(self, tunnel_manager, address_list, iface, prefix):
...@@ -104,10 +104,11 @@ class Connection(object): ...@@ -104,10 +104,11 @@ class Connection(object):
class TunnelManager(object): class TunnelManager(object):
def __init__(self, peer_db, openvpn_args, timeout, def __init__(self, control_socket, peer_db, openvpn_args, timeout,
refresh, client_count, iface_list, network, prefix, refresh, client_count, iface_list, network, prefix,
address, ip_changed, encrypt, remote_gateway, disable_proto, address, ip_changed, encrypt, remote_gateway, disable_proto,
neighbour_list=()): neighbour_list=()):
self.ctl = ctl.Babel(control_socket, weakref.proxy(self), network)
self.encrypt = encrypt self.encrypt = encrypt
self.ovpn_args = openvpn_args self.ovpn_args = openvpn_args
self.peer_db = peer_db self.peer_db = peer_db
...@@ -117,7 +118,7 @@ class TunnelManager(object): ...@@ -117,7 +118,7 @@ class TunnelManager(object):
self._read_pipe = os.fdopen(r) self._read_pipe = os.fdopen(r)
self._connecting = set() self._connecting = set()
self._connection_dict = {} self._connection_dict = {}
self._disconnected = None self._disconnected = 0
self._distant_peers = [] self._distant_peers = []
self._iface_to_prefix = {} self._iface_to_prefix = {}
self._refresh_time = refresh self._refresh_time = refresh
...@@ -189,28 +190,33 @@ class TunnelManager(object): ...@@ -189,28 +190,33 @@ class TunnelManager(object):
def select(self, r, w, t): def select(self, r, w, t):
r[self._read_pipe] = self.handleTunnelEvent r[self._read_pipe] = self.handleTunnelEvent
r[self.sock] = self.handlePeerEvent r[self.sock] = self.handlePeerEvent
if self._next_refresh:
t.append((self._next_refresh, self.refresh)) t.append((self._next_refresh, self.refresh))
self.ctl.select(r, w, t)
def refresh(self): def refresh(self):
logging.debug('Checking tunnels...') logging.debug('Checking tunnels...')
self._cleanDeads() self._cleanDeads()
if self._next_tunnel_refresh < time.time() or \
self._makeNewTunnels(False):
self._next_refresh = None
self.ctl.request_dump() # calls babel_dump immediately at startup
else:
self._next_refresh = time.time() + 5
def babel_dump(self):
remove = self._next_tunnel_refresh < time.time() remove = self._next_tunnel_refresh < time.time()
if remove: if remove:
self._countRoutes()
self._removeSomeTunnels() self._removeSomeTunnels()
self.resetTunnelRefresh() self.resetTunnelRefresh()
self.peer_db.log() self.peer_db.log()
self._makeNewTunnels(remove) self._makeNewTunnels(True)
# XXX: Commented code is an attempt to clean up unused interfaces but # XXX: Commented code is an attempt to clean up unused interfaces
# it is too aggressive. Sometimes _makeNewTunnels only asks address # but babeld does not leave ipv6 membership for deleted taps,
# (and the tunnel is created when we have an answer), so when the
# maximum number of tunnels is reached, taps are recreated all the
# time.
# Also, babeld does not leave ipv6 membership for deleted taps,
# causing a memory leak in the kernel (capped by sysctl # causing a memory leak in the kernel (capped by sysctl
# net.core.optmem_max), and after some time, new neighbours fail # net.core.optmem_max), and after some time, new neighbours fail
# to see each other. # to see each other.
#if remove and self._free_iface_list: #if remove and len(self._connecting) < len(self._free_iface_list):
# self._tuntap(self._free_iface_list.pop()) # self._tuntap(self._free_iface_list.pop())
self._next_refresh = time.time() + 5 self._next_refresh = time.time() + 5
...@@ -220,7 +226,13 @@ class TunnelManager(object): ...@@ -220,7 +226,13 @@ class TunnelManager(object):
self._kill(prefix) self._kill(prefix)
def _tunnelScore(self, prefix): def _tunnelScore(self, prefix):
n = self._connection_dict[prefix].routes n = 0
try:
for x in self.ctl.neighbours[prefix][1]:
if x:
n += 1
except KeyError:
pass
return (prefix in self._neighbour_set, n) if n else () return (prefix in self._neighbour_set, n) if n else ()
def _removeSomeTunnels(self): def _removeSomeTunnels(self):
...@@ -244,7 +256,6 @@ class TunnelManager(object): ...@@ -244,7 +256,6 @@ class TunnelManager(object):
int(prefix, 2), len(prefix)) int(prefix, 2), len(prefix))
def _makeTunnel(self, prefix, address): def _makeTunnel(self, prefix, address):
assert len(self._connection_dict) < self._client_count, (prefix, self.__dict__)
if prefix in self._served or prefix in self._connection_dict: if prefix in self._served or prefix in self._connection_dict:
return False return False
assert prefix != self._prefix, self.__dict__ assert prefix != self._prefix, self.__dict__
...@@ -264,40 +275,56 @@ class TunnelManager(object): ...@@ -264,40 +275,56 @@ class TunnelManager(object):
c.open() c.open()
return True return True
def _makeNewTunnels(self, route_counted): def _makeNewTunnels(self, route_dumped):
count = self._client_count - len(self._connection_dict) count = self._client_count - len(self._connection_dict)
if not count: if not count:
return return
assert count >= 0
# CAVEAT: Forget any peer that didn't reply to our previous address # CAVEAT: Forget any peer that didn't reply to our previous address
# request, either because latency is too high or some packet # request, either because latency is too high or some packet
# was lost. However, this means that some time should pass # was lost. However, this means that some time should pass
# before calling _makeNewTunnels again. # before calling _makeNewTunnels again.
self._connecting.clear() self._connecting.clear()
distant_peers = self._distant_peers distant_peers = self._distant_peers
if len(distant_peers) < count and not route_counted: if len(distant_peers) < count or 0 < self._disconnected < time.time():
self._countRoutes() if not route_dumped:
disconnected = self._disconnected return True
if disconnected is not None: logging.debug('Analyze routes ...')
logging.info("No route to registry (%u neighbours, %u distant" neighbours = self.ctl.neighbours
" peers)", len(disconnected), len(distant_peers)) # Collect all nodes known by Babel
# We aren't the registry node and we have no tunnel to or from it, peers = set(prefix
# so it looks like we are not connected to the network, and our for neigh_routes in neighbours.itervalues()
# neighbours are in the same situation. for prefix in neigh_routes[1]
self._disconnected = None if prefix)
disconnected = set(disconnected).union(distant_peers) # Keep only distant peers.
if disconnected: distant_peers[:] = peers.difference(neighbours)
# We do have neighbours that are probably also disconnected, # Check whether we're connected to the network.
registry = self.peer_db.registry_prefix
if (registry == self._prefix or registry in peers
or registry in self._connection_dict
or registry in self._served):
self._disconnected = 0
# Do not bootstrap too often, especially if we are several
# nodes to try.
elif self._disconnected < time.time():
logging.info("No route to registry (%u peers, %u distant)",
len(peers), len(distant_peers))
self._disconnected = time.time() + self.timeout * (
1 + random.randint(0, len(peers)))
distant_peers = None
if peers:
# We aren't the only disconnected node
# so force rebootstrapping. # so force rebootstrapping.
peer = self.peer_db.getBootstrapPeer() peer = self.peer_db.getBootstrapPeer()
if not peer: if not peer:
# Registry dead ? Assume we're connected after all. # Registry dead ? Assume we're connected after all.
disconnected = None distant_peers = self._distant_peers
elif peer[0] not in disconnected: elif peer[0] not in peers:
# Got a node that will probably help us rejoining the # Got a node that will probably help us rejoining
# network, so connect to it. # the network, so connect to it.
count -= self._makeTunnel(*peer) count -= self._makeTunnel(*peer)
if disconnected is None: if not count:
return
if distant_peers:
# Normal operation. Choose peers to connect to by looking at the # Normal operation. Choose peers to connect to by looking at the
# routing table. # routing table.
neighbour_set = self._neighbour_set.intersection(distant_peers) neighbour_set = self._neighbour_set.intersection(distant_peers)
...@@ -306,36 +333,31 @@ class TunnelManager(object): ...@@ -306,36 +333,31 @@ class TunnelManager(object):
peer = neighbour_set.pop() peer = neighbour_set.pop()
i = distant_peers.index(peer) i = distant_peers.index(peer)
else: else:
i = random.randrange(0, len(distant_peers)) i = random.randrange(len(distant_peers))
peer = distant_peers[i] peer = distant_peers[i]
distant_peers[i] = distant_peers[-1] distant_peers[i] = distant_peers[-1]
del distant_peers[-1] del distant_peers[-1]
address = self.peer_db.getAddress(peer) address = self.peer_db.getAddress(peer)
if address: if address:
count -= self._makeTunnel(peer, address) count -= self._makeTunnel(peer, address)
else: elif self.sendto(peer, '\2'):
ip = utils.ipFromBin(self._network + peer)
try:
self.sock.sendto('\2', (ip, PORT))
except socket.error, e:
logging.info('Failed to query %s (%s)', ip, e)
self._connecting.add(peer) self._connecting.add(peer)
count -= 1 count -= 1
elif count: elif distant_peers is None:
# No route/tunnel to registry, which usually happens when starting # No route/tunnel to registry, which usually happens when starting
# up. Select peers from cache for which we have no route. # up. Select peers from cache for which we have no route.
new = 0 new = 0
bootstrap = True bootstrap = True
for peer, address in self.peer_db.getPeerList(): for peer, address in self.peer_db.getPeerList():
if peer not in disconnected: if peer not in peers:
logging.info("Try to bootstrap using peer %u/%u",
int(peer, 2), len(peer))
bootstrap = False bootstrap = False
if self._makeTunnel(peer, address): if self._makeTunnel(peer, address):
new += 1 new += 1
if new == count: if new == count:
return return
if not (new or disconnected): # The following condition on 'peers' is the same as above,
# when we asked the registry for a node to bootstrap.
if not (new or peers):
if bootstrap: if bootstrap:
# Startup without any good address in the cache. # Startup without any good address in the cache.
peer = self.peer_db.getBootstrapPeer() peer = self.peer_db.getBootstrapPeer()
...@@ -347,41 +369,6 @@ class TunnelManager(object): ...@@ -347,41 +369,6 @@ class TunnelManager(object):
if self._makeTunnel(*peer): if self._makeTunnel(*peer):
break break
def _countRoutes(self):
logging.debug('Starting to count the routes on each interface...')
del self._distant_peers[:]
for conn in self._connection_dict.itervalues():
conn.routes = 0
other = []
for iface, prefix in utils.iterRoutes(self._network, self._prefix):
assert iface != 'lo', (iface, prefix)
nexthop = self._iface_to_prefix.get(iface)
if nexthop:
self._connection_dict[nexthop].routes += 1
if prefix in self._served or prefix in self._connection_dict:
continue
if iface in self._iface_list:
other.append(prefix)
else:
self._distant_peers.append(prefix)
registry = self.peer_db.registry_prefix
if registry == self._prefix or any(registry in x for x in (
self._distant_peers, other, self._served, self._connection_dict)):
self._disconnected = None
# XXX: When there is no new peer to connect when looking at routes
# coming from tunnels, we'd like to consider those discovered
# from the LAN. However, we don't want to create tunnels to
# nodes of the LAN so do nothing until we find a way to get
# some information from Babel.
#if not self._distant_peers:
# self._distant_peers = other
else:
self._disconnected = other
logging.debug("Routes counted: %u distant peers",
len(self._distant_peers))
for c in self._connection_dict.itervalues():
logging.trace('- %s: %s', c.iface, c.routes)
def killAll(self): def killAll(self):
for prefix in self._connection_dict.keys(): for prefix in self._connection_dict.keys():
self._kill(prefix) self._kill(prefix)
...@@ -431,6 +418,14 @@ class TunnelManager(object): ...@@ -431,6 +418,14 @@ class TunnelManager(object):
if address: if address:
self._address[family] = utils.dump_address(address) self._address[family] = utils.dump_address(address)
def sendto(self, peer, msg):
ip = utils.ipFromBin(self._network + peer)
try:
return self.sock.sendto(msg, (ip, PORT))
except socket.error, e:
logging.info('Failed to send message to %s/%s (%s)',
int(peer, 2), len(peer), e)
def _sendto(self, to, msg): def _sendto(self, to, msg):
try: try:
return self.sock.sendto(msg, to[:2]) return self.sock.sendto(msg, to[:2])
......
...@@ -190,7 +190,10 @@ def makedirs(path): ...@@ -190,7 +190,10 @@ def makedirs(path):
raise raise
def binFromIp(ip): def binFromIp(ip):
ip1, ip2 = struct.unpack('>QQ', socket.inet_pton(socket.AF_INET6, ip)) return binFromRawIp(socket.inet_pton(socket.AF_INET6, ip))
def binFromRawIp(ip):
ip1, ip2 = struct.unpack('>QQ', ip)
return bin(ip1)[2:].rjust(64, '0') + bin(ip2)[2:].rjust(64, '0') return bin(ip1)[2:].rjust(64, '0') + bin(ip2)[2:].rjust(64, '0')
...@@ -229,33 +232,6 @@ def binFromSubnet(subnet): ...@@ -229,33 +232,6 @@ def binFromSubnet(subnet):
p, l = subnet.split('/') p, l = subnet.split('/')
return bin(int(p))[2:].rjust(int(l), '0') return bin(int(p))[2:].rjust(int(l), '0')
if 1:
def _iterRoutes():
with open('/proc/net/ipv6_route') as f:
routing_table = f.read()
for line in routing_table.splitlines():
line = line.split()
iface = line[-1]
if 0 < int(line[5], 16) < 1 << 31: # positive metric
yield (iface, bin(int(line[0], 16))[2:].rjust(128, '0'),
int(line[1], 16))
_iterRoutes.__doc__ = """Iterates over all routes
Amongst all returned routes starting with re6st prefix:
- one is the local one with our prefix
- any route with null prefix will be ignored
- other are reachable routes installed by babeld
"""
def iterRoutes(network, exclude_prefix=None):
a = len(network)
for iface, ip, prefix_len in _iterRoutes():
if ip[:a] == network:
prefix = ip[a:prefix_len]
if prefix and prefix != exclude_prefix:
yield iface, prefix
def decrypt(key_path, data): def decrypt(key_path, data):
p = Popen(('openssl', 'rsautl', '-decrypt', '-inkey', key_path), p = Popen(('openssl', 'rsautl', '-decrypt', '-inkey', key_path),
stdin=subprocess.PIPE, stdout=subprocess.PIPE) stdin=subprocess.PIPE, stdout=subprocess.PIPE)
......
...@@ -3,7 +3,7 @@ import atexit, errno, logging, os, signal, socket ...@@ -3,7 +3,7 @@ import atexit, errno, logging, os, signal, socket
import sqlite3, subprocess, sys, time, threading import sqlite3, subprocess, sys, time, threading
from collections import deque from collections import deque
from OpenSSL import crypto from OpenSSL import crypto
from re6st import db, plib, tunnel, utils, version from re6st import ctl, db, plib, tunnel, utils, version
from re6st.registry import RegistryClient, RENEW_PERIOD from re6st.registry import RegistryClient, RENEW_PERIOD
from re6st.utils import exit from re6st.utils import exit
...@@ -60,6 +60,9 @@ def getConfig(): ...@@ -60,6 +60,9 @@ def getConfig():
_('--babel-pidfile', metavar='PID', default='/var/run/re6st-babeld.pid', _('--babel-pidfile', metavar='PID', default='/var/run/re6st-babeld.pid',
help="Specify a file to write our process id to" help="Specify a file to write our process id to"
" (option -I of Babel).") " (option -I of Babel).")
_('--control-socket', metavar='CTL_SOCK', default=ctl.SOCK_PATH,
help="Socket path to use for communication between re6stnet and babeld"
" (option -R of Babel).")
_('--hello', type=int, default=15, _('--hello', type=int, default=15,
help="Hello interval in seconds, for both wired and wireless" help="Hello interval in seconds, for both wired and wireless"
" connections. OpenVPN ping-exit option is set to 4 times the" " connections. OpenVPN ping-exit option is set to 4 times the"
...@@ -303,8 +306,8 @@ def main(): ...@@ -303,8 +306,8 @@ def main():
required('registry') required('registry')
peer_db = db.PeerDB(db_path, registry, config.key, network, prefix) peer_db = db.PeerDB(db_path, registry, config.key, network, prefix)
cleanup.append(lambda: peer_db.cacheMinimize(config.client_count)) cleanup.append(lambda: peer_db.cacheMinimize(config.client_count))
tunnel_manager = tunnel.TunnelManager(peer_db, tunnel_manager = tunnel.TunnelManager(config.control_socket,
config.openvpn_args, timeout, config.tunnel_refresh, peer_db, config.openvpn_args, timeout, config.tunnel_refresh,
config.client_count, config.iface_list, network, prefix, config.client_count, config.iface_list, network, prefix,
address, ip_changed, config.encrypt, remote_gateway, address, ip_changed, config.encrypt, remote_gateway,
config.disable_proto, config.neighbour) config.disable_proto, config.neighbour)
...@@ -405,6 +408,7 @@ def main(): ...@@ -405,6 +408,7 @@ def main():
os.path.join(config.log, 'babeld.log'), os.path.join(config.log, 'babeld.log'),
os.path.join(config.state, 'babeld.state'), os.path.join(config.state, 'babeld.state'),
config.babel_pidfile, tunnel_interfaces, config.babel_pidfile, tunnel_interfaces,
config.control_socket,
*config.babel_args).stop) *config.babel_args).stop)
if config.up: if config.up:
exit.release() exit.release()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment