Commit f1f332cf authored by Xavier Thompson's avatar Xavier Thompson

simplehttpserver: Allow disabling write access

parent 83200a29
...@@ -29,16 +29,19 @@ import string, random ...@@ -29,16 +29,19 @@ import string, random
import os import os
from six.moves import range from six.moves import range
def is_true(s):
return bool(('false', 'true').index(s.lower()))
class Recipe(GenericBaseRecipe): class Recipe(GenericBaseRecipe):
def __init__(self, buildout, name, options): def __init__(self, buildout, name, options):
base_path = options['base-path'] base_path = options['base-path']
if options.get('use-hash-url', 'True') in ['true', 'True']: if is_true(options.get('use-hash-url', 'True')):
pool = string.ascii_letters + string.digits pool = string.ascii_letters + string.digits
hash_string = ''.join(random.choice(pool) for i in range(64)) hash_string = ''.join(random.choice(pool) for i in range(64))
path = os.path.join(base_path, hash_string) path = os.path.join(base_path, hash_string)
if os.path.exists(base_path): if os.path.exists(base_path):
path_list = os.listdir(base_path) path_list = os.listdir(base_path)
if len(path_list) == 1: if len(path_list) == 1:
...@@ -46,7 +49,7 @@ class Recipe(GenericBaseRecipe): ...@@ -46,7 +49,7 @@ class Recipe(GenericBaseRecipe):
path = os.path.join(base_path, hash_string) path = os.path.join(base_path, hash_string)
elif len(path_list) > 1: elif len(path_list) > 1:
raise ValueError("Folder %s should contain 0 or 1 element." % base_path) raise ValueError("Folder %s should contain 0 or 1 element." % base_path)
options['root-dir'] = path options['root-dir'] = path
options['path'] = hash_string options['path'] = hash_string
else: else:
...@@ -66,7 +69,8 @@ class Recipe(GenericBaseRecipe): ...@@ -66,7 +69,8 @@ class Recipe(GenericBaseRecipe):
'log-file': self.options['log-file'], 'log-file': self.options['log-file'],
'cert-file': self.options.get('cert-file', ''), 'cert-file': self.options.get('cert-file', ''),
'key-file': self.options.get('key-file', ''), 'key-file': self.options.get('key-file', ''),
'root-dir': self.options['root-dir'] 'root-dir': self.options['root-dir'],
'forbid-write': is_true(self.options.get('forbid-write', 'false'))
} }
return self.createPythonScript( return self.createPythonScript(
......
...@@ -15,23 +15,29 @@ class ServerHandler(SimpleHTTPRequestHandler): ...@@ -15,23 +15,29 @@ class ServerHandler(SimpleHTTPRequestHandler):
document_path = '' document_path = ''
restrict_root_folder = True restrict_root_folder = True
restrict_write = True
def respond(self, code=200, type='text/html'): def respond(self, code=200, type='text/html'):
self.send_response(code) self.send_response(code)
self.send_header("Content-type", type) self.send_header("Content-type", type)
self.end_headers() self.end_headers()
def restrictedRootAccess(self): def restrictedAccess(self):
if self.restrict_root_folder and self.path and self.path == '/': if self.restrict_root_folder and self.path and self.path == '/':
# no access to root path # no access to root path
self.respond(403) self.respond(403)
self.wfile.write(b"Forbidden") self.wfile.write(b"Forbidden")
return True return True
if self.restrict_write and self.command not in ('GET', 'HEAD'):
# no write access
self.respond(403)
self.wfile.write(b"Forbidden")
return True
return False return False
def do_GET(self): def do_GET(self):
logging.info('%s - GET: %s \n%s' % (self.client_address[0], self.path, self.headers)) logging.info('%s - GET: %s \n%s' % (self.client_address[0], self.path, self.headers))
if self.restrictedRootAccess(): if self.restrictedAccess():
return return
SimpleHTTPRequestHandler.do_GET(self) SimpleHTTPRequestHandler.do_GET(self)
...@@ -46,7 +52,7 @@ class ServerHandler(SimpleHTTPRequestHandler): ...@@ -46,7 +52,7 @@ class ServerHandler(SimpleHTTPRequestHandler):
request can be encoded as application/x-www-form-urlencoded or multipart/form-data request can be encoded as application/x-www-form-urlencoded or multipart/form-data
""" """
logging.info('%s - POST: %s \n%s' % (self.client_address[0], self.path, self.headers)) logging.info('%s - POST: %s \n%s' % (self.client_address[0], self.path, self.headers))
if self.restrictedRootAccess(): if self.restrictedAccess():
return return
form = cgi.FieldStorage( form = cgi.FieldStorage(
...@@ -83,7 +89,7 @@ class ServerHandler(SimpleHTTPRequestHandler): ...@@ -83,7 +89,7 @@ class ServerHandler(SimpleHTTPRequestHandler):
logging.error('Failed to create file in %s. The error is \n%s' % ( logging.error('Failed to create file in %s. The error is \n%s' % (
file_path, str(exception))) file_path, str(exception)))
logging.info('Writing recieved content to file %s' % file_path) logging.info('Writing received content to file %s' % file_path)
try: try:
with open(file_path, method) as myfile: with open(file_path, method) as myfile:
myfile.write(content) myfile.write(content)
...@@ -97,31 +103,32 @@ class HTTPServerV6(HTTPServer): ...@@ -97,31 +103,32 @@ class HTTPServerV6(HTTPServer):
def run(args): def run(args):
# minimal web server. serves files relative to the # minimal web server. serves files relative to the
# current directory. # current directory.
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
filename=args['log-file'] ,level=logging.INFO) filename=args['log-file'] ,level=logging.INFO)
port = args['port'] port = args['port']
host = args['host'] host = args['host']
os.chdir(args['cwd']) os.chdir(args['cwd'])
Handler = ServerHandler Handler = ServerHandler
Handler.document_path = args['root-dir'] Handler.document_path = args['root-dir']
Handler.restrict_root_folder = (args['root-dir'] != args['cwd']) Handler.restrict_root_folder = (args['root-dir'] != args['cwd'])
Handler.restrict_write = args['forbid-write']
if valid_ipv6(host): if valid_ipv6(host):
server = HTTPServerV6 server = HTTPServerV6
else: else:
server = HTTPServer server = HTTPServer
httpd = server((host, port), Handler) httpd = server((host, port), Handler)
scheme = 'http' scheme = 'http'
if 'cert-file' in args and 'key-file' in args and \ if 'cert-file' in args and 'key-file' in args and \
os.path.exists(args['cert-file']) and os.path.exists(args['key-file']): os.path.exists(args['cert-file']) and os.path.exists(args['key-file']):
scheme = 'https' scheme = 'https'
httpd.socket = ssl.wrap_socket (httpd.socket, httpd.socket = ssl.wrap_socket (httpd.socket,
server_side=True, server_side=True,
certfile=args['cert-file'], certfile=args['cert-file'],
keyfile=args['key-file']) keyfile=args['key-file'])
......
...@@ -21,35 +21,26 @@ class SimpleHTTPServerTest(unittest.TestCase): ...@@ -21,35 +21,26 @@ class SimpleHTTPServerTest(unittest.TestCase):
self.install_dir = tempfile.mkdtemp() self.install_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.install_dir) self.addCleanup(shutil.rmtree, self.install_dir)
self.wrapper = os.path.join(self.install_dir, 'server') self.wrapper = os.path.join(self.install_dir, 'server')
self.process = None
def setUpRecipe(self, opt=()):
host, port = os.environ['SLAPOS_TEST_IPV4'], 9999 host, port = os.environ['SLAPOS_TEST_IPV4'], 9999
self.server_url = 'http://{host}:{port}'.format(host=host, port=port) options = {
'base-path': self.base_path,
'host': host,
'port': port,
'log-file': os.path.join(self.install_dir, 'simplehttpserver.log'),
'wrapper': self.wrapper,
}
options.update(opt)
self.recipe = makeRecipe( self.recipe = makeRecipe(
simplehttpserver.Recipe, simplehttpserver.Recipe,
options={ options=options,
'base-path': self.base_path,
'host': host,
'port': port,
'log-file': os.path.join(self.install_dir, 'simplehttpserver.log'),
'wrapper': self.wrapper,
},
name='simplehttpserver', name='simplehttpserver',
) )
self.server_url = 'http://{host}:{port}'.format(host=host, port=port)
def tearDown(self): def startServer(self):
if self.process:
self.process.terminate()
self.process.wait()
def test_options(self):
self.assertNotEqual(self.recipe.options['path'], '')
self.assertEqual(
self.recipe.options['root-dir'],
os.path.join(
self.base_path,
self.recipe.options['path'],
))
def test_install(self):
self.assertEqual(self.recipe.install(), self.wrapper) self.assertEqual(self.recipe.install(), self.wrapper)
self.process = subprocess.Popen( self.process = subprocess.Popen(
self.wrapper, self.wrapper,
...@@ -70,6 +61,36 @@ class SimpleHTTPServerTest(unittest.TestCase): ...@@ -70,6 +61,36 @@ class SimpleHTTPServerTest(unittest.TestCase):
self.fail( self.fail(
'server did not start.\nout: %s error: %s' % self.process.communicate()) 'server did not start.\nout: %s error: %s' % self.process.communicate())
self.assertIn('Directory listing for /', resp.text) self.assertIn('Directory listing for /', resp.text)
return server_base_url
def tearDown(self):
if self.process:
self.process.terminate()
self.process.wait()
self.process.communicate() # close pipes
self.process = None
def test_options(self):
self.setUpRecipe()
self.assertNotEqual(self.recipe.options['path'], '')
self.assertEqual(
self.recipe.options['root-dir'],
os.path.join(
self.base_path,
self.recipe.options['path'],
))
def test_options_no_hash(self):
self.setUpRecipe({'use-hash-url': 'false'})
self.assertEqual(self.recipe.options['path'], '')
self.assertEqual(
self.recipe.options['root-dir'],
self.base_path
)
def test_install(self):
self.setUpRecipe()
server_base_url = self.startServer()
# post with multipart/form-data encoding # post with multipart/form-data encoding
resp = requests.post( resp = requests.post(
...@@ -119,3 +140,29 @@ class SimpleHTTPServerTest(unittest.TestCase): ...@@ -119,3 +140,29 @@ class SimpleHTTPServerTest(unittest.TestCase):
}, },
) )
self.assertEqual(resp.status_code, requests.codes.forbidden) self.assertEqual(resp.status_code, requests.codes.forbidden)
def test_readonly(self):
self.setUpRecipe({'forbid-write': 'true', 'use-hash-url': 'false'})
indexpath = os.path.join(self.base_path, 'index.txt')
indexcontent = "This file is served statically and readonly"
with open(indexpath, 'w') as f:
f.write(indexcontent)
server_base_url = self.startServer()
indexurl = os.path.join(server_base_url, 'index.txt')
resp = requests.get(indexurl)
self.assertEqual(resp.status_code, requests.codes.ok)
self.assertEqual(resp.text, indexcontent)
resp = requests.post(
server_base_url,
files={
'path': 'index.txt',
'content': 'Not readonly after all',
},
)
self.assertEqual(resp.status_code, requests.codes.forbidden)
with open(indexpath) as f:
self.assertEqual(f.read(), indexcontent)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment