pax_global_header 0000666 0000000 0000000 00000000064 13276562645 0014532 g ustar 00root root 0000000 0000000 52 comment=30a02bdc1f256d74a4a78e10d9f74df606f7d132
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/ 0000775 0000000 0000000 00000000000 13276562645 0021512 5 ustar 00root root 0000000 0000000 neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/ 0000775 0000000 0000000 00000000000 13276562645 0022273 5 ustar 00root root 0000000 0000000 neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/ 0000775 0000000 0000000 00000000000 13276562645 0023041 5 ustar 00root root 0000000 0000000 neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/__init__.py 0000664 0000000 0000000 00000001300 13276562645 0025144 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .logger import logging
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/app.py 0000664 0000000 0000000 00000004631 13276562645 0024177 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2015-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import logging
from .event import EventManager
from .node import NodeManager
class BaseApplication(object):
    """Base class of a NEO node application.

    Owns the event manager and node manager shared by all node types and,
    when a (ca, cert, key) triple is given, builds the TLS context used
    for encrypted connections.
    """

    # Overridden/assigned by subclasses: listening address and SSL context.
    server = None
    ssl = None

    def __init__(self, ssl=None, dynamic_master_list=None):
        # 'ssl' is None, or a (ca, cert, key) triple of file paths.
        if ssl:
            if not all(ssl):
                raise ValueError("To enable encryption, 3 files must be"
                    " provided: the CA certificate, and the certificate"
                    " of this node with its private key.")
            ca, cert, key = ssl
            # Deliberately shadows the 'ssl' parameter with the stdlib module.
            import ssl
            # Pick the most recent TLS protocol constant available, e.g.
            # PROTOCOL_TLSv1_2 (constants sort by their numeric value).
            version, version_name = max((getattr(ssl, k), k)
                for k in dir(ssl) if k.startswith("PROTOCOL_TLSv"))
            self.ssl = context = ssl.SSLContext(version)
            context.options |= (0
                | ssl.OP_CIPHER_SERVER_PREFERENCE
                | ssl.OP_NO_COMPRESSION
            )
            context.set_ciphers(ssl._RESTRICTED_SERVER_CIPHERS)
            # Mutual authentication: peers must present a certificate
            # signed by the given CA.
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(ca)
            context.load_cert_chain(cert, key)
            # Check leaf certificates against a CRL only if the CA file
            # actually contains one.
            context.verify_flags |= ssl.VERIFY_X509_STRICT | (
                context.cert_store_stats()['crl'] and ssl.VERIFY_CRL_CHECK_LEAF)
            # version_name[13:] turns e.g. "PROTOCOL_TLSv1_2" into "1_2".
            logging.info("TLS %s enabled for %s",
                float(version_name[13:].replace("_", ".")), self)
        self._handlers = {}
        self.em = EventManager()
        self.nm = NodeManager(dynamic_master_list)

    # XXX: Do not implement __del__ unless all references to the Application
    #      become weak.
    #      Due to cyclic references, Python < 3.4 would never call it unless
    #      it's closed explicitly, and in this case, there's nothing to do.

    def close(self):
        """Release managers and break reference cycles."""
        self.nm.close()
        self.em.close()
        self.__dict__.clear()
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/attributeTracker.py 0000664 0000000 0000000 00000003673 13276562645 0026743 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
ATTRIBUTE_TRACKER_ENABLED = False
from .locking import LockUser
"""
Usage example:
from neo import attributeTracker
class Foo(object):
...
def assertBar(self, expected_value):
if self.bar_attr != expected_value:
attributeTracker.whoSet(self, 'bar_attr')
attributeTracker.track(Foo)
"""
MODIFICATION_CONTAINER_ID = '_attribute_tracker_dict'

def tracker_setattr(self, attr, value, setattr):
    """Record who wrote *attr* (as a LockUser stack snapshot), then
    delegate to the real *setattr*."""
    container = getattr(self, MODIFICATION_CONTAINER_ID, None)
    if container is None:
        container = {}
        setattr(self, MODIFICATION_CONTAINER_ID, container)
    container[attr] = LockUser()
    setattr(self, attr, value)
if ATTRIBUTE_TRACKER_ENABLED:
    def track(klass):
        """Instrument *klass* so that every attribute write is recorded."""
        wrapped_setattr = klass.__setattr__
        def klass_tracker_setattr(self, attr, value):
            tracker_setattr(self, attr, value, wrapped_setattr)
        klass.__setattr__ = klass_tracker_setattr
else:
    def track(klass):
        """Tracking disabled: leave *klass* untouched."""
def whoSet(instance, attr):
    """Return the formatted stack of the last write to instance.attr,
    or None if nothing was recorded."""
    tracker = getattr(instance, MODIFICATION_CONTAINER_ID, None)
    if tracker is None:
        return None
    record = tracker.get(attr)
    if record is None:
        return None
    return record.formatStack()
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/bootstrap.py 0000664 0000000 0000000 00000006645 13276562645 0025443 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import logging
from .exception import PrimaryElected
from .handler import EventHandler
from .protocol import Packets
from .connection import ClientConnection
class BootstrapManager(EventHandler):
    """
    Manage the bootstrap stage, lookup for the primary master then connect to it
    """

    def __init__(self, app, node_type, server=None):
        """
        Manage the bootstrap stage of a non-master node, it lookup for the
        primary master node, connect to it then returns when the master node
        is ready.
        """
        # NOTE(review): 'app' is not stored here, yet getPrimaryConnection()
        # and the 'uuid' property read self.app — presumably assigned
        # elsewhere (e.g. by the extraction-truncated code); confirm.
        self.server = server
        self.node_type = node_type
        self.num_replicas = None
        self.num_partitions = None
        app.nm.reset()

    # UUID of this node, as known by the application.
    uuid = property(lambda self: self.app.uuid)

    def connectionCompleted(self, conn):
        # As soon as the transport is up, ask the peer to identify us.
        EventHandler.connectionCompleted(self, conn)
        conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
            self.server, self.app.name, None))

    def connectionFailed(self, conn):
        # Forget the node we were trying; the retry loop will pick another.
        EventHandler.connectionFailed(self, conn)
        self.current = None

    def connectionLost(self, conn, new_state):
        self.current = None

    def _acceptIdentification(self, node, num_partitions, num_replicas):
        # Identification accepted by the primary: remember cluster geometry.
        assert self.current is node, (self.current, node)
        self.num_partitions = num_partitions
        self.num_replicas = num_replicas

    def getPrimaryConnection(self):
        """
        Primary lookup/connection process.
        Returns when the connection is made.
        """
        logging.info('connecting to a primary master node')
        app = self.app
        poll = app.em.poll
        index = 0
        self.current = None
        # retry until identified to the primary
        while True:
            try:
                while self.current:
                    if self.current.isIdentified():
                        return (self.current, self.current.getConnection(),
                                self.num_partitions, self.num_replicas)
                    poll(1)
            except PrimaryElected, e:
                # A master told us who the primary is: switch to it.
                if self.current:
                    self.current.getConnection().close()
                self.current, = e.args
                index = app.nm.getMasterList().index(self.current)
            else:
                # select a master (round-robin over the known master list)
                master_list = app.nm.getMasterList()
                index = (index + 1) % len(master_list)
                self.current = master_list[index]
            ClientConnection(app, self, self.current)
            # Note that the connection may be already closed. This happens
            # when the kernel reacts so quickly to a closed port that
            # 'connect' fails on the first call. In such case, poll(1) would
            # deadlock if there's no other connection to timeout.
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/compress.py 0000664 0000000 0000000 00000003464 13276562645 0025255 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import zlib
# Decompressors indexed by algorithm id: 0 = raw (identity), 1 = zlib.
decompress_list = (
    lambda data: data,
    zlib.decompress,
)

def parseOption(value):
    """Parse a 'compress' option such as 'zlib' or 'zlib=9'.

    Returns (algorithm_index, level) where level is None when omitted.
    Raises ValueError for unknown algorithms or invalid levels.
    """
    name, sep, level_text = value.partition('=')
    try:
        alg = ('zlib',).index(name)
        if not sep:
            return alg, None
        level = int(level_text)
    except Exception:
        raise ValueError("not a valid 'compress' option: %r" % value)
    if 0 < level <= zlib.Z_BEST_COMPRESSION:
        return alg, level
    raise ValueError("invalid compression level: %r" % level)
def getCompress(value):
    """Return a callable data -> (original_size, algorithm_id, payload).

    *value* is falsy (compression disabled), True (defaults), or an
    (algorithm_index, level) pair as returned by parseOption.
    """
    if not value:
        # Compression disabled: always emit raw data with algorithm 0.
        return lambda data: (len(data), 0, data)
    alg, level = (0, None) if value is True else value
    _compress = zlib.compress
    if level:
        zlib_compress = _compress
        _compress = lambda data: zlib_compress(data, level)
    alg += 1
    assert 0 < alg < len(decompress_list), 'invalid compression algorithm'
    def compress(data):
        size = len(data)
        compressed = _compress(data)
        # Keep the raw bytes when compression does not actually shrink them.
        if len(compressed) < size:
            return size, alg, compressed
        return size, 0, data
    compress._compress = _compress # for testBasicStore
    return compress
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/config.py 0000664 0000000 0000000 00000011567 13276562645 0024672 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from optparse import OptionParser
from ConfigParser import SafeConfigParser, NoOptionError
from . import util
from .util import parseNodeAddress
def getOptionParser():
    """Return an OptionParser with the options common to all NEO tools."""
    parser = OptionParser()
    add = parser.add_option
    add('-l', '--logfile',
        help='log debugging information to specified SQLite DB')
    add('--ca', help='certificate authority in PEM format')
    add('--cert', help='certificate in PEM format')
    add('--key', help='private key in PEM format')
    return parser
def getServerOptionParser():
    """Extend the common parser with server-side (master/storage) options."""
    parser = getOptionParser()
    add = parser.add_option
    add('-f', '--file', help='specify a configuration file')
    add('-s', '--section', help='specify a configuration section')
    add('-c', '--cluster', help='the cluster name')
    add('-m', '--masters', help='master node list')
    add('-b', '--bind', help='the local address to bind to')
    add('-D', '--dynamic-master-list',
        help='path of the file containing dynamic master node list')
    return parser
class ConfigurationManager(object):
    """
    Configuration manager that load options from a configuration file and
    command line arguments
    """

    def __init__(self, defaults, options, section):
        # Precedence: command-line arguments (only those actually given)
        # > configuration file > 'defaults'.
        self.argument_list = options = {k: v
            for k, v in options.__dict__.iteritems()
            if v is not None}
        self.defaults = defaults
        config_file = options.pop('file', None)
        if config_file:
            self.parser = SafeConfigParser(defaults)
            self.parser.read(config_file)
        else:
            self.parser = None
        self.section = options.pop('section', section)

    def __get(self, key, optional=False):
        """Look *key* up in arguments, then config file, then defaults.

        Raises RuntimeError if the option is mandatory and undefined.
        """
        value = self.argument_list.get(key)
        if value is None:
            if self.parser is None:
                value = self.defaults.get(key)
            else:
                try:
                    value = self.parser.get(self.section, key)
                except NoOptionError:
                    pass
        if value is None and not optional:
            # BUGFIX: the message used to end with a stray apostrophe
            # ("... is undefined'").
            raise RuntimeError("Option '%s' is undefined" % (key, ))
        return value

    def __getPath(self, *args, **kw):
        # Same as __get, with '~' expansion for user convenience.
        path = self.__get(*args, **kw)
        if path:
            return os.path.expanduser(path)

    def getLogfile(self):
        return self.__getPath('logfile', True)

    def getSSL(self):
        # Return the [ca, cert, key] path list, or None if TLS is disabled.
        r = [self.__getPath(key, True) for key in ('ca', 'cert', 'key')]
        if any(r):
            return r

    def getMasters(self):
        """ Get the master node list except itself """
        return util.parseMasterList(self.__get('masters'))

    def getBind(self):
        """ Get the address to bind to """
        bind = self.__get('bind')
        return parseNodeAddress(bind, 0)

    def getDisableDropPartitions(self):
        return self.__get('disable_drop_partitions', True)

    def getDatabase(self):
        return self.__get('database')

    def getEngine(self):
        return self.__get('engine', True)

    def getWait(self):
        # XXX: see also DatabaseManager.__init__
        return self.__get('wait')

    def getDynamicMasterList(self):
        return self.__getPath('dynamic_master_list', optional=True)

    def getAdapter(self):
        return self.__get('adapter')

    def getCluster(self):
        cluster = self.__get('cluster')
        assert cluster != '', "Cluster name must be non-empty"
        return cluster

    def getReplicas(self):
        return int(self.__get('replicas'))

    def getPartitions(self):
        return int(self.__get('partitions'))

    def getReset(self):
        # only from command line
        return self.argument_list.get('reset', False)

    def getUUID(self):
        # only from command line
        uuid = self.argument_list.get('uuid', None)
        if uuid:
            return int(uuid)

    def getUpstreamCluster(self):
        return self.__get('upstream_cluster', True)

    def getUpstreamMasters(self):
        return util.parseMasterList(self.__get('upstream_masters'))

    def getAutostart(self):
        n = self.__get('autostart', True)
        if n:
            return int(n)

    def getDedup(self):
        return self.__get('dedup', True)
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/connection.py 0000664 0000000 0000000 00000060444 13276562645 0025562 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import wraps
from time import time
from . import attributeTracker, logging
from .connector import ConnectorException, ConnectorDelayedConnection
from .locking import RLock
from .protocol import uuid_str, Errors, PacketMalformedError, Packets
from .util import dummy_read_buffer, ReadBuffer
class ConnectionClosed(Exception):
    """Raised when an operation is attempted on a closed connection."""
class HandlerSwitcher(object):
    """Route incoming packets to handlers, with delayed handler switching.

    self._pending is a list of [request_dict, handler] pairs; a newly set
    handler is only applied once all answers expected by the previous one
    have been received.
    """
    _is_handling = False
    # Class-level fallback so accessors keep working after close() cleared
    # the instance __dict__.
    _pending = ({}, None),

    def __init__(self, handler):
        # pending handlers and related requests
        self._pending = []
        self.setHandler(handler)

    def close(self):
        self.__dict__.clear()

    def isPending(self):
        # True if the current handler still expects at least one answer.
        return bool(self._pending[0][0])

    def cancelRequests(self, conn, message):
        """Abort every pending request by faking a ProtocolError answer."""
        if self.isPending():
            p = Errors.ProtocolError(message)
            while True:
                request_dict, handler = self._pending[0]
                while request_dict:
                    msg_id, request = request_dict.popitem()
                    p.setId(msg_id)
                    handler.packetReceived(conn, p, request[1])
                if len(self._pending) == 1:
                    break
                del self._pending[0]

    def getHandler(self):
        return self._pending[0][1]

    def getLastHandler(self):
        """ Return the last (may be unapplied) handler registered """
        return self._pending[-1][1]

    def emit(self, request, kw={}):
        # register the request in the current handler
        _pending = self._pending
        if self._is_handling:
            # If this is called while handling a packet, the response is to
            # be expected for the current handler...
            (request_dict, _) = _pending[0]
        else:
            # ...otherwise, queue for the latest handler
            assert len(_pending) == 1 or _pending[0][0]
            (request_dict, _) = _pending[-1]
        msg_id = request.getId()
        answer_class = request.getAnswerClass()
        assert answer_class is not None, "Not a request"
        assert msg_id not in request_dict, "Packet id already expected"
        request_dict[msg_id] = answer_class, kw

    def handle(self, connection, packet):
        # Guard against re-entrance; _is_handling alters emit()'s behaviour.
        assert not self._is_handling
        self._is_handling = True
        try:
            self._handle(connection, packet)
        finally:
            self._is_handling = False

    def _handle(self, connection, packet):
        pending = self._pending
        assert len(pending) == 1 or pending[0][0], pending
        logging.packet(connection, packet, False)
        if connection.isClosed() and (connection.isAborted() or
                                      packet.ignoreOnClosedConnection()):
            logging.debug('Ignoring packet %r on closed connection %r',
                packet, connection)
            return
        if not packet.isResponse(): # notification
            # XXX: If there are several handlers, which one to use ?
            pending[0][1].packetReceived(connection, packet)
            return
        msg_id = packet.getId()
        request_dict, handler = pending[0]
        # checkout the expected answer class
        try:
            klass, kw = request_dict.pop(msg_id)
        except KeyError:
            klass = None
            kw = {}
        try:
            if klass and isinstance(packet, klass) or packet.isError():
                handler.packetReceived(connection, packet, kw)
            else:
                logging.error('Unexpected answer %r in %r', packet, connection)
                if not connection.isClosed():
                    connection.answer(Errors.ProtocolError(
                        'Unexpected answer: %r' % packet))
                    connection.abort()
        finally:
            # apply a pending handler if no more answers are pending
            while len(pending) > 1 and not pending[0][0]:
                del pending[0]
                logging.debug('Apply handler %r on %r', pending[0][1],
                    connection)

    def setHandler(self, handler):
        """Install *handler*, now if nothing is pending, else queue it.

        Returns True if the handler was applied immediately.
        """
        can_apply = len(self._pending) == 1 and not self._pending[0][0]
        if can_apply:
            # nothing is pending, change immediately
            self._pending[0][1] = handler
        else:
            # put the next handler in queue
            self._pending.append([{}, handler])
        return can_apply
class BaseConnection(object):
    """A base connection

    About timeouts:

        In the past, ask() took a timeout parameter as a way to close the
        connection if the remote node was too long to reply, with the idea
        that something went wrong. There was no known bug but this feature
        was actually a bad idea.

        It is impossible to test whether the remote node is in good state or
        not. The experience shows that timeouts were always triggered because
        the remote nodes were simply too slow. Waiting remains the best option
        and anything else would only make things worse.

        The only case where it could make sense to react on a slow request is
        when there is redundancy, more exactly for read requests to storage
        nodes when there are replicas. A client node could resend its request
        to another node, _without_ breaking the first connection (then wait
        for the first reply and ignore the other).

        The previous timeout implementation (before May 2017) was not well
        suited to support the above use case so most of the code has been
        removed, but it may contain some interesting parts.

        Currently, since applicative pings have been replaced by TCP
        keepalives, timeouts are only used for 2 things:
        - to avoid reconnecting too fast
        - to close idle client connections
    """

    from .connector import SocketConnector as ConnectorClass

    def __init__(self, event_manager, handler, connector, addr=None):
        assert connector is not None, "Need a low-level connector"
        self.em = event_manager
        self.connector = connector
        self.addr = addr
        self._handlers = HandlerSwitcher(handler)

    # XXX: do not use getHandler
    # These properties return the bound methods of the handler switcher.
    getHandler = property(lambda self: self._handlers.getHandler)
    getLastHandler = property(lambda self: self._handlers.getLastHandler)
    isPending = property(lambda self: self._handlers.isPending)

    def cancelRequests(self, *args, **kw):
        return self._handlers.cancelRequests(self, *args, **kw)

    def getTimeout(self):
        # No timeout by default; overridden by subclasses.
        pass

    def lockWrapper(self, func):
        # Identity by default; MTClientConnection wraps with a lock.
        return func

    def getConnector(self):
        return self.connector

    def getAddress(self):
        return self.addr

    def readable(self):
        raise NotImplementedError

    def writable(self):
        raise NotImplementedError

    def close(self):
        """Close the connection."""
        if self.connector is not None:
            self.em.unregister(self, True)
            self.connector = None
            self.aborted = False

    def _getReprInfo(self):
        # Returns (key/value pairs, state flags) used by __repr__.
        r = [
            ('uuid', uuid_str(self.getUUID())),
            ('address', ('[%s]:%s' if ':' in self.addr[0] else '%s:%s')
                        % self.addr if self.addr else '?'),
            ('handler', self.getHandler()),
        ]
        connector = self.connector
        if connector is None:
            return r, ['closed']
        r.append(('fd', connector.getDescriptor()))
        return r, ['aborted'] if self.isAborted() else []

    def __repr__(self):
        r, flags = self._getReprInfo()
        r = map('%s=%s'.__mod__, r)
        r += flags
        return '<%s(%s) at %x>' % (
            self.__class__.__name__,
            ', '.join(r),
            id(self),
        )

    def setHandler(self, handler):
        changed = self._handlers.setHandler(handler)
        if changed:
            logging.debug('Handler changed on %r', self)
        else:
            logging.debug('Delay handler %r on %r', handler, self)
        return changed

    def getUUID(self):
        return None

    def isClosed(self):
        return self.connector is None or self.isAborted()

    def isAborted(self):
        return False

    def isListening(self):
        return False

    def isServer(self):
        return False

    def isClient(self):
        return False

    def hasPendingMessages(self):
        return False

    def whoSetConnector(self):
        """
        Debugging method: call this method to know who set the current
        connector value.
        """
        return attributeTracker.whoSet(self, 'connector')

attributeTracker.track(BaseConnection)
class ListeningConnection(BaseConnection):
    """A listen connection."""

    def __init__(self, app, handler, addr):
        self._ssl = app.ssl
        logging.debug('listening to %s:%d', *addr)
        connector = self.ConnectorClass(addr)
        BaseConnection.__init__(self, app.em, handler, connector, addr)
        connector.makeListeningConnection()
        self.em.register(self)

    def readable(self):
        """Accept an incoming connection and wrap it in a ServerConnection."""
        connector, addr = self.connector.accept()
        logging.debug('accepted a connection from %s:%d', *addr)
        conn = ServerConnection(self.em, self.getHandler(), connector, addr)
        if self._ssl:
            conn.connecting = True
            connector.ssl(self._ssl, conn._connected)
            # Nothing to send as long as we haven't received a ClientHello
            # message.
        else:
            conn._connected()
            self.em.addWriter(conn) # for ENCODED_VERSION

    def getAddress(self):
        return self.connector.getAddress()

    def isListening(self):
        return True
class Connection(BaseConnection):
    """A connection."""
    # XXX: rename isPending, hasPendingMessages & pending methods

    buffering = False
    connecting = True
    client = False
    server = False
    peer_id = None
    _parser_state = None
    _timeout = None

    def __init__(self, event_manager, *args, **kw):
        BaseConnection.__init__(self, event_manager, *args, **kw)
        self.read_buf = ReadBuffer()
        self.cur_id = 0
        self.aborted = False
        self.uuid = None
        self._queue = []          # parsed packets awaiting processing
        self._on_close = None     # optional callback invoked on close

    def _getReprInfo(self):
        r, flags = super(Connection, self)._getReprInfo()
        if self._queue:
            r.append(('len(queue)', len(self._queue)))
        if self._on_close is not None:
            r.append(('on_close', getattr(self._on_close, '__name__', '?')))
        flags.extend(x for x in ('connecting', 'client', 'server')
            if getattr(self, x))
        return r, flags

    def setOnClose(self, callback):
        self._on_close = callback

    def isClient(self):
        return self.client

    def isServer(self):
        return self.server

    def asClient(self):
        # A pending close-idle timeout means we were already a client;
        # cancel it instead of setting the flag again.
        try:
            del self._timeout
        except AttributeError:
            self.client = True
        else:
            assert self.client

    def asServer(self):
        self.server = True

    def _closeClient(self):
        # Drop the client role: notify the peer if we remain its server,
        # else close entirely.
        if self.server:
            del self._timeout
            self.client = False
            self.send(Packets.CloseClient())
        else:
            self.close()

    def closeClient(self):
        # Currently, the only usage that is really useful is between a backup
        # storage node and an upstream one, to avoid:
        # - maintaining many connections for nothing when there's no write
        #   activity for a long time (and waste resources with keepalives)
        # - reconnecting too often (i.e. be reactive) when there's moderate
        #   activity (think of a timer with a period of 1 minute)
        if self.connector is not None and self.client:
            self._timeout = time() + 100

    def isAborted(self):
        return self.aborted

    def getUUID(self):
        return self.uuid

    def setUUID(self, uuid):
        self.uuid = uuid

    def setPeerId(self, peer_id):
        assert peer_id is not None
        self.peer_id = peer_id

    def getPeerId(self):
        return self.peer_id

    def _getNextId(self):
        # Message ids wrap around at 32 bits.
        next_id = self.cur_id
        self.cur_id = (next_id + 1) & 0xffffffff
        return next_id

    def getTimeout(self):
        # Only time out when idle (nothing queued for processing).
        if not self._queue:
            return self._timeout

    def onTimeout(self):
        assert self._timeout
        self._closeClient()

    def abort(self):
        """Abort dealing with this connection."""
        assert self.pending()
        if self.connecting:
            self.close()
            return
        logging.debug('aborting a connector for %r', self)
        self.aborted = True
        # Discard any further incoming data.
        self.read_buf = dummy_read_buffer
        if self._on_close is not None:
            self._on_close()
            self._on_close = None

    def writable(self):
        """Called when self is writable."""
        try:
            if self.connector.send():
                # Everything flushed.
                if self.aborted:
                    self.close()
                else:
                    self.em.removeWriter(self)
        except ConnectorException:
            self._closure()

    def _parse(self):
        # First call: check the protocol version, then replace this method
        # by the fast 'parse' closure below.
        read = self.read_buf.read
        version = read(4)
        if version is None:
            return
        from .protocol import (ENCODED_VERSION, MAX_PACKET_SIZE,
            PACKET_HEADER_FORMAT, Packets)
        if version != ENCODED_VERSION:
            logging.warning('Protocol version mismatch with %r', self)
            raise ConnectorException
        header_size = PACKET_HEADER_FORMAT.size
        unpack = PACKET_HEADER_FORMAT.unpack
        def parse():
            # Incremental parser: returns one packet, or None if the
            # buffer does not contain a full packet yet.
            state = self._parser_state
            if state is None:
                header = read(header_size)
                if header is None:
                    return
                msg_id, msg_type, msg_len = unpack(header)
                try:
                    packet_klass = Packets[msg_type]
                except KeyError:
                    raise PacketMalformedError('Unknown packet type')
                if msg_len > MAX_PACKET_SIZE:
                    raise PacketMalformedError('message too big (%d)'
                        % msg_len)
            else:
                msg_id, packet_klass, msg_len = state
            data = read(msg_len)
            if data is None:
                # Not enough.
                if state is None:
                    self._parser_state = msg_id, packet_klass, msg_len
            else:
                self._parser_state = None
                packet = packet_klass()
                packet.setContent(msg_id, data)
                return packet
        self._parse = parse
        return parse()

    def readable(self):
        """Called when self is readable."""
        # last known remote activity
        try:
            try:
                if self.connector.receive(self.read_buf):
                    self.em.addWriter(self)
            finally:
                # A connector may read some data
                # before raising ConnectorException
                while 1:
                    packet = self._parse()
                    if packet is None:
                        break
                    self._queue.append(packet)
        except ConnectorException:
            self._closure()
        except PacketMalformedError, e:
            logging.error('malformed packet from %r: %s', self, e)
            self._closure()
        return not not self._queue

    def hasPendingMessages(self):
        """
        Returns True if there are messages queued and awaiting processing.
        """
        return not not self._queue

    def process(self):
        """
        Process a pending packet.
        """
        # check out packet and process it with current handler
        self._handlers.handle(self, self._queue.pop(0))

    def pending(self):
        # True while there is outgoing data queued in the connector.
        connector = self.connector
        return connector is not None and connector.queued

    @property
    def setReconnectionNoDelay(self):
        try:
            return self.connector.setReconnectionNoDelay
        except AttributeError:
            raise ConnectionClosed

    def close(self):
        if self.connector is None:
            assert self._on_close is None
            assert not self.read_buf
            assert not self.isPending()
            return
        # process the network events with the last registered handler to
        # solve issues where a node is lost with pending handlers and
        # create unexpected side effects.
        handler = self._handlers.getLastHandler()
        super(Connection, self).close()
        if self._on_close is not None:
            self._on_close()
            self._on_close = None
        self.read_buf.clear()
        try:
            if self.connecting:
                handler.connectionFailed(self)
                self.connecting = False
            else:
                handler.connectionClosed(self)
        finally:
            self._handlers.close()

    def _closure(self):
        # Drain already-parsed packets before closing.
        assert self.connector is not None, self.whoSetConnector()
        while self._queue:
            self._handlers.handle(self, self._queue.pop(0))
        self.close()

    def _addPacket(self, packet):
        """Add a packet into the write buffer."""
        if self.connector.queue(packet.encode()):
            if packet.nodelay or 65536 < self.connector.queue_size:
                assert not self.buffering
                # enable polling for writing.
                self.em.addWriter(self)
            else:
                self.buffering = True
        elif self.buffering and (65536 < self.connector.queue_size
                                 or packet.nodelay):
            self.buffering = False
            self.em.addWriter(self)
        logging.packet(self, packet, True)

    def send(self, packet, msg_id=None):
        """ Send a packet with a new ID """
        if self.isClosed():
            raise ConnectionClosed
        packet.setId(self._getNextId() if msg_id is None else msg_id)
        self._addPacket(packet)

    def ask(self, packet, **kw):
        """
        Send a packet with a new ID and register the expectation of an answer
        """
        if self.isClosed():
            raise ConnectionClosed
        msg_id = self._getNextId()
        packet.setId(msg_id)
        self._addPacket(packet)
        self._handlers.emit(packet, kw)
        return msg_id

    def answer(self, packet):
        """ Answer to a packet by re-using its ID for the packet answer """
        assert packet.isResponse(), packet
        if self.isClosed():
            if packet.ignoreOnClosedConnection() and not packet.isError():
                raise ConnectionClosed
            return
        packet.setId(self.peer_id)
        self._addPacket(packet)

    def _connected(self):
        self.connecting = False
        self.getHandler().connectionCompleted(self)
class ClientConnection(Connection):
    """A connection from this node to a remote node."""

    client = True

    def __init__(self, app, handler, node):
        self._ssl = app.ssl
        addr = node.getAddress()
        connector = self.ConnectorClass(addr)
        Connection.__init__(self, app.em, handler, connector, addr)
        node.setConnection(self)
        handler.connectionStarted(self)
        self._connect()

    def _connect(self):
        try:
            connected = self.connector.makeClientConnection()
        except ConnectorDelayedConnection, c:
            # Too early to reconnect: arm a timeout to retry later.
            connect_limit, = c.args
            self.getTimeout = lambda: connect_limit
            self.onTimeout = self._delayedConnect
            self.em.register(self, timeout_only=True)
        except ConnectorException:
            self._closure()
        else:
            self.em.register(self)
            if connected:
                self._maybeConnected()
            # There's always the protocol version to send.
            self.em.addWriter(self)

    def _delayedConnect(self):
        # Restore the class-level getTimeout/onTimeout and retry.
        del self.getTimeout, self.onTimeout
        self._connect()

    def writable(self):
        """Called when self is writable."""
        # Only used while connecting; replaced by the parent's writable
        # (possibly lock-wrapped) in _maybeConnected.
        if self.connector.getError():
            self._closure()
        else:
            self._maybeConnected()
            self.writable()

    def _maybeConnected(self):
        self.writable = self.lockWrapper(super(ClientConnection, self).writable)
        if self._ssl:
            self.connector.ssl(self._ssl, self._connected)
        else:
            self._connected()
class ServerConnection(Connection):
    """A connection from a remote node to this node."""

    server = True

    def __init__(self, *args, **kw):
        # An accepted connection is immediately registered with the
        # event manager so it can be polled.
        super(ServerConnection, self).__init__(*args, **kw)
        self.em.register(self)
class MTConnectionType(type):
    """Metaclass wrapping connection methods with the instance lock.

    Used by MTClientConnection to make a ClientConnection usable from
    several threads.
    """

    def __init__(cls, *args):
        if __debug__:
            # These methods must be called with the lock already held.
            for name in 'answer',:
                setattr(cls, name, cls.lockCheckWrapper(name))
        for name in 'close', 'send':
            setattr(cls, name, cls.__class__.lockWrapper(cls, name))
        # These may be called on an already-closed connection: only log then.
        for name in ('_delayedConnect', 'onTimeout',
                     'process', 'readable', 'writable'):
            setattr(cls, name, cls.__class__.lockWrapper(cls, name, True))

    def lockCheckWrapper(cls, name):
        # Debug-only wrapper asserting that the caller owns the lock.
        def wrapper(self, *args, **kw):
            # XXX: Unfortunately, RLock does not has any public method
            #      to test whether we own the lock or not.
            assert self.lock._is_owned(), (self, args, kw)
            return getattr(super(cls, self), name)(*args, **kw)
        return wraps(getattr(cls, name).im_func)(wrapper)

    def lockWrapper(cls, name, maybe_closed=False):
        if maybe_closed:
            def wrapper(self):
                with self.lock:
                    if self.isClosed():
                        logging.info("%r.%s()", self, name)
                    else:
                        return getattr(super(cls, self), name)()
        else:
            def wrapper(self, *args, **kw):
                with self.lock:
                    return getattr(super(cls, self), name)(*args, **kw)
        return wraps(getattr(cls, name).im_func)(wrapper)
class MTClientConnection(ClientConnection):
    """A Multithread-safe version of ClientConnection."""

    # See MTConnectionType for the methods that are wrapped with the lock.
    __metaclass__ = MTConnectionType

    def lockWrapper(self, func):
        # Instance-level variant used by _maybeConnected to rebind
        # 'writable': wrap *func* so that it runs under self.lock.
        lock = self.lock
        def wrapper(*args, **kw):
            with lock:
                return func(*args, **kw)
        return wrapper

    def __init__(self, *args, **kwargs):
        # The lock is created first and held during initialization so the
        # poll thread can not process events on a half-built connection.
        self.lock = lock = RLock()
        self.dispatcher = kwargs.pop('dispatcher')
        with lock:
            super(MTClientConnection, self).__init__(*args, **kwargs)

    # Alias without lock (cheaper than super())
    _ask = ClientConnection.ask.__func__

    def ask(self, packet, queue=None, **kw):
        # Ask a question and register *queue* to receive the answer.
        # Only Ping may be sent without a queue (its answer is dropped).
        with self.lock:
            if queue is None:
                if type(packet) is Packets.Ping:
                    return self._ask(packet, **kw)
                raise TypeError('Only Ping packet can be asked'
                    ' without a queue, got a %r.' % packet)
            msg_id = self._ask(packet, **kw)
            self.dispatcher.register(self, msg_id, queue)
            return msg_id

    # Currently, on connected connections, we only use timeouts for
    # closeClient, which is never used for MTClientConnection.
    # So we disable the logic completely as a precaution, and for performance.
    # What is specific to MTClientConnection is that the poll thread must be
    # woken up whenever the timeout is changed to a smaller value.

    def closeClient(self):
        # For example here, in addition to what the super method does,
        # we may have to call `self.em.wakeup()`
        raise NotImplementedError

    def getTimeout(self):
        # No timeout: see comment above.
        pass

    def onTimeout(self):
        # It is possible that another thread manipulated the connection while
        # getting a timeout from epoll. Only the poll thread fills _queue
        # so we know that it is empty, but we may have to check timeout values
        # again (i.e. compare time() with the result of getTimeout()).
        raise NotImplementedError
###
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/connector.py 0000664 0000000 0000000 00000030314 13276562645 0025406 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2009-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import socket
import ssl
import errno
from time import time
from . import logging
from .protocol import ENCODED_VERSION
# Global connector registry.
# Fill by calling registerConnectorHandler.
# Read by calling SocketConnector.__new__
connector_registry = {}
def registerConnectorHandler(connector_handler):
    """Make *connector_handler* the connector class for its address family."""
    af_type = connector_handler.af_type
    connector_registry[af_type] = connector_handler
class SocketConnector(object):
    """ This class is a wrapper for a socket """

    # Lifecycle flags (see __repr__):
    #   is_closed: None = never opened, False = opened, True = closed
    #   is_server: None = listening, True = accepted, False = outgoing
    is_closed = is_server = None
    # Class-level map: address -> time before which no new connection
    # attempt to that address is allowed (shared by all connectors).
    connect_limit = {}
    # Minimum delay (seconds) between 2 connection attempts to an address.
    CONNECT_LIMIT = 1

    def __new__(cls, addr, s=None):
        # When no socket is given, pick the concrete IPv4/IPv6 subclass
        # by parsing the host with inet_pton ('cls' is rebound below).
        if s is None:
            host, port = addr
            for af_type, cls in connector_registry.iteritems():
                try :
                    socket.inet_pton(af_type, host)
                    break
                except socket.error:
                    pass
            else:
                raise ValueError("Unknown type of host", host)
        self = object.__new__(cls)
        self.addr = cls._normAddress(addr)
        if s is None:
            s = socket.socket(af_type, socket.SOCK_STREAM)
        else:
            # An existing socket comes from accept(): server side.
            self.is_server = True
            self.is_closed = False
        self.socket = s
        # Cached so getDescriptor still works after the socket is closed.
        self.socket_fd = s.fileno()
        # always use non-blocking sockets
        s.setblocking(0)
        # TCP keepalive, enabled on both sides to detect:
        # - remote host crash
        # - network failure
        # They're more efficient than applicative pings and we don't want
        # to consider the connection dead if the remote node is busy.
        # The following 3 lines are specific to Linux. It seems that OSX
        # has similar options (TCP_KEEPALIVE/TCP_KEEPINTVL/TCP_KEEPCNT),
        # and Windows has SIO_KEEPALIVE_VALS (fixed count of 10).
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # disable Nagle algorithm to reduce latency
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # The protocol version is always the first data to be sent.
        self.queued = [ENCODED_VERSION]
        self.queue_size = len(ENCODED_VERSION)
        return self

    def queue(self, data):
        """Append a list of strings to the send buffer.

        Return whether the buffer was empty, i.e. whether the caller must
        enable polling for writing.
        """
        was_empty = not self.queued
        self.queued += data
        for data in data:
            self.queue_size += len(data)
        return was_empty

    def _error(self, op, exc=None):
        # Log why operation *op* failed (or that the peer closed),
        # then abort with ConnectorException.
        if exc is None:
            logging.debug('%r closed in %s', self, op)
        else:
            logging.debug("%s failed for %s: %s (%s)",
                op, self, errno.errorcode[exc.errno], exc.strerror)
        raise ConnectorException

    # Threaded tests monkey-patch the following 2 operations.
    _connect = lambda self, addr: self.socket.connect(addr)
    _bind = lambda self, addr: self.socket.bind(addr)

    def makeClientConnection(self):
        """Start a non-blocking connection attempt.

        Return True if already established, False if in progress. Raise
        ConnectorDelayedConnection if attempts to this address are
        currently rate-limited (args contain the earliest retry time).
        """
        assert self.is_closed is None
        addr = self.addr
        try:
            connect_limit = self.connect_limit[addr]
            if time() < connect_limit:
                # Next call to queue() must return False
                # in order not to enable polling for writing.
                self.queued or self.queued.append('')
                raise ConnectorDelayedConnection(connect_limit)
            if self.queued and not self.queued[0]:
                # Drop the empty marker added above on a previous attempt.
                del self.queued[0]
        except KeyError:
            pass
        self.connect_limit[addr] = time() + self.CONNECT_LIMIT
        self.is_server = self.is_closed = False
        try:
            self._connect(addr)
        except socket.error, e:
            if e.errno == errno.EINPROGRESS:
                return False
            self._error('connect', e)
        return True

    def makeListeningConnection(self):
        # Bind and listen on self.addr; on failure, close the socket and
        # abort with ConnectorException.
        assert self.is_closed is None
        self.is_closed = False
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._bind(self.addr)
            self.socket.listen(5)
        except socket.error, e:
            self.socket.close()
            self._error('listen', e)

    def ssl(self, ssl, on_handshake_done=None):
        # Wrap the socket with the given SSLContext and switch to the
        # handshake class; *on_handshake_done* is called when it completes.
        self.socket = ssl.wrap_socket(self.socket,
            server_side=self.is_server,
            do_handshake_on_connect=False,
            suppress_ragged_eofs=False)
        self.__class__ = self.SSLHandshakeConnectorClass
        self.on_handshake_done = on_handshake_done
        # Make sure the event manager polls for writing (see queue()).
        self.queued or self.queued.append('')

    def getError(self):
        # Pending socket error, e.g. result of a non-blocking connect.
        return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)

    def getDescriptor(self):
        # this descriptor must only be used by the event manager, where it
        # guarantee uniqueness only while the connector is opened and
        # registered in epoll
        return self.socket_fd

    @staticmethod
    def _normAddress(addr):
        # Identity here; overridden for IPv6 (drops flowinfo/scopeid).
        return addr

    def getAddress(self):
        return self._normAddress(self.socket.getsockname())

    def accept(self):
        # Accept an incoming connection and wrap it in a new connector.
        try:
            s, addr = self.socket.accept()
            s = self.__class__(addr, s)
            return s, s.addr
        except socket.error, e:
            self._error('accept', e)

    def receive(self, read_buf):
        # Read up to 64kB into read_buf; abort on error or peer close.
        try:
            data = self.socket.recv(65536)
        except socket.error, e:
            self._error('recv', e)
        if data:
            read_buf.append(data)
            return
        self._error('recv')

    def send(self):
        """Try to flush the queued data; return whether the buffer is empty."""
        # XXX: unefficient for big packets
        msg = ''.join(self.queued)
        if msg:
            try:
                n = self.socket.send(msg)
            except socket.error, e:
                self._error('send', e)
            # Do nothing special if n == 0:
            # - it never happens for simple sockets;
            # - for SSL sockets, this is always the case unless everything
            #   could be sent.
            if n != len(msg):
                # Keep the unsent tail as the single queued item.
                self.queued[:] = msg[n:],
                self.queue_size -= n
                return False
            del self.queued[:]
            self.queue_size = 0
        else:
            assert not self.queued
        return True

    def shutdown(self):
        """Shut the socket down; return the callable that really closes it."""
        self.is_closed = True
        try:
            # Expired rate-limit entries are cleaned up here.
            if self.connect_limit[self.addr] < time():
                del self.connect_limit[self.addr]
        except KeyError:
            pass
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error, e:
            if e.errno != errno.ENOTCONN:
                raise
        return self.socket.close

    def setReconnectionNoDelay(self):
        """Mark as successful so that we can reconnect without delay"""
        self.connect_limit.pop(self.addr, None)

    def __repr__(self):
        if self.is_closed is None:
            state = 'never opened'
        else:
            if self.is_closed:
                state = 'closed '
            else:
                state = 'opened '
            if self.is_server is None:
                state += 'listening'
            else:
                if self.is_server:
                    state += 'from '
                else:
                    state += 'to '
                state += str(self.addr)
        return '<%s at 0x%x fileno %s %s, %s>' % (self.__class__.__name__,
            id(self), '?' if self.is_closed else self.socket_fd,
            self.getAddress(), state)
class SocketConnectorIPv4(SocketConnector):
    " Wrapper for IPv4 sockets"
    af_type = socket.AF_INET
class SocketConnectorIPv6(SocketConnector):
    " Wrapper for IPv6 sockets"
    af_type = socket.AF_INET6

    @staticmethod
    def _normAddress(addr):
        # IPv6 getsockname() returns a 4-tuple
        # (host, port, flowinfo, scopeid): keep only (host, port).
        return addr[:2]
# Register the connector classes for both supported address families.
registerConnectorHandler(SocketConnectorIPv4)
registerConnectorHandler(SocketConnectorIPv6)
def overlay_connector_class(cls):
    """Class decorator grafting *cls* onto every registered connector class.

    For a decorated class named _Foo, each connector class gets a
    'FooConnectorClass' attribute: a new class combining _Foo's bases
    with the connector class (used to switch self.__class__ at runtime).
    """
    name = cls.__name__[1:]
    alias = name + 'ConnectorClass'
    for base in connector_registry.itervalues():
        setattr(base, alias, type(name + base.__name__,
            cls.__bases__ + (base,), cls.__dict__))
    return cls
@overlay_connector_class
class _SSL:
    """Overlay for established SSL connections (see overlay_connector_class)."""

    def _error(self, op, exc=None):
        # SSL errors other than EOF are logged with their own message;
        # EOF is reported like a plain close by the base implementation.
        if isinstance(exc, ssl.SSLError):
            if not isinstance(exc, ssl.SSLEOFError):
                logging.debug("%s failed for %s: %s", op, self, exc)
                raise ConnectorException
            exc = None
        SocketConnector._error(self, op, exc)

    def receive(self, read_buf):
        # Drain everything already decrypted by OpenSSL: epoll only
        # reports readiness of the underlying socket, not of the
        # internal SSL buffer.
        try:
            while 1:
                read_buf.append(self.socket.recv(4096))
        except ssl.SSLWantReadError:
            pass
        except socket.error, e:
            self._error('recv', e)
@overlay_connector_class
class _SSLHandshake(_SSL):
    """Overlay driving the SSL handshake, then switching to _SSL."""

    # WKRD: Unfortunately, SSL_do_handshake(3SSL) does not try to reject
    #       non-SSL connections as soon as possible, by checking the first
    #       byte. It even does nothing before receiving a full TLSPlaintext
    #       frame (5 bytes).
    #       The NEO protocol is such that a client connection is always the
    #       first to send a packet, as soon as the connection is established,
    #       and without waiting that the protocol versions are checked.
    #       So in practice, non-SSL connection to SSL would never hang, but
    #       there's another issue: such case results in WRONG_VERSION_NUMBER
    #       instead of something like UNEXPECTED_RECORD, because the SSL
    #       version is checked first.
    #       For better logging, we try to detect non-SSL connections with
    #       MSG_PEEK. This only works reliably on server side.
    #       For SSL client connections, 2 things may prevent the workaround to
    #       log that the remote node has not enabled SSL:
    #       - non-SSL data received (or connection closed) before the first
    #         call to 'recv' in 'do_handshake'
    #       - the server connection detects a wrong protocol version before it
    #         sent its one

    def _handshake(self, read_buf=None):
        # Perform one handshake step; read_buf is None when called via
        # 'send', not None when called via 'receive'. The return value
        # tells the event manager whether to keep polling for writing:
        # ???Writer | send | receive
        # -----------+--------+--------
        # want read | remove | -
        # want write | - | add
        try:
            self.socket.do_handshake()
        except ssl.SSLWantReadError:
            return read_buf is None
        except ssl.SSLWantWriteError:
            return read_buf is not None
        except socket.error, e:
            # OpenSSL 1.1 may raise socket.error(0)
            # where previous versions raised SSLEOFError.
            self._error('send' if read_buf is None else 'recv',
                e if e.errno else None)
        # Handshake done: drop the empty write marker if any, restore the
        # class-level send/receive and become a plain SSL connector.
        if not self.queued[0]:
            del self.queued[0]
        del self.receive, self.send
        self.__class__ = self.SSLConnectorClass
        cipher, proto, bits = self.socket.cipher()
        logging.debug("SSL handshake done for %s: %s %s", self, cipher, bits)
        if self.on_handshake_done:
            self.on_handshake_done()
        del self.on_handshake_done
        if read_buf is None:
            return self.send()
        self.receive(read_buf)
        return self.queued

    def send(self, read_buf=None):
        # First write: from now on, both directions drive the handshake.
        handshake = self.receive = self.send = self._handshake
        return handshake(read_buf)

    def receive(self, read_buf):
        # Peek the first byte to detect non-SSL peers (see WKRD above).
        try:
            content_type = self.socket._sock.recv(1, socket.MSG_PEEK)
        except socket.error, e:
            self._error('recv', e)
        if content_type == '\26': # handshake
            return self.send(read_buf)
        if content_type:
            logging.debug('Rejecting non-SSL %r', self)
            raise ConnectorException
        self._error('recv')
class ConnectorException(Exception):
    """Raised by connectors to abort the current operation (already logged)."""
    pass
class ConnectorDelayedConnection(ConnectorException):
    """Connection attempt rate-limited; args hold the earliest retry time."""
    pass
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/debug.py 0000664 0000000 0000000 00000005611 13276562645 0024504 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2010-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import traceback
import signal
import imp
import os
import sys
from functools import wraps
import neo
# kill -RTMIN+2
# Dump information to logs.
# kill -RTMIN+3
# Loads (or reloads) neo.debug module.
# The content is up to you (it's only imported). It can be a breakpoint.
def safe_handler(func):
    """Decorate a signal handler so that it never lets an exception escape."""
    @wraps(func)
    def wrapper(sig, frame):
        try:
            func(sig, frame)
        except:
            # Prevent exception from exiting signal handler, so mistakes in
            # "debug" module don't kill process.
            traceback.print_exc()
    return wrapper
@safe_handler
def debugHandler(sig, frame):
    # (Re)load the neo.debug module; its content is entirely up to the
    # user (e.g. it may set a breakpoint). See the SIGRTMIN+3 comment above.
    file, filename, (suffix, mode, type) = imp.find_module('debug',
        neo.__path__)
    imp.load_module('neo.debug', file, filename, (suffix, mode, type))
def getPdb(**kw):
    """Return a debugger instance, preferring IPython's Pdb over stdlib pdb."""
    try: # try ipython if available
        import IPython
        shell = IPython.terminal.embed.InteractiveShellEmbed()
        return IPython.core.debugger.Pdb(shell.colors, **kw)
    except (AttributeError, ImportError):
        import pdb
        return pdb.Pdb(**kw)
# Module-level debugger instance slot, if any (see getPdb/winpdb users).
_debugger = None
def winpdb(depth=0):
    """Break into a winpdb GUI debugger attached to the calling frame.

    Forks: the parent embeds the rpdb2 debugger, while the child execs a
    python process running the winpdb GUI, pre-fed with a one-time password.
    """
    import rpdb2
    depth += 1
    if rpdb2.g_debugger is not None:
        # Already being debugged: just set a breakpoint at the caller.
        return rpdb2.setbreak(depth)
    script = rpdb2.calc_frame_path(sys._getframe(depth))
    # Throw-away password derived from pid+cwd, shared with the GUI child.
    pwd = str(os.getpid()) + os.getcwd().replace('/', '_').replace('-', '_')
    pid = os.fork()
    if pid:
        try:
            rpdb2.start_embedded_debugger(pwd, depth=depth)
        finally:
            os.waitpid(pid, 0)
    else:
        try:
            os.execlp('python', 'python', '-c', """import os\nif not os.fork():
 import rpdb2, winpdb
 rpdb2_raw_input = rpdb2._raw_input
 rpdb2._raw_input = lambda s: \
 s == rpdb2.STR_PASSWORD_INPUT and %r or rpdb2_raw_input(s)
 winpdb.g_ignored_warnings[winpdb.STR_EMBEDDED_WARNING] = True
 winpdb.main()
""" % pwd, '-a', script)
        finally:
            os.abort()
def register(on_log=None):
    """Install the debugging signal handlers (see comments near the top).

    SIGRTMIN+2 triggers *on_log* (if given), SIGRTMIN+3 (re)loads neo.debug.
    """
    if on_log is not None:
        @safe_handler
        def on_log_signal(signum, signal):
            on_log()
        signal.signal(signal.SIGRTMIN+2, on_log_signal)
    signal.signal(signal.SIGRTMIN+3, debugHandler)
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/dispatcher.py 0000664 0000000 0000000 00000010674 13276562645 0025551 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from functools import wraps
from .locking import Lock, Empty
# Shared sentinels (compared by identity in Dispatcher):
# EMPTY is a read-only default dict, NOBODY marks replies nobody waits for.
EMPTY = {}
NOBODY = []
@apply
class _ConnectionClosed(object):
    """Fake packet put in queues when a connection closes (see unregister)."""

    handler_method_name = 'connectionClosed'
    decode = tuple

    class getId(object):
        # Matches any message id.
        def __eq__(self, other):
            return True
def giant_lock(func):
    """Method decorator serializing calls with the instance's big lock."""
    @wraps(func)
    def wrapped(self, *args, **kw):
        self.lock_acquire()
        try:
            return func(self, *args, **kw)
        finally:
            self.lock_release()
    return wrapped
class Dispatcher:
    """Register a packet, connection pair as expecting a response packet."""

    def __init__(self):
        # id(conn) -> {msg_id -> queue} of expected replies.
        self.message_table = {}
        # id(queue) -> number of replies this queue is still waiting for.
        self.queue_dict = {}
        lock = Lock()
        self.lock_acquire = lock.acquire
        self.lock_release = lock.release

    @giant_lock
    def dispatch(self, conn, msg_id, packet, kw):
        """
        Retrieve register-time provided queue, and put conn and packet in it.
        """
        queue = self.message_table.get(id(conn), EMPTY).pop(msg_id, None)
        if queue is None:
            # Not an expected reply.
            return False
        elif queue is NOBODY:
            # Expected, but forgotten (see forget_queue): drop silently.
            return True
        self._decrefQueue(queue)
        queue.put((conn, packet, kw))
        return True

    def _decrefQueue(self, queue):
        # Decrement the pending-reply count, dropping the entry at zero.
        queue_id = id(queue)
        queue_dict = self.queue_dict
        if queue_dict[queue_id] == 1:
            queue_dict.pop(queue_id)
        else:
            queue_dict[queue_id] -= 1

    def _increfQueue(self, queue):
        # Increment the pending-reply count for this queue.
        queue_id = id(queue)
        queue_dict = self.queue_dict
        try:
            queue_dict[queue_id] += 1
        except KeyError:
            queue_dict[queue_id] = 1

    @giant_lock
    def register(self, conn, msg_id, queue):
        """Register an expectation for a reply."""
        self.message_table.setdefault(id(conn), {})[msg_id] = queue
        self._increfQueue(queue)

    def unregister(self, conn):
        """ Unregister a connection and put fake packet in queues to unlock
            threads expecting responses from that connection """
        self.lock_acquire()
        try:
            message_table = self.message_table.pop(id(conn), EMPTY)
        finally:
            self.lock_release()
        # Each queue is notified at most once, but decref'ed once per
        # pending reply.
        notified_set = set()
        _decrefQueue = self._decrefQueue
        for queue in message_table.itervalues():
            if queue is NOBODY:
                continue
            queue_id = id(queue)
            if queue_id not in notified_set:
                queue.put((conn, _ConnectionClosed, EMPTY))
                notified_set.add(queue_id)
            _decrefQueue(queue)

    @giant_lock
    def forget_queue(self, queue, flush_queue=True):
        """
        Forget all pending messages for given queue.
        Actually makes them "expected by nobody", so we know we can ignore
        them, and not detect it as an error.
        flush_queue (boolean, default=True)
            All packets in queue get flushed.
        """
        # XXX: expensive lookup: we iterate over the whole dict
        found = 0
        for message_table in self.message_table.itervalues():
            for msg_id, t_queue in message_table.iteritems():
                if queue is t_queue:
                    found += 1
                    message_table[msg_id] = NOBODY
        refcount = self.queue_dict.pop(id(queue), 0)
        assert refcount == found, (refcount, found)
        if flush_queue:
            get = queue.get
            while True:
                try:
                    get(block=False)
                except Empty:
                    break

    def registered(self, conn):
        """Check if a connection is registered into message table."""
        return len(self.message_table.get(id(conn), EMPTY)) != 0

    @giant_lock
    def pending(self, queue):
        # True if the queue still holds packets or awaits replies.
        return not queue.empty() or self.queue_dict.get(id(queue), 0) > 0
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/event.py 0000664 0000000 0000000 00000027313 13276562645 0024542 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import os
from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EAGAIN, EEXIST, EINTR, ENOENT
from . import logging
from .locking import Lock
@apply
def dictionary_changed_size_during_iteration():
    # Capture the interpreter's exact "dictionary changed size during
    # iteration" message, to recognize this specific RuntimeError later
    # (see getConnectionList and _poll).
    d = {}; i = iter(d); d[0] = 0
    try:
        next(i)
    except RuntimeError as e:
        return str(e)
    raise AssertionError
class EpollEventManager(object):
    """This class manages connections and events based on epoll(5)."""

    # Global (timeout, callback) set via setTimeout; None disables it.
    _timeout = None

    def __init__(self):
        # fd -> Connection for every registered connection.
        self.connection_dict = {}
        # fds currently polled for reading / writing.
        self.reader_set = set()
        self.writer_set = set()
        self.epoll = epoll()
        self._pending_processing = []
        # Wakeup mechanism: a pipe whose read end, once registered in
        # epoll, reports EPOLLHUP immediately (the write end is closed).
        self._trigger_list = []
        self._trigger_fd, w = os.pipe()
        os.close(w)
        self._trigger_lock = Lock()
        # Deferred socket closing: while fds returned by epoll are being
        # processed, real closes are queued and done under this lock so
        # that fds are not reused in the meantime (see _poll).
        close_list = []
        self._closeAppend = close_list.append
        l = Lock()
        self._closeAcquire = l.acquire
        _release = l.release
        def release():
            try:
                while close_list:
                    close_list.pop()()
            finally:
                _release()
        self._closeRelease = release

    def close(self):
        os.close(self._trigger_fd)
        for c in self.connection_dict.values():
            c.close()
        # Make any further use of this instance fail loudly.
        del self.__dict__

    def getConnectionList(self):
        # XXX: use index
        while 1:
            # See _poll() about the use of self.connection_dict.itervalues()
            try:
                return [x for x in self.connection_dict.itervalues()
                        if not x.isAborted()]
            except RuntimeError, e:
                if str(e) != dictionary_changed_size_during_iteration:
                    raise
                logging.info("%r", e)

    def getClientList(self):
        # XXX: use index
        return [c for c in self.getConnectionList() if c.isClient()]

    def getServerList(self):
        # XXX: use index
        return [c for c in self.getConnectionList() if c.isServer()]

    def getConnectionListByUUID(self, uuid):
        """ Return the connection associated to the UUID, None if the UUID is
            None, invalid or not found"""
        # XXX: use index
        # XXX: consider remove UUID from connection and thus this method
        if uuid is None:
            return None
        result = []
        append = result.append
        for conn in self.getConnectionList():
            if conn.getUUID() == uuid:
                append(conn)
        return result

    # epoll_wait always waits for EPOLLERR & EPOLLHUP so we're forced
    # to unregister when we want to ignore all events for a connection.

    def register(self, conn, timeout_only=False):
        fd = conn.getConnector().getDescriptor()
        self.connection_dict[fd] = conn
        if timeout_only:
            # Not polled for I/O; only its getTimeout() matters, so make
            # sure the poll thread recomputes its timeout.
            self.wakeup()
        else:
            self.epoll.register(fd)
            self.addReader(conn)

    def unregister(self, conn, close=False):
        new_pending_processing = [x for x in self._pending_processing
                                  if x is not conn]
        # Check that we removed at most one entry from
        # self._pending_processing .
        assert len(new_pending_processing) > len(self._pending_processing) - 2
        self._pending_processing = new_pending_processing
        connector = conn.getConnector()
        fd = connector.getDescriptor()
        try:
            del self.connection_dict[fd]
            self.epoll.unregister(fd)
        except KeyError:
            pass
        except IOError, e:
            if e.errno != ENOENT:
                raise
        else:
            self.reader_set.discard(fd)
            self.writer_set.discard(fd)
            if close:
                # Defer the real close while epoll results are processed
                # (see __init__/_poll); close immediately if possible.
                self._closeAppend(connector.shutdown())
                if self._closeAcquire(0):
                    self._closeRelease()
            return
        if close:
            # The connection is not registered, so do not wait for epoll
            # to wake up (which may not even happen, and lead to EMFILE).
            connector.shutdown()()

    def isIdle(self):
        return not (self._pending_processing or self.writer_set)

    def _addPendingConnection(self, conn):
        pending_processing = self._pending_processing
        if conn not in pending_processing:
            pending_processing.append(conn)

    def poll(self, blocking=1):
        if not self._pending_processing:
            # Fetch messages from polled file descriptors
            self._poll(blocking)
            if not self._pending_processing:
                return
        to_process = self._pending_processing.pop(0)
        try:
            to_process.process()
        finally:
            # ...and requeue if there are pending messages
            if to_process.hasPendingMessages():
                self._addPendingConnection(to_process)
        # Non-blocking call: as we handled a packet, we should just offer
        # poll a chance to fetch & send already-available data, but it must
        # not delay us.
        self._poll(0)

    def _poll(self, blocking):
        if blocking:
            # Compute the next timeout: the smallest of the global one
            # and all per-connection timeouts.
            # self.connection_dict may be changed at any time by another thread,
            # which may cause itervalues() to fail. But this happens so rarely,
            # that for performance reasons, we prefer to retry, rather than:
            # - protect self.connection_dict with a lock
            # - or iterate over an atomic copy.
            while 1:
                try:
                    timeout = self._timeout
                    timeout_object = self
                    for conn in self.connection_dict.itervalues():
                        t = conn.getTimeout()
                        if t and (timeout is None or t < timeout):
                            timeout = t
                            timeout_object = conn
                    break
                except RuntimeError, e:
                    if str(e) != dictionary_changed_size_during_iteration:
                        raise
                    logging.info("%r", e)
            # Make sure epoll_wait does not return too early, because it has a
            # granularity of 1ms and Python 2.7 rounds the timeout towards zero.
            # See also https://bugs.python.org/issue20452 (fixed in Python 3).
            blocking = .001 + max(0, timeout - time()) if timeout else -1
        # From this point, and until we have processed all fds returned by
        # epoll, we must prevent any fd from being closed, because they could
        # be reallocated by new connection, either by this thread or by another.
        # Sockets to close are queued, and they're really closed in the
        # 'finally' clause.
        self._closeAcquire()
        try:
            event_list = self.epoll.poll(blocking)
        except IOError, exc:
            if exc.errno in (0, EAGAIN):
                logging.info('epoll.poll triggered undocumented error %r',
                    exc.errno)
            elif exc.errno != EINTR:
                raise
            return
        else:
            if event_list:
                wlist = []
                elist = []
                for fd, event in event_list:
                    if event & EPOLLIN:
                        try:
                            conn = self.connection_dict[fd]
                        except KeyError:
                            continue
                        if conn.readable():
                            self._addPendingConnection(conn)
                    if event & EPOLLOUT:
                        wlist.append(fd)
                    if event & (EPOLLERR | EPOLLHUP):
                        elist.append(fd)
                for fd in wlist:
                    try:
                        conn = self.connection_dict[fd]
                    except KeyError:
                        continue
                    conn.writable()
                for fd in elist:
                    try:
                        conn = self.connection_dict[fd]
                    except KeyError:
                        # EPOLLHUP on the trigger pipe: run the queued
                        # wakeup actions (see wakeup).
                        if fd == self._trigger_fd:
                            with self._trigger_lock:
                                self.epoll.unregister(fd)
                                action_list = self._trigger_list
                                try:
                                    while action_list:
                                        action_list.pop(0)()
                                finally:
                                    del action_list[:]
                        continue
                    if conn.readable():
                        self._addPendingConnection(conn)
                return
        finally:
            self._closeRelease()
        # epoll returned nothing within the deadline: fire the timeout.
        if blocking > 0:
            logging.debug('timeout triggered for %r', timeout_object)
            timeout_object.onTimeout()

    def onTimeout(self):
        # Fire (once) the global timeout callback set via setTimeout.
        on_timeout = self._on_timeout
        del self._on_timeout
        self._timeout = None
        on_timeout()

    def setTimeout(self, *args):
        # args: (deadline, callback)
        self._timeout, self._on_timeout = args

    def wakeup(self, *actions):
        # Queue *actions* and make the next epoll call return immediately
        # by registering the half-closed trigger pipe (reports EPOLLHUP).
        with self._trigger_lock:
            self._trigger_list += actions
            try:
                self.epoll.register(self._trigger_fd)
            except IOError, e:
                # Ignore if 'wakeup' is called several times in a row.
                if e.errno != EEXIST:
                    raise

    def addReader(self, conn):
        connector = conn.getConnector()
        assert connector is not None, conn.whoSetConnector()
        fd = connector.getDescriptor()
        if fd not in self.reader_set:
            self.reader_set.add(fd)
            self.epoll.modify(fd, EPOLLIN | (
                fd in self.writer_set and EPOLLOUT))

    def removeReader(self, conn):
        connector = conn.getConnector()
        assert connector is not None, conn.whoSetConnector()
        fd = connector.getDescriptor()
        if fd in self.reader_set:
            self.reader_set.remove(fd)
            self.epoll.modify(fd, fd in self.writer_set and EPOLLOUT)

    def addWriter(self, conn):
        connector = conn.getConnector()
        assert connector is not None, conn.whoSetConnector()
        fd = connector.getDescriptor()
        if fd not in self.writer_set:
            self.writer_set.add(fd)
            self.epoll.modify(fd, EPOLLOUT | (
                fd in self.reader_set and EPOLLIN))

    def removeWriter(self, conn):
        connector = conn.getConnector()
        assert connector is not None, conn.whoSetConnector()
        fd = connector.getDescriptor()
        if fd in self.writer_set:
            self.writer_set.remove(fd)
            self.epoll.modify(fd, fd in self.reader_set and EPOLLIN)

    def log(self):
        # Dump the poller state for debugging.
        logging.info('Event Manager:')
        logging.info(' Readers: %r', list(self.reader_set))
        logging.info(' Writers: %r', list(self.writer_set))
        logging.info(' Connections:')
        pending_set = set(self._pending_processing)
        for fd, conn in self.connection_dict.items():
            logging.info(' %r: %r (pending=%r)', fd, conn,
                conn in pending_set)
# Default to EpollEventManager.
EventManager = EpollEventManager
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/exception.py 0000664 0000000 0000000 00000001605 13276562645 0025413 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
class NeoException(Exception):
    """Base class of all NEO-specific exceptions."""
    pass

class PrimaryElected(NeoException):
    """A primary master has been elected (carries the elected node)."""
    pass

class PrimaryFailure(NeoException):
    """The primary master failed or became unreachable."""
    pass

class StoppedOperation(NeoException):
    """The cluster stopped operating."""
    pass

class NodeNotReady(NeoException):
    """The remote node is not ready to serve the request."""
    pass
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/handler.py 0000664 0000000 0000000 00000032120 13276562645 0025026 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import sys
from collections import deque
from operator import itemgetter
from . import logging
from .connection import ConnectionClosed
from .exception import PrimaryElected
from .protocol import (NodeStates, NodeTypes, Packets, uuid_str,
Errors, BackendNotImplemented, NonReadableCell, NotReadyError,
PacketMalformedError, ProtocolError, UnexpectedPacketError)
from .util import cached_property
class DelayEvent(Exception):
    """Raised by packet handlers to queue the event for later processing."""
    pass
class EventHandler(object):
"""This class handles events."""
def __new__(cls, app, *args, **kw):
try:
return app._handlers[cls]
except AttributeError: # for BackupApplication
self = object.__new__(cls)
except KeyError:
self = object.__new__(cls)
if cls.__init__ is object.__init__:
app._handlers[cls] = self
self.app = app
return self
def __repr__(self):
return self.__class__.__name__
def __unexpectedPacket(self, conn, packet, message=None):
"""Handle an unexpected packet."""
if message is None:
message = 'unexpected packet type %s in %s' % (type(packet),
self.__class__.__name__)
else:
message = 'unexpected packet: %s in %s' % (message,
self.__class__.__name__)
logging.error(message)
conn.answer(Errors.ProtocolError(message))
conn.abort()
def dispatch(self, conn, packet, kw={}):
"""This is a helper method to handle various packet types."""
try:
conn.setPeerId(packet.getId())
try:
method = getattr(self, packet.handler_method_name)
except AttributeError:
raise UnexpectedPacketError('no handler found')
args = packet.decode() or ()
method(conn, *args, **kw)
except DelayEvent, e:
assert not kw, kw
self.getEventQueue().queueEvent(method, conn, args, *e.args)
except UnexpectedPacketError, e:
if not conn.isClosed():
self.__unexpectedPacket(conn, packet, *e.args)
except PacketMalformedError, e:
logging.error('malformed packet from %r: %s', conn, e)
conn.close()
except NotReadyError, message:
if not conn.isClosed():
if not message.args:
message = 'Retry Later'
message = str(message)
conn.answer(Errors.NotReady(message))
conn.abort()
except ProtocolError, message:
if not conn.isClosed():
message = str(message)
conn.answer(Errors.ProtocolError(message))
conn.abort()
except BackendNotImplemented, message:
m = message[0]
conn.answer(Errors.BackendNotImplemented(
"%s.%s does not implement %s"
% (m.im_class.__module__, m.im_class.__name__, m.__name__)))
except NonReadableCell, e:
conn.answer(Errors.NonReadableCell())
except AssertionError:
e = sys.exc_info()
try:
try:
conn.close()
except Exception:
logging.exception("")
raise e[0], e[1], e[2]
finally:
del e
def checkClusterName(self, name):
# raise an exception if the given name mismatch the current cluster name
if self.app.name != name:
logging.error('reject an alien cluster')
raise ProtocolError('invalid cluster name')
# Network level handlers

def packetReceived(self, *args):
    """Called when a packet is received."""
    self.dispatch(*args)

def connectionStarted(self, conn):
    """Called when a connection is started."""
    logging.debug('connection started for %r', conn)

def connectionCompleted(self, conn):
    """Called when a connection is completed."""
    logging.debug('connection completed for %r (from %s:%u)',
        conn, *conn.getConnector().getAddress())

def connectionFailed(self, conn):
    """Called when a connection failed."""
    logging.debug('connection failed for %r', conn)

def connectionClosed(self, conn):
    """Called when a connection is closed by the peer."""
    logging.debug('connection closed for %r', conn)
    # A peer-closed connection means the node is considered DOWN.
    self.connectionLost(conn, NodeStates.DOWN)

def connectionLost(self, conn, new_state):
    """Hook for sub-handlers that do not need to distinguish which kind
    of event closed the connection. Default: do nothing."""
    pass
# Packet handlers.

def notPrimaryMaster(self, conn, primary, known_master_list):
    """The peer answers it is not the primary master: register the
    masters it advertises and, if it designates a primary, raise
    PrimaryElected with the corresponding node."""
    nm = self.app.nm
    for address in known_master_list:
        nm.createMaster(address=address)
    if primary is not None:
        # 'primary' is an index into known_master_list.
        primary = known_master_list[primary]
        assert primary != self.app.server
        raise PrimaryElected(nm.getByAddress(primary))
def _acceptIdentification(*args):
    """Post-identification hook, overridden by sub-handlers. Default: no-op."""
    pass

def acceptIdentification(self, conn, node_type, uuid,
        num_partitions, num_replicas, your_uuid):
    """The peer accepted our identification request.

    Checks that the answering node has the expected type, records the
    peer's uuid (and our own, for a master), marks the node identified
    and calls the _acceptIdentification hook. The connection is closed
    if the node type does not match.
    """
    app = self.app
    node = app.nm.getByAddress(conn.getAddress())
    assert node.getConnection() is conn, (node.getConnection(), conn)
    if node.getType() == node_type:
        if node_type == NodeTypes.MASTER:
            # Resolve a possible uuid conflict with another known master.
            other = app.nm.getByUUID(uuid)
            if other is not None:
                other.setUUID(None)
            node.setUUID(uuid)
            node.setRunning()
            if your_uuid is None:
                raise ProtocolError('No UUID supplied')
            logging.info('connected to a primary master node')
            if app.uuid != your_uuid:
                app.uuid = your_uuid
                logging.info('Got a new UUID: %s', uuid_str(your_uuid))
            app.id_timestamp = None
        elif node.getUUID() != uuid or app.uuid != your_uuid != None:
            raise ProtocolError('invalid uuids')
        node.setIdentified()
        self._acceptIdentification(node, num_partitions, num_replicas)
        return
    # Node type mismatch: refuse to talk to this peer.
    conn.close()
def notifyNodeInformation(self, conn, *args):
    """Notification of node table changes: forward to the node manager."""
    app = self.app
    app.nm.update(app, *args)

def ping(self, conn):
    """Answer a ping with a pong."""
    conn.answer(Packets.Pong())

def pong(self, conn):
    # Nothing to do: receiving the answer is enough.
    pass

def closeClient(self, conn):
    """The peer does not need our client part anymore: close the
    connection if the server part is unused too."""
    conn.server = False
    if not conn.client:
        conn.close()
# Error packet handlers.

def error(self, conn, code, message, **kw):
    """Dispatch an error packet to the method named after the error code."""
    try:
        getattr(self, Errors[code])(conn, message)
    except (AttributeError, ValueError):
        # Unknown code, or no handler defined for it on this class.
        raise UnexpectedPacketError(message)

# XXX: For some errors, the connection should have been closed by the remote
#      peer. But what happens if it's not the case because of some bug ?

def protocolError(self, conn, message):
    logging.error('protocol error: %s', message)

def notReadyError(self, conn, message):
    logging.error('not ready: %s', message)

def timeoutError(self, conn, message):
    logging.error('timeout error: %s', message)

def ack(self, conn, message):
    logging.debug("no error message: %s", message)

def backendNotImplemented(self, conn, message):
    """The peer's backend lacks a required feature: fatal on our side."""
    raise NotImplementedError(message)
class MTEventHandler(EventHandler):
    """Base class of handler implementations for MTClientConnection"""

    @cached_property
    def dispatcher(self):
        # Shortcut to the application-wide dispatcher routing answers
        # to waiting threads.
        return self.app.dispatcher

    def dispatch(self, conn, packet, kw={}):
        # The connection lock must be held by the calling thread.
        assert conn.lock._is_owned() # XXX: see also lockCheckWrapper
        super(MTEventHandler, self).dispatch(conn, packet, kw)

    def packetReceived(self, conn, packet, kw={}):
        """Redirect all received packet to dispatcher thread."""
        if packet.isResponse():
            if packet.poll_thread:
                # First process in the polling thread, then forward the
                # same packet (without kw) to the waiting thread below.
                self.dispatch(conn, packet, kw)
                kw = {}
            if not (self.dispatcher.dispatch(conn, packet.getId(), packet, kw)
                    or type(packet) is Packets.Pong or conn.isClosed()):
                raise ProtocolError('Unexpected response packet from %r: %r'
                    % (conn, packet))
        else:
            # Notifications are handled directly in the polling thread.
            self.dispatch(conn, packet, kw)

    def connectionLost(self, conn, new_state):
        self.dispatcher.unregister(conn)

    def connectionFailed(self, conn):
        self.dispatcher.unregister(conn)
def unexpectedInAnswerHandler(*args, **kw):
    """Placeholder callback: any connection event while an answer handler
    is active is a programming error, so always raise."""
    message = 'Unexpected event in an answer handler'
    raise Exception(message)
class AnswerBaseHandler(EventHandler):
    """Base handler while waiting for an answer.

    Any connection-level event during that time is a programming error,
    except an accepted identification (ignored) and a closed connection,
    which is reported to the waiting caller as ConnectionClosed.
    """
    # Fix: the original also assigned
    #   connectionClosed = unexpectedInAnswerHandler
    # which was dead code, unconditionally shadowed by the
    # 'def connectionClosed' below (class bodies execute top to bottom).
    connectionStarted = unexpectedInAnswerHandler
    connectionCompleted = unexpectedInAnswerHandler
    connectionFailed = unexpectedInAnswerHandler
    timeoutExpired = unexpectedInAnswerHandler
    packetReceived = unexpectedInAnswerHandler
    protocolError = unexpectedInAnswerHandler

    def acceptIdentification(*args):
        # Identification answers are irrelevant here: ignore them.
        pass

    def connectionClosed(self, conn):
        raise ConnectionClosed
class _DelayedConnectionEvent(EventHandler):
    """A queued call to a connection handler method, delayed by DelayEvent.

    It impersonates both the handler (handler_method_name/_func) and the
    packet (getId/decode), so that EventHandler.dispatch can replay the
    original call with full error handling.
    """
    # WARNING: This assumes that the connection handler does not change.
    handler_method_name = '_func'
    # Plain object allocation, bypassing any base class customization.
    __new__ = object.__new__

    def __init__(self, func, conn, args):
        self._args = args
        self._conn = conn
        self._func = func
        # Remember the peer id of the delayed packet, to restore it on replay.
        self._msg_id = conn.getPeerId()

    def __call__(self):
        conn = self._conn
        if not conn.isClosed():
            # Replay with the original peer id, then restore the current one.
            msg_id = conn.getPeerId()
            try:
                self.dispatch(conn, self)
            finally:
                conn.setPeerId(msg_id)

    def __repr__(self):
        return '<%s: 0x%x %s>' % (self._func.__name__, self._msg_id, self._conn)

    def decode(self):
        # Packet API: the "decoded" arguments are those of the original call.
        return self._args

    def getEventQueue(self):
        # A delayed event must not be delayed again: bare raise with no
        # active exception is an error by design.
        raise

    def getId(self):
        return self._msg_id
class EventQueue(object):
    """Queue of delayed calls, replayed in order until they stop raising
    DelayEvent."""

    def __init__(self):
        self._event_queue = []
        # -1: not executing; 0: executing; 1: executing and a restart/sort
        # was requested meanwhile (see sortAndExecuteQueuedEvents).
        self._executing_event = -1

    # Stable sort when 2 keys are equal.
    # XXX: Is it really useful to keep events with same key ordered
    #      chronologically ? The caller could use more specific keys. For
    #      write-locks (by the storage node), the locking tid seems enough.
    sortQueuedEvents = (lambda key=itemgetter(0): lambda self:
        self._event_queue.sort(key=key))()

    def queueEvent(self, func, conn=None, args=(), key=None):
        # Queuing while executing is forbidden: the running loop would
        # replay it in the wrong order.
        assert self._executing_event < 0, self._executing_event
        self._event_queue.append((key, func if conn is None else
            _DelayedConnectionEvent(func, conn, args)))
        if key is not None:
            self.sortQueuedEvents()

    def sortAndExecuteQueuedEvents(self):
        if self._executing_event < 0:
            self.sortQueuedEvents()
            self.executeQueuedEvents()
        else:
            # We can't sort events when they're being processed.
            self._executing_event = 1

    def executeQueuedEvents(self):
        # Not reentrant. When processing a queued event, calling this method
        # only tells the caller to retry all events from the beginning, because
        # events for the same connection must be processed in chronological
        # order.
        queue = self._event_queue
        if queue: # return quickly if the queue is empty
            self._executing_event += 1
            if self._executing_event:
                # Reentrant call: just ask the outer loop to restart.
                return
            done = []
            while 1:
                try:
                    for i, event in enumerate(queue):
                        try:
                            event[1]()
                            done.append(i)
                        except DelayEvent:
                            pass
                        if self._executing_event:
                            # A nested call requested a restart.
                            break
                    else:
                        break
                finally:
                    # Remove executed events, highest index first so the
                    # remaining indices stay valid.
                    while done:
                        del queue[done.pop()]
                self._executing_event = 0
                # What sortAndExecuteQueuedEvents could not do immediately
                # is done here:
                if event[0] is not None:
                    self.sortQueuedEvents()
            self._executing_event = -1

    def logQueuedEvents(self):
        if self._event_queue:
            logging.info(" Pending events:")
            for event in self._event_queue:
                logging.info('  %r', event)
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/interfaces.py 0000664 0000000 0000000 00000006211 13276562645 0025536 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2015-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import inspect
def check_signature(reference, function):
    """Return whether *function* can safely override *reference*.

    Compares inspect.getargspec of both: same argument names, compatible
    varargs/varkw, and matching defaults — allowing the override to
    append extra parameters as long as they all have defaults.
    """
    # args, varargs, varkw, defaults
    A, B, C, D = inspect.getargspec(reference)
    a, b, c, d = inspect.getargspec(function)
    x = len(A) - len(a)
    if x < 0: # ignore extra default parameters
        # The override has extra parameters: they must all have defaults,
        # and the reference must not already absorb them via *args.
        # NOTE(review): assumes 'd' is not None here — len(d) would raise
        # if the extra parameters had no defaults at all; confirm callers.
        if B or x + len(d) < 0:
            return False
        del a[x:]
        d = d[:x] or None
    elif x: # different signature
        # The reference has more parameters: accept only if the override
        # catches the missing ones via *args/**kw and defaults still match.
        return a == A[:-x] and (b or a and c) and (d or ()) == (D or ())[:-x]
    return a == A and (b or not B) and (c or not C) and d == D
def implements(obj, ignore=()):
    """Check that *obj* implements all abstract/required methods.

    Walks the MRO for attributes marked by @abstract or @requires and
    asserts that obj overrides them with compatible signatures. Names in
    *ignore* are allowed to stay unimplemented. Returns obj, so this can
    be used as a class decorator.
    """
    ignore = set(ignore)
    not_implemented = []
    wrong_signature = []
    if isinstance(obj, type):
        tobj = obj
    else:
        tobj = type(obj)
    mro = tobj.mro()
    mro.reverse()
    base = []
    # Collect, for each name, the most basic definition that is marked
    # abstract or carries requirements.
    for name in dir(obj):
        for x in mro:
            try:
                x = getattr(x, name)
                break
            except AttributeError:
                pass
        if hasattr(x, "__abstract__") or hasattr(x, "__requires__"):
            base.append((name, x.__func__))
    try:
        while 1:
            name, func = base.pop()
            x = getattr(obj, name)
            if type(getattr(x, '__self__', None)) is tobj:
                x = x.__func__
            if x is func:
                # Not overridden: either it only carries requirements
                # (queue them for checking), or it is abstract (report it
                # unless explicitly ignored).
                try:
                    x = func.__requires__
                except AttributeError:
                    try:
                        ignore.remove(name)
                    except KeyError:
                        not_implemented.append(name)
                else:
                    base.extend((x.__name__, x) for x in x)
            elif not check_signature(func, x):
                wrong_signature.append(name)
    except IndexError: # base empty
        assert not ignore, ignore
        assert not not_implemented, not_implemented
        assert not wrong_signature, wrong_signature
    return obj
def _set_code(func):
    """Replace *func*'s code object by a stub raising NotImplementedError,
    keeping the same argument names (Python 2: exec statement, func_code)."""
    args, varargs, varkw, _ = inspect.getargspec(func)
    if varargs:
        args.append("*" + varargs)
    if varkw:
        args.append("**" + varkw)
    # Build a function with an identical parameter list and steal its code.
    exec "def %s(%s): raise NotImplementedError\nf = %s" % (
        func.__name__, ",".join(args), func.__name__)
    func.func_code = f.func_code
def abstract(func):
    """Decorator: mark *func* abstract; calling it raises NotImplementedError."""
    _set_code(func)
    func.__abstract__ = 1
    return func

def requires(*args):
    """Decorator factory: mark the decorated method as requiring the given
    functions to be implemented by subclasses (each of them is also turned
    into a NotImplementedError stub)."""
    for func in args:
        _set_code(func)
    def decorator(func):
        func.__requires__ = args
        return func
    return decorator
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/locking.py 0000664 0000000 0000000 00000015215 13276562645 0025045 0 ustar 00root root 0000000 0000000 import os
import sys
import threading
import traceback
from collections import deque
from time import time
from Queue import Empty
"""
Verbose locking classes.
Python threading module contains a simple logging mechanism, but:
- It's limited to the RLock class
- It's enabled instance by instance
- Choice to log or not is done at instantiation
- It does not emit any log before trying to acquire lock
This file defines a VerboseLock class implementing basic lock API and
logging in appropriate places with extensive details.
It can be toggled globally by setting NEO_VERBOSE_LOCKING environment
variable to a non-empty value before this module is loaded.
There is no overhead at all when disabled (passthrough to threading
classes).
"""
class LockUser(object):
    """Identifies the thread performing a lock operation, remembering the
    code position it was done from, and logs a message to stderr."""

    def __init__(self, message, level=0):
        t = threading.currentThread()
        ident = getattr(t, 'node_name', t.name)
        # This class is instantiated from a place desiring to know what
        # called it.
        # limit=1 would return execution position in this method
        # limit=2 would return execution position in caller
        # limit=3 returns execution position in caller's caller
        # Additional level value (should be positive only) can be used when
        # more intermediate calls are involved
        self.stack = stack = traceback.extract_stack()[:-2-level]
        path, line_number, func_name, line = stack[-1]
        # Simplify path. Only keep 3 last path elements. It is enough for
        # current Neo directory structure.
        path = os.path.join('...', *path.split(os.path.sep)[-3:])
        self.time = time()
        # Temporarily extend ident with position/time so the first note()
        # logs where the lock operation happens.
        self.ident = "%s@%r %s:%s %s" % (
            ident, self.time, path, line_number, line)
        self.note(message)
        self.ident = ident

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self.ident == other.ident

    def __repr__(self):
        return "%s@%r" % (self.ident, self.time)

    def formatStack(self):
        return ''.join(traceback.format_list(self.stack))

    # note() is built in a closure so the stderr attribute lookups are
    # done only once.
    def note():
        write = sys.stderr.write
        flush = sys.stderr.flush
        def note(self, message):
            write("[%s] %s\n" % (self.ident, message))
            flush()
        return note
    note = note()
class VerboseLockBase(object):
    """Common machinery of the verbose lock wrappers: logs every acquire
    and release with owner and waiter information."""

    _error_class = threading.ThreadError
    _release_error = 'release unlocked lock'

    def __init__(self, check_owner, name=None, verbose=None):
        # check_owner: warn when the owning thread re-acquires (deadlock hint).
        self._check_owner = check_owner
        self._name = name or '<%s@%X>' % (self.__class__.__name__, id(self))
        self.owner = None
        self.waiting = []
        LockUser(repr(self) + " created", 1)

    def acquire(self, blocking=1):
        owner = self.owner if self._locked() else None
        me = LockUser("%s.acquire(%s). Owned by %r. Waiting: %r"
            % (self, blocking, owner, self.waiting))
        if blocking:
            if self._check_owner and me == owner:
                me.note("I already own this lock: %r" % owner)
                me.note("Owner traceback:\n%s" % owner.formatStack())
                me.note("My traceback:\n%s" % me.formatStack())
            self.waiting.append(me)
        try:
            locked = self.lock.acquire(blocking)
        finally:
            if blocking:
                self.waiting.remove(me)
        if locked:
            self.owner = me
            me.note("Lock granted. Waiting: " + repr(self.waiting))
        return locked

    __enter__ = acquire

    def release(self):
        me = LockUser("%s.release(). Waiting: %r" % (self, self.waiting))
        try:
            return self.lock.release()
        except self._error_class:
            # Decorate the "release unlocked lock" error message with the
            # lock identity (Python 2 3-arg raise keeps the traceback).
            t, v, tb = sys.exc_info()
            if str(v) == self._release_error:
                raise t, "%s %s (%s)" % (v, self, me), tb
            raise

    def __exit__(self, t, v, tb):
        self.release()

    def _locked(self):
        # Overridden by subclasses: is the underlying lock currently held?
        raise NotImplementedError

    def __repr__(self):
        return self._name
class VerboseRLock(VerboseLockBase):
    """Verbose wrapper around threading.RLock."""
    # RLock raises RuntimeError with a different message on bad release.
    _error_class = RuntimeError
    _release_error = 'cannot release un-acquired lock'

    def __init__(self, **kw):
        # Re-acquiring an RLock is legal, so never warn about ownership.
        super(VerboseRLock, self).__init__(check_owner=False, **kw)
        self.lock = threading.RLock()

    def _locked(self):
        # Peek at the RLock's internal block lock (CPython 2 private attr).
        return self.lock._RLock__block.locked()

    def _is_owned(self):
        return self.lock._is_owned()
class VerboseLock(VerboseLockBase):
    """Verbose wrapper around threading.Lock."""

    def __init__(self, check_owner=True, **kw):
        super(VerboseLock, self).__init__(check_owner, **kw)
        self.lock = threading.Lock()

    def locked(self):
        return self.lock.locked()
    _locked = locked

class VerboseSemaphore(VerboseLockBase):
    """Verbose wrapper around threading.Semaphore."""

    def __init__(self, value=1, check_owner=True, **kw):
        super(VerboseSemaphore, self).__init__(check_owner, **kw)
        self.lock = threading.Semaphore(value)

    def _locked(self):
        # Exhausted when the internal counter (CPython 2 private attr) is 0.
        return not self.lock._Semaphore__value
# Export the verbose wrappers only when explicitly enabled through the
# environment, so there is zero overhead in normal operation.
if os.getenv('NEO_VERBOSE_LOCKING'):
    Lock = VerboseLock
    RLock = VerboseRLock
    Semaphore = VerboseSemaphore
else:
    Lock = threading.Lock
    RLock = threading.RLock
    Semaphore = threading.Semaphore
class SimpleQueue(object):
    """
    Similar to Queue.Queue but with simpler locking scheme, reducing lock
    contention on "put" (benchmark shows 60% less time spent in "put").
    As a result:
    - only a single consumer possible ("get" vs. "get" race condition)
    - only a single producer possible ("put" vs. "put" race condition)
    - no blocking size limit possible
    - no consumer -> producer notifications (task_done/join API)
    Queue is on the critical path: any moment spent here increases client
    application wait for object data, transaction completion, etc.
    As we have a single consumer (client application's thread) and a single
    producer (lib.dispatcher, which can be called from several threads but
    serialises calls internally) for each queue, Queue.Queue's locking scheme
    can be relaxed to reduce latency.
    """
    __slots__ = ('_lock', '_unlock', '_popleft', '_append', '_queue')

    def __init__(self):
        # The lock acts as a binary wake-up signal: the consumer blocks
        # on it when the queue is empty, put() releases it.
        lock = Lock()
        self._lock = lock.acquire
        self._unlock = lock.release
        self._queue = queue = deque()
        self._popleft = queue.popleft
        self._append = queue.append

    def get(self, block):
        if block:
            # Arm the signal (non-blocking acquire) before checking the
            # queue, so a concurrent put() can wake us up.
            self._lock(False)
        while True:
            try:
                return self._popleft()
            except IndexError:
                if not block:
                    raise Empty
                # Queue empty: block until put() releases the lock.
                self._lock()

    def put(self, item):
        self._append(item)
        # Wake a possibly blocked consumer: acquire(False) + release
        # leaves the lock released whatever its previous state.
        self._lock(False)
        self._unlock()

    def empty(self):
        return not self._queue
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/logger.py 0000664 0000000 0000000 00000023476 13276562645 0024706 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from collections import deque
from functools import wraps
from logging import getLogger, Formatter, Logger, StreamHandler, \
DEBUG, WARNING
from time import time
from traceback import format_exception
import bz2, inspect, neo, os, signal, sqlite3, sys, threading
# Stats for storage node of matrix test (py2.7:SQLite)
# Estimated memory overhead of one buffered log record, used to bound
# the in-memory backlog (see NEOLogger.backlog/_queue).
RECORD_SIZE = ( 234360832 # extra memory used
              - 16777264  # sum of raw data ('msg' attribute)
              ) // 187509 # number of records

# Format of the default stderr handler: a header line, then the message
# (each message line gets the header repeated by _Formatter.format).
FMT = ('%(asctime)s %(levelname)-9s %(name)-10s'
       ' [%(module)14s:%(lineno)3d] \n%(message)s')
class _Formatter(Formatter):
def formatTime(self, record, datefmt=None):
return Formatter.formatTime(self, record,
'%Y-%m-%d %H:%M:%S') + '.%04d' % (record.msecs * 10)
def format(self, record):
lines = iter(Formatter.format(self, record).splitlines())
prefix = lines.next()
return '\n'.join(prefix + line for line in lines)
class PacketRecord(object):
    """Lightweight stand-in for logging.LogRecord describing one network
    packet; all attributes are supplied as keyword arguments."""
    # Defaults expected by NEOLogger._queue/_emit.
    args = None
    levelno = DEBUG
    # Trick: the "constructor" is the bound self.__dict__.update, so
    # PacketRecord(a=1, b=2) simply stores the keywords as attributes.
    __init__ = property(lambda self: self.__dict__.update)
class NEOLogger(Logger):
    """Logger that mirrors records into a SQLite database, with an
    in-memory backlog, deferrable (signal-safe) flush/reopen operations
    and optional network packet logging."""

    default_root_handler = StreamHandler()
    default_root_handler.setFormatter(_Formatter(FMT))

    def __init__(self):
        Logger.__init__(self, None)
        self.parent = root = getLogger()
        if not root.handlers:
            root.addHandler(self.default_root_handler)
        self._db = None
        self._record_queue = deque()
        self._record_size = 0
        # Operations requested while the lock was held; run on release.
        self._async = set()
        l = threading.Lock()
        self._acquire = l.acquire
        release = l.release
        def _release():
            try:
                while self._async:
                    self._async.pop()(self)
            finally:
                release()
        self._release = _release
        self.backlog()

    def __enter__(self):
        self._acquire()
        return self._db

    def __exit__(self, t, v, tb):
        self._release()

    def __async(wrapped):
        # Decorator: run 'wrapped' now if the lock is free, else defer it
        # to whoever releases the lock (makes flush/reopen usable from
        # signal handlers without deadlocking).
        def wrapper(self):
            self._async.add(wrapped)
            if self._acquire(0):
                self._release()
        return wraps(wrapped)(wrapper)

    @__async
    def reopen(self):
        if self._db is None:
            return
        q = self._db.execute
        if not q("SELECT id FROM packet LIMIT 1").fetchone():
            # No packet was logged: drop the (large) protocol dump.
            q("DROP TABLE protocol")
            # DROP TABLE already replaced previous data with zeros,
            # so VACUUM is not really useful. But here, it should be free.
            q("VACUUM")
        # Reconnect to the same file (for interoperability with logrotate).
        self._setup(q("PRAGMA database_list").fetchone()[2])

    @__async
    def flush(self):
        if self._db is None:
            return
        try:
            for r in self._record_queue:
                self._emit(r)
        finally:
            # Always commit, to not lose any record that we could emit.
            self.commit()
        self._record_queue.clear()
        self._record_size = 0

    def commit(self):
        try:
            self._db.commit()
        except sqlite3.OperationalError as e:
            x = e.args[0]
            if x != 'database is locked':
                raise
            # Another process holds the database: retry until it lets go.
            sys.stderr.write('%s: retrying to emit log...' % x)
            while 1:
                try:
                    self._db.commit()
                    break
                except sqlite3.OperationalError as e:
                    if e.args[0] != x:
                        raise
            sys.stderr.write(' ok\n')

    def backlog(self, max_size=1<<24, max_packet=None):
        # max_size=None: write every record immediately. Otherwise, keep
        # records in a bounded memory queue; they are written on flush()
        # or as soon as a record of level >= WARNING arrives (see _queue).
        with self:
            self._max_packet = max_packet
            self._max_size = max_size
            if max_size is None:
                self.flush()
            else:
                q = self._record_queue
                while max_size < self._record_size:
                    self._record_size -= RECORD_SIZE + len(q.popleft().msg)

    def _setup(self, filename=None, reset=False):
        from . import protocol as p
        global uuid_str
        uuid_str = p.uuid_str
        if self._db is not None:
            self._db.close()
            if not filename:
                self._db = None
                self._record_queue.clear()
                self._record_size = 0
                return
        if filename:
            self._db = sqlite3.connect(filename, check_same_thread=False)
            q = self._db.execute
            if self._max_size is None:
                q("PRAGMA synchronous = OFF")
            if 1: # Not only when logging everything,
                  # but also for interoperability with logrotate.
                q("PRAGMA journal_mode = MEMORY")
            if reset:
                for t in 'log', 'packet':
                    q('DROP TABLE IF EXISTS ' + t)
            q("""CREATE TABLE IF NOT EXISTS log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    date REAL NOT NULL,
                    name TEXT,
                    level INTEGER NOT NULL,
                    pathname TEXT,
                    lineno INTEGER,
                    msg TEXT)
              """)
            q("""CREATE INDEX IF NOT EXISTS _log_i1 ON log(date)""")
            q("""CREATE TABLE IF NOT EXISTS packet (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    date REAL NOT NULL,
                    name TEXT,
                    msg_id INTEGER NOT NULL,
                    code INTEGER NOT NULL,
                    peer TEXT NOT NULL,
                    body BLOB)
              """)
            q("""CREATE INDEX IF NOT EXISTS _packet_i1 ON packet(date)""")
            q("""CREATE TABLE IF NOT EXISTS protocol (
                    date REAL PRIMARY KEY NOT NULL,
                    text BLOB NOT NULL)
              """)
            # Store a compressed copy of the protocol module, unless the
            # latest stored copy is already identical.
            with open(inspect.getsourcefile(p)) as p:
                p = buffer(bz2.compress(p.read()))
            for t, in q("SELECT text FROM protocol ORDER BY date DESC"):
                if p == t:
                    break
            else:
                try:
                    t = self._record_queue[0].created
                except IndexError:
                    t = time()
                with self._db:
                    q("INSERT INTO protocol VALUES (?,?)", (t, p))

    def setup(self, filename=None, reset=False):
        with self:
            self._setup(filename, reset)
    # Closing on interpreter shutdown == reconfiguring with no file.
    __del__ = setup

    def fork(self):
        # Fork while holding the logging lock, so the child does not
        # inherit it in a locked state; the child then detaches from the
        # parent's database connection.
        with self:
            pid = os.fork()
        if pid:
            return pid
        self._setup()

    def isEnabledFor(self, level):
        # Always handle records here; level filtering for the parent
        # handlers is done in callHandlers.
        return True

    def _emit(self, r):
        if type(r) is PacketRecord:
            ip, port = r.addr
            peer = ('%s %s ([%s]:%s)' if ':' in ip else '%s %s (%s:%s)') % (
                '>' if r.outgoing else '<', uuid_str(r.uuid), ip, port)
            msg = r.msg
            if msg is not None:
                msg = buffer(msg)
            self._db.execute("INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)",
                (r.created, r._name, r.msg_id, r.code, peer, msg))
        else:
            pathname = os.path.relpath(r.pathname, *neo.__path__)
            self._db.execute("INSERT INTO log VALUES (NULL,?,?,?,?,?,?)",
                (r.created, r._name, r.levelno, pathname, r.lineno, r.msg))

    def _queue(self, record):
        record._name = self.name and str(self.name)
        self._acquire()
        try:
            if self._max_size is None:
                self._emit(record)
                self.commit()
            else:
                self._record_size += RECORD_SIZE + len(record.msg or '')
                q = self._record_queue
                q.append(record)
                if record.levelno < WARNING:
                    # Evict oldest records to stay within the backlog size.
                    while self._max_size < self._record_size:
                        self._record_size -= RECORD_SIZE + len(q.popleft().msg)
                else:
                    # A warning or worse: persist the whole backlog
                    # (deferred until the lock is released, see __async).
                    self.flush()
        finally:
            self._release()

    def callHandlers(self, record):
        if self._db is not None:
            # Render the message once and append any formatted exception,
            # so the stored row is self-contained.
            record.msg = record.getMessage()
            record.args = None
            if record.exc_info:
                record.msg = (record.msg and record.msg + '\n') + ''.join(
                    format_exception(*record.exc_info)).strip()
                record.exc_info = None
            self._queue(record)
        if Logger.isEnabledFor(self, record.levelno):
            record.name = self.name or 'NEO'
            self.parent.callHandlers(record)

    def packet(self, connection, packet, outgoing):
        if self._db is not None:
            body = packet._body
            # Truncate over-long bodies if a packet size limit is set.
            if self._max_packet and self._max_packet < len(body):
                body = None
            self._queue(PacketRecord(
                created=time(),
                msg_id=packet._id,
                code=packet._code,
                outgoing=outgoing,
                uuid=connection.getUUID(),
                addr=connection.getAddress(),
                msg=body))
# Single application-wide logger instance.
logging = NEOLogger()
# SIGRTMIN: flush the in-memory backlog to the database.
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
# SIGRTMIN+1: reopen the database (log rotation).
signal.signal(signal.SIGRTMIN+1, lambda signum, frame: logging.reopen())

def patch():
    # Wrap os.fork so that a forked child detaches from the parent's
    # log database connection (see also NEOLogger.fork).
    def fork():
        with logging:
            pid = os_fork()
        if not pid:
            logging._setup()
        return pid
    os_fork = os.fork
    os.fork = fork
patch()
del patch
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/node.py 0000664 0000000 0000000 00000047402 13276562645 0024347 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import errno, json, os
from time import time
from . import attributeTracker, logging
from .handler import DelayEvent, EventQueue
from .protocol import formatNodeList, uuid_str, \
NodeTypes, NodeStates, NotReadyError, ProtocolError
class Node(object):
    """This class represents a node."""

    # Class-level defaults; instances fall back to them until set.
    _connection = None
    _identified = False
    id_timestamp = None

    def __init__(self, manager, address=None, uuid=None, state=NodeStates.DOWN):
        self._state = state
        self._address = address
        self._uuid = uuid
        self._manager = manager
        self._last_state_change = time()
        # Register in the manager's indexes immediately.
        manager.add(self)

    @property
    def send(self):
        assert self.isConnected(), 'Not connected'
        return self._connection.send

    @property
    def ask(self):
        assert self.isConnected(), 'Not connected'
        return self._connection.ask

    @property
    def answer(self):
        assert self.isConnected(), 'Not connected'
        return self._connection.answer

    def getLastStateChange(self):
        return self._last_state_change

    def getState(self):
        return self._state

    def setState(self, new_state):
        if self._state == new_state:
            return
        if new_state == NodeStates.UNKNOWN:
            # An UNKNOWN node is no longer tracked: drop it from the manager.
            self._manager.remove(self)
            self._state = new_state
        else:
            old_state = self._state
            self._state = new_state
            self._manager._updateState(self, old_state)
        self._last_state_change = time()

    def setAddress(self, address):
        if self._address == address:
            return
        old_address = self._address
        self._address = address
        self._manager._updateAddress(self, old_address)

    def getAddress(self):
        return self._address

    def setUUID(self, uuid):
        if self._uuid == uuid:
            return
        old_uuid = self._uuid
        self._uuid = uuid
        self._manager._updateUUID(self, old_uuid)
        if self._connection is not None:
            # Keep the connection's idea of the peer uuid in sync.
            self._connection.setUUID(uuid)

    def getUUID(self):
        return self._uuid

    def onConnectionClosed(self):
        """
        Callback from node's connection when closed
        """
        assert self._connection is not None
        # Fall back to the class attribute (None) instead of storing None.
        del self._connection
        self._identified = False

    def setConnection(self, connection, force=None):
        """
        Define the connection that is currently available to this node.
        If there is already a connection set, 'force' must be given:
        the new connection replaces the old one if it is true. In any case,
        the node must be managed by the same handler for the client and
        server parts.
        """
        assert connection.getUUID() in (None, self._uuid), connection
        connection.setUUID(self._uuid)
        conn = self._connection
        if conn is None:
            self._connection = connection
            if connection.isServer():
                self.setIdentified()
        else:
            assert force is not None, \
                attributeTracker.whoSet(self, '_connection')
            # The test on peer_id is there to protect against buggy nodes.
            # XXX: handler comparison does not cover all cases: there may
            # be a pending handler change, which won't be detected, or a future
            # handler change which is not prevented. Complete implementation
            # should allow different handlers for each connection direction,
            # with in-packets client/server indicators to decide which handler
            # (server-ish or client-ish) to use. There is currently no need for
            # the full-fledged functionality, and it is simpler this way.
            if not force or conn.getPeerId() is not None or \
               type(conn.getHandler()) is not type(connection.getHandler()):
                raise ProtocolError("already connected")
            def on_closed():
                # Replace the old connection only once it is really closed.
                self._connection = connection
                assert connection.isServer()
                self.setIdentified()
            conn.setOnClose(on_closed)
            conn.close()
        assert not connection.isClosed(), connection
        connection.setOnClose(self.onConnectionClosed)

    def getConnection(self):
        """
        Returns the connection to the node if available
        """
        assert self._connection is not None
        return self._connection

    def isConnected(self, connecting=False):
        """
        Returns True if a connection is established with the node
        """
        return self._connection is not None and (connecting or
            not self._connection.connecting)

    def setIdentified(self):
        assert self._connection is not None
        self._identified = True

    def isIdentified(self):
        """
        Returns True if identification packets have been exchanged
        """
        return self._identified

    def __repr__(self):
        addr = self._address
        return '<%s(uuid=%s%s, state=%s, connection=%r%s) at %x>' % (
            self.__class__.__name__,
            uuid_str(self._uuid),
            ', address=' + ('[%s]:%s' if ':' in addr[0] else '%s:%s') % addr
                if addr else '',
            self._state,
            self._connection,
            '' if self._identified else ', not identified',
            id(self),
        )

    def asTuple(self):
        """ Returned tuple is intended to be used in protocol encoders """
        return (self.getType(), self._address, self._uuid, self._state,
            self.id_timestamp)

    def __gt__(self, node):
        # sort per UUID if defined
        if self._uuid is not None:
            return self._uuid > node._uuid
        return self._address > node._address

    def whoSetState(self):
        """
        Debugging method: call this method to know who set the current
        state value.
        """
        return attributeTracker.whoSet(self, '_state')
attributeTracker.track(Node)
class MasterDB(object):
    """
    Manages accesses to master's address database.
    """
    def __init__(self, path):
        self._path = path
        try:
            with open(path) as db:
                # JSON stores lists; addresses are used as tuples.
                self._set = set(map(tuple, json.load(db)))
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise
            # No database yet: start empty and create the file eagerly,
            # so that write errors are reported now rather than later.
            self._set = set()
            self._save(True)

    def _save(self, raise_on_error=False):
        # Write to a temporary file then rename, for atomic replacement.
        tmp = self._path + '#neo#'
        try:
            with open(tmp, 'w') as db:
                json.dump(list(self._set), db)
            os.rename(tmp, self._path)
        except EnvironmentError:
            if raise_on_error:
                raise
            logging.exception('failed saving list of master nodes to %r',
                self._path)
        finally:
            try:
                os.remove(tmp)
            except OSError:
                pass

    def remove(self, addr):
        if addr in self._set:
            self._set.remove(addr)
            self._save()

    def addremove(self, old, new):
        assert old != new
        if None is not new not in self._set:
            self._set.add(new)
        elif old not in self._set:
            # Nothing to add and nothing to remove: skip the save.
            return
        self._set.discard(old)
        self._save()

    def __repr__(self):
        return '<%s@%s: %s>' % (self.__class__.__name__, self._path,
            ', '.join(sorted(('[%s]:%s' if ':' in x[0] else '%s:%s') % x
                for x in self._set)))

    def __iter__(self):
        return iter(self._set)
class NodeManager(EventQueue):
"""This class manages node status."""
_master_db = None
# TODO: rework getXXXList() methods, filter first by node type
# - getStorageList(identified=True, connected=True, )
# - getList(...)
def __init__(self, master_db=None):
    """
    master_db (string)
        Path to a file containing master nodes' addresses. Used to automate
        master list updates. If not provided, no automation will happen.
    """
    self._node_set = set()
    self._address_dict = {}
    self._uuid_dict = {}
    self._type_dict = {}
    self._state_dict = {}
    if master_db is not None:
        self._master_db = db = MasterDB(master_db)
        for addr in db:
            self.createMaster(address=addr)
    self.reset()
# Closing simply re-initializes, emptying all indexes.
close = __init__

def reset(self):
    # Also (re)initialize the EventQueue part of this class.
    EventQueue.__init__(self)
    self._timestamp = 0
def add(self, node):
    """Register a node in all indexes (set, address, uuid, type, state)."""
    if node in self._node_set:
        logging.warning('adding a known node %r, ignoring', node)
        return
    assert not node.isUnknown(), node
    self._node_set.add(node)
    self._updateAddress(node, None)
    self._updateUUID(node, None)
    self.__updateSet(self._type_dict, None, node.getType(), node)
    self.__updateSet(self._state_dict, None, node.getState(), node)

def remove(self, node):
    """Remove a node from all indexes; also forget a master's address."""
    self._node_set.remove(node)
    # a node may not be indexed by uuid or address, e.g.:
    # - a client or admin node that doesn't have a listening address
    self._address_dict.pop(node.getAddress(), None)
    # - a master known by address but without UUID
    self._uuid_dict.pop(node.getUUID(), None)
    self._state_dict[node.getState()].remove(node)
    self._type_dict[node.getType()].remove(node)
    if node.isMaster() and self._master_db is not None:
        self._master_db.remove(node.getAddress())
def __update(self, index_dict, old_key, new_key, node):
    """ Update an index from old to new key """
    if old_key is not None:
        assert index_dict[old_key] is node, '%r is stored as %s, ' \
            'moving %r to %s' % (index_dict[old_key], old_key, node,
            new_key)
        del index_dict[old_key]
    if new_key is not None:
        assert index_dict.get(new_key, node) is node, 'Adding %r at %r ' \
            'would overwrite %r' % (node, new_key, index_dict[new_key])
        index_dict[new_key] = node

def _updateAddress(self, node, old_address):
    """Reindex a node after an address change; keep the master DB in sync."""
    address = node.getAddress()
    self.__update(self._address_dict, old_address, address, node)
    if node.isMaster() and self._master_db is not None:
        self._master_db.addremove(old_address, address)

def _updateUUID(self, node, old_uuid):
    """Reindex a node after a uuid change."""
    self.__update(self._uuid_dict, old_uuid, node.getUUID(), node)

def __updateSet(self, set_dict, old_key, new_key, node):
    """ Update a set index from old to new key """
    if old_key in set_dict:
        set_dict[old_key].remove(node)
    set_dict.setdefault(new_key, set()).add(node)

def _updateState(self, node, old_state):
    """Reindex a node after a state change."""
    assert not node.isUnknown(), node
    self.__updateSet(self._state_dict, old_state, node.getState(), node)
def getList(self, node_filter=None):
    """Return the known nodes as a list, optionally filtered by
    *node_filter* (a predicate taking a node)."""
    nodes = self._node_set
    if node_filter is None:
        return list(nodes)
    return [node for node in nodes if node_filter(node)]
def getIdentifiedList(self, pool_set=None):
"""
Returns a generator to iterate over identified nodes
pool_set is an iterable of UUIDs allowed
"""
return [x for x in self._node_set if x.isIdentified() and (
pool_set is None or x.getUUID() in pool_set)]
def getConnectedList(self):
    """
    Return a list of connected nodes.
    """
    # TODO: use an index
    return [x for x in self._node_set if x.isConnected()]
def getByStateList(self, state):
    """ Get a node list filtered per the node state """
    node_set = self._state_dict.get(state)
    return list(node_set) if node_set is not None else []
def _getTypeList(self, node_type, only_identified=False):
    """Return nodes of the given type, optionally only identified ones."""
    node_set = self._type_dict.get(node_type, ())
    if only_identified:
        return [x for x in node_set if x.isIdentified()]
    return list(node_set)
def getByAddress(self, address):
    """ Return the node that match with a given address """
    # dict.get already defaults to None, no need to pass it explicitly
    return self._address_dict.get(address)
def getByUUID(self, uuid, *id_timestamp):
    """Return the node that matches with a given UUID

    If an id timestamp is passed (as a single extra positional argument),
    DelayEvent is raised if identification
    must be delayed. This is because we rely only on the notifications from
    the master to recognize nodes (otherwise, we could get id conflicts)
    and such notifications may be late in some cases, even when the master
    expects us to not reject the connection.
    NotReadyError is raised when the timestamps show the peer got
    disconnected from the master.
    """
    node = self._uuid_dict.get(uuid)
    if id_timestamp:
        # Unpack the single optional positional argument.
        id_timestamp, = id_timestamp
        if not node or node.id_timestamp != id_timestamp:
            if self._timestamp < id_timestamp:
                # Our node table is older than the peer's identification:
                # wait for the master notification instead of rejecting.
                raise DelayEvent
            # The peer got disconnected from the master.
            raise NotReadyError('unknown by master')
    return node
def _createNode(self, klass, address=None, uuid=None, **kw):
    """Get-or-create a node of type klass for the given address/uuid.

    If a node is already known by address and/or uuid, it is reused and
    completed with the missing identifier. ValueError is raised on any
    inconsistency: two different matching nodes, or a conflicting
    uuid/address already set on the matched node.
    """
    by_address = self.getByAddress(address)
    by_uuid = self.getByUUID(uuid)
    if by_address is None and by_uuid is None:
        # Completely unknown: create a brand new node.
        node = klass(self, address=address, uuid=uuid, **kw)
    else:
        # Pick the single pre-existing node...
        if by_uuid is None or by_address is by_uuid:
            node = by_address
        elif by_address is None:
            node = by_uuid
        else:
            raise ValueError('Got different nodes for uuid %s: %r and '
                'address %r: %r.' % (uuid_str(uuid), by_uuid, address,
                by_address))
        # ...and complete (or check) its identifiers.
        if uuid is not None:
            node_uuid = node.getUUID()
            if node_uuid is None:
                node.setUUID(uuid)
            elif node_uuid != uuid:
                raise ValueError('Expected uuid %s on node %r' % (
                    uuid_str(uuid), node))
        if address is not None:
            node_address = node.getAddress()
            if node_address is None:
                node.setAddress(address)
            elif node_address != address:
                raise ValueError('Expected address %r on node %r' % (
                    address, node))
    assert node.__class__ is klass, (node.__class__, klass)
    return node
def createFromNodeType(self, node_type, **kw):
    """Instantiate (or reuse) a node from its NodeTypes value."""
    # Raises KeyError for an invalid node type.
    klass = NODE_TYPE_MAPPING[node_type]
    return self._createNode(klass, **kw)
def update(self, app, timestamp, node_list):
    """Apply a node-list notification received from the master.

    timestamp orders notifications; node_list contains
    (node_type, address, uuid, state, id_timestamp) tuples.
    Nodes notified as UNKNOWN are disconnected and dropped. On the first
    notification (full list), nodes absent from the list are also dropped.
    """
    assert self._timestamp < timestamp, (self._timestamp, timestamp)
    self._timestamp = timestamp
    # app.id_timestamp is None until the first notification: remember the
    # nodes listed there so that stale ones can be pruned afterwards.
    added_list = [] if app.id_timestamp is None else None
    for node_type, addr, uuid, state, id_timestamp in node_list:
        # This should be done here (although klass might not be used in this
        # iteration), as it raises if type is not valid.
        klass = NODE_TYPE_MAPPING[node_type]
        # lookup in current table
        node_by_uuid = self.getByUUID(uuid)
        node_by_addr = self.getByAddress(addr)
        node = node_by_addr or node_by_uuid
        log_args = node_type, uuid_str(uuid), addr, state, id_timestamp
        if node is None:
            assert state != NodeStates.UNKNOWN, (self._node_set,) + log_args
            node = self._createNode(klass, address=addr, uuid=uuid,
                state=state)
            logging.debug('creating node %r', node)
        else:
            assert isinstance(node, klass), 'node %r is not ' \
                'of expected type: %r' % (node, klass)
            # Chained comparison: true only when both lookups succeeded and
            # returned two different nodes (the uuid moved to another
            # address), in which case the old holder must forget its uuid.
            if None is not node_by_uuid is not node_by_addr is not None:
                assert added_list is not None, \
                    'Discrepancy between node_by_uuid (%r) and ' \
                    'node_by_addr (%r)' % (node_by_uuid, node_by_addr)
                node_by_uuid.setUUID(None)
            if state == NodeStates.UNKNOWN:
                logging.debug('dropping node %r (%r), found with %s '
                    '%s %s %s %s', node, node.isConnected(), *log_args)
                if node.isConnected():
                    # Cut this connection, node removed by handler.
                    # It's important for a storage to disconnect nodes that
                    # aren't connected to the primary master, in order to
                    # avoid conflict of node id. The clients will first
                    # reconnect to the master because they cleared their
                    # partition table upon disconnection.
                    node.getConnection().close()
                if app.uuid != uuid: # XXX
                    dropped = app.pt.dropNode(node)
                    assert dropped, node
                self.remove(node)
                continue
            logging.debug('updating node %r to %s %s %s %s %s',
                node, *log_args)
            node.setUUID(uuid)
            node.setAddress(addr)
            node.setState(state)
        node.id_timestamp = id_timestamp
        if app.uuid == uuid:
            app.id_timestamp = id_timestamp
        if added_list is not None:
            added_list.append(node)
    if added_list is not None:
        assert app.id_timestamp is not None
        # For the first notification, we receive a full list of nodes from
        # the master. Remove all unknown nodes from a previous connection.
        for node in self._node_set.difference(added_list):
            if app.pt.dropNode(node):
                self.remove(node)
    self.log()
    self.executeQueuedEvents()
def log(self):
    """Log a one-line summary then the detail of all known nodes."""
    logging.info('Node manager : %u nodes', len(self._node_set))
    if self._node_set:
        logging.info('\n'.join(formatNodeList(
            map(Node.asTuple, self._node_set), ' * ')))
    self.logQueuedEvents()
# Build one Node subclass per NodeTypes item and install convenience
# accessors on Node and NodeManager. @apply (a Python 2 builtin) calls the
# decorated function immediately, so NODE_TYPE_MAPPING is bound to the
# returned {node_type: Node subclass} dict, not to the function.
@apply
def NODE_TYPE_MAPPING():
    def setmethod(cls, attr, value):
        # Refuse to overwrite anything already defined on the class.
        assert not hasattr(cls, attr), (cls, attr)
        setattr(cls, attr, value)
    def setfullmethod(cls, attr, value):
        value.__name__ = attr
        setmethod(cls, attr, value)
    def camel_case(enum):
        # e.g. STARTING_BACKUP -> 'StartingBackup'
        return str(enum).replace('_', ' ').title().replace(' ', '')
    def setStateAccessors(state):
        # Node.setRunning()/Node.isRunning() and so on, one pair per state.
        name = camel_case(state)
        setfullmethod(Node, 'set' + name, lambda self: self.setState(state))
        setfullmethod(Node, 'is' + name, lambda self: self._state == state)
    # NOTE: relies on Python 2's eager map() for its side effects.
    map(setStateAccessors, NodeStates)
    node_type_dict = {}
    getType = lambda node_type: staticmethod(lambda: node_type)
    true = staticmethod(lambda: True)
    createNode = lambda cls: lambda self, **kw: self._createNode(cls, **kw)
    getList = lambda node_type: lambda self, only_identified=False: \
        self._getTypeList(node_type, only_identified)
    bases = Node,
    for node_type in NodeTypes:
        name = camel_case(node_type)
        is_name = 'is' + name
        # On the base Node class, node.isMaster() etc. resolve to the bool
        # type itself, so calling them returns bool() == False; only the
        # matching subclass overrides it with a staticmethod returning True.
        setmethod(Node, is_name, bool)
        node_type_dict[node_type] = cls = type(name + 'Node', bases, {
            'getType': getType(node_type),
            is_name: true,
        })
        # NodeManager.createMaster(**kw), NodeManager.getMasterList(...)...
        setfullmethod(NodeManager, 'create' + name, createNode(cls))
        setfullmethod(NodeManager, 'get%sList' % name, getList(node_type))
    return node_type_dict
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/patch.py 0000664 0000000 0000000 00000005517 13276562645 0024522 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2015-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
def speedupFileStorageTxnLookup():
    """Speed up lookup of start position when instantiating an iterator

    FileStorage does not index the file positions of transactions.
    With this patch, we use the existing {oid->file_pos} index to bisect to
    the closest file position to start iterating.
    """
    from array import array
    from bisect import bisect
    from collections import defaultdict
    from neo.lib import logging
    from ZODB.FileStorage.FileStorage import FileStorage, FileIterator
    # Pick a 4-byte array typecode: 'I' when it is at least 4 bytes on this
    # platform, else fall back to 'L'.
    typecode = 'L' if array('I').itemsize < 4 else 'I'
    class Start(object):
        # Comparison proxy so that bisect() can compare a tid against data
        # records addressed by (high 32-bit word | low 32-bit word) file
        # positions.
        def __init__(self, read_data_header, h, tid):
            self.read_data_header = read_data_header
            self.h = h << 32
            self.tid = tid
        def __lt__(self, l):
            return self.tid < self.read_data_header(self.h | l).tid
    def iterator(self, start=None, stop=None):
        if start:
            try:
                index = self._tidindex
            except AttributeError:
                # Lazily build the index on first use.
                logging.info("Building index for faster lookup of"
                    " transactions in the FileStorage DB.")
                # Cache a sorted list of all the file pos from oid index.
                # To reduce memory usage, the list is splitted in arrays of
                # low order 32-bit words.
                tindex = defaultdict(lambda: array(typecode))
                for x in self._index.itervalues():
                    tindex[x >> 32].append(x & 0xffffffff)
                index = self._tidindex = []
                for h, l in sorted(tindex.iteritems()):
                    l = array(typecode, sorted(l))
                    x = self._read_data_header(h << 32 | l[0])
                    index.append((x.tid, h, l))
                logging.info("... index built")
            # Locate the closest indexed tid at or before 'start', then
            # bisect inside its 32-bit bucket for the data record, and let
            # FileIterator begin at that transaction's position (tloc).
            x = bisect(index, (start,)) - 1
            if x >= 0:
                x, h, index = index[x]
                x = self._read_data_header
                h = x(h << 32 | index[bisect(index, Start(x, h, start)) - 1])
                return FileIterator(self._file_name, start, stop, h.tloc)
        return FileIterator(self._file_name, start, stop)
    FileStorage.iterator = iterator
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/protocol.py 0000664 0000000 0000000 00000134457 13276562645 0025272 0 ustar 00root root 0000000 0000000
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import sys
import traceback
from cStringIO import StringIO
from struct import Struct
# The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 1
ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION)
# Avoid memory errors on corrupted data.
MAX_PACKET_SIZE = 0x4000000
# Packet header: message id (!L), packet code (!H), body length (!L).
PACKET_HEADER_FORMAT = Struct('!LHL')
# Bit set in the packet code of every answer packet.
RESPONSE_MASK = 0x8000
class Enum(tuple):
class Item(int):
__slots__ = '_name', '_enum'
def __str__(self):
return self._name
def __repr__(self):
return "" % (self._name, self)
def __eq__(self, other):
if type(other) is self.__class__:
assert other._enum is self._enum
return other is self
return other == int(self)
def __new__(cls, func):
names = func.func_code.co_names
self = tuple.__new__(cls, map(cls.Item, xrange(len(names))))
self._name = func.__name__
for item, name in zip(self, names):
setattr(self, name, item)
item._name = name
item._enum = self
return self
def __repr__(self):
return "" % self._name
# Error codes carried by Error packets. The order of the bare names below is
# significant: Enum numbers them from 0 in declaration order.
@Enum
def ErrorCodes():
    ACK
    NOT_READY
    OID_NOT_FOUND
    TID_NOT_FOUND
    OID_DOES_NOT_EXIST
    PROTOCOL_ERROR
    REPLICATION_ERROR
    CHECKING_ERROR
    BACKEND_NOT_IMPLEMENTED
    NON_READABLE_CELL
    READ_ONLY_ACCESS
    INCOMPLETE_TRANSACTION
# Global state machine of a NEO cluster (wire values are positional).
@Enum
def ClusterStates():
    # The cluster is initially in the RECOVERING state, and it goes back to
    # this state whenever the partition table becomes non-operational again.
    # An election of the primary master always happens, in case of a network
    # cut between a primary master and all other nodes. The primary master:
    # - first recovers its own data by reading it from storage nodes;
    # - waits for the partition table be operational;
    # - automatically switch to VERIFYING if the cluster can be safely started.
    RECOVERING
    # Transient state, used to:
    # - replay the transaction log, in case of unclean shutdown;
    # - and actually truncate the DB if the user asked to do so.
    # Then, the cluster either goes to RUNNING or STARTING_BACKUP state.
    VERIFYING
    # Normal operation. The DB is read-writable by clients.
    RUNNING
    # Transient state to shutdown the whole cluster.
    STOPPING
    # Transient state, during which the master (re)connect to the upstream
    # master.
    STARTING_BACKUP
    # Backup operation. The master is notified of new transactions thanks to
    # invalidations and orders storage nodes to fetch them from upstream.
    # Because cells are synchronized independently, the DB is often
    # inconsistent.
    BACKINGUP
    # Transient state, when the user decides to go back to RUNNING state.
    # The master stays in this state until the DB is consistent again.
    # In case of failure, the cluster will go back to backup mode.
    STOPPING_BACKUP
# Kinds of nodes that can join a NEO cluster (wire values are positional).
@Enum
def NodeTypes():
    MASTER
    STORAGE
    CLIENT
    ADMIN
# Life-cycle states of a node as seen by the cluster.
@Enum
def NodeStates():
    UNKNOWN
    DOWN
    RUNNING
    PENDING
# Replication states of a cell (one partition replica on one storage node).
@Enum
def CellStates():
    # Normal state: cell is writable/readable, and it isn't planned to drop it.
    UP_TO_DATE
    # Write-only cell. Last transactions are missing because storage is/was down
    # for a while, or because it is new for the partition. It usually becomes
    # UP_TO_DATE when replication is done.
    OUT_OF_DATE
    # Same as UP_TO_DATE, except that it will be discarded as soon as another
    # node finishes to replicate it. It means a partition is moved from 1 node
    # to another. It is also discarded immediately if out-of-date.
    FEEDING
    # Not really a state: only used in network packets to tell storages to drop
    # partitions.
    DISCARDED
    # A check revealed that data differs from other replicas. Cell is neither
    # readable nor writable.
    CORRUPTED
# used for logging: single-letter prefix per node state
node_state_prefix_dict = {
    NodeStates.RUNNING: 'R',
    NodeStates.DOWN: 'D',
    NodeStates.UNKNOWN: 'U',
    NodeStates.PENDING: 'P',
}
# used for logging: single-letter prefix per cell state
cell_state_prefix_dict = {
    CellStates.UP_TO_DATE: 'U',
    CellStates.OUT_OF_DATE: 'O',
    CellStates.FEEDING: 'F',
    CellStates.DISCARDED: 'D',
    CellStates.CORRUPTED: 'C',
}
# Other constants.
INVALID_TID = \
INVALID_OID = '\xff' * 8
INVALID_PARTITION = 0xffffffff
ZERO_HASH = '\0' * 20
ZERO_TID = \
ZERO_OID = '\0' * 8
MAX_TID = '\x7f' + '\xff' * 7 # SQLite does not accept numbers above 2^63-1
# High-order byte:
# 7 6 5 4 3 2 1 0
# | | | | +-+-+-+-- reserved (0)
# | +-+-+---------- node type
# +---------------- temporary if negative
# UUID namespaces are required to prevent conflicts when the master generate
# new uuid before it knows uuid of existing storage nodes. So only the high
# order bit is really important and the 31 other bits could be random.
# Extra namespace information and non-randomness of 3 LOB help to read logs.
UUID_NAMESPACES = {
    NodeTypes.STORAGE: 0x00,
    NodeTypes.MASTER: -0x10,
    NodeTypes.CLIENT: -0x20,
    NodeTypes.ADMIN: -0x30,
}
# Human-readable form of a uuid, e.g. 'S1' for storage 1: the letter is the
# first character of the node type owning the namespace, followed by the low
# 24 bits. Implemented as a closure over the reversed namespace mapping.
uuid_str = (lambda ns: lambda uuid:
    ns[uuid >> 24] + str(uuid & 0xffffff) if uuid else str(uuid)
    )({v: str(k)[0] for k, v in UUID_NAMESPACES.iteritems()})
class ProtocolError(Exception):
    """ Base class for protocol errors, close the connection """
class PacketMalformedError(ProtocolError):
    """Close the connection"""
class UnexpectedPacketError(ProtocolError):
    """Close the connection"""
class NotReadyError(ProtocolError):
    """ Just close the connection """
class BackendNotImplemented(Exception):
    """ Method not implemented by backend storage """
class NonReadableCell(Exception):
    """Read-access to a cell that is actually non-readable

    This happens in case of race condition at processing partition table
    updates: client's PT is older or newer than storage's. The latter case is
    possible because the master must validate any end of replication, which
    means that the storage node can't anticipate the PT update (concurrently,
    there may be a first tweaks that moves the replicated cell to another node,
    and a second one that moves it back).
    On such event, the client must retry, preferably another cell.
    """
class Packet(object):
    """
    Base class for any packet definition. The _fmt class attribute must be
    defined for any non-empty packet.
    """
    _ignore_when_closed = False
    _request = None     # paired request class (set at registration)
    _answer = None      # paired answer class or format
    _body = None        # encoded payload (str)
    _code = None        # wire code (set at registration)
    _fmt = None         # PStruct describing the payload
    _id = None          # message id within a connection
    nodelay = True
    poll_thread = False
    def __init__(self, *args):
        # Encode the arguments into the body right away.
        assert self._code is not None, "Packet class not registered"
        if args:
            buf = StringIO()
            self._fmt.encode(buf.write, args)
            self._body = buf.getvalue()
        else:
            self._body = ''
    def decode(self):
        """Decode the body back into a tuple of values."""
        assert self._body is not None
        if self._fmt is None:
            return ()
        buf = StringIO(self._body)
        try:
            return self._fmt.decode(buf.read)
        except ParseError, msg:
            name = self.__class__.__name__
            raise PacketMalformedError("%s fail (%s)" % (name, msg))
    def setContent(self, msg_id, body):
        """ Register the packet content for future decoding """
        self._id = msg_id
        self._body = body
    def setId(self, value):
        self._id = value
    def getId(self):
        assert self._id is not None, "No identifier applied on the packet"
        return self._id
    def encode(self):
        """ Encode a packet as a string to send it over the network """
        content = self._body
        return (PACKET_HEADER_FORMAT.pack(self._id, self._code, len(content)),
            content)
    def __len__(self):
        # Total size on the wire: header + body.
        return PACKET_HEADER_FORMAT.size + len(self._body)
    def __repr__(self):
        return '%s[%r]' % (self.__class__.__name__, self._id)
    def __eq__(self, other):
        """ Compare packets with their code instead of content """
        if other is None:
            return False
        assert isinstance(other, Packet)
        return self._code == other._code
    def isError(self):
        return isinstance(self, Error)
    def isResponse(self):
        return self._code & RESPONSE_MASK == RESPONSE_MASK
    def getAnswerClass(self):
        return self._answer
    def ignoreOnClosedConnection(self):
        """
        Tells if this packet must be ignored when its connection is closed
        when it is handled.
        """
        return self._ignore_when_closed
class ParseError(Exception):
    """
    Wrap a lower-level parsing failure and accumulate the 'path' of the
    packet items traversed while the error propagates up the item tree.
    """
    def __init__(self, item, trace):
        Exception.__init__(self)
        self._trace = trace
        self._items = [item]
    def append(self, item):
        # Called by each enclosing item as the exception bubbles up.
        self._items.append(item)
    def __repr__(self):
        path = '/'.join(item.getName() for item in reversed(self._items))
        return 'at %s:\n%s' % (path, self._trace)
    __str__ = __repr__
# packet parsers
class PItem(object):
    """
    Base class for any packet item, _encode and _decode must be overridden
    by subclasses.
    """
    def __init__(self, name):
        self._name = name
    def __repr__(self):
        return self.__class__.__name__
    def getName(self):
        return self._name
    def _trace(self, method, *args):
        # Run an encode/decode step, wrapping any failure in a ParseError
        # that records the path of items down to the failing one.
        try:
            return method(*args)
        except ParseError, e:
            # trace and forward exception
            e.append(self)
            raise
        except Exception:
            # original exception, encapsulate it
            trace = ''.join(traceback.format_exception(*sys.exc_info())[2:])
            raise ParseError(self, trace)
    def encode(self, writer, items):
        return self._trace(self._encode, writer, items)
    def decode(self, reader):
        return self._trace(self._decode, reader)
    def _encode(self, writer, items):
        raise NotImplementedError, self.__class__.__name__
    def _decode(self, reader):
        raise NotImplementedError, self.__class__.__name__
class PStruct(PItem):
    """A fixed sequence of heterogeneous sub-items, encoded back to back."""
    def __init__(self, name, *items):
        PItem.__init__(self, name)
        self._items = items
    def _encode(self, writer, items):
        assert len(self._items) == len(items), (items, self._items)
        for sub, value in zip(self._items, items):
            sub.encode(writer, value)
    def _decode(self, reader):
        decoded = []
        for sub in self._items:
            decoded.append(sub.decode(reader))
        return tuple(decoded)
class PStructItem(PItem):
    """
    A single value encoded with struct, using the subclass's _fmt format.
    """
    def __init__(self, name):
        PItem.__init__(self, name)
        # Pre-compile the struct once and expose its bound methods.
        struct = Struct(self._fmt)
        self.pack = struct.pack
        self.unpack = struct.unpack
        self.size = struct.size
    def _encode(self, writer, value):
        writer(self.pack(value))
    def _decode(self, reader):
        return self.unpack(reader(self.size))[0]
class PStructItemOrNone(PStructItem):
    # Same as PStructItem, with a reserved byte pattern (_None) meaning None.
    def _encode(self, writer, value):
        return writer(self._None if value is None else self.pack(value))
    def _decode(self, reader):
        value = reader(self.size)
        return None if value == self._None else self.unpack(value)[0]
class POption(PStruct):
    # An optional PStruct, prefixed by a presence byte ('\0' = absent).
    def _encode(self, writer, value):
        if value is None:
            writer('\0')
        else:
            writer('\1')
            PStruct._encode(self, writer, value)
    def _decode(self, reader):
        # index() returns 0 for '\0' (-> None implicitly) and 1 for '\1';
        # any other byte raises ValueError, rejecting corrupted data.
        if '\0\1'.index(reader(1)):
            return PStruct._decode(self, reader)
class PList(PStructItem):
    """
    A list of homogeneous items, prefixed by an unsigned 32-bit count.
    """
    _fmt = '!L'
    def __init__(self, name, item):
        PStructItem.__init__(self, name)
        self._item = item
    def _encode(self, writer, items):
        writer(self.pack(len(items)))
        item = self._item
        for value in items:
            item.encode(writer, value)
    def _decode(self, reader):
        length = self.unpack(reader(self.size))[0]
        item = self._item
        return [item.decode(reader) for _ in xrange(length)]
class PDict(PStructItem):
    """
    A dictionary with custom key and value formats,
    prefixed by an unsigned 32-bit entry count.
    """
    _fmt = '!L'
    def __init__(self, name, key, value):
        PStructItem.__init__(self, name)
        self._key = key
        self._value = value
    def _encode(self, writer, item):
        assert isinstance(item , dict), (type(item), item)
        writer(self.pack(len(item)))
        key, value = self._key, self._value
        for k, v in item.iteritems():
            key.encode(writer, k)
            value.encode(writer, v)
    def _decode(self, reader):
        length = self.unpack(reader(self.size))[0]
        key, value = self._key, self._value
        new_dict = {}
        for _ in xrange(length):
            k = key.decode(reader)
            v = value.decode(reader)
            new_dict[k] = v
        return new_dict
class PEnum(PStructItem):
    """
    Encapsulate an enumeration value, sent as a signed 32-bit integer
    (-1 encodes None).
    """
    _fmt = '!l'
    def __init__(self, name, enum):
        PStructItem.__init__(self, name)
        self._enum = enum
    def _encode(self, writer, item):
        if item is None:
            item = -1
        writer(self.pack(item))
    def _decode(self, reader):
        code = self.unpack(reader(self.size))[0]
        if code == -1:
            return None
        try:
            # Enum is a tuple, so this is a positional lookup.
            return self._enum[code]
        except KeyError:
            enum = self._enum.__class__.__name__
            raise ValueError, 'Invalid code for %s enum: %r' % (enum, code)
class PString(PStructItem):
    """
    A variable-length string, prefixed by an unsigned 32-bit byte count.
    """
    _fmt = '!L'
    def _encode(self, writer, value):
        writer(self.pack(len(value)))
        writer(value)
    def _decode(self, reader):
        count = self.unpack(reader(self.size))[0]
        return reader(count)
class PAddress(PString):
    """
    An host address (IPv4/IPv6): a host string followed by a 16-bit port.
    An empty host string encodes a missing address (decoded as None).
    """
    def __init__(self, name):
        PString.__init__(self, name)
        self._port = Struct('!H')
    def _encode(self, writer, address):
        if address:
            host, port = address
            PString._encode(self, writer, host)
            writer(self._port.pack(port))
        else:
            PString._encode(self, writer, '')
    def _decode(self, reader):
        host = PString._decode(self, reader)
        if host:
            p = self._port
            return host, p.unpack(reader(p.size))[0]
class PBoolean(PStructItem):
    """
    A boolean value, encoded as a single byte
    """
    _fmt = '!?'
class PNumber(PStructItem):
    """
    A integer number (4-bytes length)
    """
    _fmt = '!L'
class PIndex(PStructItem):
    """
    A big integer to defined indexes in a huge list.
    """
    _fmt = '!Q'
class PPTID(PStructItemOrNone):
    """
    A None value means an invalid PTID
    """
    _fmt = '!Q'
    _None = Struct(_fmt).pack(0)
class PChecksum(PItem):
    """
    A hash (SHA1), always exactly 20 raw bytes.
    """
    def _encode(self, writer, checksum):
        assert len(checksum) == 20, (len(checksum), checksum)
        writer(checksum)
    def _decode(self, reader):
        return reader(20)
class PSignedNull(PStructItemOrNone):
    # Signed 32-bit integer where 0 encodes None.
    _fmt = '!l'
    _None = Struct(_fmt).pack(0)
class PUUID(PSignedNull):
    """
    An UUID (node identifier, 4-bytes signed integer)
    """
class PTID(PItem):
    """
    A transaction identifier: 8 raw bytes, INVALID_TID encoding None.
    """
    def _encode(self, writer, tid):
        if tid is None:
            tid = INVALID_TID
        assert len(tid) == 8, (len(tid), tid)
        writer(tid)
    def _decode(self, reader):
        tid = reader(8)
        if tid == INVALID_TID:
            tid = None
        return tid
# same definition, for now
POID = PTID
class PFloat(PStructItemOrNone):
    """
    A float number (8-bytes length)
    """
    _fmt = '!d'
    _None = '\xff' * 8
# common definitions, shared by several packet formats below
PFEmpty = PStruct('no_content')
PFNodeType = PEnum('type', NodeTypes)
PFNodeState = PEnum('state', NodeStates)
PFCellState = PEnum('state', CellStates)
# One entry per node: (type, address, uuid, state, id_timestamp).
PFNodeList = PList('node_list',
    PStruct('node',
        PFNodeType,
        PAddress('address'),
        PUUID('uuid'),
        PFNodeState,
        PFloat('id_timestamp'),
    ),
)
PFCellList = PList('cell_list',
    PStruct('cell',
        PUUID('uuid'),
        PFCellState,
    ),
)
# Partition table rows: (partition offset, cells).
PFRowList = PList('row_list',
    PStruct('row',
        PNumber('offset'),
        PFCellList,
    ),
)
PFHistoryList = PList('history_list',
    PStruct('history_entry',
        PTID('serial'),
        PNumber('size'),
    ),
)
PFUUIDList = PList('uuid_list',
    PUUID('uuid'),
)
PFTidList = PList('tid_list',
    PTID('tid'),
)
PFOidList = PList('oid_list',
    POID('oid'),
)
# packets definition
# Direction abbreviations used in the docstrings below: PM = primary master,
# SM = secondary master, S = storage, C = client, A = admin, CTL = neoctl.
class Error(Packet):
    """
    Error is a special type of message, because this can be sent against
    any other message, even if such a message does not expect a reply
    usually. Any -> Any.
    """
    _fmt = PStruct('error',
        PNumber('code'),
        PString('message'),
    )
class Ping(Packet):
    """
    Check if a peer is still alive. Any -> Any.
    """
    _answer = PFEmpty
class CloseClient(Packet):
    """
    Tell peer it can close the connection if it has finished with us. Any -> Any
    """
class RequestIdentification(Packet):
    """
    Request a node identification. This must be the first packet for any
    connection. Any -> Any.
    """
    poll_thread = True
    _fmt = PStruct('request_identification',
        PFNodeType,
        PUUID('uuid'),
        PAddress('address'),
        PString('name'),
        PFloat('id_timestamp'),
    )
    _answer = PStruct('accept_identification',
        PFNodeType,
        PUUID('my_uuid'),
        PNumber('num_partitions'),
        PNumber('num_replicas'),
        PUUID('your_uuid'),
    )
class PrimaryMaster(Packet):
    """
    Ask current primary master's uuid. CTL -> A.
    """
    _answer = PStruct('answer_primary',
        PUUID('primary_uuid'),
    )
class NotPrimaryMaster(Packet):
    """
    Send list of known master nodes. SM -> Any.
    """
    _fmt = PStruct('not_primary_master',
        PSignedNull('primary'),
        PList('known_master_list',
            PAddress('address'),
        ),
    )
class Recovery(Packet):
    """
    Ask all data needed by master to recover. PM -> S, S -> PM.
    """
    _answer = PStruct('answer_recovery',
        PPTID('ptid'),
        PTID('backup_tid'),
        PTID('truncate_tid'),
    )
class LastIDs(Packet):
    """
    Ask the last OID/TID so that a master can initialize its TransactionManager.
    PM -> S, S -> PM.
    """
    _answer = PStruct('answer_last_ids',
        POID('last_oid'),
        PTID('last_tid'),
    )
class PartitionTable(Packet):
    """
    Ask the full partition table. PM -> S.
    Answer rows in a partition table. S -> PM.
    """
    _answer = PStruct('answer_partition_table',
        PPTID('ptid'),
        PFRowList,
    )
class NotifyPartitionTable(Packet):
    """
    Send rows in a partition table to update other nodes. PM -> S, C.
    """
    _fmt = PStruct('send_partition_table',
        PPTID('ptid'),
        PFRowList,
    )
class PartitionChanges(Packet):
    """
    Notify a subset of a partition table. This is used to notify changes.
    PM -> S, C.
    """
    _fmt = PStruct('notify_partition_changes',
        PPTID('ptid'),
        PList('cell_list',
            PStruct('cell',
                PNumber('offset'),
                PUUID('uuid'),
                PFCellState,
            ),
        ),
    )
class StartOperation(Packet):
    """
    Tell a storage nodes to start an operation. Until a storage node receives
    this message, it must not serve client nodes. PM -> S.
    """
    _fmt = PStruct('start_operation',
        # XXX: Is this boolean needed ? Maybe this
        # can be deduced from cluster state.
        PBoolean('backup'),
    )
class StopOperation(Packet):
    """
    Tell a storage node to stop an operation. Once a storage node receives
    this message, it must not serve client nodes. PM -> S.
    """
class UnfinishedTransactions(Packet):
    """
    Ask unfinished transactions S -> PM.
    Answer unfinished transactions PM -> S.
    """
    _fmt = PStruct('ask_unfinished_transactions',
        PList('row_list',
            PNumber('offset'),
        ),
    )
    _answer = PStruct('answer_unfinished_transactions',
        PTID('max_tid'),
        PList('tid_list',
            PTID('unfinished_tid'),
        ),
    )
class LockedTransactions(Packet):
    """
    Ask locked transactions PM -> S.
    Answer locked transactions S -> PM.
    """
    _answer = PStruct('answer_locked_transactions',
        PDict('tid_dict',
            PTID('ttid'),
            PTID('tid'),
        ),
    )
class FinalTID(Packet):
    """
    Return final tid if ttid has been committed. * -> S. C -> PM.
    """
    _fmt = PStruct('final_tid',
        PTID('ttid'),
    )
    _answer = PStruct('final_tid',
        PTID('tid'),
    )
class ValidateTransaction(Packet):
    """
    Commit a transaction. PM -> S.
    """
    _fmt = PStruct('validate_transaction',
        PTID('ttid'),
        PTID('tid'),
    )
class BeginTransaction(Packet):
    """
    Ask to begin a new transaction. C -> PM.
    Answer when a transaction begin, give a TID if necessary. PM -> C.
    """
    _fmt = PStruct('ask_begin_transaction',
        PTID('tid'),
    )
    _answer = PStruct('answer_begin_transaction',
        PTID('tid'),
    )
class FailedVote(Packet):
    """
    Report storage nodes for which vote failed. C -> M
    True is returned if it's still possible to finish the transaction.
    """
    _fmt = PStruct('failed_vote',
        PTID('tid'),
        PFUUIDList,
    )
    # The answer is an Error packet (ACK on success).
    _answer = Error
class FinishTransaction(Packet):
    """
    Finish a transaction. C -> PM.
    Answer when a transaction is finished. PM -> C.
    """
    poll_thread = True
    _fmt = PStruct('ask_finish_transaction',
        PTID('tid'),
        PFOidList,
        PList('checked_list',
            POID('oid'),
        ),
    )
    _answer = PStruct('answer_information_locked',
        PTID('ttid'),
        PTID('tid'),
    )
class NotifyTransactionFinished(Packet):
    """
    Notify that a transaction blocking a replication is now finished
    M -> S
    """
    _fmt = PStruct('notify_transaction_finished',
        PTID('ttid'),
        PTID('max_tid'),
    )
class LockInformation(Packet):
    """
    Lock information on a transaction. PM -> S.
    Notify information on a transaction locked. S -> PM.
    """
    _fmt = PStruct('ask_lock_informations',
        PTID('ttid'),
        PTID('tid'),
    )
    _answer = PStruct('answer_information_locked',
        PTID('ttid'),
    )
class InvalidateObjects(Packet):
    """
    Invalidate objects. PM -> C.
    """
    # NOTE(review): the struct name 'ask_finish_transaction' looks like a
    # copy-paste from FinishTransaction; it is only used in parse-error
    # messages, so it is kept as-is for wire/log compatibility.
    _fmt = PStruct('ask_finish_transaction',
        PTID('tid'),
        PFOidList,
    )
class UnlockInformation(Packet):
    """
    Unlock information on a transaction. PM -> S.
    """
    _fmt = PStruct('notify_unlock_information',
        PTID('ttid'),
    )
class GenerateOIDs(Packet):
    """
    Ask new object IDs. C -> PM.
    Answer new object IDs. PM -> C.
    """
    _fmt = PStruct('ask_new_oids',
        PNumber('num_oids'),
    )
    _answer = PStruct('answer_new_oids',
        PFOidList,
    )
class Deadlock(Packet):
    """
    Ask master to generate a new TTID that will be used by the client
    to rebase a transaction. S -> PM -> C
    """
    _fmt = PStruct('notify_deadlock',
        PTID('ttid'),
        PTID('locking_tid'),
    )
class RebaseTransaction(Packet):
    """
    Rebase transaction. C -> S.
    """
    _fmt = PStruct('ask_rebase_transaction',
        PTID('ttid'),
        PTID('locking_tid'),
    )
    _answer = PStruct('answer_rebase_transaction',
        PFOidList,
    )
class RebaseObject(Packet):
    """
    Rebase object. C -> S.

    XXX: It is a request packet to simplify the implementation. For more
         efficiency, this should be turned into a notification, and the
         RebaseTransaction should answered once all objects are rebased
         (so that the client can still wait on something).
    """
    _fmt = PStruct('ask_rebase_object',
        PTID('ttid'),
        PTID('oid'),
    )
    _answer = PStruct('answer_rebase_object',
        POption('conflict',
            PTID('serial'),
            PTID('conflict_serial'),
            POption('data',
                PBoolean('compression'),
                PChecksum('checksum'),
                PString('data'),
            ),
        )
    )
class StoreObject(Packet):
    """
    Ask to store an object. Send an OID, an original serial, a current
    transaction ID, and data. C -> S.
    As for IStorage, 'serial' is ZERO_TID for new objects.
    """
    _fmt = PStruct('ask_store_object',
        POID('oid'),
        PTID('serial'),
        PBoolean('compression'),
        PChecksum('checksum'),
        PString('data'),
        PTID('data_serial'),
        PTID('tid'),
    )
    # 'conflict' is None when the store succeeded.
    _answer = PStruct('answer_store_object',
        PTID('conflict'),
    )
class AbortTransaction(Packet):
    """
    Abort a transaction. C -> S and C -> PM -> S.
    """
    _fmt = PStruct('abort_transaction',
        PTID('tid'),
        PFUUIDList, # unused for * -> S
    )
class StoreTransaction(Packet):
    """
    Ask to store a transaction. C -> S.
    Answer if transaction has been stored. S -> C.
    """
    _fmt = PStruct('ask_store_transaction',
        PTID('tid'),
        PString('user'),
        PString('description'),
        PString('extension'),
        PFOidList,
    )
    _answer = PFEmpty
class VoteTransaction(Packet):
    """
    Ask to store a transaction. C -> S.
    Answer if transaction has been stored. S -> C.
    """
    _fmt = PStruct('ask_vote_transaction',
        PTID('tid'),
    )
    _answer = PFEmpty
class GetObject(Packet):
    """
    Ask a stored object by its OID and a serial or a TID if given. If a serial
    is specified, the specified revision of an object will be returned. If
    a TID is specified, an object right before the TID will be returned. C -> S.
    Answer the requested object. S -> C.
    """
    _fmt = PStruct('ask_object',
        POID('oid'),
        PTID('serial'),
        PTID('tid'),
    )
    _answer = PStruct('answer_object',
        POID('oid'),
        PTID('serial_start'),
        PTID('serial_end'),
        PBoolean('compression'),
        PChecksum('checksum'),
        PString('data'),
        PTID('data_serial'),
    )
class TIDList(Packet):
    """
    Ask for TIDs between a range of offsets. The order of TIDs is descending,
    and the range is [first, last). C -> S.
    Answer the requested TIDs. S -> C.
    """
    _fmt = PStruct('ask_tids',
        PIndex('first'),
        PIndex('last'),
        PNumber('partition'),
    )
    _answer = PStruct('answer_tids',
        PFTidList,
    )
class TIDListFrom(Packet):
    """
    Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
    C -> S.
    Answer the requested TIDs. S -> C
    """
    _fmt = PStruct('tid_list_from',
        PTID('min_tid'),
        PTID('max_tid'),
        PNumber('length'),
        PNumber('partition'),
    )
    _answer = PStruct('answer_tids',
        PFTidList,
    )
class TransactionInformation(Packet):
    """
    Ask information about a transaction. Any -> S.
    Answer information (user, description) about a transaction. S -> Any.
    """
    _fmt = PStruct('ask_transaction_information',
        PTID('tid'),
    )
    _answer = PStruct('answer_transaction_information',
        PTID('tid'),
        PString('user'),
        PString('description'),
        PString('extension'),
        PBoolean('packed'),
        PFOidList,
    )
class ObjectHistory(Packet):
    """
    Ask history information for a given object. The order of serials is
    descending, and the range is [first, last]. C -> S.
    Answer history information (serial, size) for an object. S -> C.
    """
    _fmt = PStruct('ask_object_history',
        POID('oid'),
        PIndex('first'),
        PIndex('last'),
    )
    _answer = PStruct('answer_object_history',
        POID('oid'),
        PFHistoryList,
    )
class PartitionList(Packet):
    """
    All the following messages are for neoctl to admin node
    Ask information about partition
    Answer information about partition
    """
    _fmt = PStruct('ask_partition_list',
        PNumber('min_offset'),
        PNumber('max_offset'),
        PUUID('uuid'),
    )
    _answer = PStruct('answer_partition_list',
        PPTID('ptid'),
        PFRowList,
    )
class NodeList(Packet):
    """
    Ask information about nodes
    Answer information about nodes
    """
    _fmt = PStruct('ask_node_list',
        PFNodeType,
    )
    _answer = PStruct('answer_node_list',
        PFNodeList,
    )
class SetNodeState(Packet):
    """
    Set the node state
    """
    _fmt = PStruct('set_node_state',
        PUUID('uuid'),
        PFNodeState,
    )
    # The answer is an Error packet (ACK on success).
    _answer = Error
class AddPendingNodes(Packet):
    """
    Ask the primary to include some pending node in the partition table
    """
    _fmt = PStruct('add_pending_nodes',
        PFUUIDList,
    )
    # The answer is an Error packet (ACK on success).
    _answer = Error
class TweakPartitionTable(Packet):
"""
Ask the primary to optimize the partition table. A -> PM.
"""
_fmt = PStruct('tweak_partition_table',
PFUUIDList,
)
_answer = Error
class NotifyNodeInformation(Packet):
"""
Notify information about one or more nodes. PM -> Any.
"""
_fmt = PStruct('notify_node_informations',
PFloat('id_timestamp'),
PFNodeList,
)
class SetClusterState(Packet):
"""
Set the cluster state
"""
_fmt = PStruct('set_cluster_state',
PEnum('state', ClusterStates),
)
_answer = Error
class Repair(Packet):
"""
Ask storage nodes to repair their databases. ctl -> A -> M
"""
_flags = map(PBoolean, ('dry_run',
# 'prune_orphan' (commented because it's the only option for the moment)
))
_fmt = PStruct('repair',
PFUUIDList,
*_flags)
_answer = Error
class RepairOne(Packet):
"""
See Repair. M -> S
"""
_fmt = PStruct('repair', *Repair._flags)
class ClusterInformation(Packet):
"""
Notify information about the cluster
"""
_fmt = PStruct('notify_cluster_information',
PEnum('state', ClusterStates),
)
class ClusterState(Packet):
"""
Ask state of the cluster
Answer state of the cluster
"""
_answer = PStruct('answer_cluster_state',
PEnum('state', ClusterStates),
)
class ObjectUndoSerial(Packet):
"""
Ask storage the serial where object data is when undoing given transaction,
for a list of OIDs.
C -> S
Answer serials at which object data is when undoing a given transaction.
object_tid_dict has the following format:
key: oid
value: 3-tuple
current_serial (TID)
The latest serial visible to the undoing transaction.
undo_serial (TID)
Where undone data is (tid at which data is before given undo).
is_current (bool)
If current_serial's data is current on storage.
S -> C
"""
_fmt = PStruct('ask_undo_transaction',
PTID('tid'),
PTID('ltid'),
PTID('undone_tid'),
PFOidList,
)
_answer = PStruct('answer_undo_transaction',
PDict('object_tid_dict',
POID('oid'),
PStruct('object_tid_value',
PTID('current_serial'),
PTID('undo_serial'),
PBoolean('is_current'),
),
),
)
class CheckCurrentSerial(Packet):
"""
Verifies if given serial is current for object oid in the database, and
take a write lock on it (so that this state is not altered until
transaction ends).
Answer to AskCheckCurrentSerial.
Same structure as AnswerStoreObject, to handle the same way, except there
is nothing to invalidate in any client's cache.
"""
_fmt = PStruct('ask_check_current_serial',
PTID('tid'),
POID('oid'),
PTID('serial'),
)
_answer = StoreObject._answer
class Pack(Packet):
"""
Request a pack at given TID.
C -> M
M -> S
Inform that packing it over.
S -> M
M -> C
"""
_fmt = PStruct('ask_pack',
PTID('tid'),
)
_answer = PStruct('answer_pack',
PBoolean('status'),
)
class CheckReplicas(Packet):
"""
ctl -> A
A -> M
"""
_fmt = PStruct('check_replicas',
PDict('partition_dict',
PNumber('partition'),
PUUID('source'),
),
PTID('min_tid'),
PTID('max_tid'),
)
_answer = Error
class CheckPartition(Packet):
"""
M -> S
"""
_fmt = PStruct('check_partition',
PNumber('partition'),
PStruct('source',
PString('upstream_name'),
PAddress('address'),
),
PTID('min_tid'),
PTID('max_tid'),
)
class CheckTIDRange(Packet):
"""
Ask some stats about a range of transactions.
Used to know if there are differences between a replicating node and
reference node.
S -> S
Stats about a range of transactions.
Used to know if there are differences between a replicating node and
reference node.
S -> S
"""
_fmt = PStruct('ask_check_tid_range',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
)
_answer = PStruct('answer_check_tid_range',
PNumber('count'),
PChecksum('checksum'),
PTID('max_tid'),
)
class CheckSerialRange(Packet):
"""
Ask some stats about a range of object history.
Used to know if there are differences between a replicating node and
reference node.
S -> S
Stats about a range of object history.
Used to know if there are differences between a replicating node and
reference node.
S -> S
"""
_fmt = PStruct('ask_check_serial_range',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
POID('min_oid'),
)
_answer = PStruct('answer_check_serial_range',
PNumber('count'),
PChecksum('tid_checksum'),
PTID('max_tid'),
PChecksum('oid_checksum'),
POID('max_oid'),
)
class PartitionCorrupted(Packet):
"""
S -> M
"""
_fmt = PStruct('partition_corrupted',
PNumber('partition'),
PList('cell_list',
PUUID('uuid'),
),
)
class LastTransaction(Packet):
"""
Ask last committed TID.
C -> M
Answer last committed TID.
M -> C
"""
poll_thread = True
_answer = PStruct('answer_last_transaction',
PTID('tid'),
)
class NotifyReady(Packet):
"""
Notify that node is ready to serve requests.
S -> M
"""
pass
# replication
class FetchTransactions(Packet):
"""
S -> S
"""
_fmt = PStruct('ask_transaction_list',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
PFTidList, # already known transactions
)
_answer = PStruct('answer_transaction_list',
PTID('pack_tid'),
PTID('next_tid'),
PFTidList, # transactions to delete
)
class AddTransaction(Packet):
"""
S -> S
"""
nodelay = False
_fmt = PStruct('add_transaction',
PTID('tid'),
PString('user'),
PString('description'),
PString('extension'),
PBoolean('packed'),
PTID('ttid'),
PFOidList,
)
class FetchObjects(Packet):
"""
S -> S
"""
_fmt = PStruct('ask_object_list',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
POID('min_oid'),
PDict('object_dict', # already known objects
PTID('serial'),
PFOidList,
),
)
_answer = PStruct('answer_object_list',
PTID('pack_tid'),
PTID('next_tid'),
POID('next_oid'),
PDict('object_dict', # objects to delete
PTID('serial'),
PFOidList,
),
)
class AddObject(Packet):
"""
S -> S
"""
nodelay = False
_fmt = PStruct('add_object',
POID('oid'),
PTID('serial'),
PBoolean('compression'),
PChecksum('checksum'),
PString('data'),
PTID('data_serial'),
)
class Replicate(Packet):
"""
Notify a storage node to replicate partitions up to given 'tid'
and from given sources.
M -> S
- upstream_name: replicate from an upstream cluster
- address: address of the source storage node, or None if there's no new
data up to 'tid' for the given partition
"""
_fmt = PStruct('replicate',
PTID('tid'),
PString('upstream_name'),
PDict('source_dict',
PNumber('partition'),
PAddress('address'),
)
)
class ReplicationDone(Packet):
"""
Notify the master node that a partition has been successfully replicated
from a storage to another.
S -> M
"""
_fmt = PStruct('notify_replication_done',
PNumber('offset'),
PTID('tid'),
)
class Truncate(Packet):
"""
Request DB to be truncated. Also used to leave backup mode.
"""
_fmt = PStruct('truncate',
PTID('tid'),
)
_answer = Error
# Maps packet code -> packet class; filled by register() below.
StaticRegistry = {}

def register(request, ignore_when_closed=None):
    """ Register a packet in the packet registry """
    # Codes are allocated sequentially, in registration order; answer
    # packets carry the same code with RESPONSE_MASK set.
    code = len(StaticRegistry)
    if request is Error:
        # Error is an answer-only packet: give it a response code directly.
        code |= RESPONSE_MASK
    # register the request
    StaticRegistry[code] = request
    if request is None:
        return # None registered only to skip a code number (for compatibility)
    request._code = code
    answer = request._answer
    if ignore_when_closed is None:
        # By default, on a closed connection:
        # - request: ignore
        # - answer: keep
        # - notification: keep
        ignore_when_closed = answer is not None
    request._ignore_when_closed = ignore_when_closed
    if answer in (Error, None):
        # No dedicated answer class to build: return only the request class.
        return request
    # build a class for the answer
    answer = type('Answer%s' % (request.__name__, ), (Packet, ), {})
    answer._fmt = request._answer
    answer.poll_thread = request.poll_thread
    # compute the answer code
    code = code | RESPONSE_MASK
    answer._request = request
    assert answer._code is None, "Answer of %s is already used" % (request, )
    answer._code = code
    # Link request and answer both ways, then register the answer too.
    request._answer = answer
    # and register the answer packet
    assert code not in StaticRegistry, "Duplicate response packet code"
    StaticRegistry[code] = answer
    return (request, answer)
class Packets(dict):
    """
    Packet registry that checks packet code uniqueness and provides an index
    """
    def __metaclass__(name, base, d):
        # Python 2 looks up '__metaclass__' in the class body and calls it
        # with (name, bases, dict) instead of using a real metaclass.
        for k, v in d.iteritems():
            if isinstance(v, type) and issubclass(v, Packet):
                # Derive the handler method name from the attribute name,
                # e.g. AskStoreObject -> askStoreObject.
                v.handler_method_name = k[0].lower() + k[1:]
        # this builds a "singleton": the class statement binds 'Packets'
        # directly to an instance initialized from StaticRegistry.
        return type('PacketRegistry', base, d)(StaticRegistry)
# notifications
Error = register(
Error)
RequestIdentification, AcceptIdentification = register(
RequestIdentification, ignore_when_closed=True)
Ping, Pong = register(
Ping)
CloseClient = register(
CloseClient)
AskPrimary, AnswerPrimary = register(
PrimaryMaster)
NotPrimaryMaster = register(
NotPrimaryMaster)
NotifyNodeInformation = register(
NotifyNodeInformation)
AskRecovery, AnswerRecovery = register(
Recovery)
AskLastIDs, AnswerLastIDs = register(
LastIDs)
AskPartitionTable, AnswerPartitionTable = register(
PartitionTable)
SendPartitionTable = register(
NotifyPartitionTable)
NotifyPartitionChanges = register(
PartitionChanges)
StartOperation = register(
StartOperation)
StopOperation = register(
StopOperation)
AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
UnfinishedTransactions)
AskLockedTransactions, AnswerLockedTransactions = register(
LockedTransactions)
AskFinalTID, AnswerFinalTID = register(
FinalTID)
ValidateTransaction = register(
ValidateTransaction)
AskBeginTransaction, AnswerBeginTransaction = register(
BeginTransaction)
FailedVote = register(
FailedVote)
AskFinishTransaction, AnswerTransactionFinished = register(
FinishTransaction, ignore_when_closed=False)
AskLockInformation, AnswerInformationLocked = register(
LockInformation, ignore_when_closed=False)
InvalidateObjects = register(
InvalidateObjects)
NotifyUnlockInformation = register(
UnlockInformation)
AskNewOIDs, AnswerNewOIDs = register(
GenerateOIDs)
NotifyDeadlock = register(
Deadlock)
AskRebaseTransaction, AnswerRebaseTransaction = register(
RebaseTransaction)
AskRebaseObject, AnswerRebaseObject = register(
RebaseObject)
AskStoreObject, AnswerStoreObject = register(
StoreObject)
AbortTransaction = register(
AbortTransaction)
AskStoreTransaction, AnswerStoreTransaction = register(
StoreTransaction)
AskVoteTransaction, AnswerVoteTransaction = register(
VoteTransaction)
AskObject, AnswerObject = register(
GetObject)
AskTIDs, AnswerTIDs = register(
TIDList)
AskTransactionInformation, AnswerTransactionInformation = register(
TransactionInformation)
AskObjectHistory, AnswerObjectHistory = register(
ObjectHistory)
AskPartitionList, AnswerPartitionList = register(
PartitionList)
AskNodeList, AnswerNodeList = register(
NodeList)
SetNodeState = register(
SetNodeState, ignore_when_closed=False)
AddPendingNodes = register(
AddPendingNodes, ignore_when_closed=False)
TweakPartitionTable = register(
TweakPartitionTable, ignore_when_closed=False)
SetClusterState = register(
SetClusterState, ignore_when_closed=False)
Repair = register(
Repair)
NotifyRepair = register(
RepairOne)
NotifyClusterInformation = register(
ClusterInformation)
AskClusterState, AnswerClusterState = register(
ClusterState)
AskObjectUndoSerial, AnswerObjectUndoSerial = register(
ObjectUndoSerial)
AskTIDsFrom, AnswerTIDsFrom = register(
TIDListFrom)
AskPack, AnswerPack = register(
Pack, ignore_when_closed=False)
CheckReplicas = register(
CheckReplicas)
CheckPartition = register(
CheckPartition)
AskCheckTIDRange, AnswerCheckTIDRange = register(
CheckTIDRange)
AskCheckSerialRange, AnswerCheckSerialRange = register(
CheckSerialRange)
NotifyPartitionCorrupted = register(
PartitionCorrupted)
NotifyReady = register(
NotifyReady)
AskLastTransaction, AnswerLastTransaction = register(
LastTransaction)
AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
CheckCurrentSerial)
NotifyTransactionFinished = register(
NotifyTransactionFinished)
Replicate = register(
Replicate)
NotifyReplicationDone = register(
ReplicationDone)
AskFetchTransactions, AnswerFetchTransactions = register(
FetchTransactions)
AskFetchObjects, AnswerFetchObjects = register(
FetchObjects)
AddTransaction = register(
AddTransaction)
AddObject = register(
AddObject)
Truncate = register(
Truncate)
def Errors():
    """Build the singleton exposing one factory method per error code.

    For each code in ErrorCodes, an attribute named after the code
    (e.g. an ERROR_FOO code yields an ErrorFoo method) returns an Error
    packet carrying that code; the instance itself maps numeric codes to
    handler method names.
    """
    registry_dict = {}
    handler_method_name_dict = {}
    def register_error(code):
        # Bind 'code' via a dedicated closure, avoiding the late-binding
        # pitfall of referencing the loop variable from the lambda.
        return lambda self, message='': Error(code, message)
    for error in ErrorCodes:
        name = ''.join(part.capitalize() for part in str(error).split('_'))
        registry_dict[name] = register_error(int(error))
        handler_method_name_dict[int(error)] = name[0].lower() + name[1:]
    return type('ErrorRegistry', (dict,),
                registry_dict)(handler_method_name_dict)

# The factory is immediately replaced by its unique instance.
Errors = Errors()
# Common helpers between the 'neo' module and 'neolog'.
from datetime import datetime
from operator import itemgetter
def formatNodeList(node_list, prefix='', _sort_key=itemgetter(2)):
    """Render a list of node tuples as aligned text columns.

    node_list items are (node_type, address, uuid, state, id_timestamp)
    tuples; the list is sorted in place by uuid (index 2). Returns a list
    of formatted lines, or () when node_list is empty.
    """
    if node_list:
        node_list.sort(key=_sort_key)
        node_list = [(
            uuid_str(uuid), str(node_type),
            # bracket IPv6 hosts ([host]:port), plain host:port otherwise
            ('[%s]:%s' if ':' in addr[0] else '%s:%s')
            % addr if addr else '', str(state),
            str(id_timestamp and datetime.utcfromtimestamp(id_timestamp)))
            for node_type, addr, uuid, state, id_timestamp in node_list]
        # One '%-Ns | ' column per field, sized to the widest value;
        # the last column is left unpadded.
        t = ''.join('%%-%us | ' % max(len(x[i]) for x in node_list)
                    for i in xrange(len(node_list[0]) - 1))
        return map((prefix + t + '%s').__mod__, node_list)
    return ()

# Pretty-printing hooks used by the 'neolog' tool.
NotifyNodeInformation._neolog = staticmethod(lambda timestamp, node_list:
    ((timestamp,), formatNodeList(node_list, ' ! ')))

Error._neolog = staticmethod(lambda *args: ((), ("%s (%s)" % args,)))
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/pt.py 0000664 0000000 0000000 00000026744 13276562645 0024053 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import math
from . import logging, protocol
from .locking import Lock
from .protocol import uuid_str, CellStates
from .util import u64
class PartitionTableException(Exception):
    """
    Base class for partition table exceptions
    (raised e.g. when a cell is set for a node in an invalid state)
    """
class Cell(object):
    """This class represents a cell in a partition table.

    A cell is the assignment of one partition to one storage node,
    together with its replication state (see CellStates).
    """

    def __init__(self, node, state = CellStates.UP_TO_DATE):
        self.node = node
        self.state = state

    def __repr__(self):
        # NOTE(review): the original format string was destroyed by text
        # extraction ('"" % (...)' would raise TypeError); restored to the
        # obvious debugging representation.
        return "<Cell(uuid=%s, address=%s, state=%s)>" % (
            uuid_str(self.getUUID()),
            self.getAddress(),
            self.getState(),
        )

    def getState(self):
        return self.state

    def setState(self, state):
        # DISCARDED cells are removed from the table, never stored.
        assert state != CellStates.DISCARDED
        self.state = state

    def isUpToDate(self):
        return self.state == CellStates.UP_TO_DATE

    def isOutOfDate(self):
        return self.state == CellStates.OUT_OF_DATE

    def isFeeding(self):
        return self.state == CellStates.FEEDING

    def isCorrupted(self):
        return self.state == CellStates.CORRUPTED

    def isReadable(self):
        # Only up-to-date and feeding cells can serve reads.
        return self.state == CellStates.UP_TO_DATE or \
               self.state == CellStates.FEEDING

    def getNode(self):
        return self.node

    def getNodeState(self):
        """This is a short hand."""
        return self.node.getState()

    def getUUID(self):
        return self.node.getUUID()

    def getAddress(self):
        return self.node.getAddress()
class PartitionTable(object):
    """This class manages a partition table.

    partition_list maps each partition offset to its row (list of Cell);
    count_dict maps each node to the number of non-feeding cells it holds.
    """

    # Flushing logs whenever a cell becomes out-of-date would flood them.
    _first_outdated_message = \
        'a cell became non-readable whereas all cells were readable'

    def __init__(self, num_partitions, num_replicas):
        self._id = None          # current PTID, None until loaded
        self.np = num_partitions
        self.nr = num_replicas
        self.num_filled_rows = 0
        # Note: don't use [[]] * num_partition construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for _ in xrange(num_partitions)]
        self.count_dict = {}

    def getID(self):
        return self._id

    def getPartitions(self):
        return self.np

    def getReplicas(self):
        return self.nr

    def clear(self):
        """Forget an existing partition table."""
        self._id = None
        self.num_filled_rows = 0
        # Note: don't use [[]] * self.np construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for _ in xrange(self.np)]
        self.count_dict.clear()

    def getAssignedPartitionList(self, uuid):
        """ Return the partition assigned to the specified UUID """
        assigned_partitions = []
        for offset in xrange(self.np):
            for cell in self.getCellList(offset, readable=True):
                if cell.getUUID() == uuid:
                    assigned_partitions.append(offset)
                    break
        return assigned_partitions

    def hasOffset(self, offset):
        # True when the given partition has at least one cell.
        try:
            return len(self.partition_list[offset]) > 0
        except IndexError:
            return False

    def getNodeSet(self, readable=False):
        # Set of nodes owning at least one (readable) cell.
        if readable:
            return {x.getNode() for row in self.partition_list for x in row
                      if x.isReadable()}
        return {x.getNode() for row in self.partition_list for x in row}

    def getConnectedNodeList(self):
        return [node for node in self.getNodeSet() if node.isConnected()]

    def getCellList(self, offset, readable=False):
        if readable:
            return filter(Cell.isReadable, self.partition_list[offset])
        return list(self.partition_list[offset])

    def getPartition(self, oid_or_tid):
        # Partitions are assigned by hashing the 64-bit id modulo np.
        return u64(oid_or_tid) % self.getPartitions()

    def getOutdatedOffsetListFor(self, uuid):
        # Partitions for which the given node holds an OUT_OF_DATE cell.
        return [
            offset for offset in xrange(self.np)
            for c in self.partition_list[offset]
            if c.getUUID() == uuid and c.getState() == CellStates.OUT_OF_DATE
        ]

    def isAssigned(self, oid, uuid):
        """ Check if the oid is assigned to the given node """
        for cell in self.partition_list[u64(oid) % self.np]:
            if cell.getUUID() == uuid:
                return True
        return False

    def getCell(self, offset, uuid):
        # Return the cell of the given node for this partition, or None.
        for cell in self.partition_list[offset]:
            if cell.getUUID() == uuid:
                return cell

    def _setCell(self, offset, node, state):
        """Set or update the cell of 'node' for partition 'offset'.

        DISCARDED removes the cell. count_dict only counts non-feeding
        cells, hence the conditional bookkeeping below.
        """
        if state == CellStates.DISCARDED:
            return self.removeCell(offset, node)
        if node.isUnknown():
            raise PartitionTableException('Invalid node state')
        self.count_dict.setdefault(node, 0)
        for cell in self.partition_list[offset]:
            if cell.getNode() is node:
                if not cell.isFeeding():
                    # Undo the count for the state being replaced.
                    self.count_dict[node] -= 1
                cell.setState(state)
                break
        else:
            row = self.partition_list[offset]
            # 'not row' is 1 when the row was empty: a new row gets filled.
            self.num_filled_rows += not row
            row.append(Cell(node, state))
        if state != CellStates.FEEDING:
            self.count_dict[node] += 1

    def removeCell(self, offset, node):
        # Drop the cell of 'node' for this partition, updating counters.
        # NOTE(review): compares nodes with '==' whereas _setCell uses 'is'.
        row = self.partition_list[offset]
        for cell in row:
            if cell.getNode() == node:
                row.remove(cell)
                # Row may have just become empty.
                self.num_filled_rows -= not row
                if not cell.isFeeding():
                    self.count_dict[node] -= 1
                break

    def dropNode(self, node):
        # Forget a node that holds no more cells; return True on success
        # (also when the node was already unknown: count is then None).
        count = self.count_dict.get(node)
        if count == 0:
            del self.count_dict[node]
        return not count

    def load(self, ptid, row_list, nm):
        """
        Load the partition table with the specified PTID, discard all previous
        content.
        """
        self.clear()
        self._id = ptid
        for offset, row in row_list:
            if offset >= self.getPartitions():
                raise IndexError
            for uuid, state in row:
                node = nm.getByUUID(uuid)
                # the node must be known by the node manager
                assert node is not None
                self._setCell(offset, node, state)
        logging.debug('partition table loaded (ptid=%s)', ptid)
        self.log()

    def update(self, ptid, cell_list, nm):
        """
        Update the partition with the cell list supplied. If a node
        is not known, it is created in the node manager and set as unavailable
        """
        assert self._id < ptid, (self._id, ptid)
        self._id = ptid
        # Snapshot the cells only if *all* of them are currently readable,
        # so we can warn once when the table degrades (see below).
        readable_list = []
        for row in self.partition_list:
            if not all(cell.isReadable() for cell in row):
                del readable_list[:]
                break
            readable_list += row
        for offset, uuid, state in cell_list:
            node = nm.getByUUID(uuid)
            assert node is not None, 'No node found for uuid ' + uuid_str(uuid)
            self._setCell(offset, node, state)
        self.logUpdated()
        if not all(cell.isReadable() for cell in readable_list):
            logging.warning(self._first_outdated_message)

    def filled(self):
        # True when every partition has at least one cell.
        return self.num_filled_rows == self.np

    def logUpdated(self):
        logging.debug('partition table updated (ptid=%s)', self._id)
        self.log()

    def log(self):
        logging.debug(self.format())

    def format(self):
        return '\n'.join(self._format())

    def _format(self):
        """Help debugging partition table management.

        Output sample:
        pt: node 0: S1, R
        pt: node 1: S2, R
        pt: node 2: S3, R
        pt: node 3: S4, R
        pt: 00: .UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.
        pt: 11: U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U

        Here, there are 4 nodes in RUNNING state.
        The first partition has 2 replicas in UP_TO_DATE state, on nodes 1 and
        2 (nodes 0 and 3 are displayed as unused for that partition by
        displaying a dot).
        The first number on the left represents the number of the first
        partition on the line (here, line length is 11 to keep the docstring
        width under 80 column).
        """
        node_list = sorted(self.count_dict)
        result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()),
            protocol.node_state_prefix_dict[node.getState()])
            for i, node in enumerate(node_list)]
        append = result.append
        line = []
        max_line_len = 20 # XXX: hardcoded number of partitions per line
        prefix = 0
        prefix_len = int(math.ceil(math.log10(self.np)))
        for offset, row in enumerate(self._formatRows(node_list)):
            if len(line) == max_line_len:
                append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
                line = []
                prefix = offset
            line.append(row)
        if line:
            append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
        return result

    def _formatRows(self, node_list):
        # Yield one string per partition: one char per node, '.' when the
        # node holds no cell for that partition.
        cell_state_dict = protocol.cell_state_prefix_dict
        for row in self.partition_list:
            if row is None:
                yield 'X' * len(node_list)
            else:
                cell_dict = {x.getNode(): cell_state_dict[x.getState()]
                             for x in row}
                yield ''.join(cell_dict.get(x, '.') for x in node_list)

    def operational(self, exclude_list=()):
        # The table is operational when it is filled and every partition has
        # at least one readable cell on a running, non-excluded node.
        if not self.filled():
            return False
        for row in self.partition_list:
            for cell in row:
                if cell.isReadable():
                    node = cell.getNode()
                    if node.isRunning() and node.getUUID() not in exclude_list:
                        break
            else:
                return False
        return True

    def getRow(self, offset):
        row = self.partition_list[offset]
        if row is None:
            return []
        return [(cell.getUUID(), cell.getState()) for cell in row]

    def getRowList(self):
        getRow = self.getRow
        return [(x, getRow(x)) for x in xrange(self.np)]
class MTPartitionTable(PartitionTable):
    """ Thread-safe aware version of the partition table, override only methods
        used in the client """

    def __init__(self, *args, **kw):
        # Guards all overridden operations below.
        self._lock = Lock()
        PartitionTable.__init__(self, *args, **kw)

    def update(self, *args, **kw):
        with self._lock:
            return PartitionTable.update(self, *args, **kw)

    def clear(self, *args, **kw):
        with self._lock:
            return PartitionTable.clear(self, *args, **kw)

    def operational(self, *args, **kw):
        with self._lock:
            return PartitionTable.operational(self, *args, **kw)
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/threaded_app.py 0000664 0000000 0000000 00000011300 13276562645 0026026 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import thread, threading, weakref
from . import logging
from .app import BaseApplication
from .debug import register as registerLiveDebugger
from .dispatcher import Dispatcher
from .locking import SimpleQueue
class app_set(weakref.WeakSet):
    """Weak set of all live ThreadedApplication instances."""
    def on_log(self):
        # Dump the state of every registered application (debugger hook).
        for app in self:
            app.log()

# The class is immediately replaced by its unique instance.
app_set = app_set()
registerLiveDebugger(app_set.on_log)
class ThreadContainer(threading.local):
    """Per-thread state for ThreadedApplication requests."""
    def __init__(self):
        # queue:  receives (conn, packet, kw) answers for this thread
        # answer: result of the last handled answer (see set/getHandlerData)
        self.queue = SimpleQueue()
        self.answer = None
class ThreadedApplication(BaseApplication):
    """The client node application.

    A single daemon 'poll' thread performs all network polling; other
    threads send requests and wait for answers on per-thread queues
    (see ThreadContainer and _ask).
    """

    uuid = None

    def __init__(self, master_nodes, name, **kw):
        super(ThreadedApplication, self).__init__(**kw)
        self.poll_thread = threading.Thread(target=self.run, name=name)
        self.poll_thread.daemon = True
        # Internal Attributes common to all thread
        self.name = name
        self.dispatcher = Dispatcher()
        self.master_conn = None
        # load master node list
        for address in master_nodes:
            self.nm.createMaster(address=address)
        # Internal attribute distinct between thread
        self._thread_container = ThreadContainer()
        app_set.add(self) # to register self.on_log

    def close(self):
        # Clear all connection
        self.master_conn = None
        if self.poll_thread.is_alive():
            for conn in self.em.getConnectionList():
                conn.close()
            # Stop polling thread
            logging.debug('Stopping %s', self.poll_thread)
            # Closing is finished by the poll thread itself (thread.exit
            # raises SystemExit there); see run().
            self.em.wakeup(thread.exit)
        else:
            super(ThreadedApplication, self).close()

    def start(self):
        # Idempotent: only start the poll thread once.
        self.poll_thread.is_alive() or self.poll_thread.start()

    def run(self):
        logging.debug("Started %s", self.poll_thread)
        try:
            self._run()
        finally:
            # Whatever stopped the loop, release base resources here.
            super(ThreadedApplication, self).close()
            logging.debug("Poll thread stopped")

    def _run(self):
        # Poll forever; log and retry on any unexpected exception.
        poll = self.em.poll
        while 1:
            try:
                while 1:
                    poll(1)
            except Exception:
                self.log()
                logging.error("poll raised, retrying", exc_info=1)

    def getHandlerData(self):
        return self._thread_container.answer

    def setHandlerData(self, data):
        self._thread_container.answer = data

    def log(self):
        self.em.log()
        self.nm.log()
        # 'pt' may not exist yet: read it without triggering a property.
        pt = self.__dict__.get('pt')
        if pt is not None:
            pt.log()

    def _handlePacket(self, conn, packet, kw={}, handler=None):
        """
        conn
            The connection which received the packet (forwarded to handler).
        packet
            The packet to handle.
        handler
            The handler to use to handle packet.
            If not given, it will be guessed from the connection's node type.

        Note: 'kw' uses a mutable default, which is safe here only because
        it is never mutated, just forwarded.
        """
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'which type is not known... Is this right ?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
        with conn.lock:
            handler.dispatch(conn, packet, kw)

    def _ask(self, conn, packet, handler=None, **kw):
        # Send a request and block until the matching answer arrives;
        # unrelated packets queued for this thread are dispatched meanwhile.
        self.setHandlerData(None)
        queue = self._thread_container.queue
        msg_id = conn.ask(packet, queue=queue, **kw)
        get = queue.get
        _handlePacket = self._handlePacket
        while True:
            qconn, qpacket, kw = get(True)
            if conn is qconn and msg_id == qpacket.getId():
                # This is the answer we are waiting for.
                _handlePacket(qconn, qpacket, kw, handler)
                break
            _handlePacket(qconn, qpacket, kw)
        return self.getHandlerData()
neoppod-30a02bdc1f256d74a4a78e10d9f74df606f7d132-neo-lib/neo/lib/util.py 0000664 0000000 0000000 00000016253 13276562645 0024377 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import socket
from binascii import a2b_hex, b2a_hex
from datetime import timedelta, datetime
from hashlib import sha1
from Queue import deque
from struct import pack, unpack
from time import gmtime
# A TID is a 64-bit value: the high 32 bits encode (year, month, day, hour,
# minute) in mixed radix, the low 32 bits encode seconds scaled so that one
# minute maps onto the full 2**32 range.
TID_LOW_OVERFLOW = 2**32
TID_LOW_MAX = TID_LOW_OVERFLOW - 1
SECOND_PER_TID_LOW = 60.0 / TID_LOW_OVERFLOW
# (offset, modulo) for each field of the high part, in order:
# year, month, day, hour, minute. A modulo of 0 means "unbounded" (year).
TID_CHUNK_RULES = (
    (-1900, 0),
    (-1, 12),
    (-1, 31),
    (0, 24),
    (0, 60),
)
def tidFromTime(tm):
    """Convert a Unix timestamp (float) into a packed 64-bit TID."""
    gmt = gmtime(tm)
    return packTID(
        (gmt.tm_year, gmt.tm_mon, gmt.tm_mday, gmt.tm_hour, gmt.tm_min),
        # Fold the fractional part of 'tm' into the sub-minute field.
        int((gmt.tm_sec + (tm - int(tm))) / SECOND_PER_TID_LOW))
def packTID(higher, lower):
    """
    higher: a 5-tuple containing year, month, day, hour and minute
    lower: seconds scaled to 60:2**32 into a 64 bits TID
    """
    assert len(higher) == len(TID_CHUNK_RULES), higher
    # Mixed-radix packing: multiply-accumulate each field using the
    # (offset, modulo) pairs of TID_CHUNK_RULES.
    packed_higher = 0
    for value, (offset, multiplicator) in zip(higher, TID_CHUNK_RULES):
        assert isinstance(value, (int, long)), value
        value += offset
        assert 0 <= value, (value, offset, multiplicator)
        assert multiplicator == 0 or value < multiplicator, (value,
            offset, multiplicator)
        packed_higher *= multiplicator
        packed_higher += value
    # If the machine is configured in such way that gmtime() returns leap
    # seconds (e.g. TZ=right/UTC), then the best we can do is to use
    # TID_LOW_MAX, because TID format was not designed to support them.
    # For more information about leap seconds on Unix, see:
    #   https://en.wikipedia.org/wiki/Unix_time
    #   http://www.madore.org/~david/computers/unix-leap-seconds.html
    return pack('!LL', packed_higher, min(lower, TID_LOW_MAX))
def unpackTID(ptid):
    """
    Unpack given 64 bits TID in to a 2-tuple containing:
    - a 5-tuple containing year, month, day, hour and minute
    - seconds scaled to 60:2**32
    """
    packed_higher, lower = unpack('!LL', ptid)
    higher = []
    append = higher.append
    # Undo the mixed-radix packing of packTID: peel fields off in reverse
    # order (minute first), then restore the original field order.
    for offset, multiplicator in reversed(TID_CHUNK_RULES):
        if multiplicator:
            packed_higher, value = divmod(packed_higher, multiplicator)
        else:
            # Unbounded field (year): whatever remains.
            packed_higher, value = 0, packed_higher
        append(value - offset)
    higher.reverse()
    return (tuple(higher), lower)
def timeStringFromTID(ptid):
    """
    Return a string in the format "yyyy-mm-dd hh:mm:ss.ssssss" from a TID
    """
    (year, month, day, hour, minute), low = unpackTID(ptid)
    return '%04d-%02d-%02d %02d:%02d:%09.6f' % (
        year, month, day, hour, minute, low * SECOND_PER_TID_LOW)
def addTID(ptid, offset):
    """
    Offset given packed TID.

    'offset' is expressed in low-part units (2**32 per minute); whole
    minutes of carry are applied to the date part via datetime arithmetic.
    """
    higher, lower = unpackTID(ptid)
    high_offset, lower = divmod(lower + offset, TID_LOW_OVERFLOW)
    if high_offset:
        # Let datetime handle day/month/year rollover.
        d = datetime(*higher) + timedelta(0, 60 * high_offset)
        higher = (d.year, d.month, d.day, d.hour, d.minute)
    return packTID(higher, lower)
def u64(s):
    """Decode an 8-byte big-endian string into an unsigned integer."""
    (value,) = unpack('!Q', s)
    return value

def p64(n):
    """Encode an unsigned integer as an 8-byte big-endian string."""
    return pack('!Q', n)

def add64(packed, offset):
    """Add a python number to a 64-bits packed value"""
    return p64(offset + u64(packed))
def dump(s):
    """Dump a binary string in hex."""
    if s is None:
        return None
    return b2a_hex(s) if isinstance(s, bytes) else repr(s)
def bin(s):
    """Inverse of dump method."""
    # NOTE: intentionally shadows the builtin bin() inside this module.
    return None if s is None else a2b_hex(s)
def makeChecksum(s):
    """Return a 20-byte checksum against a string."""
    checksum = sha1(s)
    return checksum.digest()
def parseNodeAddress(address, port_opt=None):
    """
    Parse "host", "host:port" or "[ipv6]:port" into a (host, port) tuple,
    resolving the host if needed.  port_opt is used when the address
    carries no explicit port.
    """
    if address.startswith('['):
        # Bracketed IPv6 literal, optionally followed by ":port".
        host, port = address[1:].split(']')
        port = port[1:] if port.startswith(':') else port_opt
    elif address.count(':') == 1:
        host, port = address.split(':')
    else:
        host, port = address, port_opt
    # Resolve (maybe) and cast to canonical form
    # XXX: Always pick the first result. This might not be what is desired, and
    # if so this function should either take a hint on the desired address type
    # or return either raw host & port or getaddrinfo return value.
    return socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0][4][:2]
def parseMasterList(masters):
    """Parse a whitespace-separated list of master node addresses."""
    assert masters, 'At least one master must be defined'
    addresses = masters.split()
    return map(parseNodeAddress, addresses)
class ReadBuffer(object):
    """
    Implementation of a lazy buffer. Main purpose is to reduce useless
    copies of data: received chunks are stored as-is and only joined
    when the requested size is available.
    TODO: For better performance, use:
    - socket.recv_into (64kiB blocks)
    - struct.unpack_from
    - and a circular buffer of dynamic size (initial size:
    twice the length passed to socket.recv_into ?)
    """
    def __init__(self):
        # Total number of buffered bytes, kept in sync with self.content.
        self.size = 0
        self.content = deque()
    def append(self, data):
        """ Append some data and compute the new buffer size """
        self.size += len(data)
        self.content.append(data)
    def __len__(self):
        """ Return the current buffer size """
        return self.size
    def read(self, size):
        """ Read and consume size bytes; None if not enough is buffered """
        if size > self.size:
            return None
        self.size -= size
        chunks = []
        remaining = size
        # Pop chunks until at least `size` bytes have been gathered.
        while remaining > 0:
            chunk = self.content.popleft()
            remaining -= len(chunk)
            chunks.append(chunk)
        if remaining < 0:
            # The last chunk overshot: keep its tail for future reads.
            last = chunks[-1]
            chunks[-1] = last[:remaining]
            self.content.appendleft(last[remaining:])
        # Join all chunks (one copy).
        data = ''.join(chunks)
        assert len(data) == size
        return data
    def clear(self):
        """ Erase all buffer content """
        self.size = 0
        self.content.clear()
# Shared sink buffer: its append is a no-op, so anything written to it is
# silently discarded and its size stays at 0.
dummy_read_buffer = ReadBuffer()
dummy_read_buffer.append = lambda data: None
class cached_property(object):
    """
    A property that is only computed once per instance: the first access
    stores the result as an ordinary instance attribute of the same name,
    which then shadows the descriptor on later accesses.  Deleting the
    attribute resets the property.
    """
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self.func = func
    def __get__(self, obj, cls):
        if obj is None:
            # Class-level access returns the descriptor itself.
            return self
        obj.__dict__[self.func.__name__] = value = self.func(obj)
        return value
|