Commit c109d56c authored by Vincent Pelletier's avatar Vincent Pelletier

Change Connection.answer prototype: it now takes a msg_id as parameter, not a...

Change Connection.answer prototype: it now takes a msg_id as parameter, not a message any longer. This make this method easier to call from places where the entire message might not be available.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1045 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f04b641b
......@@ -28,7 +28,7 @@ class AdminEventHandler(EventHandler):
def __notConnected(self, conn, packet):
conn.answer(protocol.notReady('Not connected to a primary master.'),
packet)
packet.getId())
def handleAskPartitionList(self, conn, packet, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s" %(min_offset, max_offset, dump(uuid)))
......@@ -64,7 +64,7 @@ class AdminEventHandler(EventHandler):
port = 0
node_information_list.append((node.getNodeType(), (ip, port), node.getUUID(), node.getState()))
p = protocol.answerNodeList(node_information_list)
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleSetNodeState(self, conn, packet, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s" %(dump(uuid), state))
......@@ -76,7 +76,7 @@ class AdminEventHandler(EventHandler):
if node.getState() == state and modify_partition_table is False:
# no change
p = protocol.answerNodeState(node.getUUID(), node.getState())
conn.answer(p, packet)
conn.answer(p, packet.getId())
return
# forward to primary master node
master_conn = self.app.master_conn
......@@ -120,7 +120,7 @@ class AdminEventHandler(EventHandler):
msg_id = master_conn.ask(protocol.askClusterState())
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
return
conn.answer(protocol.answerClusterState(self.app.cluster_state), packet)
conn.answer(protocol.answerClusterState(self.app.cluster_state), packet.getId())
class MasterEventHandler(EventHandler):
......
......@@ -242,7 +242,7 @@ class Connection(BaseConnection):
packet_type = packet.getType()
if packet_type == protocol.PING:
# Send a pong notification
self.answer(protocol.pong(), packet)
self.answer(protocol.pong(), packet.getId())
elif packet_type != protocol.PONG:
# Skip PONG packets, its only purpose is to drop IdleEvent
# generated upong ping.
......@@ -400,9 +400,8 @@ class Connection(BaseConnection):
return msg_id
@not_closed
def answer(self, packet, answered_packet):
def answer(self, packet, msg_id):
""" Answer to a packet by re-using its ID for the packet answer """
msg_id = answered_packet.getId()
packet.setId(msg_id)
self._addPacket(packet)
......
......@@ -101,7 +101,7 @@ class EventHandler(object):
logging.error('malformed packet %s from %s:%d: %s', packet.getType(), *args)
response = protocol.protocolError(message)
if packet is not None:
conn.answer(response, packet)
conn.answer(response, packet.getId())
else:
conn.notify(response)
conn.abort()
......@@ -116,23 +116,23 @@ class EventHandler(object):
message = 'unexpected packet: %s in %s' % (message,
self.__class__.__name__)
logging.error('%s', message)
conn.answer(protocol.protocolError(message), packet)
conn.answer(protocol.protocolError(message), packet.getId())
conn.abort()
self.peerBroken(conn)
def brokenNodeDisallowedError(self, conn, packet, *args):
""" Called when a broken node send packets """
conn.answer(protocol.brokenNodeDisallowedError('go away'), packet)
conn.answer(protocol.brokenNodeDisallowedError('go away'), packet.getId())
conn.abort()
def notReadyError(self, conn, packet, *args):
""" Called when the node is not ready """
conn.answer(protocol.notReady('retry later'), packet)
conn.answer(protocol.notReady('retry later'), packet.getId())
conn.abort()
def protocolError(self, conn, packet, message='', *args):
""" Called for any other protocol error """
conn.answer(protocol.protocolError(message), packet)
conn.answer(protocol.protocolError(message), packet.getId())
conn.abort()
def dispatch(self, conn, packet):
......
......@@ -78,22 +78,24 @@ class MasterHandler(EventHandler):
continue
known_master_list.append((n.getServer(), n.getUUID(), ))
conn.answer(protocol.answerPrimaryMaster(primary_uuid,
known_master_list), packet)
known_master_list),
packet.getId())
def handleAskClusterState(self, conn, packet):
assert conn.getUUID() is not None
state = self.app.getClusterState()
conn.answer(protocol.answerClusterState(state), packet)
conn.answer(protocol.answerClusterState(state), packet.getId())
def handleAskNodeInformation(self, conn, packet):
self.app.sendNodesInformations(conn)
conn.answer(protocol.answerNodeInformation([]), packet)
conn.answer(protocol.answerNodeInformation([]), packet.getId())
def handleAskPartitionTable(self, conn, packet, offset_list):
assert len(offset_list) == 0
app = self.app
app.sendPartitionTable(conn)
conn.answer(protocol.answerPartitionTable(app.pt.getID(), []), packet)
conn.answer(protocol.answerPartitionTable(app.pt.getID(), []),
packet.getId())
class BaseServiceHandler(MasterHandler):
......@@ -101,10 +103,10 @@ class BaseServiceHandler(MasterHandler):
def handleAskLastIDs(self, conn, packet):
app = self.app
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.pt.getID()), packet)
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.pt.getID()), packet.getId())
def handleAskUnfinishedTransactions(self, conn, packet):
app = self.app
p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
conn.answer(p, packet)
conn.answer(p, packet.getId())
......@@ -32,12 +32,12 @@ class AdministrationHandler(MasterHandler):
def handleAskPrimaryMaster(self, conn, packet):
app = self.app
# I'm the primary
conn.answer(protocol.answerPrimaryMaster(app.uuid, []), packet)
conn.answer(protocol.answerPrimaryMaster(app.uuid, []), packet.getId())
def handleSetClusterState(self, conn, packet, state):
self.app.changeClusterState(state)
p = protocol.noError('cluster state changed')
conn.answer(p, packet)
conn.answer(p, packet.getId())
if state == protocol.STOPPING:
self.app.cluster_state = state
self.app.shutdown()
......@@ -48,7 +48,7 @@ class AdministrationHandler(MasterHandler):
node = app.nm.getNodeByUUID(uuid)
if node is None:
p = protocol.protocolError('invalid uuid')
conn.answer(p, packet)
conn.answer(p, packet.getId())
return
if uuid == app.uuid:
......@@ -56,19 +56,19 @@ class AdministrationHandler(MasterHandler):
if state == RUNNING_STATE:
# yes I know
p = protocol.noError('node state changed')
conn.answer(p, packet)
conn.answer(p, packet.getId())
return
else:
# I was asked to shutdown
node.setState(state)
p = protocol.noError('node state changed')
conn.answer(p, packet)
conn.answer(p, packet.getId())
app.shutdown()
if node.getState() == state:
# no change, just notify admin node
p = protocol.noError('node state changed')
conn.answer(p, packet)
conn.answer(p, packet.getId())
else:
# first make sure to have a connection to the node
node_conn = None
......@@ -85,7 +85,7 @@ class AdministrationHandler(MasterHandler):
node.setState(state)
p = protocol.noError('state changed')
conn.answer(p, packet)
conn.answer(p, packet.getId())
app.broadcastNodeInformation(node)
# If this is a storage node, ask it to start.
if node.isStorage() and state == RUNNING_STATE \
......@@ -128,7 +128,7 @@ class AdministrationHandler(MasterHandler):
if not uuid_set:
logging.warning('No nodes added')
p = protocol.noError('no nodes added')
conn.answer(p, packet)
conn.answer(p, packet.getId())
return
uuids = ', '.join([dump(uuid) for uuid in uuid_set])
logging.info('Adding nodes %s' % uuids)
......@@ -146,4 +146,4 @@ class AdministrationHandler(MasterHandler):
# broadcast the new partition table
app.broadcastPartitionChanges(app.pt.setNextID(), cell_list)
p = protocol.noError('node added')
conn.answer(p, packet)
conn.answer(p, packet.getId())
......@@ -104,11 +104,11 @@ class ClientServiceHandler(BaseServiceHandler):
tid = app.getNextTID()
app.ltid = tid
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerBeginTransaction(tid), packet)
conn.answer(protocol.answerBeginTransaction(tid), packet.getId())
def handleAskNewOIDs(self, conn, packet, num_oids):
oid_list = self.app.getNewOIDList(num_oids)
conn.answer(protocol.answerNewOIDs(oid_list), packet)
conn.answer(protocol.answerNewOIDs(oid_list), packet.getId())
def handleFinishTransaction(self, conn, packet, oid_list, tid):
app = self.app
......
......@@ -253,7 +253,7 @@ class ServerElectionHandler(MasterHandler):
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, app.uuid,
app.server, app.pt.getPartitions(), app.pt.getReplicas(), uuid)
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
......
......@@ -76,7 +76,7 @@ class IdentificationHandler(MasterHandler):
# answer
args = (protocol.MASTER_NODE_TYPE, app.uuid, app.server,
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
conn.answer(protocol.acceptNodeIdentification(*args), packet)
conn.answer(protocol.acceptNodeIdentification(*args), packet.getId())
# trigger the event
handler.connectionCompleted(conn)
app.broadcastNodeInformation(node)
......
......@@ -99,7 +99,7 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
tid_list = app.dm.getTIDList(first, last - first,
app.pt.getPartitions(), partition_list)
conn.answer(protocol.answerTIDs(tid_list), packet)
conn.answer(protocol.answerTIDs(tid_list), packet.getId())
def handleAskObjectHistory(self, conn, packet, oid, first, last):
if first >= last:
......@@ -110,7 +110,7 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
if history_list is None:
history_list = []
p = protocol.answerObjectHistory(oid, history_list)
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleAskTransactionInformation(self, conn, packet, tid):
app = self.app
......@@ -119,7 +119,7 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
p = protocol.tidNotFound('%s does not exist' % dump(tid))
else:
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleAskObject(self, conn, packet, oid, serial, tid):
app = self.app
......@@ -138,5 +138,5 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
else:
logging.debug('oid = %s not found', dump(oid))
p = protocol.oidNotFound('%s does not exist' % dump(oid))
conn.answer(p, packet)
conn.answer(p, packet.getId())
......@@ -103,7 +103,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
app = self.app
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addTransaction(oid_list, user, desc, ext)
conn.answer(protocol.answerStoreTransaction(tid), packet)
conn.answer(protocol.answerStoreTransaction(tid), packet.getId())
def handleAskStoreObject(self, conn, packet, oid, serial,
compression, checksum, data, tid):
......@@ -127,7 +127,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
# do not try to resolve a conflict, so return immediately.
logging.info('unresolvable conflict in %s', dump(oid))
p = protocol.answerStoreObject(1, oid, locking_tid)
conn.answer(p, packet)
conn.answer(p, packet.getId())
return
# Next, check if this is generated from the latest revision.
......@@ -137,12 +137,12 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
if last_serial != serial:
logging.info('resolvable conflict in %s', dump(oid))
p = protocol.answerStoreObject(1, oid, last_serial)
conn.answer(p, packet)
conn.answer(p, packet.getId())
return
# Now store the object.
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addObject(oid, compression, checksum, data)
p = protocol.answerStoreObject(0, oid, serial)
conn.answer(p, packet)
conn.answer(p, packet.getId())
app.store_lock_dict[oid] = tid
......@@ -68,6 +68,6 @@ class IdentificationHandler(BaseStorageHandler):
args = (protocol.STORAGE_NODE_TYPE, app.uuid, app.server,
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# accept the identification and trigger an event
conn.answer(protocol.acceptNodeIdentification(*args), packet)
conn.answer(protocol.acceptNodeIdentification(*args), packet.getId())
handler.connectionCompleted(conn)
......@@ -69,7 +69,7 @@ class MasterOperationHandler(BaseMasterHandler):
app.dm.storeTransaction(tid, object_list, t.getTransaction())
except KeyError:
pass
conn.answer(protocol.notifyInformationLocked(tid), packet)
conn.answer(protocol.notifyInformationLocked(tid), packet.getId())
def handleUnlockInformation(self, conn, packet, tid):
app = self.app
......
......@@ -28,7 +28,7 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
oid = app.dm.getLastOID()
tid = app.dm.getLastTID()
p = protocol.answerLastIDs(oid, tid, app.ptid)
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleAskOIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return OIDs only
......@@ -51,5 +51,5 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
partition_list = [partition]
oid_list = app.dm.getOIDList(first, last - first,
app.pt.getPartitions(), partition_list)
conn.answer(protocol.answerOIDs(oid_list), packet)
conn.answer(protocol.answerOIDs(oid_list), packet.getId())
......@@ -30,7 +30,7 @@ class VerificationHandler(BaseMasterHandler):
oid = app.dm.getLastOID()
tid = app.dm.getLastTID()
p = protocol.answerLastIDs(oid, tid, app.ptid)
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleAskPartitionTable(self, conn, packet, offset_list):
app, pt = self.app, self.app.pt
......@@ -51,7 +51,7 @@ class VerificationHandler(BaseMasterHandler):
raise protocol.ProtocolError('invalid partition table offset')
p = protocol.answerPartitionTable(app.ptid, row_list)
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
......@@ -75,7 +75,7 @@ class VerificationHandler(BaseMasterHandler):
def handleAskUnfinishedTransactions(self, conn, packet):
tid_list = self.app.dm.getUnfinishedTIDList()
p = protocol.answerUnfinishedTransactions(tid_list)
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleAskTransactionInformation(self, conn, packet, tid):
app = self.app
......@@ -84,7 +84,7 @@ class VerificationHandler(BaseMasterHandler):
p = protocol.tidNotFound('%s does not exist' % dump(tid))
else:
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleAskObjectPresent(self, conn, packet, oid, tid):
if self.app.dm.objectPresent(oid, tid):
......@@ -92,7 +92,7 @@ class VerificationHandler(BaseMasterHandler):
else:
p = protocol.oidNotFound(
'%s:%s do not exist' % (dump(oid), dump(tid)))
conn.answer(p, packet)
conn.answer(p, packet.getId())
def handleDeleteTransaction(self, conn, packet, tid):
self.app.dm.deleteTransaction(tid, all = True)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment