Commit 82df1543 authored by Ulysse Beaugnon's avatar Ulysse Beaugnon

Merge branch 'master' of https://git.erp5.org/repos/vifibnet

Conflicts:
	TODO
parents 05ddb3f7 f475f2ca
......@@ -25,6 +25,11 @@ OPTIONS : REGISTRY.PY
port
The port on which the server will listen
--private ip
Ipv6 address of the vifibnet client running on the machine. This
address will be advertised only to nodes having a valid
certificate.
--db path
Path to the server Database file. A new DB file will be created
and correctly initialized if the file doesn't exists.
......@@ -42,6 +47,14 @@ OPTIONS : REGISTRY.PY
Path to the server key file. To generate a key file, see the --ca
option
--bootstrap prefix ip port proto
Connection informations of a node given to other as a bootstrap
node to initiate connection with the network.
Prefix should be the prefix number of a node, given in binary and
with correct length. For instance the VPN address
2001:db8:42:1::/64 ( asusming a network prefix 2001:db8:42::/48 )
corresponds to a prefix 1/16 i.e 0000000000000001.
--mailhost mailhost
Mailhost to be used to send email containing token for registration
......@@ -123,13 +136,10 @@ OPTIONS : VIFIBNET.PY
This parameter is also given to openvpn and babel for their log.
Default : 0
--server address
Ip address of the peer discovery server. SHOULD be an ipv6 address
belonging to the VPN network, as the server only allows requests
from inside the VPN (feature not used now for debugging purposes)
--server-port port
Port number on the peer discovery server to which we connect
--registry address
Complete public ( reachable from the internet ) address of the machine
running a registry. Will be used to get the pirvate address of the
registry and/or bootstrap peers
--hello duration
Set hello interval, in seconds, for both wired and wireless
......
URGENT => Name ideas :
resnet ( for Resiliable NET ) : i prefere resinet
rsnet ( Resiliable Scalable NET )
To be done :
Use an algorithm to choose which connections to keep and/or establish
instead of pure randomness
......@@ -10,35 +6,5 @@ To be done :
Write docstrings for all class/methods/functions
Handle corrupt peers DB gracefully
To be discussed:
U : Babel seems to be very long to establish the routes : maybe we should
tell him thant we are not on a wired network but on a mobile network ?
G : babel establish routes quickly enough i'd say. There are two new
options : hello and wireless, for hello_interval and treating all
interfaces as wireless. However, treating an interface as wireless
doesn't lessen the hello_interval, it only changes how babel estimates
quality link, and cost.
U : from babel web page : "When the Babel daemon detects a wired network,
it will use a larger interval between hellos".
Moreover, it seems that the wireless option only means
"hostile environment" which seems best for a resilient network.
30 sec of hello interval seams also too much. The default value for
babel is 4 sec (from babel man page).
According to raphael's stats on the nexedi's server downtime,
45% of the problems dont last more than 2 minutes, 55% no more than
3 minutes If it takes 2 min to detect a dead connection, then we wont be
solving many problems with our overlay network
G : ok, so babel hello-interval should be set to a lower value,
we should do some tests to pinpoint the best compromise between
speed and bandwith usage.
Btw, is there a doc ( pdf, image, file ) resuming Raphael's stats
on nexedi's server downtime ? it could be useful for the internship
rapport
G : It takes babel between 3 times and 4 times the hello interval to
reestablish connection, if a direct link is cut
U : So we have to reduce the hello interval. 2min to detect a dead link is
far too much.
G : k
Project name ? Resinet/resnet/rsnet
import sqlite3, socket, xmlrpclib, time, os
import sqlite3, socket, subprocess, xmlrpclib, time, os
import utils
class PeerManager:
# internal ip = temp arg/attribute
def __init__(self, db_dir_path, server, server_port, refresh_time, address,
def __init__(self, db_dir_path, registry, key_path, refresh_time, address,
internal_ip, prefix, manual, pp , db_size):
self._refresh_time = refresh_time
self._address = address
self._internal_ip = internal_ip
self._prefix = prefix
self._server = server
self._server_port = server_port
self._db_size = db_size
self._registry = registry
self._key_path = key_path
self._pp = pp
self._blacklist = [(prefix,)]
self._manual = manual
self._proxy = xmlrpclib.ServerProxy('http://%s:%u'
% (server, server_port))
utils.log('Connectiong to peers database...', 4)
self._db = sqlite3.connect(os.path.join(db_dir_path, 'peers.db'),
isolation_level=None)
utils.log('Database opened', 5)
utils.log('Preparing peers database...', 4)
self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
prefix TEXT PRIMARY KEY,
address TEXT NOT NULL,
used INTEGER NOT NULL DEFAULT 0,
date INTEGER DEFAULT (strftime('%s', 'now')))""")
self._db.execute("UPDATE peers SET used = 0")
self._db.execute("CREATE INDEX IF NOT EXISTS _peers_used ON peers(used)")
self._db.execute("""CREATE TABLE IF NOT EXISTS blacklist (
prefix TEXT PRIMARY KEY,
flag INTEGER NOT NULL)""")
self._db.execute("""CREATE INDEX IF NOT EXISTS
blacklist_flag ON blacklist(flag)""")
self._db.execute("INSERT OR REPLACE INTO blacklist VALUES (?,?)",
(prefix, 1))
self._db.execute("""CREATE TABLE IF NOT EXISTS config (
name text primary key,
value text)""")
try:
self._db.execute("UPDATE peers SET used = 0")
self._db.execute("""CREATE TABLE IF NOT EXISTS blacklist (
prefix TEXT PRIMARY KEY,
flag INTEGER NOT NULL)""")
self._db.execute("""CREATE INDEX IF NOT EXISTS
blacklist_flag ON blacklist(flag)""")
except sqlite3.OperationalError, e:
if e.args[0] == 'no such table: peers':
raise RuntimeError
a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
except StopIteration:
proxy = xmlrpclib.ServerProxy(registry)
a = proxy.getPrivateAddress()
self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
self._proxy = xmlrpclib.ServerProxy(a)
utils.log('Database prepared', 5)
self.next_refresh = time.time()
def clear_blacklist(self, flag):
utils.log('Clearing blacklist from flag %u' % (flag,), 3)
self._db.execute("DELETE FROM blacklist WHERE flag = ?", (flag,))
self._db.execute("DELETE FROM blacklist WHERE flag = ?",
(flag,))
utils.log('Blacklist cleared', 5)
def blacklist(self, prefix, flag):
......@@ -64,16 +75,18 @@ class PeerManager:
self._populate()
utils.log('DB refreshed', 3)
self.next_refresh = time.time() + self._refresh_time
return True
except socket.error, e:
utils.log(str(e), 4)
utils.log(e, 4)
utils.log('Connection to server failed, retrying in 30s', 2)
self.next_refresh = time.time() + 30
return False
def _declare(self):
if self._address != None:
utils.log('Sending connection info to server...', 3)
self._proxy.declare((self._internal_ip,
utils.address_list(self._address)))
utils.address_str(self._address)))
utils.log('Info sent', 5)
else:
utils.log("Warning : couldn't send ip, unknown external config", 4)
......@@ -82,31 +95,55 @@ class PeerManager:
utils.log('Populating the peers DB...', 2)
new_peer_list = self._proxy.getPeerList(self._db_size,
self._internal_ip)
self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
FROM peers WHERE used <= 0))""",
(str(len(new_peer_list) - self._db_size),))
self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
VALUES (?,?)""", new_peer_list)
self._db.execute("""DELETE FROM peers WHERE prefix IN
(SELECT prefix FROM blacklist)""")
with self._db:
self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
FROM peers WHERE used <= 0))""",
(str(len(new_peer_list) - self._db_size),))
self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
VALUES (?,?)""", new_peer_list)
self._db.execute("""DELETE FROM peers WHERE prefix IN
(SELECT prefix FROM blacklist)""")
utils.log('DB populated', 3)
utils.log('New peers : %s' % ', '.join(map(str, new_peer_list)), 5)
def getUnusedPeers(self, peer_count):
return self._db.execute("""SELECT prefix, address FROM peers WHERE used
<= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
(peer_count,))
for populate in self.refresh, self._bootstrap, bool:
peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
<= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
(peer_count,)).fetchall()
if peer_list or populate():
return peer_list
def _bootstrap(self):
utils.log('Getting Boot peer...', 3)
proxy = xmlrpclib.ServerProxy(self._registry)
try:
bootpeer = proxy.getBootstrapPeer(self._prefix).data
utils.log('Boot peer received from server', 4)
p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
bootpeer = p.communicate(bootpeer).split()
self.db.execute("INSERT INTO peers (prefix, address) VALUES (?,?)", bootpeer)
utils.log('Boot peer added', 4)
return True
except socket.error:
pass
except sqlite3.IntegrityError, e:
import pdb; pdb.set_trace()
if e.args[0] != '':
raise
return False
def usePeer(self, prefix):
utils.log('Updating peers database : using peer ' + str(prefix), 5)
self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
(prefix,))
utils.log('DB updated', 5)
def unusePeer(self, prefix):
utils.log('Updating peers database : unusing peer ' + str(prefix), 5)
self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
(prefix,))
utils.log('DB updated', 5)
......
......@@ -14,7 +14,7 @@ def openvpn(hello_interval, *args, **kw):
'--group', 'nogroup',
'--verb', str(verbose),
] + list(args)
utils.log(str(args), 5)
utils.log(args, 5)
return subprocess.Popen(args, **kw)
def server(server_ip, ip_length, max_clients, dh_path, pipe_fd, port, proto, hello_interval, *args, **kw):
......@@ -33,12 +33,16 @@ def server(server_ip, ip_length, max_clients, dh_path, pipe_fd, port, proto, hel
def client(server_address, pipe_fd, hello_interval, *args, **kw):
utils.log('Starting client...', 5)
remote= ['--nobind',
'--client',
'--up', 'ovpn-client',
'--route-up', 'ovpn-client ' + str(pipe_fd) ]
for ip, port, proto in utils.address_set(server_address):
remote += '--remote', ip, port, proto
remote = ['--nobind',
'--client',
'--up', 'ovpn-client',
'--route-up', 'ovpn-client ' + str(pipe_fd) ]
try:
for ip, port, proto in utils.address_list(server_address):
remote += '--remote', ip, port, proto
except ValueError, e:
utils.log('Error "%s" in unpacking address %s for openvpn client'
% (e, server_address,), 1)
remote += args
return openvpn(hello_interval, *remote, **kw)
......@@ -65,6 +69,6 @@ def router(network, internal_ip, interface_list,
if wireless:
args.append('-w')
args = args + interface_list
utils.log(str(args), 5)
utils.log(args, 5)
return subprocess.Popen(args, **kw)
#!/usr/bin/env python
import argparse, math, random, select, smtplib, sqlite3, string, socket, time, traceback, errno
import argparse, math, random, select, smtplib, sqlite3, string, socket
import subprocess, time, threading, traceback, errno
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
from email.mime.text import MIMEText
from OpenSSL import crypto
import utils
# To generate server ca and key with serial for 2001:db8:42::/48
# openssl req -nodes -new -x509 -key ca.key -set_serial 0x120010db80042 -days 365 -out ca.crt
......@@ -58,6 +57,8 @@ class main(object):
_('--bootstrap', nargs=4, action="append",
help='''VPN prefix, ip address, port and protocol to send as
bootstrap peers, instead of random ones''')
_('--private',
help='VPN IP of the node on which runs the registry')
self.config = parser.parse_args()
# Database initializing
......@@ -179,10 +180,14 @@ class main(object):
def getCa(self, handler):
return crypto.dump_certificate(crypto.FILETYPE_PEM, self.ca)
def getBootstrapPeer(self, handler):
def getPrivateAddress(self, handler):
return 'http://[%s]:%u' % (self.config.private, self.config.port)
def getBootstrapPeer(self, handler, client_prefix):
# TODO: Insert a flag column for bootstrap ready servers in peers
# ( servers which shouldn't go down or change ip and port as opposed to servers owned by particulars )
# that way, we also ascertain that the server sent is not the new node....
cert = self.db.execute("SELECT cert FROM vpn WHERE prefix = ?", (client_prefix,))
if self.config.bootstrap:
bootpeer = random.choice(self.config.bootstrap)
prefix = bootpeer[0]
......@@ -190,8 +195,16 @@ class main(object):
else:
prefix, address = self.db.execute("""SELECT prefix, address
FROM peers ORDER BY random() LIMIT 1""")
print "Sending bootstrap peer (%s, %s)" % (prefix, address)
return prefix, address
r, w = os.pipe()
try:
threading.Thread(target=os.write, args=(w, cert)).start()
p = subprocess.Popen(('openssl', 'rsautl', '-encrypt', '-certin', '-inkey', '/proc/self/fd/%u' % r),
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
print "Sending bootstrap peer (%s, %s)" % (prefix, address)
return xmlrpclib.Binary(p.communicate('%s %s' % (prefix, address)))
finally:
os.close(r)
os.close(w)
def declare(self, handler, address):
print "declaring new node"
......
......@@ -38,9 +38,9 @@ class Connection:
return True
# Unused for now. By killing tunnels with significantly lower trafic
# in comparison to other tunnels, we hope to connect to nodes with
# better bandwith, in order to improve connectivity with destinations
# we are really interested in.
# in comparison to other tunnels, we hope to connect to nodes with
# better bandwith, in order to improve connectivity with destinations
# we are really interested in.
def _updateBandwidth(self):
try:
f_rx = open('/sys/class/net/%s/statistics/rx_bytes' %
......@@ -185,4 +185,4 @@ class TunnelManager:
def killAll(self):
for prefix in self._connection_dict.keys():
self._kill(prefix)
......@@ -5,7 +5,8 @@ verbose = 0
def log(message, verbose_level):
if verbose >= verbose_level:
print time.strftime("%d-%m-%Y %H:%M:%S : " + message)
print time.strftime("%d-%m-%Y %H:%M:%S :"),
print message
def binFromIp(ip):
ip1, ip2 = struct.unpack('>QQ', socket.inet_pton(socket.AF_INET6, ip))
......@@ -37,9 +38,9 @@ def ipFromCert(network, cert_path):
prefix, prefix_len = subject.CN.split('/')
return ipFromPrefix(network, prefix, int(prefix_len))
def address_list(address_set):
def address_str(address_set):
return ';'.join(map(','.join, address_set))
def address_set(address_list):
def address_list(address_list):
return list(tuple(address.split(','))
for address in address_list.split(';'))
#!/usr/bin/env python
import argparse, errno, os, select, subprocess, time
import argparse, errno, os, select, subprocess, sqlite3, time
from argparse import ArgumentParser
import db, plib, upnpigd, utils, tunnel
......@@ -17,14 +17,16 @@ class ArgParser(ArgumentParser):
yield arg
def ovpnArgs(optional_args, ca_path, cert_path):
def ovpnArgs(optional_args, ca_path, cert_path, key_path):
# Treat openvpn arguments
if optional_args[0] == "--":
if optional_args and optional_args[0] == "--":
del optional_args[0]
optional_args.append('--ca')
optional_args.append(ca_path)
optional_args.append('--cert')
optional_args.append(cert_path)
optional_args.append('--key')
optional_args.append(key_path)
return optional_args
......@@ -46,10 +48,8 @@ def getConfig():
help='Defines the verbose level')
_('-i', '--interface', action='append', dest='iface_list', default=[],
help='Extra interface for LAN discovery')
_('--server', required=True,
help="VPN address of the discovery peer server")
_('--server-port', required=True, type=int,
help="VPN port of the discovery peer server")
_('--registry', required=True,
help="Complete public address of the discovery peer server")
# Routing algorithm options
_('--hello', type=int, default=15,
......@@ -69,6 +69,8 @@ def getConfig():
help='Path to the certificate authority file')
_('--cert', required=True,
help='Path to the certificate file')
_('--key', required=True,
help='Path to the private key file')
# args to be removed ?
_('--connection-count', default=20, type=int,
help='Number of tunnels')
......@@ -89,7 +91,8 @@ def main():
manual = bool(config.address)
network = utils.networkFromCa(config.ca)
internal_ip, prefix = utils.ipFromCert(network, config.cert)
openvpn_args = ovpnArgs(config.openvpn_args, config.ca, config.cert)
openvpn_args = ovpnArgs(config.openvpn_args, config.ca, config.cert,
config.key)
# Set global variables
tunnel.log = config.log
......@@ -119,7 +122,7 @@ def main():
except upnpigd.NoUPnPDevice:
utils.log('No upnp device found', 4)
peer_db = db.PeerManager(config.state, config.server, config.server_port,
peer_db = db.PeerManager(config.state, config.registry, config.key,
config.peers_db_refresh, config.address, internal_ip, prefix,
manual, config.pp, 200)
tunnel_manager = tunnel.TunnelManager(write_pipe, peer_db, openvpn_args,
......@@ -147,47 +150,39 @@ def main():
# main loop
try:
while True:
utils.log('Sleeping ...', 2)
nextUpdate = min(tunnel_manager.next_refresh, peer_db.next_refresh)
if forwarder != None:
nextUpdate = min(nextUpdate, forwarder.next_refresh)
nextUpdate = max(0, nextUpdate - time.time())
ready, tmp1, tmp2 = select.select([read_pipe], [], [], nextUpdate)
if ready:
peer_db.handle_message(read_pipe.readline())
if time.time() >= peer_db.next_refresh:
peer_db.refresh()
if time.time() >= tunnel_manager.next_refresh:
tunnel_manager.refresh()
if forwarder != None and time.time() > forwarder.next_refresh:
forwarder.refresh()
except KeyboardInterrupt:
try:
router.terminate()
except:
pass
try:
server_process.terminate()
except:
pass
tunnel_manager.killAll()
while True:
utils.log('Sleeping ...', 2)
nextUpdate = min(tunnel_manager.next_refresh, peer_db.next_refresh)
if forwarder != None:
nextUpdate = min(nextUpdate, forwarder.next_refresh)
nextUpdate = max(0, nextUpdate - time.time())
ready, tmp1, tmp2 = select.select([read_pipe], [], [], nextUpdate)
if ready:
peer_db.handle_message(read_pipe.readline())
if time.time() >= peer_db.next_refresh:
peer_db.refresh()
if time.time() >= tunnel_manager.next_refresh:
tunnel_manager.refresh()
if forwarder != None and time.time() > forwarder.next_refresh:
forwarder.refresh()
finally:
for p in [router] + server_process:
try:
p.terminate()
except:
pass
try:
tunnel_manager.killAll()
except:
pass
except sqlite3.Error:
traceback.print_exc()
os.rename(db_path, db_path + '.bak')
os.execvp(sys.executable, sys.argv)
except KeyboardInterrupt:
return 0
except:
try:
router.terminate()
except:
pass
try:
server_process.terminate()
except:
pass
try:
tunnel_manager.killAll()
except:
pass
raise
if __name__ == "__main__":
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment