Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
slapos.libnetworkcache
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Douglas
slapos.libnetworkcache
Commits
c31266d1
Commit
c31266d1
authored
Aug 01, 2011
by
Łukasz Nowak
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert reorganisation.
There is no time yet for such changes in libnetworkcache.
parent
8fd75343
Changes
8
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
113 additions
and
484 deletions
+113
-484
setup.py
setup.py
+1
-1
slapos/libconnection.py
slapos/libconnection.py
+0
-115
slapos/libnetworkcache.py
slapos/libnetworkcache.py
+30
-5
slapos/libnetworkcachetests.py
slapos/libnetworkcachetests.py
+82
-72
slapos/libnetworkcacheutils.py
slapos/libnetworkcacheutils.py
+0
-44
slapos/tests/__init__.py
slapos/tests/__init__.py
+0
-0
slapos/tests/libnetworkcachemixin.py
slapos/tests/libnetworkcachemixin.py
+0
-188
slapos/tests/test_signature.py
slapos/tests/test_signature.py
+0
-59
No files found.
setup.py
View file @
c31266d1
...
@@ -47,5 +47,5 @@ setup(
...
@@ -47,5 +47,5 @@ setup(
zip_safe
=
True
,
zip_safe
=
True
,
packages
=
find_packages
(),
packages
=
find_packages
(),
namespace_packages
=
[
'slapos'
],
namespace_packages
=
[
'slapos'
],
test_suite
=
"slapos.
tests"
,
test_suite
=
"slapos.
libnetworkcachetests"
)
)
slapos/libconnection.py
deleted
100644 → 0
View file @
8fd75343
##############################################################################
#
# Copyright (c) 2010 ViFiB SARL and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import
httplib
import
socket
import
ssl
import
urllib2
import
os
import
tempfile
from
slapos.libnetworkcacheutils
import
parseUrl
CERTIFICATE_FILE_LOCATION_LIST
=
[
'/etc/ssl/certs/ca-certificates.crt'
,
# Debian
'/etc/pki/tls/certs/ca-bundle.crt'
,
# Redhat
'/usr/share/ssl/certs/ca-bundle.crt'
,
# Redhat
'/etc/ssl/cert.pem'
,
# FreeBSD
# Other paths...
'/etc/certs/ca-bundle.crt'
,
'/usr/local/ssl/certs/ca-bundle.crt'
,
'/etc/apache/ssl.crt/ca-bundle.crt'
,
'/usr/share/curl/curl-ca-bundle.crt'
,
'/usr/lib/ssl/cert.pem'
,
]
class
HTTPSConnectionCertificateVerification
(
httplib
.
HTTPSConnection
):
""" HTTPSConnection wrapper to check who we are talking to... """
def
__init__
(
self
,
*
args
,
**
kwargs
):
"""
Find the certificate file location before calling the init method
from the super class.
"""
self
.
ssl_cert_path
=
None
for
p
in
CERTIFICATE_FILE_LOCATION_LIST
:
if
os
.
path
.
exists
(
p
):
self
.
ssl_cert_path
=
p
break
if
self
.
ssl_cert_path
is
None
:
raise
IOError
(
"CA file not found. "
\
"Download it from here: http://curl.haxx.se/ca/cacert.pem"
)
httplib
.
HTTPSConnection
.
__init__
(
self
,
*
args
,
**
kwargs
)
def
connect
(
self
):
""" Connect to a host on a given (SSL) port and verify its certificate. """
sock
=
socket
.
create_connection
((
self
.
host
,
self
.
port
))
if
self
.
_tunnel_host
:
self
.
sock
=
sock
self
.
_tunnel
()
self
.
sock
=
ssl
.
wrap_socket
(
sock
,
ca_certs
=
self
.
ssl_cert_path
,
cert_reqs
=
ssl
.
CERT_REQUIRED
)
cert
=
self
.
sock
.
getpeercert
()
for
field
in
cert
[
'subject'
]:
if
field
[
0
][
0
]
==
'commonName'
:
certhost
=
field
[
0
][
1
]
if
certhost
!=
self
.
host
:
raise
ssl
.
SSLError
(
\
"Host name '%s' doesn't match with the certificate host '%s'"
%
(
self
.
host
,
certhost
))
# XXX(lucas): we should also check the notBefore and notAfter.
# FYI, some hosts does not provides notBefore.
# Then in this only notAfter matters.
class
VerifiedHTTPSHandler
(
urllib2
.
HTTPSHandler
):
def
__init__
(
self
,
connection_class
=
HTTPSConnectionCertificateVerification
):
self
.
specialized_conn_class
=
connection_class
urllib2
.
HTTPSHandler
.
__init__
(
self
)
def
https_open
(
self
,
req
):
return
self
.
do_open
(
self
.
specialized_conn_class
,
req
)
def
install_opener_director
(
info_dict
):
if
info_dict
.
get
(
'scheme'
)
in
(
'https'
,):
https_handler
=
VerifiedHTTPSHandler
()
url_opener
=
urllib2
.
build_opener
(
https_handler
)
urllib2
.
install_opener
(
url_opener
)
return
def
download
(
url
):
info_dict
=
parseUrl
(
url
)
install_opener_director
(
info_dict
)
request
=
urllib2
.
Request
(
url
=
info_dict
.
get
(
'simple_url'
),
data
=
None
,
headers
=
info_dict
.
get
(
'header_dict'
))
response
=
urllib2
.
urlopen
(
request
)
file_descriptor
=
tempfile
.
NamedTemporaryFile
()
while
True
:
data
=
response
.
read
(
1
<<
13
)
file_descriptor
.
write
(
data
)
if
not
data
:
break
file_descriptor
.
seek
(
0
)
return
file_descriptor
slapos/libnetworkcache.py
View file @
c31266d1
...
@@ -13,6 +13,7 @@
...
@@ -13,6 +13,7 @@
##############################################################################
##############################################################################
import
base64
import
hashlib
import
hashlib
import
httplib
import
httplib
import
json
import
json
...
@@ -20,9 +21,8 @@ import os
...
@@ -20,9 +21,8 @@ import os
import
tempfile
import
tempfile
import
urllib
import
urllib
import
urllib2
import
urllib2
import
urlparse
import
M2Crypto
import
M2Crypto
import
slapos.libconnection
from
slapos.libnetworkcacheutils
import
parseUrl
_MARKER
=
([],
[
''
],
None
,
''
)
_MARKER
=
([],
[
''
],
None
,
''
)
...
@@ -36,17 +36,34 @@ class NetworkcacheClient(object):
...
@@ -36,17 +36,34 @@ class NetworkcacheClient(object):
- SHACACHE
- SHACACHE
'''
'''
def
parseUrl
(
self
,
url
):
return_dict
=
{}
parsed_url
=
urlparse
.
urlparse
(
url
)
return_dict
[
'header_dict'
]
=
{
'Content-Type'
:
'application/json'
}
user
=
parsed_url
.
username
passwd
=
parsed_url
.
password
if
user
is
not
None
:
authentication_string
=
'%s:%s'
%
(
user
,
passwd
)
base64string
=
base64
.
encodestring
(
authentication_string
).
strip
()
return_dict
[
'header_dict'
][
'Authorization'
]
=
'Basic %s'
%
\
base64string
return_dict
[
'path'
]
=
parsed_url
.
path
return_dict
[
'host'
]
=
parsed_url
.
hostname
return_dict
[
'port'
]
=
parsed_url
.
port
or
80
return
return_dict
def
__init__
(
self
,
shacache
,
shadir
,
def
__init__
(
self
,
shacache
,
shadir
,
signature_private_key_file
=
None
,
signature_private_key_file
=
None
,
signature_certificate_file_list
=
None
):
signature_certificate_file_list
=
None
):
''' Set the initial values. '''
''' Set the initial values. '''
# ShaCache Properties
# ShaCache Properties
for
k
,
v
in
parseUrl
(
shacache
).
iteritems
():
for
k
,
v
in
self
.
parseUrl
(
shacache
).
iteritems
():
setattr
(
self
,
'shacache_%s'
%
k
,
v
)
setattr
(
self
,
'shacache_%s'
%
k
,
v
)
self
.
shacache_url
=
shacache
self
.
shacache_url
=
shacache
# ShaDir Properties
# ShaDir Properties
for
k
,
v
in
parseUrl
(
shadir
).
iteritems
():
for
k
,
v
in
self
.
parseUrl
(
shadir
).
iteritems
():
setattr
(
self
,
'shadir_%s'
%
k
,
v
)
setattr
(
self
,
'shadir_%s'
%
k
,
v
)
self
.
signature_private_key_file
=
signature_private_key_file
self
.
signature_private_key_file
=
signature_private_key_file
...
@@ -202,7 +219,7 @@ class NetworkcacheClient(object):
...
@@ -202,7 +219,7 @@ class NetworkcacheClient(object):
return
True
return
True
for
certificate_url
in
self
.
signature_certificate_url_list
:
for
certificate_url
in
self
.
signature_certificate_url_list
:
file_descriptor
=
s
lapos
.
libconnection
.
download
(
certificate_url
)
file_descriptor
=
s
elf
.
_fetchCertificateFileFromUrl
(
certificate_url
)
try
:
try
:
file_name
=
file_descriptor
.
name
file_name
=
file_descriptor
.
name
if
self
.
_verifySignatureCertificate
(
signature_string
,
file_name
):
if
self
.
_verifySignatureCertificate
(
signature_string
,
file_name
):
...
@@ -221,6 +238,14 @@ class NetworkcacheClient(object):
...
@@ -221,6 +238,14 @@ class NetworkcacheClient(object):
VerifyEVP
.
verify_update
(
''
)
VerifyEVP
.
verify_update
(
''
)
return
VerifyEVP
.
verify_final
(
signature_string
.
decode
(
'base64'
))
return
VerifyEVP
.
verify_final
(
signature_string
.
decode
(
'base64'
))
def
_fetchCertificateFileFromUrl
(
self
,
certification_file_url
):
""" Download the certification files from the url. """
file_descriptor
=
tempfile
.
NamedTemporaryFile
()
path
,
headers
=
urllib
.
urlretrieve
(
certification_file_url
,
file_descriptor
.
name
)
file_descriptor
.
seek
(
0
)
return
file_descriptor
class
DirectoryNotFound
(
Exception
):
class
DirectoryNotFound
(
Exception
):
pass
pass
...
...
slapos/
tests/test_libnetworkcache
.py
→
slapos/
libnetworkcachetests
.py
View file @
c31266d1
##############################################################################
#
# Copyright (c) 2010 ViFiB SARL and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import
unittest
import
unittest
import
tempfile
import
tempfile
import
urllib2
import
os
import
slapos.libconnection
from
slapos.libnetworkcache
import
NetworkcacheClient
from
slapos.libnetworkcache
import
NetworkcacheClient
from
slapos.
tests.libnetworkcachemixin
import
LibNetworkCacheMixin
,
start_server
,
stop_server
from
slapos.
signature
import
parseArgument
,
\
createPrivateKeyAndCertificateFile
class
OfflineTest
(
unittest
.
TestCase
):
class
OfflineTest
(
unittest
.
TestCase
):
def
test_download_offline
(
self
):
def
test_download_offline
(
self
):
...
@@ -31,6 +15,73 @@ class OfflineTest(unittest.TestCase):
...
@@ -31,6 +15,73 @@ class OfflineTest(unittest.TestCase):
nc
=
NetworkcacheClient
(
'http://127.0.0.1:0'
,
'http://127.0.0.1:0'
)
nc
=
NetworkcacheClient
(
'http://127.0.0.1:0'
,
'http://127.0.0.1:0'
)
self
.
assertRaises
(
IOError
,
nc
.
upload
,
content
)
self
.
assertRaises
(
IOError
,
nc
.
upload
,
content
)
class
LibNetworkCacheMixin
(
unittest
.
TestCase
):
def
setUp
(
self
):
''' Setup the test. '''
self
.
pub_file_descriptor
=
tempfile
.
NamedTemporaryFile
()
self
.
priv_file_descritor
=
tempfile
.
NamedTemporaryFile
()
self
.
signature_certificate_file
=
self
.
pub_file_descriptor
.
name
self
.
signature_private_key_file
=
self
.
priv_file_descritor
.
name
self
.
signature_creation_argument_list
=
\
(
'--signature-certificate-file'
,
self
.
signature_certificate_file
,
'--signature-private-key-file'
,
self
.
signature_private_key_file
,
'--country'
,
'BR'
,
'--state-name'
,
'Campos'
,
'--locality-name'
,
'Rio de Janeiro'
,
'--organization-name'
,
'Nexedi'
,
'--organization-unit-name'
,
'Dev'
,
'--common-name'
,
'R500.com'
,
'--email'
,
'test@example.com'
)
self
.
option_dict
=
parseArgument
(
*
self
.
signature_creation_argument_list
)
self
.
cert_as_text
=
createPrivateKeyAndCertificateFile
(
**
self
.
option_dict
)
def
tearDown
(
self
):
''' Remove the files which have been created during the test. '''
self
.
priv_file_descritor
.
close
()
self
.
pub_file_descriptor
.
close
()
class
GenerateSignatureScriptTest
(
LibNetworkCacheMixin
):
''' Class which must test the signature.py script. '''
def
test_parse_argument_with_empty_list
(
self
):
'''
If the argument list is empty, then the parseArgument method should
return a dictionary with default argument values.
'''
default_dict
=
{
'organization_name'
:
'Default Company Ltd'
,
'state_name'
:
'Default Province'
,
'organization_unit_name'
:
''
,
'common_name'
:
''
,
'country'
:
'XX'
,
'locality_name'
:
'Default City'
,
'signature_private_key_file'
:
'private.pem'
,
'signature_certificate_file'
:
'public.pem'
,
'email'
:
''
}
self
.
assertEquals
(
default_dict
,
parseArgument
())
def
test_parse_argument
(
self
):
'''
Check if the argument is properly set.
'''
size_argument_list
=
len
(
self
.
signature_creation_argument_list
)
/
2
size_option_dict
=
len
(
self
.
option_dict
)
self
.
assertEquals
(
size_argument_list
,
size_option_dict
,
"Argument list should have the same size of option dict."
)
# Assert if the values are equals.
for
value
in
self
.
option_dict
.
values
():
self
.
assertTrue
(
value
in
self
.
signature_creation_argument_list
,
\
'%s is not in %s.'
%
(
value
,
self
.
signature_creation_argument_list
))
def
test_key_and_certificate_file_creation
(
self
):
'''
Check if key file and the certificate file are being created correctly.
'''
self
.
assertTrue
(
os
.
path
.
exists
(
self
.
signature_certificate_file
))
self
.
assertTrue
(
os
.
path
.
exists
(
self
.
signature_private_key_file
))
class
TestNetworkcacheClient
(
LibNetworkCacheMixin
):
class
TestNetworkcacheClient
(
LibNetworkCacheMixin
):
"""
"""
...
@@ -70,7 +121,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
...
@@ -70,7 +121,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
def
test_signature_creation_without_private_key_file
(
self
):
def
test_signature_creation_without_private_key_file
(
self
):
"""
"""
Without the private key file, it is not possible to create the
Without the private key file, it is not possible to create the
signature so it must
retun an empty string.
signature so it must
"""
"""
self
.
assertEquals
(
''
,
self
.
nc
.
_getSignatureString
())
self
.
assertEquals
(
''
,
self
.
nc
.
_getSignatureString
())
...
@@ -87,7 +138,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
...
@@ -87,7 +138,7 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
def
test_verification_without_signature_certificate_file_list
(
self
):
def
test_verification_without_signature_certificate_file_list
(
self
):
"""
"""
Without the signature certificate file list it is not possible to
Without the signature certificate file list it is not possible to
verify if the signature i
s
trusted or not.
verify if the signature i
f
trusted or not.
So, the _verifySignatureInCertificateList should return False.
So, the _verifySignatureInCertificateList should return False.
"""
"""
nc
=
NetworkcacheClient
(
nc
=
NetworkcacheClient
(
...
@@ -118,6 +169,8 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
...
@@ -118,6 +169,8 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
result_bool
=
nc
.
_verifySignatureInCertificateList
(
wrong_signature_string
)
result_bool
=
nc
.
_verifySignatureInCertificateList
(
wrong_signature_string
)
self
.
assertFalse
(
result_bool
)
self
.
assertFalse
(
result_bool
)
# XXX(lucas): Should we provide the file under HTTP server using
# SimpleHTTPServer? Because actually it gonna just throw an IOError.
def
test_verification_with_signature_certificate_file_list_url
(
self
):
def
test_verification_with_signature_certificate_file_list_url
(
self
):
"""
"""
NetworkcacheClient supports to have the certification file under an HTTP
NetworkcacheClient supports to have the certification file under an HTTP
...
@@ -136,59 +189,16 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
...
@@ -136,59 +189,16 @@ class TestNetworkcacheClient(LibNetworkCacheMixin):
self
.
assertRaises
(
IOError
,
\
self
.
assertRaises
(
IOError
,
\
nc
.
_verifySignatureInCertificateList
,
signature_string
)
nc
.
_verifySignatureInCertificateList
,
signature_string
)
def
test_verification_with_non_valid_remote_https_server
(
self
):
"""
If the HTTPS server does not has a valid certificated,
URLError must be raised, because we don't trunk on such server.
"""
https_server_port
,
https_server_thread
,
file_pem
=
start_server
()
https_server_url
=
'https://localhost:%s'
%
https_server_port
try
:
nc
=
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
,
signature_certificate_file_list
=
[
https_server_url
+
self
.
signature_certificate_file
])
signature_string
=
nc
.
_getSignatureString
()
self
.
assertRaises
(
urllib2
.
URLError
,
nc
.
_verifySignatureInCertificateList
,
signature_string
)
finally
:
stop_server
(
https_server_url
,
https_server_thread
)
def
test_verification_with_valid_remote_https_server
(
self
):
"""
If the HTTPS server is a trustable server we must download the
certificate, without any problem.
"""
https_server_port
,
https_server_thread
,
file_pem
=
start_server
()
https_server_url
=
'https://localhost:%s'
%
https_server_port
# making the https server a valid server
slapos
.
libconnection
.
CERTIFICATE_FILE_LOCATION_LIST
=
[
file_pem
]
https_cert_url
=
https_server_url
+
self
.
signature_certificate_file
try
:
nc
=
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
,
signature_certificate_file_list
=
[
https_cert_url
])
signature_string
=
nc
.
_getSignatureString
()
nc
.
_verifySignatureInCertificateList
(
signature_string
)
finally
:
stop_server
(
https_server_url
,
https_server_thread
)
def
test_signature_verification_priority
(
self
):
def
test_signature_verification_priority
(
self
):
"""
"""
During the signature vefirication, the filesystem path has priority over
During the signature vefirication, the filesystem path has priority over
urls. It will only download the certificate from the url if the local
urls. So, if the public key is
certificates are not valid.
"""
"""
nc
=
NetworkcacheClient
(
nc
=
NetworkcacheClient
(
shacache
=
self
.
shacache_url
,
shacache
=
self
.
shacache_url
,
shadir
=
self
.
shadir_url
,
shadir
=
self
.
shadir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
,
signature_private_key_file
=
self
.
signature_private_key_file
,
signature_certificate_file_list
=
[
'http://localhost:0/public.pem'
,
signature_certificate_file_list
=
[
'http://localhost:0/public.pem'
])
self
.
signature_certificate_file
])
signature_string
=
nc
.
_getSignatureString
()
signature_string
=
nc
.
_getSignatureString
()
self
.
assertTrue
(
nc
.
_verifySignatureInCertificateList
(
signature_string
))
self
.
assertRaises
(
IOError
,
\
nc
.
_verifySignatureInCertificateList
,
signature_string
)
slapos/libnetworkcacheutils.py
deleted
100644 → 0
View file @
8fd75343
##############################################################################
#
# Copyright (c) 2010 ViFiB SARL and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import
socket
import
urlparse
import
base64
def
parseUrl
(
url
):
info_dict
=
{}
parsed_url
=
urlparse
.
urlparse
(
url
)
info_dict
[
'header_dict'
]
=
{
'Content-Type'
:
'application/json'
}
user
=
parsed_url
.
username
passwd
=
parsed_url
.
password
if
user
is
not
None
:
authentication_string
=
'%s:%s'
%
(
user
,
passwd
)
base64string
=
base64
.
encodestring
(
authentication_string
).
strip
()
info_dict
[
'header_dict'
][
'Authorization'
]
=
'Basic %s'
%
\
base64string
info_dict
[
'path'
]
=
parsed_url
.
path
info_dict
[
'host'
]
=
parsed_url
.
hostname
info_dict
[
'scheme'
]
=
parsed_url
.
scheme
info_dict
[
'port'
]
=
parsed_url
.
port
or
\
socket
.
getservbyname
(
parsed_url
.
scheme
)
info_dict
[
'simple_url'
]
=
"%s://%s:%s%s"
%
\
(
info_dict
.
get
(
'scheme'
),
info_dict
.
get
(
'host'
),
info_dict
.
get
(
'port'
),
info_dict
.
get
(
'path'
))
return
info_dict
slapos/tests/__init__.py
deleted
100644 → 0
View file @
8fd75343
slapos/tests/libnetworkcachemixin.py
deleted
100644 → 0
View file @
8fd75343
##############################################################################
#
# Copyright (c) 2010 ViFiB SARL and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import
tempfile
import
unittest
import
socket
import
os
import
threading
import
shutil
import
subprocess
import
random
import
httplib
import
time
import
errno
from
SocketServer
import
BaseServer
from
BaseHTTPServer
import
HTTPServer
from
SimpleHTTPServer
import
SimpleHTTPRequestHandler
from
OpenSSL
import
SSL
from
slapos.signature
import
parseArgument
,
\
createPrivateKeyAndCertificateFile
class
LibNetworkCacheMixin
(
unittest
.
TestCase
):
def
setUp
(
self
):
""" Setup the test. """
self
.
pub_file_descriptor
=
tempfile
.
NamedTemporaryFile
()
self
.
priv_file_descritor
=
tempfile
.
NamedTemporaryFile
()
self
.
signature_certificate_file
=
self
.
pub_file_descriptor
.
name
self
.
signature_private_key_file
=
self
.
priv_file_descritor
.
name
self
.
signature_creation_argument_list
=
\
(
'--signature-certificate-file'
,
self
.
signature_certificate_file
,
'--signature-private-key-file'
,
self
.
signature_private_key_file
,
'--country'
,
'BR'
,
'--state-name'
,
'Campos'
,
'--locality-name'
,
'Rio de Janeiro'
,
'--organization-name'
,
'Nexedi'
,
'--organization-unit-name'
,
'Dev'
,
'--common-name'
,
'R500.com'
,
'--email'
,
'test@example.com'
)
self
.
option_dict
=
parseArgument
(
*
self
.
signature_creation_argument_list
)
self
.
cert_as_text
=
createPrivateKeyAndCertificateFile
(
**
self
.
option_dict
)
def
tearDown
(
self
):
""" Remove the files which have been created during the test. """
self
.
priv_file_descritor
.
close
()
self
.
pub_file_descriptor
.
close
()
class
SecureHTTPServer
(
HTTPServer
):
def
__init__
(
self
,
server_address
,
HandlerClass
,
file_pem
):
BaseServer
.
__init__
(
self
,
server_address
,
HandlerClass
)
self
.
file_pem
=
file_pem
ctx
=
SSL
.
Context
(
SSL
.
SSLv23_METHOD
)
ctx
.
use_privatekey_file
(
file_pem
)
ctx
.
use_certificate_file
(
file_pem
)
self
.
socket
=
SSL
.
Connection
(
ctx
,
socket
.
socket
(
self
.
address_family
,
self
.
socket_type
))
self
.
server_bind
()
self
.
server_activate
()
def
shutdown_request
(
self
,
request
):
shutil
.
remove
(
self
.
file_pem
)
request
.
shutdown
()
class
SecureHTTPRequestHandler
(
SimpleHTTPRequestHandler
):
def
setup
(
self
):
self
.
connection
=
self
.
request
self
.
rfile
=
socket
.
_fileobject
(
self
.
request
,
"rb"
,
self
.
rbufsize
)
self
.
wfile
=
socket
.
_fileobject
(
self
.
request
,
"wb"
,
self
.
wbufsize
)
def
do_GET
(
self
):
if
'__stop__'
in
self
.
path
:
raise
SystemExit
try
:
f
=
open
(
self
.
path
,
'rb'
)
except
IOError
:
self
.
send_error
(
404
,
"File not found"
)
return
None
try
:
self
.
send_response
(
200
)
fs
=
os
.
fstat
(
f
.
fileno
())
self
.
send_header
(
"Content-Length"
,
str
(
fs
[
6
]))
self
.
end_headers
()
shutil
.
copyfileobj
(
f
,
self
.
wfile
)
finally
:
f
.
close
()
self
.
setup
()
return
None
# def log_request(self, code):
# pass
def
create_pem_file
():
file_pem
=
tempfile
.
mktemp
()
command_list
=
[
'openssl'
,
'req'
,
'-new'
,
'-x509'
,
'-keyout'
,
file_pem
,
'-out'
,
file_pem
,
'-days'
,
'365'
,
'-nodes'
,
'-batch'
]
popen
=
subprocess
.
Popen
(
command_list
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
)
result
=
popen
.
communicate
(
input
)[
0
]
if
popen
.
returncode
is
None
:
popen
.
kill
()
if
popen
.
returncode
!=
0
:
raise
ValueError
(
'Issue during calling %r, result was:
\
n
%s'
%
\
(
command_list
,
result
))
return
file_pem
def
get_port
():
for
i
in
range
(
10
):
port
=
random
.
randrange
(
20000
,
30000
)
s
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
try
:
try
:
s
.
connect
((
'localhost'
,
port
))
except
socket
.
error
:
return
port
finally
:
s
.
close
()
raise
RuntimeError
,
"Can't find port"
def
_run_server
(
address
,
port
,
file_pem
):
httpd
=
SecureHTTPServer
((
address
,
port
),
SecureHTTPRequestHandler
,
file_pem
)
httpd
.
serve_forever
()
def
start_server
():
file_pem
=
create_pem_file
()
assert
os
.
path
.
exists
(
file_pem
)
port
=
get_port
()
thread
=
threading
.
Thread
(
target
=
_run_server
,
args
=
(
''
,
port
,
file_pem
),
name
=
''
)
thread
.
setDaemon
(
True
)
thread
.
start
()
# Maybe we should wait for the server...
wait
(
port
,
True
)
return
port
,
thread
,
file_pem
def
stop_server
(
server_url
,
thread
=
None
):
try
:
conn
=
httplib
.
HTTPSConnection
(
server_url
.
split
(
'/'
)[
-
1
])
conn
.
request
(
"GET"
,
"/__stop__"
)
conn
.
getresponse
()
except
Exception
:
pass
if
thread
is
not
None
:
thread
.
join
(
0
)
def
wait
(
port
,
up
):
addr
=
'localhost'
,
port
for
i
in
range
(
120
):
time
.
sleep
(
0.25
)
try
:
s
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
s
.
connect
(
addr
)
s
.
close
()
if
up
:
break
except
socket
.
error
,
e
:
if
e
[
0
]
not
in
(
errno
.
ECONNREFUSED
,
errno
.
ECONNRESET
):
raise
s
.
close
()
if
not
up
:
break
else
:
if
up
:
raise
else
:
raise
SystemError
(
"Couln't stop server"
)
slapos/tests/test_signature.py
deleted
100644 → 0
View file @
8fd75343
##############################################################################
#
# Copyright (c) 2010 ViFiB SARL and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import
os
from
slapos.tests.libnetworkcachemixin
import
LibNetworkCacheMixin
from
slapos.signature
import
parseArgument
class
GenerateSignatureScriptTest
(
LibNetworkCacheMixin
):
""" Class which must test the signature.py script. """
def
test_parse_argument_with_empty_list
(
self
):
"""
If the argument list is empty, then the parseArgument method should
return a dictionary with default argument values.
"""
default_dict
=
{
'organization_name'
:
'Default Company Ltd'
,
'state_name'
:
'Default Province'
,
'organization_unit_name'
:
''
,
'common_name'
:
''
,
'country'
:
'XX'
,
'locality_name'
:
'Default City'
,
'signature_private_key_file'
:
'private.pem'
,
'signature_certificate_file'
:
'public.pem'
,
'email'
:
''
}
self
.
assertEquals
(
default_dict
,
parseArgument
())
def
test_parse_argument
(
self
):
"""
Check if the argument is properly set.
"""
size_argument_list
=
len
(
self
.
signature_creation_argument_list
)
/
2
size_option_dict
=
len
(
self
.
option_dict
)
self
.
assertEquals
(
size_argument_list
,
size_option_dict
,
"Argument list should have the same size of option dict."
)
# Assert if the values are equals.
for
value
in
self
.
option_dict
.
values
():
self
.
assertTrue
(
value
in
self
.
signature_creation_argument_list
,
\
'%s is not in %s.'
%
(
value
,
self
.
signature_creation_argument_list
))
def
test_key_and_certificate_file_creation
(
self
):
"""
Check if key file and the certificate file are being created correctly.
"""
self
.
assertTrue
(
os
.
path
.
exists
(
self
.
signature_certificate_file
))
self
.
assertTrue
(
os
.
path
.
exists
(
self
.
signature_private_key_file
))
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment