Commit 68ac26ca authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add Ask/Answer/Notify Cluster State messages.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@707 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent c791f6d1
...@@ -41,7 +41,8 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT ...@@ -41,7 +41,8 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \ PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE, ASK_PARTITION_LIST, ANSWER_PARTITION_LIST, ASK_NODE_LIST, \ INTERNAL_ERROR_CODE, ASK_PARTITION_LIST, ANSWER_PARTITION_LIST, ASK_NODE_LIST, \
ANSWER_NODE_LIST, SET_NODE_STATE, ANSWER_NODE_STATE, SET_CLUSTER_STATE, \ ANSWER_NODE_LIST, SET_NODE_STATE, ANSWER_NODE_STATE, SET_CLUSTER_STATE, \
ANSWER_CLUSTER_STATE, ASK_NODE_INFORMATION, ANSWER_NODE_INFORMATION, NO_ERROR_CODE ASK_NODE_INFORMATION, ANSWER_NODE_INFORMATION, NO_ERROR_CODE, \
ASK_CLUSTER_STATE, ANSWER_CLUSTER_STATE, NOTIFY_CLUSTER_INFORMATION
class EventHandler(object): class EventHandler(object):
...@@ -344,16 +345,22 @@ class EventHandler(object): ...@@ -344,16 +345,22 @@ class EventHandler(object):
def handleAnswerNewNodes(self, conn, packet, uuid_list): def handleAnswerNewNodes(self, conn, packet, uuid_list):
raise UnexpectedPacketError raise UnexpectedPacketError
def handleSetClusterState(self, conn, packet, name, state): def handleAskNodeInformation(self, conn, packet):
raise UnexpectedPacketError
def handleAnswerNodeInformation(self, conn, packet, node_list):
raise UnexpectedPacketError
def handleAskClusterState(self, conn, packet):
raise UnexpectedPacketError raise UnexpectedPacketError
def handleAnswerClusterState(self, conn, packet, state): def handleAnswerClusterState(self, conn, packet, state):
raise UnexpectedPacketError raise UnexpectedPacketError
def handleAskNodeInformation(self, conn, packet): def handleSetClusterState(self, conn, packet, name, state):
raise UnexpectedPacketError raise UnexpectedPacketError
def handleAnswerNodeInformation(self, conn, packet, node_list): def handleNotifyClusterInformation(self, conn, packet, state):
raise UnexpectedPacketError raise UnexpectedPacketError
# Error packet handlers. # Error packet handlers.
...@@ -439,11 +446,13 @@ class EventHandler(object): ...@@ -439,11 +446,13 @@ class EventHandler(object):
d[SET_NODE_STATE] = self.handleSetNodeState d[SET_NODE_STATE] = self.handleSetNodeState
d[ANSWER_NODE_STATE] = self.handleAnswerNodeState d[ANSWER_NODE_STATE] = self.handleAnswerNodeState
d[SET_CLUSTER_STATE] = self.handleSetClusterState d[SET_CLUSTER_STATE] = self.handleSetClusterState
d[ANSWER_CLUSTER_STATE] = self.handleAnswerClusterState
d[ADD_PENDING_NODES] = self.handleAddPendingNodes d[ADD_PENDING_NODES] = self.handleAddPendingNodes
d[ANSWER_NEW_NODES] = self.handleAnswerNewNodes d[ANSWER_NEW_NODES] = self.handleAnswerNewNodes
d[ASK_NODE_INFORMATION] = self.handleAskNodeInformation d[ASK_NODE_INFORMATION] = self.handleAskNodeInformation
d[ANSWER_NODE_INFORMATION] = self.handleAnswerNodeInformation d[ANSWER_NODE_INFORMATION] = self.handleAnswerNodeInformation
d[ASK_CLUSTER_STATE] = self.handleAskClusterState
d[ANSWER_CLUSTER_STATE] = self.handleAnswerClusterState
d[NOTIFY_CLUSTER_INFORMATION] = self.handleNotifyClusterInformation
self.packet_dispatch_table = d self.packet_dispatch_table = d
......
...@@ -106,6 +106,11 @@ class MasterEventHandler(EventHandler): ...@@ -106,6 +106,11 @@ class MasterEventHandler(EventHandler):
conn.answer(protocol.answerPrimaryMaster(primary_uuid, conn.answer(protocol.answerPrimaryMaster(primary_uuid,
known_master_list), packet) known_master_list), packet)
def handleAskClusterState(self, conn, packet):
assert conn.getUUID() != protocol.INVALID_UUID
state = self.app.getClusterState()
conn.answer(protocol.answerClusterState(state), packet)
def handleAskNodeInformation(self, conn, packet): def handleAskNodeInformation(self, conn, packet):
self.app.sendNodesInformations(conn) self.app.sendNodesInformations(conn)
conn.answer(protocol.answerNodeInformation([]), packet) conn.answer(protocol.answerNodeInformation([]), packet)
......
...@@ -282,23 +282,30 @@ packet_types = Enum({ ...@@ -282,23 +282,30 @@ packet_types = Enum({
# Answer state of the node # Answer state of the node
'ANSWER_NODE_STATE': 0x8023, 'ANSWER_NODE_STATE': 0x8023,
# Set the cluster state
'SET_CLUSTER_STATE': 0x0024,
# Answer state of the cluster
'ANSWER_CLUSTER_STATE': 0x8024,
# Ask the primary to include some pending node in the partition table # Ask the primary to include some pending node in the partition table
'ADD_PENDING_NODES': 0x0025, 'ADD_PENDING_NODES': 0x0024,
# Anwer what are the nodes added in the partition table # Anwer what are the nodes added in the partition table
'ANSWER_NEW_NODES': 0x8025, 'ANSWER_NEW_NODES': 0x8024,
# Ask node information # Ask node information
'ASK_NODE_INFORMATION': 0x0026, 'ASK_NODE_INFORMATION': 0x0025,
# Answer node information # Answer node information
'ANSWER_NODE_INFORMATION': 0x8026, 'ANSWER_NODE_INFORMATION': 0x8025,
# Set the cluster state
'SET_CLUSTER_STATE': 0x0026,
# Notify information about the cluster
'NOTIFY_CLUSTER_INFORMATION': 0x8027,
# Ask state of the cluster
'ASK_CLUSTER_STATE': 0x0028,
# Answer state of the cluster
'ANSWER_CLUSTER_STATE': 0x8028,
}) })
# Error codes. # Error codes.
...@@ -943,21 +950,6 @@ def _decodeAnswerNodeState(body): ...@@ -943,21 +950,6 @@ def _decodeAnswerNodeState(body):
return (uuid, state) return (uuid, state)
decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState
@handle_errors
def _decodeSetClusterState(body):
(state, ) = unpack('!H', body[:2])
(name, _) = _readString(body, 'name', offset=2)
state = _checkClusterState(state)
return (name, state)
decode_table[SET_CLUSTER_STATE] = _decodeSetClusterState
@handle_errors
def _decodeAnswerClusterState(body):
(state, ) = unpack('!H', body)
state = _checkClusterState(state)
return (state, )
decode_table[ANSWER_CLUSTER_STATE] = _decodeAnswerClusterState
@handle_errors @handle_errors
def _decodeAddPendingNodes(body): def _decodeAddPendingNodes(body):
(n, ) = unpack('!H', body[:2]) (n, ) = unpack('!H', body[:2])
...@@ -977,6 +969,32 @@ def _decodeAskNodeInformation(body): ...@@ -977,6 +969,32 @@ def _decodeAskNodeInformation(body):
decode_table[ASK_NODE_INFORMATION] = _decodeAskNodeInformation decode_table[ASK_NODE_INFORMATION] = _decodeAskNodeInformation
decode_table[ANSWER_NODE_INFORMATION] = _decodeNotifyNodeInformation decode_table[ANSWER_NODE_INFORMATION] = _decodeNotifyNodeInformation
def _decodeAskClusterState(body):
pass
decode_table[ASK_CLUSTER_STATE] = _decodeAskClusterState
@handle_errors
def _decodeAnswerClusterState(body):
(state, ) = unpack('!H', body)
state = _checkClusterState(state)
return (state, )
decode_table[ANSWER_CLUSTER_STATE] = _decodeAnswerClusterState
@handle_errors
def _decodeSetClusterState(body):
(state, ) = unpack('!H', body[:2])
(name, _) = _readString(body, 'name', offset=2)
state = _checkClusterState(state)
return (name, state)
decode_table[SET_CLUSTER_STATE] = _decodeSetClusterState
@handle_errors
def _decodeNotifyClusterInformation(body):
(state, ) = unpack('!H', body)
state = _checkClusterState(state)
return (state, )
decode_table[NOTIFY_CLUSTER_INFORMATION] = _decodeNotifyClusterInformation
# Packet encoding # Packet encoding
...@@ -1266,15 +1284,6 @@ def answerNodeState(uuid, state): ...@@ -1266,15 +1284,6 @@ def answerNodeState(uuid, state):
body = ''.join(body) body = ''.join(body)
return Packet(ANSWER_NODE_STATE, body) return Packet(ANSWER_NODE_STATE, body)
def setClusterState(name, state):
body = [pack('!HL', state, len(name)), name]
body = ''.join(body)
return Packet(SET_CLUSTER_STATE, body)
def answerClusterState(state):
body = pack('!H', state)
return Packet(ANSWER_CLUSTER_STATE, body)
def addPendingNodes(uuid_list=()): def addPendingNodes(uuid_list=()):
# an empty list means all current pending nodes # an empty list means all current pending nodes
uuid_list = [pack('!16s', uuid) for uuid in uuid_list] uuid_list = [pack('!16s', uuid) for uuid in uuid_list]
...@@ -1299,3 +1308,19 @@ def answerNodeInformation(node_list): ...@@ -1299,3 +1308,19 @@ def answerNodeInformation(node_list):
body = ''.join(body) body = ''.join(body)
return Packet(ANSWER_NODE_INFORMATION, body) return Packet(ANSWER_NODE_INFORMATION, body)
def askClusterState():
return Packet(ASK_CLUSTER_STATE)
def answerClusterState(state):
body = pack('!H', state)
return Packet(ANSWER_CLUSTER_STATE, body)
def setClusterState(name, state):
body = [pack('!HL', state, len(name)), name]
body = ''.join(body)
return Packet(SET_CLUSTER_STATE, body)
def notifyClusterInformation(state):
body = pack('!H', state)
return Packet(NOTIFY_CLUSTER_INFORMATION, body)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment