import unittest from ZPublisher.tests.testBaseRequest import TestRequestZope3ViewsBase from zope.testing.cleanup import cleanUp class RecordTests(unittest.TestCase): def test_repr(self): from ZPublisher.HTTPRequest import record rec = record() rec.a = 1 rec.b = 'foo' r = repr(rec) d = eval(r) self.assertEqual(d, rec.__dict__) class HTTPRequestFactoryMixin(object): def tearDown(self): cleanUp() def _getTargetClass(self): from ZPublisher.HTTPRequest import HTTPRequest return HTTPRequest def _makeOne(self, stdin=None, environ=None, response=None, clean=1): from StringIO import StringIO from ZPublisher.HTTPResponse import HTTPResponse if stdin is None: stdin = StringIO() if environ is None: environ = {} if 'REQUEST_METHOD' not in environ: environ['REQUEST_METHOD'] = 'GET' if 'SERVER_NAME' not in environ: environ['SERVER_NAME'] = 'http://localhost' if 'SERVER_PORT' not in environ: environ['SERVER_PORT'] = '8080' if response is None: response = HTTPResponse(stdout=StringIO()) return self._getTargetClass()(stdin, environ, response, clean) class HTTPRequestTests(unittest.TestCase, HTTPRequestFactoryMixin): def _processInputs(self, inputs): from urllib import quote_plus # Have the inputs processed, and return a HTTPRequest object holding the # result. # inputs is expected to be a list of (key, value) tuples, no CGI # encoding is required. query_string = [] add = query_string.append for key, val in inputs: add("%s=%s" % (quote_plus(key), quote_plus(val))) query_string = '&'.join(query_string) env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'} env['QUERY_STRING'] = query_string req = self._makeOne(environ=env) req.processInputs() self._noFormValuesInOther(req) return req def _noTaintedValues(self, req): self.assertFalse(req.taintedform.keys()) def _valueIsOrHoldsTainted(self, val): # Recursively searches a structure for a TaintedString and returns 1 # when one is found. # Also raises an Assertion if a string which *should* have been # tainted is found, or when a tainted string is not deemed dangerous. from ZPublisher.HTTPRequest import record from AccessControl.tainted import TaintedString retval = 0 if isinstance(val, TaintedString): self.assertFalse(not '<' in val, "%r is not dangerous, no taint required." % val) retval = 1 elif isinstance(val, record): for attr, value in val.__dict__.items(): rval = self._valueIsOrHoldsTainted(attr) if rval: retval = 1 rval = self._valueIsOrHoldsTainted(value) if rval: retval = 1 elif type(val) in (list, tuple): for entry in val: rval = self._valueIsOrHoldsTainted(entry) if rval: retval = 1 elif type(val) in (str, unicode): self.assertFalse('<' in val, "'%s' is dangerous and should have been tainted." % val) return retval def _noFormValuesInOther(self, req): for key in req.taintedform.keys(): self.assertFalse(req.other.has_key(key), 'REQUEST.other should not hold tainted values at first!') for key in req.form.keys(): self.assertFalse(req.other.has_key(key), 'REQUEST.other should not hold form values at first!') def _onlyTaintedformHoldsTaintedStrings(self, req): for key, val in req.taintedform.items(): self.assert_(self._valueIsOrHoldsTainted(key) or self._valueIsOrHoldsTainted(val), 'Tainted form holds item %s that is not tainted' % key) for key, val in req.form.items(): if req.taintedform.has_key(key): continue self.assertFalse(self._valueIsOrHoldsTainted(key) or self._valueIsOrHoldsTainted(val), 'Normal form holds item %s that is tainted' % key) def _taintedKeysAlsoInForm(self, req): for key in req.taintedform.keys(): self.assert_(req.form.has_key(key), "Found tainted %s not in form" % key) self.assertEquals(req.form[key], req.taintedform[key], "Key %s not correctly reproduced in tainted; expected %r, " "got %r" % (key, req.form[key], req.taintedform[key])) def test_no_docstring_on_instance(self): env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'} req = self._makeOne(environ=env) self.assertTrue(req.__doc__ is None) def test___bobo_traverse___raises(self): env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'} req = self._makeOne(environ=env) self.assertRaises(KeyError, req.__bobo_traverse__, 'REQUEST') self.assertRaises(KeyError, req.__bobo_traverse__, 'BODY') self.assertRaises(KeyError, req.__bobo_traverse__, 'BODYFILE') self.assertRaises(KeyError, req.__bobo_traverse__, 'RESPONSE') def test_processInputs_wo_query_string(self): env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'} req = self._makeOne(environ=env) req.processInputs() self._noFormValuesInOther(req) self.assertEquals(req.form, {}) def test_processInputs_wo_marshalling(self): inputs = ( ('foo', 'bar'), ('spam', 'eggs'), ('number', '1'), ('spacey key', 'val'), ('key', 'spacey val'), ('multi', '1'), ('multi', '2')) req = self._processInputs(inputs) formkeys = list(req.form.keys()) formkeys.sort() self.assertEquals(formkeys, ['foo', 'key', 'multi', 'number', 'spacey key', 'spam']) self.assertEquals(req['number'], '1') self.assertEquals(req['multi'], ['1', '2']) self.assertEquals(req['spacey key'], 'val') self.assertEquals(req['key'], 'spacey val') self._noTaintedValues(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_simple_marshalling(self): from DateTime.DateTime import DateTime inputs = ( ('num:int', '42'), ('fract:float', '4.2'), ('bign:long', '45'), ('words:string', 'Some words'), ('2tokens:tokens', 'one two'), ('aday:date', '2002/07/23'), ('accountedfor:required', 'yes'), ('multiline:lines', 'one\ntwo'), ('morewords:text', 'one\ntwo\n')) req = self._processInputs(inputs) formkeys = list(req.form.keys()) formkeys.sort() self.assertEquals(formkeys, ['2tokens', 'accountedfor', 'aday', 'bign', 'fract', 'morewords', 'multiline', 'num', 'words']) self.assertEquals(req['2tokens'], ['one', 'two']) self.assertEquals(req['accountedfor'], 'yes') self.assertEquals(req['aday'], DateTime('2002/07/23')) self.assertEquals(req['bign'], 45L) self.assertEquals(req['fract'], 4.2) self.assertEquals(req['morewords'], 'one\ntwo\n') self.assertEquals(req['multiline'], ['one', 'two']) self.assertEquals(req['num'], 42) self.assertEquals(req['words'], 'Some words') self._noTaintedValues(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_unicode_conversions(self): inputs = (('ustring:ustring:utf8', 'test\xc2\xae'), ('utext:utext:utf8', 'test\xc2\xae\ntest\xc2\xae\n'), ('utokens:utokens:utf8', 'test\xc2\xae test\xc2\xae'), ('ulines:ulines:utf8', 'test\xc2\xae\ntest\xc2\xae'), ('nouconverter:string:utf8', 'test\xc2\xae')) req = self._processInputs(inputs) formkeys = list(req.form.keys()) formkeys.sort() self.assertEquals(formkeys, ['nouconverter', 'ulines', 'ustring', 'utext', 'utokens']) self.assertEquals(req['ustring'], u'test\u00AE') self.assertEquals(req['utext'], u'test\u00AE\ntest\u00AE\n') self.assertEquals(req['utokens'], [u'test\u00AE', u'test\u00AE']) self.assertEquals(req['ulines'], [u'test\u00AE', u'test\u00AE']) # expect a utf-8 encoded version self.assertEquals(req['nouconverter'], 'test\xc2\xae') self._noTaintedValues(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_simple_containers(self): inputs = ( ('oneitem:list', 'one'), ('alist:list', 'one'), ('alist:list', 'two'), ('oneitemtuple:tuple', 'one'), ('atuple:tuple', 'one'), ('atuple:tuple', 'two'), ('onerec.foo:record', 'foo'), ('onerec.bar:record', 'bar'), ('setrec.foo:records', 'foo'), ('setrec.bar:records', 'bar'), ('setrec.foo:records', 'spam'), ('setrec.bar:records', 'eggs')) req = self._processInputs(inputs) formkeys = list(req.form.keys()) formkeys.sort() self.assertEquals(formkeys, ['alist', 'atuple', 'oneitem', 'oneitemtuple', 'onerec', 'setrec']) self.assertEquals(req['oneitem'], ['one']) self.assertEquals(req['oneitemtuple'], ('one',)) self.assertEquals(req['alist'], ['one', 'two']) self.assertEquals(req['atuple'], ('one', 'two')) self.assertEquals(req['onerec'].foo, 'foo') self.assertEquals(req['onerec'].bar, 'bar') self.assertEquals(len(req['setrec']), 2) self.assertEquals(req['setrec'][0].foo, 'foo') self.assertEquals(req['setrec'][0].bar, 'bar') self.assertEquals(req['setrec'][1].foo, 'spam') self.assertEquals(req['setrec'][1].bar, 'eggs') self._noTaintedValues(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_marshalling_into_sequences(self): inputs = ( ('ilist:int:list', '1'), ('ilist:int:list', '2'), ('ilist:list:int', '3'), ('ftuple:float:tuple', '1.0'), ('ftuple:float:tuple', '1.1'), ('ftuple:tuple:float', '1.2'), ('tlist:tokens:list', 'one two'), ('tlist:list:tokens', '3 4')) req = self._processInputs(inputs) formkeys = list(req.form.keys()) formkeys.sort() self.assertEquals(formkeys, ['ftuple', 'ilist', 'tlist']) self.assertEquals(req['ilist'], [1, 2, 3]) self.assertEquals(req['ftuple'], (1.0, 1.1, 1.2)) self.assertEquals(req['tlist'], [['one', 'two'], ['3', '4']]) self._noTaintedValues(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_records_w_sequences(self): inputs = ( ('onerec.name:record', 'foo'), ('onerec.tokens:tokens:record', 'one two'), ('onerec.ints:int:record', '1'), ('onerec.ints:int:record', '2'), ('setrec.name:records', 'first'), ('setrec.ilist:list:int:records', '1'), ('setrec.ilist:list:int:records', '2'), ('setrec.ituple:tuple:int:records', '1'), ('setrec.ituple:tuple:int:records', '2'), ('setrec.name:records', 'second'), ('setrec.ilist:list:int:records', '1'), ('setrec.ilist:list:int:records', '2'), ('setrec.ituple:tuple:int:records', '1'), ('setrec.ituple:tuple:int:records', '2')) req = self._processInputs(inputs) formkeys = list(req.form.keys()) formkeys.sort() self.assertEquals(formkeys, ['onerec', 'setrec']) self.assertEquals(req['onerec'].name, 'foo') self.assertEquals(req['onerec'].tokens, ['one', 'two']) # Implicit sequences and records don't mix. self.assertEquals(req['onerec'].ints, 2) self.assertEquals(len(req['setrec']), 2) self.assertEquals(req['setrec'][0].name, 'first') self.assertEquals(req['setrec'][1].name, 'second') for i in range(2): self.assertEquals(req['setrec'][i].ilist, [1, 2]) self.assertEquals(req['setrec'][i].ituple, (1, 2)) self._noTaintedValues(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_defaults(self): inputs = ( ('foo:default:int', '5'), ('alist:int:default', '3'), ('alist:int:default', '4'), ('alist:int:default', '5'), ('alist:int', '1'), ('alist:int', '2'), ('explicitlist:int:list:default', '3'), ('explicitlist:int:list:default', '4'), ('explicitlist:int:list:default', '5'), ('explicitlist:int:list', '1'), ('explicitlist:int:list', '2'), ('bar.spam:record:default', 'eggs'), ('bar.foo:record:default', 'foo'), ('bar.foo:record', 'baz'), ('setrec.spam:records:default', 'eggs'), ('setrec.foo:records:default', 'foo'), ('setrec.foo:records', 'baz'), ('setrec.foo:records', 'ham'), ) req = self._processInputs(inputs) formkeys = list(req.form.keys()) formkeys.sort() self.assertEquals(formkeys, ['alist', 'bar', 'explicitlist', 'foo', 'setrec']) self.assertEquals(req['alist'], [1, 2, 3, 4, 5]) self.assertEquals(req['explicitlist'], [1, 2, 3, 4, 5]) self.assertEquals(req['foo'], 5) self.assertEquals(req['bar'].spam, 'eggs') self.assertEquals(req['bar'].foo, 'baz') self.assertEquals(len(req['setrec']), 2) self.assertEquals(req['setrec'][0].spam, 'eggs') self.assertEquals(req['setrec'][0].foo, 'baz') self.assertEquals(req['setrec'][1].spam, 'eggs') self.assertEquals(req['setrec'][1].foo, 'ham') self._noTaintedValues(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_wo_marshalling_w_Taints(self): inputs = ( ('foo', 'bar'), ('spam', 'eggs'), ('number', '1'), ('tainted', '<tainted value>'), ('<tainted key>', 'value'), ('spacey key', 'val'), ('key', 'spacey val'), ('tinitmulti', '<1>'), ('tinitmulti', '2'), ('tdefermulti', '1'), ('tdefermulti', '<2>'), ('tallmulti', '<1>'), ('tallmulti', '<2>')) req = self._processInputs(inputs) taintedformkeys = list(req.taintedform.keys()) taintedformkeys.sort() self.assertEquals(taintedformkeys, ['<tainted key>', 'tainted', 'tallmulti', 'tdefermulti', 'tinitmulti']) self._taintedKeysAlsoInForm(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_simple_marshalling_w_taints(self): inputs = ( ('<tnum>:int', '42'), ('<tfract>:float', '4.2'), ('<tbign>:long', '45'), ('twords:string', 'Some <words>'), ('t2tokens:tokens', 'one <two>'), ('<taday>:date', '2002/07/23'), ('taccountedfor:required', '<yes>'), ('tmultiline:lines', '<one\ntwo>'), ('tmorewords:text', '<one\ntwo>\n')) req = self._processInputs(inputs) taintedformkeys = list(req.taintedform.keys()) taintedformkeys.sort() self.assertEquals(taintedformkeys, ['<taday>', '<tbign>', '<tfract>', '<tnum>', 't2tokens', 'taccountedfor', 'tmorewords', 'tmultiline', 'twords']) self._taintedKeysAlsoInForm(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_unicode_w_taints(self): inputs = (('tustring:ustring:utf8', '<test\xc2\xae>'), ('tutext:utext:utf8', '<test\xc2\xae>\n<test\xc2\xae\n>'), ('tinitutokens:utokens:utf8', '<test\xc2\xae> test\xc2\xae'), ('tinitulines:ulines:utf8', '<test\xc2\xae>\ntest\xc2\xae'), ('tdeferutokens:utokens:utf8', 'test\xc2\xae <test\xc2\xae>'), ('tdeferulines:ulines:utf8', 'test\xc2\xae\n<test\xc2\xae>'), ('tnouconverter:string:utf8', '<test\xc2\xae>')) req = self._processInputs(inputs) taintedformkeys = list(req.taintedform.keys()) taintedformkeys.sort() self.assertEquals(taintedformkeys, ['tdeferulines', 'tdeferutokens', 'tinitulines', 'tinitutokens', 'tnouconverter', 'tustring', 'tutext']) self._taintedKeysAlsoInForm(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_simple_containers_w_taints(self): inputs = ( ('toneitem:list', '<one>'), ('<tkeyoneitem>:list', 'one'), ('tinitalist:list', '<one>'), ('tinitalist:list', 'two'), ('tdeferalist:list', 'one'), ('tdeferalist:list', '<two>'), ('toneitemtuple:tuple', '<one>'), ('tinitatuple:tuple', '<one>'), ('tinitatuple:tuple', 'two'), ('tdeferatuple:tuple', 'one'), ('tdeferatuple:tuple', '<two>'), ('tinitonerec.foo:record', '<foo>'), ('tinitonerec.bar:record', 'bar'), ('tdeferonerec.foo:record', 'foo'), ('tdeferonerec.bar:record', '<bar>'), ('tinitinitsetrec.foo:records', '<foo>'), ('tinitinitsetrec.bar:records', 'bar'), ('tinitinitsetrec.foo:records', 'spam'), ('tinitinitsetrec.bar:records', 'eggs'), ('tinitdefersetrec.foo:records', 'foo'), ('tinitdefersetrec.bar:records', '<bar>'), ('tinitdefersetrec.foo:records', 'spam'), ('tinitdefersetrec.bar:records', 'eggs'), ('tdeferinitsetrec.foo:records', 'foo'), ('tdeferinitsetrec.bar:records', 'bar'), ('tdeferinitsetrec.foo:records', '<spam>'), ('tdeferinitsetrec.bar:records', 'eggs'), ('tdeferdefersetrec.foo:records', 'foo'), ('tdeferdefersetrec.bar:records', 'bar'), ('tdeferdefersetrec.foo:records', 'spam'), ('tdeferdefersetrec.bar:records', '<eggs>')) req = self._processInputs(inputs) taintedformkeys = list(req.taintedform.keys()) taintedformkeys.sort() self.assertEquals(taintedformkeys, ['<tkeyoneitem>', 'tdeferalist', 'tdeferatuple', 'tdeferdefersetrec', 'tdeferinitsetrec', 'tdeferonerec', 'tinitalist', 'tinitatuple', 'tinitdefersetrec', 'tinitinitsetrec', 'tinitonerec', 'toneitem', 'toneitemtuple']) self._taintedKeysAlsoInForm(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_records_w_sequences_tainted(self): inputs = ( ('tinitonerec.tokens:tokens:record', '<one> two'), ('tdeferonerec.tokens:tokens:record', 'one <two>'), ('tinitsetrec.name:records', 'first'), ('tinitsetrec.ilist:list:records', '<1>'), ('tinitsetrec.ilist:list:records', '2'), ('tinitsetrec.ituple:tuple:int:records', '1'), ('tinitsetrec.ituple:tuple:int:records', '2'), ('tinitsetrec.name:records', 'second'), ('tinitsetrec.ilist:list:records', '1'), ('tinitsetrec.ilist:list:records', '2'), ('tinitsetrec.ituple:tuple:int:records', '1'), ('tinitsetrec.ituple:tuple:int:records', '2'), ('tdeferfirstsetrec.name:records', 'first'), ('tdeferfirstsetrec.ilist:list:records', '1'), ('tdeferfirstsetrec.ilist:list:records', '<2>'), ('tdeferfirstsetrec.ituple:tuple:int:records', '1'), ('tdeferfirstsetrec.ituple:tuple:int:records', '2'), ('tdeferfirstsetrec.name:records', 'second'), ('tdeferfirstsetrec.ilist:list:records', '1'), ('tdeferfirstsetrec.ilist:list:records', '2'), ('tdeferfirstsetrec.ituple:tuple:int:records', '1'), ('tdeferfirstsetrec.ituple:tuple:int:records', '2'), ('tdefersecondsetrec.name:records', 'first'), ('tdefersecondsetrec.ilist:list:records', '1'), ('tdefersecondsetrec.ilist:list:records', '2'), ('tdefersecondsetrec.ituple:tuple:int:records', '1'), ('tdefersecondsetrec.ituple:tuple:int:records', '2'), ('tdefersecondsetrec.name:records', 'second'), ('tdefersecondsetrec.ilist:list:records', '1'), ('tdefersecondsetrec.ilist:list:records', '<2>'), ('tdefersecondsetrec.ituple:tuple:int:records', '1'), ('tdefersecondsetrec.ituple:tuple:int:records', '2'), ) req = self._processInputs(inputs) taintedformkeys = list(req.taintedform.keys()) taintedformkeys.sort() self.assertEquals(taintedformkeys, ['tdeferfirstsetrec', 'tdeferonerec', 'tdefersecondsetrec', 'tinitonerec', 'tinitsetrec']) self._taintedKeysAlsoInForm(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_defaults_w_taints(self): inputs = ( ('tfoo:default', '<5>'), ('doesnnotapply:default', '<4>'), ('doesnnotapply', '4'), ('tinitlist:default', '3'), ('tinitlist:default', '4'), ('tinitlist:default', '5'), ('tinitlist', '<1>'), ('tinitlist', '2'), ('tdeferlist:default', '3'), ('tdeferlist:default', '<4>'), ('tdeferlist:default', '5'), ('tdeferlist', '1'), ('tdeferlist', '2'), ('tinitbar.spam:record:default', 'eggs'), ('tinitbar.foo:record:default', 'foo'), ('tinitbar.foo:record', '<baz>'), ('tdeferbar.spam:record:default', '<eggs>'), ('tdeferbar.foo:record:default', 'foo'), ('tdeferbar.foo:record', 'baz'), ('rdoesnotapply.spam:record:default', '<eggs>'), ('rdoesnotapply.spam:record', 'eggs'), ('tinitsetrec.spam:records:default', 'eggs'), ('tinitsetrec.foo:records:default', 'foo'), ('tinitsetrec.foo:records', '<baz>'), ('tinitsetrec.foo:records', 'ham'), ('tdefersetrec.spam:records:default', '<eggs>'), ('tdefersetrec.foo:records:default', 'foo'), ('tdefersetrec.foo:records', 'baz'), ('tdefersetrec.foo:records', 'ham'), ('srdoesnotapply.foo:records:default', '<eggs>'), ('srdoesnotapply.foo:records', 'baz'), ('srdoesnotapply.foo:records', 'ham')) req = self._processInputs(inputs) taintedformkeys = list(req.taintedform.keys()) taintedformkeys.sort() self.assertEquals(taintedformkeys, ['tdeferbar', 'tdeferlist', 'tdefersetrec', 'tfoo', 'tinitbar', 'tinitlist', 'tinitsetrec']) self._taintedKeysAlsoInForm(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_tainted_attribute_raises(self): input = ('taintedattr.here<be<taint:record', 'value',) self.assertRaises(ValueError, self._processInputs, input) def test_processInputs_w_tainted_values_cleans_exceptions(self): # Feed tainted garbage to the conversion methods, and any exception # returned should be HTML safe from DateTime.interfaces import SyntaxError from ZPublisher.Converters import type_converters for type, convert in type_converters.items(): try: convert('<html garbage>') except Exception as e: self.assertFalse('<' in e.args, '%s converter does not quote unsafe value!' % type) except SyntaxError as e: self.assertFalse('<' in e, '%s converter does not quote unsafe value!' % type) def test_processInputs_w_dotted_name_as_tuple(self): # Collector #500 inputs = ( ('name.:tuple', 'name with dot as tuple'),) req = self._processInputs(inputs) formkeys = list(req.form.keys()) formkeys.sort() self.assertEquals(formkeys, ['name.']) self.assertEquals(req['name.'], ('name with dot as tuple',)) self._noTaintedValues(req) self._onlyTaintedformHoldsTaintedStrings(req) def test_processInputs_w_cookie_parsing(self): env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'} env['HTTP_COOKIE'] = 'foo=bar; baz=gee' req = self._makeOne(environ=env) self.assertEquals(req.cookies['foo'], 'bar') self.assertEquals(req.cookies['baz'], 'gee') env['HTTP_COOKIE'] = 'foo=bar; baz="gee, like, e=mc^2"' req = self._makeOne(environ=env) self.assertEquals(req.cookies['foo'], 'bar') self.assertEquals(req.cookies['baz'], 'gee, like, e=mc^2') # Collector #1498: empty cookies env['HTTP_COOKIE'] = 'foo=bar; hmm; baz=gee' req = self._makeOne(environ=env) self.assertEquals(req.cookies['foo'], 'bar') self.assertEquals(req.cookies['hmm'], '') self.assertEquals(req.cookies['baz'], 'gee') # Unquoted multi-space cookies env['HTTP_COOKIE'] = 'single=cookie data; ' \ 'quoted="cookie data with unquoted spaces"; ' \ 'multi=cookie data with unquoted spaces; ' \ 'multi2=cookie data with unquoted spaces' req = self._makeOne(environ=env) self.assertEquals(req.cookies['single'], 'cookie data') self.assertEquals(req.cookies['quoted'], 'cookie data with unquoted spaces') self.assertEquals(req.cookies['multi'], 'cookie data with unquoted spaces') self.assertEquals(req.cookies['multi2'], 'cookie data with unquoted spaces') def test_postProcessInputs(self): from ZPublisher.HTTPRequest import default_encoding _NON_ASCII = u'\xc4\xd6\xdc' req = self._makeOne() req.form = {'foo': _NON_ASCII.encode(default_encoding), 'foo_list': [_NON_ASCII.encode(default_encoding), 'SPAM'], 'foo_tuple': (_NON_ASCII.encode(default_encoding), 'HAM'), 'foo_dict': {'foo': _NON_ASCII, 'bar': 'EGGS'}} req.postProcessInputs() self.assertTrue(isinstance(req.form['foo'], unicode)) self.assertEqual(req.form['foo'], _NON_ASCII) self.assertTrue(isinstance(req.form['foo_list'], list)) self.assertTrue(isinstance(req.form['foo_list'][0], unicode)) self.assertEqual(req.form['foo_list'][0], _NON_ASCII) self.assertTrue(isinstance(req.form['foo_list'][1], unicode)) self.assertEqual(req.form['foo_list'][1], u'SPAM') self.assertTrue(isinstance(req.form['foo_tuple'], tuple)) self.assertTrue(isinstance(req.form['foo_tuple'][0], unicode)) self.assertEqual(req.form['foo_tuple'][0], _NON_ASCII) self.assertTrue(isinstance(req.form['foo_tuple'][1], unicode)) self.assertEqual(req.form['foo_tuple'][1], u'HAM') self.assertTrue(isinstance(req.form['foo_dict'], dict)) self.assertTrue(isinstance(req.form['foo_dict']['foo'], unicode)) self.assertEqual(req.form['foo_dict']['foo'], _NON_ASCII) self.assertTrue(isinstance(req.form['foo_dict']['bar'], unicode)) self.assertEqual(req.form['foo_dict']['bar'], u'EGGS') def test_close_removes_stdin_references(self): # Verifies that all references to the input stream go away on # request.close(). Otherwise a tempfile may stick around. import sys from StringIO import StringIO s = StringIO(TEST_FILE_DATA) start_count = sys.getrefcount(s) req = self._makeOne(stdin=s, environ=TEST_ENVIRON.copy()) req.processInputs() self.assertNotEqual(start_count, sys.getrefcount(s)) # Precondition req.close() self.assertEqual(start_count, sys.getrefcount(s)) # The test def test_processInputs_w_large_input_gets_tempfile(self): # checks fileupload object supports the filename from StringIO import StringIO s = StringIO(TEST_LARGEFILE_DATA) req = self._makeOne(stdin=s, environ=TEST_ENVIRON.copy()) req.processInputs() f = req.form.get('file') self.assert_(f.name) def test_processInputs_with_file_upload_gets_iterator(self): # checks fileupload object supports the iterator protocol # collector entry 1837 from StringIO import StringIO s = StringIO(TEST_FILE_DATA) req = self._makeOne(stdin=s, environ=TEST_ENVIRON.copy()) req.processInputs() f=req.form.get('file') self.assertEqual(list(f),['test\n']) f.seek(0) self.assertEqual(f.next(),'test\n') f.seek(0) self.assertEqual(f.xreadlines(),f) def test__authUserPW_simple( self ): import base64 user_id = 'user' password = 'password' encoded = base64.encodestring( '%s:%s' % ( user_id, password ) ) auth_header = 'basic %s' % encoded environ = { 'HTTP_AUTHORIZATION': auth_header } request = self._makeOne( environ=environ ) user_id_x, password_x = request._authUserPW() self.assertEqual( user_id_x, user_id ) self.assertEqual( password_x, password ) def test__authUserPW_with_embedded_colon( self ): # http://www.zope.org/Collectors/Zope/2039 import base64 user_id = 'user' password = 'embedded:colon' encoded = base64.encodestring( '%s:%s' % ( user_id, password ) ) auth_header = 'basic %s' % encoded environ = { 'HTTP_AUTHORIZATION': auth_header } request = self._makeOne( environ=environ ) user_id_x, password_x = request._authUserPW() self.assertEqual( user_id_x, user_id ) self.assertEqual( password_x, password ) def test_debug_not_in_qs_still_gets_attr(self): from zope.publisher.base import DebugFlags # when accessing request.debug we will see the DebugFlags instance request = self._makeOne() self.assert_(isinstance(request.debug, DebugFlags)) # It won't be available through dictonary lookup, though self.assert_(request.get('debug') is None) def test_debug_in_qs_gets_form_var(self): env = {'QUERY_STRING': 'debug=1'} # request.debug will actually yield a 'debug' form variable # if it exists request = self._makeOne(environ=env) request.processInputs() self.assertEqual(request.debug, '1') self.assertEqual(request.get('debug'), '1') self.assertEqual(request['debug'], '1') # we can still override request.debug with a form variable or directly def test_debug_override_via_form_other(self): request = self._makeOne() request.processInputs() request.form['debug'] = '1' self.assertEqual(request.debug, '1') request['debug'] = '2' self.assertEqual(request.debug, '2') def test_interfaces(self): from zope.publisher.interfaces.browser import IBrowserRequest from zope.interface.verify import verifyClass klass = self._getTargetClass() # TODO # verifyClass(IBrowserRequest, klass) def test_locale_property_accessor(self): from zope.component import provideAdapter from zope.publisher.browser import BrowserLanguages from zope.publisher.interfaces.http import IHTTPRequest from zope.i18n.interfaces import IUserPreferredLanguages from zope.i18n.interfaces.locales import ILocale from ZPublisher.HTTPRequest import _marker provideAdapter(BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages) env = {'HTTP_ACCEPT_LANGUAGE': 'en'} request = self._makeOne(environ=env) # before accessing request.locale for the first time, request._locale # is still a marker self.assert_(request._locale is _marker) # when accessing request.locale we will see an ILocale self.assert_(ILocale.providedBy(request.locale)) # and request._locale has been set self.assert_(request._locale is request.locale) # It won't be available through dictonary lookup, though self.assert_(request.get('locale') is None) def test_locale_in_qs(self): from zope.component import provideAdapter from zope.publisher.browser import BrowserLanguages from zope.publisher.interfaces.http import IHTTPRequest from zope.i18n.interfaces import IUserPreferredLanguages provideAdapter(BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages) # request.locale will actually yield a 'locale' form variable # if it exists env = {'HTTP_ACCEPT_LANGUAGE': 'en', 'QUERY_STRING': 'locale=1'} request = self._makeOne(environ=env) request.processInputs() self.assertEqual(request.locale, '1') self.assertEqual(request.get('locale'), '1') self.assertEqual(request['locale'], '1') def test_locale_property_override_via_form_other(self): from zope.component import provideAdapter from zope.publisher.browser import BrowserLanguages from zope.publisher.interfaces.http import IHTTPRequest from zope.i18n.interfaces import IUserPreferredLanguages from zope.i18n.interfaces.locales import ILocale provideAdapter(BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages) env = {'HTTP_ACCEPT_LANGUAGE': 'en'} # we can still override request.locale with a form variable request = self._makeOne(environ=env) request.processInputs() self.assert_(ILocale.providedBy(request.locale)) request.form['locale'] = '1' self.assertEqual(request.locale, '1') request['locale'] = '2' self.assertEqual(request.locale, '2') def test_locale_semantics(self): from zope.component import provideAdapter from zope.publisher.browser import BrowserLanguages from zope.publisher.interfaces.http import IHTTPRequest from zope.i18n.interfaces import IUserPreferredLanguages from zope.i18n.interfaces.locales import ILocale provideAdapter(BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages) env_ = {'HTTP_ACCEPT_LANGUAGE': 'en'} # we should also test the correct semantics of the locale for httplang in ('it', 'it-ch', 'it-CH', 'IT', 'IT-CH', 'IT-ch'): env = env_.copy() env['HTTP_ACCEPT_LANGUAGE'] = httplang request = self._makeOne(environ=env) locale = request.locale self.assert_(ILocale.providedBy(locale)) parts = httplang.split('-') lang = parts.pop(0).lower() territory = variant = None if parts: territory = parts.pop(0).upper() if parts: variant = parts.pop(0).upper() self.assertEqual(locale.id.language, lang) self.assertEqual(locale.id.territory, territory) self.assertEqual(locale.id.variant, variant) def test_locale_fallback(self): from zope.component import provideAdapter from zope.publisher.browser import BrowserLanguages from zope.publisher.interfaces.http import IHTTPRequest from zope.i18n.interfaces import IUserPreferredLanguages from zope.i18n.interfaces.locales import ILocale provideAdapter(BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages) env = {'HTTP_ACCEPT_LANGUAGE': 'en', 'HTTP_ACCEPT_LANGUAGE': 'xx'} # Now test for non-existant locale fallback request = self._makeOne(environ=env) locale = request.locale self.assert_(ILocale.providedBy(locale)) self.assert_(locale.id.language is None) self.assert_(locale.id.territory is None) self.assert_(locale.id.variant is None) def test_method_GET(self): env = {'REQUEST_METHOD': 'GET'} request = self._makeOne(environ=env) self.assertEqual(request.method, 'GET') def test_method_POST(self): env = {'REQUEST_METHOD': 'POST'} request = self._makeOne(environ=env) self.assertEqual(request.method, 'POST') def test_getClientAddr_wo_trusted_proxy(self): env = {'REMOTE_ADDR': '127.0.0.1', 'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100'} request = self._makeOne(environ=env) self.assertEqual(request.getClientAddr(), '127.0.0.1') def test_getClientAddr_one_trusted_proxy(self): from ZPublisher.HTTPRequest import trusted_proxies env = {'REMOTE_ADDR': '127.0.0.1', 'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100' } orig = trusted_proxies[:] try: trusted_proxies.append('127.0.0.1') request = self._makeOne(environ=env) self.assertEqual(request.getClientAddr(), '192.168.1.100') finally: trusted_proxies[:] = orig def test_getClientAddr_trusted_proxy_last(self): from ZPublisher.HTTPRequest import trusted_proxies env = {'REMOTE_ADDR': '192.168.1.100', 'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100'} orig = trusted_proxies[:] try: trusted_proxies.append('192.168.1.100') request = self._makeOne(environ=env) self.assertEqual(request.getClientAddr(), '10.1.20.30') finally: trusted_proxies[:] = orig def test_getClientAddr_trusted_proxy_no_REMOTE_ADDR(self): from ZPublisher.HTTPRequest import trusted_proxies env = {'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100'} orig = trusted_proxies[:] try: trusted_proxies.append('192.168.1.100') request = self._makeOne(environ=env) self.assertEqual(request.getClientAddr(), '') finally: trusted_proxies[:] = orig def test_getHeader_exact(self): request = self._makeOne(environ=TEST_ENVIRON.copy()) self.assertEqual(request.getHeader('content-type'), 'multipart/form-data; boundary=12345') def test_getHeader_case_insensitive(self): request = self._makeOne(environ=TEST_ENVIRON.copy()) self.assertEqual(request.getHeader('Content-Type'), 'multipart/form-data; boundary=12345') def test_getHeader_underscore_is_dash(self): request = self._makeOne(environ=TEST_ENVIRON.copy()) self.assertEqual(request.getHeader('content_type'), 'multipart/form-data; boundary=12345') def test_getHeader_literal_turns_off_case_normalization(self): request = self._makeOne(environ=TEST_ENVIRON.copy()) self.assertEqual(request.getHeader('Content-Type', literal=True), None) def test_getHeader_nonesuch(self): request = self._makeOne(environ=TEST_ENVIRON.copy()) self.assertEqual(request.getHeader('none-such'), None) def test_getHeader_nonesuch_with_default(self): request = self._makeOne(environ=TEST_ENVIRON.copy()) self.assertEqual(request.getHeader('Not-existant', default='Whatever'), 'Whatever') def test_clone_updates_method_to_GET(self): request = self._makeOne(environ={'REQUEST_METHOD': 'POST'}) request['PARENTS'] = [object()] clone = request.clone() self.assertEqual(clone.method, 'GET') def test_clone_keeps_preserves__auth(self): request = self._makeOne() request['PARENTS'] = [object()] request._auth = 'foobar' clone = request.clone() self.assertEqual(clone._auth, 'foobar') def test_clone_doesnt_re_clean_environ(self): request = self._makeOne() request.environ['HTTP_CGI_AUTHORIZATION'] = 'lalalala' request['PARENTS'] = [object()] clone = request.clone() self.assertEqual(clone.environ['HTTP_CGI_AUTHORIZATION'], 'lalalala') def test_clone_keeps_only_last_PARENT(self): PARENTS = [object(), object()] request = self._makeOne() request['PARENTS'] = PARENTS clone = request.clone() self.assertEqual(clone['PARENTS'], PARENTS[1:]) def test_clone_preserves_response_class(self): class DummyResponse: pass request = self._makeOne(None, TEST_ENVIRON.copy(), DummyResponse()) request['PARENTS'] = [object()] clone = request.clone() self.assertTrue(isinstance(clone.response, DummyResponse)) def test_clone_preserves_request_subclass(self): class SubRequest(self._getTargetClass()): pass request = SubRequest(None, TEST_ENVIRON.copy(), None) request['PARENTS'] = [object()] clone = request.clone() self.assertTrue(isinstance(clone, SubRequest)) def test_clone_preserves_direct_interfaces(self): from zope.interface import directlyProvides from zope.interface import Interface class IFoo(Interface): pass request = self._makeOne() request['PARENTS'] = [object()] directlyProvides(request, IFoo) clone = request.clone() self.assertTrue(IFoo.providedBy(clone)) def test_resolve_url_doesnt_send_endrequestevent(self): import zope.event events = [] zope.event.subscribers.append(events.append) request = self._makeOne() request['PARENTS'] = [object()] try: request.resolve_url(request.script + '/') finally: zope.event.subscribers.remove(events.append) self.assertFalse(len(events), "HTTPRequest.resolve_url should not emit events") def test_resolve_url_errorhandling(self): # Check that resolve_url really raises the same error # it received from ZPublisher.BaseRequest.traverse from zExceptions import NotFound request = self._makeOne() request['PARENTS'] = [object()] self.assertRaises( NotFound , request.resolve_url , request.script + '/does_not_exist' ) def test_parses_json_cookies(self): # https://bugs.launchpad.net/zope2/+bug/563229 # reports cookies in the wild with embedded double quotes (e.g, # JSON-encoded data structures. env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80', 'HTTP_COOKIE': 'json={"intkey":123,"stringkey":"blah"}; ' 'anothercookie=boring; baz' } req = self._makeOne(environ=env) self.assertEquals(req.cookies['json'], '{"intkey":123,"stringkey":"blah"}') self.assertEquals(req.cookies['anothercookie'], 'boring') def test_getVirtualRoot(self): # https://bugs.launchpad.net/zope2/+bug/193122 req = self._makeOne() req._script = [] self.assertEquals(req.getVirtualRoot(), '') req._script = ['foo', 'bar'] self.assertEquals(req.getVirtualRoot(), '/foo/bar') class TestHTTPRequestZope3Views(TestRequestZope3ViewsBase,): def _makeOne(self, root): from zope.interface import directlyProvides from zope.publisher.browser import IDefaultBrowserLayer request = HTTPRequestFactoryMixin()._makeOne() request['PARENTS'] = [root] # The request needs to implement the proper interface directlyProvides(request, IDefaultBrowserLayer) return request def test_no_traversal_of_view_request_attribute(self): # make sure views don't accidentally publish the 'request' attribute from ZPublisher import NotFound root, _ = self._makeRootAndFolder() # make sure the view itself is traversable: view = self._makeOne(root).traverse('folder/@@meth') from ZPublisher.HTTPRequest import HTTPRequest self.assertEqual(view.request.__class__, HTTPRequest,) # but not the request: self.assertRaises( NotFound, self._makeOne(root).traverse, 'folder/@@meth/request' ) TEST_ENVIRON = { 'CONTENT_TYPE': 'multipart/form-data; boundary=12345', 'REQUEST_METHOD': 'POST', 'SERVER_NAME': 'localhost', 'SERVER_PORT': '80', } TEST_FILE_DATA = ''' --12345 Content-Disposition: form-data; name="file"; filename="file" Content-Type: application/octet-stream test --12345-- ''' TEST_LARGEFILE_DATA = ''' --12345 Content-Disposition: form-data; name="file"; filename="file" Content-Type: application/octet-stream test %s ''' % ('test' * 1000) def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(RecordTests)) suite.addTest(unittest.makeSuite(HTTPRequestTests)) suite.addTest(unittest.makeSuite(TestHTTPRequestZope3Views)) return suite