You need to sign in or sign up before continuing.
test_xmlrpc.py 16.4 KB
Newer Older
1
import base64
2
import datetime
3
import sys
4
import time
5 6
import unittest
import xmlrpclib
7 8
import SimpleXMLRPCServer
import threading
9
import mimetools
10
from test import test_support
11 12 13

alist = [{'astring': 'foo@bar.baz.spam',
          'afloat': 7283.43,
14
          'anint': 2**20,
15
          'ashortlong': 2,
16 17
          'anotherlist': ['.zyx.41'],
          'abase64': xmlrpclib.Binary("my dog has fleas"),
18
          'boolean': False,
19 20
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
21 22
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
23
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
24
          'datetime3': xmlrpclib.DateTime(
25
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
26 27 28 29
          'datetime4': xmlrpclib.DateTime(
                        datetime.date(2005, 2, 10)),
          'datetime5': xmlrpclib.DateTime(
                        datetime.time(11, 41, 23)),
30 31 32 33 34 35 36 37
          }]

class XMLRPCTestCase(unittest.TestCase):

    def test_dump_load(self):
        self.assertEquals(alist,
                          xmlrpclib.loads(xmlrpclib.dumps((alist,)))[0][0])

38
    def test_dump_bare_datetime(self):
39 40 41 42
        # This checks that an unwrapped datetime.date object can be handled
        # by the marshalling code.  This can't be done via test_dump_load()
        # since with use_datetime set to 1 the unmarshaller would create
        # datetime objects for the 'datetime[123]' keys as well
43
        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
44
        s = xmlrpclib.dumps((dt,))
45 46
        (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
        self.assertEquals(newdt, dt)
47 48
        self.assertEquals(m, None)

49 50 51 52 53 54 55
        (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
        self.assertEquals(newdt, xmlrpclib.DateTime('20050210T11:41:23'))

    def test_dump_bare_date(self):
        # This checks that an unwrapped datetime.date object can be handled
        # by the marshalling code.  This can't be done via test_dump_load()
        # since the unmarshaller produces a datetime object
56
        d = datetime.datetime(2005, 2, 10, 11, 41, 23).date()
57 58 59 60 61 62 63 64 65 66 67 68 69
        s = xmlrpclib.dumps((d,))
        (newd,), m = xmlrpclib.loads(s, use_datetime=1)
        self.assertEquals(newd.date(), d)
        self.assertEquals(newd.time(), datetime.time(0, 0, 0))
        self.assertEquals(m, None)

        (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
        self.assertEquals(newdt, xmlrpclib.DateTime('20050210T00:00:00'))

    def test_dump_bare_time(self):
        # This checks that an unwrapped datetime.time object can be handled
        # by the marshalling code.  This can't be done via test_dump_load()
        # since the unmarshaller produces a datetime object
70
        t = datetime.datetime(2005, 2, 10, 11, 41, 23).time()
71 72 73 74 75 76 77 78 79 80
        s = xmlrpclib.dumps((t,))
        (newt,), m = xmlrpclib.loads(s, use_datetime=1)
        today = datetime.datetime.now().date().strftime("%Y%m%d")
        self.assertEquals(newt.time(), t)
        self.assertEquals(newt.date(), datetime.datetime.now().date())
        self.assertEquals(m, None)

        (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
        self.assertEquals(newdt, xmlrpclib.DateTime('%sT11:41:23'%today))

81 82
    def test_bug_1164912 (self):
        d = xmlrpclib.DateTime()
Tim Peters's avatar
Tim Peters committed
83
        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
84 85 86 87 88 89 90
                                            methodresponse=True))
        self.assert_(isinstance(new_d.value, str))

        # Check that the output of dumps() is still an 8-bit string
        s = xmlrpclib.dumps((new_d,), methodresponse=True)
        self.assert_(isinstance(s, str))

91 92 93 94 95 96 97 98 99
    def test_newstyle_class(self):
        class T(object):
            pass
        t = T()
        t.x = 100
        t.y = "Hello"
        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
        self.assertEquals(t2, t.__dict__)

100
    def test_dump_big_long(self):
101
        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
102 103 104 105

    def test_dump_bad_dict(self):
        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))

106 107 108 109 110 111 112 113 114 115 116 117
    def test_dump_recursive_seq(self):
        l = [1,2,3]
        t = [3,4,5,l]
        l.append(t)
        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))

    def test_dump_recursive_dict(self):
        d = {'1':1, '2':1}
        t = {'3':3, 'd':d}
        d['t'] = t
        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))

118
    def test_dump_big_int(self):
119
        if sys.maxint > 2**31-1:
120
            self.assertRaises(OverflowError, xmlrpclib.dumps,
121
                              (int(2**34),))
122

123 124 125 126 127 128 129 130 131 132 133 134 135 136
        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
        self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,))
        self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,))

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_int(xmlrpclib.MAXINT, dummy_write)
        m.dump_int(xmlrpclib.MININT, dummy_write)
        self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write)
        self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write)


137 138 139 140 141 142 143 144
    def test_dump_none(self):
        value = alist + [None]
        arg1 = (alist + [None],)
        strg = xmlrpclib.dumps(arg1, allow_none=True)
        self.assertEquals(value,
                          xmlrpclib.loads(strg)[0][0])
        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))

145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244

class HelperTestCase(unittest.TestCase):
    def test_escape(self):
        self.assertEqual(xmlrpclib.escape("a&b"), "a&b")
        self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
        self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")

class FaultTestCase(unittest.TestCase):
    def test_repr(self):
        f = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
        self.assertEqual(repr(f), str(f))

    def test_dump_fault(self):
        f = xmlrpclib.Fault(42, 'Test Fault')
        s = xmlrpclib.dumps((f,))
        (newf,), m = xmlrpclib.loads(s)
        self.assertEquals(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEquals(m, None)

        s = xmlrpclib.Marshaller().dumps(f)
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)


class DateTimeTestCase(unittest.TestCase):
    def test_default(self):
        t = xmlrpclib.DateTime()

    def test_time(self):
        d = 1181399930.036952
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))

    def test_time_tuple(self):
        d = (2007,6,9,10,38,50,5,160,0)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), '20070609T10:38:50')

    def test_time_struct(self):
        d = time.localtime(1181399930.036952)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t),  time.strftime("%Y%m%dT%H:%M:%S", d))

    def test_datetime_datetime(self):
        d = datetime.datetime(2007,1,2,3,4,5)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), '20070102T03:04:05')

    def test_datetime_date(self):
        d = datetime.date(2007,9,8)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), '20070908T00:00:00')

    def test_datetime_time(self):
        d = datetime.time(13,17,19)
        # allow for date rollover by checking today's or tomorrow's dates
        dd1 = datetime.datetime.now().date()
        dd2 = dd1 + datetime.timedelta(days=1)
        vals = (dd1.strftime('%Y%m%dT13:17:19'),
                dd2.strftime('%Y%m%dT13:17:19'))
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t) in vals, True)

    def test_repr(self):
        d = datetime.datetime(2007,1,2,3,4,5)
        t = xmlrpclib.DateTime(d)
        val ="<DateTime '20070102T03:04:05' at %x>" % id(t)
        self.assertEqual(repr(t), val)

    def test_decode(self):
        d = ' 20070908T07:11:13  '
        t1 = xmlrpclib.DateTime()
        t1.decode(d)
        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
        self.assertEqual(t1, tref)

        t2 = xmlrpclib._datetime(d)
        self.assertEqual(t1, tref)

class BinaryTestCase(unittest.TestCase):
    def test_default(self):
        t = xmlrpclib.Binary()
        self.assertEqual(str(t), '')

    def test_string(self):
        d = '\x01\x02\x03abc123\xff\xfe'
        t = xmlrpclib.Binary(d)
        self.assertEqual(str(t), d)

    def test_decode(self):
        d = '\x01\x02\x03abc123\xff\xfe'
        de = base64.encodestring(d)
        t1 = xmlrpclib.Binary()
        t1.decode(de)
        self.assertEqual(str(t1), d)

        t2 = xmlrpclib._binary(de)
        self.assertEqual(str(t2), d)


245 246 247 248 249 250 251 252 253
PORT = None

def http_server(evt, numrequests):
    class TestInstanceClass:
        def div(self, x, y):
            '''This is the div function'''
            return x // y

    try:
254 255
        serv = SimpleXMLRPCServer.SimpleXMLRPCServer(("localhost", 0),
                        logRequests=False, bind_and_activate=False)
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
        serv.socket.settimeout(3)
        serv.server_bind()
        global PORT
        PORT = serv.socket.getsockname()[1]
        serv.server_activate()
        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x,y: x+y, 'add')
        serv.register_instance(TestInstanceClass())

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()


280
class SimpleServerTestCase(unittest.TestCase):
281
    def setUp(self):
282 283 284
        # enable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

285
        self.evt = threading.Event()
286 287 288
        # start server thread to handle requests
        serv_args = (self.evt, 2)
        threading.Thread(target=http_server, args=serv_args).start()
289 290 291 292 293 294 295 296 297 298 299 300 301

        # wait for port to be assigned to server
        n = 1000
        while n > 0 and PORT is None:
            time.sleep(0.001)
            n -= 1

        time.sleep(0.5)

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()

302 303 304
        # disable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False

305
    def test_simple1(self):
306 307 308
        try:
            p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
            self.assertEqual(p.pow(6,8), 6**8)
309
        except xmlrpclib.ProtocolError as e:
310 311
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, e.headers))
312 313

    def test_introspection1(self):
314 315 316 317 318 319
        try:
            p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
            meth = p.system.listMethods()
            expected_methods = set(['pow', 'div', 'add', 'system.listMethods',
                'system.methodHelp', 'system.methodSignature', 'system.multicall'])
            self.assertEqual(set(meth), expected_methods)
320
        except xmlrpclib.ProtocolError as e:
321 322
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, e.headers))
323 324

    def test_introspection2(self):
325 326 327 328
        try:
            p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
329
        except xmlrpclib.ProtocolError as e:
330 331
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, e.headers))
332 333 334 335

    def test_introspection3(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try
336 337 338 339
        try:
            p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
340
        except xmlrpclib.ProtocolError as e:
341 342
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, e.headers))
343 344

    def test_multicall(self):
345 346 347 348 349 350 351 352 353 354
        try:
            p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
355
        except xmlrpclib.ProtocolError as e:
356 357
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, e.headers))
358 359


360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(mimetools.Message):
    def __getitem__(self, key):
        key = key.lower()
        if key == 'content-length':
            return 'I am broken'
        return mimetools.Message.__getitem__(self, key)


class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, 2)
        threading.Thread(target=http_server, args=serv_args).start()

        # wait for port to be assigned to server
        n = 1000
        while n > 0 and PORT is None:
            time.sleep(0.001)
            n -= 1

        time.sleep(0.5)

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message

    def test_basic(self):
        # check that flag is false by default
        flagval = SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

398 399 400 401 402 403 404
        # enable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
            self.assertEqual(p.pow(6,8), 6**8)
405
        except xmlrpclib.ProtocolError as e:
406 407
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, e.headers))
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442

    def test_fail_no_info(self):
        # use the broken message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
            p.pow(6,8)
        except xmlrpclib.ProtocolError as e:
            # The two server-side error headers shouldn't be sent back in this case
            self.assertTrue(e.headers.get("X-exception") is None)
            self.assertTrue(e.headers.get("X-traceback") is None)
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
            p.pow(6,8)
        except xmlrpclib.ProtocolError as e:
            # We should get error info in the response
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(e.headers.get("x-exception"), expected_err)
            self.assertTrue(e.headers.get("x-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')


443
def test_main():
444 445 446 447 448 449 450 451
    xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
         BinaryTestCase, FaultTestCase]

    # The test cases against a SimpleXMLRPCServer raise a socket error
    # 10035 (WSAEWOULDBLOCK) in the server thread handle_request call when
    # run on Windows. This only happens on the first test to run, but it
    # fails every time and so these tests are skipped on win32 platforms.
    if sys.platform != 'win32':
452 453
        xmlrpc_tests.append(SimpleServerTestCase)
        xmlrpc_tests.append(FailingServerTestCase)
454

455
    test_support.run_unittest(*xmlrpc_tests)
456 457 458

if __name__ == "__main__":
    test_main()