pax_global_header 0000666 0000000 0000000 00000000064 12440376027 0014517 g ustar 00root root 0000000 0000000 52 comment=9bd318038fd4d9697f469abd7f09b12a1fa6e85f
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-admin/ 0000775 0000000 0000000 00000000000 12440376027 0022142 5 ustar 00root root 0000000 0000000 neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-admin/neo/ 0000775 0000000 0000000 00000000000 12440376027 0022723 5 ustar 00root root 0000000 0000000 neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-admin/neo/admin/ 0000775 0000000 0000000 00000000000 12440376027 0024013 5 ustar 00root root 0000000 0000000 neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-admin/neo/admin/__init__.py 0000664 0000000 0000000 00000000000 12440376027 0026112 0 ustar 00root root 0000000 0000000 neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-admin/neo/admin/app.py 0000664 0000000 0000000 00000013657 12440376027 0025161 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from neo.lib.connector import getConnectorHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, \
NodeTypes, NodeStates, Packets
from neo.lib.debug import register as registerLiveDebugger
class Application(object):
    """The admin node application.

    Holds the event manager, the node manager and the partition table,
    listens on the configured bind address and keeps a connection to the
    primary master node, reconnecting whenever that connection is lost.
    """

    def __init__(self, config):
        """Build the application from *config*.

        Only the accessors called below are required from *config*:
        getDynamicMasterList, getCluster, getBind, getMasters and getUUID.
        """
        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager(config.getDynamicMasterList())

        self.name = config.getCluster()
        self.server = config.getBind()

        self.master_addresses, connector_name = config.getMasters()
        self.connector_handler = getConnectorHandler(connector_name)

        logging.debug('IP address is %s, port is %d', *self.server)

        # The partition table is initialized after getting the number of
        # partitions.
        self.pt = None
        self.uuid = config.getUUID()
        self.primary_master_node = None
        self.request_handler = MasterRequestEventHandler(self)
        self.master_event_handler = MasterEventHandler(self)
        self.cluster_state = None
        self.reset()
        registerLiveDebugger(on_log=self.log)

    def close(self):
        """Release the node and event managers and drop every attribute."""
        self.listening_conn = None
        self.nm.close()
        self.em.close()
        # NOTE(review): presumably done to break reference cycles between
        # the application and its handlers - confirm before changing.
        del self.__dict__

    def reset(self):
        """Forget the current master connection and bootstrap status."""
        self.bootstrapped = False
        self.master_conn = None
        self.master_node = None

    def log(self):
        """Log the state of the event manager, node manager and, when it
        exists, the partition table."""
        self.em.log()
        self.nm.log()
        if self.pt is not None:
            self.pt.log()

    def run(self):
        """Run the main loop; on any exception, log the application state
        before re-raising."""
        try:
            self._run()
        except Exception:
            logging.exception('Pre-mortem data:')
            self.log()
            raise

    def _run(self):
        """Make sure that the status is sane and start a loop.

        Raises RuntimeError if the cluster name is empty (or missing).
        """
        # Truthiness also rejects a None name with the intended error
        # instead of failing inside len().
        if not self.name:
            raise RuntimeError('cluster name must be non-empty')

        # Make a listening port.
        handler = AdminEventHandler(self)
        self.listening_conn = ListeningConnection(self.em, handler,
            addr=self.server, connector=self.connector_handler())

        # (Re)connect to the primary master until the cluster is stopping.
        while self.cluster_state != ClusterStates.STOPPING:
            self.connectToPrimary()
            try:
                while True:
                    self.em.poll(1)
            except PrimaryFailure:
                logging.error('primary master is down')
        self.listening_conn.close()
        # Keep polling until the event manager reports it is idle.
        while not self.em.isIdle():
            self.em.poll(1)

    def connectToPrimary(self):
        """Find a primary master node, and connect to it.

        If a primary master node is not elected or ready, repeat
        the attempt of a connection periodically.

        Note that I do not accept any connection from non-master nodes
        at this stage.

        Raises RuntimeError if the master announces a number of partitions
        or replicas that differs from the already known partition table.
        """
        nm = self.nm
        nm.init()
        self.cluster_state = None

        for address in self.master_addresses:
            self.nm.createMaster(address=address)

        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, self.name, NodeTypes.ADMIN,
                self.uuid, self.server)
        data = bootstrap.getPrimaryConnection(self.connector_handler)
        (node, conn, uuid, num_partitions, num_replicas) = data
        nm.update([(node.getType(), node.getAddress(), node.getUUID(),
            NodeStates.RUNNING)])
        self.master_node = node
        self.master_conn = conn
        self.uuid = uuid

        if self.pt is None:
            self.pt = PartitionTable(num_partitions, num_replicas)
        elif self.pt.getPartitions() != num_partitions:
            # XXX: shouldn't we recover instead of raising ?
            raise RuntimeError('the number of partitions is inconsistent')
        elif self.pt.getReplicas() != num_replicas:
            # XXX: shouldn't we recover instead of raising ?
            raise RuntimeError('the number of replicas is inconsistent')

        # passive handler
        self.master_conn.setHandler(self.master_event_handler)
        self.master_conn.ask(Packets.AskClusterState())
        self.master_conn.ask(Packets.AskNodeInformation())
        self.master_conn.ask(Packets.AskPartitionTable())

    def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
        """Answer *conn* with the rows of partitions in
        [min_offset, max_offset).

        A max_offset of 0 means "up to the number of partitions". When
        *uuid* is not None, only the cells of that node are reported.
        An IndexError while walking the table is reported to *conn* as a
        protocol error instead of an answer.
        """
        # we have a pt
        self.pt.log()
        row_list = []
        if max_offset == 0:
            max_offset = self.pt.getPartitions()
        try:
            for offset in xrange(min_offset, max_offset):
                row = []
                try:
                    for cell in self.pt.getCellList(offset):
                        if uuid is None or cell.getUUID() == uuid:
                            row.append((cell.getUUID(), cell.getState()))
                except TypeError:
                    # NOTE(review): presumably getCellList() can return a
                    # non-iterable for an unassigned partition; such a
                    # partition is reported with an empty row - confirm.
                    pass
                row_list.append((offset, row))
        except IndexError:
            conn.notify(Errors.ProtocolError('invalid partition table offset'))
        else:
            conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-admin/neo/admin/handler.py 0000664 0000000 0000000 00000011607 12440376027 0026007 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2009-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging, protocol
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets
from neo.lib.exception import PrimaryFailure
def check_primary_master(func):
    """Decorate a handler method so it only runs once bootstrapped.

    The returned wrapper calls *func* when ``self.app.bootstrapped`` is
    true and raises ``protocol.NotReadyError`` otherwise. The wrapper keeps
    the name and docstring of *func*, so decorated handlers stay
    recognizable in tracebacks and introspection.
    """
    # Local import: keeps this block self-contained without touching the
    # module-level import list.
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kw):
        if self.app.bootstrapped:
            return func(self, *args, **kw)
        raise protocol.NotReadyError('Not connected to a primary master.')
    return wrapper
def forward_ask(klass):
    """Build a handler method that relays a request to the master.

    The generated method instantiates *klass* with the arguments it
    receives and asks it on the application's master connection, passing
    along the requesting connection and its peer id. The method is guarded
    by check_primary_master.
    """
    def forward(self, conn, *args, **kw):
        # Resolve the master connection's ask method before building the
        # packet, in the same order as a direct call expression would.
        ask = self.app.master_conn.ask
        return ask(klass(*args, **kw), conn=conn, msg_id=conn.getPeerId())
    return check_primary_master(forward)
class AdminEventHandler(EventHandler):
    """Handle the cluster administration requests received by this node.

    Some requests are answered from the application's own state; the
    others are forwarded to the master through forward_ask. Every handler
    is guarded by check_primary_master.
    """

    @check_primary_master
    def askPartitionList(self, conn, min_offset, max_offset, uuid):
        """Log the request and let the application send the rows."""
        logging.info("ask partition list from %s to %s for %s",
                     min_offset, max_offset, uuid_str(uuid))
        self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)

    @check_primary_master
    def askNodeList(self, conn, node_type):
        """Answer with the known nodes, restricted to *node_type* unless it
        is None."""
        if node_type is None:
            label = 'all'
            matches = None
        else:
            label = node_type

            def matches(node):
                return node.getType() is node_type
        logging.info("ask list of %s nodes", label)
        nodes = self.app.nm.getList(matches)
        node_tuples = [node.asTuple() for node in nodes]
        conn.answer(Packets.AnswerNodeList(node_tuples))

    @check_primary_master
    def askClusterState(self, conn):
        """Answer with the cluster state stored on the application."""
        state = self.app.cluster_state
        conn.answer(Packets.AnswerClusterState(state))

    @check_primary_master
    def askPrimary(self, conn):
        """Answer with the UUID of the application's master node."""
        primary_uuid = self.app.master_node.getUUID()
        conn.answer(Packets.AnswerPrimary(primary_uuid))

    # Requests relayed to the master node.
    askLastIDs = forward_ask(Packets.AskLastIDs)
    askLastTransaction = forward_ask(Packets.AskLastTransaction)
    addPendingNodes = forward_ask(Packets.AddPendingNodes)
    tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
    setClusterState = forward_ask(Packets.SetClusterState)
    setNodeState = forward_ask(Packets.SetNodeState)
    checkReplicas = forward_ask(Packets.CheckReplicas)
class MasterEventHandler(EventHandler):
    """Dispatch packets received from the master to the right handler.

    Answers to requests forwarded on behalf of another connection are
    relayed to that connection; everything else updates the application's
    view of the cluster (state, nodes, partition table).
    """

    def _connectionLost(self, conn):
        """Reset the application and raise PrimaryFailure, but only while
        the application is listening (i.e. running)."""
        app = self.app
        if app.listening_conn: # if running
            assert app.master_conn in (conn, None)
            conn.cancelRequests("connection to master lost")
            app.reset()
            app.uuid = None
            raise PrimaryFailure

    def connectionFailed(self, conn):
        self._connectionLost(conn)

    def connectionClosed(self, conn):
        self._connectionLost(conn)

    def dispatch(self, conn, packet, kw=None):
        """Route *packet* according to the keyword data attached to it.

        When *kw* carries a 'conn' entry, the packet relates to a forwarded
        request: a response is re-identified with kw['msg_id'] and sent to
        kw['conn']; any other packet goes to the application's request
        handler. Without a 'conn' entry the packet is handled by the base
        class dispatch.
        """
        # A fresh dict per call: a shared default dict would leak state
        # between calls if any downstream handler ever mutated it.
        if kw is None:
            kw = {}
        if 'conn' in kw:
            # expected answer
            if packet.isResponse():
                packet.setId(kw['msg_id'])
                kw['conn'].answer(packet)
            else:
                self.app.request_handler.dispatch(conn, packet, kw)
        else:
            # unexpected answers and notifications
            super(MasterEventHandler, self).dispatch(conn, packet, kw)

    def answerClusterState(self, conn, state):
        self.app.cluster_state = state

    def answerNodeInformation(self, conn):
        # XXX: This will no more exists when the initialization module will be
        # implemented for factorize code (as done for bootstrap)
        logging.debug("answerNodeInformation")

    def notifyPartitionChanges(self, conn, ptid, cell_list):
        self.app.pt.update(ptid, cell_list, self.app.nm)

    def answerPartitionTable(self, conn, ptid, row_list):
        # Loading the partition table is what marks the application as
        # bootstrapped.
        self.app.pt.load(ptid, row_list, self.app.nm)
        self.app.bootstrapped = True

    def sendPartitionTable(self, conn, ptid, row_list):
        # Ignored until the application is bootstrapped.
        if self.app.bootstrapped:
            self.app.pt.load(ptid, row_list, self.app.nm)

    def notifyClusterInformation(self, conn, cluster_state):
        self.app.cluster_state = cluster_state

    def notifyNodeInformation(self, conn, node_list):
        self.app.nm.update(node_list)
class MasterRequestEventHandler(EventHandler):
    """Handle all answers from the primary master node."""
    # NOTE(review): no handler methods are visible here; confirm whether
    # the class body continues elsewhere before relying on this.
    # XXX: to be deleted ?