Commit 5bc6419c authored by zhifan huang's avatar zhifan huang

test: deploy re6st on net ns, and run ping test

network_build: use nemu to create net namespace, provide several network

re6st_wrap: wrap the deploy of re6st node, ease the creation of cert
file and run of the node

test_net contain the tests:
  - ping test: net segment(router), multi-net segment(demo),test after a
    reboot. Because net_demo contain a subnet UPnP, test is not stable.

ping is like ping file in demo, a script to run multiping
parent e7636b86
# -*- coding: utf-8 -*-
# Copyright 2010, 2011 INRIA
# Copyright 2011 Martín Ferrari <martin.ferrari@gmail.com>
#
# This file is contains patches to Nemu.
#
# Nemu is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License version 2, as published by the Free
# Software Foundation.
#
# Nemu is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# Nemu. If not, see <http://www.gnu.org/licenses/>.
import re
from new import function
from nemu.iproute import backticks, get_if_data, route, \
get_addr_data, get_all_route_data, interface
def _get_all_route_data():
ipdata = backticks([IP_PATH, "-o", "route", "list"]) # "table", "all"
ipdata += backticks([IP_PATH, "-o", "-f", "inet6", "route", "list"])
ifdata = get_if_data()[1]
ret = []
for line in ipdata.split("\n"):
if line == "":
continue
# PATCH: parse 'from'
# PATCH: 'dev' is missing on 'unreachable' ipv4 routes
match = re.match('(?:(unicast|local|broadcast|multicast|throw|'
r'unreachable|prohibit|blackhole|nat) )?(\S+)(?: from (\S+))?'
r'(?: via (\S+))?(?: dev (\S+))?.*(?: metric (\d+))?', line)
if not match:
raise RuntimeError("Invalid output from `ip route': `%s'" % line)
tipe = match.group(1) or "unicast"
prefix = match.group(2)
#src = match.group(3)
nexthop = match.group(4)
interface = ifdata[match.group(5) or "lo"]
metric = match.group(6)
if prefix == "default" or re.search(r'/0$', prefix):
prefix = None
prefix_len = 0
else:
match = re.match(r'([0-9a-f:.]+)(?:/(\d+))?$', prefix)
prefix = match.group(1)
prefix_len = int(match.group(2) or 32)
ret.append(route(tipe, prefix, prefix_len, nexthop, interface.index,
metric))
return ret
get_all_route_data.func_code = _get_all_route_data.func_code
interface__init__ = interface.__init__
def __init__(self, *args, **kw):
interface__init__(self, *args, **kw)
if self.name:
self.name = self.name.split('@',1)[0]
interface.__init__ = __init__
get_addr_data.orig = function(get_addr_data.func_code,
get_addr_data.func_globals)
def _get_addr_data():
byidx, bynam = get_addr_data.orig()
return byidx, {name.split('@',1)[0]: a for name, a in bynam.iteritems()}
get_addr_data.func_code = _get_addr_data.func_code
clean_ruleset_interval=600
allow 1024-65535 10.0.0.0/8 1024-65535
deny 0-65535 0.0.0.0/0 0-65535
import time
import nemu
import weakref
import ipaddress
import logging
from subprocess import PIPE
from pathlib2 import Path
fix_file = Path(__file__).parent.resolve() / "fixnemu.py"
execfile(str(fix_file))
IPTABLES = 'iptables-nft'
class Node(nemu.Node):
"""simple nemu.Node used for registry and nodes"""
def __init__(self):
super(Node, self).__init__()
self.Popen(('sysctl', '-q',
'net.ipv4.icmp_echo_ignore_broadcasts=0')).wait()
def _add_interface(self, iface):
self.iface = iface
iface.__dict__['node'] = weakref.proxy(self)
return super(Node, self)._add_interface(iface)
@property
def ip(self):
if hasattr(self, "_ip"):
return str(self._ip)
# return 1 ipv4 address of the one interface, reverse mode
for iface in self.get_interfaces()[::-1]:
for addr in iface.get_addresses():
addr = addr['address']
if '.' in addr:
#TODO different type problem?
self._ip = addr
return addr
def connect_switch(self, switch, ip, prefix_len=24):
self.if_s = if_s = nemu.NodeInterface(self)
switch.connect(if_s)
if_s.up = True
if_s.add_v4_address(ip, prefix_len=prefix_len)
return if_s
class NetManager(object):
"""contain all the nemu object created, so they can live more time"""
def __init__(self):
self.object = []
self.registrys = {}
def connectible_test(nm):
"""test each node can ping to their registry
Args:
nm: NetManger
Raise:
AssertionError
"""
for reg in nm.registrys:
for node in nm.registrys[reg]:
app0 = node.Popen(["ping", "-c", "1", reg.ip], stdout=PIPE)
ret = app0.wait()
assert ret == 0, "network construct failed {} to {}".format(node.ip, reg.ip)
logging.debug("each node can ping to their registry")
def net_route():
"""build a network connect by a route(bridge)
Returns:
a network manager contain 3 nodes
"""
nm = NetManager()
switch1 = nemu.Switch()
switch1.up = True
registry = Node()
machine1 = Node()
machine2 = Node()
r1_if_0 = registry.connect_switch(switch1, "192.168.1.1")
m1_if_0 = machine1.connect_switch(switch1, "192.168.1.2")
m2_if_0 = machine2.connect_switch(switch1, "192.168.1.3")
nm.object.append(switch1)
nm.registrys[registry] = [machine1, machine2]
connectible_test(nm)
return nm
def net_demo():
internet = Node()
gateway1 = Node()
gateway2 = Node()
registry = Node()
m1 = Node()
m2 = Node()
m3 = Node()
m4 = Node()
m5 = Node()
m6 = Node()
m7 = Node()
m8 = Node()
switch1 = nemu.Switch()
switch2 = nemu.Switch()
switch3 = nemu.Switch()
nm = NetManager()
nm.object = [internet, switch3, switch1, switch2, gateway1, gateway2]
nm.registrys = {registry: [m1, m2, m3, m4, m5, m6, m7, m8]}
# for node in [g1, m3, m4, m5]:
# print "pid: {}".format(node.pid)
re_if_0, in_if_0 = nemu.P2PInterface.create_pair(registry,internet)
g1_if_0, in_if_1 = nemu.P2PInterface.create_pair(gateway1, internet)
g2_if_0, in_if_2 = nemu.P2PInterface.create_pair(gateway2, internet)
re_if_0.add_v4_address(address="10.0.0.2", prefix_len=24)
in_if_0.add_v4_address(address='10.0.0.1', prefix_len=24)
in_if_1.add_v4_address(address='10.1.0.1', prefix_len=24)
in_if_2.add_v4_address(address='10.2.0.1', prefix_len=24)
g1_if_0.add_v4_address(address='10.1.0.2', prefix_len=24)
g2_if_0.add_v4_address(address='10.2.0.2', prefix_len=24)
for iface in [re_if_0, in_if_0, g1_if_0, in_if_1, g2_if_0, in_if_2]:
nm.object.append(iface)
iface.up = True
ip = ipaddress.ip_address(u"10.1.1.1")
for i, node in enumerate([gateway1, m1, m2]):
iface = node.connect_switch(switch1, str(ip + i))
nm.object.append(iface)
if i: # except the first
node.add_route(nexthop=ip)
gateway1.Popen((IPTABLES, '-t', 'nat', '-A', 'POSTROUTING', '-o', g1_if_0.name, '-j', 'MASQUERADE')).wait()
gateway1.Popen((IPTABLES, '-t', 'nat', '-N', 'MINIUPNPD')).wait()
gateway1.Popen((IPTABLES, '-t', 'nat', '-A', 'PREROUTING', '-i', g1_if_0.name, '-j', 'MINIUPNPD')).wait()
gateway1.Popen((IPTABLES, '-N', 'MINIUPNPD')).wait()
ip = ipaddress.ip_address(u"10.2.1.1")
for i, node in enumerate([gateway2, m3, m4, m5]):
iface = node.connect_switch(switch1, str(ip + i))
nm.object.append(iface)
if i: # except the first
node.add_route(prefix='10.0.0.0', prefix_len=8, nexthop=ip)
ip = ipaddress.ip_address(u"10.0.1.1")
for i, node in enumerate([internet, m6, m7, m8]):
iface = node.connect_switch(switch2, str(ip + i))
nm.object.append(iface)
if i: # except the first
node.add_route(prefix='10.0.0.0', prefix_len=8, nexthop=ip)
registry.add_route(prefix='10.0.0.0', prefix_len=8, nexthop='10.0.0.1')
gateway1.add_route(prefix='10.0.0.0', prefix_len=8, nexthop='10.1.0.1')
gateway2.add_route(prefix='10.0.0.0', prefix_len=8, nexthop='10.2.0.1')
internet.add_route(prefix='10.2.0.0', prefix_len=16, nexthop='10.2.0.2')
MINIUPnP_CONF = Path(__file__).parent / 'miniupnpd.conf'
gateway1.proc = gateway1.Popen(['miniupnpd', '-d', '-f', MINIUPnP_CONF,
'-P', 'miniupnpd.pid','-a', gateway1.if_s.name,
'-i', g1_if_0.name],
stdout=PIPE, stderr=PIPE)
switch1.up = switch2.up = switch3.up =True
connectible_test(nm)
return nm
def network_direct():
"""one server and one client connect direct"""
registry = Node()
m0 = Node()
nm = NetManager()
nm.registrys = {registry: [m0]}
re_if_0, m_if_0 = nemu.P2PInterface.create_pair(registry, m0)
registry._ip = u"10.1.2.1"
re_if_0.add_v4_address(u"10.1.2.1", prefix_len=24)
m_if_0.add_v4_address(u"10.1.2.2", prefix_len=24)
re_if_0.up = m_if_0.up = True
for node in [m0]:
app0 = node.Popen(["ping", "-c", "1", "10.1.2.1"], stdout=PIPE)
ret = app0.wait()
assert ret == 0, "network construct failed"
return nm
if __name__ == "__main__":
nm = network_demo()
time.sleep(1000000)
# -*- coding: utf-8 -*-
'''
Script launched on machines from the demo with the option -p/--ping
It uses Multiping to ping several IPs passed as arguments.
After Re6st is stable, this script logs when it does not get response from a
machine in a csv file stored in the directory of the machine in this format:
time, sequence number, number of non-responding machines, ip of these machines
'''
import argparse, errno, socket, time, sys
from multiping import MultiPing
PING_INTERVAL = 10
PING_TIMEOUT = 4
class MultiPing(MultiPing):
# Patch of Multiping because it stays blocked to ipv4
# emission when we want to ping only ipv6 addresses.
# So we only keep the ipv6 part for the demo.
# Bug issued: https://github.com/romana/multi-ping/issues/22
def _read_all_from_socket(self, timeout):
pkts = []
if self._ipv6_address_present:
try:
self._sock6.settimeout(timeout)
while True:
p = self._sock6.recv(128)
pkts.append((bytearray(p), time.time()))
self._sock6.settimeout(0)
except socket.timeout:
pass
except socket.error as e:
if e.errno == errno.EWOULDBLOCK:
pass
else:
raise
return pkts
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-a', nargs = '+', help = 'the list of addresses to ping')
parser.add_argument('--retry', action='store_true', help='retry ping unitl success')
args = parser.parse_args()
addrs = args.a
retry = args.retry
while True:
mp = MultiPing(addrs)
mp.send()
_, no_responses = mp.receive(PING_TIMEOUT)
if retry and no_responses:
continue
else:
sys.stdout.write(" ".join(no_responses))
return
if __name__ == '__main__':
main()
"""wrap the deploy of re6st node, ease the creation of cert
file and run of the node
"""
import json
import shutil
import sqlite3
import weakref
import ipaddress
import time
import re
import tempfile
import logging
from subprocess import PIPE, call
from pathlib2 import Path
import re6st.tests.tools as tools
WORK_DIR = Path(__file__).parent / "temp_net_test"
DH_FILE = WORK_DIR / "dh2048.pem"
RE6STNET = "re6stnet"
RE6STNET = "python -m re6st.cli.node"
RE6ST_REGISTRY = "re6st-registry"
RE6ST_REGISTRY = "python -m re6st.cli.registry"
RE6ST_CONF = "re6st-conf"
RE6ST_CONF = "python -m re6st.cli.conf"
def initial():
"""create the workplace and dh file"""
if not WORK_DIR.exists():
WORK_DIR.mkdir()
if not DH_FILE.exists():
logging.info("create dh file")
call(['openssl', 'dhparam', '-out', str(DH_FILE), '2048'], stderr=PIPE)
def ip_to_serial(ip6):
"""convert ipv6 address to serial"""
ip6 = ipaddress.IPv6Address(u"{}".format(ip6))
ip6 = "1{:x}".format(int(ip6)).rstrip('0')
return int(ip6, 16)
class Re6stRegistry(object):
"""class run a re6st-registry service on a namespace"""
registry_seq = 0
def __init__(self, node, ip6, client_number, recreate=False):
self.node = node
# TODO need set once
self.ip = node.ip
self.ip6 = ip6
self.client_number = client_number
self.name = self.generate_name()
self.path = WORK_DIR / self.name
self.ca_key = self.path / "ca.key"
# because re6st-conf will create ca.crt so use another name
self.ca_crt = self.path / "ca.cert"
self.log = self.path / "registry.log"
self.db = self.path / "registry.db"
self.run_path = tempfile.mkdtemp()
if recreate and self.path.exists():
shutil.rmtree(str(self.path))
if not self.path.exists():
self.create_registry()
# use hash to identify the registry
with self.ca_key.open() as f:
text = f.read()
self.ident = hash(text)
# clear log file
if self.log.exists():
self.log.unlink()
self.clean()
self.run()
# wait the servcice started
p = self.node.Popen(['python', '-c', """if 1:
import socket, time
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True:
try:
s.connect(('localhost', 80))
break
except socket.error:
time.sleep(.1)
"""])
now = time.time()
while time.time() - now < 10:
if p.poll() != None:
break
time.sleep(0.1)
else:
p.destroy()
logging.error("registry failed to start, %s", self.name)
raise Exception("registry failed to start")
logging.info("re6st service started")
@classmethod
def generate_name(cls):
cls.registry_seq += 1
return "registry_{}".format(cls.registry_seq)
@property
def url(self):
return "http://{ip}/".format(ip=self.ip)
def create_registry(self):
self.path.mkdir()
tools.create_ca_file(str(self.ca_key), str(self.ca_crt),
serial=ip_to_serial(self.ip6))
def run(self):
cmd = ("{script} --ca {ca} --key {key} --dh {dh} --ipv4 10.42.0.0/16 8 "
" --logfile {log} --db {db} --run {run} --hello 4 --mailhost s "
"-v4 --client-count {nb}")
cmd = cmd.format(script=RE6ST_REGISTRY, ca=self.ca_crt,
key=self.ca_key, dh=DH_FILE, log=self.log, db=self.db,
run=self.run_path, nb=(self.client_number+1)//2).split()
logging.debug("run registry %s at ns: %s with cmd: %s",
self.name, self.node.pid, " ".join(cmd))
self.proc = self.node.Popen(cmd, stdout=PIPE, stderr=PIPE)
def clean(self):
"""remove the file created last time"""
for f in [self.log]:
if f.exists():
f.unlink()
def __del__(self):
try:
logging.debug("teminate process %s", self.proc.pid)
self.proc.destroy()
except:
pass
class Re6stNode(object):
"""class run a re6stnet service on a namespace"""
node_seq = 0
def __init__(self, node, registry, name=None, recreate=False):
"""
node: nemu node
name: name for res6st node
"""
self.name = name if name else self.generate_name()
self.node = node
self.registry = weakref.proxy(registry)
self.path = WORK_DIR / self.name
self.email = self.name + "@example.com"
if self.name == self.registry.name:
self.run_path = self.registry.run_path
else:
self.run_path = tempfile.mkdtemp()
self.log = self.path / "re6stnet.log"
self.crt = self.path / "cert.crt"
self.key = self.path / 'cert.key'
self.console = self.run_path + "/console.sock"
self.data_file = self.path / "data.json" # contain data for restart node
# condition, node of the registry
if self.name == self.registry.name:
self.ip6 = self.registry.ip6
if not self.crt.exists():
self.create_node()
else:
# if ca file changed, we need recreate node file
if self.data_file.exists():
with self.data_file.open() as f:
data = json.load(f)
self.ip6 = data.get("ip6")
recreate = not data.get('hash') == self.registry.ident
else:
recreate = True
if recreate and self.path.exists():
shutil.rmtree(str(self.path))
if not self.path.exists():
self.path.mkdir()
self.create_node()
logging.debug("%s's subnet is %s", self.name, self.ip6)
self.clean()
def __repr__(self):
return self.name
@classmethod
def generate_name(cls):
cls.node_seq += 1
return "node_{}".format(cls.node_seq)
def create_node(self):
"""create necessary file for node"""
logging.info("create dir of node %s", self.name)
cmd = "{script} --registry {registry_url} --email {email}"
cmd = cmd.format(script=RE6ST_CONF, registry_url=self.registry.url,
email=self.email).split()
p = self.node.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE,
cwd=str(self.path))
# read token
db = sqlite3.connect(str(self.registry.db), isolation_level=None)
count = 0
token = None
while not token:
time.sleep(.1)
token = db.execute("SELECT token FROM token WHERE email=?",
(self.email,)).fetchone()
count += 1
if count > 100:
p.destroy()
raise Exception("can't connect to the Register")
out, _ = p.communicate(str(token[0]))
# logging.debug("re6st-conf output: {}".format(out))
# find the ipv6 subnet of node
self.ip6 = re.search('(?<=subnet: )[0-9:a-z]+', out).group(0)
data = {'ip6': self.ip6, 'hash': self.registry.ident}
with open(str(self.data_file), 'w') as f:
json.dump(data, f)
logging.info("create dir of node %s finish", self.name)
def run(self, *args):
"""execute re6stnet"""
cmd = ("{script} --log {log} --run {run} --state {state}"
" --dh {dh} --ca {ca} --cert {cert} --key {key} -v4"
" --registry {registry} --console {console}"
)
cmd = cmd.format(script=RE6STNET, log=self.path, run=self.run_path,
state=self.path, dh=DH_FILE, ca=self.registry.ca_crt,
cert=self.crt, key=self.key, registry=self.registry.url,
console=self.console).split()
cmd += args
logging.debug("run node %s at ns: %s with cmd: %s",
self.name, self.node.pid, " ".join(cmd))
self.proc = self.node.Popen(cmd, stdout=PIPE, stderr=PIPE)
def clean(self):
"""remove the file created last time"""
for name in ["re6stnet.log", "babeld.state", "cache.db", "babeld.log"]:
f = self.path / name
if f.exists():
f.unlink()
def stop(self):
"""stop running re6stnet process"""
logging.debug("%s teminate process %s", self.name, self.proc.pid)
self.proc.destroy()
def __del__(self):
"""teminate process and rm temp dir"""
try:
self.stop()
except Exception as e:
logging.warn("%s: %s", self.name, e)
# python2 seems auto clean the tempdir
# try:
# shutil.rmtree(self.run_path)
# except Exception as e:
# logging.error("{}: {}".format(self.name, e))
"""contain ping-test for re6set net"""
import os
import unittest
import time
import psutil
import logging
import sqlite3
import random
from binascii import b2a_hex
from pathlib2 import Path
import re6st_wrap
import network_build
PING_PATH = str(Path(__file__).parent.resolve() / "ping.py")
BABEL_HMAC = 'babel_hmac0', 'babel_hmac1', 'babel_hmac2'
def deploy_re6st(nm, recreate=False):
net = nm.registrys
nodes = []
registrys = []
re6st_wrap.Re6stRegistry.registry_seq = 0
re6st_wrap.Re6stNode.node_seq = 0
for registry in net:
reg = re6st_wrap.Re6stRegistry(registry, "2001:db8:42::", len(net[registry]),
recreate=recreate)
reg_node = re6st_wrap.Re6stNode(registry, reg, name=reg.name)
registrys.append(reg)
reg_node.run("--gateway", "--disable-proto", "none", "--ip", registry.ip)
nodes.append(reg_node)
for m in net[registry]:
node = re6st_wrap.Re6stNode(m, reg)
node.run("-i" + m.iface.name)
nodes.append(node)
return nodes, registrys
def wait_stable(nodes, timeout=240):
"""try use ping6 from each node to the other until ping success to all the
other nodes
Args:
timeout: int, the time for wait
return:
True if success
"""
logging.info("wait all node stable, timeout: %s", timeout)
now = time.time()
ips = {node.ip6: node.name for node in nodes}
# start the ping processs
for node in nodes:
sub_ips = set(ips) - {node.ip6}
node.ping_proc = node.node.Popen(
["python", PING_PATH, '--retry', '-a'] + list(sub_ips))
# check all the node network can ping each other, in order reverse
unfinished = list(nodes)
while unfinished:
for i in range(len(unfinished)-1, -1, -1):
node = unfinished[i]
if node.ping_proc.poll() is not None:
logging.debug("%s 's network is stable", node.name)
unfinished.pop(i)
time.sleep(0.5)
if time.time() - now > timeout:
for node in unfinished:
node.ping_proc.destroy()
logging.warn("%s can't ping to all the nodes", unfinished)
return False
logging.info("wait time cost: %s", time.time() - now)
return True
@unittest.skipIf(os.geteuid() != 0, "require root or create user namespace plz")
class TestNet(unittest.TestCase):
""" network test case"""
@classmethod
def setUpClass(cls):
"""create work dir"""
logging.basicConfig(level=logging.INFO)
re6st_wrap.initial()
@classmethod
def tearDownClass(cls):
"""watch any process leaked after tests"""
logging.basicConfig(level=logging.WARNING)
for p in psutil.Process().children():
logging.debug("unterminate ps, name: %s, pid: %s, status: %s, cmd: %s",
p.name(), p.pid, p.status(), p.cmdline())
p.terminate()
# try:
# p.kill()
# except:
# pass