Commit 9281211e authored by Vincent Pelletier's avatar Vincent Pelletier

Rewrite Dispatcher class.

Makes it thread-safe (with detailed explanation on for each method).
Makes it more efficient for unregister and registered uses.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1689 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 7031a421
......@@ -15,43 +15,66 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.locking import Lock
MARKER = []
EMPTY = {}
# This class is thread safe:
# - pop:
# We don't modify outer mapping.
# Inner mapping if accessed at most once by using a python primitive, so GIL
# ensures atomicity.
# - register:
# We protect modification in outer mapping by a lock.
# Inner mapping is accessed at most once by using a python primitive ('='
# operator), so GIL ensures atomicity.
# - unregister:
# We protect modification in outer mapping by a lock.
# Inner mapping access is done after detaching it from outer mapping in a
# thread-safe manner, so access doesn't need to worry about concurency.
# - registered:
# Nothing is modified in any structure, so there is not much to worry about
# concurency here. Note though that, by nature (read-only), this method can
# cause concurency problems in caller, depending on how it interprets the
# return value.
class Dispatcher:
"""Register a packet, connection pair as expecting a response packet."""
def __init__(self):
self.message_table = {}
lock = Lock()
self.message_table_lock_acquire = lock.acquire
self.message_table_lock_release = lock.release
def pop(self, conn, msg_id, default=MARKER):
"""Retrieve register-time provided payload."""
key = (id(conn), msg_id)
result = self.message_table.get(id(conn), EMPTY).pop(msg_id, default)
if default is MARKER:
result = self.message_table.pop(key)
else:
result = self.message_table.pop(key, default)
raise KeyError, (id(conn), msg_id)
return result
def register(self, conn, msg_id, payload):
"""Register an expectation for a reply. Thanks to GIL, it is
safe not to use a lock here."""
key = (id(conn), msg_id)
self.message_table[key] = payload
"""Register an expectation for a reply."""
self.message_table_lock_acquire()
try:
message_table = self.message_table.setdefault(id(conn), {})
finally:
self.message_table_lock_release()
message_table[msg_id] = payload
def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock
threads bloking it them """
# XXX: not thread-safe !
for key in self.message_table.keys():
if id(conn) == key[0]:
queue = self.message_table.pop(key)
queue.put((conn, None))
threads excepting responses from that connection """
self.message_table_lock_acquire()
try:
message_table = self.message_table.pop(id(conn), EMPTY)
finally:
self.message_table_lock_release()
for queue in message_table.itervalues():
queue.put((conn, None))
def registered(self, conn):
"""Check if a connection is registered into message table."""
searched_id = id(conn)
for conn_id, msg_id in self.message_table.iterkeys():
if searched_id == conn_id:
return True
return False
return len(self.message_table.get(id(conn), EMPTY)) != 0
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment