Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
slapos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Esteban Blanc
slapos
Commits
62ccd142
Commit
62ccd142
authored
Aug 23, 2020
by
Jérome Perrin
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
software/erp5/test: refactor to use managed resources
parent
be3ec684
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
210 additions
and
221 deletions
+210
-221
software/erp5/test/setup.py
software/erp5/test/setup.py
+1
-0
software/erp5/test/test/test_balancer.py
software/erp5/test/test/test_balancer.py
+208
-221
software/slapos-sr-testing/software.cfg
software/slapos-sr-testing/software.cfg
+1
-0
No files found.
software/erp5/test/setup.py
View file @
62ccd142
...
@@ -51,6 +51,7 @@ setup(name=name,
...
@@ -51,6 +51,7 @@ setup(name=name,
'backports.lzma'
,
'backports.lzma'
,
'cryptography'
,
'cryptography'
,
'pyOpenSSL'
,
'pyOpenSSL'
,
'typing; python_version<"3"'
,
],
],
test_suite
=
'test'
,
test_suite
=
'test'
,
)
)
software/erp5/test/test/test_balancer.py
View file @
62ccd142
from
.
import
ERP5InstanceTestCase
from
.
import
setUpModule
from
slapos.testing.utils
import
findFreeTCPPort
from
BaseHTTPServer
import
HTTPServer
from
BaseHTTPServer
import
BaseHTTPRequestHandler
import
OpenSSL.SSL
from
cryptography.hazmat.backends
import
default_backend
from
cryptography.hazmat.primitives
import
serialization
,
hashes
from
cryptography.hazmat.primitives.asymmetric
import
rsa
from
cryptography
import
x509
from
cryptography.x509.oid
import
NameOID
import
hashlib
import
hashlib
import
json
import
json
import
multiprocess
ing
import
logg
ing
import
os
import
os
import
re
quests
import
re
import
shutil
import
shutil
import
subprocess
import
subprocess
import
tempfile
import
tempfile
import
time
import
time
from
BaseHTTPServer
import
BaseHTTPRequestHandler
from
typing
import
Dict
import
mock
import
OpenSSL.SSL
import
requests
from
cryptography
import
x509
from
cryptography.hazmat.backends
import
default_backend
from
cryptography.hazmat.primitives
import
hashes
,
serialization
from
cryptography.hazmat.primitives.asymmetric
import
rsa
from
cryptography.x509.oid
import
NameOID
from
slapos.testing.testcase
import
ManagedResource
from
slapos.testing.utils
import
(
CrontabMixin
,
ManagedHTTPServer
,
findFreeTCPPort
)
from
.
import
ERP5InstanceTestCase
,
setUpModule
setUpModule
# pyflakes
setUpModule
# pyflakes
class
TestHandler
(
BaseHTTPRequestHandler
):
def
do_GET
(
self
):
self
.
send_response
(
200
)
self
.
send_header
(
"Content-Type"
,
"application/json"
)
response
=
{
'Path'
:
self
.
path
,
'Incoming Headers'
:
self
.
headers
.
dict
}
response
=
json
.
dumps
(
response
,
indent
=
2
)
self
.
end_headers
()
self
.
wfile
.
write
(
response
)
class
TestFrontendXForwardedFor
(
ERP5InstanceTestCase
):
class
EchoHTTPServer
(
ManagedHTTPServer
):
__partition_reference__
=
'xff'
"""An HTTP Server responding with the request path and incoming headers,
http_server_process
=
None
encoded in json.
frontend_caucase_dir
=
None
"""
frontend_caucased_process
=
None
class
RequestHandler
(
BaseHTTPRequestHandler
):
backend_caucase_dir
=
None
def
do_GET
(
self
):
backend_caucased_process
=
None
# type: () -> None
self
.
send_response
(
200
)
self
.
send_header
(
"Content-Type"
,
"application/json"
)
response
=
json
.
dumps
(
{
'Path'
:
self
.
path
,
'Incoming Headers'
:
self
.
headers
.
dict
},
indent
=
2
,
)
self
.
end_headers
()
self
.
wfile
.
write
(
response
)
@
classmethod
log_message
=
logging
.
getLogger
(
__name__
+
'.HeaderEchoHandler'
).
info
def
getInstanceSoftwareType
(
cls
):
return
'balancer'
@
classmethod
def
setUpClass
(
cls
):
# start a dummy web server echoing headers.
http_server_port
=
findFreeTCPPort
(
cls
.
_ipv4_address
)
server
=
HTTPServer
(
(
cls
.
_ipv4_address
,
http_server_port
),
TestHandler
)
cls
.
http_server_process
=
multiprocessing
.
Process
(
target
=
server
.
serve_forever
,
name
=
'HTTPServer'
)
cls
.
http_server_process
.
start
()
cls
.
http_server_netloc
=
'%s:%s'
%
(
cls
.
_ipv4_address
,
http_server_port
)
# start a caucased and generate a valid client certificate.
cls
.
computer_partition_root_path
=
os
.
path
.
abspath
(
os
.
curdir
)
cls
.
frontend_caucase_dir
=
tempfile
.
mkdtemp
()
frontend_caucased_dir
=
os
.
path
.
join
(
cls
.
frontend_caucase_dir
,
'caucased'
)
os
.
mkdir
(
frontend_caucased_dir
)
frontend_user_dir
=
os
.
path
.
join
(
cls
.
frontend_caucase_dir
,
'user'
)
os
.
mkdir
(
frontend_user_dir
)
frontend_service_dir
=
os
.
path
.
join
(
cls
.
frontend_caucase_dir
,
'service'
)
os
.
mkdir
(
frontend_service_dir
)
frontend_caucased_netloc
=
'%s:%s'
%
(
cls
.
_ipv4_address
,
findFreeTCPPort
(
cls
.
_ipv4_address
))
cls
.
frontend_caucased_url
=
'http://'
+
frontend_caucased_netloc
cls
.
user_certificate
=
frontend_user_key
=
os
.
path
.
join
(
frontend_user_dir
,
'client.key.pem'
)
frontend_user_csr
=
os
.
path
.
join
(
frontend_user_dir
,
'client.csr.pem'
)
key
=
rsa
.
generate_private_key
(
class
CaucaseService
(
ManagedResource
):
public_exponent
=
65537
,
"""A caucase service.
key_size
=
2048
,
"""
backend
=
default_backend
()
url
=
None
# type: str
)
directory
=
None
# type: str
with
open
(
frontend_user_key
,
'wb'
)
as
f
:
_caucased_process
=
None
# type: subprocess.Popen
f
.
write
(
key
.
private_bytes
(
encoding
=
serialization
.
Encoding
.
PEM
,
format
=
serialization
.
PrivateFormat
.
TraditionalOpenSSL
,
encryption_algorithm
=
serialization
.
NoEncryption
(),
))
csr
=
x509
.
CertificateSigningRequestBuilder
().
subject_name
(
x509
.
Name
([
x509
.
NameAttribute
(
NameOID
.
COMMON_NAME
,
u'user'
),
])).
sign
(
key
,
hashes
.
SHA256
(),
default_backend
())
with
open
(
frontend_user_csr
,
'wb'
)
as
f
:
f
.
write
(
csr
.
public_bytes
(
serialization
.
Encoding
.
PEM
))
cls
.
software_release_root_path
=
os
.
path
.
join
(
def
open
(
self
):
cls
.
slap
.
_software_root
,
# type: () -> None
hashlib
.
md5
(
cls
.
getSoftwareURL
()).
hexdigest
(),
# start a caucased and server certificate.
software_release_root_path
=
os
.
path
.
join
(
self
.
_cls
.
slap
.
_software_root
,
hashlib
.
md5
(
self
.
_cls
.
getSoftwareURL
().
encode
()).
hexdigest
(),
)
)
caucased_path
=
os
.
path
.
join
(
cls
.
software_release_root_path
,
'bin'
,
'caucased'
)
caucased_path
=
os
.
path
.
join
(
software_release_root_path
,
'bin'
,
'caucased'
)
caucase_path
=
os
.
path
.
join
(
cls
.
software_release_root_path
,
'bin'
,
'caucase'
)
cls
.
frontend_caucased_process
=
subprocess
.
Popen
(
self
.
directory
=
tempfile
.
mkdtemp
()
[
caucased_dir
=
os
.
path
.
join
(
self
.
directory
,
'caucased'
)
caucased_path
,
os
.
mkdir
(
caucased_dir
)
'--db'
,
os
.
path
.
join
(
frontend_caucased_dir
,
'caucase.sqlite'
),
os
.
mkdir
(
os
.
path
.
join
(
caucased_dir
,
'user'
))
'--server-key'
,
os
.
path
.
join
(
frontend_caucased_dir
,
'server.key.pem'
),
os
.
mkdir
(
os
.
path
.
join
(
caucased_dir
,
'service'
))
'--netloc'
,
frontend_caucased_netloc
,
'--service-auto-approve-count'
,
'1'
,
backend_caucased_netloc
=
'%s:%s'
%
(
self
.
_cls
.
_ipv4_address
,
findFreeTCPPort
(
self
.
_cls
.
_ipv4_address
))
],
self
.
url
=
'http://'
+
backend_caucased_netloc
stdout
=
subprocess
.
PIPE
,
self
.
_caucased_process
=
subprocess
.
Popen
(
stderr
=
subprocess
.
STDOUT
,
[
caucased_path
,
'--db'
,
os
.
path
.
join
(
caucased_dir
,
'caucase.sqlite'
),
'--server-key'
,
os
.
path
.
join
(
caucased_dir
,
'server.key.pem'
),
'--netloc'
,
backend_caucased_netloc
,
'--service-auto-approve-count'
,
'1'
,
],
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
,
)
)
for
_
in
range
(
30
):
for
_
in
range
(
30
):
try
:
try
:
if
requests
.
get
(
cls
.
frontend_caucased_
url
).
status_code
==
200
:
if
requests
.
get
(
self
.
url
).
status_code
==
200
:
break
break
except
Exception
:
except
Exception
:
pass
pass
...
@@ -118,173 +94,184 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase):
...
@@ -118,173 +94,184 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase):
else
:
else
:
raise
RuntimeError
(
'caucased failed to start.'
)
raise
RuntimeError
(
'caucased failed to start.'
)
cau_args
=
[
def
close
(
self
):
caucase_path
,
# type: () -> None
'--ca-url'
,
cls
.
frontend_caucased_url
,
self
.
_caucased_process
.
terminate
()
'--ca-crt'
,
os
.
path
.
join
(
frontend_user_dir
,
'service-ca-crt.pem'
),
self
.
_caucased_process
.
wait
()
'--crl'
,
os
.
path
.
join
(
frontend_user_dir
,
'service.crl'
),
shutil
.
rmtree
(
self
.
directory
)
'--user-ca-crt'
,
os
.
path
.
join
(
frontend_user_dir
,
'user-ca-crt.pem'
),
'--user-crl'
,
os
.
path
.
join
(
frontend_user_dir
,
'user.crl'
),
]
cas_args
=
[
caucase_path
,
'--ca-url'
,
cls
.
frontend_caucased_url
,
'--ca-crt'
,
os
.
path
.
join
(
frontend_service_dir
,
'service-ca-crt.pem'
),
'--crl'
,
os
.
path
.
join
(
frontend_service_dir
,
'service.crl'
),
'--user-ca-crt'
,
os
.
path
.
join
(
frontend_service_dir
,
'user-ca-crt.pem'
),
'--user-crl'
,
os
.
path
.
join
(
frontend_service_dir
,
'user.crl'
),
]
caucase_process
=
subprocess
.
Popen
(
class
BalancerTestCase
(
ERP5InstanceTestCase
):
cau_args
+
[
'--mode'
,
'user'
,
'--send-csr'
,
frontend_user_csr
,
],
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
,
)
result
=
caucase_process
.
communicate
()
csr_id
=
result
[
0
].
split
()[
0
]
subprocess
.
check_call
(
@
classmethod
cau_args
+
[
def
getInstanceSoftwareType
(
cls
):
'--mode'
,
'user'
,
return
'balancer'
'--get-crt'
,
csr_id
,
frontend_user_key
,
],
@
classmethod
def
_getInstanceParameterDict
(
cls
):
# type: () -> Dict
return
{
'tcpv4-port'
:
8000
,
'computer-memory-percent-threshold'
:
100
,
# XXX what is this ? should probably not be needed here
'name'
:
cls
.
__name__
,
'monitor-passwd'
:
'secret'
,
'apachedex-configuration'
:
'--erp5-base +erp5 .*/VirtualHostRoot/erp5(/|
\
\
?|$) --base +other / --skip-user-agent Zabbix --error-detail --js-embed --quiet'
,
'apachedex-promise-threshold'
:
100
,
'haproxy-server-check-path'
:
'/'
,
'zope-family-dict'
:
{
'default'
:
[
'dummy_http_server'
],
},
'dummy_http_server'
:
[[
cls
.
getManagedResource
(
"backend_web_server"
,
EchoHTTPServer
).
netloc
,
1
,
False
]],
'backend-path-dict'
:
{
'default'
:
''
,
},
'ssl-authentication-dict'
:
{},
'ssl'
:
{
'caucase-url'
:
cls
.
getManagedResource
(
"caucase"
,
CaucaseService
).
url
,
}
}
@
classmethod
def
getInstanceParameterDict
(
cls
):
# type: () -> Dict
return
{
'_'
:
json
.
dumps
(
cls
.
_getInstanceParameterDict
())}
def
setUp
(
self
):
self
.
default_balancer_url
=
json
.
loads
(
self
.
computer_partition
.
getConnectionParameterDict
()[
'_'
])[
'default'
]
class
CaucaseClientCertificate
(
ManagedResource
):
"""A client certificate issued by a caucase services.
"""
ca_crt_file
=
None
# type: str
crl_file
=
None
# type: str
csr_file
=
None
# type: str
cert_file
=
None
# type: str
key_file
=
None
# type: str
def
open
(
self
):
# type: () -> None
self
.
tmpdir
=
tempfile
.
mkdtemp
()
self
.
ca_crt_file
=
os
.
path
.
join
(
self
.
tmpdir
,
'ca-crt.pem'
)
self
.
crl_file
=
os
.
path
.
join
(
self
.
tmpdir
,
'ca-crl.pem'
)
self
.
csr_file
=
os
.
path
.
join
(
self
.
tmpdir
,
'csr.pem'
)
self
.
cert_file
=
os
.
path
.
join
(
self
.
tmpdir
,
'crt.pem'
)
self
.
key_file
=
os
.
path
.
join
(
self
.
tmpdir
,
'key.pem'
)
def
close
(
self
):
# type: () -> None
shutil
.
rmtree
(
self
.
tmpdir
)
def
request
(
self
,
common_name
,
caucase
):
# type: (str, CaucaseService) -> None
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
"""
software_release_root_path
=
os
.
path
.
join
(
self
.
_cls
.
slap
.
_software_root
,
hashlib
.
md5
(
self
.
_cls
.
getSoftwareURL
().
encode
()).
hexdigest
(),
)
)
caucase_path
=
os
.
path
.
join
(
software_release_root_path
,
'bin'
,
'caucase'
)
cls
.
client_certificate
=
frontend_service_key
=
os
.
path
.
join
(
frontend_service_dir
,
'crt.pem'
)
cas_args
=
[
frontend_service_csr
=
os
.
path
.
join
(
frontend_service_dir
,
'csr.pem'
)
caucase_path
,
'--ca-url'
,
caucase
.
url
,
'--ca-crt'
,
self
.
ca_crt_file
,
'--crl'
,
self
.
crl_file
,
]
key
=
rsa
.
generate_private_key
(
key
=
rsa
.
generate_private_key
(
public_exponent
=
65537
,
public_exponent
=
65537
,
key_size
=
2048
,
key_size
=
2048
,
backend
=
default_backend
()
backend
=
default_backend
()
)
)
with
open
(
frontend_service_key
,
'wb'
)
as
f
:
with
open
(
self
.
key_file
,
'wb'
)
as
f
:
f
.
write
(
key
.
private_bytes
(
f
.
write
(
encoding
=
serialization
.
Encoding
.
PEM
,
key
.
private_bytes
(
format
=
serialization
.
PrivateFormat
.
TraditionalOpenSSL
,
encoding
=
serialization
.
Encoding
.
PEM
,
encryption_algorithm
=
serialization
.
NoEncryption
(),
format
=
serialization
.
PrivateFormat
.
TraditionalOpenSSL
,
))
encryption_algorithm
=
serialization
.
NoEncryption
(),
))
csr
=
x509
.
CertificateSigningRequestBuilder
().
subject_name
(
x509
.
Name
([
x509
.
NameAttribute
(
NameOID
.
COMMON_NAME
,
u'service'
),
csr
=
x509
.
CertificateSigningRequestBuilder
().
subject_name
(
])).
sign
(
key
,
hashes
.
SHA256
(),
default_backend
())
x509
.
Name
([
with
open
(
frontend_service_csr
,
'wb'
)
as
f
:
x509
.
NameAttribute
(
NameOID
.
COMMON_NAME
,
common_name
,
),
])).
sign
(
key
,
hashes
.
SHA256
(),
default_backend
(),
)
with
open
(
self
.
csr_file
,
'wb'
)
as
f
:
f
.
write
(
csr
.
public_bytes
(
serialization
.
Encoding
.
PEM
))
f
.
write
(
csr
.
public_bytes
(
serialization
.
Encoding
.
PEM
))
c
aucase_process
=
subprocess
.
Popen
(
c
sr_id
=
subprocess
.
check_output
(
cas_args
+
[
cas_args
+
[
'--send-csr'
,
frontend_service_csr
,
'--send-csr'
,
self
.
csr_file
,
],
],
stdout
=
subprocess
.
PIPE
,
).
split
()[
0
]
stderr
=
subprocess
.
STDOUT
,
assert
csr_id
)
result
=
caucase_process
.
communicate
()
csr_id
=
result
[
0
].
split
()[
0
]
for
_
in
range
(
30
):
for
_
in
range
(
30
):
if
not
subprocess
.
call
(
if
not
subprocess
.
call
(
cas_args
+
[
cas_args
+
[
'--get-crt'
,
csr_id
,
frontend_service_key
,
'--get-crt'
,
csr_id
,
self
.
cert_file
,
],
],
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
,
)
==
0
:
)
==
0
:
break
break
else
:
else
:
time
.
sleep
(
1
)
time
.
sleep
(
1
)
else
:
else
:
raise
RuntimeError
(
'getting service certificate failed.'
)
raise
RuntimeError
(
'getting service certificate failed.'
)
with
open
(
self
.
cert_file
)
as
f
:
assert
'BEGIN CERTIFICATE'
in
f
.
read
()
# start a caucased and server certificate.
cls
.
backend_caucase_dir
=
tempfile
.
mkdtemp
()
backend_caucased_dir
=
os
.
path
.
join
(
cls
.
backend_caucase_dir
,
'caucased'
)
os
.
mkdir
(
backend_caucased_dir
)
backend_caucased_netloc
=
'%s:%s'
%
(
cls
.
_ipv4_address
,
findFreeTCPPort
(
cls
.
_ipv4_address
))
cls
.
backend_caucased_url
=
'http://'
+
backend_caucased_netloc
cls
.
backend_caucased_process
=
subprocess
.
Popen
(
[
caucased_path
,
'--db'
,
os
.
path
.
join
(
backend_caucased_dir
,
'caucase.sqlite'
),
'--server-key'
,
os
.
path
.
join
(
backend_caucased_dir
,
'server.key.pem'
),
'--netloc'
,
backend_caucased_netloc
,
'--service-auto-approve-count'
,
'1'
,
],
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
,
)
for
_
in
range
(
10
):
try
:
if
requests
.
get
(
cls
.
backend_caucased_url
).
status_code
==
200
:
break
except
Exception
:
pass
time
.
sleep
(
1
)
else
:
raise
RuntimeError
(
'caucased failed to start.'
)
super
(
TestFrontendXForwardedFor
,
cls
).
setUpClass
()
class
TestFrontendXForwardedFor
(
BalancerTestCase
):
__partition_reference__
=
'xff'
@
classmethod
@
classmethod
def
getInstanceParameterDict
(
cls
):
def
_getInstanceParameterDict
(
cls
):
return
{
# type: () -> Dict
'_'
:
json
.
dumps
({
frontend_caucase
=
cls
.
getManagedResource
(
'frontend_caucase'
,
CaucaseService
)
'tcpv4-port'
:
3306
,
certificate
=
cls
.
getManagedResource
(
'client_certificate'
,
CaucaseClientCertificate
)
'computer-memory-percent-threshold'
:
100
,
certificate
.
request
(
u'shared frontend'
,
frontend_caucase
)
# XXX what is this ? should probably not be needed here
'name'
:
cls
.
__name__
,
'monitor-passwd'
:
'secret'
,
'apachedex-configuration'
:
''
,
'apachedex-promise-threshold'
:
100
,
'haproxy-server-check-path'
:
'/'
,
'zope-family-dict'
:
{
'default'
:
[
'dummy_http_server'
],
'default-auth'
:
[
'dummy_http_server'
],
},
'dummy_http_server'
:
[[
cls
.
http_server_netloc
,
1
,
False
]],
'backend-path-dict'
:
{
'default'
:
'/'
,
'default-auth'
:
'/'
,
},
'ssl-authentication-dict'
:
{
'default'
:
False
,
'default-auth'
:
True
,
},
'ssl'
:
{
'caucase-url'
:
cls
.
backend_caucased_url
,
'frontend-caucase-url-list'
:
[
cls
.
frontend_caucased_url
],
},
})
}
@
classmethod
parameter_dict
=
super
(
TestFrontendXForwardedFor
,
cls
).
_getInstanceParameterDict
()
def
_cleanup
(
cls
,
snapshot_name
):
# add another "-auth" backend, that will have ssl-authentication enabled
if
cls
.
http_server_process
:
parameter_dict
[
'zope-family-dict'
][
'default-auth'
]
=
[
'dummy_http_server'
]
cls
.
http_server_process
.
terminate
()
parameter_dict
[
'backend-path-dict'
][
'default-auth'
]
=
'/'
if
cls
.
frontend_caucased_process
:
parameter_dict
[
'ssl-authentication-dict'
]
=
{
cls
.
frontend_caucased_process
.
terminate
()
'default'
:
False
,
if
cls
.
frontend_caucase_dir
:
'default-auth'
:
True
,
shutil
.
rmtree
(
cls
.
frontend_caucase_dir
)
}
if
cls
.
backend_caucased_process
:
parameter_dict
[
'ssl'
][
'frontend-caucase-url-list'
]
=
[
frontend_caucase
.
url
]
cls
.
backend_caucased_process
.
terminate
()
return
parameter_dict
if
cls
.
backend_caucase_dir
:
shutil
.
rmtree
(
cls
.
backend_caucase_dir
)
super
(
TestFrontendXForwardedFor
,
cls
).
_cleanup
(
snapshot_name
)
def
test_x_forwarded_for_added_when_verified_connection
(
self
):
def
test_x_forwarded_for_added_when_verified_connection
(
self
):
# type: () -> None
client_certificate
=
self
.
getManagedResource
(
'client_certificate'
,
CaucaseClientCertificate
)
for
backend
in
(
'default'
,
'default-auth'
):
for
backend
in
(
'default'
,
'default-auth'
):
balancer_url
=
json
.
loads
(
self
.
computer_partition
.
getConnectionParameterDict
()[
'_'
])[
backend
]
balancer_url
=
json
.
loads
(
self
.
computer_partition
.
getConnectionParameterDict
()[
'_'
])[
backend
]
result
=
requests
.
get
(
result
=
requests
.
get
(
balancer_url
,
balancer_url
,
headers
=
{
'X-Forwarded-For'
:
'1.2.3.4'
},
headers
=
{
'X-Forwarded-For'
:
'1.2.3.4'
},
cert
=
self
.
client_certificate
,
cert
=
(
client_certificate
.
cert_file
,
client_certificate
.
key_file
)
,
verify
=
False
,
verify
=
False
,
).
json
()
).
json
()
self
.
assertEqual
(
result
[
'Incoming Headers'
].
get
(
'x-forwarded-for'
).
split
(
', '
)[
0
],
'1.2.3.4'
)
self
.
assertEqual
(
result
[
'Incoming Headers'
].
get
(
'x-forwarded-for'
).
split
(
', '
)[
0
],
'1.2.3.4'
)
def
test_x_forwarded_for_stripped_when_not_verified_connection
(
self
):
def
test_x_forwarded_for_stripped_when_not_verified_connection
(
self
):
# type: () -> None
balancer_url
=
json
.
loads
(
self
.
computer_partition
.
getConnectionParameterDict
()[
'_'
])[
'default'
]
balancer_url
=
json
.
loads
(
self
.
computer_partition
.
getConnectionParameterDict
()[
'_'
])[
'default'
]
result
=
requests
.
get
(
result
=
requests
.
get
(
balancer_url
,
balancer_url
,
...
...
software/slapos-sr-testing/software.cfg
View file @
62ccd142
...
@@ -318,3 +318,4 @@ funcsigs = 1.0.2
...
@@ -318,3 +318,4 @@ funcsigs = 1.0.2
mysqlclient = 1.3.12
mysqlclient = 1.3.12
pexpect = 4.8.0
pexpect = 4.8.0
ptyprocess = 0.6.0
ptyprocess = 0.6.0
typing = 3.7.4.3
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment