Commit 3b4d8542 authored by Vincent Pelletier's avatar Vincent Pelletier

Use explicit locking.

This fixes problems uncovered by current work on askStoreObject pipelining.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1771 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 38bcfed0
......@@ -19,24 +19,14 @@ from neo.locking import Lock
MARKER = []
EMPTY = {}
# This class is thread safe:
# - pop:
# We don't modify outer mapping.
# Inner mapping if accessed at most once by using a python primitive, so GIL
# ensures atomicity.
# - register:
# We protect modification in outer mapping by a lock.
# Inner mapping is accessed at most once by using a python primitive ('='
# operator), so GIL ensures atomicity.
# - unregister:
# We protect modification in outer mapping by a lock.
# Inner mapping access is done after detaching it from outer mapping in a
# thread-safe manner, so access doesn't need to worry about concurency.
# - registered:
# Nothing is modified in any structure, so there is not much to worry about
# concurency here. Note though that, by nature (read-only), this method can
# cause concurency problems in caller, depending on how it interprets the
# return value.
def giant_lock(func):
def wrapped(self, *args, **kw):
self.lock_acquire()
try:
return func(self, *args, **kw)
finally:
self.lock_release()
return wrapped
class Dispatcher:
"""Register a packet, connection pair as expecting a response packet."""
......@@ -44,9 +34,10 @@ class Dispatcher:
def __init__(self):
self.message_table = {}
lock = Lock()
self.message_table_lock_acquire = lock.acquire
self.message_table_lock_release = lock.release
self.lock_acquire = lock.acquire
self.lock_release = lock.release
@giant_lock
def pop(self, conn, msg_id, default=MARKER):
"""Retrieve register-time provided payload."""
result = self.message_table.get(id(conn), EMPTY).pop(msg_id, default)
......@@ -54,22 +45,19 @@ class Dispatcher:
raise KeyError, (id(conn), msg_id)
return result
def register(self, conn, msg_id, payload):
@giant_lock
def register(self, conn, msg_id, queue):
"""Register an expectation for a reply."""
self.message_table_lock_acquire()
try:
message_table = self.message_table.setdefault(id(conn), {})
finally:
self.message_table_lock_release()
message_table[msg_id] = payload
self.message_table.setdefault(id(conn), {})[msg_id] = queue
def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock
threads excepting responses from that connection """
self.message_table_lock_acquire()
self.lock_acquire()
try:
message_table = self.message_table.pop(id(conn), EMPTY)
finally:
self.lock_release()
notified_set = set()
for queue in message_table.itervalues():
queue_id = id(queue)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment