...
 
Commits (30)
Showing 62 changed files with 1098 additions and 920 deletions
Change History
==============
1.12 (2019-04-28)
-----------------
Most changes in this version focus on the ability to migrate efficiently
and reliably a big ZODB to NEO, which required changes in the protocol.
See testSplitAndMakeResilientUsingClone for an example of scenario.
Better cluster management:
- New --new-nid storage option for fast cloning.
- The number of wanted replicas is now a property of the database, which is
modifiable when the cluster is running, and reported by `neoctl print pt`.
- Better error reporting from the master to neoctl for denied requests.
- tweak: do not touch cells of nodes that are intended to be dropped.
- tweak: do not crash when trying to remove all nodes.
- tweak: new neoctl option to ask the master to simulate.
- neoctl: better display of full partition tables.
- master: reject drop/tweak commands that could lead to unwanted status.
Importer:
- Fix possible data loss on writeback.
- v1.9 broke replication (as source) once the import is finished.
- Speed up startup when the import is already finished.
- Fix closure of ZODB, and also do it when the import is finished.
- Fix hidden "maximum recursion depth exceeded" at startup.
- Fix resumption when using SQLite.
- v1.10 broke resumption when there are new transactions since the import
started.
MySQL:
- Better support of RocksDB by specifying column families.
- Fix handling of connection strings (--database) without credentials.
1.11 (2019-03-11)
-----------------
......
......@@ -21,7 +21,6 @@ from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger
......@@ -36,8 +35,8 @@ class Application(BaseApplication):
cls.addCommonServerOptions('admin', '127.0.0.1:9999')
_ = _.group('admin')
_.int('u', 'uuid',
help="specify an UUID to use for this process (testing purpose)")
_.int('i', 'nid',
help="specify an NID to use for this process (testing purpose)")
def __init__(self, config):
super(Application, self).__init__(
......@@ -53,7 +52,7 @@ class Application(BaseApplication):
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
self.uuid = config.get('uuid')
self.uuid = config.get('nid')
logging.node(self.name, self.uuid)
self.request_handler = MasterRequestEventHandler(self)
self.master_event_handler = MasterEventHandler(self)
......@@ -66,7 +65,6 @@ class Application(BaseApplication):
super(Application, self).close()
def reset(self):
self.bootstrapped = False
self.master_conn = None
self.master_node = None
......@@ -117,40 +115,20 @@ class Application(BaseApplication):
self.cluster_state = None
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.ADMIN, self.server)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
if self.pt is None:
self.pt = PartitionTable(num_partitions, num_replicas)
elif self.pt.getPartitions() != num_partitions:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of partitions is inconsistent')
elif self.pt.getReplicas() != num_replicas:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of replicas is inconsistent')
self.master_node, self.master_conn = bootstrap.getPrimaryConnection()
# passive handler
self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(Packets.AskClusterState())
self.master_conn.ask(Packets.AskPartitionTable())
def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt
self.pt.log()
row_list = []
pt = self.pt
if max_offset == 0:
max_offset = self.pt.getPartitions()
max_offset = pt.getPartitions()
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in self.pt.getCellList(offset):
if uuid is None or cell.getUUID() == uuid:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row))
row_list = map(pt.getRow, xrange(min_offset, max_offset))
except IndexError:
conn.send(Errors.ProtocolError('invalid partition table offset'))
else:
conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
conn.answer(Packets.AnswerPartitionList(
pt.getID(), pt.getReplicas(), row_list))
......@@ -17,11 +17,12 @@
from neo.lib import logging, protocol
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets
from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure
def check_primary_master(func):
def wrapper(self, *args, **kw):
if self.app.bootstrapped:
if self.app.master_conn is not None:
return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
......@@ -74,6 +75,7 @@ class AdminEventHandler(EventHandler):
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
setNumReplicas = forward_ask(Packets.SetNumReplicas)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
......@@ -112,16 +114,12 @@ class MasterEventHandler(EventHandler):
def answerClusterState(self, conn, state):
self.app.cluster_state = state
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
self.app.bootstrapped = True
def sendPartitionTable(self, conn, ptid, row_list):
if self.app.bootstrapped:
self.app.pt.load(ptid, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state
......
......@@ -76,7 +76,7 @@ class Application(ThreadedApplication):
self.primary_master_node = None
self.trying_master_node = None
# no self-assigned UUID, primary master will supply us one
# no self-assigned NID, primary master will supply us one
self._cache = ClientCache() if cache_size is None else \
ClientCache(max_size=cache_size)
self._loading = defaultdict(lambda: (Lock(), []))
......@@ -220,8 +220,8 @@ class Application(ThreadedApplication):
self.notifications_handler,
node=node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name, (), None)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, None, (), ())
try:
ask(conn, p, handler=handler)
except ConnectionClosed:
......@@ -238,7 +238,6 @@ class Application(ThreadedApplication):
# operational. Might raise ConnectionClosed so that the new
# primary can be looked-up again.
logging.info('Initializing from master')
ask(conn, Packets.AskPartitionTable(), handler=handler)
ask(conn, Packets.AskLastTransaction(), handler=handler)
if self.pt.operational():
break
......@@ -264,7 +263,7 @@ class Application(ThreadedApplication):
conn = MTClientConnection(self, self.storage_event_handler, node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, (), self.id_timestamp)
self.uuid, None, self.name, self.id_timestamp, (), ())
try:
self._ask(conn, p, handler=self.storage_bootstrap_handler)
except ConnectionClosed:
......
......@@ -26,10 +26,6 @@ from ..exception import NEOStorageError
class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
def answerPartitionTable(self, conn, ptid, row_list):
assert row_list
self.app.pt.load(ptid, row_list, self.app.nm)
def answerLastTransaction(*args):
pass
......@@ -42,9 +38,6 @@ class PrimaryNotificationsHandler(MTEventHandler):
except PrimaryElected, e:
self.app.primary_master_node, = e.args
def _acceptIdentification(self, node, num_partitions, num_replicas):
self.app.pt = PartitionTable(num_partitions, num_replicas)
def answerLastTransaction(self, conn, ltid):
app = self.app
app_last_tid = app.__dict__.get('last_tid', '')
......@@ -131,9 +124,12 @@ class PrimaryNotificationsHandler(MTEventHandler):
if db is not None:
db.invalidate(tid, oid_list)
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryNotificationsHandler, self).notifyNodeInformation(
......
......@@ -26,7 +26,7 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it
"""
def __init__(self, app, node_type, server=None, devpath=()):
def __init__(self, app, node_type, server=None, devpath=(), new_nid=()):
"""
Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node
......@@ -34,9 +34,8 @@ class BootstrapManager(EventHandler):
"""
self.server = server
self.devpath = devpath
self.new_nid = new_nid
self.node_type = node_type
self.num_replicas = None
self.num_partitions = None
app.nm.reset()
uuid = property(lambda self: self.app.uuid)
......@@ -44,7 +43,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, self.devpath, None))
self.server, self.app.name, None, self.devpath, self.new_nid))
def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn)
......@@ -53,10 +52,8 @@ class BootstrapManager(EventHandler):
def connectionLost(self, conn, new_state):
self.current = None
def _acceptIdentification(self, node, num_partitions, num_replicas):
def _acceptIdentification(self, node):
assert self.current is node, (self.current, node)
self.num_partitions = num_partitions
self.num_replicas = num_replicas
def getPrimaryConnection(self):
"""
......@@ -73,8 +70,7 @@ class BootstrapManager(EventHandler):
try:
while self.current:
if self.current.isIdentified():
return (self.current, self.current.getConnection(),
self.num_partitions, self.num_replicas)
return self.current, self.current.getConnection()
poll(1)
except PrimaryElected, e:
if self.current:
......
......@@ -209,7 +209,7 @@ class BaseConnection(object):
def _getReprInfo(self):
r = [
('uuid', uuid_str(self.getUUID())),
('nid', uuid_str(self.getUUID())),
('address', ('[%s]:%s' if ':' in self.addr[0] else '%s:%s')
% self.addr if self.addr else '?'),
('handler', self.getHandler()),
......
......@@ -26,6 +26,9 @@ from .protocol import (NodeStates, NodeTypes, Packets, uuid_str,
from .util import cached_property
class AnswerDenied(Exception):
"""Helper exception to stop packet processing and answer a Denied error"""
class DelayEvent(Exception):
pass
......@@ -98,6 +101,8 @@ class EventHandler(object):
% (m.im_class.__module__, m.im_class.__name__, m.__name__)))
except NonReadableCell, e:
conn.answer(Errors.NonReadableCell())
except AnswerDenied, e:
conn.answer(Errors.Denied(str(e)))
except AssertionError:
e = sys.exc_info()
try:
......@@ -160,8 +165,7 @@ class EventHandler(object):
def _acceptIdentification(*args):
pass
def acceptIdentification(self, conn, node_type, uuid,
num_partitions, num_replicas, your_uuid):
def acceptIdentification(self, conn, node_type, uuid, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
assert node.getConnection() is conn, (node.getConnection(), conn)
......@@ -180,7 +184,7 @@ class EventHandler(object):
elif node.getUUID() != uuid or app.uuid != your_uuid != None:
raise ProtocolError('invalid uuids')
node.setIdentified()
self._acceptIdentification(node, num_partitions, num_replicas)
self._acceptIdentification(node)
return
conn.close()
......
......@@ -486,7 +486,7 @@ class NodeManager(EventQueue):
# For the first notification, we receive a full list of nodes from
# the master. Remove all unknown nodes from a previous connection.
for node in self._node_set.difference(added_list):
if app.pt.dropNode(node):
if not node.isStorage() or app.pt.dropNode(node):
self.remove(node)
self.log()
self.executeQueuedEvents()
......
......@@ -22,7 +22,7 @@ from struct import Struct
# The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 5
PROTOCOL_VERSION = 6
ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION)
# Avoid memory errors on corrupted data.
......@@ -62,6 +62,7 @@ class Enum(tuple):
@Enum
def ErrorCodes():
ACK
DENIED
NOT_READY
OID_NOT_FOUND
TID_NOT_FOUND
......@@ -616,10 +617,7 @@ PFCellList = PList('cell_list',
)
PFRowList = PList('row_list',
PStruct('row',
PNumber('offset'),
PFCellList,
),
PFCellList,
)
PFHistoryList = PList('history_list',
......@@ -685,15 +683,15 @@ class RequestIdentification(Packet):
PUUID('uuid'),
PAddress('address'),
PString('name'),
PList('devpath', PString('devid')),
PFloat('id_timestamp'),
# storage:
PList('devpath', PString('devid')),
PList('new_nid', PNumber('offset')),
)
_answer = PStruct('accept_identification',
PFNodeType,
PUUID('my_uuid'),
PNumber('num_partitions'),
PNumber('num_replicas'),
PUUID('your_uuid'),
)
......@@ -749,23 +747,24 @@ class LastIDs(Packet):
class PartitionTable(Packet):
"""
Ask storage node the remaining data needed by master to recover.
This is also how the clients get the full partition table on connection.
:nodes: M -> S; C -> M
:nodes: M -> S
"""
_answer = PStruct('answer_partition_table',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
class NotifyPartitionTable(Packet):
"""
Send the full partition table to admin/storage nodes on connection.
Send the full partition table to admin/client/storage nodes on connection.
:nodes: M -> A, S
:nodes: M -> A, C, S
"""
_fmt = PStruct('send_partition_table',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
......@@ -777,6 +776,7 @@ class PartitionChanges(Packet):
"""
_fmt = PStruct('notify_partition_changes',
PPTID('ptid'),
PNumber('num_replicas'),
PList('cell_list',
PStruct('cell',
PNumber('offset'),
......@@ -1202,6 +1202,7 @@ class PartitionList(Packet):
_answer = PStruct('answer_partition_list',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
......@@ -1253,10 +1254,14 @@ class TweakPartitionTable(Packet):
:nodes: ctl -> A -> M
"""
_fmt = PStruct('tweak_partition_table',
PBoolean('dry_run'),
PFUUIDList,
)
_answer = Error
_answer = PStruct('answer_tweak_partition_table',
PBoolean('changed'),
PFRowList,
)
class NotifyNodeInformation(Packet):
"""
......@@ -1269,6 +1274,18 @@ class NotifyNodeInformation(Packet):
PFNodeList,
)
class SetNumReplicas(Packet):
"""
Set the number of replicas.
:nodes: ctl -> A -> M
"""
_fmt = PStruct('set_num_replicas',
PNumber('num_replicas'),
)
_answer = Error
class SetClusterState(Packet):
"""
Set the cluster state.
......@@ -1762,8 +1779,10 @@ class Packets(dict):
SetNodeState, ignore_when_closed=False)
AddPendingNodes = register(
AddPendingNodes, ignore_when_closed=False)
TweakPartitionTable = register(
TweakPartitionTable, ignore_when_closed=False)
TweakPartitionTable, AnswerTweakPartitionTable = register(
TweakPartitionTable)
SetNumReplicas = register(
SetNumReplicas, ignore_when_closed=False)
SetClusterState = register(
SetClusterState, ignore_when_closed=False)
Repair = register(
......
......@@ -86,15 +86,9 @@ class PartitionTable(object):
'a cell became non-readable whereas all cells were readable'
def __init__(self, num_partitions, num_replicas):
self._id = None
self.np = num_partitions
self.nr = num_replicas
self.num_filled_rows = 0
# Note: don't use [[]] * num_partition construct, as it duplicates
# instance *references*, so the outer list contains really just one
# inner list instance.
self.partition_list = [[] for _ in xrange(num_partitions)]
self.count_dict = {}
self.clear()
def getID(self):
return self._id
......@@ -113,7 +107,16 @@ class PartitionTable(object):
# instance *references*, so the outer list contains really just one
# inner list instance.
self.partition_list = [[] for _ in xrange(self.np)]
self.count_dict.clear()
self.count_dict = {}
def addNodeList(self, node_list):
"""Add nodes"""
added_list = []
for node in node_list:
if node not in self.count_dict:
self.count_dict[node] = 0
added_list.append(node)
return added_list
def getAssignedPartitionList(self, uuid):
""" Return the partition assigned to the specified UUID """
......@@ -203,31 +206,31 @@ class PartitionTable(object):
del self.count_dict[node]
return not count
def load(self, ptid, row_list, nm):
def _load(self, ptid, num_replicas, row_list, getByUUID):
self.__init__(len(row_list), num_replicas)
self._id = ptid
for offset, row in enumerate(row_list):
for uuid, state in row:
node = getByUUID(uuid)
self._setCell(offset, node, state)
def load(self, ptid, num_replicas, row_list, nm):
"""
Load the partition table with the specified PTID, discard all previous
content.
"""
self.clear()
self._id = ptid
for offset, row in row_list:
if offset >= self.getPartitions():
raise IndexError
for uuid, state in row:
node = nm.getByUUID(uuid)
# the node must be known by the node manager
assert node is not None
self._setCell(offset, node, state)
self._load(ptid, num_replicas, row_list, nm.getByUUID)
logging.debug('partition table loaded (ptid=%s)', ptid)
self.log()
def update(self, ptid, cell_list, nm):
def update(self, ptid, num_replicas, cell_list, nm):
"""
Update the partition with the cell list supplied. If a node
is not known, it is created in the node manager and set as unavailable
"""
assert self._id < ptid, (self._id, ptid)
self._id = ptid
self.nr = num_replicas
readable_list = []
for row in self.partition_list:
if not all(cell.isReadable() for cell in row):
......@@ -310,14 +313,11 @@ class PartitionTable(object):
return True
def getRow(self, offset):
row = self.partition_list[offset]
if row is None:
return []
return [(cell.getUUID(), cell.getState()) for cell in row]
return [(cell.getUUID(), cell.getState())
for cell in self.partition_list[offset]]
def getRowList(self):
getRow = self.getRow
return [(x, getRow(x)) for x in xrange(self.np)]
return map(self.getRow, xrange(self.np))
class MTPartitionTable(PartitionTable):
""" Thread-safe aware version of the partition table, override only methods
......
......@@ -16,6 +16,7 @@
import sys
from collections import defaultdict
from functools import partial
from time import time
from neo.lib import logging, util
......@@ -76,13 +77,11 @@ class Application(BaseApplication):
@classmethod
def _buildOptionParser(cls):
_ = cls.option_parser
_.description = "NEO Master node"
parser = cls.option_parser
parser.description = "NEO Master node"
cls.addCommonServerOptions('master', '127.0.0.1:10000', '')
_ = _.group('master')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
_ = parser.group('master')
_.int('A', 'autostart',
help="minimum number of pending storage nodes to automatically"
" start new cluster (to avoid unwanted recreation of the"
......@@ -91,8 +90,12 @@ class Application(BaseApplication):
help='the name of cluster to backup')
_('M', 'upstream-masters', parse=util.parseMasterList,
help='list of master nodes in the cluster to backup')
_.int('u', 'uuid',
help="specify an UUID to use for this process (testing purpose)")
_.int('i', 'nid',
help="specify an NID to use for this process (testing purpose)")
_ = parser.group('database creation')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
def __init__(self, config):
super(Application, self).__init__(
......@@ -108,7 +111,7 @@ class Application(BaseApplication):
for master_address in config['masters']:
self.nm.createMaster(address=master_address)
self._node = self.nm.createMaster(address=self.server,
uuid=config.get('uuid'))
uuid=config.get('nid'))
logging.node(self.name, self.uuid)
logging.debug('IP address is %s, port is %d', *self.server)
......@@ -117,14 +120,14 @@ class Application(BaseApplication):
replicas = config['replicas']
partitions = config['partitions']
if replicas < 0:
raise RuntimeError, 'replicas must be a positive integer'
sys.exit('replicas must be a positive integer')
if partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
self.pt = PartitionTable(partitions, replicas)
sys.exit('partitions must be more than zero')
logging.info('Configuration:')
logging.info('Partitions: %d', partitions)
logging.info('Replicas : %d', replicas)
logging.info('Name : %s', self.name)
self.newPartitionTable = partial(PartitionTable, partitions, replicas)
self.listening_conn = None
self.cluster_state = None
......@@ -196,7 +199,7 @@ class Application(BaseApplication):
node_dict[NodeTypes.MASTER].append(node_info)
return node_dict
def broadcastNodesInformation(self, node_list, exclude=None):
def broadcastNodesInformation(self, node_list):
"""
Broadcast changes for a set a nodes
Send only one packet per connection to reduce bandwidth
......@@ -209,20 +212,26 @@ class Application(BaseApplication):
# We don't skip pending storage nodes because we don't send them
# the full list of nodes when they're added, and it's also quite
# useful to notify them about new masters.
if node_list and node is not exclude:
if node_list:
node.send(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list):
def broadcastPartitionChanges(self, cell_list, num_replicas=None):
"""Broadcast a Notify Partition Changes packet."""
if cell_list:
ptid = self.pt.setNextID()
self.pt.logUpdated()
packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
pt = self.pt
if num_replicas is not None:
pt.setReplicas(num_replicas)
elif cell_list:
num_replicas = pt.getReplicas()
else:
return
packet = Packets.NotifyPartitionChanges(
pt.setNextID(), num_replicas, cell_list)
pt.logUpdated()
for node in self.nm.getIdentifiedList():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
def provideService(self):
"""
......@@ -437,16 +446,7 @@ class Application(BaseApplication):
conn.send(notification_packet)
elif conn.isServer():
continue
if node.isClient():
if state == ClusterStates.RUNNING:
handler = self.client_service_handler
elif state == ClusterStates.BACKINGUP:
handler = self.client_ro_service_handler
else:
if state != ClusterStates.STOPPING:
conn.abort()
continue
elif node.isMaster():
if node.isMaster():
if state == ClusterStates.RECOVERING:
handler = self.election_handler
else:
......@@ -454,10 +454,16 @@ class Application(BaseApplication):
elif node.isStorage() and storage_handler:
handler = storage_handler
else:
# There's a single handler type for admins.
# Client can't change handler without being first disconnected.
assert state in (
ClusterStates.STOPPING,
ClusterStates.STOPPING_BACKUP,
) or not node.isClient(), (state, node)
continue # keep handler
if type(handler) is not type(conn.getLastHandler()):
conn.setHandler(handler)
handler.connectionCompleted(conn, new=False)
handler.handlerSwitched(conn, new=False)
self.cluster_state = state
def getNewUUID(self, uuid, address, node_type):
......
......@@ -111,17 +111,12 @@ class BackupApplication(object):
else:
break
poll(1)
node, conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
node, conn = bootstrap.getPrimaryConnection()
try:
app.changeClusterState(ClusterStates.BACKINGUP)
del bootstrap, node
if num_partitions != pt.getPartitions():
raise RuntimeError("inconsistent number of partitions")
self.ignore_invalidations = True
self.pt = PartitionTable(num_partitions, num_replicas)
conn.setHandler(BackupHandler(self))
conn.ask(Packets.AskPartitionTable())
conn.ask(Packets.AskLastTransaction())
# debug variable to log how big 'tid_list' can be.
self.debug_tid_count = 0
......
......@@ -23,10 +23,6 @@ from neo.lib.protocol import Packets
class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def connectionCompleted(self, conn, new=None):
if new is None:
super(MasterHandler, self).connectionCompleted(conn)
def connectionLost(self, conn, new_state=None):
if self.app.listening_conn: # if running
self._connectionLost(conn)
......@@ -59,17 +55,20 @@ class MasterHandler(EventHandler):
+ app.getNodeInformationDict(node_list)[node.getType()])
conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askPartitionTable(self, conn):
def handlerSwitched(self, conn, new):
pt = self.app.pt
conn.answer(Packets.AnswerPartitionTable(pt.getID(), pt.getRowList()))
# Except storages during recovery and secondary masters, all nodes
# receives the full partition table as soon as they're identified.
# It is also sent in 2 other cases:
# - to admins during recovery, whenever a newer PT is loaded;
# - to storage when switching from recovery to verification.
# After that, non-master nodes only receive incremental updates.
conn.send(Packets.SendPartitionTable(
pt.getID(), pt.getReplicas(), pt.getRowList()))
class BaseServiceHandler(MasterHandler):
"""This class deals with events for a service phase."""
def connectionCompleted(self, conn, new):
pt = self.app.pt
conn.send(Packets.SendPartitionTable(pt.getID(), pt.getRowList()))
"""Common handler class for storage nodes."""
def connectionLost(self, conn, new_state):
app = self.app
......
......@@ -15,14 +15,16 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
from functools import wraps
from . import MasterHandler
from ..app import monotonic_time, StateChangedException
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.handler import AnswerDenied
from neo.lib.pt import PartitionTableException
from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, ProtocolError, uuid_str
NodeStates, NodeTypes, Packets, uuid_str
from neo.lib.util import dump
CLUSTER_STATE_WORKFLOW = {
......@@ -38,9 +40,25 @@ NODE_STATE_WORKFLOW = {
NodeTypes.STORAGE: (NodeStates.DOWN, NodeStates.UNKNOWN),
}
def check_state(*states):
def decorator(wrapped):
def wrapper(self, *args):
state = self.app.getClusterState()
if state not in states:
raise AnswerDenied('%s RPC can not be used in %s state'
% (wrapped.__name__, state))
wrapped(self, *args)
return wraps(wrapped)(wrapper)
return decorator
class AdministrationHandler(MasterHandler):
"""This class deals with messages from the admin node only"""
def handlerSwitched(self, conn, new):
assert new
super(AdministrationHandler, self).handlerSwitched(conn, new)
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
if node is not None:
......@@ -58,30 +76,28 @@ class AdministrationHandler(MasterHandler):
# check request
try:
if app.cluster_state not in CLUSTER_STATE_WORKFLOW[state]:
raise ProtocolError('Can not switch to this state')
raise AnswerDenied('Can not switch to this state')
except KeyError:
if state != ClusterStates.STOPPING:
raise ProtocolError('Invalid state requested')
raise AnswerDenied('Invalid state requested')
# change state
if state == ClusterStates.VERIFYING:
storage_list = app.nm.getStorageList(only_identified=True)
if not storage_list:
raise ProtocolError('Cannot exit recovery without any '
'storage node')
raise AnswerDenied(
'Cannot exit recovery without any storage node')
for node in storage_list:
assert node.isPending(), node
if node.getConnection().isPending():
# XXX: It's wrong to use ProtocolError here. We must reply
# less aggressively because the admin has no way to
# know that there's still pending activity.
raise ProtocolError('Cannot exit recovery now: node %r is '
'entering cluster' % (node, ))
raise AnswerDenied(
'Cannot exit recovery now: node %r is entering cluster'
% node,)
app._startup_allowed = True
state = app.cluster_state
elif state == ClusterStates.STARTING_BACKUP:
if app.tm.hasPending() or app.nm.getClientList(True):
raise ProtocolError("Can not switch to %s state with pending"
raise AnswerDenied("Can not switch to %s state with pending"
" transactions or connected clients" % state)
conn.answer(Errors.Ack('Cluster state changed'))
......@@ -93,21 +109,24 @@ class AdministrationHandler(MasterHandler):
app = self.app
node = app.nm.getByUUID(uuid)
if node is None:
raise ProtocolError('unknown node')
raise AnswerDenied('unknown node')
if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()):
raise ProtocolError('can not switch node to this state')
raise AnswerDenied('can not switch node to %s state' % state)
if uuid == app.uuid:
raise ProtocolError('can not kill primary master node')
raise AnswerDenied('can not kill primary master node')
state_changed = state != node.getState()
message = ('state changed' if state_changed else
'node already in %s state' % state)
if node.isStorage():
keep = state == NodeStates.DOWN
if node.isRunning() and not keep:
raise AnswerDenied(
"a running node must be stopped before removal")
try:
cell_list = app.pt.dropNodeList([node], keep)
except PartitionTableException, e:
raise ProtocolError(str(e))
raise AnswerDenied(str(e))
node.setState(state)
if node.isConnected():
# notify itself so it can shutdown
......@@ -134,16 +153,17 @@ class AdministrationHandler(MasterHandler):
monotonic_time(), [node.asTuple()]))
app.broadcastNodesInformation([node])
# XXX: Would it be safe to allow more states ?
__change_pt_rpc = check_state(
ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP)
@__change_pt_rpc
def addPendingNodes(self, conn, uuid_list):
uuids = ', '.join(map(uuid_str, uuid_list))
logging.debug('Add nodes %s', uuids)
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states ?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not add nodes in %s state' % state)
# take all pending nodes
node_list = list(app.pt.addNodeList(node
for node in app.nm.getStorageList()
......@@ -165,31 +185,50 @@ class AdministrationHandler(MasterHandler):
for uuid in uuid_list:
node = getByUUID(uuid)
if node is None or not (node.isStorage() and node.isIdentified()):
raise ProtocolError("invalid storage node %s" % uuid_str(uuid))
raise AnswerDenied("invalid storage node %s" % uuid_str(uuid))
node_list.append(node)
repair = Packets.NotifyRepair(*args)
for node in node_list:
node.send(repair)
conn.answer(Errors.Ack(''))
def tweakPartitionTable(self, conn, uuid_list):
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states ?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not tweak partition table in %s state'
% state)
app.broadcastPartitionChanges(app.pt.tweak([node
for node in app.nm.getStorageList()
if node.getUUID() in uuid_list or not node.isRunning()]))
@__change_pt_rpc
def setNumReplicas(self, conn, num_replicas):
self.app.broadcastPartitionChanges((), num_replicas)
conn.answer(Errors.Ack(''))
def truncate(self, conn, tid):
@__change_pt_rpc
def tweakPartitionTable(self, conn, dry_run, uuid_list):
app = self.app
if app.cluster_state != ClusterStates.RUNNING:
raise ProtocolError('Can not truncate in this state')
drop_list = []
for node in app.nm.getStorageList():
if node.getUUID() in uuid_list or node.isPending():
drop_list.append(node)
elif not node.isRunning():
drop_list.append(node)
raise AnswerDenied(
'tweak: down nodes must be listed explicitly')
if dry_run:
pt = object.__new__(app.pt.__class__)
new_nodes = pt.load(app.pt.getID(), app.pt.getReplicas(),
app.pt.getRowList(), app.nm)
assert not new_nodes
pt.addNodeList(node
for node, count in app.pt.count_dict.iteritems()
if not count)
else:
pt = app.pt
try:
changed_list = pt.tweak(drop_list)
except PartitionTableException, e:
raise AnswerDenied(str(e))
if not dry_run:
app.broadcastPartitionChanges(changed_list)
conn.answer(Packets.AnswerTweakPartitionTable(
bool(changed_list), pt.getRowList()))
@check_state(ClusterStates.RUNNING)
def truncate(self, conn, tid):
conn.answer(Errors.Ack(''))
raise StoppedOperation(tid)
......@@ -237,3 +276,5 @@ class AdministrationHandler(MasterHandler):
node.send(Packets.CheckPartition(
offset, source, min_tid, max_tid))
conn.answer(Errors.Ack(''))
del __change_pt_rpc
......@@ -17,6 +17,7 @@
from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.protocol import ZERO_TID
from neo.lib.pt import PartitionTable
class BackupHandler(EventHandler):
"""Handler dedicated to upstream master during BACKINGUP state"""
......@@ -25,12 +26,15 @@ class BackupHandler(EventHandler):
if self.app.app.listening_conn: # if running
raise PrimaryFailure('connection lost')
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
app = self.app
pt = app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
if pt.getPartitions() != app.app.pt.getPartitions():
raise RuntimeError("inconsistent number of partitions")
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def answerLastTransaction(self, conn, tid):
app = self.app
......
......@@ -22,6 +22,10 @@ from . import MasterHandler
class ClientServiceHandler(MasterHandler):
""" Handler dedicated to client during service state """
def handlerSwitched(self, conn, new):
assert new
super(ClientServiceHandler, self).handlerSwitched(conn, new)
def _connectionLost(self, conn):
# cancel its transactions and forgot the node
app = self.app
......
......@@ -17,14 +17,14 @@
from neo.lib import logging
from neo.lib.exception import PrimaryElected
from neo.lib.handler import EventHandler
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \
NotReadyError, Packets, ProtocolError, uuid_str
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
NodeTypes, NotReadyError, Packets, ProtocolError, uuid_str
from ..app import monotonic_time
class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp):
address, name, id_timestamp, devpath, new_nid):
app = self.app
self.checkClusterName(name)
if address == app.server:
......@@ -77,6 +77,16 @@ class IdentificationHandler(EventHandler):
manager = app
state, handler = manager.identifyStorageNode(
uuid is not None and node is not None)
if not address:
if app.cluster_state == ClusterStates.RECOVERING:
raise NotReadyError
if uuid or not new_nid:
raise ProtocolError
state = NodeStates.DOWN
# We'll let the storage node close the connection. If we
# aborted it at the end of the method, BootstrapManager
# (which is used by storage nodes) could see the closure
# and try to reconnect to a master.
human_readable_node_type = ' storage (%s) ' % (state, )
elif node_type == NodeTypes.MASTER:
if app.election:
......@@ -105,24 +115,27 @@ class IdentificationHandler(EventHandler):
node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time()
node.setState(state)
app.broadcastNodesInformation([node])
if new_nid:
changed_list = []
for offset in new_nid:
changed_list.append((offset, uuid, CellStates.OUT_OF_DATE))
app.pt._setCell(offset, node, CellStates.OUT_OF_DATE)
app.broadcastPartitionChanges(changed_list)
conn.setHandler(handler)
node.setConnection(conn, not node.isIdentified())
app.broadcastNodesInformation([node], node)
conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER,
app.uuid,
app.pt.getPartitions(),
app.pt.getReplicas(),
uuid))
handler._notifyNodeInformation(conn)
handler.connectionCompleted(conn, True)
handler.handlerSwitched(conn, True)
class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp):
address, name, id_timestamp, devpath, new_nid):
app = self.app
self.checkClusterName(name)
if address == app.server:
......
......@@ -23,6 +23,9 @@ from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
class SecondaryHandler(MasterHandler):
"""Handler used by primary to handle secondary masters"""
def handlerSwitched(self, conn, new):
pass
def _connectionLost(self, conn):
app = self.app
node = app.nm.getByUUID(conn.getUUID())
......@@ -30,21 +33,20 @@ class SecondaryHandler(MasterHandler):
app.broadcastNodesInformation([node])
class ElectionHandler(MasterHandler):
class ElectionHandler(SecondaryHandler):
"""Handler used by primary to handle secondary masters during election"""
def connectionCompleted(self, conn, new=None):
if new is None:
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, (), app.election))
def connectionCompleted(self, conn):
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election, (), ()))
def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn)
self.connectionLost(conn)
def _acceptIdentification(self, node, *args):
def _acceptIdentification(self, node):
raise PrimaryElected(node)
def _connectionLost(self, *args):
......@@ -66,7 +68,7 @@ class ElectionHandler(MasterHandler):
class PrimaryHandler(ElectionHandler):
"""Handler used by secondaries to handle primary master"""
def _acceptIdentification(self, node, num_partitions, num_replicas):
def _acceptIdentification(self, node):
assert self.app.primary_master is node, (self.app.primary_master, node)
def _connectionLost(self, conn):
......
......@@ -26,10 +26,10 @@ from . import BaseServiceHandler
class StorageServiceHandler(BaseServiceHandler):
""" Handler dedicated to storages during service state """
def connectionCompleted(self, conn, new):
def handlerSwitched(self, conn, new):
app = self.app
if new:
super(StorageServiceHandler, self).connectionCompleted(conn, new)
super(StorageServiceHandler, self).handlerSwitched(conn, new)
node = app.nm.getByUUID(conn.getUUID())
if node.isRunning(): # node may be PENDING
app.startStorage(node)
......
......@@ -56,6 +56,10 @@ class PartitionTable(neo.lib.pt.PartitionTable):
self._id += 1
return self._id
def setReplicas(self, num_replicas):
assert num_replicas >= 0, num_replicas
self.nr = num_replicas
def make(self, node_list):
"""Make a new partition table from scratch."""
assert self._id is None and node_list, (self._id, node_list)
......@@ -108,26 +112,19 @@ class PartitionTable(neo.lib.pt.PartitionTable):
self.num_filled_rows = len(filter(None, self.partition_list))
return change_list
def load(self, ptid, row_list, nm):
def load(self, ptid, num_replicas, row_list, nm):
"""
Load a partition table from a storage node during the recovery.
Return the new storage nodes registered
"""
# check offsets
for offset, _row in row_list:
if offset >= self.getPartitions():
raise IndexError, offset
# store the partition table
self.clear()
self._id = ptid
new_nodes = []
for offset, row in row_list:
for uuid, state in row:
node = nm.getByUUID(uuid)
if node is None:
node = nm.createStorage(uuid=uuid)
new_nodes.append(node.asTuple())
self._setCell(offset, node, state)
def getByUUID(nid):
node = nm.getByUUID(nid)
if node is None:
node = nm.createStorage(uuid=nid)
new_nodes.append(node.asTuple())
return node
self._load(ptid, num_replicas, row_list, getByUUID)
return new_nodes
def setUpToDate(self, node, offset):
......@@ -166,15 +163,6 @@ class PartitionTable(neo.lib.pt.PartitionTable):
return cell_list
def addNodeList(self, node_list):
"""Add nodes"""
added_list = []
for node in node_list:
if node not in self.count_dict:
self.count_dict[node] = 0
added_list.append(node)
return added_list
def tweak(self, drop_list=()):
"""Optimize partition table
......@@ -183,7 +171,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
few readable cells, some cells are instead marked as FEEDING. This is
a preliminary step to drop these nodes, otherwise the partition table
could become non-operational.
- Other nodes must have the same number of cells, off by 1.
In fact, the code touching these cells is disabled (see NOTE below).
- Other nodes must have the same number of non-feeding cells, off by 1.
- When a transaction creates new objects (oids are roughly allocated
sequentially), we expect better performance by maximizing the number
of involved nodes (i.e. parallelizing writes).
......@@ -232,6 +221,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
# Collect some data in a usable form for the rest of the method.
node_list = {node: {} for node in self.count_dict
if node not in drop_list}
if not node_list:
raise neo.lib.pt.PartitionTableException("Can't remove all nodes.")
drop_list = defaultdict(list)
for offset, row in enumerate(self.partition_list):
for cell in row:
......@@ -420,6 +411,22 @@ class PartitionTable(neo.lib.pt.PartitionTable):
outdated_list[offset] -= 1
for offset, cell in cell_dict.iteritems():
discard_list[offset].append(cell)
# NOTE: The following line disables the next 2 lines, which actually
# causes cells in drop_list to be discarded, now or later;
# drop_list could be renamed into ignore_list.
# 1. Deleting data partition per partition is a lot of work, so
# why ask nodes in drop_list to do that when the goal is
# simply to trash the whole underlying database?
# 2. By excluding nodes from a tweak, it becomes possible to have
# parts of the partition table that are tweaked differently.
# This may require to temporarily change the number of
# replicas for the part being tweaked. In the future, this
# number may be specified in the 'tweak' command, to avoid
# race conditions with setUpToDate().
# Overall, a common use case is when importing a ZODB to NEO,
# to keep the initial importing node up until the database is
# split and replicated to the final nodes.
drop_list = {}
for offset, drop_list in drop_list.iteritems():
discard_list[offset] += drop_list
# We have sorted cells to discard in order to first deallocate nodes
......
......@@ -28,7 +28,7 @@ class RecoveryManager(MasterHandler):
def __init__(self, app):
# The target node's uuid to request next.
self.target_ptid = None
self.target_ptid = 0
self.ask_pt = []
self.backup_tid_dict = {}
self.truncate_dict = {}
......@@ -52,9 +52,8 @@ class RecoveryManager(MasterHandler):
"""
logging.info('begin the recovery of the status')
app = self.app
pt = app.pt
pt = app.pt = app.newPartitionTable()
app.changeClusterState(ClusterStates.RECOVERING)
pt.clear()
self.try_secondary = True
......@@ -113,7 +112,7 @@ class RecoveryManager(MasterHandler):
for node in node_list:
conn = node.getConnection()
conn.send(truncate)
self.connectionCompleted(conn, False)
self.handlerSwitched(conn, False)
continue
node_list = pt.getConnectedNodeList()
break
......@@ -140,12 +139,12 @@ class RecoveryManager(MasterHandler):
logging.info('creating a new partition table')
pt.make(node_list)
self._notifyAdmins(Packets.SendPartitionTable(
pt.getID(), pt.getRowList()))
pt.getID(), pt.getReplicas(), pt.getRowList()))
else:
cell_list = pt.outdate()
if cell_list:
self._notifyAdmins(Packets.NotifyPartitionChanges(
pt.setNextID(), cell_list))
pt.setNextID(), pt.getReplicas(), cell_list))
if app.backup_tid:
pt.setBackupTidDict(self.backup_tid_dict)
app.backup_tid = pt.getBackupTid()
......@@ -175,16 +174,16 @@ class RecoveryManager(MasterHandler):
if node is None or node.getState() == new_state:
return
node.setState(new_state)
# broadcast to all so that admin nodes gets informed
self.app.broadcastNodesInformation([node])
def connectionCompleted(self, conn, new):
def handlerSwitched(self, conn, new):
# ask the last IDs to perform the recovery
conn.ask(Packets.AskRecovery())
def answerRecovery(self, conn, ptid, backup_tid, truncate_tid):
uuid = conn.getUUID()
if self.target_ptid <= ptid:
# ptid is None if the node has an empty partition table.
if ptid and self.target_ptid <= ptid:
# Maybe a newer partition table.
if self.target_ptid == ptid and self.ask_pt:
# Another node is already asked.
......@@ -197,17 +196,14 @@ class RecoveryManager(MasterHandler):
self.backup_tid_dict[uuid] = backup_tid
self.truncate_dict[uuid] = truncate_tid
def answerPartitionTable(self, conn, ptid, row_list):
def answerPartitionTable(self, conn, ptid, num_replicas, row_list):
# If this is not from a target node, ignore it.
if ptid == self.target_ptid:
app = self.app
try:
new_nodes = app.pt.load(ptid, row_list, app.nm)
except IndexError:
raise ProtocolError('Invalid offset')
new_nodes = app.pt.load(ptid, num_replicas, row_list, app.nm)
self._notifyAdmins(
Packets.NotifyNodeInformation(monotonic_time(), new_nodes),
Packets.SendPartitionTable(ptid, row_list))
Packets.SendPartitionTable(ptid, num_replicas, row_list))
self.ask_pt = ()
uuid = conn.getUUID()
app.backup_tid = self.backup_tid_dict[uuid]
......
......@@ -16,9 +16,11 @@
import sys
from .neoctl import NeoCTL, NotReadyException
from neo.lib.node import NodeManager
from neo.lib.pt import PartitionTable
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
from neo.lib.protocol import uuid_str, formatNodeList, \
ClusterStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
ClusterStates, NodeStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
action_dict = {
'print': {
......@@ -30,6 +32,7 @@ action_dict = {
},
'set': {
'cluster': 'setClusterState',
'replicas': 'setNumReplicas',
},
'check': 'checkReplicas',
'start': 'startCluster',
......@@ -46,6 +49,11 @@ uuid_int = (lambda ns: lambda uuid:
(ns[uuid[0]] << 24) + int(uuid[1:])
)({str(k)[0]: v for k, v in UUID_NAMESPACES.iteritems()})
class dummy_app:
id_timestamp = uuid = 0
class TerminalNeoCTL(object):
def __init__(self, *args, **kw):
self.neoctl = NeoCTL(*args, **kw)
......@@ -67,6 +75,15 @@ class TerminalNeoCTL(object):
asNode = staticmethod(uuid_int)
def formatPartitionTable(self, row_list):
nm = NodeManager()
nm.update(dummy_app, 1,
self.neoctl.getNodeList(node_type=NodeTypes.STORAGE))
pt = object.__new__(PartitionTable)
pt._load(None, None, row_list, nm.getByUUID)
pt.addNodeList(nm.getByStateList(NodeStates.RUNNING))
return '\n'.join(line[4:] for line in pt._format())
def formatRowList(self, row_list):
return '\n'.join('%03d |%s' % (offset,
''.join(' %s - %s |' % (uuid_str(uuid), state)
......@@ -105,10 +122,12 @@ class TerminalNeoCTL(object):
max_offset = int(max_offset)
if node is not None:
node = self.asNode(node)
ptid, row_list = self.neoctl.getPartitionRowList(
ptid, num_replicas, row_list = self.neoctl.getPartitionRowList(
min_offset=min_offset, max_offset=max_offset, node=node)
# TODO: return ptid
return self.formatRowList(row_list)
return '# ptid: %s, replicas: %s\n%s' % (ptid, num_replicas,
self.formatRowList(enumerate(row_list, min_offset))
if min_offset or max_offset else
self.formatPartitionTable(row_list))
def getNodeList(self, params):
"""
......@@ -140,6 +159,18 @@ class TerminalNeoCTL(object):
assert len(params) == 1
return self.neoctl.setClusterState(self.asClusterState(params[0]))
def setNumReplicas(self, params):
"""
Set number of replicas.
Parameters: nr
nr: positive number (0 means no redundancy)
"""
assert len(params) == 1
nr = int(params[0])
if nr < 0:
sys.exit('invalid number of replicas')
return self.neoctl.setNumReplicas(nr)
def startCluster(self, params):
"""
Starts cluster operation after a startup.
......@@ -167,10 +198,18 @@ class TerminalNeoCTL(object):
def tweakPartitionTable(self, params):
"""
Optimize partition table.
No partition will be assigned to specified storage nodes.
Parameters: [node [...]]
No change is done to the specified/down storage nodes and they don't
count as replicas. The purpose of listing nodes is usually to drop
them once the data is replicated to other nodes.
Parameters: [-n] [node [...]]
-n: dry run
"""
return self.neoctl.tweakPartitionTable(map(self.asNode, params))
dry_run = params[0] == '-n'
changed, row_list = self.neoctl.tweakPartitionTable(
map(self.asNode, params[dry_run:]), dry_run)
if changed:
return self.formatPartitionTable(row_list)
return 'No change done.'
def killNode(self, params):
"""
......
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from neo.lib.handler import EventHandler
from neo.lib.protocol import ErrorCodes, Packets
......@@ -44,8 +45,8 @@ class CommandEventHandler(EventHandler):
def ack(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.ACK, msg))
def protocolError(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.PROTOCOL_ERROR, msg))
def denied(self, conn,