Commit c6453626 authored by Julien Muchembled's avatar Julien Muchembled

Bump protocol version

parents 64e02391 2a27239d
......@@ -21,7 +21,6 @@ from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger
......@@ -66,7 +65,6 @@ class Application(BaseApplication):
super(Application, self).close()
def reset(self):
self.bootstrapped = False
self.master_conn = None
self.master_node = None
......@@ -117,40 +115,20 @@ class Application(BaseApplication):
self.cluster_state = None
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.ADMIN, self.server)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
if self.pt is None:
self.pt = PartitionTable(num_partitions, num_replicas)
elif self.pt.getPartitions() != num_partitions:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of partitions is inconsistent')
elif self.pt.getReplicas() != num_replicas:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of replicas is inconsistent')
self.master_node, self.master_conn = bootstrap.getPrimaryConnection()
# passive handler
self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(Packets.AskClusterState())
self.master_conn.ask(Packets.AskPartitionTable())
def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt
self.pt.log()
row_list = []
pt = self.pt
if max_offset == 0:
max_offset = self.pt.getPartitions()
max_offset = pt.getPartitions()
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in self.pt.getCellList(offset):
if uuid is None or cell.getUUID() == uuid:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row))
row_list = map(pt.getRow, xrange(min_offset, max_offset))
except IndexError:
conn.send(Errors.ProtocolError('invalid partition table offset'))
else:
conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
conn.answer(Packets.AnswerPartitionList(
pt.getID(), pt.getReplicas(), row_list))
......@@ -17,11 +17,12 @@
from neo.lib import logging, protocol
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets
from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure
def check_primary_master(func):
def wrapper(self, *args, **kw):
if self.app.bootstrapped:
if self.app.master_conn is not None:
return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
......@@ -74,6 +75,7 @@ class AdminEventHandler(EventHandler):
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
setNumReplicas = forward_ask(Packets.SetNumReplicas)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
......@@ -112,16 +114,12 @@ class MasterEventHandler(EventHandler):
def answerClusterState(self, conn, state):
self.app.cluster_state = state
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
self.app.bootstrapped = True
def sendPartitionTable(self, conn, ptid, row_list):
if self.app.bootstrapped:
self.app.pt.load(ptid, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state
......
......@@ -226,8 +226,8 @@ class Application(ThreadedApplication):
self.notifications_handler,
node=node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name, (), None)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, None, (), ())
try:
ask(conn, p, handler=handler)
except ConnectionClosed:
......@@ -244,7 +244,6 @@ class Application(ThreadedApplication):
# operational. Might raise ConnectionClosed so that the new
# primary can be looked-up again.
logging.info('Initializing from master')
ask(conn, Packets.AskPartitionTable(), handler=handler)
ask(conn, Packets.AskLastTransaction(), handler=handler)
if self.pt.operational():
break
......@@ -270,7 +269,7 @@ class Application(ThreadedApplication):
conn = MTClientConnection(self, self.storage_event_handler, node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, (), self.id_timestamp)
self.uuid, None, self.name, self.id_timestamp, (), ())
try:
self._ask(conn, p, handler=self.storage_bootstrap_handler)
except ConnectionClosed:
......
......@@ -26,10 +26,6 @@ from ..exception import NEOStorageError
class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
def answerPartitionTable(self, conn, ptid, row_list):
assert row_list
self.app.pt.load(ptid, row_list, self.app.nm)
def answerLastTransaction(*args):
pass
......@@ -42,9 +38,6 @@ class PrimaryNotificationsHandler(MTEventHandler):
except PrimaryElected, e:
self.app.primary_master_node, = e.args
def _acceptIdentification(self, node, num_partitions, num_replicas):
self.app.pt = PartitionTable(num_partitions, num_replicas)
def answerLastTransaction(self, conn, ltid):
app = self.app
app_last_tid = app.__dict__.get('last_tid', '')
......@@ -134,9 +127,12 @@ class PrimaryNotificationsHandler(MTEventHandler):
finally:
app._cache_lock_release()
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryNotificationsHandler, self).notifyNodeInformation(
......
......@@ -26,7 +26,7 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it
"""
def __init__(self, app, node_type, server=None, devpath=()):
def __init__(self, app, node_type, server=None, devpath=(), new_nid=()):
"""
Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node
......@@ -34,9 +34,8 @@ class BootstrapManager(EventHandler):
"""
self.server = server
self.devpath = devpath
self.new_nid = new_nid
self.node_type = node_type
self.num_replicas = None
self.num_partitions = None
app.nm.reset()
uuid = property(lambda self: self.app.uuid)
......@@ -44,7 +43,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, self.devpath, None))
self.server, self.app.name, None, self.devpath, self.new_nid))
def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn)
......@@ -53,10 +52,8 @@ class BootstrapManager(EventHandler):
def connectionLost(self, conn, new_state):
self.current = None
def _acceptIdentification(self, node, num_partitions, num_replicas):
def _acceptIdentification(self, node):
assert self.current is node, (self.current, node)
self.num_partitions = num_partitions
self.num_replicas = num_replicas
def getPrimaryConnection(self):
"""
......@@ -73,8 +70,7 @@ class BootstrapManager(EventHandler):
try:
while self.current:
if self.current.isIdentified():
return (self.current, self.current.getConnection(),
self.num_partitions, self.num_replicas)
return self.current, self.current.getConnection()
poll(1)
except PrimaryElected, e:
if self.current:
......
......@@ -26,6 +26,9 @@ from .protocol import (NodeStates, NodeTypes, Packets, uuid_str,
from .util import cached_property
class AnswerDenied(Exception):
"""Helper exception to stop packet processing and answer a Denied error"""
class DelayEvent(Exception):
pass
......@@ -98,6 +101,8 @@ class EventHandler(object):
% (m.im_class.__module__, m.im_class.__name__, m.__name__)))
except NonReadableCell, e:
conn.answer(Errors.NonReadableCell())
except AnswerDenied, e:
conn.answer(Errors.Denied(str(e)))
except AssertionError:
e = sys.exc_info()
try:
......@@ -160,8 +165,7 @@ class EventHandler(object):
def _acceptIdentification(*args):
pass
def acceptIdentification(self, conn, node_type, uuid,
num_partitions, num_replicas, your_uuid):
def acceptIdentification(self, conn, node_type, uuid, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
assert node.getConnection() is conn, (node.getConnection(), conn)
......@@ -180,7 +184,7 @@ class EventHandler(object):
elif node.getUUID() != uuid or app.uuid != your_uuid != None:
raise ProtocolError('invalid uuids')
node.setIdentified()
self._acceptIdentification(node, num_partitions, num_replicas)
self._acceptIdentification(node)
return
conn.close()
......
......@@ -486,7 +486,7 @@ class NodeManager(EventQueue):
# For the first notification, we receive a full list of nodes from
# the master. Remove all unknown nodes from a previous connection.
for node in self._node_set.difference(added_list):
if app.pt.dropNode(node):
if not node.isStorage() or app.pt.dropNode(node):
self.remove(node)
self.log()
self.executeQueuedEvents()
......
......@@ -22,7 +22,7 @@ from struct import Struct
# The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 5
PROTOCOL_VERSION = 6
ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION)
# Avoid memory errors on corrupted data.
......@@ -62,6 +62,7 @@ class Enum(tuple):
@Enum
def ErrorCodes():
ACK
DENIED
NOT_READY
OID_NOT_FOUND
TID_NOT_FOUND
......@@ -616,10 +617,7 @@ PFCellList = PList('cell_list',
)
PFRowList = PList('row_list',
PStruct('row',
PNumber('offset'),
PFCellList,
),
PFCellList,
)
PFHistoryList = PList('history_list',
......@@ -685,15 +683,15 @@ class RequestIdentification(Packet):
PUUID('uuid'),
PAddress('address'),
PString('name'),
PList('devpath', PString('devid')),
PFloat('id_timestamp'),
# storage:
PList('devpath', PString('devid')),
PList('new_nid', PNumber('offset')),
)
_answer = PStruct('accept_identification',
PFNodeType,
PUUID('my_uuid'),
PNumber('num_partitions'),
PNumber('num_replicas'),
PUUID('your_uuid'),
)
......@@ -749,23 +747,24 @@ class LastIDs(Packet):
class PartitionTable(Packet):
"""
Ask storage node the remaining data needed by master to recover.
This is also how the clients get the full partition table on connection.
:nodes: M -> S; C -> M
:nodes: M -> S
"""
_answer = PStruct('answer_partition_table',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
class NotifyPartitionTable(Packet):
"""
Send the full partition table to admin/storage nodes on connection.
Send the full partition table to admin/client/storage nodes on connection.
:nodes: M -> A, S
:nodes: M -> A, C, S
"""
_fmt = PStruct('send_partition_table',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
......@@ -777,6 +776,7 @@ class PartitionChanges(Packet):
"""
_fmt = PStruct('notify_partition_changes',
PPTID('ptid'),
PNumber('num_replicas'),
PList('cell_list',
PStruct('cell',
PNumber('offset'),
......@@ -1202,6 +1202,7 @@ class PartitionList(Packet):
_answer = PStruct('answer_partition_list',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
......@@ -1253,10 +1254,14 @@ class TweakPartitionTable(Packet):
:nodes: ctl -> A -> M
"""
_fmt = PStruct('tweak_partition_table',
PBoolean('dry_run'),
PFUUIDList,
)
_answer = Error
_answer = PStruct('answer_tweak_partition_table',
PBoolean('changed'),
PFRowList,
)
class NotifyNodeInformation(Packet):
"""
......@@ -1269,6 +1274,18 @@ class NotifyNodeInformation(Packet):
PFNodeList,
)
class SetNumReplicas(Packet):
"""
Set the number of replicas.
:nodes: ctl -> A -> M
"""
_fmt = PStruct('set_num_replicas',
PNumber('num_replicas'),
)
_answer = Error
class SetClusterState(Packet):
"""
Set the cluster state.
......@@ -1762,8 +1779,10 @@ class Packets(dict):
SetNodeState, ignore_when_closed=False)
AddPendingNodes = register(
AddPendingNodes, ignore_when_closed=False)
TweakPartitionTable = register(
TweakPartitionTable, ignore_when_closed=False)
TweakPartitionTable, AnswerTweakPartitionTable = register(
TweakPartitionTable)
SetNumReplicas = register(
SetNumReplicas, ignore_when_closed=False)
SetClusterState = register(
SetClusterState, ignore_when_closed=False)
Repair = register(
......
......@@ -86,15 +86,9 @@ class PartitionTable(object):
'a cell became non-readable whereas all cells were readable'
def __init__(self, num_partitions, num_replicas):
self._id = None
self.np = num_partitions
self.nr = num_replicas
self.num_filled_rows = 0
# Note: don't use [[]] * num_partition construct, as it duplicates
# instance *references*, so the outer list contains really just one
# inner list instance.
self.partition_list = [[] for _ in xrange(num_partitions)]
self.count_dict = {}
self.clear()
def getID(self):
return self._id
......@@ -113,7 +107,7 @@ class PartitionTable(object):
# instance *references*, so the outer list contains really just one
# inner list instance.
self.partition_list = [[] for _ in xrange(self.np)]
self.count_dict.clear()
self.count_dict = {}
def getAssignedPartitionList(self, uuid):
""" Return the partition assigned to the specified UUID """
......@@ -203,31 +197,31 @@ class PartitionTable(object):
del self.count_dict[node]
return not count
def load(self, ptid, row_list, nm):
def _load(self, ptid, num_replicas, row_list, getByUUID):
self.__init__(len(row_list), num_replicas)
self._id = ptid
for offset, row in enumerate(row_list):
for uuid, state in row:
node = getByUUID(uuid)
self._setCell(offset, node, state)
def load(self, ptid, num_replicas, row_list, nm):
"""
Load the partition table with the specified PTID, discard all previous
content.
"""
self.clear()
self._id = ptid
for offset, row in row_list:
if offset >= self.getPartitions():
raise IndexError
for uuid, state in row:
node = nm.getByUUID(uuid)
# the node must be known by the node manager
assert node is not None
self._setCell(offset, node, state)
self._load(ptid, num_replicas, row_list, nm.getByUUID)
logging.debug('partition table loaded (ptid=%s)', ptid)
self.log()
def update(self, ptid, cell_list, nm):
def update(self, ptid, num_replicas, cell_list, nm):
"""
Update the partition with the cell list supplied. If a node
is not known, it is created in the node manager and set as unavailable
"""
assert self._id < ptid, (self._id, ptid)
self._id = ptid
self.nr = num_replicas
readable_list = []
for row in self.partition_list:
if not all(cell.isReadable() for cell in row):
......@@ -310,14 +304,11 @@ class PartitionTable(object):
return True
def getRow(self, offset):
row = self.partition_list[offset]
if row is None:
return []
return [(cell.getUUID(), cell.getState()) for cell in row]
return [(cell.getUUID(), cell.getState())
for cell in self.partition_list[offset]]
def getRowList(self):
getRow = self.getRow
return [(x, getRow(x)) for x in xrange(self.np)]
return map(self.getRow, xrange(self.np))
class MTPartitionTable(PartitionTable):
""" Thread-safe aware version of the partition table, override only methods
......
......@@ -16,6 +16,7 @@
import sys
from collections import defaultdict
from functools import partial
from time import time
from neo.lib import logging, util
......@@ -76,13 +77,11 @@ class Application(BaseApplication):
@classmethod
def _buildOptionParser(cls):
_ = cls.option_parser
_.description = "NEO Master node"
parser = cls.option_parser
parser.description = "NEO Master node"
cls.addCommonServerOptions('master', '127.0.0.1:10000', '')
_ = _.group('master')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
_ = parser.group('master')
_.int('A', 'autostart',
help="minimum number of pending storage nodes to automatically"
" start new cluster (to avoid unwanted recreation of the"
......@@ -94,6 +93,10 @@ class Application(BaseApplication):
_.int('i', 'nid',
help="specify an NID to use for this process (testing purpose)")
_ = parser.group('database creation')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
def __init__(self, config):
super(Application, self).__init__(
config.get('ssl'), config.get('dynamic_master_list'))
......@@ -117,14 +120,14 @@ class Application(BaseApplication):
replicas = config['replicas']
partitions = config['partitions']
if replicas < 0:
raise RuntimeError, 'replicas must be a positive integer'
sys.exit('replicas must be a positive integer')
if partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
self.pt = PartitionTable(partitions, replicas)
sys.exit('partitions must be more than zero')
logging.info('Configuration:')
logging.info('Partitions: %d', partitions)
logging.info('Replicas : %d', replicas)
logging.info('Name : %s', self.name)
self.newPartitionTable = partial(PartitionTable, partitions, replicas)
self.listening_conn = None
self.cluster_state = None
......@@ -196,7 +199,7 @@ class Application(BaseApplication):
node_dict[NodeTypes.MASTER].append(node_info)
return node_dict
def broadcastNodesInformation(self, node_list, exclude=None):
def broadcastNodesInformation(self, node_list):
"""
Broadcast changes for a set a nodes
Send only one packet per connection to reduce bandwidth
......@@ -209,20 +212,26 @@ class Application(BaseApplication):
# We don't skip pending storage nodes because we don't send them
# the full list of nodes when they're added, and it's also quite
# useful to notify them about new masters.
if node_list and node is not exclude:
if node_list:
node.send(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list):
def broadcastPartitionChanges(self, cell_list, num_replicas=None):
"""Broadcast a Notify Partition Changes packet."""
if cell_list:
ptid = self.pt.setNextID()
self.pt.logUpdated()
packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
pt = self.pt
if num_replicas is not None:
pt.setReplicas(num_replicas)
elif cell_list:
num_replicas = pt.getReplicas()
else:
return
packet = Packets.NotifyPartitionChanges(
pt.setNextID(), num_replicas, cell_list)
pt.logUpdated()
for node in self.nm.getIdentifiedList():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
def provideService(self):
"""
......@@ -437,16 +446,7 @@ class Application(BaseApplication):
conn.send(notification_packet)
elif conn.isServer():
continue
if node.isClient():
if state == ClusterStates.RUNNING:
handler = self.client_service_handler
elif state == ClusterStates.BACKINGUP:
handler = self.client_ro_service_handler
else:
if state != ClusterStates.STOPPING:
conn.abort()
continue
elif node.isMaster():
if node.isMaster():
if state == ClusterStates.RECOVERING:
handler = self.election_handler
else:
......@@ -454,10 +454,16 @@ class Application(BaseApplication):
elif node.isStorage() and storage_handler:
handler = storage_handler
else:
# There's a single handler type for admins.
# Client can't change handler without being first disconnected.
assert state in (
ClusterStates.STOPPING,
ClusterStates.STOPPING_BACKUP,
) or not node.isClient(), (state, node)
continue # keep handler
if type(handler) is not type(conn.getLastHandler()):
conn.setHandler(handler)
handler.connectionCompleted(conn, new=False)
handler.handlerSwitched(conn, new=False)
self.cluster_state = state
def getNewUUID(self, uuid, address, node_type):
......
......@@ -111,17 +111,12 @@ class BackupApplication(object):
else:
break
poll(1)
node, conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
node, conn = bootstrap.getPrimaryConnection()
try:
app.changeClusterState(ClusterStates.BACKINGUP)
del bootstrap, node
if num_partitions != pt.getPartitions():
raise RuntimeError("inconsistent number of partitions")
self.ignore_invalidations = True
self.pt = PartitionTable(num_partitions, num_replicas)
conn.setHandler(BackupHandler(self))
conn.ask(Packets.AskPartitionTable())
conn.ask(Packets.AskLastTransaction())
# debug variable to log how big 'tid_list' can be.
self.debug_tid_count = 0
......
......@@ -23,10 +23,6 @@ from neo.lib.protocol import Packets
class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def connectionCompleted(self, conn, new=None):
if new is None:
super(MasterHandler, self).connectionCompleted(conn)
def connectionLost(self, conn, new_state=None):
if self.app.listening_conn: # if running
self._connectionLost(conn)
......@@ -59,17 +55,20 @@ class MasterHandler(EventHandler):
+ app.getNodeInformationDict(node_list)[node.getType()])
conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askPartitionTable(self, conn):
def handlerSwitched(self, conn, new):
pt = self.app.pt
conn.answer(Packets.AnswerPartitionTable(pt.getID(), pt.getRowList()))
# Except storages during recovery and secondary masters, all nodes
# receives the full partition table as soon as they're identified.
# It is also sent in 2 other cases:
# - to admins during recovery, whenever a newer PT is loaded;
# - to storage when switching from recovery to verification.
# After that, non-master nodes only receive incremental updates.
conn.send(Packets.SendPartitionTable(
pt.getID(), pt.getReplicas(), pt.getRowList()))
class BaseServiceHandler(MasterHandler):
"""This class deals with events for a service phase."""
def connectionCompleted(self, conn, new):
pt = self.app.pt
conn.send(Packets.SendPartitionTable(pt.getID(), pt.getRowList()))
"""Common handler class for storage nodes."""
def connectionLost(self, conn, new_state):
app = self.app
......
......@@ -15,14 +15,16 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
from functools import wraps
from . import MasterHandler
from ..app import monotonic_time, StateChangedException
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.handler import AnswerDenied
from neo.lib.pt import PartitionTableException
from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, ProtocolError, uuid_str
NodeStates, NodeTypes, Packets, uuid_str
from neo.lib.util import dump
CLUSTER_STATE_WORKFLOW = {
......@@ -38,9 +40,25 @@ NODE_STATE_WORKFLOW = {
NodeTypes.STORAGE: (NodeStates.DOWN, NodeStates.UNKNOWN),
}
def check_state(*states):
def decorator(wrapped):
def wrapper(self, *args):
state = self.app.getClusterState()
if state not in states:
raise AnswerDenied('%s RPC can not be used in %s state'
% (wrapped.__name__, state))
wrapped(self, *args)
return wraps(wrapped)(wrapper)
return decorator
class AdministrationHandler(MasterHandler):
"""This class deals with messages from the admin node only"""
def handlerSwitched(self, conn, new):
assert new
super(AdministrationHandler, self).handlerSwitched(conn, new)
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
if node is not None:
......@@ -58,30 +76,28 @@ class AdministrationHandler(MasterHandler):
# check request
try:
if app.cluster_state not in CLUSTER_STATE_WORKFLOW[state]:
raise ProtocolError('Can not switch to this state')
raise AnswerDenied('Can not switch to this state')
except KeyError:
if state != ClusterStates.STOPPING:
raise ProtocolError('Invalid state requested')
raise AnswerDenied('Invalid state requested')
# change state
if state == ClusterStates.VERIFYING:
storage_list = app.nm.getStorageList(only_identified=True)
if not storage_list:
raise ProtocolError('Cannot exit recovery without any '
'storage node')
raise AnswerDenied(
'Cannot exit recovery without any storage node')
for node in storage_list:
assert node.isPending(), node
if node.getConnection().isPending():
# XXX: It's wrong to use ProtocolError here. We must reply
# less aggressively because the admin has no way to
# know that there's still pending activity.
raise ProtocolError('Cannot exit recovery now: node %r is '
'entering cluster' % (node, ))
raise AnswerDenied(
'Cannot exit recovery now: node %r is entering cluster'
% node,)
app._startup_allowed = True
state = app.cluster_state
elif state == ClusterStates.STARTING_BACKUP:
if app.tm.hasPending() or app.nm.getClientList(True):
raise ProtocolError("Can not switch to %s state with pending"
raise AnswerDenied("Can not switch to %s state with pending"
" transactions or connected clients" % state)
conn.answer(Errors.Ack('Cluster state changed'))
......@@ -93,11 +109,11 @@ class AdministrationHandler(MasterHandler):
app = self.app
node = app.nm.getByUUID(uuid)
if node is None:
raise ProtocolError('unknown node')
raise AnswerDenied('unknown node')
if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()):
raise ProtocolError('can not switch node to this state')
raise AnswerDenied('can not switch node to %s state' % state)
if uuid == app.uuid:
raise ProtocolError('can not kill primary master node')
raise AnswerDenied('can not kill primary master node')
state_changed = state != node.getState()
message = ('state changed' if state_changed else
......@@ -107,7 +123,7 @@ class AdministrationHandler(MasterHandler):
try:
cell_list = app.pt.dropNodeList([node], keep)
except PartitionTableException, e:
raise ProtocolError(str(e))
raise AnswerDenied(str(e))
node.setState(state)
if node.isConnected():
# notify itself so it can shutdown
......@@ -134,16 +150,17 @@ class AdministrationHandler(MasterHandler):
monotonic_time(), [node.asTuple()]))
app.broadcastNodesInformation([node])
# XXX: Would it be safe to allow more states ?
__change_pt_rpc = check_state(
ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP)
@__change_pt_rpc
def addPendingNodes(self, conn, uuid_list):
uuids = ', '.join(map(uuid_str, uuid_list))
logging.debug('Add nodes %s', uuids)
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states ?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not add nodes in %s state' % state)
# take all pending nodes
node_list = list(app.pt.addNodeList(node
for node in app.nm.getStorageList()
......@@ -165,31 +182,44 @@ class AdministrationHandler(MasterHandler):
for uuid in uuid_list:
node = getByUUID(uuid)
if node is None or not (node.isStorage() and node.isIdentified()):
raise ProtocolError("invalid storage node %s" % uuid_str(uuid))
raise AnswerDenied("invalid storage node %s" % uuid_str(uuid))
node_list.append(node)
repair = Packets.NotifyRepair(*args)
for node in node_list:
node.send(repair)
conn.answer(Errors.Ack(''))
def tweakPartitionTable(self, conn, uuid_list):
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states ?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not tweak partition table in %s state'
% state)
app.broadcastPartitionChanges(app.pt.tweak([node
for node in app.nm.getStorageList()
if node.getUUID() in uuid_list or not node.isRunning()]))
@__change_pt_rpc
def setNumReplicas(self, conn, num_replicas):
self.app.broadcastPartitionChanges((), num_replicas)
conn.answer(Errors.Ack(''))
def truncate(self, conn, tid):
@__change_pt_rpc
def tweakPartitionTable(self, conn, dry_run, uuid_list):
app = self.app
if app.cluster_state != ClusterStates.RUNNING:
raise ProtocolError('Can not truncate in this state')
drop_list = [node for node in app.nm.getStorageList()
if node.getUUID() in uuid_list or not node.isRunning()]
if dry_run:
pt = object.__new__(app.pt.__class__)
new_nodes = pt.load(app.pt.getID(), app.pt.getReplicas(),
app.pt.getRowList(), app.nm)
assert not new_nodes
pt.addNodeList(node
for node, count in app.pt.count_dict.iteritems()
if not count)
else:
pt = app.pt
try:
changed_list = pt.tweak(drop_list)
except PartitionTableException, e:
raise AnswerDenied(str(e))
if not dry_run:
app.broadcastPartitionChanges(changed_list)
conn.answer(Packets.AnswerTweakPartitionTable(
bool(changed_list), pt.getRowList()))
@check_state(ClusterStates.RUNNING)
def truncate(self, conn, tid):
conn.answer(Errors.Ack(''))
raise StoppedOperation(tid)
......@@ -237,3 +267,5 @@ class AdministrationHandler(MasterHandler):
node.send(Packets.CheckPartition(
offset, source, min_tid, max_tid))
conn.answer(Errors.Ack(''))
del __change_pt_rpc
......@@ -17,6 +17,7 @@
from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.protocol import ZERO_TID
from neo.lib.pt import PartitionTable
class BackupHandler(EventHandler):
"""Handler dedicated to upstream master during BACKINGUP state"""
......@@ -25,12 +26,15 @@ class BackupHandler(EventHandler):
if self.app.app.listening_conn: # if running
raise PrimaryFailure('connection lost')
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
app = self.app
pt = app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
if pt.getPartitions() != app.app.pt.getPartitions():
raise RuntimeError("inconsistent number of partitions")
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def answerLastTransaction(self, conn, tid):
app = self.app
......
......@@ -22,6 +22,10 @@ from . import MasterHandler
class ClientServiceHandler(MasterHandler):
""" Handler dedicated to client during service state """
def handlerSwitched(self, conn, new):
assert new
super(ClientServiceHandler, self).handlerSwitched(conn, new)
def _connectionLost(self, conn):
# cancel its transactions and forgot the node
app = self.app
......
......@@ -17,14 +17,14 @@
from neo.lib import logging
from neo.lib.exception import PrimaryElected
from neo.lib.handler import EventHandler
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \
NotReadyError, Packets, ProtocolError, uuid_str
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
NodeTypes, NotReadyError, Packets, ProtocolError, uuid_str
from ..app import monotonic_time
class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp):
address, name, id_timestamp, devpath, new_nid):
app = self.app
self.checkClusterName(name)
if address == app.server:
......@@ -77,6 +77,16 @@ class IdentificationHandler(EventHandler):
manager = app
state, handler = manager.identifyStorageNode(
uuid is not None and node is not None)
if not address:
if app.cluster_state == ClusterStates.RECOVERING:
raise NotReadyError
if uuid or not new_nid:
raise ProtocolError
state = NodeStates.DOWN
# We'll let the storage node close the connection. If we
# aborted it at the end of the method, BootstrapManager
# (which is used by storage nodes) could see the closure
# and try to reconnect to a master.
human_readable_node_type = ' storage (%s) ' % (state, )
elif node_type == NodeTypes.MASTER:
if app.election:
......@@ -105,24 +115,27 @@ class IdentificationHandler(EventHandler):
node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time()
node.setState(state)
app.broadcastNodesInformation([node])
if new_nid:
changed_list = []
for offset in new_nid:
changed_list.append((offset, uuid, CellStates.OUT_OF_DATE))
app.pt._setCell(offset, node, CellStates.OUT_OF_DATE)
app.broadcastPartitionChanges(changed_list)
conn.setHandler(handler)
node.setConnection(conn, not node.isIdentified())
app.broadcastNodesInformation([node], node)
conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER,
app.uuid,
app.pt.getPartitions(),
app.pt.getReplicas(),
uuid))
handler._notifyNodeInformation(conn)
handler.connectionCompleted(conn, True)
handler.handlerSwitched(conn, True)
class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp):
address, name, id_timestamp, devpath, new_nid):
app = self.app
self.checkClusterName(name)
if address == app.server:
......
......@@ -23,6 +23,9 @@ from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
class SecondaryHandler(MasterHandler):
"""Handler used by primary to handle secondary masters"""
def handlerSwitched(self, conn, new):
pass
def _connectionLost(self, conn):
app = self.app
node = app.nm.getByUUID(conn.getUUID())
......@@ -30,21 +33,20 @@ class SecondaryHandler(MasterHandler):
app.broadcastNodesInformation([node])
class ElectionHandler(MasterHandler):
class ElectionHandler(SecondaryHandler):
"""Handler used by primary to handle secondary masters during election"""
def connectionCompleted(self, conn, new=None):
if new is None:
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, (), app.election))
def connectionCompleted(self, conn):
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election, (), ()))
def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn)
self.connectionLost(conn)
def _acceptIdentification(self, node, *args):
def _acceptIdentification(self, node):
raise PrimaryElected(node)
def _connectionLost(self, *args):
......@@ -66,7 +68,7 @@ class ElectionHandler(MasterHandler):
class PrimaryHandler(ElectionHandler):
"""Handler used by secondaries to handle primary master"""
def _acceptIdentification(self, node, num_partitions, num_replicas):
def _acceptIdentification(self, node):
assert self.app.primary_master is node, (self.app.primary_master, node)
def _connectionLost(self, conn):
......
......@@ -26,10 +26,10 @@ from . import BaseServiceHandler
class StorageServiceHandler(BaseServiceHandler):
""" Handler dedicated to storages during service state """
def connectionCompleted(self, conn, new):
def handlerSwitched(self, conn, new):
app = self.app
if new:
super(StorageServiceHandler, self).connectionCompleted(conn, new)
super(StorageServiceHandler, self).handlerSwitched(conn, new)
node = app.nm.getByUUID(conn.getUUID())
if node.isRunning(): # node may be PENDING
app.startStorage(node)
......
......@@ -56,6 +56,10 @@ class PartitionTable(neo.lib.pt.PartitionTable):
self._id += 1
return self._id
def setReplicas(self, num_replicas):
assert num_replicas >= 0, num_replicas
self.nr = num_replicas
def make(self, node_list):
"""Make a new partition table from scratch."""
assert self._id is None and node_list, (self._id, node_list)
......@@ -108,26 +112,19 @@ class PartitionTable(neo.lib.pt.PartitionTable):
self.num_filled_rows = len(filter(None, self.partition_list))
return change_list
def load(self, ptid, row_list, nm):
def load(self, ptid, num_replicas, row_list, nm):
"""
Load a partition table from a storage node during the recovery.
Return the new storage nodes registered
"""
# check offsets
for offset, _row in row_list:
if offset >= self.getPartitions():
raise IndexError, offset
# store the partition table
self.clear()
self._id = ptid
new_nodes = []
for offset, row in row_list:
for uuid, state in row:
node = nm.getByUUID(uuid)
if node is None:
node = nm.createStorage(uuid=uuid)
new_nodes.append(node.asTuple())
self._setCell(offset, node, state)
def getByUUID(nid):
node = nm.getByUUID(nid)
if node is None:
node = nm.createStorage(uuid=nid)
new_nodes.append(node.asTuple())
return node
self._load(ptid, num_replicas, row_list, getByUUID)
return new_nodes
def setUpToDate(self, node, offset):
......@@ -183,7 +180,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
few readable cells, some cells are instead marked as FEEDING. This is
a preliminary step to drop these nodes, otherwise the partition table
could become non-operational.
- Other nodes must have the same number of cells, off by 1.
In fact, the code touching these cells is disabled (see NOTE below).
- Other nodes must have the same number of non-feeding cells, off by 1.
- When a transaction creates new objects (oids are roughly allocated
sequentially), we expect better performance by maximizing the number
of involved nodes (i.e. parallelizing writes).
......@@ -232,6 +230,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
# Collect some data in a usable form for the rest of the method.
node_list = {node: {} for node in self.count_dict
if node not in drop_list}
if not node_list:
raise neo.lib.pt.PartitionTableException("Can't remove all nodes.")
drop_list = defaultdict(list)
for offset, row in enumerate(self.partition_list):
for cell in row:
......@@ -420,6 +420,22 @@ class PartitionTable(neo.lib.pt.PartitionTable):
outdated_list[offset] -= 1
for offset, cell in cell_dict.iteritems():
discard_list[offset].append(cell)
# NOTE: The following line disables the next 2 lines, which actually
# causes cells in drop_list to be discarded, now or later;
# drop_list could be renamed into ignore_list.
# 1. Deleting data partition per partition is a lot of work, so
# why ask nodes in drop_list to do that when the goal is
# simply to trash the whole underlying database?
# 2. By excluding nodes from a tweak, it becomes possible to have
# parts of the partition table that are tweaked differently.
# This may require to temporarily change the number of
# replicas for the part being tweaked. In the future, this
# number may be specified in the 'tweak' command, to avoid
# race conditions with setUpToDate().
# Overall, a common use case is when importing a ZODB to NEO,
# to keep the initial importing node up until the database is
# split and replicated to the final nodes.
drop_list = {}
for offset, drop_list in drop_list.iteritems():
discard_list[offset] += drop_list
# We have sorted cells to discard in order to first deallocate nodes
......
......@@ -28,7 +28,7 @@ class RecoveryManager(MasterHandler):
def __init__(self, app):
# The target node's uuid to request next.
self.target_ptid = None
self.target_ptid = 0
self.ask_pt = []
self.backup_tid_dict = {}
self.truncate_dict = {}
......@@ -52,9 +52,8 @@ class RecoveryManager(MasterHandler):
"""
logging.info('begin the recovery of the status')
app = self.app
pt = app.pt
pt = app.pt = app.newPartitionTable()
app.changeClusterState(ClusterStates.RECOVERING)
pt.clear()
self.try_secondary = True
......@@ -113,7 +112,7 @@ class RecoveryManager(MasterHandler):
for node in node_list:
conn = node.getConnection()
conn.send(truncate)
self.connectionCompleted(conn, False)
self.handlerSwitched(conn, False)
continue
node_list = pt.getConnectedNodeList()
break
......@@ -140,12 +139,12 @@ class RecoveryManager(MasterHandler):
logging.info('creating a new partition table')
pt.make(node_list)
self._notifyAdmins(Packets.SendPartitionTable(
pt.getID(), pt.getRowList()))
pt.getID(), pt.getReplicas(), pt.getRowList()))
else:
cell_list = pt.outdate()
if cell_list:
self._notifyAdmins(Packets.NotifyPartitionChanges(
pt.setNextID(), cell_list))
pt.setNextID(), pt.getReplicas(), cell_list))
if app.backup_tid:
pt.setBackupTidDict(self.backup_tid_dict)
app.backup_tid = pt.getBackupTid()
......@@ -175,16 +174,16 @@ class RecoveryManager(MasterHandler):
if node is None or node.getState() == new_state:
return
node.setState(new_state)
# broadcast to all so that admin nodes gets informed
self.app.broadcastNodesInformation([node])
def connectionCompleted(self, conn, new):
def handlerSwitched(self, conn, new):
# ask the last IDs to perform the recovery
conn.ask(Packets.AskRecovery())
def answerRecovery(self, conn, ptid, backup_tid, truncate_tid):
uuid = conn.getUUID()
if self.target_ptid <= ptid:
# ptid is None if the node has an empty partition table.
if ptid and self.target_ptid <= ptid:
# Maybe a newer partition table.
if self.target_ptid == ptid and self.ask_pt:
# Another node is already asked.
......@@ -197,17 +196,14 @@ class RecoveryManager(MasterHandler):
self.backup_tid_dict[uuid] = backup_tid
self.truncate_dict[uuid] = truncate_tid
def answerPartitionTable(self, conn, ptid, row_list):
def answerPartitionTable(self, conn, ptid, num_replicas, row_list):
# If this is not from a target node, ignore it.
if ptid == self.target_ptid:
app = self.app
try:
new_nodes = app.pt.load(ptid, row_list, app.nm)
except IndexError:
raise ProtocolError('Invalid offset')
new_nodes = app.pt.load(ptid, num_replicas, row_list, app.nm)
self._notifyAdmins(
Packets.NotifyNodeInformation(monotonic_time(), new_nodes),
Packets.SendPartitionTable(ptid, row_list))
Packets.SendPartitionTable(ptid, num_replicas, row_list))
self.ask_pt = ()
uuid = conn.getUUID()
app.backup_tid = self.backup_tid_dict[uuid]
......
......@@ -30,6 +30,7 @@ action_dict = {
},
'set': {
'cluster': 'setClusterState',
'replicas': 'setNumReplicas',
},
'check': 'checkReplicas',
'start': 'startCluster',
......@@ -105,10 +106,10 @@ class TerminalNeoCTL(object):
max_offset = int(max_offset)
if node is not None:
node = self.asNode(node)
ptid, row_list = self.neoctl.getPartitionRowList(
ptid, num_replicas, row_list = self.neoctl.getPartitionRowList(
min_offset=min_offset, max_offset=max_offset, node=node)
# TODO: return ptid
return self.formatRowList(row_list)
return '# ptid: %s, replicas: %s\n%s' % (ptid, num_replicas,
self.formatRowList(enumerate(row_list, min_offset)))
def getNodeList(self, params):
"""
......@@ -140,6 +141,18 @@ class TerminalNeoCTL(object):
assert len(params) == 1
return self.neoctl.setClusterState(self.asClusterState(params[0]))
def setNumReplicas(self, params):
"""
Set number of replicas.
Parameters: nr
nr: positive number (0 means no redundancy)
"""
assert len(params) == 1
nr = int(params[0])
if nr < 0:
sys.exit('invalid number of replicas')
return self.neoctl.setNumReplicas(nr)
def startCluster(self, params):
"""
Starts cluster operation after a startup.
......@@ -167,10 +180,18 @@ class TerminalNeoCTL(object):
def tweakPartitionTable(self, params):
"""
Optimize partition table.
No partition will be assigned to specified storage nodes.
Parameters: [node [...]]
No change is done to the specified/down storage nodes and they don't
count as replicas. The purpose of listing nodes is usually to drop
them once the data is replicated to other nodes.
Parameters: [-n] [node [...]]
-n: dry run
"""
return self.neoctl.tweakPartitionTable(map(self.asNode, params))
dry_run = params[0] == '-n'
changed, row_list = self.neoctl.tweakPartitionTable(
map(self.asNode, params[dry_run:]), dry_run)
if changed:
return self.formatRowList(enumerate(row_list))
return 'No change done.'
def killNode(self, params):
"""
......
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from neo.lib.handler import EventHandler
from neo.lib.protocol import ErrorCodes, Packets
......@@ -44,8 +45,8 @@ class CommandEventHandler(EventHandler):
def ack(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.ACK, msg))
def protocolError(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.PROTOCOL_ERROR, msg))
def denied(self, conn, msg):
sys.exit(msg)
def notReady(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.NOT_READY, msg))
......@@ -62,3 +63,4 @@ class CommandEventHandler(EventHandler):
answerLastIDs = __answer(Packets.AnswerLastIDs)
answerLastTransaction = __answer(Packets.AnswerLastTransaction)
answerRecovery = __answer(Packets.AnswerRecovery)
answerTweakPartitionTable = __answer(Packets.AnswerTweakPartitionTable)
......@@ -91,8 +91,14 @@ class NeoCTL(BaseApplication):
raise RuntimeError(response)
return response[2]
def tweakPartitionTable(self, uuid_list=()):
response = self.__ask(Packets.TweakPartitionTable(uuid_list))
def tweakPartitionTable(self, uuid_list=(), dry_run=False):
response = self.__ask(Packets.TweakPartitionTable(dry_run, uuid_list))
if response[0] != Packets.AnswerTweakPartitionTable:
raise RuntimeError(response)
return response[1:]
def setNumReplicas(self, nr):
response = self.__ask(Packets.SetNumReplicas(nr))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
......@@ -163,7 +169,7 @@ class NeoCTL(BaseApplication):
response = self.__ask(packet)
if response[0] != Packets.AnswerPartitionList:
raise RuntimeError(response)
return response[1:3] # ptid, row_list
return response[1:]
def startCluster(self):
"""
......
......@@ -63,6 +63,11 @@ class Application(BaseApplication):
help="do not delete data of discarded cells, which is useful for"
" big databases because the current implementation is"
" inefficient (this option should disappear in the future)")
_.bool('new-nid',
help="request a new NID from a cluster that is already"
" operational, update the database with the new NID and exit,"
" which makes easier to quickly set up a replica by copying"
" the database of another node while it was stopped")
_ = parser.group('database creation')
_.int('i', 'nid',
......@@ -118,10 +123,16 @@ class Application(BaseApplication):
self.loadConfiguration()
self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only
if 'nid' in config:
self.uuid = config['nid']
logging.node(self.name, self.uuid)
if config.get('new_nid'):
self.new_nid = [x[0] for x in self.dm.iterAssignedCells()]
if not self.new_nid:
sys.exit('database is empty')
self.uuid = None
else:
self.new_nid = ()
if 'nid' in config: # for testing purpose only
self.uuid = config['nid']
logging.node(self.name, self.uuid)
registerLiveDebugger(on_log=self.log)
......@@ -158,36 +169,27 @@ class Application(BaseApplication):
# load configuration
self.uuid = dm.getUUID()
logging.node(self.name, self.uuid)
num_partitions = dm.getNumPartitions()
num_replicas = dm.getNumReplicas()
ptid = dm.getPTID()
# check partition table configuration
if num_partitions is not None and num_replicas is not None:
if num_partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
# create a partition table
self.pt = PartitionTable(num_partitions, num_replicas)
logging.info('Configuration loaded:')
logging.info('PTID : %s', dump(ptid))
logging.info('PTID : %s', dump(dm.getPTID()))
logging.info('Name : %s', self.name)
logging.info('Partitions: %s', num_partitions)
logging.info('Replicas : %s', num_replicas)
def loadPartitionTable(self):
"""Load a partition table from the database."""
self.pt.clear()
ptid = self.dm.getPTID()
if ptid is None:
self.pt = PartitionTable(0, 0)
return
cell_list = []
row_list = []
for offset, uuid, state in self.dm.getPartitionTable():
while len(row_list) <= offset:
row_list.append([])
# register unknown nodes
if self.nm.getByUUID(uuid) is None:
self.nm.createStorage(uuid=uuid)
cell_list.append((offset, uuid, CellStates[state]))
self.pt.update(ptid, cell_list, self.nm)
row_list[offset].append((uuid, CellStates[state]))
self.pt = object.__new__(PartitionTable)
self.pt.load(ptid, self.dm.getNumReplicas(), row_list, self.nm)
def run(self):
try:
......@@ -247,29 +249,16 @@ class Application(BaseApplication):
Note that I do not accept any connection from non-master nodes
at this stage."""
pt = self.pt
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server,
self.devpath)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
bootstrap = BootstrapManager(self, NodeTypes.STORAGE,
None if self.new_nid else self.server,
self.devpath, self.new_nid)
self.master_node, self.master_conn = bootstrap.getPrimaryConnection()
self.dm.setUUID(self.uuid)
# Reload a partition table from the database. This is necessary
# when a previous primary master died while sending a partition
# table, because the table might be incomplete.
if pt is not None:
self.loadPartitionTable()
if num_partitions != pt.getPartitions():
raise RuntimeError('the number of partitions is inconsistent')
if pt is None or pt.getReplicas() != num_replicas:
# changing number of replicas is not an issue
self.dm.setNumPartitions(num_partitions)
self.dm.setNumReplicas(num_replicas)
self.pt = PartitionTable(num_partitions, num_replicas)
self.loadPartitionTable()
# Reload a partition table from the database,
# in case that we're in RECOVERING phase.
self.loadPartitionTable()
def initialize(self):
logging.debug('initializing...')
......
......@@ -51,7 +51,7 @@ class Checker(object):
else:
conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
uuid, app.server, name, (), app.id_timestamp))
uuid, app.server, name, app.id_timestamp, (), ()))
self.conn_dict[conn] = node.isIdentified()
conn_set = set(self.conn_dict)
conn_set.discard(None)
......
......@@ -378,8 +378,8 @@ class ImporterDatabaseManager(DatabaseManager):
conf = self._conf
db = self.db = buildDatabaseManager(conf['adapter'],
(conf['database'], conf.get('engine'), conf['wait']))
for x in """getConfiguration _setConfiguration setNumPartitions
query erase getPartitionTable _iterAssignedCells
for x in """getConfiguration _setConfiguration _getMaxPartition
query erase getPartitionTable iterAssignedCells
updateCellTID getUnfinishedTIDDict dropUnfinishedData
abortTransaction storeTransaction lockTransaction
loadData storeData getOrphanList _pruneData deferCommit
......@@ -396,7 +396,7 @@ class ImporterDatabaseManager(DatabaseManager):
self._writeback.committed()
self.commit = db.commit = commit
def _updateReadable(self):
def _updateReadable(*_):
raise AssertionError
def setUUID(self, nid):
......@@ -443,7 +443,8 @@ class ImporterDatabaseManager(DatabaseManager):
self.zodb_ltid = max(x.ltid for x in self.zodb)
zodb = self.zodb[-1]
self.zodb_loid = zodb.shift_oid + zodb.next_oid - 1
self.zodb_tid = self.db.getLastTID(self.zodb_ltid) or 0
self.zodb_tid = self._getMaxPartition() is not None and \
self.db.getLastTID(self.zodb_ltid) or 0
if callable(self._import): # XXX: why ?
if self.zodb_tid == self.zodb_ltid:
self._finished()
......@@ -726,7 +727,7 @@ class WriteBack(object):
self._event = Event()
self._idle = Event()
self._stop = Event()
self._np = self._db.getNumPartitions()
self._np = 1 + self._db._getMaxPartition()
self._db = cPickle.dumps(self._db, 2)
self._process = Process(target=self._run)
self._process.daemon = True
......
......@@ -102,25 +102,24 @@ class DatabaseManager(object):
finally:
db.close()
_cached_attr_list = (
'_readable_set', '_getPartition', '_getReadablePartition')
def __getattr__(self, attr):
if attr in ('_readable_set', '_getPartition', '_getReadablePartition'):
if attr in self._cached_attr_list:
self._updateReadable()
return self.__getattribute__(attr)
def _partitionTableChanged(self):
try:
del (self._readable_set,
self._getPartition,
self._getReadablePartition)
except AttributeError:
pass
def __enter__(self):
assert not self.LOCK, "not a secondary connection"
# XXX: All config caching should be done in this class,
# rather than in backend classes.
self._config.clear()
self._partitionTableChanged()
try:
for attr in self._cached_attr_list:
delattr(self, attr)
except AttributeError:
pass
def __exit__(self, t, v, tb):
if v is None:
......@@ -180,6 +179,10 @@ class DatabaseManager(object):
def erase(self):
""""""
def restore(self, dump): # for tests
self.erase()
self._restore(dump)
def _setup(self, dedup=False):
"""To be overridden by the backend to set up a database
......@@ -305,21 +308,6 @@ class DatabaseManager(object):
for x, tid in ((x, None), (nid, tid)))
self.setConfiguration('nid', str(nid))
def getNumPartitions(self):
"""
Load the number of partitions from a database.
"""
n = self.getConfiguration('partitions')
if n is not None:
return int(n)
def setNumPartitions(self, num_partitions):
"""
Store the number of partitions into a database.
"""
self.setConfiguration('partitions', num_partitions)
self._partitionTableChanged()
def getNumReplicas(self):
"""
Load the number of replicas from a database.
......@@ -328,12 +316,6 @@ class DatabaseManager(object):
if n is not None:
return int(n)
def setNumReplicas(self, num_replicas):
"""
Store the number of replicas into a database.
"""
self.setConfiguration('replicas', num_replicas)
def getName(self):
"""
Load a name from a database.
......@@ -394,8 +376,9 @@ class DatabaseManager(object):
tids are in unpacked format.
"""
if self.getNumPartitions():
return max(self._getLastTID(x, max_tid) for x in self._readable_set)
x = self._readable_set
if x:
return max(self._getLastTID(x, max_tid) for x in x)
def _getLastIDs(self, partition):
"""Return max(tid) & max(oid) for objects of given partition
......@@ -532,7 +515,7 @@ class DatabaseManager(object):
None if data_serial is None else util.p64(data_serial))
@requires(_getPartitionTable)
def _iterAssignedCells(self):
def iterAssignedCells(self):
my_nid = self.getUUID()
return ((offset, tid) for offset, nid, tid in self._getPartitionTable()
if my_nid == nid)
......@@ -556,13 +539,15 @@ class DatabaseManager(object):
"""
"""
@requires(_getDataLastId)
def _updateReadable(self):
try:
readable_set = self.__dict__['_readable_set']
except KeyError:
def _getMaxPartition(self):
"""
"""
@requires(_getDataLastId, _getMaxPartition)
def _updateReadable(self, reset=True):
if reset:
readable_set = self._readable_set = set()
np = self.getNumPartitions()
np = 1 + self._getMaxPartition()
def _getPartition(x, np=np):
return x % np
def _getReadablePartition(x, np=np, r=readable_set):
......@@ -577,14 +562,15 @@ class DatabaseManager(object):
i = self._getDataLastId(p)
d.append(p << 48 if i is None else i + 1)
else:
readable_set = self._readable_set
readable_set.clear()
readable_set.update(x[0] for x in self._iterAssignedCells()
readable_set.update(x[0] for x in self.iterAssignedCells()
if -x[1] in READABLE)
@requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, cell_list, reset=False):
def changePartitionTable(self, ptid, num_replicas, cell_list, reset=False):
my_nid = self.getUUID()
pt = dict(self._iterAssignedCells())
pt = dict(self.iterAssignedCells())
# In backup mode, the last transactions of a readable cell may be
# incomplete.
backup_tid = self.getBackupTID()
......@@ -603,13 +589,14 @@ class DatabaseManager(object):
outofdate_tid(offset)))
for offset, nid, state in cell_list]
self._changePartitionTable(cell_list, reset)
self._updateReadable()
self._updateReadable(reset)
assert isinstance(ptid, (int, long)), ptid
self._setConfiguration('ptid', str(ptid))
self._setConfiguration('replicas', str(num_replicas))
@requires(_changePartitionTable)
def updateCellTID(self, partition, tid):
t, = (t for p, t in self._iterAssignedCells() if p == partition)
t, = (t for p, t in self.iterAssignedCells() if p == partition)
if t < 0:
return
tid = util.u64(tid)
......@@ -631,7 +618,7 @@ class DatabaseManager(object):
next_tid = util.u64(backup_tid)
if next_tid:
next_tid += 1
for offset, tid in self._iterAssignedCells():
for offset, tid in self.iterAssignedCells():
if tid >= 0: # OUT_OF_DATE
yield offset, p64(tid and tid + 1)
elif -tid in READABLE:
......@@ -873,7 +860,7 @@ class DatabaseManager(object):
assert tid, tid
cell_list = []
my_nid = self.getUUID()
for partition, state in self._iterAssignedCells():
for partition, state in self.iterAssignedCells():
if state > tid:
cell_list.append((partition, my_nid, tid))
self._deleteRange(partition, tid)
......
......@@ -273,6 +273,12 @@ class MySQLDatabaseManager(DatabaseManager):
" ELSE 1-state"
" END as tid")
# Let's wait for a more important change to clean up,
# so that users can still downgrade.
if 0:
def _migrate4(self, schema_dict):
self._setConfiguration('partitions', None)
def _setup(self, dedup=False):
self._config.clear()
q = self.query
......@@ -421,6 +427,9 @@ class MySQLDatabaseManager(DatabaseManager):
q("ALTER TABLE config MODIFY value VARBINARY(%s) NULL" % len(value))
q(sql)
def _getMaxPartition(self):
return self.query("SELECT MAX(`partition`) FROM pt")[0][0]
def _getPartitionTable(self):
return self.query("SELECT * FROM pt")
......@@ -979,7 +988,7 @@ class MySQLDatabaseManager(DatabaseManager):
cmd += self._cmdline()
return subprocess.check_output(cmd)
def restore(self, sql):
def _restore(self, sql):
import subprocess
cmd = ['mysql']
cmd += self._cmdline()
......
......@@ -145,6 +145,12 @@ class SQLiteDatabaseManager(DatabaseManager):
" WHEN 2 THEN -2" # FEEDING
" ELSE 1-state END")
# Let's wait for a more important change to clean up,
# so that users can still downgrade.
if 0:
def _migrate4(self, schema_dict, index_dict):
self._setConfiguration('partitions', None)
def _setup(self, dedup=False):
# BBB: SQLite has transactional DDL but before Python 3.6,
# the binding automatically commits between such statements.
......@@ -266,6 +272,9 @@ class SQLiteDatabaseManager(DatabaseManager):
else:
q("REPLACE INTO config VALUES (?,?)", (key, str(value)))
def _getMaxPartition(self):
return self.query("SELECT MAX(`partition`) FROM pt").next()[0]
def _getPartitionTable(self):
return self.query("SELECT * FROM pt")
......@@ -713,5 +722,5 @@ class SQLiteDatabaseManager(DatabaseManager):
main[-1:-1] = data
return '\n'.join(main) + '\n'
def restore(self, sql):
def _restore(self, sql):
self.conn.executescript(sql)
......@@ -65,14 +65,14 @@ class BaseMasterHandler(BaseHandler):
# See comment in ClientOperationHandler.connectionClosed
self.app.tm.abortFor(uuid, even_if_voted=True)
def notifyPartitionChanges(self, conn, ptid, cell_list):
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app
if ptid != 1 + app.pt.getID():
raise ProtocolError('wrong partition table id')
app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
app.pt.update(ptid, num_replicas, cell_list, app.nm)
app.dm.changePartitionTable(ptid, num_replicas, cell_list)
if app.operational:
app.replicator.notifyPartitionChanges(cell_list)
app.dm.commit()
......
......@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler):
return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name,
devpath, id_timestamp):
id_timestamp, devpath, new_nid):
self.checkClusterName(name)
app = self.app
# reject any incoming connections if not ready
......@@ -65,6 +65,6 @@ class IdentificationHandler(EventHandler):
conn.setHandler(handler)
node.setConnection(conn, force)
# accept the identification and trigger an event
conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and
app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid))
conn.answer(Packets.AcceptIdentification(
NodeTypes.STORAGE, uuid and app.uuid, uuid))
handler.connectionCompleted(conn)
......@@ -20,10 +20,10 @@ from neo.lib.protocol import Packets, ProtocolError, ZERO_TID
class InitializationHandler(BaseMasterHandler):
def sendPartitionTable(self, conn, ptid, row_list):
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
app = self.app
pt = app.pt
pt.load(ptid, row_list, app.nm)
pt.load(ptid, num_replicas, row_list, app.nm)
if not pt.filled():
raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence.
......@@ -44,7 +44,7 @@ class InitializationHandler(BaseMasterHandler):
logging.debug('drop data for partitions %r', unassigned)
dm.dropPartitions(unassigned)
dm.changePartitionTable(ptid, cell_list, reset=True)
dm.changePartitionTable(ptid, num_replicas, cell_list, reset=True)
dm.commit()
def truncate(self, conn, tid):
......@@ -68,7 +68,8 @@ class InitializationHandler(BaseMasterHandler):
def askPartitionTable(self, conn):
pt = self.app.pt
conn.answer(Packets.AnswerPartitionTable(pt.getID(), pt.getRowList()))
conn.answer(Packets.AnswerPartitionTable(
pt.getID(), pt.getReplicas(), pt.getRowList()))
def askLockedTransactions(self, conn):
conn.answer(Packets.AnswerLockedTransactions(
......
......@@ -350,7 +350,7 @@ class Replicator(object):
try:
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
None if name else app.uuid, app.server, name or app.name,
(), app.id_timestamp))
app.id_timestamp, (), ()))
except ConnectionClosed:
if previous_node is self.current_node:
return
......
......@@ -98,9 +98,12 @@ class TransactionManager(EventQueue):
self._load_lock_dict = {}
self._replicated = {}
self._replicating = set()
def getPartition(self, oid):
from neo.lib.util import u64
np = app.pt.getPartitions()
np = self._app.pt.getPartitions()
self.getPartition = lambda oid: u64(oid) % np
return self.getPartition(oid)
def discarded(self, offset_list):
self._replicating.difference_update(offset_list)
......
......@@ -433,7 +433,7 @@ class NEOCluster(object):
pending_count += 1
if pending_count == target[0]:
neoctl.startCluster()
except (NotReadyException, RuntimeError):
except (NotReadyException, SystemExit):
pass
if not pdb.wait(test, MAX_START_TIME):
raise AssertionError('Timeout when starting cluster')
......@@ -445,7 +445,7 @@ class NEOCluster(object):
def start(last_try):
try:
self.neoctl.startCluster()
except (NotReadyException, RuntimeError), e:
except (NotReadyException, SystemExit), e:
return False, e
return True, None
self.expectCondition(start)
......@@ -649,10 +649,10 @@ class NEOCluster(object):
def expectOudatedCells(self, number, *args, **kw):
def callback(last_try):
row_list = self.neoctl.getPartitionRowList()[1]
row_list = self.neoctl.getPartitionRowList()[2]
number_of_outdated = 0
for row in row_list:
for cell in row[1]:
for cell in row:
if cell[1] == CellStates.OUT_OF_DATE:
number_of_outdated += 1
return number_of_outdated == number, number_of_outdated
......@@ -660,10 +660,10 @@ class NEOCluster(object):
def expectAssignedCells(self, process, number, *args, **kw):
def callback(last_try):
row_list = self.neoctl.getPartitionRowList()[1]
row_list = self.neoctl.getPartitionRowList()[2]
assigned_cells_number = 0
for row in row_list:
for cell in row[1]:
for cell in row:
if cell[0] == process.getUUID():
assigned_cells_number += 1
return assigned_cells_number == number, assigned_cells_number
......
......@@ -47,7 +47,7 @@ class MasterTests(NEOFunctionalTest):
break
neoctl.killNode(uuid)
self.neo.expectDead(master)
self.assertRaises(RuntimeError, neoctl.killNode, primary_uuid)
self.assertRaises(SystemExit, neoctl.killNode, primary_uuid)
def testStoppingPrimaryWithTwoSecondaries(self):
# Wait for masters to stabilize
......
......@@ -172,7 +172,7 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectOudatedCells(2)
self.neo.expectClusterRunning()
self.assertRaises(RuntimeError, self.neo.neoctl.killNode,
self.assertRaises(SystemExit, self.neo.neoctl.killNode,
started[1].getUUID())
started[1].stop()
# Cluster not operational anymore. Only cells of second storage that
......@@ -323,7 +323,7 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectStorageUnknown(started[0])
self.neo.expectAssignedCells(started[0], 0)
self.neo.expectAssignedCells(started[1], 10)
self.assertRaises(RuntimeError, self.neo.neoctl.dropNode,
self.assertRaises(SystemExit, self.neo.neoctl.dropNode,
started[1].getUUID())
self.neo.expectClusterRunning()
......
......@@ -30,8 +30,6 @@ class MasterClientHandlerTests(NeoUnitTestBase):
config = self.getMasterConfiguration(master_number=1, replicas=1)
self.app = Application(config)
self.app.em.close()
self.app.pt.clear()
self.app.pt.setID(1)
self.app.em = Mock()
self.app.loid = '\0' * 8
self.app.tm.setLastTID('\0' * 8)
......
......@@ -26,7 +26,6 @@ class MasterAppTests(NeoUnitTestBase):
# create an application object
config = self.getMasterConfiguration()
self.app = Application(config)
self.app.pt.clear()
def _tearDown(self, success):
self.app.close()
......
......@@ -289,7 +289,9 @@ class MasterPartitionTableTests(NeoUnitTestBase):
pt.addNodeList(sn[1:3])
self.assertPartitionTable(pt, 'U..|U..|U..|U..|U..|U..|U..')
self.update(pt, self.tweak(pt, sn[:1]))
self.assertPartitionTable(pt, '.U.|..U|.U.|..U|.U.|..U|.U.')
# See note in PartitionTable.tweak() about drop_list.
#self.assertPartitionTable(pt,'.U.|..U|.U.|..U|.U.|..U|.U.')
self.assertPartitionTable(pt, 'UU.|U.U|UU.|U.U|UU.|U.U|UU.')
def test_18_tweakBigPT(self):
seed = repr(time.time())
......
......@@ -18,8 +18,8 @@ import unittest
from ..mock import Mock
from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes, Packets
from neo.master.handlers.storage import StorageServiceHandler
from neo.master.app import Application
from neo.master.handlers.storage import StorageServiceHandler
class MasterStorageHandlerTests(NeoUnitTestBase):
......@@ -29,7 +29,6 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
config = self.getMasterConfiguration(master_number=1, replicas=1)
self.app = Application(config)
self.app.em.close()
self.app.pt.clear()
self.app.em = Mock()
self.service = StorageServiceHandler(self.app)
......
......@@ -56,7 +56,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
self.app.pt = Mock({'getID': 1})
count = len(self.app.nm.getList())
self.assertRaises(ProtocolError, self.operation.notifyPartitionChanges,
conn, 0, ())
conn, 0, 0, ())
self.assertEqual(self.app.pt.getID(), 1)
self.assertEqual(len(self.app.nm.getList()), count)
calls = self.app.replicator.mockGetNamedCalls('removePartition')
......@@ -84,13 +84,13 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
ptid = 2
app.dm = Mock({ })
app.replicator = Mock({})
self.operation.notifyPartitionChanges(conn, ptid, cells)
self.operation.notifyPartitionChanges(conn, ptid, 1, cells)
# ptid set
self.assertEqual(app.pt.getID(), ptid)
# dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(ptid, cells)
calls[0].checkArgs(ptid, 1, cells)
if __name__ == "__main__":
unittest.main()
......@@ -48,30 +48,15 @@ class StorageDBTests(NeoUnitTestBase):
raise NotImplementedError
def setNumPartitions(self, num_partitions, reset=0):
try:
db = self._db
except AttributeError:
self._db = db = self.getDB(reset)
else:
if reset:
db.setup(reset)
else:
try:
n = db.getNumPartitions()
except KeyError:
n = 0
if num_partitions == n:
return
if num_partitions < n:
db.dropPartitions(n)
db.setNumPartitions(num_partitions)
self.assertEqual(num_partitions, db.getNumPartitions())
assert not hasattr(self, '_db')
self._db = db = self.getDB(reset)
uuid = self.getStorageUUID()
db.setUUID(uuid)
self.assertEqual(uuid, db.getUUID())
db.changePartitionTable(1,
db.changePartitionTable(1, 0,
[(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
reset=True)
self.assertEqual(num_partitions, 1 + db._getMaxPartition())
db.commit()
def checkConfigEntry(self, get_call, set_call, value):
......@@ -102,16 +87,6 @@ class StorageDBTests(NeoUnitTestBase):
db = self.getDB()
self.checkConfigEntry(db.getName, db.setName, 'TEST_NAME')
def test_getPartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
uuid1, uuid2 = self.getStorageUUID(), self.getStorageUUID()
cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
cell2 = (1, uuid1, CellStates.UP_TO_DATE)
db.changePartitionTable(1, [cell1, cell2], 1)
result = db.getPartitionTable()
self.assertEqual(set(result), {cell1, cell2})
def getOIDs(self, count):
return map(p64, xrange(count))
......@@ -202,52 +177,6 @@ class StorageDBTests(NeoUnitTestBase):
self.assertEqual(self.db.getObject(oid1, before_tid=tid2),
OBJECT_T1_NEXT)
def test_setPartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
ptid = 1
uuid = self.getStorageUUID()
cell1 = 0, uuid, CellStates.OUT_OF_DATE
cell2 = 1, uuid, CellStates.UP_TO_DATE
cell3 = 1, uuid, CellStates.DISCARDED
# no partition table
self.assertEqual(list(db.getPartitionTable()), [])
# set one
db.changePartitionTable(ptid, [cell1], 1)
result = db.getPartitionTable()
self.assertEqual(list(result), [cell1])
# then another
db.changePartitionTable(ptid, [cell2], 1)
result = db.getPartitionTable()
self.assertEqual(list(result), [cell2])
# drop discarded cells
db.changePartitionTable(ptid, [cell2, cell3], 1)
result = db.getPartitionTable()
self.assertEqual(list(result), [])
def test_changePartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
ptid = 1
uuid = self.getStorageUUID()
cell1 = 0, uuid, CellStates.OUT_OF_DATE
cell2 = 1, uuid, CellStates.UP_TO_DATE
cell3 = 1, uuid, CellStates.DISCARDED
# no partition table
self.assertEqual(list(db.getPartitionTable()), [])
# set one
db.changePartitionTable(ptid, [cell1])
result = db.getPartitionTable()
self.assertEqual(list(result), [cell1])
# add more entries
db.changePartitionTable(ptid, [cell2])
result = db.getPartitionTable()
self.assertEqual(set(result), {cell1, cell2})
# drop discarded cells
db.changePartitionTable(ptid, [cell2, cell3])
result = db.getPartitionTable()
self.assertEqual(list(result), [cell1])
def test_commitTransaction(self):
oid1, oid2 = self.getOIDs(2)
tid1, tid2 = self.getTIDs(2)
......
......@@ -19,12 +19,9 @@ class Handler(MasterEventHandler):
super(Handler, self).answerClusterState(conn, state)
self.app.refresh('state')
def answerPartitionTable(self, *args):
super(Handler, self).answerPartitionTable(*args)
self.app.refresh('pt')
def sendPartitionTable(self, *args):
raise AssertionError
super(Handler, self).sendPartitionTable(*args)
self.app.refresh('pt')
def notifyPartitionChanges(self, *args):
super(Handler, self).notifyPartitionChanges(*args)
......
......@@ -383,7 +383,10 @@ class ServerNode(Node):
assert not self.is_alive()
init_args = self._init_args
init_args['reset'] = False
assert set(kw).issubset(init_args), (kw, init_args)
if __debug__:
x = set(kw).difference(init_args)
assert not x or x.issubset(self.option_parser.getOptionDict()), (
kw, init_args)
init_args.update(kw)
self.close()
self.__init__(**init_args)
......@@ -810,7 +813,7 @@ class NEOCluster(object):
master_list = self.master_list
if storage_list is None:
storage_list = self.storage_list
def answerPartitionTable(release, orig, *args):
def sendPartitionTable(release, orig, *args):
orig(*args)
release()
def dispatch(release, orig, handler, *args):
......@@ -826,7 +829,7 @@ class NEOCluster(object):
if state in expected_state:
release()
with Serialized.until(MasterEventHandler,
answerPartitionTable=answerPartitionTable) as tic1, \
sendPartitionTable=sendPartitionTable) as tic1, \
Serialized.until(RecoveryManager, dispatch=dispatch) as tic2, \
Serialized.until(MasterEventHandler,
notifyClusterInformation=notifyClusterInformation) as tic3:
......@@ -851,9 +854,13 @@ class NEOCluster(object):
expected_state = (NodeStates.PENDING
if state == ClusterStates.RECOVERING
else NodeStates.RUNNING)
for node in self.storage_list if storage_list is None else storage_list:
for node, expected_state in (
storage_list if isinstance(storage_list, dict) else
dict.fromkeys(self.storage_list if storage_list is None else
storage_list, expected_state)
).iteritems():
state = self.getNodeState(node)
assert state == expected_state, (repr(node), state)
assert state == expected_state, (repr(node), state, expected_state)
def stop(self, clear_database=False, __print_exc=traceback.print_exc, **kw):
if self.started:
......@@ -927,7 +934,7 @@ class NEOCluster(object):
def startCluster(self):
try:
self.neoctl.startCluster()
except RuntimeError:
except SystemExit:
Serialized.tic()
if self.neoctl.getClusterState() not in (
ClusterStates.BACKINGUP,
......@@ -1006,18 +1013,18 @@ class NEOCluster(object):
"""Sort storages so that storage_list[i] has partition i for all i"""
pt = [{x.getUUID() for x in x}
for x in self.primary_master.pt.partition_list]
n = len(self.storage_list)
r = []
x = [iter(pt[0])]
try:
while 1:
try:
r.append(next(x[-1]))
except StopIteration:
del r[-1], x[-1]
else:
x.append(iter(pt[len(r)].difference(r)))
except IndexError:
assert len(r) == len(self.storage_list)
while 1:
try:
r.append(next(x[-1]))
except StopIteration:
del r[-1], x[-1]
else:
if len(r) == n:
break
x.append(iter(pt[len(r)].difference(r)))
x = {x.uuid: x for x in self.storage_list}
self.storage_list[:] = (x[r] for r in r)
return self.storage_list
......
......@@ -42,6 +42,7 @@ from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.transactions import Transaction
from neo.master.handlers.client import ClientServiceHandler
from neo.master.pt import PartitionTable
from neo.storage.database import DatabaseFailure
from neo.storage.handlers.client import ClientOperationHandler
from neo.storage.handlers.identification import IdentificationHandler
......@@ -1307,7 +1308,7 @@ class Test(NEOThreadedTest):
del conn._queue[:] # XXX
conn.close()
if 1:
with Patch(cluster.master.pt, make=make), \
with Patch(PartitionTable, make=make), \
Patch(InitializationHandler,
askPartitionTable=askPartitionTable) as p:
cluster.start()
......@@ -2336,8 +2337,8 @@ class Test(NEOThreadedTest):
for x in 'ab':
r[x] = PCounterWithResolution()
t1.commit()
cluster.stop(replicas=1)
cluster.start()
cluster.neoctl.setNumReplicas(1)
self.tic()
s0, s1 = cluster.sortStorageList()
t1, c1 = cluster.getTransaction()
r = c1.root()
......@@ -2521,8 +2522,8 @@ class Test(NEOThreadedTest):
for x in 'ab':
r[x] = PCounterWithResolution()
t1.commit()
cluster.stop(replicas=1)
cluster.start()
cluster.neoctl.setNumReplicas(1)
self.tic()
s0, s1 = cluster.sortStorageList()
t1, c1 = cluster.getTransaction()
r = c1.root()
......@@ -2823,9 +2824,9 @@ class Test(NEOThreadedTest):
dm = s.dm
dm.commit()
dump_dict[s.uuid] = dm.dump()
dm.erase()
with open(path % (s.getAdapter(), s.uuid)) as f:
dm.restore(f.read())
dm.setConfiguration('partitions', None) # XXX: see dm._migrate4
with NEOCluster(storage_count=3, partitions=3, replicas=1,
name=self._testMethodName) as cluster:
s1, s2, s3 = cluster.storage_list
......
......@@ -29,7 +29,7 @@ from neo.storage.database.manager import DatabaseManager
from neo.storage import replicator
from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection
from neo.lib.protocol import CellStates, ClusterStates, Packets, \
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import add64, p64, u64
from .. import Patch, TransactionalResource
......@@ -74,6 +74,8 @@ class ReplicationTests(NEOThreadedTest):
source_dict = {x.uuid: x for x in cluster.upstream.storage_list}
for storage in cluster.storage_list:
self.assertFalse(storage.dm._uncommitted_data)
if storage.pt is None:
storage.loadPartitionTable()
self.assertEqual(np, storage.pt.getPartitions())
for partition in pt.getAssignedPartitionList(storage.uuid):
cell_list = upstream_pt.getCellList(partition, readable=True)
......@@ -89,6 +91,7 @@ class ReplicationTests(NEOThreadedTest):
checksum_list = [
self.checksumPartition(storage_dict[x.getUUID()], offset)
for x in pt.getCellList(offset)]
self.assertLess(1, len(checksum_list))
self.assertEqual(1, len(set(checksum_list)),
(offset, checksum_list))
......@@ -445,13 +448,13 @@ class ReplicationTests(NEOThreadedTest):
return isinstance(packet, delayed) and \
packet.decode()[0] == offset and \
conn in s1.getConnectionList(s0)
def changePartitionTable(orig, ptid, cell_list):
def changePartitionTable(orig, ptid, num_replicas, cell_list):
if (offset, s0.uuid, CellStates.DISCARDED) in cell_list:
connection_filter.remove(delayAskFetch)
# XXX: this is currently not done by
# default for performance reason
orig.im_self.dropPartitions((offset,))
return orig(ptid, cell_list)
return orig(ptid, num_replicas, cell_list)
np = cluster.num_partitions
s0, s1, s2 = cluster.storage_list
for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects:
......@@ -511,7 +514,9 @@ class ReplicationTests(NEOThreadedTest):
for x in 'ab':
r[x] = PCounter()
t.commit()
cluster.stop(replicas=1)
cluster.neoctl.setNumReplicas(1)
self.tic()
cluster.stop()
cluster.start((s1, s2))
with ConnectionFilter() as f:
f.delayAddObject()
......@@ -928,6 +933,40 @@ class ReplicationTests(NEOThreadedTest):
def testReplicationBlockedByUnfinished2(self):
self.testReplicationBlockedByUnfinished1(True)
@with_cluster(partitions=6, storage_count=4, start_cluster=0)
def testCloneStorage(self, cluster):
"""
Test cloning of storage nodes using --new-nid instead NEO replication.
"""
s01 = cluster.storage_list[:2]
s23 = cluster.storage_list[2:]
cluster.start(storage_list=s01)
cluster.importZODB()(6)
self.tic()
with Patch(cluster, storage_list=s01):
cluster.sortStorageList()
cluster.stop()
cluster.storage_list[:2] = s01
storage_dict = {}
for s, d in zip(s01, s23):
d.dm.restore(s.dm.dump())
d.resetNode(new_nid=True)
storage_dict[s] = NodeStates.RUNNING
storage_dict[d] = NodeStates.DOWN
cluster.start(storage_dict)
cluster.join(s23)
for d in s23:
d.resetNode(new_nid=False)
d.start()
self.tic()
self.checkReplicas(cluster)
expected = '|'.join(['U.U.|.U.U'] * 3)
self.assertPartitionTable(cluster, expected)
cluster.neoctl.setNumReplicas(1)
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertPartitionTable(cluster, expected)
@with_cluster(partitions=5, replicas=2, storage_count=3)
def testCheckReplicas(self, cluster):
from neo.storage import checker
......@@ -940,8 +979,8 @@ class ReplicationTests(NEOThreadedTest):
return s0.uuid
def check(expected_state, expected_count):
self.assertEqual(expected_count, len([None
for row in cluster.neoctl.getPartitionRowList()[1]
for cell in row[1]
for row in cluster.neoctl.getPartitionRowList()[2]
for cell in row
if cell[1] == CellStates.CORRUPTED]))
self.assertEqual(expected_state, cluster.neoctl.getClusterState())
np = cluster.num_partitions
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment