Commit 7301d385 by Julien Muchembled

Implement proper shutdown (aka STOPPING state)

In STOPPING state, the primary:
1. rejects all clients and any new node
2. finalize remaining transactions
   (i.e. those for which a tcp_finish was received)
3. finally shut down all nodes of the cluster
1 parent 854c69b9
......@@ -119,14 +119,20 @@ configuration file.
Shutting down
-------------
There is no administration command yet to stop properly a running cluster.
So following manual actions should be done:
Before shutting down NEO, all clients like Zope instances should be stopped,
so that cluster become idle. This is required for multi-DB setups, to prevent
critical failures in second phase of TPC.
a. Make sure all clients like Zope instances are stopped, so that cluster
become idle.
b. Stop all master nodes first with a SIGINT or SIGTERM, so that storage nodes
A cluster (i.e. masters+storages+admin) can be stopped gracefully by putting it
in STOPPING state using neoctl::
neoctl -a <admin> set cluster STOPPING
This can also be done manually, which helps if your cluster is in bad state:
- Stop all master nodes first with a SIGINT or SIGTERM, so that storage nodes
don't become in OUT_OF_DATE state.
c. At last stop remaining nodes with a SIGINT or SIGTERM.
- Next stop remaining nodes with a SIGINT or SIGTERM.
Deployment
==========
......
......@@ -80,7 +80,7 @@ RC - tpc_finish might raise while transaction got successfully committed.
Do the replication process, the verification stage, with or without
unfinished transactions, cells have to set as outdated, if yes, should the
partition table changes be broadcasted ? (BANDWITH, SPEED)
- Implement proper shutdown (ClusterStates.STOPPING)
- Make SIGINT on primary master change cluster in STOPPING state.
- Review PENDING/HIDDEN/SHUTDOWN states, don't use notifyNodeInformation()
to do a state-switch, use a exception-based mechanism ? (CODE)
- Split protocol.py in a 'protocol' module ?
......
......@@ -24,7 +24,8 @@ from .handler import AdminEventHandler, MasterEventHandler, \
from neo.lib.connector import getConnectorHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import NodeTypes, NodeStates, Packets, Errors
from neo.lib.protocol import ClusterStates, Errors, \
NodeTypes, NodeStates, Packets
from neo.lib.debug import register as registerLiveDebugger
class Application(object):
......@@ -88,13 +89,16 @@ class Application(object):
self.listening_conn = ListeningConnection(self.em, handler,
addr=self.server, connector=self.connector_handler())
while True:
while self.cluster_state != ClusterStates.STOPPING:
self.connectToPrimary()
try:
while True:
self.em.poll(1)
except PrimaryFailure:
logging.error('primary master is down')
self.listening_conn.close()
while not self.em.isIdle():
self.em.poll(1)
def connectToPrimary(self):
"""Find a primary master node, and connect to it.
......
......@@ -24,6 +24,7 @@ from neo.lib.protocol import UUID_NAMESPACES, ZERO_TID, NotReadyError
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.handler import EventHandler
from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure
from neo.lib.util import dump
......@@ -32,7 +33,7 @@ class StateChangedException(Exception): pass
from .backup_app import BackupApplication
from .handlers import election, identification, secondary
from .handlers import administration, client, storage, shutdown
from .handlers import administration, client, storage
from .pt import PartitionTable
from .recovery import RecoveryManager
from .transactions import TransactionManager
......@@ -143,7 +144,7 @@ class Application(object):
addr=self.server, connector=self.connector_handler())
# Start a normal operation.
while True:
while self.cluster_state != ClusterStates.STOPPING:
# (Re)elect a new primary master.
self.primary = not self.nm.getMasterList()
if not self.primary:
......@@ -294,7 +295,8 @@ class Application(object):
# nodes and client nodes. Abort connections to client nodes.
logging.critical('No longer operational')
except StateChangedException, e:
assert e.args[0] == ClusterStates.STARTING_BACKUP
if e.args[0] != ClusterStates.STARTING_BACKUP:
raise
self.backup_tid = tid = self.getLastTransaction()
self.pt.setBackupTidDict(dict((node.getUUID(), tid)
for node in self.nm.getStorageList(only_identified=True)))
......@@ -342,21 +344,25 @@ class Application(object):
in_conflict.setUUID(None)
# recover the cluster status at startup
self.runManager(RecoveryManager)
while True:
self.runManager(VerificationManager)
if self.backup_tid:
if self.backup_app is None:
raise RuntimeError("No upstream cluster to backup"
" defined in configuration")
self.backup_app.provideService()
else:
self.provideService()
for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient():
node.notify(Packets.StopOperation())
if node.isClient():
node.getConnection().abort()
try:
self.runManager(RecoveryManager)
while True:
self.runManager(VerificationManager)
if self.backup_tid:
if self.backup_app is None:
raise RuntimeError("No upstream cluster to backup"
" defined in configuration")
self.backup_app.provideService()
else:
self.provideService()
for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient():
node.notify(Packets.StopOperation())
if node.isClient():
node.getConnection().abort()
except StateChangedException, e:
assert e.args[0] == ClusterStates.STOPPING
self.shutdown()
def playSecondaryRole(self):
"""
......@@ -412,22 +418,22 @@ class Application(object):
storage_handler = self.storage_service_handler
elif self._current_manager is not None:
storage_handler = self._current_manager.getHandler()
elif state == ClusterStates.STOPPING:
storage_handler = None
else:
raise RuntimeError('Unexpected cluster state')
# change handlers
notification_packet = Packets.NotifyClusterInformation(state)
for node in self.nm.getIdentifiedList():
if node.isMaster():
continue
conn = node.getConnection()
node.notify(notification_packet)
conn.notify(notification_packet)
if node.isClient():
if state != ClusterStates.RUNNING:
conn.abort()
continue
handler = client_handler
elif node.isStorage():
elif node.isStorage() and storage_handler:
handler = storage_handler
else:
continue # keep handler
......@@ -451,35 +457,47 @@ class Application(object):
def shutdown(self):
"""Close all connections and exit"""
# XXX: This behaviour is probably broken, as it applies the same
# handler to all connection types. It must be carefuly reviewed and
# corrected.
# change handler
handler = shutdown.ShutdownHandler(self)
for node in self.nm.getIdentifiedList():
node.getConnection().setHandler(handler)
self.changeClusterState(ClusterStates.STOPPING)
self.listening_conn.close()
for conn in self.em.getConnectionList():
node = self.nm.getByUUID(conn.getUUID())
if node is None or not node.isIdentified():
conn.close()
# No need to change handlers in order to reject RequestIdentification
# & AskBeginTransaction packets because they won't be any:
# the only remaining connected peers are identified non-clients
# and we don't accept new connections anymore.
try:
# wait for all transaction to be finished
while self.tm.hasPending():
self.em.poll(1)
except OperationFailure:
# If not operational, send Stop Operation packets to storage
# nodes and client nodes. Abort connections to client nodes.
logging.critical('No longer operational')
# wait for all transaction to be finished
while self.tm.hasPending():
self.em.poll(1)
logging.info("asking remaining nodes to shutdown")
handler = EventHandler(self)
for node in self.nm.getConnectedList():
conn = node.getConnection()
if node.isStorage():
conn.setHandler(handler)
conn.notify(Packets.NotifyNodeInformation(((
node.getType(), node.getAddress(), node.getUUID(),
NodeStates.TEMPORARILY_DOWN),)))
conn.abort()
elif conn.pending():
conn.abort()
else:
conn.close()
if self.cluster_state != ClusterStates.RUNNING:
logging.info("asking all nodes to shutdown")
# This code sends packets but never polls, so they never reach
# network.
for node in self.nm.getIdentifiedList():
notification = Packets.NotifyNodeInformation([node.asTuple()])
if node.isClient():
node.notify(notification)
elif node.isStorage() or node.isMaster():
node.notify(notification)
while self.em.connection_dict:
self.em.poll(1)
# then shutdown
sys.exit()
def identifyStorageNode(self, known):
if self.cluster_state == ClusterStates.STOPPING:
raise NotReadyError
if known:
state = NodeStates.RUNNING
else:
......
......@@ -46,7 +46,8 @@ class AdministrationHandler(MasterHandler):
if app.cluster_state not in CLUSTER_STATE_WORKFLOW[state]:
raise ProtocolError('Can not switch to this state')
except KeyError:
raise ProtocolError('Invalid state requested')
if state != ClusterStates.STOPPING:
raise ProtocolError('Invalid state requested')
# change state
if state == ClusterStates.VERIFYING:
......@@ -65,8 +66,6 @@ class AdministrationHandler(MasterHandler):
if app.tm.hasPending() or app.nm.getClientList(True):
raise ProtocolError("Can not switch to %s state with pending"
" transactions or connected clients" % state)
elif state != ClusterStates.STOPPING_BACKUP:
app.changeClusterState(state)
conn.answer(Errors.Ack('Cluster state changed'))
if state != app.cluster_state:
......
......@@ -64,6 +64,9 @@ class PrimaryHandler(EventHandler):
def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested'
def notifyClusterInformation(self, conn, state):
self.app.cluster_state = state
def notifyNodeInformation(self, conn, node_list):
app = self.app
for node_type, addr, uuid, state in node_list:
......
#
# Copyright (C) 2006-2012 Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import neo.lib
from neo.lib import protocol
from . import BaseServiceHandler
def reject(*args, **kw):
raise protocol.ProtocolError('cluster is shutting down')
class ShutdownHandler(BaseServiceHandler):
"""This class deals with events for a shutting down phase."""
requestIdentification = reject
askBeginTransaction = reject
......@@ -14,6 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# XXX: Consider using ClusterStates.STOPPING to stop clusters
import os, random, socket, sys, tempfile, threading, time, types, weakref
import traceback
from collections import deque
......@@ -56,7 +58,7 @@ class Serialized(object):
"""Suspend lock owner and resume first suspended thread"""
if lock is None:
lock = cls._global_lock
if stop: # XXX: we should fix ClusterStates.STOPPING
if stop:
cls.pending = frozenset(stop)
else:
cls.pending = 0
......@@ -82,7 +84,7 @@ class Serialized(object):
lock = cls._global_lock
lock.acquire()
pending = cls.pending # XXX: getattr once to avoid race conditions
if type(pending) is frozenset: # XXX
if type(pending) is frozenset:
if lock is cls._global_lock:
cls.pending = 0
elif threading.currentThread() in pending:
......@@ -652,8 +654,12 @@ class NEOCluster(object):
def stop(self):
if hasattr(self, '_db') and self.client.em._timeout == 0:
self.client.setPoll(True)
self.__dict__.pop('_db', self.client).close()
#self.neoctl.setClusterState(ClusterStates.STOPPING) # TODO
sync = Storage.Storage.sync.im_func
Storage.Storage.sync = lambda self, force=True: None
try:
self.__dict__.pop('_db', self.client).close()
finally:
Storage.Storage.sync = sync
try:
Serialized.release(stop=
self.admin_list + self.storage_list + self.master_list)
......
......@@ -16,6 +16,7 @@
import sys
import threading
import traceback
import transaction
import unittest
from thread import get_ident
......@@ -24,7 +25,8 @@ from ZODB import POSException
from neo.storage.transactions import TransactionManager, \
DelayedError, ConflictError
from neo.lib.connection import MTClientConnection
from neo.lib.protocol import ClusterStates, NodeStates, Packets, ZERO_TID
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_TID
from . import NEOCluster, NEOThreadedTest, Patch
from neo.lib.util import makeChecksum
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
......@@ -466,6 +468,41 @@ class Test(NEOThreadedTest):
finally:
cluster.stop()
def testShutdown(self):
cluster = NEOCluster(master_count=3, partitions=10,
replicas=1, storage_count=3)
try:
cluster.start()
# fill DB a little
t, c = cluster.getTransaction()
c.root()[''] = ''
t.commit()
cluster.client.setPoll(0)
# tell admin to shutdown the cluster
cluster.neoctl.setClusterState(ClusterStates.STOPPING)
cluster.tic()
# all nodes except clients should exit
for master in cluster.master_list:
master.join(5)
self.assertFalse(master.isAlive())
for storage in cluster.storage_list:
storage.join(5)
self.assertFalse(storage.isAlive())
cluster.admin.join(5)
self.assertFalse(cluster.admin.isAlive())
finally:
cluster.stop()
cluster.reset() # reopen DB to check partition tables
dm = cluster.storage_list[0].dm
self.assertEqual(1, dm.getPTID())
pt = dm.getPartitionTable()
self.assertEqual(20, len(pt))
for _, _, state in pt:
self.assertEqual(state, CellStates.UP_TO_DATE)
for s in cluster.storage_list[1:]:
self.assertEqual(s.dm.getPTID(), 1)
self.assertEqual(s.dm.getPartitionTable(), pt)
if __name__ == "__main__":
unittest.main()
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!