Commit 863a3207 authored by Grégory Wisniewski's avatar Grégory Wisniewski

In master handlers, replace calls to handleUnexpectedPacket() with raise of

UnexpectedPacketError exception or decorators previously commited.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@504 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent def8b1ad
...@@ -24,8 +24,14 @@ from neo.protocol import MASTER_NODE_TYPE, \ ...@@ -24,8 +24,14 @@ from neo.protocol import MASTER_NODE_TYPE, \
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.exception import ElectionFailure from neo.exception import ElectionFailure
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.handler import identification_required, restrict_node_types, \
client_connection_required, server_connection_required
# TODO: finalize decorators integration (identification, restriction, client...)
# TODO: here use specific decorator such as restrict_node_types which do custom
# operations such as send retryLater instead of unexpectedPacket
class ElectionEventHandler(MasterEventHandler): class ElectionEventHandler(MasterEventHandler):
"""This class deals with events for a primary master election.""" """This class deals with events for a primary master election."""
...@@ -87,191 +93,171 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -87,191 +93,171 @@ class ElectionEventHandler(MasterEventHandler):
node.setState(RUNNING_STATE) node.setState(RUNNING_STATE)
MasterEventHandler.packetReceived(self, conn, packet) MasterEventHandler.packetReceived(self, conn, packet)
@client_connection_required
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions, uuid, ip_address, port, num_partitions,
num_replicas, your_uuid): num_replicas, your_uuid):
if not conn.isServerConnection(): app = self.app
app = self.app node = app.nm.getNodeByServer(conn.getAddress())
node = app.nm.getNodeByServer(conn.getAddress()) if node_type != MASTER_NODE_TYPE:
if node_type != MASTER_NODE_TYPE: # The peer is not a master node!
# The peer is not a master node! logging.error('%s:%d is not a master node', ip_address, port)
logging.error('%s:%d is not a master node', ip_address, port) app.nm.remove(node)
app.nm.remove(node) app.negotiating_master_node_set.discard(node.getServer())
app.negotiating_master_node_set.discard(node.getServer()) conn.close()
conn.close() return
return if conn.getAddress() != (ip_address, port):
if conn.getAddress() != (ip_address, port): # The server address is different! Then why was
# The server address is different! Then why was # the connection successful?
# the connection successful? logging.error('%s:%d is waiting for %s:%d',
logging.error('%s:%d is waiting for %s:%d', conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
conn.getAddress()[0], conn.getAddress()[1], ip_address, port) app.nm.remove(node)
app.nm.remove(node) app.negotiating_master_node_set.discard(node.getServer())
app.negotiating_master_node_set.discard(node.getServer()) conn.close()
conn.close() return
return
if your_uuid != app.uuid:
# uuid conflict happened, accept the new one and restart election
app.uuid = your_uuid
raise ElectionFailure, 'new uuid supplied'
conn.setUUID(uuid)
node.setUUID(uuid)
# Ask a primary master.
conn.ask(protocol.askPrimaryMaster())
else:
self.handleUnexpectedPacket(conn, packet)
if your_uuid != app.uuid:
# uuid conflict happened, accept the new one and restart election
app.uuid = your_uuid
raise ElectionFailure, 'new uuid supplied'
conn.setUUID(uuid)
node.setUUID(uuid)
# Ask a primary master.
conn.ask(protocol.askPrimaryMaster())
@client_connection_required
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list): def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
if not conn.isServerConnection(): app = self.app
app = self.app # Register new master nodes.
# Register new master nodes. for ip_address, port, uuid in known_master_list:
for ip_address, port, uuid in known_master_list: addr = (ip_address, port)
addr = (ip_address, port) if app.server == addr:
if app.server == addr: # This is self.
# This is self. continue
continue
else:
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
if primary_node.getUUID() == primary_uuid:
# Whatever the situation is, I trust this master.
app.primary = False
app.primary_master_node = primary_node
else: else:
if app.uuid < conn.getUUID(): n = app.nm.getNodeByServer(addr)
# I lost. if n is None:
app.primary = False n = MasterNode(server = addr)
app.nm.add(n)
app.unconnected_master_node_set.add(addr)
app.negotiating_master_node_set.discard(conn.getAddress()) if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
if primary_node.getUUID() == primary_uuid:
# Whatever the situation is, I trust this master.
app.primary = False
app.primary_master_node = primary_node
else: else:
self.handleUnexpectedPacket(conn, packet) if app.uuid < conn.getUUID():
# I lost.
app.primary = False
app.negotiating_master_node_set.discard(conn.getAddress())
@server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
if not conn.isServerConnection(): app = self.app
self.handleUnexpectedPacket(conn, packet) if node_type != MASTER_NODE_TYPE:
else: logging.info('reject a connection from a non-master')
app = self.app conn.answer(protocol.notReady('retry later'), packet)
if node_type != MASTER_NODE_TYPE: conn.abort()
logging.info('reject a connection from a non-master') return
conn.answer(protocol.notReady('retry later'), packet) if name != app.name:
conn.abort() logging.error('reject an alien cluster')
return conn.answer(protocol.protocolError('invalid cluster name'), packet)
if name != app.name: conn.abort()
logging.error('reject an alien cluster') return
conn.answer(protocol.protocolError('invalid cluster name'), packet)
conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
app.unconnected_master_node_set.add(addr)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
conn.answer(protocol.brokenNodeDisallowedError(
'go away'), packet)
conn.abort()
return
# supplied another uuid in case of conflict
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas,
uuid)
conn.answer(p, packet)
# Next, the peer should ask a primary master node.
conn.expectMessage()
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
app.unconnected_master_node_set.add(addr)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
conn.answer(protocol.brokenNodeDisallowedError(
'go away'), packet)
conn.abort()
return
# supplied another uuid in case of conflict
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas,
uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
@identification_required
@server_connection_required
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
if not conn.isServerConnection(): uuid = conn.getUUID()
self.handleUnexpectedPacket(conn, packet) app = self.app
if app.primary:
primary_uuid = app.uuid
elif app.primary_master_node is not None:
primary_uuid = app.primary_master_node.getUUID()
else: else:
uuid = conn.getUUID() primary_uuid = INVALID_UUID
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
if app.primary:
primary_uuid = app.uuid
elif app.primary_master_node is not None:
primary_uuid = app.primary_master_node.getUUID()
else:
primary_uuid = INVALID_UUID
known_master_list = [] known_master_list = []
for n in app.nm.getMasterNodeList(): for n in app.nm.getMasterNodeList():
if n.getState() == BROKEN_STATE: if n.getState() == BROKEN_STATE:
continue continue
info = n.getServer() + (n.getUUID() or INVALID_UUID,) info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info) known_master_list.append(info)
p = protocol.answerPrimaryMaster(primary_uuid, known_master_list) p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
conn.answer(p, packet) conn.answer(p, packet)
@identification_required
@server_connection_required
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
if not conn.isServerConnection(): uuid = conn.getUUID()
self.handleUnexpectedPacket(conn, packet) app = self.app
else: if app.primary:
uuid = conn.getUUID() # I am also the primary... So restart the election.
if uuid is None: raise ElectionFailure, 'another primary arises'
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
if app.primary:
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
app.primary = False app.primary = False
app.primary_master_node = node app.primary_master_node = node
logging.info('%s is the primary', node) logging.info('%s is the primary', node)
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
@identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
......
...@@ -23,9 +23,10 @@ from neo.protocol import MASTER_NODE_TYPE, \ ...@@ -23,9 +23,10 @@ from neo.protocol import MASTER_NODE_TYPE, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.exception import ElectionFailure from neo.exception import ElectionFailure
from neo.protocol import Packet, INVALID_UUID, INVALID_PTID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID, INVALID_PTID
from neo.node import ClientNode, StorageNode, MasterNode, AdminNode from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.util import dump from neo.util import dump
from neo.handler import identification_required, restrict_node_types
class RecoveryEventHandler(MasterEventHandler): class RecoveryEventHandler(MasterEventHandler):
"""This class deals with events for a recovery phase.""" """This class deals with events for a recovery phase."""
...@@ -182,12 +183,9 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -182,12 +183,9 @@ class RecoveryEventHandler(MasterEventHandler):
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.answer(p, packet) conn.answer(p, packet)
@identification_required
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
...@@ -209,24 +207,18 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -209,24 +207,18 @@ class RecoveryEventHandler(MasterEventHandler):
conn.getAddress())) conn.getAddress()))
app.sendPartitionTable(conn) app.sendPartitionTable(conn)
@identification_required
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
# I am also the primary... So restart the election. # I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises' raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
@identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE): if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
...@@ -275,19 +267,12 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -275,19 +267,12 @@ class RecoveryEventHandler(MasterEventHandler):
node.setState(state) node.setState(state)
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid): def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
# If the target is still unknown, set it to this node for now. # If the target is still unknown, set it to this node for now.
if app.target_uuid is None: if app.target_uuid is None:
app.target_uuid = uuid app.target_uuid = uuid
...@@ -304,17 +289,12 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -304,17 +289,12 @@ class RecoveryEventHandler(MasterEventHandler):
elif app.lptid == lptid and app.target_uuid is None: elif app.lptid == lptid and app.target_uuid is None:
app.target_uuid = uuid app.target_uuid = uuid
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
if uuid != app.target_uuid: if uuid != app.target_uuid:
# If this is not from a target node, ignore it. # If this is not from a target node, ignore it.
logging.warn('got answer partition table from %s while waiting for %s', logging.warn('got answer partition table from %s while waiting for %s',
...@@ -324,8 +304,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -324,8 +304,7 @@ class RecoveryEventHandler(MasterEventHandler):
for offset, cell_list in row_list: for offset, cell_list in row_list:
if offset >= app.num_partitions or app.pt.hasOffset(offset): if offset >= app.num_partitions or app.pt.hasOffset(offset):
# There must be something wrong. # There must be something wrong.
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
return
for uuid, state in cell_list: for uuid, state in cell_list:
n = app.nm.getNodeByUUID(uuid) n = app.nm.getNodeByUUID(uuid)
......
...@@ -24,8 +24,10 @@ from neo.protocol import MASTER_NODE_TYPE, \ ...@@ -24,8 +24,10 @@ from neo.protocol import MASTER_NODE_TYPE, \
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.exception import ElectionFailure, PrimaryFailure from neo.exception import ElectionFailure, PrimaryFailure
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.node import MasterNode from neo.node import MasterNode
from neo.handler import identification_required, restrict_node_types, \
client_connection_required, server_connection_required
class SecondaryEventHandler(MasterEventHandler): class SecondaryEventHandler(MasterEventHandler):
"""This class deals with events for a secondary master.""" """This class deals with events for a secondary master."""
...@@ -55,63 +57,55 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -55,63 +57,55 @@ class SecondaryEventHandler(MasterEventHandler):
node.setState(RUNNING_STATE) node.setState(RUNNING_STATE)
MasterEventHandler.packetReceived(self, conn, packet) MasterEventHandler.packetReceived(self, conn, packet)
@server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
if isinstance(conn, ClientConnection): app = self.app
self.handleUnexpectedPacket(conn, packet) if name != app.name:
else: logging.error('reject an alien cluster')
app = self.app conn.answer(protocol.protocolError('invalid cluster name'), packet)
if name != app.name: conn.abort()
logging.error('reject an alien cluster') return
conn.answer(protocol.protocolError('invalid cluster name'), packet)
conn.abort() # Add a node only if it is a master node and I do not know it yet.
return if node_type == MASTER_NODE_TYPE and uuid != INVALID_UUID:
addr = (ip_address, port)
# Add a node only if it is a master node and I do not know it yet. node = app.nm.getNodeByServer(addr)
if node_type == MASTER_NODE_TYPE and uuid != INVALID_UUID: if node is None:
addr = (ip_address, port) node = MasterNode(server = addr, uuid = uuid)
node = app.nm.getNodeByServer(addr) app.nm.add(node)
if node is None:
node = MasterNode(server = addr, uuid = uuid) # Trust the UUID sent by the peer.
app.nm.add(node) node.setUUID(uuid)
# Trust the UUID sent by the peer. conn.setUUID(uuid)
node.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
conn.setUUID(uuid) app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas,
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, uuid)
app.uuid, app.server[0], app.server[1], # Next, the peer should ask a primary master node.
app.num_partitions, app.num_replicas, conn.answer(p, packet)
uuid)
conn.answer(p, packet)
# Next, the peer should ask a primary master node.
conn.expectMessage()
@identification_required
@server_connection_required
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
if isinstance(conn, ClientConnection): uuid = conn.getUUID()
self.handleUnexpectedPacket(conn, packet) app = self.app
else: primary_uuid = app.primary_master_node.getUUID()
uuid = conn.getUUID()
if uuid is None: known_master_list = []
self.handleUnexpectedPacket(conn, packet) for n in app.nm.getMasterNodeList():
return if n.getState() == BROKEN_STATE:
continue
app = self.app info = n.getServer() + (n.getUUID() or INVALID_UUID,)
primary_uuid = app.primary_master_node.getUUID() known_master_list.append(info)
known_master_list = [] p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
for n in app.nm.getMasterNodeList(): conn.answer(p, packet)
if n.getState() == BROKEN_STATE:
continue
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
conn.answer(p, packet)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
......
...@@ -24,9 +24,10 @@ from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \ ...@@ -24,9 +24,10 @@ from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \ UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.exception import OperationFailure, ElectionFailure from neo.exception import OperationFailure, ElectionFailure
from neo.node import ClientNode, StorageNode, MasterNode, AdminNode from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.handler import identification_required, restrict_node_types
from neo.util import dump from neo.util import dump
class FinishingTransaction(object): class FinishingTransaction(object):
...@@ -196,7 +197,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -196,7 +197,7 @@ class ServiceEventHandler(MasterEventHandler):
# Otherwise, I know it only by the server address or the same # Otherwise, I know it only by the server address or the same
# server address but with a different UUID. # server address but with a different UUID.
if node.getUUID() is None: if node.getUUID() is None:
# This must be a master node. XXX Why ?? # This must be a master node loaded from configuration
if node.getNodeType() != MASTER_NODE_TYPE \ if node.getNodeType() != MASTER_NODE_TYPE \
or node_type != MASTER_NODE_TYPE: or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as # Error. This node uses the same server address as
...@@ -295,12 +296,9 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -295,12 +296,9 @@ class ServiceEventHandler(MasterEventHandler):
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.answer(p, packet) conn.answer(p, packet)
@identification_required
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
...@@ -322,24 +320,16 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -322,24 +320,16 @@ class ServiceEventHandler(MasterEventHandler):
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
conn.notify(protocol.startOperation()) conn.notify(protocol.startOperation())
@identification_required
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
# I am also the primary... So restart the election. # I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises' raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
@identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE): if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
...@@ -398,72 +388,46 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -398,72 +388,46 @@ class ServiceEventHandler(MasterEventHandler):
ptid = app.getNextPartitionTableID() ptid = app.getNextPartitionTableID()
app.broadcastPartitionChanges(ptid, cell_list) app.broadcastPartitionChanges(ptid, cell_list)
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid): def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
# If I get a bigger value here, it is dangerous. # If I get a bigger value here, it is dangerous.
if app.loid < loid or app.ltid < ltid or app.lptid < lptid: if app.loid < loid or app.ltid < ltid or app.lptid < lptid:
logging.critical('got later information in service') logging.critical('got later information in service')
raise OperationFailure raise OperationFailure
@identification_required
@restrict_node_types(CLIENT_NODE_TYPE)
def handleAskNewTID(self, conn, packet): def handleAskNewTID(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != CLIENT_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
tid = app.getNextTID() tid = app.getNextTID()
app.finishing_transaction_dict[tid] = FinishingTransaction(conn) app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerNewTID(tid), packet) conn.answer(protocol.answerNewTID(tid), packet)
@identification_required
@restrict_node_types(CLIENT_NODE_TYPE)
def handleAskNewOIDs(self, conn, packet, num_oids): def handleAskNewOIDs(self, conn, packet, num_oids):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != CLIENT_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
oid_list = app.getNewOIDList(num_oids) oid_list = app.getNewOIDList(num_oids)
conn.answer(protocol.answerNewOIDs(oid_list), packet) conn.answer(protocol.answerNewOIDs(oid_list), packet)
@identification_required
@restrict_node_types(CLIENT_NODE_TYPE)
def handleFinishTransaction(self, conn, packet, oid_list, tid): def handleFinishTransaction(self, conn, packet, oid_list, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != CLIENT_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
# If the given transaction ID is later than the last TID, the peer # If the given transaction ID is later than the last TID, the peer
# is crazy. # is crazy.
if app.ltid < tid: if app.ltid < tid:
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
return
# Collect partitions related to this transaction. # Collect partitions related to this transaction.
getPartition = app.getPartition getPartition = app.getPartition
...@@ -474,8 +438,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -474,8 +438,7 @@ class ServiceEventHandler(MasterEventHandler):
# Collect the UUIDs of nodes related to this transaction. # Collect the UUIDs of nodes related to this transaction.
uuid_set = set() uuid_set = set()
for part in partition_set: for part in partition_set:
uuid_set.update((cell.getUUID() for cell \ uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part)))
in app.pt.getCellList(part)))
# Request locking data. # Request locking data.
for c in app.em.getConnectionList(): for c in app.em.getConnectionList():
...@@ -491,24 +454,17 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -491,24 +454,17 @@ class ServiceEventHandler(MasterEventHandler):
logging.warn('finishing transaction %s does not exist', dump(tid)) logging.warn('finishing transaction %s does not exist', dump(tid))
pass pass
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleNotifyInformationLocked(self, conn, packet, tid): def handleNotifyInformationLocked(self, conn, packet, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
# If the given transaction ID is later than the last TID, the peer # If the given transaction ID is later than the last TID, the peer
# is crazy. # is crazy.
if app.ltid < tid: if app.ltid < tid:
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
return
try: try:
t = app.finishing_transaction_dict[tid] t = app.finishing_transaction_dict[tid]
...@@ -540,62 +496,39 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -540,62 +496,39 @@ class ServiceEventHandler(MasterEventHandler):
# What is this? # What is this?
pass pass
@identification_required
@restrict_node_types(CLIENT_NODE_TYPE)
def handleAbortTransaction(self, conn, packet, tid): def handleAbortTransaction(self, conn, packet, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != CLIENT_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
try: try:
del app.finishing_transaction_dict[tid] del app.finishing_transaction_dict[tid]
except KeyError: except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid)) logging.warn('aborting transaction %s does not exist', dump(tid))
pass pass
@identification_required
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.lptid), packet) conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.lptid), packet)
@identification_required
def handleAskUnfinishedTransactions(self, conn, packet): def handleAskUnfinishedTransactions(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys()) p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
conn.answer(p, packet) conn.answer(p, packet)
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
# This should be sent when a cell becomes up-to-date because # This should be sent when a cell becomes up-to-date because
# a replication has finished. # a replication has finished.
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node is None:
self.handleUnexpectedPacket(conn, packet)
return
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
new_cell_list = [] new_cell_list = []
for cell in cell_list: for cell in cell_list:
......
...@@ -22,7 +22,7 @@ from tempfile import mkstemp ...@@ -22,7 +22,7 @@ from tempfile import mkstemp
from mock import Mock from mock import Mock
from struct import pack, unpack from struct import pack, unpack
from neo import protocol from neo import protocol
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.master.election import ElectionEventHandler from neo.master.election import ElectionEventHandler
from neo.master.app import Application from neo.master.app import Application
from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \ from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
...@@ -134,6 +134,14 @@ server: 127.0.0.1:10023 ...@@ -134,6 +134,14 @@ server: 127.0.0.1:10023
# Delete tmp file # Delete tmp file
os.remove(self.tmp_path) os.remove(self.tmp_path)
def checkUnexpectedPacketRaised(self, method, *args, **kwargs):
""" Check if the UnexpectedPacketError exception wxas raised """
self.assertRaises(UnexpectedPacketError, method, *args, **kwargs)
def checkIdenficationRequired(self, method, *args, **kwargs):
""" Check is the identification_required decorator is applied """
self.checkUnexpectedPacketRaised(method, *args, **kwargs)
def checkCalledAcceptNodeIdentification(self, conn, packet_number=0): def checkCalledAcceptNodeIdentification(self, conn, packet_number=0):
""" Check Accept Node Identification has been send""" """ Check Accept Node Identification has been send"""
self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1) self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1)
...@@ -603,8 +611,7 @@ server: 127.0.0.1:10023 ...@@ -603,8 +611,7 @@ server: 127.0.0.1:10023
"isServerConnection" : True, "isServerConnection" : True,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
self.assertEqual(len(self.app.nm.getMasterNodeList()), 1) self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
election.handleAnnouncePrimaryMaster(conn, packet) self.checkIdenficationRequired(election.handleAnnouncePrimaryMaster, conn, packet)
self.checkCalledAbort(conn)
# announce # announce
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : uuid, "getUUID" : uuid,
...@@ -644,8 +651,7 @@ server: 127.0.0.1:10023 ...@@ -644,8 +651,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
node_list = [] node_list = []
election.handleNotifyNodeInformation(conn, packet, node_list) self.checkIdenficationRequired(election.handleNotifyNodeInformation, conn, packet, node_list)
self.checkCalledAbort(conn)
# tell the master node about itself, must do nothing # tell the master node about itself, must do nothing
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
......
...@@ -22,7 +22,7 @@ from tempfile import mkstemp ...@@ -22,7 +22,7 @@ from tempfile import mkstemp
from mock import Mock from mock import Mock
from struct import pack, unpack from struct import pack, unpack
from neo import protocol from neo import protocol
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.master.recovery import RecoveryEventHandler from neo.master.recovery import RecoveryEventHandler
from neo.master.app import Application from neo.master.app import Application
from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \ from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
...@@ -153,6 +153,14 @@ server: 127.0.0.1:10023 ...@@ -153,6 +153,14 @@ server: 127.0.0.1:10023
self.checkCalledAcceptNodeIdentification(conn) self.checkCalledAcceptNodeIdentification(conn)
return uuid return uuid
def checkUnexpectedPacketRaised(self, method, *args, **kwargs):
""" Check if the UnexpectedPacketError exception wxas raised """
self.assertRaises(UnexpectedPacketError, method, *args, **kwargs)
def checkIdenficationRequired(self, method, *args, **kwargs):
""" Check is the identification_required decorator is applied """
self.checkUnexpectedPacketRaised(method, *args, **kwargs)
# Method to test the kind of packet returned in answer # Method to test the kind of packet returned in answer
def checkCalledRequestNodeIdentification(self, conn, packet_number=0): def checkCalledRequestNodeIdentification(self, conn, packet_number=0):
""" Check Request Node Identification has been send""" """ Check Request Node Identification has been send"""
...@@ -164,7 +172,6 @@ server: 127.0.0.1:10023 ...@@ -164,7 +172,6 @@ server: 127.0.0.1:10023
self.assertTrue(isinstance(packet, Packet)) self.assertTrue(isinstance(packet, Packet))
self.assertEquals(packet.getType(), REQUEST_NODE_IDENTIFICATION) self.assertEquals(packet.getType(), REQUEST_NODE_IDENTIFICATION)
def checkCalledAskPrimaryMaster(self, conn, packet_number=0): def checkCalledAskPrimaryMaster(self, conn, packet_number=0):
""" Check ask primary master has been send""" """ Check ask primary master has been send"""
call = conn.mockGetNamedCalls("addPacket")[packet_number] call = conn.mockGetNamedCalls("addPacket")[packet_number]
...@@ -540,8 +547,7 @@ server: 127.0.0.1:10023 ...@@ -540,8 +547,7 @@ server: 127.0.0.1:10023
"getUUID" : None, "getUUID" : None,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
self.assertEqual(len(self.app.nm.getMasterNodeList()), 1) self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
recovery.handleAnnouncePrimaryMaster(conn, packet) self.checkIdenficationRequired(recovery.handleAnnouncePrimaryMaster, conn, packet)
self.checkCalledAbort(conn)
# announce # announce
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : uuid, "getUUID" : uuid,
...@@ -570,8 +576,7 @@ server: 127.0.0.1:10023 ...@@ -570,8 +576,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
node_list = [] node_list = []
recovery.handleNotifyNodeInformation(conn, packet, node_list) self.checkIdenficationRequired(recovery.handleNotifyNodeInformation, conn, packet, node_list)
self.checkCalledAbort(conn)
# tell about a client node, do nothing # tell about a client node, do nothing
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
...@@ -637,8 +642,7 @@ server: 127.0.0.1:10023 ...@@ -637,8 +642,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
recovery.handleAnswerLastIDs(conn, packet, None, None, None) self.checkIdenficationRequired(recovery.handleAnswerLastIDs, conn, packet, None, None, None)
self.checkCalledAbort(conn)
self.assertEquals(loid, self.app.loid) self.assertEquals(loid, self.app.loid)
self.assertEquals(ltid, self.app.ltid) self.assertEquals(ltid, self.app.ltid)
self.assertEquals(lptid, self.app.lptid) self.assertEquals(lptid, self.app.lptid)
...@@ -647,8 +651,7 @@ server: 127.0.0.1:10023 ...@@ -647,8 +651,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : master_uuid, conn = Mock({"getUUID" : master_uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
node_list = [] node_list = []
recovery.handleAnswerLastIDs(conn, packet, None, None, None) self.checkUnexpectedPacketRaised(recovery.handleAnswerLastIDs, conn, packet, None, None, None)
self.checkCalledAbort(conn)
self.assertEquals(loid, self.app.loid) self.assertEquals(loid, self.app.loid)
self.assertEquals(ltid, self.app.ltid) self.assertEquals(ltid, self.app.ltid)
self.assertEquals(lptid, self.app.lptid) self.assertEquals(lptid, self.app.lptid)
...@@ -681,14 +684,12 @@ server: 127.0.0.1:10023 ...@@ -681,14 +684,12 @@ server: 127.0.0.1:10023
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : None, "getUUID" : None,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
recovery.handleAnswerPartitionTable(conn, packet, None, []) self.checkIdenficationRequired(recovery.handleAnswerPartitionTable, conn, packet, None, [])
self.checkCalledAbort(conn)
# not a storage node # not a storage node
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : uuid, "getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
recovery.handleAnswerPartitionTable(conn, packet, None, []) self.checkUnexpectedPacketRaised(recovery.handleAnswerPartitionTable, conn, packet, None, [])
self.checkCalledAbort(conn)
# not from target node, ignore # not from target node, ignore
uuid = self.identifyToMasterNode(STORAGE_NODE_TYPE, port=self.storage_port) uuid = self.identifyToMasterNode(STORAGE_NODE_TYPE, port=self.storage_port)
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
...@@ -728,8 +729,7 @@ server: 127.0.0.1:10023 ...@@ -728,8 +729,7 @@ server: 127.0.0.1:10023
offset = 1000000 offset = 1000000
self.assertFalse(self.app.pt.hasOffset(offset)) self.assertFalse(self.app.pt.hasOffset(offset))
cell_list = [(offset, ((uuid, DOWN_STATE,),),)] cell_list = [(offset, ((uuid, DOWN_STATE,),),)]
recovery.handleAnswerPartitionTable(conn, packet, None, cell_list) self.checkUnexpectedPacketRaised(recovery.handleAnswerPartitionTable, conn, packet, None, cell_list)
self.checkCalledAbort(conn)
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -22,7 +22,7 @@ from tempfile import mkstemp ...@@ -22,7 +22,7 @@ from tempfile import mkstemp
from mock import Mock from mock import Mock
from struct import pack, unpack from struct import pack, unpack
from neo import protocol from neo import protocol
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.master.service import ServiceEventHandler from neo.master.service import ServiceEventHandler
from neo.master.app import Application from neo.master.app import Application
from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \ from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
...@@ -116,6 +116,14 @@ server: 127.0.0.1:10023 ...@@ -116,6 +116,14 @@ server: 127.0.0.1:10023
# Delete tmp file # Delete tmp file
os.remove(self.tmp_path) os.remove(self.tmp_path)
def checkUnexpectedPacketRaised(self, method, *args, **kwargs):
""" Check if the UnexpectedPacketError exception wxas raised """
self.assertRaises(UnexpectedPacketError, method, *args, **kwargs)
def checkIdenficationRequired(self, method, *args, **kwargs):
""" Check is the identification_required decorator is applied """
self.checkUnexpectedPacketRaised(method, *args, **kwargs)
# Method to test the kind of packet returned in answer # Method to test the kind of packet returned in answer
def checkCalledAbort(self, conn, packet_number=0): def checkCalledAbort(self, conn, packet_number=0):
"""Check the abort method has been called and an error packet has been sent""" """Check the abort method has been called and an error packet has been sent"""
...@@ -431,8 +439,7 @@ server: 127.0.0.1:10023 ...@@ -431,8 +439,7 @@ server: 127.0.0.1:10023
# if no uuid in conn, no reelection done # if no uuid in conn, no reelection done
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
service.handleAnnouncePrimaryMaster(conn, packet) self.checkIdenficationRequired(service.handleAnnouncePrimaryMaster, conn, packet)
self.checkCalledAbort(conn)
def test_04_handleReelectPrimaryMaster(self): def test_04_handleReelectPrimaryMaster(self):
...@@ -453,8 +460,7 @@ server: 127.0.0.1:10023 ...@@ -453,8 +460,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
service.handleNotifyNodeInformation(conn, packet, node_list) self.checkIdenficationRequired(service.handleNotifyNodeInformation, conn, packet, node_list)
self.checkCalledAbort(conn)
# tell the master node that is not running any longer, it must raises # tell the master node that is not running any longer, it must raises
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
...@@ -530,8 +536,7 @@ server: 127.0.0.1:10023 ...@@ -530,8 +536,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
service.handleAnswerLastIDs(conn, packet, None, None, None) self.checkIdenficationRequired(service.handleAnswerLastIDs, conn, packet, None, None, None)
self.checkCalledAbort(conn)
self.assertEquals(loid, self.app.loid) self.assertEquals(loid, self.app.loid)
self.assertEquals(ltid, self.app.ltid) self.assertEquals(ltid, self.app.ltid)
self.assertEquals(lptid, self.app.lptid) self.assertEquals(lptid, self.app.lptid)
...@@ -540,8 +545,7 @@ server: 127.0.0.1:10023 ...@@ -540,8 +545,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : client_uuid, conn = Mock({"getUUID" : client_uuid,
"getAddress" : ("127.0.0.1", self.client_port)}) "getAddress" : ("127.0.0.1", self.client_port)})
node_list = [] node_list = []
service.handleAnswerLastIDs(conn, packet, None, None, None) self.checkUnexpectedPacketRaised(service.handleAnswerLastIDs, conn, packet, None, None, None)
self.checkCalledAbort(conn)
self.assertEquals(loid, self.app.loid) self.assertEquals(loid, self.app.loid)
self.assertEquals(ltid, self.app.ltid) self.assertEquals(ltid, self.app.ltid)
self.assertEquals(lptid, self.app.lptid) self.assertEquals(lptid, self.app.lptid)
...@@ -567,15 +571,13 @@ server: 127.0.0.1:10023 ...@@ -567,15 +571,13 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
service.handleAskNewTID(conn, packet) self.checkIdenficationRequired(service.handleAskNewTID, conn, packet)
self.checkCalledAbort(conn)
self.assertEquals(ltid, self.app.ltid) self.assertEquals(ltid, self.app.ltid)
# do not care if storage node call it # do not care if storage node call it
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
self.assertRaises(OperationFailure, service.handleAskNewTID, conn, packet) self.checkUnexpectedPacketRaised(service.handleAskNewTID, conn, packet)
self.checkCalledAbort(conn)
self.assertEquals(ltid, self.app.ltid) self.assertEquals(ltid, self.app.ltid)
# client call it # client call it
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port)
...@@ -597,15 +599,13 @@ server: 127.0.0.1:10023 ...@@ -597,15 +599,13 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
service.handleAskNewOIDs(conn, packet, 1) self.checkIdenficationRequired(service.handleAskNewOIDs, conn, packet, 1)
self.checkCalledAbort(conn)
self.assertEquals(loid, self.app.loid) self.assertEquals(loid, self.app.loid)
# do not care if storage node call it # do not care if storage node call it
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
self.assertRaises(OperationFailure, service.handleAskNewOIDs, conn, packet, 1) self.checkUnexpectedPacketRaised(service.handleAskNewOIDs, conn, packet, 1)
self.checkCalledAbort(conn)
self.assertEquals(loid, self.app.loid) self.assertEquals(loid, self.app.loid)
# client call it # client call it
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port)
...@@ -623,15 +623,13 @@ server: 127.0.0.1:10023 ...@@ -623,15 +623,13 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
service.handleFinishTransaction(conn, packet, [], None, ) self.checkIdenficationRequired(service.handleFinishTransaction, conn, packet, [], None, )
self.checkCalledAbort(conn)
# do not care if storage node call it # do not care if storage node call it
storage_conn = conn = Mock({"getUUID" : uuid, storage_conn = conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port), "getAddress" : ("127.0.0.1", self.storage_port),
"getSockect" : uuid}) "getSockect" : uuid})
node_list = [] node_list = []
self.assertRaises(OperationFailure, service.handleFinishTransaction, conn, packet, [], None) self.checkUnexpectedPacketRaised(service.handleFinishTransaction, conn, packet, [], None)
self.checkCalledAbort(conn)
# give an older tid than the PMN known, must abort # give an older tid than the PMN known, must abort
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port)
conn = Mock({"getUUID" : client_uuid, conn = Mock({"getUUID" : client_uuid,
...@@ -639,8 +637,11 @@ server: 127.0.0.1:10023 ...@@ -639,8 +637,11 @@ server: 127.0.0.1:10023
oid_list = [] oid_list = []
upper, lower = unpack('!LL', self.app.ltid) upper, lower = unpack('!LL', self.app.ltid)
new_tid = pack('!LL', upper, lower + 10) new_tid = pack('!LL', upper, lower + 10)
service.handleFinishTransaction(conn, packet, oid_list, new_tid) self.checkUnexpectedPacketRaised(service.handleFinishTransaction, conn, packet, oid_list, new_tid)
self.checkCalledAbort(conn) old_node = self.app.nm.getNodeByUUID(uuid)
self.app.nm.remove(old_node)
self.app.pt.dropNode(old_node)
# do the right job # do the right job
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port)
storage_uuid = self.identifyToMasterNode() storage_uuid = self.identifyToMasterNode()
...@@ -670,29 +671,30 @@ server: 127.0.0.1:10023 ...@@ -670,29 +671,30 @@ server: 127.0.0.1:10023
def test_10_handleNotifyInformationLocked(self): def test_10_handleNotifyInformationLocked(self):
service = self.service service = self.service
uuid = self.identifyToMasterNode() uuid = self.identifyToMasterNode(port=10020)
packet = Packet(msg_type=NOTIFY_INFORMATION_LOCKED) packet = Packet(msg_type=NOTIFY_INFORMATION_LOCKED)
# do not answer if no uuid # do not answer if no uuid
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
service.handleNotifyInformationLocked(conn, packet, None, ) self.checkIdenficationRequired(service.handleNotifyInformationLocked, conn, packet, None, )
self.checkCalledAbort(conn)
# do not care if client node call it # do not care if client node call it
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=11021) client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=11021)
client = conn = Mock({"getUUID" : client_uuid, client = conn = Mock({"getUUID" : client_uuid,
"getAddress" : ("127.0.0.1", 11021), "getAddress" : ("127.0.0.1", 11020),
"getSockect" : client_uuid}) "getSockect" : client_uuid})
service.handleNotifyInformationLocked(conn, packet, None) self.checkUnexpectedPacketRaised(service.handleNotifyInformationLocked, conn, packet, None)
self.checkCalledAbort(conn)
# give an older tid than the PMN known, must abort # give an older tid than the PMN known, must abort
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
oid_list = [] oid_list = []
upper, lower = unpack('!LL', self.app.ltid) upper, lower = unpack('!LL', self.app.ltid)
new_tid = pack('!LL', upper, lower + 10) new_tid = pack('!LL', upper, lower + 10)
self.assertRaises(OperationFailure, service.handleNotifyInformationLocked, conn, packet, new_tid) self.checkUnexpectedPacketRaised(service.handleNotifyInformationLocked, conn, packet, new_tid)
self.checkCalledAbort(conn) old_node = self.app.nm.getNodeByUUID(uuid)
# job done through dispatch -> peerBroken
self.app.nm.remove(old_node)
self.app.pt.dropNode(old_node)
# do the right job # do the right job
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port)
...@@ -718,7 +720,6 @@ server: 127.0.0.1:10023 ...@@ -718,7 +720,6 @@ server: 127.0.0.1:10023
oid_list = [] oid_list = []
tid = self.app.ltid tid = self.app.ltid
service.handleFinishTransaction(conn, packet, oid_list, tid) service.handleFinishTransaction(conn, packet, oid_list, tid)
self.assertEquals(len(conn.mockGetNamedCalls("addPacket")), 0)
self.checkCalledLockInformation(storage_conn_1) self.checkCalledLockInformation(storage_conn_1)
self.checkCalledLockInformation(storage_conn_2) self.checkCalledLockInformation(storage_conn_2)
self.assertFalse(self.app.finishing_transaction_dict.values()[0].allLocked()) self.assertFalse(self.app.finishing_transaction_dict.values()[0].allLocked())
...@@ -728,7 +729,6 @@ server: 127.0.0.1:10023 ...@@ -728,7 +729,6 @@ server: 127.0.0.1:10023
service.handleNotifyInformationLocked(storage_conn_1, packet, tid) service.handleNotifyInformationLocked(storage_conn_1, packet, tid)
self.assertEquals(len(storage_conn_1.mockGetNamedCalls("ask")), 1) self.assertEquals(len(storage_conn_1.mockGetNamedCalls("ask")), 1)
self.assertEquals(len(storage_conn_2.mockGetNamedCalls("ask")), 1) self.assertEquals(len(storage_conn_2.mockGetNamedCalls("ask")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("addPacket")), 0)
self.assertFalse(self.app.finishing_transaction_dict.values()[0].allLocked()) self.assertFalse(self.app.finishing_transaction_dict.values()[0].allLocked())
service.handleNotifyInformationLocked(storage_conn_2, packet, tid) service.handleNotifyInformationLocked(storage_conn_2, packet, tid)
...@@ -750,14 +750,12 @@ server: 127.0.0.1:10023 ...@@ -750,14 +750,12 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
service.handleAbortTransaction(conn, packet, None, ) self.checkIdenficationRequired(service.handleAbortTransaction, conn, packet, None, )
self.checkCalledAbort(conn)
# do not answer if not a client # do not answer if not a client
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
self.assertRaises(OperationFailure, service.handleAbortTransaction, conn, packet, None) self.checkUnexpectedPacketRaised(service.handleAbortTransaction, conn, packet, None)
self.checkCalledAbort(conn)
# give a bad tid, must not failed, just ignored it # give a bad tid, must not failed, just ignored it
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port)
conn = Mock({"getUUID" : client_uuid, conn = Mock({"getUUID" : client_uuid,
...@@ -783,8 +781,7 @@ server: 127.0.0.1:10023 ...@@ -783,8 +781,7 @@ server: 127.0.0.1:10023
# do not answer if no uuid # do not answer if no uuid
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
service.handleAskLastIDs(conn, packet ) self.checkIdenficationRequired(service.handleAskLastIDs, conn, packet )
self.checkCalledAbort(conn)
# give a uuid # give a uuid
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
...@@ -805,8 +802,8 @@ server: 127.0.0.1:10023 ...@@ -805,8 +802,8 @@ server: 127.0.0.1:10023
# do not answer if no uuid # do not answer if no uuid
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
service.handleAskUnfinishedTransactions(conn, packet) self.checkIdenficationRequired(service.handleAskUnfinishedTransactions,
self.checkCalledAbort(conn) conn, packet)
# give a uuid # give a uuid
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
...@@ -835,14 +832,15 @@ server: 127.0.0.1:10023 ...@@ -835,14 +832,15 @@ server: 127.0.0.1:10023
# do not answer if no uuid # do not answer if no uuid
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
service.handleNotifyPartitionChanges(conn, packet, None, None) self.checkIdenficationRequired(service.handleNotifyPartitionChanges,
self.checkCalledAbort(conn) conn, packet, None, None)
# do not answer if not a storage node # do not answer if not a storage node
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
port=self.client_port) port=self.client_port)
conn = Mock({"getUUID" : client_uuid, conn = Mock({"getUUID" : client_uuid,
"getAddress" : ("127.0.0.1", self.client_port)}) "getAddress" : ("127.0.0.1", self.client_port)})
service.handleNotifyPartitionChanges(conn, packet, None, None) self.checkUnexpectedPacketRaised(service.handleNotifyPartitionChanges,
conn, packet, None, None)
# send a bad state, must not be take into account # send a bad state, must not be take into account
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
......
...@@ -21,7 +21,7 @@ import logging ...@@ -21,7 +21,7 @@ import logging
from tempfile import mkstemp from tempfile import mkstemp
from mock import Mock from mock import Mock
from struct import pack, unpack from struct import pack, unpack
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.master.verification import VerificationEventHandler from neo.master.verification import VerificationEventHandler
from neo.master.app import Application from neo.master.app import Application
from neo import protocol from neo import protocol
...@@ -123,6 +123,14 @@ server: 127.0.0.1:10023 ...@@ -123,6 +123,14 @@ server: 127.0.0.1:10023
# Delete tmp file # Delete tmp file
os.remove(self.tmp_path) os.remove(self.tmp_path)
def checkUnexpectedPacketRaised(self, method, *args, **kwargs):
""" Check if the UnexpectedPacketError exception wxas raised """
self.assertRaises(UnexpectedPacketError, method, *args, **kwargs)
def checkIdenficationRequired(self, method, *args, **kwargs):
""" Check is the identification_required decorator is applied """
self.checkUnexpectedPacketRaised(method, *args, **kwargs)
def checkCalledAcceptNodeIdentification(self, conn, packet_number=0): def checkCalledAcceptNodeIdentification(self, conn, packet_number=0):
""" Check Accept Node Identification has been send""" """ Check Accept Node Identification has been send"""
self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1) self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1)
...@@ -559,8 +567,7 @@ server: 127.0.0.1:10023 ...@@ -559,8 +567,7 @@ server: 127.0.0.1:10023
"getUUID" : None, "getUUID" : None,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
self.assertEqual(len(self.app.nm.getMasterNodeList()), 1) self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
verification.handleAnnouncePrimaryMaster(conn, packet) self.checkIdenficationRequired(verification.handleAnnouncePrimaryMaster, conn, packet)
self.checkCalledAbort(conn)
# announce # announce
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : uuid, "getUUID" : uuid,
...@@ -587,8 +594,7 @@ server: 127.0.0.1:10023 ...@@ -587,8 +594,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
node_list = [] node_list = []
verification.handleNotifyNodeInformation(conn, packet, node_list) self.checkIdenficationRequired(verification.handleNotifyNodeInformation, conn, packet, node_list)
self.checkCalledAbort(conn)
# tell about a client node, do nothing # tell about a client node, do nothing
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
...@@ -653,8 +659,7 @@ server: 127.0.0.1:10023 ...@@ -653,8 +659,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : None, conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
node_list = [] node_list = []
verification.handleAnswerLastIDs(conn, packet, None, None, None) self.checkIdenficationRequired(verification.handleAnswerLastIDs, conn, packet, None, None, None)
self.checkCalledAbort(conn)
self.assertEquals(loid, self.app.loid) self.assertEquals(loid, self.app.loid)
self.assertEquals(ltid, self.app.ltid) self.assertEquals(ltid, self.app.ltid)
self.assertEquals(lptid, self.app.lptid) self.assertEquals(lptid, self.app.lptid)
...@@ -663,8 +668,7 @@ server: 127.0.0.1:10023 ...@@ -663,8 +668,7 @@ server: 127.0.0.1:10023
conn = Mock({"getUUID" : master_uuid, conn = Mock({"getUUID" : master_uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
node_list = [] node_list = []
verification.handleAnswerLastIDs(conn, packet, None, None, None) self.checkUnexpectedPacketRaised(verification.handleAnswerLastIDs, conn, packet, None, None, None)
self.checkCalledAbort(conn)
self.assertEquals(loid, self.app.loid) self.assertEquals(loid, self.app.loid)
self.assertEquals(ltid, self.app.ltid) self.assertEquals(ltid, self.app.ltid)
self.assertEquals(lptid, self.app.lptid) self.assertEquals(lptid, self.app.lptid)
...@@ -704,15 +708,13 @@ server: 127.0.0.1:10023 ...@@ -704,15 +708,13 @@ server: 127.0.0.1:10023
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : None, "getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
verification.handleAnswerUnfinishedTransactions(conn, packet, []) self.checkIdenficationRequired(verification.handleAnswerUnfinishedTransactions, conn, packet, [])
self.checkCalledAbort(conn)
# reject master node # reject master node
master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port) master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : master_uuid, "getUUID" : master_uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
verification.handleAnswerUnfinishedTransactions(conn, packet, []) self.checkUnexpectedPacketRaised(verification.handleAnswerUnfinishedTransactions, conn, packet, [])
self.checkCalledAbort(conn)
# do nothing # do nothing
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : uuid, "getUUID" : uuid,
...@@ -748,15 +750,13 @@ server: 127.0.0.1:10023 ...@@ -748,15 +750,13 @@ server: 127.0.0.1:10023
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : None, "getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
verification.handleAnswerTransactionInformation(conn, packet, None, None, None, None, None) self.checkIdenficationRequired(verification.handleAnswerTransactionInformation, conn, packet, None, None, None, None, None)
self.checkCalledAbort(conn)
# reject master node # reject master node
master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port) master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : master_uuid, "getUUID" : master_uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
verification.handleAnswerTransactionInformation(conn, packet, None, None, None, None, None) self.checkUnexpectedPacketRaised(verification.handleAnswerTransactionInformation, conn, packet, None, None, None, None, None)
self.checkCalledAbort(conn)
# do nothing, as unfinished_oid_set is None # do nothing, as unfinished_oid_set is None
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : uuid, "getUUID" : uuid,
...@@ -820,16 +820,14 @@ server: 127.0.0.1:10023 ...@@ -820,16 +820,14 @@ server: 127.0.0.1:10023
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : None, "getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
verification.handleTidNotFound(conn, packet, []) self.checkIdenficationRequired(verification.handleTidNotFound, conn, packet, [])
self.checkCalledAbort(conn)
# reject master node # reject master node
master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port) master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : master_uuid, "getUUID" : master_uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
verification.handleTidNotFound(conn, packet, []) self.checkUnexpectedPacketRaised(verification.handleTidNotFound, conn, packet, [])
self.checkCalledAbort(conn) # do nothing as asking_uuid_dict is True
# do nothinf as asking_uuid_dict is True
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : uuid, "getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
...@@ -858,15 +856,13 @@ server: 127.0.0.1:10023 ...@@ -858,15 +856,13 @@ server: 127.0.0.1:10023
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : None, "getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
verification.handleAnswerObjectPresent(conn, packet, None, None) self.checkIdenficationRequired(verification.handleAnswerObjectPresent, conn, packet, None, None)
self.checkCalledAbort(conn)
# reject master node # reject master node
master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port) master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : master_uuid, "getUUID" : master_uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
verification.handleAnswerObjectPresent(conn, packet, None, None) self.checkUnexpectedPacketRaised(verification.handleAnswerObjectPresent, conn, packet, None, None)
self.checkCalledAbort(conn)
# do nothing as asking_uuid_dict is True # do nothing as asking_uuid_dict is True
upper, lower = unpack('!LL', self.app.ltid) upper, lower = unpack('!LL', self.app.ltid)
new_tid = pack('!LL', upper, lower + 10) new_tid = pack('!LL', upper, lower + 10)
...@@ -897,15 +893,13 @@ server: 127.0.0.1:10023 ...@@ -897,15 +893,13 @@ server: 127.0.0.1:10023
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : None, "getUUID" : None,
"getAddress" : ("127.0.0.1", self.storage_port)}) "getAddress" : ("127.0.0.1", self.storage_port)})
verification.handleOidNotFound(conn, packet, []) self.checkIdenficationRequired(verification.handleOidNotFound, conn, packet, [])
self.checkCalledAbort(conn)
# reject master node # reject master node
master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port) master_uuid = self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port)
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : master_uuid, "getUUID" : master_uuid,
"getAddress" : ("127.0.0.1", self.master_port)}) "getAddress" : ("127.0.0.1", self.master_port)})
verification.handleOidNotFound(conn, packet, []) self.checkUnexpectedPacketRaised(verification.handleOidNotFound, conn, packet, [])
self.checkCalledAbort(conn)
# do nothinf as asking_uuid_dict is True # do nothinf as asking_uuid_dict is True
conn = Mock({"addPacket" : None, conn = Mock({"addPacket" : None,
"getUUID" : uuid, "getUUID" : uuid,
......
...@@ -23,9 +23,10 @@ from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ...@@ -23,9 +23,10 @@ from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE,
ADMIN_NODE_TYPE ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.exception import VerificationFailure, ElectionFailure from neo.exception import VerificationFailure, ElectionFailure
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.util import dump from neo.util import dump
from neo.node import ClientNode, StorageNode, MasterNode, AdminNode from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.handler import identification_required, restrict_node_types
class VerificationEventHandler(MasterEventHandler): class VerificationEventHandler(MasterEventHandler):
"""This class deals with events for a verification phase.""" """This class deals with events for a verification phase."""
...@@ -205,12 +206,9 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -205,12 +206,9 @@ class VerificationEventHandler(MasterEventHandler):
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.answer(p, packet) conn.answer(p, packet)
@identification_required
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
# Merely tell the peer that I am the primary master node. # Merely tell the peer that I am the primary master node.
...@@ -226,24 +224,18 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -226,24 +224,18 @@ class VerificationEventHandler(MasterEventHandler):
if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
app.sendPartitionTable(conn) app.sendPartitionTable(conn)
@identification_required
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
# I am also the primary... So restart the election. # I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises' raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
@identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE): if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
...@@ -292,19 +284,12 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -292,19 +284,12 @@ class VerificationEventHandler(MasterEventHandler):
node.setState(state) node.setState(state)
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid): def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
# If I get a bigger value here, it is dangerous. # If I get a bigger value here, it is dangerous.
if app.loid < loid or app.ltid < ltid or app.lptid < lptid: if app.loid < loid or app.ltid < ltid or app.lptid < lptid:
logging.critical('got later information in verification') logging.critical('got later information in verification')
...@@ -314,46 +299,32 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -314,46 +299,32 @@ class VerificationEventHandler(MasterEventHandler):
# Ignore this packet. # Ignore this packet.
pass pass
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list): def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
logging.info('got unfinished transactions %s from %s:%d', logging.info('got unfinished transactions %s from %s:%d',
tid_list, *(conn.getAddress())) tid_list, *(conn.getAddress()))
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
if app.asking_uuid_dict.get(uuid, True): if app.asking_uuid_dict.get(uuid, True):
# No interest. # No interest.
return return
app.unfinished_tid_set.update(tid_list) app.unfinished_tid_set.update(tid_list)
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerTransactionInformation(self, conn, packet, tid, def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list): user, desc, ext, oid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
logging.info('got OIDs %s for %s from %s:%d', logging.info('got OIDs %s for %s from %s:%d',
oid_list, tid, *(conn.getAddress())) oid_list, tid, *(conn.getAddress()))
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
if app.asking_uuid_dict.get(uuid, True): if app.asking_uuid_dict.get(uuid, True):
# No interest. # No interest.
return return
oid_set = set(oid_list) oid_set = set(oid_list)
if app.unfinished_oid_set is None: if app.unfinished_oid_set is None:
# Someone does not agree. # Someone does not agree.
...@@ -365,61 +336,40 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -365,61 +336,40 @@ class VerificationEventHandler(MasterEventHandler):
app.unfinished_oid_set = None app.unfinished_oid_set = None
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleTidNotFound(self, conn, packet, message): def handleTidNotFound(self, conn, packet, message):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
logging.info('TID not found: %s', message) logging.info('TID not found: %s', message)
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
if app.asking_uuid_dict.get(uuid, True): if app.asking_uuid_dict.get(uuid, True):
# No interest. # No interest.
return return
app.unfinished_oid_set = None app.unfinished_oid_set = None
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerObjectPresent(self, conn, packet, oid, tid): def handleAnswerObjectPresent(self, conn, packet, oid, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
logging.info('object %s:%s found', dump(oid), dump(tid)) logging.info('object %s:%s found', dump(oid), dump(tid))
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
if app.asking_uuid_dict.get(uuid, True): if app.asking_uuid_dict.get(uuid, True):
# No interest. # No interest.
return return
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
@identification_required
@restrict_node_types(STORAGE_NODE_TYPE)
def handleOidNotFound(self, conn, packet, message): def handleOidNotFound(self, conn, packet, message):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
logging.info('OID not found: %s', message) logging.info('OID not found: %s', message)
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != STORAGE_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
if app.asking_uuid_dict.get(uuid, True): if app.asking_uuid_dict.get(uuid, True):
# No interest. # No interest.
return return
app.object_present = False app.object_present = False
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment