Commit ce42103a authored by Julien Muchembled's avatar Julien Muchembled

storage: fix write-lock leak

parent 04ae2fc0
...@@ -62,7 +62,8 @@ class BaseMasterHandler(BaseHandler): ...@@ -62,7 +62,8 @@ class BaseMasterHandler(BaseHandler):
elif node_type == NodeTypes.CLIENT and state != NodeStates.RUNNING: elif node_type == NodeTypes.CLIENT and state != NodeStates.RUNNING:
logging.info('Notified of non-running client, abort (%s)', logging.info('Notified of non-running client, abort (%s)',
uuid_str(uuid)) uuid_str(uuid))
self.app.tm.abortFor(uuid) # See comment in ClientOperationHandler.connectionClosed
self.app.tm.abortFor(uuid, even_if_voted=True)
def notifyPartitionChanges(self, conn, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
......
...@@ -29,6 +29,24 @@ SLOW_STORE = 2 ...@@ -29,6 +29,24 @@ SLOW_STORE = 2
class ClientOperationHandler(BaseHandler): class ClientOperationHandler(BaseHandler):
def connectionClosed(self, conn):
logging.debug('connection closed for %r', conn)
app = self.app
if app.operational:
# Even if in most cases, abortFor is called from both this method
# and BaseMasterHandler.notifyPartitionChanges (especially since
# storage nodes disconnects unknown clients on their own), these 2
# handlers also cover distinct scenarios, so neither of them is
# redundant:
# - A client node may have network issues with this particular
# storage node and remain RUNNING: we may still be involved in
# the second phase so we only abort non-voted transactions here.
# By not taking part to any further deadlock avoidance,
# not releasing write-locks now would lead to a deadlock.
# - A client node may be disconnected from the master, whereas
# there are still voted (and not locked) transactions to abort.
app.tm.abortFor(conn.getUUID())
def askTransactionInformation(self, conn, tid): def askTransactionInformation(self, conn, tid):
t = self.app.dm.getTransaction(tid) t = self.app.dm.getTransaction(tid)
if t is None: if t is None:
......
...@@ -573,14 +573,14 @@ class TransactionManager(EventQueue): ...@@ -573,14 +573,14 @@ class TransactionManager(EventQueue):
self.read_queue.executeQueuedEvents() self.read_queue.executeQueuedEvents()
self.executeQueuedEvents() self.executeQueuedEvents()
def abortFor(self, uuid): def abortFor(self, uuid, even_if_voted=False):
""" """
Abort any non-locked transaction of a node Abort any non-locked transaction of a node
""" """
logging.debug('Abort for %s', uuid_str(uuid))
# abort any non-locked transaction of this node # abort any non-locked transaction of this node
for ttid, transaction in self._transaction_dict.items(): for ttid, transaction in self._transaction_dict.items():
if transaction.uuid == uuid: if transaction.uuid == uuid and (
even_if_voted or not transaction.voted):
self.abort(ttid) self.abort(ttid)
def isLockedTid(self, tid): def isLockedTid(self, tid):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment