Commit ad0d2ccf authored by Grégory Wisniewski's avatar Grégory Wisniewski

Replace INVALID_UUID values with None out of protocol.py module to unify

condition. Packet encoding and decoding handle None/INVALID_UUID mapping.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@871 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 00705a8c
......@@ -19,7 +19,7 @@ import logging
from time import time
from neo.config import ConfigurationManager
from neo.protocol import INVALID_UUID, INVALID_PTID, MASTER_NODE_TYPE
from neo.protocol import INVALID_PTID, MASTER_NODE_TYPE
from neo.node import NodeManager, MasterNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
......@@ -71,7 +71,7 @@ class Application(object):
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
self.uuid = INVALID_UUID
self.uuid = None
self.primary_master_node = None
self.ptid = INVALID_PTID
self.monitoring_handler = MasterMonitoringEventHandler(self)
......@@ -153,7 +153,7 @@ class Application(object):
row = []
try:
for cell in self.pt.getCellList(offset):
if uuid != INVALID_UUID and cell.getUUID() != uuid:
if uuid is not None and cell.getUUID() != uuid:
continue
else:
row.append((cell.getUUID(), cell.getState()))
......
......@@ -18,8 +18,7 @@
import logging
from neo.handler import EventHandler
from neo.protocol import INVALID_UUID, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
ADMIN_NODE_TYPE, TEMPORARILY_DOWN_STATE
from neo.node import MasterNode, StorageNode, ClientNode, AdminNode
from neo import protocol
......@@ -168,11 +167,11 @@ class MasterBaseEventHandler(EventHandler):
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
if uuid is not None:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
if n is not None and uuid is not None:
# node only exists by address, remove it
nm.remove(n)
n = None
......@@ -185,15 +184,15 @@ class MasterBaseEventHandler(EventHandler):
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
else:
n.setUUID(INVALID_UUID)
n.setUUID(None)
elif node_type in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
if uuid == INVALID_UUID:
if uuid is None:
# No interest.
continue
if n is None:
......
......@@ -32,7 +32,7 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it
"""
def __init__(self, app, name, node_type, uuid=protocol.INVALID_UUID, server=NO_SERVER):
def __init__(self, app, name, node_type, uuid=None, server=NO_SERVER):
EventHandler.__init__(self, app)
self.primary = None
self.server = server
......
......@@ -26,7 +26,7 @@ from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection
from neo import protocol
from neo.protocol import INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
from neo.protocol import INVALID_TID, INVALID_PARTITION, \
INVALID_PTID, CLIENT_NODE_TYPE, INVALID_SERIAL, \
DOWN_STATE, HIDDEN_STATE
from neo.client.handlers.master import PrimaryBootstrapHandler, \
......@@ -252,7 +252,7 @@ class Application(object):
master_node_list.append(server)
self.nm.add(MasterNode(server=server))
# no self-assigned UUID, primary master will supply us one
self.uuid = INVALID_UUID
self.uuid = None
self.mq_cache = MQ()
self.new_oid_list = []
self.ptid = INVALID_PTID
......@@ -463,7 +463,7 @@ class Application(object):
# (...per client)
# - have a sleep in the code (yuck !)
sleep(5)
if self.uuid != INVALID_UUID:
if self.uuid is not None:
# TODO: pipeline those 2 requests
# This is currently impossible because _waitMessage can only
# wait on one message at a time
......@@ -481,7 +481,7 @@ class Application(object):
finally:
conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
ready = self.uuid != INVALID_UUID and self.pt is not None \
ready = self.uuid is not None and self.pt is not None \
and self.pt.operational()
logging.info("connected to primary master node %s" % self.primary_master_node)
return conn
......
......@@ -19,7 +19,7 @@ import logging
from neo.client.handlers import BaseHandler, AnswerBaseHandler
from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE
RUNNING_STATE, TEMPORARILY_DOWN_STATE
from neo.node import MasterNode, StorageNode
from neo.pt import MTPartitionTable as PartitionTable
from neo.util import dump
......@@ -63,7 +63,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
conn.setUUID(uuid)
node.setUUID(uuid)
if your_uuid != INVALID_UUID:
if your_uuid is not None:
# got an uuid from the primary master
app.uuid = your_uuid
......@@ -80,13 +80,13 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
if primary_uuid is not None:
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
......@@ -236,11 +236,11 @@ class PrimaryNotificationsHandler(BaseHandler):
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
if uuid is not None:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
if n is not None and uuid is not None:
# node only exists by address, remove it
nm.remove(n)
n = None
......@@ -253,13 +253,13 @@ class PrimaryNotificationsHandler(BaseHandler):
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
if uuid is None:
# No interest.
continue
if n is None:
......
......@@ -200,7 +200,7 @@ class EpollEventManager(object):
""" Return the connection associated to the UUID, None if the UUID is
None, invalid or not found"""
# FIXME: We may optimize this by using use a dict on UUIDs
if uuid in (None, protocol.INVALID_UUID):
if uuid is None:
return None
for conn in self.connection_dict.values():
if conn.getUUID() == uuid:
......
......@@ -24,7 +24,7 @@ from neo.config import ConfigurationManager
from neo import protocol
from neo.protocol import \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \
INVALID_OID, INVALID_TID, INVALID_PTID, \
CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
UUID_NAMESPACES, ADMIN_NODE_TYPE, BOOTING
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode
......@@ -341,7 +341,7 @@ class Application(object):
except TypeError:
ip_address, port = '0.0.0.0', 0
node_list.append((n.getNodeType(), ip_address, port,
n.getUUID() or INVALID_UUID, n.getState()))
n.getUUID(), n.getState()))
# Split the packet if too huge.
if len(node_list) == 10000:
conn.notify(protocol.notifyNodeInformation(node_list))
......@@ -709,7 +709,7 @@ class Application(object):
def getNewUUID(self, node_type):
# build an UUID
uuid = os.urandom(15)
while uuid == INVALID_UUID[1:]:
while uuid == '\00' * 15: # XXX: INVALID_UUID[1:]
uuid = os.urandom(15)
# look for the prefix
prefix = UUID_NAMESPACES.get(node_type, None)
......@@ -721,7 +721,7 @@ class Application(object):
node = self.nm.getNodeByUUID(uuid)
if node is not None and node.getServer() is not None and node.getServer() != addr:
return False
return uuid != self.uuid and uuid != INVALID_UUID
return uuid != self.uuid and uuid is not None
def getClusterState(self):
return self.cluster_state
......@@ -762,26 +762,26 @@ class Application(object):
def identifyStorageNode(self, uuid, node):
# TODO: check all cases here, when server address change...
# in verification and running states, if the node is unknown but the
# uuid != INVALID_UUID, we have to give it a new uuid, but in recovery
# uuid is not None, we have to give it a new uuid, but in recovery
# the node must keep it's UUID
state = protocol.RUNNING_STATE
handler = None
if self.cluster_state == protocol.RECOVERING:
if uuid == protocol.INVALID_UUID:
if uuid is None:
logging.info('reject empty storage node')
raise protocol.NotReadyError
handler = handlers.RecoveryHandler
elif self.cluster_state == protocol.VERIFYING:
if uuid == INVALID_UUID or node is None:
if uuid is None or node is None:
# if node is unknown, it has been forget when the current
# partition was validated by the admin
uuid = INVALID_UUID
uuid = None
state = protocol.PENDING_STATE
handler = handlers.VerificationHandler
elif self.cluster_state == protocol.RUNNING:
if uuid == INVALID_UUID or node is None:
if uuid is None or node is None:
# same as for verification
uuid = INVALID_UUID
uuid = None
state = protocol.PENDING_STATE
handler = handlers.StorageServiceHandler
elif self.cluster_state == protocol.STOPPING:
......
......@@ -118,19 +118,18 @@ class MasterHandler(EventHandler):
elif app.primary_master_node is not None:
primary_uuid = app.primary_master_node.getUUID()
else:
primary_uuid = protocol.INVALID_UUID
primary_uuid = None
known_master_list = [app.server + (app.uuid, )]
for n in app.nm.getMasterNodeList():
if n.getState() == protocol.BROKEN_STATE:
continue
known_master_list.append(n.getServer() + \
(n.getUUID() or protocol.INVALID_UUID, ))
known_master_list.append(n.getServer() + (n.getUUID(), ))
conn.answer(protocol.answerPrimaryMaster(primary_uuid,
known_master_list), packet)
def handleAskClusterState(self, conn, packet):
assert conn.getUUID() != protocol.INVALID_UUID
assert conn.getUUID() is not None
state = self.app.getClusterState()
conn.answer(protocol.answerClusterState(state), packet)
......@@ -155,7 +154,7 @@ class BaseServiceHandler(MasterHandler):
# No interest.
continue
if uuid == protocol.INVALID_UUID:
if uuid is None:
# No interest.
continue
......
......@@ -22,7 +22,7 @@ from neo.protocol import CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \
HIDDEN_STATE, INVALID_UUID, INTERNAL_ERROR_CODE
HIDDEN_STATE, INTERNAL_ERROR_CODE
from neo.master.handlers import BaseServiceHandler
from neo.protocol import UnexpectedPacketError
from neo.util import dump
......
......@@ -23,7 +23,6 @@ from neo.protocol import MASTER_NODE_TYPE, \
DOWN_STATE
from neo.master.handlers import MasterHandler
from neo.exception import ElectionFailure
from neo.protocol import INVALID_UUID
from neo.node import MasterNode
class ElectionHandler(MasterHandler):
......@@ -51,7 +50,7 @@ class ElectionHandler(MasterHandler):
app.nm.add(node)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if node.getUUID() is None:
......@@ -179,13 +178,13 @@ class ClientElectionHandler(MasterHandler):
app.nm.add(n)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
if primary_uuid is not None:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
......
......@@ -63,7 +63,7 @@ class IdentificationHandler(MasterHandler):
# ask the app the node identification, if refused, an exception is raised
result = self.app.identifyNode(node_type, uuid, node)
(uuid, node, state, handler, klass) = result
if uuid == protocol.INVALID_UUID:
if uuid is None:
# no valid uuid, give it one
uuid = app.getNewUUID(node_type)
if node is None:
......
......@@ -21,7 +21,7 @@ from neo import protocol
from neo.protocol import RUNNING_STATE, BROKEN_STATE, \
TEMPORARILY_DOWN_STATE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE
from neo.master.handlers import MasterHandler
from neo.protocol import UnexpectedPacketError, INVALID_UUID, INVALID_PTID
from neo.protocol import UnexpectedPacketError, INVALID_PTID
from neo.node import StorageNode
from neo.util import dump
......@@ -39,7 +39,7 @@ class RecoveryHandler(MasterHandler):
# No interest.
continue
if uuid == INVALID_UUID:
if uuid is None:
# No interest.
continue
......
......@@ -21,7 +21,7 @@ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, DOWN_STATE
from neo.master.handlers import MasterHandler
from neo.exception import ElectionFailure, PrimaryFailure
from neo.protocol import UnexpectedPacketError, INVALID_UUID
from neo.protocol import UnexpectedPacketError
from neo.node import MasterNode
class SecondaryMasterHandler(MasterHandler):
......@@ -79,7 +79,7 @@ class PrimaryMasterHandler(MasterHandler):
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
......
......@@ -17,7 +17,7 @@
import logging
from neo import protocol
from neo.protocol import CLIENT_NODE_TYPE, ADMIN_NODE_TYPE, INVALID_UUID, \
from neo.protocol import CLIENT_NODE_TYPE, ADMIN_NODE_TYPE, \
RUNNING_STATE, STORAGE_NODE_TYPE, TEMPORARILY_DOWN_STATE, STOPPING
from neo.master.handlers import BaseServiceHandler
from neo import decorators
......@@ -60,7 +60,7 @@ class ShutdownHandler(BaseServiceHandler):
# No interest.
continue
if uuid == INVALID_UUID:
if uuid is None:
# No interest.
continue
......
......@@ -22,7 +22,7 @@ from neo.protocol import CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \
HIDDEN_STATE, INVALID_UUID, INTERNAL_ERROR_CODE
HIDDEN_STATE, INTERNAL_ERROR_CODE
from neo.master.handlers import BaseServiceHandler
from neo.protocol import UnexpectedPacketError
from neo.exception import OperationFailure
......
......@@ -21,7 +21,6 @@ from neo.protocol import CLIENT_NODE_TYPE, RUNNING_STATE, BROKEN_STATE, \
TEMPORARILY_DOWN_STATE, ADMIN_NODE_TYPE
from neo.master.handlers import MasterHandler
from neo.exception import VerificationFailure
from neo.protocol import INVALID_UUID
from neo.util import dump
class VerificationHandler(MasterHandler):
......@@ -41,7 +40,7 @@ class VerificationHandler(MasterHandler):
# No interest.
continue
if uuid == INVALID_UUID:
if uuid is None:
# No interest.
continue
......
......@@ -18,7 +18,7 @@
import logging
from neo.protocol import node_types, node_states
from neo.protocol import INVALID_UUID, INVALID_PTID
from neo.protocol import INVALID_PTID
from neo.event import EventManager
from neo.connection import ClientConnection
from neo.exception import OperationFailure
......@@ -118,7 +118,7 @@ def printPTAction(options):
if len(options):
uuid = bin(options.pop(0))
else:
uuid = INVALID_UUID
uuid = None
return protocol.askPartitionList(min_offset, max_offset, uuid)
action_dict = {
......
......@@ -165,7 +165,7 @@ class NodeManager(object):
return self.server_dict.get(server)
def getNodeByUUID(self, uuid):
if uuid in (None, protocol.INVALID_UUID):
if uuid is None:
return None
return self.uuid_dict.get(uuid)
......
......@@ -501,6 +501,31 @@ def _checkNodeType(type):
raise PacketMalformedError('invalide node type %d' % type)
return node_type
def _checkUUID(uuid):
if uuid == INVALID_UUID:
return None
return uuid
def _encodeUUID(uuid):
if uuid is None:
return INVALID_UUID
return uuid
def _checkOID(oid):
if oid == INVALID_OID:
return None
return oid
def _checkTID(tid):
if tid == INVALID_TID:
return None
return tid
def _checkPTID(ptid):
if ptid == INVALID_PTID:
return None
return ptid
def _readString(buffer, name, offset=0):
buffer = buffer[offset:]
(size, ) = unpack('!L', buffer[:4])
......@@ -534,6 +559,7 @@ def _decodeRequestNodeIdentification(body):
ip_address = inet_ntoa(ip_address)
(name, _) = _readString(body, 'name', offset=32)
node_type = _checkNodeType(node_type)
uuid = _checkUUID(uuid)
if (major, minor) != PROTOCOL_VERSION:
raise PacketMalformedError('protocol version mismatch')
return node_type, uuid, ip_address, port, name
......@@ -545,6 +571,8 @@ def _decodeAcceptNodeIdentification(body):
node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid = r
ip_address = inet_ntoa(ip_address)
node_type = _checkNodeType(node_type)
uuid = _checkUUID(uuid)
your_uuid == _checkUUID(uuid)
return (node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid)
decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
......@@ -560,7 +588,9 @@ def _decodeAnswerPrimaryMaster(body):
for i in xrange(n):
ip_address, port, uuid = unpack('!4sH16s', body[20+i*22:42+i*22])
ip_address = inet_ntoa(ip_address)
uuid = _checkUUID(uuid)
known_master_list.append((ip_address, port, uuid))
primary_uuid = _checkUUID(primary_uuid)
return (primary_uuid, known_master_list)
decode_table[ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster
......@@ -584,6 +614,7 @@ def _decodeNotifyNodeInformation(body):
ip_address = inet_ntoa(ip_address)
node_type = _checkNodeType(node_type)
state = _checkNodeState(state)
uuid = _checkUUID(uuid)
node_list.append((node_type, ip_address, port, uuid, state))
return (node_list,)
decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
......@@ -621,6 +652,7 @@ def _decodeAnswerPartitionTable(body):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = partition_cell_states.get(state)
uuid = _checkUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
......@@ -640,6 +672,7 @@ def _decodeSendPartitionTable(body):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = partition_cell_states.get(state)
uuid = _checkUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
......@@ -653,6 +686,7 @@ def _decodeNotifyPartitionChanges(body):
for i in xrange(n):
(offset, uuid, state) = unpack('!L16sH', body[12+i*22:34+i*22])
state = partition_cell_states.get(state)
uuid = _checkUUID(uuid)
cell_list.append((offset, uuid, state))
return ptid, cell_list
decode_table[NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges
......@@ -893,7 +927,9 @@ decode_table[ANSWER_OIDS] = _decodeAnswerOIDs
@handle_errors
def _decodeAskPartitionList(body):
return unpack('!LL16s', body) # min_offset, max_offset, uuid
(min_offset, max_offset, uuid) = unpack('!LL16s', body)
uuid = _checkUUID(uuid)
return (min_offset, max_offset, uuid)
decode_table[ASK_PARTITION_LIST] = _decodeAskPartitionList
@handle_errors
......@@ -909,6 +945,7 @@ def _decodeAnswerPartitionList(body):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = partition_cell_states.get(state)
uuid = _checkUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
......@@ -932,6 +969,7 @@ def _decodeAnswerNodeList(body):
ip_address = inet_ntoa(ip_address)
node_type = _checkNodeType(node_type)
state = _checkNodeState(state)
uuid = _checkUUID(uuid)
node_list.append((node_type, ip_address, port, uuid, state))
return (node_list,)
decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList
......@@ -940,6 +978,7 @@ decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList
def _decodeSetNodeState(body):
(uuid, state, modify) = unpack('!16sHB', body)
state = _checkNodeState(state)
uuid = _checkUUID(uuid)
return (uuid, state, modify)
decode_table[SET_NODE_STATE] = _decodeSetNodeState
......@@ -947,6 +986,7 @@ decode_table[SET_NODE_STATE] = _decodeSetNodeState
def _decodeAnswerNodeState(body):
(uuid, state) = unpack('!16sH', body)
state = _checkNodeState(state)
uuid = _checkUUID(uuid)
return (uuid, state)
decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState
......@@ -954,6 +994,7 @@ decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState
def _decodeAddPendingNodes(body):
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
uuid_list = map(_checkUUID, uuid_list)
return (uuid_list, )
decode_table[ADD_PENDING_NODES] = _decodeAddPendingNodes
......@@ -961,6 +1002,7 @@ decode_table[ADD_PENDING_NODES] = _decodeAddPendingNodes
def _decodeAnswerNewNodes(body):
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
uuid_list = map(_checkUUID, uuid_list)
return (uuid_list, )
decode_table[ANSWER_NEW_NODES] = _decodeAnswerNewNodes
......@@ -1031,12 +1073,15 @@ def pong():
return Packet(PONG)
def requestNodeIdentification(node_type, uuid, ip_address, port, name):
uuid = _encodeUUID(uuid)
body = pack('!LLH16s4sHL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, inet_aton(ip_address), port, len(name)) + name
return Packet(REQUEST_NODE_IDENTIFICATION, body)
def acceptNodeIdentification(node_type, uuid, ip_address,
port, num_partitions, num_replicas, your_uuid):
uuid = _encodeUUID(uuid)
your_uuid = _encodeUUID(your_uuid)
body = pack('!H16s4sHLL16s', node_type, uuid,
inet_aton(ip_address), port,
num_partitions, num_replicas, your_uuid)
......@@ -1046,9 +1091,11 @@ def askPrimaryMaster():
return Packet(ASK_PRIMARY_MASTER)
def answerPrimaryMaster(primary_uuid, known_master_list):
primary_uuid = _encodeUUID(primary_uuid)
body = [primary_uuid, pack('!L', len(known_master_list))]
for master in known_master_list:
body.append(pack('!4sH16s', inet_aton(master[0]), master[1], master[2]))
for address, port, uuid in known_master_list:
uuid = _encodeUUID(uuid)
body.append(pack('!4sH16s', inet_aton(address), port, uuid))
body = ''.join(body)
return Packet(ANSWER_PRIMARY_MASTER, body)
......@@ -1061,6 +1108,7 @@ def reelectPrimaryMaster():
def notifyNodeInformation(node_list):
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
uuid = _encodeUUID(uuid)
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
body = ''.join(body)
......@@ -1084,6 +1132,7 @@ def answerPartitionTable(ptid, row_list):
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(ANSWER_PARTITION_TABLE, body)
......@@ -1093,6 +1142,7 @@ def sendPartitionTable(ptid, row_list):
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(SEND_PARTITION_TABLE, body)
......@@ -1100,6 +1150,7 @@ def sendPartitionTable(ptid, row_list):
def notifyPartitionChanges(ptid, cell_list):
body = [pack('!8sL', ptid, len(cell_list))]
for offset, uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!L16sH', offset, uuid, state))
body = ''.join(body)
return Packet(NOTIFY_PARTITION_CHANGES, body)
......@@ -1248,6 +1299,7 @@ def answerOIDs(oid_list):
return Packet(ANSWER_OIDS, body)
def askPartitionList(min_offset, max_offset, uuid):
uuid = _encodeUUID(uuid)
body = [pack('!LL16s', min_offset, max_offset, uuid)]
body = ''.join(body)
return Packet(ASK_PARTITION_LIST, body)
......@@ -1257,6 +1309,7 @@ def answerPartitionList(ptid, row_list):
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(ANSWER_PARTITION_LIST, body)
......@@ -1269,30 +1322,33 @@ def askNodeList(node_type):
def answerNodeList(node_list):
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
uuid = _encodeUUID(uuid)
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
body = ''.join(body)
return Packet(ANSWER_NODE_LIST, body)
def setNodeState(uuid, state, modify_partition_table):
uuid = _encodeUUID(uuid)
body = [pack('!16sHB', uuid, state, modify_partition_table)]
body = ''.join(body)
return Packet(SET_NODE_STATE, body)
def answerNodeState(uuid, state):
uuid = _encodeUUID(uuid)
body = [pack('!16sH', uuid, state)]
body = ''.join(body)
return Packet(ANSWER_NODE_STATE, body)
def addPendingNodes(uuid_list=()):
# an empty list means all current pending nodes
uuid_list = [pack('!16s', uuid) for uuid in uuid_list]
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
body = pack('!H', len(uuid_list)) + ''.join(uuid_list)
return Packet(ADD_PENDING_NODES, body)
def answerNewNodes(uuid_list):
# an empty list means no new nodes
uuid_list = [pack('!16s', uuid) for uuid in uuid_list]
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
body = pack('!H', len(uuid_list)) + ''.join(uuid_list)
return Packet(ANSWER_NEW_NODES, body)
......@@ -1303,6 +1359,7 @@ def answerNodeInformation(node_list):
# XXX: copy-paste from notifyNodeInformation
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
uuid = _encodeUUID(uuid)
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
body = ''.join(body)
......
......@@ -23,7 +23,7 @@ from collections import deque
from neo.config import ConfigurationManager
from neo import protocol
from neo.protocol import TEMPORARILY_DOWN_STATE, INVALID_UUID, INVALID_PTID, \
from neo.protocol import TEMPORARILY_DOWN_STATE, INVALID_PTID, \
partition_cell_states, HIDDEN_STATE
from neo.node import NodeManager, MasterNode, StorageNode
from neo.event import EventManager
......@@ -81,9 +81,6 @@ class Application(object):
dm = self.dm
self.uuid = dm.getUUID()
if self.uuid is None:
self.uuid = INVALID_UUID
num_partitions = dm.getNumPartitions()
num_replicas = dm.getNumReplicas()
......
......@@ -20,7 +20,7 @@ import logging
from neo.handler import EventHandler
from neo import protocol
from neo.protocol import Packet, UnexpectedPacketError, \
INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE
from neo.util import dump
......@@ -104,11 +104,11 @@ class BaseMasterHandler(BaseStorageHandler):
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
if uuid is not None:
n = app.nm.getNodeByUUID(uuid)
if n is None:
n = app.nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
if n is not None and uuid is not None:
# node only exists by address, remove it
app.nm.remove(n)
n = None
......@@ -127,12 +127,12 @@ class BaseMasterHandler(BaseStorageHandler):
app.nm.add(n)
n.setState(state)
if uuid != INVALID_UUID:
if uuid is not None:
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
if uuid is None:
# No interest.
continue
......@@ -155,7 +155,7 @@ class BaseMasterHandler(BaseStorageHandler):
n.setState(state)
elif node_type == CLIENT_NODE_TYPE:
if uuid == INVALID_UUID:
if uuid is None:
# No interest.
continue
......
......@@ -19,7 +19,7 @@ import logging
from neo.storage.handlers import BaseMasterHandler
from neo.protocol import Packet, \
INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE, \
DISCARDED_STATE, OUT_OF_DATE_STATE, UnexpectedPacketError
......@@ -51,7 +51,7 @@ class HiddenHandler(BaseMasterHandler):
for node_type, ip_address, port, uuid, state in node_list:
if node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
if uuid == None:
# No interest.
continue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment