Commit 9d941fb5 authored by Aurel's avatar Aurel

add new packet type and handler


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@531 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 8f563d6a
......@@ -15,7 +15,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
import logging, traceback
from neo import protocol
from neo.protocol import Packet, PacketMalformedError, UnexpectedPacketError, \
......@@ -39,7 +39,8 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT
ASK_OIDS, ANSWER_OIDS, \
NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE
INTERNAL_ERROR_CODE, ASK_PARTITION_LIST, ANSWER_PARTITION_LIST, ASK_NODE_LIST, \
ANSWER_NODE_LIST, SET_NODE_STATE, ANSWER_NODE_STATE
# Some decorators useful to avoid duplication of patterns in handlers
......@@ -198,7 +199,6 @@ class EventHandler(object):
except ProtocolError, e:
self.protocolError(conn, packet, *e.args)
# Packet handlers.
def handleError(self, conn, packet, code, message):
......@@ -361,6 +361,24 @@ class EventHandler(object):
def handleAnswerOIDs(self, conn, packet, oid_list):
raise UnexpectedPacketError
def handleAskPartitionList(self, conn, packet, offset_list):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerPartitionList(self, conn, packet, ptid, row_list):
self.handleUnexpectedPacket(conn, packet)
def handleAskNodeList(self, conn, packet, offset_list):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerNodeList(self, conn, packet, ptid, row_list):
self.handleUnexpectedPacket(conn, packet)
def handleSetNodeState(self, conn, packet, uuid, state):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerNodeState(self, conn, packet, uuid, state):
self.handleUnexpectedPacket(conn, packet)
# Error packet handlers.
# XXX: why answer a protocolError to another protocolError ?
......@@ -434,6 +452,12 @@ class EventHandler(object):
d[ANSWER_OBJECT_HISTORY] = self.handleAnswerObjectHistory
d[ASK_OIDS] = self.handleAskOIDs
d[ANSWER_OIDS] = self.handleAnswerOIDs
d[ASK_PARTITION_LIST] = self.handleAskPartitionList
d[ANSWER_PARTITION_LIST] = self.handleAnswerPartitionList
d[ASK_NODE_LIST] = self.handleAskNodeList
d[ANSWER_NODE_LIST] = self.handleAnswerNodeList
d[SET_NODE_STATE] = self.handleSetNodeState
d[ANSWER_NODE_STATE] = self.handleAnswerNodeState
self.packet_dispatch_table = d
......
......@@ -67,6 +67,7 @@ class Enum(object):
def __init__(self, value_dict):
global_dict = globals()
self.enum_dict = enum_dict = {}
self.str_enum_dict = str_enum_dict = {}
for key, value in value_dict.iteritems():
# Only integer types are supported. This should be enough, and
# extending support to other types would only make moving to other
......@@ -75,10 +76,14 @@ class Enum(object):
raise TypeError, 'Enum class only support integer values.'
item = EnumItem(self, key, value)
global_dict[key] = enum_dict[value] = item
str_enum_dict[key] = item
def get(self, value, default=None):
return self.enum_dict.get(value, default)
def getFromStr(self, value, default=None):
return self.str_enum_dict.get(value, default)
def __getitem__(self, value):
return self.enum_dict[value]
......@@ -257,6 +262,25 @@ packet_types = Enum({
# Answer the requested OIDs. S -> S.
'ANSWER_OIDS': 0x8020,
# All the following messages are for neoctl to admin node
# Ask information about partition
'ASK_PARTITION_LIST': 0x0021,
# Answer information about partition
'ANSWER_PARTITION_LIST': 0x8021,
# Ask information about nodes
'ASK_NODE_LIST': 0x0022,
# Answer information about nodes
'ANSWER_NODE_LIST': 0x8022,
# Set the node state
'SET_NODE_STATE': 0x0023,
# Answer state of the node
'ANSWER_NODE_STATE': 0x8023,
})
# Error codes.
......@@ -270,10 +294,12 @@ BROKEN_NODE_DISALLOWED_CODE = 7
INTERNAL_ERROR_CODE = 8
# Node types.
MASTER_NODE_TYPE = 1
STORAGE_NODE_TYPE = 2
CLIENT_NODE_TYPE = 3
ADMIN_NODE_TYPE = 4
node_types = Enum({
'MASTER_NODE_TYPE' : 1,
'STORAGE_NODE_TYPE' : 2,
'CLIENT_NODE_TYPE' : 3,
'ADMIN_NODE_TYPE' : 4,
})
VALID_NODE_TYPE_LIST = (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE)
......@@ -283,9 +309,10 @@ node_states = Enum({
'TEMPORARILY_DOWN_STATE': 1,
'DOWN_STATE': 2,
'BROKEN_STATE': 3,
'HIDDEN_STATE' : 4,
})
VALID_NODE_STATE_LIST = (RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE)
VALID_NODE_STATE_LIST = (RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, HIDDEN_STATE)
# Partition cell states.
partition_cell_states = Enum({
......@@ -369,12 +396,14 @@ class Packet(object):
return PACKET_HEADER_SIZE
def encode(self):
print "encode", self._id, self._type
msg = pack('!LHL', self._id, self._type, PACKET_HEADER_SIZE + len(self._body)) + self._body
if len(msg) > MAX_PACKET_SIZE:
raise PacketMalformedError('message too big (%d)' % len(msg))
return msg
__str__ = encode
# Decoders.
def decode(self):
try:
method = decode_table[self._type]
......@@ -424,6 +453,7 @@ def _decodePong(body):
pass
decode_table[PONG] = _decodePong
def _decodeRequestNodeIdentification(body):
try:
body = body
......@@ -888,6 +918,99 @@ def _decodeAnswerOIDs(body):
return (oid_list,)
decode_table[ANSWER_OIDS] = _decodeAnswerOIDs
def _decodeAskPartitionList(body):
try:
min_offset, max_offset, uuid = unpack('!LL16s', body)
except:
raise ProtocolError(self, 'invalid ask partition list')
return (min_offset, max_offset, uuid)
decode_table[ASK_PARTITION_LIST] = _decodeAskPartitionList
def _decodeAnswerPartitionList(body):
try:
ptid, n = unpack('!8sL', body[:12])
index = 12
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = partition_cell_states.get(state)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
except:
raise ProtocolError(self, 'invalid answer partition list')
return ptid, row_list
decode_table[ANSWER_PARTITION_LIST] = _decodeAnswerPartitionList
def _decodeAskNodeList(body):
try:
node_type = unpack('!H', body)[0]
node_type = node_types.get(node_type)
if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type)
except ProtocolError:
raise
except:
raise ProtocolError(self, 'invalid ask node list')
return (node_type,)
decode_table[ASK_NODE_LIST] = _decodeAskNodeList
def _decodeAnswerNodeList(body):
try:
n = unpack('!L', body[:4])[0]
node_list = []
for i in xrange(n):
r = unpack('!H4sH16sH', body[4+i*26:30+i*26])
node_type, ip_address, port, uuid, state = r
ip_address = inet_ntoa(ip_address)
node_type = node_types.get(node_type)
if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type)
state = node_states.get(state)
if state not in VALID_NODE_STATE_LIST:
raise ProtocolError(self, 'invalid node state %d' % state)
node_list.append((node_type, ip_address, port, uuid, state))
except ProtocolError:
raise
except:
raise ProtocolError(self, 'invalid answer node information')
return (node_list,)
decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList
def _decodeSetNodeState(body):
try:
uuid, state = unpack('!16sH', body)
state = node_states.get(state)
if state not in VALID_NODE_STATE_LIST:
raise ProtocolError(self, 'invalid node state %d' % state)
except ProtocolError:
raise
except:
raise ProtocolError(self, 'invalid set node state')
return (uuid, state)
decode_table[SET_NODE_STATE] = _decodeSetNodeState
def _decodeAnswerNodeState(body):
try:
uuid, state = unpack('!16sH', body)
state = node_states.get(state)
if state not in VALID_NODE_STATE_LIST:
raise ProtocolError(self, 'invalid node state %d' % state)
except ProtocolError:
raise
except:
raise
raise ProtocolError(self, 'invalid answer node state')
return (uuid, state)
decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState
# Packet encoding
def _error(error_code, error_message):
......@@ -1135,3 +1258,40 @@ def answerOIDs(oid_list):
body.extend(oid_list)
body = ''.join(body)
return Packet(ANSWER_OIDS, body)
def askPartitionList(min_offset, max_offset, uuid):
body = [pack('!LL16s', min_offset, max_offset, uuid)]
body = ''.join(body)
return Packet(ASK_PARTITION_LIST, body)
def answerPartitionList(ptid, row_list):
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(ANSWER_PARTITION_LIST, body)
def askNodeList(node_type):
body = [pack('!H', node_type)]
body = ''.join(body)
return Packet(ASK_NODE_LIST, body)
def answerNodeList(node_list):
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
body = ''.join(body)
return Packet(ANSWER_NODE_LIST, body)
def setNodeState(uuid, state):
body = [pack('!16sH', uuid, state)]
body = ''.join(body)
return Packet(SET_NODE_STATE, body)
def answerNodeState(uuid, state):
body = [pack('!16sH', uuid, state)]
body = ''.join(body)
return Packet(ANSWER_NODE_STATE, body)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment