Commit 11db6901 authored by Thomas Gambier's avatar Thomas Gambier 🚴🏼

simplehttpserver: prevent overwriting files outside of document path

See merge request nexedi/slapos!906
parents eb59577a 9353852c
...@@ -40,6 +40,7 @@ extras_require = { ...@@ -40,6 +40,7 @@ extras_require = {
'jsonschema', 'jsonschema',
'mock', 'mock',
'testfixtures', 'testfixtures',
'requests',
), ),
} }
......
...@@ -35,7 +35,7 @@ class Recipe(GenericBaseRecipe): ...@@ -35,7 +35,7 @@ class Recipe(GenericBaseRecipe):
base_path = options['base-path'] base_path = options['base-path']
if options.get('use-hash-url', 'True') in ['true', 'True']: if options.get('use-hash-url', 'True') in ['true', 'True']:
pool = string.letters + string.digits pool = string.ascii_letters + string.digits
hash_string = ''.join(random.choice(pool) for i in range(64)) hash_string = ''.join(random.choice(pool) for i in range(64))
path = os.path.join(base_path, hash_string) path = os.path.join(base_path, hash_string)
......
...@@ -43,20 +43,23 @@ class ServerHandler(SimpleHTTPRequestHandler): ...@@ -43,20 +43,23 @@ class ServerHandler(SimpleHTTPRequestHandler):
form = cgi.FieldStorage( form = cgi.FieldStorage(
fp=self.rfile, fp=self.rfile,
headers=self.headers, headers=self.headers,
environ={'REQUEST_METHOD':'POST', environ={'REQUEST_METHOD': 'POST',
'CONTENT_TYPE':self.headers['Content-Type']} 'CONTENT_TYPE': self.headers['Content-Type']}
) )
name = form['path'].value name = form['path'].value.decode('utf-8')
content = form['content'].value content = form['content'].value
method = 'a' method = 'ab'
if 'clear' in form and form['clear'].value == '1': if 'clear' in form and form['clear'].value == '1':
method = 'w' method = 'wb'
self.writeFile(name, content, method) self.writeFile(name, content, method)
self.respond(200, type=self.headers['Content-Type']) self.respond(200, type=self.headers['Content-Type'])
self.wfile.write(b"Content written to %s" % str2bytes(name)) self.wfile.write(b"Content written to %s" % str2bytes(name))
def writeFile(self, filename, content, method='a'): def writeFile(self, filename, content, method='ab'):
file_path = os.path.join(self.document_path, filename) file_path = os.path.abspath(os.path.join(self.document_path, filename))
if not file_path.startswith(self.document_path):
self.respond(403, 'text/plain')
self.wfile.write(b"Forbidden")
try: try:
os.makedirs(os.path.dirname(file_path)) os.makedirs(os.path.dirname(file_path))
......
import os
import shutil
import tempfile
import unittest
import subprocess
import time
from six.moves.urllib import parse as urlparse
import requests
from slapos.recipe import simplehttpserver
from slapos.test.utils import makeRecipe
class SimpleHTTPServerTest(unittest.TestCase):
process = None
def setUp(self):
self.base_path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.base_path)
self.install_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.install_dir)
self.wrapper = os.path.join(self.install_dir, 'server')
host, port = os.environ['SLAPOS_TEST_IPV4'], 9999
self.server_url = 'http://{host}:{port}'.format(host=host, port=port)
self.recipe = makeRecipe(
simplehttpserver.Recipe,
options={
'base-path': self.base_path,
'host': host,
'port': port,
'log-file': os.path.join(self.install_dir, 'simplehttpserver.log'),
'wrapper': self.wrapper,
},
name='simplehttpserver',
)
def tearDown(self):
if self.process:
self.process.terminate()
self.process.wait()
def test_options(self):
self.assertNotEqual(self.recipe.options['path'], '')
self.assertEqual(
self.recipe.options['root-dir'],
os.path.join(
self.base_path,
self.recipe.options['path'],
))
def test_install(self):
self.assertEqual(self.recipe.install(), self.wrapper)
self.process = subprocess.Popen(
self.wrapper,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
server_base_url = urlparse.urljoin(
self.server_url,
self.recipe.options['path'],
)
for i in range(16):
try:
resp = requests.get(server_base_url)
break
except requests.exceptions.ConnectionError:
time.sleep(i * .1)
else:
self.fail(
'server did not start.\nout: %s error: %s' % self.process.communicate())
self.assertIn('Directory listing for /', resp.text)
resp = requests.post(
server_base_url,
files={
'path': 'hello.txt',
'content': b'hello',
},
)
self.assertEqual(resp.status_code, requests.codes.ok)
with open(
os.path.join(self.base_path, self.recipe.options['path'],
'hello.txt')) as f:
self.assertEqual(f.read(), 'hello')
self.assertIn('hello.txt', requests.get(server_base_url).text)
self.assertEqual(
requests.get(server_base_url + '/hello.txt').text, 'hello')
# incorrect paths are refused
for path in '/hello.txt', '../hello.txt':
resp = requests.post(
server_base_url,
files={
'path': path,
'content': b'hello',
},
)
self.assertEqual(resp.status_code, requests.codes.forbidden)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment