Commit ef5fc508 by Julien Muchembled

Make the number of replicas modifiable when the cluster is running

neoctl gets a new command to change the number of replicas.

The number of replicas becomes a new partition table attribute and
like the PT id, it is stored in the config table. On the other side,
the configuration value for the number of partitions is dropped,
since it can be computed from the partition table, which is
always stored in full.

The -p/-r master options now only apply at database creation.

Some implementation notes:

- The protocol is slightly optimized in that the master now sends
  automatically the whole partition tables to the admin & client
  nodes upon connection, like for storage nodes.
  This makes the protocol more consistent, and the master is the
  only remaining node requesting partition tables, during recovery.

- Some parts become tricky because app.pt can be None in more cases.
  For example, the extra condition in NodeManager.update
  (before app.pt.dropNode) was added for this is the reason.
  Or the 'loadPartitionTable' method (storage) that is not inlined
  because of unit tests.
  Overall, this commit simplifies more than it complicates.

- In the master handlers, we stop hijacking the 'connectionCompleted'
  method for tasks to be performed (often send the full partition
  table) on handler switches.

- The admin's 'bootstrapped' flag could have been removed earlier:
  race conditions can't happen since the AskNodeInformation packet
  was removed (commit d048a52d).
1 parent 27e3f620
......@@ -21,7 +21,6 @@ from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger
......@@ -66,7 +65,6 @@ class Application(BaseApplication):
super(Application, self).close()
def reset(self):
self.bootstrapped = False
self.master_conn = None
self.master_node = None
......@@ -117,39 +115,17 @@ class Application(BaseApplication):
self.cluster_state = None
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.ADMIN, self.server)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
if self.pt is None:
self.pt = PartitionTable(num_partitions, num_replicas)
elif self.pt.getPartitions() != num_partitions:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of partitions is inconsistent')
elif self.pt.getReplicas() != num_replicas:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of replicas is inconsistent')
self.master_node, self.master_conn = bootstrap.getPrimaryConnection()
# passive handler
self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(Packets.AskClusterState())
self.master_conn.ask(Packets.AskPartitionTable())
def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt
self.pt.log()
row_list = []
if max_offset == 0:
max_offset = self.pt.getPartitions()
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in self.pt.getCellList(offset):
if uuid is None or cell.getUUID() == uuid:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row))
row_list = map(self.pt.getRow, xrange(min_offset, max_offset))
except IndexError:
conn.send(Errors.ProtocolError('invalid partition table offset'))
else:
......
......@@ -17,11 +17,12 @@
from neo.lib import logging, protocol
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets
from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure
def check_primary_master(func):
def wrapper(self, *args, **kw):
if self.app.bootstrapped:
if self.app.master_conn is not None:
return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
......@@ -74,6 +75,7 @@ class AdminEventHandler(EventHandler):
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
setNumReplicas = forward_ask(Packets.SetNumReplicas)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
......@@ -112,16 +114,12 @@ class MasterEventHandler(EventHandler):
def answerClusterState(self, conn, state):
self.app.cluster_state = state
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
self.app.bootstrapped = True
def sendPartitionTable(self, conn, ptid, row_list):
if self.app.bootstrapped:
self.app.pt.load(ptid, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state
......
......@@ -244,7 +244,6 @@ class Application(ThreadedApplication):
# operational. Might raise ConnectionClosed so that the new
# primary can be looked-up again.
logging.info('Initializing from master')
ask(conn, Packets.AskPartitionTable(), handler=handler)
ask(conn, Packets.AskLastTransaction(), handler=handler)
if self.pt.operational():
break
......
......@@ -26,10 +26,6 @@ from ..exception import NEOStorageError
class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
def answerPartitionTable(self, conn, ptid, row_list):
assert row_list
self.app.pt.load(ptid, row_list, self.app.nm)
def answerLastTransaction(*args):
pass
......@@ -42,9 +38,6 @@ class PrimaryNotificationsHandler(MTEventHandler):
except PrimaryElected, e:
self.app.primary_master_node, = e.args
def _acceptIdentification(self, node, num_partitions, num_replicas):
self.app.pt = PartitionTable(num_partitions, num_replicas)
def answerLastTransaction(self, conn, ltid):
app = self.app
app_last_tid = app.__dict__.get('last_tid', '')
......@@ -134,9 +127,12 @@ class PrimaryNotificationsHandler(MTEventHandler):
finally:
app._cache_lock_release()
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryNotificationsHandler, self).notifyNodeInformation(
......
......@@ -36,8 +36,6 @@ class BootstrapManager(EventHandler):
self.devpath = devpath
self.new_nid = new_nid
self.node_type = node_type
self.num_replicas = None
self.num_partitions = None
app.nm.reset()
uuid = property(lambda self: self.app.uuid)
......@@ -54,10 +52,8 @@ class BootstrapManager(EventHandler):
def connectionLost(self, conn, new_state):
self.current = None
def _acceptIdentification(self, node, num_partitions, num_replicas):
def _acceptIdentification(self, node):
assert self.current is node, (self.current, node)
self.num_partitions = num_partitions
self.num_replicas = num_replicas
def getPrimaryConnection(self):
"""
......@@ -74,8 +70,7 @@ class BootstrapManager(EventHandler):
try:
while self.current:
if self.current.isIdentified():
return (self.current, self.current.getConnection(),
self.num_partitions, self.num_replicas)
return self.current, self.current.getConnection()
poll(1)
except PrimaryElected, e:
if self.current:
......
......@@ -160,8 +160,7 @@ class EventHandler(object):
def _acceptIdentification(*args):
pass
def acceptIdentification(self, conn, node_type, uuid,
num_partitions, num_replicas, your_uuid):
def acceptIdentification(self, conn, node_type, uuid, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
assert node.getConnection() is conn, (node.getConnection(), conn)
......@@ -180,7 +179,7 @@ class EventHandler(object):
elif node.getUUID() != uuid or app.uuid != your_uuid != None:
raise ProtocolError('invalid uuids')
node.setIdentified()
self._acceptIdentification(node, num_partitions, num_replicas)
self._acceptIdentification(node)
return
conn.close()
......
......@@ -486,7 +486,7 @@ class NodeManager(EventQueue):
# For the first notification, we receive a full list of nodes from
# the master. Remove all unknown nodes from a previous connection.
for node in self._node_set.difference(added_list):
if app.pt.dropNode(node):
if not node.isStorage() or app.pt.dropNode(node):
self.remove(node)
self.log()
self.executeQueuedEvents()
......
......@@ -616,10 +616,7 @@ PFCellList = PList('cell_list',
)
PFRowList = PList('row_list',
PStruct('row',
PNumber('offset'),
PFCellList,
),
PFCellList,
)
PFHistoryList = PList('history_list',
......@@ -694,8 +691,6 @@ class RequestIdentification(Packet):
_answer = PStruct('accept_identification',
PFNodeType,
PUUID('my_uuid'),
PNumber('num_partitions'),
PNumber('num_replicas'),
PUUID('your_uuid'),
)
......@@ -751,23 +746,24 @@ class LastIDs(Packet):
class PartitionTable(Packet):
"""
Ask storage node the remaining data needed by master to recover.
This is also how the clients get the full partition table on connection.
:nodes: M -> S; C -> M
:nodes: M -> S
"""
_answer = PStruct('answer_partition_table',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
class NotifyPartitionTable(Packet):
"""
Send the full partition table to admin/storage nodes on connection.
Send the full partition table to admin/client/storage nodes on connection.
:nodes: M -> A, S
:nodes: M -> A, C, S
"""
_fmt = PStruct('send_partition_table',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
......@@ -779,6 +775,7 @@ class PartitionChanges(Packet):
"""
_fmt = PStruct('notify_partition_changes',
PPTID('ptid'),
PNumber('num_replicas'),
PList('cell_list',
PStruct('cell',
PNumber('offset'),
......@@ -1271,6 +1268,18 @@ class NotifyNodeInformation(Packet):
PFNodeList,
)
class SetNumReplicas(Packet):
"""
Set the number of replicas.
:nodes: ctl -> A -> M
"""
_fmt = PStruct('set_num_replicas',
PNumber('num_replicas'),
)
_answer = Error
class SetClusterState(Packet):
"""
Set the cluster state.
......@@ -1766,6 +1775,8 @@ class Packets(dict):
AddPendingNodes, ignore_when_closed=False)
TweakPartitionTable = register(
TweakPartitionTable, ignore_when_closed=False)
SetNumReplicas = register(
SetNumReplicas, ignore_when_closed=False)
SetClusterState = register(
SetClusterState, ignore_when_closed=False)
Repair = register(
......
......@@ -86,15 +86,9 @@ class PartitionTable(object):
'a cell became non-readable whereas all cells were readable'
def __init__(self, num_partitions, num_replicas):
self._id = None
self.np = num_partitions
self.nr = num_replicas
self.num_filled_rows = 0
# Note: don't use [[]] * num_partition construct, as it duplicates
# instance *references*, so the outer list contains really just one
# inner list instance.
self.partition_list = [[] for _ in xrange(num_partitions)]
self.count_dict = {}
self.clear()
def getID(self):
return self._id
......@@ -113,7 +107,7 @@ class PartitionTable(object):
# instance *references*, so the outer list contains really just one
# inner list instance.
self.partition_list = [[] for _ in xrange(self.np)]
self.count_dict.clear()
self.count_dict = {}
def getAssignedPartitionList(self, uuid):
""" Return the partition assigned to the specified UUID """
......@@ -203,31 +197,31 @@ class PartitionTable(object):
del self.count_dict[node]
return not count
def load(self, ptid, row_list, nm):
def _load(self, ptid, num_replicas, row_list, getByUUID):
self.__init__(len(row_list), num_replicas)
self._id = ptid
for offset, row in enumerate(row_list):
for uuid, state in row:
node = getByUUID(uuid)
self._setCell(offset, node, state)
def load(self, ptid, num_replicas, row_list, nm):
"""
Load the partition table with the specified PTID, discard all previous
content.
"""
self.clear()
self._id = ptid
for offset, row in row_list:
if offset >= self.getPartitions():
raise IndexError
for uuid, state in row:
node = nm.getByUUID(uuid)
# the node must be known by the node manager
assert node is not None
self._setCell(offset, node, state)
self._load(ptid, num_replicas, row_list, nm.getByUUID)
logging.debug('partition table loaded (ptid=%s)', ptid)
self.log()
def update(self, ptid, cell_list, nm):
def update(self, ptid, num_replicas, cell_list, nm):
"""
Update the partition with the cell list supplied. If a node
is not known, it is created in the node manager and set as unavailable
"""
assert self._id < ptid, (self._id, ptid)
self._id = ptid
self.nr = num_replicas
readable_list = []
for row in self.partition_list:
if not all(cell.isReadable() for cell in row):
......@@ -310,14 +304,11 @@ class PartitionTable(object):
return True
def getRow(self, offset):
row = self.partition_list[offset]
if row is None:
return []
return [(cell.getUUID(), cell.getState()) for cell in row]
return [(cell.getUUID(), cell.getState())
for cell in self.partition_list[offset]]
def getRowList(self):
getRow = self.getRow
return [(x, getRow(x)) for x in xrange(self.np)]
return map(self.getRow, xrange(self.np))
class MTPartitionTable(PartitionTable):
""" Thread-safe aware version of the partition table, override only methods
......
......@@ -16,6 +16,7 @@
import sys
from collections import defaultdict
from functools import partial
from time import time
from neo.lib import logging, util
......@@ -76,13 +77,11 @@ class Application(BaseApplication):
@classmethod
def _buildOptionParser(cls):
_ = cls.option_parser
_.description = "NEO Master node"
parser = cls.option_parser
parser.description = "NEO Master node"
cls.addCommonServerOptions('master', '127.0.0.1:10000', '')
_ = _.group('master')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
_ = parser.group('master')
_.int('A', 'autostart',
help="minimum number of pending storage nodes to automatically"
" start new cluster (to avoid unwanted recreation of the"
......@@ -94,6 +93,10 @@ class Application(BaseApplication):
_.int('i', 'nid',
help="specify an NID to use for this process (testing purpose)")
_ = parser.group('database creation')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
def __init__(self, config):
super(Application, self).__init__(
config.get('ssl'), config.get('dynamic_master_list'))
......@@ -117,14 +120,14 @@ class Application(BaseApplication):
replicas = config['replicas']
partitions = config['partitions']
if replicas < 0:
raise RuntimeError, 'replicas must be a positive integer'
sys.exit('replicas must be a positive integer')
if partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
self.pt = PartitionTable(partitions, replicas)
sys.exit('partitions must be more than zero')
logging.info('Configuration:')
logging.info('Partitions: %d', partitions)
logging.info('Replicas : %d', replicas)
logging.info('Name : %s', self.name)
self.newPartitionTable = partial(PartitionTable, partitions, replicas)
self.listening_conn = None
self.cluster_state = None
......@@ -212,17 +215,23 @@ class Application(BaseApplication):
if node_list:
node.send(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list):
def broadcastPartitionChanges(self, cell_list, num_replicas=None):
"""Broadcast a Notify Partition Changes packet."""
if cell_list:
ptid = self.pt.setNextID()
self.pt.logUpdated()
packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
pt = self.pt
if num_replicas is not None:
pt.setReplicas(num_replicas)
elif cell_list:
num_replicas = pt.getReplicas()
else:
return
packet = Packets.NotifyPartitionChanges(
pt.setNextID(), num_replicas, cell_list)
pt.logUpdated()
for node in self.nm.getIdentifiedList():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
def provideService(self):
"""
......@@ -437,16 +446,7 @@ class Application(BaseApplication):
conn.send(notification_packet)
elif conn.isServer():
continue
if node.isClient():
if state == ClusterStates.RUNNING:
handler = self.client_service_handler
elif state == ClusterStates.BACKINGUP:
handler = self.client_ro_service_handler
else:
if state != ClusterStates.STOPPING:
conn.abort()
continue
elif node.isMaster():
if node.isMaster():
if state == ClusterStates.RECOVERING:
handler = self.election_handler
else:
......@@ -454,10 +454,16 @@ class Application(BaseApplication):
elif node.isStorage() and storage_handler:
handler = storage_handler
else:
# There's a single handler type for admins.
# Client can't change handler without being first disconnected.
assert state in (
ClusterStates.STOPPING,
ClusterStates.STOPPING_BACKUP,
) or not node.isClient(), (state, node)
continue # keep handler
if type(handler) is not type(conn.getLastHandler()):
conn.setHandler(handler)
handler.connectionCompleted(conn, new=False)
handler.handlerSwitched(conn, new=False)
self.cluster_state = state
def getNewUUID(self, uuid, address, node_type):
......
......@@ -111,17 +111,12 @@ class BackupApplication(object):
else:
break
poll(1)
node, conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
node, conn = bootstrap.getPrimaryConnection()
try:
app.changeClusterState(ClusterStates.BACKINGUP)
del bootstrap, node
if num_partitions != pt.getPartitions():
raise RuntimeError("inconsistent number of partitions")
self.ignore_invalidations = True
self.pt = PartitionTable(num_partitions, num_replicas)
conn.setHandler(BackupHandler(self))
conn.ask(Packets.AskPartitionTable())
conn.ask(Packets.AskLastTransaction())
# debug variable to log how big 'tid_list' can be.
self.debug_tid_count = 0
......
......@@ -23,10 +23,6 @@ from neo.lib.protocol import Packets
class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def connectionCompleted(self, conn, new=None):
if new is None:
super(MasterHandler, self).connectionCompleted(conn)
def connectionLost(self, conn, new_state=None):
if self.app.listening_conn: # if running
self._connectionLost(conn)
......@@ -59,17 +55,20 @@ class MasterHandler(EventHandler):
+ app.getNodeInformationDict(node_list)[node.getType()])
conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askPartitionTable(self, conn):
def handlerSwitched(self, conn, new):
pt = self.app.pt
conn.answer(Packets.AnswerPartitionTable(pt.getID(), pt.getRowList()))
# Except storages during recovery and secondary masters, all nodes
# receives the full partition table as soon as they're identified.
# It is also sent in 2 other cases:
# - to admins during recovery, whenever a newer PT is loaded;
# - to storage when switching from recovery to verification.
# After that, non-master nodes only receive incremental updates.
conn.send(Packets.SendPartitionTable(
pt.getID(), pt.getReplicas(), pt.getRowList()))
class BaseServiceHandler(MasterHandler):
"""This class deals with events for a service phase."""
def connectionCompleted(self, conn, new):
pt = self.app.pt
conn.send(Packets.SendPartitionTable(pt.getID(), pt.getRowList()))
"""Common handler class for storage nodes."""
def connectionLost(self, conn, new_state):
app = self.app
......
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
from functools import wraps
from . import MasterHandler
from ..app import monotonic_time, StateChangedException
......@@ -38,9 +39,25 @@ NODE_STATE_WORKFLOW = {
NodeTypes.STORAGE: (NodeStates.DOWN, NodeStates.UNKNOWN),
}
def check_state(*states):
def decorator(wrapped):
def wrapper(self, *args):
state = self.app.getClusterState()
if state not in states:
raise ProtocolError('%s RPC can not be used in %s state'
% (wrapped.__name__, state))
wrapped(self, *args)
return wraps(wrapped)(wrapper)
return decorator
class AdministrationHandler(MasterHandler):
"""This class deals with messages from the admin node only"""
def handlerSwitched(self, conn, new):
assert new
super(AdministrationHandler, self).handlerSwitched(conn, new)
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
if node is not None:
......@@ -134,16 +151,17 @@ class AdministrationHandler(MasterHandler):
monotonic_time(), [node.asTuple()]))
app.broadcastNodesInformation([node])
# XXX: Would it be safe to allow more states ?
__change_pt_rpc = check_state(
ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP)
@__change_pt_rpc
def addPendingNodes(self, conn, uuid_list):
uuids = ', '.join(map(uuid_str, uuid_list))
logging.debug('Add nodes %s', uuids)
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states ?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not add nodes in %s state' % state)
# take all pending nodes
node_list = list(app.pt.addNodeList(node
for node in app.nm.getStorageList()
......@@ -172,24 +190,21 @@ class AdministrationHandler(MasterHandler):
node.send(repair)
conn.answer(Errors.Ack(''))
@__change_pt_rpc
def setNumReplicas(self, conn, num_replicas):
self.app.broadcastPartitionChanges((), num_replicas)
conn.answer(Errors.Ack(''))
@__change_pt_rpc
def tweakPartitionTable(self, conn, uuid_list):
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states ?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not tweak partition table in %s state'
% state)
app.broadcastPartitionChanges(app.pt.tweak([node
for node in app.nm.getStorageList()
if node.getUUID() in uuid_list or not node.isRunning()]))
conn.answer(Errors.Ack(''))
@check_state(ClusterStates.RUNNING)
def truncate(self, conn, tid):
app = self.app
if app.cluster_state != ClusterStates.RUNNING:
raise ProtocolError('Can not truncate in this state')
conn.answer(Errors.Ack(''))
raise StoppedOperation(tid)
......@@ -237,3 +252,5 @@ class AdministrationHandler(MasterHandler):
node.send(Packets.CheckPartition(
offset, source, min_tid, max_tid))
conn.answer(Errors.Ack(''))
del __change_pt_rpc
......@@ -17,6 +17,7 @@
from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.protocol import ZERO_TID
from neo.lib.pt import PartitionTable
class BackupHandler(EventHandler):
"""Handler dedicated to upstream master during BACKINGUP state"""
......@@ -25,12 +26,15 @@ class BackupHandler(EventHandler):
if self.app.app.listening_conn: # if running
raise PrimaryFailure('connection lost')
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
app = self.app
pt = app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
if pt.getPartitions() != app.app.pt.getPartitions():
raise RuntimeError("inconsistent number of partitions")
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def answerLastTransaction(self, conn, tid):
app = self.app
......
......@@ -22,6 +22,10 @@ from . import MasterHandler
class ClientServiceHandler(MasterHandler):
""" Handler dedicated to client during service state """
def handlerSwitched(self, conn, new):
assert new
super(ClientServiceHandler, self).handlerSwitched(conn, new)
def _connectionLost(self, conn):
# cancel its transactions and forgot the node
app = self.app
......
......@@ -128,11 +128,9 @@ class IdentificationHandler(EventHandler):
conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER,
app.uuid,
app.pt.getPartitions(),
app.pt.getReplicas(),
uuid))
handler._notifyNodeInformation(conn)
handler.connectionCompleted(conn, True)
handler.handlerSwitched(conn, True)
class SecondaryIdentificationHandler(EventHandler):
......
......@@ -23,6 +23,9 @@ from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
class SecondaryHandler(MasterHandler):
"""Handler used by primary to handle secondary masters"""
def handlerSwitched(self, conn, new):
pass
def _connectionLost(self, conn):
app = self.app
node = app.nm.getByUUID(conn.getUUID())
......@@ -30,21 +33,20 @@ class SecondaryHandler(MasterHandler):
app.broadcastNodesInformation([node])
class ElectionHandler(MasterHandler):
class ElectionHandler(SecondaryHandler):
"""Handler used by primary to handle secondary masters during election"""
def connectionCompleted(self, conn, new=None):
if new is None:
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election, (), ()))
def connectionCompleted(self, conn):
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election, (), ()))
def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn)
self.connectionLost(conn)
def _acceptIdentification(self, node, *args):
def _acceptIdentification(self, node):
raise PrimaryElected(node)
def _connectionLost(self, *args):
......@@ -66,7 +68,7 @@ class ElectionHandler(MasterHandler):
class PrimaryHandler(ElectionHandler):
"""Handler used by secondaries to handle primary master"""
def _acceptIdentification(self, node, num_partitions, num_replicas):
def _acceptIdentification(self, node):
assert self.app.primary_master is node, (self.app.primary_master, node)
def _connectionLost(self, conn):
......
......@@ -26,10 +26,10 @@ from . import BaseServiceHandler
class StorageServiceHandler(BaseServiceHandler):
""" Handler dedicated to storages during service state """
def connectionCompleted(self, conn, new):
def handlerSwitched(self, conn, new):
app = self.app
if new:
super(StorageServiceHandler, self).connectionCompleted(conn, new)
super(StorageServiceHandler, self).handlerSwitched(conn, new)
node = app.nm.getByUUID(conn.getUUID())
if node.isRunning(): # node may be PENDING
app.startStorage(node)
......
......@@ -56,6 +56,10 @@ class PartitionTable(neo.lib.pt.PartitionTable):
self._id += 1
return self._id
def setReplicas(self, num_replicas):
assert num_replicas >= 0, num_replicas
self.nr = num_replicas
def make(self, node_list):
"""Make a new partition table from scratch."""
assert self._id is None and node_list, (self._id, node_list)
......@@ -108,26 +112,19 @@ class PartitionTable(neo.lib.pt.PartitionTable):
self.num_filled_rows = len(filter(None, self.partition_list))
return change_list
def load(self, ptid, row_list, nm):
def load(self, ptid, num_replicas, row_list, nm):
"""
Load a partition table from a storage node during the recovery.
Return the new storage nodes registered
"""
# check offsets
for offset, _row in row_list:
if offset >= self.getPartitions():
raise IndexError, offset
# store the partition table
self.clear()
self._id = ptid
new_nodes = []
for offset, row in row_list:
for uuid, state in row:
node = nm.getByUUID(uuid)
if node is None:
node = nm.createStorage(uuid=uuid)
new_nodes.append(node.asTuple())
self._setCell(offset, node, state)
def getByUUID(nid):
node = nm.getByUUID(nid)
if node is None:
node = nm.createStorage(uuid=nid)
new_nodes.append(node.asTuple())
return node
self._load(ptid, num_replicas, row_list, getByUUID)
return new_nodes
def setUpToDate(self, node, offset):
......
......@@ -28,7 +28,7 @@ class RecoveryManager(MasterHandler):
def __init__(self, app):
# The target node's uuid to request next.
self.target_ptid = None
self.target_ptid = 0
self.ask_pt = []
self.backup_tid_dict = {}
self.truncate_dict = {}
......@@ -52,9 +52,8 @@ class RecoveryManager(MasterHandler):
"""
logging.info('begin the recovery of the status')
app = self.app
pt = app.pt
pt = app.pt = app.newPartitionTable()
app.changeClusterState(ClusterStates.RECOVERING)
pt.clear()
self.try_secondary = True
......@@ -113,7 +112,7 @@ class RecoveryManager(MasterHandler):
for node in node_list:
conn = node.getConnection()
conn.send(truncate)
self.connectionCompleted(conn, False)
self.handlerSwitched(conn, False)
continue
node_list = pt.getConnectedNodeList()
break
......@@ -140,12 +139,12 @@ class RecoveryManager(MasterHandler):
logging.info('creating a new partition table')
pt.make(node_list)
self._notifyAdmins(Packets.SendPartitionTable(
pt.getID(), pt.getRowList()))
pt.getID(), pt.getReplicas(), pt.getRowList()))
else:
cell_list