Commit e966eee6 authored by Grégory Wisniewski's avatar Grégory Wisniewski

- Move master handlers in the handlers module.

- Split service.py file in two, one for the client sercice handler and the
  second for the storage.
- Base class for those two handlers as moved in the module python file (as done
  in client and storage applications).
- Remove moved storages from previous commit.
- Update application code to use new module location
- Fix a part of master tests.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@841 71dcc9de-d417-0410-9af5-da40c76e7ee4
git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@842 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5b3beccb
...@@ -32,14 +32,7 @@ from neo.event import EventManager ...@@ -32,14 +32,7 @@ from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection, ServerConnection from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \ from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \
OperationFailure OperationFailure
from neo.master.identification import IdentificationEventHandler from neo.master import handlers
from neo.master.administration import AdministrationEventHandler
from neo.master.election import ClientElectionEventHandler, ServerElectionEventHandler
from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler
from neo.master.service import ClientServiceEventHandler, StorageServiceEventHandler
from neo.master.secondary import PrimaryMasterEventHandler, SecondaryMasterEventHandler
from neo.master.shutdown import ShutdownEventHandler
from neo.master.pt import PartitionTable from neo.master.pt import PartitionTable
from neo.util import dump from neo.util import dump
from neo.connector import getConnectorHandler from neo.connector import getConnectorHandler
...@@ -132,8 +125,8 @@ class Application(object): ...@@ -132,8 +125,8 @@ class Application(object):
self.unconnected_master_node_set = set() self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set() self.negotiating_master_node_set = set()
self.listening_conn.setHandler(ServerElectionEventHandler(self)) self.listening_conn.setHandler(handlers.ServerElectionHandler(self))
client_handler = ClientElectionEventHandler(self) client_handler = handlers.ClientElectionHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
...@@ -618,7 +611,7 @@ class Application(object): ...@@ -618,7 +611,7 @@ class Application(object):
dump(self.uuid), *(self.server)) dump(self.uuid), *(self.server))
# all incoming connections identify through this handler # all incoming connections identify through this handler
self.listening_conn.setHandler(IdentificationEventHandler(self)) self.listening_conn.setHandler(handlers.IdentificationHandler(self))
# If I know any storage node, make sure that they are not in the running state, # If I know any storage node, make sure that they are not in the running state,
# because they are not connected at this stage. # because they are not connected at this stage.
...@@ -641,7 +634,7 @@ class Application(object): ...@@ -641,7 +634,7 @@ class Application(object):
logging.info('play the secondary role with %s (%s:%d)', logging.info('play the secondary role with %s (%s:%d)',
dump(self.uuid), *(self.server)) dump(self.uuid), *(self.server))
handler = PrimaryMasterEventHandler(self) handler = handlers.PrimaryMasterHandler(self)
em = self.em em = self.em
# Make sure that every connection has the secondary event handler. # Make sure that every connection has the secondary event handler.
...@@ -659,13 +652,13 @@ class Application(object): ...@@ -659,13 +652,13 @@ class Application(object):
# select the storage handler # select the storage handler
if state == protocol.BOOTING: if state == protocol.BOOTING:
storage_handler = RecoveryEventHandler storage_handler = handlers.RecoveryHandler
elif state == protocol.RECOVERING: elif state == protocol.RECOVERING:
storage_handler = RecoveryEventHandler storage_handler = handlers.RecoveryHandler
elif state == protocol.VERIFYING: elif state == protocol.VERIFYING:
storage_handler = VerificationEventHandler storage_handler = handlers.VerificationHandler
elif state == protocol.RUNNING: elif state == protocol.RUNNING:
storage_handler = StorageServiceEventHandler storage_handler = handlers.StorageServiceHandler
else: else:
RuntimeError('Unexpected node type') RuntimeError('Unexpected node type')
...@@ -684,7 +677,7 @@ class Application(object): ...@@ -684,7 +677,7 @@ class Application(object):
if node_type == CLIENT_NODE_TYPE: if node_type == CLIENT_NODE_TYPE:
if state != protocol.RUNNING: if state != protocol.RUNNING:
conn.close() conn.close()
handler = ClientServiceEventHandler handler = handlers.ClientServiceHandler
elif node_type == STORAGE_NODE_TYPE: elif node_type == STORAGE_NODE_TYPE:
handler = storage_handler handler = storage_handler
handler = handler(self) handler = handler(self)
...@@ -754,7 +747,7 @@ class Application(object): ...@@ -754,7 +747,7 @@ class Application(object):
def shutdown(self): def shutdown(self):
"""Close all connections and exit""" """Close all connections and exit"""
# change handler # change handler
handler = ShutdownEventHandler(self) handler = handlers.ShutdownHandler(self)
for c in self.em.getConnectionList(): for c in self.em.getConnectionList():
c.setHandler(handler) c.setHandler(handler)
...@@ -795,20 +788,20 @@ class Application(object): ...@@ -795,20 +788,20 @@ class Application(object):
if uuid == protocol.INVALID_UUID: if uuid == protocol.INVALID_UUID:
logging.info('reject empty storage node') logging.info('reject empty storage node')
raise protocol.NotReadyError raise protocol.NotReadyError
handler = RecoveryEventHandler handler = handlers.RecoveryHandler
elif self.cluster_state == protocol.VERIFYING: elif self.cluster_state == protocol.VERIFYING:
if uuid == INVALID_UUID or node is None: if uuid == INVALID_UUID or node is None:
# if node is unknown, it has been forget when the current # if node is unknown, it has been forget when the current
# partition was validated by the admin # partition was validated by the admin
uuid = INVALID_UUID uuid = INVALID_UUID
state = protocol.PENDING_STATE state = protocol.PENDING_STATE
handler = VerificationEventHandler handler = handlers.VerificationHandler
elif self.cluster_state == protocol.RUNNING: elif self.cluster_state == protocol.RUNNING:
if uuid == INVALID_UUID or node is None: if uuid == INVALID_UUID or node is None:
# same as for verification # same as for verification
uuid = INVALID_UUID uuid = INVALID_UUID
state = protocol.PENDING_STATE state = protocol.PENDING_STATE
handler = StorageServiceEventHandler handler = handlers.StorageServiceHandler
elif self.cluster_state == protocol.STOPPING: elif self.cluster_state == protocol.STOPPING:
# FIXME: raise a ShutdowningError ? # FIXME: raise a ShutdowningError ?
raise protocol.NotReadyError raise protocol.NotReadyError
...@@ -819,17 +812,17 @@ class Application(object): ...@@ -819,17 +812,17 @@ class Application(object):
def identifyNode(self, node_type, uuid, node): def identifyNode(self, node_type, uuid, node):
state = protocol.RUNNING_STATE state = protocol.RUNNING_STATE
handler = IdentificationEventHandler handler = handlers.IdentificationHandler
if node_type == protocol.ADMIN_NODE_TYPE: if node_type == protocol.ADMIN_NODE_TYPE:
# always accept admin nodes # always accept admin nodes
klass = AdminNode klass = AdminNode
handler = AdministrationEventHandler handler = handlers.AdministrationHandler
logging.info('Accept an admin %s' % dump(uuid)) logging.info('Accept an admin %s' % dump(uuid))
elif node_type == protocol.MASTER_NODE_TYPE: elif node_type == protocol.MASTER_NODE_TYPE:
# always put other master in waiting state # always put other master in waiting state
klass = MasterNode klass = MasterNode
handler = SecondaryMasterEventHandler handler = handlers.SecondaryMasterHandler
logging.info('Accept a master %s' % dump(uuid)) logging.info('Accept a master %s' % dump(uuid))
elif node_type == protocol.CLIENT_NODE_TYPE: elif node_type == protocol.CLIENT_NODE_TYPE:
# refuse any client before running # refuse any client before running
...@@ -837,8 +830,7 @@ class Application(object): ...@@ -837,8 +830,7 @@ class Application(object):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
raise protocol.NotReadyError raise protocol.NotReadyError
klass = ClientNode klass = ClientNode
# FIXME: Apply an handler dedicated to client nodes handler = handlers.ClientServiceHandler
handler = ClientServiceEventHandler
logging.info('Accept a client %s' % dump(uuid)) logging.info('Accept a client %s' % dump(uuid))
elif node_type == protocol.STORAGE_NODE_TYPE: elif node_type == protocol.STORAGE_NODE_TYPE:
klass = StorageNode klass = StorageNode
......
...@@ -20,7 +20,7 @@ import logging ...@@ -20,7 +20,7 @@ import logging
from neo import protocol from neo import protocol
from neo.handler import EventHandler from neo.handler import EventHandler
class MasterEventHandler(EventHandler): class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
def _nodeLost(self, conn, node): def _nodeLost(self, conn, node):
...@@ -144,3 +144,81 @@ class MasterEventHandler(EventHandler): ...@@ -144,3 +144,81 @@ class MasterEventHandler(EventHandler):
app.sendPartitionTable(conn) app.sendPartitionTable(conn)
conn.answer(protocol.answerPartitionTable(app.pt.getID(), []), packet) conn.answer(protocol.answerPartitionTable(app.pt.getID(), []), packet)
class BaseServiceHandler(MasterHandler):
"""This class deals with events for a service phase."""
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
if node_type in (protocol.CLIENT_NODE_TYPE, protocol.ADMIN_NODE_TYPE):
# No interest.
continue
if uuid == protocol.INVALID_UUID:
# No interest.
continue
if app.uuid == uuid:
# This looks like me...
if state == protocol.RUNNING_STATE:
# Yes, I know it.
continue
else:
# What?! What happened to me?
raise RuntimeError, 'I was told that I am bad'
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
node = app.nm.getNodeByServer(addr)
if node is None:
# I really don't know such a node. What is this?
continue
else:
if node.getServer() != addr:
# This is different from what I know.
continue
if node.getState() == state:
# No change. Don't care.
continue
node.setState(state)
# Something wrong happened possibly. Cut the connection to
# this node, if any, and notify the information to others.
# XXX this can be very slow.
# XXX does this need to be closed in all cases ?
c = app.em.getConnectionByUUID(uuid)
if c is not None:
c.close()
app.broadcastNodeInformation(node)
if node.getNodeType() == protocol.STORAGE_NODE_TYPE:
if state == protocol.TEMPORARILY_DOWN_STATE:
cell_list = app.pt.outdate()
if len(cell_list) != 0:
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
def handleAskLastIDs(self, conn, packet):
app = self.app
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.pt.getID()), packet)
def handleAskUnfinishedTransactions(self, conn, packet):
app = self.app
p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
conn.answer(p, packet)
# Import all master handlers in the current namespace
from neo.master.handlers.administration import AdministrationHandler
from neo.master.handlers.election import ClientElectionHandler, ServerElectionHandler
from neo.master.handlers.identification import IdentificationHandler
from neo.master.handlers.recovery import RecoveryHandler
from neo.master.handlers.secondary import SecondaryMasterHandler, PrimaryMasterHandler
from neo.master.handlers.shutdown import ShutdownHandler
from neo.master.handlers.verification import VerificationHandler
from neo.master.handlers.storage import StorageServiceHandler
from neo.master.handlers.client import ClientServiceHandler
...@@ -18,12 +18,12 @@ ...@@ -18,12 +18,12 @@
import logging import logging
from neo import protocol from neo import protocol
from neo.master.handler import MasterEventHandler from neo.master.handlers import MasterHandler
from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
STORAGE_NODE_TYPE, HIDDEN_STATE, PENDING_STATE, RUNNING STORAGE_NODE_TYPE, HIDDEN_STATE, PENDING_STATE, RUNNING
from neo.util import dump from neo.util import dump
class AdministrationEventHandler(MasterEventHandler): class AdministrationHandler(MasterHandler):
"""This class deals with messages from the admin node only""" """This class deals with messages from the admin node only"""
def _nodeLost(self, conn, node): def _nodeLost(self, conn, node):
......
#
# Copyright (C) 2006-2009 Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo import protocol
from neo.protocol import CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \
HIDDEN_STATE, INVALID_UUID, INTERNAL_ERROR_CODE
from neo.master.handlers import BaseServiceHandler
from neo.protocol import UnexpectedPacketError
from neo.util import dump
class FinishingTransaction(object):
"""This class describes a finishing transaction."""
def __init__(self, conn):
self._conn = conn
self._msg_id = None
self._oid_list = None
self._uuid_set = None
self._locked_uuid_set = set()
def getConnection(self):
return self._conn
def setMessageId(self, msg_id):
self._msg_id = msg_id
def getMessageId(self):
return self._msg_id
def setOIDList(self, oid_list):
self._oid_list = oid_list
def getOIDList(self):
return self._oid_list
def setUUIDSet(self, uuid_set):
self._uuid_set = uuid_set
def getUUIDSet(self):
return self._uuid_set
def addLockedUUID(self, uuid):
if uuid in self._uuid_set:
self._locked_uuid_set.add(uuid)
def allLocked(self):
return self._uuid_set == self._locked_uuid_set
class ClientServiceHandler(BaseServiceHandler):
""" Handler dedicated to client during service state """
def connectionCompleted(self, conn):
pass
def _nodeLost(self, conn, node):
app = self.app
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
def handleAbortTransaction(self, conn, packet, tid):
try:
del self.app.finishing_transaction_dict[tid]
except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid))
pass
def handleAskNewTID(self, conn, packet):
app = self.app
tid = app.getNextTID()
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerNewTID(tid), packet)
def handleAskNewOIDs(self, conn, packet, num_oids):
app = self.app
oid_list = app.getNewOIDList(num_oids)
conn.answer(protocol.answerNewOIDs(oid_list), packet)
def handleFinishTransaction(self, conn, packet, oid_list, tid):
app = self.app
# If the given transaction ID is later than the last TID, the peer
# is crazy.
if app.ltid < tid:
raise UnexpectedPacketError
# Collect partitions related to this transaction.
getPartition = app.getPartition
partition_set = set()
partition_set.add(getPartition(tid))
partition_set.update((getPartition(oid) for oid in oid_list))
# Collect the UUIDs of nodes related to this transaction.
uuid_set = set()
for part in partition_set:
uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part) \
if cell.getNodeState() != HIDDEN_STATE))
# Request locking data.
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
used_uuid_set = set()
for c in app.em.getConnectionList():
if c.getUUID() in uuid_set:
c.ask(protocol.lockInformation(tid), timeout=60)
used_uuid_set.add(c.getUUID())
try:
t = app.finishing_transaction_dict[tid]
t.setOIDList(oid_list)
t.setUUIDSet(used_uuid_set)
t.setMessageId(packet.getId())
except KeyError:
logging.warn('finishing transaction %s does not exist', dump(tid))
pass
...@@ -21,12 +21,12 @@ from neo import protocol ...@@ -21,12 +21,12 @@ from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
DOWN_STATE DOWN_STATE
from neo.master.handler import MasterEventHandler from neo.master.handlers import MasterHandler
from neo.exception import ElectionFailure from neo.exception import ElectionFailure
from neo.protocol import INVALID_UUID from neo.protocol import INVALID_UUID
from neo.node import MasterNode from neo.node import MasterNode
class ElectionEventHandler(MasterEventHandler): class ElectionHandler(MasterHandler):
"""This class deals with events for a primary master election.""" """This class deals with events for a primary master election."""
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
...@@ -72,32 +72,32 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -72,32 +72,32 @@ class ElectionEventHandler(MasterEventHandler):
c.close() c.close()
node.setState(state) node.setState(state)
class ClientElectionEventHandler(MasterEventHandler): class ClientElectionHandler(MasterHandler):
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
node = self.app.nm.getNodeByServer(conn.getAddress()) node = self.app.nm.getNodeByServer(conn.getAddress())
if node.getState() != BROKEN_STATE: if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE) node.setState(RUNNING_STATE)
MasterEventHandler.packetReceived(self, conn, packet) MasterHandler.packetReceived(self, conn, packet)
def connectionStarted(self, conn): def connectionStarted(self, conn):
app = self.app app = self.app
addr = conn.getAddress() addr = conn.getAddress()
app.unconnected_master_node_set.remove(addr) app.unconnected_master_node_set.remove(addr)
app.negotiating_master_node_set.add(addr) app.negotiating_master_node_set.add(addr)
MasterEventHandler.connectionStarted(self, conn) MasterHandler.connectionStarted(self, conn)
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
conn.ask(protocol.askPrimaryMaster()) conn.ask(protocol.askPrimaryMaster())
MasterEventHandler.connectionCompleted(self, conn) MasterHandler.connectionCompleted(self, conn)
def connectionClosed(self, conn): def connectionClosed(self, conn):
self.connectionFailed(conn) self.connectionFailed(conn)
MasterEventHandler.connectionClosed(self, conn) MasterHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
self.connectionFailed(conn) self.connectionFailed(conn)
MasterEventHandler.timeoutExpired(self, conn) MasterHandler.timeoutExpired(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
app = self.app app = self.app
...@@ -109,7 +109,7 @@ class ClientElectionEventHandler(MasterEventHandler): ...@@ -109,7 +109,7 @@ class ClientElectionEventHandler(MasterEventHandler):
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
elif node.getState() == TEMPORARILY_DOWN_STATE: elif node.getState() == TEMPORARILY_DOWN_STATE:
app.unconnected_master_node_set.add(addr) app.unconnected_master_node_set.add(addr)
MasterEventHandler.connectionFailed(self, conn) MasterHandler.connectionFailed(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
app = self.app app = self.app
...@@ -118,7 +118,7 @@ class ClientElectionEventHandler(MasterEventHandler): ...@@ -118,7 +118,7 @@ class ClientElectionEventHandler(MasterEventHandler):
if node is not None: if node is not None:
node.setState(DOWN_STATE) node.setState(DOWN_STATE)
app.negotiating_master_node_set.discard(addr) app.negotiating_master_node_set.discard(addr)
MasterEventHandler.peerBroken(self, conn) MasterHandler.peerBroken(self, conn)
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions, uuid, ip_address, port, num_partitions,
...@@ -208,7 +208,7 @@ class ClientElectionEventHandler(MasterEventHandler): ...@@ -208,7 +208,7 @@ class ClientElectionEventHandler(MasterEventHandler):
app.uuid, app.server[0], app.server[1], app.name)) app.uuid, app.server[0], app.server[1], app.name))
class ServerElectionEventHandler(MasterEventHandler): class ServerElectionHandler(MasterHandler):
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
...@@ -219,7 +219,7 @@ class ServerElectionEventHandler(MasterEventHandler): ...@@ -219,7 +219,7 @@ class ServerElectionEventHandler(MasterEventHandler):
node = app.nm.getNodeByServer(addr) node = app.nm.getNodeByServer(addr)
if node is not None and node.getUUID() is not None: if node is not None and node.getUUID() is not None:
node.setState(BROKEN_STATE) node.setState(BROKEN_STATE)
MasterEventHandler.peerBroken(self, conn) MasterHandler.peerBroken(self, conn)
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
......
...@@ -18,13 +18,13 @@ ...@@ -18,13 +18,13 @@
import logging import logging
from neo import protocol from neo import protocol
from neo.master.handler import MasterEventHandler from neo.master.handlers import MasterHandler
class IdentificationEventHandler(MasterEventHandler): class IdentificationHandler(MasterHandler):
"""This class deals with messages from the admin node only""" """This class deals with messages from the admin node only"""
def _nodeLost(self, conn, node): def _nodeLost(self, conn, node):
logging.warning('lost a node in IdentificationEventHandler : %s' % node) logging.warning('lost a node in IdentificationHandler : %s' % node)
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
......
...@@ -20,12 +20,12 @@ import logging ...@@ -20,12 +20,12 @@ import logging
from neo import protocol from neo import protocol
from neo.protocol import RUNNING_STATE, BROKEN_STATE, \ from neo.protocol import RUNNING_STATE, BROKEN_STATE, \
TEMPORARILY_DOWN_STATE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE TEMPORARILY_DOWN_STATE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handlers import MasterHandler
from neo.protocol import UnexpectedPacketError, INVALID_UUID, INVALID_PTID from neo.protocol import UnexpectedPacketError, INVALID_UUID, INVALID_PTID
from neo.node import StorageNode from neo.node import StorageNode
from neo.util import dump from neo.util import dump
class RecoveryEventHandler(MasterEventHandler): class RecoveryHandler(MasterHandler):
"""This class deals with events for a recovery phase.""" """This class deals with events for a recovery phase."""
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
......
...@@ -19,12 +19,12 @@ import logging ...@@ -19,12 +19,12 @@ import logging
from neo.protocol import MASTER_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, DOWN_STATE RUNNING_STATE, BROKEN_STATE, DOWN_STATE
from neo.master.handler import MasterEventHandler from neo.master.handlers import MasterHandler
from neo.exception import ElectionFailure, PrimaryFailure from neo.exception import ElectionFailure, PrimaryFailure
from neo.protocol import UnexpectedPacketError, INVALID_UUID from neo.protocol import UnexpectedPacketError, INVALID_UUID
from neo.node import MasterNode from neo.node import MasterNode
class SecondaryMasterEventHandler(MasterEventHandler): class SecondaryMasterHandler(MasterHandler):
""" Handler used by primary to handle secondary masters""" """ Handler used by primary to handle secondary masters"""
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
...@@ -40,7 +40,7 @@ class SecondaryMasterEventHandler(MasterEventHandler): ...@@ -40,7 +40,7 @@ class SecondaryMasterEventHandler(MasterEventHandler):
logging.error('/!\ NotifyNodeInformation packet from secondary master') logging.error('/!\ NotifyNodeInformation packet from secondary master')
class PrimaryMasterEventHandler(MasterEventHandler): class PrimaryMasterHandler(MasterHandler):
""" Handler used by secondaries to handle primary master""" """ Handler used by secondaries to handle primary master"""
def _nodeLost(self, conn, node): def _nodeLost(self, conn, node):
...@@ -53,7 +53,7 @@ class PrimaryMasterEventHandler(MasterEventHandler): ...@@ -53,7 +53,7 @@ class PrimaryMasterEventHandler(MasterEventHandler):
node = self.app.nm.getNodeByServer(conn.getAddress()) node = self.app.nm.getNodeByServer(conn.getAddress())
if node.getState() != BROKEN_STATE: if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE) node.setState(RUNNING_STATE)
MasterEventHandler.packetReceived(self, conn, packet) MasterHandler.packetReceived(self, conn, packet)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
raise UnexpectedPacketError raise UnexpectedPacketError
......
...@@ -19,11 +19,11 @@ import logging ...@@ -19,11 +19,11 @@ import logging
from neo import protocol from neo import protocol
from neo.protocol import CLIENT_NODE_TYPE, ADMIN_NODE_TYPE, INVALID_UUID, \ from neo.protocol import CLIENT_NODE_TYPE, ADMIN_NODE_TYPE, INVALID_UUID, \
RUNNING_STATE, STORAGE_NODE_TYPE, TEMPORARILY_DOWN_STATE, STOPPING RUNNING_STATE, STORAGE_NODE_TYPE, TEMPORARILY_DOWN_STATE, STOPPING
from neo.master.service import ServiceEventHandler from neo.master.handlers import BaseServiceHandler
from neo import decorators from neo import decorators
from neo.util import dump from neo.util import dump
class ShutdownEventHandler(ServiceEventHandler): class ShutdownHandler(BaseServiceHandler):
"""This class deals with events for a shutting down phase.""" """This class deals with events for a shutting down phase."""
......
...@@ -23,186 +23,13 @@ from neo.protocol import CLIENT_NODE_TYPE, \ ...@@ -23,186 +23,13 @@ from neo.protocol import CLIENT_NODE_TYPE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \ UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \ STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \
HIDDEN_STATE, INVALID_UUID, INTERNAL_ERROR_CODE HIDDEN_STATE, INVALID_UUID, INTERNAL_ERROR_CODE
from neo.master.handler import MasterEventHandler from neo.master.handlers import BaseServiceHandler
from neo.protocol import UnexpectedPacketError from neo.protocol import UnexpectedPacketError
from neo.exception import OperationFailure from neo.exception import OperationFailure
from neo.util import dump from neo.util import dump
class ServiceEventHandler(MasterEventHandler):
"""This class deals with events for a service phase."""
def handleNotifyNodeInformation(self, conn, packet, node_list): class StorageServiceHandler(BaseServiceHandler):
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# No interest.
continue
if uuid == INVALID_UUID:
# No interest.
continue
if app.uuid == uuid:
# This looks like me...
if state == RUNNING_STATE:
# Yes, I know it.
continue
else:
# What?! What happened to me?
raise RuntimeError, 'I was told that I am bad'
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
node = app.nm.getNodeByServer(addr)
if node is None:
# I really don't know such a node. What is this?
continue
else:
if node.getServer() != addr:
# This is different from what I know.
continue
if node.getState() == state:
# No change. Don't care.
continue
node.setState(state)
# Something wrong happened possibly. Cut the connection to
# this node, if any, and notify the information to others.
# XXX this can be very slow.
# XXX does this need to be closed in all cases ?
c = app.em.getConnectionByUUID(uuid)
if c is not None:
c.close()
app.broadcastNodeInformation(node)
if node.getNodeType() == STORAGE_NODE_TYPE:
if state == TEMPORARILY_DOWN_STATE:
cell_list = app.pt.outdate()
if len(cell_list) != 0:
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
def handleAskLastIDs(self, conn, packet):
app = self.app
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.pt.getID()), packet)
def handleAskUnfinishedTransactions(self, conn, packet):
app = self.app
p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
conn.answer(p, packet)
class FinishingTransaction(object):
"""This class describes a finishing transaction."""
def __init__(self, conn):
self._conn = conn
self._msg_id = None
self._oid_list = None
self._uuid_set = None
self._locked_uuid_set = set()
def getConnection(self):
return self._conn
def setMessageId(self, msg_id):
self._msg_id = msg_id
def getMessageId(self):
return self._msg_id
def setOIDList(self, oid_list):
self._oid_list = oid_list
def getOIDList(self):
return self._oid_list
def setUUIDSet(self, uuid_set):
self._uuid_set = uuid_set
def getUUIDSet(self):
return self._uuid_set
def addLockedUUID(self, uuid):
if uuid in self._uuid_set:
self._locked_uuid_set.add(uuid)
def allLocked(self):
return self._uuid_set == self._locked_uuid_set
class ClientServiceEventHandler(ServiceEventHandler):
""" Handler dedicated to client during service state """
def connectionCompleted(self, conn):
pass
def _nodeLost(self, conn, node):
app = self.app
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
def handleAbortTransaction(self, conn, packet, tid):
try:
del self.app.finishing_transaction_dict[tid]
except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid))
pass
def handleAskNewTID(self, conn, packet):
app = self.app
tid = app.getNextTID()
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerNewTID(tid), packet)
def handleAskNewOIDs(self, conn, packet, num_oids):
app = self.app
oid_list = app.getNewOIDList(num_oids)
conn.answer(protocol.answerNewOIDs(oid_list), packet)
def handleFinishTransaction(self, conn, packet, oid_list, tid):
app = self.app
# If the given transaction ID is later than the last TID, the peer
# is crazy.
if app.ltid < tid:
raise UnexpectedPacketError
# Collect partitions related to this transaction.
getPartition = app.getPartition
partition_set = set()
partition_set.add(getPartition(tid))
partition_set.update((getPartition(oid) for oid in oid_list))
# Collect the UUIDs of nodes related to this transaction.
uuid_set = set()
for part in partition_set:
uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part) \
if cell.getNodeState() != HIDDEN_STATE))
# Request locking data.
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
used_uuid_set = set()
for c in app.em.getConnectionList():
if c.getUUID() in uuid_set:
c.ask(protocol.lockInformation(tid), timeout=60)
used_uuid_set.add(c.getUUID())
try:
t = app.finishing_transaction_dict[tid]
t.setOIDList(oid_list)
t.setUUIDSet(used_uuid_set)
t.setMessageId(packet.getId())
except KeyError:
logging.warn('finishing transaction %s does not exist', dump(tid))
pass
class StorageServiceEventHandler(ServiceEventHandler):
""" Handler dedicated to storages during service state """ """ Handler dedicated to storages during service state """
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
......
...@@ -19,12 +19,12 @@ import logging ...@@ -19,12 +19,12 @@ import logging
from neo.protocol import CLIENT_NODE_TYPE, RUNNING_STATE, BROKEN_STATE, \ from neo.protocol import CLIENT_NODE_TYPE, RUNNING_STATE, BROKEN_STATE, \
TEMPORARILY_DOWN_STATE, ADMIN_NODE_TYPE TEMPORARILY_DOWN_STATE, ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handlers import MasterHandler
from neo.exception import VerificationFailure from neo.exception import VerificationFailure
from neo.protocol import INVALID_UUID from neo.protocol import INVALID_UUID
from neo.util import dump from neo.util import dump
class VerificationEventHandler(MasterEventHandler): class VerificationHandler(MasterHandler):
"""This class deals with events for a verification phase.""" """This class deals with events for a verification phase."""
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
......
from neo.master.tests.testMasterApp import MasterAppTests from neo.master.tests.testMasterApp import MasterAppTests
from neo.master.tests.testMasterPT import MasterPartitionTableTests from neo.master.tests.testMasterPT import MasterPartitionTableTests
from neo.master.tests.testMasterElectionHandler import MasterElectionTests from neo.master.tests.testElectionHandler import MasterServerElectionTests
from neo.master.tests.testMasterRecoveryHandler import MasterRecoveryTests from neo.master.tests.testElectionHandler import MasterClientElectionTests
from neo.master.tests.testMasterService import MasterServiceTests from neo.master.tests.testRecoveryHandler import MasterRecoveryTests
from neo.master.tests.testMasterVerificationHandler import MasterVerificationTests from neo.master.tests.testClientHandler import MasterClientHandlerTests
from neo.master.tests.testStorageHandler import MasterStorageHandlerTests
from neo.master.tests.testVerificationHandler import MasterVerificationTests
__all__ = [ __all__ = [
'MasterAppTests', 'MasterAppTests',
'MasterElectionTests', 'MasterServerElectionTests',
'MasterClientElectionTests',
'MasterRecoveryTests', 'MasterRecoveryTests',
'MasterServiceTests', 'MasterClientHandlerTests',
'MasterStorageHandlerTests',
'MasterVerificationeTests', 'MasterVerificationeTests',
'MasterPartitionTableTests', 'MasterPartitionTableTests',
] ]
......
This diff is collapsed.
...@@ -23,7 +23,7 @@ from struct import pack, unpack ...@@ -23,7 +23,7 @@ from struct import pack, unpack
from neo.tests.base import NeoTestBase from neo.tests.base import NeoTestBase
from neo import protocol from neo import protocol
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, INVALID_UUID
from neo.master.election import ElectionEventHandler from neo.master.handlers import ClientElectionHandler, ServerElectionHandler
from neo.master.app import Application from neo.master.app import Application
from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \ from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \ PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
...@@ -61,8 +61,73 @@ def expectMessage(self, packet): ...@@ -61,8 +61,73 @@ def expectMessage(self, packet):
self.connector.expectMessage(packet) self.connector.expectMessage(packet)
class MasterClientElectionTests(NeoTestBase):
def setUp(self):
logging.basicConfig(level = logging.WARNING)
# create an application object
config = self.getConfigFile()
self.app = Application(config, "master1")
self.app.pt.clear()
self.app.em = Mock({"getConnectionList" : []})
self.app.finishing_transaction_dict = {}
for server in self.app.master_node_list:
self.app.nm.add(MasterNode(server = server))
self.election = ClientElectionHandler(self.app)
self.app.unconnected_master_node_set = set()
self.app.negotiating_master_node_set = set()
for node in self.app.nm.getMasterNodeList():
self.app.unconnected_master_node_set.add(node.getServer())
node.setState(RUNNING_STATE)
# define some variable to simulate client and storage node
self.client_port = 11022
self.storage_port = 10021
self.master_port = 10011
# apply monkey patches
self._addPacket = ClientConnection._addPacket
self.expectMessage = ClientConnection.expectMessage
ClientConnection._addPacket = _addPacket
ClientConnection.expectMessage = expectMessage
def tearDown(self):
NeoTestBase.tearDown(self)
def test_01_connectionStarted(self):
uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)})
self.assertEqual(len(self.app.unconnected_master_node_set), 1)
self.assertEqual(len(self.app.negotiating_master_node_set), 0)
self.election.connectionStarted(conn)
self.assertEqual(len(self.app.unconnected_master_node_set), 0)
self.assertEqual(len(self.app.negotiating_master_node_set), 1)
def test_02_connectionCompleted(self):
uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)})
self.election.connectionCompleted(conn)
self.checkCalledRequestNodeIdentification(conn)
def test_03_connectionFailed(self):
uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)})
self.assertEqual(len(self.app.unconnected_master_node_set), 1)
self.assertEqual(len(self.app.negotiating_master_node_set), 0)
self.election.connectionStarted(conn)
self.assertEqual(len(self.app.unconnected_master_node_set), 0)
self.assertEqual(len(self.app.negotiating_master_node_set), 1)
self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), RUNNING_STATE)
self.election.connectionFailed(conn)
self.assertEqual(len(self.app.unconnected_master_node_set), 1)
self.assertEqual(len(self.app.negotiating_master_node_set), 0)
self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), TEMPORARILY_DOWN_STATE)
class MasterElectionTests(NeoTestBase):
class MasterServerElectionTests(NeoTestBase):
def setUp(self): def setUp(self):
logging.basicConfig(level = logging.WARNING) logging.basicConfig(level = logging.WARNING)
...@@ -74,7 +139,7 @@ class MasterElectionTests(NeoTestBase): ...@@ -74,7 +139,7 @@ class MasterElectionTests(NeoTestBase):
self.app.finishing_transaction_dict = {} self.app.finishing_transaction_dict = {}
for server in self.app.master_node_list: for server in self.app.master_node_list:
self.app.nm.add(MasterNode(server = server)) self.app.nm.add(MasterNode(server = server))
self.election = ElectionEventHandler(self.app) self.election = ServerElectionHandler(self.app)
self.app.unconnected_master_node_set = set() self.app.unconnected_master_node_set = set()
self.app.negotiating_master_node_set = set() self.app.negotiating_master_node_set = set()
for node in self.app.nm.getMasterNodeList(): for node in self.app.nm.getMasterNodeList():
...@@ -112,19 +177,6 @@ class MasterElectionTests(NeoTestBase): ...@@ -112,19 +177,6 @@ class MasterElectionTests(NeoTestBase):
"""Do first step of identification to MN """Do first step of identification to MN
""" """
uuid = self.getNewUUID() uuid = self.getNewUUID()
args = (node_type, uuid, ip, port, self.app.name)
packet = protocol.requestNodeIdentification(*args)
# test alien cluster
conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None,
"isServerConnection" : True})
self.election.handleRequestNodeIdentification(conn,
packet=packet,
node_type=node_type,
uuid=uuid,
ip_address=ip,
port=port,
name=self.app.name,)
self.checkAcceptNodeIdentification(conn, answered_packet=packet)
return uuid return uuid
# Method to test the kind of packet returned in answer # Method to test the kind of packet returned in answer
...@@ -153,39 +205,6 @@ class MasterElectionTests(NeoTestBase): ...@@ -153,39 +205,6 @@ class MasterElectionTests(NeoTestBase):
self.assertEquals(packet.getType(), ANSWER_PRIMARY_MASTER) self.assertEquals(packet.getType(), ANSWER_PRIMARY_MASTER)
# Tests # Tests
def test_01_connectionStarted(self):
uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)})
self.assertEqual(len(self.app.unconnected_master_node_set), 1)
self.assertEqual(len(self.app.negotiating_master_node_set), 0)
self.election.connectionStarted(conn)
self.assertEqual(len(self.app.unconnected_master_node_set), 0)
self.assertEqual(len(self.app.negotiating_master_node_set), 1)
def test_02_connectionCompleted(self):
uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)})
self.election.connectionCompleted(conn)
self.checkCalledRequestNodeIdentification(conn)
def test_03_connectionFailed(self):
uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)})
self.assertEqual(len(self.app.unconnected_master_node_set), 1)
self.assertEqual(len(self.app.negotiating_master_node_set), 0)
self.election.connectionStarted(conn)
self.assertEqual(len(self.app.unconnected_master_node_set), 0)
self.assertEqual(len(self.app.negotiating_master_node_set), 1)
self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), RUNNING_STATE)
self.election.connectionFailed(conn)
self.assertEqual(len(self.app.unconnected_master_node_set), 1)
self.assertEqual(len(self.app.negotiating_master_node_set), 0)
self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), TEMPORARILY_DOWN_STATE)
def test_04_connectionClosed(self): def test_04_connectionClosed(self):
uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port) uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment