Commit fbed3e71 authored by Vincent Pelletier's avatar Vincent Pelletier

Don't wait for all answers when aborting.

Instead, forget them at dispatcher level, and empty queue (in case
answers were received but not handled yet).

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2641 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 3cee6a15
...@@ -815,22 +815,13 @@ class Application(object): ...@@ -815,22 +815,13 @@ class Application(object):
'storage node %r of abortion, ignoring.', 'storage node %r of abortion, ignoring.',
conn, exc_info=1) conn, exc_info=1)
self._getMasterConnection().notify(p) self._getMasterConnection().notify(p)
# Just wait for responses to arrive. If any leads to an exception,
# log it and continue: we *must* eat all answers to not disturb the
# next transaction.
queue = self.local_var.queue queue = self.local_var.queue
pending = self.dispatcher.pending self.dispatcher.forget_queue(queue)
_waitAnyMessage = self._waitAnyMessage while True:
while pending(queue):
try: try:
_waitAnyMessage() queue.get(block=False)
except: except Empty:
neo.lib.logging.error( break
'Exception in tpc_abort while' \
'handling pending answers, ignoring.',
exc_info=1)
self.local_var.clear() self.local_var.clear()
@profiler_decorator @profiler_decorator
......
...@@ -131,6 +131,26 @@ class Dispatcher: ...@@ -131,6 +131,26 @@ class Dispatcher:
message_table[msg_id] = NOBODY message_table[msg_id] = NOBODY
return queue return queue
@giant_lock
@profiler_decorator
def forget_queue(self, queue):
"""
Forget all pending messages for given queue.
Actually makes them "expected by nobody", so we know we can ignore
them, and not detect it as an error.
"""
# XXX: expensive lookup: we iterate over the whole dict
found = 0
for message_table in self.message_table.itervalues():
for msg_id, t_queue in message_table.iteritems():
if queue is t_queue:
found += 1
message_table[msg_id] = NOBODY
refcount = self.queue_dict.pop(id(queue), 0)
if refcount != found:
raise ValueError('We hit a refcount bug: %s queue uses ' \
'expected, %s found' % (refcount, found))
@profiler_decorator @profiler_decorator
def registered(self, conn): def registered(self, conn):
"""Check if a connection is registered into message table.""" """Check if a connection is registered into message table."""
......
...@@ -621,6 +621,9 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -621,6 +621,9 @@ class ClientApplicationTests(NeoUnitTestBase):
class Dispatcher(object): class Dispatcher(object):
def pending(self, queue): def pending(self, queue):
return not queue.empty() return not queue.empty()
def forget_queue(self, queue):
pass
app.dispatcher = Dispatcher() app.dispatcher = Dispatcher()
# conflict occurs on storage 2 # conflict occurs on storage 2
app.store(oid1, tid, 'DATA', None, txn) app.store(oid1, tid, 'DATA', None, txn)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment