Commit a4846b10 authored by Julien Muchembled's avatar Julien Muchembled

WIP: Make admin node a web-app

The goal is to get rid of the neoctl command-line tool, and to manage the
cluster via a web browser, or tools like 'wget'. Then, it will be possible to
provide a web user interface to connect to the underlying DB of any storage
node, usually a SQL client.

The design of admin app is finished:
- it's threaded like clients
- it's a WSGI app

I also hacked an HTTP API as quickly as possible to make all tests pass.

TODO:
- SSL
- define a better HTTP API
- there's no UI at all yet
- remove all unused packets from the protocol (those that were only used
  between neoctl and admin node)

There are a few dead files, not deleted yet, in case they still contain
useful pieces of code:
 neo/neoctl/app.py
 neo/neoctl/handler.py
 neo/scripts/neoctl.py
parent 0b34a051
...@@ -14,20 +14,89 @@ ...@@ -14,20 +14,89 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import thread
import weakref
from copy import deepcopy
from collections import defaultdict
from json import dumps as json_dumps
from logging import ERROR, INFO
from wsgiref.simple_server import WSGIRequestHandler
from bottle import Bottle, HTTPError, request, response
from . import handler
from neo.lib import logging from neo.lib import logging
from neo.lib.app import BaseApplication, buildOptionParser from neo.lib.app import buildOptionParser
from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from neo.lib.bootstrap import BootstrapManager from neo.lib.bootstrap import BootstrapManager
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger from neo.lib.pt import PartitionTable
from neo.lib.threaded_app import ThreadedApplication
from neo.lib.util import p64, tidFromTime, u64
def dry_run():
    """Return the boolean value of the 'dry_run' query parameter.

    The parameter is parsed as an integer (NeoCTL sends int(dry_run)):
    0 means false, any other integer means true.

    Raise HTTPError(400) if the parameter is not a valid integer, like
    getPartitionRowList does for its own parameters, instead of letting
    the ValueError escape as an internal server error.
    """
    try:
        return bool(int(request.query.dry_run))
    except ValueError as e:
        raise HTTPError(400, str(e))
def json(x):
    """Return x serialized as JSON, flagging the response as JSON"""
    # The content type is set first, as before, so that it is changed even
    # if the serialization fails.
    response.content_type = 'application/json'
    encoded = json_dumps(x)
    return encoded
def rowsForJson(row_list):
    """Convert partition rows to a JSON-friendly form

    Each row is an iterable of (uuid, state) pairs. It is turned into
    a mapping from state to the list of uuids in that state, keeping
    the order in which the uuids appear in the row.
    Return the list of these mappings, one per row.
    """
    def group_by_state(row):
        cells_by_state = defaultdict(list)
        for nid, cell_state in row:
            cells_by_state[cell_state].append(nid)
        return cells_by_state
    return [group_by_state(row) for row in row_list]
def raiseNotReady():
    """Abort the current request with a 503 (Service Unavailable) error"""
    not_ready = HTTPError(503, 'Not connected to a primary master')
    raise not_ready
class Bottle(Bottle):
    """Bottle application that answers errors with their bare body"""

    def default_error_handler(self, res):
        # Only the body of the error is returned, without any decoration.
        error_body = res.body
        return error_body
class ObjectBottle(object):
    """Descriptor giving each instance of the owner class its own Bottle app

    At class level, unknown attributes are forwarded to a template Bottle
    application, so that methods can be decorated with its 'route'.
    When accessed from an instance, the template is deep-copied, the copy is
    stored as the 'bottle' attribute of the instance (which then shadows this
    descriptor), and a plugin is installed on the copy so that route callbacks
    are called with the instance as first argument.

    If 'weakref' is true, the plugin only keeps a weak reference to the
    instance. Other arguments are passed to the Bottle constructor.
    """

    def __init__(self, weakref=False, *args, **kw):
        self._app = Bottle(*args, **kw)
        self._weakref = weakref

    def __getattr__(self, name):
        # Forward anything not defined here to the template application.
        return getattr(self._app, name)

    def __get__(self, obj, cls):
        if obj is None:
            # Accessed from the class: give access to the descriptor.
            return self
        app = deepcopy(self._app)
        obj.bottle = app
        if self._weakref:
            get_obj = weakref.ref(obj)
        else:
            get_obj = lambda: obj
        def bind(callback):
            def bound(*a, **k):
                return callback(get_obj(), *a, **k)
            return bound
        app.install(bind)
        return app
class RequestHandler(WSGIRequestHandler):
    """WSGI request handler that logs via the 'logging' module

    Every message is prefixed with the IP address of the client.
    """

    def _log(self, level, fmt, *args):
        peer = self.client_address[0]
        message = fmt % args
        logging.log(level, "%s %s", peer, message)

    def log_error(self, *args):
        # args = (format, *format_args)
        self._log(ERROR, *args)

    def log_message(self, *args):
        # args = (format, *format_args)
        self._log(INFO, *args)
@buildOptionParser @buildOptionParser
class Application(BaseApplication): class Application(ThreadedApplication):
"""The storage node application.""" """The storage node application."""
bottle = ObjectBottle(weakref=True)
cluster_state = None
@classmethod @classmethod
def _buildOptionParser(cls): def _buildOptionParser(cls):
_ = cls.option_parser _ = cls.option_parser
...@@ -40,95 +109,216 @@ class Application(BaseApplication): ...@@ -40,95 +109,216 @@ class Application(BaseApplication):
def __init__(self, config): def __init__(self, config):
super(Application, self).__init__( super(Application, self).__init__(
config.get('ssl'), config.get('dynamic_master_list')) config['masters'], config['cluster'],
for address in config['masters']: ssl=config.get('ssl'),
self.nm.createMaster(address=address) dynamic_master_list=config.get('dynamic_master_list'))
self.http = config['bind']
self.name = config['cluster'] logging.debug('IP address is %s, port is %d', *self.http)
self.server = config['bind']
logging.debug('IP address is %s, port is %d', *self.server)
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
self.uuid = config.get('nid') self.uuid = config.get('nid')
logging.node(self.name, self.uuid) logging.node(self.name, self.uuid)
self.request_handler = MasterRequestEventHandler(self) self.master_bootstrap_handler = handler.MasterBootstrapHandler(self)
self.master_event_handler = MasterEventHandler(self) self.master_event_handler = handler.MasterEventHandler(self)
self.cluster_state = None self.primary_handler = handler.PrimaryAnswersHandler(self)
self.reset()
registerLiveDebugger(on_log=self.log)
def close(self):
self.listening_conn = None
super(Application, self).close()
def reset(self):
self.master_conn = None
self.master_node = None
def log(self):
self.em.log()
self.nm.log()
if self.pt is not None:
self.pt.log()
def run(self):
try:
self._run()
except Exception:
logging.exception('Pre-mortem data:')
self.log()
logging.flush()
raise
def _run(self): def _run(self):
"""Make sure that the status is sane and start a loop.""" """Make sure that the status is sane and start a loop."""
if len(self.name) == 0: try:
raise RuntimeError, 'cluster name must be non-empty' poll = self.em.poll
while self.cluster_state != ClusterStates.STOPPING:
# Make a listening port. self.connectToPrimary()
handler = AdminEventHandler(self) try:
self.listening_conn = ListeningConnection(self, handler, self.server) while True:
poll(1)
while self.cluster_state != ClusterStates.STOPPING: except PrimaryFailure:
self.connectToPrimary() self.nm.log()
try: logging.error('primary master is down')
while True: finally:
self.em.poll(1) self.master_conn = None
except PrimaryFailure: while not self.em.isIdle():
logging.error('primary master is down') poll(1)
self.listening_conn.close() finally:
while not self.em.isIdle(): self.interrupt_main()
self.em.poll(1)
interrupt_main = staticmethod(thread.interrupt_main)
def connectToPrimary(self): def connectToPrimary(self):
"""Find a primary master node, and connect to it. """Find a primary master node, and connect to it.
If a primary master node is not elected or ready, repeat If a primary master node is not elected or ready, repeat
the attempt of a connection periodically. the attempt of a connection periodically.
Note that I do not accept any connection from non-master nodes
at this stage.
""" """
self.pt = PartitionTable(0, 0)
self.master_node = None
self.uuid = None
self.cluster_state = None self.cluster_state = None
# search, find, connect and identify to the primary master bootstrap = BootstrapManager(self, NodeTypes.ADMIN)
bootstrap = BootstrapManager(self, NodeTypes.ADMIN, self.server) self.master_node, conn = bootstrap.getPrimaryConnection()
self.master_node, self.master_conn = bootstrap.getPrimaryConnection() conn.setHandler(self.master_bootstrap_handler)
conn.ask(Packets.AskClusterState())
conn.convertToMT(self.dispatcher)
conn.setHandler(self.master_event_handler)
self.master_conn = conn
def _askPrimary(self, packet, **kw):
    """ Send a request to the primary master and return its answer

    The answer is processed by self.primary_handler. Extra keyword arguments
    are passed to self._ask. A 503 HTTP error is raised if there is no usable
    connection to the primary master.
    """
    conn = self._getMasterConnection()
    return self._ask(conn, packet, handler=self.primary_handler, **kw)
def _getMasterConnection(self):
    """ Return the connection to the primary master

    A 503 HTTP error is raised (see raiseNotReady) if there is no such
    connection or if it is closed.
    """
    conn = self.master_conn
    usable = conn is not None and not conn.isClosed()
    if not usable:
        raiseNotReady()
    return conn
def serve(self, **kw):
    """ Start the application and serve HTTP requests

    self.start() is called first, then the bottle application is run with
    the 'wsgiref' server on the (host, port) address stored in self.http.
    NOTE(review): keyword arguments are accepted but currently ignored.
    """
    self.start()
    bind_host, bind_port = self.http
    options = dict(server='wsgiref', handler_class=RequestHandler,
                   quiet=1, host=bind_host, port=bind_port)
    self.bottle.run(**options)
def asTID(self, value):
    """ Convert a string from the query to a TID

    A value containing a '.' is parsed as a float and converted with
    tidFromTime. Otherwise, it is parsed as an integer literal (the base is
    guessed from the prefix, e.g. '0x') and packed with p64.
    """
    if '.' not in value:
        return p64(int(value, 0))
    return tidFromTime(float(value))
def queryStorageList(self, allow_all=True):
    """ Return the node ids given by the 'node_list' query parameter

    The parameter is a comma-separated list of integers. If 'allow_all' is
    true, the special value 'all' selects all storage nodes known by the node
    manager. An empty or missing parameter gives an empty tuple.
    """
    node_list = request.query.node_list
    if allow_all and node_list == 'all':
        return [node.getUUID() for node in self.nm.getStorageList()]
    if not node_list:
        return ()
    return map(int, request.query.node_list.split(','))
@bottle.route('/getClusterState')
def getClusterState(self):
    """ Return the name of the cluster state, or nothing if unknown """
    state = self.cluster_state
    if state is None:
        return None
    return str(state)
def _setClusterState(self, state):
    """ Ask the primary master to switch the cluster to the given state """
    request_packet = Packets.SetClusterState(state)
    return self._askPrimary(request_packet)
@bottle.route('/setClusterState')
def setClusterState(self):
    """ Set the cluster state named by the 'state' query parameter

    The name is looked up as an attribute of ClusterStates.
    """
    state_name = request.query.state
    new_state = getattr(ClusterStates, state_name)
    self._setClusterState(new_state)
@bottle.route('/startCluster')
def startCluster(self):
    """ Set cluster into "verifying" state """
    verifying = ClusterStates.VERIFYING
    self._setClusterState(verifying)
@bottle.route('/enableStorageList')
def enableStorageList(self):
    """ Send AddPendingNodes to the primary master

    Nodes are selected by the 'node_list' query parameter
    (see queryStorageList, 'all' being accepted).
    """
    uuid_list = self.queryStorageList()
    return self._askPrimary(Packets.AddPendingNodes(uuid_list))
@bottle.route('/tweakPartitionTable')
def tweakPartitionTable(self):
    """ Ask the primary master to tweak the partition table

    Query parameters are 'dry_run' and 'node_list' ('all' is not accepted
    here). The answer is the JSON encoding of (changed, rows), rows being
    formatted by rowsForJson.
    """
    packet = Packets.TweakPartitionTable(
        dry_run(), self.queryStorageList(False))
    changed, row_list = self._askPrimary(packet)
    return json((changed, rowsForJson(row_list)))
# passive handler @bottle.route('/setNumReplicas')
self.master_conn.setHandler(self.master_event_handler) def setNumReplicas(self):
self.master_conn.ask(Packets.AskClusterState()) return self._askPrimary(Packets.SetNumReplicas(int(request.query.nr)))
def sendPartitionTable(self, conn, min_offset, max_offset, uuid): @bottle.route('/getLastIds')
def getLastIds(self):
    """ Return the last OID and TID, as a JSON pair of integers """
    last_oid, last_tid = self._askPrimary(Packets.AskLastIDs())
    return json((u64(last_oid), u64(last_tid)))
@bottle.route('/getLastTransaction')
def getLastTransaction(self):
    """ Return the answer of the primary master to AskLastTransaction """
    # NOTE(review): NeoCTL.getLastTransaction parses the response with
    # int(...), whereas getLastIds converts ids with u64 before sending
    # them: confirm that the answer is returned in a form the client
    # can parse.
    answer = self._askPrimary(Packets.AskLastTransaction())
    return answer
@bottle.route('/getRecovery')
def getRecovery(self):
    """ Return (ptid, backup_tid, truncate_tid) as JSON

    TIDs are converted to integers with u64, unless they are false
    (e.g. None), in which case they are left untouched.
    """
    ptid, backup_tid, truncate_tid = self._askPrimary(Packets.AskRecovery())
    if backup_tid:
        backup_tid = u64(backup_tid)
    if truncate_tid:
        truncate_tid = u64(truncate_tid)
    return json((ptid, backup_tid, truncate_tid))
@bottle.route('/getNodeList')
def getNodeList(self):
    """ Return the list of known nodes, as JSON

    If the 'node_type' query parameter is given, it is looked up as an
    attribute of NodeTypes and only nodes of this type are returned.
    Each node is described by
    (type name, address, uuid, state name, id_timestamp).
    A 503 HTTP error is raised if not connected to the primary master.
    """
    type_name = request.query.node_type
    if type_name:
        # The filter closes over a name that is never rebound below, so
        # that it keeps working even if the node manager evaluates it
        # lazily while the loop is running.
        wanted_type = getattr(NodeTypes, type_name)
        node_filter = lambda node: node.getType() is wanted_type
    else:
        node_filter = None
    # The connection itself is not used: this is only to report that the
    # admin is not ready when there is no primary master.
    self._getMasterConnection()
    node_list = []
    for node in self.nm.getList(node_filter):
        node_type, address, uuid, state, id_timestamp = node.asTuple()
        node_list.append((str(node_type), address, uuid,
                          str(state), id_timestamp))
    return json(node_list)
@bottle.route('/getPrimary')
def getPrimary(self):
    """ Return the UUID of the primary master, as a string

    If self.master_node has no 'getUUID' attribute (e.g. it is None),
    raiseNotReady is called instead, which raises a 503 HTTP error.
    """
    get_uuid = getattr(self.master_node, 'getUUID', raiseNotReady)
    return str(get_uuid())
def _setNodeState(self, node, state):
    """ Ask the primary master to change the state of the given node """
    request_packet = Packets.SetNodeState(node, state)
    return self._askPrimary(request_packet)
@bottle.route('/dropNode')
def dropNode(self):
    """ Set the node given by the 'node' query parameter to UNKNOWN state """
    node_id = int(request.query.node)
    self._setNodeState(node_id, NodeStates.UNKNOWN)
@bottle.route('/killNode')
def killNode(self):
    """ Set the node given by the 'node' query parameter to DOWN state """
    node_id = int(request.query.node)
    self._setNodeState(node_id, NodeStates.DOWN)
@bottle.route('/getPartitionRowList')
def getPartitionRowList(self):
q = request.query
min_offset = q.min_offset
max_offset = q.max_offset
uuid = q.node
pt = self.pt pt = self.pt
if max_offset == 0:
max_offset = pt.getPartitions()
try: try:
row_list = map(pt.getRow, xrange(min_offset, max_offset)) min_offset = int(min_offset) if min_offset else 0
max_offset = max_offset and int(max_offset) or pt.getPartitions()
uuid = int(uuid) if uuid else None
except ValueError, e:
raise HTTPError(400, str(e))
row_list = []
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in pt.getCellList(offset):
if uuid is None or cell.getUUID() == uuid:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append(row)
except IndexError: except IndexError:
conn.send(Errors.ProtocolError('invalid partition table offset')) raise HTTPError(400, 'invalid partition table offset')
else: return json((pt.getID(), pt.getReplicas(), rowsForJson(row_list)))
conn.answer(Packets.AnswerPartitionList(
pt.getID(), pt.getReplicas(), row_list)) @bottle.route('/repair')
def repair(self):
    """ Send Repair to the primary master

    Nodes are selected by the 'node_list' query parameter ('all' being
    accepted) and the 'dry_run' query parameter is forwarded.
    """
    uuid_list = self.queryStorageList()
    packet = Packets.Repair(uuid_list, dry_run())
    return self._askPrimary(packet)
@bottle.route('/truncate')
def truncate(self):
    """ Send Truncate to the primary master

    The TID is given by the 'tid' query parameter (see asTID).
    """
    tid = self.asTID(request.query.tid)
    return self._askPrimary(Packets.Truncate(tid))
@bottle.route('/checkReplicas')
def checkReplicas(self):
    """ Send CheckReplicas to the primary master

    Query parameters:
    - pt: comma-separated list of 'partition:source' items. An empty source
      means None. An item with an empty partition selects all partitions:
      it is rejected with a 400 HTTP error if partitions were already
      given before it.
    - min_tid: see asTID
    - max_tid: optional, see asTID
    """
    query = request.query
    partition_dict = {}
    for item in query.pt.split(','):
        offset, source = item.split(':')
        source = int(source) if source else None
        if offset:
            partition_dict[int(offset)] = source
            continue
        if partition_dict:
            raise HTTPError(400)
        self._getMasterConnection() # just for correct error handling
        partition_dict = dict.fromkeys(
            xrange(self.pt.getPartitions()), source)
    max_tid = query.max_tid
    min_tid = self.asTID(query.min_tid)
    if max_tid:
        max_tid = self.asTID(max_tid)
    else:
        max_tid = None
    return self._askPrimary(
        Packets.CheckReplicas(partition_dict, min_tid, max_tid))
...@@ -14,102 +14,25 @@ ...@@ -14,102 +14,25 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging, protocol from bottle import HTTPError
from neo.lib.handler import EventHandler from neo.lib.handler import AnswerBaseHandler, EventHandler, MTEventHandler
from neo.lib.protocol import uuid_str, Packets
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
def check_primary_master(func): class MasterBootstrapHandler(EventHandler):
def wrapper(self, *args, **kw):
if self.app.master_conn is not None:
return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
def forward_ask(klass):
return check_primary_master(lambda self, conn, *args, **kw:
self.app.master_conn.ask(klass(*args, **kw),
conn=conn, msg_id=conn.getPeerId()))
class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster."""
@check_primary_master
def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s",
min_offset, max_offset, uuid_str(uuid))
self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)
@check_primary_master
def askNodeList(self, conn, node_type):
if node_type is None:
node_type = 'all'
node_filter = None
else:
node_filter = lambda n: n.getType() is node_type
logging.info("ask list of %s nodes", node_type)
node_list = self.app.nm.getList(node_filter)
node_information_list = [node.asTuple() for node in node_list ]
p = Packets.AnswerNodeList(node_information_list)
conn.answer(p)
@check_primary_master
def askClusterState(self, conn):
conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
@check_primary_master
def askPrimary(self, conn):
master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
@check_primary_master
def flushLog(self, conn):
self.app.master_conn.send(Packets.FlushLog())
super(AdminEventHandler, self).flushLog(conn)
askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes)
askRecovery = forward_ask(Packets.AskRecovery)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
setNumReplicas = forward_ask(Packets.SetNumReplicas)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
class MasterEventHandler(EventHandler):
""" This class is just used to dispatch message to right handler"""
def _connectionLost(self, conn):
app = self.app
if app.listening_conn: # if running
assert app.master_conn in (conn, None)
conn.cancelRequests("connection to master lost")
app.reset()
app.uuid = None
raise PrimaryFailure
def connectionFailed(self, conn): def connectionFailed(self, conn):
self._connectionLost(conn) raise AssertionError
def connectionClosed(self, conn): def connectionClosed(self, conn):
self._connectionLost(conn) app = self.app
try:
def dispatch(self, conn, packet, kw={}): app.__dict__.pop('pt').clear()
if 'conn' in kw: except KeyError:
# expected answer pass
if packet.isResponse(): if app.master_conn is not None:
packet.setId(kw['msg_id']) assert app.master_conn is conn
kw['conn'].answer(packet) raise PrimaryFailure
else:
self.app.request_handler.dispatch(conn, packet, kw)
else:
# unexpected answers and notifications
super(MasterEventHandler, self).dispatch(conn, packet, kw)
def answerClusterState(self, conn, state): def answerClusterState(self, conn, state):
self.app.cluster_state = state self.app.cluster_state = state
...@@ -124,7 +47,26 @@ class MasterEventHandler(EventHandler): ...@@ -124,7 +47,26 @@ class MasterEventHandler(EventHandler):
def notifyClusterInformation(self, conn, cluster_state): def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state self.app.cluster_state = cluster_state
class MasterEventHandler(MasterBootstrapHandler, MTEventHandler):
    """MasterBootstrapHandler combined with MTEventHandler

    Nothing is added: all the behaviour comes from the base classes.
    """
class MasterRequestEventHandler(EventHandler): class PrimaryAnswersHandler(AnswerBaseHandler):
""" This class handle all answer from primary master node""" """ This class handle all answer from primary master node"""
# XXX: to be deleted ?
def ack(self, conn, message):
    """ Do the default processing, then use the message as answer """
    parent = super(PrimaryAnswersHandler, self)
    parent.ack(conn, message)
    self.app.setHandlerData(message)
def denied(self, conn, message):
    """ Report a request denied by the master as a 405 HTTP error """
    error = HTTPError(405, message)
    raise error
def protocolError(self, conn, message):
    """ Report a protocol error from the master as a 500 HTTP error """
    error = HTTPError(500, message)
    raise error
def answerClusterState(self, conn, state):
    """ Remember the cluster state sent by the master """
    app = self.app
    app.cluster_state = state
answerRecovery = \
answerTweakPartitionTable = \
lambda self, conn, *args: self.app.setHandlerData(args)
...@@ -607,6 +607,14 @@ class ClientConnection(Connection): ...@@ -607,6 +607,14 @@ class ClientConnection(Connection):
handler.connectionStarted(self) handler.connectionStarted(self)
self._connect() self._connect()
def convertToMT(self, dispatcher):
    """Turn this connection, in place, into a MTClientConnection

    The class of the object is switched and the multi-threading state is
    initialized with the given dispatcher. This must only be called on
    a plain ClientConnection.
    """
    # XXX: The bootstrap code of the client should at least be moved to
    #      threaded_app so that the admin can reuse it. Ideally, we'd like
    #      to also merge with BootstrapManager.
    assert self.__class__ is ClientConnection, self
    self.__class__ = MTClientConnection
    self._initMT(dispatcher)
def _connect(self): def _connect(self):
try: try:
connected = self.connector.makeClientConnection() connected = self.connector.makeClientConnection()
...@@ -701,11 +709,14 @@ class MTClientConnection(ClientConnection): ...@@ -701,11 +709,14 @@ class MTClientConnection(ClientConnection):
return func(*args, **kw) return func(*args, **kw)
return wrapper return wrapper
def __init__(self, *args, **kwargs): def __init__(self, *args, **kw):
self.lock = lock = RLock() self._initMT(kw.pop('dispatcher'))
self.dispatcher = kwargs.pop('dispatcher') with self.lock:
with lock: super(MTClientConnection, self).__init__(*args, **kw)
super(MTClientConnection, self).__init__(*args, **kwargs)
def _initMT(self, dispatcher):
    """Set up the attributes specific to multi-threaded connections"""
    self.dispatcher = dispatcher
    self.lock = RLock()
# Alias without lock (cheaper than super()) # Alias without lock (cheaper than super())
_ask = ClientConnection.ask.__func__ _ask = ClientConnection.ask.__func__
......
...@@ -72,7 +72,9 @@ class ThreadedApplication(BaseApplication): ...@@ -72,7 +72,9 @@ class ThreadedApplication(BaseApplication):
logging.debug('Stopping %s', self.poll_thread) logging.debug('Stopping %s', self.poll_thread)
self.em.wakeup(thread.exit) self.em.wakeup(thread.exit)
else: else:
super(ThreadedApplication, self).close() self._close()
_close = BaseApplication.close.__func__
def start(self): def start(self):
self.poll_thread.is_alive() or self.poll_thread.start() self.poll_thread.is_alive() or self.poll_thread.start()
...@@ -82,7 +84,7 @@ class ThreadedApplication(BaseApplication): ...@@ -82,7 +84,7 @@ class ThreadedApplication(BaseApplication):
try: try:
self._run() self._run()
finally: finally:
super(ThreadedApplication, self).close() self._close()
logging.debug("Poll thread stopped") logging.debug("Poll thread stopped")
def _run(self): def _run(self):
...@@ -135,8 +137,15 @@ class ThreadedApplication(BaseApplication): ...@@ -135,8 +137,15 @@ class ThreadedApplication(BaseApplication):
handler.dispatch(conn, packet, kw) handler.dispatch(conn, packet, kw)
def _ask(self, conn, packet, handler=None, **kw): def _ask(self, conn, packet, handler=None, **kw):
self.setHandlerData(None) # The following line is more than optimization. If an admin node sends
queue = self._thread_container.queue # a packet that causes the master to disconnect (e.g. stop a cluster),
# we want at least to return the answer for this request, even if the
# polling thread already exited and cleared self.__dict__: returning
# the result of getHandlerData() would raise an AttributeError.
# This is tested by testShutdown (neo.tests.threaded.test.Test).
thread_container = self._thread_container
thread_container.answer = None
queue = thread_container.queue
msg_id = conn.ask(packet, queue=queue, **kw) msg_id = conn.ask(packet, queue=queue, **kw)
get = queue.get get = queue.get
_handlePacket = self._handlePacket _handlePacket = self._handlePacket
...@@ -144,6 +153,5 @@ class ThreadedApplication(BaseApplication): ...@@ -144,6 +153,5 @@ class ThreadedApplication(BaseApplication):
qconn, qpacket, kw = get(True) qconn, qpacket, kw = get(True)
if conn is qconn and msg_id == qpacket.getId(): if conn is qconn and msg_id == qpacket.getId():
_handlePacket(qconn, qpacket, kw, handler) _handlePacket(qconn, qpacket, kw, handler)
break return thread_container.answer # see above comment
_handlePacket(qconn, qpacket, kw) _handlePacket(qconn, qpacket, kw)
return self.getHandlerData()
# #
# Copyright (C) 2006-2019 Nexedi SA # Copyright (C) 2006-2015 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -14,205 +14,143 @@ ...@@ -14,205 +14,143 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse import json, socket, sys
from neo.lib import util from urllib import urlencode
from neo.lib.app import BaseApplication, buildOptionParser from urllib2 import urlopen, HTTPError, URLError
from neo.lib.connection import ClientConnection, ConnectionClosed from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, NodeStates, \
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets ZERO_TID
from .handler import CommandEventHandler from neo.lib.util import p64, u64
def ptFromJson(row_list):
    """Rebuild partition rows from their JSON form

    Each item of row_list maps state names to lists of uuids. It is
    converted to a sorted tuple of (uuid, state) pairs, the state being
    looked up by name in CellStates. Return the tuple of these rows.
    """
    def decode(row):
        cells = [(uuid, getattr(CellStates, state))
                 for state, uuid_list in row.iteritems()
                 for uuid in uuid_list]
        cells.sort()
        return tuple(cells)
    return tuple(decode(row) for row in row_list)
class NotReadyException(Exception): class NotReadyException(Exception):
pass pass
@buildOptionParser class NeoCTL(object):
class NeoCTL(BaseApplication):
_open = staticmethod(urlopen)
connection = None
connected = False def __init__(self, address, ssl=None):
host, port = address
@classmethod if ":" in host:
def _buildOptionParser(cls): host = "[%s]" % host
# XXX: Use argparse sub-commands. self.base_url = "http://%s:%s/" % (host, port)
parser = cls.option_parser
parser.description = "NEO Control node" def _ask(self, path, **kw):
parser('a', 'address', default='127.0.0.1:9999', if kw:
parse=lambda x: util.parseNodeAddress(x, 9999), path += "?" + urlencode(sorted(x for x in kw.iteritems()
help="address of an admin node") if '' is not x[1] is not None))
parser.argument('cmd', nargs=argparse.REMAINDER, try:
help="command to execute; if not supplied," return self._open(self.base_url + path).read()
" the list of available commands is displayed") except URLError, e:
if isinstance(e, HTTPError):
def __init__(self, address, **kw): body = e.read()
super(NeoCTL, self).__init__(**kw) if e.code != 503:
self.server = self.nm.createAdmin(address=address) sys.exit(body)
self.handler = CommandEventHandler(self) else:
self.response_queue = [] body = str(e)
raise NotReadyException(body)
def __getConnection(self):
if not self.connected:
self.connection = ClientConnection(self, self.handler, self.server)
# Never delay reconnection to master. This speeds up unit tests
# and it should not change anything for normal use.
try:
self.connection.setReconnectionNoDelay()
except ConnectionClosed:
self.connection = None
while not self.connected:
if self.connection is None:
raise NotReadyException('not connected')
self.em.poll(1)
return self.connection
def __ask(self, packet):
# TODO: make thread-safe
connection = self.__getConnection()
connection.ask(packet)
response_queue = self.response_queue
assert len(response_queue) == 0
while self.connected:
self.em.poll(1)
if response_queue:
break
else:
raise NotReadyException, 'Connection closed'
response = response_queue.pop()
if response[0] == Packets.Error and \
response[1] == ErrorCodes.NOT_READY:
raise NotReadyException(response[2])
return response
def enableStorageList(self, uuid_list): def enableStorageList(self, uuid_list):
""" """
Put all given storage nodes in "running" state. Put all given storage nodes in "running" state.
""" """
packet = Packets.AddPendingNodes(uuid_list) return self._ask('enableStorageList',
response = self.__ask(packet) node_list=','.join(map(str, uuid_list)))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def tweakPartitionTable(self, uuid_list=(), dry_run=False): def tweakPartitionTable(self, uuid_list=(), dry_run=False):
response = self.__ask(Packets.TweakPartitionTable(dry_run, uuid_list)) changed, row_list = json.loads(self._ask('tweakPartitionTable',
if response[0] != Packets.AnswerTweakPartitionTable: node_list=','.join(map(str, uuid_list)),
raise RuntimeError(response) dry_run=int(dry_run)))
return response[1:] return changed, ptFromJson(row_list)
def setNumReplicas(self, nr): def setNumReplicas(self, nr):
response = self.__ask(Packets.SetNumReplicas(nr)) return self._ask('setNumReplicas', nr=nr)
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def setClusterState(self, state): def setClusterState(self, state):
""" """
Set cluster state. Set cluster state.
""" """
packet = Packets.SetClusterState(state) return self._ask('setClusterState', state=state)
response = self.__ask(packet)
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def _setNodeState(self, node, state):
"""
Kill node, or remove it permanently
"""
response = self.__ask(Packets.SetNodeState(node, state))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def getClusterState(self): def getClusterState(self):
""" """
Get cluster state. Get cluster state.
""" """
packet = Packets.AskClusterState() state = self._ask('getClusterState')
response = self.__ask(packet) if state:
if response[0] != Packets.AnswerClusterState: return getattr(ClusterStates, state)
raise RuntimeError(response)
return response[1]
def getLastIds(self): def getLastIds(self):
response = self.__ask(Packets.AskLastIDs()) loid, ltid = json.loads(self._ask('getLastIds'))
if response[0] != Packets.AnswerLastIDs: return p64(loid), p64(ltid)
raise RuntimeError(response)
return response[1:]
def getLastTransaction(self): def getLastTransaction(self):
response = self.__ask(Packets.AskLastTransaction()) return p64(int(self._ask('getLastTransaction')))
if response[0] != Packets.AnswerLastTransaction:
raise RuntimeError(response)
return response[1]
def getRecovery(self): def getRecovery(self):
response = self.__ask(Packets.AskRecovery()) ptid, backup_tid, truncate_tid = json.loads(self._ask('getRecovery'))
if response[0] != Packets.AnswerRecovery: return (ptid,
raise RuntimeError(response) None if backup_tid is None else p64(backup_tid),
return response[1:] None if truncate_tid is None else p64(truncate_tid),
)
def getNodeList(self, node_type=None): def getNodeList(self, node_type=None):
""" """
Get a list of nodes, filtering with given type. Get a list of nodes, filtering with given type.
""" """
packet = Packets.AskNodeList(node_type) node_list = json.loads(self._ask('getNodeList', node_type=node_type))
response = self.__ask(packet) return [(getattr(NodeTypes, node_type), address and tuple(address),
if response[0] != Packets.AnswerNodeList: uuid, getattr(NodeStates, state), id_timestamp)
raise RuntimeError(response) for node_type, address, uuid, state, id_timestamp in node_list]
return response[1] # node_list
def getPartitionRowList(self, min_offset=0, max_offset=0, node=None): def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
""" """
Get a list of partition rows, bounded by min & max and involving Get a list of partition rows, bounded by min & max and involving
given node. given node.
""" """
packet = Packets.AskPartitionList(min_offset, max_offset, node) ptid, nr, row_list = json.loads(self._ask('getPartitionRowList',
response = self.__ask(packet) min_offset=min_offset, max_offset=max_offset, node=node))
if response[0] != Packets.AnswerPartitionList: from pprint import pprint as pp
raise RuntimeError(response) return ptid, nr, ptFromJson(row_list)
return response[1:]
def startCluster(self): def startCluster(self):
""" """
Set cluster into "verifying" state. Set cluster into "verifying" state.
""" """
return self.setClusterState(ClusterStates.VERIFYING) return self._ask('startCluster')
def killNode(self, node): def killNode(self, node):
return self._setNodeState(node, NodeStates.DOWN) return self._ask('killNode', node=node)
def dropNode(self, node): def dropNode(self, node):
return self._setNodeState(node, NodeStates.UNKNOWN) return self._ask('dropNode', node=node)
def getPrimary(self): def getPrimary(self):
""" """
Return the primary master UUID. Return the primary master UUID.
""" """
packet = Packets.AskPrimary() return int(self._ask('getPrimary'))
response = self.__ask(packet)
if response[0] != Packets.AnswerPrimary: def repair(self, uuid_list, dry_run):
raise RuntimeError(response) return self._ask('repair',
return response[1] node_list=','.join(map(str, uuid_list)),
dry_run=int(dry_run))
def repair(self, *args):
response = self.__ask(Packets.Repair(*args))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def truncate(self, tid): def truncate(self, tid):
response = self.__ask(Packets.Truncate(tid)) return self._ask('truncate', tid=u64(tid))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def checkReplicas(self, *args): def checkReplicas(self, partition_dict, min_tid=ZERO_TID, max_tid=None):
response = self.__ask(Packets.CheckReplicas(*args)) kw = {'pt': ','.join('%s:%s' % (k, '' if v is None else v)
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK: for k, v in partition_dict.iteritems())}
raise RuntimeError(response) if max_tid is not None:
return response[2] kw['max_tid'] = u64(max_tid)
return self._ask('checkReplicas', min_tid=u64(min_tid), **kw)
def flushLog(self): def flushLog(self):
conn = self.__getConnection() return self._ask('flushLog')
conn.send(Packets.FlushLog())
while conn.pending():
self.em.poll(1)
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
from neo.lib import logging from neo.lib import logging
def main(args=None): def main(args=None):
from neo.admin.app import Application from neo.admin.app import Application
config = Application.option_parser.parse(args) config = Application.option_parser.parse(args)
...@@ -28,5 +29,5 @@ def main(args=None): ...@@ -28,5 +29,5 @@ def main(args=None):
# and then, load and run the application # and then, load and run the application
app = Application(config) app = Application(config)
app.run() app.serve()
...@@ -702,14 +702,16 @@ class NEOCluster(object): ...@@ -702,14 +702,16 @@ class NEOCluster(object):
def expectStorageUnknown(self, process, *args, **kw): def expectStorageUnknown(self, process, *args, **kw):
process_uuid = process.getUUID() process_uuid = process.getUUID()
def expected_storage_not_known(last_try): def expected_storage_not_known(last_try):
for storage in self.getStorageList(): try:
if storage[2] == process_uuid: for storage in self.getStorageList():
return False, storage if storage[2] == process_uuid:
return False, storage
except NotReadyException:
return False, None
return True, None return True, None
self.expectCondition(expected_storage_not_known, *args, **kw) self.expectCondition(expected_storage_not_known, *args, **kw)
def __del__(self): def __del__(self):
self.neoctl.close()
if self.cleanup_on_delete: if self.cleanup_on_delete:
os.removedirs(self.temp_dir) os.removedirs(self.temp_dir)
......
...@@ -20,13 +20,15 @@ import os, random, select, socket, sys, tempfile ...@@ -20,13 +20,15 @@ import os, random, select, socket, sys, tempfile
import thread, threading, time, traceback, weakref import thread, threading, time, traceback, weakref
from collections import deque from collections import deque
from contextlib import contextmanager from contextlib import contextmanager
from cStringIO import StringIO
from itertools import count from itertools import count
from functools import partial, wraps from functools import partial, wraps
from urllib import splitquery
from zlib import decompress from zlib import decompress
import transaction, ZODB import bottle, transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app import neo.client.app, neo.neoctl.app
from neo.admin.handler import MasterEventHandler from neo.admin.handler import MasterBootstrapHandler
from neo.client import Storage from neo.client import Storage
from neo.lib import logging from neo.lib import logging
from neo.lib.connection import BaseConnection, \ from neo.lib.connection import BaseConnection, \
...@@ -38,6 +40,7 @@ from neo.lib.protocol import uuid_str, \ ...@@ -38,6 +40,7 @@ from neo.lib.protocol import uuid_str, \
ClusterStates, Enum, NodeStates, NodeTypes, Packets ClusterStates, Enum, NodeStates, NodeTypes, Packets
from neo.lib.util import cached_property, parseMasterList, p64 from neo.lib.util import cached_property, parseMasterList, p64
from neo.master.recovery import RecoveryManager from neo.master.recovery import RecoveryManager
from neo.neoctl.neoctl import NotReadyException
from .. import (getTempDirectory, setupMySQLdb, from .. import (getTempDirectory, setupMySQLdb,
ImporterConfigParser, NeoTestBase, Patch, ImporterConfigParser, NeoTestBase, Patch,
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX) ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX)
...@@ -237,7 +240,7 @@ class Serialized(object): ...@@ -237,7 +240,7 @@ class Serialized(object):
next_lock.release() next_lock.release()
cls._sched_lock.acquire() cls._sched_lock.acquire()
def __init__(self, app, busy=True): def __init__(self, app):
if self._disabled: if self._disabled:
return return
self._epoll = app.em.epoll self._epoll = app.em.epoll
...@@ -245,8 +248,7 @@ class Serialized(object): ...@@ -245,8 +248,7 @@ class Serialized(object):
# XXX: It may have been initialized before the SimpleQueue is patched. # XXX: It may have been initialized before the SimpleQueue is patched.
thread_container = getattr(app, '_thread_container', None) thread_container = getattr(app, '_thread_container', None)
thread_container is None or thread_container.__init__() thread_container is None or thread_container.__init__()
if busy: self._busy.add(self) # block tic until app waits for polling
self._busy.add(self) # block tic until app waits for polling
def __getattr__(self, attr): def __getattr__(self, attr):
if attr in ('close', 'modify', 'register', 'unregister'): if attr in ('close', 'modify', 'register', 'unregister'):
...@@ -284,23 +286,6 @@ class Serialized(object): ...@@ -284,23 +286,6 @@ class Serialized(object):
cls._epoll.unregister(fd) cls._epoll.unregister(fd)
self._release_next() self._release_next()
class TestSerialized(Serialized):
def __init__(*args):
Serialized.__init__(busy=False, *args)
def poll(self, timeout):
if timeout:
for x in TIC_LOOP:
r = self._epoll.poll(0)
if r:
return r
Serialized.tic(step=1, timeout=.001)
ConnectionFilter.log()
raise Exception("tic is looping forever")
return self._epoll.poll(timeout)
class Node(object): class Node(object):
def getConnectionList(self, *peers): def getConnectionList(self, *peers):
...@@ -321,6 +306,26 @@ class Node(object): ...@@ -321,6 +306,26 @@ class Node(object):
def filterConnection(self, *peers): def filterConnection(self, *peers):
return ConnectionFilter(self.getConnectionList(*peers)) return ConnectionFilter(self.getConnectionList(*peers))
def start(self):
isinstance(self.em.epoll, Serialized) or Serialized(self)
super(Node, self).start()
def _run(self):
try:
super(Node, self)._run()
finally:
self._afterRun()
if isinstance(self.em.epoll, Serialized):
self.em.epoll.exit()
def _afterRun(self):
logging.debug('stopping %r', self)
try:
self.listening_conn.close()
self.listening_conn = None
except AttributeError:
pass
class ServerNode(Node): class ServerNode(Node):
_server_class_dict = {} _server_class_dict = {}
...@@ -391,26 +396,6 @@ class ServerNode(Node): ...@@ -391,26 +396,6 @@ class ServerNode(Node):
self.close() self.close()
self.__init__(**init_args) self.__init__(**init_args)
def start(self):
Serialized(self)
threading.Thread.start(self)
def run(self):
try:
super(ServerNode, self).run()
finally:
self._afterRun()
logging.debug('stopping %r', self)
if isinstance(self.em.epoll, Serialized):
self.em.epoll.exit()
def _afterRun(self):
try:
self.listening_conn.close()
self.listening_conn = None
except AttributeError:
pass
def getListeningAddress(self): def getListeningAddress(self):
try: try:
return self.listening_conn.getAddress() return self.listening_conn.getAddress()
...@@ -420,8 +405,39 @@ class ServerNode(Node): ...@@ -420,8 +405,39 @@ class ServerNode(Node):
def stop(self): def stop(self):
self.em.wakeup(thread.exit) self.em.wakeup(thread.exit)
class AdminApplication(ServerNode, neo.admin.app.Application): class ThreadedNode(Node):
pass
def __init__(self, *args, **kw):
self._on_thread_exit = kw.pop("on_thread_exit", None)
super(ThreadedNode, self).__init__(*args, **kw)
self.poll_thread.node_name = getattr(self, 'node_name', self.name)
def _afterRun(self):
pass
def _close(self):
x = self._on_thread_exit
x is None or x()
super(ThreadedNode, self)._close()
class AdminApplication(ThreadedNode, ServerNode, neo.admin.app.Application):
def interrupt_main(self):
pass
def start(self):
super(AdminApplication, self).start()
return # XXX
host, port = BIND
l = threading.Lock()
l.acquire()
self.start = l.release
try:
self.run = lambda: self.serve(host=host, port=port)
threading.Thread.start(self)
l.acquire()
finally:
del self.start, self.run
class MasterApplication(ServerNode, neo.master.app.Application): class MasterApplication(ServerNode, neo.master.app.Application):
pass pass
...@@ -454,13 +470,12 @@ class StorageApplication(ServerNode, neo.storage.app.Application): ...@@ -454,13 +470,12 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
(r,), = self.dm.query("SELECT COUNT(*) FROM " + table) (r,), = self.dm.query("SELECT COUNT(*) FROM " + table)
return r return r
class ClientApplication(Node, neo.client.app.Application): class ClientApplication(ThreadedNode, neo.client.app.Application):
max_reconnection_to_master = 10 max_reconnection_to_master = 10
def __init__(self, master_nodes, name, **kw): def __init__(self, *args, **kw):
super(ClientApplication, self).__init__(master_nodes, name, **kw) super(ClientApplication, self).__init__(*args, **kw)
self.poll_thread.node_name = name
# Smaller cache to speed up tests that checks behaviour when it's too # Smaller cache to speed up tests that checks behaviour when it's too
# small. See also NEOCluster.cache_size # small. See also NEOCluster.cache_size
self._cache.max_size //= 1024 self._cache.max_size //= 1024
...@@ -497,10 +512,26 @@ class ClientApplication(Node, neo.client.app.Application): ...@@ -497,10 +512,26 @@ class ClientApplication(Node, neo.client.app.Application):
conn.close() conn.close()
class NeoCTL(neo.neoctl.app.NeoCTL): class NeoCTL(neo.neoctl.app.NeoCTL):
# Bypass HTTP layer
base_url = '/'
def __init__(self, *args, **kw): def __init__(self, cluster):
super(NeoCTL, self).__init__(*args, **kw) self._cluster = cluster
TestSerialized(self)
def _open(self, path):
environ = {'REQUEST_METHOD': 'GET'}
environ['PATH_INFO'], environ['QUERY_STRING'] = splitquery(path)
bottle.request.bind(environ)
route, args = self._cluster.admin.bottle.match(environ)
try:
result = route.call(*args)
if isinstance(result, Exception):
raise result
except bottle.HTTPError, e:
if e.status_code != 503:
sys.exit(e.body)
raise NotReadyException
return StringIO(result or '')
class LoggerThreadName(str): class LoggerThreadName(str):
...@@ -655,6 +686,7 @@ class ConnectionFilter(object): ...@@ -655,6 +686,7 @@ class ConnectionFilter(object):
class NEOCluster(object): class NEOCluster(object):
admin = None
SSL = None SSL = None
def __init__(orig, self): # temporary definition for SimpleQueue patch def __init__(orig, self): # temporary definition for SimpleQueue patch
...@@ -726,7 +758,7 @@ class NEOCluster(object): ...@@ -726,7 +758,7 @@ class NEOCluster(object):
master_list = [MasterApplication.newAddress() master_list = [MasterApplication.newAddress()
for _ in xrange(master_count)] for _ in xrange(master_count)]
self.master_nodes = ' '.join('%s:%s' % x for x in master_list) self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
kw = dict(replicas=replicas, adapter=adapter, self._server_kw = kw = dict(replicas=replicas, adapter=adapter,
partitions=partitions, reset=clear_databases, dedup=dedup) partitions=partitions, reset=clear_databases, dedup=dedup)
kw['cluster'] = weak_self = weakref.proxy(self) kw['cluster'] = weak_self = weakref.proxy(self)
kw['ssl'] = self.SSL kw['ssl'] = self.SSL
...@@ -759,13 +791,13 @@ class NEOCluster(object): ...@@ -759,13 +791,13 @@ class NEOCluster(object):
kw['wait'] = 0 kw['wait'] = 0
self.storage_list = [StorageApplication(database=db(x), **kw) self.storage_list = [StorageApplication(database=db(x), **kw)
for x in db_list] for x in db_list]
self.admin_list = [AdminApplication(**kw)] self.neoctl = NeoCTL(weak_self)
def __repr__(self): def __repr__(self):
return "<%s(%s) at 0x%x>" % (self.__class__.__name__, return "<%s(%s) at 0x%x>" % (self.__class__.__name__,
self.name, id(self)) self.name, id(self))
# A few shortcuts that work when there's only 1 master/storage/admin # A few shortcuts that work when there's only 1 master/storage
@property @property
def master(self): def master(self):
master, = self.master_list master, = self.master_list
...@@ -774,10 +806,6 @@ class NEOCluster(object): ...@@ -774,10 +806,6 @@ class NEOCluster(object):
def storage(self): def storage(self):
storage, = self.storage_list storage, = self.storage_list
return storage return storage
@property
def admin(self):
admin, = self.admin_list
return admin
### ###
# More handy shortcuts for tests # More handy shortcuts for tests
...@@ -808,11 +836,13 @@ class NEOCluster(object): ...@@ -808,11 +836,13 @@ class NEOCluster(object):
def start(self, storage_list=None, master_list=None, recovering=False): def start(self, storage_list=None, master_list=None, recovering=False):
self.started = True self.started = True
self._patch() self._patch()
self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)
if master_list is None: if master_list is None:
master_list = self.master_list master_list = self.master_list
if storage_list is None: if storage_list is None:
storage_list = self.storage_list storage_list = self.storage_list
self.admin = AdminApplication(
on_thread_exit=lambda: delattr(self, "admin"),
**self._server_kw)
def sendPartitionTable(release, orig, *args): def sendPartitionTable(release, orig, *args):
orig(*args) orig(*args)
release() release()
...@@ -828,15 +858,14 @@ class NEOCluster(object): ...@@ -828,15 +858,14 @@ class NEOCluster(object):
orig(handler, conn, state) orig(handler, conn, state)
if state in expected_state: if state in expected_state:
release() release()
with Serialized.until(MasterEventHandler, with Serialized.until(MasterBootstrapHandler,
sendPartitionTable=sendPartitionTable) as tic1, \ sendPartitionTable=sendPartitionTable) as tic1, \
Serialized.until(RecoveryManager, dispatch=dispatch) as tic2, \ Serialized.until(RecoveryManager, dispatch=dispatch) as tic2, \
Serialized.until(MasterEventHandler, Serialized.until(MasterBootstrapHandler,
notifyClusterInformation=notifyClusterInformation) as tic3: notifyClusterInformation=notifyClusterInformation) as tic3:
for node in master_list: for node in master_list:
node.start() node.start()
for node in self.admin_list: self.admin.start()
node.start()
tic1() tic1()
for node in storage_list: for node in storage_list:
node.start() node.start()
...@@ -866,39 +895,36 @@ class NEOCluster(object): ...@@ -866,39 +895,36 @@ class NEOCluster(object):
if self.started: if self.started:
del self.started del self.started
logging.debug("stopping %s", self) logging.debug("stopping %s", self)
admin = self.admin
admin is None or admin.close()
client = self.__dict__.get("client") client = self.__dict__.get("client")
client is None or self.__dict__.pop("db", client).close() client is None or self.__dict__.pop("db", client).close()
node_list = self.admin_list + self.storage_list + self.master_list node_list = self.storage_list + self.master_list
for node in node_list: for node in node_list:
node.stop() node.stop()
try: self.join(node_list, (admin, client))
node_list.append(client.poll_thread)
except AttributeError: # client is None or thread is already stopped
pass
self.join(node_list)
self.neoctl.close()
del self.neoctl
logging.debug("stopped %s", self) logging.debug("stopped %s", self)
self._unpatch() self._unpatch()
if clear_database is None: if clear_database is None:
try: try:
for node_type in 'admin', 'storage', 'master': for node in self.storage_list:
for node in getattr(self, node_type + '_list'): node.close()
node.close() for node in self.master_list:
node.close()
except: except:
__print_exc() __print_exc()
raise raise
else: else:
for node_type in 'master', 'storage', 'admin': for node_type in 'master', 'storage':
reset_kw = kw.copy() reset_kw = kw.copy()
if node_type == 'storage': if node_type == 'storage':
reset_kw['reset'] = clear_database reset_kw['reset'] = clear_database
for node in getattr(self, node_type + '_list'): for node in getattr(self, node_type + '_list'):
node.resetNode(**reset_kw) node.resetNode(**reset_kw)
def _newClient(self): def _newClient(self, **kw):
return ClientApplication(name=self.name, master_nodes=self.master_nodes, return ClientApplication(name=self.name, master_nodes=self.master_nodes,
compress=self.compress, ssl=self.SSL) compress=self.compress, ssl=self.SSL, **kw)
@contextmanager @contextmanager
def newClient(self, with_db=False): def newClient(self, with_db=False):
...@@ -918,14 +944,7 @@ class NEOCluster(object): ...@@ -918,14 +944,7 @@ class NEOCluster(object):
@cached_property @cached_property
def client(self): def client(self):
client = self._newClient() return self._newClient(on_thread_exit=lambda: delattr(self, "client"))
# Make sure client won't be reused after it was closed.
def close():
client = self.client
del self.client, client.close
client.close()
client.close = close
return client
@cached_property @cached_property
def db(self): def db(self):
...@@ -950,8 +969,13 @@ class NEOCluster(object): ...@@ -950,8 +969,13 @@ class NEOCluster(object):
state = self.getNodeState(node) state = self.getNodeState(node)
assert state == NodeStates.RUNNING, state assert state == NodeStates.RUNNING, state
def join(self, thread_list, timeout=5): def join(self, thread_list, threaded_node_list=(), timeout=5):
timeout += time.time() timeout += time.time()
for node in threaded_node_list:
try:
thread_list.append(node.poll_thread)
except AttributeError:
pass # node is None or thread is already stopped
while thread_list: while thread_list:
# Map with repr before that threads become unprintable. # Map with repr before that threads become unprintable.
assert time.time() < timeout, map(repr, thread_list) assert time.time() < timeout, map(repr, thread_list)
......
...@@ -852,9 +852,8 @@ class Test(NEOThreadedTest): ...@@ -852,9 +852,8 @@ class Test(NEOThreadedTest):
t1.commit() t1.commit()
self.assertRaises(ConnectionClosed, t2.join) self.assertRaises(ConnectionClosed, t2.join)
# all nodes except clients should exit # all nodes except clients should exit
cluster.join(cluster.master_list cluster.join(cluster.master_list + cluster.storage_list,
+ cluster.storage_list (cluster.admin,))
+ cluster.admin_list)
cluster.stop() # stop and reopen DB to check partition tables cluster.stop() # stop and reopen DB to check partition tables
cluster.start() cluster.start()
pt = cluster.admin.pt pt = cluster.admin.pt
...@@ -2753,8 +2752,8 @@ class Test(NEOThreadedTest): ...@@ -2753,8 +2752,8 @@ class Test(NEOThreadedTest):
@with_cluster(start_cluster=0, master_count=2) @with_cluster(start_cluster=0, master_count=2)
def testIdentifyUnknownMaster(self, cluster): def testIdentifyUnknownMaster(self, cluster):
m0, m1 = cluster.master_list m0, m1 = cluster.master_list
cluster.master_nodes = () with Patch(cluster, master_nodes=()):
m0.resetNode() m0.resetNode()
cluster.start(master_list=(m0,)) cluster.start(master_list=(m0,))
m1.start() m1.start()
self.tic() self.tic()
......
...@@ -47,7 +47,7 @@ get3rdParty(x, '3rdparty/' + x, 'https://lab.nexedi.com/nexedi/erp5' ...@@ -47,7 +47,7 @@ get3rdParty(x, '3rdparty/' + x, 'https://lab.nexedi.com/nexedi/erp5'
zodb_require = ['ZODB3>=3.10dev'] zodb_require = ['ZODB3>=3.10dev']
extras_require = { extras_require = {
'admin': [], 'admin': ['bottle'],
'client': zodb_require, 'client': zodb_require,
'ctl': [], 'ctl': [],
'master': [], 'master': [],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment