pax_global_header 0000666 0000000 0000000 00000000064 13461166417 0014523 g ustar 00root root 0000000 0000000 52 comment=6332112cba979dfd29b40fe9f98d097911fde696
neoppod-v1.12-neo-neoctl/ 0000775 0000000 0000000 00000000000 13461166417 0015341 5 ustar 00root root 0000000 0000000 neoppod-v1.12-neo-neoctl/neo/ 0000775 0000000 0000000 00000000000 13461166417 0016122 5 ustar 00root root 0000000 0000000 neoppod-v1.12-neo-neoctl/neo/neoctl/ 0000775 0000000 0000000 00000000000 13461166417 0017406 5 ustar 00root root 0000000 0000000 neoppod-v1.12-neo-neoctl/neo/neoctl/__init__.py 0000664 0000000 0000000 00000000000 13461166417 0021505 0 ustar 00root root 0000000 0000000 neoppod-v1.12-neo-neoctl/neo/neoctl/app.py 0000664 0000000 0000000 00000032123 13461166417 0020541 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import sys
from .neoctl import NeoCTL, NotReadyException
from neo.lib.node import NodeManager
from neo.lib.pt import PartitionTable
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
from neo.lib.protocol import uuid_str, formatNodeList, \
ClusterStates, NodeStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
action_dict = {
'print': {
'ids': 'getLastIds',
'pt': 'getPartitionRowList',
'node': 'getNodeList',
'cluster': 'getClusterState',
'primary': 'getPrimary',
},
'set': {
'cluster': 'setClusterState',
'replicas': 'setNumReplicas',
},
'check': 'checkReplicas',
'start': 'startCluster',
'add': 'enableStorageList',
'tweak': 'tweakPartitionTable',
'drop': 'dropNode',
'kill': 'killNode',
'prune_orphan': 'pruneOrphan',
'truncate': 'truncate',
'flush_log': 'flushLog',
}
uuid_int = (lambda ns: lambda uuid:
(ns[uuid[0]] << 24) + int(uuid[1:])
)({str(k)[0]: v for k, v in UUID_NAMESPACES.iteritems()})
class dummy_app:
id_timestamp = uuid = 0
class TerminalNeoCTL(object):
def __init__(self, *args, **kw):
self.neoctl = NeoCTL(*args, **kw)
def __del__(self):
self.neoctl.close()
# Utility methods (could be functions)
def asNodeType(self, value):
return getattr(NodeTypes, value.upper())
def asClusterState(self, value):
return getattr(ClusterStates, value.upper())
def asTID(self, value):
if '.' in value:
return tidFromTime(float(value))
return p64(int(value, 0))
asNode = staticmethod(uuid_int)
def formatPartitionTable(self, row_list):
nm = NodeManager()
nm.update(dummy_app, 1,
self.neoctl.getNodeList(node_type=NodeTypes.STORAGE))
pt = object.__new__(PartitionTable)
pt._load(None, None, row_list, nm.getByUUID)
pt.addNodeList(nm.getByStateList(NodeStates.RUNNING))
return '\n'.join(line[4:] for line in pt._format())
def formatRowList(self, row_list):
return '\n'.join('%03d |%s' % (offset,
''.join(' %s - %s |' % (uuid_str(uuid), state)
for (uuid, state) in cell_list))
for (offset, cell_list) in row_list)
# Actual actions
def getLastIds(self, params):
"""
Get last ids.
"""
assert not params
ptid, backup_tid, truncate_tid = self.neoctl.getRecovery()
if backup_tid:
ltid = self.neoctl.getLastTransaction()
r = "backup_tid = 0x%x (%s)" % (u64(backup_tid),
timeStringFromTID(backup_tid))
else:
loid, ltid = self.neoctl.getLastIds()
r = "last_oid = 0x%x" % (u64(loid))
return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %s" % \
(u64(ltid), timeStringFromTID(ltid), ptid)
def getPartitionRowList(self, params):
"""
Get a list of partition rows, bounded by min & max and involving
given node.
Parameters: [min [max [node]]]
min: offset of the first row to fetch (starts at 0)
max: offset of the last row to fetch (0 for no limit)
node: filters the list of nodes serving a line to this node
"""
params = params + [0, 0, None][len(params):]
min_offset, max_offset, node = params
min_offset = int(min_offset)
max_offset = int(max_offset)
if node is not None:
node = self.asNode(node)
ptid, num_replicas, row_list = self.neoctl.getPartitionRowList(
min_offset=min_offset, max_offset=max_offset, node=node)
return '# ptid: %s, replicas: %s\n%s' % (ptid, num_replicas,
self.formatRowList(enumerate(row_list, min_offset))
if min_offset or max_offset else
self.formatPartitionTable(row_list))
def getNodeList(self, params):
"""
Get a list of nodes, filtering with given type.
Parameters: [type]
type: type of node to display
"""
assert len(params) < 2
if len(params):
node_type = self.asNodeType(params[0])
else:
node_type = None
node_list = self.neoctl.getNodeList(node_type=node_type)
return '\n'.join(formatNodeList(node_list)) or 'Empty list!'
def getClusterState(self, params):
"""
Get cluster state.
"""
assert len(params) == 0
return str(self.neoctl.getClusterState())
def setClusterState(self, params):
"""
Set cluster state.
Parameters: state
state: state to put the cluster in
"""
assert len(params) == 1
return self.neoctl.setClusterState(self.asClusterState(params[0]))
def setNumReplicas(self, params):
"""
Set number of replicas.
Parameters: nr
nr: positive number (0 means no redundancy)
"""
assert len(params) == 1
nr = int(params[0])
if nr < 0:
sys.exit('invalid number of replicas')
return self.neoctl.setNumReplicas(nr)
def startCluster(self, params):
"""
Starts cluster operation after a startup.
Equivalent to:
set cluster verifying
"""
assert len(params) == 0
return self.neoctl.startCluster()
def _getStorageList(self, params):
if len(params) == 1 and params[0] == 'all':
node_list = self.neoctl.getNodeList(NodeTypes.STORAGE)
return [node[2] for node in node_list]
return map(self.asNode, params)
def enableStorageList(self, params):
"""
Enable cluster to make use of pending storages.
Parameters: node [node [...]]
node: if "all", add all pending storage nodes,
otherwise, the list of storage nodes to enable.
"""
return self.neoctl.enableStorageList(self._getStorageList(params))
def tweakPartitionTable(self, params):
"""
Optimize partition table.
No change is done to the specified/down storage nodes and they don't
count as replicas. The purpose of listing nodes is usually to drop
them once the data is replicated to other nodes.
Parameters: [-n] [node [...]]
-n: dry run
"""
dry_run = params[0] == '-n'
changed, row_list = self.neoctl.tweakPartitionTable(
map(self.asNode, params[dry_run:]), dry_run)
if changed:
return self.formatPartitionTable(row_list)
return 'No change done.'
def killNode(self, params):
"""
Kill redundant nodes (either a storage or a secondary master).
Parameters: node
"""
return self.neoctl.killNode(self.asNode(*params))
def dropNode(self, params):
"""
Remove storage node permanently.
Parameters: node
"""
return self.neoctl.dropNode(self.asNode(*params))
def getPrimary(self, params):
"""
Get primary master node.
"""
return uuid_str(self.neoctl.getPrimary())
def pruneOrphan(self, params):
"""
Fix database by deleting unreferenced raw data
This can take a long time.
Parameters: dry_run node [node [...]]
dry_run: 0 or 1
node: if "all", ask all connected storage nodes to repair,
otherwise, only the given list of storage nodes.
"""
dry_run = "01".index(params.pop(0))
return self.neoctl.repair(self._getStorageList(params), dry_run)
def truncate(self, params):
"""
Truncate the database at the given tid.
The cluster must be in RUNNING state, without any pending transaction.
This causes the cluster to go back in RECOVERING state, waiting all
nodes to be pending (do not use 'start' command unless you're sure
the missing nodes don't need to be truncated).
Parameters: tid
"""
self.neoctl.truncate(self.asTID(*params))
def checkReplicas(self, params):
"""
Test whether partitions have corrupted metadata
Any corrupted cell is put in CORRUPTED state, possibly make the
cluster non operational.
Parameters: [partition]:[reference] ... [min_tid [max_tid]]
reference: node id of a storage with known good data
If not given, and if the cluster is in backup mode, an upstream
cell is automatically taken as reference.
"""
partition_dict = {}
params = iter(params)
min_tid = ZERO_TID
max_tid = None
for p in params:
try:
partition, source = p.split(':')
except ValueError:
min_tid = self.asTID(p)
try:
max_tid = self.asTID(params.next())
except StopIteration:
pass
break
source = self.asNode(source) if source else None
if partition:
partition_dict[int(partition)] = source
else:
assert not partition_dict
np = len(self.neoctl.getPartitionRowList()[1])
partition_dict = dict.fromkeys(xrange(np), source)
self.neoctl.checkReplicas(partition_dict, min_tid, max_tid)
def flushLog(self, params):
"""
Ask all nodes in the cluster to flush their logs.
If there are backup clusters, only their primary masters flush.
"""
assert not params
self.neoctl.flushLog()
class Application(object):
"""The storage node application."""
def __init__(self, *args, **kw):
self.neoctl = TerminalNeoCTL(*args, **kw)
def execute(self, args):
"""Execute the command given."""
# print node type : print list of node of the given type
# (STORAGE_NODE_TYPE, MASTER_NODE_TYPE...)
# set node uuid state [1|0] : set the node for the given uuid to the
# state (RUNNING, DOWN...) and modify the partition if asked
# set cluster name [shutdown|operational] : either shutdown the
# cluster or mark it as operational
if not args:
return self.usage()
current_action = action_dict
level = 0
try:
while level < len(args) and \
isinstance(current_action, dict):
current_action = current_action[args[level]]
level += 1
except KeyError:
sys.exit('invalid command: ' + ' '.join(args))
action = getattr(self.neoctl, current_action)
try:
return action(args[level:])
except NotReadyException, message:
return 'ERROR: %s' % (message, )
def _usage(self, action_dict, level=0):
result = []
append = result.append
sub_level = level + 1
for name, action in action_dict.iteritems():
append('%s%s' % (' ' * level, name))
if isinstance(action, dict):
append(self._usage(action, level=sub_level))
else:
real_action = getattr(self.neoctl, action, None)
if real_action is None:
continue
docstring = getattr(real_action, '__doc__', None)
if docstring is None:
docstring = '(no docstring)'
docstring_line_list = docstring.split('\n')
# Strip empty lines at beginning & end of line list
for end in (0, -1):
while len(docstring_line_list) \
and docstring_line_list[end] == '':
docstring_line_list.pop(end)
# Get the indentation of first line, to preserve other lines
# relative indentation.
first_line = docstring_line_list[0]
base_indentation = len(first_line) - len(first_line.lstrip())
result.extend([(' ' * sub_level) + x[base_indentation:] \
for x in docstring_line_list])
return '\n'.join(result)
def usage(self):
output_list = ('Available commands:', self._usage(action_dict),
"TID arguments can be either integers or timestamps as floats,"
" e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'"
" for 2012-01-01 12:34:56 UTC")
return '\n'.join(output_list)
neoppod-v1.12-neo-neoctl/neo/neoctl/handler.py 0000664 0000000 0000000 00000004510 13461166417 0021375 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import sys
from neo.lib.handler import EventHandler
from neo.lib.protocol import ErrorCodes, Packets
class CommandEventHandler(EventHandler):
""" Base handler for command """
def connectionCompleted(self, conn):
# connected to admin node
self.app.connected = True
super(CommandEventHandler, self).connectionCompleted(conn)
def __disconnected(self):
app = self.app
app.connected = False
app.connection = None
def __respond(self, response):
self.app.response_queue.append(response)
def connectionClosed(self, conn):
super(CommandEventHandler, self).connectionClosed(conn)
self.__disconnected()
def connectionFailed(self, conn):
super(CommandEventHandler, self).connectionFailed(conn)
self.__disconnected()
def ack(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.ACK, msg))
def denied(self, conn, msg):
sys.exit(msg)
def notReady(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.NOT_READY, msg))
def __answer(packet_type):
def answer(self, conn, *args):
self.__respond((packet_type, ) + args)
return answer
answerPartitionList = __answer(Packets.AnswerPartitionList)
answerNodeList = __answer(Packets.AnswerNodeList)
answerClusterState = __answer(Packets.AnswerClusterState)
answerPrimary = __answer(Packets.AnswerPrimary)
answerLastIDs = __answer(Packets.AnswerLastIDs)
answerLastTransaction = __answer(Packets.AnswerLastTransaction)
answerRecovery = __answer(Packets.AnswerRecovery)
answerTweakPartitionTable = __answer(Packets.AnswerTweakPartitionTable)
neoppod-v1.12-neo-neoctl/neo/neoctl/neoctl.py 0000664 0000000 0000000 00000017106 13461166417 0021251 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import argparse
from neo.lib import util
from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
from .handler import CommandEventHandler
class NotReadyException(Exception):
pass
@buildOptionParser
class NeoCTL(BaseApplication):
connection = None
connected = False
@classmethod
def _buildOptionParser(cls):
# XXX: Use argparse sub-commands.
parser = cls.option_parser
parser.description = "NEO Control node"
parser('a', 'address', default='127.0.0.1:9999',
parse=lambda x: util.parseNodeAddress(x, 9999),
help="address of an admin node")
parser.argument('cmd', nargs=argparse.REMAINDER,
help="command to execute; if not supplied,"
" the list of available commands is displayed")
def __init__(self, address, **kw):
super(NeoCTL, self).__init__(**kw)
self.server = self.nm.createAdmin(address=address)
self.handler = CommandEventHandler(self)
self.response_queue = []
def __getConnection(self):
if not self.connected:
self.connection = ClientConnection(self, self.handler, self.server)
# Never delay reconnection to master. This speeds up unit tests
# and it should not change anything for normal use.
try:
self.connection.setReconnectionNoDelay()
except ConnectionClosed:
self.connection = None
while not self.connected:
if self.connection is None:
raise NotReadyException('not connected')
self.em.poll(1)
return self.connection
def __ask(self, packet):
# TODO: make thread-safe
connection = self.__getConnection()
connection.ask(packet)
response_queue = self.response_queue
assert len(response_queue) == 0
while self.connected:
self.em.poll(1)
if response_queue:
break
else:
raise NotReadyException, 'Connection closed'
response = response_queue.pop()
if response[0] == Packets.Error and \
response[1] == ErrorCodes.NOT_READY:
raise NotReadyException(response[2])
return response
def enableStorageList(self, uuid_list):
"""
Put all given storage nodes in "running" state.
"""
packet = Packets.AddPendingNodes(uuid_list)
response = self.__ask(packet)
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def tweakPartitionTable(self, uuid_list=(), dry_run=False):
response = self.__ask(Packets.TweakPartitionTable(dry_run, uuid_list))
if response[0] != Packets.AnswerTweakPartitionTable:
raise RuntimeError(response)
return response[1:]
def setNumReplicas(self, nr):
response = self.__ask(Packets.SetNumReplicas(nr))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def setClusterState(self, state):
"""
Set cluster state.
"""
packet = Packets.SetClusterState(state)
response = self.__ask(packet)
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def _setNodeState(self, node, state):
"""
Kill node, or remove it permanently
"""
response = self.__ask(Packets.SetNodeState(node, state))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def getClusterState(self):
"""
Get cluster state.
"""
packet = Packets.AskClusterState()
response = self.__ask(packet)
if response[0] != Packets.AnswerClusterState:
raise RuntimeError(response)
return response[1]
def getLastIds(self):
response = self.__ask(Packets.AskLastIDs())
if response[0] != Packets.AnswerLastIDs:
raise RuntimeError(response)
return response[1:]
def getLastTransaction(self):
response = self.__ask(Packets.AskLastTransaction())
if response[0] != Packets.AnswerLastTransaction:
raise RuntimeError(response)
return response[1]
def getRecovery(self):
response = self.__ask(Packets.AskRecovery())
if response[0] != Packets.AnswerRecovery:
raise RuntimeError(response)
return response[1:]
def getNodeList(self, node_type=None):
"""
Get a list of nodes, filtering with given type.
"""
packet = Packets.AskNodeList(node_type)
response = self.__ask(packet)
if response[0] != Packets.AnswerNodeList:
raise RuntimeError(response)
return response[1] # node_list
def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
"""
Get a list of partition rows, bounded by min & max and involving
given node.
"""
packet = Packets.AskPartitionList(min_offset, max_offset, node)
response = self.__ask(packet)
if response[0] != Packets.AnswerPartitionList:
raise RuntimeError(response)
return response[1:]
def startCluster(self):
"""
Set cluster into "verifying" state.
"""
return self.setClusterState(ClusterStates.VERIFYING)
def killNode(self, node):
return self._setNodeState(node, NodeStates.DOWN)
def dropNode(self, node):
return self._setNodeState(node, NodeStates.UNKNOWN)
def getPrimary(self):
"""
Return the primary master UUID.
"""
packet = Packets.AskPrimary()
response = self.__ask(packet)
if response[0] != Packets.AnswerPrimary:
raise RuntimeError(response)
return response[1]
def repair(self, *args):
response = self.__ask(Packets.Repair(*args))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def truncate(self, tid):
response = self.__ask(Packets.Truncate(tid))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def checkReplicas(self, *args):
response = self.__ask(Packets.CheckReplicas(*args))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def flushLog(self):
conn = self.__getConnection()
conn.send(Packets.FlushLog())
while conn.pending():
self.em.poll(1)