Commit 8ba42463 authored by Julien Muchembled's avatar Julien Muchembled

Merge protocol v0

parents a33c624c 2b9e14e8
Change History
==============
1.12 (2019-04-28)
-----------------
Most changes in this version focus on the ability to migrate a big ZODB to
NEO efficiently and reliably, which required changes to the protocol.
See testSplitAndMakeResilientUsingClone for an example scenario.
Better cluster management:
- New --new-nid storage option for fast cloning.
- The number of wanted replicas is now a property of the database: it can be
modified while the cluster is running, and it is reported by `neoctl print pt`.
- Better error reporting from the master to neoctl for denied requests.
- tweak: do not touch cells of nodes that are intended to be dropped.
- tweak: do not crash when trying to remove all nodes.
- tweak: new neoctl option to ask the master to simulate.
- neoctl: better display of full partition tables.
- master: reject drop/tweak commands that could lead to unwanted status.
Importer:
- Fix possible data loss on writeback.
- v1.9 broke replication (as source) once the import is finished.
- Speed up startup when the import is already finished.
- Fix closure of ZODB, and also do it when the import is finished.
- Fix hidden "maximum recursion depth exceeded" at startup.
- Fix resumption when using SQLite.
- v1.10 broke resumption when there are new transactions since the import
started.
MySQL:
- Better support of RocksDB by specifying column families.
- Fix handling of connection strings (--database) without credentials.
1.11 (2019-03-11)
-----------------
......
......@@ -18,10 +18,8 @@ from neo.lib import logging
from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from .handler import AdminEventHandler, MasterEventHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger
......@@ -36,8 +34,8 @@ class Application(BaseApplication):
cls.addCommonServerOptions('admin', '127.0.0.1:9999')
_ = _.group('admin')
_.int('u', 'uuid',
help="specify an UUID to use for this process (testing purpose)")
_.int('i', 'nid',
help="specify an NID to use for this process (testing purpose)")
def __init__(self, config):
super(Application, self).__init__(
......@@ -53,9 +51,8 @@ class Application(BaseApplication):
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
self.uuid = config.get('uuid')
self.uuid = config.get('nid')
logging.node(self.name, self.uuid)
self.request_handler = MasterRequestEventHandler(self)
self.master_event_handler = MasterEventHandler(self)
self.cluster_state = None
self.reset()
......@@ -66,7 +63,6 @@ class Application(BaseApplication):
super(Application, self).close()
def reset(self):
self.bootstrapped = False
self.master_conn = None
self.master_node = None
......@@ -117,40 +113,20 @@ class Application(BaseApplication):
self.cluster_state = None
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.ADMIN, self.server)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
if self.pt is None:
self.pt = PartitionTable(num_partitions, num_replicas)
elif self.pt.getPartitions() != num_partitions:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of partitions is inconsistent')
elif self.pt.getReplicas() != num_replicas:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of replicas is inconsistent')
self.master_node, self.master_conn = bootstrap.getPrimaryConnection()
# passive handler
self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(Packets.AskClusterState())
self.master_conn.ask(Packets.AskPartitionTable())
def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt
self.pt.log()
row_list = []
pt = self.pt
if max_offset == 0:
max_offset = self.pt.getPartitions()
max_offset = pt.getPartitions()
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in self.pt.getCellList(offset):
if uuid is None or cell.getUUID() == uuid:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row))
row_list = map(pt.getRow, xrange(min_offset, max_offset))
except IndexError:
conn.send(Errors.ProtocolError('invalid partition table offset'))
else:
conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
conn.answer(Packets.AnswerPartitionList(
pt.getID(), pt.getReplicas(), row_list))
......@@ -17,30 +17,49 @@
from neo.lib import logging, protocol
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets
from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure
def check_primary_master(func):
    """Run the decorated handler only once the application is bootstrapped.

    The returned wrapper forwards its arguments to ``func`` when
    ``self.app.bootstrapped`` is true and raises ``protocol.NotReadyError``
    otherwise.  The wrapper keeps the name and docstring of ``func`` so that
    messages built from the method still identify the real handler instead
    of a generic ``wrapper``.
    """
    # Local import: keeps this block independent of the file-level imports.
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kw):
        if self.app.bootstrapped:
            return func(self, *args, **kw)
        raise protocol.NotReadyError('Not connected to a primary master.')
    return wrapper
def forward_ask(klass):
    """Build a handler method that relays a request to the master.

    The generated method instantiates ``klass`` with the arguments it
    received and sends it with ``self.app.master_conn.ask``, passing along
    the requesting connection and its peer id.  The method is guarded by
    ``check_primary_master``.
    """
    relay = lambda self, conn, *args, **kw: self.app.master_conn.ask(
        klass(*args, **kw), conn=conn, msg_id=conn.getPeerId())
    return check_primary_master(relay)
def AdminEventHandlerType(name, bases, d):
    """Class factory used as ``__metaclass__`` of the admin event handler.

    It adds one forwarding method per packet listed below, then builds the
    class with ``type``.  Every entry of the namespace whose name does not
    start with an underscore is wrapped so that it raises
    ``protocol.NotReadyError`` while ``self.app.master_conn`` is None.
    """
    def check_primary_master(func):
        def wrapper(self, *args, **kw):
            if self.app.master_conn is None:
                raise protocol.NotReadyError(
                    'Not connected to a primary master.')
            return func(self, *args, **kw)
        return wrapper

    def forward_ask(klass):
        # The request is relayed to the master along with the requesting
        # connection and its peer id.
        return lambda self, conn, *args: self.app.master_conn.ask(
            klass(*args), conn=conn, msg_id=conn.getPeerId())

    # Drop the metaclass marker from the namespace before building the
    # class.
    del d['__metaclass__']

    forwarded_packets = (
        Packets.AddPendingNodes,
        Packets.AskLastIDs,
        Packets.AskLastTransaction,
        Packets.AskRecovery,
        Packets.CheckReplicas,
        Packets.Repair,
        Packets.SetClusterState,
        Packets.SetNodeState,
        Packets.SetNumReplicas,
        Packets.Truncate,
        Packets.TweakPartitionTable,
    )
    for packet_class in forwarded_packets:
        d[packet_class.handler_method_name] = forward_ask(packet_class)

    namespace = {}
    for key, value in d.iteritems():
        # Names starting with an underscore are kept untouched.
        if key[0] != '_':
            value = check_primary_master(value)
        namespace[key] = value
    return type(name, bases, namespace)
class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster."""
@check_primary_master
__metaclass__ = AdminEventHandlerType
def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s",
min_offset, max_offset, uuid_str(uuid))
self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)
@check_primary_master
def askNodeList(self, conn, node_type):
if node_type is None:
node_type = 'all'
......@@ -53,36 +72,22 @@ class AdminEventHandler(EventHandler):
p = Packets.AnswerNodeList(node_information_list)
conn.answer(p)
@check_primary_master
def askClusterState(self, conn):
conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
@check_primary_master
def askPrimary(self, conn):
master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
@check_primary_master
def flushLog(self, conn):
self.app.master_conn.send(Packets.FlushLog())
super(AdminEventHandler, self).flushLog(conn)
askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes)
askRecovery = forward_ask(Packets.AskRecovery)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
class MasterEventHandler(EventHandler):
""" This class is just used to dispatch message to right handler"""
def _connectionLost(self, conn):
def connectionClosed(self, conn):
app = self.app
if app.listening_conn: # if running
assert app.master_conn in (conn, None)
......@@ -91,42 +96,21 @@ class MasterEventHandler(EventHandler):
app.uuid = None
raise PrimaryFailure
def connectionFailed(self, conn):
self._connectionLost(conn)
def connectionClosed(self, conn):
self._connectionLost(conn)
def dispatch(self, conn, packet, kw={}):
if 'conn' in kw:
# expected answer
if packet.isResponse():
packet.setId(kw['msg_id'])
kw['conn'].answer(packet)
else:
self.app.request_handler.dispatch(conn, packet, kw)
else:
# unexpected answers and notifications
forward = kw.get('conn')
if forward is None:
super(MasterEventHandler, self).dispatch(conn, packet, kw)
else:
forward.send(packet, kw['msg_id'])
def answerClusterState(self, conn, state):
self.app.cluster_state = state
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
self.app.bootstrapped = True
def sendPartitionTable(self, conn, ptid, row_list):
if self.app.bootstrapped:
self.app.pt.load(ptid, row_list, self.app.nm)
def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state
notifyClusterInformation = answerClusterState
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node"""
# XXX: to be deleted ?
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
......@@ -13,6 +13,13 @@
##############################################################################
def patch():
# For msgpack & Py2/ZODB5.
try:
from zodbpickle import binary
binary._pack = bytes.__str__
except ImportError:
pass
from hashlib import md5
from ZODB.Connection import Connection
......
......@@ -18,6 +18,7 @@ import heapq
import random
import time
from collections import defaultdict
try:
from ZODB._compat import dumps, loads, _protocol
except ImportError:
......@@ -76,11 +77,11 @@ class Application(ThreadedApplication):
self.primary_master_node = None
self.trying_master_node = None
# no self-assigned UUID, primary master will supply us one
# no self-assigned NID, primary master will supply us one
self._cache = ClientCache() if cache_size is None else \
ClientCache(max_size=cache_size)
self._loading = defaultdict(lambda: (Lock(), []))
self.new_oid_list = ()
self.new_oids = ()
self.last_oid = '\0' * 8
self.storage_event_handler = storage.StorageEventHandler(self)
self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
......@@ -181,7 +182,7 @@ class Application(ThreadedApplication):
with self._connecting_to_master_node:
result = self.master_conn
if result is None:
self.new_oid_list = ()
self.new_oids = ()
result = self.master_conn = self._connectToPrimaryNode()
return result
......@@ -220,8 +221,8 @@ class Application(ThreadedApplication):
self.notifications_handler,
node=node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name, (), None)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, None, (), ())
try:
ask(conn, p, handler=handler)
except ConnectionClosed:
......@@ -238,7 +239,6 @@ class Application(ThreadedApplication):
# operational. Might raise ConnectionClosed so that the new
# primary can be looked-up again.
logging.info('Initializing from master')
ask(conn, Packets.AskPartitionTable(), handler=handler)
ask(conn, Packets.AskLastTransaction(), handler=handler)
if self.pt.operational():
break
......@@ -264,7 +264,7 @@ class Application(ThreadedApplication):
conn = MTClientConnection(self, self.storage_event_handler, node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, (), self.id_timestamp)
self.uuid, None, self.name, self.id_timestamp, (), ())
try:
self._ask(conn, p, handler=self.storage_bootstrap_handler)
except ConnectionClosed:
......@@ -306,15 +306,19 @@ class Application(ThreadedApplication):
"""Get a new OID."""
self._oid_lock_acquire()
try:
if not self.new_oid_list:
for oid in self.new_oids:
break
else:
# Get new oid list from master node
# we manage a list of oid here to prevent
# from asking too many time new oid one by one
# from master node
self._askPrimary(Packets.AskNewOIDs(100))
if not self.new_oid_list:
for oid in self.new_oids:
break
else:
raise NEOStorageError('new_oid failed')
self.last_oid = oid = self.new_oid_list.pop()
self.last_oid = oid
return oid
finally:
self._oid_lock_release()
......@@ -611,7 +615,7 @@ class Application(ThreadedApplication):
# user and description are cast to str in case they're unicode.
# BBB: This is not required anymore with recent ZODB.
packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), ext, txn_context.cache_dict)
str(transaction.description), ext, list(txn_context.cache_dict))
queue = txn_context.queue
conn_dict = txn_context.conn_dict
# Ask in parallel all involved storage nodes to commit object metadata.
......@@ -696,7 +700,7 @@ class Application(ThreadedApplication):
else:
try:
notify(Packets.AbortTransaction(txn_context.ttid,
txn_context.conn_dict))
list(txn_context.conn_dict)))
except ConnectionClosed:
pass
# We don't need to flush queue, as it won't be reused by future
......@@ -731,7 +735,8 @@ class Application(ThreadedApplication):
for oid in checked_list:
del cache_dict[oid]
ttid = txn_context.ttid
p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list)
p = Packets.AskFinishTransaction(ttid, list(cache_dict),
checked_list)
try:
tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
assert tid
......@@ -765,17 +770,11 @@ class Application(ThreadedApplication):
def undo(self, undone_tid, txn):
txn_context = self._txn_container.get(txn)
txn_info, txn_ext = self._getTransactionInformation(undone_tid)
txn_oid_list = txn_info['oids']
# Regroup objects per partition, to ask a minimum set of storage.
partition_oid_dict = {}
for oid in txn_oid_list:
partition = self.pt.getPartition(oid)
try:
oid_list = partition_oid_dict[partition]
except KeyError:
oid_list = partition_oid_dict[partition] = []
oid_list.append(oid)
partition_oid_dict = defaultdict(list)
for oid in txn_info['oids']:
partition_oid_dict[self.pt.getPartition(oid)].append(oid)
# Ask storage the undo serial (serial at which object's previous data
# is)
......@@ -817,8 +816,8 @@ class Application(ThreadedApplication):
raise UndoError('non-undoable transaction')
# Send undo data to all storage nodes.
for oid in txn_oid_list:
current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
for oid, (current_serial, undo_serial, is_current) in \
undo_object_tid_dict.iteritems():
if is_current:
data = None
else:
......@@ -852,7 +851,7 @@ class Application(ThreadedApplication):
self._store(txn_context, oid, current_serial, data, undo_serial)
self.waitStoreResponses(txn_context)
return None, txn_oid_list
return None, list(undo_object_tid_dict)
def _getTransactionInformation(self, tid):
return self._askStorageForRead(tid,
......@@ -933,9 +932,9 @@ class Application(ThreadedApplication):
for serial, size in self._askStorageForRead(oid, packet):
txn_info, txn_ext = self._getTransactionInformation(serial)
# create history dict
txn_info.pop('id')
txn_info.pop('oids')
txn_info.pop('packed')
del txn_info['id']
del txn_info['oids']
del txn_info['packed']
txn_info['tid'] = serial
txn_info['version'] = ''
txn_info['size'] = size
......
......@@ -26,10 +26,6 @@ from ..exception import NEOStorageError
class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
def answerPartitionTable(self, conn, ptid, row_list):
assert row_list
self.app.pt.load(ptid, row_list, self.app.nm)
def answerLastTransaction(*args):
pass
......@@ -42,9 +38,6 @@ class PrimaryNotificationsHandler(MTEventHandler):
except PrimaryElected, e:
self.app.primary_master_node, = e.args
def _acceptIdentification(self, node, num_partitions, num_replicas):
self.app.pt = PartitionTable(num_partitions, num_replicas)
def answerLastTransaction(self, conn, ltid):
app = self.app
app_last_tid = app.__dict__.get('last_tid', '')
......@@ -131,9 +124,12 @@ class PrimaryNotificationsHandler(MTEventHandler):
if db is not None:
db.invalidate(tid, oid_list)
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryNotificationsHandler, self).notifyNodeInformation(
......@@ -161,8 +157,7 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
self.app.setHandlerData(ttid)
def answerNewOIDs(self, conn, oid_list):
oid_list.reverse()
self.app.new_oid_list = oid_list
self.app.new_oids = iter(oid_list)
def incompleteTransaction(self, conn, message):
raise NEOStorageError("storage nodes for which vote failed can not be"
......
......@@ -26,7 +26,7 @@ from .exception import NEOStorageError
class _WakeupPacket(object):
handler_method_name = 'pong'
decode = tuple
_args = ()
getId = int
class Transaction(object):
......
......@@ -26,7 +26,7 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it
"""
def __init__(self, app, node_type, server=None, devpath=()):
def __init__(self, app, node_type, server=None, devpath=(), new_nid=()):
"""
Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node
......@@ -34,9 +34,8 @@ class BootstrapManager(EventHandler):
"""
self.server = server
self.devpath = devpath
self.new_nid = new_nid
self.node_type = node_type
self.num_replicas = None
self.num_partitions = None
app.nm.reset()
uuid = property(lambda self: self.app.uuid)
......@@ -44,7 +43,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, self.devpath, None))
self.server, self.app.name, None, self.devpath, self.new_nid))
def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn)
......@@ -53,10 +52,8 @@ class BootstrapManager(EventHandler):
def connectionLost(self, conn, new_state):
self.current = None
def _acceptIdentification(self, node, num_partitions, num_replicas):
def _acceptIdentification(self, node):
assert self.current is node, (self.current, node)
self.num_partitions = num_partitions
self.num_replicas = num_replicas
def getPrimaryConnection(self):
"""
......@@ -73,8 +70,7 @@ class BootstrapManager(EventHandler):
try:
while self.current:
if self.current.isIdentified():
return (self.current, self.current.getConnection(),
self.num_partitions, self.num_replicas)
return self.current, self.current.getConnection()
poll(1)
except PrimaryElected, e:
if self.current:
......
......@@ -16,12 +16,19 @@
from functools import wraps
from time import time
import msgpack
from msgpack.exceptions import UnpackValueError
from . import attributeTracker, logging
from .connector import ConnectorException, ConnectorDelayedConnection
from .locking import RLock
from .protocol import uuid_str, Errors, PacketMalformedError, Packets
from .util import dummy_read_buffer, ReadBuffer
from .protocol import uuid_str, Errors, PacketMalformedError, Packets, \
Unpacker
@apply
class dummy_read_buffer(msgpack.Unpacker):
    """Unpacker whose ``feed`` discards its argument.

    ``apply`` (a Python 2 builtin) calls the class right away, so the name
    ``dummy_read_buffer`` ends up bound to an instance, not to the class.
    """

    def feed(self, _):
        """Ignore the given data."""
class ConnectionClosed(Exception):
pass
......@@ -209,7 +216,7 @@ class BaseConnection(object):
def _getReprInfo(self):
r = [
('uuid', uuid_str(self.getUUID())),
('nid', uuid_str(self.getUUID())),
('address', ('[%s]:%s' if ':' in self.addr[0] else '%s:%s')
% self.addr if self.addr else '?'),
('handler', self.getHandler()),
......@@ -291,7 +298,7 @@ class ListeningConnection(BaseConnection):
# message.
else:
conn._connected()
self.em.addWriter(conn) # for ENCODED_VERSION
self.em.addWriter(conn) # for HANDSHAKE_PACKET
def getAddress(self):
return self.connector.getAddress()
......@@ -310,12 +317,12 @@ class Connection(BaseConnection):
client = False
server = False
peer_id = None
_parser_state = None
_total_unpacked = 0
_timeout = None
def __init__(self, event_manager, *args, **kw):
BaseConnection.__init__(self, event_manager, *args, **kw)
self.read_buf = ReadBuffer()
self.read_buf = Unpacker()
self.cur_id = 0
self.aborted = False
self.uuid = None
......@@ -425,42 +432,39 @@ class Connection(BaseConnection):
self._closure()
def _parse(self):
read = self.read_buf.read
version = read(4)
if version is None:
return
from .protocol import (ENCODED_VERSION, MAX_PACKET_SIZE,
PACKET_HEADER_FORMAT, Packets)
if version != ENCODED_VERSION:
logging.warning('Protocol version mismatch with %r', self)
from .protocol import HANDSHAKE_PACKET, MAGIC_SIZE, Packets
read_buf = self.read_buf
handshake = read_buf.read_bytes(len(HANDSHAKE_PACKET))
if handshake != HANDSHAKE_PACKET:
if HANDSHAKE_PACKET.startswith(handshake): # unlikely so tested last
# Not enough data and there's no API to know it in advance.
# Put it back.
read_buf.feed(handshake)
return
if HANDSHAKE_PACKET.startswith(handshake[:MAGIC_SIZE]):
logging.warning('Protocol version mismatch with %r', self)
else:
logging.debug('Rejecting non-NEO %r', self)
raise ConnectorException
header_size = PACKET_HEADER_FORMAT.size
unpack = PACKET_HEADER_FORMAT.unpack
read_next = read_buf.next
read_pos = read_buf.tell
def parse():
state = self._parser_state
if state is None:
header = read(header_size)
if header is None:
return
msg_id, msg_type, msg_len = unpack(header)
try:
packet_klass = Packets[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
if msg_len > MAX_PACKET_SIZE:
raise PacketMalformedError('message too big (%d)' % msg_len)
else:
msg_id, packet_klass, msg_len = state
data = read(msg_len)
if data is None:
# Not enough.
if state is None:
self._parser_state = msg_id, packet_klass, msg_len
else:
self._parser_state = None
packet = packet_klass()
packet.setContent(msg_id, data)
return packet
try:
msg_id, msg_type, args = read_next()
except StopIteration:
return
except UnpackValueError as e:
raise PacketMalformedError(str(e))
try:
packet_klass = Packets[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
pos = read_pos()
packet = packet_klass(*args)
packet.setId(msg_id)
packet.size = pos - self._total_unpacked
self._total_unpacked = pos
return packet
self._parse = parse
return parse()
......@@ -513,7 +517,7 @@ class Connection(BaseConnection):
def close(self):
if self.connector is None:
assert self._on_close is None
assert not self.read_buf
assert not self.read_buf.read_bytes(1)
assert not self.isPending()
return
# process the network events with the last registered handler to
......@@ -524,7 +528,7 @@ class Connection(BaseConnection):
if self._on_close is not None:
self._on_close()
self._on_close = None
self.read_buf.clear()
self.read_buf = dummy_read_buffer
try:
if self.connecting:
handler.connectionFailed(self)
......
......@@ -19,7 +19,7 @@ import ssl
import errno
from time import time
from . import logging
from .protocol import ENCODED_VERSION
from .protocol import HANDSHAKE_PACKET
# Global connector registry.
# Fill by calling registerConnectorHandler.
......@@ -74,15 +74,14 @@ class SocketConnector(object):
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# disable Nagle algorithm to reduce latency
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.queued = [ENCODED_VERSION]
self.queue_size = len(ENCODED_VERSION)
self.queued = [HANDSHAKE_PACKET]
self.queue_size = len(HANDSHAKE_PACKET)
return self
def queue(self, data):
was_empty = not self.queued
self.queued += data
for data in data:
self.queue_size += len(data)
self.queued.append(data)
self.queue_size += len(data)
return was_empty
def _error(self, op, exc=None):
......@@ -172,7 +171,7 @@ class SocketConnector(object):
except socket.error, e:
self._error('recv', e)
if data:
read_buf.append(data)
read_buf.feed(data)
return
self._error('recv')
......@@ -278,7 +277,7 @@ class _SSL:
def receive(self, read_buf):
try:
while 1:
read_buf.append(self.socket.recv(4096))
read_buf.feed(self.socket.recv(4096))
except ssl.SSLWantReadError:
pass
except socket.error, e:
......
......@@ -23,7 +23,7 @@ NOBODY = []
class _ConnectionClosed(object):
handler_method_name = 'connectionClosed'
decode = tuple
_args = ()
class getId(object):
def __eq__(self, other):
......
......@@ -26,6 +26,9 @@ from .protocol import (NodeStates, NodeTypes, Packets, uuid_str,
from .util import cached_property
class AnswerDenied(Exception):
    """Helper exception to stop packet processing and answer a Denied error

    The exception message is used as the text of the Denied error.
    """
class DelayEvent(Exception):
pass
......@@ -68,7 +71,7 @@ class EventHandler(object):
method = getattr(self, packet.handler_method_name)
except AttributeError:
raise UnexpectedPacketError('no handler found')
args = packet.decode() or ()
args = packet._args
method(conn, *args, **kw)
except DelayEvent, e:
assert not kw, kw
......@@ -76,9 +79,6 @@ class EventHandler(object):
except UnexpectedPacketError, e:
if not conn.isClosed():
self.__unexpectedPacket(conn, packet, *e.args)
except PacketMalformedError, e:
logging.error('malformed packet from %r: %s', conn, e)
conn.close()
except NotReadyError, message:
if not conn.isClosed():
if not message.args:
......@@ -98,6 +98,8 @@ class EventHandler(object):
% (m.im_class.__module__, m.im_class.__name__, m.__name__)))
except NonReadableCell, e:
conn.answer(Errors.NonReadableCell())
except AnswerDenied, e:
conn.answer(Errors.Denied(str(e)))
except AssertionError:
e = sys.exc_info()
try:
......@@ -160,8 +162,7 @@ class EventHandler(object):
def _acceptIdentification(*args):
pass
def acceptIdentification(self, conn, node_type, uuid,
num_partitions, num_replicas, your_uuid):
def acceptIdentification(self, conn, node_type, uuid, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
assert node.getConnection() is conn, (node.getConnection(), conn)
......@@ -180,7 +181,7 @@ class EventHandler(object):
elif node.getUUID() != uuid or app.uuid != your_uuid != None:
raise ProtocolError('invalid uuids')
node.setIdentified()
self._acceptIdentification(node, num_partitions, num_replicas)
self._acceptIdentification(node)
return
conn.close()
......
......@@ -74,7 +74,7 @@ def implements(obj, ignore=()):
assert not wrong_signature, wrong_signature
return obj
def _set_code(func):
def _stub(func):
args, varargs, varkw, _ = inspect.getargspec(func)
if varargs:
args.append("*" + varargs)
......@@ -82,16 +82,25 @@ def _set_code(func):
args.append("**" + varkw)
exec "def %s(%s): raise NotImplementedError\nf = %s" % (
func.__name__, ",".join(args), func.__name__)
func.func_code = f.func_code
return f
def abstract(func):
_set_code(func)
func.__abstract__ = 1
return func
f = _stub(func)
f.__abstract__ = 1
f.__defaults__ = func.__defaults__
f.__doc__ = func.__doc__
return f
def requires(*args):
for func in args:
_set_code(func)
# Tolerate useless abstract decoration on required method (e.g. it
# simplifies the implementation of a fallback decorator), but remove
# marker since it does not need to be implemented if it's required
# by a method that is overridden.
try:
del func.__abstract__
except AttributeError:
func.__code__ = _stub(func).__code__
def decorator(func):
func.__requires__ = args
return func
......
......@@ -152,7 +152,8 @@ class NEOLogger(Logger):
def _setup(self, filename=None, reset=False):
from . import protocol as p
global uuid_str
global packb, uuid_str
packb = p.packb
uuid_str = p.uuid_str
if self._db is not None:
self._db.close()
......@@ -250,7 +251,7 @@ class NEOLogger(Logger):
'>' if r.outgoing else '<', uuid_str(r.uuid), ip, port)
msg = r.msg
if msg is not None:
msg = buffer(msg)
msg = buffer(msg if type(msg) is bytes else packb(msg))
q = "INSERT INTO packet VALUES (?,?,?,?,?,?)"
x = [r.created, nid, r.msg_id, r.code, peer, msg]
else:
......@@ -299,9 +300,14 @@ class NEOLogger(Logger):
def packet(self, connection, packet, outgoing):
if self._db is not None:
body = packet._body
if self._max_packet and self._max_packet < len(body):
body = None
if self._max_packet and self._max_packet < packet.size:
args = None
else:
args = packet._args
try:
hash(args)
except TypeError:
args = packb(args)
self._queue(PacketRecord(
created=time(),
msg_id=packet._id,
......@@ -309,7 +315,7 @@ class NEOLogger(Logger):
outgoing=outgoing,
uuid=connection.getUUID(),
addr=connection.getAddress(),
msg=body))
msg=args))
def node(self, *cluster_nid):
name = self.name and str(self.name)
......
......@@ -486,7 +486,7 @@ class NodeManager(EventQueue):
# For the first notification, we receive a full list of nodes from
# the master. Remove all unknown nodes from a previous connection.
for node in self._node_set.difference(added_list):
if app.pt.dropNode(node):
if not node.isStorage() or app.pt.dropNode(node):
self.remove(node)
self.log()
self.executeQueuedEvents()
......
This diff is collapsed.
......@@ -86,15 +86,9 @@ class PartitionTable(object):
'a cell became non-readable whereas all cells were readable'
def __init__(self, num_partitions, num_replicas):
self._id = None
self.np = num_partitions
self.nr = num_replicas
self.num_filled_rows = 0
# Note: don't use [[]] * num_partition construct, as it duplicates
# instance *references*, so the outer list contains really just one
# inner list instance.
self.partition_list = [[] for _ in xrange(num_partitions)]
self.count_dict = {}
self.clear()
def getID(self):
return self._id
......@@ -113,7 +107,16 @@ class PartitionTable(object):
# instance *references*, so the outer list contains really just one
# inner list instance.
self.partition_list = [[] for _ in xrange(self.np)]
self.count_dict.clear()
self.count_dict = {}
def addNodeList(self, node_list):
    """Register the nodes that are not tracked by ``count_dict`` yet.

    Each unknown node gets a count of 0.  Nodes that are already known are
    left untouched.  Return the list of nodes that were actually added, in
    the order they were given.
    """
    count_dict = self.count_dict
    new_nodes = []
    for candidate in node_list:
        if candidate in count_dict:
            continue
        count_dict[candidate] = 0
        new_nodes.append(candidate)
    return new_nodes
def getAssignedPartitionList(self, uuid):
""" Return the partition assigned to the specified UUID """
......@@ -203,31 +206,31 @@ class PartitionTable(object):
del self.count_dict[node]
return not count
def load(self, ptid, row_list, nm):
def _load(self, ptid, num_replicas, row_list, getByUUID):
self.__init__(len(row_list), num_replicas)
self._id = ptid
for offset, row in enumerate(row_list):
for uuid, state in row:
node = getByUUID(uuid)
self._setCell(offset, node, state)
def load(self, ptid, num_replicas, row_list, nm):
"""
Load the partition table with the specified PTID, discard all previous
content.
"""
self.clear()
self._id = ptid
for offset, row in row_list:
if offset >= self.getPartitions():
raise IndexError
for uuid, state in row:
node = nm.getByUUID(uuid)
# the node must be known by the node manager
assert node is not None
self._setCell(offset, node, state)
self._load(ptid, num_replicas, row_list, nm.getByUUID)
logging.debug('partition table loaded (ptid=%s)', ptid)
self.log()
def update(self, ptid, cell_list, nm):
def update(self, ptid, num_replicas, cell_list, nm):
"""
Update the partition with the cell list supplied. If a node
is not known, it is created in the node manager and set as unavailable
"""
assert self._id < ptid, (self._id, ptid)
self._id = ptid
self.nr = num_replicas
readable_list = []
for row in self.partition_list:
if not all(cell.isReadable() for cell in row):
......@@ -310,14 +313,11 @@ class PartitionTable(object):
return True
def getRow(self, offset):
row = self.partition_list[offset]
if row is None:
return []
return [(cell.getUUID(), cell.getState()) for cell in row]
return [(cell.getUUID(), cell.getState())
for cell in self.partition_list[offset]]
def getRowList(self):
getRow = self.getRow
return [(x, getRow(x)) for x in xrange(self.np)]
return map(self.getRow, xrange(self.np))
class MTPartitionTable(PartitionTable):
""" Thread-safe aware version of the partition table, override only methods
......
......@@ -166,65 +166,6 @@ def parseMasterList(masters):
return map(parseNodeAddress, masters.split())
class ReadBuffer(object):
    """
    Implementation of a lazy buffer. Its main purpose is to reduce useless
    copies of data by storing chunks and joining them only when the
    requested size is available.

    TODO: For better performance, use:
    - socket.recv_into (64kiB blocks)
    - struct.unpack_from
    - and a circular buffer of dynamic size (initial size:
      twice the length passed to socket.recv_into ?)
    """

    def __init__(self):
        # 'content' holds the pending chunks in arrival order and 'size'
        # is the sum of their lengths.
        self.content = deque()
        self.size = 0

    def append(self, data):
        """ Queue a new chunk and account for its length """
        self.content.append(data)
        self.size += len(data)

    def __len__(self):
        """ Return the number of buffered bytes """
        return self.size

    def read(self, size):
        """
        Consume and return exactly 'size' bytes, or return None (without
        consuming anything) if fewer bytes are buffered.
        """
        if size > self.size:
            return None
        self.size -= size
        pending = self.content
        pieces = []
        remaining = size
        # Take whole chunks until the request is covered.
        while remaining > 0:
            piece = pending.popleft()
            remaining -= len(piece)
            pieces.append(piece)
        if remaining < 0:
            # The last chunk overshoots by -remaining bytes: keep its head
            # for the caller and push its tail back in front of the queue.
            overshoot = pieces[-1]
            pieces[-1] = overshoot[:remaining]
            pending.appendleft(overshoot[remaining:])
        # Single copy: all selected pieces are joined at once.
        data = ''.join(pieces)
        assert len(data) == size
        return data

    def clear(self):
        """ Drop everything that is buffered """
        self.content.clear()
        self.size = 0


# Placeholder buffer whose append() discards whatever it is given.
dummy_read_buffer = ReadBuffer()
dummy_read_buffer.append = lambda _: None
class cached_property(object):
"""
A property that is only computed once per instance and then replaces itself
......
......@@ -16,6 +16,7 @@
import sys
from collections import defaultdict
from functools import partial
from time import time
from neo.lib import logging, util
......@@ -76,13 +77,11 @@ class Application(BaseApplication):
@classmethod
def _buildOptionParser(cls):
_ = cls.option_parser
_.description = "NEO Master node"
parser = cls.option_parser
parser.description = "NEO Master node"
cls.addCommonServerOptions('master', '127.0.0.1:10000', '')
_ = _.group('master')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
_ = parser.group('master')
_.int('A', 'autostart',
help="minimum number of pending storage nodes to automatically"
" start new cluster (to avoid unwanted recreation of the"
......@@ -91,8 +90,12 @@ class Application(BaseApplication):
help='the name of cluster to backup')
_('M', 'upstream-masters', parse=util.parseMasterList,
help='list of master nodes in the cluster to backup')
_.int('u', 'uuid',
help="specify an UUID to use for this process (testing purpose)")
_.int('i', 'nid',
help="specify an NID to use for this process (testing purpose)")
_ = parser.group('database creation')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
def __init__(self, config):
super(Application, self).__init__(
......@@ -108,7 +111,7 @@ class Application(BaseApplication):
for master_address in config['masters']:
self.nm.createMaster(address=master_address)
self._node = self.nm.createMaster(address=self.server,
uuid=config.get('uuid'))
uuid=config.get('nid'))
logging.node(self.name, self.uuid)
logging.debug('IP address is %s, port is %d', *self.server)
......@@ -117,14 +120,14 @@ class Application(BaseApplication):
replicas = config['replicas']
partitions = config['partitions']
if replicas < 0:
raise RuntimeError, 'replicas must be a positive integer'
sys.exit('replicas must be a positive integer')
if partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
self.pt = PartitionTable(partitions, replicas)
sys.exit('partitions must be more than zero')
logging.info('Configuration:')
logging.info('Partitions: %d', partitions)
logging.info('Replicas : %d', replicas)
logging.info('Name : %s', self.name)
self.newPartitionTable = partial(PartitionTable, partitions, replicas)
self.listening_conn = None
self.cluster_state = None
......@@ -196,7 +199,7 @@ class Application(BaseApplication):
node_dict[NodeTypes.MASTER].append(node_info)
return node_dict
def broadcastNodesInformation(self, node_list, exclude=None):
def broadcastNodesInformation(self, node_list):
"""
Broadcast changes for a set a nodes
Send only one packet per connection to reduce bandwidth
......@@ -209,20 +212,26 @@ class Application(BaseApplication):
# We don't skip pending storage nodes because we don't send them
# the full list of nodes when they're added, and it's also quite
# useful to notify them about new masters.
if node_list and node is not exclude:
if node_list:
node.send(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list):
def broadcastPartitionChanges(self, cell_list, num_replicas=None):
"""Broadcast a Notify Partition Changes packet."""
if cell_list:
ptid = self.pt.setNextID()
self.pt.logUpdated()
packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
pt = self.pt
if num_replicas is not None:
pt.setReplicas(num_replicas)
elif cell_list:
num_replicas = pt.getReplicas()
else:
return
packet = Packets.NotifyPartitionChanges(
pt.setNextID(), num_replicas, cell_list)
pt.logUpdated()
for node in self.nm.getIdentifiedList():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
def provideService(self):
"""
......@@ -437,16 +446,7 @@ class Application(BaseApplication):
conn.send(notification_packet)
elif conn.isServer():
continue
if node.isClient():
if state == ClusterStates.RUNNING:
handler = self.client_service_handler
elif state == ClusterStates.BACKINGUP:
handler = self.client_ro_service_handler
else:
if state != ClusterStates.STOPPING:
conn.abort()
continue
elif node.isMaster():
if node.isMaster():
if state == ClusterStates.RECOVERING:
handler = self.election_handler
else:
......@@ -454,10 +454,16 @@ class Application(BaseApplication):
elif node.isStorage() and storage_handler:
handler = storage_handler
else:
# There's a single handler type for admins.
# Client can't change handler without being first disconnected.
assert state in (
ClusterStates.STOPPING,
ClusterStates.STOPPING_BACKUP,
) or not node.isClient(), (state, node)
continue # keep handler
if type(handler) is not type(conn.getLastHandler()):
conn.setHandler(handler)
handler.connectionCompleted(conn, new=False)
handler.handlerSwitched(conn, new=False)
self.cluster_state = state
def getNewUUID(self, uuid, address, node_type):
......@@ -578,7 +584,9 @@ class Application(BaseApplication):
self.tm.executeQueuedEvents()
def startStorage(self, node):
node.send(Packets.StartOperation(self.backup_tid))
# XXX: Is this boolean 'backup' field needed ?
# Maybe this can be deduced from cluster state.
node.send(Packets.StartOperation(bool(self.backup_tid)))
uuid = node.getUUID()
assert uuid not in self.storage_starting_set
if uuid not in self.storage_ready_dict:
......
......@@ -65,6 +65,7 @@ There is no conflict of node id between the 2 clusters:
class BackupApplication(object):
pt = None
server = None # like in BaseApplication
uuid = None
def __init__(self, app, name, master_addresses):
......@@ -111,17 +112,12 @@ class BackupApplication(object):
else:
break
poll(1)
node, conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
node, conn = bootstrap.getPrimaryConnection()
try:
app.changeClusterState(ClusterStates.BACKINGUP)
del bootstrap, node
if num_partitions != pt.getPartitions():
raise RuntimeError("inconsistent number of partitions")
self.ignore_invalidations = True
self.pt = PartitionTable(num_partitions, num_replicas)
conn.setHandler(BackupHandler(self))
conn.ask(Packets.AskPartitionTable())
conn.ask(Packets.AskLastTransaction())
# debug variable to log how big 'tid_list' can be.
self.debug_tid_count = 0
......
......@@ -23,10 +23,6 @@ from neo.lib.protocol import Packets
class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def connectionCompleted(self, conn, new=None):
if new is None:
super(MasterHandler, self).connectionCompleted(conn)
def connectionLost(self, conn, new_state=None):
if self.app.listening_conn: # if running
self._connectionLost(conn)
......@@ -59,17 +55,20 @@ class MasterHandler(EventHandler):
+ app.getNodeInformationDict(node_list)[node.getType()])
conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askPartitionTable(self, conn):
def handlerSwitched(self, conn, new):
pt = self.app.pt
conn.answer(Packets.AnswerPartitionTable(pt.getID(), pt.getRowList()))
# Except storages during recovery and secondary masters, all nodes
# receives the full partition table as soon as they're identified.
# It is also sent in 2 other cases:
# - to admins during recovery, whenever a newer PT is loaded;
# - to storage when switching from recovery to verification.
# After that, non-master nodes only receive incremental updates.
conn.send(Packets.SendPartitionTable(
pt.getID(), pt.getReplicas(), pt.getRowList()))
class BaseServiceHandler(MasterHandler):
"""This class deals with events for a service phase."""
def connectionCompleted(self, conn, new):
pt = self.app.pt
conn.send(Packets.SendPartitionTable(pt.getID(), pt.getRowList()))
"""Common handler class for storage nodes."""
def connectionLost(self, conn, new_state):
app = self.app
......
......@@ -15,14 +15,16 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
from functools import wraps
from . import MasterHandler
from ..app import monotonic_time, StateChangedException
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.handler import AnswerDenied
from neo.lib.pt import PartitionTableException
from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, ProtocolError, uuid_str
NodeStates, NodeTypes, Packets, uuid_str
from neo.lib.util import dump
CLUSTER_STATE_WORKFLOW = {
......@@ -38,9 +40,25 @@ NODE_STATE_WORKFLOW = {
NodeTypes.STORAGE: (NodeStates.DOWN, NodeStates.UNKNOWN),
}
def check_state(*states):
    """Build a decorator restricting an RPC method to some cluster states.

    The decorated method is only executed when the state returned by
    self.app.getClusterState() is one of `states`; otherwise AnswerDenied
    is raised with a message naming the RPC and the current state.

    Note that the wrapper accepts positional arguments only and does not
    propagate the return value of the wrapped method.
    """
    def decorator(wrapped):
        @wraps(wrapped)
        def wrapper(self, *args):
            current = self.app.getClusterState()
            if current in states:
                wrapped(self, *args)
                return
            raise AnswerDenied('%s RPC can not be used in %s state'
                % (wrapped.__name__, current))
        return wrapper
    return decorator
class AdministrationHandler(MasterHandler):
"""This class deals with messages from the admin node only"""
def handlerSwitched(self, conn, new):
assert new
super(AdministrationHandler, self).handlerSwitched(conn, new)
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
if node is not None:
......@@ -58,30 +76,28 @@ class AdministrationHandler(MasterHandler):
# check request
try:
if app.cluster_state not in CLUSTER_STATE_WORKFLOW[state]:
raise ProtocolError('Can not switch to this state')
raise AnswerDenied('Can not switch to this state')
except KeyError:
if state != ClusterStates.STOPPING:
raise ProtocolError('Invalid state requested')
raise AnswerDenied('Invalid state requested')
# change state
if state == ClusterStates.VERIFYING:
storage_list = app.nm.getStorageList(only_identified=True)
if not storage_list:
raise ProtocolError('Cannot exit recovery without any '
'storage node')
raise AnswerDenied(
'Cannot exit recovery without any storage node')
for node in storage_list:
assert node.isPending(), node
if node.getConnection().isPending():
# XXX: It's wrong to use ProtocolError here. We must reply
# less aggressively because the admin has no way to
# know that there's still pending activity.
raise ProtocolError('Cannot exit recovery now: node %r is '
'entering cluster' % (node, ))
raise AnswerDenied(
'Cannot exit recovery now: node %r is entering cluster'
% node,)
app._startup_allowed = True
state = app.cluster_state
elif state == ClusterStates.STARTING_BACKUP:
if app.tm.hasPending() or app.nm.getClientList(True):
raise ProtocolError("Can not switch to %s state with pending"
raise AnswerDenied("Can not switch to %s state with pending"
" transactions or connected clients" % state)
conn.answer(Errors.Ack('Cluster state changed'))
......@@ -93,21 +109,24 @@ class AdministrationHandler(MasterHandler):
app = self.app
node = app.nm.getByUUID(uuid)
if node is None:
raise ProtocolError('unknown node')
raise AnswerDenied('unknown node')
if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()):
raise ProtocolError('can not switch node to this state')
raise AnswerDenied('can not switch node to %s state' % state)
if uuid == app.uuid:
raise ProtocolError('can not kill primary master node')
raise AnswerDenied('can not kill primary master node')
state_changed = state != node.getState()
message = ('state changed' if state_changed else
'node already in %s state' % state)
if node.isStorage():
keep = state == NodeStates.DOWN
if node.isRunning() and not keep:
raise AnswerDenied(
"a running node must be stopped before removal")
try:
cell_list = app.pt.dropNodeList([node], keep)
except PartitionTableException, e:
raise ProtocolError(str(e))
raise AnswerDenied(str(e))
node.setState(state)
if node.isConnected():
# notify itself so it can shutdown
......@@ -134,16 +153,17 @@ class AdministrationHandler(MasterHandler):
monotonic_time(), [node.asTuple()]))
app.broadcastNodesInformation([node])
# XXX: Would it be safe to allow more states ?
__change_pt_rpc = check_state(
ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP)
@__change_pt_rpc
def addPendingNodes(self, conn, uuid_list):
uuids = ', '.join(map(uuid_str, uuid_list))
logging.debug('Add nodes %s', uuids)
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states ?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not add nodes in %s state' % state)
# take all pending nodes
node_list = list(app.pt.addNodeList(node
for node in app.nm.getStorageList()
......@@ -165,31 +185,50 @@ class AdministrationHandler(MasterHandler):
for uuid in uuid_list:
node = getByUUID(uuid)
if node is None or not (node.isStorage() and node.isIdentified()):
raise ProtocolError("invalid storage node %s" % uuid_str(uuid))
raise AnswerDenied("invalid storage node %s" % uuid_str(uuid))
node_list.append(node)
repair = Packets.NotifyRepair(*args)
for node in node_list:
node.send(repair)
conn.answer(Errors.Ack(''))
def tweakPartitionTable(self, conn, uuid_list):
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states ?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not tweak partition table in %s state'
% state)
app.broadcastPartitionChanges(app.pt.tweak([node
for node in app.nm.getStorageList()
if node.getUUID() in uuid_list or not node.isRunning()]))
@__change_pt_rpc
def setNumReplicas(self, conn, num_replicas):
self.app.broadcastPartitionChanges((), num_replicas)
conn.answer(Errors.Ack(''))
def truncate(self, conn, tid):
@__change_pt_rpc
def tweakPartitionTable(self, conn, dry_run, uuid_list):
app = self.app
if app.cluster_state != ClusterStates.RUNNING:
raise ProtocolError('Can not truncate in this state')
drop_list = []
for node in app.nm.getStorageList():
if node.getUUID() in uuid_list or node.isPending():
drop_list.append(node)
elif not node.isRunning():
drop_list.append(node)
raise AnswerDenied(
'tweak: down nodes must be listed explicitly')
if dry_run:
pt = object.__new__(app.pt.__class__)
new_nodes = pt.load(app.pt.getID(), app.pt.getReplicas(),
app.pt.getRowList(), app.nm)
assert not new_nodes
pt.addNodeList(node
for node, count in app.pt.count_dict.iteritems()
if not count)
else:
pt = app.pt
try:
changed_list = pt.tweak(drop_list)
except PartitionTableException, e:
raise AnswerDenied(str(e))
if not dry_run:
app.broadcastPartitionChanges(changed_list)
conn.answer(Packets.AnswerTweakPartitionTable(
bool(changed_list), pt.getRowList()))
@check_state(ClusterStates.RUNNING)
def truncate(self, conn, tid):
conn.answer(Errors.Ack(''))
raise StoppedOperation(tid)
......@@ -237,3 +276,5 @@ class AdministrationHandler(MasterHandler):
node.send(Packets.CheckPartition(
offset, source, min_tid, max_tid))
conn.answer(Errors.Ack(''))
del __change_pt_rpc
......@@ -17,6 +17,7 @@
from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.protocol import ZERO_TID
from neo.lib.pt import PartitionTable
class BackupHandler(EventHandler):
"""Handler dedicated to upstream master during BACKINGUP state"""
......@@ -25,12 +26,15 @@ class BackupHandler(EventHandler):
if self.app.app.listening_conn: # if running
raise PrimaryFailure('connection lost')
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
    """Install a new partition table on self.app from a full row list.

    A fresh PartitionTable is stored as self.app.pt and loaded with the
    received `ptid`, `num_replicas` and `row_list`.

    Raise RuntimeError if its number of partitions differs from the one
    of self.app.app.pt.
    """
    app = self.app
    # The instance is created without calling __init__: load() receives
    # everything needed to set the table up.
    new_pt = object.__new__(PartitionTable)
    app.pt = new_pt
    new_pt.load(ptid, num_replicas, row_list, app.nm)
    if new_pt.getPartitions() == app.app.pt.getPartitions():
        return
    raise RuntimeError("inconsistent number of partitions")
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def answerLastTransaction(self, conn, tid):
app = self.app
......
......@@ -22,6 +22,10 @@ from . import MasterHandler
class ClientServiceHandler(MasterHandler):
""" Handler dedicated to client during service state """
def handlerSwitched(self, conn, new):
assert new
super(ClientServiceHandler, self).handlerSwitched(conn, new)
def _connectionLost(self, conn):
# cancel its transactions and forgot the node
app = self.app
......
......@@ -17,14 +17,14 @@
from neo.lib import logging
from neo.lib.exception import PrimaryElected
from neo.lib.handler import EventHandler
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \
NotReadyError, Packets, ProtocolError, uuid_str
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
NodeTypes, NotReadyError, Packets, ProtocolError, uuid_str
from ..app import monotonic_time
class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp):
address, name, id_timestamp, devpath, new_nid):
app = self.app
self.checkClusterName(name)
if address == app.server:
......@@ -77,6 +77,16 @@ class IdentificationHandler(EventHandler):
manager = app
state, handler = manager.identifyStorageNode(
uuid is not None and node is not None)
if not address:
if app.cluster_state == ClusterStates.RECOVERING:
raise NotReadyError
if uuid or not new_nid:
raise ProtocolError
state = NodeStates.DOWN
# We'll let the storage node close the connection. If we
# aborted it at the end of the method, BootstrapManager
# (which is used by storage nodes) could see the closure
# and try to reconnect to a master.
human_readable_node_type = ' storage (%s) ' % (state, )
elif node_type == NodeTypes.MASTER:
if app.election:
......@@ -105,24 +115,27 @@ class IdentificationHandler(EventHandler):
node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time()
node.setState(state)
app.broadcastNodesInformation([node])
if new_nid:
changed_list = []
for offset in new_nid:
changed_list.append((offset, uuid, CellStates.OUT_OF_DATE))
app.pt._setCell(offset, node, CellStates.OUT_OF_DATE)
app.broadcastPartitionChanges(changed_list)
conn.setHandler(handler)
node.setConnection(conn, not node.isIdentified())
app.broadcastNodesInformation([node], node)
conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER,
app.uuid,
app.pt.getPartitions(),
app.pt.getReplicas(),
uuid))
handler._notifyNodeInformation(conn)
handler.connectionCompleted(conn, True)
handler.handlerSwitched(conn, True)
class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp):
address, name, id_timestamp, devpath, new_nid):
app = self.app
self.checkClusterName(name)
if address == app.server:
......
......@@ -23,6 +23,9 @@ from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
class SecondaryHandler(MasterHandler):
"""Handler used by primary to handle secondary masters"""
def handlerSwitched(self, conn, new):
pass
def _connectionLost(self, conn):
app = self.app
node = app.nm.getByUUID(conn.getUUID())
......@@ -30,21 +33,20 @@ class SecondaryHandler(MasterHandler):
app.broadcastNodesInformation([node])
class ElectionHandler(MasterHandler):
class ElectionHandler(SecondaryHandler):
"""Handler used by primary to handle secondary masters during election"""
def connectionCompleted(self, conn, new=None):
if new is None:
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, (), app.election))
def connectionCompleted(self, conn):
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election, (), ()))
def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn)
self.connectionLost(conn)
def _acceptIdentification(self, node, *args):
def _acceptIdentification(self, node):
raise PrimaryElected(node)
def _connectionLost(self, *args):
......@@ -66,7 +68,7 @@ class ElectionHandler(MasterHandler):
class PrimaryHandler(ElectionHandler):
"""Handler used by secondaries to handle primary master"""
def _acceptIdentification(self, node, num_partitions, num_replicas):
def _acceptIdentification(self, node):
assert self.app.primary_master is node, (self.app.primary_master, node)
def _connectionLost(self, conn):
......
......@@ -26,10 +26,10 @@ from . import BaseServiceHandler
class StorageServiceHandler(BaseServiceHandler):
""" Handler dedicated to storages during service state """
def connectionCompleted(self, conn, new):
def handlerSwitched(self, conn, new):
app = self.app
if new:
super(StorageServiceHandler, self).connectionCompleted(conn, new)
super(StorageServiceHandler, self).handlerSwitched(conn, new)
node = app.nm.getByUUID(conn.getUUID())
if node.isRunning(): # node may be PENDING
app.startStorage(node)
......
......@@ -56,6 +56,10 @@ class PartitionTable(neo.lib.pt.PartitionTable):
self._id += 1
return self._id
def setReplicas(self, num_replicas):
    """Change the number of replicas of this table.

    `num_replicas` must not be negative; this is only checked with an
    assertion, so callers are expected to validate user input themselves.
    """
    assert 0 <= num_replicas, num_replicas
    self.nr = num_replicas
def make(self, node_list):
"""Make a new partition table from scratch."""
assert self._id is None and node_list, (self._id, node_list)
......@@ -108,26 +112,19 @@ class PartitionTable(neo.lib.pt.PartitionTable):
self.num_filled_rows = len(filter(None, self.partition_list))
return change_list
def load(self, ptid, row_list, nm):
def load(self, ptid, num_replicas, row_list, nm):
"""
Load a partition table from a storage node during the recovery.
Return the new storage nodes registered
"""
# check offsets
for offset, _row in row_list:
if offset >= self.getPartitions():
raise IndexError, offset
# store the partition table
self.clear()
self._id = ptid
new_nodes = []
for offset, row in row_list:
for uuid, state in row:
node = nm.getByUUID(uuid)
if node is None:
node = nm.createStorage(uuid=uuid)
new_nodes.append(node.asTuple())
self._setCell(offset, node, state)
def getByUUID(nid):
node = nm.getByUUID(nid)
if node is None:
node = nm.createStorage(uuid=nid)
new_nodes.append(node.asTuple())
return node
self._load(ptid, num_replicas, row_list, getByUUID)
return new_nodes
def setUpToDate(self, node, offset):
......@@ -166,15 +163,6 @@ class PartitionTable(neo.lib.pt.PartitionTable):
return cell_list
def addNodeList(self, node_list):
"""Add nodes"""
added_list = []
for node in node_list:
if node not in self.count_dict:
self.count_dict[node] = 0
added_list.append(node)
return added_list
def tweak(self, drop_list=()):
"""Optimize partition table
......@@ -183,7 +171,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
few readable cells, some cells are instead marked as FEEDING. This is
a preliminary step to drop these nodes, otherwise the partition table
could become non-operational.
- Other nodes must have the same number of cells, off by 1.
In fact, the code touching these cells is disabled (see NOTE below).
- Other nodes must have the same number of non-feeding cells, off by 1.
- When a transaction creates new objects (oids are roughly allocated
sequentially), we expect better performance by maximizing the number
of involved nodes (i.e. parallelizing writes).
......@@ -232,6 +221,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
# Collect some data in a usable form for the rest of the method.
node_list = {node: {} for node in self.count_dict
if node not in drop_list}
if not node_list:
raise neo.lib.pt.PartitionTableException("Can't remove all nodes.")
drop_list = defaultdict(list)
for offset, row in enumerate(self.partition_list):
for cell in row:
......@@ -420,6 +411,22 @@ class PartitionTable(neo.lib.pt.PartitionTable):
outdated_list[offset] -= 1
for offset, cell in cell_dict.iteritems():
discard_list[offset].append(cell)
# NOTE: The following line disables the next 2 lines, which actually
# causes cells in drop_list to be discarded, now or later;
# drop_list could be renamed into ignore_list.
# 1. Deleting data partition per partition is a lot of work, so
# why ask nodes in drop_list to do that when the goal is
# simply to trash the whole underlying database?
# 2. By excluding nodes from a tweak, it becomes possible to have
# parts of the partition table that are tweaked differently.
# This may require to temporarily change the number of
# replicas for the part being tweaked. In the future, this
# number may be specified in the 'tweak' command, to avoid
# race conditions with setUpToDate().
# Overall, a common use case is when importing a ZODB to NEO,
# to keep the initial importing node up until the database is
# split and replicated to the final nodes.
drop_list = {}
for offset, drop_list in drop_list.iteritems():
discard_list[offset] += drop_list
# We have sorted cells to discard in order to first deallocate nodes
......
......@@ -28,7 +28,7 @@ class RecoveryManager(MasterHandler):
def __init__(self, app):
# The target node's uuid to request next.
self.target_ptid = None
self.target_ptid = 0
self.ask_pt = []
self.backup_tid_dict = {}
self.truncate_dict = {}
......@@ -52,9 +52,8 @@ class RecoveryManager(MasterHandler):
"""
logging.info('begin the recovery of the status')
app = self.app
pt = app.pt
pt = app.pt = app.newPartitionTable()
app.changeClusterState(ClusterStates.RECOVERING)
pt.clear()
self.try_secondary = True
......@@ -113,7 +112,7 @@ class RecoveryManager(MasterHandler):
for node in node_list:
conn = node.getConnection()
conn.send(truncate)
self.connectionCompleted(conn, False)
self.handlerSwitched(conn, False)
continue
node_list = pt.getConnectedNodeList()
break
......@@ -140,12 +139,12 @@ class RecoveryManager(MasterHandler):
logging.info('creating a new partition table')
pt.make(node_list)
self._notifyAdmins(Packets.SendPartitionTable(
pt.getID(), pt.getRowList()))
pt.getID(), pt.getReplicas(), pt.getRowList()))
else:
cell_list = pt.outdate()
if cell_list:
self._notifyAdmins(Packets.NotifyPartitionChanges(
pt.setNextID(), cell_list))
pt.setNextID(), pt.getReplicas(), cell_list))
if app.backup_tid:
pt.setBackupTidDict(self.backup_tid_dict)
app.backup_tid = pt.getBackupTid()
......@@ -175,16 +174,16 @@ class RecoveryManager(MasterHandler):
if node is None or node.getState() == new_state:
return
node.setState(new_state)
# broadcast to all so that admin nodes gets informed
self.app.broadcastNodesInformation([node])
def connectionCompleted(self, conn, new):
def handlerSwitched(self, conn, new):
# ask the last IDs to perform the recovery
conn.ask(Packets.AskRecovery())
def answerRecovery(self, conn, ptid, backup_tid, truncate_tid):
uuid = conn.getUUID()
if self.target_ptid <= ptid:
# ptid is None if the node has an empty partition table.
if ptid and self.target_ptid <= ptid:
# Maybe a newer partition table.
if self.target_ptid == ptid and self.ask_pt:
# Another node is already asked.
......@@ -197,17 +196,14 @@ class RecoveryManager(MasterHandler):
self.backup_tid_dict[uuid] = backup_tid
self.truncate_dict[uuid] = truncate_tid
def answerPartitionTable(self, conn, ptid, row_list):
def answerPartitionTable(self, conn, ptid, num_replicas, row_list):
# If this is not from a target node, ignore it.
if ptid == self.target_ptid:
app = self.app
try:
new_nodes = app.pt.load(ptid, row_list, app.nm)
except IndexError:
raise ProtocolError('Invalid offset')
new_nodes = app.pt.load(ptid, num_replicas, row_list, app.nm)
self._notifyAdmins(
Packets.NotifyNodeInformation(monotonic_time(), new_nodes),
Packets.SendPartitionTable(ptid, row_list))
Packets.SendPartitionTable(ptid, num_replicas, row_list))
self.ask_pt = ()
uuid = conn.getUUID()
app.backup_tid = self.backup_tid_dict[uuid]
......
......@@ -16,9 +16,11 @@
import sys
from .neoctl import NeoCTL, NotReadyException
from neo.lib.node import NodeManager
from neo.lib.pt import PartitionTable
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
from neo.lib.protocol import uuid_str, formatNodeList, \
ClusterStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
ClusterStates, NodeStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
action_dict = {
'print': {
......@@ -30,6 +32,7 @@ action_dict = {
},
'set': {
'cluster': 'setClusterState',
'replicas': 'setNumReplicas',
},
'check': 'checkReplicas',
'start': 'startCluster',
......@@ -46,6 +49,11 @@ uuid_int = (lambda ns: lambda uuid:
(ns[uuid[0]] << 24) + int(uuid[1:])
)({str(k)[0]: v for k, v in UUID_NAMESPACES.iteritems()})
class dummy_app:
id_timestamp = uuid = 0
class TerminalNeoCTL(object):
def __init__(self, *args, **kw):
self.neoctl = NeoCTL(*args, **kw)
......@@ -67,6 +75,15 @@ class TerminalNeoCTL(object):
asNode = staticmethod(uuid_int)
def formatPartitionTable(self, row_list):
    # Build a throw-away node manager filled with the storage nodes
    # reported through neoctl, so that the rows can be loaded into a real
    # PartitionTable and rendered by its own _format() method.
    node_manager = NodeManager()
    storage_list = self.neoctl.getNodeList(node_type=NodeTypes.STORAGE)
    node_manager.update(dummy_app, 1, storage_list)
    # No ptid / number of replicas is known here, hence the None values.
    table = object.__new__(PartitionTable)
    table._load(None, None, row_list, node_manager.getByUUID)
    table.addNodeList(node_manager.getByStateList(NodeStates.RUNNING))
    # The first 4 characters of every formatted line are dropped.
    formatted = [line[4:] for line in table._format()]
    return '\n'.join(formatted)
def formatRowList(self, row_list):
return '\n'.join('%03d |%s' % (offset,
''.join(' %s - %s |' % (uuid_str(uuid), state)
......@@ -105,10 +122,12 @@ class TerminalNeoCTL(object):
max_offset = int(max_offset)
if node is not None:
node = self.asNode(node)
ptid, row_list = self.neoctl.getPartitionRowList(
ptid, num_replicas, row_list = self.neoctl.getPartitionRowList(
min_offset=min_offset, max_offset=max_offset, node=node)
# TODO: return ptid
return self.formatRowList(row_list)
return '# ptid: %s, replicas: %s\n%s' % (ptid, num_replicas,
self.formatRowList(enumerate(row_list, min_offset))
if min_offset or max_offset else
self.formatPartitionTable(row_list))
def getNodeList(self, params):
"""
......@@ -140,6 +159,18 @@ class TerminalNeoCTL(object):
assert len(params) == 1
return self.neoctl.setClusterState(self.asClusterState(params[0]))
def setNumReplicas(self, params):
    """
    Set number of replicas.
    Parameters: nr
    nr: positive number (0 means no redundancy)
    """
    # Exactly one argument is expected on the command line.
    assert len(params) == 1
    num_replicas = int(params[0])
    if num_replicas >= 0:
        return self.neoctl.setNumReplicas(num_replicas)
    # Negative values are rejected before anything is sent.
    sys.exit('invalid number of replicas')
def startCluster(self, params):
"""
Starts cluster operation after a startup.
......@@ -167,10 +198,18 @@ class TerminalNeoCTL(object):
def tweakPartitionTable(self, params):
    """
      Optimize partition table.
      No change is done to the specified/down storage nodes and they don't
      count as replicas. The purpose of listing nodes is usually to drop
      them once the data is replicated to other nodes.
      Parameters: [-n] [node [...]]
        -n: dry run
    """
    # Every parameter is optional, so `params` may be empty: check that
    # before looking at its first item (params[0] alone would raise
    # IndexError when the command is given no argument).
    dry_run = bool(params) and params[0] == '-n'
    # What follows the optional '-n' is the list of nodes, each converted
    # with asNode().
    node_params = params[1:] if dry_run else params
    node_list = [self.asNode(node) for node in node_params]
    changed, row_list = self.neoctl.tweakPartitionTable(node_list, dry_run)
    if changed:
        return self.formatPartitionTable(row_list)
    return 'No change done.'
def killNode(self, params):
"""
......@@ -203,7 +242,7 @@ class TerminalNeoCTL(object):
node: if "all", ask all connected storage nodes to repair,
otherwise, only the given list of storage nodes.
"""
dry_run = "01".index(params.pop(0))
dry_run = bool("01".index(params.pop(0)))
return self.neoctl.repair(self._getStorageList(params), dry_run)
def truncate(self, params):
......
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from neo.lib.handler import EventHandler
from neo.lib.protocol import ErrorCodes, Packets
......@@ -44,8 +45,8 @@ class CommandEventHandler(EventHandler):
def ack(self, conn, msg):
    # Hand an (Error, ACK, msg) triple to the response callback;
    # `conn` is not used.
    response = (Packets.Error, ErrorCodes.ACK, msg)
    self.__respond(response)
def protocolError(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.PROTOCOL_ERROR, msg))
def denied(self, conn, msg):
    # A denied request ends the process, with `msg` as the exit
    # message; `conn` is not used.
    raise SystemExit(msg)
def notReady(self, conn, msg):
    # Hand an (Error, NOT_READY, msg) triple to the response callback;
    # `conn` is not used.
    response = (Packets.Error, ErrorCodes.NOT_READY, msg)
    self.__respond(response)
......@@ -62,3 +63,4 @@ class CommandEventHandler(EventHandler):
answerLastIDs = __answer(Packets.AnswerLastIDs)
answerLastTransaction = __answer(Packets.AnswerLastTransaction)
answerRecovery = __answer(Packets.AnswerRecovery)
answerTweakPartitionTable = __answer(Packets.AnswerTweakPartitionTable)
......@@ -91,8 +91,14 @@ class NeoCTL(BaseApplication):
raise RuntimeError(response)
return response[2]
def tweakPartitionTable(self, uuid_list=()):
response = self.__ask(Packets.TweakPartitionTable(uuid_list))
def tweakPartitionTable(self, uuid_list=(), dry_run=False):
    """Send a TweakPartitionTable request and return what it answers.

    The return value is everything that follows the packet type in the
    response. RuntimeError, carrying the whole response, is raised if
    the response is not an AnswerTweakPartitionTable.
    """
    packet = Packets.TweakPartitionTable(dry_run, uuid_list)
    response = self.__ask(packet)
    answer_type = response[0]
    if answer_type != Packets.AnswerTweakPartitionTable:
        raise RuntimeError(response)
    return response[1:]
def setNumReplicas(self, nr):
    """Send a SetNumReplicas request and return the message of its ACK.

    RuntimeError, carrying the whole response, is raised unless the
    response is an Error packet whose code is ErrorCodes.ACK.
    """
    packet = Packets.SetNumReplicas(nr)
    response = self.__ask(packet)
    if response[0] != Packets.Error:
        raise RuntimeError(response)
    if response[1] != ErrorCodes.ACK:
        raise RuntimeError(response)
    return response[2]
......@@ -163,7 +169,7 @@ class NeoCTL(BaseApplication):
response = self.__ask(packet)
if response[0] != Packets.AnswerPartitionList:
raise RuntimeError(response)
return response[1:3] # ptid, row_list
return response[1:]
def startCluster(self):
"""
......
......@@ -157,27 +157,49 @@ class Log(object):
for x in 'uuid_str', 'Packets', 'PacketMalformedError':
setattr(self, x, g[x])
x = {}
try:
Unpacker = g['Unpacker']
except KeyError:
unpackb = None
else:
from msgpack import ExtraData, UnpackException
def unpackb(data):
u = Unpacker()
u.feed(data)
data = u.unpack()
if u.read_bytes(1):
raise ExtraData
return data
self.PacketMalformedError = UnpackException
self.unpackb = unpackb
if self._decode > 1:
PStruct = g['PStruct']
PBoolean = g['PBoolean']
def hasData(item):
items = item._items
for i, item in enumerate(items):
if isinstance(item, PStruct):
j = hasData(item)
if j:
return (i,) + j
elif (isinstance(item, PBoolean)
and item._name == 'compression'
and i + 2 < len(items)
and items[i+2]._name == 'data'):
return i,
for p in self.Packets.itervalues():
if p._fmt is not None:
path = hasData(p._fmt)
if path:
assert not hasattr(p, '_neolog'), p
x[p._code] = path
try:
PStruct = g['PStruct']
except KeyError:
for p in self.Packets.itervalues():
data_path = getattr(p, 'data_path', (None,))
if p._code >> 15 == data_path[0]:
x[p._code] = data_path[1:]
else:
PBoolean = g['PBoolean']
def hasData(item):
items = item._items
for i, item in enumerate(items):
if isinstance(item, PStruct):
j = hasData(item)
if j:
return (i,) + j
elif (isinstance(item, PBoolean)
and item._name == 'compression'
and i + 2 < len(items)
and items[i+2]._name == 'data'):
return i,
for p in self.Packets.itervalues():
if p._fmt is not None:
path = hasData(p._fmt)
if path:
assert not hasattr(p, '_neolog'), p
x[p._code] = path
self._getDataPath = x.get
try:
......@@ -215,11 +237,13 @@ class Log(object):
if body is not None:
log = getattr(p, '_neolog', None)
if log or self._decode:
p = p()
p._id = msg_id
p._body = body
try:
args = p.decode()
if self.unpackb:
args = self.unpackb(body)
else:
p = p()
p._body = body
args = p.decode()
except self.PacketMalformedError:
msg.append("Can't decode packet")
else:
......
......@@ -51,13 +51,11 @@ UNIT_TEST_MODULES = [
'neo.tests.master.testClientHandler',
'neo.tests.master.testMasterApp',
'neo.tests.master.testMasterPT',
'neo.tests.master.testRecovery',
'neo.tests.master.testStorageHandler',
'neo.tests.master.testTransactions',
# storage application
'neo.tests.storage.testClientHandler',
'neo.tests.storage.testMasterHandler',
'neo.tests.storage.testStorageApp',
'neo.tests.storage.testStorage' + os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
'neo.tests.storage.testTransactions',
# client application
......@@ -298,6 +296,13 @@ class TestRunner(BenchmarkRunner):
x('-S', '--stop-on-success', action='store_true', default=None,
help='Opposite of --stop-on-error: stop as soon as a test'
' passes. Details about errors are not printed at exit.')
x = parser.add_mutually_exclusive_group().add_argument
x('-p', '--dump-protocol', const=True,
dest='protocol', action='store_const',
help='Dump schema of protocol instead of checking it.')
x('-P', '--no-check-protocol', const=False,
dest='protocol', action='store_const',
help='Do not check schema of protocol.')
_('-r', '--readable-tid', action='store_true',
help='Change master behaviour to generate readable TIDs for easier'
' debugging (rather than from current time).')
......@@ -347,6 +352,7 @@ Environment Variables:
coverage = args.coverage,
cov_unit = args.cov_unit,
only = args.only,
protocol = args.protocol,
stop_on_success = args.stop_on_success,
readable_tid = args.readable_tid,
)
......@@ -374,19 +380,26 @@ Environment Variables:
self.__coverage.save()
del self.__coverage
orig(self, success)
try:
for _ in xrange(config.loop):
if config.unit:
runner.run('Unit tests', UNIT_TEST_MODULES, only)
if config.functional:
runner.run('Functional tests', FUNC_TEST_MODULES, only)
if config.zodb:
runner.run('ZODB tests', ZODB_TEST_MODULES, only)
except KeyboardInterrupt:
config['mail_to'] = None
traceback.print_exc()
except StopOnSuccess:
pass
if config.protocol is False:
from contextlib import nested
protocol_checker = nested()
else:
from neo.tests.protocol_checker import protocolChecker
protocol_checker = protocolChecker(config.protocol)
with protocol_checker:
try:
for _ in xrange(config.loop):
if config.unit:
runner.run('Unit tests', UNIT_TEST_MODULES, only)
if config.functional:
runner.run('Functional tests', FUNC_TEST_MODULES, only)
if config.zodb:
runner.run('ZODB tests', ZODB_TEST_MODULES, only)
except KeyboardInterrupt:
config['mail_to'] = None
traceback.print_exc()
except StopOnSuccess:
pass
if config.coverage:
coverage.stop()
if coverage.neotestrunner:
......
......@@ -63,11 +63,16 @@ class Application(BaseApplication):
help="do not delete data of discarded cells, which is useful for"
" big databases because the current implementation is"
" inefficient (this option should disappear in the future)")
_.bool('new-nid',
help="request a new NID from a cluster that is already"
" operational, update the database with the new NID and exit,"
" which makes easier to quickly set up a replica by copying"
" the database of another node while it was stopped")
_ = parser.group('database creation')
_.int('u', 'uuid',
help="specify an UUID to use for this process. Previously"
" assigned UUID takes precedence (i.e. you should"
_.int('i', 'nid',
help="specify an NID to use for this process. Previously"
" assigned NID takes precedence (i.e. you should"
" always use reset with this switch)")
_('e', 'engine', help="database engine (MySQL only)")
_.bool('dedup',
......@@ -118,10 +123,16 @@ class Application(BaseApplication):
self.loadConfiguration()
self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only
if 'uuid' in config:
self.uuid = config['uuid']
logging.node(self.name, self.uuid)
if config.get('new_nid'):
self.new_nid = [x[0] for x in self.dm.iterAssignedCells()]
if not self.new_nid:
sys.exit('database is empty')
self.uuid = None
else:
self.new_nid = ()
if 'nid' in config: # for testing purpose only
self.uuid = config['nid']
logging.node(self.name, self.uuid)
registerLiveDebugger(on_log=self.log)
......@@ -158,36 +169,27 @@ class Application(BaseApplication):
# load configuration
self.uuid = dm.getUUID()
logging.node(self.name, self.uuid)
num_partitions = dm.getNumPartitions()
num_replicas = dm.getNumReplicas()
ptid = dm.getPTID()
# check partition table configuration
if num_partitions is not None and num_replicas is not None:
if num_partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
# create a partition table
self.pt = PartitionTable(num_partitions, num_replicas)
logging.info('Configuration loaded:')
logging.info('PTID : %s', dump(ptid))
logging.info('PTID : %s', dump(dm.getPTID()))
logging.info('Name : %s', self.name)
logging.info('Partitions: %s', num_partitions)
logging.info('Replicas : %s', num_replicas)
def loadPartitionTable(self):
"""Load a partition table from the database."""
self.pt.clear()
ptid = self.dm.getPTID()
if ptid is None:
self.pt = PartitionTable(0, 0)
return
cell_list = []
row_list = []
for offset, uuid, state in self.dm.getPartitionTable():
while len(row_list) <= offset:
row_list.append([])
# register unknown nodes
if self.nm.getByUUID(uuid) is None:
self.nm.createStorage(uuid=uuid)
cell_list.append((offset, uuid, CellStates[state]))
self.pt.update(ptid, cell_list, self.nm)
row_list[offset].append((uuid, CellStates[state]))
self.pt = object.__new__(PartitionTable)
self.pt.load(ptid, self.dm.getNumReplicas(), row_list, self.nm)
def run(self):
try:
......@@ -247,29 +249,16 @@ class Application(BaseApplication):
Note that I do not accept any connection from non-master nodes
at this stage."""
pt = self.pt
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server,
self.devpath)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
bootstrap = BootstrapManager(self, NodeTypes.STORAGE,
None if self.new_nid else self.server,
self.devpath, self.new_nid)
self.master_node, self.master_conn = bootstrap.getPrimaryConnection()
self.dm.setUUID(self.uuid)
# Reload a partition table from the database. This is necessary
# when a previous primary master died while sending a partition
# table, because the table might be incomplete.
if pt is not None:
self.loadPartitionTable()
if num_partitions != pt.getPartitions():
raise RuntimeError('the number of partitions is inconsistent')
if pt is None or pt.getReplicas() != num_replicas:
# changing number of replicas is not an issue
self.dm.setNumPartitions(num_partitions)
self.dm.setNumReplicas(num_replicas)
self.pt = PartitionTable(num_partitions, num_replicas)
self.loadPartitionTable()
# Reload a partition table from the database,
# in case that we're in RECOVERING phase.
self.loadPartitionTable()
def initialize(self):
logging.debug('initializing...')
......
......@@ -51,7 +51,7 @@ class Checker(object):
else:
conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
uuid, app.server, name, (), app.id_timestamp))
uuid, app.server, name, app.id_timestamp, (), ()))
self.conn_dict[conn] = node.isIdentified()
conn_set = set(self.conn_dict)
conn_set.discard(None)
......
......@@ -33,7 +33,7 @@ from ZODB.FileStorage import FileStorage
from ..app import option_defaults
from . import buildDatabaseManager, DatabaseFailure
from .manager import DatabaseManager
from .manager import DatabaseManager, Fallback
from neo.lib import compress, logging, patch, util
from neo.lib.interfaces import implements
from neo.lib.protocol import BackendNotImplemented, MAX_TID
......@@ -216,7 +216,7 @@ class ZODB(object):
self._connect = _connect
config = section.config
if 'read_only' in config.getSectionAttributes():
has_next_oid = config.read_only = hasattr(self, 'next_oid')
has_next_oid = config.read_only = 'next_oid' in self.__dict__
if not has_next_oid:
import gc
# This will reopen read-only as soon as we know the last oid.
......@@ -378,8 +378,8 @@ class ImporterDatabaseManager(DatabaseManager):
conf = self._conf
db = self.db = buildDatabaseManager(conf['adapter'],
(conf['database'], conf.get('engine'), conf['wait']))
for x in """getConfiguration _setConfiguration setNumPartitions
query erase getPartitionTable _iterAssignedCells
for x in """getConfiguration _setConfiguration _getMaxPartition
query erase getPartitionTable iterAssignedCells
updateCellTID getUnfinishedTIDDict dropUnfinishedData
abortTransaction storeTransaction lockTransaction
loadData storeData getOrphanList _pruneData deferCommit
......@@ -396,9 +396,16 @@ class ImporterDatabaseManager(DatabaseManager):
self._writeback.committed()
self.commit = db.commit = commit
def _updateReadable(self):
def _updateReadable(*_):
raise AssertionError
def setUUID(self, nid):
    """Record `nid` in the configuration unless an id is already stored.

    When getUUID() already returns a true value, nothing is written and
    that value is only asserted to be equal to `nid`.
    """
    current_nid = self.getUUID()
    if not current_nid:
        self.setConfiguration('nid', str(nid))
        return
    assert current_nid == nid, (current_nid, nid)
def changePartitionTable(self, *args, **kw):
self.db.changePartitionTable(*args, **kw)
if self._writeback:
......@@ -413,7 +420,7 @@ class ImporterDatabaseManager(DatabaseManager):
if self._writeback:
self._writeback.close()
self.db.close()
if isinstance(self.zodb, list): # _setup called
if isinstance(self.zodb, tuple): # _setup called
for zodb in self.zodb:
zodb.close()
......@@ -436,9 +443,13 @@ class ImporterDatabaseManager(DatabaseManager):
self.zodb_ltid = max(x.ltid for x in self.zodb)
zodb = self.zodb[-1]
self.zodb_loid = zodb.shift_oid + zodb.next_oid - 1
self.zodb_tid = self.db.getLastTID(self.zodb_ltid) or 0
if callable(self._import):
self._import = self._import()
self.zodb_tid = self._getMaxPartition() is not None and \
self.db.getLastTID(self.zodb_ltid) or 0
if callable(self._import): # XXX: why ?
if self.zodb_tid == self.zodb_ltid:
self._finished()
else:
self._import = self._import()
def doOperation(self, app):
if self._import:
......@@ -498,12 +509,19 @@ class ImporterDatabaseManager(DatabaseManager):
if process:
process.join()
self.commit()
self._finished()
def _finished(self):
    """Switch to the state where nothing is left to import.

    A warning is logged, `_import` is reset, a set of methods is rebound
    on this instance to those of `self.db`, and every object in
    `self.zodb` is closed before `self.zodb` itself is reset.
    """
    logging.warning("All data are imported. You should change"
        " your configuration to use the native backend and restart.")
    self._import = None
    # From now on, these calls go straight to the underlying database
    # manager.
    delegated = (
        'getObject', 'getReplicationTIDList', 'getReplicationObjectList',
        '_fetchObject', '_getDataTID', 'getLastObjectTID',
    )
    backend = self.db
    for name in delegated:
        setattr(self, name, getattr(backend, name))
    for storage in self.zodb:
        storage.close()
    self.zodb = None
def _iter_zodb(self, zodb_list):
util.setproctitle('neostorage: import')
......@@ -556,6 +574,19 @@ class ImporterDatabaseManager(DatabaseManager):
return (max(tid, util.p64(self.zodb_ltid)),
max(oid, util.p64(self.zodb_loid)))
def _getObject(self, oid, tid=None, before_tid=None):
    """Call getObject() with ids in their other representation.

    `oid`, `tid` and `before_tid` are converted with util.p64 (None is
    passed through). Of a true result, the serial, next serial and
    data serial are converted back with util.u64, false values of the
    last two being kept as is. A false result gives None.
    """
    pack = util.p64
    packed_oid = pack(oid)
    packed_tid = None if tid is None else pack(tid)
    packed_before_tid = None if before_tid is None else pack(before_tid)
    result = self.getObject(packed_oid, packed_tid, packed_before_tid)
    if not result:
        return None
    serial, next_serial, compression, checksum, data, data_serial = result
    unpack = util.u64
    return (
        unpack(serial),
        next_serial and unpack(next_serial),
        compression,
        checksum,
        data,
        data_serial and unpack(data_serial),
    )
def getObject(self, oid, tid=None, before_tid=None):
u64 = util.u64
u_oid = u64(oid)
......@@ -623,7 +654,11 @@ class ImporterDatabaseManager(DatabaseManager):
def _deleteRange(self, partition, min_tid=None, max_tid=None):
# Even if everything is imported, we can't truncate below
# because it would import again if we restart with this backend.
if min_tid < self.zodb_ltid:
# This is also incompatible with writeback, because ZODB has
# no API to truncate.
if min_tid < self.zodb_ltid or self._writeback:
# XXX: That's late to fail. The master should ask storage nodes
# whether truncation is possible before going further.
raise NotImplementedError
self.db._deleteRange(partition, min_tid, max_tid)
......@@ -667,6 +702,12 @@ class ImporterDatabaseManager(DatabaseManager):
length, partition)
return r
def _fetchObject(*_):
    # Whatever the arguments, calling this version is an error.
    raise AssertionError()
getLastObjectTID = Fallback.getLastObjectTID.__func__
_getDataTID = Fallback._getDataTID.__func__
def getObjectHistory(self, *args, **kw):
    # Not provided here: always raises BackendNotImplemented, which is
    # given the bound method itself.
    method = self.getObjectHistory
    raise BackendNotImplemented(method)
......@@ -678,6 +719,7 @@ class WriteBack(object):
_changed = False
_process = None
chunk_size = 100
def __init__(self, db, storage):
self._db = db
......@@ -705,7 +747,7 @@ class WriteBack(object):
self._event = Event()
self._idle = Event()
self._stop = Event()
self._np = self._db.getNumPartitions()
self._np = 1 + self._db._getMaxPartition()
self._db = cPickle.dumps(self._db, 2)
self._process = Process(target=self._run)
self._process.daemon = True
......@@ -737,7 +779,6 @@ class WriteBack(object):
def iterator(self):
db = self._db
np = self._np
chunk_size = max(2, 1000 // np)
offset_list = xrange(np)
while 1:
with db:
......@@ -748,23 +789,26 @@ class WriteBack(object):
if np == len(db._readable_set):
while 1:
tid_list = []
loop = False
max_tid = MAX_TID
for offset in offset_list:
x = db.getReplicationTIDList(
self.min_tid, MAX_TID, chunk_size, offset)
self.min_tid, max_tid, self.chunk_size, offset)
tid_list += x
if len(x) == chunk_size:
loop = True
if tid_list:
tid_list.sort()
for tid in tid_list:
if self._stop.is_set():
return
yield TransactionRecord(db, tid)
if len(x) == self.chunk_size:
max_tid = x[-1]
if not tid_list:
break
tid_list.sort()
for tid in tid_list:
if self._stop.is_set():
return
yield TransactionRecord(db, tid)
if tid == max_tid:
break
else:
self.min_tid = util.add64(tid, 1)
if loop:
continue
break
break
self.min_tid = util.add64(tid, 1)
if not self._event.is_set():
self._idle.set()
self._event.wait()
......@@ -785,7 +829,10 @@ class TransactionRecord(BaseStorage.TransactionRecord):
def __iter__(self):
tid = self.tid
for oid in self._oid_list:
_, compression, _, data, data_tid = self._db.fetchObject(oid, tid)
r = self._db.fetchObject(oid, tid)
if r is None: # checkCurrentSerialInTransaction
continue
_, compression, _, data, data_tid = r
if data is not None:
data = compress.decompress_list[compression](data)
yield BaseStorage.DataRecord(oid, tid, data, data_tid)
This diff is collapsed.
This diff is collapsed.
......@@ -79,6 +79,7 @@ class SQLiteDatabaseManager(DatabaseManager):
def _connect(self):
logging.info('connecting to SQLite database %r', self.db)
self.conn = sqlite3.connect(self.db, check_same_thread=False)
self.conn.text_factory = str
self.lock(self.db)
if self.UNSAFE:
q = self.query
......@@ -144,6 +145,12 @@ class SQLiteDatabaseManager(DatabaseManager):
" WHEN 2 THEN -2" # FEEDING
" ELSE 1-state END")
# Let's wait for a more important change to clean up,
# so that users can still downgrade.
if 0:
def _migrate4(self, schema_dict, index_dict):
self._setConfiguration('partitions', None)
def _setup(self, dedup=False):
# BBB: SQLite has transactional DDL but before Python 3.6,
# the binding automatically commits between such statements.
......@@ -265,6 +272,9 @@ class SQLiteDatabaseManager(DatabaseManager):
else:
q("REPLACE INTO config VALUES (?,?)", (key, str(value)))
def _getMaxPartition(self):
    # First column of the first row of a MAX() query on `pt`.
    rows = self.query("SELECT MAX(`partition`) FROM pt")
    first_row = next(rows)
    return first_row[0]
def _getPartitionTable(self):
return self.query("SELECT * FROM pt")
......@@ -451,8 +461,12 @@ class SQLiteDatabaseManager(DatabaseManager):
return r
def loadData(self, data_id):
    """Return (compression, hash, value) of a row of the `data` table.

    data_id: value of the `id` column of the wanted row.

    None is returned if there is no such row. When the hash is true,
    hash and value are both converted with str() (presumably because
    the driver may return another type for these columns - to be
    confirmed).
    """
    row = self.query(
        "SELECT compression, hash, value FROM data WHERE id=?",
        (data_id,)).fetchone()
    if row is None:
        # fetchone() gives None when nothing matches: return it as is
        # rather than fail with TypeError when unpacking below.
        return None
    compression, checksum, data = row
    if checksum:
        return compression, str(checksum), str(data)
    return compression, checksum, data
def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getReadablePartition(oid)
......@@ -712,5 +726,5 @@ class SQLiteDatabaseManager(DatabaseManager):
main[-1:-1] = data
return '\n'.join(main) + '\n'
def restore(self, sql):
def _restore(self, sql):
self.conn.executescript(sql)
......@@ -65,14 +65,14 @@ class BaseMasterHandler(BaseHandler):
# See comment in ClientOperationHandler.connectionClosed
self.app.tm.abortFor(uuid, even_if_voted=True)
def notifyPartitionChanges(self, conn, ptid, cell_list):
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app
if ptid != 1 + app.pt.getID():
raise ProtocolError('wrong partition table id')
app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
app.pt.update(ptid, num_replicas, cell_list, app.nm)
app.dm.changePartitionTable(ptid, num_replicas, cell_list)
if app.operational:
app.replicator.notifyPartitionChanges(cell_list)
app.dm.commit()
......
......@@ -53,7 +53,7 @@ class ClientOperationHandler(BaseHandler):
p = Errors.TidNotFound('%s does not exist' % dump(tid))
else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[4], t[0])
bool(t[4]), t[0])
conn.answer(p)
def getEventQueue(self):
......@@ -183,12 +183,13 @@ class ClientOperationHandler(BaseHandler):
getObjectFromTransaction = app.tm.getObjectFromTransaction
object_tid_dict = {}
for oid in oid_list:
current_serial, undo_serial, is_current = findUndoTID(oid, ttid,
r = findUndoTID(oid, ttid,
ltid, undone_tid, getObjectFromTransaction(ttid, oid))
if current_serial is None:
p = Errors.OidNotFound(dump(oid))
break
object_tid_dict[oid] = (current_serial, undo_serial, is_current)
if r:
if not r[0]:
p = Errors.OidNotFound(dump(oid))
break
object_tid_dict[oid] = r
else:
p = Packets.AnswerObjectUndoSerial(object_tid_dict)
conn.answer(p)
......
......@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler):
return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name,
devpath, id_timestamp):
id_timestamp, devpath, new_nid):
self.checkClusterName(name)
app = self.app
# reject any incoming connections if not ready
......@@ -65,6 +65,5 @@ class IdentificationHandler(EventHandler):
conn.setHandler(handler)
node.setConnection(conn, force)
# accept the identification and trigger an event
conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and
app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid))
handler.connectionCompleted(conn)
conn.answer(Packets.AcceptIdentification(
NodeTypes.STORAGE, uuid and app.uuid, uuid))
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -26,7 +26,6 @@ class MasterAppTests(NeoUnitTestBase):
# create an application object
config = self.getMasterConfiguration()
self.app = Application(config)
self.app.pt.clear()
def _tearDown(self, success):
self.app.close()
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment