Commit 9b33b1db authored by Julien Muchembled's avatar Julien Muchembled

Fix oids remaining write-locked forever

This happened in 2 cases:
- Commit a4c06242 ("Review aborting of
  transactions") introduced a race condition causing oids to remain
  write-locked forever after that the transaction modifying them is aborted.
- An unfinished transaction is not locked/unlocked during tpc_finish: oids
  must be unlocked when being notified that the transaction is finished.
parent 7f754b5e
...@@ -549,6 +549,24 @@ class Application(ThreadedApplication): ...@@ -549,6 +549,24 @@ class Application(ThreadedApplication):
txn_context = self._txn_container.pop(transaction) txn_context = self._txn_container.pop(transaction)
if txn_context is None: if txn_context is None:
return return
# We want that the involved nodes abort a transaction after any
# other packet sent by the client for this transaction. IOW, if we
# already have a connection with a storage node, potentially with
# a pending write, aborting only via the master may lead to a race
# condition. The consequence would be that storage nodes lock oids
# forever.
p = Packets.AbortTransaction(txn_context.ttid, ())
for uuid in txn_context.involved_nodes:
try:
self.cp.connection_dict[uuid].notify(p)
except (KeyError, ConnectionClosed):
pass
# Because we want to be sure that the involved nodes are notified,
# we still have to send the full list to the master. Most of the
# time, the storage nodes get 2 AbortTransaction packets, and the
# second one is rarely useful. Another option would be that the
# storage nodes keep a list of aborted transactions, but the
# difficult part would be to avoid a memory leak.
try: try:
notify = self.master_conn.notify notify = self.master_conn.notify
except AttributeError: except AttributeError:
......
...@@ -20,7 +20,7 @@ import traceback ...@@ -20,7 +20,7 @@ import traceback
from cStringIO import StringIO from cStringIO import StringIO
from struct import Struct from struct import Struct
PROTOCOL_VERSION = 10 PROTOCOL_VERSION = 11
# Size restrictions. # Size restrictions.
MIN_PACKET_SIZE = 10 MIN_PACKET_SIZE = 10
...@@ -1033,11 +1033,11 @@ class StoreObject(Packet): ...@@ -1033,11 +1033,11 @@ class StoreObject(Packet):
class AbortTransaction(Packet): class AbortTransaction(Packet):
""" """
Abort a transaction. C -> PM -> S. Abort a transaction. C -> S and C -> PM -> S.
""" """
_fmt = PStruct('abort_transaction', _fmt = PStruct('abort_transaction',
PTID('tid'), PTID('tid'),
PFUUIDList, # unused for PM -> S PFUUIDList, # unused for * -> S
) )
class StoreTransaction(Packet): class StoreTransaction(Packet):
......
...@@ -20,7 +20,17 @@ from neo.lib.handler import EventHandler ...@@ -20,7 +20,17 @@ from neo.lib.handler import EventHandler
from neo.lib.exception import PrimaryFailure, StoppedOperation from neo.lib.exception import PrimaryFailure, StoppedOperation
from neo.lib.protocol import uuid_str, NodeStates, NodeTypes, Packets from neo.lib.protocol import uuid_str, NodeStates, NodeTypes, Packets
class BaseMasterHandler(EventHandler): class BaseHandler(EventHandler):
def notifyTransactionFinished(self, conn, ttid, max_tid):
app = self.app
app.tm.abort(ttid)
app.replicator.transactionFinished(ttid, max_tid)
def abortTransaction(self, conn, ttid, _):
self.notifyTransactionFinished(conn, ttid, None)
class BaseMasterHandler(BaseHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
if self.app.listening_conn: # if running if self.app.listening_conn: # if running
......
...@@ -15,18 +15,19 @@ ...@@ -15,18 +15,19 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import DelayEvent, EventHandler from neo.lib.handler import DelayEvent
from neo.lib.util import dump, makeChecksum, add64 from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, Errors, ProtocolError, \ from neo.lib.protocol import Packets, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, NotRegisteredError from ..transactions import ConflictError, NotRegisteredError
from . import BaseHandler
import time import time
# Log stores taking (incl. lock delays) more than this many seconds. # Log stores taking (incl. lock delays) more than this many seconds.
# Set to None to disable. # Set to None to disable.
SLOW_STORE = 2 SLOW_STORE = 2
class ClientOperationHandler(EventHandler): class ClientOperationHandler(BaseHandler):
def askTransactionInformation(self, conn, tid): def askTransactionInformation(self, conn, tid):
t = self.app.dm.getTransaction(tid) t = self.app.dm.getTransaction(tid)
......
...@@ -31,9 +31,6 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -31,9 +31,6 @@ class MasterOperationHandler(BaseMasterHandler):
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID) dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm.commit() dm.commit()
def notifyTransactionFinished(self, conn, *args):
self.app.replicator.transactionFinished(*args)
def notifyPartitionChanges(self, conn, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
...@@ -57,10 +54,6 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -57,10 +54,6 @@ class MasterOperationHandler(BaseMasterHandler):
def notifyUnlockInformation(self, conn, ttid): def notifyUnlockInformation(self, conn, ttid):
self.app.tm.unlock(ttid) self.app.tm.unlock(ttid)
def abortTransaction(self, conn, ttid, _):
self.app.tm.abort(ttid)
self.app.replicator.transactionFinished(ttid)
def askPack(self, conn, tid): def askPack(self, conn, tid):
app = self.app app = self.app
logging.info('Pack started, up to %s...', dump(tid)) logging.info('Pack started, up to %s...', dump(tid))
......
...@@ -1567,6 +1567,21 @@ class Test(NEOThreadedTest): ...@@ -1567,6 +1567,21 @@ class Test(NEOThreadedTest):
self.tic() self.tic()
self.assertPartitionTable(cluster, pt) self.assertPartitionTable(cluster, pt)
@with_cluster()
def testAbortTransaction(self, cluster):
t, c = cluster.getTransaction()
r = c.root()
r._p_changed = 1
def abort(_):
raise Exception
TransactionalResource(t, 0, tpc_vote=abort)
with cluster.client.filterConnection(cluster.storage) as cs:
cs.delayAskStoreObject()
self.assertRaises(Exception, t.commit)
t.begin()
r._p_changed = 1
t.commit()
@with_cluster(replicas=1) @with_cluster(replicas=1)
def testPartialConflict(self, cluster): def testPartialConflict(self, cluster):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment