Commit 96d124b9 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Abort a transaction on all involved storages.

This fix an issue where some locks were not released on a storage because the
node list used to sent abort notifications was built with the content of
data_dict that don't keep informations about unresolved conflicts.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2176 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 4ac34f7b
...@@ -116,6 +116,7 @@ class ThreadContext(object): ...@@ -116,6 +116,7 @@ class ThreadContext(object):
'asked_object': 0, 'asked_object': 0,
'undo_conflict_oid_list': [], 'undo_conflict_oid_list': [],
'undo_error_oid_list': [], 'undo_error_oid_list': [],
'involved_nodes': set(),
} }
...@@ -607,12 +608,14 @@ class Application(object): ...@@ -607,12 +608,14 @@ class Application(object):
self.local_var.object_serial_dict[oid] = (serial, version) self.local_var.object_serial_dict[oid] = (serial, version)
getConnForCell = self.cp.getConnForCell getConnForCell = self.cp.getConnForCell
queue = self.local_var.queue queue = self.local_var.queue
add_involved_nodes = self.local_var.involved_nodes.add
for cell in cell_list: for cell in cell_list:
conn = getConnForCell(cell) conn = getConnForCell(cell)
if conn is None: if conn is None:
continue continue
try: try:
conn.ask(p, on_timeout=on_timeout, queue=queue) conn.ask(p, on_timeout=on_timeout, queue=queue)
add_involved_nodes(cell.getNode())
except ConnectionClosed: except ConnectionClosed:
continue continue
...@@ -733,6 +736,7 @@ class Application(object): ...@@ -733,6 +736,7 @@ class Application(object):
p = Packets.AskStoreTransaction(tid, str(transaction.user), p = Packets.AskStoreTransaction(tid, str(transaction.user),
str(transaction.description), dumps(transaction._extension), str(transaction.description), dumps(transaction._extension),
local_var.data_dict.keys()) local_var.data_dict.keys())
add_involved_nodes = self.local_var.involved_nodes.add
for cell in self._getCellListForTID(tid, writable=True): for cell in self._getCellListForTID(tid, writable=True):
logging.debug("voting object %s %s", cell.getAddress(), logging.debug("voting object %s %s", cell.getAddress(),
cell.getState()) cell.getState())
...@@ -743,6 +747,7 @@ class Application(object): ...@@ -743,6 +747,7 @@ class Application(object):
local_var.txn_voted = False local_var.txn_voted = False
try: try:
self._askStorage(conn, p) self._askStorage(conn, p)
add_involved_nodes(cell.getNode())
except ConnectionClosed: except ConnectionClosed:
continue continue
...@@ -768,19 +773,10 @@ class Application(object): ...@@ -768,19 +773,10 @@ class Application(object):
return return
tid = self.local_var.tid tid = self.local_var.tid
getCellListForOID = self._getCellListForOID
# select nodes where transaction was stored
node_set = set([x.getNode() for x in self._getCellListForTID(tid,
writable=True)])
# select nodes where objects were stored
for oid in self.local_var.data_dict.iterkeys():
node_set |= set([x.getNode() for x in getCellListForOID(oid,
writable=True)])
p = Packets.AbortTransaction(tid) p = Packets.AbortTransaction(tid)
getConnForNode = self.cp.getConnForNode getConnForNode = self.cp.getConnForNode
# cancel transaction one all those nodes # cancel transaction one all those nodes
for node in node_set: for node in self.local_var.involved_nodes:
conn = getConnForNode(node) conn = getConnForNode(node)
if conn is None: if conn is None:
continue continue
......
...@@ -292,6 +292,9 @@ class NeoTestBase(unittest.TestCase): ...@@ -292,6 +292,9 @@ class NeoTestBase(unittest.TestCase):
def checkInvalidateObjects(self, conn, **kw): def checkInvalidateObjects(self, conn, **kw):
return self.checkNotifyPacket(conn, Packets.InvalidateObjects, **kw) return self.checkNotifyPacket(conn, Packets.InvalidateObjects, **kw)
def checkAbortTransaction(self, conn, **kw):
return self.checkNotifyPacket(conn, Packets.AbortTransaction, **kw)
def checkAnswerTransactionFinished(self, conn, **kw): def checkAnswerTransactionFinished(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerTransactionFinished, **kw) return self.checkAnswerPacket(conn, Packets.AnswerTransactionFinished, **kw)
......
...@@ -605,6 +605,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -605,6 +605,7 @@ class ClientApplicationTests(NeoTestBase):
app.cp = Mock({ 'getConnForNode': ReturnValues(conn1, conn2), }) app.cp = Mock({ 'getConnForNode': ReturnValues(conn1, conn2), })
# fake data # fake data
app.local_var.data_dict = {oid1: '', oid2: ''} app.local_var.data_dict = {oid1: '', oid2: ''}
app.local_var.involved_nodes = set([cell1, cell2])
app.tpc_abort(txn) app.tpc_abort(txn)
# will check if there was just one call/packet : # will check if there was just one call/packet :
self.checkNotifyPacket(conn1, Packets.AbortTransaction) self.checkNotifyPacket(conn1, Packets.AbortTransaction)
...@@ -616,6 +617,69 @@ class ClientApplicationTests(NeoTestBase): ...@@ -616,6 +617,69 @@ class ClientApplicationTests(NeoTestBase):
self.assertEquals(app.local_var.txn_voted, False) self.assertEquals(app.local_var.txn_voted, False)
self.assertEquals(app.local_var.txn_finished, False) self.assertEquals(app.local_var.txn_finished, False)
def test_tpc_abort3(self):
""" check that abort is sent to all nodes involved in the transaction """
app = self.getApp()
# three partitions/storages: one per object/transaction
app.num_partitions = 3
app.num_replicas = 0
tid = self.makeTID(0) # on partition 0
oid1 = self.makeOID(1) # on partition 1, conflicting
oid2 = self.makeOID(2) # on partition 2
# storage nodes
address1 = ('127.0.0.1', 10000)
address2 = ('127.0.0.1', 10001)
address3 = ('127.0.0.1', 10002)
app.nm.createMaster(address=address1)
app.nm.createStorage(address=address2)
app.nm.createStorage(address=address3)
# answer packets
packet1 = Packets.AnswerStoreTransaction(tid=tid)
packet2 = Packets.AnswerStoreObject(conflicting=1, oid=oid1, serial=tid)
packet3 = Packets.AnswerStoreObject(conflicting=0, oid=oid2, serial=tid)
[p.setId(i) for p, i in zip([packet1, packet2, packet3], range(3))]
conn1 = Mock({'__repr__': 'conn1', 'getAddress': address1, 'fakeReceived': packet1})
conn2 = Mock({'__repr__': 'conn2', 'getAddress': address2, 'fakeReceived': packet2})
conn3 = Mock({'__repr__': 'conn3', 'getAddress': address3, 'fakeReceived': packet3})
node1 = Mock({'__repr__': 'node1', '__hash__': 1, 'getConnection': conn1})
node2 = Mock({'__repr__': 'node2', '__hash__': 2, 'getConnection': conn2})
node3 = Mock({'__repr__': 'node3', '__hash__': 3, 'getConnection': conn3})
cell1 = Mock({ 'getNode': node1, '__hash__': 1, 'getConnection': conn1})
cell2 = Mock({ 'getNode': node2, '__hash__': 2, 'getConnection': conn2})
cell3 = Mock({ 'getNode': node3, '__hash__': 3, 'getConnection': conn3})
# fake environment
app.pt = Mock({
'getCellListForTID': [cell1],
'getCellListForOID': ReturnValues([cell2], [cell3]),
})
app.cp = Mock({'getConnForCell': ReturnValues(conn2, conn3, conn1)})
app.dispatcher = Mock()
app.master_conn = Mock({'__hash__': 0})
txn = self.makeTransactionObject()
app.local_var.txn, app.local_var.tid = txn, tid
class Dispatcher(object):
def pending(self, queue):
return not queue.empty()
app.dispatcher = Dispatcher()
# begin a transaction
app.tpc_begin(txn, tid)
# conflict occurs on storage 2
app.store(oid1, tid, 'DATA', None, txn)
app.store(oid2, tid, 'DATA', None, txn)
app.local_var.queue.put((conn2, packet2))
app.local_var.queue.put((conn3, packet3))
# vote fails as the conflict is not resolved, nothing is sent to storage 3
self.assertRaises(ConflictError, app.tpc_vote, txn, failing_tryToResolveConflict)
class ConnectionPool(object):
def getConnForNode(self, node):
return node.getConnection()
app.cp = ConnectionPool()
# abort must be sent to storage 1 and 2
app.tpc_abort(txn)
self.checkAbortTransaction(app.master_conn)
self.checkAbortTransaction(conn2)
self.checkAbortTransaction(conn3)
def test_tpc_finish1(self): def test_tpc_finish1(self):
# ignore mismatch transaction # ignore mismatch transaction
app = self.getApp() app = self.getApp()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment