Commit a7a86341 authored by Julien Muchembled's avatar Julien Muchembled

New protocol between nodes with authentication

parent 32ebb80b
......@@ -297,21 +297,27 @@ if len(sys.argv) > 1:
elif self.path == '/tunnel.html':
other = 'route'
gv = registry.Popen(('python', '-c', r"""if 1:
import math
import math, json
from re6st.registry import RegistryClient
g = eval(RegistryClient('http://localhost/').topology())
g = json.loads(RegistryClient(
'http://localhost/').topology())
r = set(g.pop('', ()))
a = set()
for v in g.itervalues():
a.update(v)
g.update(dict.fromkeys(a.difference(g), ()))
print 'digraph {'
a = 2 * math.pi / len(g)
z = 4
m2 = '%u/80' % (2 << 64)
title = lambda n: '2|80' if n == m2 else n
g = sorted((title(k), v) for k, v in g.iteritems())
for i, (n, p) in enumerate(g):
g = sorted((title(k), k in r, v) for k, v in g.iteritems())
for i, (n, r, v) in enumerate(g):
print '"%s"[pos="%s,%s!"%s];' % (title(n),
z * math.cos(a * i), z * math.sin(a * i),
', style=dashed' if p is None else '')
for p in p or ():
print '"%s" -> "%s";' % (n, title(p))
'' if r else ', style=dashed')
for v in v:
print '"%s" -> "%s";' % (n, title(v))
print '}'
"""), stdout=subprocess.PIPE, cwd="..").communicate()[0]
if gv:
......
......@@ -12,6 +12,7 @@ class PeerDB(object):
logging.info('Initialize cache ...')
self._db = sqlite3.connect(db_path, isolation_level=None)
self._db.text_factory = str
q = self._db.execute
q("PRAGMA synchronous = OFF")
q("PRAGMA journal_mode = MEMORY")
......
......@@ -18,10 +18,10 @@ Authenticated communication:
- the last one that was really used by the client (!hello)
- the one of the last handshake (hello)
"""
import base64, hmac, hashlib, httplib, inspect, logging
import base64, hmac, hashlib, httplib, inspect, json, logging
import mailbox, os, random, select, smtplib, socket, sqlite3
import string, struct, sys, threading, time, weakref
from collections import deque
import string, sys, threading, time, weakref
from collections import defaultdict, deque
from datetime import datetime
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from email.mime.text import MIMEText
......@@ -92,6 +92,20 @@ class RegistryServer(object):
self.onTimeout()
def sendto(self, prefix, code):
self.sock.sendto("%s\0%c" % (prefix, code), ('::1', tunnel.PORT))
def recv(self, code):
try:
prefix, msg = self.sock.recv(1<<16).split('\0', 1)
int(prefix, 2)
except ValueError:
pass
else:
if msg and ord(msg[0]) == code:
return prefix, msg[1:]
return None, None
def select(self, r, w, t):
if self.timeout:
t.append((self.timeout, self.onTimeout))
......@@ -198,8 +212,7 @@ class RegistryServer(object):
def hello(self, client_prefix):
with self.lock:
cert = self.getCert(client_prefix)
key = hashlib.sha1(struct.pack('Q',
random.getrandbits(64))).digest()
key = utils.newHmacSecret()
self.sessions.setdefault(client_prefix, [])[1:] = key,
key = x509.encrypt(cert, key)
sign = self.cert.sign(key)
......@@ -349,26 +362,22 @@ class RegistryServer(object):
# (in case 'peers' is empty).
peer = self.prefix
with self.lock:
address = utils.ipFromBin(self.network + peer), tunnel.PORT
self.sock.sendto('\2', address)
self.sendto(peer, 1)
s = self.sock,
timeout = 3
end = timeout + time.time()
# Loop because there may be answers from previous requests.
while select.select(s, (), (), timeout)[0]:
msg = self.sock.recv(1<<16)
if msg[0] == '\1':
try:
msg = msg[1:msg.index('\n')]
except ValueError:
continue
if msg.split()[0] == peer:
break
prefix, msg = self.recv(1)
if prefix == peer:
break
timeout = max(0, end - time.time())
else:
logging.info("Timeout while querying [%s]:%u", *address)
logging.info("Timeout while querying address for %s/%s",
int(peer, 2), len(peer))
return
cert = self.getCert(cn)
msg = "%s %s" % (peer, msg)
logging.info("Sending bootstrap peer: %s", msg)
return x509.encrypt(cert, msg)
......@@ -387,59 +396,46 @@ class RegistryServer(object):
while True:
r, w, _ = select.select(s, s if peers else (), (), 3)
if r:
ver, address = self.sock.recvfrom(1<<16)
address = utils.binFromIp(address[0])
if (address.startswith(self.network) and
len(ver) > 1 and ver[0] in '\3\4' # BBB
):
try:
peer_dict[max(filter(address[len(self.network):]
.startswith, peer_dict),
key=len)] = ver[1:]
except ValueError:
pass
prefix, ver = self.recv(4)
if prefix:
peer_dict[prefix] = ver
if w:
x = peers.pop()
peer_dict[x] = None
x = utils.ipFromBin(self.network + x)
try:
self.sock.sendto('\3', (x, tunnel.PORT))
except socket.error:
pass
prefix = peers.pop()
peer_dict[prefix] = None
self.sendto(prefix, 4)
elif not r:
break
return repr(peer_dict)
return json.dumps(peer_dict)
@rpc
def topology(self):
p = lambda p: '%s/%s' % (int(p, 2), len(p))
peers = deque((p(self.prefix),))
graph = defaultdict(set)
s = self.sock,
with self.lock:
peers = deque(('%u/%u' % (int(self.prefix, 2), len(self.prefix)),))
cookie = hex(random.randint(0, 1<<32))[2:]
graph = dict.fromkeys(peers)
s = self.sock,
while True:
r, w, _ = select.select(s, s if peers else (), (), 3)
if r:
answer = self.sock.recv(1<<16)
if answer[0] == '\xfe':
answer = answer[1:].split('\n')[:-1]
if len(answer) >= 3 and answer[0] == cookie:
x = answer[3:]
assert answer[1] not in x, (answer, graph)
graph[answer[1]] = x[:int(answer[2])]
x = set(x).difference(graph)
peers += x
graph.update(dict.fromkeys(x))
prefix, x = self.recv(5)
if prefix and x:
prefix = p(prefix)
x = x.split()
try:
n = int(x.pop(0))
except ValueError:
continue
if n <= len(x) and prefix not in x:
graph[prefix].update(x[:n])
peers += set(x).difference(graph)
for x in x[n:]:
graph[x].add(prefix)
graph[''].add(prefix)
if w:
x = utils.binFromSubnet(peers.popleft())
x = utils.ipFromBin(self.network + x)
try:
self.sock.sendto('\xff%s\n' % cookie, (x, tunnel.PORT))
except socket.error:
pass
self.sendto(utils.binFromSubnet(peers.popleft()), 5)
elif not r:
break
return repr(graph)
return json.dumps(dict((k, list(v)) for k, v in graph.iteritems()))
class RegistryClient(object):
......
import errno, logging, os, random, socket, subprocess, time, weakref
from collections import defaultdict, deque
from . import ctl, plib, utils, version
from bisect import bisect, insort
from OpenSSL import crypto
from . import ctl, plib, utils, version, x509
PORT = 326
......@@ -141,7 +143,7 @@ class TunnelKiller(object):
if (self.address, self.ifindex) in tm.ctl.locked:
self.state = 'locked'
self.timeout = time.time() + 2 * tm.timeout
tm.sendto(self.peer, ('\4' if self.client else '\5') + tm._prefix)
tm.sendto(self.peer, '\2' if self.client else '\3')
else:
self.timeout = 0
......@@ -160,6 +162,8 @@ class TunnelKiller(object):
class BaseTunnelManager(object):
_forward = None
def __init__(self, peer_db, cert, address=()):
self.cert = cert
self._network = cert.network
......@@ -181,91 +185,152 @@ class BaseTunnelManager(object):
# about binding and anycast.
self.sock.bind(('::', PORT))
# Initialize with a dummy peer (self) so that '_peers' is never empty.
self._peers = [x509.Peer(self._prefix)]
def select(self, r, w, t):
r[self.sock] = self.handlePeerEvent
def sendto(self, peer, msg):
ip = utils.ipFromBin(self._network + peer)
try:
return self.sock.sendto(msg, (ip, PORT))
except socket.error, e:
(logging.info if e.errno == errno.ENETUNREACH else logging.error)(
'Failed to send message to %s/%s (%s)',
int(peer, 2), len(peer), e)
def sendto(self, prefix, msg):
to = utils.ipFromBin(self._network + prefix), PORT
peer = self._peers[bisect(self._peers, prefix) - 1]
if peer.prefix != prefix:
peer = x509.Peer(prefix)
insort(self._peers, peer)
elif peer.connected:
if msg is None:
return
return self._sendto(to, msg, peer)
msg = peer.hello0(self.cert.cert)
if msg and self._sendto(to, msg):
peer.hello0Sent()
def _sendto(self, to, msg):
def _sendto(self, to, msg, peer=None):
try:
return self.sock.sendto(msg, to[:2])
r = self.sock.sendto(peer.encode(msg) if peer else msg, to)
except socket.error, e:
(logging.info if e.errno == errno.ENETUNREACH else logging.error)(
'Failed to send message to %s (%s)', to, e)
return
if r and peer and msg:
peer.sent()
return r
def handlePeerEvent(self):
msg, address = self.sock.recvfrom(1<<16)
to = address[:2]
if address[0] == '::1':
sender = None
else:
try:
sender = utils.binFromIp(address[0])
except socket.error, e:
# inet_pton does not parse '<ipv6>%<iface>'
logging.warning('ignored message from %r (%s)', address, e)
return
if not sender.startswith(self._network):
prefix, msg = msg.split('\0', 1)
int(prefix, 2)
except ValueError:
return
if not msg:
if msg:
self._forward = to
code = ord(msg[0])
if prefix == self._prefix:
msg = self._processPacket(msg)
if msg:
self._sendto(to, '%s\0%c%s' % (prefix, code, msg))
else:
self.sendto(prefix, chr(code | 0x80) + msg[1:])
return
try:
sender = utils.binFromIp(address[0])
except socket.error, e:
return # inet_pton does not parse '<ipv6>%<iface>'
if len(msg) <= 4 or not sender.startswith(self._network):
return
code = ord(msg[0])
if code == 1: # answer
# Old versions may send additional and obsolete addresses.
# Ignore them, as well as truncated lines.
prefix = sender[len(self._network):]
peer = self._peers[bisect(self._peers, prefix) - 1]
msg = peer.decode(msg)
if type(msg) is tuple:
seqno, msg = msg
if seqno == 2:
i = len(msg) // 2
h = msg[:i]
try:
peer.verify(msg[i:], h)
peer.newSession(self.cert.decrypt(h))
except (AttributeError, crypto.Error, x509.NewSessionError,
subprocess.CalledProcessError):
logging.debug('ignored new session key from %r',
address, exc_info=1)
return
self._sendto(to, "", peer) # ack
return
if seqno:
h = x509.fingerprint(self.cert.cert).digest()
seqno = msg.startswith(h)
msg = msg[len(h):]
try:
prefix, address = msg[1:msg.index('\n')].split()
int(prefix, 2)
except ValueError:
pass
cert = self.cert.loadVerify(msg,
True, crypto.FILETYPE_ASN1)
except x509.VerifyError, e:
logging.debug('ignored invalid certificate from %r (%s)',
address, e.args[2])
return
p = utils.binFromSubnet(x509.subnetFromCert(cert))
if p != peer.prefix:
if not prefix.startswith(p):
logging.debug('received %s/%s cert from wrong source %r',
int(p, 2), len(p), address)
return
peer = x509.Peer(p)
insort(self._peers, peer)
peer.cert = cert
if seqno:
self._sendto(to, peer.hello(self.cert))
else:
if prefix != self._prefix:
self.peer_db.addPeer(prefix, address)
msg = peer.hello0(self.cert.cert)
if msg and self._sendto(to, msg):
peer.hello0Sent()
elif msg:
# We got a valid and non-empty message. Always reply
# something so that the sender knows we're still connected.
answer = self._processPacket(msg, peer.prefix)
self._sendto(to, msg[0] + answer if answer else "", peer)
def _processPacket(self, msg, peer=None):
c = ord(msg[0])
msg = msg[1:]
code = c & 0x7f
if c > 0x7f and msg:
if peer and self._forward:
self._sendto(self._forward, '%s\0%c%s' % (peer, code, msg))
elif code == 1: # address
if msg:
if peer:
self.peer_db.addPeer(peer, msg)
try:
self._connecting.remove(prefix)
self._connecting.remove(peer)
except KeyError:
pass
else:
self._makeTunnel(prefix, address)
elif code == 2: # request
if self._address:
self._sendto(address, '\1%s %s\n' % (self._prefix,
';'.join(self._address.itervalues())))
#else: # I don't know my IP yet!
elif code == 3:
if len(msg) == 1:
self._sendto(address, '\3' + version.version)
elif code in (4, 5): # kill
prefix = msg[1:]
if sender and sender.startswith(prefix, len(self._network)):
return
self._makeTunnel(peer, msg)
else:
return ';'.join(self._address.itervalues())
elif 2 <= code <= 3: # kill
if peer:
try:
tunnel_killer = self._killing[prefix]
tunnel_killer = self._killing[peer]
except AttributeError:
pass
except KeyError:
if code == 4 and prefix in self._served: # request
self._killing[prefix] = TunnelKiller(prefix, self)
if code == 2 and peer in self._served: # request
self._killing[peer] = TunnelKiller(peer, self)
else:
if code == 5 and tunnel_killer.state == 'locked': # response
self._kill(prefix)
elif code == 255:
if code == 3 and tunnel_killer.state == 'locked': # response
self._kill(peer)
elif code == 4:
if not msg:
return version.version
elif code == 5:
# the registry wants to know the topology for debugging purpose
if not sender or sender[len(self._network):].startswith(
self.peer_db.registry_prefix):
msg = ['\xfe%s%u/%u\n%u\n' % (msg[1:],
int(self._prefix, 2), len(self._prefix),
len(self._connection_dict))]
msg.extend('%u/%u\n' % (int(x, 2), len(x))
for x in (self._connection_dict, self._served)
for x in x)
try:
self.sock.sendto(''.join(msg), address[:2])
except socket.error, e:
pass
if not peer or peer == self.peer_db.registry_prefix:
return str(len(self._connection_dict)) + ''.join(
' %s/%s' % (int(x, 2), len(x))
for x in (self._connection_dict, self._served)
for x in x)
class TunnelManager(BaseTunnelManager):
......@@ -490,6 +555,8 @@ class TunnelManager(BaseTunnelManager):
registry in self._connection_dict or
registry in self._served):
self._disconnected = 0
# Be ready to receive any message from the registry.
self.sendto(registry, None)
# Do not bootstrap too often, especially if we are several
# nodes to try.
elif self._disconnected < time.time():
......@@ -526,7 +593,7 @@ class TunnelManager(BaseTunnelManager):
address = self.peer_db.getAddress(peer)
if address:
count -= self._makeTunnel(peer, address)
elif self.sendto(peer, '\2'):
elif self.sendto(peer, '\1'):
self._connecting.add(peer)
count -= 1
elif distant_peers is None:
......
import argparse, errno, logging, os, select as _select, shlex, signal
import argparse, errno, hashlib, logging, os, select as _select, shlex, signal
import socket, struct, subprocess, sys, textwrap, threading, time, traceback
HMAC_LEN = len(hashlib.sha1('').digest())
try:
subprocess.CalledProcessError(0, '', '')
except TypeError: # BBB: Python < 2.7
......@@ -223,3 +226,10 @@ def parse_address(address_list):
def binFromSubnet(subnet):
p, l = subnet.split('/')
return bin(int(p))[2:].rjust(int(l), '0')
def newHmacSecret():
from random import getrandbits as g
pack = struct.Struct(">QQI").pack
assert len(pack(0,0,0)) == HMAC_LEN
return lambda x=None: pack(g(64) if x is None else x, g(64), g(32))
newHmacSecret = newHmacSecret()
import calendar, logging, os, subprocess, threading, time
# -*- coding: utf-8 -*-
import calendar, hashlib, hmac, logging, os, struct, subprocess, threading, time
from collections import deque
from datetime import datetime
from OpenSSL import crypto
from . import utils
def newHmacSecret():
x = datetime.utcnow()
return utils.newHmacSecret(int(time.mktime(x.timetuple())) * 1000000
+ x.microsecond)
def networkFromCa(ca):
return bin(ca.get_serial_number())[3:]
......@@ -65,9 +73,14 @@ def maybe_renew(path, cert, info, renew):
info, exc_info=exc_info)
return cert, time.time() + 86400
class VerifyError(Exception):
pass
class NewSessionError(Exception):
pass
class Cert(object):
def __init__(self, ca, key, cert=None):
......@@ -105,11 +118,13 @@ class Cert(object):
"CA Certificate", registry.getCa)
return min(next_renew, ca_renew)
def loadVerify(self, cert, strict=False):
def loadVerify(self, cert, strict=False, type=crypto.FILETYPE_PEM):
try:
r = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
r = crypto.load_certificate(type, cert)
except crypto.Error:
raise VerifyError(None, None, 'unable to load certificate')
if type != crypto.FILETYPE_PEM:
cert = crypto.dump_certificate(crypto.FILETYPE_PEM, r)
p = openssl('verify', '-CAfile', self.ca_path)
out, err = p.communicate(cert)
if p.returncode or strict:
......@@ -132,3 +147,101 @@ class Cert(object):
if p.returncode:
raise subprocess.CalledProcessError(p.returncode, 'openssl', err)
return out
class Peer(object):
"""
UDP: A ─────────────────────────────────────────────> B
hello0: 0, A
1, fingerprint(B), A
hello: 2, X = E(B)(secret), S(A)(X)
!hello: #, ver, type, value, HMAC(secret)(payload)
└──── payload ────┘
new secret > old secret
(concat timestamp with random bits)
Reject messages with # smaller or equal than previously processed.
Yes, we do UDP on purpose. The only drawbacks are:
- The limited size of packets, but they are big enough for a network
using 4096-bits RSA keys.
- hello0 packets (0 & 1) are subject to DoS, because verifying a
certificate uses much CPU. A solution would be to use TCP until the
secret is exchanged and continue with UDP.
"""
_hello = _last = 0
_key = newHmacSecret()
def __init__(self, prefix):
assert len(prefix) == 16 or prefix == ('0' * 14 + '1' + '0' * 65), prefix
self.prefix = prefix
@property
def connected(self):
return self._last is None or time.time() < self._last + 60
def __ne__(self, other):
raise AssertionError
__eq__ = __ge__ = __le__ = __ne__
def __gt__(self, other):
return self.prefix > (other if type(other) is str else other.prefix)
def __lt__(self, other):
return self.prefix < (other if type(other) is str else other.prefix)
def hello0(self, cert):
if self._hello < time.time():
try:
msg = '\0\0\0\1' + fingerprint(self.cert).digest()
except AttributeError:
msg = '\0\0\0\0'
return msg + crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
def hello0Sent(self):
self._hello = time.time() + 60
def hello(self, cert):
key = self._key = newHmacSecret()
h = encrypt(crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert),
key)
self._i = self._j = 2
self._last = 0
return '\0\0\0\2' + h + cert.sign(h)
def _hmac(self, msg):
return hmac.HMAC(self._key, msg, hashlib.sha1).digest()
def newSession(self, key):
if key <= self._key:
raise NewSessionError(self._key, key)
self._key = key
self._i = self._j = 2
self._last = None
def verify(self, sign, data):
crypto.verify(self.cert, sign, data, 'sha1')
seqno_struct = struct.Struct("!L")
def decode(self, msg, _unpack=seqno_struct.unpack):
seqno, = _unpack(msg[:4])
if seqno <= 2:
return seqno, msg[4:]
i = -utils.HMAC_LEN
if self._hmac(msg[:i]) == msg[i:] and self._i < seqno:
self._last = None
self._i = seqno
return msg[4:i]
def encode(self, msg, _pack=seqno_struct.pack):
self._j += 1
msg = _pack(self._j) + msg
return msg + self._hmac(msg)
del seqno_struct
def sent(self):
if not self._last:
self._last = time.time()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment