Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
slapcache
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
slapcache
Commits
819deee8
Commit
819deee8
authored
Jul 12, 2022
by
Thomas Gambier
🚴🏼
Browse files
Options
Browse Files
Download
Plain Diff
Remove usage of NetworkCache
See merge request
!7
parents
2e989ea4
e883e945
Pipeline
#22659
passed with stage
in 0 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
149 additions
and
295 deletions
+149
-295
slapcache/signature.py
slapcache/signature.py
+67
-127
slapcache/test/test_signature.py
slapcache/test/test_signature.py
+82
-168
No files found.
slapcache/signature.py
View file @
819deee8
...
...
@@ -40,110 +40,6 @@ from random import choice
from
string
import
ascii_lowercase
from
slapos.libnetworkcache
import
NetworkcacheClient
from
slapos.networkcachehelper
import
helper_download_network_cached
class
NetworkCache
:
def
__init__
(
self
,
configuration_path
):
if
not
os
.
path
.
exists
(
configuration_path
):
raise
ValueError
(
"You need configuration file"
)
self
.
configuration
=
configuration_path
self
.
_load
()
def
_load
(
self
):
network_cache_info
=
configparser
.
RawConfigParser
()
network_cache_info
.
read
(
self
.
configuration
)
network_cache_info_dict
=
dict
(
network_cache_info
.
items
(
'networkcache'
))
def
get_
(
name
):
return
network_cache_info_dict
.
get
(
name
)
self
.
download_binary_cache_url
=
get_
(
'download-binary-cache-url'
)
self
.
download_cache_url
=
get_
(
'download-cache-url'
)
self
.
download_binary_dir_url
=
get_
(
'download-binary-dir-url'
)
self
.
signature_certificate_list
=
get_
(
'signature-certificate-list'
)
# Not mandatory
self
.
dir_url
=
get_
(
'upload-dir-url'
)
self
.
cache_url
=
get_
(
'upload-cache-url'
)
key_file_key
=
'signature-private-key-file'
self
.
signature_private_key_file
=
get_
(
key_file_key
)
key_file_old_key
=
'signature_private_key_file'
if
key_file_old_key
in
network_cache_info_dict
:
raise
ValueError
(
'%s is not supported anymore, use %s'
%
(
key_file_old_key
,
key_file_key
))
self
.
shacache_ca_file
=
get_
(
'shacache-ca-file'
)
self
.
shacache_cert_file
=
get_
(
'shacache-cert-file'
)
self
.
shacache_key_file
=
get_
(
'shacache-key-file'
)
self
.
shadir_cert_file
=
get_
(
'shadir-cert-file'
)
self
.
shadir_key_file
=
get_
(
'shadir-key-file'
)
self
.
shadir_ca_file
=
get_
(
'shadir-ca-file'
)
if
network_cache_info
.
has_section
(
'shacache'
):
self
.
directory_key
=
network_cache_info
.
get
(
'shacache'
,
'key'
)
else
:
self
.
directory_key
=
"slapos-upgrade-testing-key"
def
upload
(
self
,
path
,
metadata_dict
,
is_sha256file
=
False
):
"""
Upload an existing file, using a file_descriptor.
"""
if
is_sha256file
:
key
=
self
.
directory_key
+
"-sha256-content"
else
:
key
=
self
.
directory_key
file_descriptor
=
open
(
path
,
'rb'
)
if
not
(
self
.
dir_url
and
self
.
cache_url
):
raise
ValueError
(
"upload-dir-url and/or upload-cache-url is not defined"
)
# backward compatibility
metadata_dict
.
setdefault
(
'file'
,
'notused'
)
metadata_dict
.
setdefault
(
'urlmd5'
,
'notused'
)
# convert '' into None in order to call nc nicely
with
NetworkcacheClient
(
self
.
cache_url
,
self
.
dir_url
,
signature_private_key_file
=
self
.
signature_private_key_file
or
None
,
signature_certificate_list
=
self
.
signature_certificate_list
or
None
,
shacache_ca_file
=
self
.
shacache_ca_file
or
None
,
shacache_cert_file
=
self
.
shacache_cert_file
or
None
,
shacache_key_file
=
self
.
shacache_key_file
or
None
,
shadir_cert_file
=
self
.
shadir_cert_file
or
None
,
shadir_key_file
=
self
.
shadir_key_file
or
None
,
shadir_ca_file
=
self
.
shadir_ca_file
or
None
,
)
as
nc
:
return
nc
.
upload
(
file_descriptor
,
key
,
**
metadata_dict
)
def
download
(
self
,
path
,
wanted_metadata_dict
=
{},
required_key_list
=
[],
strategy
=
None
,
is_sha256file
=
False
):
if
is_sha256file
:
key
=
self
.
directory_key
+
"-sha256-content"
else
:
key
=
self
.
directory_key
result
=
helper_download_network_cached
(
self
.
download_binary_dir_url
,
self
.
download_binary_cache_url
,
self
.
signature_certificate_list
,
key
,
wanted_metadata_dict
,
required_key_list
,
strategy
)
if
result
:
# XXX check if nc filters signature_certificate_list!
# Creates a file with content to desired path.
file_descriptor
,
metadata_dict
=
result
f
=
open
(
path
,
'w+b'
)
try
:
shutil
.
copyfileobj
(
file_descriptor
,
f
)
# XXX method should check MD5.
return
metadata_dict
finally
:
f
.
close
()
file_descriptor
.
close
()
return
False
def
strategy
(
entry_list
):
...
...
@@ -155,13 +51,20 @@ def strategy(entry_list):
best_entry
=
entry
timestamp
=
entry
[
'timestamp'
]
return
best_entry
return
best_entry
class
Signature
:
def
__init__
(
self
,
config
,
logger
=
None
):
self
.
config
=
config
self
.
logger
=
logger
self
.
shacache
=
NetworkcacheClient
(
open
(
self
.
config
.
slapos_configuration
,
'r'
))
network_cache_info
=
configparser
.
RawConfigParser
()
network_cache_info
.
read
(
self
.
config
.
slapos_configuration
)
if
network_cache_info
.
has_section
(
'shacache'
):
self
.
key
=
network_cache_info
.
get
(
'shacache'
,
'key'
)
else
:
self
.
key
=
"slapos-upgrade-testing-key"
def
log
(
self
,
message
,
*
args
):
if
self
.
logger
is
not
None
:
...
...
@@ -179,32 +82,66 @@ class Signature:
return
base
def
save_file_hash
(
self
,
path
,
destination
):
base
=
self
.
get_file_hash
(
path
)
base
=
self
.
get_file_hash
(
path
)
with
open
(
destination
,
"wb"
)
as
f
:
f
.
write
(
base
)
def
_download_once
(
self
,
path
,
wanted_metadata_dict
=
{},
required_key_list
=
[],
is_sha256file
=
False
):
if
is_sha256file
:
key
=
self
.
key
+
"-sha256-content"
else
:
key
=
self
.
key
self
.
log
(
'Downloading %s...'
,
key
)
result
=
self
.
shacache
.
select
(
key
,
wanted_metadata_dict
,
required_key_list
)
entry
=
None
result
=
list
(
result
)
if
result
:
entry
=
strategy
(
result
)
if
not
entry
:
# XXX: this should be the choice of 'strategy' function
self
.
log
(
"Can't find best entry matching strategy, selecting "
"random one between acceptable ones."
)
entry
=
result
[
0
]
if
not
entry
:
self
.
log
(
'No entry matching key %s'
,
key
)
else
:
# XXX check if nc filters signature_certificate_list!
# Creates a file with content to desired path.
f
=
open
(
path
,
'w+b'
)
fd_download
=
self
.
shacache
.
download
(
entry
[
'sha512'
])
try
:
shutil
.
copyfileobj
(
fd_download
,
f
)
# XXX method should check MD5.
return
entry
finally
:
f
.
close
()
fd_download
.
close
()
return
False
def
_download
(
self
,
path
):
"""
Download a tar of the repository from cache, and untar it.
"""
shacache
=
NetworkCache
(
self
.
config
.
slapos_configuration
)
if
shacache
.
signature_certificate_list
is
None
:
raise
ValueError
(
"You need at least one valid signature for download"
)
download_metadata_dict
=
shacache
.
download
(
path
=
path
,
required_key_list
=
[
'timestamp'
],
strategy
=
strategy
)
try
:
download_metadata_dict
=
self
.
_download_once
(
path
=
path
,
required_key_list
=
[
'timestamp'
])
except
:
return
False
if
download_metadata_dict
:
self
.
log
(
'File downloaded in %s'
,
path
)
current_sha256
=
self
.
get_file_hash
(
path
)
with
tempfile
.
NamedTemporaryFile
()
as
f_256
:
sha256path
=
f_256
.
name
if
shacache
.
download
(
path
=
sha256path
,
required_key_list
=
[
'timestamp'
],
strategy
=
strategy
,
is_sha256file
=
True
):
try
:
sha256sum_present
=
self
.
_download_once
(
path
=
sha256path
,
required_key_list
=
[
'timestamp'
],
is_sha256file
=
True
)
self
.
log
(
'sha 256 downloaded in %s'
,
sha256path
)
expected_sha256
=
f_256
.
read
()
except
:
sha256sum_present
=
False
if
sha256sum_present
:
expected_sha256
=
f_256
.
read
()
if
current_sha256
==
expected_sha256
:
return
True
else
:
...
...
@@ -230,29 +167,32 @@ class Signature:
"""
Creates uploads repository to cache.
"""
shacache
=
NetworkCache
(
self
.
config
.
slapos_configuration
)
sha256path
=
path
+
".sha256"
self
.
save_file_hash
(
path
,
sha256path
)
metadata_dict
=
{
# XXX: we set date from client side. It can be potentially dangerous
# as it can be badly configured.
'timestamp'
:
time
.
time
(),
'token'
:
''
.
join
([
choice
(
ascii_lowercase
)
for
_
in
range
(
128
)])
'token'
:
''
.
join
([
choice
(
ascii_lowercase
)
for
_
in
range
(
128
)]),
# backward compatibility
'file'
:
'notused'
,
'urlmd5'
:
'notused'
}
try
:
sha512sum
=
s
hacache
.
upload
(
path
=
path
,
metadata_dict
=
metadata_dict
)
sha512sum
=
s
elf
.
shacache
.
upload
(
open
(
path
,
'rb'
),
self
.
key
,
**
metadata_dict
)
if
sha512sum
:
self
.
log
(
'Uploaded %s to cache (using %s key) with SHA512 %s.'
,
path
,
shacache
.
directory_key
,
sha512sum
)
sha512sum_path
=
shacache
.
upload
(
path
=
sha256path
,
metadata_dict
=
metadata_dict
,
is_sha256file
=
True
)
self
.
key
,
sha512sum
)
sha512sum_path
=
self
.
shacache
.
upload
(
open
(
sha256path
,
'rb'
),
self
.
key
+
"-sha256-content"
,
**
metadata_dict
)
if
sha512sum_path
:
self
.
log
(
'Uploaded %s to cache (using %s key) with SHA512 %s.'
,
sha256path
,
s
hacache
.
directory_
key
,
sha512sum_path
)
s
elf
.
key
,
sha512sum_path
)
else
:
self
.
log
(
'Fail to upload sha256file file to cache.'
)
else
:
...
...
@@ -263,7 +203,7 @@ class Signature:
def
upload
(
self
):
self
.
_upload
(
self
.
config
.
file
)
# Class containing all parameters needed for configuration
class
Config
:
def
__init__
(
self
,
option_dict
=
None
):
...
...
slapcache/test/test_signature.py
View file @
819deee8
# uncompyle6 version 3.6.5
# Python bytecode 2.7 (62211)
# Decompiled from: Python 3.7.3 (default, Apr 3 2019, 05:39:12)
# [GCC 8.3.0]
# Embedded file name: /root/slapos.package/slapcache/test/test_signature.py
# Compiled at: 2015-06-10 21:40:55
from
slapcache
import
signature
from
slapcache.signature
import
Signature
,
Config
,
strategy
from
slapos.libnetworkcache
import
NetworkcacheClient
from
optparse
import
Values
import
slapos.signature
,
time
,
difflib
,
tempfile
,
unittest
,
os
import
slapos.signature
,
tempfile
,
unittest
,
os
,
sys
def
_fake_call
(
self
,
*
args
,
**
kw
):
self
.
last_call
=
(
args
,
kw
)
UPGRADE_KEY
=
"""[debian-default]
KEY_CONTENT
=
"""[debian-default]
repository-list =
main = http://ftp.fr.debian.org/debian/ wheezy main
main-src = http://ftp.fr.debian.org/debian/ wheezy main
...
...
@@ -58,51 +51,6 @@ upgrade = 2014-06-04
"""
UPGRADE_KEY_WITHOUT_KEY_LIST
=
"""[debian-default]
repository-list =
main = http://ftp.fr.debian.org/debian/ wheezy main
main-src = http://ftp.fr.debian.org/debian/ wheezy main
update = http://ftp.fr.debian.org/debian/ wheezy-updates main
update-src = http://ftp.fr.debian.org/debian/ wheezy-updates main
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/Debian_7.0/ ./
re6stnet = http://git.erp5.org/dist/deb ./
filter-package-list =
ntp
openvpn
slapos.node
re6stnet
filter-promise-list =
core
signature-list =
debian+++jessie/sid+++
debian+++7.4+++
debian+++7.5+++
debian+++7.3+++
debian+++7+++
[opensuse-legacy]
repository-list =
suse = http://download.opensuse.org/distribution/12.1/repo/oss/
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/openSUSE_12.1/
re6st = http://git.erp5.org/dist/rpm
filter-promise-list =
core
filter-package-list =
ntp
openvpn
slapos.node
re6stnet
signature-list =
opensuse+++12.1+++x86_64
[system]
reboot = 2011-10-10
upgrade = 2014-06-04
"""
SIGNATURE
=
"""-----BEGIN CERTIFICATE-----
MIIB8DCCAVmgAwIBAgIJAPFf61p8y809MA0GCSqGSIb3DQEBBQUAMBAxDjAMBgNV
BAMMBUNPTVAtMCAXDTE0MDIxNzE2NDgxN1oYDzIxMTQwMTI0MTY0ODE3WjAQMQ4w
...
...
@@ -116,36 +64,15 @@ Jr1kUrs8Fg3ig8eRFQlBSLYfANIUxcQ2ScFAkmsvwXY3Md7uaSvMJsEl2jcjdmdi
eSreNkx85j9GtMLY/2cv0kF4yAQNRtibtDkbg6fRNkmUopDosJNVf79l1GKX8JFL
zZBOFdOaLYY/6dLRwiTUKHU6su8=
-----END CERTIFICATE-----"""
BASE_UPDATE_CFG_DATA
=
"""
[networkcache]
download-binary-cache-url = http://www.shacache.org/shacache
download-cache-url = http
s
://www.shacache.org/shacache
download-cache-url = http://www.shacache.org/shacache
download-binary-dir-url = http://www.shacache.org/shadir
"""
UPDATE_CFG_DATA
=
"""
[shacache]
key = slapos-upgrade-testing-key-with-config-file-invalid
"""
+
BASE_UPDATE_CFG_DATA
UPDATE_UPLOAD_CFG_DATA
=
"""
[shacache]
key = slapos-upgrade-testing-key-with-config-file
"""
+
BASE_UPDATE_CFG_DATA
VALID_UPDATE_CFG_DATA
=
"""
[shacache]
key = 'slapos-upgrade-testing-key-with-config-file-valid'
"""
+
BASE_UPDATE_CFG_DATA
UPDATE_CFG_WITH_UPLOAD_DATA
=
UPDATE_CFG_DATA
+
"""
signature-private-key-file = /etc/opt/slapos/signature.key
upload-cache-url = https://www.shacache.org/shacache
shacache-cert-file = /etc/opt/slapos/shacache.crt
shacache-key-file = /etc/opt/slapos/shacache.key
upload-dir-url = https://www.shacache.org/shadir
shadir-cert-file = /etc/opt/slapos/shacache.crt
shadir-key-file = /etc/opt/slapos/shacache.key
download-dir-url = http://www.shacache.org/shadir
"""
def
_fake_upload
(
self
,
*
args
,
**
kwargs
):
...
...
@@ -157,95 +84,36 @@ class NetworkCacheTestCase(unittest.TestCase):
def
setUp
(
self
):
self
.
original_networkcache_upload
=
NetworkcacheClient
.
upload
NetworkcacheClient
.
upload
=
_fake_upload
self
.
config_dict
=
{
'slapos_configuration'
:
self
.
_createConfigurationFile
(),
'srv_file'
:
'/tmp/test_base_promise_slapupdate'
,
'dry_run'
:
False
,
'verbose'
:
False
}
self
.
config_dict
=
Config
()
def
tearDown
(
self
):
NetworkcacheClient
.
upload
=
self
.
original_networkcache_upload
def
_createConfigurationFile
(
self
):
with
open
(
'/tmp/test_signature_000000_configuration.cfg'
,
'w'
)
as
(
configuration_file
):
configuration_file
.
write
(
VALID_UPDATE_CFG_DATA
)
return
'/tmp/test_signature_000000_configuration.cfg'
def
_createConfigurationFile
(
self
,
key
,
with_upload
=
False
,
with_signature
=
None
):
self
.
tmp_dir
=
tempfile
.
mkdtemp
()
assert
(
not
(
with_upload
and
with_signature
))
configuration_file_path
=
os
.
path
.
join
(
self
.
tmp_dir
,
'slapcache.conf'
)
signature_certificate_file
=
os
.
path
.
join
(
self
.
tmp_dir
,
'shacache.cert'
)
signature_private_key_file
=
os
.
path
.
join
(
self
.
tmp_dir
,
'shacache.key'
)
slapos
.
signature
.
generateCertificate
(
signature_certificate_file
,
signature_private_key_file
,
'COMP-123A'
)
_fake_signature_path
=
os
.
path
.
join
(
self
.
tmp_dir
,
'fake_file'
)
open
(
_fake_signature_path
,
'w'
).
write
(
'# XXX ...'
)
# KEY used by slapcache
content
=
"""
[shacache]
key = %s
def
test_basic_configuration
(
self
):
info
,
self
.
configuration_file_path
=
tempfile
.
mkstemp
()
configuration_content
=
UPDATE_CFG_DATA
+
"""
signature-certificate-list = %(certificate)s
"""
%
{
'certificate'
:
'
\
n
'
.
join
(
' '
+
l
if
l
.
strip
()
else
'
\
n
'
for
l
in
SIGNATURE
.
splitlines
())}
print
(
configuration_content
)
open
(
self
.
configuration_file_path
,
'w'
).
write
(
configuration_content
)
shacache
=
signature
.
NetworkCache
(
self
.
configuration_file_path
)
self
.
assertEqual
(
shacache
.
download_binary_cache_url
,
'http://www.shacache.org/shacache'
)
self
.
assertEqual
(
shacache
.
download_cache_url
,
'https://www.shacache.org/shacache'
)
self
.
assertEqual
(
shacache
.
download_binary_dir_url
,
'http://www.shacache.org/shadir'
)
self
.
assertEqual
(
shacache
.
signature_certificate_list
,
SIGNATURE
)
self
.
assertEqual
(
shacache
.
directory_key
,
'slapos-upgrade-testing-key-with-config-file-invalid'
)
self
.
assertEqual
(
shacache
.
dir_url
,
None
)
self
.
assertEqual
(
shacache
.
cache_url
,
None
)
self
.
assertEqual
(
shacache
.
signature_private_key_file
,
None
)
self
.
assertEqual
(
shacache
.
shacache_cert_file
,
None
)
self
.
assertEqual
(
shacache
.
shacache_key_file
,
None
)
self
.
assertEqual
(
shacache
.
shadir_cert_file
,
None
)
self
.
assertEqual
(
shacache
.
shadir_key_file
,
None
)
return
"""
%
key
# basic URLS
content
+=
BASE_UPDATE_CFG_DATA
def
test_basic_configuration_with_upload
(
self
):
info
,
self
.
configuration_file_path
=
tempfile
.
mkstemp
()
configuration_content
=
UPDATE_CFG_WITH_UPLOAD_DATA
+
"""
if
with_signature
:
content
+=
"""
signature-certificate-list = %(certificate)s
"""
%
{
'certificate'
:
'
\
n
'
.
join
(
' '
+
l
if
l
.
strip
()
else
'
\
n
'
for
l
in
SIGNATURE
.
splitlines
())}
open
(
self
.
configuration_file_path
,
'w'
).
write
(
configuration_content
)
shacache
=
signature
.
NetworkCache
(
self
.
configuration_file_path
)
self
.
assertEqual
(
shacache
.
download_binary_cache_url
,
'http://www.shacache.org/shacache'
)
self
.
assertEqual
(
shacache
.
download_cache_url
,
'https://www.shacache.org/shacache'
)
self
.
assertEqual
(
shacache
.
download_binary_dir_url
,
'http://www.shacache.org/shadir'
)
self
.
assertEqual
(
shacache
.
signature_certificate_list
,
SIGNATURE
)
self
.
assertEqual
(
shacache
.
directory_key
,
'slapos-upgrade-testing-key-with-config-file-invalid'
)
self
.
assertEqual
(
shacache
.
dir_url
,
'https://www.shacache.org/shadir'
)
self
.
assertEqual
(
shacache
.
cache_url
,
'https://www.shacache.org/shacache'
)
self
.
assertEqual
(
shacache
.
shacache_cert_file
,
'/etc/opt/slapos/shacache.crt'
)
self
.
assertEqual
(
shacache
.
shacache_key_file
,
'/etc/opt/slapos/shacache.key'
)
self
.
assertEqual
(
shacache
.
shadir_cert_file
,
'/etc/opt/slapos/shacache.crt'
)
self
.
assertEqual
(
shacache
.
shadir_key_file
,
'/etc/opt/slapos/shacache.key'
)
self
.
assertEqual
(
shacache
.
signature_private_key_file
,
'/etc/opt/slapos/signature.key'
)
def
test_configuration_file_dont_exist
(
self
):
self
.
assertRaises
(
ValueError
,
signature
.
NetworkCache
,
'/abc/123'
)
def
test_download_not_existing_signature_from_cache
(
self
):
info
,
path
=
tempfile
.
mkstemp
()
info
,
self
.
configuration_file_path
=
tempfile
.
mkstemp
()
open
(
self
.
configuration_file_path
,
'w'
).
write
(
UPDATE_CFG_DATA
)
shacache
=
signature
.
NetworkCache
(
self
.
configuration_file_path
)
self
.
assertEqual
(
False
,
shacache
.
download
(
path
=
path
,
required_key_list
=
[
'timestamp'
],
strategy
=
signature
.
strategy
))
self
.
assertEqual
(
''
,
open
(
path
,
'r'
).
read
())
"""
%
{
'certificate'
:
'
\
n
'
.
join
(
' '
+
l
if
l
.
strip
()
else
'
\
n
'
for
l
in
with_signature
.
splitlines
())}
def
test_download_existing_from_cache
(
self
):
info
,
path
=
tempfile
.
mkstemp
()
info
,
self
.
configuration_file_path
=
tempfile
.
mkstemp
()
open
(
self
.
configuration_file_path
,
'w'
).
write
(
VALID_UPDATE_CFG_DATA
)
shacache
=
signature
.
NetworkCache
(
self
.
configuration_file_path
)
shacache
.
download
(
path
=
path
,
required_key_list
=
[
'timestamp'
],
strategy
=
signature
.
strategy
)
self
.
maxDiff
=
None
self
.
assertEqual
(
UPGRADE_KEY
.
splitlines
(),
open
(
path
,
'r'
).
read
().
splitlines
())
return
def
test_upload_to_cache
(
self
):
info
,
path
=
tempfile
.
mkstemp
()
info
,
_fake_signature_path
=
tempfile
.
mkstemp
()
info
,
self
.
configuration_file_path
=
tempfile
.
mkstemp
()
signature_certificate_file
=
'/tmp/signature_certificate_file_demo_test'
if
os
.
path
.
exists
(
signature_certificate_file
):
os
.
remove
(
signature_certificate_file
)
signature_private_key_file
=
'/tmp/signature_private_key_file_demo_test'
if
os
.
path
.
exists
(
signature_private_key_file
):
os
.
remove
(
signature_private_key_file
)
slapos
.
signature
.
generateCertificate
(
signature_certificate_file
,
signature_private_key_file
,
'COMP-123A'
)
configuration_content
=
UPDATE_UPLOAD_CFG_DATA
+
"""
if
with_upload
:
content
+=
"""
signature-private-key-file = %(signature_private_key_file)s
upload-cache-url = https://www.shacache.org/shacache
shacache-cert-file = %(tempfile)s
...
...
@@ -260,13 +128,59 @@ signature-certificate-list =
'signature_private_key_file'
:
signature_private_key_file
,
'certificate'
:
''
.
join
(
' '
+
l
if
l
.
strip
()
else
'
\
n
'
for
l
in
open
(
signature_certificate_file
,
'r'
).
readlines
())
}
open
(
self
.
configuration_file_path
,
'w'
).
write
(
configuration_content
)
open
(
_fake_signature_path
,
'w'
).
write
(
'# XXX ...'
)
shacache
=
signature
.
NetworkCache
(
self
.
configuration_file_path
)
metadata_dict
=
{
'timestamp'
:
time
.
time
()}
shacache
.
upload
(
path
=
path
,
metadata_dict
=
metadata_dict
)
with
open
(
configuration_file_path
,
'w'
)
as
configuration_file
:
configuration_file
.
write
(
content
)
return
configuration_file_path
def
test_basic_configuration
(
self
):
self
.
config_dict
.
slapos_configuration
=
self
.
_createConfigurationFile
(
"slapos-upgrade-testing-key-with-config-file-invalid"
)
shacache
=
Signature
(
self
.
config_dict
)
self
.
assertEqual
(
shacache
.
key
,
'slapos-upgrade-testing-key-with-config-file-invalid'
)
def
test_basic_configuration_with_upload
(
self
):
self
.
config_dict
.
slapos_configuration
=
self
.
_createConfigurationFile
(
"slapos-upgrade-testing-key-with-config-file-invalid"
,
True
)
shacache
=
Signature
(
self
.
config_dict
)
self
.
assertEqual
(
shacache
.
key
,
'slapos-upgrade-testing-key-with-config-file-invalid'
)
def
test_configuration_file_dont_exist
(
self
):
self
.
config_dict
.
slapos_configuration
=
'/abc/123'
if
sys
.
version_info
.
major
<
3
:
self
.
assertRaises
(
IOError
,
Signature
,
self
.
config_dict
)
else
:
self
.
assertRaises
(
FileNotFoundError
,
Signature
,
self
.
config_dict
)
def
test_download_not_existing
(
self
):
_
,
path
=
tempfile
.
mkstemp
()
self
.
config_dict
.
slapos_configuration
=
self
.
_createConfigurationFile
(
"slapos-upgrade-testing-key-with-config-file-invalid"
)
shacache
=
Signature
(
self
.
config_dict
)
self
.
assertEqual
(
False
,
shacache
.
_download
(
path
=
path
))
self
.
assertEqual
(
''
,
open
(
path
,
'r'
).
read
())
def
test_download_existing
(
self
):
_
,
path
=
tempfile
.
mkstemp
()
# WARNING, the real key has ' inside
self
.
config_dict
.
slapos_configuration
=
self
.
_createConfigurationFile
(
"'slapos-upgrade-testing-key-with-config-file-valid'"
)
shacache
=
Signature
(
self
.
config_dict
)
shacache
.
_download
(
path
=
path
)
self
.
maxDiff
=
None
self
.
assertEqual
(
KEY_CONTENT
.
splitlines
(),
open
(
path
,
'r'
).
read
().
splitlines
())
def
test_download_existing_without_signature_cert
(
self
):
_
,
path
=
tempfile
.
mkstemp
()
# WARNING, the real key has ' inside
self
.
config_dict
.
slapos_configuration
=
self
.
_createConfigurationFile
(
"'slapos-upgrade-testing-key-with-config-file-valid'"
,
with_signature
=
SIGNATURE
)
shacache
=
Signature
(
self
.
config_dict
)
shacache
.
_download
(
path
=
path
)
self
.
maxDiff
=
None
self
.
assertEqual
(
KEY_CONTENT
.
splitlines
(),
open
(
path
,
'r'
).
read
().
splitlines
())
def
test_upload_to_cache
(
self
):
info
,
path
=
tempfile
.
mkstemp
()
self
.
config_dict
.
slapos_configuration
=
self
.
_createConfigurationFile
(
"slapos-upgrade-testing-key-with-config-file-invalid"
,
True
)
shacache
=
Signature
(
self
.
config_dict
)
shacache
.
_upload
(
path
=
path
)
def
test_signature_strategy
(
self
):
entry_list
=
[{
'timestamp'
:
123824.0
},
{
'timestamp'
:
12345.0
},
{
'timestamp'
:
13345.0
},
{
'timestamp'
:
12344.0
},
{
'timestamp'
:
12045.0
}]
self
.
assertEqual
(
signature
.
strategy
(
entry_list
),
{
'timestamp'
:
123824.0
})
# okay decompiling slapcache/test/test_signature.pyc
self
.
assertEqual
(
strategy
(
entry_list
),
{
'timestamp'
:
123824.0
})
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment