Commit 10d8f0ce authored by Julien Muchembled's avatar Julien Muchembled

WIP: Make admin node a web-app

The goal is to get rid of the neoctl command-line tool, and to manage the
cluster via a web browser, or tools like 'wget'. Then, it will be possible to
provide a web user interface to connect to the underlying DB of any storage
node, usually a SQL client.

The design of admin app is finished:
- it's threaded like client's
- it's a WSGI app

I also hacked an HTTP API as quickly as possible to make all tests pass.

TODO:
- define a better HTTP API
- there's no UI at all yet
- remove all unused packets from the protocol (those that were only used
  between neoctl and admin node)

There's currently no UI implemented.

There are a few dead files (not deleted yet) in case they contain a few
pieces of useful code:
 neo/neoctl/app.py
 neo/neoctl/handler.py
 neo/scripts/neoctl.py
parent a72ddfb3
......@@ -14,127 +14,201 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import thread
import weakref
from bottle import Bottle, HTTPError, request, response
from copy import deepcopy
from logging import ERROR, INFO
from wsgiref.simple_server import WSGIRequestHandler
from . import handler
from neo.lib import logging
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, \
NodeTypes, NodeStates, Packets
from neo.lib.debug import register as registerLiveDebugger
from neo.lib.protocol import ClusterStates, NodeTypes, NodeStates, Packets
from neo.lib.threaded_app import ThreadedApplication
from neo.lib.util import p64, tidFromTime
class Application(object):
"""The storage node application."""
def raiseNotReady():
"""Abort the current HTTP request with a 503 'not connected' error."""
raise HTTPError(503, 'Not connected to a primary master')
def __init__(self, config):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager(config.getDynamicMasterList())
for address in config.getMasters():
self.nm.createMaster(address=address)
self.name = config.getCluster()
self.server = config.getBind()
logging.debug('IP address is %s, port is %d', *self.server)
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
self.uuid = config.getUUID()
self.request_handler = MasterRequestEventHandler(self)
self.master_event_handler = MasterEventHandler(self)
self.cluster_state = None
self.reset()
registerLiveDebugger(on_log=self.log)
def close(self):
self.listening_conn = None
self.nm.close()
self.em.close()
del self.__dict__
class ObjectBottle(object):
def reset(self):
self.bootstrapped = False
self.master_conn = None
self.master_node = None
def __init__(self, weakref=False, *args, **kw):
self._app = Bottle(*args, **kw)
self._weakref = weakref
def log(self):
self.em.log()
self.nm.log()
if self.pt is not None:
self.pt.log()
def __getattr__(self, name):
return getattr(self._app, name)
def run(self):
try:
self._run()
except Exception:
logging.exception('Pre-mortem data:')
self.log()
logging.flush()
raise
def __get__(self, obj, cls):
if obj is None: return self
app = obj.bottle = deepcopy(self._app)
if self._weakref:
obj = weakref.ref(obj)
app.install(lambda f: lambda *a, **k: f(obj(), *a, **k))
else:
app.install(lambda f: lambda *a, **k: f(obj, *a, **k))
return app
class RequestHandler(WSGIRequestHandler):
    """WSGI request handler that logs through the `logging` module in scope.

    Every record is prefixed with the client's address, and the two standard
    hooks are mapped to the ERROR and INFO levels respectively.
    """

    def _log(self, level, format, *args):
        """Emit `format % args` at `level`, prefixed by the peer address."""
        peer = self.client_address[0]
        message = format % args
        logging.log(level, "%s %s", peer, message)

    def log_error(self, *args):
        """Log an error reported by the HTTP server machinery."""
        self._log(ERROR, *args)

    def log_message(self, *args):
        """Log an informational message (e.g. an access-log line)."""
        self._log(INFO, *args)
def _run(self):
"""Make sure that the status is sane and start a loop."""
if len(self.name) == 0:
raise RuntimeError, 'cluster name must be non-empty'
# Make a listening port.
handler = AdminEventHandler(self)
self.listening_conn = ListeningConnection(self.em, handler, self.server)
class Application(ThreadedApplication):
"""The storage node application."""
bottle = ObjectBottle(weakref=True)
cluster_state = None
def __init__(self, config):
super(Application, self).__init__(config.getMasters(),
config.getCluster(),
config.getDynamicMasterList())
self.master_event_handler = handler.MasterEventHandler(self)
self.notifications_handler = handler.MasterNotificationsHandler(self)
self.primary_handler = handler.PrimaryAnswersHandler(self)
def _run(self):
"""Make sure that the status is sane and start a loop."""
try:
poll = self.em.poll
while self.cluster_state != ClusterStates.STOPPING:
self.connectToPrimary()
try:
while True:
self.em.poll(1)
poll(1)
except PrimaryFailure:
self.nm.log()
logging.error('primary master is down')
self.listening_conn.close()
finally:
self.master_conn = None
while not self.em.isIdle():
self.em.poll(1)
poll(1)
finally:
self.interrupt_main()
interrupt_main = staticmethod(thread.interrupt_main)
def connectToPrimary(self):
"""Find a primary master node, and connect to it.
If a primary master node is not elected or ready, repeat
the attempt of a connection periodically.
Note that I do not accept any connection from non-master nodes
at this stage.
"""
self.master_node = None
self.uuid = None
self.cluster_state = None
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, self.name, NodeTypes.ADMIN,
self.uuid, self.server)
data = bootstrap.getPrimaryConnection()
(node, conn, uuid, num_partitions, num_replicas) = data
self.master_node = node
self.master_conn = conn
self.uuid = uuid
if self.pt is None:
bootstrap = BootstrapManager(self, self.name, NodeTypes.ADMIN)
(self.master_node, self.master_conn, self.uuid,
num_partitions, num_replicas) = bootstrap.getPrimaryConnection()
self.pt = PartitionTable(num_partitions, num_replicas)
elif self.pt.getPartitions() != num_partitions:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of partitions is inconsistent')
elif self.pt.getReplicas() != num_replicas:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of replicas is inconsistent')
# passive handler
self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(Packets.AskClusterState())
self.master_conn.ask(Packets.AskNodeInformation())
self.master_conn.ask(Packets.AskPartitionTable())
def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt
self.pt.log()
self.master_conn.setHandler(self.notifications_handler)
self.master_conn.convertToMT(self.dispatcher)
def _askPrimary(self, packet, **kw):
    """Send `packet` to the primary master and return the processed answer.

    The connection comes from _getMasterConnection(), so the call fails
    with an HTTP 503 error when no usable master connection exists.
    Answers are handled by self.primary_handler; extra keyword arguments
    are forwarded to self._ask unchanged.
    """
    master_conn = self._getMasterConnection()
    return self._ask(master_conn, packet, handler=self.primary_handler, **kw)
def _getMasterConnection(self):
    """Return the current master connection, or fail with HTTP 503.

    The connection is unusable when it is None or reports itself closed;
    raiseNotReady() is called in that case.
    """
    master = self.master_conn
    if master is not None and not master.isClosed():
        return master
    raiseNotReady()
    return master
def serve(self, **kw):
"""Start the application, then run its bottle app on the wsgiref server.

Keyword arguments (such as host and port) are forwarded to bottle's
run(). Requests are logged through RequestHandler and bottle's own
console output is silenced with quiet=1.
"""
# start() must come first; presumably it launches the polling thread
# that talks to the master -- TODO confirm in ThreadedApplication.
self.start()
self.bottle.run(server='wsgiref', handler_class=RequestHandler,
quiet=1, **kw)
def asTID(self, value):
    """Convert a query-string value into a TID.

    A value containing a '.' is parsed as a float and handed to
    tidFromTime (presumably a timestamp -- confirm against neo.lib.util).
    Any other value is parsed with int(value, 0), so prefixes such as
    '0x' are honoured, and then packed with p64.
    """
    if '.' not in value:
        return p64(int(value, 0))
    return tidFromTime(float(value))
@bottle.route('/getClusterState')
def getClusterState(self):
    """Return the last known cluster state as a string.

    Returns None (no body) while the state is still unknown.
    """
    state = self.cluster_state
    if state is None:
        return None
    return str(state)
def _setClusterState(self, state):
    """Ask the primary master to switch the cluster to `state`."""
    packet = Packets.SetClusterState(state)
    self._askPrimary(packet)
@bottle.route('/setClusterState')
def setClusterState(self):
    """Set the cluster state named by the `state` query parameter.

    The name is resolved as an attribute of ClusterStates.
    """
    state_name = request.query.state
    self._setClusterState(getattr(ClusterStates, state_name))
@bottle.route('/startCluster')
def startCluster(self):
    """Start the cluster by requesting the VERIFYING state."""
    target_state = ClusterStates.VERIFYING
    self._setClusterState(target_state)
@bottle.route('/enableStorageList')
def enableStorageList(self):
    """Send an AddPendingNodes request for the nodes in `node_list`.

    `node_list` is a comma-separated list of integer node ids taken from
    the query string. When it is empty or missing, an empty tuple is sent.
    """
    raw_list = request.query.node_list
    if raw_list:
        uuid_list = map(int, raw_list.split(','))
    else:
        uuid_list = ()
    self._askPrimary(Packets.AddPendingNodes(uuid_list))
@bottle.route('/tweakPartitionTable')
def tweakPartitionTable(self):
    """Send a TweakPartitionTable request for the nodes in `node_list`.

    `node_list` is a comma-separated list of integer node ids taken from
    the query string. When it is empty or missing, an empty tuple is sent.
    """
    raw_list = request.query.node_list
    if raw_list:
        uuid_list = map(int, raw_list.split(','))
    else:
        uuid_list = ()
    self._askPrimary(Packets.TweakPartitionTable(uuid_list))
@bottle.route('/getNodeList')
def getNodeList(self):
    """Return the known nodes as a JSON list.

    The optional `node_type` query parameter names an attribute of
    NodeTypes; when given, only nodes of that type are listed. Each entry
    is [type, address, uuid, state], with type and state as strings.

    Fails with HTTP 503 when there is no usable master connection.
    """
    type_name = request.query.node_type
    if type_name:
        wanted_type = getattr(NodeTypes, type_name)
        # The filter closes over `wanted_type`, which is never rebound
        # below, so it stays correct even if getList() applied it lazily.
        node_filter = lambda node: node.getType() is wanted_type
    else:
        node_filter = None
    # Called only for its side effect: raises 503 when not connected.
    self._getMasterConnection()
    node_list = []
    for node in self.nm.getList(node_filter):
        node_type, address, uuid, state = node.asTuple()
        node_list.append((str(node_type), address, uuid, str(state)))
    response.content_type = 'application/json'
    return json.dumps(node_list)
@bottle.route('/getPrimary')
def getPrimary(self):
"""Return the UUID of the primary master node, as a string."""
# If self.master_node has no getUUID attribute (e.g. it is None), getattr
# falls back to raiseNotReady, and calling that raises HTTPError 503.
return str(getattr(self.master_node, 'getUUID', raiseNotReady)())
def _setNodeState(self, node, state):
    """Ask the primary master to put `node` into `state`."""
    packet = Packets.SetNodeState(node, state)
    self._askPrimary(packet)
@bottle.route('/killNode')
def killNode(self):
    """Request the UNKNOWN state for the node given by the `node` parameter.

    `node` is read from the query string and converted to an integer.
    """
    node_id = int(request.query.node)
    self._setNodeState(node_id, NodeStates.UNKNOWN)
@bottle.route('/dropNode')
def dropNode(self):
    """Request the DOWN state for the node given by the `node` parameter.

    `node` is read from the query string and converted to an integer.

    This handler used to be defined under the name `killNode`, which
    shadowed the real /killNode handler in the class namespace. The route
    is registered by the decorator, so the URL is unaffected by the rename.
    """
    node_id = int(request.query.node)
    self._setNodeState(node_id, NodeStates.DOWN)
@bottle.route('/getPartitionRowList')
def getPartitionRowList(self):
min_offset = int(request.query.min_offset)
max_offset = int(request.query.max_offset)
uuid = request.query.node
uuid = int(uuid) if uuid else None
row_list = []
if max_offset == 0:
max_offset = self.pt.getPartitions()
......@@ -144,11 +218,30 @@ class Application(object):
try:
for cell in self.pt.getCellList(offset):
if uuid is None or cell.getUUID() == uuid:
row.append((cell.getUUID(), cell.getState()))
row.append((cell.getUUID(), str(cell.getState())))
except TypeError:
pass
row_list.append((offset, row))
except IndexError:
conn.notify(Errors.ProtocolError('invalid partition table offset'))
raise HTTPError(400, 'invalid partition table offset')
response.content_type = 'application/json'
return json.dumps((self.pt.getID(), row_list))
@bottle.route('/checkReplicas')
def checkReplicas(self):
partition_dict = {}
for partition in request.query.pt.split(','):
partition, source = partition.split(':')
source = int(source) if source else None
if partition:
partition_dict[int(partition)] = source
elif partition_dict:
raise HTTPError(400)
else:
conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
self._getMasterConnection() # just for correct error handling
partition_dict = dict.fromkeys(xrange(self.pt.getPartitions()),
source)
max_tid = request.query.max_tid
self._askPrimary(Packets.CheckReplicas(partition_dict,
self.asTID(request.query.min_tid),
self.asTID(max_tid) if max_tid else None))
......@@ -14,73 +14,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging, protocol
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets
from bottle import HTTPError
from neo.lib import logging
from neo.lib.handler import AnswerBaseHandler, EventHandler, MTEventHandler
from neo.lib.exception import PrimaryFailure
def check_primary_master(func):
def wrapper(self, *args, **kw):
if self.app.bootstrapped:
return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
def forward_ask(klass):
return check_primary_master(lambda self, conn, *args, **kw:
self.app.master_conn.ask(klass(*args, **kw),
conn=conn, msg_id=conn.getPeerId()))
class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster."""
@check_primary_master
def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s",
min_offset, max_offset, uuid_str(uuid))
self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)
@check_primary_master
def askNodeList(self, conn, node_type):
if node_type is None:
node_type = 'all'
node_filter = None
else:
node_filter = lambda n: n.getType() is node_type
logging.info("ask list of %s nodes", node_type)
node_list = self.app.nm.getList(node_filter)
node_information_list = [node.asTuple() for node in node_list ]
p = Packets.AnswerNodeList(node_information_list)
conn.answer(p)
@check_primary_master
def askClusterState(self, conn):
conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
@check_primary_master
def askPrimary(self, conn):
master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
checkReplicas = forward_ask(Packets.CheckReplicas)
class MasterEventHandler(EventHandler):
""" This class is just used to dispacth message to right handler"""
def _connectionLost(self, conn):
app = self.app
if app.listening_conn: # if running
assert app.master_conn in (conn, None)
conn.cancelRequests("connection to master lost")
app.reset()
app.uuid = None
self.app.nm.getByUUID(conn.getUUID()).setUnknown()
if self.app.master_conn is not None:
assert self.app.master_conn is conn
raise PrimaryFailure
def connectionFailed(self, conn):
......@@ -89,18 +34,6 @@ class MasterEventHandler(EventHandler):
def connectionClosed(self, conn):
self._connectionLost(conn)
def dispatch(self, conn, packet, kw={}):
if 'conn' in kw:
# expected answer
if packet.isResponse():
packet.setId(kw['msg_id'])
kw['conn'].answer(packet)
else:
self.app.request_handler.dispatch(conn, packet, kw)
else:
# unexpected answers and notifications
super(MasterEventHandler, self).dispatch(conn, packet, kw)
def answerClusterState(self, conn, state):
self.app.cluster_state = state
......@@ -109,23 +42,22 @@ class MasterEventHandler(EventHandler):
# implemented for factorize code (as done for bootstrap)
logging.debug("answerNodeInformation")
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, node_list):
self.app.nm.update(node_list)
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
self.app.bootstrapped = True
def sendPartitionTable(self, conn, ptid, row_list):
if self.app.bootstrapped:
self.app.pt.load(ptid, row_list, self.app.nm)
class MasterNotificationsHandler(MasterEventHandler, MTEventHandler):
def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state
notifyClusterInformation = MasterEventHandler.answerClusterState.im_func
sendPartitionTable = MasterEventHandler.answerPartitionTable.im_func
def notifyNodeInformation(self, conn, node_list):
self.app.nm.update(node_list)
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
class MasterRequestEventHandler(EventHandler):
class PrimaryAnswersHandler(AnswerBaseHandler):
""" This class handle all answer from primary master node"""
# XXX: to be deleted ?
def protocolError(self, conn, message):
raise HTTPError(400, message)
......@@ -661,6 +661,11 @@ class ClientConnection(Connection):
handler.connectionStarted(self)
self._connect()
def convertToMT(self, dispatcher):
"""Turn this ClientConnection into an MTClientConnection, in place.

Only an exact ClientConnection instance may be converted (subclasses
are rejected by the assertion).
"""
assert self.__class__ is ClientConnection, self
# Swap the class first so that _initMT and all later method lookups
# resolve on MTClientConnection.
self.__class__ = MTClientConnection
self._initMT(dispatcher)
def _connect(self):
try:
self.connector.makeClientConnection()
......@@ -762,11 +767,14 @@ class MTClientConnection(ClientConnection):
return wrapper
def __init__(self, *args, **kwargs):
self.lock = lock = RLock()
self.dispatcher = kwargs.pop('dispatcher')
with lock:
self._initMT(kwargs.pop('dispatcher'))
with self.lock:
super(MTClientConnection, self).__init__(*args, **kwargs)
def _initMT(self, dispatcher):
"""Set the attributes specific to the multi-threaded connection.

Creates a fresh re-entrant lock and stores `dispatcher`.
"""
self.lock = RLock()
self.dispatcher = dispatcher
def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None,
queue=None, **kw):
with self.lock:
......
......@@ -14,165 +14,100 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.connection import ClientConnection
from neo.lib.event import EventManager
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
from neo.lib.node import NodeManager
from .handler import CommandEventHandler
import json, socket
from urllib import URLopener, urlencode
from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, NodeStates, \
ZERO_TID
from neo.lib.util import u64
class NotReadyException(Exception):
pass
class NeoCTL(object):
connection = None
connected = False
def __init__(self, address):
self.nm = nm = NodeManager()
self.server = nm.createAdmin(address=address)
self.em = EventManager()
self.handler = CommandEventHandler(self)
self.response_queue = []
def close(self):
self.em.close()
self.nm.close()
del self.__dict__
def __getConnection(self):
if not self.connected:
self.connection = ClientConnection(self.em, self.handler,
self.server)
# Never delay reconnection to master. This speeds up unit tests
# and it should not change anything for normal use.
self.connection.setReconnectionNoDelay()
while not self.connected:
self.em.poll(1)
if self.connection is None:
raise NotReadyException('not connected')
return self.connection
def __ask(self, packet):
# TODO: make thread-safe
connection = self.__getConnection()
connection.ask(packet)
response_queue = self.response_queue
assert len(response_queue) == 0
while self.connected:
self.em.poll(1)
if response_queue:
break
else:
raise NotReadyException, 'Connection closed'
response = response_queue.pop()
if response[0] == Packets.Error and \
response[1] == ErrorCodes.NOT_READY:
raise NotReadyException(response[2])
return response
host, port = address
if ":" in host:
host = "[%s]" % host
self.base_url = "http://%s:%s/" % (host, port)
self._open = URLopener().open
def _ask(self, path, **kw):
    """Fetch `path` from the admin node and return the response body.

    Keyword arguments become the query string, sorted by name. Parameters
    whose value is None or the empty string are left out.

    Raises NotReadyException when the admin node cannot be reached
    ('socket error') or answers with HTTP 503. Any other IOError is
    re-raised unchanged.
    """
    if kw:
        # Compare by value: an identity test against the '' literal only
        # works by virtue of string interning.
        params = sorted((key, value) for key, value in kw.iteritems()
                        if value is not None and value != '')
        path += "?" + urlencode(params)
    try:
        return self._open(self.base_url + path).read()
    except IOError as e:
        kind = e.args[0]
        if kind == 'socket error' or (kind == 'http error'
                                      and e.args[1] == 503):
            raise NotReadyException
        raise
def enableStorageList(self, uuid_list):
"""
Put all given storage nodes in "running" state.
"""
packet = Packets.AddPendingNodes(uuid_list)
response = self.__ask(packet)
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
self._ask('enableStorageList', node_list=','.join(map(str, uuid_list)))
def tweakPartitionTable(self, uuid_list=()):
response = self.__ask(Packets.TweakPartitionTable(uuid_list))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
self._ask('tweakPartitionTable', node_list=','.join(map(str, uuid_list)))
def setClusterState(self, state):
"""
Set cluster state.
"""
packet = Packets.SetClusterState(state)
response = self.__ask(packet)
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def _setNodeState(self, node, state):
"""
Kill node, or remove it permanently
"""
response = self.__ask(Packets.SetNodeState(node, state))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
self._ask('setClusterState', state=state)
def getClusterState(self):
"""
Get cluster state.
"""
packet = Packets.AskClusterState()
response = self.__ask(packet)
if response[0] != Packets.AnswerClusterState:
raise RuntimeError(response)
return response[1]
def getLastIds(self):
response = self.__ask(Packets.AskLastIDs())
if response[0] != Packets.AnswerLastIDs:
raise RuntimeError(response)
return response[1:]
def getLastTransaction(self):
response = self.__ask(Packets.AskLastTransaction())
if response[0] != Packets.AnswerLastTransaction:
raise RuntimeError(response)
return response[1]
state = self._ask('getClusterState')
if state:
return getattr(ClusterStates, state)
def getNodeList(self, node_type=None):
"""
Get a list of nodes, filtering with given type.
"""
packet = Packets.AskNodeList(node_type)
response = self.__ask(packet)
if response[0] != Packets.AnswerNodeList:
raise RuntimeError(response)
return response[1] # node_list
node_list = json.loads(self._ask('getNodeList', node_type=node_type))
return ((getattr(NodeTypes, node_type), address and tuple(address),
uuid, getattr(NodeStates, state))
for node_type, address, uuid, state in node_list)
def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
"""
Get a list of partition rows, bounded by min & max and involving
given node.
"""
packet = Packets.AskPartitionList(min_offset, max_offset, node)
response = self.__ask(packet)
if response[0] != Packets.AnswerPartitionList:
raise RuntimeError(response)
return response[1:3] # ptid, row_list
ptid, row_list = json.loads(self._ask('getPartitionRowList',
min_offset=min_offset, max_offset=max_offset, node=node))
return ptid, [(offset, [(node, getattr(CellStates, state))
for node, state in row])
for offset, row in row_list]
def startCluster(self):
"""
Set cluster into "verifying" state.
"""
return self.setClusterState(ClusterStates.VERIFYING)
self._ask('startCluster')
def killNode(self, node):
return self._setNodeState(node, NodeStates.UNKNOWN)
self._ask('killNode', node=node)
def dropNode(self, node):
return self._setNodeState(node, NodeStates.DOWN)
self._ask('dropNode', node=node)
def getPrimary(self):
"""
Return the primary master UUID.
"""
packet = Packets.AskPrimary()
response = self.__ask(packet)
if response[0] != Packets.AnswerPrimary:
raise RuntimeError(response)
return response[1]
def checkReplicas(self, *args):
response = self.__ask(Packets.CheckReplicas(*args))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
return int(self._ask('getPrimary'))
def checkReplicas(self, partition_dict, min_tid=ZERO_TID, max_tid=None):
    """Send a 'checkReplicas' request to the admin node.

    `partition_dict` maps a partition offset to a source node, or to None
    for no explicit source. It is encoded as comma-separated
    'offset:source' pairs, with an empty source for None. TIDs are sent
    as integers (via u64); `max_tid` is omitted when it is None.
    """
    pairs = []
    for offset, source in partition_dict.iteritems():
        pairs.append('%s:%s' % (offset, '' if source is None else source))
    kw = {'pt': ','.join(pairs)}
    if max_tid is not None:
        kw['max_tid'] = u64(max_tid)
    self._ask('checkReplicas', min_tid=u64(min_tid), **kw)
......@@ -38,6 +38,7 @@ defaults = dict(
masters = '127.0.0.1:10000',
)
def main(args=None):
# build configuration dict from command line options
(options, args) = parser.parse_args(args=args)
......@@ -60,6 +61,5 @@ def main(args=None):
# and then, load and run the application
from neo.admin.app import Application
app = Application(config)
app.run()
host, port = config.getBind()
Application(config).serve(host=host, port=port)
......@@ -359,7 +359,7 @@ class NEOCluster(object):
pending_count += 1
if pending_count == target[0]:
neoctl.startCluster()
except (NotReadyException, RuntimeError):
except (NotReadyException, IOError):
pass
if not pdb.wait(test, MAX_START_TIME):
raise AssertionError('Timeout when starting cluster')
......
......@@ -48,7 +48,7 @@ class MasterTests(NEOFunctionalTest):
break
self.neo.neoctl.killNode(uuid)
self.neo.expectDead(master)
self.assertRaises(RuntimeError, self.neo.neoctl.killNode, primary_uuid)
self.assertRaises(IOError, self.neo.neoctl.killNode, primary_uuid)
def testStoppingPrimaryWithTwoSecondaries(self):
# Wait for masters to stabilize
......
......@@ -176,7 +176,7 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectOudatedCells(2)
self.neo.expectClusterRunning()
self.assertRaises(RuntimeError, self.neo.neoctl.killNode,
self.assertRaises(IOError, self.neo.neoctl.killNode,
started[1].getUUID())
started[1].stop()
# Cluster not operational anymore. Only cells of second storage that
......@@ -327,7 +327,7 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectStorageNotKnown(started[0])
self.neo.expectAssignedCells(started[0], 0)
self.neo.expectAssignedCells(started[1], 10)
self.assertRaises(RuntimeError, self.neo.neoctl.dropNode,
self.assertRaises(IOError, self.neo.neoctl.dropNode,
started[1].getUUID())
self.neo.expectClusterRunning()
......
......@@ -21,11 +21,13 @@ import traceback
from collections import deque
from ConfigParser import SafeConfigParser
from contextlib import contextmanager
from cStringIO import StringIO
from itertools import count
from functools import wraps
from urllib import splitquery
from zlib import decompress
from mock import Mock
import transaction, ZODB
import bottle, transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app
from neo.client import Storage
......@@ -163,13 +165,12 @@ class Serialized(object):
next_lock.release()
cls._sched_lock.acquire()
def __init__(self, app, busy=True):
def __init__(self, app):
self._epoll = app.em.epoll
app.em.epoll = self
# XXX: It may have been initialized before the SimpleQueue is patched.
thread_container = getattr(app, '_thread_container', None)
thread_container is None or thread_container.__init__()
if busy:
self._busy.add(self) # block tic until app waits for polling
def __getattr__(self, attr):
......@@ -208,20 +209,6 @@ class Serialized(object):
cls._epoll.unregister(fd)
self._release_next()
class TestSerialized(Serialized):
def __init__(*args):
Serialized.__init__(busy=False, *args)
def poll(self, timeout):
if timeout:
while 1:
r = self._epoll.poll(0)
if r:
return r
Serialized.tic(step=1)
return self._epoll.poll(timeout)
class Node(object):
......@@ -237,6 +224,24 @@ class Node(object):
def filterConnection(self, *peers):
return ConnectionFilter(self.getConnectionList(*peers))
def run(self):
try:
super(Node, self).run()
finally:
self._afterRun()
self.em.epoll.exit()
def _afterRun(self):
logging.debug('stopping %r', self)
try:
self.listening_conn.close()
except AttributeError:
pass
def start(self):
isinstance(self.em.epoll, Serialized) or Serialized(self)
super(Node, self).start()
class ServerNode(Node):
_server_class_dict = {}
......@@ -300,33 +305,40 @@ class ServerNode(Node):
self.__dict__.clear()
self.__init__(**kw)
def start(self):
Serialized(self)
threading.Thread.start(self)
def run(self):
try:
super(ServerNode, self).run()
finally:
self._afterRun()
logging.debug('stopping %r', self)
self.em.epoll.exit()
def _afterRun(self):
try:
self.listening_conn.close()
except AttributeError:
pass
def getListeningAddress(self):
try:
return self.listening_conn.getAddress()
except AttributeError:
raise ConnectorConnectionRefusedException
class AdminApplication(ServerNode, neo.admin.app.Application):
class ThreadedNode(Node):
def __init__(self, *args, **kw):
super(ThreadedNode, self).__init__(*args, **kw)
self.poll_thread.node_name = getattr(self, 'node_name', self.name)
def _afterRun(self):
pass
class AdminApplication(ServerNode, ThreadedNode, neo.admin.app.Application):
def interrupt_main(self):
pass
def start(self):
super(AdminApplication, self).start()
return
host, port = BIND
l = threading.Lock()
l.acquire()
self.start = l.release
try:
self.run = lambda: self.serve(host=host, port=port)
threading.Thread.start(self)
l.acquire()
finally:
del self.start, self.run
class MasterApplication(ServerNode, neo.master.app.Application):
pass
......@@ -368,21 +380,7 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
(r,), = self.dm.query("SELECT COUNT(*) FROM " + table)
return r
class ClientApplication(Node, neo.client.app.Application):
def __init__(self, master_nodes, name, **kw):
super(ClientApplication, self).__init__(master_nodes, name, **kw)
self.poll_thread.node_name = name
def run(self):
try:
super(ClientApplication, self).run()
finally:
self.em.epoll.exit()
def start(self):
isinstance(self.em.epoll, Serialized) or Serialized(self)
super(ClientApplication, self).start()
class ClientApplication(ThreadedNode, neo.client.app.Application):
def getConnectionList(self, *peers):
for peer in peers:
......@@ -394,10 +392,25 @@ class ClientApplication(Node, neo.client.app.Application):
yield conn
class NeoCTL(neo.neoctl.app.NeoCTL):
# Bypass HTTP layer
def __init__(self, *args, **kw):
super(NeoCTL, self).__init__(*args, **kw)
TestSerialized(self)
base_url = '/'
def __init__(self, admin):
self._admin = weakref.proxy(admin)
def _open(self, path):
environ = {'REQUEST_METHOD': 'GET'}
environ['PATH_INFO'], environ['QUERY_STRING'] = splitquery(path)
bottle.request.bind(environ)
route, args = self._admin.bottle.match(environ)
try:
result = route.call(*args)
if isinstance(result, Exception):
raise result
except bottle.HTTPError, e:
raise IOError('http error', e.status_code)
return StringIO(result or '')
class LoggerThreadName(str):
......@@ -603,7 +616,7 @@ class NEOCluster(object):
self.admin_list = [AdminApplication(**kw)]
self.client = ClientApplication(name=self.name,
master_nodes=self.master_nodes, compress=compress)
self.neoctl = NeoCTL(self.admin.getVirtualAddress())
self.neoctl = NeoCTL(self.admin)
def __repr__(self):
return "<%s(%s) at 0x%x>" % (self.__class__.__name__,
......@@ -638,7 +651,6 @@ class NEOCluster(object):
node.resetNode(**kw)
self.client = ClientApplication(name=self.name,
master_nodes=self.master_nodes)
self.neoctl = NeoCTL(self.admin.getVirtualAddress())
def start(self, storage_list=None, fast_startup=False):
self._patch()
......@@ -663,7 +675,7 @@ class NEOCluster(object):
def startCluster(self):
try:
self.neoctl.startCluster()
except RuntimeError:
except IOError:
Serialized.tic()
if self.neoctl.getClusterState() not in (
ClusterStates.BACKINGUP,
......@@ -696,9 +708,12 @@ class NEOCluster(object):
def stop(self):
logging.debug("stopping %s", self)
self.__dict__.pop('_db', self.client).close()
node_list = self.admin_list + self.storage_list + self.master_list
node_list = self.storage_list + self.master_list
for node in node_list:
node.em.wakeup(True)
for node in self.admin_list:
node.close()
node_list.append(node.poll_thread)
node_list.append(self.client.poll_thread)
self.join(node_list)
logging.debug("stopped %s", self)
......@@ -747,7 +762,6 @@ class NEOCluster(object):
def __del__(self, __print_exc=traceback.print_exc):
try:
self.neoctl.close()
for node_type in 'admin', 'storage', 'master':
for node in getattr(self, node_type + '_list'):
node.close()
......
......@@ -534,9 +534,10 @@ class Test(NEOThreadedTest):
# tell admin to shutdown the cluster
cluster.neoctl.setClusterState(ClusterStates.STOPPING)
# all nodes except clients should exit
cluster.join(cluster.master_list
+ cluster.storage_list
+ cluster.admin_list)
node_list = cluster.master_list + cluster.storage_list
for node in cluster.admin_list:
node_list.append(node.poll_thread)
cluster.join(node_list)
finally:
cluster.stop()
cluster.reset() # reopen DB to check partition tables
......
......@@ -26,7 +26,7 @@ if not os.path.exists('mock.py'):
zodb_require = ['ZODB3>=3.10', 'ZODB3<3.11dev']
extras_require = {
'admin': [],
'admin': ['bottle'],
'client': zodb_require,
'ctl': [],
'master': [],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment