pax_global_header 0000666 0000000 0000000 00000000064 12000764302 0014504 g ustar 00root root 0000000 0000000 52 comment=e2eef56214c493bf36466ec5cf98e2b13be0ac9f
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/ 0000775 0000000 0000000 00000000000 12000764302 0021727 5 ustar 00root root 0000000 0000000 neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/ 0000775 0000000 0000000 00000000000 12000764302 0022510 5 ustar 00root root 0000000 0000000 neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/ 0000775 0000000 0000000 00000000000 12000764302 0023256 5 ustar 00root root 0000000 0000000 neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/__init__.py 0000664 0000000 0000000 00000001300 12000764302 0025361 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .logger import logging
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/attributeTracker.py 0000664 0000000 0000000 00000003673 12000764302 0027160 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Global switch: when False, track() below is a no-op, so instrumented
# classes pay no overhead in production.
ATTRIBUTE_TRACKER_ENABLED = False
from .locking import LockUser
"""
Usage example:
from neo import attributeTracker
class Foo(object):
...
def assertBar(self, expected_value):
if self.bar_attr != expected_value:
attributeTracker.whoSet(self, 'bar_attr')
attributeTracker.track(Foo)
"""
# Name of the per-instance dict that maps each written attribute name to a
# LockUser (stack snapshot of the last writer).
MODIFICATION_CONTAINER_ID = '_attribute_tracker_dict'
def tracker_setattr(self, attr, value, setattr):
    """Record who sets an attribute, then perform the real assignment.

    'setattr' is the original (untracked) __setattr__ of the class; it is
    also used to store the tracking dict itself, to avoid recursion.
    """
    container = getattr(self, MODIFICATION_CONTAINER_ID, None)
    if container is None:
        container = {}
        setattr(self, MODIFICATION_CONTAINER_ID, container)
    # LockUser captures the current call stack for later inspection.
    container[attr] = LockUser()
    setattr(self, attr, value)
# Define track() once at import time, depending on the global switch, so
# the disabled case has strictly zero runtime cost.
if ATTRIBUTE_TRACKER_ENABLED:
    def track(klass):
        # Wrap klass.__setattr__ so that every attribute write on instances
        # of klass is recorded by tracker_setattr.
        original_setattr = klass.__setattr__
        def klass_tracker_setattr(self, attr, value):
            tracker_setattr(self, attr, value, original_setattr)
        klass.__setattr__ = klass_tracker_setattr
else:
    def track(klass):
        # Tracking disabled: leave the class untouched.
        pass
def whoSet(instance, attr):
    """Return the formatted stack of the last writer of instance.attr.

    Returns None when the instance was never tracked or the attribute was
    never set through the tracking wrapper.
    """
    container = getattr(instance, MODIFICATION_CONTAINER_ID, None)
    if container is None:
        return None
    lock_user = container.get(attr)
    if lock_user is None:
        return None
    return lock_user.formatStack()
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/bootstrap.py 0000664 0000000 0000000 00000012165 12000764302 0025652 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from time import sleep
from . import logging
from .handler import EventHandler
from .protocol import uuid_str, Packets
from .connection import ClientConnection
# Placeholder server address used when the bootstrapping node does not
# itself listen for incoming connections.
NO_SERVER = ('0.0.0.0', 0)
class BootstrapManager(EventHandler):
    """
    Manage the bootstrap stage: look up the primary master node, connect to
    it, and return once this node has been identified by it.
    """
    # becomes True once the primary master accepted our identification
    accepted = False

    def __init__(self, app, name, node_type, uuid=None, server=NO_SERVER):
        """
        Manage the bootstrap stage of a non-master node: it looks up the
        primary master node, connects to it, then returns when the master
        node is ready.
        """
        EventHandler.__init__(self, app)
        self.primary = None
        self.server = server
        self.node_type = node_type
        self.uuid = uuid
        self.name = name
        # filled in by _acceptIdentification on success
        self.num_replicas = None
        self.num_partitions = None
        # master node currently being tried; None means "not connected"
        self.current = None

    def notifyNodeInformation(self, conn, node_list):
        # Node information is irrelevant during bootstrap: ignore it.
        pass

    def announcePrimary(self, conn):
        # We found the primary master early enough to be notified of election
        # end. Lucky. Anyway, we must carry on with identification request, so
        # nothing to do here.
        pass

    def connectionCompleted(self, conn):
        """
        Triggered when the network connection is successful.
        Now ask who's the primary.
        """
        EventHandler.connectionCompleted(self, conn)
        self.current.setRunning()
        conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
            self.server, self.name))

    def connectionFailed(self, conn):
        """
        Triggered when the network connection failed.
        Restart bootstrap.
        """
        EventHandler.connectionFailed(self, conn)
        self.current = None

    def connectionLost(self, conn, new_state):
        """
        Triggered when an established network connection is lost.
        Restart bootstrap.
        """
        self.current.setTemporarilyDown()
        self.current = None

    def notReady(self, conn, message):
        """
        The primary master sends this message when it is still not ready to
        handle the client node.
        Close connection and restart.
        """
        conn.close()

    def _acceptIdentification(self, node, uuid, num_partitions,
            num_replicas, your_uuid, primary, known_master_list):
        """Handle the answer to RequestIdentification.

        Registers the advertised masters, and completes bootstrap if the
        answering node turns out to be the primary master.
        """
        nm = self.app.nm
        # Register new master nodes.
        # BUGFIX: the loop variable used to be named 'uuid', shadowing the
        # 'uuid' parameter; renamed to 'master_uuid' (the parameter is not
        # read after this point, so behavior is unchanged).
        for address, master_uuid in known_master_list:
            master_node = nm.getByAddress(address)
            if master_node is None:
                master_node = nm.createMaster(address=address)
            master_node.setUUID(master_uuid)
        self.primary = nm.getByAddress(primary)
        if self.primary is None or self.current is not self.primary:
            # three cases here:
            # - something goes wrong (unknown UUID)
            # - this master doesn't know who's the primary
            # - got the primary's uuid, so cut here
            node.getConnection().close()
            return
        logging.info('connected to a primary master node')
        self.num_partitions = num_partitions
        self.num_replicas = num_replicas
        if self.uuid != your_uuid:
            # got an uuid from the primary master
            self.uuid = your_uuid
            logging.info('Got a new UUID: %s', uuid_str(self.uuid))
        self.accepted = True

    def getPrimaryConnection(self, connector_handler):
        """
        Primary lookup/connection process.
        Returns when the connection is made.
        """
        logging.info('connecting to a primary master node')
        em, nm = self.app.em, self.app.nm
        index = 0
        self.current = None
        conn = None
        # retry until identified to the primary
        while not self.accepted:
            if self.current is None:
                # conn closed
                conn = None
                # select a master (round-robin over the known master list)
                master_list = nm.getMasterList()
                index = (index + 1) % len(master_list)
                self.current = master_list[index]
                if index == 0:
                    # tried all known masters, sleep a bit
                    sleep(1)
            if conn is None:
                # open the connection
                conn = ClientConnection(em, self, self.current,
                    connector_handler())
            # still processing
            em.poll(1)
        return (self.current, conn, self.uuid, self.num_partitions,
            self.num_replicas)
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/config.py 0000664 0000000 0000000 00000006201 12000764302 0025074 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ConfigParser import SafeConfigParser, NoOptionError
from . import util
from .util import parseNodeAddress
class ConfigurationManager(object):
    """
    Configuration manager that loads options from a configuration file and
    command line arguments.

    Precedence: command line arguments, then the configuration file (with
    its defaults), then the 'defaults' dict when no file was given.
    """

    def __init__(self, defaults, config_file, section, argument_list):
        """
        defaults      -- dict of fallback values
        config_file   -- path of an ini-style file, or None
        section       -- section name to read in the file
        argument_list -- dict of options parsed from the command line
        """
        self.defaults = defaults
        self.argument_list = argument_list
        self.parser = None
        if config_file is not None:
            self.parser = SafeConfigParser(defaults)
            self.parser.read(config_file)
        self.section = section

    def __get(self, key, optional=False):
        """Resolve one option following the precedence rules above.

        Raise RuntimeError when the option is undefined and not optional.
        """
        value = self.argument_list.get(key)
        if value is None:
            if self.parser is None:
                value = self.defaults.get(key)
            else:
                try:
                    value = self.parser.get(self.section, key)
                except NoOptionError:
                    pass
        if value is None and not optional:
            # BUGFIX: message used to end with a stray apostrophe
            # ("is undefined'").
            raise RuntimeError("Option '%s' is undefined" % (key, ))
        return value

    def getMasters(self):
        """ Get the master node list except itself """
        masters = self.__get('masters')
        # load master node list except itself
        return util.parseMasterList(masters, except_node=self.getBind())

    def getBind(self):
        """ Get the address to bind to """
        bind = self.__get('bind')
        return parseNodeAddress(bind, 0)

    def getDatabase(self):
        return self.__get('database')

    def getWait(self):
        return self.__get('wait')

    def getDynamicMasterList(self):
        return self.__get('dynamic_master_list', optional=True)

    def getAdapter(self):
        return self.__get('adapter')

    def getCluster(self):
        cluster = self.__get('cluster')
        assert cluster != '', "Cluster name must be non-empty"
        return cluster

    def getReplicas(self):
        return int(self.__get('replicas'))

    def getPartitions(self):
        return int(self.__get('partitions'))

    def getReset(self):
        # only from command line
        return self.argument_list.get('reset', False)

    def getUUID(self):
        # only from command line; returns None when absent or falsy
        uuid = self.argument_list.get('uuid', None)
        if uuid:
            return int(uuid)

    def getUpstreamCluster(self):
        # keyword form for clarity (was a positional True)
        return self.__get('upstream_cluster', optional=True)

    def getUpstreamMasters(self):
        return util.parseMasterList(self.__get('upstream_masters'))
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/connection.py 0000664 0000000 0000000 00000063720 12000764302 0025777 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import wraps
from time import time
from . import attributeTracker, logging
from .connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException, \
ConnectorConnectionClosedException
from .locking import RLock
from .profiling import profiler_decorator
from .protocol import uuid_str, Errors, \
PacketMalformedError, Packets, ParserState
from .util import ReadBuffer
# Default timeout (seconds) before a pending request is considered lost.
CRITICAL_TIMEOUT = 30
class ConnectionClosed(Exception):
    """Raised when an operation is attempted on an already-closed connection."""
def not_closed(func):
    """Method decorator: fail fast when the connection is already closed.

    Raises ConnectorConnectionClosedException if self.connector is None
    before delegating to the wrapped method.
    """
    @wraps(func)
    def checked(self, *args, **kw):
        if self.connector is None:
            raise ConnectorConnectionClosedException
        return func(self, *args, **kw)
    return checked
def lockCheckWrapper(func):
    """
    This function is to be used as a wrapper around
    MT(Client|Server)Connection class methods.

    It uses a "_" method on RLock class, so it might stop working without
    notice (sadly, RLock does not offer any "acquired" method, but that one
    will do as it checks that current thread holds this lock).

    It requires the monitored class to have an RLock instance in its
    self._lock property.
    """
    def wrapper(self, *args, **kw):
        # Warn (but do not fail) when called without holding self._lock.
        if not self._lock._is_owned():
            import traceback
            logging.warning('%s called on %s instance without being locked.'
                ' Stack:\n%s', func.func_code.co_name,
                self.__class__.__name__, ''.join(traceback.format_stack()))
        # Call anyway
        return func(self, *args, **kw)
    return wraps(func)(wrapper)
class OnTimeout(object):
    """
    Simple helper class for the on_timeout parameter used in the
    HandlerSwitcher class: binds extra arguments to a timeout callback.
    """

    def __init__(self, func, *args, **kw):
        # Remember the callback and the extra arguments to forward.
        self.func = func
        self.args = args
        self.kw = kw

    def __call__(self, conn, msg_id):
        # Invoke the callback with (conn, msg_id) plus the bound arguments.
        return self.func(conn, msg_id, *self.args, **self.kw)
class HandlerSwitcher(object):
    """Hold a connection's stack of packet handlers and pending requests.

    Each entry of self._pending is [request_dict, handler]; request_dict
    maps a message id to (answer_class, timeout, on_timeout, kw). A handler
    installed with setHandler() is only applied once every answer expected
    by earlier handlers has been received.
    """
    # Cached data about the expected answer with the earliest timeout.
    _next_timeout = None
    _next_timeout_msg_id = None
    _next_on_timeout = None

    def __init__(self, handler):
        # pending handlers and related requests
        self._pending = [[{}, handler]]
        # reentrancy guard for handle()
        self._is_handling = False

    def clear(self):
        # Reset to a single empty entry, keeping only the last handler, and
        # drop cached timeout data (fall back to class-level defaults).
        self.__init__(self.getLastHandler())
        try:
            del (self._next_timeout,
                self._next_timeout_msg_id,
                self._next_on_timeout)
        except AttributeError:
            pass

    def isPending(self):
        # True while the current handler still expects at least one answer.
        return bool(self._pending[0][0])

    def cancelRequests(self, conn, message):
        # Fail every pending request with a ProtocolError built from
        # 'message', dispatching it to the handler it was registered under.
        if self.isPending():
            p = Errors.ProtocolError(message)
            while True:
                request_dict, handler = self._pending[0]
                while request_dict:
                    msg_id, request = request_dict.popitem()
                    p.setId(msg_id)
                    # request[3] is the kw dict given to emit()
                    handler.packetReceived(conn, p, request[3])
                if len(self._pending) == 1:
                    break
                del self._pending[0]

    def getHandler(self):
        # Currently applied handler.
        return self._pending[0][1]

    def getLastHandler(self):
        """ Return the last (may be unapplied) handler registered """
        return self._pending[-1][1]

    @profiler_decorator
    def emit(self, request, timeout, on_timeout, kw={}):
        # Register an emitted request so that its answer is expected.
        # NOTE: 'kw' has a mutable default, but it is only stored, never
        # mutated here.
        # register the request in the current handler
        _pending = self._pending
        if self._is_handling:
            # If this is called while handling a packet, the response is to
            # be excpected for the current handler...
            (request_dict, _) = _pending[0]
        else:
            # ...otherwise, queue for the latest handler
            assert len(_pending) == 1 or _pending[0][0]
            (request_dict, _) = _pending[-1]
        msg_id = request.getId()
        answer_class = request.getAnswerClass()
        assert answer_class is not None, "Not a request"
        assert msg_id not in request_dict, "Packet id already expected"
        next_timeout = self._next_timeout
        if next_timeout is None or timeout < next_timeout:
            # this request now has the earliest deadline: cache it
            self._next_timeout = timeout
            self._next_timeout_msg_id = msg_id
            self._next_on_timeout = on_timeout
        request_dict[msg_id] = answer_class, timeout, on_timeout, kw

    def getNextTimeout(self):
        return self._next_timeout

    def timeout(self, connection):
        # Handle expiry of the earliest pending request. Returns the
        # timed-out message id, or None to keep the connection open.
        msg_id = self._next_timeout_msg_id
        if self._next_on_timeout is not None:
            self._next_on_timeout(connection, msg_id)
            if self._next_timeout_msg_id != msg_id:
                # on_timeout sent a packet with a smaller timeout
                # so keep the connection open
                return
        # Notify that a timeout occured
        return msg_id

    def handle(self, connection, packet):
        # Reentrancy-guarded dispatch to _handle.
        assert not self._is_handling
        self._is_handling = True
        try:
            self._handle(connection, packet)
        finally:
            self._is_handling = False

    @profiler_decorator
    def _handle(self, connection, packet):
        # Dispatch one received packet, then apply queued handlers once
        # their predecessors no longer expect answers.
        assert len(self._pending) == 1 or self._pending[0][0]
        logging.packet(connection, packet, False)
        if connection.isClosed() and packet.ignoreOnClosedConnection():
            logging.debug('Ignoring packet %r on closed connection %r',
                packet, connection)
            return
        msg_id = packet.getId()
        (request_dict, handler) = self._pending[0]
        # notifications are not expected
        if not packet.isResponse():
            handler.packetReceived(connection, packet)
            return
        # checkout the expected answer class
        try:
            klass, _, _, kw = request_dict.pop(msg_id)
        except KeyError:
            klass = None
            kw = {}
        if klass and isinstance(packet, klass) or packet.isError():
            handler.packetReceived(connection, packet, kw)
        else:
            logging.error('Unexpected answer %r in %r', packet, connection)
            if not connection.isClosed():
                notification = Packets.Notify('Unexpected answer: %r' % packet)
                connection.notify(notification)
                connection.abort()
            # handler.peerBroken(connection)
        # apply a pending handler if no more answers are pending
        while len(self._pending) > 1 and not self._pending[0][0]:
            del self._pending[0]
            logging.debug('Apply handler %r on %r', self._pending[0][1],
                connection)
        if msg_id == self._next_timeout_msg_id:
            self._updateNextTimeout()

    def _updateNextTimeout(self):
        # Find next timeout and its msg_id across all pending entries.
        next_timeout = None
        for pending in self._pending:
            for msg_id, (_, timeout, on_timeout, _) in pending[0].iteritems():
                if not next_timeout or timeout < next_timeout[0]:
                    next_timeout = timeout, msg_id, on_timeout
        self._next_timeout, self._next_timeout_msg_id, self._next_on_timeout = \
            next_timeout or (None, None, None)

    @profiler_decorator
    def setHandler(self, handler):
        # Apply 'handler' now if nothing is pending, otherwise queue it.
        # Returns True when applied immediately.
        can_apply = len(self._pending) == 1 and not self._pending[0][0]
        if can_apply:
            # nothing is pending, change immediately
            self._pending[0][1] = handler
        else:
            # put the next handler in queue
            self._pending.append([{}, handler])
        return can_apply
class BaseConnection(object):
    """A base connection

    About timeouts:

    Timeout are mainly per-connection instead of per-packet.
    The idea is that most of time, packets are received and processed
    sequentially, so if it takes a long for a peer to process a packet,
    following packets would just be enqueued.
    What really matters is that the peer makes progress in its work.
    As long as we receive an answer, we consider it's still alive and
    it may just have started to process the following request. So we reset
    timeouts.
    There is anyway nothing more we could do, because processing of a packet
    may be delayed in a very unpredictable way depending of previously
    received packets on peer side.
    Even ourself may be slow to receive a packet. We must not timeout for
    an answer that is already in our incoming buffer (read_buf or _queue).
    Timeouts in HandlerSwitcher are only there to prioritize some packets.
    """
    # seconds of inactivity before the idle() hook fires (ping, by default)
    KEEP_ALIVE = 60

    def __init__(self, event_manager, handler, connector, addr=None):
        assert connector is not None, "Need a low-level connector"
        self.em = event_manager
        self.connector = connector
        self.addr = addr
        self._handlers = HandlerSwitcher(handler)
        # register for read events right away
        event_manager.register(self)
        event_manager.addReader(self)

    # XXX: do not use getHandler
    # These properties return the *bound methods* of the HandlerSwitcher,
    # so e.g. conn.getHandler is directly callable.
    getHandler = property(lambda self: self._handlers.getHandler)
    getLastHandler = property(lambda self: self._handlers.getLastHandler)
    isPending = property(lambda self: self._handlers.isPending)

    def cancelRequests(self, *args, **kw):
        return self._handlers.cancelRequests(self, *args, **kw)

    def checkTimeout(self, t):
        # No timeout handling on the base class (see Connection).
        pass

    def lock(self):
        # Single-threaded base class: locking is a no-op.
        return 1

    def unlock(self):
        return None

    def getConnector(self):
        return self.connector

    def getAddress(self):
        return self.addr

    def readable(self):
        raise NotImplementedError

    def writable(self):
        raise NotImplementedError

    def close(self):
        """Close the connection."""
        if self.connector is not None:
            em = self.em
            em.removeReader(self)
            em.removeWriter(self)
            em.unregister(self)
            self.connector.shutdown()
            self.connector.close()
            self.connector = None
            self.aborted = False

    def __repr__(self):
        address = self.addr and '%s:%d' % self.addr or '?'
        return '<%s(uuid=%s, address=%s, closed=%s, handler=%s) at %x>' % (
            self.__class__.__name__,
            uuid_str(self.getUUID()),
            address,
            int(self.isClosed()),
            self.getHandler(),
            id(self),
        )

    # Make sure the low-level connector is released on garbage collection.
    __del__ = close

    def setHandler(self, handler):
        # Delegate to the HandlerSwitcher; the handler may be delayed until
        # all currently expected answers are received.
        if self._handlers.setHandler(handler):
            logging.debug('Set handler %r on %r', handler, self)
        else:
            logging.debug('Delay handler %r on %r', handler, self)

    def getEventManager(self):
        return self.em

    def getUUID(self):
        return None

    def isClosed(self):
        return self.connector is None or self.isAborted()

    def isAborted(self):
        return False

    def isListening(self):
        return False

    def isServer(self):
        return False

    def isClient(self):
        return False

    def hasPendingMessages(self):
        return False

    def whoSetConnector(self):
        """
        Debugging method: call this method to know who set the current
        connector value.
        """
        return attributeTracker.whoSet(self, 'connector')

    def idle(self):
        # Hook called on inactivity; overridden by Connection to ping.
        pass

# Record who writes BaseConnection attributes (no-op unless the attribute
# tracker is globally enabled).
attributeTracker.track(BaseConnection)
class ListeningConnection(BaseConnection):
    """A connection bound to a local address, accepting incoming peers."""

    def __init__(self, event_manager, handler, addr, connector, **kw):
        logging.debug('listening to %s:%d', *addr)
        BaseConnection.__init__(self, event_manager, handler,
            addr=addr, connector=connector)
        self.connector.makeListeningConnection(addr)

    def readable(self):
        # A read event on a listening socket means a peer wants to connect.
        try:
            new_s, addr = self.connector.getNewConnection()
            logging.debug('accepted a connection from %s:%d', *addr)
            accept_handler = self.getHandler()
            conn = ServerConnection(self.getEventManager(), accept_handler,
                connector=new_s, addr=addr)
            accept_handler.connectionAccepted(conn)
        except ConnectorTryAgainException:
            # Spurious wakeup: nothing to accept yet.
            pass

    def getAddress(self):
        return self.connector.getAddress()

    def writable(self):
        # A listening socket is never interested in write events.
        return False

    def isListening(self):
        return True
class Connection(BaseConnection):
    """A connection.

    Adds to BaseConnection: buffered reads/writes, packet parsing and
    queuing, request/answer bookkeeping and keep-alive timeouts.
    """
    # True while a non-blocking connect is still in progress
    connecting = False
    # role flags; a connection may act as client and/or server
    client = False
    server = False
    # id of the packet currently being handled (set by handlers)
    peer_id = None
    # time of last known remote activity, base for timeout computation
    _base_timeout = None

    def __init__(self, event_manager, *args, **kw):
        BaseConnection.__init__(self, event_manager, *args, **kw)
        self.read_buf = ReadBuffer()
        self.write_buf = []
        # counter used to allocate packet ids (wraps at 32 bits)
        self.cur_id = 0
        self.aborted = False
        self.uuid = None
        # parsed packets awaiting processing
        self._queue = []
        # optional callback invoked once when the connection closes
        self._on_close = None
        self._parser_state = ParserState()

    def setOnClose(self, callback):
        self._on_close = callback

    def isClient(self):
        return self.client

    def isServer(self):
        return self.server

    def asClient(self):
        # Become (again) a client connection; cancel a pending deferred
        # close if one was scheduled via closeClient().
        try:
            del self.idle
            assert self.client
        except AttributeError:
            self.client = True

    def asServer(self):
        self.server = True

    def _closeClient(self):
        # Deferred client-side close: if we are also a server, just drop
        # the client role and tell the peer; otherwise really close.
        if self.server:
            del self.idle
            self.client = False
            self.notify(Packets.CloseClient())
        else:
            self.close()

    def closeClient(self):
        # Schedule _closeClient to run at the next idle period.
        if self.connector is not None and self.client:
            self.idle = self._closeClient

    def isAborted(self):
        return self.aborted

    def getUUID(self):
        return self.uuid

    def setUUID(self, uuid):
        self.uuid = uuid

    def setPeerId(self, peer_id):
        assert peer_id is not None
        self.peer_id = peer_id

    def getPeerId(self):
        return self.peer_id

    @profiler_decorator
    def _getNextId(self):
        # Allocate the next packet id, wrapping at 32 bits.
        next_id = self.cur_id
        self.cur_id = (next_id + 1) & 0xffffffff
        return next_id

    def updateTimeout(self, t=None):
        # Refresh the timeout base and duration; only meaningful when no
        # received packet is waiting to be processed.
        if not self._queue:
            if t:
                self._base_timeout = t
            self._timeout = self._handlers.getNextTimeout() or self.KEEP_ALIVE

    def checkTimeout(self, t):
        # Called periodically with the current time 't'.
        # first make sure we don't timeout on answers we already received
        if self._base_timeout and not self._queue:
            timeout = t - self._base_timeout
            if self._timeout <= timeout:
                handlers = self._handlers
                if handlers.isPending():
                    msg_id = handlers.timeout(self)
                    if msg_id is None:
                        # timeout was rescheduled: restart the clock
                        self._base_timeout = t
                    else:
                        logging.info('timeout for #0x%08x with %r',
                            msg_id, self)
                        self.close()
                else:
                    # no pending answer: just keep-alive
                    self.idle()

    def abort(self):
        """Abort dealing with this connection."""
        logging.debug('aborting a connector for %r', self)
        self.aborted = True
        # aborting only makes sense while data remains to be flushed
        assert self.write_buf
        if self._on_close is not None:
            self._on_close()
            self._on_close = None

    def writable(self):
        """Called when self is writable."""
        self._send()
        if not self.write_buf and self.connector is not None:
            if self.aborted:
                # all pending data flushed: finish the abort
                self.close()
            else:
                self.em.removeWriter(self)

    def readable(self):
        """Called when self is readable."""
        self._recv()
        self.analyse()
        if self.aborted:
            self.em.removeReader(self)

    def analyse(self):
        """Analyse received data."""
        # Parse as many complete packets as possible out of read_buf.
        try:
            while True:
                packet = Packets.parse(self.read_buf, self._parser_state)
                if packet is None:
                    break
                self._queue.append(packet)
        except PacketMalformedError, e:
            logging.error('malformed packet from %r: %s', self, e)
            self._closure()

    def hasPendingMessages(self):
        """
        Returns True if there are messages queued and awaiting processing.
        """
        return len(self._queue) != 0

    def process(self):
        """
        Process a pending packet.
        """
        # check out packet and process it with current handler
        packet = self._queue.pop(0)
        self._handlers.handle(self, packet)
        self.updateTimeout()

    def pending(self):
        # True while data remains to be sent.
        return self.connector is not None and self.write_buf

    def close(self):
        # Idempotent close: a second call must find everything cleaned up.
        if self.connector is None:
            assert self._on_close is None
            assert not self.read_buf
            assert not self.write_buf
            assert not self.isPending()
            return
        # process the network events with the last registered handler to
        # solve issues where a node is lost with pending handlers and
        # create unexpected side effects.
        handler = self._handlers.getLastHandler()
        super(Connection, self).close()
        if self._on_close is not None:
            self._on_close()
            self._on_close = None
        del self.write_buf[:]
        self.read_buf.clear()
        if self.connecting:
            handler.connectionFailed(self)
            self.connecting = False
        else:
            handler.connectionClosed(self)
        self._handlers.clear()

    def _closure(self):
        # Drain already-received packets, then close the connection.
        assert self.connector is not None, self.whoSetConnector()
        while self._queue:
            self._handlers.handle(self, self._queue.pop(0))
        self.close()

    @profiler_decorator
    def _recv(self):
        """Receive data from a connector."""
        try:
            data = self.connector.receive()
        except ConnectorTryAgainException:
            # nothing available right now
            pass
        except ConnectorConnectionRefusedException:
            assert self.connecting
            self._closure()
        except ConnectorConnectionClosedException:
            # connection resetted by peer, according to the man, this error
            # should not occurs but it seems it's false
            logging.debug('Connection reset by peer: %r', self.connector)
            self._closure()
        except:
            logging.debug('Unknown connection error: %r', self.connector)
            self._closure()
            # unhandled connector exception
            raise
        else:
            if not data:
                # EOF: the peer closed its side
                logging.debug('Connection %r closed in recv', self.connector)
                self._closure()
                return
            self._base_timeout = time() # last known remote activity
            self.read_buf.append(data)

    @profiler_decorator
    def _send(self):
        """Send data to a connector."""
        if not self.write_buf:
            return
        msg = ''.join(self.write_buf)
        try:
            n = self.connector.send(msg)
        except ConnectorTryAgainException:
            # socket buffer full: retry on next write event
            pass
        except ConnectorConnectionClosedException:
            # connection resetted by peer
            logging.debug('Connection reset by peer: %r', self.connector)
            self._closure()
        except:
            logging.debug('Unknown connection error: %r', self.connector)
            # unhandled connector exception
            self._closure()
            raise
        else:
            if not n:
                logging.debug('Connection %r closed in send', self.connector)
                self._closure()
                return
            if n == len(msg):
                # everything sent: empty the buffer
                del self.write_buf[:]
            else:
                # partial send: keep the remainder for the next write event
                self.write_buf = [msg[n:]]

    @profiler_decorator
    def _addPacket(self, packet):
        """Add a packet into the write buffer."""
        if self.connector is None:
            return
        was_empty = not self.write_buf
        self.write_buf.extend(packet.encode())
        if was_empty:
            # enable polling for writing.
            self.em.addWriter(self)
        logging.packet(self, packet, True)

    @not_closed
    def notify(self, packet):
        """ Then a packet with a new ID """
        msg_id = self._getNextId()
        packet.setId(msg_id)
        self._addPacket(packet)
        return msg_id

    @profiler_decorator
    @not_closed
    def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None, **kw):
        """
        Send a packet with a new ID and register the expectation of an answer
        """
        msg_id = self._getNextId()
        packet.setId(msg_id)
        self._addPacket(packet)
        handlers = self._handlers
        # restart the timeout base only if no answer was already pending
        t = not handlers.isPending() and time() or None
        handlers.emit(packet, timeout, on_timeout, kw)
        self.updateTimeout(t)
        return msg_id

    @not_closed
    def answer(self, packet, msg_id=None):
        """ Answer to a packet by re-using its ID for the packet answer """
        if msg_id is None:
            msg_id = self.getPeerId()
        packet.setId(msg_id)
        assert packet.isResponse(), packet
        self._addPacket(packet)

    def idle(self):
        # Keep-alive: ping the peer when the connection has been idle.
        self.ask(Packets.Ping())
class ClientConnection(Connection):
    """A connection from this node to a remote node."""
    connecting = True
    client = True

    def __init__(self, event_manager, handler, node, connector):
        addr = node.getAddress()
        Connection.__init__(self, event_manager, handler, connector, addr)
        node.setConnection(self)
        handler.connectionStarted(self)
        try:
            try:
                self.connector.makeClientConnection(addr)
            except ConnectorInProgressException:
                # non-blocking connect still in progress: the outcome will
                # be reported through the next writability event
                event_manager.addWriter(self)
            else:
                self._connectionCompleted()
        except ConnectorConnectionRefusedException:
            self._closure()
        except ConnectorException:
            # unhandled connector exception
            self._closure()
            raise

    def writable(self):
        """Called when self is writable."""
        # First write event after a non-blocking connect: a socket error
        # means the connection attempt failed.
        if self.connector.getError():
            self._closure()
        else:
            self._connectionCompleted()
            # _connectionCompleted replaced self.writable with the normal
            # implementation; re-dispatch this event to it.
            self.writable()

    def _connectionCompleted(self):
        # From now on, write events are handled normally (buffer flushing).
        self.writable = super(ClientConnection, self).writable
        self.connecting = False
        self.updateTimeout(time())
        self.getHandler().connectionCompleted(self)
class ServerConnection(Connection):
    """A connection from a remote node to this node."""
    # Both server and client must check the connection, in case:
    # - the remote crashed brutally (i.e. without closing TCP connections)
    # - or packets sent by the remote are dropped (network failure)
    # Use different timeout so that in normal condition, server never has to
    # ping the client. Otherwise, it would do it about half of the time.
    KEEP_ALIVE = Connection.KEEP_ALIVE + 5
    server = True

    def __init__(self, *args, **kw):
        super(ServerConnection, self).__init__(*args, **kw)
        # An accepted connection is immediately subject to keep-alive.
        self.updateTimeout(time())
class MTClientConnection(ClientConnection):
    """A Multithread-safe version of ClientConnection.

    Every public operation takes an RLock; decorated methods merely warn
    when called without the lock held (see lockCheckWrapper).
    """

    def __init__(self, *args, **kwargs):
        # _lock is only here for lock debugging purposes. Do not use.
        self._lock = lock = RLock()
        self.acquire = lock.acquire
        self.release = lock.release
        # 'dispatcher' is a mandatory keyword-only argument
        self.dispatcher = kwargs.pop('dispatcher')
        self.dispatcher.needPollThread()
        # hold the lock during base initialization (it may emit packets)
        self.lock()
        try:
            super(MTClientConnection, self).__init__(*args, **kwargs)
        finally:
            self.unlock()

    def lock(self, blocking = 1):
        return self.acquire(blocking = blocking)

    def unlock(self):
        self.release()

    @lockCheckWrapper
    def writable(self, *args, **kw):
        return super(MTClientConnection, self).writable(*args, **kw)

    @lockCheckWrapper
    def readable(self, *args, **kw):
        return super(MTClientConnection, self).readable(*args, **kw)

    @lockCheckWrapper
    def analyse(self, *args, **kw):
        return super(MTClientConnection, self).analyse(*args, **kw)

    def notify(self, *args, **kw):
        # Serialize packet emission with the connection lock.
        self.lock()
        try:
            return super(MTClientConnection, self).notify(*args, **kw)
        finally:
            self.unlock()

    @profiler_decorator
    def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None,
            queue=None, **kw):
        # Thread-safe ask: also registers the request with the dispatcher
        # so the answer is routed back to the right queue.
        self.lock()
        try:
            if self.isClosed():
                raise ConnectionClosed
            # XXX: Here, we duplicate Connection.ask because we need to call
            # self.dispatcher.register after setId is called and before
            # _addPacket is called.
            msg_id = self._getNextId()
            packet.setId(msg_id)
            if queue is None:
                # only fire-and-forget pings may omit the answer queue
                if type(packet) is not Packets.Ping:
                    raise TypeError, 'Only Ping packet can be asked ' \
                        'without a queue, got a %r.' % (packet, )
            else:
                self.dispatcher.register(self, msg_id, queue)
            self._addPacket(packet)
            handlers = self._handlers
            t = not handlers.isPending() and time() or None
            handlers.emit(packet, timeout, on_timeout, kw)
            self.updateTimeout(t)
            return msg_id
        finally:
            self.unlock()

    @lockCheckWrapper
    def answer(self, *args, **kw):
        return super(MTClientConnection, self).answer(*args, **kw)

    @lockCheckWrapper
    def checkTimeout(self, *args, **kw):
        return super(MTClientConnection, self).checkTimeout(*args, **kw)

    def close(self):
        self.lock()
        try:
            super(MTClientConnection, self).close()
        finally:
            self.release()

    @lockCheckWrapper
    def process(self, *args, **kw):
        return super(MTClientConnection, self).process(*args, **kw)
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/connector.py 0000664 0000000 0000000 00000015463 12000764302 0025633 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2009-2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket
import errno
# Global connector registry.
# Fill by calling registerConnectorHandler.
# Read by calling getConnectorHandler.
connector_registry = {}

# Name used by getConnectorHandler when no connector is specified.
DEFAULT_CONNECTOR = 'SocketConnectorIPv4'

def registerConnectorHandler(connector_handler):
    """Register a connector class in the global registry, keyed by its
    class name."""
    connector_registry[connector_handler.__name__] = connector_handler
def getConnectorHandler(connector=None):
    """Resolve a connector specification to a connector class.

    connector may be None (DEFAULT_CONNECTOR is used), the name of a
    registered class, or a handler class itself.  Returns None when the
    name is unknown.
    """
    if connector is None:
        connector = DEFAULT_CONNECTOR
    if isinstance(connector, basestring):
        connector_handler = connector_registry.get(connector)
    else:
        # Allow to directly provide a handler class without requiring to
        # register it first.
        connector_handler = connector
    return connector_handler
class SocketConnector:
    """ This class is a wrapper for a socket """

    # Class-level defaults.  is_closed stays None until the connector is
    # actually opened; __repr__ relies on this three-state value.
    is_listening = False
    remote_addr = None
    is_closed = None

    def __init__(self, s=None, accepted_from=None):
        # s: an existing socket (e.g. returned by accept()), or None to
        #    create a new one of the subclass' address family (af_type).
        # accepted_from: peer address when this connector wraps an
        #    accepted incoming connection.
        self.accepted_from = accepted_from
        if accepted_from is not None:
            self.remote_addr = accepted_from
            self.is_listening = False
            self.is_closed = False
        if s is None:
            self.socket = socket.socket(self.af_type, socket.SOCK_STREAM)
        else:
            self.socket = s
        # Cache the fd: it is still needed by __repr__/getDescriptor
        # after the socket is closed.
        self.socket_fd = self.socket.fileno()
        # always use non-blocking sockets
        self.socket.setblocking(0)
        # disable Nagle algorithm to reduce latency
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    def makeClientConnection(self, addr):
        """Start a non-blocking connect to addr.

        Raises ConnectorInProgressException when the connect continues in
        the background (the usual case for non-blocking sockets),
        ConnectorConnectionRefusedException on immediate refusal, and
        ConnectorException on any other socket error.
        """
        self.is_closed = False
        self.remote_addr = addr
        try:
            self.socket.connect(addr)
        except socket.error, (err, errmsg):
            if err == errno.EINPROGRESS:
                raise ConnectorInProgressException
            if err == errno.ECONNREFUSED:
                raise ConnectorConnectionRefusedException
            raise ConnectorException, 'makeClientConnection to %s failed:' \
                ' %s:%s' % (addr, err, errmsg)

    def makeListeningConnection(self, addr):
        """Bind to addr and listen (backlog of 5).

        The socket is closed and ConnectorException is raised on failure.
        """
        self.is_closed = False
        self.is_listening = True
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind(addr)
            self.socket.listen(5)
        except socket.error, (err, errmsg):
            self.socket.close()
            raise ConnectorException, 'makeListeningConnection on %s failed:' \
                ' %s:%s' % (addr, err, errmsg)

    def getError(self):
        # Pending socket-level error code, if any (useful after an
        # asynchronous connect).
        return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)

    def getAddress(self):
        # Subclass responsibility: address tuple shape is family-specific.
        raise NotImplementedError

    def getDescriptor(self):
        # this descriptor must only be used by the event manager, where it
        # guarantee unicity only while the connector is opened and registered
        # in epoll
        return self.socket_fd

    def getNewConnection(self):
        """Accept one incoming connection.

        Returns (connector, address).  Raises ConnectorTryAgainException
        when no connection is pending (EAGAIN), ConnectorException on any
        other socket error.
        """
        try:
            (new_s, addr) = self._accept()
            new_s = self.__class__(new_s, accepted_from=addr)
            return (new_s, addr)
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
            raise ConnectorException, 'getNewConnection failed: %s:%s' % \
                (err, errmsg)

    def shutdown(self):
        # This may fail if the socket is not connected.
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass

    def receive(self):
        """Read up to 4096 bytes, translating socket errors into the
        connector exception hierarchy (try-again / refused / closed /
        generic failure)."""
        try:
            return self.socket.recv(4096)
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
            if err in (errno.ECONNREFUSED, errno.EHOSTUNREACH):
                raise ConnectorConnectionRefusedException
            if err in (errno.ECONNRESET, errno.ETIMEDOUT):
                raise ConnectorConnectionClosedException
            raise ConnectorException, 'receive failed: %s:%s' % (err, errmsg)

    def send(self, msg):
        """Send msg; returns the number of bytes actually written (may be
        a partial write on a non-blocking socket)."""
        try:
            return self.socket.send(msg)
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
            if err in (errno.ECONNRESET, errno.ETIMEDOUT, errno.EPIPE):
                raise ConnectorConnectionClosedException
            raise ConnectorException, 'send failed: %s:%s' % (err, errmsg)

    def close(self):
        self.is_closed = True
        return self.socket.close()

    def __repr__(self):
        # Human-readable state: never opened / closed / opened, plus the
        # direction (listening, to <addr>, from <addr>).
        if self.is_closed:
            fileno = '?'
        else:
            fileno = self.socket_fd
        result = '<%s at 0x%x fileno %s %s, ' % (self.__class__.__name__,
            id(self), fileno, self.socket.getsockname())
        if self.is_closed is None:
            result += 'never opened'
        else:
            if self.is_closed:
                result += 'closed '
            else:
                result += 'opened '
            if self.is_listening:
                result += 'listening'
            else:
                if self.accepted_from is None:
                    result += 'to'
                else:
                    result += 'from'
                result += ' %s' % (self.remote_addr, )
        return result + '>'

    def _accept(self):
        # Subclass responsibility: accept() and normalize the returned
        # address for the family.
        raise NotImplementedError
class SocketConnectorIPv4(SocketConnector):
    " Wrapper for IPv4 sockets"
    af_type = socket.AF_INET

    def _accept(self):
        # IPv4 addresses are already (host, port): nothing to normalize.
        return self.socket.accept()

    def getAddress(self):
        return self.socket.getsockname()
class SocketConnectorIPv6(SocketConnector):
    " Wrapper for IPv6 sockets"
    af_type = socket.AF_INET6

    def _accept(self):
        new_s, addr = self.socket.accept()
        # Keep only (host, port): drop flowinfo and scope_id so IPv6
        # addresses have the same shape as IPv4 ones.
        return new_s, addr[:2]

    def getAddress(self):
        return self.socket.getsockname()[:2]
registerConnectorHandler(SocketConnectorIPv4)
registerConnectorHandler(SocketConnectorIPv6)

class ConnectorException(Exception):
    """Base class for all connector-level errors."""
    pass

class ConnectorTryAgainException(ConnectorException):
    """The operation would block (EAGAIN); retry later."""
    pass

class ConnectorInProgressException(ConnectorException):
    """A non-blocking connect is in progress (EINPROGRESS)."""
    pass

class ConnectorConnectionClosedException(ConnectorException):
    """The peer closed or reset the connection."""
    pass

class ConnectorConnectionRefusedException(ConnectorException):
    """The connection was refused (or the host is unreachable)."""
    pass
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/debug.py 0000664 0000000 0000000 00000005677 12000764302 0024735 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2010-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import traceback
import signal
import imp
import os
import sys
from functools import wraps
import neo
# kill -RTMIN+1
# Dump information to logs.
# kill -RTMIN+2
# Loads (or reloads) neo.debug module.
# The content is up to you (it's only imported). It can be a breakpoint.
def safe_handler(func):
    """Decorator for signal handlers: print any exception instead of
    letting it escape the handler, so a buggy debug helper cannot kill
    the process."""
    @wraps(func)
    def wrapper(sig, frame):
        try:
            func(sig, frame)
        except:
            # Deliberately swallow everything: mistakes in the "debug"
            # module must only be reported, never fatal.
            traceback.print_exc()
    return wrapper
@safe_handler
def debugHandler(sig, frame):
    """SIGRTMIN+2 handler: load (or reload) the neo.debug module.

    Importing the module is the whole effect; its content is up to the
    user (it may set a breakpoint, dump state, ...).
    """
    file, filename, (suffix, mode, type) = imp.find_module('debug',
        neo.__path__)
    try:
        imp.load_module('neo.debug', file, filename, (suffix, mode, type))
    finally:
        # imp.find_module returns an open file object which the caller
        # is responsible for closing; the previous code leaked it on
        # every signal.
        if file is not None:
            file.close()
def getPdb():
    """Return the best available debugger instance.

    Prefers IPython's Pdb, reusing the running shell's color scheme when
    already inside IPython; falls back to the stdlib pdb otherwise.
    """
    try: # try ipython if available
        import IPython
        try:
            # get_ipython is only a builtin when running under IPython.
            shell = get_ipython()
        except NameError:
            # NOTE(review): IPython.frontend is a legacy API path —
            # confirm against the IPython version in use.
            shell = IPython.frontend.terminal.embed.InteractiveShellEmbed()
        return IPython.core.debugger.Pdb(shell.colors)
    except ImportError:
        import pdb
        return pdb.Pdb()
# Lazily-created debugger instance placeholder.
_debugger = None

def winpdb(depth=0):
    """Break into a winpdb (rpdb2) session attached to this process.

    If a debugger is already attached, simply set a breakpoint at the
    caller's frame.  Otherwise fork: the parent embeds the rpdb2
    debugger, while the child execs a python that launches the winpdb
    GUI with the generated password pre-answered, so it attaches
    automatically.
    """
    import rpdb2
    depth += 1
    if rpdb2.g_debugger is not None:
        return rpdb2.setbreak(depth)
    script = rpdb2.calc_frame_path(sys._getframe(depth))
    # One-shot password built from the pid and a mangled cwd.
    pwd = str(os.getpid()) + os.getcwd().replace('/', '_').replace('-', '_')
    pid = os.fork()
    if pid:
        # Parent: wait for the debugger connection, then reap the child.
        try:
            rpdb2.start_embedded_debugger(pwd, depth=depth)
        finally:
            os.waitpid(pid, 0)
    else:
        # Child: exec a helper that forks winpdb with the password
        # pre-answered and the embedded-debugger warning silenced.
        try:
            os.execlp('python', 'python', '-c', """import os\nif not os.fork():
    import rpdb2, winpdb
    rpdb2_raw_input = rpdb2._raw_input
    rpdb2._raw_input = lambda s: \
        s == rpdb2.STR_PASSWORD_INPUT and %r or rpdb2_raw_input(s)
    winpdb.g_ignored_warnings[winpdb.STR_EMBEDDED_WARNING] = True
    winpdb.main()
""" % pwd, '-a', script)
        finally:
            # Never return to the caller's code in the child.
            os.abort()
def register(on_log=None):
    """Install the debugging signal handlers.

    SIGRTMIN+1 calls on_log (when given) to dump information to logs;
    SIGRTMIN+2 (re)loads the neo.debug module via debugHandler.
    """
    if on_log is not None:
        @safe_handler
        def on_log_signal(signum, signal):
            on_log()
        signal.signal(signal.SIGRTMIN+1, on_log_signal)
    signal.signal(signal.SIGRTMIN+2, debugHandler)
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/dispatcher.py 0000664 0000000 0000000 00000013313 12000764302 0025757 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from functools import wraps
from .locking import Lock, Empty
from .profiling import profiler_decorator
# Shared immutable-by-convention default for message_table lookups.
EMPTY = {}
# Sentinel marking a message id whose answer nobody expects any more.
NOBODY = []
class ForgottenPacket(object):
    """
    Placeholder pushed to an answer queue when the corresponding expected
    answer has been explicitly forgotten.  Comparable to the None pushed
    when a connection closes, but carries a distinct meaning (deliberate
    abandon rather than disconnection).
    """
    def __init__(self, msg_id):
        self.msg_id = msg_id

    def getId(self):
        """Return the id of the forgotten message."""
        return self.msg_id
def giant_lock(func):
    """Method decorator: run func while holding the instance's big lock
    (acquired via self.lock_acquire, released via self.lock_release)."""
    @wraps(func)
    def wrapped(self, *args, **kw):
        self.lock_acquire()
        try:
            return func(self, *args, **kw)
        finally:
            self.lock_release()
    return wrapped
class Dispatcher:
    """Route answer packets to the queue registered for their request.

    message_table maps id(conn) -> {msg_id: queue}; queue_dict maps
    id(queue) -> number of answers still expected by that queue.
    """

    def __init__(self, poll_thread=None):
        self.message_table = {}
        self.queue_dict = {}
        lock = Lock()
        self.lock_acquire = lock.acquire
        self.lock_release = lock.release
        self.poll_thread = poll_thread

    @giant_lock
    @profiler_decorator
    def dispatch(self, conn, msg_id, packet, kw):
        """
        Retrieve register-time provided queue, and put conn and packet in it.
        Returns False when the message was not expected at all, True
        otherwise (including explicitly-forgotten messages, which are
        silently swallowed).
        """
        queue = self.message_table.get(id(conn), EMPTY).pop(msg_id, None)
        if queue is None:
            return False
        elif queue is NOBODY:
            return True
        self._decrefQueue(queue)
        queue.put((conn, packet, kw))
        return True

    def _decrefQueue(self, queue):
        # One fewer answer expected by this queue; drop its counter when
        # it reaches zero.
        queue_id = id(queue)
        queue_dict = self.queue_dict
        if queue_dict[queue_id] == 1:
            queue_dict.pop(queue_id)
        else:
            queue_dict[queue_id] -= 1

    def _increfQueue(self, queue):
        # One more answer expected by this queue.
        queue_id = id(queue)
        queue_dict = self.queue_dict
        try:
            queue_dict[queue_id] += 1
        except KeyError:
            queue_dict[queue_id] = 1

    def needPollThread(self):
        self.poll_thread.start()

    @giant_lock
    @profiler_decorator
    def register(self, conn, msg_id, queue):
        """Register an expectation for a reply."""
        if self.poll_thread is not None:
            self.needPollThread()
        self.message_table.setdefault(id(conn), {})[msg_id] = queue
        self._increfQueue(queue)

    @profiler_decorator
    def unregister(self, conn):
        """ Unregister a connection and put fake packet in queues to unlock
        threads excepting responses from that connection """
        self.lock_acquire()
        try:
            message_table = self.message_table.pop(id(conn), EMPTY)
        finally:
            self.lock_release()
        notified_set = set()
        _decrefQueue = self._decrefQueue
        for queue in message_table.itervalues():
            if queue is NOBODY:
                continue
            queue_id = id(queue)
            # Wake each waiting queue only once, however many messages it
            # had pending on this connection.
            if queue_id not in notified_set:
                queue.put((conn, None, None))
                notified_set.add(queue_id)
            _decrefQueue(queue)

    @giant_lock
    @profiler_decorator
    def forget(self, conn, msg_id):
        """ Forget about a specific message for a specific connection.
        Actually makes it "expected by nobody", so we know we can ignore it,
        and not detect it as an error. """
        message_table = self.message_table[id(conn)]
        queue = message_table[msg_id]
        if queue is NOBODY:
            raise KeyError, 'Already expected by NOBODY: %r, %r' % (
                conn, msg_id)
        # Unblock the waiting thread with a ForgottenPacket marker.
        queue.put((conn, ForgottenPacket(msg_id), None))
        self.queue_dict[id(queue)] -= 1
        message_table[msg_id] = NOBODY
        return queue

    @giant_lock
    @profiler_decorator
    def forget_queue(self, queue, flush_queue=True):
        """
        Forget all pending messages for given queue.
        Actually makes them "expected by nobody", so we know we can ignore
        them, and not detect it as an error.
        flush_queue (boolean, default=True)
            All packets in queue get flushed.
        """
        # XXX: expensive lookup: we iterate over the whole dict
        found = 0
        for message_table in self.message_table.itervalues():
            for msg_id, t_queue in message_table.iteritems():
                if queue is t_queue:
                    found += 1
                    message_table[msg_id] = NOBODY
        # Sanity check: the per-queue refcount must match the number of
        # entries actually found in message tables.
        refcount = self.queue_dict.pop(id(queue), 0)
        if refcount != found:
            raise ValueError('We hit a refcount bug: %s queue uses ' \
                'expected, %s found' % (refcount, found))
        if flush_queue:
            get = queue.get
            while True:
                try:
                    get(block=False)
                except Empty:
                    break

    @profiler_decorator
    def registered(self, conn):
        """Check if a connection is registered into message table."""
        return len(self.message_table.get(id(conn), EMPTY)) != 0

    @giant_lock
    @profiler_decorator
    def pending(self, queue):
        # True while the queue holds items or still expects answers.
        return not queue.empty() or self.queue_dict.get(id(queue), 0) > 0
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/event.py 0000664 0000000 0000000 00000017011 12000764302 0024751 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EINTR, EAGAIN
from . import logging
from .profiling import profiler_decorator
class EpollEventManager(object):
    """This class manages connections and events based on epoll(5).

    connection_dict maps file descriptors to connections; reader_set and
    writer_set track which descriptors are currently registered for
    EPOLLIN / EPOLLOUT.
    """

    def __init__(self):
        self.connection_dict = {}
        self.reader_set = set()
        self.writer_set = set()
        self.epoll = epoll()
        # Connections that received packets not yet processed.
        self._pending_processing = []

    def close(self):
        # values() returns a copy: closing a connection mutates
        # connection_dict through unregister().
        for c in self.connection_dict.values():
            c.close()
        # Drop every attribute so any further use fails loudly.
        del self.__dict__

    def getConnectionList(self):
        # XXX: use index
        return [x for x in self.connection_dict.itervalues()
            if not x.isAborted()]

    def getClientList(self):
        # XXX: use index
        return [c for c in self.getConnectionList() if c.isClient()]

    def getServerList(self):
        # XXX: use index
        return [c for c in self.getConnectionList() if c.isServer()]

    def getConnectionListByUUID(self, uuid):
        """ Return the connection associated to the UUID, None if the UUID is
        None, invalid or not found"""
        # XXX: use index
        # XXX: consider remove UUID from connection and thus this method
        if uuid is None:
            return None
        result = []
        append = result.append
        for conn in self.getConnectionList():
            if conn.getUUID() == uuid:
                append(conn)
        return result

    def register(self, conn):
        fd = conn.getConnector().getDescriptor()
        self.connection_dict[fd] = conn
        self.epoll.register(fd)

    def unregister(self, conn):
        new_pending_processing = [x for x in self._pending_processing
            if x is not conn]
        # Check that we removed at most one entry from
        # self._pending_processing .
        assert len(new_pending_processing) > len(self._pending_processing) - 2
        self._pending_processing = new_pending_processing
        fd = conn.getConnector().getDescriptor()
        self.epoll.unregister(fd)
        del self.connection_dict[fd]

    def isIdle(self):
        # Idle when nothing is queued for processing and no write is
        # pending.
        return not (self._pending_processing or self.writer_set)

    def _addPendingConnection(self, conn):
        pending_processing = self._pending_processing
        if conn not in pending_processing:
            pending_processing.append(conn)

    def poll(self, timeout=1):
        """Process one pending connection, polling for events first when
        nothing is pending."""
        if not self._pending_processing:
            # Fetch messages from polled file descriptors
            self._poll(timeout=timeout)
            if not self._pending_processing:
                return
        to_process = self._pending_processing.pop(0)
        to_process.lock()
        try:
            try:
                to_process.process()
            finally:
                # ...and requeue if there are pending messages
                if to_process.hasPendingMessages():
                    self._addPendingConnection(to_process)
        finally:
            to_process.unlock()
        # Non-blocking call: as we handled a packet, we should just offer
        # poll a chance to fetch & send already-available data, but it must
        # not delay us.
        self._poll(timeout=0)

    def _poll(self, timeout=1):
        """Poll epoll once, dispatch readable/writable/error events, then
        run every connection's timeout check."""
        try:
            event_list = self.epoll.poll(timeout)
        except IOError, exc:
            if exc.errno in (0, EAGAIN):
                logging.info('epoll.poll triggered undocumented error %r',
                    exc.errno)
            elif exc.errno != EINTR:
                raise
            # Interrupted or spurious failure: act as if nothing happened.
            event_list = ()
        wlist = []
        elist = []
        for fd, event in event_list:
            if event & EPOLLIN:
                conn = self.connection_dict[fd]
                conn.lock()
                try:
                    conn.readable()
                finally:
                    conn.unlock()
                if conn.hasPendingMessages():
                    self._addPendingConnection(conn)
            if event & EPOLLOUT:
                wlist.append(fd)
            if event & (EPOLLERR | EPOLLHUP):
                elist.append(fd)
        for fd in wlist:
            # This can fail, if a connection is closed in readable().
            try:
                conn = self.connection_dict[fd]
            except KeyError:
                continue
            conn.lock()
            try:
                conn.writable()
            finally:
                conn.unlock()
        for fd in elist:
            # This can fail, if a connection is closed in previous calls to
            # readable() or writable().
            try:
                conn = self.connection_dict[fd]
            except KeyError:
                continue
            # Error/hangup conditions are surfaced through readable().
            conn.lock()
            try:
                conn.readable()
            finally:
                conn.unlock()
            if conn.hasPendingMessages():
                self._addPendingConnection(conn)
        # values() copy again: checkTimeout may close connections.
        t = time()
        for conn in self.connection_dict.values():
            conn.lock()
            try:
                conn.checkTimeout(t)
            finally:
                conn.unlock()

    def addReader(self, conn):
        connector = conn.getConnector()
        assert connector is not None, conn.whoSetConnector()
        fd = connector.getDescriptor()
        if fd not in self.reader_set:
            self.reader_set.add(fd)
            # Preserve the write interest, if any.
            self.epoll.modify(fd, EPOLLIN | (
                fd in self.writer_set and EPOLLOUT))

    def removeReader(self, conn):
        connector = conn.getConnector()
        assert connector is not None, conn.whoSetConnector()
        fd = connector.getDescriptor()
        if fd in self.reader_set:
            self.reader_set.remove(fd)
            self.epoll.modify(fd, fd in self.writer_set and EPOLLOUT)

    @profiler_decorator
    def addWriter(self, conn):
        connector = conn.getConnector()
        assert connector is not None, conn.whoSetConnector()
        fd = connector.getDescriptor()
        if fd not in self.writer_set:
            self.writer_set.add(fd)
            # Preserve the read interest, if any.
            self.epoll.modify(fd, EPOLLOUT | (
                fd in self.reader_set and EPOLLIN))

    def removeWriter(self, conn):
        connector = conn.getConnector()
        assert connector is not None, conn.whoSetConnector()
        fd = connector.getDescriptor()
        if fd in self.writer_set:
            self.writer_set.remove(fd)
            self.epoll.modify(fd, fd in self.reader_set and EPOLLIN)

    def log(self):
        # Dump the current state of the event manager for debugging.
        logging.info('Event Manager:')
        logging.info('  Readers: %r', list(self.reader_set))
        logging.info('  Writers: %r', list(self.writer_set))
        logging.info('  Connections:')
        pending_set = set(self._pending_processing)
        for fd, conn in self.connection_dict.items():
            logging.info('    %r: %r (pending=%r)', fd, conn,
                conn in pending_set)
# Default to EpollEventManager (epoll-based, hence Linux-specific).
EventManager = EpollEventManager
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/exception.py 0000664 0000000 0000000 00000001665 12000764302 0025636 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
# NEO exception hierarchy.  The subclasses carry no data of their own:
# they only let callers discriminate failure kinds.
class NeoException(Exception):
    """Root of the NEO-specific exception hierarchy."""
    pass

class ElectionFailure(NeoException):
    pass

class PrimaryFailure(NeoException):
    pass

class OperationFailure(NeoException):
    pass

class DatabaseFailure(NeoException):
    pass

class NodeNotReady(NeoException):
    pass
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/handler.py 0000664 0000000 0000000 00000015006 12000764302 0025247 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from functools import wraps
from . import logging
from .protocol import (
NodeStates, Packets, ErrorCodes, Errors, BrokenNodeDisallowedError,
NotReadyError, PacketMalformedError, ProtocolError, UnexpectedPacketError)
class EventHandler(object):
    """This class handles events.

    One handler is attached to each connection; dispatch() routes every
    received packet to the method named by its handler_method_name.
    """

    def __init__(self, app):
        self.app = app

    def __repr__(self):
        return self.__class__.__name__

    def __unexpectedPacket(self, conn, packet, message=None):
        """Handle an unexpected packet."""
        if message is None:
            message = 'unexpected packet type %s in %s' % (type(packet),
                self.__class__.__name__)
        else:
            message = 'unexpected packet: %s in %s' % (message,
                self.__class__.__name__)
        logging.error(message)
        conn.answer(Errors.ProtocolError(message))
        conn.abort()
        # self.peerBroken(conn)

    def dispatch(self, conn, packet, kw={}):
        """This is a helper method to handle various packet types.

        Protocol errors answer an error packet and abort the connection;
        malformed packets close it outright.
        """
        try:
            conn.setPeerId(packet.getId())
            try:
                method = getattr(self, packet.handler_method_name)
            except AttributeError:
                raise UnexpectedPacketError('no handler found')
            args = packet.decode() or ()
            method(conn, *args, **kw)
        except UnexpectedPacketError, e:
            if not conn.isClosed():
                self.__unexpectedPacket(conn, packet, *e.args)
        except PacketMalformedError, e:
            logging.error('malformed packet from %r: %s', conn, e)
            conn.close()
            # self.peerBroken(conn)
        except BrokenNodeDisallowedError:
            if not conn.isClosed():
                conn.answer(Errors.BrokenNode('go away'))
                conn.abort()
        except NotReadyError, message:
            if not conn.isClosed():
                if not message.args:
                    message = 'Retry Later'
                message = str(message)
                conn.answer(Errors.NotReady(message))
                conn.abort()
        except ProtocolError, message:
            if not conn.isClosed():
                message = str(message)
                conn.answer(Errors.ProtocolError(message))
                conn.abort()

    def checkClusterName(self, name):
        # raise an exception if the given name mismatch the current cluster name
        if self.app.name != name:
            logging.error('reject an alien cluster')
            raise ProtocolError('invalid cluster name')

    # Network level handlers

    def packetReceived(self, *args):
        """Called when a packet is received."""
        self.dispatch(*args)

    def connectionStarted(self, conn):
        """Called when a connection is started."""
        logging.debug('connection started for %r', conn)

    def connectionCompleted(self, conn):
        """Called when a connection is completed."""
        logging.debug('connection completed for %r (from %s:%u)',
            conn, *conn.getConnector().getAddress())

    def connectionFailed(self, conn):
        """Called when a connection failed."""
        logging.debug('connection failed for %r', conn)

    def connectionAccepted(self, conn):
        """Called when a connection is accepted."""

    def connectionClosed(self, conn):
        """Called when a connection is closed by the peer."""
        logging.debug('connection closed for %r', conn)
        self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)

    #def peerBroken(self, conn):
    #    """Called when a peer is broken."""
    #    logging.error('%r is broken', conn)
    #    # NodeStates.BROKEN

    def connectionLost(self, conn, new_state):
        """ this is a method to override in sub-handlers when there is no need
        to make distinction from the kind event that closed the connection """
        pass

    # Packet handlers.

    def acceptIdentification(self, conn, node_type, *args):
        # Delegate to self._acceptIdentification when the subclass
        # defines one; otherwise treat the packet as unexpected.
        try:
            acceptIdentification = self._acceptIdentification
        except AttributeError:
            raise UnexpectedPacketError('no handler found')
        if conn.isClosed():
            # acceptIdentification received on a closed (probably aborted,
            # actually) connection. Reject any further packet as unexpected.
            conn.setHandler(EventHandler(self.app))
            return
        node = self.app.nm.getByAddress(conn.getAddress())
        assert node.getConnection() is conn, (node.getConnection(), conn)
        if node.getType() == node_type:
            node.setIdentified()
            acceptIdentification(node, *args)
            return
        # Node type mismatch: refuse the identification.
        conn.close()

    def ping(self, conn):
        conn.answer(Packets.Pong())

    def pong(self, conn):
        # Ignore PONG packets. The only purpose of ping/pong packets is
        # to test/maintain underlying connection.
        pass

    def notify(self, conn, message):
        logging.warning('notification from %r: %s', conn, message)

    def closeClient(self, conn):
        # Peer will no longer use this connection as a client; close it
        # entirely unless we still use it as a client ourselves.
        conn.server = False
        if not conn.client:
            conn.close()

    # Error packet handlers.

    def error(self, conn, code, message):
        # Route the error to the method named after its error code.
        try:
            getattr(self, Errors[code])(conn, message)
        except (AttributeError, ValueError):
            raise UnexpectedPacketError(message)

    def protocolError(self, conn, message):
        # the connection should have been closed by the remote peer
        logging.error('protocol error: %s', message)

    def timeoutError(self, conn, message):
        logging.error('timeout error: %s', message)

    def brokenNodeDisallowedError(self, conn, message):
        raise RuntimeError, 'broken node disallowed error: %s' % (message,)

    def alreadyPendingError(self, conn, message):
        logging.error('already pending error: %s', message)

    def ack(self, conn, message):
        logging.debug("no error message: %s", message)
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/locking.py 0000664 0000000 0000000 00000014042 12000764302 0025257 0 ustar 00root root 0000000 0000000 from threading import Lock as threading_Lock
from threading import RLock as threading_RLock
from threading import currentThread
from Queue import Queue as Queue_Queue
from Queue import Empty
"""
Verbose locking classes.
Python threading module contains a simple logging mechanism, but:
- It's limitted to RLock class
- It's enabled instance by instance
- Choice to log or not is done at instanciation
- It does not emit any log before trying to acquire lock
This file defines a VerboseLock class implementing basic lock API and
logging in appropriate places with extensive details.
It can be globaly toggled by changing VERBOSE_LOCKING value.
There is no overhead at all when disabled (passthrough to threading
classes).
"""
__all__ = ['Lock', 'RLock', 'Queue', 'Empty']
VERBOSE_LOCKING = False
import traceback
import sys
import os
class LockUser(object):
    """Identify who interacts with a lock: the thread name plus the code
    location (path, line number, function, source line) that asked."""

    def __init__(self, level=0):
        self.ident = currentThread().getName()
        # This class is instantiated from a place desiring to know what
        # called it:
        #   dropping 1 frame would point inside this method,
        #   dropping 2 frames points at our direct caller,
        # so dropping 2 + level keeps the caller's caller as last frame;
        # a positive "level" skips extra intermediate calls when needed.
        frames = traceback.extract_stack()[:-(2 + level)]
        self.stack = frames
        path, lineno, func, text = frames[-1]
        # Keep only the 3 last path components: enough to locate a file
        # in the current Neo directory structure.
        short_path = os.path.join('...', *path.split(os.path.sep)[-3:])
        self.caller = (short_path, lineno, func, text)

    def __eq__(self, other):
        # Two LockUsers are "the same" when they denote the same thread.
        return isinstance(other, self.__class__) and self.ident == other.ident

    def __repr__(self):
        path, lineno, _, text = self.caller
        return '%s@%s:%s %s' % (self.ident, path, lineno, text)

    def formatStack(self):
        return ''.join(traceback.format_list(self.stack))
class VerboseLockBase(object):
    """Common machinery for the verbose Lock/RLock wrappers: trace every
    acquire/release to stderr with owner and waiter details.

    Subclasses must set self.lock and implement _locked().
    """

    def __init__(self, reentrant=False, debug_lock=False):
        self.reentrant = reentrant
        # debug_lock: also dump tracebacks when acquiring a lock that is
        # already owned, even without a detected deadlock.
        self.debug_lock = debug_lock
        self.owner = None
        self.waiting = []
        self._note('%s@%X created by %r', self.__class__.__name__, id(self),
            LockUser(1))

    def _note(self, fmt, *args):
        sys.stderr.write(fmt % args + '\n')
        sys.stderr.flush()

    def _getOwner(self):
        # The recorded owner is only meaningful while the lock is held.
        if self._locked():
            owner = self.owner
        else:
            owner = None
        return owner

    def acquire(self, blocking=1):
        me = LockUser()
        owner = self._getOwner()
        self._note('[%r]%s.acquire(%s) Waiting for lock. Owned by:%r ' \
            'Waiting:%r', me, self, blocking, owner, self.waiting)
        # A blocking acquire of a non-reentrant lock we already own can
        # never succeed: report it as a deadlock before blocking.
        if (self.debug_lock and owner is not None) or \
                (not self.reentrant and blocking and me == owner):
            if me == owner:
                self._note('[%r]%s.acquire(%s): Deadlock detected: ' \
                    ' I already own this lock:%r', me, self, blocking, owner)
            else:
                self._note('[%r]%s.acquire(%s): debug lock triggered: %r',
                    me, self, blocking, owner)
            self._note('Owner traceback:\n%s', owner.formatStack())
            self._note('My traceback:\n%s', me.formatStack())
        self.waiting.append(me)
        try:
            return self.lock.acquire(blocking)
        finally:
            self.owner = me
            self.waiting.remove(me)
            self._note('[%r]%s.acquire(%s) Lock granted. Waiting: %r',
                me, self, blocking, self.waiting)
    __enter__ = acquire

    def release(self):
        me = LockUser()
        self._note('[%r]%s.release() Waiting: %r', me, self, self.waiting)
        return self.lock.release()

    def __exit__(self, t, v, tb):
        self.release()

    def _locked(self):
        # Subclass responsibility: is the underlying lock currently held?
        raise NotImplementedError

    def __repr__(self):
        return '<%s@%X>' % (self.__class__.__name__, id(self))
class VerboseRLock(VerboseLockBase):
    """Verbose drop-in replacement for threading.RLock."""
    def __init__(self, verbose=None, debug_lock=False):
        # verbose is accepted for API compatibility but unused.
        super(VerboseRLock, self).__init__(reentrant=True,
            debug_lock=debug_lock)
        self.lock = threading_RLock()

    def _locked(self):
        # Relies on the pure-python RLock's name-mangled __block
        # attribute — CPython 2 implementation detail.
        return self.lock._RLock__block.locked()

    def _is_owned(self):
        return self.lock._is_owned()
class VerboseLock(VerboseLockBase):
    """Verbose drop-in replacement for threading.Lock."""
    def __init__(self, verbose=None, debug_lock=False):
        # verbose is accepted for API compatibility but unused.
        super(VerboseLock, self).__init__(debug_lock=debug_lock)
        self.lock = threading_Lock()

    def locked(self):
        return self.lock.locked()
    _locked = locked
class VerboseQueue(Queue_Queue):
    """Queue wrapper tracing get/put calls to stderr."""
    def __init__(self, maxsize=0):
        if maxsize <= 0:
            # NOTE(review): only unbounded queues get the verbose put;
            # the reason bounded queues keep the plain put is not stated
            # here — confirm before relying on it.
            self.put = self._verbose_put
        Queue_Queue.__init__(self, maxsize=maxsize)

    def _verbose_note(self, fmt, *args):
        sys.stderr.write(fmt % args + '\n')
        sys.stderr.flush()

    def get(self, block=True, timeout=None):
        note = self._verbose_note
        me = '[%r]%s.get(block=%r, timeout=%r)' % (LockUser(), self, block, timeout)
        note('%s waiting', me)
        try:
            result = Queue_Queue.get(self, block=block, timeout=timeout)
        except Exception, exc:
            note('%s got exeption %r', me, exc)
            raise
        note('%s got item', me)
        return result

    def _verbose_put(self, item, block=True, timeout=None):
        note = self._verbose_note
        me = '[%r]%s.put(..., block=%r, timeout=%r)' % (LockUser(), self, block, timeout)
        try:
            Queue_Queue.put(self, item, block=block, timeout=timeout)
        except Exception, exc:
            note('%s got exeption %r', me, exc)
            raise
        note('%s put item', me)

    def __repr__(self):
        return '<%s@%X>' % (self.__class__.__name__, id(self))
# Single switch point: export either the verbose wrappers or the plain
# threading/Queue implementations (zero overhead when disabled).
if VERBOSE_LOCKING:
    Lock = VerboseLock
    RLock = VerboseRLock
    Queue = VerboseQueue
else:
    Lock = threading_Lock
    RLock = threading_RLock
    Queue = Queue_Queue
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/logger.py 0000664 0000000 0000000 00000020510 12000764302 0025105 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
# WARNING: Log rotating should not be implemented here.
# SQLite does not access database only by file descriptor,
# and an OperationalError exception would be raised if a log is emitted
# between a rename and a reopen.
# Fortunately, SQLite allow multiple process to access the same DB,
# so an external tool should be able to dump and empty tables.
from collections import deque
from functools import wraps
from logging import getLogger, Formatter, Logger, LogRecord, StreamHandler, \
DEBUG, WARNING
from time import time
from traceback import format_exception
import bz2, inspect, neo, os, signal, sqlite3, threading
# Stats for storage node of matrix test (py2.7:SQLite)
# Estimated memory overhead of one buffered log record (excluding its
# 'msg' payload); used by NEOLogger.backlog to bound the record queue.
RECORD_SIZE = ( 234360832   # extra memory used
              - 16777264    # sum of raw data ('msg' attribute)
              ) // 187509   # number of records

# Log line layout: one header line, then the message on following lines.
FMT = ('%(asctime)s %(levelname)-9s %(name)-10s'
       ' [%(module)14s:%(lineno)3d] \n%(message)s')
class _Formatter(Formatter):
def formatTime(self, record, datefmt=None):
return Formatter.formatTime(self, record,
'%Y-%m-%d %H:%M:%S') + '.%04d' % (record.msecs * 10)
def format(self, record):
lines = iter(Formatter.format(self, record).splitlines())
prefix = lines.next()
return '\n'.join(prefix + line for line in lines)
class PacketRecord(object):
    """Pseudo log record carrying a network packet for the SQLite log.

    Class attributes supply the defaults that NEOLogger expects on any
    record ('args' and 'levelno').
    """
    args = None
    levelno = DEBUG

    def __init__(self, *args, **kw):
        # Store all given fields directly in the instance dict, like
        # dict.update would (keywords and/or a single mapping argument).
        self.__dict__.update(*args, **kw)
class NEOLogger(Logger):
    """Process-wide logger that can additionally record every log record
    and network packet into a SQLite database.

    Records are buffered in memory (bounded by 'backlog') and written
    asynchronously: emission to the DB happens when 'flush' is called,
    either explicitly, on a WARNING-or-higher record, or via SIGRTMIN.
    """

    # Console handler installed on the root logger if nothing else is
    # configured.
    default_root_handler = StreamHandler()
    default_root_handler.setFormatter(_Formatter(FMT))

    def __init__(self):
        Logger.__init__(self, None)
        self.parent = root = getLogger()
        if not root.handlers:
            root.addHandler(self.default_root_handler)
        # SQLite connection, or None when DB logging is disabled.
        self.db = None
        self._record_queue = deque()
        self._record_size = 0
        # Set of methods whose execution was deferred because the lock
        # was held (see __async); run by _release.
        self._async = set()
        l = threading.Lock()
        self._acquire = l.acquire
        release = l.release
        def _release():
            # Before really releasing the lock, run any method that was
            # deferred while we held it.
            try:
                while self._async:
                    self._async.pop()(self)
            finally:
                release()
        self._release = _release
        self.backlog()

    def __async(wrapped):
        # Decorator: if the lock is free, run now; otherwise remember the
        # call so the current lock holder runs it on release. This makes
        # 'flush' safe to call from signal handlers and from code that
        # already holds the lock (e.g. _queue).
        def wrapper(self):
            self._async.add(wrapped)
            if self._acquire(0):
                self._release()
        return wraps(wrapped)(wrapper)

    @__async
    def flush(self):
        """Write all buffered records to the database (no-op without DB)."""
        if self.db is None:
            return
        try:
            self.db.execute("BEGIN")
            for r in self._record_queue:
                self._emit(r)
        finally:
            # Always commit, to not lose any record that we could emit.
            self.db.commit()
        self._record_queue.clear()
        self._record_size = 0

    def backlog(self, max_size=1<<24):
        """Set the memory bound (in bytes) of the record buffer.

        None means unbuffered: records are emitted synchronously.
        Shrinking the bound drops the oldest buffered records.
        """
        self._acquire()
        try:
            self._max_size = max_size
            if max_size is None:
                self.flush()
            else:
                q = self._record_queue
                while max_size < self._record_size:
                    self._record_size -= RECORD_SIZE + len(q.popleft().msg)
        finally:
            self._release()

    def setup(self, filename=None, reset=False):
        """(Re)open the SQLite log database, creating the schema if needed.

        filename: path of the DB, or None/empty to disable DB logging.
        reset: drop existing 'log' and 'packet' tables first.
        """
        self._acquire()
        try:
            # Imported late to avoid a circular import at module load.
            from . import protocol as p
            global uuid_str
            uuid_str = p.uuid_str
            if self.db is not None:
                self.db.close()
                if not filename:
                    self.db = None
                    self._record_queue.clear()
                    self._record_size = 0
                    return
            if filename:
                # isolation_level=None: autocommit, transactions are
                # managed explicitly in flush().
                self.db = sqlite3.connect(filename, isolation_level=None,
                                          check_same_thread=False)
                q = self.db.execute
                if reset:
                    for t in 'log', 'packet':
                        q('DROP TABLE IF EXISTS ' + t)
                q("""CREATE TABLE IF NOT EXISTS log (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        date REAL NOT NULL,
                        name TEXT,
                        level INTEGER NOT NULL,
                        pathname TEXT,
                        lineno INTEGER,
                        msg TEXT)
                  """)
                q("""CREATE INDEX IF NOT EXISTS _log_i1 ON log(date)""")
                q("""CREATE TABLE IF NOT EXISTS packet (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        date REAL NOT NULL,
                        name TEXT,
                        msg_id INTEGER NOT NULL,
                        code INTEGER NOT NULL,
                        peer TEXT NOT NULL,
                        body BLOB)
                  """)
                q("""CREATE INDEX IF NOT EXISTS _packet_i1 ON packet(date)""")
                q("""CREATE TABLE IF NOT EXISTS protocol (
                        date REAL PRIMARY KEY NOT NULL,
                        text BLOB NOT NULL)
                  """)
                # Store a compressed copy of protocol.py so dumps can be
                # decoded later, but only if it changed since last run.
                with open(inspect.getsourcefile(p)) as p:
                    p = buffer(bz2.compress(p.read()))
                for t, in q("SELECT text FROM protocol ORDER BY date DESC"):
                    if p == t:
                        break
                else:
                    q("INSERT INTO protocol VALUES (?,?)", (time(), p))
        finally:
            self._release()
    # Closing the DB cleanly on interpreter shutdown.
    __del__ = setup

    def isEnabledFor(self, level):
        # Everything is always "enabled" so that every record reaches
        # callHandlers and can be stored in the DB; console filtering is
        # done there via Logger.isEnabledFor.
        return True

    def _emit(self, r):
        # Write a single (already locked) record to the proper table.
        if type(r) is PacketRecord:
            ip, port = r.addr
            peer = '%s %s (%s:%u)' % ('>' if r.outgoing else '<',
                uuid_str(r.uuid), ip, port)
            self.db.execute("INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)",
                (r.created, r._name, r.msg_id, r.code, peer, buffer(r.msg)))
        else:
            pathname = os.path.relpath(r.pathname, *neo.__path__)
            self.db.execute("INSERT INTO log VALUES (NULL,?,?,?,?,?,?)",
                (r.created, r._name, r.levelno, pathname, r.lineno, r.msg))

    def _queue(self, record):
        """Buffer a record, or emit it synchronously when unbuffered.

        WARNING and above trigger an immediate (deferred-under-lock) flush;
        lower levels may drop the oldest records to respect the size bound.
        """
        record._name = self.name and str(self.name)
        self._acquire()
        try:
            if self._max_size is None:
                self._emit(record)
            else:
                self._record_size += RECORD_SIZE + len(record.msg)
                q = self._record_queue
                q.append(record)
                if record.levelno < WARNING:
                    while self._max_size < self._record_size:
                        self._record_size -= RECORD_SIZE + len(q.popleft().msg)
                else:
                    self.flush()
        finally:
            self._release()

    def callHandlers(self, record):
        if self.db is not None:
            # Format now: the record is stored, args must not be kept
            # alive and exc_info cannot be serialized.
            record.msg = record.getMessage()
            record.args = None
            if record.exc_info:
                record.msg += '\n' + ''.join(
                    format_exception(*record.exc_info)).strip()
                record.exc_info = None
            self._queue(record)
        if Logger.isEnabledFor(self, record.levelno):
            record.name = self.name or 'NEO'
            self.parent.callHandlers(record)

    def packet(self, connection, packet, outgoing):
        """Record a sent/received packet (only when DB logging is on)."""
        if self.db is not None:
            ip, port = connection.getAddress()
            self._queue(PacketRecord(
                created=time(),
                msg_id=packet._id,
                code=packet._code,
                outgoing=outgoing,
                uuid=connection.getUUID(),
                addr=connection.getAddress(),
                msg=packet._body))
logging = NEOLogger()
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/node.py 0000664 0000000 0000000 00000051532 12000764302 0024563 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from time import time
from os.path import exists, getsize
import json
from . import attributeTracker, logging
from .protocol import uuid_str, NodeTypes, NodeStates, ProtocolError
class Node(object):
    """This class represents a node.

    A node tracks its own state/address/uuid and notifies its NodeManager
    of every change so the manager's indexes stay consistent.
    """

    # Current connection to the peer, None when disconnected (class-level
    # default; the instance attribute is deleted on close).
    _connection = None

    def __init__(self, manager, address=None, uuid=None,
            state=NodeStates.UNKNOWN):
        self._state = state
        self._address = address
        self._uuid = uuid
        self._manager = manager
        self._last_state_change = time()
        # True once identification packets have been exchanged.
        self._identified = False
        manager.add(self)

    def notify(self, packet):
        # Send a notification packet through the current connection.
        assert self.isConnected(), 'Not connected'
        self._connection.notify(packet)

    def ask(self, packet, *args, **kw):
        # Send a request packet through the current connection.
        assert self.isConnected(), 'Not connected'
        self._connection.ask(packet, *args, **kw)

    def answer(self, packet, msg_id=None):
        # Send an answer packet through the current connection.
        assert self.isConnected(), 'Not connected'
        self._connection.answer(packet, msg_id)

    def getLastStateChange(self):
        """Return the time() of the last state transition."""
        return self._last_state_change

    def getState(self):
        return self._state

    def setState(self, new_state):
        """Change the node state, keeping the manager's indexes in sync.

        A transition to DOWN removes the node from the manager entirely.
        """
        if self._state == new_state:
            return
        if new_state == NodeStates.DOWN:
            self._manager.remove(self)
            self._state = new_state
        else:
            old_state = self._state
            self._state = new_state
            self._manager._updateState(self, old_state)
        self._last_state_change = time()

    def setAddress(self, address):
        """Change the node address and reindex it in the manager."""
        if self._address == address:
            return
        old_address = self._address
        self._address = address
        self._manager._updateAddress(self, old_address)

    def getAddress(self):
        return self._address

    def setUUID(self, uuid):
        """Change the node UUID, reindex it, and propagate to the
        current connection if any."""
        if self._uuid == uuid:
            return
        old_uuid = self._uuid
        self._uuid = uuid
        self._manager._updateUUID(self, old_uuid)
        self._manager._updateIdentified(self)
        if self._connection is not None:
            self._connection.setUUID(uuid)

    def getUUID(self):
        return self._uuid

    def onConnectionClosed(self):
        """
        Callback from node's connection when closed
        """
        assert self._connection is not None
        # Deleting the instance attribute reveals the class default (None).
        del self._connection
        self._identified = False
        self._manager._updateIdentified(self)

    def setConnection(self, connection, force=None):
        """
        Define the connection that is currently available to this node.
        If there is already a connection set, 'force' must be given:
        the new connection replaces the old one if it is true. In any case,
        the node must be managed by the same handler for the client and
        server parts.
        """
        assert connection.getUUID() in (None, self._uuid), connection
        connection.setUUID(self._uuid)
        conn = self._connection
        if conn is None:
            self._connection = connection
            if connection.isServer():
                self.setIdentified()
        else:
            assert force is not None, \
                attributeTracker.whoSet(self, '_connection')
            # The test on peer_id is there to protect against buggy nodes.
            # XXX: handler comparison does not cover all cases: there may
            # be a pending handler change, which won't be detected, or a future
            # handler change which is not prevented. Complete implementation
            # should allow different handlers for each connection direction,
            # with in-packets client/server indicators to decide which handler
            # (server-ish or client-ish) to use. There is currently no need for
            # the full-fledged functionality, and it is simpler this way.
            if not force or conn.getPeerId() is not None or \
               type(conn.getHandler()) is not type(connection.getHandler()):
                raise ProtocolError("already connected")
            def on_closed():
                # Install the replacement connection once the old one is
                # really closed.
                self._connection = connection
                assert connection.isServer()
                self.setIdentified()
            conn.setOnClose(on_closed)
            conn.close()
        assert not connection.isClosed(), connection
        connection.setOnClose(self.onConnectionClosed)
        self._manager._updateIdentified(self)

    def getConnection(self):
        """
        Returns the connection to the node if available
        """
        assert self._connection is not None
        return self._connection

    def isConnected(self, connecting=False):
        """
        Returns True is a connection is established with the node
        (or still being established, when 'connecting' is true)
        """
        return self._connection is not None and (connecting or
            not self._connection.connecting)

    def setIdentified(self):
        assert self._connection is not None
        self._identified = True

    def isIdentified(self):
        """
        Returns True if identification packets have been exchanged
        """
        return self._identified

    def __repr__(self):
        return '<%s(uuid=%s, address=%s, state=%s, connection=%r) at %x>' % (
            self.__class__.__name__,
            uuid_str(self._uuid),
            self._address,
            self._state,
            self._connection,
            id(self),
        )

    # Type predicates, overridden by the subclasses below.
    def isMaster(self):
        return False
    def isStorage(self):
        return False
    def isClient(self):
        return False
    def isAdmin(self):
        return False

    # State predicates.
    def isRunning(self):
        return self._state == NodeStates.RUNNING
    def isUnknown(self):
        return self._state == NodeStates.UNKNOWN
    def isTemporarilyDown(self):
        return self._state == NodeStates.TEMPORARILY_DOWN
    def isDown(self):
        return self._state == NodeStates.DOWN
    def isBroken(self):
        return self._state == NodeStates.BROKEN
    def isHidden(self):
        return self._state == NodeStates.HIDDEN
    def isPending(self):
        return self._state == NodeStates.PENDING

    # State setters, all going through setState.
    def setRunning(self):
        self.setState(NodeStates.RUNNING)
    def setUnknown(self):
        self.setState(NodeStates.UNKNOWN)
    def setTemporarilyDown(self):
        self.setState(NodeStates.TEMPORARILY_DOWN)
    def setDown(self):
        self.setState(NodeStates.DOWN)
    def setBroken(self):
        self.setState(NodeStates.BROKEN)
    def setHidden(self):
        self.setState(NodeStates.HIDDEN)
    def setPending(self):
        self.setState(NodeStates.PENDING)

    def asTuple(self):
        """ Returned tuple is intented to be used in procotol encoders """
        return (self.getType(), self._address, self._uuid, self._state)

    def __gt__(self, node):
        # sort per UUID if defined
        if self._uuid is not None:
            return self._uuid > node._uuid
        return self._address > node._address

    def getType(self):
        """Return the NodeTypes item matching this node's class."""
        try:
            return NODE_CLASS_MAPPING[self.__class__]
        except KeyError:
            raise NotImplementedError

    def whoSetState(self):
        """
        Debugging method: call this method to know who set the current
        state value.
        """
        return attributeTracker.whoSet(self, '_state')
attributeTracker.track(Node)
class MasterNode(Node):
"""This class represents a master node."""
def isMaster(self):
return True
class StorageNode(Node):
"""This class represents a storage node."""
def isStorage(self):
return True
class ClientNode(Node):
"""This class represents a client node."""
def isClient(self):
return True
class AdminNode(Node):
"""This class represents an admin node."""
def isAdmin(self):
return True
NODE_TYPE_MAPPING = {
NodeTypes.MASTER: MasterNode,
NodeTypes.STORAGE: StorageNode,
NodeTypes.CLIENT: ClientNode,
NodeTypes.ADMIN: AdminNode,
}
NODE_CLASS_MAPPING = {
StorageNode: NodeTypes.STORAGE,
MasterNode: NodeTypes.MASTER,
ClientNode: NodeTypes.CLIENT,
AdminNode: NodeTypes.ADMIN,
}
class MasterDB(object):
    """
    Manages accesses to master's address database.

    The file at 'path' persists the set of known master addresses as a
    JSON list of [host, port] pairs; it is rewritten on every change.
    """
    def __init__(self, path):
        self._path = path
        self._set = set()
        if exists(path) and getsize(path):
            # Addresses are stored as JSON lists but used as tuples.
            # 'with' ensures the descriptor is closed even if the file
            # content is corrupt and json.load raises (the original code
            # leaked it to the garbage collector in that case).
            with open(path, 'r') as db:
                self._set.update(map(tuple, json.load(db)))
        else:
            # Create/truncate the file now so that permission problems
            # show up early; close it immediately.
            open(path, 'w+').close()

    def _save(self):
        # Best-effort persistence: failure to open for writing is only
        # logged, the in-memory set stays authoritative.
        try:
            db = open(self._path, 'w')
        except IOError:
            logging.warning('failed opening master database at %r '
                'for writing, update skipped', self._path)
        else:
            # 'with' closes the file even if json.dump fails.
            with db:
                json.dump(list(self._set), db)

    def add(self, addr):
        """Register an address and persist the change."""
        self._set.add(addr)
        self._save()

    def discard(self, addr):
        """Forget an address and persist the change."""
        self._set.discard(addr)
        self._save()

    def __iter__(self):
        return iter(self._set)
class NodeManager(object):
    """This class manages node status.

    It keeps several indexes over the same node set (by address, by uuid,
    by type, by state, and by identification) which the Node instances
    update through the _updateXXX callbacks.
    """

    # MasterDB instance persisting master addresses, or None.
    _master_db = None

    # TODO: rework getXXXList() methods, filter first by node type
    # - getStorageList(identified=True, connected=True, )
    # - getList(...)

    def __init__(self, master_db=None):
        """
        master_db (string)
        Path to a file containing master nodes's addresses. Used to automate
        master list updates. If not provided, no automation will happen.
        """
        self._node_set = set()
        self._address_dict = {}
        self._uuid_dict = {}
        self._type_dict = {}
        self._state_dict = {}
        self._identified_dict = {}
        if master_db is not None:
            self._master_db = db = MasterDB(master_db)
            for addr in db:
                self.createMaster(address=addr)
    # Closing simply reinitializes the manager.
    close = __init__

    def add(self, node):
        """Index a new node in every dict; called by Node.__init__."""
        if node in self._node_set:
            logging.warning('adding a known node %r, ignoring', node)
            return
        assert not node.isDown(), node
        self._node_set.add(node)
        self._updateAddress(node, None)
        self._updateUUID(node, None)
        self.__updateSet(self._type_dict, None, node.__class__, node)
        self.__updateSet(self._state_dict, None, node.getState(), node)
        self._updateIdentified(node)
        if node.isMaster() and self._master_db is not None:
            self._master_db.add(node.getAddress())

    def remove(self, node):
        """Drop a node from every index."""
        if node not in self._node_set:
            logging.warning('removing unknown node %r, ignoring', node)
            return
        self._node_set.remove(node)
        self.__drop(self._address_dict, node.getAddress())
        self.__drop(self._uuid_dict, node.getUUID())
        self.__dropSet(self._state_dict, node.getState(), node)
        self.__dropSet(self._type_dict, node.__class__, node)
        uuid = node.getUUID()
        if uuid in self._identified_dict:
            del self._identified_dict[uuid]
        if node.isMaster() and self._master_db is not None:
            self._master_db.discard(node.getAddress())

    def __drop(self, index_dict, key):
        try:
            del index_dict[key]
        except KeyError:
            # a node may have not be indexed by uuid or address, eg.:
            # - a master known by address but without UUID
            # - a client or admin node that don't have listening address
            pass

    def __update(self, index_dict, old_key, new_key, node):
        """ Update an index from old to new key """
        if old_key is not None:
            assert index_dict[old_key] is node, '%r is stored as %s, ' \
                'moving %r to %s' % (index_dict[old_key], old_key, node,
                new_key)
            del index_dict[old_key]
        if new_key is not None:
            assert index_dict.get(new_key, node) is node, 'Adding %r at %r ' \
                'would overwrite %r' % (node, new_key, index_dict[new_key])
            index_dict[new_key] = node

    def _updateIdentified(self, node):
        # Keep the uuid -> node map of usable (connected) nodes in sync.
        uuid = node.getUUID()
        if uuid:
            # XXX: It's probably a bug to include connecting nodes but there's
            # no API yet to update manager when connection is established.
            if node.isConnected(connecting=True):
                self._identified_dict[uuid] = node
            else:
                self._identified_dict.pop(uuid, None)

    def _updateAddress(self, node, old_address):
        self.__update(self._address_dict, old_address, node.getAddress(), node)

    def _updateUUID(self, node, old_uuid):
        self.__update(self._uuid_dict, old_uuid, node.getUUID(), node)

    def __dropSet(self, set_dict, key, node):
        if key in set_dict and node in set_dict[key]:
            set_dict[key].remove(node)

    def __updateSet(self, set_dict, old_key, new_key, node):
        """ Update a set index from old to new key """
        if old_key in set_dict:
            set_dict[old_key].remove(node)
        if new_key is not None:
            set_dict.setdefault(new_key, set()).add(node)

    def _updateState(self, node, old_state):
        assert not node.isDown(), node
        self.__updateSet(self._state_dict, old_state, node.getState(), node)

    def getList(self, node_filter=None):
        """Return all nodes, optionally filtered by a predicate."""
        if node_filter is None:
            return list(self._node_set)
        return filter(node_filter, self._node_set)

    def getIdentifiedList(self, pool_set=None):
        """
        Returns a generator to iterate over identified nodes
        pool_set is an iterable of UUIDs allowed
        """
        if pool_set is not None:
            identified_nodes = self._identified_dict.items()
            return [v for k, v in identified_nodes if k in pool_set]
        return self._identified_dict.values()

    def getConnectedList(self):
        """
        Returns a generator to iterate over connected nodes
        """
        # TODO: use an index
        return [x for x in self._node_set if x.isConnected()]

    def __getList(self, index_dict, key):
        return index_dict.setdefault(key, set())

    def getByStateList(self, state):
        """ Get a node list filtered per the node state """
        return list(self.__getList(self._state_dict, state))

    def __getTypeList(self, type_klass, only_identified=False):
        node_set = self.__getList(self._type_dict, type_klass)
        if only_identified:
            return [x for x in node_set if x.getUUID() in self._identified_dict]
        return list(node_set)

    def getMasterList(self, only_identified=False):
        """ Return a list with master nodes """
        return self.__getTypeList(MasterNode, only_identified)

    def getStorageList(self, only_identified=False):
        """ Return a list with storage nodes """
        return self.__getTypeList(StorageNode, only_identified)

    def getClientList(self, only_identified=False):
        """ Return a list with client nodes """
        return self.__getTypeList(ClientNode, only_identified)

    def getAdminList(self, only_identified=False):
        """ Return a list with admin nodes """
        return self.__getTypeList(AdminNode, only_identified)

    def getByAddress(self, address):
        """ Return the node that match with a given address """
        return self._address_dict.get(address, None)

    def getByUUID(self, uuid):
        """ Return the node that match with a given UUID """
        return self._uuid_dict.get(uuid, None)

    def hasAddress(self, address):
        return address in self._address_dict

    def hasUUID(self, uuid):
        return uuid in self._uuid_dict

    def _createNode(self, klass, address=None, uuid=None, **kw):
        """Create a node of the given class, or reuse/complete an existing
        one matching the given uuid and/or address.

        Raises ValueError when uuid and address designate two different
        existing nodes, or conflict with the existing node's values.
        """
        by_address = self.getByAddress(address)
        by_uuid = self.getByUUID(uuid)
        if by_address is None and by_uuid is None:
            node = klass(self, address=address, uuid=uuid, **kw)
        else:
            if by_uuid is None or by_address is by_uuid:
                node = by_address
            elif by_address is None:
                node = by_uuid
            else:
                raise ValueError('Got different nodes for uuid %s: %r and '
                    'address %r: %r.' % (uuid_str(uuid), by_uuid, address,
                    by_address))
            if uuid is not None:
                node_uuid = node.getUUID()
                if node_uuid is None:
                    node.setUUID(uuid)
                elif node_uuid != uuid:
                    raise ValueError('Expected uuid %s on node %r' % (
                        uuid_str(uuid), node))
            if address is not None:
                node_address = node.getAddress()
                if node_address is None:
                    node.setAddress(address)
                elif node_address != address:
                    raise ValueError('Expected address %r on node %r' % (
                        address, node))
        assert node.__class__ is klass, (node.__class__, klass)
        return node

    def createMaster(self, **kw):
        """ Create and register a new master """
        return self._createNode(MasterNode, **kw)

    def createStorage(self, **kw):
        """ Create and register a new storage """
        return self._createNode(StorageNode, **kw)

    def createClient(self, **kw):
        """ Create and register a new client """
        return self._createNode(ClientNode, **kw)

    def createAdmin(self, **kw):
        """ Create and register a new admin """
        return self._createNode(AdminNode, **kw)

    def _getClassFromNodeType(self, node_type):
        klass = NODE_TYPE_MAPPING.get(node_type)
        if klass is None:
            raise ValueError('Unknown node type : %s' % node_type)
        return klass

    def createFromNodeType(self, node_type, **kw):
        return self._createNode(self._getClassFromNodeType(node_type), **kw)

    def init(self):
        """Forget all known nodes."""
        # NOTE(review): _identified_dict is not cleared here, unlike every
        # other index — looks like an oversight; confirm before relying on
        # init() for a full reset.
        self._node_set.clear()
        self._type_dict.clear()
        self._state_dict.clear()
        self._uuid_dict.clear()
        self._address_dict.clear()

    def update(self, node_list):
        """Apply a (type, address, uuid, state) node list received from
        the network: create, update or drop nodes accordingly."""
        for node_type, addr, uuid, state in node_list:
            # This should be done here (although klass might not be used in this
            # iteration), as it raises if type is not valid.
            klass = self._getClassFromNodeType(node_type)
            # lookup in current table
            node_by_uuid = self.getByUUID(uuid)
            node_by_addr = self.getByAddress(addr)
            node = node_by_uuid or node_by_addr
            log_args = node_type, uuid_str(uuid), addr, state
            if node is None:
                if state == NodeStates.DOWN:
                    logging.debug('NOT creating node %s %s %s %s', *log_args)
                else:
                    node = self._createNode(klass, address=addr, uuid=uuid,
                        state=state)
                    logging.debug('creating node %r', node)
            else:
                assert isinstance(node, klass), 'node %r is not ' \
                    'of expected type: %r' % (node, klass)
                assert None in (node_by_uuid, node_by_addr) or \
                    node_by_uuid is node_by_addr, \
                    'Discrepancy between node_by_uuid (%r) and ' \
                    'node_by_addr (%r)' % (node_by_uuid, node_by_addr)
                if state == NodeStates.DOWN:
                    logging.debug('droping node %r (%r), found with %s '
                        '%s %s %s', node, node.isConnected(), *log_args)
                    if node.isConnected():
                        # cut this connection, node removed by handler
                        node.getConnection().close()
                    self.remove(node)
                else:
                    logging.debug('updating node %r to %s %s %s %s',
                        node, *log_args)
                    node.setUUID(uuid)
                    node.setAddress(addr)
                    node.setState(state)
        self.log()

    def log(self):
        """Log a human-readable summary of all known nodes."""
        logging.info('Node manager : %u nodes', len(self._node_set))
        node_list = [(node, uuid_str(node.getUUID()))
                     for node in sorted(self._node_set)]
        # NOTE(review): max() raises ValueError when there is no node at
        # all — presumably log() is only called with a non-empty set;
        # confirm against callers.
        max_len = max(len(x[1]) for x in node_list)
        for node, uuid in node_list:
            address = node.getAddress() or ''
            if address:
                address = '%s:%d' % address
            logging.info(' * %*s | %8s | %22s | %s',
                max_len, uuid, node.getType(), address, node.getState())
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/profiling.py 0000664 0000000 0000000 00000002333 12000764302 0025622 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Profiling is done with tiny-profiler, a very simple profiler.
It is different from python's built-in profilers in that it requires
developpers to explicitely put probes on specific methods, reducing:
- profiling overhead
- undesired result entries
You can get this profiler at:
https://svn.erp5.org/repos/public/erp5/trunk/utils/tiny_profiler
"""
PROFILING_ENABLED = False
if PROFILING_ENABLED:
from tiny_profiler import profiler_decorator, profiler_report
else:
def profiler_decorator(func):
return func
def profiler_report():
pass
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/protocol.py 0000664 0000000 0000000 00000132662 12000764302 0025503 0 ustar 00root root 0000000 0000000
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket
import sys
import traceback
from socket import inet_ntoa, inet_aton
from cStringIO import StringIO
from struct import Struct
try:
from .util import getAddressType
except ImportError:
pass
# The protocol version (major, minor).
PROTOCOL_VERSION = (10, 1)
# Size restrictions.
MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x4000000
PACKET_HEADER_FORMAT = Struct('!LHL')
# Check that header size is the expected value.
# If it is not, it means that struct module result is incompatible with
# "reference" platform (python 2.4 on x86-64).
assert PACKET_HEADER_FORMAT.size == 10, \
'Unsupported platform, packet header length = %i' % \
(PACKET_HEADER_FORMAT.size, )
RESPONSE_MASK = 0x8000
class Enum(tuple):
class Item(int):
__slots__ = '_name', '_enum'
def __str__(self):
return self._name
def __repr__(self):
return "" % (self._name, self)
def __eq__(self, other):
if type(other) is self.__class__:
assert other._enum is self._enum
return other is self
return other == int(self)
def __new__(cls, func):
names = func.func_code.co_names
self = tuple.__new__(cls, map(cls.Item, xrange(len(names))))
self._name = func.__name__
for item, name in zip(self, names):
setattr(self, name, item)
item._name = name
item._enum = self
return self
def __repr__(self):
return "" % self._name
@Enum
def ErrorCodes():
ACK
NOT_READY
OID_NOT_FOUND
TID_NOT_FOUND
OID_DOES_NOT_EXIST
PROTOCOL_ERROR
BROKEN_NODE
ALREADY_PENDING
REPLICATION_ERROR
CHECKING_ERROR
@Enum
def ClusterStates():
RECOVERING
VERIFYING
RUNNING
STOPPING
STARTING_BACKUP
BACKINGUP
STOPPING_BACKUP
@Enum
def NodeTypes():
MASTER
STORAGE
CLIENT
ADMIN
@Enum
def NodeStates():
RUNNING
TEMPORARILY_DOWN
DOWN
BROKEN
HIDDEN
PENDING
UNKNOWN
@Enum
def CellStates():
# Normal state: cell is writable/readable, and it isn't planned to drop it.
UP_TO_DATE
# Write-only cell. Last transactions are missing because storage is/was down
# for a while, or because it is new for the partition. It usually becomes
# UP_TO_DATE when replication is done.
OUT_OF_DATE
# Same as UP_TO_DATE, except that it will be discarded as soon as another
# node finishes to replicate it. It means a partition is moved from 1 node
# to another.
FEEDING
# Not really a state: only used in network packets to tell storages to drop
# partitions.
DISCARDED
# A check revealed that data differs from other replicas. Cell is neither
# readable nor writable.
CORRUPTED
@Enum
def LockState():
NOT_LOCKED
GRANTED
GRANTED_TO_OTHER
# used for logging
node_state_prefix_dict = {
NodeStates.RUNNING: 'R',
NodeStates.TEMPORARILY_DOWN: 'T',
NodeStates.DOWN: 'D',
NodeStates.BROKEN: 'B',
NodeStates.HIDDEN: 'H',
NodeStates.PENDING: 'P',
NodeStates.UNKNOWN: 'U',
}
# used for logging
cell_state_prefix_dict = {
CellStates.UP_TO_DATE: 'U',
CellStates.OUT_OF_DATE: 'O',
CellStates.FEEDING: 'F',
CellStates.DISCARDED: 'D',
CellStates.CORRUPTED: 'C',
}
# Other constants.
INVALID_UUID = 0
INVALID_TID = '\xff' * 8
INVALID_OID = '\xff' * 8
INVALID_PARTITION = 0xffffffff
INVALID_ADDRESS_TYPE = socket.AF_UNSPEC
ZERO_HASH = '\0' * 20
ZERO_TID = '\0' * 8
ZERO_OID = '\0' * 8
OID_LEN = len(INVALID_OID)
TID_LEN = len(INVALID_TID)
MAX_TID = '\x7f' + '\xff' * 7 # SQLite does not accept numbers above 2^63-1
# High-order byte:
# 7 6 5 4 3 2 1 0
# | | | | +-+-+-+-- reserved (0)
# | +-+-+---------- node type
# +---------------- temporary if negative
# UUID namespaces are required to prevent conflicts when the master generate
# new uuid before it knows uuid of existing storage nodes. So only the high
# order bit is really important and the 31 other bits could be random.
# Extra namespace information and non-randomness of 3 LOB help to read logs.
UUID_NAMESPACES = {
NodeTypes.STORAGE: 0x00,
NodeTypes.MASTER: -0x10,
NodeTypes.CLIENT: -0x20,
NodeTypes.ADMIN: -0x30,
}
uuid_str = (lambda ns: lambda uuid:
ns[uuid >> 24] + str(uuid & 0xffffff) if uuid else str(uuid)
)(dict((v, str(k)[0]) for k, v in UUID_NAMESPACES.iteritems()))
class ProtocolError(Exception):
""" Base class for protocol errors, close the connection """
pass
class PacketMalformedError(ProtocolError):
""" Close the connection and set the node as broken"""
pass
class UnexpectedPacketError(ProtocolError):
""" Close the connection and set the node as broken"""
pass
class NotReadyError(ProtocolError):
""" Just close the connection """
pass
class BrokenNodeDisallowedError(ProtocolError):
""" Just close the connection """
pass
class Packet(object):
    """
    Base class for any packet definition. The _fmt class attribute must be
    defined for any non-empty packet.
    """
    # Class attributes set by the packet registration machinery:
    _ignore_when_closed = False
    _request = None     # request class paired with an answer class
    _answer = None      # expected answer class, if any
    _body = None        # encoded payload (str)
    _code = None        # wire-level packet code
    _fmt = None         # PStruct describing the payload
    _id = None          # message id within a connection

    def __init__(self, *args, **kw):
        """Encode the given field values into _body using _fmt.

        Positional arguments fill the first fields; remaining fields are
        looked up by name in the keyword arguments (missing ones get None).
        """
        assert self._code is not None, "Packet class not registered"
        if args or kw:
            args = list(args)
            buf = StringIO()
            # load named arguments
            for item in self._fmt._items[len(args):]:
                args.append(kw.get(item._name))
            self._fmt.encode(buf.write, args)
            self._body = buf.getvalue()
        else:
            self._body = ''

    def decode(self):
        """Decode _body back into a tuple of field values.

        Raises PacketMalformedError when the body does not match _fmt.
        """
        assert self._body is not None
        if self._fmt is None:
            return ()
        buf = StringIO(self._body)
        try:
            return self._fmt.decode(buf.read)
        except ParseError, msg:
            name = self.__class__.__name__
            raise PacketMalformedError("%s fail (%s)" % (name, msg))

    def setContent(self, msg_id, body):
        """ Register the packet content for future decoding """
        self._id = msg_id
        self._body = body

    def setId(self, value):
        self._id = value

    def getId(self):
        assert self._id is not None, "No identifier applied on the packet"
        return self._id

    def encode(self):
        """ Encode a packet as a string to send it over the network.

        Returns a (header, body) pair of strings.
        """
        content = self._body
        length = PACKET_HEADER_FORMAT.size + len(content)
        return (PACKET_HEADER_FORMAT.pack(self._id, self._code, length),
            content)

    def __len__(self):
        return PACKET_HEADER_FORMAT.size + len(self._body)

    def __repr__(self):
        return '%s[%r]' % (self.__class__.__name__, self._id)

    def __eq__(self, other):
        """ Compare packets with their code instead of content """
        if other is None:
            return False
        assert isinstance(other, Packet)
        return self._code == other._code

    def isError(self):
        return isinstance(self, Error)

    def isResponse(self):
        # Answer packets have the high bit of their code set.
        return self._code & RESPONSE_MASK == RESPONSE_MASK

    def getAnswerClass(self):
        return self._answer

    def ignoreOnClosedConnection(self):
        """
        Tells if this packet must be ignored when its connection is closed
        when it is handled.
        """
        return self._ignore_when_closed
class ParseError(Exception):
    """
    Wraps another exception's traceback and accumulates the 'path' of
    packet items traversed while the error propagates up.
    """
    def __init__(self, item, trace):
        Exception.__init__(self)
        self._trace = trace
        self._items = [item]

    def append(self, item):
        # Called by each enclosing item while the exception bubbles up.
        self._items.append(item)

    def __repr__(self):
        path = '/'.join(i.getName() for i in reversed(self._items))
        return 'at %s:\n%s' % (path, self._trace)

    __str__ = __repr__
# packet parsers
class PItem(object):
    """
    Base class for any packet item, _encode and _decode must be overriden
    by subclasses. encode()/decode() wrap them so that any failure is
    reported as a ParseError carrying the item path.
    """
    def __init__(self, name):
        self._name = name

    def __repr__(self):
        return self.__class__.__name__

    def getName(self):
        return self._name

    def _trace(self, method, *args):
        # Run an encoder/decoder, converting any failure into a ParseError
        # that records which items were being processed.
        try:
            return method(*args)
        except ParseError, e:
            # trace and forward exception
            e.append(self)
            raise
        except Exception:
            # original exception, encapsulate it
            trace = ''.join(traceback.format_exception(*sys.exc_info())[2:])
            raise ParseError(self, trace)

    def encode(self, writer, items):
        return self._trace(self._encode, writer, items)

    def decode(self, reader):
        return self._trace(self._decode, reader)

    def _encode(self, writer, items):
        raise NotImplementedError, self.__class__.__name__

    def _decode(self, reader):
        raise NotImplementedError, self.__class__.__name__
class PStruct(PItem):
    """
    Aggregate several sub-items into a fixed-size tuple.
    """
    def __init__(self, name, *items):
        PItem.__init__(self, name)
        self._items = items

    def _encode(self, writer, items):
        # One value per declared sub-item, in declaration order.
        assert len(self._items) == len(items), (items, self._items)
        for sub_item, value in zip(self._items, items):
            sub_item.encode(writer, value)

    def _decode(self, reader):
        return tuple(sub_item.decode(reader) for sub_item in self._items)
class PStructItem(PItem):
    """
    A single scalar value encoded with the struct module.
    """
    def __init__(self, name, fmt):
        PItem.__init__(self, name)
        # Bind the compiled Struct's methods once; they are hot paths.
        compiled = Struct(fmt)
        self.pack = compiled.pack
        self.unpack = compiled.unpack
        self.size = compiled.size

    def _encode(self, writer, value):
        writer(self.pack(value))

    def _decode(self, reader):
        value, = self.unpack(reader(self.size))
        return value
class PList(PStructItem):
    """
    A homogeneous sequence, prefixed with its length as a 32-bit
    big-endian unsigned integer.
    """
    def __init__(self, name, item):
        PStructItem.__init__(self, name, '!L')
        self._item = item
    def _encode(self, writer, items):
        writer(self.pack(len(items)))
        encode = self._item.encode
        for value in items:
            encode(writer, value)
    def _decode(self, reader):
        decode = self._item.decode
        length = self.unpack(reader(self.size))[0]
        return [decode(reader) for _ in xrange(length)]
class PDict(PStructItem):
    """
    A mapping: 32-bit entry count, then one key/value pair per entry,
    each side with its own item format.
    """
    def __init__(self, name, key, value):
        PStructItem.__init__(self, name, '!L')
        self._key = key
        self._value = value
    def _encode(self, writer, item):
        assert isinstance(item, dict), (type(item), item)
        writer(self.pack(len(item)))
        encode_key = self._key.encode
        encode_value = self._value.encode
        for k, v in item.iteritems():
            encode_key(writer, k)
            encode_value(writer, v)
    def _decode(self, reader):
        decode_key = self._key.decode
        decode_value = self._value.decode
        result = {}
        for _ in xrange(self.unpack(reader(self.size))[0]):
            # The key must be decoded before its value (wire order).
            k = decode_key(reader)
            result[k] = decode_value(reader)
        return result
class PEnum(PStructItem):
    """
    An enumeration value: signed 32-bit integer, -1 standing for None.
    """
    def __init__(self, name, enum):
        PStructItem.__init__(self, name, '!l')
        self._enum = enum
    def _encode(self, writer, item):
        if item is None:
            item = -1
        else:
            assert isinstance(item, int), item
        writer(self.pack(item))
    def _decode(self, reader):
        code = self.unpack(reader(self.size))[0]
        if code == -1:
            return None
        try:
            return self._enum[code]
        except KeyError:
            raise ValueError('Invalid code for %s enum: %r'
                % (self._enum.__class__.__name__, code))
class PAddressIPGeneric(PStructItem):
    """
    Base for one IP address family: packs a (host, port) pair, with the
    host converted through socket.inet_pton/inet_ntop using the
    subclass-provided af_type.
    """
    def __init__(self, name, format):
        PStructItem.__init__(self, name, format)
    # Fix: these were previously defined as public 'encode'/'decode',
    # which overrode PItem.encode/decode and bypassed the _trace()
    # error-reporting wrapper; every other item implements the
    # _encode/_decode hooks instead.
    def _encode(self, writer, address):
        host, port = address
        host = socket.inet_pton(self.af_type, host)
        writer(self.pack(host, port))
    def _decode(self, reader):
        data = reader(self.size)
        host, port = self.unpack(data)
        host = socket.inet_ntop(self.af_type, host)
        return (host, port)
class PAddressIPv4(PAddressIPGeneric):
    """IPv4 (host, port): 4 packed address bytes + 16-bit port."""
    af_type = socket.AF_INET
    def __init__(self, name):
        super(PAddressIPv4, self).__init__(name, '!4sH')
class PAddressIPv6(PAddressIPGeneric):
    """IPv6 (host, port): 16 packed address bytes + 16-bit port."""
    af_type = socket.AF_INET6
    def __init__(self, name):
        super(PAddressIPv6, self).__init__(name, '!16sH')
class PAddress(PStructItem):
    """
    A host address (IPv4/IPv6): a 32-bit address-family tag, then the
    family-specific (host, port) payload; None is encoded as the tag
    INVALID_ADDRESS_TYPE with no payload.
    """
    address_format_dict = {
        socket.AF_INET: PAddressIPv4('ipv4'),
        socket.AF_INET6: PAddressIPv6('ipv6'),
    }
    def __init__(self, name):
        PStructItem.__init__(self, name, '!L')
    def _encode(self, writer, address):
        if address is None:
            writer(self.pack(INVALID_ADDRESS_TYPE))
        else:
            af_type = getAddressType(address)
            writer(self.pack(af_type))
            self.address_format_dict[af_type].encode(writer, address)
    def _decode(self, reader):
        af_type = self.unpack(reader(self.size))[0]
        if af_type == INVALID_ADDRESS_TYPE:
            return None
        return self.address_format_dict[af_type].decode(reader)
class PString(PStructItem):
    """
    A variable-length string: 32-bit big-endian length, then raw bytes.
    """
    def __init__(self, name):
        PStructItem.__init__(self, name, '!L')
    def _encode(self, writer, value):
        writer(self.pack(len(value)))
        writer(value)
    def _decode(self, reader):
        return reader(self.unpack(reader(self.size))[0])
class PBoolean(PStructItem):
    """
    A boolean value, encoded as a single byte (0 or 1).
    """
    def __init__(self, name):
        PStructItem.__init__(self, name, '!B')
    def _encode(self, writer, value):
        writer(self.pack(1 if value else 0))
    def _decode(self, reader):
        return self.unpack(reader(self.size))[0] != 0
class PNumber(PStructItem):
    """
    An integer number (4-byte big-endian unsigned).
    """
    def __init__(self, name):
        super(PNumber, self).__init__(name, '!L')
class PIndex(PStructItem):
    """
    A big (64-bit unsigned) integer used as an index into a huge list.
    """
    def __init__(self, name):
        super(PIndex, self).__init__(name, '!Q')
class PPTID(PStructItem):
    """
    A partition table identifier (64-bit unsigned); 0 on the wire
    stands for None (invalid PTID).
    """
    def __init__(self, name):
        PStructItem.__init__(self, name, '!Q')
    def _encode(self, writer, value):
        PStructItem._encode(self, writer, value or 0)
    def _decode(self, reader):
        return PStructItem._decode(self, reader) or None
class PProtocol(PStructItem):
    """
    The protocol version definition: a (major, minor) pair of 32-bit
    integers.  Decoding rejects any version other than PROTOCOL_VERSION.
    """
    def __init__(self, name):
        PStructItem.__init__(self, name, '!LL')
    def _encode(self, writer, version):
        writer(self.pack(*version))
    def _decode(self, reader):
        version = self.unpack(reader(self.size))
        if version != PROTOCOL_VERSION:
            raise ProtocolError('protocol version mismatch')
        return version
class PChecksum(PItem):
    """
    A hash (SHA1): 20 raw bytes with no length prefix.
    """
    def _encode(self, writer, checksum):
        assert len(checksum) == 20, (len(checksum), checksum)
        writer(checksum)
    def _decode(self, reader):
        # Fixed-size read: SHA-1 digests are always 20 bytes.
        return reader(20)
class PUUID(PStructItem):
    """
    An UUID (node identifier, 4-byte signed integer); 0 on the wire
    stands for None.
    """
    def __init__(self, name):
        PStructItem.__init__(self, name, '!l')
    def _encode(self, writer, uuid):
        if uuid is None:
            uuid = 0
        writer(self.pack(uuid))
    def _decode(self, reader):
        uuid = self.unpack(reader(self.size))[0]
        return uuid if uuid else None
class PTID(PItem):
    """
    A transaction identifier: 8 raw bytes; INVALID_TID stands for None.
    """
    def _encode(self, writer, tid):
        if tid is None:
            tid = INVALID_TID
        assert len(tid) == 8, (len(tid), tid)
        writer(tid)
    def _decode(self, reader):
        tid = reader(8)
        return None if tid == INVALID_TID else tid
# An object identifier shares the TID wire format (8 raw bytes with the
# INVALID_TID/None mapping): same definition, for now.
POID = PTID
# common definitions
# Reusable field instances (PF* prefix) shared by several packet formats
# below.  They are stateless encoders, so sharing instances is safe.
PFEmpty = PStruct('no_content')
PFNodeType = PEnum('type', NodeTypes)
PFNodeState = PEnum('state', NodeStates)
PFCellState = PEnum('state', CellStates)
PFNodeList = PList('node_list',
    PStruct('node',
        PFNodeType,
        PAddress('address'),
        PUUID('uuid'),
        PFNodeState,
    ),
)
PFCellList = PList('cell_list',
    PStruct('cell',
        PUUID('uuid'),
        PFCellState,
    ),
)
PFRowList = PList('row_list',
    PStruct('row',
        PNumber('offset'),
        PFCellList,
    ),
)
PFHistoryList = PList('history_list',
    PStruct('history_entry',
        PTID('serial'),
        PNumber('size'),
    ),
)
PFUUIDList = PList('uuid_list',
    PUUID('uuid'),
)
PFTidList = PList('tid_list',
    PTID('tid'),
)
PFOidList = PList('oid_list',
    POID('oid'),
)
# packets definition
class Notify(Packet):
    """
    General purpose notification (remote logging)
    """
    # One-way: no _answer is defined.
    _fmt = PStruct('notify',
        PString('message'),
    )
class Error(Packet):
    """
    Error is a special type of message, because this can be sent against
    any other message, even if such a message does not expect a reply
    usually. Any -> Any.
    """
    # 'code' is one of ErrorCodes; see the Errors() factory at the end of
    # this module for per-code constructors.
    _fmt = PStruct('error',
        PNumber('code'),
        PString('message'),
    )
class Ping(Packet):
    """
    Check if a peer is still alive. Any -> Any.
    """
    # Both request and answer carry no payload.
    _answer = PFEmpty
class CloseClient(Packet):
    """
    Tell the peer it can close the connection if it has finished with us.
    Any -> Any.
    """
class RequestIdentification(Packet):
    """
    Request a node identification. This must be the first packet for any
    connection. Any -> Any.
    """
    _fmt = PStruct('request_identification',
        PProtocol('protocol_version'),
        PFNodeType,
        PUUID('uuid'),
        PAddress('address'),
        PString('name'),
    )
    _answer = PStruct('accept_identification',
        PFNodeType,
        PUUID('my_uuid'),
        PNumber('num_partitions'),
        PNumber('num_replicas'),
        PUUID('your_uuid'),
        PAddress('primary'),
        PList('known_master_list',
            PStruct('master',
                PAddress('address'),
                PUUID('uuid'),
            ),
        ),
    )
    def __init__(self, *args, **kw):
        if args or kw:
            # always announce current protocol version
            # (prepended transparently, callers never pass it)
            args = list(args)
            args.insert(0, PROTOCOL_VERSION)
        super(RequestIdentification, self).__init__(*args, **kw)
    def decode(self):
        # Strip the protocol version prepended by __init__ so callers
        # only see the logical fields.
        return super(RequestIdentification, self).decode()[1:]
class PrimaryMaster(Packet):
    """
    Ask current primary master's uuid. CTL -> A.
    """
    # Request carries no payload.
    _answer = PStruct('answer_primary',
        PUUID('primary_uuid'),
    )
class AnnouncePrimary(Packet):
    """
    Announce a primary master node election. PM -> SM.
    """
    # Notification without payload.
class ReelectPrimary(Packet):
    """
    Force a re-election of a primary master node. M -> M.
    """
    # Notification without payload.
class LastIDs(Packet):
    """
    Ask the last OID, the last TID and the last Partition Table ID that
    a storage node stores. Used to recover information. PM -> S, S -> PM.
    Reply to Ask Last IDs. S -> PM, PM -> S.
    """
    # Request carries no payload.
    _answer = PStruct('answer_last_ids',
        POID('last_oid'),
        PTID('last_tid'),
        PPTID('last_ptid'),
        PTID('backup_tid'),
    )
class PartitionTable(Packet):
    """
    Ask the full partition table. PM -> S.
    Answer rows in a partition table. S -> PM.
    """
    # Request carries no payload.
    _answer = PStruct('answer_partition_table',
        PPTID('ptid'),
        PFRowList,
    )
class NotifyPartitionTable(Packet):
    """
    Send rows in a partition table to update other nodes. PM -> S, C.
    """
    # Same payload layout as AnswerPartitionTable, but as a notification.
    _fmt = PStruct('send_partition_table',
        PPTID('ptid'),
        PFRowList,
    )
class PartitionChanges(Packet):
    """
    Notify a subset of a partition table. This is used to notify changes.
    PM -> S, C.
    """
    # Unlike PFCellList, each cell here also carries its partition offset.
    _fmt = PStruct('notify_partition_changes',
        PPTID('ptid'),
        PList('cell_list',
            PStruct('cell',
                PNumber('offset'),
                PUUID('uuid'),
                PFCellState,
            ),
        ),
    )
class StartOperation(Packet):
    """
    Tell storage nodes to start an operation. Until a storage node receives
    this message, it must not serve client nodes. PM -> S.
    """
class StopOperation(Packet):
    """
    Tell a storage node to stop an operation. Once a storage node receives
    this message, it must not serve client nodes. PM -> S.
    """
class UnfinishedTransactions(Packet):
    """
    Ask unfinished transactions. PM -> S.
    Answer unfinished transactions. S -> PM.
    """
    # Request carries no payload.
    _answer = PStruct('answer_unfinished_transactions',
        PTID('max_tid'),
        PList('tid_list',
            PTID('unfinished_tid'),
        ),
    )
class ObjectPresent(Packet):
    """
    Ask if an object is present. If not present, OID_NOT_FOUND should be
    returned. PM -> S.
    Answer that an object is present. PM -> S.
    """
    _fmt = PStruct('object_present',
        POID('oid'),
        PTID('tid'),
    )
    # Answer deliberately echoes the same (oid, tid) pair back.
    _answer = PStruct('object_present',
        POID('oid'),
        PTID('tid'),
    )
class DeleteTransaction(Packet):
    """
    Delete a transaction. PM -> S.
    """
    # Notification: no answer is expected.
    _fmt = PStruct('delete_transaction',
        PTID('tid'),
        PFOidList,
    )
class CommitTransaction(Packet):
    """
    Commit a transaction. PM -> S.
    """
    # Notification: no answer is expected.
    _fmt = PStruct('commit_transaction',
        PTID('tid'),
    )
class BeginTransaction(Packet):
    """
    Ask to begin a new transaction. C -> PM.
    Answer when a transaction begin, give a TID if necessary. PM -> C.
    """
    # The requested tid may be None: the master then allocates one.
    _fmt = PStruct('ask_begin_transaction',
        PTID('tid'),
    )
    _answer = PStruct('answer_begin_transaction',
        PTID('tid'),
    )
class FinishTransaction(Packet):
    """
    Finish a transaction. C -> PM.
    Answer when a transaction is finished. PM -> C.
    """
    _fmt = PStruct('ask_finish_transaction',
        PTID('tid'),
        PFOidList,
    )
    # Fix: the answer struct was mis-named 'answer_information_locked'
    # (copy-paste from LockInformation).  It is registered as
    # AnswerTransactionFinished; the field layout is unchanged.
    _answer = PStruct('answer_transaction_finished',
        PTID('ttid'),
        PTID('tid'),
    )
class NotifyTransactionFinished(Packet):
    """
    Notify that a transaction blocking a replication is now finished
    M -> S
    """
    _fmt = PStruct('notify_transaction_finished',
        PTID('ttid'),
        PTID('max_tid'),
    )
class LockInformation(Packet):
    """
    Lock information on a transaction. PM -> S.
    Notify information on a transaction locked. S -> PM.
    """
    # ttid is the temporary tid used during the vote phase.
    _fmt = PStruct('ask_lock_informations',
        PTID('ttid'),
        PTID('tid'),
        PFOidList,
    )
    _answer = PStruct('answer_information_locked',
        PTID('tid'),
    )
class InvalidateObjects(Packet):
    """
    Invalidate objects. PM -> C.
    """
    # Fix: the format was mis-named 'ask_finish_transaction' (copy-paste
    # from FinishTransaction).  The field layout is unchanged, and this
    # is a notification, so no answer is defined.
    _fmt = PStruct('invalidate_objects',
        PTID('tid'),
        PFOidList,
    )
class UnlockInformation(Packet):
    """
    Unlock information on a transaction. PM -> S.
    """
    # Notification: no answer is expected.
    _fmt = PStruct('notify_unlock_information',
        PTID('tid'),
    )
class GenerateOIDs(Packet):
    """
    Ask new object IDs. C -> PM.
    Answer new object IDs. PM -> C.
    """
    _fmt = PStruct('ask_new_oids',
        PNumber('num_oids'),
    )
    # The answer contains num_oids freshly allocated OIDs.
    _answer = PStruct('answer_new_oids',
        PFOidList,
    )
class StoreObject(Packet):
    """
    Ask to store an object. Send an OID, an original serial, a current
    transaction ID, and data. C -> S.
    Answer if an object has been stored. If an object is in conflict,
    a serial of the conflicting transaction is returned. In this case,
    if this serial is newer than the current transaction ID, a client
    node must not try to resolve the conflict. S -> C.
    """
    _fmt = PStruct('ask_store_object',
        POID('oid'),
        PTID('serial'),
        PBoolean('compression'),
        PChecksum('checksum'),
        PString('data'),
        PTID('data_serial'),
        PTID('tid'),
        PBoolean('unlock'),
    )
    _answer = PStruct('answer_store_object',
        PBoolean('conflicting'),
        POID('oid'),
        PTID('serial'),
    )
class AbortTransaction(Packet):
    """
    Abort a transaction. C -> S, PM.
    """
    # Notification: no answer is expected.
    _fmt = PStruct('abort_transaction',
        PTID('tid'),
    )
class StoreTransaction(Packet):
    """
    Ask to store a transaction. C -> S.
    Answer if transaction has been stored. S -> C.
    """
    _fmt = PStruct('ask_store_transaction',
        PTID('tid'),
        PString('user'),
        PString('description'),
        PString('extension'),
        PFOidList,
    )
    # Answer echoes the tid back as acknowledgement.
    _answer = PStruct('answer_store_transaction',
        PTID('tid'),
    )
class GetObject(Packet):
    """
    Ask a stored object by its OID and a serial or a TID if given. If a serial
    is specified, the specified revision of an object will be returned. If
    a TID is specified, an object right before the TID will be returned. C -> S.
    Answer the requested object. S -> C.
    """
    # serial and tid are mutually exclusive selectors; either may be None.
    _fmt = PStruct('ask_object',
        POID('oid'),
        PTID('serial'),
        PTID('tid'),
    )
    _answer = PStruct('answer_object',
        POID('oid'),
        PTID('serial_start'),
        PTID('serial_end'),
        PBoolean('compression'),
        PChecksum('checksum'),
        PString('data'),
        PTID('data_serial'),
    )
class TIDList(Packet):
    """
    Ask for TIDs between a range of offsets. The order of TIDs is descending,
    and the range is [first, last). C -> S.
    Answer the requested TIDs. S -> C.
    """
    _fmt = PStruct('ask_tids',
        PIndex('first'),
        PIndex('last'),
        PNumber('partition'),
    )
    _answer = PStruct('answer_tids',
        PFTidList,
    )
class TIDListFrom(Packet):
    """
    Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
    C -> S.
    Answer the requested TIDs. S -> C
    """
    _fmt = PStruct('tid_list_from',
        PTID('min_tid'),
        PTID('max_tid'),
        PNumber('length'),
        PNumber('partition'),
    )
    # Same answer layout as TIDList.
    _answer = PStruct('answer_tids',
        PFTidList,
    )
class TransactionInformation(Packet):
    """
    Ask information about a transaction. Any -> S.
    Answer information (user, description) about a transaction. S -> Any.
    """
    _fmt = PStruct('ask_transaction_information',
        PTID('tid'),
    )
    _answer = PStruct('answer_transaction_information',
        PTID('tid'),
        PString('user'),
        PString('description'),
        PString('extension'),
        PBoolean('packed'),
        PFOidList,
    )
class ObjectHistory(Packet):
    """
    Ask history information for a given object. The order of serials is
    descending, and the range is [first, last]. C -> S.
    Answer history information (serial, size) for an object. S -> C.
    """
    _fmt = PStruct('ask_object_history',
        POID('oid'),
        PIndex('first'),
        PIndex('last'),
    )
    _answer = PStruct('answer_object_history',
        POID('oid'),
        PFHistoryList,
    )
class PartitionList(Packet):
    """
    All the following messages are for neoctl to admin node
    Ask information about partition
    Answer information about partition
    """
    # uuid may be None (0 on the wire) to select all nodes.
    _fmt = PStruct('ask_partition_list',
        PNumber('min_offset'),
        PNumber('max_offset'),
        PUUID('uuid'),
    )
    _answer = PStruct('answer_partition_list',
        PPTID('ptid'),
        PFRowList,
    )
class NodeList(Packet):
    """
    Ask information about nodes
    Answer information about nodes
    """
    # The node type acts as a filter; None selects all types.
    _fmt = PStruct('ask_node_list',
        PFNodeType,
    )
    _answer = PStruct('answer_node_list',
        PFNodeList,
    )
class SetNodeState(Packet):
    """
    Set the node state
    """
    _fmt = PStruct('set_node_state',
        PUUID('uuid'),
        PFNodeState,
        PBoolean('modify_partition_table'),
    )
    # Acknowledged with a plain Error packet (code 0 on success).
    _answer = Error
class AddPendingNodes(Packet):
    """
    Ask the primary to include some pending node in the partition table
    """
    _fmt = PStruct('add_pending_nodes',
        PFUUIDList,
    )
    # Acknowledged with a plain Error packet (code 0 on success).
    _answer = Error
class NotifyNodeInformation(Packet):
    """
    Notify information about one or more nodes. PM -> Any.
    """
    # Notification: no answer is expected.
    _fmt = PStruct('notify_node_informations',
        PFNodeList,
    )
class NodeInformation(Packet):
    """
    Ask node information
    """
    # The actual data is sent via NotifyNodeInformation; the answer only
    # acknowledges completion.
    _answer = PFEmpty
class SetClusterState(Packet):
    """
    Set the cluster state
    """
    _fmt = PStruct('set_cluster_state',
        PEnum('state', ClusterStates),
    )
    # Acknowledged with a plain Error packet (code 0 on success).
    _answer = Error
class ClusterInformation(Packet):
    """
    Notify information about the cluster
    """
    # Notification: no answer is expected.
    _fmt = PStruct('notify_cluster_information',
        PEnum('state', ClusterStates),
    )
class ClusterState(Packet):
    """
    Ask state of the cluster
    Answer state of the cluster
    """
    # Request carries no payload.
    _answer = PStruct('answer_cluster_state',
        PEnum('state', ClusterStates),
    )
class NotifyLastOID(Packet):
    """
    Notify last OID generated
    """
    # Notification: no answer is expected.
    _fmt = PStruct('notify_last_oid',
        POID('last_oid'),
    )
class ObjectUndoSerial(Packet):
    """
    Ask storage the serial where object data is when undoing given transaction,
    for a list of OIDs.
    C -> S
    Answer serials at which object data is when undoing a given transaction.
    object_tid_dict has the following format:
        key: oid
        value: 3-tuple
            current_serial (TID)
                The latest serial visible to the undoing transaction.
            undo_serial (TID)
                Where undone data is (tid at which data is before given undo).
            is_current (bool)
                If current_serial's data is current on storage.
    S -> C
    """
    _fmt = PStruct('ask_undo_transaction',
        PTID('tid'),
        PTID('ltid'),
        PTID('undone_tid'),
        PFOidList,
    )
    _answer = PStruct('answer_undo_transaction',
        PDict('object_tid_dict',
            POID('oid'),
            PStruct('object_tid_value',
                PTID('current_serial'),
                PTID('undo_serial'),
                PBoolean('is_current'),
            ),
        ),
    )
class HasLock(Packet):
    """
    Ask a storage if oid is locked by another transaction.
    C -> S
    Answer whether a transaction holds the write lock for requested object.
    """
    _fmt = PStruct('has_load_lock',
        PTID('tid'),
        POID('oid'),
    )
    _answer = PStruct('answer_has_lock',
        POID('oid'),
        PEnum('lock_state', LockState),
    )
class CheckCurrentSerial(Packet):
    """
    Verifies if given serial is current for object oid in the database, and
    take a write lock on it (so that this state is not altered until
    transaction ends).
    Answer to AskCheckCurrentSerial.
    Same structure as AnswerStoreObject, to handle the same way, except there
    is nothing to invalidate in any client's cache.
    """
    _fmt = PStruct('ask_check_current_serial',
        PTID('tid'),
        PTID('serial'),
        POID('oid'),
    )
    # Intentionally reuses the 'answer_store_object' layout (see above).
    _answer = PStruct('answer_store_object',
        PBoolean('conflicting'),
        POID('oid'),
        PTID('serial'),
    )
class Pack(Packet):
    """
    Request a pack at given TID.
    C -> M
    M -> S
    Inform that packing is over.
    S -> M
    M -> C
    """
    _fmt = PStruct('ask_pack',
        PTID('tid'),
    )
    _answer = PStruct('answer_pack',
        PBoolean('status'),
    )
class CheckReplicas(Packet):
    """
    ctl -> A
    A -> M
    """
    # partition_dict maps each partition to check to the uuid of the
    # reference (source) storage; acknowledged with an Error packet.
    _fmt = PStruct('check_replicas',
        PDict('partition_dict',
            PNumber('partition'),
            PUUID('source'),
        ),
        PTID('min_tid'),
        PTID('max_tid'),
    )
    _answer = Error
class CheckPartition(Packet):
    """
    M -> S
    """
    # Notification: tells a storage to check one partition against the
    # given source node over [min_tid, max_tid].
    _fmt = PStruct('check_partition',
        PNumber('partition'),
        PStruct('source',
            PString('upstream_name'),
            PAddress('address'),
        ),
        PTID('min_tid'),
        PTID('max_tid'),
    )
class CheckTIDRange(Packet):
    """
    Ask some stats about a range of transactions.
    Used to know if there are differences between a replicating node and
    reference node.
    S -> S
    Stats about a range of transactions.
    Used to know if there are differences between a replicating node and
    reference node.
    S -> S
    """
    _fmt = PStruct('ask_check_tid_range',
        PNumber('partition'),
        PNumber('length'),
        PTID('min_tid'),
        PTID('max_tid'),
    )
    _answer = PStruct('answer_check_tid_range',
        PNumber('count'),
        PChecksum('checksum'),
        PTID('max_tid'),
    )
class CheckSerialRange(Packet):
    """
    Ask some stats about a range of object history.
    Used to know if there are differences between a replicating node and
    reference node.
    S -> S
    Stats about a range of object history.
    Used to know if there are differences between a replicating node and
    reference node.
    S -> S
    """
    _fmt = PStruct('ask_check_serial_range',
        PNumber('partition'),
        PNumber('length'),
        PTID('min_tid'),
        PTID('max_tid'),
        POID('min_oid'),
    )
    _answer = PStruct('answer_check_serial_range',
        PNumber('count'),
        PChecksum('tid_checksum'),
        PTID('max_tid'),
        PChecksum('oid_checksum'),
        POID('max_oid'),
    )
class PartitionCorrupted(Packet):
    """
    S -> M
    """
    # Notification that a partition was found corrupted on the listed cells.
    _fmt = PStruct('partition_corrupted',
        PNumber('partition'),
        PList('cell_list',
            PUUID('uuid'),
        ),
    )
class LastTransaction(Packet):
    """
    Ask last committed TID.
    C -> M
    Answer last committed TID.
    M -> C
    """
    # Request carries no payload.
    _answer = PStruct('answer_last_transaction',
        PTID('tid'),
    )
class NotifyReady(Packet):
    """
    Notify that node is ready to serve requests.
    S -> M
    """
    # Notification without payload.
    pass
# replication
class FetchTransactions(Packet):
    """
    S -> S
    """
    # Replication: ask up to 'length' transactions of one partition in
    # [min_tid, max_tid], skipping those already known to the requester.
    _fmt = PStruct('ask_transaction_list',
        PNumber('partition'),
        PNumber('length'),
        PTID('min_tid'),
        PTID('max_tid'),
        PFTidList, # already known transactions
    )
    _answer = PStruct('answer_transaction_list',
        PTID('pack_tid'),
        PTID('next_tid'),
        PFTidList, # transactions to delete
    )
class AddTransaction(Packet):
    """
    S -> S
    """
    # Replication: push one transaction's metadata to the replicating node.
    _fmt = PStruct('add_transaction',
        PTID('tid'),
        PString('user'),
        PString('description'),
        PString('extension'),
        PBoolean('packed'),
        PTID('ttid'),
        PFOidList,
    )
class FetchObjects(Packet):
    """
    S -> S
    """
    # Replication: ask object records of one partition, resuming at
    # (min_tid, min_oid), skipping records already known to the requester.
    _fmt = PStruct('ask_object_list',
        PNumber('partition'),
        PNumber('length'),
        PTID('min_tid'),
        PTID('max_tid'),
        POID('min_oid'),
        PDict('object_dict', # already known objects
            PTID('serial'),
            PFOidList,
        ),
    )
    _answer = PStruct('answer_object_list',
        PTID('pack_tid'),
        PTID('next_tid'),
        POID('next_oid'),
        PDict('object_dict', # objects to delete
            PTID('serial'),
            PFOidList,
        ),
    )
class AddObject(Packet):
    """
    S -> S
    """
    # Replication: push one object record to the replicating node.
    _fmt = PStruct('add_object',
        POID('oid'),
        PTID('serial'),
        PBoolean('compression'),
        PChecksum('checksum'),
        PString('data'),
        PTID('data_serial'),
    )
class Replicate(Packet):
    """
    M -> S
    """
    # Notification: tells a storage which partitions to replicate and
    # from which source address (per-partition), up to the given tid.
    _fmt = PStruct('replicate',
        PTID('tid'),
        PString('upstream_name'),
        PDict('source_dict',
            PNumber('partition'),
            PAddress('address'),
        )
    )
class ReplicationDone(Packet):
    """
    Notify the master node that a partition has been successfully replicated
    from a storage to another.
    S -> M
    """
    _fmt = PStruct('notify_replication_done',
        PNumber('offset'),
        PTID('tid'),
    )
class Truncate(Packet):
    """
    M -> S
    """
    # Ask a storage to drop all data strictly after the given tid.
    _fmt = PStruct('ask_truncate',
        PTID('tid'),
    )
    _answer = PFEmpty
# Maps packet code -> packet class; filled by register() below and used
# by the Packets metaclass to build the runtime registry.
StaticRegistry = {}
def register(request, ignore_when_closed=None):
    """ Register a packet in the packet registry """
    # Codes are allocated sequentially in registration order, so the
    # relative order of register() calls below defines the wire protocol.
    code = len(StaticRegistry)
    if request is Error:
        # Error is an answer-only packet: it gets a response code.
        code |= RESPONSE_MASK
    # register the request
    StaticRegistry[code] = request
    if request is None:
        return # None registered only to skip a code number (for compatibility)
    request._code = code
    answer = request._answer
    if ignore_when_closed is None:
        # By default, on a closed connection:
        # - request: ignore
        # - answer: keep
        # - notification: keep
        ignore_when_closed = answer is not None
    request._ignore_when_closed = ignore_when_closed
    if answer in (Error, None):
        return request
    # build a class for the answer
    answer = type('Answer%s' % (request.__name__, ), (Packet, ), {})
    answer._fmt = request._answer
    # compute the answer code
    code = code | RESPONSE_MASK
    answer._request = request
    assert answer._code is None, "Answer of %s is already used" % (request, )
    answer._code = code
    request._answer = answer
    # and register the answer packet
    assert code not in StaticRegistry, "Duplicate response packet code"
    StaticRegistry[code] = answer
    return (request, answer)
class ParserState(object):
    """
    Parser internal state.
    To be considered opaque datatype outside of PacketRegistry.parse .
    """
    # Holds the (msg_id, packet_klass, msg_len) tuple of a partially
    # received packet, or None between packets.
    payload = None
    def set(self, payload):
        self.payload = payload
    def get(self):
        return self.payload
    def clear(self):
        self.payload = None
class Packets(dict):
    """
    Packet registry that check packet code unicity and provide an index
    """
    def __metaclass__(name, base, d):
        # Python 2 metaclass trick: this function is invoked instead of
        # type() when the class body finishes executing.  It derives the
        # handler method name of each packet from its attribute name,
        # then returns an *instance* (not a class) mapping codes to
        # packet classes.
        for k, v in d.iteritems():
            if isinstance(v, type) and issubclass(v, Packet):
                v.handler_method_name = k[0].lower() + k[1:]
        # this builds a "singleton"
        return type('PacketRegistry', base, d)(StaticRegistry)
    def parse(self, buf, state_container):
        # Incremental parser: return a packet once enough bytes are
        # buffered, else None.  Progress on a partially received body is
        # kept in state_container between calls.
        state = state_container.get()
        if state is None:
            header = buf.read(PACKET_HEADER_FORMAT.size)
            if header is None:
                return None
            msg_id, msg_type, msg_len = PACKET_HEADER_FORMAT.unpack(header)
            try:
                packet_klass = self[msg_type]
            except KeyError:
                raise PacketMalformedError('Unknown packet type')
            if msg_len > MAX_PACKET_SIZE:
                raise PacketMalformedError('message too big (%d)' % msg_len)
            if msg_len < MIN_PACKET_SIZE:
                raise PacketMalformedError('message too small (%d)' % msg_len)
            # msg_len includes the header: keep only the body length.
            msg_len -= PACKET_HEADER_FORMAT.size
        else:
            msg_id, packet_klass, msg_len = state
        data = buf.read(msg_len)
        if data is None:
            # Not enough.
            if state is None:
                state_container.set((msg_id, packet_klass, msg_len))
            return None
        if state:
            state_container.clear()
        packet = packet_klass()
        packet.setContent(msg_id, data)
        return packet

    # Registration order allocates packet codes: never reorder existing
    # entries or insert in the middle; append new packets at the end.
    # notifications
    Error = register(
        Error)
    RequestIdentification, AcceptIdentification = register(
        RequestIdentification)
    # Code of RequestIdentification packet must never change so that 2
    # incompatible nodes can reject themselves gracefully (i.e. comparing
    # protocol versions) instead of raising PacketMalformedError.
    assert RequestIdentification._code == 1
    Ping, Pong = register(
        Ping)
    CloseClient = register(
        CloseClient)
    Notify = register(
        Notify)
    AskPrimary, AnswerPrimary = register(
        PrimaryMaster)
    AnnouncePrimary = register(
        AnnouncePrimary)
    ReelectPrimary = register(
        ReelectPrimary)
    NotifyNodeInformation = register(
        NotifyNodeInformation)
    AskLastIDs, AnswerLastIDs = register(
        LastIDs)
    AskPartitionTable, AnswerPartitionTable = register(
        PartitionTable)
    SendPartitionTable = register(
        NotifyPartitionTable)
    NotifyPartitionChanges = register(
        PartitionChanges)
    StartOperation = register(
        StartOperation)
    StopOperation = register(
        StopOperation)
    AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
        UnfinishedTransactions)
    AskObjectPresent, AnswerObjectPresent = register(
        ObjectPresent)
    DeleteTransaction = register(
        DeleteTransaction)
    CommitTransaction = register(
        CommitTransaction)
    AskBeginTransaction, AnswerBeginTransaction = register(
        BeginTransaction)
    AskFinishTransaction, AnswerTransactionFinished = register(
        FinishTransaction, ignore_when_closed=False)
    AskLockInformation, AnswerInformationLocked = register(
        LockInformation, ignore_when_closed=False)
    InvalidateObjects = register(
        InvalidateObjects)
    NotifyUnlockInformation = register(
        UnlockInformation)
    AskNewOIDs, AnswerNewOIDs = register(
        GenerateOIDs)
    AskStoreObject, AnswerStoreObject = register(
        StoreObject)
    AbortTransaction = register(
        AbortTransaction)
    AskStoreTransaction, AnswerStoreTransaction = register(
        StoreTransaction)
    AskObject, AnswerObject = register(
        GetObject)
    AskTIDs, AnswerTIDs = register(
        TIDList)
    AskTransactionInformation, AnswerTransactionInformation = register(
        TransactionInformation)
    AskObjectHistory, AnswerObjectHistory = register(
        ObjectHistory)
    AskPartitionList, AnswerPartitionList = register(
        PartitionList)
    AskNodeList, AnswerNodeList = register(
        NodeList)
    SetNodeState = register(
        SetNodeState, ignore_when_closed=False)
    AddPendingNodes = register(
        AddPendingNodes, ignore_when_closed=False)
    AskNodeInformation, AnswerNodeInformation = register(
        NodeInformation)
    SetClusterState = register(
        SetClusterState, ignore_when_closed=False)
    NotifyClusterInformation = register(
        ClusterInformation)
    AskClusterState, AnswerClusterState = register(
        ClusterState)
    NotifyLastOID = register(
        NotifyLastOID)
    AskObjectUndoSerial, AnswerObjectUndoSerial = register(
        ObjectUndoSerial)
    AskHasLock, AnswerHasLock = register(
        HasLock)
    AskTIDsFrom, AnswerTIDsFrom = register(
        TIDListFrom)
    AskPack, AnswerPack = register(
        Pack, ignore_when_closed=False)
    CheckReplicas = register(
        CheckReplicas)
    CheckPartition = register(
        CheckPartition)
    AskCheckTIDRange, AnswerCheckTIDRange = register(
        CheckTIDRange)
    AskCheckSerialRange, AnswerCheckSerialRange = register(
        CheckSerialRange)
    NotifyPartitionCorrupted = register(
        PartitionCorrupted)
    NotifyReady = register(
        NotifyReady)
    AskLastTransaction, AnswerLastTransaction = register(
        LastTransaction)
    AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
        CheckCurrentSerial)
    NotifyTransactionFinished = register(
        NotifyTransactionFinished)
    Replicate = register(
        Replicate)
    NotifyReplicationDone = register(
        ReplicationDone)
    AskFetchTransactions, AnswerFetchTransactions = register(
        FetchTransactions)
    AskFetchObjects, AnswerFetchObjects = register(
        FetchObjects)
    AddTransaction = register(
        AddTransaction)
    AddObject = register(
        AddObject)
    AskTruncate, AnswerTruncate = register(
        Truncate)
def Errors():
    """
    Build the singleton error-packet factory: a dict-like object mapping
    error codes to handler method names, whose attributes (one per
    ErrorCodes entry, CamelCased) are factories building Error packets.
    """
    methods = {}
    handler_names = {}
    def make_factory(code):
        # Bind 'code' now: a plain lambda in the loop below would close
        # over the loop variable and always see its last value.
        return lambda self, message='': Error(code, message)
    for error in ErrorCodes:
        code = int(error)
        name = ''.join(part.capitalize() for part in str(error).split('_'))
        methods[name] = make_factory(code)
        handler_names[code] = name[0].lower() + name[1:]
    registry_class = type('ErrorRegistry', (dict,), methods)
    return registry_class(handler_names)
Errors = Errors()
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/pt.py 0000664 0000000 0000000 00000026573 12000764302 0024270 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import math
from functools import wraps
from . import logging, protocol
from .protocol import uuid_str, CellStates
from .util import u64
from .locking import RLock
class PartitionTableException(Exception):
    """
    Base class for partition table exceptions (raised on invalid node
    states or inconsistent table operations).
    """
class Cell(object):
    """This class represents a cell in a partition table."""

    def __init__(self, node, state = CellStates.UP_TO_DATE):
        self.node = node
        self.state = state

    def __repr__(self):
        # Fix: the format template had been lost (empty string, which
        # raises TypeError with three '%' arguments); restore it.
        return "<Cell(uuid=%s, address=%s, state=%s)>" % (
            uuid_str(self.getUUID()),
            self.getAddress(),
            self.getState(),
        )

    def getState(self):
        return self.state

    def setState(self, state):
        # DISCARDED cells must be removed via removeCell, never stored.
        assert state != CellStates.DISCARDED
        self.state = state

    def isUpToDate(self):
        return self.state == CellStates.UP_TO_DATE

    def isOutOfDate(self):
        return self.state == CellStates.OUT_OF_DATE

    def isFeeding(self):
        return self.state == CellStates.FEEDING

    def isCorrupted(self):
        return self.state == CellStates.CORRUPTED

    def isReadable(self):
        # A cell can serve reads while up-to-date or feeding.
        return self.state == CellStates.UP_TO_DATE or \
            self.state == CellStates.FEEDING

    def getNode(self):
        return self.node

    def getNodeState(self):
        """This is a short hand."""
        return self.node.getState()

    def getUUID(self):
        return self.node.getUUID()

    def getAddress(self):
        return self.node.getAddress()
class PartitionTable(object):
    """This class manages a partition table.

    The table maps each partition offset (0 <= offset < np) to a row: a
    list of Cell objects, one per node holding (or receiving) a replica
    of that partition.
    """

    def __init__(self, num_partitions, num_replicas):
        # Partition table ID (PTID); None until a table is loaded/assigned.
        self._id = None
        # np: number of partitions; nr: number of replicas per partition.
        self.np = num_partitions
        self.nr = num_replicas
        # Number of partitions (rows) having at least one cell assigned.
        self.num_filled_rows = 0
        # Note: don't use [[]] * num_partition construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for _ in xrange(num_partitions)]
        # node -> number of cells assigned to it, feeding cells excluded
        # (see setCell/removeCell for the bookkeeping).
        self.count_dict = {}

    def getID(self):
        """Return the current partition table ID (None if unset)."""
        return self._id

    def getPartitions(self):
        """Return the number of partitions."""
        return self.np

    def getReplicas(self):
        """Return the number of replicas per partition."""
        return self.nr

    def clear(self):
        """Forget an existing partition table."""
        self._id = None
        self.num_filled_rows = 0
        # Note: don't use [[]] * self.np construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for _ in xrange(self.np)]
        self.count_dict.clear()

    def getAssignedPartitionList(self, uuid):
        """ Return the partition assigned to the specified UUID """
        assigned_partitions = []
        for offset in xrange(self.np):
            for cell in self.getCellList(offset, readable=True):
                if cell.getUUID() == uuid:
                    assigned_partitions.append(offset)
                    # one readable cell is enough to count this offset
                    break
        return assigned_partitions

    def hasOffset(self, offset):
        """Tell whether the given partition has at least one cell.

        Out-of-range offsets are reported as absent rather than raising.
        """
        try:
            return len(self.partition_list[offset]) > 0
        except IndexError:
            return False

    def getNodeSet(self):
        """Return the set of all nodes referenced by any cell."""
        return set(x.getNode() for row in self.partition_list for x in row)

    def getConnectedNodeList(self):
        """Return the nodes of the table that are currently connected."""
        return [node for node in self.getNodeSet() if node.isConnected()]

    def getNodeList(self):
        """Return all used nodes."""
        return [node for node, count in self.count_dict.iteritems() \
            if count > 0]

    def getCellList(self, offset, readable=False):
        """Return the cells of a partition, optionally restricted to
        readable ones.  A new list is returned in both cases."""
        if readable:
            return filter(Cell.isReadable, self.partition_list[offset])
        return list(self.partition_list[offset])

    def getPartition(self, oid_or_tid):
        """Return the partition offset an OID or TID maps to."""
        return u64(oid_or_tid) % self.getPartitions()

    def getOutdatedOffsetListFor(self, uuid):
        """Return the offsets of the partitions that are OUT_OF_DATE on
        the given node."""
        return [
            offset for offset in xrange(self.np)
            for c in self.partition_list[offset]
            if c.getUUID() == uuid and c.getState() == CellStates.OUT_OF_DATE
        ]

    def isAssigned(self, oid, uuid):
        """ Check if the oid is assigned to the given node """
        for cell in self.partition_list[u64(oid) % self.np]:
            if cell.getUUID() == uuid:
                return True
        return False

    def getCell(self, offset, uuid):
        """Return the cell for (offset, uuid), or None implicitly when
        the node has no cell in that partition."""
        for cell in self.partition_list[offset]:
            if cell.getUUID() == uuid:
                return cell

    def setCell(self, offset, node, state):
        """Assign a state to the (offset, node) cell, creating the cell
        when missing.  A DISCARDED state removes the cell instead.

        Return an (offset, uuid, state) change tuple.
        Raise PartitionTableException if the node is broken or down.
        """
        if state == CellStates.DISCARDED:
            return self.removeCell(offset, node)
        if node.isBroken() or node.isDown():
            raise PartitionTableException('Invalid node state')
        self.count_dict.setdefault(node, 0)
        for cell in self.partition_list[offset]:
            if cell.getNode() is node:
                if not cell.isFeeding():
                    # the old state was counted; the new state is
                    # re-counted below (unless it is FEEDING)
                    self.count_dict[node] -= 1
                cell.setState(state)
                break
        else:
            # no cell for this node in that partition yet: create one
            row = self.partition_list[offset]
            # 'not row' is 1 when the row was empty: it becomes filled
            self.num_filled_rows += not row
            row.append(Cell(node, state))
        # feeding cells are never counted in count_dict
        if state != CellStates.FEEDING:
            self.count_dict[node] += 1
        return offset, node.getUUID(), state

    def removeCell(self, offset, node):
        """Drop the cell of `node` in partition `offset` (if any) and
        return the corresponding (offset, uuid, DISCARDED) change tuple."""
        row = self.partition_list[offset]
        for cell in row:
            if cell.getNode() == node:
                row.remove(cell)
                if not cell.isFeeding():
                    # feeding cells were never counted (see setCell)
                    self.count_dict[node] -= 1
                break
        return (offset, node.getUUID(), CellStates.DISCARDED)

    def load(self, ptid, row_list, nm):
        """
        Load the partition table with the specified PTID, discard all previous
        content.

        row_list is an iterable of (offset, row) pairs, each row being an
        iterable of (uuid, state) pairs.  Raise IndexError when an offset
        exceeds the number of partitions.
        """
        self.clear()
        self._id = ptid
        for offset, row in row_list:
            if offset >= self.getPartitions():
                raise IndexError
            for uuid, state in row:
                node = nm.getByUUID(uuid)
                # the node must be known by the node manager
                assert node is not None
                self.setCell(offset, node, state)
        logging.debug('partition table loaded (ptid=%s)', ptid)
        self.log()

    def update(self, ptid, cell_list, nm):
        """
        Update the partition with the cell list supplied. Ignore those changes
        if the partition table ID is not greater than the current one. If a node
        is not known, it is created in the node manager and set as unavailable
        """
        # NOTE(review): contrary to the docstring above, an unknown node
        # triggers the assertion below instead of being created -- confirm
        # which behavior is intended.
        # Under Python 2, None (no table yet) compares lower than any ptid.
        if ptid <= self._id:
            logging.warning('ignoring older partition changes')
            return
        self._id = ptid
        for offset, uuid, state in cell_list:
            node = nm.getByUUID(uuid)
            assert node is not None, 'No node found for uuid ' + uuid_str(uuid)
            self.setCell(offset, node, state)
        logging.debug('partition table updated (ptid=%s)', ptid)
        self.log()

    def filled(self):
        """Tell whether every partition has at least one cell."""
        return self.num_filled_rows == self.np

    def log(self):
        """Log a debug dump of the whole table."""
        logging.debug(self.format())

    def format(self):
        """Return the whole table as one printable multi-line string."""
        return '\n'.join(self._format())

    def _format(self):
        """Help debugging partition table management.

        Output sample:
        pt: node 0: 67ae354b4ed240a0594d042cf5c01b28, R
        pt: node 1: a68a01e8bf93e287bd505201c1405bc2, R
        pt: node 2: ad7ffe8ceef4468a0c776f3035c7a543, R
        pt: node 3: df57d7298678996705cd0092d84580f4, R
        pt: 00: .UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.
        pt: 11: U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U

        Here, there are 4 nodes in RUNNING state.
        The first partition has 2 replicas in UP_TO_DATE state, on nodes 1 and
        2 (nodes 0 and 3 are displayed as unused for that partition by
        displaying a dot).
        The first number on the left represents the number of the first
        partition on the line (here, line length is 11 to keep the docstring
        width under 80 column).
        """
        node_list = sorted(self.count_dict)
        result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()),
            protocol.node_state_prefix_dict[node.getState()])
            for i, node in enumerate(node_list)]
        append = result.append
        line = []
        max_line_len = 20 # XXX: hardcoded number of partitions per line
        cell_state_dict = protocol.cell_state_prefix_dict
        prefix = 0
        prefix_len = int(math.ceil(math.log10(self.np)))
        for offset, row in enumerate(self.partition_list):
            if len(line) == max_line_len:
                # flush a full line of partitions
                append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
                line = []
                prefix = offset
            if row is None:
                # defensive: a missing row is rendered as all-unknown
                line.append('X' * len(node_list))
            else:
                cell_dict = dict((x.getNode(), cell_state_dict[x.getState()])
                    for x in row)
                line.append(''.join(cell_dict.get(x, '.') for x in node_list))
        if line:
            append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
        return result

    def operational(self):
        """Tell whether the table allows the cluster to serve data: it
        must be filled and every partition must have at least one
        readable cell on a running node."""
        if not self.filled():
            return False
        for row in self.partition_list:
            for cell in row:
                if cell.isReadable() and cell.getNode().isRunning():
                    break
            else:
                # no usable cell for this partition
                return False
        return True

    def getRow(self, offset):
        """Return a partition as a list of (uuid, state) pairs."""
        row = self.partition_list[offset]
        if row is None:
            return []
        return [(cell.getUUID(), cell.getState()) for cell in row]

    def getRowList(self):
        """Return the whole table as a list of (offset, row) pairs."""
        getRow = self.getRow
        return [(x, getRow(x)) for x in xrange(self.np)]
def thread_safe(method):
    """Decorator running `method` between self.lock() and self.unlock().

    The wrapped object must provide lock()/unlock() methods; the lock is
    always released, even when the method raises.
    """
    @wraps(method)
    def wrapper(self, *args, **kw):
        self.lock()
        try:
            return method(self, *args, **kw)
        finally:
            self.unlock()
    return wrapper
class MTPartitionTable(PartitionTable):
    """Thread-safe flavour of the partition table: only the methods used
    by the client are wrapped with a reentrant lock."""

    def __init__(self, *args, **kwargs):
        # the reentrant lock guarding every @thread_safe method below
        self._lock = RLock()
        PartitionTable.__init__(self, *args, **kwargs)

    def lock(self):
        """Acquire the table lock (used by the thread_safe decorator)."""
        self._lock.acquire()

    def unlock(self):
        """Release the table lock."""
        self._lock.release()

    @thread_safe
    def setCell(self, *args, **kw):
        return PartitionTable.setCell(self, *args, **kw)

    @thread_safe
    def clear(self, *args, **kw):
        return PartitionTable.clear(self, *args, **kw)

    @thread_safe
    def operational(self, *args, **kw):
        return PartitionTable.operational(self, *args, **kw)

    @thread_safe
    def getNodeList(self, *args, **kw):
        return PartitionTable.getNodeList(self, *args, **kw)
neoppod-e2eef56214c493bf36466ec5cf98e2b13be0ac9f-neo-lib/neo/lib/util.py 0000664 0000000 0000000 00000011660 12000764302 0024611 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import re
import socket
from binascii import a2b_hex, b2a_hex
from hashlib import sha1
from Queue import deque
from struct import pack, unpack
# Map each supported address family to the name of the connector class
# handling it (see getConnectorFromAddress below).
# NOTE(review): the values look like class names that are resolved by
# name elsewhere -- confirm against the connector module.
SOCKET_CONNECTORS_DICT = {
    socket.AF_INET : 'SocketConnectorIPv4',
    socket.AF_INET6: 'SocketConnectorIPv6',
}
def u64(s):
    """Unpack a big-endian 64-bit packed value into an integer."""
    value, = unpack('!Q', s)
    return value
def p64(n):
    """Pack an integer into a big-endian 64-bit value."""
    packed = pack('!Q', n)
    return packed
def add64(packed, offset):
    """Add a python number to a 64-bits packed value"""
    total = u64(packed) + offset
    return p64(total)
def dump(s):
    """Dump a (binary) string as hex; other objects go through repr().

    None is passed through unchanged.
    """
    if s is None:
        return None
    if isinstance(s, str):
        return b2a_hex(s)
    return repr(s)
def bin(s):
    """Inverse of dump method: decode a hex string back to binary.

    None is passed through unchanged.
    NOTE: shadows the builtin bin(); kept for API compatibility.
    """
    if s is None:
        return None
    return a2b_hex(s)
def makeChecksum(s):
    """Return the 20-byte SHA-1 digest of the given string."""
    hasher = sha1(s)
    return hasher.digest()
def getAddressType(address):
    """Return the address family of a (host, port) address.

    Returns socket.AF_INET or socket.AF_INET6 (the two families backed
    by SOCKET_CONNECTORS_DICT), trying IPv4 first so the result is
    deterministic.  Raise ValueError if the host parses in neither
    family.
    """
    (host, port) = address
    for af_type in socket.AF_INET, socket.AF_INET6:
        try:
            socket.inet_pton(af_type, host)
        # catch only parse failures: the previous bare 'except:' also
        # swallowed KeyboardInterrupt/SystemExit
        except (socket.error, ValueError):
            continue
        return af_type
    raise ValueError("Unknown type of host", host)
def getConnectorFromAddress(address):
    """Return the connector class name matching the family of `address`."""
    return SOCKET_CONNECTORS_DICT[getAddressType(address)]
def parseNodeAddress(address, port_opt=None):
    """Parse 'host', 'host:port' or '[ipv6host]:port' into a (host, port)
    tuple, canonicalized through getaddrinfo.  port_opt is used when no
    port is given in the string."""
    if address[:1] == '[':
        # bracketed (IPv6) form: "[host]" optionally followed by ":port"
        host, port = address[1:].split(']')
        port = port[1:] if port[:1] == ':' else port_opt
    elif address.count(':') == 1:
        # plain "host:port"
        host, port = address.split(':')
    else:
        # bare host (an unbracketed IPv6 address also lands here)
        host, port = address, port_opt
    # Resolve (maybe) and cast to cannonical form
    # XXX: Always pick the first result. This might not be what is desired, and
    # if so this function should either take a hint on the desired address type
    # or return either raw host & port or getaddrinfo return value.
    return socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0][4][:2]
def parseMasterList(masters, except_node=None):
    """Parse a space-separated list of master addresses.

    Return (address_list, connector_name), skipping any address equal to
    except_node.  Raise TypeError when the retained masters mix IPv4 and
    IPv6 addresses.
    """
    assert masters, 'At least one master must be defined'
    connector = None
    address_list = []
    for word in masters.split(' '):
        if not word:
            # tolerate consecutive spaces in the list
            continue
        address = parseNodeAddress(word)
        if address != except_node:
            address_list.append(address)
            word_connector = getConnectorFromAddress(address)
            if connector is None:
                # first retained address fixes the expected family
                connector = word_connector
            elif connector != word_connector:
                raise TypeError("Wrong connector type : you're trying to use "
                    "ipv6 and ipv4 simultaneously")
    return address_list, connector
class ReadBuffer(object):
    """
    Lazy read buffer: incoming chunks are queued as received and only
    joined (a single copy) once enough data is buffered to satisfy a
    read, avoiding useless intermediate copies.
    """

    def __init__(self):
        # total number of buffered bytes
        self.size = 0
        # FIFO of (chunk_length, chunk_data) pairs
        self.content = deque()

    def append(self, data):
        """ Append some data and compute the new buffer size """
        length = len(data)
        self.size += length
        self.content.append((length, data))

    def __len__(self):
        """ Return the current buffer size """
        return self.size

    def read(self, size):
        """ Read and consume size bytes """
        if self.size < size:
            # not enough data buffered yet
            return None
        self.size -= size
        pieces = []
        remaining = size
        while remaining > 0:
            length, chunk = self.content.popleft()
            remaining -= length
            pieces.append(chunk)
        if remaining < 0:
            # the last chunk overshot: keep only its head and push the
            # unread tail back at the front of the queue
            tail = pieces[-1]
            pieces[-1] = tail[:remaining]
            self.content.appendleft((-remaining, tail[remaining:]))
        # join all chunks (one copy)
        data = ''.join(pieces)
        assert len(data) == size
        return data

    def clear(self):
        """ Erase all buffer content """
        self.size = 0
        self.content.clear()
|