Commit 9e433594 authored by Julien Muchembled

In STOPPING cluster state, really wait for all transactions to be finished

parent 35737c9b
@@ -158,6 +158,10 @@ class PrimaryNotificationsHandler(MTEventHandler):
     def stopOperation(self, conn):
         logging.critical("master node ask to stop operation")
 
+    def notifyClusterInformation(self, conn, state):
+        # TODO: on shutdown, abort any transaction that is not voted
+        logging.info("cluster switching to %s state", state)
+
     def invalidateObjects(self, conn, tid, oid_list):
         app = self.app
         if app.ignore_invalidations:
...
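Side note on the hunk above: `notifyClusterInformation()` is only a logging stub for now; the TODO of aborting unvoted transactions on shutdown is deliberately deferred (the master-side comment added further down explains that clients are trusted to do it). In NEO, an `EventHandler` subclass receives each decoded packet as a call to the method named after the packet type, which is why adding this single method is enough to react to the notification. A rough standalone sketch of that dispatch pattern; `Handler` and `dispatch` are illustrative stand-ins, not NEO's real classes:

```python
# Dispatch-by-method-name, the pattern the new notifyClusterInformation()
# hook relies on. Illustrative only: NEO's real EventHandler/dispatcher
# also does packet decoding, error handling, etc.
import logging
logging.basicConfig(level=logging.INFO)

class Handler:
    def notifyClusterInformation(self, conn, state):
        logging.info("cluster switching to %s state", state)

def dispatch(handler, conn, packet_name, *args):
    # One handler method per packet type; a packet without a matching
    # method would be a protocol error in the real implementation.
    method = getattr(handler, packet_name, None)
    if method is None:
        raise NotImplementedError(packet_name)
    method(conn, *args)

dispatch(Handler(), None, 'notifyClusterInformation', 'STOPPING')
```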
@@ -276,6 +276,7 @@ class AnswerBaseHandler(EventHandler):
 
 class _DelayedConnectionEvent(EventHandler):
+    # WARNING: This assumes that the connection handler does not change.
 
     handler_method_name = '_func'
     __new__ = object.__new__
...
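The one-line WARNING added above documents a real constraint: a `_DelayedConnectionEvent` replays a queued event through the handler method that was looked up when the event was delayed, so swapping the connection's handler in between would silently run stale code. A toy illustration of that hazard, with all names hypothetical:

```python
# Toy illustration of the assumption: the delayed event binds the
# handler method at delay time, so a later handler swap goes unnoticed.
class HandlerA:
    def _func(self, conn):
        return "handled by A"

class HandlerB:
    def _func(self, conn):
        return "handled by B"

class Connection:
    def __init__(self, handler):
        self.handler = handler

conn = Connection(HandlerA())
replay = conn.handler._func            # event delayed: method bound now
conn.handler = HandlerB()              # handler changes in the meantime...
assert replay(conn) == "handled by A"  # ...the stale method still runs
```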
@@ -460,6 +460,7 @@ class Application(BaseApplication):
                 elif state == ClusterStates.BACKINGUP:
                     handler = self.client_ro_service_handler
                 else:
-                    conn.abort()
+                    if state != ClusterStates.STOPPING:
+                        conn.abort()
                     continue
             elif node.isStorage() and storage_handler:
@@ -489,15 +490,18 @@ class Application(BaseApplication):
     def shutdown(self):
         """Close all connections and exit"""
         self.changeClusterState(ClusterStates.STOPPING)
-        self.listening_conn.close()
-        for conn in self.em.getConnectionList():
-            node = self.nm.getByUUID(conn.getUUID())
-            if node is None or not node.isIdentified():
-                conn.close()
-        # No need to change handlers in order to reject RequestIdentification
-        # & AskBeginTransaction packets because they won't be any:
-        # the only remaining connected peers are identified non-clients
-        # and we don't accept new connections anymore.
+        # Marking a fictional storage node as starting operation blocks any
+        # request to start a new transaction. Doing it this way has 2 advantages:
+        # - It's simpler than changing the handler of all clients,
+        #   which is anyway not supported by EventQueue.
+        # - Returning an error code would cause activity on client side for
+        #   nothing.
+        # What's important is to not abort during the second phase of commits
+        # and for this, clients must even be able to reconnect, in case of
+        # failure during tpc_finish.
+        # We're rarely involved in vote, so we have to trust clients to
+        # abort any transaction that is still in the first phase.
+        self.storage_starting_set.add(None)
         try:
             # wait for all transaction to be finished
             while self.tm.hasPending():
@@ -506,13 +510,13 @@ class Application(BaseApplication):
             logging.critical('No longer operational')
 
         logging.info("asking remaining nodes to shutdown")
+        self.listening_conn.close()
         handler = EventHandler(self)
-        now = monotonic_time()
         for node in self.nm.getConnectedList():
             conn = node.getConnection()
             if node.isStorage():
                 conn.setHandler(handler)
-                conn.send(Packets.NotifyNodeInformation(now, ((
+                conn.send(Packets.NotifyNodeInformation(monotonic_time(), ((
                     node.getType(), node.getAddress(), node.getUUID(),
                     NodeStates.TEMPORARILY_DOWN, None),)))
                 conn.abort()
...
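The two hunks above are the heart of the commit. Instead of closing the listening connection up front, `shutdown()` now adds a `None` sentinel to `storage_starting_set`: the master already delays `AskBeginTransaction` through its EventQueue whenever some storage node is starting operation, so the sentinel blocks every new transaction while the ones already past vote can still finish, and clients can even reconnect to retry `tpc_finish`. A minimal standalone model of that gate; `TransactionGate` and `DelayEvent` here are hypothetical stand-ins, not NEO's real `TransactionManager` or event queue:

```python
# Minimal model of the sentinel trick, assuming a begin() that defers
# new transactions whenever some storage is (or pretends to be)
# starting operation.
class DelayEvent(Exception):
    """Raised to requeue a request until it can be processed."""

class TransactionGate:
    def __init__(self):
        self.storage_starting_set = set()
        self.pending = []               # transactions already begun

    def begin(self, ttid):
        if self.storage_starting_set:
            # With the None sentinel added at shutdown, this branch is
            # taken forever: the request stays queued, no error is sent
            # back, and the client simply waits.
            raise DelayEvent
        self.pending.append(ttid)

gate = TransactionGate()
gate.begin('t1')                        # begun before shutdown
gate.storage_starting_set.add(None)     # shutdown(): block new begins
try:
    gate.begin('t2')
except DelayEvent:
    print('t2 delayed; it will never begin')
gate.pending.remove('t1')               # t1's tpc_finish still completes
print('hasPending:', bool(gate.pending))
```

Once `tm.hasPending()` becomes false, the `while` loop in `shutdown()` falls through; only then is the listening connection closed (now moved into the second hunk) and the remaining storage nodes sent TEMPORARILY_DOWN.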
@@ -29,7 +29,8 @@ from transaction.interfaces import TransientError
 from ZODB import DB, POSException
 from ZODB.DB import TransactionalUndo
 from neo.storage.transactions import TransactionManager, ConflictError
-from neo.lib.connection import ServerConnection, MTClientConnection
+from neo.lib.connection import ConnectionClosed, \
+    ServerConnection, MTClientConnection
 from neo.lib.exception import DatabaseFailure, StoppedOperation
 from neo.lib.handler import DelayEvent
 from neo.lib import logging
...
@@ -824,27 +825,46 @@ class Test(NEOThreadedTest):
         self._testShutdown(cluster)
 
     def _testShutdown(self, cluster):
-        if 1:
-            # fill DB a little
-            t, c = cluster.getTransaction()
-            c.root()[''] = ''
-            t.commit()
+        def before_finish(_):
             # tell admin to shutdown the cluster
             cluster.neoctl.setClusterState(ClusterStates.STOPPING)
-            self.tic()
+            l = threading.Lock(); l.acquire()
+            with ConnectionFilter() as f:
+                # Make sure that we send t2/BeginTransaction
+                # before t1/AskFinishTransaction
+                @f.delayAskBeginTransaction
+                def delay(_):
+                    l.release()
+                    return False
+                t2.start()
+                l.acquire()
+        t1, c1 = cluster.getTransaction()
+        ob = c1.root()['1'] = PCounter()
+        t1.commit()
+        ob.value += 1
+        TransactionalResource(t1, 0, tpc_finish=before_finish)
+        t2, c2 = cluster.getTransaction()
+        c2.root()['2'] = None
+        t2 = self.newPausedThread(t2.commit)
+        with Patch(cluster.client, _connectToPrimaryNode=lambda *_:
+                self.fail("unexpected reconnection to master")):
+            t1.commit()
+        self.assertRaises(ConnectionClosed, t2.join)
         # all nodes except clients should exit
         cluster.join(cluster.master_list
                      + cluster.storage_list
                      + cluster.admin_list)
         cluster.stop() # stop and reopen DB to check partition tables
-        dm = cluster.storage_list[0].dm
-        self.assertEqual(1, dm.getPTID())
-        pt = list(dm.getPartitionTable())
-        self.assertEqual(20, len(pt))
-        for _, _, state in pt:
-            self.assertEqual(state, CellStates.UP_TO_DATE)
-        for s in cluster.storage_list[1:]:
-            self.assertEqual(s.dm.getPTID(), 1)
-            self.assertEqual(list(s.dm.getPartitionTable()), pt)
+        cluster.start()
+        pt = cluster.admin.pt
+        self.assertEqual(1, pt.getID())
+        for row in pt.partition_list:
+            for cell in row:
+                self.assertEqual(cell.getState(), CellStates.UP_TO_DATE)
+        t, c = cluster.getTransaction()
+        self.assertEqual(c.root()['1'].value, 1)
+        self.assertNotIn('2', c.root())
 
     @with_cluster()
     def testInternalInvalidation(self, cluster):
...
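The rewritten test pins down the exact interleaving the commit is about: t2's `BeginTransaction` must reach the master before t1's `AskFinishTransaction`, so that the STOPPING master completes the already-voted t1 (without the client ever reconnecting) while t2 never gets past begin and its commit fails with `ConnectionClosed`. The lock handshake driving that ordering can be sketched with plain `threading`; `packet_filter` and `send` below are hypothetical stand-ins that only mimic NEO's `ConnectionFilter`/`delayAskBeginTransaction` API:

```python
# Sketch of the ordering handshake used by the new test.
import threading

ready = threading.Lock(); ready.acquire()

def packet_filter(packet_name):
    # Called for each outgoing packet, like a ConnectionFilter callback.
    if packet_name == 'AskBeginTransaction':
        ready.release()     # signal: t2's begin request is on the wire
    return False            # False = let the packet through undelayed

def send(packet_name):
    packet_filter(packet_name)
    # ...the real code would now hand the packet to the connection

def t2_commit():
    send('AskBeginTransaction')

t2 = threading.Thread(target=t2_commit)
t2.start()
ready.acquire()             # blocks until t2's begin has been sent
print('t1 may now send AskFinishTransaction')
t2.join()
```

Returning False from the filter callback means the watched packet is not actually delayed; as in the test's `delay()` above, the callback serves purely as a synchronization point.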