Commit a74937c8 authored by Julien Muchembled

storage: fix PT updates in case of late AnswerUnfinishedTransactions

This is done by moving
        self.replicator.populate()
after the switch to MasterOperationHandler, so that the latter is not delayed.

This change comes with some refactoring of the main loop,
to clean up app.checker and app.replicator properly (like app.tm).

Another option could have been to process notifications with the last handler,
instead of the first one. But if possible, cleaning up the whole code so that
handlers are no longer delayed looks like the best option.
parent 041a3eda
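
For readers of the diff below: NEO applies a new connection handler only once
every answer expected by the previous handler has arrived, and notifications
are dispatched to the first pending handler. So while
AnswerUnfinishedTransactions (requested by populate() under the previous
handler) was still pending, NotifyPartitionChanges was routed to that old
handler, which does not apply PT updates. Here is a minimal, self-contained
sketch of that dispatch rule, in the code base's Python 2; all names are
simplified stand-ins, not NEO's real API:

    class Handler(object):
        def __init__(self, name):
            self.name = name
        def packetReceived(self, packet):
            print '%s handles %s' % (self.name, packet)

    class HandlerSwitcher(object):
        def __init__(self, handler):
            # oldest first: [(expected answer ids, handler), ...]
            self._pending = [({}, handler)]
        def emit(self, msg_id):
            # a request sent now expects an answer under the current handler
            self._pending[-1][0][msg_id] = None
        def setHandler(self, handler):
            if not self._pending[-1][0]:
                self._pending[-1] = ({}, handler)  # immediate switch
            else:
                # delayed until all previously expected answers arrive
                self._pending.append(({}, handler))
        def handle(self, is_response, msg_id, packet):
            if not is_response:
                # notification: always the FIRST pending handler
                self._pending[0][1].packetReceived(packet)
                return
            request_dict, handler = self._pending[0]
            request_dict.pop(msg_id)
            handler.packetReceived(packet)
            if not request_dict and len(self._pending) > 1:
                del self._pending[0]  # the next handler becomes current

    switcher = HandlerSwitcher(Handler('InitializationHandler'))
    switcher.emit(1)  # e.g. AskUnfinishedTransactions, sent before the switch
    switcher.setHandler(Handler('MasterOperationHandler'))  # delayed!
    switcher.handle(False, None, 'NotifyPartitionChanges')
    # -> InitializationHandler handles it: the PT update is lost
    switcher.handle(True, 1, 'AnswerUnfinishedTransactions')
    # only now does MasterOperationHandler become current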
@@ -118,12 +118,12 @@ class HandlerSwitcher(object):
             logging.debug('Ignoring packet %r on closed connection %r',
                 packet, connection)
             return
-        msg_id = packet.getId()
-        (request_dict, handler) = self._pending[0]
-        # notifications are not expected
-        if not packet.isResponse():
-            handler.packetReceived(connection, packet)
+        if not packet.isResponse(): # notification
+            # XXX: If there are several handlers, which one to use ?
+            self._pending[0][1].packetReceived(connection, packet)
             return
+        msg_id = packet.getId()
+        request_dict, handler = self._pending[0]
         # checkout the expected answer class
         try:
             klass, _, _, kw = request_dict.pop(msg_id)
...
@@ -38,7 +38,7 @@ from neo.lib.debug import register as registerLiveDebugger
 class Application(BaseApplication):
     """The storage node application."""
 
-    tm = None
+    checker = replicator = tm = None
 
     def __init__(self, config):
         super(Application, self).__init__(
@@ -62,8 +62,6 @@ class Application(BaseApplication):
         # partitions.
         self.pt = None
 
-        self.checker = Checker(self)
-        self.replicator = Replicator(self)
         self.listening_conn = None
         self.master_conn = None
         self.master_node = None
@@ -171,9 +169,9 @@ class Application(BaseApplication):
         # Connect to a primary master node, verify data, and
         # start the operation. This cycle will be executed permanently,
         # until the user explicitly requests a shutdown.
+        self.ready = False
         while True:
             self.cluster_state = None
-            self.ready = False
             self.operational = False
             if self.master_node is None:
                 # look for the primary master
@@ -182,10 +180,8 @@ class Application(BaseApplication):
             node = self.nm.getByUUID(self.uuid)
             if node is not None and node.isHidden():
                 self.wait()
-            # drop any client node
-            for conn in self.em.getConnectionList():
-                if conn not in (self.listening_conn, self.master_conn):
-                    conn.close()
+            self.checker = Checker(self)
+            self.replicator = Replicator(self)
             self.tm = TransactionManager(self)
             try:
                 self.initialize()
@@ -196,8 +192,15 @@ class Application(BaseApplication):
             except PrimaryFailure, msg:
                 logging.error('primary master is down: %s', msg)
             finally:
-                self.checker = Checker(self)
-                del self.tm
+                self.ready = False
+                # When not ready, we reject any incoming connection so for
+                # consistency, we also close any connection except that to the
+                # master. This includes connections to other storage nodes and any
+                # replication is aborted, whether we are feeding or out-of-date.
+                for conn in self.em.getConnectionList():
+                    if conn not in (self.listening_conn, self.master_conn):
+                        conn.close()
+                del self.checker, self.replicator, self.tm
 
     def connectToPrimary(self):
         """Find a primary master node, and connect to it.
@@ -209,11 +212,6 @@ class Application(BaseApplication):
         at this stage."""
         pt = self.pt
 
-        # First of all, make sure that I have no connection.
-        for conn in self.em.getConnectionList():
-            if not conn.isListening():
-                conn.close()
-
         # search, find, connect and identify to the primary master
         bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server)
         self.master_node, self.master_conn, num_partitions, num_replicas = \
@@ -245,7 +243,6 @@ class Application(BaseApplication):
             _poll()
         self.ready = True
         self.master_conn.notify(Packets.NotifyReady())
-        self.replicator.populate()
 
     def doOperation(self):
         """Handle everything, including replications and transactions."""
@@ -255,8 +252,8 @@ class Application(BaseApplication):
         _poll = self.em._poll
         isIdle = self.em.isIdle
 
-        handler = master.MasterOperationHandler(self)
-        self.master_conn.setHandler(handler)
+        self.master_conn.setHandler(master.MasterOperationHandler(self))
+        self.replicator.populate()
 
         # Forget all unfinished data.
         self.dm.dropUnfinishedData()
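
This hunk is the heart of the fix: the connection is switched to
MasterOperationHandler before populate() emits AskUnfinishedTransactions, so
the new handler takes effect immediately, and any NotifyPartitionChanges
received while the answer is still pending is processed by
MasterOperationHandler instead of the previous handler.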
@@ -277,13 +274,6 @@ class Application(BaseApplication):
                 poll()
         finally:
             del self.task_queue
-            # XXX: Although no handled exception should happen between
-            # replicator.populate() and the beginning of this 'try'
-            # clause, the replicator should be reset in a safer place.
-            self.replicator = Replicator(self)
-            # Abort any replication, whether we are feeding or out-of-date.
-            for node in self.nm.getStorageList(only_identified=True):
-                node.getConnection().close()
 
     def changeClusterState(self, state):
         self.cluster_state = state
...
@@ -66,9 +66,6 @@ class BaseMasterHandler(BaseHandler):
                     uuid_str(uuid))
             self.app.tm.abortFor(uuid)
 
-    def answerUnfinishedTransactions(self, conn, *args, **kw):
-        self.app.replicator.setUnfinishedTIDList(*args, **kw)
-
     def askFinalTID(self, conn, ttid):
         conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid)))
 
...
@@ -61,6 +61,9 @@ class MasterOperationHandler(BaseMasterHandler):
         logging.info('Pack finished.')
         conn.answer(Packets.AnswerPack(True))
 
+    def answerUnfinishedTransactions(self, conn, *args, **kw):
+        self.app.replicator.setUnfinishedTIDList(*args, **kw)
+
     def replicate(self, conn, tid, upstream_name, source_dict):
         self.app.replicator.backup(tid, {p: a and (a, upstream_name)
             for p, a in source_dict.iteritems()})
...
@@ -46,7 +46,7 @@ class StorageOperationHandler(EventHandler):
 
     def connectionLost(self, conn, new_state):
         app = self.app
-        if app.listening_conn and conn.isClient():
+        if app.ready and conn.isClient():
             # XXX: Connection and Node should merged.
             uuid = conn.getUUID()
             if uuid:
@@ -62,7 +62,7 @@ class StorageOperationHandler(EventHandler):
     # Client
 
     def connectionFailed(self, conn):
-        if self.app.listening_conn:
+        if self.app.ready:
             self.app.replicator.abort()
 
     def _acceptIdentification(self, node, *args):
...
@@ -286,6 +286,7 @@ class Replicator(object):
         if self.current_partition is not None or not self.replicate_dict:
             return
         app = self.app
+        assert app.master_conn and app.ready, (app.master_conn, app.ready)
         # Start replicating the partition which is furthest behind,
         # to increase the overall backup_tid as soon as possible.
         # Then prefer a partition with no unfinished transaction.
...
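The new assertion documents the invariant the refactoring relies on:
_nextPartition must only run while the node is ready and connected to the
master, the only window during which replication may be scheduled.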
@@ -997,8 +997,8 @@ class NEOThreadedTest(NeoTestBase):
         with Patch(client, _getFinalTID=lambda *_: None):
             self.assertRaises(ConnectionClosed, txn.commit)
 
-    def assertPartitionTable(self, cluster, stats):
-        pt = cluster.admin.pt
+    def assertPartitionTable(self, cluster, stats, pt_node=None):
+        pt = (pt_node or cluster.admin).pt
         index = [x.uuid for x in cluster.storage_list].index
         self.assertEqual(stats, '|'.join(pt._formatRows(sorted(
             pt.count_dict, key=lambda x: index(x.getUUID())))))
...
@@ -474,6 +474,25 @@ class Test(NEOThreadedTest):
         # but we would need an API to do that easily.
         self.assertFalse(cluster.client.dispatcher.registered(conn))
 
+    @with_cluster(replicas=2)
+    def test_notifyPartitionChanges(self, cluster):
+        cluster.db
+        s0, s1, s2 = cluster.storage_list
+        s1.stop()
+        cluster.join((s1,))
+        s1.resetNode()
+        # This checks that s1 processes any PT update
+        # (by MasterOperationHandler) even if it receives one before the
+        # AnswerUnfinishedTransactions packet.
+        with ConnectionFilter() as f:
+            f.delayAskUnfinishedTransactions()
+            s1.start()
+            self.tic()
+            s2.stop()
+            cluster.join((s2,))
+        self.tic()
+        self.assertPartitionTable(cluster, 'UUO', s1)
+
     @with_cluster(replicas=1, partitions=10)
     def testRestartWithMissingStorage(self, cluster):
         # translated from neo.tests.functional.testStorage.StorageTest
...
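The new test reproduces the reported race: s1 is restarted while a
ConnectionFilter delays its AskUnfinishedTransactions packet, and s2 is
stopped in that window so the master sends a PT update before the answer can
arrive. The assertion then reads the partition table from s1 itself (through
the new pt_node parameter of assertPartitionTable) and expects 'UUO', i.e. s1
applied the update marking s2 out-of-date even though
AnswerUnfinishedTransactions had not been received yet.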