Commit ef71d454 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Implement the UUID generation from the primary master node only.

A field is added to the acceptNodeidentification packet to send an UUID to node
that is connecting. UUID generator is put in the master app.py and called from
election.py, recovery.py and verification.py. Storage node start with an
INVALID_UUID or the previous if exists.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@416 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 57fadf09
......@@ -199,7 +199,8 @@ class Application(object):
self.primary_master_node = None
self.master_node_list = master_nodes.split(' ')
self.master_conn = None
self.uuid = None
# no self-assigned UUID, primary master will supply us one
self.uuid = INVALID_UUID
self.mq_cache = MQ()
self.new_oid_list = []
self.ptid = None
......@@ -235,16 +236,10 @@ class Application(object):
lock = Lock()
self._connecting_to_master_node_acquire = lock.acquire
self._connecting_to_master_node_release = lock.release
# XXX Generate an UUID for self. For now, just use a random string.
# Avoid an invalid UUID.
if self.uuid is None:
while 1:
uuid = os.urandom(16)
if uuid != INVALID_UUID:
break
self.uuid = uuid
# Connect to master node
self.connectToPrimaryMasterNode()
if self.uuid == INVALID_UUID:
raise NEOStorageError('No UUID given from the primary master')
def getQueue(self):
try:
......
......@@ -315,7 +315,7 @@ class ClientAnswerEventHandler(BaseClientEventHandler):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
......@@ -342,6 +342,10 @@ class ClientAnswerEventHandler(BaseClientEventHandler):
app.num_partitions = num_partitions
app.num_replicas = num_replicas
if your_uuid != INVALID_UUID:
# got an uuid from the primary master
app.uuid = your_uuid
# Ask a primary master.
conn.lock()
try:
......
......@@ -130,7 +130,7 @@ class EventHandler(object):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
num_partitions, num_replicas, your_uuid):
self.handleUnexpectedPacket(conn, packet)
def handlePing(self, conn, packet):
......
......@@ -24,7 +24,8 @@ from neo.config import ConfigurationManager
from neo.protocol import Packet, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \
CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE
CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
UUID_NAMESPACES
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection, ServerConnection
......@@ -67,9 +68,8 @@ class Application(object):
self.primary = None
self.primary_master_node = None
# XXX Generate an UUID for self. For now, just use a random string.
# Avoid an invalid UUID.
self.uuid = self.getNewUUID()
# Generate an UUID for self
self.uuid = self.getNewUUID(MASTER_NODE_TYPE)
# The last OID.
self.loid = INVALID_OID
......@@ -783,8 +783,21 @@ class Application(object):
def getNewOIDList(self, num_oids):
return [self.getNextOID() for i in xrange(num_oids)]
def getNewUUID(self):
uuid = INVALID_UUID
while uuid == INVALID_UUID:
uuid = os.urandom(16)
return uuid
def getNewUUID(self, node_type):
# build an UUID
uuid = os.urandom(15)
while uuid == INVALID_UUID[1:]:
uuid = os.urandom(15)
# look for the prefix
prefix = UUID_NAMESPACES.get(node_type, None)
if prefix is None:
raise RuntimeError, 'No UUID namespace found for this node type'
return prefix + uuid
def isValidUUID(self, uuid, addr):
for node in self.nm.getNodeList():
if addr != node.getServer() and node.getUUID() == uuid:
return False
return uuid != self.uuid and uuid != INVALID_UUID
......@@ -90,7 +90,7 @@ class ElectionEventHandler(MasterEventHandler):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions,
num_replicas):
num_replicas, your_uuid):
if not conn.isListeningConnection():
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
......@@ -111,6 +111,11 @@ class ElectionEventHandler(MasterEventHandler):
conn.close()
return
if your_uuid != app.uuid:
# uuid conflict happened, accept the new one and restart election
app.uuid = your_uuid
raise ElectionFailure, 'new uuid supplied'
conn.setUUID(uuid)
node.setUUID(uuid)
......@@ -203,14 +208,18 @@ class ElectionEventHandler(MasterEventHandler):
conn.abort()
return
# Trust the UUID sent by the peer.
# supplied another uuid in case of conflict
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
node.setUUID(uuid)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas)
app.num_partitions, app.num_replicas,
uuid)
conn.addPacket(p)
# Next, the peer should ask a primary master node.
conn.expectMessage()
......
......@@ -62,8 +62,8 @@ class RecoveryEventHandler(MasterEventHandler):
def packetReceived(self, conn, packet):
MasterEventHandler.packetReceived(self, conn, packet)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
def handleRequestNodeIdentification(self, conn, packet, node_type, uuid,
ip_address, port, name):
app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
logging.info('reject a connection from a client')
......@@ -84,7 +84,11 @@ class RecoveryEventHandler(MasterEventHandler):
#
# However, master nodes can be known only as the server addresses. And, a node
# may claim a server address used by another node.
addr = (ip_address, port)
# generate a new uuid for this node
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
# First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid)
if node is None:
......@@ -169,7 +173,7 @@ class RecoveryEventHandler(MasterEventHandler):
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas)
app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p)
# Next, the peer should ask a primary master node.
conn.expectMessage()
......
......@@ -67,7 +67,7 @@ class SecondaryEventHandler(MasterEventHandler):
return
# Add a node only if it is a master node and I do not know it yet.
if node_type == MASTER_NODE_TYPE:
if node_type == MASTER_NODE_TYPE and uuid != INVALID_UUID:
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
......@@ -82,7 +82,8 @@ class SecondaryEventHandler(MasterEventHandler):
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas)
app.num_partitions, app.num_replicas,
uuid)
conn.addPacket(p)
# Next, the peer should ask a primary master node.
conn.expectMessage()
......
......@@ -156,6 +156,9 @@ class ServiceEventHandler(MasterEventHandler):
# However, master nodes can be known only as the server addresses.
# And, a node may claim a server address used by another node.
addr = (ip_address, port)
# generate a new uuid for this node
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
# First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid)
old_node = None
......@@ -285,7 +288,7 @@ class ServiceEventHandler(MasterEventHandler):
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas)
app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p)
# Next, the peer should ask a primary master node.
conn.expectMessage()
......
......@@ -107,7 +107,12 @@ class VerificationEventHandler(MasterEventHandler):
#
# However, master nodes can be known only as the server addresses. And, a node
# may claim a server address used by another node.
addr = (ip_address, port)
# generate a new uuid for this node
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
# First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid)
if node is None:
......@@ -192,7 +197,7 @@ class VerificationEventHandler(MasterEventHandler):
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas)
app.num_partitions, app.num_replicas, uuid)
conn.addPacket(p)
# Next, the peer should ask a primary master node.
conn.expectMessage()
......
......@@ -301,6 +301,16 @@ INVALID_OID = '\0\0\0\0\0\0\0\0'
INVALID_PTID = '\0\0\0\0\0\0\0\0'
INVALID_PARTITION = 0xffffffff
STORAGE_NS = 'S'
MASTER_NS = 'M'
CLIENT_NS = 'C'
UUID_NAMESPACES = {
STORAGE_NODE_TYPE: STORAGE_NS,
MASTER_NODE_TYPE: MASTER_NS,
CLIENT_NODE_TYPE: CLIENT_NS,
}
class ProtocolError(Exception): pass
class Packet(object):
......@@ -401,12 +411,12 @@ class Packet(object):
return self
def acceptNodeIdentification(self, msg_id, node_type, uuid, ip_address,
port, num_partitions, num_replicas):
port, num_partitions, num_replicas, your_uuid):
self._id = msg_id
self._type = ACCEPT_NODE_IDENTIFICATION
self._body = pack('!H16s4sHLL', node_type, uuid,
self._body = pack('!H16s4sHLL16s', node_type, uuid,
inet_aton(ip_address), port,
num_partitions, num_replicas)
num_partitions, num_replicas, your_uuid)
return self
def askPrimaryMaster(self, msg_id):
......@@ -780,14 +790,14 @@ class Packet(object):
def _decodeAcceptNodeIdentification(self):
try:
node_type, uuid, ip_address, port, num_partitions, num_replicas \
= unpack('!H16s4sHLL', self._body)
node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid \
= unpack('!H16s4sHLL16s', self._body)
ip_address = inet_ntoa(ip_address)
except:
raise ProtocolError(self, 'invalid accept node identification')
if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type)
return node_type, uuid, ip_address, port, num_partitions, num_replicas
return node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid
decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
def _decodeAskPrimaryMaster(self):
......
......@@ -75,14 +75,7 @@ class Application(object):
self.uuid = dm.getUUID()
if self.uuid is None:
# XXX Generate an UUID for self. For now, just use a random string.
# Avoid an invalid UUID.
while 1:
uuid = os.urandom(16)
if uuid != INVALID_UUID:
break
self.uuid = uuid
dm.setUUID(uuid)
self.uuid = INVALID_UUID
self.num_partitions = dm.getNumPartitions()
......@@ -138,6 +131,8 @@ class Application(object):
while 1:
self.operational = False
self.connectToPrimaryMaster()
if self.uuid == INVALID_UUID:
raise RuntimeError, 'No UUID supplied from the primary master'
try:
while 1:
try:
......
......@@ -146,7 +146,7 @@ class BootstrapEventHandler(StorageEventHandler):
p = Packet()
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
0, 0)
0, 0, uuid)
conn.addPacket(p)
# Now the master node should know that I am not the right one.
......@@ -154,7 +154,7 @@ class BootstrapEventHandler(StorageEventHandler):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
num_partitions, num_replicas, your_uuid):
if conn.isListeningConnection():
self.handleUnexpectedPacket(conn, packet)
else:
......@@ -187,6 +187,10 @@ class BootstrapEventHandler(StorageEventHandler):
elif app.num_replicas != num_replicas:
raise RuntimeError('the number of replicas is inconsistent')
if your_uuid != INVALID_UUID and app.uuid != your_uuid:
# got an uuid from the primary master
app.uuid = your_uuid
conn.setUUID(uuid)
node.setUUID(uuid)
......
......@@ -41,7 +41,7 @@ class StorageEventHandler(EventHandler):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
num_partitions, num_replicas, your_uuid):
raise NotImplementedError('this method must be overridden')
def handleAskPrimaryMaster(self, conn, packet):
......
......@@ -176,7 +176,8 @@ class OperationEventHandler(StorageEventHandler):
p = Packet()
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas)
app.num_partitions, app.num_replicas,
uuid)
conn.addPacket(p)
if node_type == MASTER_NODE_TYPE:
......@@ -184,7 +185,7 @@ class OperationEventHandler(StorageEventHandler):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
num_partitions, num_replicas, your_uuid):
if not conn.isListeningConnection():
raise NotImplementedError
else:
......
......@@ -74,7 +74,7 @@ class ReplicationEventHandler(StorageEventHandler):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
num_partitions, num_replicas, your_uuid):
# Nothing to do.
pass
......
......@@ -100,7 +100,7 @@ class VerificationEventHandler(StorageEventHandler):
p = Packet()
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas)
uuid)
conn.addPacket(p)
# Now the master node should know that I am not the right one.
......@@ -108,7 +108,7 @@ class VerificationEventHandler(StorageEventHandler):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
num_partitions, num_replicas, your_uuid):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment