Commit aded81bc authored by Grégory Wisniewski's avatar Grégory Wisniewski

Use decorators and UnexpectedPacketError exception instead of calls

handleUnexpectedPacket() in storage handlers.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@505 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 863a3207
...@@ -23,10 +23,12 @@ from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ ...@@ -23,10 +23,12 @@ from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.protocol import Packet from neo.protocol import Packet, UnexpectedPacketError
from neo.pt import PartitionTable from neo.pt import PartitionTable
from neo.storage.verification import VerificationEventHandler from neo.storage.verification import VerificationEventHandler
from neo.util import dump from neo.util import dump
from neo.handler import identification_required, restrict_node_types, \
server_connection_required, client_connection_required
class BootstrapEventHandler(StorageEventHandler): class BootstrapEventHandler(StorageEventHandler):
"""This class deals with events for a bootstrap phase.""" """This class deals with events for a bootstrap phase."""
...@@ -105,144 +107,138 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -105,144 +107,138 @@ class BootstrapEventHandler(StorageEventHandler):
conn.close() conn.close()
@server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
if not conn.isServerConnection(): app = self.app
self.handleUnexpectedPacket(conn, packet) if node_type != MASTER_NODE_TYPE:
else: logging.info('reject a connection from a non-master')
app = self.app conn.answer(protocol.notReady('retry later'), packet)
if node_type != MASTER_NODE_TYPE: conn.abort()
logging.info('reject a connection from a non-master') return
conn.answer(protocol.notReady('retry later'), packet) if name != app.name:
conn.abort() logging.error('reject an alien cluster')
return conn.answer(protocol.protocolError('invalid cluster name'), packet)
if name != app.name:
logging.error('reject an alien cluster')
conn.answer(protocol.protocolError('invalid cluster name'), packet)
conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError('go away')
conn.answer(p, packet)
conn.abort()
return
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], 0, 0, uuid)
conn.answer(p, packet)
# Now the master node should know that I am not the right one.
conn.abort() conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError('go away')
conn.answer(p, packet)
conn.abort()
return
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], 0, 0, uuid)
conn.answer(p, packet)
# Now the master node should know that I am not the right one.
conn.abort()
@client_connection_required
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, uuid, ip_address, port,
num_partitions, num_replicas, your_uuid): num_partitions, num_replicas, your_uuid):
if conn.isServerConnection(): app = self.app
self.handleUnexpectedPacket(conn, packet) node = app.nm.getNodeByServer(conn.getAddress())
else: if node_type != MASTER_NODE_TYPE:
app = self.app # The peer is not a master node!
node = app.nm.getNodeByServer(conn.getAddress()) logging.error('%s:%d is not a master node', ip_address, port)
if node_type != MASTER_NODE_TYPE: app.nm.remove(node)
# The peer is not a master node! conn.close()
logging.error('%s:%d is not a master node', ip_address, port) return
app.nm.remove(node) if conn.getAddress() != (ip_address, port):
conn.close() # The server address is different! Then why was
return # the connection successful?
if conn.getAddress() != (ip_address, port): logging.error('%s:%d is waiting for %s:%d',
# The server address is different! Then why was conn.getAddress()[0], conn.getAddress()[1],
# the connection successful? ip_address, port)
logging.error('%s:%d is waiting for %s:%d', app.nm.remove(node)
conn.getAddress()[0], conn.getAddress()[1], conn.close()
ip_address, port) return
app.nm.remove(node)
conn.close() if app.num_partitions is None or app.num_replicas is None or \
return app.num_replicas != num_replicas:
# changing number of replicas is not an issue
if app.num_partitions is None or app.num_replicas is None or \ app.num_partitions = num_partitions
app.num_replicas != num_replicas: app.dm.setNumPartitions(app.num_partitions)
# changing number of replicas is not an issue app.num_replicas = num_replicas
app.num_partitions = num_partitions app.dm.setNumReplicas(app.num_replicas)
app.dm.setNumPartitions(app.num_partitions) app.pt = PartitionTable(num_partitions, num_replicas)
app.num_replicas = num_replicas app.loadPartitionTable()
app.dm.setNumReplicas(app.num_replicas) app.ptid = app.dm.getPTID()
app.pt = PartitionTable(num_partitions, num_replicas) elif app.num_partitions != num_partitions:
app.loadPartitionTable() raise RuntimeError('the number of partitions is inconsistent')
app.ptid = app.dm.getPTID()
elif app.num_partitions != num_partitions:
raise RuntimeError('the number of partitions is inconsistent') if your_uuid != INVALID_UUID and app.uuid != your_uuid:
# got an uuid from the primary master
app.uuid = your_uuid
if your_uuid != INVALID_UUID and app.uuid != your_uuid: app.dm.setUUID(app.uuid)
# got an uuid from the primary master logging.info('Got a new UUID from master : %s' % dump(app.uuid))
app.uuid = your_uuid
app.dm.setUUID(app.uuid) conn.setUUID(uuid)
logging.info('Got a new UUID from master : %s' % dump(app.uuid)) node.setUUID(uuid)
conn.setUUID(uuid) # Ask a primary master.
node.setUUID(uuid) conn.ask(protocol.askPrimaryMaster())
# Ask a primary master. @client_connection_required
conn.ask(protocol.askPrimaryMaster())
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list): known_master_list):
if conn.isServerConnection(): app = self.app
self.handleUnexpectedPacket(conn, packet) # Register new master nodes.
else: for ip_address, port, uuid in known_master_list:
app = self.app addr = (ip_address, port)
# Register new master nodes. n = app.nm.getNodeByServer(addr)
for ip_address, port, uuid in known_master_list: if n is None:
addr = (ip_address, port) n = MasterNode(server = addr)
n = app.nm.getNodeByServer(addr) app.nm.add(n)
if n is None:
n = MasterNode(server = addr) if uuid != INVALID_UUID:
app.nm.add(n) # If I don't know the UUID yet, believe what the peer
# told me at the moment.
if uuid != INVALID_UUID: if n.getUUID() is None or n.getUUID() != uuid:
# If I don't know the UUID yet, believe what the peer n.setUUID(uuid)
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid: if primary_uuid != INVALID_UUID:
n.setUUID(uuid) primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
if primary_uuid != INVALID_UUID: # I don't know such a node. Probably this information
primary_node = app.nm.getNodeByUUID(primary_uuid) # is old. So ignore it.
if primary_node is None: pass
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
app.primary_master_node = primary_node
if app.trying_master_node is primary_node:
# I am connected to the right one.
logging.info('connected to a primary master node')
# This is a workaround to prevent handling of
# packets for the verification phase.
handler = VerificationEventHandler(app)
conn.setHandler(handler)
else:
app.trying_master_node = None
conn.close()
else: else:
if app.primary_master_node is not None: app.primary_master_node = primary_node
# The primary master node is not a primary master node if app.trying_master_node is primary_node:
# any longer. # I am connected to the right one.
app.primary_master_node = None logging.info('connected to a primary master node')
# This is a workaround to prevent handling of
# packets for the verification phase.
handler = VerificationEventHandler(app)
conn.setHandler(handler)
else:
app.trying_master_node = None
conn.close()
else:
if app.primary_master_node is not None:
# The primary master node is not a primary master node
# any longer.
app.primary_master_node = None
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
logging.warning('/!\ handleAskLastIDs') logging.warning('/!\ handleAskLastIDs')
......
...@@ -18,13 +18,14 @@ ...@@ -18,13 +18,14 @@
import logging import logging
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.protocol import Packet, \ from neo.protocol import Packet, UnexpectedPacketError, \
INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.util import dump from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.exception import PrimaryFailure from neo.exception import PrimaryFailure
from neo.handler import identification_required, restrict_node_types
class StorageEventHandler(EventHandler): class StorageEventHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
...@@ -69,24 +70,17 @@ class StorageEventHandler(EventHandler): ...@@ -69,24 +70,17 @@ class StorageEventHandler(EventHandler):
known_master_list): known_master_list):
raise NotImplementedError('this method must be overridden') raise NotImplementedError('this method must be overridden')
@identification_required
@restrict_node_types(MASTER_NODE_TYPE)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
"""Theoretically speaking, I should not get this message, """Theoretically speaking, I should not get this message,
because the primary master election must happen when I am because the primary master election must happen when I am
not connected to any master node.""" not connected to any master node."""
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node is None: if node is None:
raise RuntimeError('I do not know the uuid %r' % dump(uuid)) raise RuntimeError('I do not know the uuid %r' % dump(uuid))
if node.getNodeType() != MASTER_NODE_TYPE:
self.handleUnexpectedPacket(conn, packet)
return
if app.primary_master_node is None: if app.primary_master_node is None:
# Hmm... I am somehow connected to the primary master already. # Hmm... I am somehow connected to the primary master already.
app.primary_master_node = node app.primary_master_node = node
...@@ -106,19 +100,16 @@ class StorageEventHandler(EventHandler): ...@@ -106,19 +100,16 @@ class StorageEventHandler(EventHandler):
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise PrimaryFailure('re-election occurs') raise PrimaryFailure('re-election occurs')
@identification_required
@restrict_node_types(MASTER_NODE_TYPE)
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
"""Store information on nodes, only if this is sent by a primary """Store information on nodes, only if this is sent by a primary
master node.""" master node."""
# XXX it might be better to implement this callback in each handler. # XXX it might be better to implement this callback in each handler.
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() != MASTER_NODE_TYPE \ if app.primary_master_node is None \
or app.primary_master_node is None \
or app.primary_master_node.getUUID() != uuid: or app.primary_master_node.getUUID() != uuid:
return return
...@@ -209,21 +200,21 @@ class StorageEventHandler(EventHandler): ...@@ -209,21 +200,21 @@ class StorageEventHandler(EventHandler):
raise NotImplementedError('this method must be overridden') raise NotImplementedError('this method must be overridden')
def handleAskObject(self, conn, packet, oid, serial, tid): def handleAskObject(self, conn, packet, oid, serial, tid):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleAskTIDs(self, conn, packet, first, last, partition): def handleAskTIDs(self, conn, packet, first, last, partition):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleAskObjectHistory(self, conn, packet, oid, first, last): def handleAskObjectHistory(self, conn, packet, oid, first, last):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleAskStoreTransaction(self, conn, packet, tid, user, desc, def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
ext, oid_list): ext, oid_list):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleAskStoreObject(self, conn, packet, oid, serial, def handleAskStoreObject(self, conn, packet, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleAbortTransaction(self, conn, packet, tid): def handleAbortTransaction(self, conn, packet, tid):
logging.info('ignoring abort transaction') logging.info('ignoring abort transaction')
......
...@@ -19,16 +19,18 @@ import logging ...@@ -19,16 +19,18 @@ import logging
from neo import protocol from neo import protocol
from neo.storage.handler import StorageEventHandler from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, INVALID_SERIAL, INVALID_TID, \ from neo.protocol import INVALID_SERIAL, INVALID_TID, \
INVALID_PARTITION, \ INVALID_PARTITION, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
DISCARDED_STATE, OUT_OF_DATE_STATE DISCARDED_STATE, OUT_OF_DATE_STATE
from neo.util import dump from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.protocol import Packet from neo.protocol import Packet, UnexpectedPacketError
from neo.exception import PrimaryFailure, OperationFailure from neo.exception import PrimaryFailure, OperationFailure
from neo.handler import identification_required, restrict_node_types, \
server_connection_required, client_connection_required
class TransactionInformation(object): class TransactionInformation(object):
"""This class represents information on a transaction.""" """This class represents information on a transaction."""
...@@ -131,123 +133,115 @@ class OperationEventHandler(StorageEventHandler): ...@@ -131,123 +133,115 @@ class OperationEventHandler(StorageEventHandler):
StorageEventHandler.peerBroken(self, conn) StorageEventHandler.peerBroken(self, conn)
@server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
if not conn.isServerConnection(): app = self.app
self.handleUnexpectedPacket(conn, packet) if name != app.name:
else: logging.error('reject an alien cluster')
app = self.app p = protocol.protocolError('invalid cluster name')
if name != app.name: conn.answer(p, packet)
logging.error('reject an alien cluster') conn.abort()
p = protocol.protocolError('invalid cluster name') return
conn.answer(p, packet)
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
if node_type == MASTER_NODE_TYPE:
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If I do not know such a node, and it is not even a master
# node, simply reject it.
logging.error('reject an unknown node %s', dump(uuid))
conn.answer(protocol.notReady('unknown node'), packet)
conn.abort() conn.abort()
return return
else:
addr = (ip_address, port) # If this node is broken, reject it.
node = app.nm.getNodeByUUID(uuid) if node.getUUID() == uuid:
if node is None: if node.getState() == BROKEN_STATE:
if node_type == MASTER_NODE_TYPE: p = protocol.brokenNodeDisallowedError('go away')
node = app.nm.getNodeByServer(addr) conn.answer(p, packet)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If I do not know such a node, and it is not even a master
# node, simply reject it.
logging.error('reject an unknown node %s', dump(uuid))
conn.answer(protocol.notReady('unknown node'), packet)
conn.abort() conn.abort()
return return
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError('go away')
conn.answer(p, packet)
conn.abort()
return
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.num_partitions,
app.num_replicas, uuid)
conn.answer(p, packet)
if node_type == MASTER_NODE_TYPE: # Trust the UUID sent by the peer.
conn.abort() node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.num_partitions,
app.num_replicas, uuid)
conn.answer(p, packet)
if node_type == MASTER_NODE_TYPE:
conn.abort()
@client_connection_required
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, uuid, ip_address, port,
num_partitions, num_replicas, your_uuid): num_partitions, num_replicas, your_uuid):
if not conn.isServerConnection(): raise NotImplementedError
raise NotImplementedError
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list): known_master_list):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleAskPartitionTable(self, conn, packet, offset_list): def handleAskPartitionTable(self, conn, packet, offset_list):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleSendPartitionTable(self, conn, packet, ptid, row_list): def handleSendPartitionTable(self, conn, packet, ptid, row_list):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
@client_connection_required
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
if not conn.isServerConnection(): app = self.app
app = self.app nm = app.nm
nm = app.nm pt = app.pt
pt = app.pt if app.ptid >= ptid:
if app.ptid >= ptid: # Ignore this packet.
# Ignore this packet. logging.info('ignoring older partition changes')
logging.info('ignoring older partition changes') return
return
# First, change the table on memory. # First, change the table on memory.
app.ptid = ptid app.ptid = ptid
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node is None: if node is None:
node = StorageNode(uuid = uuid) node = StorageNode(uuid = uuid)
if uuid != app.uuid: if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node) nm.add(node)
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
if uuid == app.uuid: if uuid == app.uuid:
# If this is for myself, this can affect replications. # If this is for myself, this can affect replications.
if state == DISCARDED_STATE: if state == DISCARDED_STATE:
app.replicator.removePartition(offset) app.replicator.removePartition(offset)
elif state == OUT_OF_DATE_STATE: elif state == OUT_OF_DATE_STATE:
app.replicator.addPartition(offset) app.replicator.addPartition(offset)
# Then, the database. # Then, the database.
app.dm.changePartitionTable(ptid, cell_list) app.dm.changePartitionTable(ptid, cell_list)
else:
self.handleUnexpectedPacket(conn, packet)
def handleStartOperation(self, conn, packet): def handleStartOperation(self, conn, packet):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
@client_connection_required
def handleStopOperation(self, conn, packet): def handleStopOperation(self, conn, packet):
if not conn.isServerConnection(): raise OperationFailure('operation stopped')
raise OperationFailure('operation stopped')
else:
self.handleUnexpectedPacket(conn, packet)
def handleAskUnfinishedTransactions(self, conn, packet): def handleAskUnfinishedTransactions(self, conn, packet):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleAskTransactionInformation(self, conn, packet, tid): def handleAskTransactionInformation(self, conn, packet, tid):
app = self.app app = self.app
...@@ -260,51 +254,46 @@ class OperationEventHandler(StorageEventHandler): ...@@ -260,51 +254,46 @@ class OperationEventHandler(StorageEventHandler):
conn.answer(p, packet) conn.answer(p, packet)
def handleAskObjectPresent(self, conn, packet, oid, tid): def handleAskObjectPresent(self, conn, packet, oid, tid):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleDeleteTransaction(self, conn, packet, tid): def handleDeleteTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
def handleCommitTransaction(self, conn, packet, tid): def handleCommitTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
@client_connection_required
def handleLockInformation(self, conn, packet, tid): def handleLockInformation(self, conn, packet, tid):
if not conn.isServerConnection(): app = self.app
app = self.app try:
try: t = app.transaction_dict[tid]
t = app.transaction_dict[tid] object_list = t.getObjectList()
object_list = t.getObjectList() for o in object_list:
for o in object_list: app.load_lock_dict[o[0]] = tid
app.load_lock_dict[o[0]] = tid
app.dm.storeTransaction(tid, object_list, t.getTransaction())
except KeyError:
pass
conn.answer(protocol.notifyInformationLocked(tid), packet) app.dm.storeTransaction(tid, object_list, t.getTransaction())
else: except KeyError:
self.handleUnexpectedPacket(conn, packet) pass
conn.answer(protocol.notifyInformationLocked(tid), packet)
@client_connection_required
def handleUnlockInformation(self, conn, packet, tid): def handleUnlockInformation(self, conn, packet, tid):
if not conn.isServerConnection(): app = self.app
app = self.app try:
try: t = app.transaction_dict[tid]
t = app.transaction_dict[tid] object_list = t.getObjectList()
object_list = t.getObjectList() for o in object_list:
for o in object_list: oid = o[0]
oid = o[0] del app.load_lock_dict[oid]
del app.load_lock_dict[oid] del app.store_lock_dict[oid]
del app.store_lock_dict[oid]
app.dm.finishTransaction(tid) app.dm.finishTransaction(tid)
del app.transaction_dict[tid] del app.transaction_dict[tid]
# Now it may be possible to execute some events. # Now it may be possible to execute some events.
app.executeQueuedEvents() app.executeQueuedEvents()
except KeyError: except KeyError:
pass pass
else:
self.handleUnexpectedPacket(conn, packet)
def handleAskObject(self, conn, packet, oid, serial, tid): def handleAskObject(self, conn, packet, oid, serial, tid):
app = self.app app = self.app
...@@ -369,25 +358,19 @@ class OperationEventHandler(StorageEventHandler): ...@@ -369,25 +358,19 @@ class OperationEventHandler(StorageEventHandler):
p = protocol.answerObjectHistory(oid, history_list) p = protocol.answerObjectHistory(oid, history_list)
conn.answer(p, packet) conn.answer(p, packet)
@identification_required
def handleAskStoreTransaction(self, conn, packet, tid, user, desc, def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
ext, oid_list): ext, oid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid)) t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addTransaction(oid_list, user, desc, ext) t.addTransaction(oid_list, user, desc, ext)
conn.answer(protocol.answerStoreTransaction(tid), packet) conn.answer(protocol.answerStoreTransaction(tid), packet)
@identification_required
def handleAskStoreObject(self, conn, packet, oid, serial, def handleAskStoreObject(self, conn, packet, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
# First, check for the locking state. # First, check for the locking state.
app = self.app app = self.app
locking_tid = app.store_lock_dict.get(oid) locking_tid = app.store_lock_dict.get(oid)
...@@ -421,12 +404,9 @@ class OperationEventHandler(StorageEventHandler): ...@@ -421,12 +404,9 @@ class OperationEventHandler(StorageEventHandler):
conn.answer(p, packet) conn.answer(p, packet)
app.store_lock_dict[oid] = tid app.store_lock_dict[oid] = tid
@identification_required
def handleAbortTransaction(self, conn, packet, tid): def handleAbortTransaction(self, conn, packet, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app app = self.app
try: try:
t = app.transaction_dict[tid] t = app.transaction_dict[tid]
...@@ -446,17 +426,13 @@ class OperationEventHandler(StorageEventHandler): ...@@ -446,17 +426,13 @@ class OperationEventHandler(StorageEventHandler):
except KeyError: except KeyError:
pass pass
@client_connection_required
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid): def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
if not conn.isServerConnection(): self.app.replicator.setCriticalTID(packet, ltid)
self.app.replicator.setCriticalTID(packet, ltid)
else:
self.handleUnexpectedPacket(conn, packet)
@client_connection_required
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list): def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
if not conn.isServerConnection(): self.app.replicator.setUnfinishedTIDList(tid_list)
self.app.replicator.setUnfinishedTIDList(tid_list)
else:
self.handleUnexpectedPacket(conn, packet)
def handleAskOIDs(self, conn, packet, first, last, partition): def handleAskOIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return OIDs only # This method is complicated, because I must return OIDs only
......
...@@ -109,6 +109,14 @@ server: 127.0.0.1:10020 ...@@ -109,6 +109,14 @@ server: 127.0.0.1:10020
def getLastUUID(self): def getLastUUID(self):
return self.uuid return self.uuid
def checkUnexpectedPacketRaised(self, method, *args, **kwargs):
""" Check if the UnexpectedPacketError exception wxas raised """
self.assertRaises(UnexpectedPacketError, method, *args, **kwargs)
def checkIdenficationRequired(self, method, *args, **kwargs):
""" Check is the identification_required decorator is applied """
self.checkUnexpectedPacketRaised(method, *args, **kwargs)
# Method to test the kind of packet returned in answer # Method to test the kind of packet returned in answer
def checkCalledRequestNodeIdentification(self, conn, packet_number=0): def checkCalledRequestNodeIdentification(self, conn, packet_number=0):
""" Check Request Node Identification has been send""" """ Check Request Node Identification has been send"""
......
...@@ -50,6 +50,14 @@ class StorageOperationTests(unittest.TestCase): ...@@ -50,6 +50,14 @@ class StorageOperationTests(unittest.TestCase):
return min(ptids), max(ptids) return min(ptids), max(ptids)
ptid = min(ptids) ptid = min(ptids)
def checkUnexpectedPacketRaised(self, method, *args, **kwargs):
""" Check if the UnexpectedPacketError exception wxas raised """
self.assertRaises(UnexpectedPacketError, method, *args, **kwargs)
def checkIdenficationRequired(self, method, *args, **kwargs):
""" Check is the identification_required decorator is applied """
self.checkUnexpectedPacketRaised(method, *args, **kwargs)
def checkCalledAbort(self, conn, packet_number=0): def checkCalledAbort(self, conn, packet_number=0):
"""Check the abort method has been called and an error packet has been sent""" """Check the abort method has been called and an error packet has been sent"""
# sometimes we answer an error, sometimes we just send it # sometimes we answer an error, sometimes we just send it
...@@ -81,9 +89,7 @@ class StorageOperationTests(unittest.TestCase): ...@@ -81,9 +89,7 @@ class StorageOperationTests(unittest.TestCase):
packet = Packet(msg_type=_msg_type) packet = Packet(msg_type=_msg_type)
# hook # hook
self.operation.peerBroken = lambda c: c.peerBrokendCalled() self.operation.peerBroken = lambda c: c.peerBrokendCalled()
_call(conn=conn, packet=packet, **kwargs) self.checkUnexpectedPacketRaised(_call, conn=conn, packet=packet, **kwargs)
self.checkCalledAbort(conn)
self.assertEquals(len(conn.mockGetNamedCalls("peerBrokendCalled")), 1)
def checkNoPacketSent(self, conn): def checkNoPacketSent(self, conn):
# no packet should be sent # no packet should be sent
......
...@@ -36,7 +36,7 @@ from neo.protocol import ACCEPT_NODE_IDENTIFICATION, REQUEST_NODE_IDENTIFICATION ...@@ -36,7 +36,7 @@ from neo.protocol import ACCEPT_NODE_IDENTIFICATION, REQUEST_NODE_IDENTIFICATION
UNLOCK_INFORMATION, TID_NOT_FOUND_CODE, ASK_TRANSACTION_INFORMATION, ANSWER_TRANSACTION_INFORMATION, \ UNLOCK_INFORMATION, TID_NOT_FOUND_CODE, ASK_TRANSACTION_INFORMATION, ANSWER_TRANSACTION_INFORMATION, \
ANSWER_PARTITION_TABLE,SEND_PARTITION_TABLE, COMMIT_TRANSACTION ANSWER_PARTITION_TABLE,SEND_PARTITION_TABLE, COMMIT_TRANSACTION
from neo.protocol import ERROR, BROKEN_NODE_DISALLOWED_CODE, ASK_PRIMARY_MASTER from neo.protocol import ERROR, BROKEN_NODE_DISALLOWED_CODE, ASK_PRIMARY_MASTER
from neo.protocol import ANSWER_PRIMARY_MASTER from neo.protocol import ANSWER_PRIMARY_MASTER, UnexpectedPacketError
from neo.exception import PrimaryFailure, OperationFailure from neo.exception import PrimaryFailure, OperationFailure
from neo.storage.mysqldb import MySQLDatabaseManager, p64, u64 from neo.storage.mysqldb import MySQLDatabaseManager, p64, u64
...@@ -127,6 +127,14 @@ server: 127.0.0.1:10020 ...@@ -127,6 +127,14 @@ server: 127.0.0.1:10020
return min(ptids), max(ptids) return min(ptids), max(ptids)
ptid = min(ptids) ptid = min(ptids)
def checkUnexpectedPacketRaised(self, method, *args, **kwargs):
""" Check if the UnexpectedPacketError exception wxas raised """
self.assertRaises(UnexpectedPacketError, method, *args, **kwargs)
def checkIdenficationRequired(self, method, *args, **kwargs):
""" Check is the identification_required decorator is applied """
self.checkUnexpectedPacketRaised(method, *args, **kwargs)
def checkCalledAbort(self, conn, packet_number=0): def checkCalledAbort(self, conn, packet_number=0):
"""Check the abort method has been called and an error packet has been sent""" """Check the abort method has been called and an error packet has been sent"""
# sometimes we answer an error, sometimes we just notify it # sometimes we answer an error, sometimes we just notify it
...@@ -306,9 +314,8 @@ server: 127.0.0.1:10020 ...@@ -306,9 +314,8 @@ server: 127.0.0.1:10020
"getAddress" : ("127.0.0.1", self.client_port), "getAddress" : ("127.0.0.1", self.client_port),
"isServerConnection" : True}) "isServerConnection" : True})
p = Packet(msg_type=ACCEPT_NODE_IDENTIFICATION) p = Packet(msg_type=ACCEPT_NODE_IDENTIFICATION)
self.verification.handleAcceptNodeIdentification(conn, p, CLIENT_NODE_TYPE, self.checkUnexpectedPacketRaised(self.verification.handleAcceptNodeIdentification,
self.getNewUUID(),"127.0.0.1", self.client_port, 1009, 2, uuid) conn, p, CLIENT_NODE_TYPE, self.getNewUUID(),"127.0.0.1", self.client_port, 1009, 2, uuid)
self.checkCalledAbort(conn)
def test_07_handleAnswerPrimaryMaster(self): def test_07_handleAnswerPrimaryMaster(self):
# reject server connection # reject server connection
...@@ -317,8 +324,7 @@ server: 127.0.0.1:10020 ...@@ -317,8 +324,7 @@ server: 127.0.0.1:10020
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.client_port), "getAddress" : ("127.0.0.1", self.client_port),
"isServerConnection" : True}) "isServerConnection" : True})
self.verification.handleAnswerPrimaryMaster(conn, packet,self.getNewUUID(), ()) self.checkUnexpectedPacketRaised(self.verification.handleAnswerPrimaryMaster, conn, packet,self.getNewUUID(), ())
self.checkCalledAbort(conn)
# raise id uuid is different # raise id uuid is different
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
...@@ -343,8 +349,7 @@ server: 127.0.0.1:10020 ...@@ -343,8 +349,7 @@ server: 127.0.0.1:10020
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.client_port), "getAddress" : ("127.0.0.1", self.client_port),
"isServerConnection" : True}) "isServerConnection" : True})
self.verification.handleAskLastIDs(conn, packet) self.checkUnexpectedPacketRaised(self.verification.handleAskLastIDs, conn, packet)
self.checkCalledAbort(conn)
# return invalid if db store nothing # return invalid if db store nothing
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
...@@ -402,8 +407,7 @@ server: 127.0.0.1:10020 ...@@ -402,8 +407,7 @@ server: 127.0.0.1:10020
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
"getAddress" : ("127.0.0.1", self.client_port), "getAddress" : ("127.0.0.1", self.client_port),
"isServerConnection" : True}) "isServerConnection" : True})
self.verification.handleAskPartitionTable(conn, packet, [1,]) self.checkUnexpectedPacketRaised(self.verification.handleAskPartitionTable, conn, packet, [1,])
self.checkCalledAbort(conn)
# try to get unknown offset # try to get unknown offset
self.assertEqual(len(self.app.pt.getNodeList()), 0) self.assertEqual(len(self.app.pt.getNodeList()), 0)
...@@ -449,9 +453,8 @@ server: 127.0.0.1:10020 ...@@ -449,9 +453,8 @@ server: 127.0.0.1:10020
"getAddress" : ("127.0.0.1", self.client_port), "getAddress" : ("127.0.0.1", self.client_port),
"isServerConnection" : True}) "isServerConnection" : True})
self.app.ptid = 1 self.app.ptid = 1
self.verification.handleSendPartitionTable(conn, packet, 0, ()) self.checkUnexpectedPacketRaised(self.verification.handleSendPartitionTable, conn, packet, 0, ())
self.assertEquals(self.app.ptid, 1) self.assertEquals(self.app.ptid, 1)
self.checkCalledAbort(conn)
# send a table # send a table
conn = Mock({"getUUID" : uuid, conn = Mock({"getUUID" : uuid,
...@@ -496,9 +499,8 @@ server: 127.0.0.1:10020 ...@@ -496,9 +499,8 @@ server: 127.0.0.1:10020
"getAddress" : ("127.0.0.1", self.client_port), "getAddress" : ("127.0.0.1", self.client_port),
"isServerConnection" : True}) "isServerConnection" : True})
self.app.ptid = 1 self.app.ptid = 1
self.verification.handleNotifyPartitionChanges(conn, packet, 0, ()) self.checkUnexpectedPacketRaised(self.verification.handleNotifyPartitionChanges, conn, packet, 0, ())
self.assertEquals(self.app.ptid, 1) self.assertEquals(self.app.ptid, 1)
self.checkCalledAbort(conn)
# old partition change # old partition change
conn = Mock({ conn = Mock({
...@@ -534,8 +536,7 @@ server: 127.0.0.1:10020 ...@@ -534,8 +536,7 @@ server: 127.0.0.1:10020
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': True }) 'isServerConnection': True })
packet = Packet(msg_type=STOP_OPERATION) packet = Packet(msg_type=STOP_OPERATION)
self.verification.handleStartOperation(conn, packet) self.checkUnexpectedPacketRaised(self.verification.handleStartOperation, conn, packet)
self.checkCalledAbort(conn)
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': False }) 'isServerConnection': False })
self.assertFalse(self.app.operational) self.assertFalse(self.app.operational)
...@@ -547,8 +548,7 @@ server: 127.0.0.1:10020 ...@@ -547,8 +548,7 @@ server: 127.0.0.1:10020
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': True }) 'isServerConnection': True })
packet = Packet(msg_type=STOP_OPERATION) packet = Packet(msg_type=STOP_OPERATION)
self.verification.handleStopOperation(conn, packet) self.checkUnexpectedPacketRaised(self.verification.handleStopOperation, conn, packet)
self.checkCalledAbort(conn)
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': False }) 'isServerConnection': False })
packet = Packet(msg_type=STOP_OPERATION) packet = Packet(msg_type=STOP_OPERATION)
...@@ -559,8 +559,7 @@ server: 127.0.0.1:10020 ...@@ -559,8 +559,7 @@ server: 127.0.0.1:10020
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': True }) 'isServerConnection': True })
packet = Packet(msg_type=ASK_UNFINISHED_TRANSACTIONS) packet = Packet(msg_type=ASK_UNFINISHED_TRANSACTIONS)
self.verification.handleAskUnfinishedTransactions(conn, packet) self.checkUnexpectedPacketRaised(self.verification.handleAskUnfinishedTransactions, conn, packet)
self.checkCalledAbort(conn)
# client connection with no data # client connection with no data
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': False}) 'isServerConnection': False})
...@@ -688,8 +687,7 @@ server: 127.0.0.1:10020 ...@@ -688,8 +687,7 @@ server: 127.0.0.1:10020
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': True }) 'isServerConnection': True })
packet = Packet(msg_type=ASK_OBJECT_PRESENT) packet = Packet(msg_type=ASK_OBJECT_PRESENT)
self.verification.handleAskObjectPresent(conn, packet, p64(1), p64(2)) self.checkUnexpectedPacketRaised(self.verification.handleAskObjectPresent, conn, packet, p64(1), p64(2))
self.checkCalledAbort(conn)
# client connection with no data # client connection with no data
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': False}) 'isServerConnection': False})
...@@ -724,8 +722,7 @@ server: 127.0.0.1:10020 ...@@ -724,8 +722,7 @@ server: 127.0.0.1:10020
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': True }) 'isServerConnection': True })
packet = Packet(msg_type=ASK_OBJECT_PRESENT) packet = Packet(msg_type=ASK_OBJECT_PRESENT)
self.verification.handleDeleteTransaction(conn, packet, p64(1)) self.checkUnexpectedPacketRaised(self.verification.handleDeleteTransaction, conn, packet, p64(1))
self.checkCalledAbort(conn)
# client connection with no data # client connection with no data
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServerConnection': False}) 'isServerConnection': False})
...@@ -747,8 +744,7 @@ server: 127.0.0.1:10020 ...@@ -747,8 +744,7 @@ server: 127.0.0.1:10020
dm = Mock() dm = Mock()
self.app.dm = dm self.app.dm = dm
packet = Packet(msg_type=COMMIT_TRANSACTION) packet = Packet(msg_type=COMMIT_TRANSACTION)
self.verification.handleCommitTransaction(conn, packet, p64(1)) self.checkUnexpectedPacketRaised(self.verification.handleCommitTransaction, conn, packet, p64(1))
self.checkCalledAbort(conn)
self.assertEqual(len(dm.mockGetNamedCalls("finishTransaction")), 0) self.assertEqual(len(dm.mockGetNamedCalls("finishTransaction")), 0)
# commit a transaction # commit a transaction
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port), conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
......
...@@ -20,13 +20,15 @@ import logging ...@@ -20,13 +20,15 @@ import logging
from neo import protocol from neo import protocol
from neo.storage.handler import StorageEventHandler from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_OID, INVALID_TID, \ from neo.protocol import INVALID_OID, INVALID_TID, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
Packet Packet, UnexpectedPacketError
from neo.util import dump from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.exception import PrimaryFailure, OperationFailure from neo.exception import PrimaryFailure, OperationFailure
from neo.handler import identification_required, restrict_node_types, \
server_connection_required, client_connection_required
class VerificationEventHandler(StorageEventHandler): class VerificationEventHandler(StorageEventHandler):
"""This class deals with events for a verification phase.""" """This class deals with events for a verification phase."""
...@@ -61,149 +63,105 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -61,149 +63,105 @@ class VerificationEventHandler(StorageEventHandler):
StorageEventHandler.peerBroken(self, conn) StorageEventHandler.peerBroken(self, conn)
@server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
if not conn.isServerConnection(): app = self.app
self.handleUnexpectedPacket(conn, packet) if node_type != MASTER_NODE_TYPE:
else: logging.info('reject a connection from a non-master')
app = self.app conn.answer(protocol.notReady('retry later'), packet)
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
conn.answer(protocol.notReady('retry later'), packet)
conn.abort()
return
if name != app.name:
logging.error('reject an alien cluster')
conn.answer(protocol.protocolError(
'invalid cluster name'), packet)
conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError('go away')
conn.answer(p, packet)
conn.abort()
return
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.num_partitions,
app.num_replicas, uuid)
conn.answer(p, packet)
# Now the master node should know that I am not the right one.
conn.abort() conn.abort()
return
if name != app.name:
logging.error('reject an alien cluster')
conn.answer(protocol.protocolError(
'invalid cluster name'), packet)
conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
p = protocol.brokenNodeDisallowedError('go away')
conn.answer(p, packet)
conn.abort()
return
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.num_partitions,
app.num_replicas, uuid)
conn.answer(p, packet)
# Now the master node should know that I am not the right one.
conn.abort()
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, uuid, ip_address, port,
num_partitions, num_replicas, your_uuid): num_partitions, num_replicas, your_uuid):
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
@client_connection_required
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list): known_master_list):
if not conn.isServerConnection(): app = self.app
app = self.app if app.primary_master_node.getUUID() != primary_uuid:
if app.primary_master_node.getUUID() != primary_uuid: raise PrimaryFailure('the primary master node seems to have changed')
raise PrimaryFailure('the primary master node seems to have changed') # XXX is it better to deal with known_master_list here?
# XXX is it better to deal with known_master_list here? # But a primary master node is supposed not to send any info
# But a primary master node is supposed not to send any info # with this packet, so it would be useless.
# with this packet, so it would be useless.
else:
self.handleUnexpectedPacket(conn, packet)
@client_connection_required
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
if not conn.isServerConnection(): app = self.app
app = self.app oid = app.dm.getLastOID() or INVALID_OID
oid = app.dm.getLastOID() or INVALID_OID tid = app.dm.getLastTID() or INVALID_TID
tid = app.dm.getLastTID() or INVALID_TID p = protocol.answerLastIDs(oid, tid, app.ptid)
p = protocol.answerLastIDs(oid, tid, app.ptid) conn.answer(p, packet)
conn.answer(p, packet)
else:
self.handleUnexpectedPacket(conn, packet)
@client_connection_required
def handleAskPartitionTable(self, conn, packet, offset_list): def handleAskPartitionTable(self, conn, packet, offset_list):
if not conn.isServerConnection(): app = self.app
app = self.app row_list = []
row_list = [] try:
try: for offset in offset_list:
for offset in offset_list: row = []
row = [] try:
try: for cell in app.pt.getCellList(offset):
for cell in app.pt.getCellList(offset): row.append((cell.getUUID(), cell.getState()))
row.append((cell.getUUID(), cell.getState())) except TypeError:
except TypeError: pass
pass row_list.append((offset, row))
row_list.append((offset, row)) except IndexError:
except IndexError: p = protocol.protocolError( 'invalid partition table offset')
p = protocol.protocolError( 'invalid partition table offset') conn.answer(p, packer)
conn.answer(p, packer) return
return
p = protocol.answerPartitionTable(app.ptid, row_list)
p = protocol.answerPartitionTable(app.ptid, row_list) conn.answer(p, packet)
conn.answer(p, packet)
else:
self.handleUnexpectedPacket(conn, packet)
@client_connection_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list): def handleSendPartitionTable(self, conn, packet, ptid, row_list):
"""A primary master node sends this packet to synchronize a partition """A primary master node sends this packet to synchronize a partition
table. Note that the message can be split into multiple packets.""" table. Note that the message can be split into multiple packets."""
if not conn.isServerConnection(): app = self.app
app = self.app nm = app.nm
nm = app.nm pt = app.pt
pt = app.pt if app.ptid != ptid:
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
if pt.filled():
# If the table is filled, I assume that the table is ready
# to use. Thus install it into the database for persistency.
cell_list = []
for offset in xrange(app.num_partitions):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(),
cell.getState()))
app.dm.setPartitionTable(ptid, cell_list)
else:
self.handleUnexpectedPacket(conn, packet)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
if not conn.isServerConnection():
app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
logging.info('ignoring older partition changes')
return
# First, change the table on memory.
app.ptid = ptid app.ptid = ptid
for offset, uuid, state in cell_list: pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node is None: if node is None:
node = StorageNode(uuid = uuid) node = StorageNode(uuid = uuid)
...@@ -213,31 +171,56 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -213,31 +171,56 @@ class VerificationEventHandler(StorageEventHandler):
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
# Then, the database. if pt.filled():
app.dm.changePartitionTable(ptid, cell_list) # If the table is filled, I assume that the table is ready
else: # to use. Thus install it into the database for persistency.
self.handleUnexpectedPacket(conn, packet) cell_list = []
for offset in xrange(app.num_partitions):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(),
cell.getState()))
app.dm.setPartitionTable(ptid, cell_list)
@client_connection_required
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
logging.info('ignoring older partition changes')
return
# First, change the table on memory.
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
# Then, the database.
app.dm.changePartitionTable(ptid, cell_list)
@client_connection_required
def handleStartOperation(self, conn, packet): def handleStartOperation(self, conn, packet):
if not conn.isServerConnection(): self.app.operational = True
self.app.operational = True
else:
self.handleUnexpectedPacket(conn, packet)
@client_connection_required
def handleStopOperation(self, conn, packet): def handleStopOperation(self, conn, packet):
if not conn.isServerConnection(): raise OperationFailure('operation stopped')
raise OperationFailure('operation stopped')
else:
self.handleUnexpectedPacket(conn, packet)
@client_connection_required
def handleAskUnfinishedTransactions(self, conn, packet): def handleAskUnfinishedTransactions(self, conn, packet):
if not conn.isServerConnection(): tid_list = self.app.dm.getUnfinishedTIDList()
app = self.app p = protocol.answerUnfinishedTransactions(tid_list)
tid_list = app.dm.getUnfinishedTIDList() conn.answer(p, packet)
p = protocol.answerUnfinishedTransactions(tid_list)
conn.answer(p, packet)
else:
self.handleUnexpectedPacket(conn, packet)
def handleAskTransactionInformation(self, conn, packet, tid): def handleAskTransactionInformation(self, conn, packet, tid):
app = self.app app = self.app
...@@ -255,31 +238,22 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -255,31 +238,22 @@ class VerificationEventHandler(StorageEventHandler):
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0]) p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet) conn.answer(p, packet)
@client_connection_required
def handleAskObjectPresent(self, conn, packet, oid, tid): def handleAskObjectPresent(self, conn, packet, oid, tid):
if not conn.isServerConnection(): if self.app.dm.objectPresent(oid, tid):
app = self.app p = protocol.answerObjectPresent(oid, tid)
if app.dm.objectPresent(oid, tid):
p = protocol.answerObjectPresent(oid, tid)
else:
p = protocol.oidNotFound(
'%s:%s do not exist' % (dump(oid), dump(tid)))
conn.answer(p, packet)
else: else:
self.handleUnexpectedPacket(conn, packet) p = protocol.oidNotFound(
'%s:%s do not exist' % (dump(oid), dump(tid)))
conn.answer(p, packet)
@client_connection_required
def handleDeleteTransaction(self, conn, packet, tid): def handleDeleteTransaction(self, conn, packet, tid):
if not conn.isServerConnection(): self.app.dm.deleteTransaction(tid, all = True)
app = self.app
app.dm.deleteTransaction(tid, all = True)
else:
self.handleUnexpectedPacket(conn, packet)
@client_connection_required
def handleCommitTransaction(self, conn, packet, tid): def handleCommitTransaction(self, conn, packet, tid):
if not conn.isServerConnection(): self.app.dm.finishTransaction(tid)
app = self.app
app.dm.finishTransaction(tid)
else:
self.handleUnexpectedPacket(conn, packet)
def handleLockInformation(self, conn, packet, tid): def handleLockInformation(self, conn, packet, tid):
pass pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment