Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
slapos.libnetworkcache
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
nexedi
slapos.libnetworkcache
Commits
ad6e4d5c
Commit
ad6e4d5c
authored
Aug 24, 2011
by
Łukasz Nowak
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Simplify importing + pep8.
parent
5da0eddc
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
53 additions
and
34 deletions
+53
-34
slapos/libnetworkcachetests.py
slapos/libnetworkcachetests.py
+53
-34
No files found.
slapos/libnetworkcachetests.py
View file @
ad6e4d5c
...
...
@@ -12,9 +12,8 @@ import tempfile
import
threading
import
time
import
unittest
from
slapos.libnetworkcache
import
NetworkcacheClient
,
UploadError
,
DirectoryNotFound
from
slapos.signature
import
parseArgument
,
\
createPrivateKeyAndCertificateFile
import
slapos.libnetworkcache
import
slapos.signature
class
NCHandler
(
BaseHTTPServer
.
BaseHTTPRequestHandler
):
...
...
@@ -30,7 +29,7 @@ class NCHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def
do_GET
(
self
):
path
=
os
.
path
.
abspath
(
os
.
path
.
join
(
self
.
tree
,
*
self
.
path
.
split
(
'/'
)))
if
not
(
((
path
==
self
.
tree
)
or
path
.
startswith
(
self
.
tree
+
os
.
path
.
sep
))
((
path
==
self
.
tree
)
or
path
.
startswith
(
self
.
tree
+
os
.
path
.
sep
))
and
os
.
path
.
exists
(
path
)
):
...
...
@@ -91,11 +90,13 @@ class NCHandler(BaseHTTPServer.BaseHTTPRequestHandler):
self
.
wfile
.
write
(
cksum
)
return
class
NCHandlerPOST200
(
NCHandler
):
def
do_POST
(
self
):
self
.
send_response
(
200
)
return
class
NCHandlerReturnWrong
(
NCHandler
):
def
do_POST
(
self
):
cksum
=
'incorrect'
...
...
@@ -106,12 +107,14 @@ class NCHandlerReturnWrong(NCHandler):
self
.
wfile
.
write
(
cksum
)
return
class
Server
(
BaseHTTPServer
.
HTTPServer
):
def
__init__
(
self
,
tree
,
*
args
):
BaseHTTPServer
.
HTTPServer
.
__init__
(
self
,
*
args
)
self
.
tree
=
os
.
path
.
abspath
(
tree
)
__run
=
True
def
serve_forever
(
self
):
while
self
.
__run
:
self
.
handle_request
()
...
...
@@ -119,21 +122,26 @@ class Server(BaseHTTPServer.HTTPServer):
def
handle_error
(
self
,
*
_
):
self
.
__run
=
False
def
_run_nc
(
tree
,
host
,
port
):
server_address
=
(
host
,
port
)
httpd
=
Server
(
tree
,
server_address
,
NCHandler
)
httpd
.
serve_forever
()
class
OfflineTest
(
unittest
.
TestCase
):
def
test_download_offline
(
self
):
nc
=
NetworkcacheClient
(
'http://127.0.0.1:0'
,
'http://127.0.0.1:0'
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
'http://127.0.0.1:0'
,
'http://127.0.0.1:0'
)
self
.
assertRaises
(
IOError
,
nc
.
download
,
'sha512sum'
)
def
test_upload_offline
(
self
):
content
=
tempfile
.
TemporaryFile
()
nc
=
NetworkcacheClient
(
'http://127.0.0.1:0'
,
'http://127.0.0.1:0'
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
'http://127.0.0.1:0'
,
'http://127.0.0.1:0'
)
self
.
assertRaises
(
IOError
,
nc
.
upload
,
content
)
def
wait
(
host
,
port
):
addr
=
host
,
port
for
i
in
range
(
120
):
...
...
@@ -150,6 +158,7 @@ def wait(host, port):
else
:
raise
class
OnlineMixin
:
def
_start_nc
(
self
):
self
.
thread
=
threading
.
Thread
(
target
=
_run_nc
,
args
=
(
self
.
tree
,
...
...
@@ -187,21 +196,22 @@ class OnlineMixin:
self
.
thread
.
join
()
shutil
.
rmtree
(
self
.
tree
)
class
OnlineTest
(
OnlineMixin
,
unittest
.
TestCase
):
"""Online tests against real HTTP server"""
def
test_upload
(
self
):
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
.
upload
(
self
.
test_data
)
def
test_upload_shadir
(
self
):
"""Check scenario with shadir used"""
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
urlmd5
=
str
(
random
.
random
())
nc
.
upload
(
self
.
test_data
,
'mykey'
,
urlmd5
=
urlmd5
,
file_name
=
'my file'
)
def
test_upload_shadir_select
(
self
):
"""Check scenario with shadir used"""
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
urlmd5
=
str
(
random
.
random
())
key
=
'somekey'
+
str
(
random
.
random
())
nc
.
upload
(
self
.
test_data
,
key
,
urlmd5
=
urlmd5
,
file_name
=
'my file'
)
...
...
@@ -210,7 +220,7 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
def
test_upload_shadir_select_not_exists
(
self
):
"""Check scenario with shadir used"""
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
urlmd5
=
str
(
random
.
random
())
key
=
'somekey'
+
str
(
random
.
random
())
nc
.
upload
(
self
.
test_data
,
key
,
urlmd5
=
urlmd5
,
file_name
=
'my file'
)
...
...
@@ -221,13 +231,13 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
def
test_upload_shadir_no_filename
(
self
):
"""Check scenario with shadir used, but not filename passed"""
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
urlmd5
=
str
(
random
.
random
())
self
.
assertRaises
(
ValueError
,
nc
.
upload
,
self
.
test_data
,
'somekey'
,
urlmd5
)
def
test_upload_twice_same
(
self
):
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
.
upload
(
self
.
test_data
)
self
.
test_data
.
seek
(
0
)
nc
.
upload
(
self
.
test_data
)
...
...
@@ -235,7 +245,7 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
def
test_download
(
self
):
# prepare some test data
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
# upload them
nc
.
upload
(
self
.
test_data
)
...
...
@@ -247,17 +257,19 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
self
.
assertEqual
(
result
.
read
(),
self
.
test_string
)
def
test_download_not_exists
(
self
):
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
try
:
nc
.
download
(
self
.
test_shasum
)
except
urllib2
.
HTTPError
,
error
:
self
.
assertEqual
(
error
.
code
,
httplib
.
NOT_FOUND
)
def
_run_nc_POST200
(
tree
,
host
,
port
):
server_address
=
(
host
,
port
)
httpd
=
Server
(
tree
,
server_address
,
NCHandlerPOST200
)
httpd
.
serve_forever
()
class
OnlineTestPOST200
(
OnlineMixin
,
unittest
.
TestCase
):
def
_start_nc
(
self
):
self
.
thread
=
threading
.
Thread
(
target
=
_run_nc_POST200
,
args
=
(
self
.
tree
,
...
...
@@ -268,14 +280,17 @@ class OnlineTestPOST200(OnlineMixin, unittest.TestCase):
def
test_upload_wrong_return_code
(
self
):
"""Check reaction on HTTP return code different then 201"""
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
self
.
assertRaises
(
UploadError
,
nc
.
upload
,
self
.
test_data
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
self
.
assertRaises
(
slapos
.
libnetworkcache
.
UploadError
,
nc
.
upload
,
self
.
test_data
)
def
_run_nc_POSTWrongChecksum
(
tree
,
host
,
port
):
server_address
=
(
host
,
port
)
httpd
=
Server
(
tree
,
server_address
,
NCHandlerReturnWrong
)
httpd
.
serve_forever
()
class
OnlineTestWrongChecksum
(
OnlineMixin
,
unittest
.
TestCase
):
def
_start_nc
(
self
):
self
.
thread
=
threading
.
Thread
(
target
=
_run_nc_POSTWrongChecksum
,
...
...
@@ -286,8 +301,10 @@ class OnlineTestWrongChecksum(OnlineMixin, unittest.TestCase):
def
test_upload_wrong_return_sha
(
self
):
"""Check reaction in case of wrong sha returned"""
nc
=
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
self
.
assertRaises
(
UploadError
,
nc
.
upload
,
self
.
test_data
)
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
self
.
shacache
,
self
.
shadir
)
self
.
assertRaises
(
slapos
.
libnetworkcache
.
UploadError
,
nc
.
upload
,
self
.
test_data
)
class
LibNetworkCacheMixin
(
unittest
.
TestCase
):
...
...
@@ -307,8 +324,10 @@ class LibNetworkCacheMixin(unittest.TestCase):
'--organization-unit-name'
,
'Dev'
,
'--common-name'
,
'R500.com'
,
'--email'
,
'test@example.com'
)
self
.
option_dict
=
parseArgument
(
*
self
.
signature_creation_argument_list
)
self
.
cert_as_text
=
createPrivateKeyAndCertificateFile
(
**
self
.
option_dict
)
self
.
option_dict
=
slapos
.
signature
.
parseArgument
(
*
self
.
signature_creation_argument_list
)
self
.
cert_as_text
=
slapos
.
signature
.
createPrivateKeyAndCertificateFile
(
**
self
.
option_dict
)
def
tearDown
(
self
):
''' Remove the files which have been created during the test. '''
...
...
@@ -333,13 +352,13 @@ class GenerateSignatureScriptTest(LibNetworkCacheMixin):
'signature_private_key_file'
:
'private.pem'
,
'signature_certificate_file'
:
'public.pem'
,
'email'
:
''
}
self
.
assertEquals
(
default_dict
,
parseArgument
())
self
.
assertEquals
(
default_dict
,
slapos
.
signature
.
parseArgument
())
def
test_parse_argument
(
self
):
'''
Check if the argument is properly set.
'''
size_argument_list
=
len
(
self
.
signature_creation_argument_list
)
/
2
size_argument_list
=
len
(
self
.
signature_creation_argument_list
)
/
2
size_option_dict
=
len
(
self
.
option_dict
)
self
.
assertEquals
(
size_argument_list
,
size_option_dict
,
"Argument list should have the same size of option dict."
)
...
...
@@ -359,7 +378,7 @@ class GenerateSignatureScriptTest(LibNetworkCacheMixin):
class
TestNetworkcacheClient
(
LibNetworkCacheMixin
):
"""
Class to test the NetworkcacheClient implementation.
Class to test the
slapos.libnetworkcache.
NetworkcacheClient implementation.
"""
def
setUp
(
self
):
...
...
@@ -375,7 +394,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
def
test_init_backward_compatible
(
self
):
"""Checks that invocation with minimal parameter works fine"""
nc
=
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
)
self
.
assertEqual
(
nc
.
shacache_url
,
self
.
shacache_url
)
self
.
assertTrue
(
nc
.
shadir_host
in
self
.
shadir_url
)
...
...
@@ -384,7 +403,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
"""
Check if the init method is setting the attributes correctly.
"""
nc
=
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
)
self
.
assertEquals
({
'Content-Type'
:
'application/json'
},
\
nc
.
shacache_header_dict
)
...
...
@@ -404,7 +423,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
Without the private key file, it is not possible to create the
signature so it must return an empty string.
"""
nc
=
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
)
self
.
assertEquals
(
''
,
nc
.
_getSignatureString
())
...
...
@@ -412,7 +431,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
"""
Check if the signature creation does not have any error.
"""
nc
=
NetworkcacheClient
(
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
)
...
...
@@ -424,7 +443,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
verify if the signature if trusted or not.
So, the _verifySignatureInCertificateList should return False.
"""
nc
=
NetworkcacheClient
(
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
)
...
...
@@ -440,7 +459,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
if the signature_string is valid and it should return False if the
signature_string is not correct.
"""
nc
=
NetworkcacheClient
(
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
,
...
...
@@ -456,14 +475,14 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
# SimpleHTTPServer? Because actually it gonna just throw an IOError.
def
test_verification_with_signature_certificate_file_list_url
(
self
):
"""
NetworkcacheClient supports to have the certification file under an HTTP
server.
slapos.libnetworkcache.NetworkcacheClient supports to have the
certification file under an HTTP
server.
During the _verifySignatureInCertificateList method, it'll try to
download the certification from the given URL and check if the signature
is valid.
"""
nc
=
NetworkcacheClient
(
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
,
...
...
@@ -475,9 +494,9 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
def
test_signature_verification_priority
(
self
):
"""
During the signature vefirication, the filesystem path has priority over
urls. So, if the public key is
urls. So, if the public key is
"""
nc
=
NetworkcacheClient
(
nc
=
slapos
.
libnetworkcache
.
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment