Commit 773ec228 authored by Grégory Wisniewski

Implement the PENDING storage node state: storage nodes start in this state and
switch to the RUNNING state when requested by the admin node. This feature can be
disabled by setting neo.master.ENABLE_PENDING_NODES to False.
The neoctl syntax is: neoctl add (all | UUID+)
The command displays the UUIDs of the added storage nodes.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@651 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5d10815c
......@@ -105,6 +105,7 @@ class AdminEventHandler(BaseEventHandler):
if node is None:
p = protocol.protocolError('invalid uuid')
conn.notify(p)
return
if node.getState() == state and modify_partition_table is False:
# no change
p = protocol.answerNodeState(node.getUUID(), node.getState())
......@@ -120,6 +121,22 @@ class AdminEventHandler(BaseEventHandler):
node = self.app.nm.getNodeByUUID(uuid)
p = protocol.answerNodeState(node.getUUID(), node.getState())
conn.answer(p, packet)
def handleAddPendingNodes(self, conn, packet, uuid_list):
    """Relay an "add pending nodes" request to the primary master.

    conn      -- connection the request arrived on; the answer goes back on it
    packet    -- the request packet, passed along so the answer is paired
                 with it
    uuid_list -- UUIDs of the storage nodes to add, forwarded unchanged
    """
    uuids = ', '.join([dump(uuid) for uuid in uuid_list])
    logging.info('Add nodes %s', uuids)
    # forward the request to primary
    self.app.master_conn.ask(protocol.addPendingNodes(uuid_list))
    # Keep polling the event manager until nn_notified is raised; the
    # monitoring handler's handleAnswerNewNodes sets it together with
    # app.uuid_list.
    self.app.nn_notified = False
    while not self.app.nn_notified:
        self.app.em.poll(1)
    # forward the answer to neoctl
    conn.answer(protocol.answerNewNodes(self.app.uuid_list), packet)
class MonitoringEventHandler(BaseEventHandler):
"""This class deals with events for monitoring cluster."""
......@@ -414,3 +431,8 @@ class MonitoringEventHandler(BaseEventHandler):
self.app.notified = True
def handleAnswerNewNodes(self, conn, packet, uuid_list):
    """Store the answered UUID list on the application and flag its arrival."""
    app = self.app
    # publish the list before raising the flag that is polled elsewhere
    app.uuid_list = uuid_list
    app.nn_notified = True
......@@ -36,7 +36,7 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT
ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
ASK_OIDS, ANSWER_OIDS, \
ASK_OIDS, ANSWER_OIDS, ADD_PENDING_NODES, ANSWER_NEW_NODES, \
NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE, ASK_PARTITION_LIST, ANSWER_PARTITION_LIST, ASK_NODE_LIST, \
......@@ -156,9 +156,11 @@ class EventHandler(object):
def unexpectedPacket(self, conn, packet, message=None, *args):
"""Handle an unexpected packet."""
if message is None:
message = 'unexpected packet type %s' % packet.getType()
message = 'unexpected packet type %s in %s' % (packet.getType(),
self.__class__.__name__)
else:
message = 'unexpected packet: %s' % message
message = 'unexpected packet: %s in %s' % (message,
self.__class__.__name__)
logging.info('%s', message)
conn.answer(protocol.protocolError(message), packet)
conn.abort()
......@@ -379,6 +381,13 @@ class EventHandler(object):
def handleAnswerNodeState(self, conn, packet, uuid, state):
    """Reject an ANSWER_NODE_STATE packet as unexpected by default."""
    raise UnexpectedPacketError()
def handleAddPendingNodes(self, conn, packet, uuid_list):
    """Reject an ADD_PENDING_NODES packet as unexpected by default."""
    raise UnexpectedPacketError()
def handleAnswerNewNodes(self, conn, packet, uuid_list):
    """Reject an ANSWER_NEW_NODES packet as unexpected by default."""
    raise UnexpectedPacketError()
# Error packet handlers.
# XXX: why answer a protocolError to another protocolError ?
......@@ -458,6 +467,8 @@ class EventHandler(object):
d[ANSWER_NODE_LIST] = self.handleAnswerNodeList
d[SET_NODE_STATE] = self.handleSetNodeState
d[ANSWER_NODE_STATE] = self.handleAnswerNodeState
d[ADD_PENDING_NODES] = self.handleAddPendingNodes
d[ANSWER_NEW_NODES] = self.handleAnswerNewNodes
self.packet_dispatch_table = d
......
# Set to False to automatically include new storage nodes in the partition table
ENABLE_PENDING_NODES = True
......@@ -40,6 +40,7 @@ from neo.master.secondary import SecondaryEventHandler
from neo.master.pt import PartitionTable
from neo.util import dump
from neo.connector import getConnectorHandler
from neo.master import ENABLE_PENDING_NODES
class Application(object):
"""The master node application."""
......@@ -616,10 +617,11 @@ class Application(object):
cell_list.extend(self.pt.tweak())
# And, add unused nodes.
node_list = self.pt.getNodeList()
for node in nm.getStorageNodeList():
if node.getState() == RUNNING_STATE and node not in node_list:
cell_list.extend(self.pt.addNode(node))
if not ENABLE_PENDING_NODES:
node_list = self.pt.getNodeList()
for node in nm.getStorageNodeList():
if node.getState() == RUNNING_STATE and node not in node_list:
cell_list.extend(self.pt.addNode(node))
# If anything changed, send the changes.
if cell_list:
......@@ -790,10 +792,9 @@ class Application(object):
return prefix + uuid
def isValidUUID(self, uuid, addr):
    """Tell whether a node connecting from addr may use uuid.

    Returns False when uuid is already registered for a node whose known
    server address differs from addr, or when uuid is this application's
    own UUID or INVALID_UUID. Returns True otherwise.
    """
    # A direct lookup by UUID replaces the former scan over every node.
    node = self.nm.getNodeByUUID(uuid)
    if node is not None:
        server = node.getServer()
        if server is not None and server != addr:
            return False
    return uuid != self.uuid and uuid != INVALID_UUID
......
......@@ -74,6 +74,12 @@ class RecoveryEventHandler(MasterEventHandler):
logging.error('reject an alien cluster')
raise protocol.ProtocolError('invalid cluster name')
if node_type is STORAGE_NODE_TYPE and uuid is INVALID_UUID:
# refuse an empty storage node (with no UUID) to avoid potential
# UUID conflict
logging.info('reject empty storage node')
raise protocol.NotReadyError
# Here are many situations. In principle, a node should be identified by
# an UUID, since an UUID never change when moving a storage node to a different
# server, and an UUID always changes for a master node and a client node whenever
......@@ -83,6 +89,7 @@ class RecoveryEventHandler(MasterEventHandler):
# may claim a server address used by another node.
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if not app.isValidUUID(uuid, addr):
# Here we have an UUID conflict, assume that's a new node
node = None
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
......@@ -23,13 +23,14 @@ from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \
HIDDEN_STATE
HIDDEN_STATE, PENDING_STATE
from neo.master.handler import MasterEventHandler
from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.exception import OperationFailure, ElectionFailure
from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.handler import identification_required, restrict_node_types
from neo.util import dump
from neo.master import ENABLE_PENDING_NODES
class FinishingTransaction(object):
"""This class describes a finishing transaction."""
......@@ -174,6 +175,8 @@ class ServiceEventHandler(MasterEventHandler):
node = AdminNode(uuid = uuid)
else:
node = StorageNode(server = addr, uuid = uuid)
if ENABLE_PENDING_NODES:
node.setState(PENDING_STATE)
app.nm.add(node)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
......@@ -249,7 +252,7 @@ class ServiceEventHandler(MasterEventHandler):
conn.setUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE:
if not ENABLE_PENDING_NODES and node.getNodeType() == STORAGE_NODE_TYPE:
# If this is a storage node, add it into the partition table.
# Note that this does no harm, even if the node is not new.
if old_node is not None:
......@@ -291,8 +294,8 @@ class ServiceEventHandler(MasterEventHandler):
logging.info('sending partition table to %s:%d', *(conn.getAddress()))
app.sendPartitionTable(conn)
# If this is a storage node, ask it to start.
if node.getNodeType() == STORAGE_NODE_TYPE:
# If this is a non-pending storage node, ask it to start.
if node.getNodeType() == STORAGE_NODE_TYPE and node.getState() != PENDING_STATE:
conn.notify(protocol.startOperation())
@identification_required
......@@ -617,5 +620,38 @@ class ServiceEventHandler(MasterEventHandler):
ptid = app.getNextPartitionTableID()
app.broadcastPartitionChanges(ptid, cell_list)
def handleAddPendingNodes(self, conn, packet, uuid_list):
    """Add pending storage nodes to the partition table and start them.

    An empty uuid_list selects every storage node currently in
    PENDING_STATE; otherwise only the listed UUIDs that are pending are
    kept. The UUIDs actually added are answered on conn (an empty answer
    when nothing matched).
    """
    logging.debug('Add nodes %s' % ', '.join([dump(uuid) for uuid in uuid_list]))
    app = self.app
    nm, em, pt = app.nm, app.em, app.pt
    # start from every storage node still waiting in the pending state
    selected = set([node.getUUID() for node in nm.getStorageNodeList()
                    if node.getState() == PENDING_STATE])
    # an explicit request narrows the selection down to the listed UUIDs
    if uuid_list:
        selected = selected & set(uuid_list)
    if not selected:
        logging.warning('No nodes added')
        conn.answer(protocol.answerNewNodes(()), packet)
        return
    logging.info('Adding nodes %s' % ', '.join([dump(uuid) for uuid in selected]))
    # put each node in the partition table, then mark it running and
    # tell the cluster about it
    changed_cells = []
    for uuid in selected:
        node = nm.getNodeByUUID(uuid)
        changed_cells.extend(pt.addNode(node))
        node.setState(RUNNING_STATE)
        app.broadcastNodeInformation(node)
    # ask the connections belonging to the added nodes to start operation
    for s_conn in em.getConnectionList():
        if s_conn.getUUID() in selected:
            s_conn.notify(protocol.startOperation())
    # broadcast the new partition table, then answer the requester
    app.broadcastPartitionChanges(app.getNextPartitionTableID(), changed_cells)
    conn.answer(protocol.answerNewNodes(list(selected)), packet)
......@@ -21,6 +21,7 @@ import logging
from mock import Mock
from struct import pack, unpack
from neo.tests.base import NeoTestBase
import neo.master
from neo import protocol
from neo.protocol import Packet, INVALID_UUID
from neo.master.service import ServiceEventHandler
......@@ -58,6 +59,7 @@ class MasterServiceTests(NeoTestBase):
self.client_address = ('127.0.0.1', self.client_port)
self.storage_address = ('127.0.0.1', self.storage_port)
def tearDown(self):
NeoTestBase.tearDown(self)
......
......@@ -20,6 +20,7 @@ import unittest
import logging
from mock import Mock
from struct import pack, unpack
import neo
from neo.tests.base import NeoTestBase
from neo.protocol import Packet, INVALID_UUID
from neo.master.verification import VerificationEventHandler
......@@ -82,6 +83,7 @@ class MasterVerificationTests(NeoTestBase):
# test alien cluster
conn = self.getFakeConnection()
self.verification.handleRequestNodeIdentification(conn, packet, *args)
self.app.nm.getNodeByServer((ip, port)).setState(RUNNING_STATE)
self.checkAcceptNodeIdentification(conn)
return uuid
......
......@@ -20,7 +20,7 @@ import logging
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
ADMIN_NODE_TYPE
PENDING_STATE, ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler
from neo.exception import VerificationFailure, ElectionFailure
from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
......@@ -127,7 +127,9 @@ class VerificationEventHandler(MasterEventHandler):
elif node_type == ADMIN_NODE_TYPE:
node = AdminNode(uuid = uuid)
else:
# empty storage nodes start in the PENDING state
node = StorageNode(server = addr, uuid = uuid)
node.setState(PENDING_STATE)
app.nm.add(node)
app.broadcastNodeInformation(node)
else:
......@@ -153,7 +155,11 @@ class VerificationEventHandler(MasterEventHandler):
app.broadcastNodeInformation(node)
# And insert a new one.
node.setUUID(uuid)
node.setState(RUNNING_STATE)
if node_type is STORAGE_NODE_TYPE:
# empty node
node.setState(PENDING_STATE)
else:
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# I know this node by the UUID.
......
......@@ -122,6 +122,12 @@ class Application(object):
p = protocol.setClusterState(name, state)
else:
return "unknown command options"
elif command == "add":
if len(options) == 1 and options[0] == 'all':
uuid_list = []
else:
uuid_list = [bin(opt) for opt in options]
p = protocol.addPendingNodes(uuid_list)
else:
return "unknown command"
......
......@@ -48,20 +48,20 @@ class CommandEventHandler(EventHandler):
def connectionFailed(self, conn):
    """Turn a failed connection to the admin node into an OperationFailure."""
    EventHandler.connectionFailed(self, conn)
    # conn.getAddress() feeds both %s and %d, i.e. a (host, port) pair; a
    # lone %s here would raise TypeError instead of the intended failure.
    raise OperationFailure(
        "impossible to connect to admin node %s:%d" % conn.getAddress())
def timeoutExpired(self, conn):
    """Turn a timeout on the admin node connection into an OperationFailure."""
    EventHandler.timeoutExpired(self, conn)
    # conn.getAddress() feeds both %s and %d, i.e. a (host, port) pair; a
    # lone %s here would raise TypeError instead of the intended failure.
    raise OperationFailure(
        "connection to admin node %s:%d timeout" % conn.getAddress())
def connectionClosed(self, conn):
    """Handle the admin node connection being closed.

    Raises OperationFailure when the close happens while the application
    is still trying to reach the admin node; otherwise defers to the base
    handler.
    """
    if self.app.trying_admin_node:
        raise OperationFailure(
            "cannot connect to admin node %s:%d" % conn.getAddress())
    EventHandler.connectionClosed(self, conn)
def peerBroken(self, conn):
    """Turn a broken admin node peer into an OperationFailure."""
    EventHandler.peerBroken(self, conn)
    # conn.getAddress() feeds both %s and %d, i.e. a (host, port) pair; a
    # lone %s here would raise TypeError instead of the intended failure.
    raise OperationFailure(
        "connect to admin node %s:%d broken" % conn.getAddress())
def handleAnswerPartitionList(self, conn, packet, ptid, row_list):
data = ""
......@@ -86,3 +86,7 @@ class CommandEventHandler(EventHandler):
def handleAnswerNodeState(self, conn, packet, uuid, state):
    """Store a readable confirmation of the node's state as the command result."""
    uuid_text = dump(uuid)
    self.app.result = "Node %s set to state %s" % (uuid_text, state)
def handleAnswerNewNodes(self, conn, packet, uuid_list):
    """Store the list of newly added storage nodes as the command result."""
    dumped = [dump(uuid) for uuid in uuid_list]
    self.app.result = 'New storage nodes : %s' % ', '.join(dumped)
......@@ -18,8 +18,9 @@
from time import time
import logging
from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, VALID_NODE_STATE_LIST, ADMIN_NODE_TYPE
from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
BROKEN_STATE, PENDING_STATE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
CLIENT_NODE_TYPE, VALID_NODE_STATE_LIST, ADMIN_NODE_TYPE
from neo.util import dump
class Node(object):
......
......@@ -287,6 +287,12 @@ packet_types = Enum({
# Answer state of the cluster
'ANSWER_CLUSTER_STATE': 0x8024,
# Ask the primary to include some pending node in the partition table
'ADD_PENDING_NODES': 0x0025,
# Answer with the nodes that were added to the partition table
'ANSWER_NEW_NODES': 0x8025,
})
# Error codes.
......@@ -316,9 +322,11 @@ node_states = Enum({
'DOWN_STATE': 2,
'BROKEN_STATE': 3,
'HIDDEN_STATE' : 4,
'PENDING_STATE': 5,
})
VALID_NODE_STATE_LIST = (RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, HIDDEN_STATE)
VALID_NODE_STATE_LIST = (RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE,
BROKEN_STATE, HIDDEN_STATE, PENDING_STATE)
# Partition cell states.
partition_cell_states = Enum({
......@@ -1049,6 +1057,28 @@ def _decodeAnswerClusterState(body):
return (uuid, state)
decode_table[ANSWER_CLUSTER_STATE] = _decodeAnswerClusterState
def _decodeAddPendingNodes(body):
try:
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
except:
raise
raise ProtocolError(self, 'invalide add pending nodes')
return (uuid_list, )
decode_table[ADD_PENDING_NODES] = _decodeAddPendingNodes
def _decodeAnswerNewNodes(body):
try:
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
except:
raise
raise ProtocolError(self, 'invalide answer new nodes')
return (uuid_list, )
decode_table[ANSWER_NEW_NODES] = _decodeAnswerNewNodes
# Packet encoding
def _error(error_code, error_message):
......@@ -1343,3 +1373,15 @@ def answerClusterState(state):
body = [pack('!H', uuid, state)]
body = ''.join(body)
return Packet(ANSWER_CLUSTER_STATE, body)
def addPendingNodes(uuid_list=()):
    """Build an ADD_PENDING_NODES packet carrying uuid_list.

    The body is a 16-bit count followed by each UUID packed on 16 bytes.
    An empty list means all current pending nodes.
    """
    packed_uuids = [pack('!16s', uuid) for uuid in uuid_list]
    count = pack('!H', len(packed_uuids))
    return Packet(ADD_PENDING_NODES, count + ''.join(packed_uuids))
def answerNewNodes(uuid_list):
    """Build an ANSWER_NEW_NODES packet carrying uuid_list.

    The body is a 16-bit count followed by each UUID packed on 16 bytes.
    An empty list means no new nodes.
    """
    packed_uuids = [pack('!16s', uuid) for uuid in uuid_list]
    count = pack('!H', len(packed_uuids))
    return Packet(ANSWER_NEW_NODES, count + ''.join(packed_uuids))
......@@ -19,7 +19,7 @@ import logging
from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE, RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
BROKEN_STATE, VALID_CELL_STATE_LIST, HIDDEN_STATE
BROKEN_STATE, VALID_CELL_STATE_LIST, HIDDEN_STATE, PENDING_STATE
from neo.util import dump, u64
from neo.locking import RLock
......@@ -166,10 +166,12 @@ class PartitionTable(object):
TEMPORARILY_DOWN_STATE: 'T',
DOWN_STATE: 'D',
BROKEN_STATE: 'B',
HIDDEN_STATE: 'H'}
HIDDEN_STATE: 'H',
PENDING_STATE: 'P'}
cell_state_dict = { UP_TO_DATE_STATE: 'U',
OUT_OF_DATE_STATE: 'O',
FEEDING_STATE: 'F' }
FEEDING_STATE: 'F',
DISCARDED_STATE: 'D'}
node_list = self.count_dict.keys()
node_list.sort()
node_dict = {}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment