Commit 660f2d21 authored by Yohann D'Anello's avatar Yohann D'Anello

[monitoring] Move monitoring in a separate file

Signed-off-by: Yohann D'Anello's avatarYohann D'ANELLO <ynerant@crans.org>
parent fbf3367d
......@@ -373,8 +373,8 @@ def main():
cleanup.append(lambda: subprocess.call(('ip', '-6', 'route', 'flush', 'table', '34071')))
cleanup.append(lambda: subprocess.call(('ip', '-6', 'route', 'flush', 'table', '34072')))
def clean_monitoring_addresses():
for address in tunnel_manager._neighbour_monitoring_addresses.values():
subprocess.check_call(('ip', '-6', 'address', 'del', address, 'dev', 'lo'))
for link in list(tunnel_manager.monitoring_manager.links.values()):
link.delete()
cleanup.append(clean_monitoring_addresses)
ip('rule', 'from', my_subnet, 'to', my_subnet, 'iif', config.main_interface, 'lookup', '34071')
if_rt = ['ip', '-6', 'route', 'del',
......
import logging
import random
import socket
import subprocess
from re6st import utils
RE6ST_PORT = 326 # Found in tunnel.PORT, but we want to avoid import loops
MONITORING_PORT = 327 # Port used for monitoring
class MonitoringManager:
def __init__(self, tunnel_manager, main_interface='lo'):
self.tunnel_manager = tunnel_manager
self.main_interface = main_interface
self.links = {}
@property
def full_prefix_bin(self):
return self.tunnel_manager._network + self.tunnel_manager._prefix
@property
def main_address(self):
return utils.ipFromBin(self.full_prefix_bin, '1')
@property
def main_peer(self):
return self.tunnel_manager._getPeer(self.tunnel_manager._prefix)
def get_link_per_address(self, address):
"""
Find associated monitoring link per monitoring address, if existing.
"""
for link in self.links.values():
if link.address.lower() == address.lower():
return link
return None
def update_monitoring_links(self):
"""
Refresh routes that are used for link monitoring.
"""
# Cleanup old routes
for link in list(self.links.values()):
if link.prefix not in self.tunnel_manager.ctl.neighbours:
link.delete()
# Babel is not initialized yet.
if not hasattr(self.tunnel_manager.ctl, 'neighbours'):
return
# Get nexthop for each prefix, and draw a route for monitoring addresses
for prefix in self.tunnel_manager.ctl.neighbours.keys():
if prefix is None:
continue
if prefix not in self.links:
self.links[prefix] = MonitoringLink(self, prefix)
self.links[prefix].update_link(self.tunnel_manager.ctl.neighbours[prefix][0])
def monitor_links(self):
for link in self.links.values():
link.monitor()
class MonitoringLink:
manager = None
prefix = None
address = None
def __init__(self, monitoring_manager, prefix):
self.manager = monitoring_manager
self.prefix = prefix
@property
def tested_address(self):
return utils.ipFromBin(self.manager.tunnel_manager._network + self.prefix)
def update_link(self, neighbour):
nexthop = neighbour.address
nexthop = utils.ipFromBin("".join(bin(ord(c))[2:].zfill(8) for c in nexthop))
# Find interface name from interface id
ifindex = neighbour.ifindex
output = subprocess.check_output(('ip', 'link'))
for line in output.split('\n'):
if line.startswith(str(ifindex) + ':'):
iface = line.split(' ')[1][:-1]
break
else:
logging.error("Unknown interface index: " + str(ifindex))
return
# Assign new IP address to this link is not existing
if not self.address:
p = self.manager.full_prefix_bin
s = bin(random.randint(2, 2 ** (128 - len(p))))[2:]
self.address = utils.ipFromBin(p, s)
# Add route in kernel
subprocess.check_output(('ip', '-6', 'address', 'add', self.address, 'dev', self.manager.main_interface),
stderr=subprocess.STDOUT) # Ignore stderr
subprocess.check_output(('ip', '-6', 'route', 'del', self.address, 'dev', self.manager.main_interface,
'table', 'main'), stderr=subprocess.STDOUT)
subprocess.check_output(('ip', '-6', 'route', 'add', self.manager.main_address, 'from', self.address,
'via', nexthop, 'dev', iface, 'src', self.address, 'table', '34071'),
stderr=subprocess.STDOUT)
def monitor(self):
msock = None
try:
print("Send ping from " + self.address + " for " + self.tested_address
+ " to " + self.manager.main_address + "...")
msock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
msock.bind((self.address, MONITORING_PORT))
msg = b'\x08'
peer = self.manager.main_peer
data = peer.encode(msg)
peer._j -= 1 # Don't increment seqno with this packet
msock.sendto(data, (self.manager.main_address, RE6ST_PORT))
except Exception, e:
logging.error("Error while monitoring " + self.tested_address + ": " + str(e))
self.manager.tunnel_manager.newVersion() # Maybe babel dump is not up to date
finally:
if msock:
try:
msock.close()
finally:
pass
def ping(self):
self.manager.main_peer._i -= 1 # Don't increment seqno with this packet
print("Monitoring!")
print(self.address)
def delete(self):
subprocess.check_call(('ip', '-6', 'address', 'del', self.address, 'dev', self.manager.main_interface))
subprocess.check_call(('ip', '-6', 'route', 'del', self.manager.main_address,
'from', self.address, 'table', '34071'))
if self.prefix in self.manager.links:
del self.manager.links[self.prefix]
def __del__(self):
self.delete()
......@@ -4,6 +4,7 @@ from collections import defaultdict, deque
from bisect import bisect, insort
from OpenSSL import crypto
from . import ctl, plib, rina, utils, version, x509
from .monitoring import MonitoringManager
PORT = 326
......@@ -206,11 +207,12 @@ class BaseTunnelManager(object):
self.cache = cache
self._connecting = set()
self._connection_dict = {}
self._neighbour_monitoring_addresses = {}
self._served = defaultdict(dict)
self._version = cache.version
self._conf_country = conf_country
self.monitoring_manager = MonitoringManager(self, 'lo') # FIXME Give main interface name
address_dict = defaultdict(list)
for family, address in address:
address_dict[family] += address
......@@ -533,9 +535,9 @@ class BaseTunnelManager(object):
if peer and self._prefix == self.cache.registry_prefix:
logging.info("%s/%s: %s", int(peer, 2), len(peer), msg)
elif code == 8:
print("Monitoring!")
print(sender[0])
self._getPeer(peer)._i -= 1 # Don't increment seqno with this packet
link = self.monitoring_manager.get_link_per_address(sender[0])
if link is not None:
link.ping()
def askInfo(self, prefix):
return self.sendto(prefix, '\4' + self._info(True))
......@@ -671,90 +673,6 @@ class BaseTunnelManager(object):
'\7%s (%s)' % (msg, os.uname()[2]))
break
def updateMonitoringLinks(self):
"""
Refresh routes that are used for link monitoring.
"""
my_address = utils.ipFromBin(self._network + self._prefix, '1')
# Cleanup old routes
for prefix in list(self._neighbour_monitoring_addresses.keys()):
if prefix not in self.ctl.neighbours:
address = self._neighbour_monitoring_addresses[prefix]
# FIXME Replace lo by main inteface name
subprocess.check_call(('ip', '-6', 'address', 'del', address, 'dev', 'lo'))
subprocess.check_call(('ip', '-6', 'route', 'del', my_address,
'from', address, 'table', '34071'))
del self._neighbour_monitoring_addresses[prefix]
# Babel is not initialized yet.
if not hasattr(self.ctl, 'neighbours'):
return
# Get nexthop for each prefix, and draw a route for monitoring addresses
for prefix in self.ctl.neighbours.keys():
if prefix is None:
continue
neighbour = self.ctl.neighbours[prefix][0]
nexthop = neighbour.address
nexthop = utils.ipFromBin("".join(bin(ord(c))[2:].zfill(8) for c in nexthop))
# Find interface name from interface id
ifindex = neighbour.ifindex
output = subprocess.check_output(('ip', 'link'))
for line in output.split('\n'):
if line.startswith(str(ifindex) + ':'):
iface = line.split(' ')[1][:-1]
break
else:
logging.error("Unknown interface index: " + str(ifindex))
continue
# Assign new IP address to this link is not existing
if prefix not in self._neighbour_monitoring_addresses:
p = self.ctl.network + self._prefix
s = bin(random.randint(2, 2 ** (128 - len(p))))[2:]
self._neighbour_monitoring_addresses[prefix] = utils.ipFromBin(p, s)
address = self._neighbour_monitoring_addresses[prefix]
# Add route in kernel
# FIXME Replace lo by main inteface name
subprocess.check_output(('ip', '-6', 'address', 'add', address, 'dev', 'lo'),
stderr=subprocess.STDOUT) # Ignore stderr
subprocess.check_output(('ip', '-6', 'route', 'del', address, 'dev', 'lo', 'table', 'main'),
stderr=subprocess.STDOUT)
subprocess.check_output(('ip', '-6', 'route', 'add', my_address, 'from', address,
'via', nexthop, 'dev', iface, 'src', address, 'table', '34071'),
stderr=subprocess.STDOUT)
def monitorLinks(self):
"""
Try to forward packets through each direct link.
"""
my_address = utils.ipFromBin(self._network + self._prefix, '1')
for prefix, address in self._neighbour_monitoring_addresses.items():
msock = None
try:
print("Send ping from " + address + " for " + utils.ipFromBin(self._network + prefix)
+ " to " + my_address + "...")
msock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
msock.bind((address, PORT + 1))
msg = b'\x08'
peer = self._getPeer(self._prefix)
data = peer.encode(msg)
peer._j -= 1 # Don't increment seqno with this packet
msock.sendto(data, (my_address, PORT))
except Exception, e:
logging.error("Error while monitoring " + utils.ipFromBin(self._network + prefix) + ": " + str(e))
finally:
if msock:
try:
msock.close()
finally:
pass
def _updateCountry(self, address):
def update():
for a in address:
......@@ -866,8 +784,8 @@ class TunnelManager(BaseTunnelManager):
else:
self._next_refresh = time.time() + 5
self.checkRoutingCache()
self.updateMonitoringLinks()
self.monitorLinks()
self.monitoring_manager.update_monitoring_links()
self.monitoring_manager.monitor_links()
def babel_dump(self):
t = time.time()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment