Commit daa83cb4 authored by Julien Muchembled's avatar Julien Muchembled

master: fix 2 bugs in verification phase

- Last known TID was not updated when recovering a transaction.
- Missing OIDs were ignored, which caused partial transactions to be committed
  instead of being deleted.
parent 524463e8
......@@ -133,6 +133,9 @@ class VerificationManager(BaseServiceHandler):
if node.isStorage():
node.notify(packet)
else:
if app.getLastTransaction() < tid: # XXX: refactoring needed
app.setLastTransaction(tid)
app.tm.setLastTID(tid)
packet = Packets.CommitTransaction(tid)
for node in getIdentifiedList(pool_set=uuid_set):
node.notify(packet)
......@@ -221,7 +224,7 @@ class VerificationManager(BaseServiceHandler):
logging.info('OID not found: %s', message)
if not self._gotAnswerFrom(uuid):
return
self.app._object_present = False
self._object_present = False
def connectionCompleted(self, conn):
pass
......
......@@ -188,25 +188,6 @@ class MasterVerificationTests(NeoUnitTestBase):
verification.answerObjectPresent(conn, new_oid, new_tid)
self.assertTrue(uuid not in self.verification._uuid_set)
def test_15_oidNotFound(self):
verification = self.verification
uuid = self.identifyToMasterNode()
# do nothing as asking_uuid_dict is True
conn = self.getFakeConnection(uuid, self.storage_address)
self.assertEqual(len(self.verification._uuid_set), 0)
self.app._object_present = True
self.assertTrue(self.app._object_present)
verification.oidNotFound(conn, "msg")
self.assertTrue(self.app._object_present)
# do work as asking_uuid_dict is False
conn = self.getFakeConnection(uuid, self.storage_address)
self.assertEqual(len(self.verification._uuid_set), 0)
self.verification._uuid_set.add(uuid)
self.assertTrue(self.app._object_present)
verification.oidNotFound(conn, "msg")
self.assertFalse(self.app._object_present)
self.assertTrue(uuid not in self.verification._uuid_set)
if __name__ == '__main__':
unittest.main()
......@@ -319,6 +319,7 @@ class ServerNode(Node):
def _afterRun(self):
try:
self.listening_conn.close()
self.listening_conn = None
except AttributeError:
pass
......@@ -349,17 +350,12 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
del self.dm
except StandardError: # AttributeError & ProgrammingError
pass
if self.master_conn:
self.master_conn.close()
def getAdapter(self):
return self._init_args['getAdapter']
def switchTables(self):
q = self.dm.query
for table in 'trans', 'obj':
q('ALTER TABLE %s RENAME TO tmp' % table)
q('ALTER TABLE t%s RENAME TO %s' % (table, table))
q('ALTER TABLE tmp RENAME TO t%s' % table)
def getDataLockInfo(self):
dm = self.dm
index = tuple(dm.query("SELECT id, hash, compression FROM data"))
......@@ -780,6 +776,18 @@ class NEOCluster(object):
return Patch(self.client.cp, getCellSortKey=lambda orig, cell:
(orig(cell), key(cell)))
def moduloTID(self, partition):
"""Force generation of TIDs that will be stored in given partition"""
partition = p64(partition)
master = self.primary_master
return Patch(master.tm, _nextTID=lambda orig, *args:
orig(*args) if args else orig(partition, master.pt.getPartitions()))
def getStorageList(self, *args, **kw):
pt = self.primary_master.pt
uuid_set = {cell.getUUID() for offset in args
for cell in pt.getCellList(offset, **kw)}
return (s for s in self.storage_list if s.uuid in uuid_set)
class NEOThreadedTest(NeoTestBase):
......
......@@ -28,7 +28,7 @@ from neo.storage.transactions import TransactionManager, \
from neo.lib.connection import ConnectionClosed, MTClientConnection
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_TID
from .. import expectedFailure, _UnexpectedSuccess, Patch
from .. import expectedFailure, _ExpectedFailure, _UnexpectedSuccess, Patch
from . import NEOCluster, NEOThreadedTest
from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOStorageError
......@@ -394,30 +394,57 @@ class Test(NEOThreadedTest):
cluster.stop()
def testVerificationCommitUnfinishedTransactions(self):
""" Verification step should commit unfinished transactions """
# translated from neo.tests.functional.testCluster.ClusterTests
cluster = NEOCluster()
""" Verification step should commit locked transactions """
def delayUnlockInformation(conn, packet):
return isinstance(packet, Packets.NotifyUnlockInformation)
def onStoreTransaction(storage, die=False):
def storeTransaction(orig, *args, **kw):
orig(*args, **kw)
if die:
sys.exit()
storage.master_conn.close()
return Patch(storage.dm, storeTransaction=storeTransaction)
cluster = NEOCluster(partitions=2, storage_count=2)
try:
cluster.start()
s0, = cluster.getStorageList(0)
s1, = cluster.getStorageList(1)
t, c = cluster.getTransaction()
c.root()[0] = 'ok'
r = c.root()
r[0] = x = PCounter()
tids = [r._p_serial]
t.commit()
tids.append(r._p_serial)
r[1] = PCounter()
with onStoreTransaction(s0), onStoreTransaction(s1):
self.assertRaises(ConnectionClosed, t.commit)
self.tic()
data_info = cluster.storage.getDataLockInfo()
self.assertEqual(data_info.values(), [0, 0])
# (obj|trans) become t(obj|trans)
cluster.storage.switchTables()
t.begin()
y = r[1]
self.assertEqual(y.value, 0)
assert [u64(o._p_oid) for o in (r, x, y)] == range(3)
r[2] = 'ok'
with cluster.master.filterConnection(s0) as m2s, \
cluster.moduloTID(1):
m2s.add(delayUnlockInformation)
t.commit()
x.value = 1
# s0 will accept to store y (because it's not locked) but will
# never lock the transaction (packets from master delayed),
# so the last transaction will be dropped.
y.value = 2
with onStoreTransaction(s1, die=True):
self.assertRaises(ConnectionClosed, t.commit)
finally:
cluster.stop()
cluster.reset()
self.assertEqual(dict.fromkeys(data_info, 1),
cluster.storage.getDataLockInfo())
try:
cluster.start()
t, c = cluster.getTransaction()
# transaction should be verified and commited
self.assertEqual(c.root()[0], 'ok')
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
r = c.root()
self.assertEqual(r[0].value, 0)
self.assertEqual(r[1].value, 0)
self.assertEqual(r[2], 'ok')
finally:
cluster.stop()
......@@ -711,8 +738,7 @@ class Test(NEOThreadedTest):
# unlike the oid on which we used readCurrent. Thus we check
# that the node containing only the partition 1 is also
# involved in tpc_finish.
with Patch(cluster.master.tm, begin=lambda orig, node, tid:
orig(node, p64(u64(x._p_serial) + 2 & ~1))):
with cluster.moduloTID(0):
t2.commit()
for storage in cluster.storage_list:
self.assertFalse(storage.tm._transaction_dict)
......@@ -809,12 +835,13 @@ class Test(NEOThreadedTest):
try:
t.commit()
raise _UnexpectedSuccess
except ConnectionClosed:
pass
except ConnectionClosed, e:
e = type(e), None, None
t.begin()
expectedFailure(self.assertIn)('x', c.root())
self.assertIn('x', c.root())
finally:
cluster.stop()
raise _ExpectedFailure(e)
def testEmptyTransaction(self):
cluster = NEOCluster()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment