Commit c681f666 authored by Julien Muchembled's avatar Julien Muchembled

Bump protocol version

parents 2b9e14e8 c156f11a
This diff is collapsed.
...@@ -14,19 +14,19 @@ ...@@ -14,19 +14,19 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging, protocol from neo.lib import logging
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets from neo.lib.protocol import uuid_str, \
NodeTypes, NotReadyError, Packets, ProtocolError
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
NOT_CONNECTED_MESSAGE = 'Not connected to a primary master.'
def AdminEventHandlerType(name, bases, d): def AdminEventHandlerType(name, bases, d):
def check_primary_master(func): def check_connection(func):
def wrapper(self, *args, **kw): return lambda self, conn, *args, **kw: \
if self.app.master_conn is not None: self._checkConnection(conn) and func(self, conn, *args, **kw)
return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
def forward_ask(klass): def forward_ask(klass):
return lambda self, conn, *args: self.app.master_conn.ask( return lambda self, conn, *args: self.app.master_conn.ask(
...@@ -47,7 +47,7 @@ def AdminEventHandlerType(name, bases, d): ...@@ -47,7 +47,7 @@ def AdminEventHandlerType(name, bases, d):
Packets.TweakPartitionTable, Packets.TweakPartitionTable,
): ):
d[x.handler_method_name] = forward_ask(x) d[x.handler_method_name] = forward_ask(x)
return type(name, bases, {k: v if k[0] == '_' else check_primary_master(v) return type(name, bases, {k: v if k[0] == '_' else check_connection(v)
for k, v in d.iteritems()}) for k, v in d.iteritems()})
class AdminEventHandler(EventHandler): class AdminEventHandler(EventHandler):
...@@ -55,6 +55,26 @@ class AdminEventHandler(EventHandler): ...@@ -55,6 +55,26 @@ class AdminEventHandler(EventHandler):
__metaclass__ = AdminEventHandlerType __metaclass__ = AdminEventHandlerType
def _checkConnection(self, conn):
if self.app.master_conn is None:
raise NotReadyError(NOT_CONNECTED_MESSAGE)
return True
def requestIdentification(self, conn, node_type, uuid, address, name, *_):
if node_type != NodeTypes.ADMIN:
raise ProtocolError("reject non-admin node")
app = self.app
try:
backup = app.backup_dict[name]
except KeyError:
raise ProtocolError("unknown backup cluster %r" % name)
if backup.conn is not None:
raise ProtocolError("already connected")
backup.conn = conn
conn.setHandler(app.backup_handler)
conn.answer(Packets.AcceptIdentification(
NodeTypes.ADMIN, None, None))
def askPartitionList(self, conn, min_offset, max_offset, uuid): def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s", logging.info("ask partition list from %s to %s for %s",
min_offset, max_offset, uuid_str(uuid)) min_offset, max_offset, uuid_str(uuid))
...@@ -83,6 +103,9 @@ class AdminEventHandler(EventHandler): ...@@ -83,6 +103,9 @@ class AdminEventHandler(EventHandler):
self.app.master_conn.send(Packets.FlushLog()) self.app.master_conn.send(Packets.FlushLog())
super(AdminEventHandler, self).flushLog(conn) super(AdminEventHandler, self).flushLog(conn)
def askMonitorInformation(self, conn):
self.app.askMonitorInformation(conn)
class MasterEventHandler(EventHandler): class MasterEventHandler(EventHandler):
""" This class is just used to dispatch message to right handler""" """ This class is just used to dispatch message to right handler"""
...@@ -104,13 +127,93 @@ class MasterEventHandler(EventHandler): ...@@ -104,13 +127,93 @@ class MasterEventHandler(EventHandler):
forward.send(packet, kw['msg_id']) forward.send(packet, kw['msg_id'])
def answerClusterState(self, conn, state): def answerClusterState(self, conn, state):
self.app.cluster_state = state self.app.updateMonitorInformation(None, cluster_state=state)
notifyClusterInformation = answerClusterState notifyClusterInformation = answerClusterState
def sendPartitionTable(self, conn, ptid, num_replicas, row_list): def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable) app = self.app
pt.load(ptid, num_replicas, row_list, self.app.nm) app.pt = object.__new__(PartitionTable)
app.pt.load(ptid, num_replicas, row_list, app.nm)
app.partitionTableUpdated()
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list): def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm) app = self.app
app.pt.update(ptid, num_replicas, cell_list, app.nm)
app.partitionTableUpdated()
def notifyNodeInformation(self, *args):
super(MasterEventHandler, self).notifyNodeInformation(*args)
self.app.partitionTableUpdated()
def notifyUpstreamAdmin(self, conn, addr):
app = self.app
node = app.upstream_admin
if node is None:
node = app.upstream_admin = app.nm.createAdmin()
elif node.getAddress() == addr:
return
node.setAddress(addr)
if app.upstream_admin_conn:
app.upstream_admin_conn.close()
else:
app.connectToUpstreamAdmin()
def answerLastTransaction(self, conn, ltid):
app = self.app
app.ltid = ltid
app.maybeNotify(None)
def answerRecovery(self, name, ptid, backup_tid, truncate_tid):
self.app.backup_tid = backup_tid
def monitor(func):
def wrapper(self, conn, *args, **kw):
for name, backup in self.app.backup_dict.iteritems():
if backup.conn is conn:
return func(self, name, *args, **kw)
raise AssertionError
return wrapper
class BackupHandler(EventHandler):
@monitor
def connectionClosed(self, name):
app = self.app
app.backup_dict[name] = app.backup_dict[name].__class__()
app.maybeNotify(name)
@monitor
def notifyMonitorInformation(self, name, info):
self.app.updateMonitorInformation(name, **info)
@monitor
def answerRecovery(self, name, ptid, backup_tid, truncate_tid):
self.app.backup_dict[name].backup_tid = backup_tid
@monitor
def answerLastTransaction(self, name, ltid):
app = self.app
app.backup_dict[name].ltid = ltid
app.maybeNotify(name)
class UpstreamAdminHandler(AdminEventHandler):
def _checkConnection(self, conn):
assert conn is self.app.upstream_admin_conn
return super(UpstreamAdminHandler, self)._checkConnection(conn)
def connectionClosed(self, conn):
app = self.app
if conn is app.upstream_admin_conn:
app.connectToUpstreamAdmin()
connectionFailed = connectionClosed
def _acceptIdentification(self, node):
node.send(Packets.NotifyMonitorInformation({
'cluster_state': self.app.cluster_state,
'down': self.app.down,
'pt_summary': self.app.pt_summary,
}))
...@@ -228,7 +228,7 @@ class Application(ThreadedApplication): ...@@ -228,7 +228,7 @@ class Application(ThreadedApplication):
node=node, node=node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, None, (), ()) self.uuid, None, self.name, None, {})
try: try:
ask(conn, p, handler=handler) ask(conn, p, handler=handler)
except ConnectionClosed: except ConnectionClosed:
...@@ -270,7 +270,7 @@ class Application(ThreadedApplication): ...@@ -270,7 +270,7 @@ class Application(ThreadedApplication):
conn = MTClientConnection(self, self.storage_event_handler, node, conn = MTClientConnection(self, self.storage_event_handler, node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, self.id_timestamp, (), ()) self.uuid, None, self.name, self.id_timestamp, {})
try: try:
self._ask(conn, p, handler=self.storage_bootstrap_handler) self._ask(conn, p, handler=self.storage_bootstrap_handler)
except ConnectionClosed: except ConnectionClosed:
......
...@@ -26,15 +26,14 @@ class BootstrapManager(EventHandler): ...@@ -26,15 +26,14 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it Manage the bootstrap stage, lookup for the primary master then connect to it
""" """
def __init__(self, app, node_type, server=None, devpath=(), new_nid=()): def __init__(self, app, node_type, server=None, **extra):
""" """
Manage the bootstrap stage of a non-master node, it lookup for the Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node primary master node, connect to it then returns when the master node
is ready. is ready.
""" """
self.server = server self.server = server
self.devpath = devpath self.extra = extra
self.new_nid = new_nid
self.node_type = node_type self.node_type = node_type
app.nm.reset() app.nm.reset()
...@@ -43,7 +42,7 @@ class BootstrapManager(EventHandler): ...@@ -43,7 +42,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid, conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, None, self.devpath, self.new_nid)) self.server, self.app.name, None, self.extra))
def connectionFailed(self, conn): def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn) EventHandler.connectionFailed(self, conn)
......
...@@ -18,6 +18,15 @@ import argparse, os, sys ...@@ -18,6 +18,15 @@ import argparse, os, sys
from functools import wraps from functools import wraps
from ConfigParser import SafeConfigParser from ConfigParser import SafeConfigParser
class _DefaultList(list):
"""
Special list type for default values of 'append' argparse actions,
so that the parser restarts from an empty list when the option is
used on the command-line.
"""
def __copy__(self):
return []
class _Required(object): class _Required(object):
...@@ -30,6 +39,8 @@ class _Required(object): ...@@ -30,6 +39,8 @@ class _Required(object):
class _Option(object): class _Option(object):
multiple = False
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
if len(args) > 1: if len(args) > 1:
self.short, self.name = args self.short, self.name = args
...@@ -51,7 +62,12 @@ class _Option(object): ...@@ -51,7 +62,12 @@ class _Option(object):
action.required = _Required(option_list, self.name) action.required = _Required(option_list, self.name)
def fromConfigFile(self, cfg, section): def fromConfigFile(self, cfg, section):
return self(cfg.get(section, self.name.replace('-', '_'))) value = cfg.get(section, self.name.replace('-', '_'))
if self.multiple:
return [self(value)
for value in value.splitlines()
if value]
return self(value)
@staticmethod @staticmethod
def parse(value): def parse(value):
...@@ -81,6 +97,11 @@ class Option(_Option): ...@@ -81,6 +97,11 @@ class Option(_Option):
kw[x] = getattr(self, x) kw[x] = getattr(self, x)
except AttributeError: except AttributeError:
pass pass
if self.multiple:
kw['action'] = 'append'
default = kw.get('default')
if default:
kw['default'] = _DefaultList(default)
return kw return kw
@staticmethod @staticmethod
...@@ -132,9 +153,6 @@ class OptionGroup(object): ...@@ -132,9 +153,6 @@ class OptionGroup(object):
class Argument(Option): class Argument(Option):
def __init__(self, name, **kw):
super(Argument, self).__init__(name, **kw)
def _asArgparse(self, parser, option_list): def _asArgparse(self, parser, option_list):
kw = {'help': self.help, 'type': self} kw = {'help': self.help, 'type': self}
for x in 'default', 'metavar', 'nargs', 'choices': for x in 'default', 'metavar', 'nargs', 'choices':
......
...@@ -28,7 +28,7 @@ class Node(object): ...@@ -28,7 +28,7 @@ class Node(object):
_connection = None _connection = None
_identified = False _identified = False
devpath = () extra = {}
id_timestamp = None id_timestamp = None
def __init__(self, manager, address=None, uuid=None, state=NodeStates.DOWN): def __init__(self, manager, address=None, uuid=None, state=NodeStates.DOWN):
......
...@@ -20,7 +20,7 @@ from msgpack import packb ...@@ -20,7 +20,7 @@ from msgpack import packb
# The protocol version must be increased whenever upgrading a node may require # The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. # to upgrade other nodes.
PROTOCOL_VERSION = 0 PROTOCOL_VERSION = 1
# By encoding the handshake packet with msgpack, the whole NEO stream can be # By encoding the handshake packet with msgpack, the whole NEO stream can be
# decoded with msgpack. The first byte is 0x92, which is different from TLS # decoded with msgpack. The first byte is 0x92, which is different from TLS
# Handshake (0x16). # Handshake (0x16).
...@@ -312,6 +312,8 @@ class Packet(object): ...@@ -312,6 +312,8 @@ class Packet(object):
class PacketRegistryFactory(dict): class PacketRegistryFactory(dict):
_next_code = 0
def __call__(self, name, base, d): def __call__(self, name, base, d):
for k, v in d.items(): for k, v in d.items():
if isinstance(v, type) and issubclass(v, Packet): if isinstance(v, type) and issubclass(v, Packet):
...@@ -323,10 +325,9 @@ class PacketRegistryFactory(dict): ...@@ -323,10 +325,9 @@ class PacketRegistryFactory(dict):
def register(self, doc, ignore_when_closed=None, request=False, error=False, def register(self, doc, ignore_when_closed=None, request=False, error=False,
_base=(Packet,), **kw): _base=(Packet,), **kw):
""" Register a packet in the packet registry """ """ Register a packet in the packet registry """
code = len(self) code = self._next_code
if doc is None: assert code < RESPONSE_MASK
self[code] = None self._next_code = code + 1
return # None registered only to skip a code number (for compatibility)
if error and not request: if error and not request:
assert not code assert not code
code = RESPONSE_MASK code = RESPONSE_MASK
...@@ -826,6 +827,18 @@ class Packets(dict): ...@@ -826,6 +827,18 @@ class Packets(dict):
:nodes: ctl -> A -> M -> * :nodes: ctl -> A -> M -> *
""") """)
AskMonitorInformation, AnswerMonitorInformation = request("""
:nodes: ctl -> A
""")
NotifyMonitorInformation = notify("""
:nodes: A -> A
""")
NotifyUpstreamAdmin = notify("""
:nodes: M -> A
""")
del notify, request del notify, request
......
...@@ -39,7 +39,8 @@ nextafter() ...@@ -39,7 +39,8 @@ nextafter()
TID_LOW_OVERFLOW = 2**32 TID_LOW_OVERFLOW = 2**32
TID_LOW_MAX = TID_LOW_OVERFLOW - 1 TID_LOW_MAX = TID_LOW_OVERFLOW - 1
SECOND_PER_TID_LOW = 60.0 / TID_LOW_OVERFLOW SECOND_FROM_UINT32 = 60. / TID_LOW_OVERFLOW
MICRO_FROM_UINT32 = 1e6 / TID_LOW_OVERFLOW
TID_CHUNK_RULES = ( TID_CHUNK_RULES = (
(-1900, 0), (-1900, 0),
(-1, 12), (-1, 12),
...@@ -52,7 +53,7 @@ def tidFromTime(tm): ...@@ -52,7 +53,7 @@ def tidFromTime(tm):
gmt = gmtime(tm) gmt = gmtime(tm)
return packTID( return packTID(
(gmt.tm_year, gmt.tm_mon, gmt.tm_mday, gmt.tm_hour, gmt.tm_min), (gmt.tm_year, gmt.tm_mon, gmt.tm_mday, gmt.tm_hour, gmt.tm_min),
int((gmt.tm_sec + (tm - int(tm))) / SECOND_PER_TID_LOW)) int((gmt.tm_sec + (tm - int(tm))) / SECOND_FROM_UINT32))
def packTID(higher, lower): def packTID(higher, lower):
""" """
...@@ -95,15 +96,10 @@ def unpackTID(ptid): ...@@ -95,15 +96,10 @@ def unpackTID(ptid):
higher.reverse() higher.reverse()
return (tuple(higher), lower) return (tuple(higher), lower)
def timeStringFromTID(ptid): def datetimeFromTID(tid):
""" higher, lower = unpackTID(tid)
Return a string in the format "yyyy-mm-dd hh:mm:ss.ssssss" from a TID seconds, lower = divmod(lower * 60, TID_LOW_OVERFLOW)
""" return datetime(*(higher + (seconds, int(lower * MICRO_FROM_UINT32))))
higher, lower = unpackTID(ptid)
seconds = lower * SECOND_PER_TID_LOW
return '%04d-%02d-%02d %02d:%02d:%09.6f' % (higher[0], higher[1], higher[2],
higher[3], higher[4], seconds)
def addTID(ptid, offset): def addTID(ptid, offset):
""" """
......
...@@ -182,12 +182,15 @@ class Application(BaseApplication): ...@@ -182,12 +182,15 @@ class Application(BaseApplication):
self.playPrimaryRole() self.playPrimaryRole()
self.playSecondaryRole() self.playSecondaryRole()
def getNodeInformationDict(self, node_list): def getNodeInformationGetter(self, node_list):
node_dict = defaultdict(list) node_dict = defaultdict(list)
admin_dict = defaultdict(list)
# group modified nodes by destination node type # group modified nodes by destination node type
for node in node_list: for node in node_list:
node_info = node.asTuple() node_info = node.asTuple()
if node.isAdmin(): if node.isAdmin():
for backup in node.extra.get('backup', ()):
admin_dict[backup].append(node_info)
continue continue
node_dict[NodeTypes.ADMIN].append(node_info) node_dict[NodeTypes.ADMIN].append(node_info)
node_dict[NodeTypes.STORAGE].append(node_info) node_dict[NodeTypes.STORAGE].append(node_info)
...@@ -197,18 +200,27 @@ class Application(BaseApplication): ...@@ -197,18 +200,27 @@ class Application(BaseApplication):
if node.isStorage(): if node.isStorage():
continue continue
node_dict[NodeTypes.MASTER].append(node_info) node_dict[NodeTypes.MASTER].append(node_info)
return node_dict def getNodeListFor(node):
node_list = node_dict.get(node.getType())
if node.isClient():
admin_list = admin_dict.get(node.extra.get('backup'))
if admin_list:
if node_list:
return node_list + admin_list
return admin_list
return node_list
return getNodeListFor
def broadcastNodesInformation(self, node_list): def broadcastNodesInformation(self, node_list):
""" """
Broadcast changes for a set a nodes Broadcast changes for a set a nodes
Send only one packet per connection to reduce bandwidth Send only one packet per connection to reduce bandwidth
""" """
node_dict = self.getNodeInformationDict(node_list) getNodeListFor = self.getNodeInformationGetter(node_list)
now = monotonic_time() now = monotonic_time()
# send at most one non-empty notification packet per node # send at most one non-empty notification packet per node
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType()) node_list = getNodeListFor(node)
# We don't skip pending storage nodes because we don't send them # We don't skip pending storage nodes because we don't send them
# the full list of nodes when they're added, and it's also quite # the full list of nodes when they're added, and it's also quite
# useful to notify them about new masters. # useful to notify them about new masters.
......
...@@ -99,7 +99,8 @@ class BackupApplication(object): ...@@ -99,7 +99,8 @@ class BackupApplication(object):
pt = app.pt pt = app.pt
while True: while True:
app.changeClusterState(ClusterStates.STARTING_BACKUP) app.changeClusterState(ClusterStates.STARTING_BACKUP)
bootstrap = BootstrapManager(self, NodeTypes.CLIENT) bootstrap = BootstrapManager(self, NodeTypes.CLIENT,
backup=app.name)
# {offset -> node} # {offset -> node}
self.primary_partition_dict = {} self.primary_partition_dict = {}
# [[tid]] # [[tid]]
...@@ -367,3 +368,9 @@ class BackupApplication(object): ...@@ -367,3 +368,9 @@ class BackupApplication(object):
uuid_str(cell.getUUID()), offset, uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID())) dump(tid), uuid_str(node.getUUID()))
cell.getNode().send(p) cell.getNode().send(p)
def notifyUpstreamAdmin(self, addr):
node_list = self.app.nm.getAdminList(only_identified=True)
if node_list:
min(node_list, key=lambda node: node.getUUID()).send(
Packets.NotifyUpstreamAdmin(addr))
...@@ -52,7 +52,7 @@ class MasterHandler(EventHandler): ...@@ -52,7 +52,7 @@ class MasterHandler(EventHandler):
node_list = app.nm.getList() node_list = app.nm.getList()
node_list.remove(node) node_list.remove(node)
node_list = ([node.asTuple()] # for id_timestamp node_list = ([node.asTuple()] # for id_timestamp
+ app.getNodeInformationDict(node_list)[node.getType()]) + app.getNodeInformationGetter(node_list)(node))
conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list)) conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def handlerSwitched(self, conn, new): def handlerSwitched(self, conn, new):
......
...@@ -58,6 +58,12 @@ class AdministrationHandler(MasterHandler): ...@@ -58,6 +58,12 @@ class AdministrationHandler(MasterHandler):
def handlerSwitched(self, conn, new): def handlerSwitched(self, conn, new):
assert new assert new
super(AdministrationHandler, self).handlerSwitched(conn, new) super(AdministrationHandler, self).handlerSwitched(conn, new)
app = self.app.backup_app
if app is not None:
for node in app.nm.getAdminList():
if node.isRunning():
app.notifyUpstreamAdmin(node.getAddress())
break
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID()) node = self.app.nm.getByUUID(conn.getUUID())
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import ZERO_TID from neo.lib.protocol import NodeTypes, NodeStates, Packets, ZERO_TID
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
class BackupHandler(EventHandler): class BackupHandler(EventHandler):
...@@ -36,6 +36,13 @@ class BackupHandler(EventHandler): ...@@ -36,6 +36,13 @@ class BackupHandler(EventHandler):
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list): def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm) self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, timestamp, node_list):
super(BackupHandler, self).notifyNodeInformation(
conn, timestamp, node_list)
for node_type, addr, _, state, _ in node_list:
if node_type == NodeTypes.ADMIN and state == NodeStates.RUNNING:
self.app.notifyUpstreamAdmin(addr)
def answerLastTransaction(self, conn, tid): def answerLastTransaction(self, conn, tid):
app = self.app app = self.app
prev_tid = app.app.getLastTransaction() prev_tid = app.app.getLastTransaction()
......
...@@ -24,7 +24,7 @@ from ..app import monotonic_time ...@@ -24,7 +24,7 @@ from ..app import monotonic_time
class IdentificationHandler(EventHandler): class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid, def requestIdentification(self, conn, node_type, uuid,
address, name, id_timestamp, devpath, new_nid): address, name, id_timestamp, extra):
app = self.app app = self.app
self.checkClusterName(name) self.checkClusterName(name)
if address == app.server: if address == app.server:
...@@ -60,6 +60,7 @@ class IdentificationHandler(EventHandler): ...@@ -60,6 +60,7 @@ class IdentificationHandler(EventHandler):
# cloned/evil/buggy node connecting to us # cloned/evil/buggy node connecting to us
raise ProtocolError('already connected') raise ProtocolError('already connected')
new_nid = extra.pop('new_nid', None)
state = NodeStates.RUNNING state = NodeStates.RUNNING
if node_type == NodeTypes.CLIENT: if node_type == NodeTypes.CLIENT:
if app.cluster_state == ClusterStates.RUNNING: if app.cluster_state == ClusterStates.RUNNING:
...@@ -111,8 +112,7 @@ class IdentificationHandler(EventHandler): ...@@ -111,8 +112,7 @@ class IdentificationHandler(EventHandler):
uuid=uuid, address=address) uuid=uuid, address=address)
else: else:
node.setUUID(uuid) node.setUUID(uuid)
if devpath: node.extra = extra
node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time() node.id_timestamp = monotonic_time()
node.setState(state) node.setState(state)
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
...@@ -135,7 +135,7 @@ class IdentificationHandler(EventHandler): ...@@ -135,7 +135,7 @@ class IdentificationHandler(EventHandler):
class SecondaryIdentificationHandler(EventHandler): class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid, def requestIdentification(self, conn, node_type, uuid,
address, name, id_timestamp, devpath, new_nid): address, name, id_timestamp, extra):
app = self.app app = self.app
self.checkClusterName(name) self.checkClusterName(name)
if address == app.server: if address == app.server:
......
...@@ -40,7 +40,7 @@ class ElectionHandler(SecondaryHandler): ...@@ -40,7 +40,7 @@ class ElectionHandler(SecondaryHandler):
super(ElectionHandler, self).connectionCompleted(conn) super(ElectionHandler, self).connectionCompleted(conn)
app = self.app app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER, conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election, (), ())) app.uuid, app.server, app.name, app.election, {}))
def connectionFailed(self, conn): def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn) super(ElectionHandler, self).connectionFailed(conn)
......
...@@ -250,7 +250,7 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -250,7 +250,7 @@ class PartitionTable(neo.lib.pt.PartitionTable):
devpath_max = [] devpath_max = []
devpaths = [()] * node_count devpaths = [()] * node_count
if repeats > 1: if repeats > 1:
_devpaths = [x[0].devpath for x in node_list] _devpaths = [x[0].extra.get('devpath', ()) for x in node_list]
max_depth = min(map(len, _devpaths)) max_depth = min(map(len, _devpaths))
depth = 0 depth = 0
while 1: while 1:
......
...@@ -14,11 +14,11 @@ ...@@ -14,11 +14,11 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys import json, sys
from .neoctl import NeoCTL, NotReadyException from .neoctl import NeoCTL, NotReadyException
from neo.lib.node import NodeManager from neo.lib.node import NodeManager
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID from neo.lib.util import p64, u64, datetimeFromTID, tidFromTime
from neo.lib.protocol import uuid_str, formatNodeList, \ from neo.lib.protocol import uuid_str, formatNodeList, \
ClusterStates, NodeStates, NodeTypes, UUID_NAMESPACES, ZERO_TID ClusterStates, NodeStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
...@@ -29,6 +29,7 @@ action_dict = { ...@@ -29,6 +29,7 @@ action_dict = {
'node': 'getNodeList', 'node': 'getNodeList',
'cluster': 'getClusterState', 'cluster': 'getClusterState',
'primary': 'getPrimary', 'primary': 'getPrimary',
'summary': 'getSummary',
}, },
'set': { 'set': {
'cluster': 'setClusterState', 'cluster': 'setClusterState',
...@@ -100,12 +101,12 @@ class TerminalNeoCTL(object): ...@@ -100,12 +101,12 @@ class TerminalNeoCTL(object):
if backup_tid: if backup_tid:
ltid = self.neoctl.getLastTransaction() ltid = self.neoctl.getLastTransaction()
r = "backup_tid = 0x%x (%s)" % (u64(backup_tid), r = "backup_tid = 0x%x (%s)" % (u64(backup_tid),
timeStringFromTID(backup_tid)) datetimeFromTID(backup_tid))
else: else:
loid, ltid = self.neoctl.getLastIds() loid, ltid = self.neoctl.getLastIds()
r = "last_oid = 0x%x" % (u64(loid)) r = "last_oid = 0x%x" % (u64(loid))
return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %s" % \ return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %s" % \
(u64(ltid), timeStringFromTID(ltid), ptid) (u64(ltid), datetimeFromTID(ltid), ptid)
def getPartitionRowList(self, params): def getPartitionRowList(self, params):
""" """
...@@ -159,6 +160,21 @@ class TerminalNeoCTL(object): ...@@ -159,6 +160,21 @@ class TerminalNeoCTL(object):
assert len(params) == 1 assert len(params) == 1
return self.neoctl.setClusterState(self.asClusterState(params[0])) return self.neoctl.setClusterState(self.asClusterState(params[0]))
def getSummary(self, params):
"""
Get a summary of the health of this cluster and backups.
The first line reports severities: it is a commented json dump of
{severity: [backup_name | null]}
where severity is either "warning" or "problem"
and null refers to this cluster
"""
assert len(params) == 0
warning, problem, summary = self.neoctl.getMonitorInformation()
return "# %s\n%s" % (json.dumps({k: v for k, v in zip(
('warning', 'problem'),
(warning, problem),
) if v}), summary)
def setNumReplicas(self, params): def setNumReplicas(self, params):
""" """
Set number of replicas. Set number of replicas.
......
...@@ -64,3 +64,4 @@ class CommandEventHandler(EventHandler): ...@@ -64,3 +64,4 @@ class CommandEventHandler(EventHandler):
answerLastTransaction = __answer(Packets.AnswerLastTransaction) answerLastTransaction = __answer(Packets.AnswerLastTransaction)
answerRecovery = __answer(Packets.AnswerRecovery) answerRecovery = __answer(Packets.AnswerRecovery)
answerTweakPartitionTable = __answer(Packets.AnswerTweakPartitionTable) answerTweakPartitionTable = __answer(Packets.AnswerTweakPartitionTable)
answerMonitorInformation = __answer(Packets.AnswerMonitorInformation)
...@@ -216,3 +216,9 @@ class NeoCTL(BaseApplication): ...@@ -216,3 +216,9 @@ class NeoCTL(BaseApplication):
conn.send(Packets.FlushLog()) conn.send(Packets.FlushLog())
while conn.pending(): while conn.pending():
self.em.poll(1) self.em.poll(1)
def getMonitorInformation(self):
response = self.__ask(Packets.AskMonitorInformation())
if response[0] != Packets.AnswerMonitorInformation:
raise RuntimeError(response)
return response[1:]
...@@ -252,7 +252,7 @@ class Application(BaseApplication): ...@@ -252,7 +252,7 @@ class Application(BaseApplication):
# search, find, connect and identify to the primary master # search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, bootstrap = BootstrapManager(self, NodeTypes.STORAGE,
None if self.new_nid else self.server, None if self.new_nid else self.server,
self.devpath, self.new_nid) devpath=self.devpath, new_nid=self.new_nid)
self.master_node, self.master_conn = bootstrap.getPrimaryConnection() self.master_node, self.master_conn = bootstrap.getPrimaryConnection()
self.dm.setUUID(self.uuid) self.dm.setUUID(self.uuid)
......
...@@ -51,7 +51,7 @@ class Checker(object): ...@@ -51,7 +51,7 @@ class Checker(object):
else: else:
conn = ClientConnection(app, StorageOperationHandler(app), node) conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE, conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
uuid, app.server, name, app.id_timestamp, (), ())) uuid, app.server, name, app.id_timestamp, {}))
self.conn_dict[conn] = node.isIdentified() self.conn_dict[conn] = node.isIdentified()
conn_set = set(self.conn_dict) conn_set = set(self.conn_dict)
conn_set.discard(None) conn_set.discard(None)
......
...@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler): ...@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler):
return self.app.nm return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name, def requestIdentification(self, conn, node_type, uuid, address, name,
id_timestamp, devpath, new_nid): id_timestamp, extra):
self.checkClusterName(name) self.checkClusterName(name)
app = self.app app = self.app
# reject any incoming connections if not ready # reject any incoming connections if not ready
......
...@@ -350,7 +350,7 @@ class Replicator(object): ...@@ -350,7 +350,7 @@ class Replicator(object):
try: try:
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE, conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
None if name else app.uuid, app.server, name or app.name, None if name else app.uuid, app.server, name or app.name,
app.id_timestamp, (), ())) app.id_timestamp, {}))
except ConnectionClosed: except ConnectionClosed:
if previous_node is self.current_node: if previous_node is self.current_node:
return return
......
...@@ -14,12 +14,21 @@ ...@@ -14,12 +14,21 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import partial
import unittest import unittest
import transaction import transaction
from neo.lib.protocol import NodeStates from neo.lib.protocol import NodeStates
from neo.neoctl.app import TerminalNeoCTL
from . import NEOCluster, NEOFunctionalTest from . import NEOCluster, NEOFunctionalTest
class TerminalNeoCTL(TerminalNeoCTL):
def __init__(self, cluster):
self.neoctl = cluster.neoctl
def __del__(self):
pass
class ClusterTests(NEOFunctionalTest): class ClusterTests(NEOFunctionalTest):
def _tearDown(self, success): def _tearDown(self, success):
...@@ -118,12 +127,20 @@ class ClusterTests(NEOFunctionalTest): ...@@ -118,12 +127,20 @@ class ClusterTests(NEOFunctionalTest):
self.neo.start() self.neo.start()
self.neo.expectClusterRunning() self.neo.expectClusterRunning()
self.neo.expectOudatedCells(0) self.neo.expectOudatedCells(0)
# check neoctl cli
getSummary = partial(TerminalNeoCTL(self.neo).getSummary, ())
ok_empty = '# {}\nRUNNING;' \
' UP_TO_DATE=1; ltid=0000000000000000 (1900-01-01 00:00:00)'
self.assertEqual(getSummary(), ok_empty)
# connect a client a check it's known # connect a client a check it's known
db, conn = self.neo.getZODBConnection() db, conn = self.neo.getZODBConnection()
self.assertEqual(len(self.neo.getClientlist()), 1) self.assertEqual(len(self.neo.getClientlist()), 1)
# drop the storage, the cluster is no more operational... # drop the storage, the cluster is no more operational...
self.neo.getStorageProcessList()[0].stop() self.neo.getStorageProcessList()[0].stop()
self.neo.expectClusterRecovering() self.neo.expectClusterRecovering()
# check severity returned by the cli
self.assertEqual(getSummary(),
'# {"problem": [null]}\nRECOVERING; UP_TO_DATE=1; DOWN=1')
# ...and the client gets disconnected # ...and the client gets disconnected
self.assertEqual(len(self.neo.getClientlist()), 0) self.assertEqual(len(self.neo.getClientlist()), 0)
# restart storage so that the cluster is operational again # restart storage so that the cluster is operational again
...@@ -134,6 +151,9 @@ class ClusterTests(NEOFunctionalTest): ...@@ -134,6 +151,9 @@ class ClusterTests(NEOFunctionalTest):
conn.root()['plop'] = 1 conn.root()['plop'] = 1
transaction.commit() transaction.commit()
self.assertEqual(len(self.neo.getClientlist()), 1) self.assertEqual(len(self.neo.getClientlist()), 1)
summary = getSummary()
self.assertTrue(summary.startswith('# {}\nRUNNING;'), summary)
self.assertNotEqual(summary, ok_empty)
def testStorageLostDuringRecovery(self): def testStorageLostDuringRecovery(self):
""" """
......
...@@ -325,7 +325,7 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -325,7 +325,7 @@ class MasterPartitionTableTests(NeoUnitTestBase):
pt.make(sn) pt.make(sn)
pt.log() pt.log()
for i, s in enumerate(sn, sn_count): for i, s in enumerate(sn, sn_count):
s.devpath = tuple(bin(i)[3:-1]) s.extra = {'devpath': tuple(bin(i)[3:-1])}
self.assertEqual(Counter(x[2] for x in self.tweak(pt)), { self.assertEqual(Counter(x[2] for x in self.tweak(pt)), {
CellStates.OUT_OF_DATE: 96, CellStates.OUT_OF_DATE: 96,
CellStates.FEEDING: 96, CellStates.FEEDING: 96,
...@@ -360,12 +360,12 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -360,12 +360,12 @@ class MasterPartitionTableTests(NeoUnitTestBase):
assert len(topo) <= sn_count assert len(topo) <= sn_count
sn2 = sn[:len(topo)] sn2 = sn[:len(topo)]
for s in sn2: for s in sn2:
s.devpath = () s.extra = {}
k = (1,7)[even] k = (1,7)[even]
pt = PartitionTable(np*k, i) pt = PartitionTable(np*k, i)
pt.make(sn2) pt.make(sn2)
for devpath, s in zip(topo, sn2): for devpath, s in zip(topo, sn2):
s.devpath = tuple(devpath) s.extra = {'devpath': tuple(devpath)}
if type(expected) is tuple: if type(expected) is tuple:
self.assertTrue(self.tweak(pt)) self.assertTrue(self.tweak(pt))
self.update(pt) self.update(pt)
......
...@@ -16,6 +16,7 @@ AnswerInformationLocked(p64) ...@@ -16,6 +16,7 @@ AnswerInformationLocked(p64)
AnswerLastIDs(?p64,?p64) AnswerLastIDs(?p64,?p64)
AnswerLastTransaction(p64) AnswerLastTransaction(p64)
AnswerLockedTransactions({p64:?p64}) AnswerLockedTransactions({p64:?p64})
AnswerMonitorInformation([?bin],[?bin],bin)
AnswerNewOIDs([p64]) AnswerNewOIDs([p64])
AnswerNodeList([(NodeTypes,?(bin,int),?int,NodeStates,?float)]) AnswerNodeList([(NodeTypes,?(bin,int),?int,NodeStates,?float)])
AnswerObject(p64,p64,?p64,?int,bin,bin,?p64) AnswerObject(p64,p64,?p64,?int,bin,bin,?p64)
...@@ -50,6 +51,7 @@ AskLastIDs() ...@@ -50,6 +51,7 @@ AskLastIDs()
AskLastTransaction() AskLastTransaction()
AskLockInformation(p64,p64) AskLockInformation(p64,p64)
AskLockedTransactions() AskLockedTransactions()
AskMonitorInformation()
AskNewOIDs(int) AskNewOIDs(int)
AskNodeList(NodeTypes) AskNodeList(NodeTypes)
AskObject(p64,?p64,?p64) AskObject(p64,?p64,?p64)
...@@ -77,6 +79,7 @@ InvalidateObjects(p64,[p64]) ...@@ -77,6 +79,7 @@ InvalidateObjects(p64,[p64])
NotPrimaryMaster(?int,[(bin,int)]) NotPrimaryMaster(?int,[(bin,int)])
NotifyClusterInformation(ClusterStates) NotifyClusterInformation(ClusterStates)
NotifyDeadlock(p64,p64) NotifyDeadlock(p64,p64)
NotifyMonitorInformation({bin:any})
NotifyNodeInformation(float,[(NodeTypes,?(bin,int),?int,NodeStates,?float)]) NotifyNodeInformation(float,[(NodeTypes,?(bin,int),?int,NodeStates,?float)])
NotifyPartitionChanges(int,int,[(int,int,CellStates)]) NotifyPartitionChanges(int,int,[(int,int,CellStates)])
NotifyPartitionCorrupted(int,[int]) NotifyPartitionCorrupted(int,[int])
...@@ -85,11 +88,12 @@ NotifyRepair(bool) ...@@ -85,11 +88,12 @@ NotifyRepair(bool)
NotifyReplicationDone(int,p64) NotifyReplicationDone(int,p64)
NotifyTransactionFinished(p64,p64) NotifyTransactionFinished(p64,p64)
NotifyUnlockInformation(p64) NotifyUnlockInformation(p64)
NotifyUpstreamAdmin((bin,int))
Ping() Ping()
Pong() Pong()
Repair([int],bool) Repair([int],bool)
Replicate(p64,bin,{int:?(bin,int)}) Replicate(p64,bin,{int:?(bin,int)})
RequestIdentification(NodeTypes,?int,?(bin,int),bin,?float,any,[int]) RequestIdentification(NodeTypes,?int,?(bin,int),bin,?float,{bin:any})
SendPartitionTable(?int,int,[[(int,CellStates)]]) SendPartitionTable(?int,int,[[(int,CellStates)]])
SetClusterState(ClusterStates) SetClusterState(ClusterStates)
SetNodeState(int,NodeStates) SetNodeState(int,NodeStates)
......
...@@ -20,6 +20,7 @@ import os, random, select, socket, sys, tempfile ...@@ -20,6 +20,7 @@ import os, random, select, socket, sys, tempfile
import thread, threading, time, traceback, weakref import thread, threading, time, traceback, weakref
from collections import deque from collections import deque
from contextlib import contextmanager from contextlib import contextmanager
from email import message_from_string
from itertools import count from itertools import count
from functools import partial, wraps from functools import partial, wraps
from zlib import decompress from zlib import decompress
...@@ -301,6 +302,14 @@ class TestSerialized(Serialized): ...@@ -301,6 +302,14 @@ class TestSerialized(Serialized):
return self._epoll.poll(timeout) return self._epoll.poll(timeout)
class FakeSMTP(list):
close = connect = lambda *_: None
def sendmail(self, *args):
self.append(args)
class Node(object): class Node(object):
def getConnectionList(self, *peers): def getConnectionList(self, *peers):
...@@ -421,7 +430,11 @@ class ServerNode(Node): ...@@ -421,7 +430,11 @@ class ServerNode(Node):
self.em.wakeup(thread.exit) self.em.wakeup(thread.exit)
class AdminApplication(ServerNode, neo.admin.app.Application): class AdminApplication(ServerNode, neo.admin.app.Application):
pass
def __setattr__(self, name, value):
if name == 'smtp':
value = FakeSMTP()
super(AdminApplication, self).__setattr__(name, value)
class MasterApplication(ServerNode, neo.master.app.Application): class MasterApplication(ServerNode, neo.master.app.Application):
pass pass
...@@ -691,6 +704,9 @@ class NEOCluster(object): ...@@ -691,6 +704,9 @@ class NEOCluster(object):
self._resource_dict[result] = self self._resource_dict[result] = self
return result[1] return result[1]
def _allocateName(self, _new=lambda: random.randint(0, 100)):
return 'neo_%s' % self._allocate('name', _new)
@staticmethod @staticmethod
def _patch(): def _patch():
cls = NEOCluster cls = NEOCluster
...@@ -717,10 +733,10 @@ class NEOCluster(object): ...@@ -717,10 +733,10 @@ class NEOCluster(object):
def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None, def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'), adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
storage_count=None, db_list=None, clear_databases=True, storage_count=None, db_list=None, clear_databases=True,
compress=True, compress=True, backup_count=0,
importer=None, autostart=None, dedup=False, name=None): importer=None, autostart=None, dedup=False, name=None):
self.name = name or 'neo_%s' % self._allocate('name', self.name = name or self._allocateName()
lambda: random.randint(0, 100)) self.backup_list = [self._allocateName() for x in xrange(backup_count)]
self.compress = compress self.compress = compress
self.num_partitions = partitions self.num_partitions = partitions
master_list = [MasterApplication.newAddress() master_list = [MasterApplication.newAddress()
...@@ -759,6 +775,9 @@ class NEOCluster(object): ...@@ -759,6 +775,9 @@ class NEOCluster(object):
kw['wait'] = 0 kw['wait'] = 0
self.storage_list = [StorageApplication(database=db(x), **kw) self.storage_list = [StorageApplication(database=db(x), **kw)
for x in db_list] for x in db_list]
kw['monitor_email'] = self.name,
if backup_count:
kw['monitor_backup'] = self.backup_list
self.admin_list = [AdminApplication(**kw)] self.admin_list = [AdminApplication(**kw)]
def __repr__(self): def __repr__(self):
...@@ -1133,6 +1152,23 @@ class NEOThreadedTest(NeoTestBase): ...@@ -1133,6 +1152,23 @@ class NEOThreadedTest(NeoTestBase):
ob._p_activate() ob._p_activate()
ob._p_jar.readCurrent(ob) ob._p_jar.readCurrent(ob)
def assertNoMonitorInformation(self, cluster):
self.assertFalse(cluster.admin.smtp)
def assertMonitor(self, cluster, severity, summary, *backups):
msg = message_from_string(cluster.admin.smtp.pop(0)[2])
self.assertIn(('OK', 'WARNING', 'PROBLEM')[severity], msg['subject'])
msg = msg.get_payload().splitlines()
def assertStartsWith(a, b):
self.assertTrue(a.startswith(b), (a, b))
assertStartsWith(msg.pop(0), summary)
expected = {k.name: v for k, v in backups}
while msg:
self.assertFalse(msg.pop(0))
x = expected.pop(msg.pop(0))
assertStartsWith(msg.pop(0), ' %s' % x)
self.assertFalse(expected)
class ThreadId(list): class ThreadId(list):
......
# -*- coding: utf-8 -*-
# #
# Copyright (C) 2012-2019 Nexedi SA # Copyright (C) 2012-2019 Nexedi SA
# #
...@@ -41,10 +42,14 @@ from .test import PCounter, PCounterWithResolution # XXX ...@@ -41,10 +42,14 @@ from .test import PCounter, PCounterWithResolution # XXX
def backup_test(partitions=1, upstream_kw={}, backup_kw={}): def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
def decorator(wrapped): def decorator(wrapped):
def wrapper(self): def wrapper(self):
with NEOCluster(partitions=partitions, **upstream_kw) as upstream: with NEOCluster(partitions=partitions, backup_count=1,
**upstream_kw) as upstream:
upstream.start() upstream.start()
name, = upstream.backup_list
with NEOCluster(partitions=partitions, upstream=upstream, with NEOCluster(partitions=partitions, upstream=upstream,
**backup_kw) as backup: name=name, **backup_kw) as backup:
self.assertMonitor(upstream, 2, 'RECOVERING',
(backup, None))
backup.start() backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP) backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic() self.tic()
...@@ -321,6 +326,10 @@ class ReplicationTests(NEOThreadedTest): ...@@ -321,6 +326,10 @@ class ReplicationTests(NEOThreadedTest):
delay = f.delayNotifyUnlockInformation() delay = f.delayNotifyUnlockInformation()
t1.commit() t1.commit()
self.tic() self.tic()
warning, problem, msg = upstream.neoctl.getMonitorInformation()
self.assertEqual(warning, (backup.name,))
self.assertFalse(problem)
self.assertTrue(msg.endswith('lag=ε'), msg)
def storeObject(orig, *args, **kw): def storeObject(orig, *args, **kw):
p.revert() p.revert()
f.remove(delay) f.remove(delay)
...@@ -331,6 +340,10 @@ class ReplicationTests(NEOThreadedTest): ...@@ -331,6 +340,10 @@ class ReplicationTests(NEOThreadedTest):
t1.begin() t1.begin()
self.assertEqual(5, ob.value) self.assertEqual(5, ob.value)
self.assertEqual(1, self.checkBackup(backup)) self.assertEqual(1, self.checkBackup(backup))
warning, problem, msg = upstream.neoctl.getMonitorInformation()
self.assertFalse(warning)
self.assertFalse(problem)
self.assertTrue(msg.endswith('lag=0.0'), msg)
@with_cluster() @with_cluster()
def testBackupEarlyInvalidation(self, upstream): def testBackupEarlyInvalidation(self, upstream):
...@@ -761,6 +774,22 @@ class ReplicationTests(NEOThreadedTest): ...@@ -761,6 +774,22 @@ class ReplicationTests(NEOThreadedTest):
@backup_test(2, backup_kw=dict(replicas=1)) @backup_test(2, backup_kw=dict(replicas=1))
def testResumingBackupReplication(self, backup): def testResumingBackupReplication(self, backup):
upstream = backup.upstream upstream = backup.upstream
for monitor in 'RECOVERING', 'VERIFYING', 'RUNNING':
monitor += '; UP_TO_DATE=2'
self.assertMonitor(upstream, 2, monitor, (backup, None))
self.assertMonitor(upstream, 0, monitor,
(backup, 'BACKINGUP; UP_TO_DATE=4;'))
def checkMonitor():
self.assertMonitor(upstream, 2, monitor,
(backup, 'BACKINGUP; OUT_OF_DATE=2, UP_TO_DATE=2; DOWN=1;'))
self.assertNoMonitorInformation(upstream)
warning, problem, _ = upstream.neoctl.getMonitorInformation()
self.assertFalse(warning)
self.assertEqual(problem, (backup.name,))
warning, problem, _ = backup.neoctl.getMonitorInformation()
self.assertFalse(warning)
self.assertEqual(problem, (None,))
t, c = upstream.getTransaction() t, c = upstream.getTransaction()
r = c.root() r = c.root()
r[1] = PCounter() r[1] = PCounter()
...@@ -789,11 +818,18 @@ class ReplicationTests(NEOThreadedTest): ...@@ -789,11 +818,18 @@ class ReplicationTests(NEOThreadedTest):
return x.pop(conn.getUUID(), 1) return x.pop(conn.getUUID(), 1)
newTransaction() newTransaction()
self.assertEqual(getBackupTid(), tids[1]) self.assertEqual(getBackupTid(), tids[1])
self.assertNoMonitorInformation(upstream)
primary.stop() primary.stop()
backup.join((primary,)) backup.join((primary,))
primary.resetNode() primary.resetNode()
checkMonitor()
primary.start() primary.start()
self.tic() self.tic()
self.assertMonitor(upstream, 1, monitor,
(backup, 'BACKINGUP; OUT_OF_DATE=2, UP_TO_DATE=2; ltid='))
warning, problem, _ = backup.neoctl.getMonitorInformation()
self.assertEqual(warning, (None,))
self.assertFalse(problem)
primary, slave = slave, primary primary, slave = slave, primary
self.assertEqual(tids, getTIDList(slave)) self.assertEqual(tids, getTIDList(slave))
self.assertEqual(tids[:1], getTIDList(primary)) self.assertEqual(tids[:1], getTIDList(primary))
...@@ -803,6 +839,11 @@ class ReplicationTests(NEOThreadedTest): ...@@ -803,6 +839,11 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(4, self.checkBackup(backup)) self.assertEqual(4, self.checkBackup(backup))
self.assertEqual(getBackupTid(min), tids[1]) self.assertEqual(getBackupTid(min), tids[1])
self.assertMonitor(upstream, 1, monitor,
(backup, 'BACKINGUP; OUT_OF_DATE=1, UP_TO_DATE=3; ltid='))
self.assertMonitor(upstream, 0, monitor,
(backup, 'BACKINGUP; UP_TO_DATE=4;'))
# Check that replication resumes from the maximum possible tid # Check that replication resumes from the maximum possible tid
# (for UP_TO_DATE cells of a backup cluster). More precisely: # (for UP_TO_DATE cells of a backup cluster). More precisely:
# - cells are handled independently (done here by blocking replication # - cells are handled independently (done here by blocking replication
...@@ -811,6 +852,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -811,6 +852,7 @@ class ReplicationTests(NEOThreadedTest):
# we interrupt replication of obj in the middle of a transaction) # we interrupt replication of obj in the middle of a transaction)
slave.stop() slave.stop()
backup.join((slave,)) backup.join((slave,))
checkMonitor()
ask = [] ask = []
def delayReplicate(conn, packet): def delayReplicate(conn, packet):
if isinstance(packet, Packets.AskFetchObjects): if isinstance(packet, Packets.AskFetchObjects):
...@@ -820,16 +862,28 @@ class ReplicationTests(NEOThreadedTest): ...@@ -820,16 +862,28 @@ class ReplicationTests(NEOThreadedTest):
return return
ask.append(packet._args) ask.append(packet._args)
conn, = upstream.master.getConnectionList(backup.master) conn, = upstream.master.getConnectionList(backup.master)
admins = upstream.admin, backup.admin
with ConnectionFilter() as f, Patch(replicator.Replicator, with ConnectionFilter() as f, Patch(replicator.Replicator,
_nextPartitionSortKey=lambda orig, self, offset: offset): _nextPartitionSortKey=lambda orig, self, offset: offset):
f.add(delayReplicate) f.add(delayReplicate)
delayReconnect = f.delayAskLastTransaction() delayReconnect = f.delayAskLastTransaction(lambda conn:
self.getConnectionApp(conn) not in admins)
# Without the following delay, the upstream admin may be notified
# that the backup is back in BACKINGUP state before getting the
# last tid (from the upstream master); note that in such case,
# we would have 2 consecutive identical notifications.
delayMonitor = f.delayNotifyMonitorInformation(
lambda _, x=iter((0,)): next(x, 1))
conn.close() conn.close()
newTransaction() newTransaction()
self.assertMonitor(upstream, 2, monitor, (backup,
'STARTING_BACKUP; OUT_OF_DATE=2, UP_TO_DATE=2; DOWN=1'))
f.remove(delayMonitor)
newTransaction() newTransaction()
checkMonitor()
newTransaction() newTransaction()
self.assertFalse(ask) self.assertFalse(ask)
self.assertEqual(f.filtered_count, 1) self.assertEqual(f.filtered_count, 2)
with Patch(replicator, FETCH_COUNT=1): with Patch(replicator, FETCH_COUNT=1):
f.remove(delayReconnect) f.remove(delayReconnect)
self.tic() self.tic()
...@@ -859,6 +913,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -859,6 +913,7 @@ class ReplicationTests(NEOThreadedTest):
]) ])
self.tic() self.tic()
self.assertEqual(2, self.checkBackup(backup)) self.assertEqual(2, self.checkBackup(backup))
checkMonitor()
@with_cluster(start_cluster=0, replicas=1) @with_cluster(start_cluster=0, replicas=1)
def testStoppingDuringReplication(self, cluster): def testStoppingDuringReplication(self, cluster):
......
...@@ -17,7 +17,7 @@ from neo.lib.connector import SocketConnector ...@@ -17,7 +17,7 @@ from neo.lib.connector import SocketConnector
from neo.lib.debug import PdbSocket from neo.lib.debug import PdbSocket
from neo.lib.node import Node from neo.lib.node import Node
from neo.lib.protocol import NodeTypes from neo.lib.protocol import NodeTypes
from neo.lib.util import timeStringFromTID, p64, u64 from neo.lib.util import datetimeFromTID, p64, u64
from neo.storage.app import DATABASE_MANAGER_DICT, \ from neo.storage.app import DATABASE_MANAGER_DICT, \
Application as StorageApplication Application as StorageApplication
from neo.tests import getTempDirectory, mysql_pool from neo.tests import getTempDirectory, mysql_pool
...@@ -533,7 +533,7 @@ class Application(StressApplication): ...@@ -533,7 +533,7 @@ class Application(StressApplication):
ltid = self.ltid ltid = self.ltid
stdscr.addstr(y, 0, stdscr.addstr(y, 0,
'last oid: 0x%x\nlast tid: 0x%x (%s)\nclients: ' 'last oid: 0x%x\nlast tid: 0x%x (%s)\nclients: '
% (u64(self.loid), u64(ltid), timeStringFromTID(ltid))) % (u64(self.loid), u64(ltid), datetimeFromTID(ltid)))
before = after = 0 before = after = 0
for i, p in enumerate(self.cluster.process_dict[Client]): for i, p in enumerate(self.cluster.process_dict[Client]):
if i: if i:
...@@ -708,7 +708,7 @@ def main(): ...@@ -708,7 +708,7 @@ def main():
ok = tid ok = tid
finally: finally:
conn.close() conn.close()
print('bad: 0x%x (%s)' % (u64(bad), timeStringFromTID(bad))) print('bad: 0x%x (%s)' % (u64(bad), datetimeFromTID(bad)))
finally: finally:
db.close() db.close()
finally: finally:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment