Commit 37c75a72 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Remove 'packet' parameter of handler's methods.

Almost any use of packet was to retreive the msg_id of the request and pass it to answer().
In storage's replicator, critical TIDs are indexed with UUID instead of remote connection ID (full replicator review required).
In admin node, expected answers are still queued with the msg_id but it seems useless as no concurrent queries should happen.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1569 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 91ff3fef
...@@ -133,8 +133,6 @@ RC - Review output of pylint (CODE) ...@@ -133,8 +133,6 @@ RC - Review output of pylint (CODE)
an incoming packet that trigger the poll() system call. an incoming packet that trigger the poll() system call.
- Allow daemonize NEO processes, re-use code from TIDStorage and support - Allow daemonize NEO processes, re-use code from TIDStorage and support
start/stop/restart/status commands. start/stop/restart/status commands.
- Remove 'packet' parameter from handler methods. Set the last_received_id
attribute on the connection and reload it from answer() method.
- Consider don't close the connection after sending a packet but wait (a - Consider don't close the connection after sending a packet but wait (a
bit) for the closure from the remote peer. bit) for the closure from the remote peer.
- Rename packets: - Rename packets:
......
...@@ -141,7 +141,7 @@ class Application(object): ...@@ -141,7 +141,7 @@ class Application(object):
self.master_conn.ask(Packets.AskNodeInformation()) self.master_conn.ask(Packets.AskNodeInformation())
self.master_conn.ask(Packets.AskPartitionTable([])) self.master_conn.ask(Packets.AskPartitionTable([]))
def sendPartitionTable(self, conn, min_offset, max_offset, uuid, msg_id): def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt # we have a pt
self.pt.log() self.pt.log()
row_list = [] row_list = []
...@@ -164,4 +164,4 @@ class Application(object): ...@@ -164,4 +164,4 @@ class Application(object):
conn.notify(p) conn.notify(p)
return return
p = Packets.AnswerPartitionList(self.ptid, row_list) p = Packets.AnswerPartitionList(self.ptid, row_list)
conn.answer(p, msg_id) conn.answer(p)
...@@ -26,7 +26,7 @@ from neo.util import dump ...@@ -26,7 +26,7 @@ from neo.util import dump
class AdminEventHandler(EventHandler): class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster.""" """This class deals with events for administrating cluster."""
def askPartitionList(self, conn, packet, min_offset, max_offset, uuid): def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s" % logging.info("ask partition list from %s to %s for %s" %
(min_offset, max_offset, dump(uuid))) (min_offset, max_offset, dump(uuid)))
app = self.app app = self.app
...@@ -41,22 +41,21 @@ class AdminEventHandler(EventHandler): ...@@ -41,22 +41,21 @@ class AdminEventHandler(EventHandler):
{'min_offset' : min_offset, {'min_offset' : min_offset,
'max_offset' : max_offset, 'max_offset' : max_offset,
'uuid' : uuid, 'uuid' : uuid,
'msg_id' : packet.getId()}) 'msg_id' : conn.getPeerId()})
else: else:
app.sendPartitionTable(conn, min_offset, max_offset, uuid, app.sendPartitionTable(conn, min_offset, max_offset, uuid)
packet.getId())
def askNodeList(self, conn, packet, node_type): def askNodeList(self, conn, node_type):
logging.info("ask node list for %s" %(node_type)) logging.info("ask node list for %s" %(node_type))
def node_filter(n): def node_filter(n):
return n.getType() is node_type return n.getType() is node_type
node_list = self.app.nm.getList(node_filter) node_list = self.app.nm.getList(node_filter)
node_information_list = [node.asTuple() for node in node_list ] node_information_list = [node.asTuple() for node in node_list ]
p = Packets.AnswerNodeList(node_information_list) p = Packets.AnswerNodeList(node_information_list)
conn.answer(p, packet.getId()) conn.answer(p)
def setNodeState(self, conn, packet, uuid, state, modify_partition_table): def setNodeState(self, conn, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s" %(dump(uuid), state)) logging.info("set node state for %s-%s" %(dump(uuid), state))
node = self.app.nm.getByUUID(uuid) node = self.app.nm.getByUUID(uuid)
if node is None: if node is None:
...@@ -64,32 +63,32 @@ class AdminEventHandler(EventHandler): ...@@ -64,32 +63,32 @@ class AdminEventHandler(EventHandler):
if node.getState() == state and modify_partition_table is False: if node.getState() == state and modify_partition_table is False:
# no change # no change
p = protocol.ack('no change') p = protocol.ack('no change')
conn.answer(p, packet.getId()) conn.answer(p)
return return
# forward to primary master node # forward to primary master node
if self.app.master_conn is None: if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.') raise protocol.NotReadyError('Not connected to a primary master.')
p = Packets.SetNodeState(uuid, state, modify_partition_table) p = Packets.SetNodeState(uuid, state, modify_partition_table)
msg_id = self.app.master_conn.ask(p) msg_id = self.app.master_conn.ask(p)
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) self.app.dispatcher.register(msg_id, conn, {'msg_id' : conn.getPeerId()})
def setClusterState(self, conn, packet, state): def setClusterState(self, conn, state):
# forward to primary # forward to primary
if self.app.master_conn is None: if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.') raise protocol.NotReadyError('Not connected to a primary master.')
p = Packets.SetClusterState(state) p = Packets.SetClusterState(state)
msg_id = self.app.master_conn.ask(p) msg_id = self.app.master_conn.ask(p)
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) self.app.dispatcher.register(msg_id, conn, {'msg_id' : conn.getPeerId()})
def addPendingNodes(self, conn, packet, uuid_list): def addPendingNodes(self, conn, uuid_list):
if self.app.master_conn is None: if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.') raise protocol.NotReadyError('Not connected to a primary master.')
logging.info('Add nodes %s' % [dump(uuid) for uuid in uuid_list]) logging.info('Add nodes %s' % [dump(uuid) for uuid in uuid_list])
# forward the request to primary # forward the request to primary
msg_id = self.app.master_conn.ask(Packets.AddPendingNodes(uuid_list)) msg_id = self.app.master_conn.ask(Packets.AddPendingNodes(uuid_list))
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) self.app.dispatcher.register(msg_id, conn, {'msg_id' : conn.getPeerId()})
def askClusterState(self, conn, packet): def askClusterState(self, conn):
if self.app.cluster_state is None: if self.app.cluster_state is None:
if self.app.master_conn is None: if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary ' \ raise protocol.NotReadyError('Not connected to a primary ' \
...@@ -97,17 +96,15 @@ class AdminEventHandler(EventHandler): ...@@ -97,17 +96,15 @@ class AdminEventHandler(EventHandler):
# required it from PMN first # required it from PMN first
msg_id = self.app.master_conn.ask(Packets.AskClusterState()) msg_id = self.app.master_conn.ask(Packets.AskClusterState())
self.app.dispatcher.register(msg_id, conn, self.app.dispatcher.register(msg_id, conn,
{'msg_id' : packet.getId()}) {'msg_id' : conn.getPeerId()})
else: else:
conn.answer(Packets.AnswerClusterState(self.app.cluster_state), conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
packet.getId())
def askPrimary(self, conn, packet): def askPrimary(self, conn):
if self.app.master_conn is None: if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.') raise protocol.NotReadyError('Not connected to a primary master.')
master_node = self.app.master_node master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID(), []), conn.answer(Packets.AnswerPrimary(master_node.getUUID(), []))
packet.getId())
class MasterEventHandler(EventHandler): class MasterEventHandler(EventHandler):
""" This class is just used to dispacth message to right handler""" """ This class is just used to dispacth message to right handler"""
...@@ -141,17 +138,17 @@ class MasterEventHandler(EventHandler): ...@@ -141,17 +138,17 @@ class MasterEventHandler(EventHandler):
# unexpectexd answers and notifications # unexpectexd answers and notifications
super(MasterEventHandler, self).dispatch(conn, packet) super(MasterEventHandler, self).dispatch(conn, packet)
def answerNodeInformation(self, conn, packet): def answerNodeInformation(self, conn):
# XXX: This will no more exists when the initialization module will be # XXX: This will no more exists when the initialization module will be
# implemented for factorize code (as done for bootstrap) # implemented for factorize code (as done for bootstrap)
logging.debug("answerNodeInformation") logging.debug("answerNodeInformation")
def answerPartitionTable(self, conn, packet, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
# XXX: This will no more exists when the initialization module will be # XXX: This will no more exists when the initialization module will be
# implemented for factorize code (as done for bootstrap) # implemented for factorize code (as done for bootstrap)
logging.debug("answerPartitionTable") logging.debug("answerPartitionTable")
def notifyPartitionChanges(self, conn, packet, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
app = self.app app = self.app
if ptid < app.ptid: if ptid < app.ptid:
# Ignore this packet. # Ignore this packet.
...@@ -159,7 +156,7 @@ class MasterEventHandler(EventHandler): ...@@ -159,7 +156,7 @@ class MasterEventHandler(EventHandler):
app.ptid = ptid app.ptid = ptid
app.pt.update(ptid, cell_list, app.nm) app.pt.update(ptid, cell_list, app.nm)
def sendPartitionTable(self, conn, packet, ptid, row_list): def sendPartitionTable(self, conn, ptid, row_list):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
nm = app.nm nm = app.nm
...@@ -176,10 +173,10 @@ class MasterEventHandler(EventHandler): ...@@ -176,10 +173,10 @@ class MasterEventHandler(EventHandler):
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
pt.log() pt.log()
def notifyClusterInformation(self, conn, packet, cluster_state): def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state self.app.cluster_state = cluster_state
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
app = self.app app = self.app
app.nm.update(node_list) app.nm.update(node_list)
if not app.pt.filled(): if not app.pt.filled():
...@@ -191,34 +188,33 @@ class MasterEventHandler(EventHandler): ...@@ -191,34 +188,33 @@ class MasterEventHandler(EventHandler):
class MasterRequestEventHandler(EventHandler): class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node""" """ This class handle all answer from primary master node"""
def __answerNeoCTL(self, msg_id, packet): def __answerNeoCTL(self, conn, packet):
msg_id = conn.getPeerId()
client_conn, kw = self.app.dispatcher.pop(msg_id) client_conn, kw = self.app.dispatcher.pop(msg_id)
client_conn.answer(packet, kw['msg_id']) client_conn.answer(packet)
def answerClusterState(self, conn, packet, state): def answerClusterState(self, conn, state):
logging.info("answerClusterState for a conn") logging.info("answerClusterState for a conn")
self.app.cluster_state = state self.app.cluster_state = state
self.__answerNeoCTL(packet.getId(), self.__answerNeoCTL(conn, Packets.AnswerClusterState(state))
Packets.AnswerClusterState(state))
def answerNewNodes(self, conn, packet, uuid_list): def answerNewNodes(self, conn, uuid_list):
logging.info("answerNewNodes for a conn") logging.info("answerNewNodes for a conn")
self.__answerNeoCTL(packet.getId(), self.__answerNeoCTL(conn, Packets.AnswerNewNodes(uuid_list))
Packets.AnswerNewNodes(uuid_list))
def answerPartitionTable(self, conn, packet, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
logging.info("answerPartitionTable for a conn") logging.info("answerPartitionTable for a conn")
client_conn, kw = self.app.dispatcher.pop(packet.getId()) client_conn, kw = self.app.dispatcher.pop(conn.getPeerId())
# sent client the partition table # sent client the partition table
self.app.sendPartitionTable(client_conn, **kw) self.app.sendPartitionTable(client_conn)
def answerNodeState(self, conn, packet, uuid, state): def answerNodeState(self, conn, uuid, state):
self.__answerNeoCTL(packet.getId(), self.__answerNeoCTL(conn,
Packets.AnswerNodeState(uuid, state)) Packets.AnswerNodeState(uuid, state))
def ack(self, conn, packet, msg): def ack(self, conn, msg):
self.__answerNeoCTL(packet.getId(), protocol.ack(msg)) self.__answerNeoCTL(conn, protocol.ack(msg))
def protocolError(self, conn, packet, msg): def protocolError(self, conn, msg):
self.__answerNeoCTL(packet.getId(), protocol.protocolError(msg)) self.__answerNeoCTL(conn, protocol.protocolError(msg))
...@@ -69,7 +69,7 @@ class BootstrapManager(EventHandler): ...@@ -69,7 +69,7 @@ class BootstrapManager(EventHandler):
""" """
self.current = None self.current = None
def notReady(self, conn, packet, message): def notReady(self, conn, message):
""" """
The primary master send this message when it is still not ready to The primary master send this message when it is still not ready to
handle the client node. handle the client node.
...@@ -79,7 +79,7 @@ class BootstrapManager(EventHandler): ...@@ -79,7 +79,7 @@ class BootstrapManager(EventHandler):
self.current = None self.current = None
conn.close() conn.close()
def answerPrimary(self, conn, packet, primary_uuid, known_master_list): def answerPrimary(self, conn, primary_uuid, known_master_list):
""" """
A master answer who's the primary. If it's another node, connect to it. A master answer who's the primary. If it's another node, connect to it.
If it's itself then the primary is successfully found, ask If it's itself then the primary is successfully found, ask
...@@ -108,7 +108,7 @@ class BootstrapManager(EventHandler): ...@@ -108,7 +108,7 @@ class BootstrapManager(EventHandler):
conn.ask(Packets.RequestIdentification(self.node_type, conn.ask(Packets.RequestIdentification(self.node_type,
self.uuid, self.server, self.name)) self.uuid, self.server, self.name))
def acceptIdentification(self, conn, packet, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid):
""" """
The primary master has accepted the node. The primary master has accepted the node.
......
...@@ -25,12 +25,12 @@ from neo.util import dump ...@@ -25,12 +25,12 @@ from neo.util import dump
class PrimaryBootstrapHandler(AnswerBaseHandler): class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """ """ Bootstrap handler used when looking for the primary master """
def notReady(self, conn, packet, message): def notReady(self, conn, message):
app = self.app app = self.app
app.trying_master_node = None app.trying_master_node = None
app.setNodeNotReady() app.setNodeNotReady()
def acceptIdentification(self, conn, packet, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid):
app = self.app app = self.app
node = app.nm.getByAddress(conn.getAddress()) node = app.nm.getByAddress(conn.getAddress())
...@@ -49,7 +49,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): ...@@ -49,7 +49,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
# Always create partition table # Always create partition table
app.pt = PartitionTable(num_partitions, num_replicas) app.pt = PartitionTable(num_partitions, num_replicas)
def answerPrimary(self, conn, packet, primary_uuid, def answerPrimary(self, conn, primary_uuid,
known_master_list): known_master_list):
app = self.app app = self.app
# Register new master nodes. # Register new master nodes.
...@@ -81,10 +81,10 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): ...@@ -81,10 +81,10 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
def answerPartitionTable(self, conn, packet, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
pass pass
def answerNodeInformation(self, conn, packet): def answerNodeInformation(self, conn):
pass pass
class PrimaryNotificationsHandler(BaseHandler): class PrimaryNotificationsHandler(BaseHandler):
...@@ -117,10 +117,10 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -117,10 +117,10 @@ class PrimaryNotificationsHandler(BaseHandler):
logging.critical("primary master node is broken") logging.critical("primary master node is broken")
BaseHandler.peerBroken(self, conn) BaseHandler.peerBroken(self, conn)
def stopOperation(self, conn, packet): def stopOperation(self, conn):
logging.critical("master node ask to stop operation") logging.critical("master node ask to stop operation")
def invalidateObjects(self, conn, packet, oid_list, tid): def invalidateObjects(self, conn, oid_list, tid):
app = self.app app = self.app
app._cache_lock_acquire() app._cache_lock_acquire()
try: try:
...@@ -142,15 +142,15 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -142,15 +142,15 @@ class PrimaryNotificationsHandler(BaseHandler):
# to avoid a dead lock. It is safe to not check the master connection # to avoid a dead lock. It is safe to not check the master connection
# because it's in the master handler, so the connection is already # because it's in the master handler, so the connection is already
# established. # established.
def notifyPartitionChanges(self, conn, packet, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
pt = self.app.pt pt = self.app.pt
if pt.filled(): if pt.filled():
pt.update(ptid, cell_list, self.app.nm) pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, packet, ptid, row_list): def sendPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm) self.app.pt.load(ptid, row_list, self.app.nm)
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
app = self.app app = self.app
self.app.nm.update(node_list) self.app.nm.update(node_list)
for node_type, addr, uuid, state in node_list: for node_type, addr, uuid, state in node_list:
...@@ -169,16 +169,16 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -169,16 +169,16 @@ class PrimaryNotificationsHandler(BaseHandler):
class PrimaryAnswersHandler(AnswerBaseHandler): class PrimaryAnswersHandler(AnswerBaseHandler):
""" Handle that process expected packets from the primary master """ """ Handle that process expected packets from the primary master """
def answerBeginTransaction(self, conn, packet, tid): def answerBeginTransaction(self, conn, tid):
app = self.app app = self.app
app.setTID(tid) app.setTID(tid)
def answerNewOIDs(self, conn, packet, oid_list): def answerNewOIDs(self, conn, oid_list):
app = self.app app = self.app
app.new_oid_list = oid_list app.new_oid_list = oid_list
app.new_oid_list.reverse() app.new_oid_list.reverse()
def answerTransactionFinished(self, conn, packet, tid): def answerTransactionFinished(self, conn, tid):
app = self.app app = self.app
if tid == app.getTID(): if tid == app.getTID():
app.setTransactionFinished() app.setTransactionFinished()
......
...@@ -48,11 +48,11 @@ class StorageEventHandler(BaseHandler): ...@@ -48,11 +48,11 @@ class StorageEventHandler(BaseHandler):
class StorageBootstrapHandler(AnswerBaseHandler): class StorageBootstrapHandler(AnswerBaseHandler):
""" Handler used when connecting to a storage node """ """ Handler used when connecting to a storage node """
def notReady(self, conn, packet, message): def notReady(self, conn, message):
app = self.app app = self.app
app.setNodeNotReady() app.setNodeNotReady()
def acceptIdentification(self, conn, packet, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid):
app = self.app app = self.app
node = app.nm.getByAddress(conn.getAddress()) node = app.nm.getByAddress(conn.getAddress())
...@@ -67,24 +67,24 @@ class StorageBootstrapHandler(AnswerBaseHandler): ...@@ -67,24 +67,24 @@ class StorageBootstrapHandler(AnswerBaseHandler):
class StorageAnswersHandler(AnswerBaseHandler): class StorageAnswersHandler(AnswerBaseHandler):
""" Handle all messages related to ZODB operations """ """ Handle all messages related to ZODB operations """
def answerObject(self, conn, packet, oid, start_serial, end_serial, def answerObject(self, conn, oid, start_serial, end_serial,
compression, checksum, data): compression, checksum, data):
app = self.app app = self.app
app.local_var.asked_object = (oid, start_serial, end_serial, app.local_var.asked_object = (oid, start_serial, end_serial,
compression, checksum, data) compression, checksum, data)
def answerStoreObject(self, conn, packet, conflicting, oid, serial): def answerStoreObject(self, conn, conflicting, oid, serial):
app = self.app app = self.app
if conflicting: if conflicting:
app.local_var.object_stored = -1, serial app.local_var.object_stored = -1, serial
else: else:
app.local_var.object_stored = oid, serial app.local_var.object_stored = oid, serial
def answerStoreTransaction(self, conn, packet, tid): def answerStoreTransaction(self, conn, tid):
app = self.app app = self.app
app.setTransactionVoted() app.setTransactionVoted()
def answerTransactionInformation(self, conn, packet, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, oid_list):
app = self.app app = self.app
# transaction information are returned as a dict # transaction information are returned as a dict
...@@ -96,12 +96,12 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -96,12 +96,12 @@ class StorageAnswersHandler(AnswerBaseHandler):
info['oids'] = oid_list info['oids'] = oid_list
app.local_var.txn_info = info app.local_var.txn_info = info
def answerObjectHistory(self, conn, packet, oid, history_list): def answerObjectHistory(self, conn, oid, history_list):
app = self.app app = self.app
# history_list is a list of tuple (serial, size) # history_list is a list of tuple (serial, size)
app.local_var.history = oid, history_list app.local_var.history = oid, history_list
def oidNotFound(self, conn, packet, message): def oidNotFound(self, conn, message):
app = self.app app = self.app
# This can happen either when : # This can happen either when :
# - loading an object # - loading an object
...@@ -109,12 +109,12 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -109,12 +109,12 @@ class StorageAnswersHandler(AnswerBaseHandler):
app.local_var.asked_object = -1 app.local_var.asked_object = -1
app.local_var.history = -1 app.local_var.history = -1
def tidNotFound(self, conn, packet, message): def tidNotFound(self, conn, message):
app = self.app app = self.app
# This can happen when requiring txn informations # This can happen when requiring txn informations
app.local_var.txn_info = -1 app.local_var.txn_info = -1
def answerTIDs(self, conn, packet, tid_list): def answerTIDs(self, conn, tid_list):
app = self.app app = self.app
app.local_var.node_tids[conn.getUUID()] = tid_list app.local_var.node_tids[conn.getUUID()] = tid_list
This diff is collapsed.
...@@ -44,208 +44,208 @@ class PacketLogger(EventHandler): ...@@ -44,208 +44,208 @@ class PacketLogger(EventHandler):
except PacketMalformedError: except PacketMalformedError:
logging.warning("Can't decode packet for logging") logging.warning("Can't decode packet for logging")
return return
log_message = logger(conn, packet, *args) log_message = logger(conn, *args)
if log_message is not None: if log_message is not None:
logging.debug('#0x%08x %s', packet.getId(), log_message) logging.debug('#0x%08x %s', packet.getId(), log_message)
# Packet loggers # Packet loggers
def error(self, conn, packet, code, message): def error(self, conn, code, message):
return "%s (%s)" % (code, message) return "%s (%s)" % (code, message)
def requestIdentification(self, conn, packet, node_type, def requestIdentification(self, conn, node_type,
uuid, address, name): uuid, address, name):
logging.debug('Request identification for cluster %s' % (name, )) logging.debug('Request identification for cluster %s' % (name, ))
pass pass
def acceptIdentification(self, conn, packet, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid):
pass pass
def askPrimary(self, conn, packet): def askPrimary(self, conn):
pass pass
def answerPrimary(self, conn, packet, primary_uuid, def answerPrimary(self, conn, primary_uuid,
known_master_list): known_master_list):
pass pass
def announcePrimary(self, conn, packet): def announcePrimary(self, conn):
pass pass
def reelectPrimary(self, conn, packet): def reelectPrimary(self, conn):
pass pass
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
pass pass
def askLastIDs(self, conn, packet): def askLastIDs(self, conn):
pass pass
def answerLastIDs(self, conn, packet, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid):
pass pass
def askPartitionTable(self, conn, packet, offset_list): def askPartitionTable(self, conn, offset_list):
pass pass
def answerPartitionTable(self, conn, packet, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
pass pass
def sendPartitionTable(self, conn, packet, ptid, row_list): def sendPartitionTable(self, conn, ptid, row_list):
pass pass
def notifyPartitionChanges(self, conn, packet, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
pass pass
def startOperation(self, conn, packet): def startOperation(self, conn):
pass pass
def stopOperation(self, conn, packet): def stopOperation(self, conn):
pass pass
def askUnfinishedTransactions(self, conn, packet): def askUnfinishedTransactions(self, conn):
pass pass
def answerUnfinishedTransactions(self, conn, packet, tid_list): def answerUnfinishedTransactions(self, conn, tid_list):
pass pass
def askObjectPresent(self, conn, packet, oid, tid): def askObjectPresent(self, conn, oid, tid):
pass pass
def answerObjectPresent(self, conn, packet, oid, tid): def answerObjectPresent(self, conn, oid, tid):
pass pass
def deleteTransaction(self, conn, packet, tid): def deleteTransaction(self, conn, tid):
pass pass
def commitTransaction(self, conn, packet, tid): def commitTransaction(self, conn, tid):
pass pass
def askBeginTransaction(self, conn, packet, tid): def askBeginTransaction(self, conn, tid):
pass pass
def answerBeginTransaction(self, conn, packet, tid): def answerBeginTransaction(self, conn, tid):
pass pass
def askNewOIDs(self, conn, packet, num_oids): def askNewOIDs(self, conn, num_oids):
pass pass
def answerNewOIDs(self, conn, packet, num_oids): def answerNewOIDs(self, conn, num_oids):
pass pass
def finishTransaction(self, conn, packet, oid_list, tid): def finishTransaction(self, conn, oid_list, tid):
pass pass
def answerTransactionFinished(self, conn, packet, tid): def answerTransactionFinished(self, conn, tid):
pass pass
def lockInformation(self, conn, packet, tid): def lockInformation(self, conn, tid):
pass pass
def notifyInformationLocked(self, conn, packet, tid): def notifyInformationLocked(self, conn, tid):
pass pass
def invalidateObjects(self, conn, packet, oid_list, tid): def invalidateObjects(self, conn, oid_list, tid):
pass pass
def notifyUnlockInformation(self, conn, packet, tid): def notifyUnlockInformation(self, conn, tid):
pass pass
def askStoreObject(self, conn, packet, oid, serial, def askStoreObject(self, conn, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
pass pass
def answerStoreObject(self, conn, packet, conflicting, oid, serial): def answerStoreObject(self, conn, conflicting, oid, serial):
pass pass
def abortTransaction(self, conn, packet, tid): def abortTransaction(self, conn, tid):
pass pass
def askStoreTransaction(self, conn, packet, tid, user, desc, def askStoreTransaction(self, conn, tid, user, desc,
ext, oid_list): ext, oid_list):
pass pass
def answerStoreTransaction(self, conn, packet, tid): def answerStoreTransaction(self, conn, tid):
pass pass
def askObject(self, conn, packet, oid, serial, tid): def askObject(self, conn, oid, serial, tid):
pass pass
def answerObject(self, conn, packet, oid, serial_start, def answerObject(self, conn, oid, serial_start,
serial_end, compression, checksum, data): serial_end, compression, checksum, data):
pass pass
def askTIDs(self, conn, packet, first, last, partition): def askTIDs(self, conn, first, last, partition):
pass pass
def answerTIDs(self, conn, packet, tid_list): def answerTIDs(self, conn, tid_list):
pass pass
def askTransactionInformation(self, conn, packet, tid): def askTransactionInformation(self, conn, tid):
pass pass
def answerTransactionInformation(self, conn, packet, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, oid_list):
pass pass
def askObjectHistory(self, conn, packet, oid, first, last): def askObjectHistory(self, conn, oid, first, last):
pass pass
def answerObjectHistory(self, conn, packet, oid, history_list): def answerObjectHistory(self, conn, oid, history_list):
pass pass
def askOIDs(self, conn, packet, first, last, partition): def askOIDs(self, conn, first, last, partition):
pass pass
def answerOIDs(self, conn, packet, oid_list): def answerOIDs(self, conn, oid_list):
pass pass
def askPartitionList(self, conn, packet, min_offset, max_offset, uuid): def askPartitionList(self, conn, min_offset, max_offset, uuid):
pass pass
def answerPartitionList(self, conn, packet, ptid, row_list): def answerPartitionList(self, conn, ptid, row_list):
pass pass
def askNodeList(self, conn, packet, offset_list): def askNodeList(self, conn, offset_list):
pass pass
def answerNodeList(self, conn, packet, node_list): def answerNodeList(self, conn, node_list):
pass pass
def setNodeState(self, conn, packet, uuid, state, modify_partition_table): def setNodeState(self, conn, uuid, state, modify_partition_table):
pass pass
def answerNodeState(self, conn, packet, uuid, state): def answerNodeState(self, conn, uuid, state):
pass pass
def addPendingNodes(self, conn, packet, uuid_list): def addPendingNodes(self, conn, uuid_list):
pass pass
def answerNewNodes(self, conn, packet, uuid_list): def answerNewNodes(self, conn, uuid_list):
pass pass
def askNodeInformation(self, conn, packet): def askNodeInformation(self, conn):
pass pass
def answerNodeInformation(self, conn, packet): def answerNodeInformation(self, conn):
pass pass
def askClusterState(self, conn, packet): def askClusterState(self, conn):
pass pass
def answerClusterState(self, conn, packet, state): def answerClusterState(self, conn, state):
pass pass
def setClusterState(self, conn, packet, state): def setClusterState(self, conn, state):
pass pass
def notifyClusterInformation(self, conn, packet, state): def notifyClusterInformation(self, conn, state):
pass pass
def notifyLastOID(self, conn, packet, oid): def notifyLastOID(self, conn, oid):
pass pass
def notifyReplicationDone(self, conn, packet, offset): def notifyReplicationDone(self, conn, offset):
pass pass
......
...@@ -23,10 +23,10 @@ from neo.protocol import NodeTypes, NodeStates, Packets ...@@ -23,10 +23,10 @@ from neo.protocol import NodeTypes, NodeStates, Packets
class MasterHandler(EventHandler): class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
def protocolError(self, conn, packet, message): def protocolError(self, conn, message):
logging.error('Protocol error %s %s' % (message, conn.getAddress())) logging.error('Protocol error %s %s' % (message, conn.getAddress()))
def askPrimary(self, conn, packet): def askPrimary(self, conn):
if conn.getConnector() is None: if conn.getConnector() is None:
# Connection can be closed by peer after he sent AskPrimary # Connection can be closed by peer after he sent AskPrimary
# if he finds the primary master before we answer him. # if he finds the primary master before we answer him.
...@@ -50,24 +50,22 @@ class MasterHandler(EventHandler): ...@@ -50,24 +50,22 @@ class MasterHandler(EventHandler):
conn.answer(Packets.AnswerPrimary( conn.answer(Packets.AnswerPrimary(
primary_uuid, primary_uuid,
known_master_list), known_master_list),
packet.getId(),
) )
def askClusterState(self, conn, packet): def askClusterState(self, conn):
assert conn.getUUID() is not None assert conn.getUUID() is not None
state = self.app.getClusterState() state = self.app.getClusterState()
conn.answer(Packets.AnswerClusterState(state), packet.getId()) conn.answer(Packets.AnswerClusterState(state))
def askNodeInformation(self, conn, packet): def askNodeInformation(self, conn):
self.app.sendNodesInformations(conn) self.app.sendNodesInformations(conn)
conn.answer(Packets.AnswerNodeInformation(), packet.getId()) conn.answer(Packets.AnswerNodeInformation())
def askPartitionTable(self, conn, packet, offset_list): def askPartitionTable(self, conn, offset_list):
assert len(offset_list) == 0 assert len(offset_list) == 0
app = self.app app = self.app
app.sendPartitionTable(conn) app.sendPartitionTable(conn)
conn.answer(Packets.AnswerPartitionTable(app.pt.getID(), []), conn.answer(Packets.AnswerPartitionTable(app.pt.getID(), []))
packet.getId())
DISCONNECTED_STATE_DICT = { DISCONNECTED_STATE_DICT = {
......
...@@ -29,20 +29,20 @@ class AdministrationHandler(MasterHandler): ...@@ -29,20 +29,20 @@ class AdministrationHandler(MasterHandler):
node = self.app.nm.getByUUID(conn.getUUID()) node = self.app.nm.getByUUID(conn.getUUID())
self.app.nm.remove(node) self.app.nm.remove(node)
def askPrimary(self, conn, packet): def askPrimary(self, conn):
app = self.app app = self.app
# I'm the primary # I'm the primary
conn.answer(Packets.AnswerPrimary(app.uuid, []), packet.getId()) conn.answer(Packets.AnswerPrimary(app.uuid, []))
def setClusterState(self, conn, packet, state): def setClusterState(self, conn, state):
self.app.changeClusterState(state) self.app.changeClusterState(state)
p = protocol.ack('cluster state changed') p = protocol.ack('cluster state changed')
conn.answer(p, packet.getId()) conn.answer(p)
if state == ClusterStates.STOPPING: if state == ClusterStates.STOPPING:
self.app.cluster_state = state self.app.cluster_state = state
self.app.shutdown() self.app.shutdown()
def setNodeState(self, conn, packet, uuid, state, modify_partition_table): def setNodeState(self, conn, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s : %s" % logging.info("set node state for %s-%s : %s" %
(dump(uuid), state, modify_partition_table)) (dump(uuid), state, modify_partition_table))
app = self.app app = self.app
...@@ -55,13 +55,13 @@ class AdministrationHandler(MasterHandler): ...@@ -55,13 +55,13 @@ class AdministrationHandler(MasterHandler):
# get message for self # get message for self
if state != NodeStates.RUNNING: if state != NodeStates.RUNNING:
p = protocol.ack('node state changed') p = protocol.ack('node state changed')
conn.answer(p, packet.getId()) conn.answer(p)
app.shutdown() app.shutdown()
if node.getState() == state: if node.getState() == state:
# no change, just notify admin node # no change, just notify admin node
p = protocol.ack('node state changed') p = protocol.ack('node state changed')
conn.answer(p, packet.getId()) conn.answer(p)
return return
if state == NodeStates.RUNNING: if state == NodeStates.RUNNING:
...@@ -88,10 +88,10 @@ class AdministrationHandler(MasterHandler): ...@@ -88,10 +88,10 @@ class AdministrationHandler(MasterHandler):
# /!\ send the node information *after* the partition table change # /!\ send the node information *after* the partition table change
node.setState(state) node.setState(state)
p = protocol.ack('state changed') p = protocol.ack('state changed')
conn.answer(p, packet.getId()) conn.answer(p)
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
def addPendingNodes(self, conn, packet, uuid_list): def addPendingNodes(self, conn, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list]) uuids = ', '.join([dump(uuid) for uuid in uuid_list])
logging.debug('Add nodes %s' % uuids) logging.debug('Add nodes %s' % uuids)
app, nm, em, pt = self.app, self.app.nm, self.app.em, self.app.pt app, nm, em, pt = self.app, self.app.nm, self.app.em, self.app.pt
...@@ -108,7 +108,7 @@ class AdministrationHandler(MasterHandler): ...@@ -108,7 +108,7 @@ class AdministrationHandler(MasterHandler):
if not uuid_set: if not uuid_set:
logging.warning('No nodes added') logging.warning('No nodes added')
p = protocol.ack('no nodes added') p = protocol.ack('no nodes added')
conn.answer(p, packet.getId()) conn.answer(p)
return return
uuids = ', '.join([dump(uuid) for uuid in uuid_set]) uuids = ', '.join([dump(uuid) for uuid in uuid_set])
logging.info('Adding nodes %s' % uuids) logging.info('Adding nodes %s' % uuids)
...@@ -127,4 +127,4 @@ class AdministrationHandler(MasterHandler): ...@@ -127,4 +127,4 @@ class AdministrationHandler(MasterHandler):
# broadcast the new partition table # broadcast the new partition table
app.broadcastPartitionChanges(cell_list) app.broadcastPartitionChanges(cell_list)
p = protocol.ack('node added') p = protocol.ack('node added')
conn.answer(p, packet.getId()) conn.answer(p)
...@@ -33,22 +33,22 @@ class ClientServiceHandler(BaseServiceHandler): ...@@ -33,22 +33,22 @@ class ClientServiceHandler(BaseServiceHandler):
self.app.tm.abortFor(node) self.app.tm.abortFor(node)
self.app.nm.remove(node) self.app.nm.remove(node)
def abortTransaction(self, conn, packet, tid): def abortTransaction(self, conn, tid):
if tid in self.app.tm: if tid in self.app.tm:
self.app.tm.remove(tid) self.app.tm.remove(tid)
else: else:
logging.warn('aborting transaction %s does not exist', dump(tid)) logging.warn('aborting transaction %s does not exist', dump(tid))
def askBeginTransaction(self, conn, packet, tid): def askBeginTransaction(self, conn, tid):
node = self.app.nm.getByUUID(conn.getUUID()) node = self.app.nm.getByUUID(conn.getUUID())
tid = self.app.tm.begin(node, tid) tid = self.app.tm.begin(node, tid)
conn.answer(Packets.AnswerBeginTransaction(tid), packet.getId()) conn.answer(Packets.AnswerBeginTransaction(tid))
def askNewOIDs(self, conn, packet, num_oids): def askNewOIDs(self, conn, num_oids):
oid_list = self.app.getNewOIDList(num_oids) oid_list = self.app.getNewOIDList(num_oids)
conn.answer(Packets.AnswerNewOIDs(oid_list), packet.getId()) conn.answer(Packets.AnswerNewOIDs(oid_list))
def finishTransaction(self, conn, packet, oid_list, tid): def finishTransaction(self, conn, oid_list, tid):
app = self.app app = self.app
# If the given transaction ID is later than the last TID, the peer # If the given transaction ID is later than the last TID, the peer
# is crazy. # is crazy.
...@@ -76,5 +76,5 @@ class ClientServiceHandler(BaseServiceHandler): ...@@ -76,5 +76,5 @@ class ClientServiceHandler(BaseServiceHandler):
c.ask(Packets.LockInformation(tid), timeout=60) c.ask(Packets.LockInformation(tid), timeout=60)
used_uuid_set.add(c.getUUID()) used_uuid_set.add(c.getUUID())
app.tm.prepare(tid, oid_list, used_uuid_set, packet.getId()) app.tm.prepare(tid, oid_list, used_uuid_set, conn.getPeerId())
...@@ -25,7 +25,7 @@ from neo.exception import ElectionFailure ...@@ -25,7 +25,7 @@ from neo.exception import ElectionFailure
class ElectionHandler(MasterHandler): class ElectionHandler(MasterHandler):
"""This class deals with events for a primary master election.""" """This class deals with events for a primary master election."""
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None: if uuid is None:
raise protocol.ProtocolError('Not identified') raise protocol.ProtocolError('Not identified')
...@@ -66,7 +66,7 @@ class ClientElectionHandler(ElectionHandler): ...@@ -66,7 +66,7 @@ class ClientElectionHandler(ElectionHandler):
# FIXME: this packet is not allowed here, but handled in MasterHandler # FIXME: this packet is not allowed here, but handled in MasterHandler
# a global handler review is required. # a global handler review is required.
def askPrimary(self, conn, packet): def askPrimary(self, conn):
from neo.protocol import UnexpectedPacketError from neo.protocol import UnexpectedPacketError
raise UnexpectedPacketError, "askPrimary on server connection" raise UnexpectedPacketError, "askPrimary on server connection"
...@@ -124,7 +124,7 @@ class ClientElectionHandler(ElectionHandler): ...@@ -124,7 +124,7 @@ class ClientElectionHandler(ElectionHandler):
self.app.negotiating_master_node_set.discard(addr) self.app.negotiating_master_node_set.discard(addr)
MasterHandler.peerBroken(self, conn) MasterHandler.peerBroken(self, conn)
def acceptIdentification(self, conn, packet, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid):
app = self.app app = self.app
node = app.nm.getByAddress(conn.getAddress()) node = app.nm.getByAddress(conn.getAddress())
...@@ -150,7 +150,7 @@ class ClientElectionHandler(ElectionHandler): ...@@ -150,7 +150,7 @@ class ClientElectionHandler(ElectionHandler):
app.negotiating_master_node_set.discard(conn.getAddress()) app.negotiating_master_node_set.discard(conn.getAddress())
def answerPrimary(self, conn, packet, primary_uuid, known_master_list): def answerPrimary(self, conn, primary_uuid, known_master_list):
if conn.getConnector() is None: if conn.getConnector() is None:
# Connection can be closed by peer after he sent # Connection can be closed by peer after he sent
# AnswerPrimary if he finds the primary master before we # AnswerPrimary if he finds the primary master before we
...@@ -207,7 +207,7 @@ class ClientElectionHandler(ElectionHandler): ...@@ -207,7 +207,7 @@ class ClientElectionHandler(ElectionHandler):
class ServerElectionHandler(ElectionHandler): class ServerElectionHandler(ElectionHandler):
def reelectPrimary(self, conn, packet): def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
def peerBroken(self, conn): def peerBroken(self, conn):
...@@ -218,7 +218,7 @@ class ServerElectionHandler(ElectionHandler): ...@@ -218,7 +218,7 @@ class ServerElectionHandler(ElectionHandler):
node.setBroken() node.setBroken()
MasterHandler.peerBroken(self, conn) MasterHandler.peerBroken(self, conn)
def requestIdentification(self, conn, packet, node_type, def requestIdentification(self, conn, node_type,
uuid, address, name): uuid, address, name):
if conn.getConnector() is None: if conn.getConnector() is None:
# Connection can be closed by peer after he sent # Connection can be closed by peer after he sent
...@@ -256,9 +256,9 @@ class ServerElectionHandler(ElectionHandler): ...@@ -256,9 +256,9 @@ class ServerElectionHandler(ElectionHandler):
app.pt.getReplicas(), app.pt.getReplicas(),
uuid uuid
) )
conn.answer(p, packet.getId()) conn.answer(p)
def announcePrimary(self, conn, packet): def announcePrimary(self, conn):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None: if uuid is None:
raise protocol.ProtocolError('Not identified') raise protocol.ProtocolError('Not identified')
......
...@@ -27,7 +27,7 @@ class IdentificationHandler(MasterHandler): ...@@ -27,7 +27,7 @@ class IdentificationHandler(MasterHandler):
def nodeLost(self, conn, node): def nodeLost(self, conn, node):
logging.warning('lost a node in IdentificationHandler : %s' % node) logging.warning('lost a node in IdentificationHandler : %s' % node)
def requestIdentification(self, conn, packet, node_type, def requestIdentification(self, conn, node_type,
uuid, address, name): uuid, address, name):
self.checkClusterName(name) self.checkClusterName(name)
...@@ -77,7 +77,7 @@ class IdentificationHandler(MasterHandler): ...@@ -77,7 +77,7 @@ class IdentificationHandler(MasterHandler):
# answer # answer
args = (NodeTypes.MASTER, app.uuid, app.pt.getPartitions(), args = (NodeTypes.MASTER, app.uuid, app.pt.getPartitions(),
app.pt.getReplicas(), uuid) app.pt.getReplicas(), uuid)
conn.answer(Packets.AcceptIdentification(*args), packet.getId()) conn.answer(Packets.AcceptIdentification(*args))
# trigger the event # trigger the event
handler.connectionCompleted(conn) handler.connectionCompleted(conn)
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
......
...@@ -35,7 +35,7 @@ class RecoveryHandler(MasterHandler): ...@@ -35,7 +35,7 @@ class RecoveryHandler(MasterHandler):
# ask the last IDs to perform the recovery # ask the last IDs to perform the recovery
conn.ask(Packets.AskLastIDs()) conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, conn, packet, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid):
app = self.app app = self.app
pt = app.pt pt = app.pt
...@@ -48,7 +48,7 @@ class RecoveryHandler(MasterHandler): ...@@ -48,7 +48,7 @@ class RecoveryHandler(MasterHandler):
app.pt.setID(lptid) app.pt.setID(lptid)
conn.ask(Packets.AskPartitionTable([])) conn.ask(Packets.AskPartitionTable([]))
def answerPartitionTable(self, conn, packet, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
if uuid != app.target_uuid: if uuid != app.target_uuid:
......
...@@ -34,13 +34,13 @@ class SecondaryMasterHandler(MasterHandler): ...@@ -34,13 +34,13 @@ class SecondaryMasterHandler(MasterHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
pass pass
def announcePrimary(self, conn, packet): def announcePrimary(self, conn):
raise ElectionFailure, 'another primary arises' raise ElectionFailure, 'another primary arises'
def reelectPrimary(self, conn, packet): def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
logging.error('/!\ NotifyNodeInformation packet from secondary master') logging.error('/!\ NotifyNodeInformation packet from secondary master')
...@@ -58,10 +58,10 @@ class PrimaryHandler(MasterHandler): ...@@ -58,10 +58,10 @@ class PrimaryHandler(MasterHandler):
self.app.primary_master_node.setDown() self.app.primary_master_node.setDown()
raise PrimaryFailure, 'primary master is dead' raise PrimaryFailure, 'primary master is dead'
def reelectPrimary(self, conn, packet): def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
app = self.app app = self.app
for node_type, addr, uuid, state in node_list: for node_type, addr, uuid, state in node_list:
if node_type != NodeTypes.MASTER: if node_type != NodeTypes.MASTER:
...@@ -83,7 +83,7 @@ class PrimaryHandler(MasterHandler): ...@@ -83,7 +83,7 @@ class PrimaryHandler(MasterHandler):
if n.getUUID() is None: if n.getUUID() is None:
n.setUUID(uuid) n.setUUID(uuid)
def acceptIdentification(self, conn, packet, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, uuid, num_partitions,
num_replicas, your_uuid): num_replicas, your_uuid):
app = self.app app = self.app
...@@ -97,8 +97,8 @@ class PrimaryHandler(MasterHandler): ...@@ -97,8 +97,8 @@ class PrimaryHandler(MasterHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
node.setUUID(uuid) node.setUUID(uuid)
def answerPrimary(self, conn, packet, primary_uuid, known_master_list): def answerPrimary(self, conn, primary_uuid, known_master_list):
pass pass
def notifyClusterInformation(self, conn, packet, state): def notifyClusterInformation(self, conn, state):
pass pass
...@@ -22,20 +22,20 @@ from neo.master.handlers import BaseServiceHandler ...@@ -22,20 +22,20 @@ from neo.master.handlers import BaseServiceHandler
class ShutdownHandler(BaseServiceHandler): class ShutdownHandler(BaseServiceHandler):
"""This class deals with events for a shutting down phase.""" """This class deals with events for a shutting down phase."""
def requestIdentification(self, conn, packet, node_type, def requestIdentification(self, conn, node_type,
uuid, address, name): uuid, address, name):
logging.error('reject any new connection') logging.error('reject any new connection')
raise protocol.ProtocolError('cluster is shutting down') raise protocol.ProtocolError('cluster is shutting down')
def askPrimary(self, conn, packet): def askPrimary(self, conn):
logging.error('reject any new demand for primary master') logging.error('reject any new demand for primary master')
raise protocol.ProtocolError('cluster is shutting down') raise protocol.ProtocolError('cluster is shutting down')
def askBeginTransaction(self, conn, packet, tid): def askBeginTransaction(self, conn, tid):
logging.error('reject any new demand for new tid') logging.error('reject any new demand for new tid')
raise protocol.ProtocolError('cluster is shutting down') raise protocol.ProtocolError('cluster is shutting down')
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
# don't care about notifications since we are shutdowning # don't care about notifications since we are shutdowning
pass pass
...@@ -41,16 +41,16 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -41,16 +41,16 @@ class StorageServiceHandler(BaseServiceHandler):
# partition must not oudated to allows a cluster restart. # partition must not oudated to allows a cluster restart.
self.app.outdateAndBroadcastPartition() self.app.outdateAndBroadcastPartition()
def askLastIDs(self, conn, packet): def askLastIDs(self, conn):
app = self.app app = self.app
conn.answer(Packets.AnswerLastIDs(app.loid, app.tm.getLastTID(), conn.answer(Packets.AnswerLastIDs(app.loid, app.tm.getLastTID(),
app.pt.getID()), packet.getId()) app.pt.getID()))
def askUnfinishedTransactions(self, conn, packet): def askUnfinishedTransactions(self, conn):
p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList()) p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
conn.answer(p, packet.getId()) conn.answer(p)
def notifyInformationLocked(self, conn, packet, tid): def notifyInformationLocked(self, conn, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
node = app.nm.getByUUID(uuid) node = app.nm.getByUUID(uuid)
...@@ -81,7 +81,7 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -81,7 +81,7 @@ class StorageServiceHandler(BaseServiceHandler):
if node.isClient(): if node.isClient():
if node is t.getNode(): if node is t.getNode():
p = Packets.AnswerTransactionFinished(tid) p = Packets.AnswerTransactionFinished(tid)
c.answer(p, t.getMessageId()) c.answer(p, msg_id=t.getMessageId())
else: else:
c.notify(Packets.InvalidateObjects(t.getOIDList(), tid)) c.notify(Packets.InvalidateObjects(t.getOIDList(), tid))
elif node.isStorage(): elif node.isStorage():
...@@ -91,7 +91,7 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -91,7 +91,7 @@ class StorageServiceHandler(BaseServiceHandler):
# remove transaction from manager # remove transaction from manager
self.app.tm.remove(tid) self.app.tm.remove(tid)
def notifyReplicationDone(self, conn, packet, offset): def notifyReplicationDone(self, conn, offset):
uuid = conn.getUUID() uuid = conn.getUUID()
node = self.app.nm.getByUUID(uuid) node = self.app.nm.getByUUID(uuid)
logging.debug("node %s is up for offset %s" % (dump(uuid), offset)) logging.debug("node %s is up for offset %s" % (dump(uuid), offset))
......
...@@ -31,7 +31,7 @@ class VerificationHandler(BaseServiceHandler): ...@@ -31,7 +31,7 @@ class VerificationHandler(BaseServiceHandler):
if not self.app.pt.operational(): if not self.app.pt.operational():
raise VerificationFailure, 'cannot continue verification' raise VerificationFailure, 'cannot continue verification'
def answerLastIDs(self, conn, packet, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid):
app = self.app app = self.app
# If I get a bigger value here, it is dangerous. # If I get a bigger value here, it is dangerous.
if app.loid < loid or ltid > app.tm.getLastTID() \ if app.loid < loid or ltid > app.tm.getLastTID() \
...@@ -39,7 +39,7 @@ class VerificationHandler(BaseServiceHandler): ...@@ -39,7 +39,7 @@ class VerificationHandler(BaseServiceHandler):
logging.critical('got later information in verification') logging.critical('got later information in verification')
raise VerificationFailure raise VerificationFailure
def answerUnfinishedTransactions(self, conn, packet, tid_list): def answerUnfinishedTransactions(self, conn, tid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
logging.info('got unfinished transactions %s from %s:%d', logging.info('got unfinished transactions %s from %s:%d',
tid_list, *(conn.getAddress())) tid_list, *(conn.getAddress()))
...@@ -50,7 +50,7 @@ class VerificationHandler(BaseServiceHandler): ...@@ -50,7 +50,7 @@ class VerificationHandler(BaseServiceHandler):
app.unfinished_tid_set.update(tid_list) app.unfinished_tid_set.update(tid_list)
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
def answerTransactionInformation(self, conn, packet, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, oid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
...@@ -68,7 +68,7 @@ class VerificationHandler(BaseServiceHandler): ...@@ -68,7 +68,7 @@ class VerificationHandler(BaseServiceHandler):
app.unfinished_oid_set = None app.unfinished_oid_set = None
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
def tidNotFound(self, conn, packet, message): def tidNotFound(self, conn, message):
uuid = conn.getUUID() uuid = conn.getUUID()
logging.info('TID not found: %s', message) logging.info('TID not found: %s', message)
app = self.app app = self.app
...@@ -78,7 +78,7 @@ class VerificationHandler(BaseServiceHandler): ...@@ -78,7 +78,7 @@ class VerificationHandler(BaseServiceHandler):
app.unfinished_oid_set = None app.unfinished_oid_set = None
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
def answerObjectPresent(self, conn, packet, oid, tid): def answerObjectPresent(self, conn, oid, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
logging.info('object %s:%s found', dump(oid), dump(tid)) logging.info('object %s:%s found', dump(oid), dump(tid))
app = self.app app = self.app
...@@ -87,7 +87,7 @@ class VerificationHandler(BaseServiceHandler): ...@@ -87,7 +87,7 @@ class VerificationHandler(BaseServiceHandler):
return return
app.asking_uuid_dict[uuid] = True app.asking_uuid_dict[uuid] = True
def oidNotFound(self, conn, packet, message): def oidNotFound(self, conn, message):
uuid = conn.getUUID() uuid = conn.getUUID()
logging.info('OID not found: %s', message) logging.info('OID not found: %s', message)
app = self.app app = self.app
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.protocol import ErrorCodes from neo.protocol import ErrorCodes, Packets
class CommandEventHandler(EventHandler): class CommandEventHandler(EventHandler):
""" Base handler for command """ """ Base handler for command """
...@@ -50,18 +50,20 @@ class CommandEventHandler(EventHandler): ...@@ -50,18 +50,20 @@ class CommandEventHandler(EventHandler):
super(CommandEventHandler, self).peerBroken(conn) super(CommandEventHandler, self).peerBroken(conn)
self.__disconnected() self.__disconnected()
def __answer(self, conn, packet, *args): def ack(self, conn, msg):
self.__respond((packet.getType(), ) + args) self.__respond((Packets.Error, ErrorCodes.ACK, msg))
def ack(self, conn, packet, msg): def notReady(self, conn, msg):
self.__respond((packet.getType(), ErrorCodes.ACK, msg)) self.__respond((Packets.Error, ErrorCodes.NOT_READY, msg))
def notReady(self, conn, packet, msg): def __answer(packet_type):
self.__respond((packet.getType(), ErrorCodes.NOT_READY, msg)) def answer(self, conn, *args):
self.__respond((packet_type, ) + args)
return answer
answerPartitionList = __answer answerPartitionList = __answer(Packets.AnswerPartitionList)
answerNodeList = __answer answerNodeList = __answer(Packets.AnswerNodeList)
answerNodeState = __answer answerNodeState = __answer(Packets.AnswerNodeState)
answerClusterState = __answer answerClusterState = __answer(Packets.AnswerClusterState)
answerNewNodes = __answer answerNewNodes = __answer(Packets.AnswerNewNodes)
answerPrimary = __answer answerPrimary = __answer(Packets.AnswerPrimary)
...@@ -33,18 +33,18 @@ class BaseMasterHandler(BaseStorageHandler): ...@@ -33,18 +33,18 @@ class BaseMasterHandler(BaseStorageHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
raise PrimaryFailure('connection lost') raise PrimaryFailure('connection lost')
def reelectPrimary(self, conn, packet): def reelectPrimary(self, conn):
raise PrimaryFailure('re-election occurs') raise PrimaryFailure('re-election occurs')
def notifyClusterInformation(self, conn, packet, state): def notifyClusterInformation(self, conn, state):
logging.error('ignoring notify cluster information in %s' % logging.error('ignoring notify cluster information in %s' %
self.__class__.__name__) self.__class__.__name__)
def notifyLastOID(self, conn, packet, oid): def notifyLastOID(self, conn, oid):
self.app.loid = oid self.app.loid = oid
self.app.dm.setLastOID(oid) self.app.dm.setLastOID(oid)
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
"""Store information on nodes, only if this is sent by a primary """Store information on nodes, only if this is sent by a primary
master node.""" master node."""
self.app.nm.update(node_list) self.app.nm.update(node_list)
...@@ -64,7 +64,7 @@ class BaseMasterHandler(BaseStorageHandler): ...@@ -64,7 +64,7 @@ class BaseMasterHandler(BaseStorageHandler):
class BaseClientAndStorageOperationHandler(BaseStorageHandler): class BaseClientAndStorageOperationHandler(BaseStorageHandler):
""" Accept requests common to client and storage nodes """ """ Accept requests common to client and storage nodes """
def askTIDs(self, conn, packet, first, last, partition): def askTIDs(self, conn, first, last, partition):
# This method is complicated, because I must return TIDs only # This method is complicated, because I must return TIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
...@@ -78,9 +78,9 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler): ...@@ -78,9 +78,9 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
tid_list = app.dm.getTIDList(first, last - first, tid_list = app.dm.getTIDList(first, last - first,
app.pt.getPartitions(), partition_list) app.pt.getPartitions(), partition_list)
conn.answer(Packets.AnswerTIDs(tid_list), packet.getId()) conn.answer(Packets.AnswerTIDs(tid_list))
def askObjectHistory(self, conn, packet, oid, first, last): def askObjectHistory(self, conn, oid, first, last):
if first >= last: if first >= last:
raise protocol.ProtocolError( 'invalid offsets') raise protocol.ProtocolError( 'invalid offsets')
...@@ -88,10 +88,9 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler): ...@@ -88,10 +88,9 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
history_list = app.dm.getObjectHistory(oid, first, last - first) history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None: if history_list is None:
history_list = [] history_list = []
p = Packets.AnswerObjectHistory(oid, history_list) conn.answer(Packets.AnswerObjectHistory(oid, history_list))
conn.answer(p, packet.getId())
def askTransactionInformation(self, conn, packet, tid): def askTransactionInformation(self, conn, tid):
app = self.app app = self.app
t = app.dm.getTransaction(tid) t = app.dm.getTransaction(tid)
if t is None: if t is None:
...@@ -99,13 +98,13 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler): ...@@ -99,13 +98,13 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
else: else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[0]) t[0])
conn.answer(p, packet.getId()) conn.answer(p)
def askObject(self, conn, packet, oid, serial, tid): def askObject(self, conn, oid, serial, tid):
app = self.app app = self.app
if oid in app.load_lock_dict: if oid in app.load_lock_dict:
# Delay the response. # Delay the response.
app.queueEvent(self.askObject, conn, packet, oid, app.queueEvent(self.askObject, conn, oid,
serial, tid) serial, tid)
return return
o = app.dm.getObject(oid, serial, tid) o = app.dm.getObject(oid, serial, tid)
...@@ -118,5 +117,5 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler): ...@@ -118,5 +117,5 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
else: else:
logging.debug('oid = %s not found', dump(oid)) logging.debug('oid = %s not found', dump(oid))
p = protocol.oidNotFound('%s does not exist' % dump(oid)) p = protocol.oidNotFound('%s does not exist' % dump(oid))
conn.answer(p, packet.getId()) conn.answer(p)
...@@ -85,7 +85,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -85,7 +85,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
BaseClientAndStorageOperationHandler.connectionCompleted(self, conn) BaseClientAndStorageOperationHandler.connectionCompleted(self, conn)
def abortTransaction(self, conn, packet, tid): def abortTransaction(self, conn, tid):
app = self.app app = self.app
try: try:
t = app.transaction_dict[tid] t = app.transaction_dict[tid]
...@@ -105,7 +105,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -105,7 +105,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
except KeyError: except KeyError:
pass pass
def askStoreTransaction(self, conn, packet, tid, user, desc, def askStoreTransaction(self, conn, tid, user, desc,
ext, oid_list): ext, oid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
...@@ -113,9 +113,9 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -113,9 +113,9 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
if t.isLastOIDChanged(): if t.isLastOIDChanged():
self.app.dm.setLastOID(self.app.loid) self.app.dm.setLastOID(self.app.loid)
t.addTransaction(oid_list, user, desc, ext) t.addTransaction(oid_list, user, desc, ext)
conn.answer(Packets.AnswerStoreTransaction(tid), packet.getId()) conn.answer(Packets.AnswerStoreTransaction(tid))
def askStoreObject(self, conn, packet, oid, serial, def askStoreObject(self, conn, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
# First, check for the locking state. # First, check for the locking state.
...@@ -124,15 +124,13 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -124,15 +124,13 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
if locking_tid is not None: if locking_tid is not None:
if locking_tid < tid: if locking_tid < tid:
# Delay the response. # Delay the response.
app.queueEvent(self.askStoreObject, conn, packet, app.queueEvent(self.askStoreObject, conn, oid, serial,
oid, serial, compression, checksum, compression, checksum, data, tid)
data, tid)
else: else:
# If a newer transaction already locks this object, # If a newer transaction already locks this object,
# do not try to resolve a conflict, so return immediately. # do not try to resolve a conflict, so return immediately.
logging.info('unresolvable conflict in %s', dump(oid)) logging.info('unresolvable conflict in %s', dump(oid))
p = Packets.AnswerStoreObject(1, oid, locking_tid) conn.answer(Packets.AnswerStoreObject(1, oid, locking_tid))
conn.answer(p, packet.getId())
return return
# Next, check if this is generated from the latest revision. # Next, check if this is generated from the latest revision.
...@@ -141,14 +139,12 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -141,14 +139,12 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
last_serial = history_list[0][0] last_serial = history_list[0][0]
if last_serial != serial: if last_serial != serial:
logging.info('resolvable conflict in %s', dump(oid)) logging.info('resolvable conflict in %s', dump(oid))
p = Packets.AnswerStoreObject(1, oid, last_serial) conn.answer(Packets.AnswerStoreObject(1, oid, last_serial))
conn.answer(p, packet.getId())
return return
# Now store the object. # Now store the object.
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid)) t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addObject(oid, compression, checksum, data) t.addObject(oid, compression, checksum, data)
p = Packets.AnswerStoreObject(0, oid, serial) conn.answer(Packets.AnswerStoreObject(0, oid, serial))
conn.answer(p, packet.getId())
app.store_lock_dict[oid] = tid app.store_lock_dict[oid] = tid
# check if a greater OID last the last generated was used # check if a greater OID last the last generated was used
......
...@@ -27,7 +27,7 @@ class HiddenHandler(BaseMasterHandler): ...@@ -27,7 +27,7 @@ class HiddenHandler(BaseMasterHandler):
self.app = app self.app = app
BaseMasterHandler.__init__(self, app) BaseMasterHandler.__init__(self, app)
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
"""Store information on nodes, only if this is sent by a primary """Store information on nodes, only if this is sent by a primary
master node.""" master node."""
app = self.app app = self.app
...@@ -42,28 +42,28 @@ class HiddenHandler(BaseMasterHandler): ...@@ -42,28 +42,28 @@ class HiddenHandler(BaseMasterHandler):
erase_db = state == NodeStates.DOWN erase_db = state == NodeStates.DOWN
self.app.shutdown(erase=erase_db) self.app.shutdown(erase=erase_db)
def requestIdentification(self, conn, packet, node_type, def requestIdentification(self, conn, node_type,
uuid, address, name): uuid, address, name):
pass pass
def acceptIdentification(self, conn, packet, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid):
pass pass
def answerPrimary(self, conn, packet, primary_uuid, def answerPrimary(self, conn, primary_uuid,
known_master_list): known_master_list):
pass pass
def askLastIDs(self, conn, packet): def askLastIDs(self, conn):
pass pass
def askPartitionTable(self, conn, packet, offset_list): def askPartitionTable(self, conn, offset_list):
pass pass
def sendPartitionTable(self, conn, packet, ptid, row_list): def sendPartitionTable(self, conn, ptid, row_list):
pass pass
def notifyPartitionChanges(self, conn, packet, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
app = self.app app = self.app
...@@ -85,59 +85,59 @@ class HiddenHandler(BaseMasterHandler): ...@@ -85,59 +85,59 @@ class HiddenHandler(BaseMasterHandler):
elif state == CellStates.OUT_OF_DATE: elif state == CellStates.OUT_OF_DATE:
app.replicator.addPartition(offset) app.replicator.addPartition(offset)
def startOperation(self, conn, packet): def startOperation(self, conn):
self.app.operational = True self.app.operational = True
def stopOperation(self, conn, packet): def stopOperation(self, conn):
pass pass
def askUnfinishedTransactions(self, conn, packet): def askUnfinishedTransactions(self, conn):
pass pass
def askTransactionInformation(self, conn, packet, tid): def askTransactionInformation(self, conn, tid):
pass pass
def askObjectPresent(self, conn, packet, oid, tid): def askObjectPresent(self, conn, oid, tid):
pass pass
def deleteTransaction(self, conn, packet, tid): def deleteTransaction(self, conn, tid):
pass pass
def commitTransaction(self, conn, packet, tid): def commitTransaction(self, conn, tid):
pass pass
def lockInformation(self, conn, packet, tid): def lockInformation(self, conn, tid):
pass pass
def notifyUnlockInformation(self, conn, packet, tid): def notifyUnlockInformation(self, conn, tid):
pass pass
def askObject(self, conn, packet, oid, serial, tid): def askObject(self, conn, oid, serial, tid):
pass pass
def askTIDs(self, conn, packet, first, last, partition): def askTIDs(self, conn, first, last, partition):
pass pass
def askObjectHistory(self, conn, packet, oid, first, last): def askObjectHistory(self, conn, oid, first, last):
pass pass
def askStoreTransaction(self, conn, packet, tid, user, desc, def askStoreTransaction(self, conn, tid, user, desc,
ext, oid_list): ext, oid_list):
pass pass
def askStoreObject(self, conn, packet, oid, serial, def askStoreObject(self, conn, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
pass pass
def abortTransaction(self, conn, packet, tid): def abortTransaction(self, conn, tid):
logging.debug('ignoring abort transaction') logging.debug('ignoring abort transaction')
def answerLastIDs(self, conn, packet, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid):
logging.debug('ignoring answer last ids') logging.debug('ignoring answer last ids')
def answerUnfinishedTransactions(self, conn, packet, tid_list): def answerUnfinishedTransactions(self, conn, tid_list):
logging.debug('ignoring answer unfinished transactions') logging.debug('ignoring answer unfinished transactions')
def askOIDs(self, conn, packet, first, last, partition): def askOIDs(self, conn, first, last, partition):
logging.debug('ignoring ask oids') logging.debug('ignoring ask oids')
...@@ -28,7 +28,7 @@ class IdentificationHandler(BaseStorageHandler): ...@@ -28,7 +28,7 @@ class IdentificationHandler(BaseStorageHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
logging.warning('A connection was lost during identification') logging.warning('A connection was lost during identification')
def requestIdentification(self, conn, packet, node_type, def requestIdentification(self, conn, node_type,
uuid, address, name): uuid, address, name):
self.checkClusterName(name) self.checkClusterName(name)
# reject any incoming connections if not ready # reject any incoming connections if not ready
...@@ -61,6 +61,6 @@ class IdentificationHandler(BaseStorageHandler): ...@@ -61,6 +61,6 @@ class IdentificationHandler(BaseStorageHandler):
args = (NodeTypes.STORAGE, app.uuid, app.pt.getPartitions(), args = (NodeTypes.STORAGE, app.uuid, app.pt.getPartitions(),
app.pt.getReplicas(), uuid) app.pt.getReplicas(), uuid)
# accept the identification and trigger an event # accept the identification and trigger an event
conn.answer(Packets.AcceptIdentification(*args), packet.getId()) conn.answer(Packets.AcceptIdentification(*args))
handler.connectionCompleted(conn) handler.connectionCompleted(conn)
...@@ -22,19 +22,19 @@ from neo import protocol ...@@ -22,19 +22,19 @@ from neo import protocol
class InitializationHandler(BaseMasterHandler): class InitializationHandler(BaseMasterHandler):
def answerNodeInformation(self, conn, packet): def answerNodeInformation(self, conn):
self.app.has_node_information = True self.app.has_node_information = True
def notifyNodeInformation(self, conn, packet, node_list): def notifyNodeInformation(self, conn, node_list):
# the whole node list is received here # the whole node list is received here
BaseMasterHandler.notifyNodeInformation(self, conn, packet, node_list) BaseMasterHandler.notifyNodeInformation(self, conn, node_list)
def sendPartitionTable(self, conn, packet, ptid, row_list): def sendPartitionTable(self, conn, ptid, row_list):
"""A primary master node sends this packet to synchronize a partition """A primary master node sends this packet to synchronize a partition
table. Note that the message can be split into multiple packets.""" table. Note that the message can be split into multiple packets."""
self.app.pt.load(ptid, row_list, self.app.nm) self.app.pt.load(ptid, row_list, self.app.nm)
def answerPartitionTable(self, conn, packet, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
app = self.app app = self.app
pt = app.pt pt = app.pt
assert not row_list assert not row_list
...@@ -58,7 +58,7 @@ class InitializationHandler(BaseMasterHandler): ...@@ -58,7 +58,7 @@ class InitializationHandler(BaseMasterHandler):
app.dm.setPartitionTable(ptid, cell_list) app.dm.setPartitionTable(ptid, cell_list)
self.app.has_partition_table = True self.app.has_partition_table = True
def notifyPartitionChanges(self, conn, packet, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
# XXX: Currently it safe to ignore those packets because the master is # XXX: Currently it safe to ignore those packets because the master is
# single threaded, it send the partition table without any changes at # single threaded, it send the partition table without any changes at
# the same time. Latter it should be needed to put in queue any changes # the same time. Latter it should be needed to put in queue any changes
......
...@@ -25,16 +25,16 @@ from neo.exception import OperationFailure ...@@ -25,16 +25,16 @@ from neo.exception import OperationFailure
class MasterOperationHandler(BaseMasterHandler): class MasterOperationHandler(BaseMasterHandler):
""" This handler is used for the primary master """ """ This handler is used for the primary master """
def stopOperation(self, conn, packet): def stopOperation(self, conn):
raise OperationFailure('operation stopped') raise OperationFailure('operation stopped')
def answerLastIDs(self, conn, packet, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid):
self.app.replicator.setCriticalTID(packet, ltid) self.app.replicator.setCriticalTID(conn.getUUID(), ltid)
def answerUnfinishedTransactions(self, conn, packet, tid_list): def answerUnfinishedTransactions(self, conn, tid_list):
self.app.replicator.setUnfinishedTIDList(tid_list) self.app.replicator.setUnfinishedTIDList(tid_list)
def notifyPartitionChanges(self, conn, packet, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
app = self.app app = self.app
...@@ -56,7 +56,7 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -56,7 +56,7 @@ class MasterOperationHandler(BaseMasterHandler):
elif state == CellStates.OUT_OF_DATE: elif state == CellStates.OUT_OF_DATE:
app.replicator.addPartition(offset) app.replicator.addPartition(offset)
def lockInformation(self, conn, packet, tid): def lockInformation(self, conn, tid):
app = self.app app = self.app
try: try:
t = app.transaction_dict[tid] t = app.transaction_dict[tid]
...@@ -67,9 +67,9 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -67,9 +67,9 @@ class MasterOperationHandler(BaseMasterHandler):
app.dm.storeTransaction(tid, object_list, t.getTransaction()) app.dm.storeTransaction(tid, object_list, t.getTransaction())
except KeyError: except KeyError:
pass pass
conn.answer(Packets.NotifyInformationLocked(tid), packet.getId()) conn.answer(Packets.NotifyInformationLocked(tid))
def notifyUnlockInformation(self, conn, packet, tid): def notifyUnlockInformation(self, conn, tid):
app = self.app app = self.app
try: try:
t = app.transaction_dict[tid] t = app.transaction_dict[tid]
......
...@@ -36,12 +36,12 @@ class ReplicationHandler(BaseStorageHandler): ...@@ -36,12 +36,12 @@ class ReplicationHandler(BaseStorageHandler):
logging.error('replication is stopped due to connection failure') logging.error('replication is stopped due to connection failure')
self.app.replicator.reset() self.app.replicator.reset()
def acceptIdentification(self, conn, packet, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid):
# set the UUID on the connection # set the UUID on the connection
conn.setUUID(uuid) conn.setUUID(uuid)
def answerTIDs(self, conn, packet, tid_list): def answerTIDs(self, conn, tid_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn: if app.replicator.current_connection is not conn:
return return
...@@ -68,7 +68,7 @@ class ReplicationHandler(BaseStorageHandler): ...@@ -68,7 +68,7 @@ class ReplicationHandler(BaseStorageHandler):
conn.ask(p, timeout=300) conn.ask(p, timeout=300)
app.replicator.oid_offset = 0 app.replicator.oid_offset = 0
def answerTransactionInformation(self, conn, packet, tid, def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list): user, desc, ext, oid_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn: if app.replicator.current_connection is not conn:
...@@ -77,7 +77,7 @@ class ReplicationHandler(BaseStorageHandler): ...@@ -77,7 +77,7 @@ class ReplicationHandler(BaseStorageHandler):
# Directly store the transaction. # Directly store the transaction.
app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), False) app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), False)
def answerOIDs(self, conn, packet, oid_list): def answerOIDs(self, conn, oid_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn: if app.replicator.current_connection is not conn:
return return
...@@ -93,7 +93,7 @@ class ReplicationHandler(BaseStorageHandler): ...@@ -93,7 +93,7 @@ class ReplicationHandler(BaseStorageHandler):
# finished. # finished.
app.replicator.replication_done = True app.replicator.replication_done = True
def answerObjectHistory(self, conn, packet, oid, history_list): def answerObjectHistory(self, conn, oid, history_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn: if app.replicator.current_connection is not conn:
return return
...@@ -127,7 +127,7 @@ class ReplicationHandler(BaseStorageHandler): ...@@ -127,7 +127,7 @@ class ReplicationHandler(BaseStorageHandler):
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.ask(p, timeout=300) conn.ask(p, timeout=300)
def answerObject(self, conn, packet, oid, serial_start, def answerObject(self, conn, oid, serial_start,
serial_end, compression, checksum, data): serial_end, compression, checksum, data):
app = self.app app = self.app
if app.replicator.current_connection is not conn: if app.replicator.current_connection is not conn:
......
...@@ -21,14 +21,13 @@ from neo.protocol import Packets ...@@ -21,14 +21,13 @@ from neo.protocol import Packets
class StorageOperationHandler(BaseClientAndStorageOperationHandler): class StorageOperationHandler(BaseClientAndStorageOperationHandler):
def askLastIDs(self, conn, packet): def askLastIDs(self, conn):
app = self.app app = self.app
oid = app.dm.getLastOID() oid = app.dm.getLastOID()
tid = app.dm.getLastTID() tid = app.dm.getLastTID()
p = Packets.AnswerLastIDs(oid, tid, app.pt.getID()) conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID()))
conn.answer(p, packet.getId())
def askOIDs(self, conn, packet, first, last, partition): def askOIDs(self, conn, first, last, partition):
# This method is complicated, because I must return OIDs only # This method is complicated, because I must return OIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
...@@ -43,5 +42,5 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -43,5 +42,5 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
oid_list = app.dm.getOIDList(first, last - first, oid_list = app.dm.getOIDList(first, last - first,
app.pt.getPartitions(), partition_list) app.pt.getPartitions(), partition_list)
conn.answer(Packets.AnswerOIDs(oid_list), packet.getId()) conn.answer(Packets.AnswerOIDs(oid_list))
...@@ -26,14 +26,13 @@ from neo.exception import OperationFailure ...@@ -26,14 +26,13 @@ from neo.exception import OperationFailure
class VerificationHandler(BaseMasterHandler): class VerificationHandler(BaseMasterHandler):
"""This class deals with events for a verification phase.""" """This class deals with events for a verification phase."""
def askLastIDs(self, conn, packet): def askLastIDs(self, conn):
app = self.app app = self.app
oid = app.dm.getLastOID() oid = app.dm.getLastOID()
tid = app.dm.getLastTID() tid = app.dm.getLastTID()
p = Packets.AnswerLastIDs(oid, tid, app.pt.getID()) conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID()))
conn.answer(p, packet.getId())
def askPartitionTable(self, conn, packet, offset_list): def askPartitionTable(self, conn, offset_list):
app, pt = self.app, self.app.pt app, pt = self.app, self.app.pt
if not offset_list: if not offset_list:
# all is requested # all is requested
...@@ -51,10 +50,9 @@ class VerificationHandler(BaseMasterHandler): ...@@ -51,10 +50,9 @@ class VerificationHandler(BaseMasterHandler):
except IndexError: except IndexError:
raise protocol.ProtocolError('invalid partition table offset') raise protocol.ProtocolError('invalid partition table offset')
p = Packets.AnswerPartitionTable(app.pt.getID(), row_list) conn.answer(Packets.AnswerPartitionTable(app.pt.getID(), row_list))
conn.answer(p, packet.getId())
def notifyPartitionChanges(self, conn, packet, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
app = self.app app = self.app
...@@ -66,18 +64,17 @@ class VerificationHandler(BaseMasterHandler): ...@@ -66,18 +64,17 @@ class VerificationHandler(BaseMasterHandler):
app.pt.update(ptid, cell_list, app.nm) app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list) app.dm.changePartitionTable(ptid, cell_list)
def startOperation(self, conn, packet): def startOperation(self, conn):
self.app.operational = True self.app.operational = True
def stopOperation(self, conn, packet): def stopOperation(self, conn):
raise OperationFailure('operation stopped') raise OperationFailure('operation stopped')
def askUnfinishedTransactions(self, conn, packet): def askUnfinishedTransactions(self, conn):
tid_list = self.app.dm.getUnfinishedTIDList() tid_list = self.app.dm.getUnfinishedTIDList()
p = Packets.AnswerUnfinishedTransactions(tid_list) conn.answer(Packets.AnswerUnfinishedTransactions(tid_list))
conn.answer(p, packet.getId())
def askTransactionInformation(self, conn, packet, tid): def askTransactionInformation(self, conn, tid):
app = self.app app = self.app
t = app.dm.getTransaction(tid, all=True) t = app.dm.getTransaction(tid, all=True)
if t is None: if t is None:
...@@ -85,19 +82,19 @@ class VerificationHandler(BaseMasterHandler): ...@@ -85,19 +82,19 @@ class VerificationHandler(BaseMasterHandler):
else: else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[0]) t[0])
conn.answer(p, packet.getId()) conn.answer(p)
def askObjectPresent(self, conn, packet, oid, tid): def askObjectPresent(self, conn, oid, tid):
if self.app.dm.objectPresent(oid, tid): if self.app.dm.objectPresent(oid, tid):
p = Packets.AnswerObjectPresent(oid, tid) p = Packets.AnswerObjectPresent(oid, tid)
else: else:
p = protocol.oidNotFound( p = protocol.oidNotFound(
'%s:%s do not exist' % (dump(oid), dump(tid))) '%s:%s do not exist' % (dump(oid), dump(tid)))
conn.answer(p, packet.getId()) conn.answer(p)
def deleteTransaction(self, conn, packet, tid): def deleteTransaction(self, conn, tid):
self.app.dm.deleteTransaction(tid, all = True) self.app.dm.deleteTransaction(tid, all = True)
def commitTransaction(self, conn, packet, tid): def commitTransaction(self, conn, tid):
self.app.dm.finishTransaction(tid) self.app.dm.finishTransaction(tid)
...@@ -113,25 +113,24 @@ class Replicator(object): ...@@ -113,25 +113,24 @@ class Replicator(object):
"""Return whether there is any pending partition.""" """Return whether there is any pending partition."""
return len(self.partition_dict) or len(self.new_partition_dict) return len(self.partition_dict) or len(self.new_partition_dict)
def setCriticalTID(self, packet, tid): def setCriticalTID(self, uuid, tid):
"""This is a callback from OperationEventHandler.""" """This is a callback from OperationEventHandler."""
msg_id = packet.getId()
try: try:
partition_list = self.critical_tid_dict[msg_id] partition_list = self.critical_tid_dict[uuid]
logging.debug('setting critical TID %s to %s', logging.debug('setting critical TID %s to %s', dump(tid),
dump(tid),
', '.join([str(p.getRID()) for p in partition_list])) ', '.join([str(p.getRID()) for p in partition_list]))
for partition in self.critical_tid_dict[msg_id]: for partition in self.critical_tid_dict[uuid]:
partition.setCriticalTID(tid) partition.setCriticalTID(tid)
del self.critical_tid_dict[msg_id] del self.critical_tid_dict[uuid]
except KeyError: except KeyError:
logging.debug("setCriticalTID raised KeyError for msg_id %s" % logging.debug("setCriticalTID raised KeyError for %s" %
(msg_id, )) (dump(uuid), ))
def _askCriticalTID(self): def _askCriticalTID(self):
conn = self.primary_master_connection conn = self.primary_master_connection
msg_id = conn.ask(Packets.AskLastIDs()) conn.ask(Packets.AskLastIDs())
self.critical_tid_dict[msg_id] = self.new_partition_dict.values() uuid = conn.getUUID()
self.critical_tid_dict[uuid] = self.new_partition_dict.values()
self.partition_dict.update(self.new_partition_dict) self.partition_dict.update(self.new_partition_dict)
self.new_partition_dict = {} self.new_partition_dict = {}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment