Commit 720f9edd authored by zhifan huang's avatar zhifan huang

Add network test: deploy re6st in net ns, and run test

Because some bugs of nemus occur in test-nodes, when use popen occur
"RuntimeError: Error from slave: 500 Unknown command", or when add
veth name option error. I write sample module my_net to mangae the net
ns.

my_net: use unshare to create net namespace, then use nsenter to enter
ns correpand to execute cmd like add device, set ip address... and
cotain some network demo for running tests.

re6st_wrap: wrap the deploy of re6st node, ease the creation of cert
file and run of the node

test_net contain the tests:
  - ping test: net segment(router), multi-net segment(demo),
	       test after a reboot
  - hmac test: test on demo

ping is like ping file in demo, a script to run multiping
parent 5d5510e2
clean_ruleset_interval=600
allow 1024-65535 10.0.0.0/8 1024-65535
deny 0-65535 0.0.0.0/0 0-65535
"""thie moudle use net namespace to create different net node"""
import subprocess
from subprocess import PIPE
import weakref
import logging
import time
import ipaddress
#iptables-legacy seems have permission problem on test-node
IPTABLES = "iptables-nft"
class NetManager(object):
"""contain all the nemu object created, so they can live more time"""
def __init__(self):
self.object = []
self.registrys = {}
class Device(object):
"""class for network device"""
_id = 0
@classmethod
def get_id(cls):
cls._id += 1
return cls._id
def __init__(self, dev_type, name=None):
"""
type: device type, str
name: device name, str. if not set, generate one name
"""
# name if name else .....
self.type = dev_type
self.name = name or "{}-{}".format(dev_type, self.get_id())
self.ips = []
self._up = False
@property
def up(self):
"bool value control device up or not"
return self._up
@up.setter
def up(self, value):
value = "up" if value else "down"
self.net.run(['ip', 'link', 'set', 'up', self.name])
def add_ip4(self, address, prefix):
ip = "{}/{}".format(address, prefix)
self.ips.append(address)
self.net.run(['ip', 'addr', 'add', ip, 'dev', self.name])
class Netns(object):
"""a network namespace"""
def __init__(self):
self.devices = []
self.app = subprocess.Popen(['unshare', '-n'], stdin=PIPE)
self.pid = self.app.pid
self.add_device_lo()
self.run(['sysctl', '-w', 'net.ipv4.ip_forward=1'], stdout=PIPE)
self.run(['sysctl', '-w', 'net.ipv6.conf.default.forwarding=1'],
stdout=PIPE)
def Popen(self, cmd, **kw):
""" wrapper for subprocess.Popen"""
return subprocess.Popen(['nsenter', '-t', str(self.pid), '-n'] + cmd, **kw)
def run(self, cmd, **kw):
""" wrapper for subprocess.checkout"""
subprocess.check_call(['nsenter', '-t', str(self.pid), '-n'] + cmd, **kw)
def add_device(self, dev):
self.devices.append(dev)
dev.net = weakref.proxy(self)
def add_device_lo(self):
lo = Device("lo", name="lo")
self.add_device(lo)
lo.up = True
def add_device_bridge(self):
""" create a bridge in the netns"""
br = Device("bridge")
self.add_device(br)
self.bridge = br
self.run(['ip', 'link', 'add', br.name, 'type', 'bridge'])
br.up = True
def add_route(self, net, *args):
self.run(['ip', 'route', 'add', net] + list(args))
def connect_direct(node1, node2):
"""create veths between 2 netns
no difference if node1 and node2 changed
Args:
node1(self): Netns
node2: Netns
"""
dev1 = Device("veth")
dev2 = Device("veth")
node1.add_device(dev1)
node2.add_device(dev2)
subprocess.check_call(['ip', 'link', 'add', dev1.name, 'netns', str(node1.pid), 'type', 'veth', 'peer', dev2.name, 'netns', str(node2.pid)])
dev1.up = dev2.up = True
return dev1, dev2
def connect_router(self, router):
""" connect a netns to a router
create veths between 2 netns, and set one veth to the bridge
in router
Args:
router: Netns
Retruns:
device(self), device(router)
"""
if not hasattr(router, "bridge"):
raise Exception("router should have a bridge")
dev1, dev2 = self.connect_direct(router)
self.add_device(dev1)
router.add_device(dev2)
router.run(['ip', 'link', 'set', dev2.name, 'master', router.bridge.name])
return dev1, dev2
def __del__(self):
self.app.terminate()
self.app.wait()
if hasattr(self, "proc"):
self.proc.terminate()
self.proc.wait()
class Host(Netns):
"""node used to run a application, not for connecting
use the first create veth as out, and it's ip for re6st config
"""
@property
def ip(self):
return self.out.ips[0]
@property
def out(self):
return self.devices[1]
def connectible_test(nm):
"""test each node can ping to their registry
Args:
nm: NetManger
Raise:
AssertionError
"""
for reg in nm.registrys:
for node in nm.registrys[reg]:
app0 = node.Popen(["ping", "-c", "1", reg.ip], stdout=PIPE)
ret = app0.wait()
assert ret == 0, "network construct failed {} to {}".format(node.ip, reg.ip)
logging.debug("each node can ping to their registry")
def net_simple():
"""build a simplest network
registry .1 ------ .2 node
10.1.1
Returns:
a network manager contain 2 nodes
"""
nm = NetManager()
node1 = Host()
node2 = Host()
dev1, dev2 = node1.connect_direct(node2)
dev1.add_ip4("10.1.1.1", prefix=24)
dev2.add_ip4("10.1.1.2", prefix=24)
nm.registrys[node1] = [node2]
connectible_test(nm)
return nm
def net_route():
"""build a network connect by a route(bridge)
Returns:
a network manager contain 3 nodes
"""
nm = NetManager()
router = Netns()
router.add_device_bridge()
registry = Host()
node1 = Host()
node2 = Host()
veth_r, _ = registry.connect_router(router)
veth_n1, _ = node1.connect_router(router)
veth_n2, _ = node2.connect_router(router)
veth_r.add_ip4("192.168.1.1", 24)
veth_n1.add_ip4("192.168.1.2", 24)
veth_n2.add_ip4("192.168.1.3", 24)
nm.object.append(router)
nm.registrys[registry] = [node1, node2]
connectible_test(nm)
return nm
def net_demo():
"""build a network like demo
Underlying network:
registry .2------
|
10.0.0|
.1 |
---------------Internet----------------
|.1 |.1 |.1
|10.1.0 |10.2.0 |
|.2 |.2 |
gateway1 gateway2 s3:10.0.1
|.1 |.1 |.2 |.3 |.4
s1:10.1.1 --s2:10.2.1-- m6 m7 m8
|.2 |.3 |.2 |.3 |.4
m1 m2 m3 m4 m5
Return:
a network manager contain 9 nodes
"""
nm = NetManager()
internet = Netns()
gateway1 = Netns()
router1 = Netns()
gateway2 = Netns()
router2 = Netns()
router3 = Netns()
registry = Host()
node1 = Host()
node2 = Host()
node3 = Host()
node4 = Host()
node5 = Host()
node6 = Host()
node7 = Host()
node8 = Host()
router1.add_device_bridge()
router2.add_device_bridge()
router3.add_device_bridge()
veth_re, veth_it1 = registry.connect_direct(internet)
veth_g1_1, veth_it2 = gateway1.connect_direct(internet)
veth_g2_1, veth_it3 = gateway2.connect_direct(internet)
veth_it1.add_ip4("10.0.0.1", 24)
veth_re.add_ip4("10.0.0.2", 24)
registry.add_route("10.0.0.0/8", 'via', "10.0.0.1")
veth_it2.add_ip4("10.1.0.1", 24)
veth_g1_1.add_ip4("10.1.0.2", 24)
gateway1.add_route("10.0.0.0/8", 'via', "10.1.0.1")
# sign ip for node
ip = ipaddress.ip_address(u"10.1.1.1")
for node in [gateway1, node1, node2]:
dev, _ = node.connect_router(router1)
dev.add_ip4(str(ip), 24)
ip += 1
for node in [node1, node2]:
node.add_route("10.0.0.0/8", 'via', "10.1.1.1")
gateway1.run([IPTABLES, '-t', 'nat', '-A', 'POSTROUTING', '-o', veth_g1_1.name, '-j', 'MASQUERADE'])
gateway1.run([IPTABLES, '-t', 'nat', '-N', 'MINIUPNPD'])
gateway1.run([IPTABLES, '-t', 'nat', '-A', 'PREROUTING', '-i', veth_g1_1.name, '-j', 'MINIUPNPD'])
gateway1.run([IPTABLES, '-N', 'MINIUPNPD'])
veth_it3.add_ip4("10.2.0.1", 24)
veth_g2_1.add_ip4("10.2.0.2", 24)
gateway2.add_route("10.0.0.0/8", 'via', "10.2.0.1")
ip = ipaddress.ip_address(u"10.2.1.1")
for node in [gateway2, node3, node4, node5]:
dev, _ = node.connect_router(router2)
dev.add_ip4(str(ip), 24)
ip += 1
for node in [node3, node4, node5]:
node.add_route("10.0.0.0/8", 'via', "10.2.1.1")
ip = ipaddress.ip_address(u"10.0.1.1")
for node in [internet, node6, node7, node8]:
dev, _ = node.connect_router(router3)
dev.add_ip4(str(ip), 24)
ip += 1
for node in [node6, node7, node8]:
node.add_route("10.0.0.0/8", 'via', "10.0.1.1")
# internet.add_route("10.1.0.0/16", 'via', "10.1.0.2")
internet.add_route("10.2.0.0/16", 'via', "10.2.0.2")
gateway1.proc = gateway1.Popen(['miniupnpd', '-d', '-f', 'miniupnpd.conf', '-P', 'miniupnpd.pid',
'-a', gateway1.devices[-1].name, '-i', gateway1.devices[-1].name],
stdout=PIPE, stderr=PIPE)
nm.object += [internet, gateway1, gateway2, router1, router2, router3]
nm.registrys[registry] = [node1, node2, node3, node4, node5, node6, node7, node8]
connectible_test(nm)
return nm
if __name__ == "__main__":
net_demo()
print("good bye!")
# -*- coding: utf-8 -*-
'''
Script launched on machines from the demo with the option -p/--ping
It uses Multiping to ping several IPs passed as arguments.
After Re6st is stable, this script logs when it does not get response from a
machine in a csv file stored in the directory of the machine in this format:
time, sequence number, number of non-responding machines, ip of these machines
'''
import argparse, errno, socket, time, sys
from multiping import MultiPing
PING_TIMEOUT = 4
class MultiPing(MultiPing):
# Patch of Multiping because it stays blocked to ipv4
# emission when we want to ping only ipv6 addresses.
# So we only keep the ipv6 part for the demo.
# Bug issued: https://github.com/romana/multi-ping/issues/22
def _read_all_from_socket(self, timeout):
pkts = []
if self._ipv6_address_present:
try:
self._sock6.settimeout(timeout)
while True:
p = self._sock6.recv(128)
pkts.append((bytearray(p), time.time()))
self._sock6.settimeout(0)
except socket.timeout:
pass
except socket.error as e:
if e.errno == errno.EWOULDBLOCK:
pass
else:
raise
return pkts
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-a', nargs = '+', help = 'the list of addresses to ping')
parser.add_argument('--retry', action='store_true', help='retry ping unitl success')
args = parser.parse_args()
addrs = args.a
retry = args.retry
while True:
mp = MultiPing(addrs)
mp.send()
_, no_responses = mp.receive(PING_TIMEOUT)
if retry and no_responses:
continue
else:
sys.stdout.write(" ".join(no_responses))
return
if __name__ == '__main__':
main()
"""wrap the deploy of re6st node, ease the creation of cert
file and run of the node
"""
import json
import shutil
import sqlite3
import weakref
import ipaddress
import time
import re
import tempfile
import logging
from subprocess import PIPE, call
from pathlib2 import Path
import re6st.tests.tools as tools
WORK_DIR = Path(__file__).parent / "temp_net_test"
DH_FILE = WORK_DIR / "dh2048.pem"
RE6STNET = "re6stnet"
RE6ST_REGISTRY = "re6st-registry"
#RE6ST_REGISTRY = "python -m re6st.cli.registry"
RE6ST_CONF = "re6st-conf"
def initial():
"""create the workplace and dh file"""
if not WORK_DIR.exists():
WORK_DIR.mkdir()
if not DH_FILE.exists():
logging.info("create dh file")
call(['openssl', 'dhparam', '-out', str(DH_FILE), '2048'], stderr=PIPE)
def ip_to_serial(ip6):
"""convert ipv6 address to serial"""
ip6 = ipaddress.IPv6Address(u"{}".format(ip6))
ip6 = "1{:x}".format(int(ip6)).rstrip('0')
return int(ip6, 16)
def wait_ps(p, timeout=1, sec=0.1):
"""implement timeout of wait"""
now = time.time()
while time.time() -timeout < now:
if p.poll() is not None:
return
time.sleep(sec)
raise Exception("{}, not terminate".format(p.pid))
class Re6stRegistry(object):
"""class run a re6st-registry service on a namespace"""
registry_seq = 0
def __init__(self, node, ip6, recreate=False):
self.node = node
# TODO need set once
self.ip = node.ip
self.ip6 = ip6
self.name = self.generate_name()
self.path = WORK_DIR / self.name
self.ca_key = self.path / "ca.key"
# because re6st-conf will create ca.crt so use another name
self.ca_crt = self.path / "ca.cert"
self.log = self.path / "registry.log"
self.db = self.path / "registry.db"
self.run_path = tempfile.mkdtemp()
if recreate and self.path.exists():
shutil.rmtree(str(self.path))
if not self.path.exists():
self.create_registry()
# use hash to identify the registry
with self.ca_key.open() as f:
text = f.read()
self.ident = hash(text)
# clear log file
if self.log.exists():
self.log.unlink()
self.clean()
self.run()
# wait the servcice started
p = self.node.Popen(['python', '-c', """if 1:
import socket, time
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True:
try:
s.connect(('localhost', 80))
break
except socket.error:
time.sleep(.1)
"""])
try:
wait_ps(p, 5)
except Exception as e:
logging.error("%s: %s", self.name, e)
raise e
logging.info("re6st service started")
@classmethod
def generate_name(cls):
cls.registry_seq += 1
return "registry_{}".format(cls.registry_seq)
@property
def url(self):
return "http://{ip}/".format(ip=self.ip)
def create_registry(self):
self.path.mkdir()
tools.create_ca_file(str(self.ca_key), str(self.ca_crt),
serial=ip_to_serial(self.ip6))
def run(self):
cmd = ("{script} --ca {ca} --key {key} --dh {dh} --ipv4 10.42.0.0/16 8 "
" --logfile {log} --db {db} --run {run} --hello 4 --mailhost s "
"-v4")
cmd = cmd.format(script=RE6ST_REGISTRY, ca=self.ca_crt,
key=self.ca_key, dh=DH_FILE, log=self.log, db=self.db,
run=self.run_path).split()
logging.info("run registry %s at ns: %s with cmd: %s",
self.name, self.node.pid, " ".join(cmd))
self.proc = self.node.Popen(cmd, stdout=PIPE, stderr=PIPE)
def clean(self):
"""remove the file created last time"""
for f in [self.log]:
if f.exists():
f.unlink()
def __del__(self):
try:
logging.debug("teminate process %s", self.proc.pid)
self.proc.terminate()
self.proc.wait(1)
except:
pass
class Re6stNode(object):
"""class run a re6stnet service on a namespace"""
node_seq = 0
def __init__(self, node, registry, name=None, recreate=False):
"""
node: nemu node
name: name for res6st node
"""
self.name = name if name else self.generate_name()
self.node = node
self.registry = weakref.proxy(registry)
self.path = WORK_DIR / self.name
self.email = self.name + "@example.com"
if self.name == self.registry.name:
self.run_path = self.registry.run_path
else:
self.run_path = tempfile.mkdtemp()
self.log = self.path / "re6stnet.log"
self.crt = self.path / "cert.crt"
self.key = self.path / 'cert.key'
self.console = self.run_path + "/console.sock"
self.data_file = self.path / "data.json" # contain data for restart node
# condition, node of the registry
if self.name == self.registry.name:
self.ip6 = self.registry.ip6
if not self.crt.exists():
self.create_node()
else:
# if ca file changed, we need recreate node file
if self.data_file.exists():
with self.data_file.open() as f:
data = json.load(f)
self.ip6 = data.get("ip6")
recreate = not data.get('hash') == self.registry.ident
else:
recreate = True
if recreate and self.path.exists():
shutil.rmtree(str(self.path))
if not self.path.exists():
self.path.mkdir()
self.create_node()
logging.debug("%s's subnet is %s", self.name, self.ip6)
self.clean()
def __repr__(self):
return self.name
@classmethod
def generate_name(cls):
cls.node_seq += 1
return "node_{}".format(cls.node_seq)
def create_node(self):
"""create necessary file for node"""
logging.info("create dir of node %s", self.name)
cmd = "{script} --registry {registry_url} --email {email}"
cmd = cmd.format(script=RE6ST_CONF, registry_url=self.registry.url,
email=self.email).split()
p = self.node.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE,
cwd=str(self.path))
# read token
db = sqlite3.connect(str(self.registry.db), isolation_level=None)
count = 0
token = None
while not token:
time.sleep(.1)
token = db.execute("SELECT token FROM token WHERE email=?",
(self.email,)).fetchone()
count += 1
if count > 100:
p.terminate()
raise Exception("can't connect to the Register")
out, _ = p.communicate(str(token[0]))
# logging.debug("re6st-conf output: {}".format(out))
# find the ipv6 subnet of node
self.ip6 = re.search('(?<=subnet: )[0-9:a-z]+', out).group(0)
data = {'ip6': self.ip6, 'hash': self.registry.ident}
with open(str(self.data_file), 'w') as f:
json.dump(data, f)
logging.info("create dir of node %s finish", self.name)
def run(self, *args):
"""execute re6stnet"""
cmd = ("{script} --log {log} --run {run} --state {state}"
" --dh {dh} --ca {ca} --cert {cert} --key {key} -v4"
" --registry {registry} --console {console}"
)
cmd = cmd.format(script=RE6STNET, log=self.path, run=self.run_path,
state=self.path, dh=DH_FILE, ca=self.registry.ca_crt,
cert=self.crt, key=self.key, registry=self.registry.url,
console=self.console).split()
cmd += args
logging.info("run node %s at ns: %s with cmd: %s",
self.name, self.node.pid, " ".join(cmd))
self.proc = self.node.Popen(cmd, stdout=PIPE, stderr=PIPE)
def clean(self):
"""remove the file created last time"""
for name in ["re6stnet.log", "babeld.state", "cache.db", "babeld.log"]:
f = self.path / name
if f.exists():
f.unlink()
def stop(self):
"""stop running re6stnet process"""
logging.debug("%s teminate process %s", self.name, self.proc.pid)
self.proc.terminate()
# timeout only in python3. deadlock maybe
wait_ps(self.proc, 2)
def __del__(self):
"""teminate process and rm temp dir"""
try:
self.stop()
except Exception as e:
logging.warn("%s: %s", self.name, e)
# python2 seems auto clean the tempdir
# try:
# shutil.rmtree(self.run_path)
# except Exception as e:
# logging.error("{}: {}".format(self.name, e))
"""contain ping-test for re6set net"""
import os
import unittest
import time
import psutil
import logging
import sqlite3
import random
from binascii import b2a_hex
from pathlib2 import Path
import re6st_wrap
import my_net
PING_PATH = str(Path(__file__).parent.resolve() / "ping.py")
BABEL_HMAC = 'babel_hmac0', 'babel_hmac1', 'babel_hmac2'
def deploy_re6st(nm, recreate=False):
net = nm.registrys
nodes = []
registrys = []
re6st_wrap.Re6stRegistry.registry_seq = 0
re6st_wrap.Re6stNode.node_seq = 0
for registry in net:
reg = re6st_wrap.Re6stRegistry(registry, "2001:db8:42::", recreate=recreate)
reg_node = re6st_wrap.Re6stNode(registry, reg, name=reg.name)
registrys.append(reg)
reg_node.run("--gateway", "--disable-proto", "none", "--ip", registry.ip)
nodes.append(reg_node)
for m in net[registry]:
node = re6st_wrap.Re6stNode(m, reg)
node.run("-i" + m.out.name)
nodes.append(node)
return nodes, registrys
def wait_stable(nodes, timeout=240):
"""try use ping6 from each node to the other until ping success to all the
other nodes
Args:
timeout: int, the time for wait
return:
True if success
"""
logging.info("wait all node stable, timeout: %s", timeout)
now = time.time()
ips = {node.ip6: node.name for node in nodes}
# start the ping processs
for node in nodes:
sub_ips = set(ips) - {node.ip6}
node.ping_proc = node.node.Popen(
["python", PING_PATH, '--retry', '-a'] + list(sub_ips))
# check all the node network can ping each other, in order reverse
unfinished = list(nodes)
while unfinished:
for i in range(len(unfinished)-1, -1, -1):
node = unfinished[i]
if node.ping_proc.poll() is not None:
logging.debug("%s 's network is stable", node.name)
unfinished.pop(i)
time.sleep(0.5)
if time.time() - now > timeout:
for node in unfinished:
node.ping_proc.terminate()
logging.warn("%s can't ping to all the nodes", unfinished)
return False
logging.info("wait time cost: %s", time.time() - now)
return True
def get_config(db, name):
r = db.execute("SELECT value FROM config WHERE name=?", (name,)).fetchone()
if r:
return b2a_hex(*r)
def check_HMAC(db, machines):
"""method copy from demo"""
hmac = [get_config(db, k) for k in BABEL_HMAC]
rc = True
for x in psutil.Process().children(True):
if x.name() == 'babeld':
sign = accept = None
args = x.cmdline()
for x in args:
if x.endswith('/babeld.log'):
if x[:-11].split('/')[-1] not in machines:
break
elif x.startswith('key '):
x = x.split()
if 'sign' in x:
sign = x[-1]
elif 'accept' in x:
accept = x[-1]
else:
i = 0 if hmac[0] else 1
if hmac[i] != sign or hmac[i+1] != accept:
logging.warn('HMAC config wrong for in %s', args)
logging.warn("HMAC sign: %s, accept: %s", sign, accept)
rc = False
if rc:
logging.info('All nodes use Babel with the correct HMAC configuration')
else:
logging.warn('Expected config: %s', dict(zip(BABEL_HMAC, hmac)))
return rc
@unittest.skipIf(os.geteuid() != 0, "require root or create user namespace plz")
class TestNet(unittest.TestCase):
""" network test case"""
@classmethod
def setUpClass(cls):
"""create work dir"""
logging.basicConfig(level=logging.INFO)
re6st_wrap.initial()
@classmethod
def tearDownClass(cls):
"""watch any process leaked after tests"""
for p in psutil.Process().children():
logging.debug("unterminate ps, name: %s, pid: %s, status: %s, cmd: %s",
p.name(), p.pid, p.status(), p.cmdline())
p.terminate()
logging.basicConfig(level=logging.WARNING)
# try:
# p.kill()
# except:
# pass
def test_ping_router(self):
"""create a network in a net segment, test the connectivity by ping
"""
nm = my_net.net_route()
nodes, _ = deploy_re6st(nm)
wait_stable(nodes, 40)
time.sleep(10)
self.assertTrue(wait_stable(nodes, 30), " ping test failed")
def test_ping_demo(self):
"""create a network demo, test the connectivity by ping
wait at most 50 seconds, and test each node ping to other by ipv6 addr
"""
nm = my_net.net_demo()
nodes, _ = deploy_re6st(nm)
# wait 60, if the re6stnet stable quit wait
wait_stable(nodes, 50)
time.sleep(20)
self.assertTrue(wait_stable(nodes, 30), "ping test failed")
def test_reboot_one_machine(self):
"""create a network demo, wait the net stable, reboot on machine,
then test if network recover, this test seems always failed
"""
nm = my_net.net_demo()
nodes, _ = deploy_re6st(nm)
wait_stable(nodes, 100)
# stop on machine randomly
index = int(random.random() * 7) + 1
machine = nodes[index]
machine.proc.terminate()
machine.proc.wait()
time.sleep(5)
machine.run("-i" + machine.node.out.name)
self.assertTrue(wait_stable(nodes, 100), "network can't recover")
@unittest.skip("re6st dont use hmac now")
def test_hmac(self):
"""create a network demo, and run hmac test, this test check hmac 3
times the third part always failed, unless deploy_re6st in no recreate
mode
"""
nm = my_net.net_demo()
nodes, registrys = deploy_re6st(nm, False)
updateHMAC = ['python', '-c', "import urllib, sys; sys.exit("
"204 != urllib.urlopen('http://127.0.0.1/updateHMAC').code)"]
registry = registrys[0]
machine1 = nodes[5]
reg1_db = sqlite3.connect(str(registry.db), isolation_level=None,
check_same_thread=False)
# reg1_db.text_factory = str
m_net1 = [node.name for node in nodes]
# wait net stable, wait at most 100 seconds
wait_stable(nodes, 100)
logging.info('Check that the initial HMAC config is deployed on network 1')
self.assertTrue(check_HMAC(reg1_db, m_net1), "first hmac check failed")
logging.info('Test that a HMAC update works with nodes that are up')
registry.node.run(updateHMAC)
time.sleep(60)
# Checking HMAC on machines connected to registry 1...
self.assertTrue(check_HMAC(reg1_db, m_net1),
"second hmac check failed: HMAC update don't work")
# # check if one machine restarted
logging.info('Test that machines can update upon reboot '
'when they were off during a HMAC update.')
machine1.stop()
time.sleep(5)
registry.node.run(updateHMAC)
time.sleep(60)
machine1.run("-i" + machine1.node.out.name)
wait_stable(nodes, 100)
self.assertTrue(check_HMAC(reg1_db, m_net1),
"third hmac check failed: machine restart failed")
logging.info('Testing of HMAC done!')
reg1_db.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, filename='test.log', filemode='w',
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%I:%M:%S')
unittest.main(verbosity=3)
......@@ -2,9 +2,15 @@ import sys
import os
import time
import subprocess
from OpenSSL import crypto
from re6st import registry
with open(os.devnull, "wb") as null:
tmp = sys.stderr
sys.stderr = null
# avoid the python version error
from OpenSSL import crypto
from re6st import registry
sys.stderr = tmp
def generate_csr():
......@@ -68,8 +74,9 @@ def create_cert_file(pkey_file, cert_file, ca, ca_key, prefix, serial):
def create_ca_file(pkey_file, cert_file):
def create_ca_file(pkey_file, cert_file, serial=0x120010db80042):
"""create key and ca file with specify name
return key, cert in pem format """
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
......@@ -82,7 +89,7 @@ def create_ca_file(pkey_file, cert_file):
subject.O = "nexedi"
subject.CN = "TEST-CA"
cert.set_issuer(cert.get_subject())
cert.set_serial_number(10000)
cert.set_serial_number(serial)
cert.set_pubkey(key)
cert.sign(key, "sha512")
......
......@@ -94,6 +94,7 @@ setup(
install_requires = ['pyOpenSSL >= 0.13', 'miniupnpc'],
extras_require = {
'geoip': ['geoip2'],
'test': ['mock', 'pathlib2', 'multiping']
},
#dependency_links = [
# "http://miniupnp.free.fr/files/download.php?file=miniupnpc-1.7.20120714.tar.gz#egg=miniupnpc-1.7",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment