Commit 982abf85 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Replace the ip_address and port bu a tuple and set it to None instead of

('0.0.0.0', 0) default value. This fix a client reject since they are no more
referenced by the default address in the node manager.
This affect protocol message *NodeIdentification, *PrimaryMaster, *NodeList 
and notifyNodeInformation 


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@874 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 93c89093
......@@ -162,9 +162,8 @@ class MasterBaseEventHandler(EventHandler):
logging.warn('ignoring notify node information from %s',
dump(uuid))
return
for node_type, ip_address, port, uuid, state in node_list:
for node_type, addr, uuid, state in node_list:
# Register/update nodes.
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid is not None:
......
......@@ -70,11 +70,10 @@ class BootstrapManager(EventHandler):
# Register new master nodes.
# TODO: this job should be done by the node manager
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
node = nm.getNodeByServer(addr)
for address, uuid in known_master_list:
node = nm.getNodeByServer(address)
if node is None:
node = MasterNode(server=addr)
node = MasterNode(server=address)
nm.add(node)
node.setUUID(uuid)
......@@ -90,10 +89,10 @@ class BootstrapManager(EventHandler):
logging.info('connected to a primary master node')
conn.ask(protocol.requestNodeIdentification(self.node_type,
self.uuid, self.server[0], self.server[1], self.name))
self.uuid, self.server, self.name))
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions, num_replicas, your_uuid):
uuid, address, num_partitions, num_replicas, your_uuid):
self.num_partitions = num_partitions
self.num_replicas = num_replicas
if self.uuid != your_uuid:
......
......@@ -82,7 +82,7 @@ class ConnectionPool(object):
return None
p = protocol.requestNodeIdentification(protocol.CLIENT_NODE_TYPE,
app.uuid, '0.0.0.0', 0, app.name)
app.uuid, None, app.name)
msg_id = conn.ask(app.local_var.queue, p)
finally:
conn.unlock()
......@@ -292,11 +292,10 @@ class Application(object):
if s_node is None or s_node.getNodeType() != protocol.STORAGE_NODE_TYPE:
return
s_uuid = s_node.getUUID()
ip_address, port = s_node.getServer()
m_conn = self._getMasterConnection()
m_conn.lock()
try:
node_list = [(protocol.STORAGE_NODE_TYPE, ip_address, port, s_uuid, s_node.getState())]
node_list = [(protocol.STORAGE_NODE_TYPE, s_node.getServer(), s_uuid, s_node.getState())]
m_conn.notify(protocol.notifyNodeInformation(node_list))
finally:
m_conn.unlock()
......@@ -440,7 +439,7 @@ class Application(object):
self.primary_master_node = None
break
p = protocol.requestNodeIdentification(protocol.CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name)
self.uuid, None, self.name)
msg_id = conn.ask(self.local_var.queue, p)
finally:
conn.unlock()
......
......@@ -34,8 +34,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
app.setNodeNotReady()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
uuid, address, num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# this must be a master node
......@@ -46,12 +45,11 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
finally:
conn.release()
return
if conn.getAddress() != (ip_address, port):
if conn.getAddress() != address:
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
conn.getAddress()[0], conn.getAddress()[1], *address)
app.nm.remove(node)
conn.lock()
try:
......@@ -74,11 +72,10 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
known_master_list):
app = self.app
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
for address, uuid in known_master_list:
n = app.nm.getNodeByServer(address)
if n is None:
n = MasterNode(server = addr)
n = MasterNode(server=address)
app.nm.add(n)
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
......@@ -230,10 +227,9 @@ class PrimaryNotificationsHandler(BaseHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
nm = app.nm
for node_type, ip_address, port, uuid, state in node_list:
logging.debug("notified of %s %s %d %s %s" %(node_type, ip_address, port, dump(uuid), state))
for node_type, addr, uuid, state in node_list:
logging.debug("notified of %s %s %s %s" %(node_type, addr, dump(uuid), state))
# Register new nodes.
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid is not None:
......
......@@ -71,18 +71,18 @@ class StorageBootstrapHandler(AnswerBaseHandler):
app.setNodeNotReady()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions, num_replicas, your_uuid):
uuid, address, num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
if node_type != STORAGE_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
if conn.getAddress() != address:
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
conn.getAddress()[0], conn.getAddress()[1], *address)
app.nm.remove(node)
conn.close()
return
......
......@@ -170,12 +170,11 @@ class EventHandler(object):
raise UnexpectedPacketError(message)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
uuid, address, name):
raise UnexpectedPacketError
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
uuid, address, num_partitions, num_replicas, your_uuid):
raise UnexpectedPacketError
def handlePing(self, conn, packet):
......
......@@ -93,7 +93,6 @@ class Application(object):
addr = self.server, connector_handler = self.connector_handler)
self.cluster_state = BOOTING
# Start the election of a primary master node.
self.electPrimary()
......@@ -272,16 +271,7 @@ class Application(object):
uuid = node.getUUID()
# The server address may be None.
addr = node.getServer()
if addr is None:
ip_address, port = None, None
else:
ip_address, port = addr
if ip_address is None:
ip_address = '0.0.0.0'
if port is None:
port = 0
address = node.getServer()
if node_type == CLIENT_NODE_TYPE:
# Only to master nodes and storage nodes.
......@@ -289,12 +279,12 @@ class Application(object):
if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID())
if n.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
node_list = [(node_type, ip_address, port, uuid, state)]
node_list = [(node_type, address, uuid, state)]
c.notify(protocol.notifyNodeInformation(node_list))
elif node.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
for c in self.em.getConnectionList():
if c.getUUID() is not None:
node_list = [(node_type, ip_address, port, uuid, state)]
node_list = [(node_type, address, uuid, state)]
c.notify(protocol.notifyNodeInformation(node_list))
elif node.getNodeType() != ADMIN_NODE_TYPE:
raise RuntimeError('unknown node type')
......@@ -336,11 +326,10 @@ class Application(object):
for n in self.nm.getNodeList():
if n.getNodeType() != ADMIN_NODE_TYPE:
try:
ip_address, port = n.getServer()
address = n.getServer()
except TypeError:
ip_address, port = '0.0.0.0', 0
node_list.append((n.getNodeType(), ip_address, port,
n.getUUID(), n.getState()))
address = None
node_list.append((n.getNodeType(), address, n.getUUID(), n.getState()))
# Split the packet if too huge.
if len(node_list) == 10000:
conn.notify(protocol.notifyNodeInformation(node_list))
......@@ -747,16 +736,16 @@ class Application(object):
for c in self.em.getConnectionList():
node = self.nm.getNodeByUUID(c.getUUID())
if node.getType() == CLIENT_NODE_TYPE:
ip_address, port = node.getServer()
node_list = [(node.getType(), ip_address, port, node.getUUID(), DOWN_STATE)]
node_list = [(node.getType(), node.getServer(),
node.getUUID(), DOWN_STATE)]
c.notify(protocol.notifyNodeInformation(node_list))
# then ask storages and master nodes to shutdown
logging.info("asking all remaining nodes to shutdown")
for c in self.em.getConnectionList():
node = self.nm.getNodeByUUID(c.getUUID())
if node.getType() in (STORAGE_NODE_TYPE, MASTER_NODE_TYPE):
ip_address, port = node.getServer()
node_list = [(node.getType(), ip_address, port, node.getUUID(), DOWN_STATE)]
node_list = [(node.getType(), node.getServer(),
node.getUUID(), DOWN_STATE)]
c.notify(protocol.notifyNodeInformation(node_list))
# then shutdown
sys.exit("Cluster has been asked to shut down")
......
......@@ -120,11 +120,11 @@ class MasterHandler(EventHandler):
else:
primary_uuid = None
known_master_list = [app.server + (app.uuid, )]
known_master_list = [(app.server, app.uuid, )]
for n in app.nm.getMasterNodeList():
if n.getState() == protocol.BROKEN_STATE:
continue
known_master_list.append(n.getServer() + (n.getUUID(), ))
known_master_list.append((n.getServer(), n.getUUID(), ))
conn.answer(protocol.answerPrimaryMaster(primary_uuid,
known_master_list), packet)
......@@ -149,7 +149,7 @@ class BaseServiceHandler(MasterHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
for node_type, addr, uuid, state in node_list:
if node_type in (protocol.CLIENT_NODE_TYPE, protocol.ADMIN_NODE_TYPE):
# No interest.
continue
......@@ -167,7 +167,6 @@ class BaseServiceHandler(MasterHandler):
# What?! What happened to me?
raise RuntimeError, 'I was told that I am bad'
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
node = app.nm.getNodeByServer(addr)
......
......@@ -33,13 +33,12 @@ class ElectionHandler(MasterHandler):
if uuid is None:
raise protocol.UnexpectedPacketError
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
for node_type, addr, uuid, state in node_list:
if node_type != MASTER_NODE_TYPE:
# No interest.
continue
# Register new master nodes.
addr = (ip_address, port)
if app.server == addr:
# This is self.
continue
......@@ -120,22 +119,22 @@ class ClientElectionHandler(MasterHandler):
MasterHandler.peerBroken(self, conn)
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions,
uuid, address, num_partitions,
num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
if node_type != MASTER_NODE_TYPE:
# The peer is not a master node!
logging.error('%s:%d is not a master node', ip_address, port)
logging.error('%s:%d is not a master node', *address)
app.nm.remove(node)
app.negotiating_master_node_set.discard(node.getServer())
conn.close()
return
if conn.getAddress() != (ip_address, port):
if conn.getAddress() != address:
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
conn.getAddress()[0], conn.getAddress()[1], *address)
app.nm.remove(node)
app.negotiating_master_node_set.discard(node.getServer())
conn.close()
......@@ -166,17 +165,16 @@ class ClientElectionHandler(MasterHandler):
return
app = self.app
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
if app.server == addr:
for address, uuid in known_master_list:
if app.server == address:
# This is self.
continue
else:
n = app.nm.getNodeByServer(addr)
n = app.nm.getNodeByServer(address)
if n is None:
n = MasterNode(server = addr)
n = MasterNode(server=address)
app.nm.add(n)
app.unconnected_master_node_set.add(addr)
app.unconnected_master_node_set.add(address)
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
......@@ -204,7 +202,7 @@ class ClientElectionHandler(MasterHandler):
# Request a node idenfitication.
conn.ask(protocol.requestNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.name))
app.uuid, app.server, app.name))
class ServerElectionHandler(MasterHandler):
......@@ -221,7 +219,7 @@ class ServerElectionHandler(MasterHandler):
MasterHandler.peerBroken(self, conn)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
uuid, address, name):
if conn.getConnector() is None:
# Connection can be closed by peer after he sent
# RequestNodeIdentification if he finds the primary master before
......@@ -235,12 +233,11 @@ class ServerElectionHandler(MasterHandler):
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
raise protocol.NotReadyError
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
node = app.nm.getNodeByServer(address)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
node = MasterNode(server=address, uuid=uuid)
app.nm.add(node)
app.unconnected_master_node_set.add(addr)
app.unconnected_master_node_set.add(address)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
......@@ -248,15 +245,14 @@ class ServerElectionHandler(MasterHandler):
raise protocol.BrokenNodeDisallowedError
# supplied another uuid in case of conflict
while not app.isValidUUID(uuid, addr):
while not app.isValidUUID(uuid, address):
uuid = app.getNewUUID(node_type)
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.pt.getPartitions(),
app.pt.getReplicas(), uuid)
app.server, app.pt.getPartitions(), app.pt.getReplicas(), uuid)
conn.answer(p, packet)
def handleAnnouncePrimaryMaster(self, conn, packet):
......
......@@ -27,28 +27,27 @@ class IdentificationHandler(MasterHandler):
logging.warning('lost a node in IdentificationHandler : %s' % node)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
uuid, address, name):
self.checkClusterName(name)
app, nm = self.app, self.app.nm
server = (ip_address, port)
node_by_uuid = nm.getNodeByUUID(uuid)
node_by_addr = nm.getNodeByServer(server)
node_by_addr = nm.getNodeByServer(address)
# handle conflicts and broken nodes
node = node_by_uuid or node_by_addr
if node_by_uuid is not None:
if node.getServer() == server:
if node.getServer() == address:
if node.getState() == protocol.BROKEN_STATE:
raise protocol.BrokenNodeDisallowedError
# the node is still alive
node.setState(protocol.RUNNING_STATE)
if node.getServer() != server:
if node.getServer() != address:
if node.getState() == protocol.RUNNING_STATE:
# still running, reject this new node
raise protocol.ProtocolError('invalid server address')
# this node has changed its address
node.setServer(server)
node.setServer(address)
node.setState(protocol.RUNNING_STATE)
if node_by_uuid is None and node_by_addr is not None:
if node.getState() == protocol.RUNNING_STATE:
......@@ -57,7 +56,7 @@ class IdentificationHandler(MasterHandler):
# FIXME: here the node was known with a different uuid but with the
# same address, is it safe to forgot the old, even if he's not
# running ?
node.setServer(server)
node.setServer(address)
node.setState(protocol.RUNNING_STATE)
# ask the app the node identification, if refused, an exception is raised
......@@ -68,7 +67,7 @@ class IdentificationHandler(MasterHandler):
uuid = app.getNewUUID(node_type)
if node is None:
# new node
node = klass(uuid=uuid, server=(ip_address, port))
node = klass(uuid=uuid, server=address)
app.nm.add(node)
handler = handler(self.app)
# set up the node
......@@ -78,7 +77,7 @@ class IdentificationHandler(MasterHandler):
conn.setUUID(uuid)
conn.setHandler(handler)
# answer
args = (protocol.MASTER_NODE_TYPE, app.uuid, app.server[0], app.server[1],
args = (protocol.MASTER_NODE_TYPE, app.uuid, app.server,
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
conn.answer(protocol.acceptNodeIdentification(*args), packet)
# trigger the event
......
......@@ -34,7 +34,7 @@ class RecoveryHandler(MasterHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
for node_type, addr, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# No interest.
continue
......@@ -52,7 +52,6 @@ class RecoveryHandler(MasterHandler):
# What?! What happened to me?
raise RuntimeError, 'I was told that I am bad'
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
node = app.nm.getNodeByServer(addr)
......
......@@ -63,13 +63,12 @@ class PrimaryMasterHandler(MasterHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
for node_type, addr, uuid, state in node_list:
if node_type != MASTER_NODE_TYPE:
# No interest.
continue
# Register new master nodes.
addr = (ip_address, port)
if app.server == addr:
# This is self.
continue
......@@ -86,12 +85,12 @@ class PrimaryMasterHandler(MasterHandler):
n.setUUID(uuid)
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions,
uuid, address, num_partitions,
num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
assert node_type == MASTER_NODE_TYPE
assert conn.getAddress() == (ip_address, port)
assert conn.getAddress() == address
if your_uuid != app.uuid:
# uuid conflict happened, accept the new one
......
......@@ -28,7 +28,7 @@ class ShutdownHandler(BaseServiceHandler):
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
uuid, address, name):
logging.error('reject any new connection')
raise protocol.ProtocolError('cluster is shutting down')
......@@ -55,7 +55,7 @@ class ShutdownHandler(BaseServiceHandler):
# do not care about these messages as we are shutting down all nodes
return
for node_type, ip_address, port, uuid, state in node_list:
for node_type, addr, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# No interest.
continue
......@@ -73,7 +73,6 @@ class ShutdownHandler(BaseServiceHandler):
# What?! What happened to me?
raise RuntimeError, 'I was told that I am bad'
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
node = app.nm.getNodeByServer(addr)
......
......@@ -35,7 +35,7 @@ class VerificationHandler(MasterHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
for node_type, addr, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# No interest.
continue
......@@ -53,7 +53,6 @@ class VerificationHandler(MasterHandler):
# What?! What happened to me?
raise RuntimeError, 'I was told that I am bad'
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
node = app.nm.getNodeByServer(addr)
......
......@@ -362,12 +362,10 @@ VALID_CELL_STATE_LIST = (UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE,
# Other constants.
INVALID_UUID = '\0' * 16
INVALID_TID = '\0' * 8
INVALID_OID = '\0' * 8
INVALID_PTID = '\0' * 8
INVALID_PARTITION = 0xffffffff
# TODO: delete those definitions when tests are fixed
INVALID_SERIAL = INVALID_TID
INVALID_OID = '\0' * 16
INVALID_PARTITION = 0xffffffff
STORAGE_NS = 'S'
MASTER_NS = 'M'
......@@ -498,9 +496,20 @@ def _checkNodeState(state):
def _checkNodeType(type):
node_type = node_types.get(type)
if node_type is None:
raise PacketMalformedError('invalide node type %d' % type)
raise PacketMalformedError('invalid node type %d' % type)
return node_type
def _checkAddress(address):
if address == '\0' * 6:
return None
(ip, port) = unpack('!4sH', address)
return (inet_ntoa(ip), port)
def _encodeAddress(address):
if address is None:
return '\0' * 6
return pack('!4sH', inet_aton(address[0]), address[1])
def _checkUUID(uuid):
if uuid == INVALID_UUID:
return None
......@@ -549,26 +558,26 @@ decode_table[PONG] = _decodePong
@handle_errors
def _decodeRequestNodeIdentification(body):
r = unpack('!LLH16s4sH', body[:32])
major, minor, node_type, uuid, ip_address, port = r
ip_address = inet_ntoa(ip_address)
r = unpack('!LLH16s6s', body[:32])
major, minor, node_type, uuid, address = r
address = _checkAddress(address)
(name, _) = _readString(body, 'name', offset=32)
node_type = _checkNodeType(node_type)
uuid = _checkUUID(uuid)
if (major, minor) != PROTOCOL_VERSION:
raise PacketMalformedError('protocol version mismatch')
return node_type, uuid, ip_address, port, name
return node_type, uuid, address, name
decode_table[REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification
@handle_errors
def _decodeAcceptNodeIdentification(body):
r = unpack('!H16s4sHLL16s', body)
node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid = r
ip_address = inet_ntoa(ip_address)
r = unpack('!H16s6sLL16s', body)
node_type, uuid, address, num_partitions, num_replicas, your_uuid = r
address = _checkAddress(address)
node_type = _checkNodeType(node_type)
uuid = _checkUUID(uuid)
your_uuid == _checkUUID(uuid)
return (node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid)
return (node_type, uuid, address, num_partitions, num_replicas, your_uuid)
decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
@handle_errors
......@@ -581,10 +590,10 @@ def _decodeAnswerPrimaryMaster(body):
(primary_uuid, n) = unpack('!16sL', body[:20])
known_master_list = []
for i in xrange(n):
ip_address, port, uuid = unpack('!4sH16s', body[20+i*22:42+i*22])
ip_address = inet_ntoa(ip_address)
address, uuid = unpack('!6s16s', body[20+i*22:42+i*22])
address = _checkAddress(address)
uuid = _checkUUID(uuid)
known_master_list.append((ip_address, port, uuid))
known_master_list.append((address, uuid))
primary_uuid = _checkUUID(primary_uuid)
return (primary_uuid, known_master_list)
decode_table[ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster
......@@ -604,13 +613,13 @@ def _decodeNotifyNodeInformation(body):
(n,) = unpack('!L', body[:4])
node_list = []
for i in xrange(n):
r = unpack('!H4sH16sH', body[4+i*26:30+i*26])
node_type, ip_address, port, uuid, state = r
ip_address = inet_ntoa(ip_address)
r = unpack('!H6s16sH', body[4+i*26:30+i*26])
node_type, address, uuid, state = r
address = _checkAddress(address)
node_type = _checkNodeType(node_type)
state = _checkNodeState(state)
uuid = _checkUUID(uuid)
node_list.append((node_type, ip_address, port, uuid, state))
node_list.append((node_type, address, uuid, state))
return (node_list,)
decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
......@@ -986,13 +995,13 @@ def _decodeAnswerNodeList(body):
(n,) = unpack('!L', body[:4])
node_list = []
for i in xrange(n):
r = unpack('!H4sH16sH', body[4+i*26:30+i*26])
node_type, ip_address, port, uuid, state = r
ip_address = inet_ntoa(ip_address)
r = unpack('!H6s16sH', body[4+i*26:30+i*26])
node_type, address, uuid, state = r
address = _checkAddress(address)
node_type = _checkNodeType(node_type)
state = _checkNodeState(state)
uuid = _checkUUID(uuid)
node_list.append((node_type, ip_address, port, uuid, state))
node_list.append((node_type, address, uuid, state))
return (node_list,)
decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList
......@@ -1094,18 +1103,19 @@ def ping():
def pong():
return Packet(PONG)
def requestNodeIdentification(node_type, uuid, ip_address, port, name):
def requestNodeIdentification(node_type, uuid, address, name):
uuid = _encodeUUID(uuid)
body = pack('!LLH16s4sHL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, inet_aton(ip_address), port, len(name)) + name
address = _encodeAddress(address)
body = pack('!LLH16s6sL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, address, len(name)) + name
return Packet(REQUEST_NODE_IDENTIFICATION, body)
def acceptNodeIdentification(node_type, uuid, ip_address,
port, num_partitions, num_replicas, your_uuid):
def acceptNodeIdentification(node_type, uuid, address,
num_partitions, num_replicas, your_uuid):
uuid = _encodeUUID(uuid)
your_uuid = _encodeUUID(your_uuid)
body = pack('!H16s4sHLL16s', node_type, uuid,
inet_aton(ip_address), port,
address = _encodeAddress(address)
body = pack('!H16s6sLL16s', node_type, uuid, address,
num_partitions, num_replicas, your_uuid)
return Packet(ACCEPT_NODE_IDENTIFICATION, body)
......@@ -1115,9 +1125,10 @@ def askPrimaryMaster():
def answerPrimaryMaster(primary_uuid, known_master_list):
primary_uuid = _encodeUUID(primary_uuid)
body = [primary_uuid, pack('!L', len(known_master_list))]
for address, port, uuid in known_master_list:
for address, uuid in known_master_list:
uuid = _encodeUUID(uuid)
body.append(pack('!4sH16s', inet_aton(address), port, uuid))
address = _encodeAddress(address)
body.append(pack('!6s16s', address, uuid))
body = ''.join(body)
return Packet(ANSWER_PRIMARY_MASTER, body)
......@@ -1129,10 +1140,10 @@ def reelectPrimaryMaster():
def notifyNodeInformation(node_list):
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(NOTIFY_NODE_INFORMATION, body)
......@@ -1143,7 +1154,7 @@ def answerLastIDs(loid, ltid, lptid):
# XXX: this is a valid oid, an error should be returned instead of this
# packet when no last IDs are known
if loid is None:
loid = '\0' * 16
loid = INVALID_OID
if ltid is None:
ltid = INVALID_TID
lptid = _encodePTID(lptid)
......@@ -1366,10 +1377,10 @@ def askNodeList(node_type):
def answerNodeList(node_list):
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(ANSWER_NODE_LIST, body)
......@@ -1403,10 +1414,9 @@ def askNodeInformation():
def answerNodeInformation(node_list):
# XXX: copy-paste from notifyNodeInformation
body = [pack('!L', len(node_list))]
for node_type, ip_address, port, uuid, state in node_list:
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
uuid, state))
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(ANSWER_NODE_INFORMATION, body)
......
......@@ -34,12 +34,11 @@ class BaseStorageHandler(EventHandler):
pass
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
uuid, address, name):
raise NotImplementedError('this method must be overridden')
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
uuid, address, num_partitions, num_replicas, your_uuid):
raise NotImplementedError('this method must be overridden')
def handleAskLastIDs(self, conn, packet):
......@@ -100,8 +99,7 @@ class BaseMasterHandler(BaseStorageHandler):
uuid = conn.getUUID()
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
addr = (ip_address, port)
for node_type, addr, uuid, state in node_list:
# Try to retrieve it from nm
n = None
if uuid is not None:
......
......@@ -49,7 +49,7 @@ class HiddenHandler(BaseMasterHandler):
if node.getNodeType() != MASTER_NODE_TYPE:
return
for node_type, ip_address, port, uuid, state in node_list:
for node_type, addr, uuid, state in node_list:
if node_type == STORAGE_NODE_TYPE:
if uuid == None:
# No interest.
......@@ -74,12 +74,11 @@ class HiddenHandler(BaseMasterHandler):
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
uuid, address, name):
pass
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
uuid, address, num_partitions, num_replicas, your_uuid):
pass
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
......
......@@ -36,7 +36,7 @@ class IdentificationHandler(BaseStorageHandler):
logging.warning('lost a node in IdentificationEventHandler')
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
uuid, address, name):
self.checkClusterName(name)
# reject any incoming connections if not ready
if not self.app.ready:
......@@ -66,7 +66,7 @@ class IdentificationHandler(BaseStorageHandler):
conn.setHandler(handler)
conn.setUUID(uuid)
node.setUUID(uuid)
args = (STORAGE_NODE_TYPE, app.uuid, app.server[0], app.server[1],
args = (STORAGE_NODE_TYPE, app.uuid, app.server,
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# accept the identification and trigger an event
conn.answer(protocol.acceptNodeIdentification(*args), packet)
......
......@@ -45,8 +45,7 @@ class ReplicationHandler(BaseStorageHandler):
self.app.replicator.reset()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
uuid, address, num_partitions, num_replicas, your_uuid):
# Nothing to do.
pass
......
......@@ -180,7 +180,7 @@ class Replicator(object):
addr = addr,
connector_handler = app.connector_handler)
p = protocol.requestNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name)
app.server, app.name)
self.current_connection.ask(p)
self.tid_offset = 0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment