# # Copyright (C) 2006-2009 Nexedi SA # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. from neo import logging from neo import protocol from neo.protocol import MASTER_NODE_TYPE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ DOWN_STATE from neo.master.handlers import MasterHandler from neo.exception import ElectionFailure class ElectionHandler(MasterHandler): """This class deals with events for a primary master election.""" def handleNotifyNodeInformation(self, conn, packet, node_list): uuid = conn.getUUID() if uuid is None: raise protocol.UnexpectedPacketError app = self.app for node_type, addr, uuid, state in node_list: if node_type != MASTER_NODE_TYPE: # No interest. continue # Register new master nodes. if app.server == addr: # This is self. continue else: node = app.nm.getNodeByServer(addr) # The master must be known assert node is not None if uuid is not None: # If I don't know the UUID yet, believe what the peer # told me at the moment. if node.getUUID() is None: node.setUUID(uuid) if state in (node.getState(), RUNNING_STATE): # No change. Don't care. continue # Something wrong happened possibly. Cut the connection to # this node, if any, and notify the information to others. # XXX this can be very slow. for c in app.em.getConnectionList(): if c.getUUID() == uuid: c.close() node.setState(state) class ClientElectionHandler(ElectionHandler): def packetReceived(self, conn, packet): node = self.app.nm.getNodeByServer(conn.getAddress()) if node.getState() != BROKEN_STATE: node.setState(RUNNING_STATE) MasterHandler.packetReceived(self, conn, packet) def connectionStarted(self, conn): app = self.app addr = conn.getAddress() app.unconnected_master_node_set.remove(addr) app.negotiating_master_node_set.add(addr) MasterHandler.connectionStarted(self, conn) def connectionCompleted(self, conn): conn.ask(protocol.askPrimaryMaster()) MasterHandler.connectionCompleted(self, conn) def connectionClosed(self, conn): self.connectionFailed(conn) MasterHandler.connectionClosed(self, conn) def timeoutExpired(self, conn): self.connectionFailed(conn) MasterHandler.timeoutExpired(self, conn) def connectionFailed(self, conn): app = self.app addr = conn.getAddress() app.negotiating_master_node_set.discard(addr) node = app.nm.getNodeByServer(addr) if node.getState() == RUNNING_STATE: app.unconnected_master_node_set.add(addr) node.setState(TEMPORARILY_DOWN_STATE) elif node.getState() == TEMPORARILY_DOWN_STATE: app.unconnected_master_node_set.add(addr) MasterHandler.connectionFailed(self, conn) def peerBroken(self, conn): app = self.app addr = conn.getAddress() node = app.nm.getNodeByServer(addr) if node is not None: node.setState(DOWN_STATE) app.negotiating_master_node_set.discard(addr) MasterHandler.peerBroken(self, conn) def handleAcceptNodeIdentification(self, conn, packet, node_type, uuid, address, num_partitions, num_replicas, your_uuid): app = self.app node = app.nm.getNodeByServer(conn.getAddress()) if node_type != MASTER_NODE_TYPE: # The peer is not a master node! logging.error('%s:%d is not a master node', *address) app.nm.remove(node) app.negotiating_master_node_set.discard(node.getServer()) conn.close() return if conn.getAddress() != address: # The server address is different! Then why was # the connection successful? logging.error('%s:%d is waiting for %s:%d', conn.getAddress()[0], conn.getAddress()[1], *address) app.nm.remove(node) app.negotiating_master_node_set.discard(node.getServer()) conn.close() return if your_uuid != app.uuid: # uuid conflict happened, accept the new one and restart election app.uuid = your_uuid raise ElectionFailure, 'new uuid supplied' conn.setUUID(uuid) node.setUUID(uuid) if app.uuid < uuid: # I lost. app.primary = False app.negotiating_master_node_set.discard(conn.getAddress()) def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list): if conn.getConnector() is None: # Connection can be closed by peer after he sent # AnswerPrimaryMaster if he finds the primary master before we # give him our UUID. # The connection gets closed before this message gets processed # because this message might have been queued, but connection # interruption takes effect as soon as received. return app = self.app # Register new master nodes. for address, uuid in known_master_list: if app.server == address: # This is self. continue else: n = app.nm.getNodeByServer(address) # master node must be known assert n is not None if uuid is not None: # If I don't know the UUID yet, believe what the peer # told me at the moment. if n.getUUID() is None or n.getUUID() != uuid: n.setUUID(uuid) if primary_uuid is not None: # The primary master is defined. if app.primary_master_node is not None \ and app.primary_master_node.getUUID() != primary_uuid: # There are multiple primary master nodes. This is # dangerous. raise ElectionFailure, 'multiple primary master nodes' primary_node = app.nm.getNodeByUUID(primary_uuid) if primary_node is None: # I don't know such a node. Probably this information # is old. So ignore it. logging.warning('received an unknown primary node UUID') else: if primary_node.getUUID() == primary_uuid: # Whatever the situation is, I trust this master. app.primary = False app.primary_master_node = primary_node # Stop waiting for connections than primary master's to # complete to exit election phase ASAP. primary_server = primary_node.getServer() app.unconnected_master_node_set.intersection_update( [primary_server]) app.negotiating_master_node_set.intersection_update( [primary_server]) # Request a node idenfitication. conn.ask(protocol.requestNodeIdentification(MASTER_NODE_TYPE, app.uuid, app.server, app.name)) class ServerElectionHandler(ElectionHandler): def handleReelectPrimaryMaster(self, conn, packet): raise ElectionFailure, 'reelection requested' def peerBroken(self, conn): app = self.app addr = conn.getAddress() node = app.nm.getNodeByServer(addr) if node is not None and node.getUUID() is not None: node.setState(BROKEN_STATE) MasterHandler.peerBroken(self, conn) def handleRequestNodeIdentification(self, conn, packet, node_type, uuid, address, name): if conn.getConnector() is None: # Connection can be closed by peer after he sent # RequestNodeIdentification if he finds the primary master before # we answer him. # The connection gets closed before this message gets processed # because this message might have been queued, but connection # interruption takes effect as soon as received. return self.checkClusterName(name) app = self.app if node_type != MASTER_NODE_TYPE: logging.info('reject a connection from a non-master') raise protocol.NotReadyError node = app.nm.getNodeByServer(address) if node is None: logging.error('unknown master node: %s' % (address, )) raise protocol.ProtocolError('unknown master node') # If this node is broken, reject it. if node.getUUID() == uuid: if node.getState() == BROKEN_STATE: raise protocol.BrokenNodeDisallowedError # supplied another uuid in case of conflict while not app.isValidUUID(uuid, address): uuid = app.getNewUUID(node_type) node.setUUID(uuid) conn.setUUID(uuid) p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, app.uuid, app.server, app.pt.getPartitions(), app.pt.getReplicas(), uuid) conn.answer(p, packet.getId()) def handleAnnouncePrimaryMaster(self, conn, packet): uuid = conn.getUUID() if uuid is None: raise protocol.UnexpectedPacketError app = self.app if app.primary: # I am also the primary... So restart the election. raise ElectionFailure, 'another primary arises' node = app.nm.getNodeByUUID(uuid) app.primary = False app.primary_master_node = node logging.info('%s is the primary', node)