Commit e9243962 authored by Julien Muchembled's avatar Julien Muchembled

registry: fix use of socket connection to babeld

- getBootstrapPeer was stuck as long as there was no other request being served
- registry crashed when re6stnet is stopped
parent 1c616d4c
...@@ -175,30 +175,52 @@ SetCostMultiplier = Packet(2, ...@@ -175,30 +175,52 @@ SetCostMultiplier = Packet(2,
Struct("B", "set_cost_multiplier", "flags")) Struct("B", "set_cost_multiplier", "flags"))
class ConnectionClosed(Exception):
def __str__(self):
return "connection to babeld closed (%s)" % self.args
class Babel(object): class Babel(object):
_decode = None _decode = None
def __init__(self, socket_path, handler, network): def __init__(self, socket_path, handler, network):
self.socket_path = socket_path
self.handler = handler
self.network = network self.network = network
self.locked = set()
self.reset()
def reset(self):
try:
del self.socket, self.request_dump
except AttributeError:
pass
self.write_buffer = Buffer() self.write_buffer = Buffer()
self.read_buffer = Buffer() self.read_buffer = Buffer()
self.read_buffer.want(header.size) self.read_buffer.want(header.size)
self.handler = handler
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
def select(*args): def select(*args):
try: try:
s.connect(socket_path) s.connect(self.socket_path)
except socket.error: except socket.error, e:
return logging.debug("%s", e)
return e
s.send("\1") s.send("\1")
s.setblocking(0) s.setblocking(0)
del self.request_dump, self.select del self.select
self.socket = s self.socket = s
return self.select(*args) return self.select(*args)
self.select = select self.select = select
self.request_dump = lambda: self.handle_dump((), (), (), ())
self.locked = set() def request_dump(self):
if self.select({}, {}, ()):
self.handle_dump((), (), (), ())
else:
# interfaces + neighbours + installed routes
self.request_dump = lambda: self.send(Dump(11))
self.request_dump()
def send(self, packet): def send(self, packet):
packet.write(self.write_buffer) packet.write(self.write_buffer)
...@@ -212,7 +234,7 @@ class Babel(object): ...@@ -212,7 +234,7 @@ class Babel(object):
def _read(self): def _read(self):
d = self.socket.recv(65536) d = self.socket.recv(65536)
if not d: if not d:
raise RuntimeError("connection to babeld closed") raise ConnectionClosed(self.socket_path)
b = self.read_buffer b = self.read_buffer
b += d b += d
while b.ready: while b.ready:
...@@ -235,9 +257,6 @@ class Babel(object): ...@@ -235,9 +257,6 @@ class Babel(object):
def _write(self): def _write(self):
self.write_buffer.send(self.socket) self.write_buffer.send(self.socket)
def request_dump(self):
self.send(Dump(11)) # interfaces + neighbours + installed routes
def handle_dump(self, interfaces, neighbours, xroutes, routes): def handle_dump(self, interfaces, neighbours, xroutes, routes):
# neighbours = {neigh_prefix: (neighbour, {dst_prefix: route})} # neighbours = {neigh_prefix: (neighbour, {dst_prefix: route})}
n = dict(((n.address, n.ifindex), (n, {})) for n in neighbours) n = dict(((n.address, n.ifindex), (n, {})) for n in neighbours)
......
...@@ -91,10 +91,6 @@ class RegistryServer(object): ...@@ -91,10 +91,6 @@ class RegistryServer(object):
self.email = self.ca.get_subject().emailAddress self.email = self.ca.get_subject().emailAddress
self.peers_lock = threading.Lock() self.peers_lock = threading.Lock()
l = threading.Lock()
l.acquire()
self.wait_dump = l.acquire
self.babel_dump = l.release
self.ctl = ctl.Babel(config.control_socket, self.ctl = ctl.Babel(config.control_socket,
weakref.proxy(self), self.network) weakref.proxy(self), self.network)
...@@ -103,7 +99,23 @@ class RegistryServer(object): ...@@ -103,7 +99,23 @@ class RegistryServer(object):
def select(self, r, w, t): def select(self, r, w, t):
if self.timeout: if self.timeout:
t.append((self.timeout, self.onTimeout)) t.append((self.timeout, self.onTimeout))
self.ctl.select(r, w, t)
def request_dump(self):
assert self.peers_lock.locked()
self._wait_dump = True
while True:
self.ctl.request_dump()
try:
while self._wait_dump:
args = {}, {}, ()
self.ctl.select(*args)
utils.select(*args)
break
except ctl.ConnectionClosed:
self.ctl.reset()
def babel_dump(self):
self._wait_dump = False
def onTimeout(self): def onTimeout(self):
# XXX: Because we use threads to process requests, the statements # XXX: Because we use threads to process requests, the statements
...@@ -320,8 +332,7 @@ class RegistryServer(object): ...@@ -320,8 +332,7 @@ class RegistryServer(object):
with self.peers_lock: with self.peers_lock:
age, peers = self.peers age, peers = self.peers
if age < time.time() or not peers: if age < time.time() or not peers:
self.ctl.request_dump() self.request_dump()
self.wait_dump()
peers = [prefix peers = [prefix
for neigh_routes in self.ctl.neighbours.itervalues() for neigh_routes in self.ctl.neighbours.itervalues()
for prefix in neigh_routes[1] for prefix in neigh_routes[1]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment