Commit e434c253 authored by Julien Muchembled's avatar Julien Muchembled

New feature: monitoring

This task is done by the admin node, in 2 possible ways:
- email notifications, as soon as some state change;
- new 'neoctl print summary' command that can be used periodically
  to check the health of the database.
They report the same information.

About backup clusters:

The admin of the main cluster also monitors selected backup clusters,
with the help of their admin nodes.

Internally, when a backup master node connects to the upstream master node,
it receives the address of the upstream admin node and forwards it to its
admin node, which is therefore able to connect to the upstream admin node.
So the 2 admin nodes remain connected and communicate in 2 ways:
- the backup node notifies upstream about the health of the backup cluster;
- the upstream node queries the backup node periodically to check whether
  replication is not too late.

TODO:

A few things are hard-coded and we may want to configure them:
- backup lateness is checked every 10 min;
- backup is expected to never be late.

There's also no delay to prevent 2 consecutive emails from having the same
Date: (unfortunately, the RFC 5322 does not allow sub-second precision),
in which case the MUA can display them in random order. This is mostly
confusing when one notification is OK and the other is not, because one
may wonder if there's a new problem.
parent 82c142c4
This diff is collapsed.
......@@ -14,19 +14,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging, protocol
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets
from neo.lib.protocol import uuid_str, \
NodeTypes, NotReadyError, Packets, ProtocolError
from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure
NOT_CONNECTED_MESSAGE = 'Not connected to a primary master.'
def AdminEventHandlerType(name, bases, d):
def check_primary_master(func):
def wrapper(self, *args, **kw):
if self.app.master_conn is not None:
return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
def check_connection(func):
return lambda self, conn, *args, **kw: \
self._checkConnection(conn) and func(self, conn, *args, **kw)
def forward_ask(klass):
return lambda self, conn, *args: self.app.master_conn.ask(
......@@ -47,7 +47,7 @@ def AdminEventHandlerType(name, bases, d):
Packets.TweakPartitionTable,
):
d[x.handler_method_name] = forward_ask(x)
return type(name, bases, {k: v if k[0] == '_' else check_primary_master(v)
return type(name, bases, {k: v if k[0] == '_' else check_connection(v)
for k, v in d.iteritems()})
class AdminEventHandler(EventHandler):
......@@ -55,6 +55,26 @@ class AdminEventHandler(EventHandler):
__metaclass__ = AdminEventHandlerType
def _checkConnection(self, conn):
if self.app.master_conn is None:
raise NotReadyError(NOT_CONNECTED_MESSAGE)
return True
def requestIdentification(self, conn, node_type, uuid, address, name, *_):
if node_type != NodeTypes.ADMIN:
raise ProtocolError("reject non-admin node")
app = self.app
try:
backup = app.backup_dict[name]
except KeyError:
raise ProtocolError("unknown backup cluster %r" % name)
if backup.conn is not None:
raise ProtocolError("already connected")
backup.conn = conn
conn.setHandler(app.backup_handler)
conn.answer(Packets.AcceptIdentification(
NodeTypes.ADMIN, None, None))
def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s",
min_offset, max_offset, uuid_str(uuid))
......@@ -83,6 +103,9 @@ class AdminEventHandler(EventHandler):
self.app.master_conn.send(Packets.FlushLog())
super(AdminEventHandler, self).flushLog(conn)
def askMonitorInformation(self, conn):
self.app.askMonitorInformation(conn)
class MasterEventHandler(EventHandler):
""" This class is just used to dispatch message to right handler"""
......@@ -104,13 +127,93 @@ class MasterEventHandler(EventHandler):
forward.send(packet, kw['msg_id'])
def answerClusterState(self, conn, state):
self.app.cluster_state = state
self.app.updateMonitorInformation(None, cluster_state=state)
notifyClusterInformation = answerClusterState
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm)
app = self.app
app.pt = object.__new__(PartitionTable)
app.pt.load(ptid, num_replicas, row_list, app.nm)
app.partitionTableUpdated()
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
app = self.app
app.pt.update(ptid, num_replicas, cell_list, app.nm)
app.partitionTableUpdated()
def notifyNodeInformation(self, *args):
super(MasterEventHandler, self).notifyNodeInformation(*args)
self.app.partitionTableUpdated()
def notifyUpstreamAdmin(self, conn, addr):
app = self.app
node = app.upstream_admin
if node is None:
node = app.upstream_admin = app.nm.createAdmin()
elif node.getAddress() == addr:
return
node.setAddress(addr)
if app.upstream_admin_conn:
app.upstream_admin_conn.close()
else:
app.connectToUpstreamAdmin()
def answerLastTransaction(self, conn, ltid):
app = self.app
app.ltid = ltid
app.maybeNotify(None)
def answerRecovery(self, name, ptid, backup_tid, truncate_tid):
self.app.backup_tid = backup_tid
def monitor(func):
def wrapper(self, conn, *args, **kw):
for name, backup in self.app.backup_dict.iteritems():
if backup.conn is conn:
return func(self, name, *args, **kw)
raise AssertionError
return wrapper
class BackupHandler(EventHandler):
@monitor
def connectionClosed(self, name):
app = self.app
app.backup_dict[name] = app.backup_dict[name].__class__()
app.maybeNotify(name)
@monitor
def notifyMonitorInformation(self, name, info):
self.app.updateMonitorInformation(name, **info)
@monitor
def answerRecovery(self, name, ptid, backup_tid, truncate_tid):
self.app.backup_dict[name].backup_tid = backup_tid
@monitor
def answerLastTransaction(self, name, ltid):
app = self.app
app.backup_dict[name].ltid = ltid
app.maybeNotify(name)
class UpstreamAdminHandler(AdminEventHandler):
def _checkConnection(self, conn):
assert conn is self.app.upstream_admin_conn
return super(UpstreamAdminHandler, self)._checkConnection(conn)
def connectionClosed(self, conn):
app = self.app
if conn is app.upstream_admin_conn:
app.connectToUpstreamAdmin()
connectionFailed = connectionClosed
def _acceptIdentification(self, node):
node.send(Packets.NotifyMonitorInformation({
'cluster_state': self.app.cluster_state,
'down': self.app.down,
'pt_summary': self.app.pt_summary,
}))
......@@ -18,6 +18,15 @@ import argparse, os, sys
from functools import wraps
from ConfigParser import SafeConfigParser
class _DefaultList(list):
"""
Special list type for default values of 'append' argparse actions,
so that the parser restarts from an empty list when the option is
used on the command-line.
"""
def __copy__(self):
return []
class _Required(object):
......@@ -30,6 +39,8 @@ class _Required(object):
class _Option(object):
multiple = False
def __init__(self, *args, **kw):
if len(args) > 1:
self.short, self.name = args
......@@ -51,7 +62,12 @@ class _Option(object):
action.required = _Required(option_list, self.name)
def fromConfigFile(self, cfg, section):
return self(cfg.get(section, self.name.replace('-', '_')))
value = cfg.get(section, self.name.replace('-', '_'))
if self.multiple:
return [self(value)
for value in value.splitlines()
if value]
return self(value)
@staticmethod
def parse(value):
......@@ -81,6 +97,11 @@ class Option(_Option):
kw[x] = getattr(self, x)
except AttributeError:
pass
if self.multiple:
kw['action'] = 'append'
default = kw.get('default')
if default:
kw['default'] = _DefaultList(default)
return kw
@staticmethod
......@@ -132,9 +153,6 @@ class OptionGroup(object):
class Argument(Option):
def __init__(self, name, **kw):
super(Argument, self).__init__(name, **kw)
def _asArgparse(self, parser, option_list):
kw = {'help': self.help, 'type': self}
for x in 'default', 'metavar', 'nargs', 'choices':
......
......@@ -826,6 +826,18 @@ class Packets(dict):
:nodes: ctl -> A -> M -> *
""")
AskMonitorInformation, AnswerMonitorInformation = request("""
:nodes: ctl -> A
""")
NotifyMonitorInformation = notify("""
:nodes: A -> A
""")
NotifyUpstreamAdmin = notify("""
:nodes: M -> A
""")
del notify, request
......
......@@ -39,7 +39,8 @@ nextafter()
TID_LOW_OVERFLOW = 2**32
TID_LOW_MAX = TID_LOW_OVERFLOW - 1
SECOND_PER_TID_LOW = 60.0 / TID_LOW_OVERFLOW
SECOND_FROM_UINT32 = 60. / TID_LOW_OVERFLOW
MICRO_FROM_UINT32 = 1e6 / TID_LOW_OVERFLOW
TID_CHUNK_RULES = (
(-1900, 0),
(-1, 12),
......@@ -52,7 +53,7 @@ def tidFromTime(tm):
gmt = gmtime(tm)
return packTID(
(gmt.tm_year, gmt.tm_mon, gmt.tm_mday, gmt.tm_hour, gmt.tm_min),
int((gmt.tm_sec + (tm - int(tm))) / SECOND_PER_TID_LOW))
int((gmt.tm_sec + (tm - int(tm))) / SECOND_FROM_UINT32))
def packTID(higher, lower):
"""
......@@ -95,15 +96,10 @@ def unpackTID(ptid):
higher.reverse()
return (tuple(higher), lower)
def timeStringFromTID(ptid):
"""
Return a string in the format "yyyy-mm-dd hh:mm:ss.ssssss" from a TID
"""
higher, lower = unpackTID(ptid)
seconds = lower * SECOND_PER_TID_LOW
return '%04d-%02d-%02d %02d:%02d:%09.6f' % (higher[0], higher[1], higher[2],
higher[3], higher[4], seconds)
def datetimeFromTID(tid):
higher, lower = unpackTID(tid)
seconds, lower = divmod(lower * 60, TID_LOW_OVERFLOW)
return datetime(*(higher + (seconds, int(lower * MICRO_FROM_UINT32))))
def addTID(ptid, offset):
"""
......
......@@ -182,12 +182,15 @@ class Application(BaseApplication):
self.playPrimaryRole()
self.playSecondaryRole()
def getNodeInformationDict(self, node_list):
def getNodeInformationGetter(self, node_list):
node_dict = defaultdict(list)
admin_dict = defaultdict(list)
# group modified nodes by destination node type
for node in node_list:
node_info = node.asTuple()
if node.isAdmin():
for backup in node.extra.get('backup', ()):
admin_dict[backup].append(node_info)
continue
node_dict[NodeTypes.ADMIN].append(node_info)
node_dict[NodeTypes.STORAGE].append(node_info)
......@@ -197,18 +200,27 @@ class Application(BaseApplication):
if node.isStorage():
continue
node_dict[NodeTypes.MASTER].append(node_info)
return node_dict
def getNodeListFor(node):
node_list = node_dict.get(node.getType())
if node.isClient():
admin_list = admin_dict.get(node.extra.get('backup'))
if admin_list:
if node_list:
return node_list + admin_list
return admin_list
return node_list
return getNodeListFor
def broadcastNodesInformation(self, node_list):
"""
Broadcast changes for a set a nodes
Send only one packet per connection to reduce bandwidth
"""
node_dict = self.getNodeInformationDict(node_list)
getNodeListFor = self.getNodeInformationGetter(node_list)
now = monotonic_time()
# send at most one non-empty notification packet per node
for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType())
node_list = getNodeListFor(node)
# We don't skip pending storage nodes because we don't send them
# the full list of nodes when they're added, and it's also quite
# useful to notify them about new masters.
......
......@@ -99,7 +99,8 @@ class BackupApplication(object):
pt = app.pt
while True:
app.changeClusterState(ClusterStates.STARTING_BACKUP)
bootstrap = BootstrapManager(self, NodeTypes.CLIENT)
bootstrap = BootstrapManager(self, NodeTypes.CLIENT,
backup=app.name)
# {offset -> node}
self.primary_partition_dict = {}
# [[tid]]
......@@ -367,3 +368,9 @@ class BackupApplication(object):
uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID()))
cell.getNode().send(p)
def notifyUpstreamAdmin(self, addr):
node_list = self.app.nm.getAdminList(only_identified=True)
if node_list:
min(node_list, key=lambda node: node.getUUID()).send(
Packets.NotifyUpstreamAdmin(addr))
......@@ -52,7 +52,7 @@ class MasterHandler(EventHandler):
node_list = app.nm.getList()
node_list.remove(node)
node_list = ([node.asTuple()] # for id_timestamp
+ app.getNodeInformationDict(node_list)[node.getType()])
+ app.getNodeInformationGetter(node_list)(node))
conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def handlerSwitched(self, conn, new):
......
......@@ -58,6 +58,12 @@ class AdministrationHandler(MasterHandler):
def handlerSwitched(self, conn, new):
assert new
super(AdministrationHandler, self).handlerSwitched(conn, new)
app = self.app.backup_app
if app is not None:
for node in app.nm.getAdminList():
if node.isRunning():
app.notifyUpstreamAdmin(node.getAddress())
break
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
......
......@@ -16,7 +16,7 @@
from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.protocol import ZERO_TID
from neo.lib.protocol import NodeTypes, NodeStates, Packets, ZERO_TID
from neo.lib.pt import PartitionTable
class BackupHandler(EventHandler):
......@@ -36,6 +36,13 @@ class BackupHandler(EventHandler):
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, timestamp, node_list):
super(BackupHandler, self).notifyNodeInformation(
conn, timestamp, node_list)
for node_type, addr, _, state, _ in node_list:
if node_type == NodeTypes.ADMIN and state == NodeStates.RUNNING:
self.app.notifyUpstreamAdmin(addr)
def answerLastTransaction(self, conn, tid):
app = self.app
prev_tid = app.app.getLastTransaction()
......
......@@ -14,11 +14,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import json, sys
from .neoctl import NeoCTL, NotReadyException
from neo.lib.node import NodeManager
from neo.lib.pt import PartitionTable
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
from neo.lib.util import p64, u64, datetimeFromTID, tidFromTime
from neo.lib.protocol import uuid_str, formatNodeList, \
ClusterStates, NodeStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
......@@ -29,6 +29,7 @@ action_dict = {
'node': 'getNodeList',
'cluster': 'getClusterState',
'primary': 'getPrimary',
'summary': 'getSummary',
},
'set': {
'cluster': 'setClusterState',
......@@ -100,12 +101,12 @@ class TerminalNeoCTL(object):
if backup_tid:
ltid = self.neoctl.getLastTransaction()
r = "backup_tid = 0x%x (%s)" % (u64(backup_tid),
timeStringFromTID(backup_tid))
datetimeFromTID(backup_tid))
else:
loid, ltid = self.neoctl.getLastIds()
r = "last_oid = 0x%x" % (u64(loid))
return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %s" % \
(u64(ltid), timeStringFromTID(ltid), ptid)
(u64(ltid), datetimeFromTID(ltid), ptid)
def getPartitionRowList(self, params):
"""
......@@ -159,6 +160,21 @@ class TerminalNeoCTL(object):
assert len(params) == 1
return self.neoctl.setClusterState(self.asClusterState(params[0]))
def getSummary(self, params):
"""
Get a summary of the health of this cluster and backups.
The first line reports severities: it is a commented json dump of
{severity: [backup_name | null]}
where severity is either "warning" or "problem"
and null refers to this cluster
"""
assert len(params) == 0
warning, problem, summary = self.neoctl.getMonitorInformation()
return "# %s\n%s" % (json.dumps({k: v for k, v in zip(
('warning', 'problem'),
(warning, problem),
) if v}), summary)
def setNumReplicas(self, params):
"""
Set number of replicas.
......
......@@ -64,3 +64,4 @@ class CommandEventHandler(EventHandler):
answerLastTransaction = __answer(Packets.AnswerLastTransaction)
answerRecovery = __answer(Packets.AnswerRecovery)
answerTweakPartitionTable = __answer(Packets.AnswerTweakPartitionTable)
answerMonitorInformation = __answer(Packets.AnswerMonitorInformation)
......@@ -216,3 +216,9 @@ class NeoCTL(BaseApplication):
conn.send(Packets.FlushLog())
while conn.pending():
self.em.poll(1)
def getMonitorInformation(self):
response = self.__ask(Packets.AskMonitorInformation())
if response[0] != Packets.AnswerMonitorInformation:
raise RuntimeError(response)
return response[1:]
......@@ -14,12 +14,21 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import partial
import unittest
import transaction
from neo.lib.protocol import NodeStates
from neo.neoctl.app import TerminalNeoCTL
from . import NEOCluster, NEOFunctionalTest
class TerminalNeoCTL(TerminalNeoCTL):
def __init__(self, cluster):
self.neoctl = cluster.neoctl
def __del__(self):
pass
class ClusterTests(NEOFunctionalTest):
def _tearDown(self, success):
......@@ -118,12 +127,20 @@ class ClusterTests(NEOFunctionalTest):
self.neo.start()
self.neo.expectClusterRunning()
self.neo.expectOudatedCells(0)
# check neoctl cli
getSummary = partial(TerminalNeoCTL(self.neo).getSummary, ())
ok_empty = '# {}\nRUNNING;' \
' UP_TO_DATE=1; ltid=0000000000000000 (1900-01-01 00:00:00)'
self.assertEqual(getSummary(), ok_empty)
# connect a client a check it's known
db, conn = self.neo.getZODBConnection()
self.assertEqual(len(self.neo.getClientlist()), 1)
# drop the storage, the cluster is no more operational...
self.neo.getStorageProcessList()[0].stop()
self.neo.expectClusterRecovering()
# check severity returned by the cli
self.assertEqual(getSummary(),
'# {"problem": [null]}\nRECOVERING; UP_TO_DATE=1; DOWN=1')
# ...and the client gets disconnected
self.assertEqual(len(self.neo.getClientlist()), 0)
# restart storage so that the cluster is operational again
......@@ -134,6 +151,9 @@ class ClusterTests(NEOFunctionalTest):
conn.root()['plop'] = 1
transaction.commit()
self.assertEqual(len(self.neo.getClientlist()), 1)
summary = getSummary()
self.assertTrue(summary.startswith('# {}\nRUNNING;'), summary)
self.assertNotEqual(summary, ok_empty)
def testStorageLostDuringRecovery(self):
"""
......
......@@ -16,6 +16,7 @@ AnswerInformationLocked(p64)
AnswerLastIDs(?p64,?p64)
AnswerLastTransaction(p64)
AnswerLockedTransactions({p64:?p64})
AnswerMonitorInformation([?bin],[?bin],bin)
AnswerNewOIDs([p64])
AnswerNodeList([(NodeTypes,?(bin,int),?int,NodeStates,?float)])
AnswerObject(p64,p64,?p64,?int,bin,bin,?p64)
......@@ -50,6 +51,7 @@ AskLastIDs()
AskLastTransaction()
AskLockInformation(p64,p64)
AskLockedTransactions()
AskMonitorInformation()
AskNewOIDs(int)
AskNodeList(NodeTypes)
AskObject(p64,?p64,?p64)
......@@ -77,6 +79,7 @@ InvalidateObjects(p64,[p64])
NotPrimaryMaster(?int,[(bin,int)])
NotifyClusterInformation(ClusterStates)
NotifyDeadlock(p64,p64)
NotifyMonitorInformation({bin:any})
NotifyNodeInformation(float,[(NodeTypes,?(bin,int),?int,NodeStates,?float)])
NotifyPartitionChanges(int,int,[(int,int,CellStates)])
NotifyPartitionCorrupted(int,[int])
......@@ -85,6 +88,7 @@ NotifyRepair(bool)
NotifyReplicationDone(int,p64)
NotifyTransactionFinished(p64,p64)
NotifyUnlockInformation(p64)
NotifyUpstreamAdmin((bin,int))
Ping()
Pong()
Repair([int],bool)
......
......@@ -20,6 +20,7 @@ import os, random, select, socket, sys, tempfile
import thread, threading, time, traceback, weakref
from collections import deque
from contextlib import contextmanager
from email import message_from_string
from itertools import count
from functools import partial, wraps
from zlib import decompress
......@@ -301,6 +302,14 @@ class TestSerialized(Serialized):
return self._epoll.poll(timeout)
class FakeSMTP(list):
close = connect = lambda *_: None
def sendmail(self, *args):
self.append(args)
class Node(object):
def getConnectionList(self, *peers):
......@@ -421,7 +430,11 @@ class ServerNode(Node):
self.em.wakeup(thread.exit)
class AdminApplication(ServerNode, neo.admin.app.Application):
pass
def __setattr__(self, name, value):
if name == 'smtp':
value = FakeSMTP()
super(AdminApplication, self).__setattr__(name, value)
class MasterApplication(ServerNode, neo.master.app.Application):
pass
......@@ -691,6 +704,9 @@ class NEOCluster(object):
self._resource_dict[result] = self
return result[1]
def _allocateName(self, _new=lambda: random.randint(0, 100)):
return 'neo_%s' % self._allocate('name', _new)
@staticmethod
def _patch():
cls = NEOCluster
......@@ -717,10 +733,10 @@ class NEOCluster(object):
def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
storage_count=None, db_list=None, clear_databases=True,
compress=True,
compress=True, backup_count=0,
importer=None, autostart=None, dedup=False, name=None):
self.name = name or 'neo_%s' % self._allocate('name',
lambda: random.randint(0, 100))
self.name = name or self._allocateName()
self.backup_list = [self._allocateName() for x in xrange(backup_count)]
self.compress = compress
self.num_partitions = partitions
master_list = [MasterApplication.newAddress()
......@@ -759,6 +775,9 @@ class NEOCluster(object):
kw['wait'] = 0
self.storage_list = [StorageApplication(database=db(x), **kw)
for x in db_list]
kw['monitor_email'] = self.name,
if backup_count:
kw['monitor_backup'] = self.backup_list
self.admin_list = [AdminApplication(**kw)]
def __repr__(self):
......@@ -1133,6 +1152,23 @@ class NEOThreadedTest(NeoTestBase):
ob._p_activate()
ob._p_jar.readCurrent(ob)
def assertNoMonitorInformation(self, cluster):
self.assertFalse(cluster.admin.smtp)
def assertMonitor(self, cluster, severity, summary, *backups):
msg = message_from_string(cluster.admin.smtp.pop(0)[2])
self.assertIn(('OK', 'WARNING', 'PROBLEM')[severity], msg['subject'])
msg = msg.get_payload().splitlines()
def assertStartsWith(a, b):
self.assertTrue(a.startswith(b), (a, b))
assertStartsWith(msg.pop(0), summary)
expected = {k.name: v for k, v in backups}
while msg:
self.assertFalse(msg.pop(0))
x = expected.pop(msg.pop(0))
assertStartsWith(msg.pop(0), ' %s' % x)
self.assertFalse(expected)
class ThreadId(list):
......
# -*- coding: utf-8 -*-
#
# Copyright (C) 2012-2019 Nexedi SA
#
......@@ -41,10 +42,14 @@ from .test import PCounter, PCounterWithResolution # XXX
def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
def decorator(wrapped):
def wrapper(self):
with NEOCluster(partitions=partitions, **upstream_kw) as upstream:
with NEOCluster(partitions=partitions, backup_count=1,
**upstream_kw) as upstream:
upstream.start()
name, = upstream.backup_list
with NEOCluster(partitions=partitions, upstream=upstream,
**backup_kw) as backup:
name=name, **backup_kw) as backup:
self.assertMonitor(upstream, 2, 'RECOVERING',
(backup, None))
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
......@@ -321,6 +326,10 @@ class ReplicationTests(NEOThreadedTest):
delay = f.delayNotifyUnlockInformation()
t1.commit()
self.tic()
warning, problem, msg = upstream.neoctl.getMonitorInformation()
self.assertEqual(warning, (backup.name,))
self.assertFalse(problem)
self.assertTrue(msg.endswith('lag=ε'), msg)
def storeObject(orig, *args, **kw):
p.revert()
f.remove(delay)
......@@ -331,6 +340,10 @@ class ReplicationTests(NEOThreadedTest):
t1.begin()
self.assertEqual(5, ob.value)
self.assertEqual(1, self.checkBackup(backup))
warning, problem, msg = upstream.neoctl.getMonitorInformation()
self.assertFalse(warning)
self.assertFalse(problem)
self.assertTrue(msg.endswith('lag=0.0'), msg)
@with_cluster()
def testBackupEarlyInvalidation(self, upstream):
......@@ -761,6 +774,22 @@ class ReplicationTests(NEOThreadedTest):
@backup_test(2, backup_kw=dict(replicas=1))
def testResumingBackupReplication(self, backup):
upstream = backup.upstream
for monitor in 'RECOVERING', 'VERIFYING', 'RUNNING':
monitor += '; UP_TO_DATE=2'
self.assertMonitor(upstream, 2, monitor, (backup, None))
self.assertMonitor(upstream, 0, monitor,
(backup, 'BACKINGUP; UP_TO_DATE=4;'))
def checkMonitor():
self.assertMonitor(upstream, 2, monitor,
(backup, 'BACKINGUP; OUT_OF_DATE=2, UP_TO_DATE=2; DOWN=1;'))
self.assertNoMonitorInformation(upstream)
warning, problem, _ = upstream.neoctl.getMonitorInformation()
self.assertFalse(warning)
self.assertEqual(problem, (backup.name,))
warning, problem, _ = backup.neoctl.getMonitorInformation()
self.assertFalse(warning)
self.assertEqual(problem, (None,))
t, c = upstream.getTransaction()
r = c.root()
r[1] = PCounter()
......@@ -789,11 +818,18 @@ class ReplicationTests(NEOThreadedTest):
return x.pop(conn.getUUID(), 1)
newTransaction()
self.assertEqual(getBackupTid(), tids[1])
self.assertNoMonitorInformation(upstream)
primary.stop()
backup.join((primary,))
primary.resetNode()
checkMonitor()
primary.start()
self.tic()
self.assertMonitor(upstream, 1, monitor,
(backup, 'BACKINGUP; OUT_OF_DATE=2, UP_TO_DATE=2; ltid='))
warning, problem, _ = backup.neoctl.getMonitorInformation()
self.assertEqual(warning, (None,))
self.assertFalse(problem)
primary, slave = slave, primary
self.assertEqual(tids, getTIDList(slave))
self.assertEqual(tids[:1], getTIDList(primary))
......@@ -803,6 +839,11 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(4, self.checkBackup(backup))
self.assertEqual(getBackupTid(min), tids[1])
self.assertMonitor(upstream, 1, monitor,
(backup, 'BACKINGUP; OUT_OF_DATE=1, UP_TO_DATE=3; ltid='))
self.assertMonitor(upstream, 0, monitor,
(backup, 'BACKINGUP; UP_TO_DATE=4;'))
# Check that replication resumes from the maximum possible tid
# (for UP_TO_DATE cells of a backup cluster). More precisely:
# - cells are handled independently (done here by blocking replication
......@@ -811,6 +852,7 @@ class ReplicationTests(NEOThreadedTest):
# we interrupt replication of obj in the middle of a transaction)
slave.stop()
backup.join((slave,))
checkMonitor()
ask = []
def delayReplicate(conn, packet):
if isinstance(packet, Packets.AskFetchObjects):
......@@ -820,16 +862,28 @@ class ReplicationTests(NEOThreadedTest):
return
ask.append(packet._args)
conn, = upstream.master.getConnectionList(backup.master)
admins = upstream.admin, backup.admin
with ConnectionFilter() as f, Patch(replicator.Replicator,
_nextPartitionSortKey=lambda orig, self, offset: offset):
f.add(delayReplicate)
delayReconnect = f.delayAskLastTransaction()
delayReconnect = f.delayAskLastTransaction(lambda conn:
self.getConnectionApp(conn) not in admins)
# Without the following delay, the upstream admin may be notified
# that the backup is back in BACKINGUP state before getting the
# last tid (from the upstream master); note that in such case,
# we would have 2 consecutive identical notifications.
delayMonitor = f.delayNotifyMonitorInformation(
lambda _, x=iter((0,)): next(x, 1))
conn.close()
newTransaction()
self.assertMonitor(upstream, 2, monitor, (backup,
'STARTING_BACKUP; OUT_OF_DATE=2, UP_TO_DATE=2; DOWN=1'))
f.remove(delayMonitor)
newTransaction()
checkMonitor()
newTransaction()
self.assertFalse(ask)
self.assertEqual(f.filtered_count, 1)
self.assertEqual(f.filtered_count, 2)
with Patch(replicator, FETCH_COUNT=1):
f.remove(delayReconnect)
self.tic()
......@@ -859,6 +913,7 @@ class ReplicationTests(NEOThreadedTest):
])
self.tic()
self.assertEqual(2, self.checkBackup(backup))
checkMonitor()
@with_cluster(start_cluster=0, replicas=1)
def testStoppingDuringReplication(self, cluster):
......
......@@ -17,7 +17,7 @@ from neo.lib.connector import SocketConnector
from neo.lib.debug import PdbSocket
from neo.lib.node import Node
from neo.lib.protocol import NodeTypes
from neo.lib.util import timeStringFromTID, p64, u64
from neo.lib.util import datetimeFromTID, p64, u64
from neo.storage.app import DATABASE_MANAGER_DICT, \
Application as StorageApplication
from neo.tests import getTempDirectory, mysql_pool
......@@ -533,7 +533,7 @@ class Application(StressApplication):
ltid = self.ltid
stdscr.addstr(y, 0,
'last oid: 0x%x\nlast tid: 0x%x (%s)\nclients: '
% (u64(self.loid), u64(ltid), timeStringFromTID(ltid)))
% (u64(self.loid), u64(ltid), datetimeFromTID(ltid)))
before = after = 0
for i, p in enumerate(self.cluster.process_dict[Client]):
if i:
......@@ -708,7 +708,7 @@ def main():
ok = tid
finally:
conn.close()
print('bad: 0x%x (%s)' % (u64(bad), timeStringFromTID(bad)))
print('bad: 0x%x (%s)' % (u64(bad), datetimeFromTID(bad)))
finally:
db.close()
finally:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment