Commit 9177c3af authored by Denis Bilenko's avatar Denis Bilenko

drop support for Python 2.5

parent ed74f199
......@@ -6,7 +6,6 @@ env:
matrix:
include:
- python: 2.6
- python: 2.5
- python: 2.7
env: DEBUGPY=-dbg GEVENTSETUP_EV_VERIFY=3
- python: 2.6
......@@ -17,7 +16,6 @@ install:
- pip install --use-mirrors cython
- cython --version
- pip install --use-mirrors greenlet psycopg2 pysendfile web.py
- if [ "x$TRAVIS_PYTHON_VERSION" == "x2.5" ]; then sudo apt-get install libssl-dev libkrb5-dev libbluetooth-dev; pip install --use-mirrors sslfix; fi
- if [ "x$TRAVIS_PYTHON_VERSION" == "x2.7" ]; then pip install --use-mirrors -q pep8; fi
- python -c 'import greenlet; print greenlet, greenlet.__version__; import psycopg2; print psycopg2, psycopg2.__version__; import web; print web, web.__version__'
- export CYTHON=`which cython`
......
......@@ -20,7 +20,7 @@ gevent_ is written and maintained by `Denis Bilenko`_ and is licensed under MIT
get gevent
----------
Install Python 2.5 or newer and greenlet_ extension.
Install Python 2.6 or newer and greenlet_ extension.
Download the latest release from `Python Package Index`_ or clone `the repository`_.
......
......@@ -19,7 +19,7 @@ Features include:
Installation
------------
gevent runs on Python 2.5 and newer and requires
gevent runs on Python 2.6 and newer and requires
* greenlet__ which can be installed with ``pip install greenlet``.
......
......@@ -2,7 +2,6 @@
"""Make the standard library cooperative."""
from __future__ import absolute_import
import sys
from sys import version_info
__all__ = ['patch_all',
'patch_socket',
......@@ -186,13 +185,7 @@ def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=Tru
if select:
patch_select(aggressive=aggressive)
if ssl:
if version_info[:2] > (2, 5):
patch_ssl()
else:
try:
patch_ssl()
except ImportError:
pass # in Python 2.5, 'ssl' is a standalone package not included in stdlib
patch_ssl()
if httplib:
raise ValueError('gevent.httplib is no longer provided, httplib must be False')
if subprocess:
......
......@@ -379,46 +379,23 @@ class WSGIHandler(object):
raise AssertionError("The application did not call start_response()")
self._write_with_headers(data)
if sys.version_info[:2] >= (2, 6):
def _write_with_headers(self, data):
towrite = bytearray()
self.headers_sent = True
self.finalize_headers()
towrite.extend('HTTP/1.1 %s\r\n' % self.status)
for header in self.response_headers:
towrite.extend('%s: %s\r\n' % header)
towrite.extend('\r\n')
if data:
if self.response_use_chunked:
## Write the chunked encoding
towrite.extend("%x\r\n%s\r\n" % (len(data), data))
else:
towrite.extend(data)
self._sendall(towrite)
else:
# Python 2.5 does not have bytearray
def _write_with_headers(self, data):
towrite = []
self.headers_sent = True
self.finalize_headers()
towrite.append('HTTP/1.1 %s\r\n' % self.status)
for header in self.response_headers:
towrite.append('%s: %s\r\n' % header)
towrite.append('\r\n')
if data:
if self.response_use_chunked:
## Write the chunked encoding
towrite.append("%x\r\n%s\r\n" % (len(data), data))
else:
towrite.append(data)
self._sendall(''.join(towrite))
def _write_with_headers(self, data):
towrite = bytearray()
self.headers_sent = True
self.finalize_headers()
towrite.extend('HTTP/1.1 %s\r\n' % self.status)
for header in self.response_headers:
towrite.extend('%s: %s\r\n' % header)
towrite.extend('\r\n')
if data:
if self.response_use_chunked:
## Write the chunked encoding
towrite.extend("%x\r\n%s\r\n" % (len(data), data))
else:
towrite.extend(data)
self._sendall(towrite)
def start_response(self, status, headers, exc_info=None):
if exc_info:
......
......@@ -73,10 +73,6 @@ __imports__ = ['error',
'getservbyport',
'getdefaulttimeout',
'setdefaulttimeout',
# Python 2.5 and older:
'RAND_add',
'RAND_egd',
'RAND_status',
# Windows:
'errorTab']
......@@ -655,15 +651,4 @@ def getfqdn(name=''):
return name
try:
from gevent.ssl import sslwrap_simple as ssl, SSLError as sslerror, SSLSocket as SSLType
_have_ssl = True
except ImportError:
_have_ssl = False
if sys.version_info[:2] <= (2, 5) and _have_ssl:
__implements__.extend(['ssl', 'sslerror', 'SSLType'])
__all__ = __implements__ + __extensions__ + __imports__
......@@ -5,10 +5,6 @@
For the documentation, refer to :mod:`ssl` module manual.
This module implements cooperative SSL socket wrappers.
On Python 2.6 and newer it uses Python's native :mod:`ssl` module. On Python 2.5 and 2.4
it requires `ssl package`_ to be installed.
.. _`ssl package`: http://pypi.python.org/pypi/ssl
"""
from __future__ import absolute_import
......
......@@ -20,16 +20,6 @@ __all__ = ['Timeout',
'with_timeout']
try:
BaseException
except NameError: # Python < 2.5
class BaseException:
# not subclassing from object() intentionally, because in
# that case "raise Timeout" fails with TypeError.
pass
class Timeout(BaseException):
"""Raise *exception* in the current greenlet after given time period::
......@@ -129,10 +119,7 @@ class Timeout(BaseException):
self.timer.stop()
def __repr__(self):
try:
classname = self.__class__.__name__
except AttributeError: # Python < 2.5
classname = 'Timeout'
classname = type(self).__name__
if self.pending:
pending = ' pending'
else:
......
import httplib
import StringIO
import sys
from unittest import TestCase
from test import test_support
class FakeSocket:
def __init__(self, text, fileclass=StringIO.StringIO):
self.text = text
self.fileclass = fileclass
def sendall(self, data):
self.data = data
def makefile(self, mode, bufsize=None):
if mode != 'r' and mode != 'rb':
raise httplib.UnimplementedFileMode()
return self.fileclass(self.text)
class NoEOFStringIO(StringIO.StringIO):
"""Like StringIO, but raises AssertionError on EOF.
This is used below to test that httplib doesn't try to read
more from the underlying file than it should.
"""
def read(self, n=-1):
data = StringIO.StringIO.read(self, n)
if data == '':
raise AssertionError('caller tried to read past EOF')
return data
def readline(self, length=None):
data = StringIO.StringIO.readline(self, length)
if data == '':
raise AssertionError('caller tried to read past EOF')
return data
class HeaderTests(TestCase):
def test_auto_headers(self):
# Some headers are added automatically, but should not be added by
# .request() if they are explicitly set.
import httplib
class HeaderCountingBuffer(list):
def __init__(self):
self.count = {}
def append(self, item):
kv = item.split(':')
if len(kv) > 1:
# item is a 'Key: Value' header string
lcKey = kv[0].lower()
self.count.setdefault(lcKey, 0)
self.count[lcKey] += 1
list.append(self, item)
for explicit_header in True, False:
for header in 'Content-length', 'Host', 'Accept-encoding':
conn = httplib.HTTPConnection('example.com')
conn.sock = FakeSocket('blahblahblah')
conn._buffer = HeaderCountingBuffer()
body = 'spamspamspam'
headers = {}
if explicit_header:
headers[header] = str(len(body))
conn.request('POST', '/', body, headers)
self.assertEqual(conn._buffer.count[header.lower()], 1)
# Collect output to a buffer so that we don't have to cope with line-ending
# issues across platforms. Specifically, the headers will have \r\n pairs
# and some platforms will strip them from the output file.
def test():
buf = StringIO.StringIO()
_stdout = sys.stdout
try:
sys.stdout = buf
_test()
finally:
sys.stdout = _stdout
# print individual lines with endings stripped
s = buf.getvalue()
for line in s.split("\n"):
print line.strip()
def _test():
# Test HTTP status lines
body = "HTTP/1.1 200 Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock, 1)
resp.begin()
print resp.read()
resp.close()
body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock, 1)
try:
resp.begin()
except httplib.BadStatusLine:
print "BadStatusLine raised as expected"
else:
print "Expect BadStatusLine"
# Check invalid host_port
for hp in ("www.python.org:abc", "www.python.org:"):
try:
h = httplib.HTTP(hp)
except httplib.InvalidURL:
print "InvalidURL raised as expected"
else:
print "Expect InvalidURL"
for hp,h,p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
("www.python.org:80", "www.python.org", 80),
("www.python.org", "www.python.org", 80),
("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
try:
http = httplib.HTTP(hp)
except httplib.InvalidURL:
print "InvalidURL raised erroneously"
c = http._conn
if h != c.host: raise AssertionError, ("Host incorrectly parsed", h, c.host)
if p != c.port: raise AssertionError, ("Port incorrectly parsed", p, c.host)
# test response with multiple message headers with the same field name.
text = ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"\r\n'
'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
' Path="/acme"\r\n'
'\r\n'
'No body\r\n')
hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
', '
'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
s = FakeSocket(text)
r = httplib.HTTPResponse(s, 1)
r.begin()
cookies = r.getheader("Set-Cookie")
if cookies != hdr:
raise AssertionError, "multiple headers not combined properly"
# Test that the library doesn't attempt to read any data
# from a HEAD request. (Tickles SF bug #622042.)
sock = FakeSocket(
'HTTP/1.1 200 OK\r\n'
'Content-Length: 14432\r\n'
'\r\n',
NoEOFStringIO)
resp = httplib.HTTPResponse(sock, 1, method="HEAD")
resp.begin()
if resp.read() != "":
raise AssertionError, "Did not expect response from HEAD request"
resp.close()
class OfflineTest(TestCase):
def test_responses(self):
self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found")
def test_main(verbose=None):
tests = [HeaderTests,OfflineTest]
test_support.run_unittest(*tests)
test()
# Some simple Queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import Queue
import sys
import threading
import time
from test.test_support import verify, TestFailed, verbose
QUEUE_SIZE = 5
# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
def __init__(self, fn, args):
self.fn = fn
self.args = args
self.startedEvent = threading.Event()
threading.Thread.__init__(self)
def run(self):
# The sleep isn't necessary, but is intended to give the blocking
# function in the main thread a chance at actually blocking before
# we unclog it. But if the sleep is longer than the timeout-based
# tests wait in their blocking functions, those tests will fail.
# So we give them much longer timeout values compared to the
# sleep here (I aimed at 10 seconds for blocking functions --
# they should never actually wait that long - they should make
# progress as soon as we call self.fn()).
time.sleep(0.1)
self.startedEvent.set()
self.fn(*self.args)
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release. Returns the result of the blocking function.
# Caution: block_func must guarantee to block until trigger_func is
# called, and trigger_func must guarantee to change queue state so that
# block_func can make enough progress to return. In particular, a
# block_func that just raises an exception regardless of whether trigger_func
# is called will lead to timing-dependent sporadic failures, and one of
# those went rarely seen but undiagnosed for years. Now block_func
# must be unexceptional. If block_func is supposed to raise an exception,
# call _doExceptionalBlockingTest() instead.
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
result = block_func(*block_args)
# If block_func returned before our thread made the call, we failed!
if not t.startedEvent.isSet():
raise TestFailed("blocking function '%r' appeared not to block" %
block_func)
t.join(10) # make sure the thread terminates
if t.isAlive():
raise TestFailed("trigger function '%r' appeared to not return" %
trigger_func)
return result
# Call this instead if block_func is supposed to raise an exception.
def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
trigger_args, expected_exception_class):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
try:
try:
block_func(*block_args)
except expected_exception_class:
raise
else:
raise TestFailed("expected exception of kind %r" %
expected_exception_class)
finally:
t.join(10) # make sure the thread terminates
if t.isAlive():
raise TestFailed("trigger function '%r' appeared to not return" %
trigger_func)
if not t.startedEvent.isSet():
raise TestFailed("trigger thread ended but event never set")
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
pass
class FailingQueue(Queue.Queue):
def __init__(self, *args):
self.fail_next_put = False
self.fail_next_get = False
Queue.Queue.__init__(self, *args)
def _put(self, item):
if self.fail_next_put:
self.fail_next_put = False
raise FailingQueueException, "You Lose"
return Queue.Queue._put(self, item)
def _get(self):
if self.fail_next_get:
self.fail_next_get = False
raise FailingQueueException, "You Lose"
return Queue.Queue._get(self)
def FailingQueueTest(q):
if not q.empty():
raise RuntimeError, "Call this function with an empty queue"
for i in range(QUEUE_SIZE-1):
q.put(i)
# Test a failing non-blocking put.
q.fail_next_put = True
try:
q.put("oops", block=0)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
q.fail_next_put = True
try:
q.put("oops", timeout=0.1)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
q.put("last")
verify(q.full(), "Queue should be full")
# Test a failing blocking put
q.fail_next_put = True
try:
_doBlockingTest(q.put, ("full",), q.get, ())
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
q.put("last")
# Test a failing timeout put
q.fail_next_put = True
try:
_doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
FailingQueueException)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
q.put("last")
verify(q.full(), "Queue should be full")
q.get()
verify(not q.full(), "Queue should not be full")
q.put("last")
verify(q.full(), "Queue should be full")
# Test a blocking put
_doBlockingTest( q.put, ("full",), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
verify(q.empty(), "Queue should be empty")
q.put("first")
q.fail_next_get = True
try:
q.get()
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
verify(not q.empty(), "Queue should not be empty")
q.fail_next_get = True
try:
q.get(timeout=0.1)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
verify(not q.empty(), "Queue should not be empty")
q.get()
verify(q.empty(), "Queue should be empty")
q.fail_next_get = True
try:
_doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
FailingQueueException)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# put succeeded, but get failed.
verify(not q.empty(), "Queue should not be empty")
q.get()
verify(q.empty(), "Queue should be empty")
def SimpleQueueTest(q):
if not q.empty():
raise RuntimeError, "Call this function with an empty queue"
# I guess we better check things actually queue correctly a little :)
q.put(111)
q.put(222)
verify(q.get() == 111 and q.get() == 222,
"Didn't seem to queue the correct data!")
for i in range(QUEUE_SIZE-1):
q.put(i)
verify(not q.empty(), "Queue should not be empty")
verify(not q.full(), "Queue should not be full")
q.put("last")
verify(q.full(), "Queue should be full")
try:
q.put("full", block=0)
raise TestFailed("Didn't appear to block with a full queue")
except Queue.Full:
pass
try:
q.put("full", timeout=0.01)
raise TestFailed("Didn't appear to time-out with a full queue")
except Queue.Full:
pass
# Test a blocking put
_doBlockingTest(q.put, ("full",), q.get, ())
_doBlockingTest(q.put, ("full", True, 10), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
verify(q.empty(), "Queue should be empty")
try:
q.get(block=0)
raise TestFailed("Didn't appear to block with an empty queue")
except Queue.Empty:
pass
try:
q.get(timeout=0.01)
raise TestFailed("Didn't appear to time-out with an empty queue")
except Queue.Empty:
pass
# Test a blocking get
_doBlockingTest(q.get, (), q.put, ('empty',))
_doBlockingTest(q.get, (True, 10), q.put, ('empty',))
cum = 0
cumlock = threading.Lock()
def worker(q):
global cum
while True:
x = q.get()
if x is None:
q.task_done()
return
cumlock.acquire()
try:
cum += x
finally:
cumlock.release()
q.task_done()
def QueueJoinTest(q):
global cum
cum = 0
for i in (0,1):
threading.Thread(target=worker, args=(q,)).start()
for i in xrange(100):
q.put(i)
q.join()
verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
for i in (0,1):
q.put(None) # instruct the threads to close
q.join() # verify that you can join twice
def QueueTaskDoneTest(q):
try:
q.task_done()
except ValueError:
pass
else:
raise TestFailed("Did not detect task count going negative")
def test():
q = Queue.Queue()
QueueTaskDoneTest(q)
QueueJoinTest(q)
QueueJoinTest(q)
QueueTaskDoneTest(q)
q = Queue.Queue(QUEUE_SIZE)
# Do it a couple of times on the same queue
SimpleQueueTest(q)
SimpleQueueTest(q)
if verbose:
print "Simple Queue tests seemed to work"
q = FailingQueue(QUEUE_SIZE)
FailingQueueTest(q)
FailingQueueTest(q)
if verbose:
print "Failing Queue tests seemed to work"
test()
# Testing select module
from test.test_support import verbose, reap_children
import select
import os
# test some known error conditions
try:
rfd, wfd, xfd = select.select(1, 2, 3)
except TypeError:
pass
else:
print 'expected TypeError exception not raised'
class Nope:
pass
class Almost:
def fileno(self):
return 'fileno'
try:
rfd, wfd, xfd = select.select([Nope()], [], [])
except TypeError:
pass
else:
print 'expected TypeError exception not raised'
try:
rfd, wfd, xfd = select.select([Almost()], [], [])
except TypeError:
pass
else:
print 'expected TypeError exception not raised'
try:
rfd, wfd, xfd = select.select([], [], [], 'not a number')
except TypeError:
pass
else:
print 'expected TypeError exception not raised'
def test():
import sys
if sys.platform[:3] in ('win', 'mac', 'os2', 'riscos'):
if verbose:
print "Can't test select easily on", sys.platform
return
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 0.1; done'
p = os.popen(cmd, 'r')
for tout in (0, 0.1, 0.2, 0.4, 0.8, 1.6) + (None,)*10:
if verbose:
print 'timeout =', tout
rfd, wfd, xfd = select.select([p], [], [], tout)
if (rfd, wfd, xfd) == ([], [], []):
continue
if (rfd, wfd, xfd) == ([p], [], []):
line = p.readline()
if verbose:
print repr(line)
if not line:
if verbose:
print 'EOF'
break
continue
print 'Unexpected return values from select():', rfd, wfd, xfd
p.close()
reap_children()
test()
# Test the signal module
from test.test_support import verbose, TestSkipped, TestFailed, vereq
import signal
import os, sys, time
if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
raise TestSkipped, "Can't test signal on %s" % sys.platform
MAX_DURATION = 20 # Entire test should last at most 20 sec.
if verbose:
x = '-x'
else:
x = '+x'
pid = os.getpid()
if verbose:
print "test runner's pid is", pid
# Shell script that will send us asynchronous signals
script = """
(
set %(x)s
sleep 2
kill -HUP %(pid)d
sleep 2
kill -USR1 %(pid)d
sleep 2
kill -USR2 %(pid)d
) &
""" % vars()
a_called = b_called = False
def handlerA(*args):
global a_called
a_called = True
if verbose:
print "handlerA invoked", args
class HandlerBCalled(Exception):
pass
def handlerB(*args):
global b_called
b_called = True
if verbose:
print "handlerB invoked", args
raise HandlerBCalled, args
# Set up a child to send signals to us (the parent) after waiting long
# enough to receive the alarm. It seems we miss the alarm for some
# reason. This will hopefully stop the hangs on Tru64/Alpha.
# Alas, it doesn't. Tru64 appears to miss all the signals at times, or
# seemingly random subsets of them, and nothing done in force_test_exit
# so far has actually helped.
def force_test_exit():
# Sigh, both imports seem necessary to avoid errors.
import os
fork_pid = os.fork()
if fork_pid:
# In parent.
return fork_pid
# In child.
import os, time
try:
# Wait 5 seconds longer than the expected alarm to give enough
# time for the normal sequence of events to occur. This is
# just a stop-gap to try to prevent the test from hanging.
time.sleep(MAX_DURATION + 5)
print >> sys.__stdout__, ' child should not have to kill parent'
for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
os.kill(pid, getattr(signal, signame))
print >> sys.__stdout__, " child sent", signame, "to", pid
time.sleep(1)
finally:
os._exit(0)
# Install handlers.
hup = signal.signal(signal.SIGHUP, handlerA)
usr1 = signal.signal(signal.SIGUSR1, handlerB)
usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
alrm = signal.signal(signal.SIGALRM, signal.default_int_handler)
try:
signal.alarm(MAX_DURATION)
vereq(signal.getsignal(signal.SIGHUP), handlerA)
vereq(signal.getsignal(signal.SIGUSR1), handlerB)
vereq(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
vereq(signal.getsignal(signal.SIGALRM), signal.default_int_handler)
# Try to ensure this test exits even if there is some problem with alarm.
# Tru64/Alpha often hangs and is ultimately killed by the buildbot.
fork_pid = force_test_exit()
try:
signal.getsignal(4242)
raise TestFailed('expected ValueError for invalid signal # to '
'getsignal()')
except ValueError:
pass
try:
signal.signal(4242, handlerB)
raise TestFailed('expected ValueError for invalid signal # to '
'signal()')
except ValueError:
pass
try:
signal.signal(signal.SIGUSR1, None)
raise TestFailed('expected TypeError for non-callable')
except TypeError:
pass
# Launch an external script to send us signals.
# We expect the external script to:
# send HUP, which invokes handlerA to set a_called
# send USR1, which invokes handlerB to set b_called and raise
# HandlerBCalled
# send USR2, which is ignored
#
# Then we expect the alarm to go off, and its handler raises
# KeyboardInterrupt, finally getting us out of the loop.
os.system(script)
try:
print "starting pause() loop..."
while 1:
try:
if verbose:
print "call pause()..."
signal.pause()
if verbose:
print "pause() returned"
except HandlerBCalled:
if verbose:
print "HandlerBCalled exception caught"
except KeyboardInterrupt:
if verbose:
print "KeyboardInterrupt (the alarm() went off)"
if not a_called:
print 'HandlerA not called'
if not b_called:
print 'HandlerB not called'
finally:
# Forcibly kill the child we created to ping us if there was a test error.
try:
# Make sure we don't kill ourself if there was a fork error.
if fork_pid > 0:
os.kill(fork_pid, signal.SIGKILL)
except:
# If the child killed us, it has probably exited. Killing a
# non-existent process will raise an error which we don't care about.
pass
# Restore handlers.
signal.alarm(0) # cancel alarm in case we died early
signal.signal(signal.SIGHUP, hup)
signal.signal(signal.SIGUSR1, usr1)
signal.signal(signal.SIGUSR2, usr2)
signal.signal(signal.SIGALRM, alrm)
This diff is collapsed.
# Test just the SSL support in the socket module, in a moderately bogus way.
import sys
from test import test_support
import socket
import errno
# Optionally test SSL support. This requires the 'network' resource as given
# on the regrtest command line.
skip_expected = not (test_support.is_resource_enabled('network') and
hasattr(socket, "ssl"))
def test_basic():
test_support.requires('network')
import urllib
if test_support.verbose:
print "test_basic ..."
socket.RAND_status()
try:
socket.RAND_egd(1)
except TypeError:
pass
else:
print "didn't raise TypeError"
socket.RAND_add("this is a random string", 75.0)
try:
f = urllib.urlopen('https://sf.net')
except IOError, exc:
if exc.errno == errno.ETIMEDOUT:
raise test_support.ResourceDenied('HTTPS connection is timing out')
else:
raise
buf = f.read()
f.close()
def test_timeout():
test_support.requires('network')
def error_msg(extra_msg):
print >> sys.stderr, """\
WARNING: an attempt to connect to %r %s, in
test_timeout. That may be legitimate, but is not the outcome we hoped
for. If this message is seen often, test_timeout should be changed to
use a more reliable address.""" % (ADDR, extra_msg)
if test_support.verbose:
print "test_timeout ..."
# A service which issues a welcome banner (without need to write
# anything).
ADDR = "pop.gmail.com", 995
s = socket.socket()
s.settimeout(30.0)
try:
s.connect(ADDR)
except socket.timeout:
error_msg('timed out')
return
except socket.error, exc: # In case connection is refused.
if exc.args[0] == errno.ECONNREFUSED:
error_msg('was refused')
return
else:
raise
ss = socket.ssl(s)
# Read part of return welcome banner twice.
ss.read(1)
ss.read(1)
s.close()
def test_rude_shutdown():
if test_support.verbose:
print "test_rude_shutdown ..."
try:
import threading
except ImportError:
return
# Some random port to connect to.
PORT = [9934]
listener_ready = threading.Event()
listener_gone = threading.Event()
# `listener` runs in a thread. It opens a socket listening on PORT, and
# sits in an accept() until the main thread connects. Then it rudely
# closes the socket, and sets Event `listener_gone` to let the main thread
# know the socket is gone.
def listener():
s = socket.socket()
PORT[0] = test_support.bind_port(s, '', PORT[0])
s.listen(5)
listener_ready.set()
s.accept()
s = None # reclaim the socket object, which also closes it
listener_gone.set()
def connector():
listener_ready.wait()
s = socket.socket()
s.connect(('localhost', PORT[0]))
listener_gone.wait()
try:
ssl_sock = socket.ssl(s)
except socket.sslerror:
pass
else:
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
connector()
t.join()
def test_main():
if not hasattr(socket, "ssl"):
raise test_support.TestSkipped("socket module has no ssl support")
test_rude_shutdown()
test_basic()
test_timeout()
if __name__ == "__main__":
test_main()
# Test suite for SocketServer.py
from test import test_support
from test.test_support import (verbose, verify, TESTFN, TestSkipped,
reap_children)
test_support.requires('network')
from SocketServer import *
import socket
import errno
import select
import time
import threading
import os
NREQ = 3
DELAY = 0.5
class MyMixinHandler:
def handle(self):
time.sleep(DELAY)
line = self.rfile.readline()
time.sleep(DELAY)
self.wfile.write(line)
class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
pass
class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
pass
class MyMixinServer:
def serve_a_few(self):
for i in range(NREQ):
self.handle_request()
def handle_error(self, request, client_address):
self.close_request(request)
self.server_close()
raise
teststring = "hello world\n"
def receive(sock, n, timeout=20):
r, w, x = select.select([sock], [], [], timeout)
if sock in r:
return sock.recv(n)
else:
raise RuntimeError, "timed out on %r" % (sock,)
def testdgram(proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
s.sendto(teststring, addr)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
s.close()
def teststream(proto, addr):
s = socket.socket(proto, socket.SOCK_STREAM)
s.connect(addr)
s.sendall(teststring)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
s.close()
class ServerThread(threading.Thread):
def __init__(self, addr, svrcls, hdlrcls):
threading.Thread.__init__(self)
self.__addr = addr
self.__svrcls = svrcls
self.__hdlrcls = hdlrcls
def run(self):
class svrcls(MyMixinServer, self.__svrcls):
pass
if verbose: print "thread: creating server"
svr = svrcls(self.__addr, self.__hdlrcls)
# pull the address out of the server in case it changed
# this can happen if another process is using the port
addr = svr.server_address
if addr:
self.__addr = addr
if self.__addr != svr.socket.getsockname():
raise RuntimeError('server_address was %s, expected %s' %
(self.__addr, svr.socket.getsockname()))
if verbose: print "thread: serving three times"
svr.serve_a_few()
if verbose: print "thread: done"
seed = 0
def pickport():
global seed
seed += 1
return 10000 + (os.getpid() % 1000)*10 + seed
host = "localhost"
testfiles = []
def pickaddr(proto):
if proto == socket.AF_INET:
return (host, pickport())
else:
fn = TESTFN + str(pickport())
if os.name == 'os2':
# AF_UNIX socket names on OS/2 require a specific prefix
# which can't include a drive letter and must also use
# backslashes as directory separators
if fn[1] == ':':
fn = fn[2:]
if fn[0] in (os.sep, os.altsep):
fn = fn[1:]
fn = os.path.join('\socket', fn)
if os.sep == '/':
fn = fn.replace(os.sep, os.altsep)
else:
fn = fn.replace(os.altsep, os.sep)
testfiles.append(fn)
return fn
def cleanup():
for fn in testfiles:
try:
os.remove(fn)
except os.error:
pass
testfiles[:] = []
def testloop(proto, servers, hdlrcls, testfunc):
for svrcls in servers:
addr = pickaddr(proto)
if verbose:
print "ADDR =", addr
print "CLASS =", svrcls
t = ServerThread(addr, svrcls, hdlrcls)
if verbose: print "server created"
t.start()
if verbose: print "server running"
for i in range(NREQ):
time.sleep(DELAY)
if verbose: print "test client", i
testfunc(proto, addr)
if verbose: print "waiting for server"
t.join()
if verbose: print "done"
class ForgivingTCPServer(TCPServer):
# prevent errors if another process is using the port we want
def server_bind(self):
host, default_port = self.server_address
# this code shamelessly stolen from test.test_support
# the ports were changed to protect the innocent
import sys
for port in [default_port, 3434, 8798, 23833]:
try:
self.server_address = host, port
TCPServer.server_bind(self)
break
except socket.error, (err, msg):
if err != errno.EADDRINUSE:
raise
print >>sys.__stderr__, \
' WARNING: failed to listen on port %d, trying another' % port
tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
tcpservers.append(ForkingTCPServer)
udpservers = [UDPServer, ThreadingUDPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
udpservers.append(ForkingUDPServer)
if not hasattr(socket, 'AF_UNIX'):
streamservers = []
dgramservers = []
else:
class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
streamservers.append(ForkingUnixStreamServer)
class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
dgramservers.append(ForkingUnixDatagramServer)
def sloppy_cleanup():
# See http://python.org/sf/1540386
# We need to reap children here otherwise a child from one server
# can be left running for the next server and cause a test failure.
time.sleep(DELAY)
reap_children()
def testall():
testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
sloppy_cleanup()
testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
if hasattr(socket, 'AF_UNIX'):
sloppy_cleanup()
testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
def test_main():
import imp
if imp.lock_held():
# If the import lock is held, the threads will hang.
raise TestSkipped("can't run when import lock is held")
try:
testall()
finally:
cleanup()
reap_children()
if __name__ == "__main__":
test_main()
This diff is collapsed.
# Very rudimentary test of thread module
# Create a bunch of threads, let each do some work, wait until all are done
from test.test_support import verbose
import random
import thread
import time
mutex = thread.allocate_lock()
rmutex = thread.allocate_lock() # for calls to random
running = 0
done = thread.allocate_lock()
done.acquire()
numtasks = 10
def task(ident):
global running
rmutex.acquire()
delay = random.random() * numtasks / 10.
rmutex.release()
if verbose:
print 'task', ident, 'will run for', round(delay, 1), 'sec'
time.sleep(delay)
if verbose:
print 'task', ident, 'done'
mutex.acquire()
running = running - 1
if running == 0:
done.release()
mutex.release()
next_ident = 0
def newtask():
global next_ident, running
mutex.acquire()
next_ident = next_ident + 1
if verbose:
print 'creating task', next_ident
thread.start_new_thread(task, (next_ident,))
running = running + 1
mutex.release()
for i in range(numtasks):
newtask()
print 'waiting for all tasks to complete'
done.acquire()
print 'all tasks done'
class barrier:
def __init__(self, n):
self.n = n
self.waiting = 0
self.checkin = thread.allocate_lock()
self.checkout = thread.allocate_lock()
self.checkout.acquire()
def enter(self):
checkin, checkout = self.checkin, self.checkout
checkin.acquire()
self.waiting = self.waiting + 1
if self.waiting == self.n:
self.waiting = self.n - 1
checkout.release()
return
checkin.release()
checkout.acquire()
self.waiting = self.waiting - 1
if self.waiting == 0:
checkin.release()
return
checkout.release()
numtrips = 3
def task2(ident):
global running
for i in range(numtrips):
if ident == 0:
# give it a good chance to enter the next
# barrier before the others are all out
# of the current one
delay = 0.001
else:
rmutex.acquire()
delay = random.random() * numtasks / 10.
rmutex.release()
if verbose:
print 'task', ident, 'will run for', round(delay, 1), 'sec'
time.sleep(delay)
if verbose:
print 'task', ident, 'entering barrier', i
bar.enter()
if verbose:
print 'task', ident, 'leaving barrier', i
mutex.acquire()
running -= 1
# Must release mutex before releasing done, else the main thread can
# exit and set mutex to None as part of global teardown; then
# mutex.release() raises AttributeError.
finished = running == 0
mutex.release()
if finished:
done.release()
print '\n*** Barrier Test ***'
if done.acquire(0):
raise ValueError, "'done' should have remained acquired"
bar = barrier(numtasks)
running = numtasks
for i in range(numtasks):
thread.start_new_thread(task2, (i,))
done.acquire()
print 'all tasks done'
# not all platforms support changing thread stack size
print '\n*** Changing thread stack size ***'
if thread.stack_size() != 0:
raise ValueError, "initial stack_size not 0"
thread.stack_size(0)
if thread.stack_size() != 0:
raise ValueError, "stack_size not reset to default"
from os import name as os_name
if os_name in ("nt", "os2", "posix"):
tss_supported = 1
try:
thread.stack_size(4096)
except ValueError:
print 'caught expected ValueError setting stack_size(4096)'
except thread.error:
tss_supported = 0
print 'platform does not support changing thread stack size'
if tss_supported:
failed = lambda s, e: s != e
fail_msg = "stack_size(%d) failed - should succeed"
for tss in (262144, 0x100000, 0):
thread.stack_size(tss)
if failed(thread.stack_size(), tss):
raise ValueError, fail_msg % tss
print 'successfully set stack_size(%d)' % tss
for tss in (262144, 0x100000):
print 'trying stack_size = %d' % tss
next_ident = 0
for i in range(numtasks):
newtask()
print 'waiting for all tasks to complete'
done.acquire()
print 'all tasks done'
# reset stack size to default
thread.stack_size(0)
This diff is collapsed.
import gc
import threading
import unittest
from doctest import DocTestSuite
from test import test_support
class ThreadingLocalTest(unittest.TestCase):
def test_derived(self):
# Issue 3088: if there is a threads switch inside the __init__
# of a threading.local derived class, the per-thread dictionary
# is created but not correctly set on the object.
# The first member set may be bogus.
import time
class Local(threading.local):
def __init__(self):
time.sleep(0.01)
local = Local()
def f(i):
local.x = i
# Simply check that the variable is correctly set
self.assertEqual(local.x, i)
threads= []
for i in range(10):
t = threading.Thread(target=f, args=(i,))
t.start()
threads.append(t)
for t in threads:
t.join()
def test_derived_cycle_dealloc(self):
# http://bugs.python.org/issue6990
class Local(threading.local):
pass
locals = None
passed = [False]
e1 = threading.Event()
e2 = threading.Event()
def f():
# 1) Involve Local in a cycle
cycle = [Local()]
cycle.append(cycle)
cycle[0].foo = 'bar'
# 2) GC the cycle (triggers threadmodule.c::local_clear
# before local_dealloc)
del cycle
gc.collect()
e1.set()
e2.wait()
# 4) New Locals should be empty
passed[0] = all(not hasattr(local, 'foo') for local in locals)
t = threading.Thread(target=f)
t.start()
e1.wait()
# 3) New Locals should recycle the original's address. Creating
# them in the thread overwrites the thread state and avoids the
# bug
locals = [Local() for i in range(10)]
e2.set()
t.join()
self.assertTrue(passed[0])
def test_main():
suite = DocTestSuite('_threading_local')
try:
from thread import _local
except ImportError:
pass
else:
import _threading_local
local_orig = _threading_local.local
def setUp(test):
_threading_local.local = _local
def tearDown(test):
_threading_local.local = local_orig
suite.addTest(DocTestSuite('_threading_local',
setUp=setUp, tearDown=tearDown)
)
suite.addTest(unittest.makeSuite(ThreadingLocalTest))
test_support.run_suite(suite)
if __name__ == '__main__':
test_main()
"""Unit tests for socket timeout feature."""
import unittest
from test import test_support
# This requires the 'network' resource as given on the regrtest command line.
skip_expected = not test_support.is_resource_enabled('network')
import time
import socket
class CreationTestCase(unittest.TestCase):
"""Test case for socket.gettimeout() and socket.settimeout()"""
def setUp(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def tearDown(self):
self.sock.close()
def testObjectCreation(self):
# Test Socket creation
self.assertEqual(self.sock.gettimeout(), None,
"timeout not disabled by default")
def testFloatReturnValue(self):
# Test return value of gettimeout()
self.sock.settimeout(7.345)
self.assertEqual(self.sock.gettimeout(), 7.345)
self.sock.settimeout(3)
self.assertEqual(self.sock.gettimeout(), 3)
self.sock.settimeout(None)
self.assertEqual(self.sock.gettimeout(), None)
def testReturnType(self):
# Test return type of gettimeout()
self.sock.settimeout(1)
self.assertEqual(type(self.sock.gettimeout()), type(1.0))
self.sock.settimeout(3.9)
self.assertEqual(type(self.sock.gettimeout()), type(1.0))
def testTypeCheck(self):
# Test type checking by settimeout()
self.sock.settimeout(0)
self.sock.settimeout(0L)
self.sock.settimeout(0.0)
self.sock.settimeout(None)
self.assertRaises(TypeError, self.sock.settimeout, "")
self.assertRaises(TypeError, self.sock.settimeout, u"")
self.assertRaises(TypeError, self.sock.settimeout, ())
self.assertRaises(TypeError, self.sock.settimeout, [])
self.assertRaises(TypeError, self.sock.settimeout, {})
self.assertRaises(TypeError, self.sock.settimeout, 0j)
def testRangeCheck(self):
# Test range checking by settimeout()
self.assertRaises(ValueError, self.sock.settimeout, -1)
self.assertRaises(ValueError, self.sock.settimeout, -1L)
self.assertRaises(ValueError, self.sock.settimeout, -1.0)
def testTimeoutThenBlocking(self):
# Test settimeout() followed by setblocking()
self.sock.settimeout(10)
self.sock.setblocking(1)
self.assertEqual(self.sock.gettimeout(), None)
self.sock.setblocking(0)
self.assertEqual(self.sock.gettimeout(), 0.0)
self.sock.settimeout(10)
self.sock.setblocking(0)
self.assertEqual(self.sock.gettimeout(), 0.0)
self.sock.setblocking(1)
self.assertEqual(self.sock.gettimeout(), None)
def testBlockingThenTimeout(self):
# Test setblocking() followed by settimeout()
self.sock.setblocking(0)
self.sock.settimeout(1)
self.assertEqual(self.sock.gettimeout(), 1)
self.sock.setblocking(1)
self.sock.settimeout(1)
self.assertEqual(self.sock.gettimeout(), 1)
class TimeoutTestCase(unittest.TestCase):
"""Test case for socket.socket() timeout functions"""
# There are a number of tests here trying to make sure that an operation
# doesn't take too much longer than expected. But competing machine
# activity makes it inevitable that such tests will fail at times.
# When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K
# and Win98SE. Boosting it to 2.0 helped a lot, but isn't a real
# solution.
fuzz = 2.0
def setUp(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addr_remote = ('www.python.org.', 80)
self.addr_local = ('127.0.0.1', 25339)
def tearDown(self):
self.sock.close()
def testConnectTimeout(self):
# Test connect() timeout
_timeout = 0.001
self.sock.settimeout(_timeout)
# If we are too close to www.python.org, this test will fail.
# Pick a host that should be farther away.
if (socket.getfqdn().split('.')[-2:] == ['python', 'org'] or
socket.getfqdn().split('.')[-2:-1] == ['xs4all']):
self.addr_remote = ('tut.fi', 80)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.sock.connect,
self.addr_remote)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + self.fuzz,
"timeout (%g) is more than %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
def testRecvTimeout(self):
# Test recv() timeout
_timeout = 0.02
self.sock.connect(self.addr_remote)
self.sock.settimeout(_timeout)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.sock.recv, 1024)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + self.fuzz,
"timeout (%g) is %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
def testAcceptTimeout(self):
# Test accept() timeout
_timeout = 2
self.sock.settimeout(_timeout)
self.sock.bind(self.addr_local)
self.sock.listen(5)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.sock.accept)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + self.fuzz,
"timeout (%g) is %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
def testRecvfromTimeout(self):
# Test recvfrom() timeout
_timeout = 2
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(_timeout)
self.sock.bind(self.addr_local)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.sock.recvfrom, 8192)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + self.fuzz,
"timeout (%g) is %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
def testSend(self):
# Test send() timeout
# couldn't figure out how to test it
pass
def testSendto(self):
# Test sendto() timeout
# couldn't figure out how to test it
pass
def testSendall(self):
# Test sendall() timeout
# couldn't figure out how to test it
pass
def test_main():
test_support.requires('network')
test_support.run_unittest(CreationTestCase, TimeoutTestCase)
if __name__ == "__main__":
test_main()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -16,11 +16,6 @@ win32 * C:\Python27\python.exe -u -m monkey_test test_subprocess.py
win32 * C:\Python27\python.exe -u -m monkey_test --Event test_subprocess.py
# these need investigating:
* * .*/python2.5(-dbg)? -u -m monkey_test --Event test_urllib2net.py
* * .*/python2.5(-dbg)? -u -m monkey_test test_urllib2net.py
* * .*/python2.5(-dbg)? -u test__threading_vs_settrace.py
* * .*/python2.5(-dbg)? -u test__example_portforwarder.py
* * .*/python2.5(-dbg)? -u test__socket_close.py
* * * -u test__issue6.py
# bunch of SSLError: [Errno 1] _ssl.c:504: error:14090086:SSL routines:SSL3_GET_SERVER_CERTIFICATE:certificate verify failed
......@@ -28,4 +23,4 @@ win32 * C:\Python27\python.exe -u -m monkey_test --Event test_subprocess.py
* * * -u -m monkey_test --Event test_ssl.py
* * * -u -m monkey_test test_ssl.py
* * /usr/bin/python2.[567]-dbg -u test__backdoor.py
* * /usr/bin/python2.[67]-dbg -u test__backdoor.py
......@@ -300,7 +300,6 @@ def run_setup(ext_modules):
install_requires=['greenlet'],
classifiers=[
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 2.5",
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7",
"Operating System :: MacOS :: MacOS X",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment