Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
slapos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Boxiang Sun
slapos
Commits
2a54030b
Commit
2a54030b
authored
Jan 24, 2025
by
Xavier Thompson
Browse files
Options
Browse Files
Download
Plain Diff
simplehttpserver: Allow disabling write access
See merge request
nexedi/slapos!1685
parents
5c1234dc
658becec
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
342 additions
and
135 deletions
+342
-135
slapos/recipe/simplehttpserver/__init__.py
slapos/recipe/simplehttpserver/__init__.py
+22
-31
slapos/recipe/simplehttpserver/simplehttpserver.py
slapos/recipe/simplehttpserver/simplehttpserver.py
+94
-62
slapos/test/recipe/test_simplehttpserver.py
slapos/test/recipe/test_simplehttpserver.py
+226
-42
No files found.
slapos/recipe/simplehttpserver/__init__.py
View file @
2a54030b
...
...
@@ -29,46 +29,37 @@ import string, random
import
os
from
six.moves
import
range
class
Recipe
(
GenericBaseRecipe
):
from
zc.buildout
import
UserError
from
zc.buildout.buildout
import
bool_option
def
__init__
(
self
,
buildout
,
name
,
options
):
base_path
=
options
[
'base-path'
]
if
options
.
get
(
'use-hash-url'
,
'True'
)
in
[
'true'
,
'True'
]:
pool
=
string
.
ascii_letters
+
string
.
digits
hash_string
=
''
.
join
(
random
.
choice
(
pool
)
for
i
in
range
(
64
)
)
path
=
os
.
path
.
join
(
base_path
,
hash_string
)
def
issubpathof
(
subpath
,
path
):
subpath
=
os
.
path
.
abspath
(
subpath
)
path
=
os
.
path
.
abspath
(
path
)
relpath
=
os
.
path
.
relpath
(
subpath
,
start
=
path
)
return
not
relpath
.
startswith
(
os
.
pardir
)
if
os
.
path
.
exists
(
base_path
):
path_list
=
os
.
listdir
(
base_path
)
if
len
(
path_list
)
==
1
:
hash_string
=
path_list
[
0
]
path
=
os
.
path
.
join
(
base_path
,
hash_string
)
elif
len
(
path_list
)
>
1
:
raise
ValueError
(
"Folder %s should contain 0 or 1 element."
%
base_path
)
options
[
'root-dir'
]
=
path
options
[
'path'
]
=
hash_string
else
:
options
[
'root-dir'
]
=
base_path
options
[
'path'
]
=
''
class
Recipe
(
GenericBaseRecipe
):
def
__init__
(
self
,
buildout
,
name
,
options
):
host
,
port
,
socketpath
,
abstract
=
(
options
.
get
(
k
)
for
k
in
(
'host'
,
'port'
,
'socketpath'
,
'abstract'
))
oneof
=
host
,
socketpath
,
abstract
if
sum
(
bool
(
v
)
for
v
in
oneof
)
!=
1
or
bool
(
host
)
!=
bool
(
port
):
raise
UserError
(
"Specify one of (host, port) | socketpath | abstract"
)
address
=
(
host
,
int
(
port
))
if
host
else
socketpath
or
'
\
0
'
+
abstract
options
[
'address'
]
=
address
return
GenericBaseRecipe
.
__init__
(
self
,
buildout
,
name
,
options
)
def
install
(
self
):
if
not
os
.
path
.
exists
(
self
.
options
[
'root-dir'
]):
os
.
mkdir
(
self
.
options
[
'root-dir'
]
)
parameters
=
{
'host'
:
self
.
options
[
'host'
],
'port'
:
int
(
self
.
options
[
'port'
]),
'address'
:
self
.
options
[
'address'
],
'cwd'
:
self
.
options
[
'base-path'
],
'log-file'
:
self
.
options
[
'log-file'
],
'cert-file'
:
self
.
options
.
get
(
'cert-file'
,
''
),
'key-file'
:
self
.
options
.
get
(
'key-file'
,
''
),
'
root-dir'
:
self
.
options
[
'root-dir'
]
'cert-file'
:
self
.
options
.
get
(
'cert-file'
),
'key-file'
:
self
.
options
.
get
(
'key-file'
),
'
allow-write'
:
bool_option
(
self
.
options
,
'allow-write'
,
'false'
)
}
return
self
.
createPythonScript
(
self
.
options
[
'wrapper'
].
strip
(),
__name__
+
'.simplehttpserver.run'
,
...
...
slapos/recipe/simplehttpserver/simplehttpserver.py
View file @
2a54030b
# -*- coding: utf-8 -*-
from
six.moves.SimpleHTTPServer
import
SimpleHTTPRequestHandler
from
six.moves.BaseHTTPServer
import
HTTPServer
import
ssl
import
os
from
six.moves.socketserver
import
TCPServer
import
cgi
import
contextlib
import
errno
import
logging
from
netaddr
import
valid_ipv4
,
valid_ipv6
import
os
import
ssl
import
socket
import
cgi
,
errno
from
slapos.util
import
str2bytes
from
.
import
issubpathof
class
ServerHandler
(
SimpleHTTPRequestHandler
):
base_path
=
None
# set by run
restrict_write
=
True
# set by run
_additional_logs
=
None
@
contextlib
.
contextmanager
def
_log_extra
(
self
,
msg
):
self
.
_additional_logs
=
msg
try
:
yield
finally
:
self
.
_additional_logs
=
None
def
_log
(
self
,
level
,
msg
,
*
args
):
if
self
.
_additional_logs
:
msg
+=
self
.
_additional_logs
logging
.
log
(
level
,
'%s - - '
+
msg
,
self
.
client_address
[
0
],
*
args
)
def
log_message
(
self
,
msg
,
*
args
):
self
.
_log
(
logging
.
INFO
,
msg
,
*
args
)
def
log_error
(
self
,
msg
,
*
args
):
self
.
_log
(
logging
.
ERROR
,
msg
,
*
args
)
document_path
=
''
restrict_root_folder
=
True
def
log_request
(
self
,
*
args
):
with
self
.
_log_extra
(
'
\
n
'
+
str
(
self
.
headers
)):
SimpleHTTPRequestHandler
.
log_request
(
self
,
*
args
)
def
respond
(
self
,
code
=
200
,
type
=
'text/html'
):
self
.
send_response
(
code
)
self
.
send_header
(
"Content-type"
,
type
)
self
.
end_headers
()
def
restricted
Root
Access
(
self
):
if
self
.
restrict_
root_folder
and
self
.
path
and
self
.
path
==
'/'
:
# no
access to root path
def
restricted
Write
Access
(
self
):
if
self
.
restrict_
write
and
self
.
command
not
in
(
'GET'
,
'HEAD'
)
:
# no
write access
self
.
respond
(
403
)
self
.
wfile
.
write
(
b"Forbidden"
)
return
True
return
False
def
do_GET
(
self
):
logging
.
info
(
'%s - GET: %s
\
n
%s'
%
(
self
.
client_address
[
0
],
self
.
path
,
self
.
headers
))
if
self
.
restrictedRootAccess
():
return
SimpleHTTPRequestHandler
.
do_GET
(
self
)
def
do_POST
(
self
):
"""Write to a file on the server.
...
...
@@ -45,8 +66,7 @@ class ServerHandler(SimpleHTTPRequestHandler):
request can be encoded as application/x-www-form-urlencoded or multipart/form-data
"""
logging
.
info
(
'%s - POST: %s
\
n
%s'
%
(
self
.
client_address
[
0
],
self
.
path
,
self
.
headers
))
if
self
.
restrictedRootAccess
():
if
self
.
restrictedWriteAccess
():
return
form
=
cgi
.
FieldStorage
(
...
...
@@ -67,64 +87,76 @@ class ServerHandler(SimpleHTTPRequestHandler):
file_open_mode
=
'wb'
if
(
'clear'
in
form
and
form
[
'clear'
].
value
in
(
'1'
,
b'1'
))
else
'ab'
self
.
writeFile
(
file_path
,
file_content
,
file_open_mode
)
self
.
respond
(
200
,
type
=
self
.
headers
[
'Content-Type'
])
self
.
wfile
.
write
(
b"Content written to %s"
%
str2bytes
(
file_path
))
def
writeFile
(
self
,
filename
,
content
,
method
=
'ab'
):
file_path
=
os
.
path
.
abspath
(
os
.
path
.
join
(
self
.
document_path
,
filename
))
if
not
file_path
.
startswith
(
self
.
document_path
):
file_path
=
os
.
path
.
abspath
(
os
.
path
.
join
(
self
.
base_path
,
filename
))
# Check writing there is allowed
if
not
issubpathof
(
file_path
,
self
.
base_path
):
self
.
respond
(
403
,
'text/plain'
)
self
.
wfile
.
write
(
b"Forbidden"
)
return
# Create missing directories if needed
try
:
os
.
makedirs
(
os
.
path
.
dirname
(
file_path
))
except
OSError
as
exception
:
if
exception
.
errno
!=
errno
.
EEXIST
:
logging
.
error
(
'Failed to create file in %s. The error is
\
n
%s'
%
(
file_path
,
str
(
exception
))
)
logging
.
info
(
'Writing recieved content to file %s'
%
file_path
)
self
.
log_error
(
'Failed to create file in %s. The error is
\
n
%s'
,
file_path
,
exception
)
# Write content to file
self
.
log_message
(
'Writing received content to file %s'
,
file_path
)
try
:
with
open
(
file_path
,
method
)
as
myfile
:
myfile
.
write
(
content
)
logging
.
info
(
'Done.'
)
self
.
log_message
(
'Done.'
)
except
IOError
as
e
:
logging
.
error
(
'Something happened while processing
\
'
writeFile
\
'
. The message is %s'
%
str
(
e
))
class
HTTPServerV6
(
HTTPServer
):
address_family
=
socket
.
AF_INET6
self
.
log_error
(
'Something happened while processing
\
'
writeFile
\
'
. The message is %s'
,
e
)
self
.
respond
(
200
,
type
=
self
.
headers
[
'Content-Type'
])
self
.
wfile
.
write
(
b"Content written to %s"
%
str2bytes
(
filename
))
def
run
(
args
):
# minimal web server. serves files relative to the current directory.
logging
.
basicConfig
(
format
=
"%(asctime)s %(levelname)s - %(message)s"
,
filename
=
args
[
'log-file'
],
level
=
logging
.
INFO
)
# minimal web server. serves files relative to the
# current directory.
logging
.
basicConfig
(
format
=
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
,
filename
=
args
[
'log-file'
]
,
level
=
logging
.
INFO
)
address
=
args
[
'address'
]
cwd
=
args
[
'cwd'
]
port
=
args
[
'port'
]
host
=
args
[
'host'
]
os
.
chdir
(
args
[
'cwd'
])
os
.
chdir
(
cwd
)
Handler
=
ServerHandler
Handler
.
document_path
=
args
[
'root-dir'
]
Handler
.
restrict_
root_folder
=
(
args
[
'root-dir'
]
!=
args
[
'cwd'
])
Handler
.
base_path
=
cwd
Handler
.
restrict_
write
=
not
args
[
'allow-write'
]
if
valid_ipv6
(
host
):
server
=
HTTPServerV6
else
:
server
=
HTTPServer
httpd
=
server
((
host
,
port
),
Handler
)
scheme
=
'http'
if
'cert-file'
in
args
and
'key-file'
in
args
and
\
os
.
path
.
exists
(
args
[
'cert-file'
])
and
os
.
path
.
exists
(
args
[
'key-file'
]):
scheme
=
'https'
httpd
.
socket
=
ssl
.
wrap_socket
(
httpd
.
socket
,
try
:
host
,
port
=
address
family
,
_
,
_
,
_
,
_
=
socket
.
getaddrinfo
(
host
,
port
)[
0
]
except
ValueError
:
family
=
socket
.
AF_UNIX
class
Server
(
TCPServer
):
allow_reuse_address
=
1
# for tests, HTTPServer in stdlib sets it too
address_family
=
family
httpd
=
Server
(
address
,
Handler
)
certfile
=
args
[
'cert-file'
]
if
certfile
:
# keyfile == None signifies key is in certfile
PROTOCOL_TLS_SERVER
=
getattr
(
ssl
,
'PROTOCOL_TLS_SERVER'
,
None
)
if
PROTOCOL_TLS_SERVER
:
sslcontext
=
ssl
.
SSLContext
(
PROTOCOL_TLS_SERVER
)
sslcontext
.
load_cert_chain
(
certfile
,
args
[
'key-file'
])
httpd
.
socket
=
sslcontext
.
wrap_socket
(
httpd
.
socket
,
server_side
=
True
)
else
:
# BBB Py2, Py<3.6
httpd
.
socket
=
ssl
.
wrap_socket
(
httpd
.
socket
,
server_side
=
True
,
certfile
=
args
[
'cert-file'
]
,
certfile
=
certfile
,
keyfile
=
args
[
'key-file'
])
logging
.
info
(
"Starting simple http server at %s
://%s:%s"
%
(
scheme
,
host
,
port
)
)
logging
.
info
(
"Starting simple http server at %s
"
,
address
)
httpd
.
serve_forever
()
slapos/test/recipe/test_simplehttpserver.py
View file @
2a54030b
import
errno
import
os
import
shutil
import
tempfile
import
unittest
import
socket
import
subprocess
import
time
import
warnings
from
six.moves.urllib
import
parse
as
urlparse
import
requests
...
...
@@ -11,6 +14,58 @@ import requests
from
slapos.recipe
import
simplehttpserver
from
slapos.test.utils
import
makeRecipe
CERTIFICATE_FOR_TEST
=
"""
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC8/zt/ndbvsCXb
2Kf5CaYlSsngykwfeeekDSoYHqWrl/WltFbdz/yw1ggRZUXo0l1ueJrDWqQzZIAT
9YjoNkX3G21nEIzg9/aKqq1vqHKBH+JaaAt+m84GnErFDztnkiMUKWFKyFmseg0O
QtkYGw179bXfcXX2x18gz8aBmCkjBjKjfiQtYWs9sPU0grBl9rE+h1maRh2uQXnF
BTMKHJ6wNGyFgg0ATqrBiLRv+wxCnuCdGzJzkZ3ytKuhqkwEcEIsVHSSwAx+hdBR
3AUBl1jfwUukj8a4rf23RR3pvYIZiMEdalsuiLBKyjzCqSPo5VZzSWSiK5CTPspM
4bz9OXPHAgMBAAECggEAMQcg/y0J+em/GHXutSrsn9Xz4s13y96K2cLUfadNoOLt
xYuv0SDIU3NiamjUJt6TgDnnI/Bakj5q/0J9vod9xOmnisn/Ucjhev1luoZ/FcIY
rQ06liCC5LIcr1wRM//z+6H0bDrnEFglFOMAgEFcUSDfilRbnqX/pnpf63R2j2/0
ttsSI3/plJAJbGha01S9jLTRKqHWy0vV0XJUXWkg0BETJci0w4fJ1kdMmnELaq4L
kU8IZoHwbRq/RBudQoN4ceZjUnMFcVSQCFa+5coYEJvrYvcrLzA8E01435AGyHyv
DzkiYwIrAzfQYhNVKLXgXrMGclNk8k9SMISSpVq92QKBgQDtJZjWrKxz5iWheIe8
uGM2rQ7ZgtAO9pYhDNXwKvlxEEXQrtWrVT2KA02/rbyOGoB4D7hlJXlopSLenV3o
5d3lRXhIXHSz2Qzy5/opPK0rt6TBXKWZ3+AxV7lpzJReltgRSn6mg1bgB2D14GYa
1gfH1W2fVJ2B5LrB3dPOCJOC4wKBgQDMBbEBsUh1HSAN9tR9icZcgYD2/1aYVHVJ
bnGUR1xs1cQRHKZZn6y/BBy021YAGIbgbFb8YhJC5lCMmeLADho3W1XxYhe6ssiE
E4sbK4y+fD2MFvAe7Y//IB0KRmAzTG3tPyOjBMftAMwrGoXIo990BAFtrO8tTIeb
9XcUnd0MzQKBgA8jz1YlP/1GPDDK2R+bRfo/oisQxuetpngFscLbe4FUYKCqCMof
bwZYn6YVGWyZFIqVtlf+xHmB0XAU6+HqivgQL1WvUWQJ/2Ginb30OboIx2Pw3kGs
oUuFJjky7mX7i1/POba3u9whnHcWFG6yK1z+qzj41fVs/N9ToioNMh2xAoGAIAY4
rYpVVES5FlgLLJVmtHiDdMHJpumC637RhzPYVyEKwKDdn63HoMgVdXIEQsmWyj1X
PhBqy2N5e0hgZkMQbGYCzHvYO676eHjU2fPxCKlZw9aJ5GDnvGUfCdDYItU5YAcM
IfeLJjF82rs0CrVmSsCiNMPzWwnrM1jJU0wgOXUCgYEAzAu7kDTITpMBvtUxWapQ
c1YoE4CqCG6Kq+l65SDe+c9VCckEmVPEhHbPmTv28iUh9n5MipCi7tehok2YX/Cw
o8M12F9A9sOWTqrNylCgIjU0GCTBkA5LvYV786TYJgWPZ5Mwdkmq5Ifbf3Ti/uGk
z6Cids97LVTVrV4iAZ+alY0=
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIDXzCCAkegAwIBAgIUAXy1ly1SQ41kXIKV2orz+SghlrUwDQYJKoZIhvcNAQEL
BQAwPjELMAkGA1UEBhMCRVUxDzANBgNVBAoMBk5leGVkaTEeMBwGA1UEAwwVdGVz
dF9zaW1wbGVodHRwc2VydmVyMCAXDTI0MTIwMjE0MTkzNVoYDzIxMDcwMTIyMTQx
OTM1WjA+MQswCQYDVQQGEwJFVTEPMA0GA1UECgwGTmV4ZWRpMR4wHAYDVQQDDBV0
ZXN0X3NpbXBsZWh0dHBzZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQC8/zt/ndbvsCXb2Kf5CaYlSsngykwfeeekDSoYHqWrl/WltFbdz/yw1ggR
ZUXo0l1ueJrDWqQzZIAT9YjoNkX3G21nEIzg9/aKqq1vqHKBH+JaaAt+m84GnErF
DztnkiMUKWFKyFmseg0OQtkYGw179bXfcXX2x18gz8aBmCkjBjKjfiQtYWs9sPU0
grBl9rE+h1maRh2uQXnFBTMKHJ6wNGyFgg0ATqrBiLRv+wxCnuCdGzJzkZ3ytKuh
qkwEcEIsVHSSwAx+hdBR3AUBl1jfwUukj8a4rf23RR3pvYIZiMEdalsuiLBKyjzC
qSPo5VZzSWSiK5CTPspM4bz9OXPHAgMBAAGjUzBRMB0GA1UdDgQWBBQc1p1Qudnk
WcxOnVt+4zw+MpmOITAfBgNVHSMEGDAWgBQc1p1QudnkWcxOnVt+4zw+MpmOITAP
BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCjZfuToaybR8JqTQ1l
4MZ8BEzFlq6Ebn8x4shiWc3wiX5cd1RSF4iilpDv2yp9MNiTHXkMxNEnx9NMdK/+
0bNlBn6tcv5MLZynXQnT+keJ73iYFB0Og298NauEQPI9x5/gf3zVGKBJ7d/aOumR
VRUugFQLzwWj27Muh1rbdayh73gpuNm1ZF+HgwWy8vYc5XoLS3gZ+tlGX3Im0Agg
ug2Kng5JY+f1aC8ZWBtTTFa2k2QALD1dD+vzGsoKitUEarg1CMHO/f6VsAFTfJT3
NDI4ky4bVMpkq17t65YXf1QVgquOEPfAnkzn51/vPzvezOMzPYQbsQqMbc4jehZT
oxpd
-----END CERTIFICATE-----
"""
.
strip
()
class
SimpleHTTPServerTest
(
unittest
.
TestCase
):
process
=
None
...
...
@@ -21,55 +76,124 @@ class SimpleHTTPServerTest(unittest.TestCase):
self
.
install_dir
=
tempfile
.
mkdtemp
()
self
.
addCleanup
(
shutil
.
rmtree
,
self
.
install_dir
)
self
.
wrapper
=
os
.
path
.
join
(
self
.
install_dir
,
'server'
)
host
,
port
=
os
.
environ
[
'SLAPOS_TEST_IPV4'
],
9999
self
.
server_url
=
'http://{host}:{port}'
.
format
(
host
=
host
,
port
=
port
)
self
.
recipe
=
makeRecipe
(
simplehttpserver
.
Recipe
,
options
=
{
self
.
logfile
=
self
.
wrapper
+
'.log'
self
.
process
=
None
def
setUpRecipe
(
self
,
opt
=
None
):
opt
=
opt
or
{}
self
.
certfile
=
opt
.
get
(
'cert-file'
)
if
not
'socketpath'
in
opt
and
not
'abstract'
in
opt
:
opt
[
'host'
]
=
host
=
os
.
environ
[
'SLAPOS_TEST_IPV4'
]
opt
[
'port'
]
=
port
=
9999
scheme
=
'https'
if
self
.
certfile
else
'http'
self
.
server_url
=
scheme
+
'://{}:{}'
.
format
(
host
,
port
)
else
:
self
.
server_url
=
None
options
=
{
'base-path'
:
self
.
base_path
,
'host'
:
host
,
'port'
:
port
,
'log-file'
:
os
.
path
.
join
(
self
.
install_dir
,
'simplehttpserver.log'
),
'log-file'
:
self
.
logfile
,
'wrapper'
:
self
.
wrapper
,
},
}
options
.
update
(
opt
)
self
.
recipe
=
makeRecipe
(
simplehttpserver
.
Recipe
,
options
=
options
,
name
=
'simplehttpserver'
,
)
def
tearDown
(
self
):
if
self
.
process
:
self
.
process
.
terminate
()
self
.
process
.
wait
()
def
test_options
(
self
):
self
.
assertNotEqual
(
self
.
recipe
.
options
[
'path'
],
''
)
self
.
assertEqual
(
self
.
recipe
.
options
[
'root-dir'
],
os
.
path
.
join
(
self
.
base_path
,
self
.
recipe
.
options
[
'path'
],
))
def
test_install
(
self
):
def
startServer
(
self
):
self
.
assertEqual
(
self
.
recipe
.
install
(),
self
.
wrapper
)
self
.
process
=
subprocess
.
Popen
(
self
.
wrapper
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
,
universal_newlines
=
True
,
# BBB Py2, use text= in Py3
)
server_base_url
=
urlparse
.
urljoin
(
self
.
server_url
,
self
.
recipe
.
options
[
'path'
],
)
address
=
self
.
recipe
.
options
[
'address'
]
if
self
.
server_url
:
kwargs
=
{
'verify'
:
False
}
if
self
.
certfile
else
{}
def
check_connection
():
resp
=
requests
.
get
(
self
.
server_url
,
**
kwargs
)
self
.
assertIn
(
'Directory listing for /'
,
resp
.
text
)
ConnectionError
=
requests
.
exceptions
.
ConnectionError
cleanup
=
None
else
:
s
=
socket
.
socket
(
socket
.
AF_UNIX
,
socket
.
SOCK_STREAM
)
def
check_connection
():
s
.
connect
(
address
)
ConnectionError
=
socket
.
error
cleanup
=
lambda
:
s
.
close
()
try
:
for
i
in
range
(
16
):
try
:
resp
=
requests
.
get
(
server_base_url
)
check_connection
(
)
break
except
requests
.
exceptions
.
ConnectionError
:
except
ConnectionError
:
time
.
sleep
(
i
*
.
1
)
else
:
# Kill process in case it did not crash
# otherwise .communicate() may hang forever.
self
.
process
.
terminate
()
self
.
process
.
wait
()
with
open
(
self
.
logfile
)
as
f
:
log
=
f
.
read
()
self
.
fail
(
'server did not start.
\
n
out: %s error: %s'
%
self
.
process
.
communicate
())
self
.
assertIn
(
'Directory listing for /'
,
resp
.
text
)
"Server did not start
\
n
"
"out: %s
\
n
"
"err: %s
\
n
"
"log: %s"
%
(
self
.
process
.
communicate
()
+
(
log
,)))
finally
:
if
cleanup
:
cleanup
()
with
open
(
self
.
logfile
)
as
f
:
self
.
assertIn
(
"Starting simple http server at %s"
%
(
address
,),
f
.
read
())
return
self
.
server_url
def
tearDown
(
self
):
if
self
.
process
:
self
.
process
.
terminate
()
self
.
process
.
wait
()
self
.
process
.
communicate
()
# close pipes
self
.
process
=
None
def
write_should_fail
(
self
,
url
,
hack_path
,
hack_content
):
# post with multipart/form-data encoding
resp
=
requests
.
post
(
url
,
files
=
{
'path'
:
hack_path
,
'content'
:
hack_content
,
},
)
# First check for actual access to forbidden files
try
:
with
open
(
hack_path
)
as
f
:
content
=
f
.
read
()
if
content
==
hack_content
:
self
.
fail
(
content
)
self
.
fail
(
"%s should not have been created"
%
hack_path
)
except
IOError
as
e
:
if
e
.
errno
!=
errno
.
ENOENT
:
raise
# Now check for proper response
self
.
assertEqual
(
resp
.
status_code
,
requests
.
codes
.
forbidden
)
self
.
assertEqual
(
resp
.
text
,
'Forbidden'
)
def
test_write_outside_base_path_should_fail
(
self
):
self
.
setUpRecipe
({
'allow-write'
:
'true'
})
server_base_url
=
self
.
startServer
()
# A file outside the server's root directory
hack_path
=
os
.
path
.
join
(
self
.
install_dir
,
'forbidden'
,
'hack.txt'
)
hack_content
=
"You should not be able to write to hack.txt"
self
.
write_should_fail
(
server_base_url
,
hack_path
,
hack_content
)
self
.
assertFalse
(
os
.
path
.
exists
(
os
.
path
.
dirname
(
hack_path
)))
def
test_write
(
self
):
self
.
setUpRecipe
({
'allow-write'
:
'true'
})
server_base_url
=
self
.
startServer
()
# post with multipart/form-data encoding
resp
=
requests
.
post
(
...
...
@@ -81,15 +205,21 @@ class SimpleHTTPServerTest(unittest.TestCase):
)
self
.
assertEqual
(
resp
.
status_code
,
requests
.
codes
.
ok
)
self
.
assertEqual
(
resp
.
text
,
'Content written to hello-form-data.txt'
)
with
open
(
os
.
path
.
join
(
self
.
base_path
,
self
.
recipe
.
options
[
'path'
],
'hello-form-data.txt'
))
as
f
:
hello_form_file
=
os
.
path
.
join
(
self
.
base_path
,
'hello-form-data.txt'
)
with
open
(
hello_form_file
)
as
f
:
self
.
assertEqual
(
f
.
read
(),
'hello-form-data'
)
self
.
assertIn
(
'hello-form-data.txt'
,
requests
.
get
(
server_base_url
).
text
)
self
.
assertEqual
(
requests
.
get
(
server_base_url
+
'/hello-form-data.txt'
).
text
,
'hello-form-data'
)
# check GET and POST are logged
with
open
(
self
.
logfile
)
as
f
:
log
=
f
.
read
()
self
.
assertIn
(
'Writing received content to file '
+
hello_form_file
,
log
)
self
.
assertIn
(
'"POST / HTTP/1.1" 200 -'
,
log
)
self
.
assertIn
(
'"GET /hello-form-data.txt HTTP/1.1" 200 -'
,
log
)
# post as application/x-www-form-urlencoded
resp
=
requests
.
post
(
server_base_url
,
...
...
@@ -100,8 +230,7 @@ class SimpleHTTPServerTest(unittest.TestCase):
)
self
.
assertEqual
(
resp
.
status_code
,
requests
.
codes
.
ok
)
with
open
(
os
.
path
.
join
(
self
.
base_path
,
self
.
recipe
.
options
[
'path'
],
'hello-form-urlencoded.txt'
))
as
f
:
os
.
path
.
join
(
self
.
base_path
,
'hello-form-urlencoded.txt'
))
as
f
:
self
.
assertEqual
(
f
.
read
(),
'hello-form-urlencoded'
)
self
.
assertIn
(
'hello-form-urlencoded.txt'
,
requests
.
get
(
server_base_url
).
text
)
...
...
@@ -119,3 +248,58 @@ class SimpleHTTPServerTest(unittest.TestCase):
},
)
self
.
assertEqual
(
resp
.
status_code
,
requests
.
codes
.
forbidden
)
def
test_readonly
(
self
):
self
.
setUpRecipe
()
indexpath
=
os
.
path
.
join
(
self
.
base_path
,
'index.txt'
)
indexcontent
=
"This file is served statically and readonly"
with
open
(
indexpath
,
'w'
)
as
f
:
f
.
write
(
indexcontent
)
server_base_url
=
self
.
startServer
()
indexurl
=
os
.
path
.
join
(
server_base_url
,
'index.txt'
)
resp
=
requests
.
get
(
indexurl
)
self
.
assertEqual
(
resp
.
status_code
,
requests
.
codes
.
ok
)
self
.
assertEqual
(
resp
.
text
,
indexcontent
)
resp
=
requests
.
post
(
server_base_url
,
files
=
{
'path'
:
'index.txt'
,
'content'
:
'Not readonly after all'
,
},
)
self
.
assertEqual
(
resp
.
status_code
,
requests
.
codes
.
forbidden
)
with
open
(
indexpath
)
as
f
:
self
.
assertEqual
(
f
.
read
(),
indexcontent
)
def
test_socketpath
(
self
):
socketpath
=
os
.
path
.
join
(
self
.
install_dir
,
'http.sock'
)
self
.
setUpRecipe
({
'socketpath'
:
socketpath
})
self
.
assertEqual
(
socketpath
,
self
.
recipe
.
options
[
'address'
])
self
.
startServer
()
def
test_abstract
(
self
):
abstract
=
os
.
path
.
join
(
self
.
install_dir
,
'abstract.http'
)
self
.
setUpRecipe
({
'abstract'
:
abstract
})
self
.
assertEqual
(
'
\
0
'
+
abstract
,
self
.
recipe
.
options
[
'address'
])
self
.
startServer
()
def
test_tls_self_signed
(
self
):
certfile
=
os
.
path
.
join
(
self
.
install_dir
,
'cert.pem'
)
with
open
(
certfile
,
'w'
)
as
f
:
f
.
write
(
CERTIFICATE_FOR_TEST
)
self
.
setUpRecipe
({
'cert-file'
:
certfile
})
with
warnings
.
catch_warnings
():
warnings
.
simplefilter
(
"ignore"
)
# suppress verify=False warning
server_base_url
=
self
.
startServer
()
# Check self-signed certificate is not accepted without verify=False
self
.
assertRaises
(
requests
.
exceptions
.
ConnectionError
,
requests
.
get
,
server_base_url
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment