Commit 4882da74 authored by Chris McDonough's avatar Chris McDonough

Jim commented this code at the PyCon DC 2004 sprint.

parent 6077a3ad
...@@ -15,49 +15,87 @@ import thread ...@@ -15,49 +15,87 @@ import thread
from ZServerPublisher import ZServerPublisher from ZServerPublisher import ZServerPublisher
class ZRendevous: class ZRendevous:
"""Worker thread pool
For better or worse, we hide locking sementics from the worker
threads. The worker threads do no locking.
"""
def __init__(self, n=1): def __init__(self, n=1):
sync=thread.allocate_lock() sync = thread.allocate_lock()
self._a=sync.acquire self._acquire = sync.acquire
self._r=sync.release self._release = sync.release
pool=[] pool = []
self._lists=pool, [], [] self._lists = (
self._a() pool, # Collection of locks representing threads are not
# waiting for work to do
[], # Request queue
[], # Pool of locks representing threads that are
# waiting (ready) for work to do.
)
self._acquire() # callers will block
try: try:
while n > 0: while n > 0:
l=thread.allocate_lock() l = thread.allocate_lock()
l.acquire() l.acquire()
pool.append(l) pool.append(l)
thread.start_new_thread(ZServerPublisher, thread.start_new_thread(ZServerPublisher,
(self.accept,)) (self.accept,))
n=n-1 n = n-1
finally: self._r() finally:
self._release() # let callers through now
def accept(self): def accept(self):
self._a() """Return a request from the request queue
If no requests are in the request queue, then block until
there is nonw.
"""
self._acquire() # prevent other calls to protect data structures
try: try:
pool, requests, ready = self._lists pool, requests, ready = self._lists
while not requests: while not requests:
l=pool[-1] # There are no requests in the queue. Wait until there are.
del pool[-1]
# This thread is waiting, to remove a lock from the collection
# of locks corresponding to threads not waiting for work
l = pool.pop()
# And add it to the collection of locks representing threads
# ready and waiting for work.
ready.append(l) ready.append(l)
self._r() self._release() # allow other calls
# Now try to acquire the lock. We will block until
# someone calls handle to queue a request and releases the lock
# which handle finds in the ready queue
l.acquire() l.acquire()
self._a()
self._acquire() # prevent calls so we can update
# not waiting pool
pool.append(l) pool.append(l)
r=requests[0] # return the *first* request
del requests[0] return requests.pop(0)
return r
finally: self._r() finally:
self._release() # allow calls
def handle(self, name, request, response): def handle(self, name, request, response):
self._a() """Queue a request for processing
"""
self._acquire() # prevent other calls to protect data structs
try: try:
pool, requests, ready = self._lists pool, requests, ready = self._lists
# queue request
requests.append((name, request, response)) requests.append((name, request, response))
if ready: if ready:
l=ready[-1] # If any threads are ready and waiting for work
del ready[-1] # then remove one of the locks from the ready pool
l.release() # and release it, letting the waiting thread go forward
finally: self._r() # and consume the request
ready.pop().release()
finally:
self._release()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment