Commit afe5297b authored by Tim Peters's avatar Tim Peters

Semantic-neutral format and comment changes.

parent 8623b36e
# Some simple Queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable
# to ensure the Queue locks remain stable.
import Queue
import sys
import threading
......@@ -7,21 +7,23 @@ import time
from test.test_support import verify, TestFailed, verbose
queue_size = 5
QUEUE_SIZE = 5
# Execute a function that blocks, and in a seperate thread, a function that
# triggers the release. Returns the result of the blocking function.
# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
def __init__(self, fn, args):
self.fn = fn
self.args = args
self.startedEvent = threading.Event()
threading.Thread.__init__(self)
def run(self):
time.sleep(.1)
self.startedEvent.set()
self.fn(*self.args)
# Execute a function that blocks, and in a seperate thread, a function that
# triggers the release. Returns the result of the blocking function.
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
......@@ -60,7 +62,7 @@ class FailingQueue(Queue.Queue):
def FailingQueueTest(q):
if not q.empty():
raise RuntimeError, "Call this function with an empty queue"
for i in range(queue_size-1):
for i in range(QUEUE_SIZE-1):
q.put(i)
# Test a failing non-blocking put.
q.fail_next_put = True
......@@ -80,7 +82,7 @@ def FailingQueueTest(q):
# Test a failing blocking put
q.fail_next_put = True
try:
_doBlockingTest( q.put, ("full",), q.get, ())
_doBlockingTest(q.put, ("full",), q.get, ())
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
......@@ -90,7 +92,7 @@ def FailingQueueTest(q):
# Test a failing timeout put
q.fail_next_put = True
try:
_doBlockingTest( q.put, ("full", True, 0.2), q.get, ())
_doBlockingTest(q.put, ("full", True, 0.2), q.get, ())
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
......@@ -105,7 +107,7 @@ def FailingQueueTest(q):
# Test a blocking put
_doBlockingTest( q.put, ("full",), q.get, ())
# Empty it
for i in range(queue_size):
for i in range(QUEUE_SIZE):
q.get()
verify(q.empty(), "Queue should be empty")
q.put("first")
......@@ -144,7 +146,7 @@ def SimpleQueueTest(q):
q.put(222)
verify(q.get() == 111 and q.get() == 222,
"Didn't seem to queue the correct data!")
for i in range(queue_size-1):
for i in range(QUEUE_SIZE-1):
q.put(i)
verify(not q.full(), "Queue should not be full")
q.put("last")
......@@ -160,10 +162,10 @@ def SimpleQueueTest(q):
except Queue.Full:
pass
# Test a blocking put
_doBlockingTest( q.put, ("full",), q.get, ())
_doBlockingTest( q.put, ("full", True, 0.2), q.get, ())
_doBlockingTest(q.put, ("full",), q.get, ())
_doBlockingTest(q.put, ("full", True, 0.2), q.get, ())
# Empty it
for i in range(queue_size):
for i in range(QUEUE_SIZE):
q.get()
verify(q.empty(), "Queue should be empty")
try:
......@@ -181,13 +183,13 @@ def SimpleQueueTest(q):
_doBlockingTest(q.get, (True, 0.2), q.put, ('empty',))
def test():
q=Queue.Queue(queue_size)
q = Queue.Queue(QUEUE_SIZE)
# Do it a couple of times on the same queue
SimpleQueueTest(q)
SimpleQueueTest(q)
if verbose:
print "Simple Queue tests seemed to work"
q = FailingQueue(queue_size)
q = FailingQueue(QUEUE_SIZE)
FailingQueueTest(q)
FailingQueueTest(q)
if verbose:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment