Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
R
re6stnet
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
zhifan huang
re6stnet
Commits
e1aa324b
Commit
e1aa324b
authored
Apr 04, 2022
by
zhifan huang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add network simulate ping test
parent
713ef0e7
Changes
11
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
589 additions
and
10 deletions
+589
-10
re6st/tests/test_network/__init__.py
re6st/tests/test_network/__init__.py
+1
-0
re6st/tests/test_network/fixnemu.py
re6st/tests/test_network/fixnemu.py
+71
-0
re6st/tests/test_network/network_build.py
re6st/tests/test_network/network_build.py
+153
-0
re6st/tests/test_network/ping.py
re6st/tests/test_network/ping.py
+60
-0
re6st/tests/test_network/re6st_wrap.py
re6st/tests/test_network/re6st_wrap.py
+188
-0
re6st/tests/test_network/test_ping.py
re6st/tests/test_network/test_ping.py
+108
-0
re6st/tests/test_unit/__init__.py
re6st/tests/test_unit/__init__.py
+0
-2
re6st/tests/test_unit/test_conf.py
re6st/tests/test_unit/test_conf.py
+2
-3
re6st/tests/test_unit/test_tunnel/__init__.py
re6st/tests/test_unit/test_tunnel/__init__.py
+1
-1
re6st/tests/tools.py
re6st/tests/tools.py
+4
-3
setup.py
setup.py
+1
-1
No files found.
re6st/tests/test_network/__init__.py
0 → 100644
View file @
e1aa324b
__all__
=
[
"test_ping"
]
re6st/tests/test_network/fixnemu.py
0 → 100644
View file @
e1aa324b
# -*- coding: utf-8 -*-
# Copyright 2010, 2011 INRIA
# Copyright 2011 Martín Ferrari <martin.ferrari@gmail.com>
#
# This file is contains patches to Nemu.
#
# Nemu is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License version 2, as published by the Free
# Software Foundation.
#
# Nemu is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# Nemu. If not, see <http://www.gnu.org/licenses/>.
import
re
from
new
import
function
from
nemu.iproute
import
backticks
,
get_if_data
,
route
,
\
get_addr_data
,
get_all_route_data
,
interface
def
_get_all_route_data
():
ipdata
=
backticks
([
IP_PATH
,
"-o"
,
"route"
,
"list"
])
# "table", "all"
ipdata
+=
backticks
([
IP_PATH
,
"-o"
,
"-f"
,
"inet6"
,
"route"
,
"list"
])
ifdata
=
get_if_data
()[
1
]
ret
=
[]
for
line
in
ipdata
.
split
(
"
\
n
"
):
if
line
==
""
:
continue
# PATCH: parse 'from'
# PATCH: 'dev' is missing on 'unreachable' ipv4 routes
match
=
re
.
match
(
'(?:(unicast|local|broadcast|multicast|throw|'
r'unreachable|prohibit|blackhole|nat) )?(\
S+)(?:
from (\
S+))?
'
r'
(
?
:
via
(
\
S
+
))
?
(
?
:
dev
(
\
S
+
))
?
.
*
(
?
:
metric
(
\
d
+
))
?
', line)
if not match:
raise RuntimeError("Invalid output from `ip route'
:
`
%
s
'" % line)
tipe = match.group(1) or "unicast"
prefix = match.group(2)
#src = match.group(3)
nexthop = match.group(4)
interface = ifdata[match.group(5) or "lo"]
metric = match.group(6)
if prefix == "default" or re.search(r'
/
0
$
', prefix):
prefix = None
prefix_len = 0
else:
match = re.match(r'
([
0
-
9
a
-
f
:.]
+
)(
?
:
/
(
\
d
+
))
?$
', prefix)
prefix = match.group(1)
prefix_len = int(match.group(2) or 32)
ret.append(route(tipe, prefix, prefix_len, nexthop, interface.index,
metric))
return ret
get_all_route_data.func_code = _get_all_route_data.func_code
interface__init__ = interface.__init__
def __init__(self, *args, **kw):
interface__init__(self, *args, **kw)
if self.name:
self.name = self.name.split('
@
',1)[0]
interface.__init__ = __init__
get_addr_data.orig = function(get_addr_data.func_code,
get_addr_data.func_globals)
def _get_addr_data():
byidx, bynam = get_addr_data.orig()
return byidx, {name.split('
@
',1)[0]: a for name, a in bynam.iteritems()}
get_addr_data.func_code = _get_addr_data.func_code
re6st/tests/test_network/network_build.py
0 → 100644
View file @
e1aa324b
import
nemu
import
weakref
import
ipaddress
from
subprocess
import
PIPE
from
pathlib2
import
Path
fix_file
=
Path
(
__file__
).
parent
.
resolve
()
/
"fixnemu.py"
execfile
(
str
(
fix_file
))
IPTABLES
=
'iptables'
class
Node
(
nemu
.
Node
):
"""simple nemu.Node used for registry and nodes"""
def
_add_interface
(
self
,
iface
):
self
.
iface
=
iface
iface
.
__dict__
[
'node'
]
=
weakref
.
proxy
(
self
)
return
super
(
Node
,
self
).
_add_interface
(
iface
)
@
property
def
ip
(
self
):
if
hasattr
(
self
,
"_ip"
):
return
str
(
self
.
_ip
)
# return 1 ipv4 address of the one interface, reverse mode
for
iface
in
self
.
get_interfaces
()[::
-
1
]:
for
addr
in
iface
.
get_addresses
():
addr
=
addr
[
'address'
]
if
'.'
in
addr
:
#TODO different type problem?
self
.
_ip
=
ipaddress
.
ip_address
(
addr
)
return
addr
def
connect_network
(
self
,
internet
):
""" create a p2p interface with internet, and assaing ipaddress for
interface
internet: Internet object
ret: 2 interface
"""
self
.
if_i
,
it_if
=
nemu
.
P2PInterface
.
create_pair
(
self
,
internet
)
ip
=
internet
.
assign_ip
(
8
)
self
.
_ip
=
ip
+
2
self
.
if_i
.
add_v4_address
(
str
(
self
.
_ip
),
prefix_len
=
24
)
it_if
.
add_v4_address
(
str
(
ip
+
1
),
prefix_len
=
24
)
internet
.
iface
[
self
]
=
it_if
self
.
if_i
.
up
=
it_if
.
up
=
True
self
.
add_route
(
prefix
=
str
(
internet
.
net
[
0
])
,
prefix_len
=
internet
.
net
.
prefixlen
,
nexthop
=
str
(
ip
+
1
))
return
self
.
if_i
,
it_if
def
connect_switch
(
self
,
switch
,
ip
):
self
.
if_s
=
if_s
=
nemu
.
NodeInterface
(
self
)
switch
.
connect
(
if_s
)
if_s
.
up
=
True
if_s
.
add_v4_address
(
ip
,
prefix_len
=
24
)
return
if_s
class
Network
(
nemu
.
Node
):
def
__init__
(
self
,
net
):
"""net: ipv4 network like 10.0.0.0/8"""
super
(
Network
,
self
).
__init__
()
self
.
net
=
ipaddress
.
ip_network
(
u"{}"
.
format
(
net
))
# start at 10.1.0.0
self
.
next_net
=
self
.
net
[
1
<<
16
]
self
.
iface
=
{}
def
assign_ip
(
self
,
length
):
ret
=
self
.
next_net
self
.
next_net
+=
1
<<
length
return
ret
class
NetManager
(
object
):
"""contain all the nemu object created, so they can live more time"""
def
__init__
(
self
):
self
.
object
=
[]
self
.
registrys
=
{}
def
network_demo
():
registry
=
Node
()
internet
=
Network
(
"10.0.0.0/8"
)
m0
=
Node
()
m1
=
Node
()
m2
=
Node
()
m3
=
Node
()
m4
=
Node
()
m5
=
Node
()
# gateway
g1
=
Node
()
switch0
=
nemu
.
Switch
()
switch1
=
nemu
.
Switch
()
nm
=
NetManager
()
nm
.
object
=
[
registry
,
internet
,
m0
,
m1
,
m2
,
m3
,
m4
,
m5
,
g1
,
switch0
,
switch1
]
nm
.
registrys
=
{
registry
:
[
m0
,
m1
,
m2
,
m3
,
m4
,
m5
]}
# for node in [registry, m0, m2, m3, m4, m5]:
# print "pid: {}".format(node.pid)
g0_if_0
,
_
=
g1
.
connect_network
(
internet
)
m0
.
connect_network
(
internet
)
registry
.
connect_network
(
internet
)
g1
.
Popen
((
IPTABLES
,
'-t'
,
'nat'
,
'-A'
,
'POSTROUTING'
,
'-o'
,
g0_if_0
.
name
,
'-j'
,
'MASQUERADE'
)).
wait
()
g1
.
Popen
((
IPTABLES
,
'-t'
,
'nat'
,
'-N'
,
'MINIUPNPD'
)).
wait
()
g1
.
Popen
((
IPTABLES
,
'-t'
,
'nat'
,
'-A'
,
'PREROUTING'
,
'-i'
,
g0_if_0
.
name
,
'-j'
,
'MINIUPNPD'
)).
wait
()
g1
.
Popen
((
IPTABLES
,
'-N'
,
'MINIUPNPD'
)).
wait
()
# Enable forwarding for communication between registry and registry2
internet
.
Popen
((
'sysctl'
,
'-q'
,
'net.ipv6.conf.all.forwarding=1'
)).
wait
()
it_if_2
=
nemu
.
NodeInterface
(
internet
)
switch0
.
connect
(
it_if_2
)
it_if_2
.
add_v4_address
(
"10.1.9.1"
,
prefix_len
=
24
)
it_if_2
.
up
=
True
nm
.
object
.
append
(
it_if_2
)
ip
=
ipaddress
.
ip_address
(
u"10.1.9.1"
)
for
node
in
[
m1
,
m2
]:
ip
+=
1
node
.
connect_switch
(
switch0
,
str
(
ip
))
ip
=
ipaddress
.
ip_address
(
u"10.1.10.0"
)
for
node
in
[
g1
,
m3
,
m4
,
m5
]:
ip
+=
1
node
.
connect_switch
(
switch1
,
str
(
ip
))
switch0
.
up
=
True
switch1
.
up
=
True
for
node
in
[
m1
,
m2
]:
node
.
add_route
(
nexthop
=
"10.1.9.1"
)
for
node
in
[
m3
,
m4
,
m5
]:
node
.
add_route
(
nexthop
=
"10.1.10.1"
)
for
node
in
[
m0
,
m1
,
m2
,
m3
,
m4
,
m5
]:
app0
=
m1
.
Popen
([
"ping"
,
"-c"
,
"1"
,
registry
.
ip
],
stdout
=
PIPE
)
ret
=
app0
.
wait
()
assert
ret
==
0
,
"network construct failed"
return
nm
re6st/tests/test_network/ping.py
0 → 100644
View file @
e1aa324b
# -*- coding: utf-8 -*-
'''
Script launched on machines from the demo with the option -p/--ping
It uses Multiping to ping several IPs passed as arguments.
After Re6st is stable, this script logs when it does not get response from a
machine in a csv file stored in the directory of the machine in this format:
time, sequence number, number of non-responding machines, ip of these machines
'''
import
argparse
,
errno
,
socket
,
time
,
sys
from
multiping
import
MultiPing
PING_INTERVAL
=
10
PING_TIMEOUT
=
4
class
MultiPing
(
MultiPing
):
# Patch of Multiping because it stays blocked to ipv4
# emission when we want to ping only ipv6 addresses.
# So we only keep the ipv6 part for the demo.
# Bug issued: https://github.com/romana/multi-ping/issues/22
def
_read_all_from_socket
(
self
,
timeout
):
pkts
=
[]
if
self
.
_ipv6_address_present
:
try
:
self
.
_sock6
.
settimeout
(
timeout
)
while
True
:
p
=
self
.
_sock6
.
recv
(
128
)
pkts
.
append
((
bytearray
(
p
),
time
.
time
()))
self
.
_sock6
.
settimeout
(
0
)
except
socket
.
timeout
:
pass
except
socket
.
error
as
e
:
if
e
.
errno
==
errno
.
EWOULDBLOCK
:
pass
else
:
raise
return
pkts
def
main
():
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
'-a'
,
nargs
=
'+'
,
help
=
'the list of addresses to ping'
)
parser
.
add_argument
(
'--retry'
,
action
=
'store_true'
,
help
=
'retry ping unitl success'
)
args
=
parser
.
parse_args
()
addrs
=
args
.
a
retry
=
args
.
retry
while
True
:
mp
=
MultiPing
(
addrs
)
mp
.
send
()
_
,
no_responses
=
mp
.
receive
(
PING_TIMEOUT
)
if
retry
and
no_responses
:
continue
else
:
sys
.
stdout
.
write
(
" "
.
join
(
no_responses
))
return
if
__name__
==
'__main__'
:
main
()
re6st/tests/test_network/re6st_wrap.py
0 → 100644
View file @
e1aa324b
import
os
import
sys
import
json
import
shutil
import
sqlite3
import
weakref
import
ipaddress
import
time
import
re
from
subprocess
import
PIPE
,
call
from
pathlib2
import
Path
import
re6st.tests.tools
as
tools
WORK_DIR
=
Path
(
__file__
).
parent
.
resolve
()
/
"temp_net_test"
DH_FILE
=
WORK_DIR
/
"dh2048.pem"
RE6STNET
=
"re6stnet"
RE6ST_REGISTRY
=
"re6st-registry"
RE6ST_CONF
=
"re6st-conf"
def
initial
():
"""create the workplace and dh file"""
if
not
WORK_DIR
.
exists
():
WORK_DIR
.
mkdir
()
if
not
DH_FILE
.
exists
():
call
(
"openssl dhparam -out %s 2048"
%
str
(
DH_FILE
),
shell
=
True
)
def
ip_to_serial
(
ip6
):
"""convert ipv6 address to serial"""
ip6
=
ipaddress
.
IPv6Address
(
u"{}"
.
format
(
ip6
))
ip6
=
"1{:x}"
.
format
(
int
(
ip6
)).
rstrip
(
'0'
)
return
int
(
ip6
,
16
)
class
Re6stRegistry
(
object
):
"""class run a re6st-registry service on a namespace"""
registry_seq
=
-
1
def
__init__
(
self
,
node
,
ip6
,
recreate
=
False
):
self
.
node
=
node
# TODO need set once
self
.
ip
=
node
.
ip
self
.
ip6
=
ip6
self
.
name
=
self
.
generate_name
()
self
.
path
=
WORK_DIR
/
self
.
name
self
.
ca_key
=
self
.
path
/
"ca.key"
# because re6st-conf will create ca.crt so use another name
self
.
ca_crt
=
self
.
path
/
"ca.cert"
self
.
log
=
self
.
path
/
"registry.log"
self
.
db
=
self
.
path
/
"registry.db"
self
.
run_path
=
self
.
path
/
"run"
if
recreate
and
self
.
path
.
exists
():
shutil
.
rmtree
(
str
(
self
.
path
))
if
not
self
.
path
.
exists
():
self
.
create_registry
()
# use hash to identify the registry
with
self
.
ca_key
.
open
()
as
f
:
text
=
f
.
readlines
()
text
=
''
.
join
(
text
)
self
.
ident
=
hash
(
text
)
self
.
run
()
@
classmethod
def
generate_name
(
cls
):
cls
.
registry_seq
+=
1
return
"registry_{}"
.
format
(
cls
.
registry_seq
)
@
property
def
url
(
self
):
return
"http://{ip}/"
.
format
(
ip
=
self
.
ip
)
def
create_registry
(
self
):
self
.
path
.
mkdir
()
tools
.
create_ca_file
(
str
(
self
.
ca_key
),
str
(
self
.
ca_crt
),
serial
=
ip_to_serial
(
self
.
ip6
))
#python -m re6st.cli.registry
def
run
(
self
):
cmd
=
(
"{script} --ca {ca} --key {key} --dh {dh} --ipv4 10.42.0.0/16 8 "
" --logfile {log} --db {db} --run {run} --hello 4 --mailhost s -v4"
)
cmd
=
cmd
.
format
(
script
=
RE6ST_REGISTRY
,
ca
=
self
.
ca_crt
,
key
=
self
.
ca_key
,
dh
=
DH_FILE
,
log
=
self
.
log
,
db
=
self
.
db
,
run
=
self
.
run_path
).
split
()
self
.
proc
=
self
.
node
.
Popen
(
cmd
,
stdout
=
PIPE
,
stderr
=
PIPE
)
class
Re6stNode
(
object
):
"""class run a re6stnet service on a namespace"""
node_seq
=
-
1
def
__init__
(
self
,
node
,
registry
,
name
=
None
,
recreate
=
False
):
"""
node: nemu node
name: name for res6st node
"""
self
.
name
=
name
if
name
else
self
.
generate_name
()
self
.
node
=
node
self
.
registry
=
weakref
.
proxy
(
registry
)
self
.
path
=
WORK_DIR
/
self
.
name
self
.
email
=
self
.
name
+
"@example.com"
self
.
run_path
=
self
.
path
/
"run"
self
.
crt
=
self
.
path
/
"cert.crt"
self
.
key
=
self
.
path
/
'cert.key'
self
.
console
=
self
.
run_path
/
"console.sock"
self
.
data_file
=
self
.
path
/
"data.json"
# contain data for restart node
# condition node of the registry
if
self
.
name
==
self
.
registry
.
name
:
self
.
ip6
=
self
.
registry
.
ip6
if
not
self
.
crt
.
exists
():
self
.
create_node
()
else
:
# if ca file changed, we need recreate node file
if
self
.
data_file
.
exists
():
with
self
.
data_file
.
open
()
as
f
:
data
=
json
.
load
(
f
)
self
.
ip6
=
data
.
get
(
"ip6"
)
recreate
=
not
data
.
get
(
'hash'
)
==
self
.
registry
.
ident
else
:
recreate
=
True
if
recreate
and
self
.
path
.
exists
():
shutil
.
rmtree
(
str
(
self
.
path
))
if
not
self
.
path
.
exists
():
self
.
path
.
mkdir
()
self
.
create_node
()
sys
.
stderr
.
write
(
"{}'s subnet is {}
\
n
"
.
format
(
self
.
name
,
self
.
ip6
))
@
classmethod
def
generate_name
(
cls
):
cls
.
node_seq
+=
1
return
"node_{}"
.
format
(
cls
.
node_seq
)
def
create_node
(
self
):
"""create necessary file for node"""
sys
.
stderr
.
write
(
"create file for node {}"
.
format
(
self
.
name
))
cmd
=
"{script} --registry {registry_url} --email {email}"
cmd
=
cmd
.
format
(
script
=
RE6ST_CONF
,
registry_url
=
self
.
registry
.
url
,
email
=
self
.
email
).
split
()
p
=
self
.
node
.
Popen
(
cmd
,
stdin
=
PIPE
,
stdout
=
PIPE
,
cwd
=
str
(
self
.
path
))
db
=
sqlite3
.
connect
(
str
(
self
.
registry
.
db
),
isolation_level
=
None
)
count
=
0
token
=
None
while
not
token
:
time
.
sleep
(.
1
)
token
=
db
.
execute
(
"SELECT token FROM token WHERE email=?"
,
(
self
.
email
,)).
fetchone
()
count
+=
1
if
count
>
100
:
raise
Exception
(
"can't connect to the Register"
)
out
,
_
=
p
.
communicate
(
str
(
token
[
0
]))
self
.
ip6
=
re
.
search
(
'(?<=subnet: )[0-9:a-z]+'
,
out
).
group
(
0
)
data
=
{
'ip6'
:
self
.
ip6
,
'hash'
:
self
.
registry
.
ident
}
with
open
(
str
(
self
.
data_file
),
'w'
)
as
f
:
json
.
dump
(
data
,
f
)
def
run
(
self
,
*
args
):
"""execute re6stnet"""
cmd
=
(
"{script} --log {log} --run {run} --state {state}"
" --dh {dh} --ca {ca} --cert {cert} --key {key} -v4"
" --registry {registry} --console {console}"
)
cmd
=
cmd
.
format
(
script
=
RE6STNET
,
log
=
self
.
path
,
run
=
self
.
run_path
,
state
=
self
.
path
,
dh
=
DH_FILE
,
ca
=
self
.
registry
.
ca_crt
,
cert
=
self
.
crt
,
key
=
self
.
key
,
registry
=
self
.
registry
.
url
,
console
=
self
.
console
).
split
()
cmd
+=
args
self
.
proc
=
self
.
node
.
Popen
(
cmd
,
stdout
=
PIPE
,
stderr
=
PIPE
)
re6st/tests/test_network/test_ping.py
0 → 100644
View file @
e1aa324b
"""contain ping-test for re6set net"""
import
unittest
import
time
import
re6st_wrap
import
subprocess
import
sys
import
os
from
pathlib2
import
Path
from
network_build
import
*
PING_PATH
=
Path
(
__file__
).
parent
.
resolve
()
/
"ping.py"
def
wait_stable
(
nodes
,
timeout
=
180
):
sys
.
stderr
.
write
(
"wait stalbe
\
n
"
)
now
=
time
.
time
()
# wait all the nodes can connect to each other
ips
=
{
node
.
ip6
:
node
.
name
for
node
in
nodes
}
for
node
in
nodes
:
sub_ips
=
set
(
ips
)
-
{
node
.
ip6
}
node
.
ping_proc
=
node
.
node
.
Popen
(
"python {} --retry -a {} "
.
format
(
PING_PATH
,
' '
.
join
(
sub_ips
)),
shell
=
True
)
unfinished
=
list
(
nodes
)
# check all the node network can ping each other
while
len
(
unfinished
):
for
i
in
range
(
len
(
unfinished
)
-
1
,
-
1
,
-
1
):
node
=
unfinished
[
i
]
if
node
.
ping_proc
.
poll
()
is
not
None
:
sys
.
stderr
.
write
(
"{}'s network is stable
\
n
"
.
format
(
node
.
name
))
unfinished
.
pop
(
i
)
node
.
ping_proc
.
wait
()
time
.
sleep
(
0.5
)
if
time
.
time
()
-
now
>
timeout
:
raise
Exception
(
"wait network stable timeout"
)
def
ping_test
(
nodes
):
"""test Re6st node connecty """
ips
=
{
node
.
ip6
:
node
.
name
for
node
in
nodes
}
sys
.
stderr
.
write
(
"start ping test
\
n
"
)
failed
=
False
for
node
in
nodes
:
sub_ips
=
set
(
ips
)
-
{
node
.
ip6
}
node
.
ping_proc
=
node
.
node
.
Popen
(
"python {} -a {} "
.
format
(
PING_PATH
,
' '
.
join
(
sub_ips
)),
shell
=
True
,
stdout
=
subprocess
.
PIPE
)
out
,
_
=
node
.
ping_proc
.
communicate
()
unreached
=
[
ips
[
addr
]
for
addr
in
out
.
split
()]
if
unreached
:
sys
.
stderr
.
write
(
"{} can't ping to {}
\
n
"
.
format
(
node
.
name
,
","
.
join
(
unreached
)))
failed
=
True
sys
.
stderr
.
write
(
"ping test finish
\
n
"
)
return
failed
@
unittest
.
skipIf
(
os
.
geteuid
()
!=
0
,
"require root"
)
class
TestPing
(
unittest
.
TestCase
):
"""" ping-test case"""
@
classmethod
def
setUpClass
(
cls
):
"""set up for all case"""
# re6st_wrap.WORK_DIR = xxx # chage the work dir
re6st_wrap
.
initial
()
def
tearDown
(
self
):
# terminate all the sub process
return
p
=
subprocess
.
Popen
(
"ps -aux | grep re6st | grep -v grep"
,
shell
=
True
,
stdout
=
subprocess
.
PIPE
)
out
,
_
=
p
.
communicate
()
out
=
str
(
out
)
out
=
out
.
split
(
'
\
n
'
)
out
.
pop
()
for
proc
in
out
:
proc
=
proc
.
split
()[
1
]
subprocess
.
call
([
"kill"
,
"-15"
,
proc
])
def
test_sample
(
self
):
"""create a network demo, test the connectivity by ping
wait the network stable then ping 3 times
"""
nm
=
network_demo
()
net
=
nm
.
registrys
nodes
=
[]
registrys
=
[]
for
registry
in
net
:
reg
=
re6st_wrap
.
Re6stRegistry
(
registry
,
"2001:db8:42::"
,
recreate
=
False
)
reg_node
=
re6st_wrap
.
Re6stNode
(
registry
,
reg
,
name
=
reg
.
name
)
registrys
.
append
(
registry
)
reg_node
.
run
(
"--gateway"
,
"--disable-proto"
,
"none"
,
"--ip"
,
registry
.
ip
)
nodes
.
append
(
reg_node
)
for
m
in
net
[
registry
]:
node
=
re6st_wrap
.
Re6stNode
(
m
,
reg
)
node
.
run
(
"-i"
+
m
.
iface
.
name
)
nodes
.
append
(
node
)
wait_stable
(
nodes
)
for
i
in
range
(
3
):
time
.
sleep
(
20
)
self
.
assertFalse
(
ping_test
(
nodes
),
"N.{} ping test failed"
.
format
(
i
))
if
__name__
==
"__main__"
:
unittest
.
main
()
re6st/tests/test_unit/__init__.py
View file @
e1aa324b
...
...
@@ -6,5 +6,3 @@ __all__ = ["test_registry",
"test_registry_client"
,
"test_conf"
,
"test_tunnel"
]
from
re6st.tests.test_unit
import
*
re6st/tests/test_unit/test_conf.py
View file @
e1aa324b
...
...
@@ -22,8 +22,7 @@ key_path = 'cert.key'
# TODO test for is needed
class
TestConf
(
unittest
.
TestCase
):
""" Unit test case for re6st-conf
"""
""" Unit test case for re6st-conf"""
@
classmethod
def
setUpClass
(
cls
):
...
...
@@ -114,7 +113,7 @@ class TestConf(unittest.TestCase):
def
test_ca_only
(
self
):
""" only create ca file and exit
""" only create ca
.crt
file and exit
"""
command
=
self
.
command
+
" --ca-only"
sys
.
argv
=
command
.
split
()
...
...
re6st/tests/test_unit/test_tunnel/__init__.py
View file @
e1aa324b
__all__
=
[
"test_multi_gateway_manager"
,
"test_base_tunnel_manager"
]
from
re6st.tests.test_unit.test_tunnel
import
*
\ No newline at end of file
from
re6st.tests.test_unit.test_tunnel
import
*
re6st/tests/tools.py
View file @
e1aa324b
...
...
@@ -69,8 +69,9 @@ def create_cert_file(pkey_file, cert_file, ca, ca_key, prefix, serial):
def
create_ca_file
(
pkey_file
,
cert_file
):
def
create_ca_file
(
pkey_file
,
cert_file
,
serial
=
0x120010db80042
):
"""create key and ca file with specify name
return key, cert in pem format """
key
=
crypto
.
PKey
()
key
.
generate_key
(
crypto
.
TYPE_RSA
,
2048
)
cert
=
crypto
.
X509
()
...
...
@@ -83,7 +84,7 @@ def create_ca_file(pkey_file, cert_file):
subject
.
O
=
"nexedi"
subject
.
CN
=
"TEST-CA"
cert
.
set_issuer
(
cert
.
get_subject
())
cert
.
set_serial_number
(
10000
)
cert
.
set_serial_number
(
serial
)
cert
.
set_pubkey
(
key
)
cert
.
sign
(
key
,
"sha512"
)
...
...
setup.py
View file @
e1aa324b
...
...
@@ -95,7 +95,7 @@ setup(
install_requires
=
[
'pyOpenSSL >= 0.13'
,
'miniupnpc'
],
extras_require
=
{
'geoip'
:
[
'geoip2'
],
'test'
:
[
'mock'
,
'pathlib2'
]
'test'
:
[
'mock'
,
'pathlib2'
,
'nemu'
,
'python-unshare'
,
'passfd'
]
},
#dependency_links = [
# "http://miniupnp.free.fr/files/download.php?file=miniupnpc-1.7.20120714.tar.gz#egg=miniupnpc-1.7",
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment