Commit c5a1efea authored by Vincent Pelletier's avatar Vincent Pelletier

Extend r2641.

Move queue flushing to dispatcher.
Use forget_queue in "undo" as well.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2642 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent fbed3e71
...@@ -817,11 +817,6 @@ class Application(object): ...@@ -817,11 +817,6 @@ class Application(object):
self._getMasterConnection().notify(p) self._getMasterConnection().notify(p)
queue = self.local_var.queue queue = self.local_var.queue
self.dispatcher.forget_queue(queue) self.dispatcher.forget_queue(queue)
while True:
try:
queue.get(block=False)
except Empty:
break
self.local_var.clear() self.local_var.clear()
@profiler_decorator @profiler_decorator
...@@ -917,16 +912,11 @@ class Application(object): ...@@ -917,16 +912,11 @@ class Application(object):
# Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError, # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
# meaning that objects in transaction's oid_list do not exist any # meaning that objects in transaction's oid_list do not exist any
# longer. This is the symptom of a pack, so forbid undoing transaction # longer. This is the symptom of a pack, so forbid undoing transaction
# when it happens, but sill keep waiting for answers. # when it happens.
failed = False try:
while True: self.waitResponses()
try: except NEOStorageNotFoundError:
self.waitResponses() self.dispatcher.forget_queue(queue)
except NEOStorageNotFoundError:
failed = True
else:
break
if failed:
raise UndoError('non-undoable transaction') raise UndoError('non-undoable transaction')
# Send undo data to all storage nodes. # Send undo data to all storage nodes.
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.lib.locking import Lock from neo.lib.locking import Lock, Empty
from neo.lib.profiling import profiler_decorator from neo.lib.profiling import profiler_decorator
EMPTY = {} EMPTY = {}
NOBODY = [] NOBODY = []
...@@ -133,11 +133,13 @@ class Dispatcher: ...@@ -133,11 +133,13 @@ class Dispatcher:
@giant_lock @giant_lock
@profiler_decorator @profiler_decorator
def forget_queue(self, queue): def forget_queue(self, queue, flush_queue=True):
""" """
Forget all pending messages for given queue. Forget all pending messages for given queue.
Actually makes them "expected by nobody", so we know we can ignore Actually makes them "expected by nobody", so we know we can ignore
them, and not detect it as an error. them, and not detect it as an error.
flush_queue (boolean, default=True)
All packets in queue get flushed.
""" """
# XXX: expensive lookup: we iterate over the whole dict # XXX: expensive lookup: we iterate over the whole dict
found = 0 found = 0
...@@ -150,6 +152,13 @@ class Dispatcher: ...@@ -150,6 +152,13 @@ class Dispatcher:
if refcount != found: if refcount != found:
raise ValueError('We hit a refcount bug: %s queue uses ' \ raise ValueError('We hit a refcount bug: %s queue uses ' \
'expected, %s found' % (refcount, found)) 'expected, %s found' % (refcount, found))
if flush_queue:
get = queue.get
while True:
try:
get(block=False)
except Empty:
break
@profiler_decorator @profiler_decorator
def registered(self, conn): def registered(self, conn):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment