Commit 0ce0731e authored by Grégory Wisniewski's avatar Grégory Wisniewski

Packet SET_CLUSTER_STATE and ANSWER_CLUSTER_STATE :

- Add handler method
- Fix encoding and decoding
- Add cluster state enumeration
Fix exception handling in newer packet decoders (no more except *: )


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@659 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent a52db4e6
...@@ -40,7 +40,8 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT ...@@ -40,7 +40,8 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT
NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \ NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \ PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE, ASK_PARTITION_LIST, ANSWER_PARTITION_LIST, ASK_NODE_LIST, \ INTERNAL_ERROR_CODE, ASK_PARTITION_LIST, ANSWER_PARTITION_LIST, ASK_NODE_LIST, \
ANSWER_NODE_LIST, SET_NODE_STATE, ANSWER_NODE_STATE ANSWER_NODE_LIST, SET_NODE_STATE, ANSWER_NODE_STATE, SET_CLUSTER_STATE, \
ANSWER_CLUSTER_STATE
# Some decorators useful to avoid duplication of patterns in handlers # Some decorators useful to avoid duplication of patterns in handlers
...@@ -387,6 +388,12 @@ class EventHandler(object): ...@@ -387,6 +388,12 @@ class EventHandler(object):
def handleAnswerNewNodes(self, conn, packet, uuid_list): def handleAnswerNewNodes(self, conn, packet, uuid_list):
raise UnexpectedPacketError raise UnexpectedPacketError
def handleSetClusterState(self, conn, packet, name, state):
raise UnexpectedPacketError
def handleAnswerClusterState(self, conn, packet, state):
raise UnexpectedPacketError
# Error packet handlers. # Error packet handlers.
...@@ -467,6 +474,8 @@ class EventHandler(object): ...@@ -467,6 +474,8 @@ class EventHandler(object):
d[ANSWER_NODE_LIST] = self.handleAnswerNodeList d[ANSWER_NODE_LIST] = self.handleAnswerNodeList
d[SET_NODE_STATE] = self.handleSetNodeState d[SET_NODE_STATE] = self.handleSetNodeState
d[ANSWER_NODE_STATE] = self.handleAnswerNodeState d[ANSWER_NODE_STATE] = self.handleAnswerNodeState
d[SET_CLUSTER_STATE] = self.handleSetClusterState
d[ANSWER_CLUSTER_STATE] = self.handleAnswerClusterState
d[ADD_PENDING_NODES] = self.handleAddPendingNodes d[ADD_PENDING_NODES] = self.handleAddPendingNodes
d[ANSWER_NEW_NODES] = self.handleAnswerNewNodes d[ANSWER_NEW_NODES] = self.handleAnswerNewNodes
......
...@@ -305,6 +305,14 @@ TIMEOUT_ERROR_CODE = 6 ...@@ -305,6 +305,14 @@ TIMEOUT_ERROR_CODE = 6
BROKEN_NODE_DISALLOWED_CODE = 7 BROKEN_NODE_DISALLOWED_CODE = 7
INTERNAL_ERROR_CODE = 8 INTERNAL_ERROR_CODE = 8
# Cluster states
cluster_states = Enum({
'BOOTING': 1,
'RUNNING': 2,
'STOPPING': 3,
})
VALID_CLUSTER_STATE_LIST = (BOOTING, RUNNING, STOPPING)
# Node types. # Node types.
node_types = Enum({ node_types = Enum({
'MASTER_NODE_TYPE' : 1, 'MASTER_NODE_TYPE' : 1,
...@@ -340,11 +348,11 @@ VALID_CELL_STATE_LIST = (UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, ...@@ -340,11 +348,11 @@ VALID_CELL_STATE_LIST = (UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE,
DISCARDED_STATE) DISCARDED_STATE)
# Other constants. # Other constants.
INVALID_UUID = '\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0' INVALID_UUID = '\0' * 16
INVALID_TID = '\0\0\0\0\0\0\0\0' INVALID_TID = '\0' * 8
INVALID_SERIAL = '\0\0\0\0\0\0\0\0' INVALID_SERIAL = '\0' * 8
INVALID_OID = '\0\0\0\0\0\0\0\0' INVALID_OID = '\0' * 8
INVALID_PTID = '\0\0\0\0\0\0\0\0' INVALID_PTID = '\0' * 8
INVALID_PARTITION = 0xffffffff INVALID_PARTITION = 0xffffffff
STORAGE_NS = 'S' STORAGE_NS = 'S'
...@@ -937,8 +945,8 @@ decode_table[ANSWER_OIDS] = _decodeAnswerOIDs ...@@ -937,8 +945,8 @@ decode_table[ANSWER_OIDS] = _decodeAnswerOIDs
def _decodeAskPartitionList(body): def _decodeAskPartitionList(body):
try: try:
min_offset, max_offset, uuid = unpack('!LL16s', body) min_offset, max_offset, uuid = unpack('!LL16s', body)
except: except struct.error, msg:
raise ProtocolError(self, 'invalid ask partition list') raise PacketMalformedError('invalid ask partition list')
return (min_offset, max_offset, uuid) return (min_offset, max_offset, uuid)
decode_table[ASK_PARTITION_LIST] = _decodeAskPartitionList decode_table[ASK_PARTITION_LIST] = _decodeAskPartitionList
...@@ -958,8 +966,8 @@ def _decodeAnswerPartitionList(body): ...@@ -958,8 +966,8 @@ def _decodeAnswerPartitionList(body):
cell_list.append((uuid, state)) cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list))) row_list.append((offset, tuple(cell_list)))
del cell_list[:] del cell_list[:]
except: except struct.error, msg:
raise ProtocolError(self, 'invalid answer partition list') raise PacketMalformedError('invalid answer partition list')
return ptid, row_list return ptid, row_list
decode_table[ANSWER_PARTITION_LIST] = _decodeAnswerPartitionList decode_table[ANSWER_PARTITION_LIST] = _decodeAnswerPartitionList
...@@ -968,11 +976,9 @@ def _decodeAskNodeList(body): ...@@ -968,11 +976,9 @@ def _decodeAskNodeList(body):
node_type = unpack('!H', body)[0] node_type = unpack('!H', body)[0]
node_type = node_types.get(node_type) node_type = node_types.get(node_type)
if node_type not in VALID_NODE_TYPE_LIST: if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type) raise PacketMalformedError('invalid node type %d' % node_type)
except ProtocolError: except struct.error, msg:
raise raise PacketMalformedError('invalid ask node list')
except:
raise ProtocolError(self, 'invalid ask node list')
return (node_type,) return (node_type,)
decode_table[ASK_NODE_LIST] = _decodeAskNodeList decode_table[ASK_NODE_LIST] = _decodeAskNodeList
...@@ -986,15 +992,13 @@ def _decodeAnswerNodeList(body): ...@@ -986,15 +992,13 @@ def _decodeAnswerNodeList(body):
ip_address = inet_ntoa(ip_address) ip_address = inet_ntoa(ip_address)
node_type = node_types.get(node_type) node_type = node_types.get(node_type)
if node_type not in VALID_NODE_TYPE_LIST: if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type) raise PacketMalformedError('invalid node type %d' % node_type)
state = node_states.get(state) state = node_states.get(state)
if state not in VALID_NODE_STATE_LIST: if state not in VALID_NODE_STATE_LIST:
raise ProtocolError(self, 'invalid node state %d' % state) raise PacketMalformedError('invalid node state %d' % state)
node_list.append((node_type, ip_address, port, uuid, state)) node_list.append((node_type, ip_address, port, uuid, state))
except ProtocolError: except struct.error, msg:
raise raise PacketMalformedError('invalid answer node information')
except:
raise ProtocolError(self, 'invalid answer node information')
return (node_list,) return (node_list,)
decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList
...@@ -1004,11 +1008,9 @@ def _decodeSetNodeState(body): ...@@ -1004,11 +1008,9 @@ def _decodeSetNodeState(body):
uuid, state, modify = unpack('!16sHB', body) uuid, state, modify = unpack('!16sHB', body)
state = node_states.get(state) state = node_states.get(state)
if state not in VALID_NODE_STATE_LIST: if state not in VALID_NODE_STATE_LIST:
raise ProtocolError(self, 'invalid node state %d' % state) raise PacketMalformedError('invalid node state %d' % state)
except ProtocolError: except struct.error, msg:
raise raise PacketMalformedError('invalid set node state')
except:
raise ProtocolError(self, 'invalid set node state')
return (uuid, state, modify) return (uuid, state, modify)
decode_table[SET_NODE_STATE] = _decodeSetNodeState decode_table[SET_NODE_STATE] = _decodeSetNodeState
...@@ -1017,53 +1019,43 @@ def _decodeAnswerNodeState(body): ...@@ -1017,53 +1019,43 @@ def _decodeAnswerNodeState(body):
uuid, state = unpack('!16sH', body) uuid, state = unpack('!16sH', body)
state = node_states.get(state) state = node_states.get(state)
if state not in VALID_NODE_STATE_LIST: if state not in VALID_NODE_STATE_LIST:
raise ProtocolError(self, 'invalid node state %d' % state) raise PacketMalformedError('invalid node state %d' % state)
except ProtocolError: except struct.error, msg:
raise raise PacketMalformedError('invalid answer node state')
except:
raise
raise ProtocolError(self, 'invalid answer node state')
return (uuid, state) return (uuid, state)
decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState
def _decodeSetClusterState(body): def _decodeSetClusterState(body):
try: try:
state, len_name = unpack('!HL', body) state, len_name = unpack('!HL', body[:6])
name = body[8:] name = body[6:]
if len_name != len(name): if len_name != len(name):
raise PacketMalformedError('invalid name size') raise PacketMalformedError('invalid name size')
state = cluster_states.get(state) state = cluster_states.get(state)
if state not in VALID_CLUSTER_STATE_LIST: if state is None:
raise ProtocolError(self, 'invalid cluster state %d' % state) raise PacketMalformedError('invalid cluster state %d' % state)
except ProtocolError: except struct.error, msg:
raise raise PacketMalformedError('invalid set node state')
except: return (name, state)
raise ProtocolError(self, 'invalid set node state')
return (uuid, state, modify)
decode_table[SET_CLUSTER_STATE] = _decodeSetClusterState decode_table[SET_CLUSTER_STATE] = _decodeSetClusterState
def _decodeAnswerClusterState(body): def _decodeAnswerClusterState(body):
try: try:
state = unpack('!H', body) (state, ) = unpack('!H', body)
state = cluster_states.get(state) state = cluster_states.get(state)
if state not in VALID_CLUSTER_STATE_LIST: if state is None:
raise ProtocolError(self, 'invalid cluster state %d' % state) raise PacketMalformedError('invalid cluster state %d' % state)
except ProtocolError: except struct.error, msg:
raise raise PacketMalformedError('invalid answer cluster state')
except: return (state, )
raise
raise ProtocolError(self, 'invalid answer node state')
return (uuid, state)
decode_table[ANSWER_CLUSTER_STATE] = _decodeAnswerClusterState decode_table[ANSWER_CLUSTER_STATE] = _decodeAnswerClusterState
def _decodeAddPendingNodes(body): def _decodeAddPendingNodes(body):
try: try:
(n, ) = unpack('!H', body[:2]) (n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)] uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
except: except struct.error, msg:
raise raise PacketMalformedError('invalid add pending nodes')
raise ProtocolError(self, 'invalide add pending nodes')
return (uuid_list, ) return (uuid_list, )
decode_table[ADD_PENDING_NODES] = _decodeAddPendingNodes decode_table[ADD_PENDING_NODES] = _decodeAddPendingNodes
...@@ -1071,9 +1063,8 @@ def _decodeAnswerNewNodes(body): ...@@ -1071,9 +1063,8 @@ def _decodeAnswerNewNodes(body):
try: try:
(n, ) = unpack('!H', body[:2]) (n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)] uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
except: except struct.error, msg:
raise raise PacketMalformedError('invalid answer new nodes')
raise ProtocolError(self, 'invalide answer new nodes')
return (uuid_list, ) return (uuid_list, )
decode_table[ANSWER_NEW_NODES] = _decodeAnswerNewNodes decode_table[ANSWER_NEW_NODES] = _decodeAnswerNewNodes
...@@ -1365,13 +1356,12 @@ def answerNodeState(uuid, state): ...@@ -1365,13 +1356,12 @@ def answerNodeState(uuid, state):
return Packet(ANSWER_NODE_STATE, body) return Packet(ANSWER_NODE_STATE, body)
def setClusterState(name, state): def setClusterState(name, state):
body = [pack('!HL', state, len(name))+name] body = [pack('!HL', state, len(name)), name]
body = ''.join(body) body = ''.join(body)
return Packet(SET_CLUSTER_STATE, body) return Packet(SET_CLUSTER_STATE, body)
def answerClusterState(state): def answerClusterState(state):
body = [pack('!H', uuid, state)] body = pack('!H', state)
body = ''.join(body)
return Packet(ANSWER_CLUSTER_STATE, body) return Packet(ANSWER_CLUSTER_STATE, body)
def addPendingNodes(uuid_list=()): def addPendingNodes(uuid_list=()):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment