Commit 1882262f authored by zhifan huang's avatar zhifan huang

python3 registry suc? ping demo failed

parent 179ff86e
...@@ -53,11 +53,11 @@ class String(object): ...@@ -53,11 +53,11 @@ class String(object):
@staticmethod @staticmethod
def encode(buffer, value): def encode(buffer, value):
buffer += value + "\0" buffer += value + b"\0"
@staticmethod @staticmethod
def decode(buffer, offset=0): def decode(buffer, offset=0):
i = buffer.index("\0", offset) i = buffer.index(b"\0", offset)
return i + 1, buffer[offset:i] return i + 1, buffer[offset:i]
...@@ -149,7 +149,7 @@ class Packet(object): ...@@ -149,7 +149,7 @@ class Packet(object):
logging.trace('send %s%r', self.__class__.__name__, logging.trace('send %s%r', self.__class__.__name__,
(self.id,) + self.args) (self.id,) + self.args)
offset = len(buffer) offset = len(buffer)
buffer += '\0' * header.size buffer += b'\0' * header.size
r = self.request r = self.request
if isinstance(r, Struct): if isinstance(r, Struct):
r.encode(buffer, self.args) r.encode(buffer, self.args)
...@@ -209,7 +209,7 @@ class Babel(object): ...@@ -209,7 +209,7 @@ class Babel(object):
except socket.error as e: except socket.error as e:
logging.debug("Can't connect to %r (%r)", self.socket_path, e) logging.debug("Can't connect to %r (%r)", self.socket_path, e)
return e return e
s.send("\1") s.send(b"\1")
s.setblocking(0) s.setblocking(0)
del self.select del self.select
self.socket = s self.socket = s
...@@ -268,7 +268,7 @@ class Babel(object): ...@@ -268,7 +268,7 @@ class Babel(object):
a = len(self.network) a = len(self.network)
for route in routes: for route in routes:
assert route.flags & 1, route # installed assert route.flags & 1, route # installed
if route.prefix.startswith('\0\0\0\0\0\0\0\0\0\0\xff\xff'): if route.prefix.startswith(b'\0\0\0\0\0\0\0\0\0\0\xff\xff'):
continue continue
assert route.neigh_address == route.nexthop, route assert route.neigh_address == route.nexthop, route
address = route.neigh_address, route.ifindex address = route.neigh_address, route.ifindex
......
...@@ -51,6 +51,7 @@ def rpc_private(f): ...@@ -51,6 +51,7 @@ def rpc_private(f):
f._private = None f._private = None
return rpc(f) return rpc(f)
# assume self.version, hmac is bytes object.
class HTTPError(Exception): class HTTPError(Exception):
pass pass
...@@ -79,8 +80,8 @@ class RegistryServer(object): ...@@ -79,8 +80,8 @@ class RegistryServer(object):
utils.sqliteCreateTable(self.db, "config", utils.sqliteCreateTable(self.db, "config",
"name TEXT PRIMARY KEY NOT NULL", "name TEXT PRIMARY KEY NOT NULL",
"value") "value")
self.prefix = self.getConfig("prefix", None) self.prefix: str = self.getConfig("prefix", None)
self.version = self.getConfig("version", "\0").encode() # BBB: blob self.version: bytes = self.getConfig("version", b"\0") # BBB: blob
utils.sqliteCreateTable(self.db, "token", utils.sqliteCreateTable(self.db, "token",
"token TEXT PRIMARY KEY NOT NULL", "token TEXT PRIMARY KEY NOT NULL",
"email TEXT NOT NULL", "email TEXT NOT NULL",
...@@ -185,7 +186,6 @@ class RegistryServer(object): ...@@ -185,7 +186,6 @@ class RegistryServer(object):
prefix = prefix.decode() prefix = prefix.decode()
msg = msg.decode() msg = msg.decode()
if msg and ord(msg[0]) == code: if msg and ord(msg[0]) == code:
return prefix, msg[1:] return prefix, msg[1:]
return None, None return None, None
...@@ -304,7 +304,7 @@ class RegistryServer(object): ...@@ -304,7 +304,7 @@ class RegistryServer(object):
request.send_response(HTTPStatus.NO_CONTENT) request.send_response(HTTPStatus.NO_CONTENT)
if key: if key:
request.send_header(HMAC_HEADER, base64.b64encode( request.send_header(HMAC_HEADER, base64.b64encode(
hmac.HMAC(key, result, hashlib.sha1).digest())) hmac.HMAC(key, result, hashlib.sha1).digest()).decode())
request.end_headers() request.end_headers()
if result: if result:
request.wfile.write(result) request.wfile.write(result)
...@@ -324,7 +324,7 @@ class RegistryServer(object): ...@@ -324,7 +324,7 @@ class RegistryServer(object):
assert len(key) == len(sign) assert len(key) == len(sign)
return key + sign return key + sign
def getCert(self, client_prefix): def getCert(self, client_prefix) -> bytes:
assert self.lock.locked() assert self.lock.locked()
return self.db.execute("SELECT cert FROM cert" return self.db.execute("SELECT cert FROM cert"
" WHERE prefix=? AND cert IS NOT NULL", " WHERE prefix=? AND cert IS NOT NULL",
...@@ -519,11 +519,15 @@ class RegistryServer(object): ...@@ -519,11 +519,15 @@ class RegistryServer(object):
with self.lock: with self.lock:
cert = self.getCert(cn) cert = self.getCert(cn)
config = self.network_config.copy() config = self.network_config.copy()
# seems only version is in bytes format
config['version'] = config['version'].decode()
hmac = [self.getConfig(k, None) for k in BABEL_HMAC] hmac = [self.getConfig(k, None) for k in BABEL_HMAC]
for i, v in enumerate(v for v in hmac if v is not None): for i, v in enumerate(v for v in hmac if v is not None):
# because hmac can't be decode to utf-8, so use '' to wrap it
# this will like "b'xxxx'" use [2:-1] to remove b''
config[('babel_hmac_sign', 'babel_hmac_accept')[i]] = \ config[('babel_hmac_sign', 'babel_hmac_accept')[i]] = \
v and x509.encrypt(cert, v).encode('base64') v and base64.b64encode(x509.encrypt(cert, v)).decode()
return zlib.compress(json.dumps(config)) return zlib.compress(json.dumps(config).encode())
def _queryAddress(self, peer) -> bytes: def _queryAddress(self, peer) -> bytes:
self.sendto(peer, 1) self.sendto(peer, 1)
...@@ -571,6 +575,11 @@ class RegistryServer(object): ...@@ -571,6 +575,11 @@ class RegistryServer(object):
msg = ';'.join(','.join(a.split(',')[:3]) msg = ';'.join(','.join(a.split(',')[:3])
for a in msg.split(';')) for a in msg.split(';'))
cert = self.getCert(cn) cert = self.getCert(cn)
msg = msg.encode()
peer = peer.encode()
# peer: prefix: str|None
# if isinstance(peer, str):
# peer = peer.encode()
msg = b"%s %s" % (peer, msg) msg = b"%s %s" % (peer, msg)
logging.info("Sending bootstrap peer: %s", msg) logging.info("Sending bootstrap peer: %s", msg)
return x509.encrypt(cert, msg) return x509.encrypt(cert, msg)
...@@ -765,8 +774,6 @@ class RegistryClient(object): ...@@ -765,8 +774,6 @@ class RegistryClient(object):
n = len(h) // 2 n = len(h) // 2
self.cert.verify(h[n:], h[:n]) self.cert.verify(h[n:], h[:n])
key = self.cert.decrypt(h[:n]) key = self.cert.decrypt(h[:n])
assert isinstance(query, str), True
assert isinstance(key, bytes), True
h = hmac.HMAC(key, query.encode(), hashlib.sha1).digest() h = hmac.HMAC(key, query.encode(), hashlib.sha1).digest()
key = hashlib.sha1(key).digest() key = hashlib.sha1(key).digest()
self._hmac = hashlib.sha1(key).digest() self._hmac = hashlib.sha1(key).digest()
...@@ -803,4 +810,4 @@ class RegistryClient(object): ...@@ -803,4 +810,4 @@ class RegistryClient(object):
url, response.status, response.reason) url, response.status, response.reason)
self._conn.close() self._conn.close()
setattr(self, name, rpc) setattr(self, name, rpc)
return rpc return rpc
\ No newline at end of file
...@@ -62,9 +62,8 @@ class Netns(object): ...@@ -62,9 +62,8 @@ class Netns(object):
self.pid = self.app.pid self.pid = self.app.pid
self.add_device_lo() self.add_device_lo()
self.run(['sysctl', '-w', 'net.ipv4.ip_forward=1'], stdout=PIPE) self.run(['sysctl', '-w', 'net.ipv4.ip_forward=1'], stdout=DEVNULL)
self.run(['sysctl', '-w', 'net.ipv6.conf.default.forwarding=1'], self.run(['sysctl', '-w', 'net.ipv6.conf.default.forwarding=1'], stdout=DEVNULL)
stdout=PIPE)
def Popen(self, cmd, **kw): def Popen(self, cmd, **kw):
""" wrapper for subprocess.Popen""" """ wrapper for subprocess.Popen"""
...@@ -72,11 +71,7 @@ class Netns(object): ...@@ -72,11 +71,7 @@ class Netns(object):
def run(self, cmd, **kw): def run(self, cmd, **kw):
""" wrapper for subprocess.checkout""" """ wrapper for subprocess.checkout"""
subprocess.check_call(['nsenter', '-t', str(self.pid), '-n'] + cmd, **kw) subprocess.run(['nsenter', '-t', str(self.pid), '-n'] + cmd, **kw)
self.run(['sysctl', '-w', 'net.ipv4.ip_forward=1'], stdout=DEVNULL)
self.run(['sysctl', '-w', 'net.ipv6.conf.default.forwarding=1'], stdout=DEVNULL)
# self.run(['sysctl', '-q', 'net.ipv4.icmp_echo_ignore_broadcasts=0'], stdout=PIPE)
def Popen(self, cmd, **kw) ->subprocess.Popen: def Popen(self, cmd, **kw) ->subprocess.Popen:
""" wrapper for subprocess.Popen""" """ wrapper for subprocess.Popen"""
...@@ -219,18 +214,24 @@ def net_route(): ...@@ -219,18 +214,24 @@ def net_route():
registry = Host() registry = Host()
node1 = Host() node1 = Host()
node2 = Host() node2 = Host()
node3 = Host()
node4 = Host()
veth_r, _ = registry.connect_router(router) veth_r, _ = registry.connect_router(router)
veth_n1, _ = node1.connect_router(router) veth_n1, _ = node1.connect_router(router)
veth_n2, _ = node2.connect_router(router) veth_n2, _ = node2.connect_router(router)
veth_n3, _ = node3.connect_router(router)
veth_n4, _ = node4.connect_router(router)
veth_r.add_ip4("192.168.1.1", 24) veth_r.add_ip4("192.168.1.1", 24)
veth_n1.add_ip4("192.168.1.2", 24) veth_n1.add_ip4("192.168.1.2", 24)
veth_n2.add_ip4("192.168.1.3", 24) veth_n2.add_ip4("192.168.1.3", 24)
veth_n3.add_ip4("192.168.1.4", 24)
veth_n4.add_ip4("192.168.1.5", 24)
nm.object.append(router) nm.object.append(router)
nm.registrys[registry] = [node1, node2] nm.registrys[registry] = [node1, node2, node3, node4]
connectible_test(nm) connectible_test(nm)
return nm return nm
......
...@@ -41,14 +41,14 @@ def ip_to_serial(ip6): ...@@ -41,14 +41,14 @@ def ip_to_serial(ip6):
ip6 = "1{:x}".format(int(ip6)).rstrip('0') ip6 = "1{:x}".format(int(ip6)).rstrip('0')
return int(ip6, 16) return int(ip6, 16)
def wait_ps(p, timeout=1, sec=0.1): # def wait_ps(p, timeout=1, sec=0.1):
"""implement timeout of wait""" # """implement timeout of wait"""
now = time.time() # now = time.time()
while time.time() -timeout < now: # while time.time() -timeout < now:
if p.poll() is not None: # if p.poll() is not None:
return # return
time.sleep(sec) # time.sleep(sec)
raise Exception("{}, not terminate".format(p.pid)) # raise Exception("{}, not terminate".format(p.pid))
class Re6stRegistry(object): class Re6stRegistry(object):
"""class run a re6st-registry service on a namespace""" """class run a re6st-registry service on a namespace"""
...@@ -89,7 +89,7 @@ class Re6stRegistry(object): ...@@ -89,7 +89,7 @@ class Re6stRegistry(object):
self.run() self.run()
# wait the servcice started # wait the servcice started
p = self.node.Popen(['python', '-c', """if 1: p = self.node.Popen(['python3', '-c', """if 1:
import socket, time import socket, time
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True: while True:
...@@ -101,9 +101,8 @@ class Re6stRegistry(object): ...@@ -101,9 +101,8 @@ class Re6stRegistry(object):
"""]) """])
try: try:
p.wait(5) p.wait(5)
# wait_ps(p, 5)
except Exception as e: except Exception as e:
logging.error("%s: %s", self.name, e) logging.error("registry started failed, %s: %s", self.name, e)
raise e raise e
logging.info("re6st service started") logging.info("re6st service started")
...@@ -130,8 +129,7 @@ class Re6stRegistry(object): ...@@ -130,8 +129,7 @@ class Re6stRegistry(object):
run=self.run_path).split() run=self.run_path).split()
logging.info("run registry %s at ns: %s with cmd: %s", logging.info("run registry %s at ns: %s with cmd: %s",
self.name, self.node.pid, " ".join(cmd)) self.name, self.node.pid, " ".join(cmd))
# self.proc = self.node.Popen(cmd, stdout=DEVNULL, stderr=DEVNULL) self.proc = self.node.Popen(cmd, stdout=DEVNULL, stderr=DEVNULL)
self.proc = self.node.Popen(cmd, stdout=DEVNULL)
def clean(self): def clean(self):
"""remove the file created last time""" """remove the file created last time"""
...@@ -144,8 +142,8 @@ class Re6stRegistry(object): ...@@ -144,8 +142,8 @@ class Re6stRegistry(object):
logging.debug("teminate process %s", self.proc.pid) logging.debug("teminate process %s", self.proc.pid)
self.proc.terminate() self.proc.terminate()
self.proc.wait(1) self.proc.wait(1)
except: except Exception as e:
pass logging.warning("%s: %s", self.name, e)
class Re6stNode(object): class Re6stNode(object):
...@@ -216,7 +214,7 @@ class Re6stNode(object): ...@@ -216,7 +214,7 @@ class Re6stNode(object):
cmd = "{script} --registry {registry_url} --email {email}" cmd = "{script} --registry {registry_url} --email {email}"
cmd = cmd.format(script=RE6ST_CONF, registry_url=self.registry.url, cmd = cmd.format(script=RE6ST_CONF, registry_url=self.registry.url,
email=self.email).split() email=self.email).split()
p = self.node.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL, p = self.node.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL,
cwd=str(self.path)) cwd=str(self.path))
# read token # read token
db = sqlite3.connect(str(self.registry.db), isolation_level=None) db = sqlite3.connect(str(self.registry.db), isolation_level=None)
...@@ -230,7 +228,7 @@ class Re6stNode(object): ...@@ -230,7 +228,7 @@ class Re6stNode(object):
if count > 100: if count > 100:
p.terminate() p.terminate()
raise Exception("can't connect to the Register") raise Exception("can't connect to the Register")
out, _ = p.communicate(token[0].encode()) out, _ = p.communicate(token[0].encode())
out = out.decode() out = out.decode()
# logging.debug("re6st-conf output: {}".format(out)) # logging.debug("re6st-conf output: {}".format(out))
...@@ -268,18 +266,15 @@ class Re6stNode(object): ...@@ -268,18 +266,15 @@ class Re6stNode(object):
logging.debug("%s teminate process %s", self.name, self.proc.pid) logging.debug("%s teminate process %s", self.name, self.proc.pid)
self.proc.terminate() self.proc.terminate()
self.proc.wait(2) self.proc.wait(5)
# timeout only in python3. deadlock maybe
# wait_ps(self.proc, 2)
def __del__(self): def __del__(self):
"""teminate process and rm temp dir""" """teminate process and rm temp dir"""
try: try:
self.stop() self.stop()
except Exception as e: except Exception as e:
logging.warn("%s: %s", self.name, e) logging.warning("%s: %s", self.name, e)
# python2 seems auto clean the tempdir
# try: # try:
# shutil.rmtree(self.run_path) # shutil.rmtree(self.run_path)
# except Exception as e: # except Exception as e:
......
...@@ -65,7 +65,8 @@ def wait_stable(nodes, timeout=240): ...@@ -65,7 +65,8 @@ def wait_stable(nodes, timeout=240):
if time.time() - now > timeout: if time.time() - now > timeout:
for node in unfinished: for node in unfinished:
node.ping_proc.terminate() node.ping_proc.terminate()
logging.warn("%s can't ping to all the nodes", unfinished) node.ping_proc.wait()
logging.warning("%s can't ping to all the nodes", unfinished)
return False return False
logging.info("wait time cost: %s", time.time() - now) logging.info("wait time cost: %s", time.time() - now)
return True return True
...@@ -96,13 +97,13 @@ def check_HMAC(db, machines): ...@@ -96,13 +97,13 @@ def check_HMAC(db, machines):
else: else:
i = 0 if hmac[0] else 1 i = 0 if hmac[0] else 1
if hmac[i] != sign or hmac[i+1] != accept: if hmac[i] != sign or hmac[i+1] != accept:
logging.warn('HMAC config wrong for in %s', args) logging.warning('HMAC config wrong for in %s', args)
logging.warn("HMAC sign: %s, accept: %s", sign, accept) logging.warning("HMAC sign: %s, accept: %s", sign, accept)
rc = False rc = False
if rc: if rc:
logging.info('All nodes use Babel with the correct HMAC configuration') logging.info('All nodes use Babel with the correct HMAC configuration')
else: else:
logging.warn('Expected config: %s', dict(zip(BABEL_HMAC, hmac))) logging.warning('Expected config: %s', dict(zip(BABEL_HMAC, hmac)))
return rc return rc
...@@ -144,8 +145,8 @@ class TestNet(unittest.TestCase): ...@@ -144,8 +145,8 @@ class TestNet(unittest.TestCase):
"""create a network in a net segment, test the connectivity by ping """create a network in a net segment, test the connectivity by ping
""" """
nm = my_net.net_simple() nm = my_net.net_simple()
nodes, registrys = deploy_re6st(nm) nodes, _ = deploy_re6st(nm, True)
wait_stable(nodes, 40) wait_stable(nodes, 40)
time.sleep(10) time.sleep(10)
...@@ -156,13 +157,14 @@ class TestNet(unittest.TestCase): ...@@ -156,13 +157,14 @@ class TestNet(unittest.TestCase):
wait at most 50 seconds, and test each node ping to other by ipv6 addr wait at most 50 seconds, and test each node ping to other by ipv6 addr
""" """
nm = my_net.net_demo() nm = my_net.net_demo()
nodes, _ = deploy_re6st(nm) nodes, _ = deploy_re6st(nm, True)
# wait 60, if the re6stnet stable quit wait # wait 60, if the re6stnet stable quit wait
wait_stable(nodes, 50) wait_stable(nodes, 100)
time.sleep(20) time.sleep(20)
self.assertTrue(wait_stable(nodes, 30), "ping test failed") self.assertTrue(wait_stable(nodes, 30), "ping test failed")
@unittest.skip("some problem")
def test_reboot_one_machine(self): def test_reboot_one_machine(self):
"""create a network demo, wait the net stable, reboot on machine, """create a network demo, wait the net stable, reboot on machine,
then test if network recover, this test seems always failed then test if network recover, this test seems always failed
...@@ -189,7 +191,7 @@ class TestNet(unittest.TestCase): ...@@ -189,7 +191,7 @@ class TestNet(unittest.TestCase):
mode mode
""" """
nm = my_net.net_demo() nm = my_net.net_demo()
nodes, registrys = deploy_re6st(nm, False) nodes, registrys = deploy_re6st(nm)
updateHMAC = ['python', '-c', "import urllib, sys; sys.exit(" updateHMAC = ['python', '-c', "import urllib, sys; sys.exit("
"204 != urllib.urlopen('http://127.0.0.1/updateHMAC').code)"] "204 != urllib.urlopen('http://127.0.0.1/updateHMAC').code)"]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment