Commit 69975d76 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Make replication work with temporary TIDs.

- Storage nodes start to replicate a partition when all transactions that were
pending when the oudated partition was added are committed.
- Transactions are registered by the master from the tpc_begin step.
Signed-off-by: default avatarGrégory <gregory@nexedi.com>

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2649 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5410225c
......@@ -184,7 +184,7 @@ class EventHandler(object):
def askUnfinishedTransactions(self, conn):
raise UnexpectedPacketError
def answerUnfinishedTransactions(self, conn, tid_list):
def answerUnfinishedTransactions(self, conn, max_tid, ttid_list):
raise UnexpectedPacketError
def askObjectPresent(self, conn, oid, tid):
......@@ -229,6 +229,9 @@ class EventHandler(object):
def notifyUnlockInformation(self, conn, ttid):
raise UnexpectedPacketError
def notifyTransactionFinished(self, conn, ttid, max_tid):
raise UnexpectedPacketError
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, ttid, unlock):
raise UnexpectedPacketError
......@@ -506,6 +509,7 @@ class EventHandler(object):
d[Packets.AnswerLastTransaction] = self.answerLastTransaction
d[Packets.AskCheckCurrentSerial] = self.askCheckCurrentSerial
d[Packets.AnswerCheckCurrentSerial] = self.answerCheckCurrentSerial
d[Packets.NotifyTransactionFinished] = self.notifyTransactionFinished
return d
......
......@@ -661,18 +661,18 @@ class AnswerUnfinishedTransactions(Packet):
"""
Answer unfinished transactions S -> PM.
"""
_header_format = '!L'
_header_format = '!8sL'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, tid_list):
body = [pack(self._header_format, len(tid_list))]
def _encode(self, max_tid, tid_list):
body = [pack(self._header_format, max_tid, len(tid_list))]
body.extend(tid_list)
return ''.join(body)
def _decode(self, body):
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
(max_tid, n) = unpack(self._header_format, body[:offset])
tid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
......@@ -681,7 +681,7 @@ class AnswerUnfinishedTransactions(Packet):
tid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
tid_list.append(tid)
return (tid_list,)
return (max_tid, tid_list)
class AskObjectPresent(Packet):
"""
......@@ -784,6 +784,18 @@ class AskFinishTransaction(Packet):
oid_list.append(oid)
return (tid, oid_list)
class NotifyTransactionFinished(Packet):
"""
Notify that a transaction blocking a replication is now finished
M -> S
"""
def _encode(self, ttid, max_tid):
return _encodeTID(ttid) + _encodeTID(max_tid)
def _decode(self, body):
(ttid, max_tid) = unpack('8s8s', body)
return (ttid, max_tid)
class AnswerTransactionFinished(Packet):
"""
Answer when a transaction is finished. PM -> C.
......@@ -2044,6 +2056,10 @@ class PacketRegistry(dict):
AskCheckCurrentSerial,
AnswerCheckCurrentSerial,
)
NotifyTransactionFinished = register(
0x003E,
NotifyTransactionFinished,
)
# build a "singleton"
Packets = PacketRegistry()
......
......@@ -585,6 +585,13 @@ class Application(object):
for storage_uuid in txn.getUUIDList():
getByUUID(storage_uuid).getConnection().notify(notify_unlock)
# Notify storage that have replications blocked by this transaction
notify_finished = Packets.NotifyTransactionFinished(ttid, tid)
for storage_uuid in txn.getNotificationUUIDList():
node = getByUUID(storage_uuid)
if node is not None and node.isConnected():
node.getConnection().notify(notify_finished)
# remove transaction from manager
self.tm.remove(transaction_node.getUUID(), ttid)
self.setLastTransaction(tid)
......
......@@ -51,8 +51,9 @@ class ClientServiceHandler(MasterHandler):
"""
A client request a TID, nothing is kept about it until the finish.
"""
conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(
conn.getUUID(), tid)))
app = self.app
node = app.nm.getByUUID(conn.getUUID())
conn.answer(Packets.AnswerBeginTransaction(app.tm.begin(node, tid)))
def askNewOIDs(self, conn, num_oids):
app = self.app
......@@ -84,9 +85,8 @@ class ClientServiceHandler(MasterHandler):
usable_uuid_set = set((x.getUUID() for x in identified_node_list))
partitions = app.pt.getPartitions()
peer_id = conn.getPeerId()
node = app.nm.getByUUID(conn.getUUID())
tid = app.tm.prepare(node, ttid, partitions, oid_list,
usable_uuid_set, peer_id)
tid = app.tm.prepare(ttid, partitions, oid_list, usable_uuid_set,
peer_id)
# check if greater and foreign OID was stored
if app.tm.updateLastOID(oid_list):
......
......@@ -59,7 +59,10 @@ class StorageServiceHandler(BaseServiceHandler):
conn.answer(Packets.AnswerLastIDs(loid, ltid, app.pt.getID()))
def askUnfinishedTransactions(self, conn):
p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
tm = self.app.tm
pending_list = tm.registerForNotification(conn.getUUID())
last_tid = tm.getLastTID()
p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list)
conn.answer(p)
def answerInformationLocked(self, conn, ttid):
......
......@@ -91,29 +91,31 @@ class Transaction(object):
"""
A pending transaction
"""
_tid = None
_msg_id = None
_oid_list = None
_prepared = False
# uuid dict hold flag to known who has locked the transaction
_uuid_set = None
_lock_wait_uuid_set = None
def __init__(self, node, ttid, tid, oid_list, uuid_list, msg_id):
def __init__(self, node, ttid):
"""
Prepare the transaction, set OIDs and UUIDs related to it
"""
self._node = node
self._ttid = ttid
self._tid = tid
self._oid_list = oid_list
self._msg_id = msg_id
# uuid dict hold flag to known who has locked the transaction
self._uuid_set = set(uuid_list)
self._lock_wait_uuid_set = set(uuid_list)
self._birth = time()
self._prepared = False
# store storage uuids that must be notified at commit
self._notification_set = set()
def __repr__(self):
return "<%s(client=%r, tid=%r, oids=%r, storages=%r, age=%.2fs) at %x>" % (
self.__class__.__name__,
self._node,
dump(self._tid),
[dump(x) for x in self._oid_list],
[dump(x) for x in self._uuid_set],
[dump(x) for x in self._oid_list or ()],
[dump(x) for x in self._uuid_set or ()],
time() - self._birth,
id(self),
)
......@@ -161,6 +163,19 @@ class Transaction(object):
"""
return self._prepared
def registerForNotification(self, uuid):
"""
Register a storage node that requires a notification at commit
"""
self._notification_set.add(uuid)
def getNotificationUUIDList(self):
"""
Returns the list of storage waiting for the transaction to be
finished
"""
return list(self._notification_set)
def prepare(self, tid, oid_list, uuid_list, msg_id):
self._tid = tid
......@@ -332,31 +347,42 @@ class TransactionManager(object):
"""
return bool(self._ttid_dict)
def getPendingList(self):
def registerForNotification(self, uuid):
"""
Return the list of pending transaction IDs
"""
return [txn.getTID() for txn in self._ttid_dict.values()]
# remember that this node must be notified when pending transactions
# will be finished
for txn in self._ttid_dict.itervalues():
txn.registerForNotification(uuid)
return set(self._ttid_dict.keys())
def begin(self, uuid, tid=None):
def begin(self, node, tid=None):
"""
Generate a new TID
"""
if tid is None:
# No TID requested, generate a temporary one
tid = self.getTTID()
ttid = self.getTTID()
else:
# Use of specific TID requested, queue it immediately and update
# last TID.
self._queue.append((uuid, tid))
self._queue.append((node.getUUID(), tid))
self.setLastTID(tid)
return tid
ttid = tid
txn = Transaction(node, ttid)
self._ttid_dict[ttid] = txn
self._node_dict.setdefault(node, {})[ttid] = txn
neo.lib.logging.debug('Begin %s for %s', txn, node)
return ttid
def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id):
def prepare(self, ttid, divisor, oid_list, uuid_list, msg_id):
"""
Prepare a transaction to be finished
"""
# XXX: not efficient but the list should be often small
txn = self._ttid_dict[ttid]
node = txn.getNode()
for _, tid in self._queue:
if ttid == tid:
break
......@@ -365,9 +391,7 @@ class TransactionManager(object):
self._queue.append((node.getUUID(), ttid))
neo.lib.logging.debug('Finish TXN %s for %s (was %s)',
dump(tid), node, dump(ttid))
txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
self._ttid_dict[ttid] = txn
self._node_dict.setdefault(node, {})[ttid] = txn
txn.prepare(tid, oid_list, uuid_list, msg_id)
return tid
def remove(self, uuid, ttid):
......@@ -383,7 +407,6 @@ class TransactionManager(object):
ttid_dict = self._ttid_dict
if ttid in ttid_dict:
txn = ttid_dict[ttid]
tid = txn.getTID()
node = txn.getNode()
# ...and tried to finish
del ttid_dict[ttid]
......
......@@ -195,7 +195,7 @@ class VerificationManager(BaseServiceHandler):
# approved during recovery, there is no need to check them here.
pass
def answerUnfinishedTransactions(self, conn, tid_list):
def answerUnfinishedTransactions(self, conn, max_tid, tid_list):
uuid = conn.getUUID()
neo.lib.logging.info('got unfinished transactions %s from %r',
[dump(tid) for tid in tid_list], conn)
......
......@@ -27,8 +27,11 @@ class MasterOperationHandler(BaseMasterHandler):
def answerLastIDs(self, conn, loid, ltid, lptid):
self.app.replicator.setCriticalTID(ltid)
def answerUnfinishedTransactions(self, conn, tid_list):
self.app.replicator.setUnfinishedTIDList(tid_list)
def answerUnfinishedTransactions(self, conn, max_tid, ttid_list):
self.app.replicator.setUnfinishedTIDList(max_tid, ttid_list)
def notifyTransactionFinished(self, conn, ttid, max_tid):
self.app.replicator.transactionFinished(ttid, max_tid)
def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
......
......@@ -18,7 +18,7 @@
import neo
from neo.storage.handlers import BaseMasterHandler
from neo.lib.protocol import Packets, Errors, ProtocolError
from neo.lib.protocol import Packets, Errors, ProtocolError, INVALID_TID
from neo.lib.util import dump
from neo.lib.exception import OperationFailure
......@@ -62,7 +62,7 @@ class VerificationHandler(BaseMasterHandler):
def askUnfinishedTransactions(self, conn):
tid_list = self.app.dm.getUnfinishedTIDList()
conn.answer(Packets.AnswerUnfinishedTransactions(tid_list))
conn.answer(Packets.AnswerUnfinishedTransactions(INVALID_TID, tid_list))
def askTransactionInformation(self, conn, tid):
app = self.app
......
......@@ -26,22 +26,26 @@ from neo.lib.util import dump
class Partition(object):
"""This class abstracts the state of a partition."""
def __init__(self, offset, tid):
self.offset = offset
if tid is None:
tid = ZERO_TID
self.tid = tid
def __init__(self, offset, max_tid, ttid_list):
self._offset = offset
self._pending_ttid_list = ttid_list
# pending upper bound
self._critical_tid = max_tid
def getOffset(self):
return self.offset
return self._offset
def getCriticalTID(self):
return self.tid
return self._critical_tid
def safe(self, min_pending_tid):
tid = self.tid
return tid is not None and (
min_pending_tid is None or tid < min_pending_tid)
def transactionFinished(self, ttid, max_tid):
self._pending_ttid_list.remove(ttid)
assert max_tid is not None
# final upper bound
self._critical_tid = max_tid
def safe(self):
return not self._pending_ttid_list
class Task(object):
"""
......@@ -115,23 +119,18 @@ class Replicator(object):
I ask only non-existing data. """
# new_partition_set
# outdated partitions for which no critical tid was asked to primary
# master yet
# critical_tid_list
# outdated partitions for which a critical tid was asked to primary
# master, but not answered so far
# outdated partitions for which no pending transactions was asked to
# primary master yet
# partition_dict
# outdated partitions (with or without a critical tid - if without, it
# was asked to primary master)
# outdated partitions with pending transaction and temporary critical
# tid
# current_partition
# partition being currently synchronised
# current_connection
# connection to a storage node we are replicating from
# waiting_for_unfinished_tids
# unfinished_tid_list has been asked to primary master node, but it
# unfinished tids have been asked to primary master node, but it
# didn't answer yet.
# unfinished_tid_list
# The list of unfinished TIDs known by master node.
# replication_done
# False if we know there is something to replicate.
# True when current_partition is replicated, or we don't know yet if
......@@ -140,13 +139,11 @@ class Replicator(object):
current_partition = None
current_connection = None
waiting_for_unfinished_tids = False
unfinished_tid_list = None
replication_done = True
def __init__(self, app):
self.app = app
self.new_partition_set = set()
self.critical_tid_list = []
self.partition_dict = {}
self.task_list = []
self.task_dict = {}
......@@ -156,7 +153,6 @@ class Replicator(object):
When connection to primary master is lost, stop waiting for unfinished
transactions.
"""
self.critical_tid_list = []
self.waiting_for_unfinished_tids = False
def storageLost(self):
......@@ -182,13 +178,11 @@ class Replicator(object):
self.task_dict = {}
self.current_partition = None
self.current_connection = None
self.unfinished_tid_list = None
self.replication_done = True
def pending(self):
"""Return whether there is any pending partition."""
return len(self.partition_dict) or len(self.new_partition_set) \
or self.critical_tid_list
return len(self.partition_dict) or len(self.new_partition_set)
def getCurrentOffset(self):
assert self.current_partition is not None
......@@ -205,25 +199,21 @@ class Replicator(object):
def isCurrentConnection(self, conn):
return self.current_connection is conn
def setCriticalTID(self, tid):
"""This is a callback from MasterOperationHandler."""
neo.lib.logging.debug('setting critical TID %s to %s', dump(tid),
', '.join([str(p) for p in self.critical_tid_list]))
for offset in self.critical_tid_list:
self.partition_dict[offset] = Partition(offset, tid)
self.critical_tid_list = []
def _askCriticalTID(self):
self.app.master_conn.ask(Packets.AskLastIDs())
self.critical_tid_list.extend(self.new_partition_set)
self.new_partition_set.clear()
def setUnfinishedTIDList(self, tid_list):
def setUnfinishedTIDList(self, max_tid, ttid_list):
"""This is a callback from MasterOperationHandler."""
neo.lib.logging.debug('setting unfinished TIDs %s',
','.join([dump(tid) for tid in tid_list]))
neo.lib.logging.debug('setting unfinished TTIDs %s',
','.join([dump(tid) for tid in ttid_list]))
# all new outdated partition must wait those ttid
new_partition_set = self.new_partition_set
while new_partition_set:
offset = new_partition_set.pop()
self.partition_dict[offset] = Partition(offset, max_tid, ttid_list)
self.waiting_for_unfinished_tids = False
self.unfinished_tid_list = tid_list
def transactionFinished(self, ttid, max_tid):
""" Callback from MasterOperationHandler """
partition = self.partition_dict[self.app.pt.getPartition(ttid)]
partition.transactionFinished(ttid, max_tid)
def _askUnfinishedTIDs(self):
conn = self.app.master_conn
......@@ -283,10 +273,6 @@ class Replicator(object):
self.current_connection = None
def act(self):
# If the new partition list is not empty, I must ask a critical
# TID to a primary master node.
if self.new_partition_set:
self._askCriticalTID()
if self.current_partition is not None:
# Don't end replication until we have received all expected
......@@ -305,24 +291,22 @@ class Replicator(object):
neo.lib.logging.debug('waiting for unfinished tids')
return
if self.unfinished_tid_list is None:
if self.new_partition_set:
# Ask pending transactions.
neo.lib.logging.debug('asking unfinished tids')
self._askUnfinishedTIDs()
return
# Try to select something.
if len(self.unfinished_tid_list):
min_unfinished_tid = min(self.unfinished_tid_list)
else:
min_unfinished_tid = None
for partition in self.partition_dict.values():
if partition.safe(min_unfinished_tid):
# XXX: replication could start up to the initial critical tid, that
# is below the pending transactions, then finish when all pending
# transactions are committed.
if partition.safe():
self.current_partition = partition
break
else:
# Not yet.
self.unfinished_tid_list = None
neo.lib.logging.debug('not ready yet')
return
......
......@@ -283,7 +283,7 @@ class NeoUnitTestBase(NeoTestBase):
def checkNotifyPacket(self, conn, packet_type, packet_number=0, decode=False):
""" Check if a notify-packet with the right type is sent """
calls = conn.mockGetNamedCalls('notify')
self.assertTrue(len(calls) > packet_number)
self.assertTrue(len(calls) > packet_number, (len(calls), packet_number))
packet = calls[packet_number].getParam(0)
self.assertTrue(isinstance(packet, protocol.Packet))
self.assertEquals(packet.getType(), packet_type)
......@@ -324,6 +324,9 @@ class NeoUnitTestBase(NeoTestBase):
def checkNotifyUnlockInformation(self, conn, **kw):
return self.checkNotifyPacket(conn, Packets.NotifyUnlockInformation, **kw)
def checkNotifyTransactionFinished(self, conn, **kw):
return self.checkNotifyPacket(conn, Packets.NotifyTransactionFinished, **kw)
def checkRequestIdentification(self, conn, **kw):
return self.checkAskPacket(conn, Packets.RequestIdentification, **kw)
......
......@@ -15,12 +15,14 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import time
import unittest
import transaction
from persistent import Persistent
from neo.tests.functional import NEOCluster, NEOFunctionalTest
from neo.lib.protocol import ClusterStates, NodeStates
from ZODB.tests.StorageTestBase import zodb_pickle
from MySQLdb import ProgrammingError
from MySQLdb.constants.ER import NO_SUCH_TABLE
......@@ -522,5 +524,46 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectClusterRecovering()
self.neo.expectOudatedCells(number=10)
def testReplicationBlockedByUnfinished(self):
# start a cluster with 1 of 2 storages and a replica
(started, stopped) = self.__setup(storage_number=2, replicas=1,
pending_number=1, partitions=10)
self.neo.expectRunning(started[0])
self.neo.expectStorageNotKnown(stopped[0])
self.neo.expectOudatedCells(number=0)
self.neo.expectClusterRunning()
self.__populate()
self.neo.expectOudatedCells(number=0)
# start a transaction that will block the end of the replication
db, conn = self.neo.getZODBConnection()
st = conn._storage
t = transaction.Transaction()
t.user = 'user'
t.description = 'desc'
oid = st.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject(42))
st.tpc_begin(t)
st.store(oid, rev, data, '', t)
# start the oudated storage
stopped[0].start()
self.neo.expectPending(stopped[0])
self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
self.neo.expectRunning(stopped[0])
self.neo.expectClusterRunning()
self.neo.expectAssignedCells(started[0], 10)
self.neo.expectAssignedCells(stopped[0], 10)
# wait a bit, replication must not happen. This hack is required
# because we cannot gather informations directly from the storages
time.sleep(10)
self.neo.expectOudatedCells(number=10)
# finish the transaction, the replication must happen and finish
st.tpc_vote(t)
st.tpc_finish(t)
self.neo.expectOudatedCells(number=0, timeout=10)
if __name__ == "__main__":
unittest.main()
......@@ -74,11 +74,12 @@ class MasterClientHandlerTests(NeoUnitTestBase):
})
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
client_node = self.app.nm.getByUUID(client_uuid)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.askBeginTransaction(conn, None)
calls = tm.mockGetNamedCalls('begin')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(client_uuid, None)
calls[0].checkArgs(client_node, None)
self.checkAnswerBeginTransaction(conn)
# Client asks for a TID
conn = self.getFakeConnection(client_uuid, self.client_address)
......@@ -86,7 +87,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
service.askBeginTransaction(conn, tid1)
calls = tm.mockGetNamedCalls('begin')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(client_uuid, None)
calls[0].checkArgs(client_node, None)
args = self.checkAnswerBeginTransaction(conn, decode=True)
self.assertEqual(args, (tid1, ))
......@@ -142,9 +143,10 @@ class MasterClientHandlerTests(NeoUnitTestBase):
self.assertTrue(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, ttid, oid_list)
self.checkAskLockInformation(storage_conn)
self.assertEquals(len(self.app.tm.getPendingList()), 1)
self.assertEquals(len(self.app.tm.registerForNotification(storage_uuid)), 1)
txn = self.app.tm[ttid]
self.assertEquals(txn.getTID(), self.app.tm.getPendingList()[0])
pending_ttid = list(self.app.tm.registerForNotification(storage_uuid))[0]
self.assertEquals(ttid, pending_ttid)
self.assertEquals(len(txn.getOIDList()), 0)
self.assertEquals(len(txn.getUUIDList()), 1)
......
......@@ -101,8 +101,8 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
oid_list = self.getOID(), self.getOID()
msg_id = 1
# register a transaction
ttid = self.app.tm.begin(client_1.getUUID())
tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list,
ttid = self.app.tm.begin(client_1)
tid = self.app.tm.prepare(ttid, 1, oid_list, uuid_list,
msg_id)
self.assertTrue(ttid in self.app.tm)
# the first storage acknowledge the lock
......@@ -141,17 +141,17 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# give a uuid
service.askUnfinishedTransactions(conn)
packet = self.checkAnswerUnfinishedTransactions(conn)
tid_list, = packet.decode()
max_tid, tid_list = packet.decode()
self.assertEqual(tid_list, [])
# create some transaction
node, conn = self.identifyToMasterNode(node_type=NodeTypes.CLIENT,
port=self.client_port)
self.app.tm.prepare(node, self.getNextTID(), 1,
ttid = self.app.tm.begin(node)
self.app.tm.prepare(ttid, 1,
[self.getOID(1)], [node.getUUID()], 1)
conn = self.getFakeConnection(node.getUUID(), self.storage_address)
service.askUnfinishedTransactions(conn)
packet = self.checkAnswerUnfinishedTransactions(conn)
(tid_list, ) = packet.decode()
max_tid, tid_list = self.checkAnswerUnfinishedTransactions(conn, decode=True)
self.assertEqual(len(tid_list), 1)
def _testWithMethod(self, method, state):
......@@ -208,26 +208,28 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
ttid1 = tm.begin(node1.getUUID())
tid1 = tm.prepare(client1, ttid1, 1, oid_list,
ttid1 = tm.begin(client1)
tid1 = tm.prepare(ttid1, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_1)
tm.lock(ttid1, node2.getUUID())
# storage 1 request a notification at commit
tm. registerForNotification(node1.getUUID())
self.checkNoPacketSent(cconn1)
# Storage 1 dies
node1.setTemporarilyDown()
self.service.nodeLost(conn1, node1)
# T1: last locking node lost, client receives AnswerTransactionFinished
self.checkAnswerTransactionFinished(cconn1)
self.checkNotifyTransactionFinished(conn1)
self.checkNotifyUnlockInformation(conn2)
self.checkNoPacketSent(conn1)
# ...and notifications are sent to other clients
self.checkInvalidateObjects(cconn2)
self.checkInvalidateObjects(cconn3)
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
ttid2 = tm.begin(node1.getUUID())
tid2 = tm.prepare(client2, ttid2, 1, oid_list,
ttid2 = tm.begin(node1)
tid2 = tm.prepare(ttid2, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False)
......@@ -235,8 +237,8 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
ttid3 = tm.begin(node1.getUUID())
tid3 = tm.prepare(client3, ttid3, 1, oid_list,
ttid3 = tm.begin(node1)
tid3 = tm.prepare(ttid3, 1, oid_list,
[node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False)
......
......@@ -37,7 +37,7 @@ class testTransactionManager(NeoUnitTestBase):
def makeNode(self, i):
uuid = self.makeUUID(i)
node = Mock({'getUUID': uuid, '__hash__': 0})
node = Mock({'getUUID': uuid, '__hash__': i, '__repr__': 'FakeNode'})
return uuid, node
def testTransaction(self):
......@@ -49,7 +49,8 @@ class testTransactionManager(NeoUnitTestBase):
uuid_list = (uuid1, uuid2) = [self.makeUUID(1), self.makeUUID(2)]
msg_id = 1
# create transaction object
txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
txn = Transaction(node, ttid)
txn.prepare(tid, oid_list, uuid_list, msg_id)
self.assertEqual(txn.getUUIDList(), uuid_list)
self.assertEqual(txn.getOIDList(), oid_list)
# lock nodes one by one
......@@ -69,16 +70,16 @@ class testTransactionManager(NeoUnitTestBase):
callback = Mock()
txnman = TransactionManager(on_commit=callback)
self.assertFalse(txnman.hasPending())
self.assertEqual(txnman.getPendingList(), [])
self.assertEqual(txnman.registerForNotification(uuid1), set())
# begin the transaction
ttid = txnman.begin(client_uuid)
ttid = txnman.begin(node)
self.assertTrue(ttid is not None)
self.assertFalse(txnman.hasPending())
self.assertEqual(len(txnman.getPendingList()), 0)
self.assertEqual(len(txnman.registerForNotification(uuid1)), 1)
self.assertTrue(txnman.hasPending())
# prepare the transaction
tid = txnman.prepare(node, ttid, 1, oid_list, uuid_list, msg_id)
tid = txnman.prepare(ttid, 1, oid_list, uuid_list, msg_id)
self.assertTrue(txnman.hasPending())
self.assertEqual(txnman.getPendingList()[0], tid)
self.assertEqual(txnman.registerForNotification(uuid1), set([ttid]))
txn = txnman[ttid]
self.assertEqual(txn.getTID(), tid)
self.assertEqual(txn.getUUIDList(), list(uuid_list))
......@@ -90,30 +91,30 @@ class testTransactionManager(NeoUnitTestBase):
self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
# transaction finished
txnman.remove(client_uuid, ttid)
self.assertEqual(txnman.getPendingList(), [])
self.assertEqual(txnman.registerForNotification(uuid1), set())
def testAbortFor(self):
node1 = Mock({'__hash__': 1})
node2 = Mock({'__hash__': 2})
oid_list = [self.makeOID(1), ]
storage_1_uuid = self.makeUUID(1)
storage_2_uuid = self.makeUUID(2)
client_uuid = self.makeUUID(3)
storage_1_uuid, node1 = self.makeNode(1)
storage_2_uuid, node2 = self.makeNode(2)
client_uuid, client = self.makeNode(3)
txnman = TransactionManager(lambda tid, txn: None)
# register 4 transactions made by two nodes
self.assertEqual(txnman.getPendingList(), [])
ttid1 = txnman.begin(client_uuid)
tid1 = txnman.prepare(node1, ttid1, 1, oid_list, [storage_1_uuid], 1)
self.assertEqual(txnman.getPendingList(), [tid1])
self.assertEqual(txnman.registerForNotification(storage_1_uuid), set())
ttid1 = txnman.begin(client)
tid1 = txnman.prepare(ttid1, 1, oid_list, [storage_1_uuid], 1)
self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1]))
# abort transactions of another node, transaction stays
txnman.abortFor(node2)
self.assertEqual(txnman.getPendingList(), [tid1])
# abort transactions of requesting node, transaction is removed
self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1]))
# abort transactions of requesting node, transaction is not removed
# because the transaction is prepared and must remains until the end of
# the 2PC
txnman.abortFor(node1)
self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending())
self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1]))
self.assertTrue(txnman.hasPending())
# ...and the lock is available
txnman.begin(client_uuid, self.getNextTID())
txnman.begin(client, self.getNextTID())
def test_getNextOIDList(self):
txnman = TransactionManager(lambda tid, txn: None)
......@@ -141,8 +142,8 @@ class testTransactionManager(NeoUnitTestBase):
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
ttid1 = tm.begin(client_uuid)
tid1 = tm.prepare(client1, ttid1, 1, oid_list,
ttid1 = tm.begin(client1)
tid1 = tm.prepare(ttid1, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(ttid1, storage_2_uuid)
t1 = tm[ttid1]
......@@ -155,8 +156,8 @@ class testTransactionManager(NeoUnitTestBase):
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
ttid2 = tm.begin(client_uuid)
tid2 = tm.prepare(client2, ttid2, 1, oid_list,
ttid2 = tm.begin(client2)
tid2 = tm.prepare(ttid2, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_2)
t2 = tm[ttid2]
self.assertFalse(t2.locked())
......@@ -169,8 +170,8 @@ class testTransactionManager(NeoUnitTestBase):
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
ttid3 = tm.begin(client_uuid)
tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ],
ttid3 = tm.begin(client3)
tid3 = tm.prepare(ttid3, 1, oid_list, [storage_2_uuid, ],
msg_id_3)
t3 = tm[ttid3]
self.assertFalse(t3.locked())
......@@ -213,29 +214,28 @@ class testTransactionManager(NeoUnitTestBase):
strictly increasing order.
Note: this implementation might change later, to allow more paralelism.
"""
client_uuid = self.makeUUID(3)
client_uuid, client = self.makeNode(1)
tm = TransactionManager(lambda tid, txn: None)
# With a requested TID, lock spans from begin to remove
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
tid1 = tm.begin(client_uuid, ttid1)
tid1 = tm.begin(client, ttid1)
self.assertEqual(tid1, ttid1)
tm.remove(client_uuid, tid1)
# Without a requested TID, lock spans from prepare to remove only
ttid3 = tm.begin(client_uuid)
ttid4 = tm.begin(client_uuid) # Doesn't raise
ttid3 = tm.begin(client)
ttid4 = tm.begin(client) # Doesn't raise
node = Mock({'getUUID': client_uuid, '__hash__': 0})
tid4 = tm.prepare(node, ttid4, 1, [], [], 0)
tid4 = tm.prepare(ttid4, 1, [], [], 0)
tm.remove(client_uuid, tid4)
tm.prepare(node, ttid3, 1, [], [], 0)
tm.prepare(ttid3, 1, [], [], 0)
def testClientDisconectsAfterBegin(self):
client1_uuid = self.makeUUID(1)
client_uuid1, node1 = self.makeNode(1)
tm = TransactionManager(lambda tid, txn: None)
tid1 = self.getNextTID()
tid2 = self.getNextTID()
tm.begin(client1_uuid, tid1)
node1 = Mock({'getUUID': client1_uuid, '__hash__': 0})
tm.begin(node1, tid1)
tm.abortFor(node1)
self.assertTrue(tid1 not in tm)
......@@ -245,10 +245,10 @@ class testTransactionManager(NeoUnitTestBase):
uuid2, node2 = self.makeNode(2)
storage_uuid = self.makeUUID(3)
tm = TransactionManager(callback)
ttid1 = tm.begin(uuid1)
ttid2 = tm.begin(uuid2)
tid1 = tm.prepare(node1, ttid1, 1, [], [storage_uuid], 0)
tid2 = tm.prepare(node2, ttid2, 1, [], [storage_uuid], 0)
ttid1 = tm.begin(node1)
ttid2 = tm.begin(node2)
tid1 = tm.prepare(ttid1, 1, [], [storage_uuid], 0)
tid2 = tm.prepare(ttid2, 1, [], [storage_uuid], 0)
tm.lock(ttid2, storage_uuid)
# txn 2 is still blocked by txn 1
self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
......
......@@ -124,14 +124,14 @@ class MasterVerificationTests(NeoUnitTestBase):
self.assertEquals(len(self.verification._uuid_set), 0)
self.assertEquals(len(self.verification._tid_set), 0)
new_tid = self.getNextTID()
verification.answerUnfinishedTransactions(conn, [new_tid])
verification.answerUnfinishedTransactions(conn, new_tid, [new_tid])
self.assertEquals(len(self.verification._tid_set), 0)
# update dict
conn = self.getFakeConnection(uuid, self.storage_address)
self.verification._uuid_set.add(uuid)
self.assertEquals(len(self.verification._tid_set), 0)
new_tid = self.getNextTID(new_tid)
verification.answerUnfinishedTransactions(conn, [new_tid,])
verification.answerUnfinishedTransactions(conn, new_tid, [new_tid])
self.assertTrue(uuid not in self.verification._uuid_set)
self.assertEquals(len(self.verification._tid_set), 1)
self.assertTrue(new_tid in self.verification._tid_set)
......
......@@ -190,11 +190,12 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
self.app.replicator = Mock()
self.operation.answerUnfinishedTransactions(
conn=conn,
tid_list=(INVALID_TID, ),
max_tid=INVALID_TID,
ttid_list=(INVALID_TID, ),
)
calls = self.app.replicator.mockGetNamedCalls('setUnfinishedTIDList')
self.assertEquals(len(calls), 1)
calls[0].checkArgs((INVALID_TID, ))
calls[0].checkArgs(INVALID_TID, (INVALID_TID, ))
def test_askPack(self):
self.app.dm = Mock({'pack': None})
......
......@@ -40,7 +40,6 @@ class StorageReplicatorTests(NeoUnitTestBase):
})
replicator = Replicator(app)
self.assertEqual(replicator.new_partition_set, set())
replicator.replication_done = False
replicator.populate()
self.assertEqual(replicator.new_partition_set, set([0]))
......@@ -50,40 +49,32 @@ class StorageReplicatorTests(NeoUnitTestBase):
replicator.task_dict = {'foo': 'bar'}
replicator.current_partition = 'foo'
replicator.current_connection = 'foo'
replicator.unfinished_tid_list = ['foo']
replicator.replication_done = 'foo'
replicator.reset()
self.assertEqual(replicator.task_list, [])
self.assertEqual(replicator.task_dict, {})
self.assertEqual(replicator.current_partition, None)
self.assertEqual(replicator.current_connection, None)
self.assertEqual(replicator.unfinished_tid_list, None)
self.assertTrue(replicator.replication_done)
def test_setCriticalTID(self):
replicator = Replicator(None)
critical_tid = self.getNextTID()
partition = Partition(0, critical_tid)
partition = Partition(0, critical_tid, [])
self.assertEqual(partition.getCriticalTID(), critical_tid)
def test_setUnfinishedTIDList(self):
replicator = Replicator(None)
replicator.waiting_for_unfinished_tids = True
assert replicator.unfinished_tid_list is None, \
replicator.unfinished_tid_list
tid_list = [self.getNextTID(), ]
replicator.setUnfinishedTIDList(tid_list)
self.assertEqual(replicator.unfinished_tid_list, tid_list)
self.assertFalse(replicator.waiting_for_unfinished_tids)
self.assertEqual(partition.getOffset(), 0)
def test_act(self):
# Also tests "pending"
uuid = self.getNewUUID()
master_uuid = self.getNewUUID()
bad_unfinished_tid = self.getNextTID()
critical_tid = self.getNextTID()
unfinished_tid = self.getNextTID()
critical_tid_0 = self.getNextTID()
critical_tid_1 = self.getNextTID()
critical_tid_2 = self.getNextTID()
unfinished_ttid_1 = self.getOID(1)
unfinished_ttid_2 = self.getOID(2)
app = Mock()
app.server = ('127.0.0.1', 10000)
app.name = 'fake cluster'
app.em = Mock({
'register': None,
})
......@@ -105,6 +96,7 @@ class StorageReplicatorTests(NeoUnitTestBase):
app.pt = Mock({
'getCellList': [running_cell, unknown_cell],
'getOutdatedOffsetListFor': [0],
'getPartition': 0,
})
node_conn_handler = Mock({
'startReplication': None,
......@@ -119,37 +111,28 @@ class StorageReplicatorTests(NeoUnitTestBase):
app.master_conn = self.getFakeConnection(uuid=master_uuid)
self.assertTrue(replicator.pending())
replicator.act()
# ask last IDs to infer critical_tid and unfinished tids
# ask unfinished tids
act()
last_ids, unfinished_tids = [x.getParam(0) for x in \
app.master_conn.mockGetNamedCalls('ask')]
self.assertEqual(last_ids.getType(), Packets.AskLastIDs)
self.assertFalse(replicator.new_partition_set)
self.assertEqual(unfinished_tids.getType(),
Packets.AskUnfinishedTransactions)
unfinished_tids = app.master_conn.mockGetNamedCalls('ask')[0].getParam(0)
self.assertTrue(replicator.new_partition_set)
self.assertEqual(unfinished_tids.getType(), Packets.AskUnfinishedTransactions)
self.assertTrue(replicator.waiting_for_unfinished_tids)
# nothing happens until waiting_for_unfinished_tids becomes False
act()
self.checkNoPacketSent(app.master_conn)
self.assertTrue(replicator.waiting_for_unfinished_tids)
# Send answers (garanteed to happen in this order)
replicator.setCriticalTID(critical_tid)
act()
self.checkNoPacketSent(app.master_conn)
self.assertTrue(replicator.waiting_for_unfinished_tids)
# first time, there is an unfinished tid before critical tid,
# replication cannot start, and unfinished TIDs are asked again
replicator.setUnfinishedTIDList([unfinished_tid, bad_unfinished_tid])
replicator.setUnfinishedTIDList(critical_tid_0,
[unfinished_ttid_1, unfinished_ttid_2])
self.assertFalse(replicator.waiting_for_unfinished_tids)
# Note: detection that nothing can be replicated happens on first call
# and unfinished tids are asked again on second call. This is ok, but
# might change, so just call twice.
act()
replicator.transactionFinished(unfinished_ttid_1, critical_tid_1)
act()
self.checkAskPacket(app.master_conn, Packets.AskUnfinishedTransactions)
self.assertTrue(replicator.waiting_for_unfinished_tids)
# this time, critical tid check should be satisfied
replicator.setUnfinishedTIDList([unfinished_tid, ])
replicator.transactionFinished(unfinished_ttid_2, critical_tid_2)
replicator.current_connection = node_conn
act()
self.assertEqual(replicator.current_partition,
......@@ -174,8 +157,6 @@ class StorageReplicatorTests(NeoUnitTestBase):
'isPending': False,
})
act()
# unfinished tid list will not be asked again
self.assertTrue(replicator.unfinished_tid_list)
# also, replication is over
self.assertFalse(replicator.pending())
......
......@@ -161,7 +161,7 @@ class StorageVerificationHandlerTests(NeoUnitTestBase):
})
conn = self.getMasterConnection()
self.verification.askUnfinishedTransactions(conn)
(tid_list, ) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
(max_tid, tid_list) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
self.assertEqual(len(tid_list), 0)
call_list = self.app.dm.mockGetNamedCalls('getUnfinishedTIDList')
self.assertEqual(len(call_list), 1)
......@@ -173,7 +173,7 @@ class StorageVerificationHandlerTests(NeoUnitTestBase):
})
conn = self.getMasterConnection()
self.verification.askUnfinishedTransactions(conn)
(tid_list, ) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
(max_tid, tid_list) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
self.assertEqual(len(tid_list), 1)
self.assertEqual(u64(tid_list[0]), 4)
......
......@@ -195,13 +195,15 @@ class ProtocolTests(NeoUnitTestBase):
self.assertEqual(p.decode(), ())
def test_27_answerUnfinishedTransaction(self):
tid = self.getNextTID()
tid1 = self.getNextTID()
tid2 = self.getNextTID()
tid3 = self.getNextTID()
tid4 = self.getNextTID()
tid_list = [tid1, tid2, tid3, tid4]
p = Packets.AnswerUnfinishedTransactions(tid_list)
p_tid_list = p.decode()[0]
p = Packets.AnswerUnfinishedTransactions(tid, tid_list)
p_tid, p_tid_list = p.decode()
self.assertEqual(p_tid, tid)
self.assertEqual(p_tid_list, tid_list)
def test_28_askObjectPresent(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment