pax_global_header 0000666 0000000 0000000 00000000064 12440376027 0014517 g ustar 00root root 0000000 0000000 52 comment=9bd318038fd4d9697f469abd7f09b12a1fa6e85f
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/ 0000775 0000000 0000000 00000000000 12440376027 0022330 5 ustar 00root root 0000000 0000000 neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/ 0000775 0000000 0000000 00000000000 12440376027 0023111 5 ustar 00root root 0000000 0000000 neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/ 0000775 0000000 0000000 00000000000 12440376027 0024367 5 ustar 00root root 0000000 0000000 neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/Storage.py 0000664 0000000 0000000 00000017324 12440376027 0026354 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage, ConflictResolution, POSException
from zope.interface import implements
import ZODB.interfaces
from functools import wraps
from neo.lib import logging
from .app import Application
from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError
def raiseReadOnlyError(*args, **kw):
    """Unconditionally raise ``POSException.ReadOnlyError``.

    Any positional or keyword arguments are accepted and ignored, so this
    function can replace a method regardless of its signature.
    """
    error = POSException.ReadOnlyError()
    raise error
class Storage(BaseStorage.BaseStorage,
              ConflictResolution.ConflictResolvingStorage):
    """Wrapper class for neoclient.

    Storage operations are delegated to ``self.app``, which is either the
    ``_app`` object given to the constructor or a new ``Application``.
    """

    implements(
        ZODB.interfaces.IStorage,
        # ZODB.interfaces.IStorageRestoreable,
        ZODB.interfaces.IStorageIteration,
        ZODB.interfaces.IStorageUndoable,
        ZODB.interfaces.IExternalGC,
        ZODB.interfaces.ReadVerifyingStorage,
    )

    def __init__(self, master_nodes, name, read_only=False,
            compress=None, logfile=None, _app=None, **kw):
        """
        master_nodes, name
            Forwarded to Application when no _app is given; name is also
            used to build the BaseStorage name.
        read_only
            When true, the modifying methods listed below are replaced on
            this instance by raiseReadOnlyError.
        compress
            Forwarded to Application; None means True.
        logfile
            When true, passed to logging.setup().
        kw
            Extra keyword arguments forwarded to Application.

        Do not pass those parameters (used internally):
        _app
        """
        if compress is None:
            compress = True
        if logfile:
            logging.setup(logfile)
        BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
        # Warning: _is_read_only is used in BaseStorage, do not rename it.
        self._is_read_only = read_only
        if read_only:
            # Instance attributes shadow the methods defined on the class.
            for method_id in (
                        'new_oid',
                        'tpc_begin',
                        'tpc_vote',
                        'tpc_abort',
                        'store',
                        'deleteObject',
                        'undo',
                        'undoLog',
                    ):
                setattr(self, method_id, raiseReadOnlyError)
        if _app is None:
            _app = Application(master_nodes, name, compress=compress, **kw)
        self.app = _app

    @property
    def _cache(self):
        """Cache object of the underlying application."""
        return self.app._cache

    def load(self, oid, version=''):
        """Return the first two items of ``app.load(oid)``.

        Raises POSKeyError when the application reports
        NEOStorageNotFoundError.
        """
        # In order to know if it was safe to get the last revision of an object
        # instead of using loadBefore(), ZODB.Connection._setstate relies on
        # the fact that retrieving data from a remote storage forces incoming
        # invalidations to be received.
        # But in NEO, invalidations are not received from the same network
        # connection that the one used to retrieve data.
        # So we must implement load() like a loadBefore().
        # XXX: interface definition states that version parameter is
        #      mandatory, while some ZODB tests do not provide it. For now, make
        #      it optional.
        assert version == '', 'Versions are not supported'
        try:
            return self.app.load(oid)[:2]
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)

    def new_oid(self):
        """Return a new OID obtained from the application."""
        return self.app.new_oid()

    def tpc_begin(self, transaction, tid=None, status=' '):
        """
        Note: never blocks in NEO.
        """
        return self.app.tpc_begin(transaction, tid, status)

    def tpc_vote(self, transaction):
        """Vote, letting the application use tryToResolveConflict."""
        return self.app.tpc_vote(transaction, self.tryToResolveConflict)

    def tpc_abort(self, transaction):
        """Abort the given transaction."""
        return self.app.tpc_abort(transaction)

    def tpc_finish(self, transaction, f=None):
        """Finish the given transaction; ``f`` is forwarded as callback."""
        return self.app.tpc_finish(transaction, self.tryToResolveConflict, f)

    def store(self, oid, serial, data, version, transaction):
        """Store object data; only the empty version is accepted."""
        assert version == '', 'Versions are not supported'
        return self.app.store(oid, serial, data, version, transaction)

    def deleteObject(self, oid, serial, transaction):
        """Delete an object by storing ``None`` as its data."""
        self.app.store(oid, serial, None, None, transaction)

    # multiple revisions
    def loadSerial(self, oid, serial):
        """Return the data of ``oid`` at exactly ``serial``.

        Raises POSKeyError when the application reports
        NEOStorageNotFoundError.
        """
        try:
            return self.app.load(oid, serial)[0]
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)

    def loadBefore(self, oid, tid):
        """Return ``app.load(oid, None, tid)``.

        Returns None on NEOStorageNotFoundError and raises POSKeyError on
        NEOStorageDoesNotExistError.
        """
        # XXX: FileStorage return an empty string for a deleted object
        #      but it may cause EOFError exceptions in ZODB.Connection
        #      and it makes impossible to store empty values.
        #      We think it's wrong behaviour and raise POSKeyError instead.
        #      Or maybe we should return None?
        try:
            return self.app.load(oid, None, tid)
        except NEOStorageDoesNotExistError:
            raise POSException.POSKeyError(oid)
        except NEOStorageNotFoundError:
            return None

    @property
    def iterator(self):
        """The application's ``iterator`` attribute."""
        return self.app.iterator

    # undo
    def undo(self, transaction_id, txn):
        """Undo ``transaction_id`` within transaction ``txn``."""
        return self.app.undo(transaction_id, txn, self.tryToResolveConflict)

    def undoLog(self, first=0, last=-20, filter=None):
        """Return the application's undo log for the given range."""
        return self.app.undoLog(first, last, filter)

    def supportsUndo(self):
        return True

    def supportsTransactionalUndo(self):
        return True

    def loadEx(self, oid, version=''):
        """Return ``(data, serial, '')`` for ``oid``.

        ``version`` is ignored; it is optional for consistency with load().
        Raises POSKeyError when the application reports
        NEOStorageNotFoundError.
        """
        try:
            data, serial, _ = self.app.load(oid)
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)
        return data, serial, ''

    def __len__(self):
        return self.app.getObjectCount()

    def registerDB(self, db, limit=None):
        self.app.registerDB(db, limit)

    def history(self, oid, *args, **kw):
        """Return the application's history of ``oid``.

        Raises POSKeyError when the application reports
        NEOStorageNotFoundError.
        """
        try:
            return self.app.history(oid, *args, **kw)
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)

    def sync(self, force=True):
        # XXX: sync() is part of IMVCCStorage and we don't want to be called
        #      from afterCompletion() so it may not be a good place to ping the
        #      master here. See also monkey-patch in __init__.py
        self.app.lastTransaction()

    def copyTransactionsFrom(self, source, verbose=False):
        """ Zope compliant API """
        # 'verbose' is accepted for API compatibility and not used.
        return self.importFrom(source)

    def importFrom(self, source, start=None, stop=None, preindex=None):
        """ Allow import only a part of the source storage """
        return self.app.importFrom(source, start, stop,
            self.tryToResolveConflict, preindex)

    def pack(self, t, referencesf, gc=False):
        """Pack at time ``t``; ``referencesf`` is not used.

        A warning is logged when ``gc`` is requested, and packing proceeds
        without garbage collection.
        """
        if gc:
            logging.warning('Garbage Collection is not available in NEO,'
                ' please use an external tool. Packing without GC.')
        self.app.pack(t)

    def lastSerial(self):
        # seems unused
        raise NotImplementedError

    def lastTransaction(self):
        # Used in ZODB unit tests
        return self.app.lastTransaction()

    def _clear_temp(self):
        raise NotImplementedError

    def set_max_oid(self, possible_new_max_oid):
        # seems used only by FileStorage
        raise NotImplementedError

    def cleanup(self):
        # Used in unit tests to remove local database files.
        # We have no such thing, so make this method a no-op.
        pass

    def close(self):
        self.app.close()

    def getTid(self, oid):
        """Return the application's last TID for ``oid``.

        Raises KeyError(oid) when the application reports
        NEOStorageNotFoundError.
        """
        try:
            return self.app.getLastTID(oid)
        except NEOStorageNotFoundError:
            # Carry the oid so the failing key shows up in tracebacks.
            raise KeyError(oid)

    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
        self.app.checkCurrentSerialInTransaction(oid, serial, transaction)
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/__init__.py 0000664 0000000 0000000 00000006434 12440376027 0026507 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (C) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Monkey-patches applied to ZODB.Connection.Connection at import time.
# NOTE(review): the 'if 1:' wrapper presumably only exists to keep the patched
# code at the same indentation as in ZODB - confirm before removing it.
if 1:

    from hashlib import md5
    from ZODB.Connection import Connection

    def _check(f, *args):
        """Assert that the md5 of f's bytecode is one of the digests in args.

        The failing digest is used as the assertion message.
        """
        h = md5(f.func_code.co_code).hexdigest()
        assert h in args, h

    # Allow serial to be returned as late as tpc_finish
    #
    # This makes possible for storage to allocate serial inside tpc_finish,
    # removing the requirement to serialise second commit phase (tpc_vote
    # to tpc_finish/tpc_abort).

    # Check the method about to be replaced against the known digest(s).
    _check(Connection.tpc_finish,
        'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
        )

    def tpc_finish(self, transaction):
        """Indicate confirmation that the transaction is done."""

        def callback(tid):
            if self._mvcc_storage:
                # Inter-connection invalidation is not needed when the
                # storage provides MVCC.
                return
            d = dict.fromkeys(self._modified)
            self._db.invalidate(tid, d, self)
        # It's important that the storage calls the passed function
        # while it still has its lock.  We don't want another thread
        # to be able to read any updated data until we've had a chance
        # to send an invalidation message to all of the other
        # connections!
        #
        serial = self._storage.tpc_finish(transaction, callback)
        if serial is not None:
            # The storage returned the serial from tpc_finish: apply it to
            # every modified or created object still present in the cache.
            assert isinstance(serial, str), repr(serial)
            for oid_iterator in (self._modified, self._creating):
                for oid in oid_iterator:
                    obj = self._cache.get(oid, None)
                    # Ignore missing objects and don't update ghosts.
                    if obj is not None and obj._p_changed is not None:
                        obj._p_changed = 0
                        obj._p_serial = serial
        #
        self._tpc_cleanup()

    Connection.tpc_finish = tpc_finish

    # IStorage implementations usually need to provide a "network barrier",
    # at least for NEO & ZEO, to make sure we have an up-to-date view of
    # the storage. It's unclear whether sync() is a good place to do this
    # because a round-trip to the server introduces latency and we prefer
    # it's not done when it's not useful.
    # For example, we know we are up-to-date after a successful commit,
    # so this should not be done in afterCompletion(), and anyway, we don't
    # know any legitimate use of DB access outside a transaction.

    # Same guard as above, for the second method being replaced.
    _check(Connection.afterCompletion,
        'cd3a080b80fd957190ff3bb867149448', # Python 2.7
        )

    def afterCompletion(self, *ignored):
        """Clear _readCurrent and flush invalidations, without calling sync()."""
        self._readCurrent.clear()
        # PATCH: do not call sync()
        self._flush_invalidations()

    Connection.afterCompletion = afterCompletion
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/app.py 0000664 0000000 0000000 00000123444 12440376027 0025531 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from cPickle import dumps, loads
from zlib import compress, decompress
from neo.lib.locking import Empty
from random import shuffle
import heapq
import time
from functools import partial
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, ZERO_HASH, ZERO_TID
from neo.lib.event import EventManager
from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Lock
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.node import NodeManager
from neo.lib.connector import getConnectorHandler
from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError
from .handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from .poll import ThreadedPoll, psThreadedPoll
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import p64, u64, parseMasterList
from neo.lib.debug import register as registerLiveDebugger
from .container import ThreadContainer, TransactionContainer
# Marker object defined in the master handlers module; code in this module
# compares stored data against it by identity.
CHECKED_SERIAL = master.CHECKED_SERIAL

# Signals.Signals is optional: when it cannot be imported, no signal handler
# is registered.
try:
    from Signals.Signals import SignalHandler
except ImportError:
    SignalHandler = None

if SignalHandler:
    import signal
    # Call logging.reopen on SIGUSR2.
    SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen)
class Application(object):
"""The client node application."""
def __init__(self, master_nodes, name, compress=True,
        dynamic_master_list=None):
    """Initialise client state; no master connection is opened here.

    master_nodes
        Parsed by parseMasterList() into master addresses and a
        connector name.
    name
        Stored as self.name and given to the polling thread.
    compress
        Stored as self.compress.
    dynamic_master_list
        Passed unchanged to NodeManager.
    """
    # Start polling thread
    self.em = EventManager()
    self.poll_thread = ThreadedPoll(self.em, name=name)
    psThreadedPoll()
    # Internal Attributes common to all thread
    self._db = None
    self.name = name
    master_addresses, connector_name = parseMasterList(master_nodes)
    self.connector_handler = getConnectorHandler(connector_name)
    self.dispatcher = Dispatcher(self.poll_thread)
    self.nm = NodeManager(dynamic_master_list)
    self.cp = ConnectionPool(self)
    self.master_conn = None
    self.primary_master_node = None
    self.trying_master_node = None

    # load master node list
    for address in master_addresses:
        self.nm.createMaster(address=address)

    # no self-assigned UUID, primary master will supply us one
    self.uuid = None
    self._cache = ClientCache()
    self._loading_oid = None
    self.new_oid_list = ()
    self.last_oid = '\0' * 8
    self.last_tid = None
    self.storage_event_handler = storage.StorageEventHandler(self)
    self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
    self.storage_handler = storage.StorageAnswersHandler(self)
    self.primary_handler = master.PrimaryAnswersHandler(self)
    self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
    self.notifications_handler = master.PrimaryNotificationsHandler( self)
    # Internal attribute distinct between thread
    self._thread_container = ThreadContainer()
    self._txn_container = TransactionContainer()
    # Lock definition :
    # Only the bound acquire/release methods of each lock are kept.
    # _load_lock is used to make loading and storing atomic
    lock = Lock()
    self._load_lock_acquire = lock.acquire
    self._load_lock_release = lock.release
    # _oid_lock is used in order to not call multiple oid
    # generation at the same time
    lock = Lock()
    self._oid_lock_acquire = lock.acquire
    self._oid_lock_release = lock.release
    lock = Lock()
    # _cache_lock is used for the client cache
    self._cache_lock_acquire = lock.acquire
    self._cache_lock_release = lock.release
    # _connecting_to_master_node is used to prevent simultaneous master
    # node connection attemps
    self._connecting_to_master_node = Lock()
    # _nm ensure exclusive access to the node manager
    lock = Lock()
    self._nm_acquire = lock.acquire
    self._nm_release = lock.release
    # Note: this is the 'compress' argument, not the zlib function of the
    # same name imported at module level.
    self.compress = compress
    registerLiveDebugger(on_log=self.log)
def __getattr__(self, attr):
    """Fallback attribute lookup.

    When 'pt' is requested, a master connection is obtained first
    (presumably this is what makes the attribute exist), then the normal
    lookup is retried. Other names go straight to the normal lookup.
    """
    wants_partition_table = attr == 'pt'
    if wants_partition_table:
        self._getMasterConnection()
    lookup = self.__getattribute__
    return lookup(attr)
@property
def txn_contexts(self):
    """The ``values`` attribute of the transaction container."""
    # do not iter lazily to avoid race condition
    container = self._txn_container
    return container.values
def getHandlerData(self):
    """Return the ``answer`` attribute of the thread container."""
    thread_state = self._thread_container
    return thread_state.answer
def setHandlerData(self, data):
    """Store ``data`` as the ``answer`` attribute of the thread container."""
    thread_state = self._thread_container
    thread_state.answer = data
def log(self):
    """Call log() on the event manager, the node manager and, when it is
    not None, the partition table."""
    event_manager = self.em
    event_manager.log()
    node_manager = self.nm
    node_manager.log()
    partition_table = self.pt
    if partition_table is not None:
        partition_table.log()
def _handlePacket(self, conn, packet, kw=None, handler=None):
    """
    Dispatch a packet to a handler while holding the connection lock.

      conn
        The connection which received the packet (forwarded to handler).
      packet
        The packet to handle.
      kw
        Dictionary forwarded to the handler's dispatch() method.
        When omitted, a new empty dict is used for each call.
      handler
        The handler to use to handle packet.
        If not given, it will be guessed from connection's node type.

    Raises ValueError when the handler must be guessed and the node is
    unknown or is neither a storage nor a master.
    """
    if kw is None:
        # Never share one dict between calls: a handler mutating it would
        # otherwise leak state into every later call.
        kw = {}
    if handler is None:
        # Guess the handler to use based on the type of node on the
        # connection
        node = self.nm.getByAddress(conn.getAddress())
        if node is None:
            raise ValueError('Expecting an answer from a node '
                'which type is not known... Is this right ?')
        if node.isStorage():
            handler = self.storage_handler
        elif node.isMaster():
            handler = self.primary_handler
        else:
            raise ValueError('Unknown node type: %r' % (node.__class__, ))
    conn.lock()
    try:
        handler.dispatch(conn, packet, kw)
    finally:
        conn.unlock()
def _waitAnyMessage(self, queue, block=True):
    """
    Handle all pending packets.

      block
        If True (default), will block until at least one packet was
        received.
    """
    has_pending = self.dispatcher.pending
    fetch = queue.get
    handle = self._handlePacket
    while has_pending(queue):
        try:
            conn, packet, kw = fetch(block)
        except Empty:
            return
        # A None packet means the connection was closed; a ForgottenPacket
        # means some packet was forgotten. Neither is handled.
        if packet is not None and not isinstance(packet, ForgottenPacket):
            # Once a packet is about to be handled, stop blocking.
            block = False
            try:
                handle(conn, packet, kw)
            except ConnectionClosed:
                pass
def _waitAnyTransactionMessage(self, txn_context, block=True):
    """
    Just like _waitAnyMessage, but for per-transaction exchanges, rather
    than per-thread: the transaction's own queue is used and the
    transaction context is the handler data for the duration of the call.
    """
    txn_queue = txn_context['queue']
    self.setHandlerData(txn_context)
    try:
        self._waitAnyMessage(txn_queue, block=block)
    finally:
        # Don't leave access to thread context, even if a raise happens.
        self.setHandlerData(None)
def _ask(self, conn, packet, handler=None, **kw):
    """Send ``packet`` on ``conn`` and process packets until it is answered.

    Packets are read from the thread container's queue. The answer to
    this request is dispatched with ``handler``; any other packet read
    meanwhile is dispatched with a handler guessed by _handlePacket.
    Extra keyword arguments are forwarded to conn.ask().

    Returns the handler data as left after processing.
    Raises ConnectionClosed when a None packet is received for ``conn``,
    and ValueError when the expected answer is a ForgottenPacket.
    """
    # Reset handler data so a stale value is never returned.
    self.setHandlerData(None)
    queue = self._thread_container.queue
    msg_id = conn.ask(packet, queue=queue, **kw)
    get = queue.get
    _handlePacket = self._handlePacket
    while True:
        # Note: 'kw' is rebound here to the dict coming from the queue.
        qconn, qpacket, kw = get(True)
        is_forgotten = isinstance(qpacket, ForgottenPacket)
        if conn is qconn:
            # check fake packet
            if qpacket is None:
                raise ConnectionClosed
            if msg_id == qpacket.getId():
                if is_forgotten:
                    raise ValueError, 'ForgottenPacket for an ' \
                        'explicitely expected packet.'
                _handlePacket(qconn, qpacket, kw, handler)
                break
        # Unrelated packet: handle it unless it is a placeholder.
        if not is_forgotten and qpacket is not None:
            _handlePacket(qconn, qpacket, kw)
    return self.getHandlerData()
def _askStorage(self, conn, packet, **kw):
    """ Send a request to a storage node and process its answer """
    answer_handler = self.storage_handler
    return self._ask(conn, packet, handler=answer_handler, **kw)
def _askPrimary(self, packet, **kw):
    """ Send a request to the primary master and process its answer """
    master_conn = self._getMasterConnection()
    answer_handler = self.primary_handler
    return self._ask(master_conn, packet, handler=answer_handler, **kw)
def _getMasterConnection(self):
    """ Connect to the primary master node on demand """
    # For performance reasons, get 'master_conn' without locking.
    conn = self.master_conn
    if conn is not None:
        return conn
    # If not connected, 'master_conn' must be tested again while we have
    # the lock, to avoid concurrent threads reconnecting.
    with self._connecting_to_master_node:
        conn = self.master_conn
        if conn is None:
            # The list of pre-allocated OIDs is dropped before connecting.
            self.new_oid_list = ()
            conn = self.master_conn = self._connectToPrimaryNode()
    return conn
def _connectToPrimaryNode(self):
    """
        Lookup for the current primary master node

        Loops until a connection to the primary master is established and
        the partition table reports being operational, then returns that
        connection.
    """
    logging.debug('connecting to primary master...')
    index = 0
    ask = self._ask
    handler = self.primary_bootstrap_handler
    while 1:
        # Get network connection to primary master
        while 1:
            if self.primary_master_node is not None:
                # If I know a primary master node, pinpoint it.
                self.trying_master_node = self.primary_master_node
                self.primary_master_node = None
            else:
                # Otherwise, check one by one.
                master_list = self.nm.getMasterList()
                try:
                    self.trying_master_node = master_list[index]
                except IndexError:
                    # Whole list tried: pause, then restart from the
                    # first entry.
                    # NOTE(review): an empty master_list would raise
                    # IndexError again on the line below - confirm it can
                    # never be empty.
                    time.sleep(1)
                    index = 0
                    self.trying_master_node = master_list[0]
                index += 1
            # Connect to master
            conn = MTClientConnection(self.em,
                    self.notifications_handler,
                    node=self.trying_master_node,
                    connector=self.connector_handler(),
                    dispatcher=self.dispatcher)
            # Query for primary master node
            if conn.getConnector() is None:
                # This happens if a connection could not be established.
                logging.error('Connection to master node %s failed',
                              self.trying_master_node)
                continue
            try:
                # presumably the bootstrap handler updates
                # primary_master_node while processing the answer - verify
                # against the handler.
                ask(conn, Packets.RequestIdentification(
                        NodeTypes.CLIENT, self.uuid, None, self.name),
                    handler=handler)
            except ConnectionClosed:
                continue
            # If we reached the primary master node, mark as connected
            if self.primary_master_node is not None and \
               self.primary_master_node is self.trying_master_node:
                break
        logging.info('Connected to %s', self.primary_master_node)
        try:
            # Request identification and required informations to be
            # operational. Might raise ConnectionClosed so that the new
            # primary can be looked-up again.
            logging.info('Initializing from master')
            ask(conn, Packets.AskNodeInformation(), handler=handler)
            ask(conn, Packets.AskPartitionTable(), handler=handler)
            ask(conn, Packets.AskLastTransaction(), handler=handler)
            if self.pt.operational():
                break
        except ConnectionClosed:
            logging.error('Connection to %s lost', self.trying_master_node)
            self.primary_master_node = None
    logging.info("Connected and ready")
    return conn
def registerDB(self, db, limit):
    """Remember ``db`` as the registered database.

    ``limit`` is accepted but not used.
    """
    registered = db
    self._db = registered
def getDB(self):
    """Return the database last given to registerDB (initially None)."""
    registered = self._db
    return registered
def new_oid(self):
    """Get a new OID."""
    self._oid_lock_acquire()
    try:
        if not self.new_oid_list:
            # Get new oid list from master node
            # we manage a list of oid here to prevent
            # from asking too many time new oid one by one
            # from master node
            self._askPrimary(Packets.AskNewOIDs(100))
        if not self.new_oid_list:
            raise NEOStorageError('new_oid failed')
        fresh_oid = self.new_oid_list.pop()
        self.last_oid = fresh_oid
        return fresh_oid
    finally:
        self._oid_lock_release()
def getObjectCount(self):
    """Return the last allocated OID converted to an integer."""
    # return the last OID used, this is inaccurate
    last_oid_number = u64(self.last_oid)
    return int(last_oid_number)
def load(self, oid, tid=None, before_tid=None):
    """
    Internal method which manage load, loadSerial and loadBefore.
    OID and TID (serial) parameters are expected packed.
    oid
        OID of object to get.
    tid
        If given, the exact serial at which OID is desired.
        before_tid should be None.
    before_tid
        If given, the excluded upper bound serial at which OID is desired.
        tid should be None.

    Return value: (3-tuple)
    - Object data (None if object creation was undone).
    - Serial of given data.
    - Next serial at which object exists, or None. Only set when tid
      parameter is not None.

    Exceptions:
        NEOStorageError
            technical problem
        NEOStorageNotFoundError
            object exists but no data satisfies given parameters
        NEOStorageDoesNotExistError
            object doesn't exist
        NEOStorageCreationUndoneError
            object existed, but its creation was undone

    Note that loadSerial is used during conflict resolution to load
    object's current version, which is not visible to us normally (it was
    committed after our snapshot was taken).
    """
    # TODO:
    # - rename parameters (here? and in handlers & packet definitions)

    acquire = self._cache_lock_acquire
    release = self._cache_lock_release
    # XXX: Is it possible this giant lock ?
    #      See commit b77c946d67c9d7cc1e9ee9b15437568dee144aa4
    #      for a way to invalidate cache properly when several loads
    #      are done simultaneously.
    self._load_lock_acquire()
    try:
        acquire()
        try:
            # Cache hit: return immediately (both locks are released by
            # the enclosing 'finally' clauses).
            result = self._loadFromCache(oid, tid, before_tid)
            if result:
                return result
            # Record which oid is being fetched from storage.
            self._loading_oid = oid
        finally:
            release()
        # When not bound to a ZODB Connection, load() may be the
        # first method called and last_tid may still be None.
        # This happens, for example, when opening the DB.
        if not (tid or before_tid) and self.last_tid:
            # Do not get something more recent than the last invalidation
            # we got from master.
            before_tid = p64(u64(self.last_tid) + 1)
        data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid)
        acquire()
        try:
            # NOTE(review): presumably an invalidation received while
            # loading clears _loading_oid and sets _loading_invalidated;
            # neither is done in the code visible here - confirm in the
            # notification handler.
            result = data, tid, (next_tid if self._loading_oid or next_tid
                                 else self._loading_invalidated)
            self._cache.store(oid, *result)
            return result
        finally:
            release()
    finally:
        self._load_lock_release()
def _loadFromStorage(self, oid, at_tid, before_tid):
    """Ask readable storage nodes for ``oid`` until one answers properly.

    Returns (data, tid, next_tid, data_tid), with data decompressed when
    the answer says it is compressed.
    Raises NEOStorageCreationUndoneError when an answer carries no data
    and the zero checksum, and NEOStorageError when no node could answer.
    """
    request = Packets.AskObject(oid, at_tid, before_tid)
    for node, conn in self.cp.iterateForObject(oid, readable=True):
        try:
            answer = self._askStorage(conn, request)
        except ConnectionClosed:
            continue
        tid, next_tid, compression, checksum, data, data_tid = answer
        if not (data or checksum != ZERO_HASH):
            raise NEOStorageCreationUndoneError(dump(oid))
        if checksum != makeChecksum(data):
            # Corrupted answer: try the next node.
            logging.error('wrong checksum from %s for oid %s',
                      conn, dump(oid))
            continue
        if compression:
            data = decompress(data)
        return data, tid, next_tid, data_tid
    # We didn't got any object from all storage node because of
    # connection error
    raise NEOStorageError('connection failure')
def _loadFromCache(self, oid, at_tid=None, before_tid=None):
    """
    Load from local cache, return None if not found.
    """
    if not at_tid:
        return self._cache.load(oid, before_tid)
    # For an exact serial, the cache is queried with the serial
    # followed by '*'.
    cached = self._cache.load(oid, at_tid + '*')
    assert not cached or cached[1] == at_tid
    return cached
def tpc_begin(self, transaction, tid=None, status=' '):
    """Begin a new transaction.

    ``status`` is accepted but not used.
    Raises NEOStorageError when the master gives no answer.
    """
    # First get a transaction, only one is allowed at a time
    txn_context = self._txn_container.new(transaction)
    # use the given TID or request a new one to the master
    request = Packets.AskBeginTransaction(tid)
    ttid = self._askPrimary(request)
    if ttid is None:
        raise NEOStorageError('tpc_begin failed')
    assert tid in (None, ttid), (tid, ttid)
    txn_context['ttid'] = ttid
def store(self, oid, serial, data, version, transaction):
    """Store object.

    ``version`` is accepted but not used.
    """
    logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
    txn_context = self._txn_container.get(transaction)
    self._store(txn_context, oid, serial, data)
def _store(self, txn_context, oid, serial, data, data_serial=None,
        unlock=False):
    """Send a store request for ``oid`` to storage nodes.

    The request is recorded in ``txn_context``. Afterwards, answers are
    processed: blocking while the transaction's data size is at least the
    cache's maximum size, then once more without blocking.
    Raises NEOStorageError when the transaction has no involved node.
    """
    ttid = txn_context['ttid']
    if data is None:
        # This is some undo: either a no-data object (undoing object
        # creation) or a back-pointer to an earlier revision (going back to
        # an older object revision).
        compression = 0
        compressed_data = ''
        checksum = ZERO_HASH
    else:
        assert data_serial is None
        size = len(data)
        # Start from the raw data; switch to the compressed form only when
        # compression is enabled and does not make the data bigger.
        compression = 0
        compressed_data = data
        if self.compress:
            candidate = compress(data)
            if size >= len(candidate):
                compression = 1
                compressed_data = candidate
        checksum = makeChecksum(compressed_data)
        txn_context['data_size'] += size
    on_timeout = partial(
        self.onStoreTimeout,
        txn_context=txn_context,
        oid=oid,
    )
    # Store object in tmp cache
    txn_context['data_dict'][oid] = data
    # Store data on each node
    txn_context['object_stored_counter_dict'][oid] = {}
    txn_context['object_base_serial_dict'].setdefault(oid, serial)
    txn_context['object_serial_dict'][oid] = serial
    queue = txn_context['queue']
    involved_nodes = txn_context['involved_nodes']
    remember_node = involved_nodes.add
    request = Packets.AskStoreObject(oid, serial, compression,
        checksum, compressed_data, data_serial, ttid, unlock)
    for node, conn in self.cp.iterateForObject(oid):
        try:
            conn.ask(request, on_timeout=on_timeout, queue=queue)
        except ConnectionClosed:
            continue
        remember_node(node)
    if not involved_nodes:
        raise NEOStorageError("Store failed")

    wait_any = self._waitAnyTransactionMessage
    while txn_context['data_size'] >= self._cache._max_size:
        wait_any(txn_context)
    wait_any(txn_context, False)
def onStoreTimeout(self, conn, msg_id, txn_context, oid):
    """Record the timed-out message id for ``oid`` and ask for lock state."""
    # NOTE: this method is called from poll thread, don't use
    #       thread-specific value !
    timeout_dict = txn_context.setdefault('timeout_dict', {})
    timeout_dict[oid] = msg_id
    # Ask the storage if someone locks the object.
    # By sending a message with a smaller timeout,
    # the connection will be kept open.
    lock_query = Packets.AskHasLock(txn_context['ttid'], oid)
    conn.ask(lock_query, timeout=5, queue=txn_context['queue'])
def _handleConflicts(self, txn_context, tryToResolveConflict):
    """Process the conflicts recorded in ``txn_context``.

    The recorded conflicts are copied and the original dict is cleared
    before processing. For each conflicting oid, resolution is attempted
    with ``tryToResolveConflict`` and, on success, the resolved data is
    stored again.

    Returns the list of oids whose conflict was resolved by this call.
    Raises ReadConflictError when the conflicting entry is a serial check
    (CHECKED_SERIAL), and ConflictError when ZERO_TID is among the
    conflicting serials or when resolution fails.
    """
    result = []
    append = result.append
    # Check for conflicts
    data_dict = txn_context['data_dict']
    object_base_serial_dict = txn_context['object_base_serial_dict']
    object_serial_dict = txn_context['object_serial_dict']
    conflict_serial_dict = txn_context['conflict_serial_dict'].copy()
    txn_context['conflict_serial_dict'].clear()
    resolved_conflict_serial_dict = txn_context[
        'resolved_conflict_serial_dict']
    for oid, conflict_serial_set in conflict_serial_dict.iteritems():
        conflict_serial = max(conflict_serial_set)
        serial = object_serial_dict[oid]
        if ZERO_TID in conflict_serial_set:
            if 1:
                # XXX: disable deadlock avoidance code until it is fixed
                logging.info('Deadlock avoidance on %r:%r',
                    dump(oid), dump(serial))
                # 'data' parameter of ConflictError is only used to report the
                # class of the object. It doesn't matter if 'data' is None
                # because the transaction is too big.
                try:
                    data = data_dict[oid]
                except KeyError:
                    data = txn_context['cache_dict'][oid]
            else:
                # Storage refused us from taking object lock, to avoid a
                # possible deadlock. TID is actually used for some kind of
                # "locking priority": when a higher value has the lock,
                # this means we stored objects "too late", and we would
                # otherwise cause a deadlock.
                # To recover, we must ask storages to release locks we
                # hold (to let possibly-competing transactions acquire
                # them), and requeue our already-sent store requests.
                # XXX: currently, brute-force is implemented: we send
                # object data again.
                # WARNING: not maintained code
                logging.info('Deadlock avoidance triggered on %r:%r',
                    dump(oid), dump(serial))
                for store_oid, store_data in data_dict.iteritems():
                    store_serial = object_serial_dict[store_oid]
                    if store_data is CHECKED_SERIAL:
                        self._checkCurrentSerialInTransaction(txn_context,
                            store_oid, store_serial)
                    else:
                        if store_data is None:
                            # Some undo
                            logging.warning('Deadlock avoidance cannot reliably'
                                ' work with undo, this must be implemented.')
                            conflict_serial = ZERO_TID
                            break
                        self._store(txn_context, store_oid, store_serial,
                            store_data, unlock=True)
                else:
                    continue
        else:
            data = data_dict.pop(oid)
            if data is CHECKED_SERIAL:
                raise ReadConflictError(oid=oid, serials=(conflict_serial,
                    serial))
            if data: # XXX: can 'data' be None ???
                txn_context['data_size'] -= len(data)
            resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                oid, set())
            if resolved_serial_set and conflict_serial <= max(
                    resolved_serial_set):
                # A later serial has already been resolved, skip.
                resolved_serial_set.update(conflict_serial_set)
                continue
            try:
                new_data = tryToResolveConflict(oid, conflict_serial,
                    serial, data)
            except ConflictError:
                # Falls through to the ConflictError raised at the end of
                # the loop body.
                logging.info('Conflict resolution failed for '
                    '%r:%r with %r', dump(oid), dump(serial),
                    dump(conflict_serial))
            else:
                logging.info('Conflict resolution succeeded for '
                    '%r:%r with %r', dump(oid), dump(serial),
                    dump(conflict_serial))
                # Mark this conflict as resolved
                resolved_serial_set.update(conflict_serial_set)
                # Base serial changes too, as we resolved a conflict
                object_base_serial_dict[oid] = conflict_serial
                # Try to store again
                self._store(txn_context, oid, conflict_serial, new_data)
                append(oid)
                continue
        # Reached for every conflict that was neither resolved nor skipped.
        raise ConflictError(oid=oid, serials=(conflict_serial,
            serial), data=data)
    return result
def waitResponses(self, queue):
    """Wait for all requests to be answered (or their connection to be
    detected as closed)"""
    has_pending = self.dispatcher.pending
    wait_any = self._waitAnyMessage
    while has_pending(queue):
        wait_any(queue)
def waitStoreResponses(self, txn_context, tryToResolveConflict):
    """Wait for all store answers of a transaction, resolving conflicts.

    Returns a list of (oid, ResolvedSerial) pairs, one per object whose
    conflict was resolved.
    Raises NEOStorageError when some object has an empty entry in
    'object_stored_counter_dict'.
    """
    queue = txn_context['queue']
    conflict_serial_dict = txn_context['conflict_serial_dict']
    has_pending = self.dispatcher.pending
    handle_conflicts = self._handleConflicts
    wait_any = self._waitAnyTransactionMessage
    resolved_oid_set = set()
    while has_pending(queue) or conflict_serial_dict:
        # Note: handler data can be overwritten by _handleConflicts
        # so we must set it for each iteration.
        wait_any(txn_context)
        if conflict_serial_dict:
            conflicts = handle_conflicts(txn_context, tryToResolveConflict)
            if conflicts:
                resolved_oid_set.update(conflicts)

    # Check for never-stored objects, and update result for all others
    result = []
    stored_counters = txn_context['object_stored_counter_dict']
    for oid, store_dict in stored_counters.iteritems():
        if not store_dict:
            logging.error('tpc_store failed')
            raise NEOStorageError('tpc_store failed')
        if oid in resolved_oid_set:
            result.append((oid, ResolvedSerial))
    return result
def tpc_vote(self, transaction, tryToResolveConflict):
    """Store current transaction.

    Returns the result of waitStoreResponses().
    Raises NEOStorageError when the transaction has no involved node, or
    when an 'error' entry is present in the transaction context.
    """
    txn_context = self._txn_container.get(transaction)
    result = self.waitStoreResponses(txn_context, tryToResolveConflict)

    ttid = txn_context['ttid']
    # Store data on each node
    assert not txn_context['data_dict'], txn_context
    request = Packets.AskStoreTransaction(ttid, str(transaction.user),
        str(transaction.description), dumps(transaction._extension),
        txn_context['cache_dict'])
    remember_node = txn_context['involved_nodes'].add
    for node, conn in self.cp.iterateForObject(ttid):
        logging.debug("voting transaction %s on %s", dump(ttid),
            dump(conn.getUUID()))
        try:
            self._askStorage(conn, request)
        except ConnectionClosed:
            continue
        remember_node(node)

    # check at least one storage node accepted
    if not txn_context['involved_nodes']:
        logging.error('tpc_vote failed')
        raise NEOStorageError('tpc_vote failed')
    txn_context['voted'] = None
    # We must not go further if connection to master was lost since
    # tpc_begin, to lower the probability of failing during tpc_finish.
    if 'error' in txn_context:
        raise NEOStorageError(txn_context['error'])
    return result
def tpc_abort(self, transaction):
    """Abort the given transaction.

    The transaction context is removed from the container; if there is
    none, nothing is done. Otherwise an AbortTransaction packet is sent
    to every involved storage node that still has a connection, then to
    the master (when connected), and the transaction queue is forgotten
    by the dispatcher.

    Failures while notifying a storage node are logged and ignored so
    that the remaining nodes and the master are still notified.
    """
    txn_context = self._txn_container.pop(transaction)
    if txn_context is None:
        return
    ttid = txn_context['ttid']
    p = Packets.AbortTransaction(ttid)
    getConnForNode = self.cp.getConnForNode
    # cancel transaction on all those nodes
    for node in txn_context['involved_nodes']:
        conn = getConnForNode(node)
        if conn is None:
            continue
        try:
            conn.notify(p)
        except Exception:
            # Best effort: keep notifying the other nodes. Only regular
            # errors are swallowed, so KeyboardInterrupt/SystemExit
            # still propagate.
            logging.exception('Exception in tpc_abort while notifying '
                'storage node %r of abortion, ignoring.', conn)
    conn = self.master_conn
    if conn is not None:
        conn.notify(p)
    # We don't need to flush queue, as it won't be reused by future
    # transactions (its context was popped above and contexts are indexed
    # by transaction object instance).
    self.dispatcher.forget_queue(txn_context['queue'], flush_queue=False)
def tpc_finish(self, transaction, tryToResolveConflict, f=None):
    """Finish the given transaction and return its final tid.

    tpc_vote is called first when the transaction context has no 'voted'
    entry. The finish request is sent to the primary master while the
    load lock is held; `f` is forwarded as the `callback` keyword.
    """
    container = self._txn_container
    if 'voted' not in container.get(transaction):
        self.tpc_vote(transaction, tryToResolveConflict)
    self._load_lock_acquire()
    try:
        # The context is dropped before asking the master to finish.
        context = container.pop(transaction)
        cache_dict = context['cache_dict']
        finish_request = Packets.AskFinishTransaction(
            context['ttid'], cache_dict)
        tid = self._askPrimary(finish_request,
            cache_dict=cache_dict, callback=f)
        assert tid
        return tid
    finally:
        self._load_lock_release()
def undo(self, undone_tid, txn, tryToResolveConflict):
    """Undo transaction `undone_tid` within the running transaction `txn`.

    For every oid of the undone transaction, storage nodes are asked for
    the undo serial; the object is then stored again, either as a plain
    undo or, when a later revision exists, with the data produced by
    tryToResolveConflict.

    Returns (None, list of oids of the undone transaction).
    Raises UndoError when an object cannot be found or when the conflict
    cannot be resolved.
    """
    txn_context = self._txn_container.get(txn)
    txn_info, txn_ext = self._getTransactionInformation(undone_tid)
    undone_oid_list = txn_info['oids']

    # Regroup objects per partition, to ask a minimum set of storage.
    oids_by_partition = {}
    for oid in undone_oid_list:
        oids_by_partition.setdefault(
            self.pt.getPartition(oid), []).append(oid)

    # Ask storage the undo serial (serial at which object's previous data
    # is)
    get_cell_list = self.pt.getCellList
    cell_sort_key = self.cp.getCellSortKey
    conn_for_cell = self.cp.getConnForCell
    queue = self._thread_container.queue
    ttid = txn_context['ttid']
    undo_object_tid_dict = {}
    snapshot_tid = p64(u64(self.last_tid) + 1)
    for partition, partition_oids in oids_by_partition.iteritems():
        candidates = get_cell_list(partition, readable=True)
        # We do want to shuffle before getting one with the smallest
        # key, so that all cells with the same (smallest) key has
        # identical chance to be chosen.
        shuffle(candidates)
        best_cell = min(candidates, key=cell_sort_key)
        request = Packets.AskObjectUndoSerial(ttid,
            snapshot_tid, undone_tid, partition_oids)
        conn_for_cell(best_cell).ask(request,
            queue=queue, undo_object_tid_dict=undo_object_tid_dict)

    # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
    # meaning that objects in transaction's oid_list do not exist any
    # longer. This is the symptom of a pack, so forbid undoing transaction
    # when it happens.
    try:
        self.waitResponses(queue)
    except NEOStorageNotFoundError:
        self.dispatcher.forget_queue(queue)
        raise UndoError('non-undoable transaction')

    # Send undo data to all storage nodes.
    for oid in undone_oid_list:
        current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
        if is_current:
            data = None
        else:
            # Serial being undone is not the latest version for this
            # object. This is an undo conflict, try to resolve it.
            try:
                # Latest version we are supposed to see, then the version
                # we were undoing to.
                data = self.load(oid, current_serial)[0]
                undo_data = self.load(oid, undo_serial)[0]
            except NEOStorageNotFoundError:
                raise UndoError('Object not found while resolving undo '
                    'conflict')
            try:
                data = tryToResolveConflict(oid, current_serial,
                    undone_tid, undo_data, data)
            except ConflictError:
                raise UndoError('Some data were modified by a later '
                    'transaction', oid)
            undo_serial = None
        self._store(txn_context, oid, current_serial, data, undo_serial)
    return None, undone_oid_list
def _insertMetadata(self, txn_info, extension):
    """Copy every entry of the deserialised `extension` into `txn_info`."""
    txn_info.update(loads(extension).items())
def _getTransactionInformation(self, tid):
    """Return (txn_info, txn_ext) for `tid` from the first node answering.

    Readable nodes are tried in turn; a node is skipped when its
    connection is closed or when it does not know the transaction.
    Raises NEOStorageError if no node could answer.
    """
    request = Packets.AskTransactionInformation(tid)
    for node, conn in self.cp.iterateForObject(tid, readable=True):
        try:
            txn_info, txn_ext = self._askStorage(conn, request)
        except (ConnectionClosed, NEOStorageNotFoundError):
            # Connection lost or TID not found: try the next node.
            continue
        return (txn_info, txn_ext)
    raise NEOStorageError('Transaction %r not found' % (tid, ))
def undoLog(self, first, last, filter=None, block=0):
    """Return a list of transaction description dicts, newest first.

    TIDs are requested from all storage nodes, then the information of
    each transaction is fetched; entries rejected by `filter` are left
    out. At most `last - first` entries are collected. When nothing was
    found and `block` is false, the search is retried once with
    `last * 5`.
    """
    # XXX: undoLog is broken
    if last < 0:
        # See FileStorage.py for explanation
        last = first - last

    # First get a list of transactions from all storage nodes.
    # Each storage node will return TIDs only for UP_TO_DATE state and
    # FEEDING state cells
    queue = self._thread_container.queue
    request = Packets.AskTIDs(first, last, INVALID_PARTITION)
    tid_set = set()
    for storage_node in self.pt.getNodeSet(True):
        conn = self.cp.getConnForNode(storage_node)
        if conn is not None:
            conn.ask(request, queue=queue, tid_set=tid_set)

    # Wait for answers from all storages.
    self.waitResponses(queue)

    # Most recent transactions come first.
    ordered_tids = sorted(tid_set, reverse=True)
    logging.debug("UndoLog tids %s", map(dump, ordered_tids))

    # For each transaction, get info
    wanted = last - first
    undo_info = []
    for tid in ordered_tids:
        txn_info, txn_ext = self._getTransactionInformation(tid)
        if filter is not None and not filter(txn_info):
            continue
        txn_info.pop('packed')
        txn_info.pop("oids")
        self._insertMetadata(txn_info, txn_ext)
        undo_info.append(txn_info)
        if len(undo_info) >= wanted:
            break

    # Check we return at least one element, otherwise call
    # again but extend offset
    if not undo_info and not block:
        undo_info = self.undoLog(first=first, last=last*5, filter=filter,
            block=1)
    return undo_info
def transactionLog(self, start, stop, limit):
    """Return (last tid, list of transaction infos) between start and stop.

    Each partition is asked for its TIDs; the answers are merged in
    sorted order and truncated to `limit` entries. The returned tid is
    the last one whose information was fetched, or None if there was
    none. Raises NEOStorageError when no node answers for a partition.
    """
    merged_tids = []
    # request a tid list for each partition
    for offset in xrange(self.pt.getPartitions()):
        request = Packets.AskTIDsFrom(start, stop, limit, offset)
        for node, conn in self.cp.iterateForObject(offset, readable=True):
            try:
                answer = self._askStorage(conn, request)
            except ConnectionClosed:
                continue
            break
        else:
            raise NEOStorageError('transactionLog failed')
        if answer:
            merged_tids = list(heapq.merge(merged_tids, answer))
            if len(merged_tids) >= limit:
                # Enough candidates: next partitions need not look further
                # than the current last tid.
                del merged_tids[limit:]
                stop = merged_tids[-1]

    # request transactions informations
    txn_list = []
    last_tid = None
    for last_tid in merged_tids:
        txn_info, txn_ext = self._getTransactionInformation(last_tid)
        txn_info['ext'] = loads(txn_ext)
        txn_list.append(txn_info)
    return (last_tid, txn_list)
def history(self, oid, size=1, filter=None):
    """Return the history of `oid` as a list of transaction dicts.

    oid: object whose history is requested.
    size: value passed as last argument of AskObjectHistory (presumably
        the maximum number of revisions -- to be confirmed against the
        protocol).
    filter: optional callable receiving each history dict; entries for
        which it returns a false value are not part of the result.

    Each dict is the transaction information without its 'id', 'oids'
    and 'packed' keys, completed with 'tid', 'version' and 'size'.
    Nothing is returned explicitly (i.e. None) when every connection
    turned out to be closed.
    """
    # Get history informations for object first
    packet = Packets.AskObjectHistory(oid, 0, size)
    for node, conn in self.cp.iterateForObject(oid, readable=True):
        try:
            history_list = self._askStorage(conn, packet)
        except ConnectionClosed:
            continue
        # Now that we have object informations, get txn informations
        result = []
        # history_list is already sorted descending (by the storage)
        # The loop below rebinds the 'size' parameter; this is harmless
        # since the packet was already built.
        for serial, size in history_list:
            txn_info, txn_ext = self._getTransactionInformation(serial)
            # create history dict
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info.pop('packed')
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = size
            if filter is None or filter(txn_info):
                result.append(txn_info)
            # The filter sees the dict before extension metadata is merged
            # in; accepted dicts still get the metadata since the same
            # object is updated.
            self._insertMetadata(txn_info, txn_ext)
        return result
def importFrom(self, source, start, stop, tryToResolveConflict,
        preindex=None):
    """Replay the transactions of `source` between start and stop.

    Every transaction is begun with its original tid and status, its
    records are stored, then it is voted and finished. `preindex` maps
    each oid to the tid of its last imported revision and is updated in
    place (a new dict is used when None is given).
    """
    # TODO: The main difference with BaseStorage implementation is that
    #       preindex can't be filled with the result 'store' (tid only
    #       known after 'tpc_finish'. This method could be dropped if we
    #       implemented IStorageRestoreable (a wrapper around source would
    #       still be required for partial import).
    if preindex is None:
        preindex = {}
    for txn in source.iterator(start, stop):
        tid = txn.tid
        self.tpc_begin(txn, tid, txn.status)
        for record in txn:
            oid = record.oid
            self.store(oid, preindex.get(oid), record.data, record.version,
                txn)
            preindex[oid] = tid
        conflicted = self.tpc_vote(txn, tryToResolveConflict)
        assert not conflicted, conflicted
        real_tid = self.tpc_finish(txn, tryToResolveConflict)
        assert real_tid == tid, (real_tid, tid)
from .iterator import iterator
def lastTransaction(self):
    """Ask the primary master for the last transaction, return last_tid."""
    request = Packets.AskLastTransaction()
    self._askPrimary(request)
    return self.last_tid
def __del__(self):
    """Close all connections and stop the polling thread.

    The same function is also exposed under the name `close`.
    """
    # Due to bug in ZODB, close is not always called when shutting
    # down zope, so use __del__ to close connections
    for conn in self.em.getConnectionList():
        conn.close()
    # Empty the connection pool and forget the master connection.
    self.cp.flush()
    self.master_conn = None
    # Stop polling thread
    logging.debug('Stopping %s', self.poll_thread)
    self.poll_thread.stop()
    psThreadedPoll()
# Explicit close() shares the implementation of __del__.
close = __del__
def pack(self, t):
    """Ask the primary master to pack up to time `t` (seconds since epoch).

    Raises NEOStorageError if `t` translates to ZERO_TID. The local cache
    is cleared once the request was sent.
    """
    # Year, month, day, hour, minute from UTC time, plus seconds taken
    # from `t` itself.
    stamp_args = time.gmtime(t)[:5] + (t % 60, )
    tid = repr(TimeStamp(*stamp_args))
    if tid == ZERO_TID:
        raise NEOStorageError('Invalid pack time')
    request = Packets.AskPack(tid)
    self._askPrimary(request)
    # XXX: this is only needed to make ZODB unit tests pass.
    # It should not be otherwise required (clients should be free to load
    # old data as long as it is available in cache, event if it was pruned
    # by a pack), so don't bother invalidating on other clients.
    self._cache_lock_acquire()
    try:
        self._cache.clear()
    finally:
        self._cache_lock_release()
def getLastTID(self, oid):
    """Return the second element of what `load` returns for `oid`."""
    record = self.load(oid)
    return record[1]
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
    """Check `serial` of `oid` within the context of `transaction`.

    Looks up the transaction context and delegates to
    _checkCurrentSerialInTransaction.
    """
    check = self._checkCurrentSerialInTransaction
    txn_context = self._txn_container.get(transaction)
    check(txn_context, oid, serial)
def _checkCurrentSerialInTransaction(self, txn_context, oid, serial):
    """Ask storage nodes to check that `serial` is current for `oid`.

    The oid is registered in the transaction context (serial, empty
    stored counter, CHECKED_SERIAL as data unless data is already
    present), the check request is sent to every reachable node for the
    oid, then already available answers are processed.
    """
    ttid = txn_context['ttid']
    txn_context['object_serial_dict'][oid] = serial
    # Placeholders
    queue = txn_context['queue']
    txn_context['object_stored_counter_dict'][oid] = {}
    # ZODB.Connection performs calls 'checkCurrentSerialInTransaction'
    # after stores, and skips oids that have been successfully stored.
    assert oid not in txn_context['cache_dict'], (oid, txn_context)
    txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL)
    check_request = Packets.AskCheckCurrentSerial(ttid, serial, oid)
    for node, conn in self.cp.iterateForObject(oid):
        try:
            conn.ask(check_request, queue=queue)
        except ConnectionClosed:
            # Unreachable node: simply skip it.
            pass
    self._waitAnyTransactionMessage(txn_context, False)
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/cache.py 0000664 0000000 0000000 00000023725 12440376027 0026015 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2011-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import math
from bisect import insort
class CacheItem(object):
    """One cached revision of an object, linked into a circular queue."""

    __slots__ = ('oid', 'tid', 'next_tid', 'data',
                 'counter', 'level', 'expire',
                 'prev', 'next')

    def __repr__(self):
        """Return a short description listing the attributes that are set.

        Neighbours ('prev'/'next') and data are abbreviated when they
        have a true value.
        """
        s = ''
        for attr in self.__slots__:
            try:
                value = getattr(self, attr)
                if value:
                    if attr in ('prev', 'next'):
                        s += ' %s=<...>' % attr
                        continue
                    elif attr == 'data':
                        value = '...'
                s += ' %s=%r' % (attr, value)
            except AttributeError:
                # Slot not assigned yet.
                pass
        return '<%s%s>' % (self.__class__.__name__, s)

    def __lt__(self, other):
        """Order items by tid (used by bisect.insort)."""
        return self.tid < other.tid


class ClientCache(object):
    """In-memory pickle cache based on Multi-Queue cache algorithm

    Multi-Queue algorithm for Second Level Buffer Caches:
    http://www.usenix.org/event/usenix01/full_papers/zhou/zhou_html/index.html

    Quick description:
    - There are multiple "regular" queues, plus a history queue
    - The queue to store an object in depends on its access frequency
    - The queue an object is in defines its lifespan (higher-index queue eq.
      longer lifespan)
      -> The more often an object is accessed, the higher lifespan it will
         have
    - Upon cache or history hit, object frequency is increased and object
      might get moved to longer-lived queue
    - Each access "ages" objects in cache, and an aging object is moved to
      shorter-lived queue as it ages without being accessed, or in the
      history queue if it's really too old.
    """

    __slots__ = ('_life_time', '_max_history_size', '_max_size',
                 '_queue_list', '_oid_dict', '_time', '_size', '_history_size')

    def __init__(self, life_time=10000, max_history_size=100000,
                 max_size=20*1024*1024):
        """Create an empty cache.

        life_time: number of accesses after which an untouched item is
            moved one queue down.
        max_history_size: maximum number of items in the history queue.
        max_size: maximum total length of cached data.
        """
        self._life_time = life_time
        self._max_history_size = max_history_size
        self._max_size = max_size
        self.clear()

    def clear(self):
        """Reset cache"""
        self._queue_list = [None] # first is history
        self._oid_dict = {}
        self._time = 0
        self._size = 0
        self._history_size = 0

    def _iterQueue(self, level):
        """for debugging purpose"""
        if level < len(self._queue_list):
            item = head = self._queue_list[level]
            if item:
                while 1:
                    yield item
                    item = item.next
                    if item is head:
                        break

    def _add(self, item):
        """Append `item` at the tail of the queue given by item.level.

        Level 0 is the history queue: the data of the item is dropped and
        accounted for; when the history is full, its head is discarded.
        """
        level = item.level
        try:
            head = self._queue_list[level]
        except IndexError:
            # First item of a new queue, which must be the next level.
            assert len(self._queue_list) == level
            self._queue_list.append(item)
            item.prev = item.next = item
        else:
            if head:
                item.prev = tail = head.prev
                tail.next = head.prev = item
                item.next = head
            else:
                self._queue_list[level] = item
                item.prev = item.next = item
        if level:
            item.expire = self._time + self._life_time
        else:
            self._size -= len(item.data)
            item.data = None
            if self._history_size < self._max_history_size:
                self._history_size += 1
            else:
                # History is full: forget its oldest entry.
                self._remove(head)
                item_list = self._oid_dict[head.oid]
                item_list.remove(head)
                if not item_list:
                    del self._oid_dict[head.oid]

    def _remove(self, item):
        """Unlink `item` from its queue and return its former successor.

        The level of the item is decremented, so that adding it back
        without setting a new level moves it one queue down. Nothing is
        done (and None is returned) for an item that has no level yet.
        """
        level = item.level
        if level is not None:
            item.level = level - 1
            next = item.next
            if next is item:
                # It was alone in its queue.
                self._queue_list[level] = next = None
            else:
                item.prev.next = next
                next.prev = item.prev
                if self._queue_list[level] is item:
                    self._queue_list[level] = next
            return next

    def _fetched(self, item, _log=math.log):
        """Account for an access to `item` and age the cache.

        The item moves to the queue matching its new access counter, then
        at most one expired queue head is moved one level down.
        """
        self._remove(item)
        item.counter = counter = item.counter + 1
        # XXX It might be better to adjust the level according to the object
        # size. See commented factor for example.
        item.level = 1 + int(_log(counter, 2)
                             # * (1.01 - float(len(item.data)) / self._max_size)
                            )
        self._add(item)

        self._time = time = self._time + 1
        for head in self._queue_list[1:]:
            if head and head.expire < time:
                self._remove(head)
                self._add(head)
                break

    def _load(self, oid, before_tid=None):
        """Return the item of `oid` visible before `before_tid`, if any.

        Without before_tid, only an item with no next_tid is returned.
        """
        item_list = self._oid_dict.get(oid)
        if item_list:
            if before_tid:
                for item in reversed(item_list):
                    if item.tid < before_tid:
                        next_tid = item.next_tid
                        if next_tid and next_tid < before_tid:
                            break
                        return item
            else:
                item = item_list[-1]
                if not item.next_tid:
                    return item

    def load(self, oid, before_tid=None):
        """Return a revision of oid that was current before given tid"""
        item = self._load(oid, before_tid)
        if item:
            data = item.data
            if data is not None:
                self._fetched(item)
                return data, item.tid, item.next_tid

    def store(self, oid, data, tid, next_tid):
        """Store a new data record in the cache"""
        size = len(data)
        max_size = self._max_size
        if size < max_size:
            item = self._load(oid, next_tid)
            if item:
                assert item.tid == tid and item.next_tid == next_tid
                if item.level: # already stored
                    assert item.data == data
                    return
                # History hit: the item leaves the history queue.
                assert not item.data
                self._history_size -= 1
            else:
                item = CacheItem()
                item.oid = oid
                item.tid = tid
                item.next_tid = next_tid
                item.counter = 0
                item.level = None
                try:
                    item_list = self._oid_dict[oid]
                except KeyError:
                    self._oid_dict[oid] = [item]
                else:
                    if next_tid:
                        insort(item_list, item)
                    else:
                        # New current revision: it inherits the access
                        # counter of the previous one.
                        prev = item_list[-1]
                        item.counter = prev.counter
                        prev.counter = 0
                        if prev.level > 1:
                            self._fetched(prev)
                        item_list.append(item)
            item.data = data
            self._fetched(item)
            self._size += size
            if max_size < self._size:
                # Move items to history, lowest queues first, until the
                # cache fits again.
                for head in self._queue_list[1:]:
                    while head:
                        next = self._remove(head)
                        head.level = 0
                        self._add(head)
                        if self._size <= max_size:
                            return
                        head = next

    def invalidate(self, oid, tid):
        """Mark data record as being valid only up to given tid"""
        try:
            item = self._oid_dict[oid][-1]
        except KeyError:
            pass
        else:
            if item.next_tid is None:
                item.next_tid = tid
            else:
                assert item.next_tid <= tid, (item, oid, tid)

    def clear_current(self):
        """Drop every item that has no next_tid, return the affected oids.

        Size accounting is kept consistent: the data length of a removed
        item is subtracted from the cache size, and a removed history
        item is subtracted from the history size.
        """
        oid_list = []
        # Iterate over a snapshot since entries may be deleted on the way.
        for oid, item_list in list(self._oid_dict.items()):
            item = item_list[-1]
            if item.next_tid is None:
                if item.level:
                    self._size -= len(item.data)
                else:
                    self._history_size -= 1
                self._remove(item)
                del item_list[-1]
                # We don't preserve statistics of removed items. This could be
                # done easily when previous versions are cached, by copying
                # counters, but it would not be fair for other oids, so it's
                # probably not worth it.
                if not item_list:
                    del self._oid_dict[oid]
                oid_list.append(oid)
        return oid_list
def test(self):
    """Exercise ClientCache load/store/invalidate/clear_current.

    `self` is expected to provide assertEqual: this function is used as
    the runTest method of a unittest.TestCase.
    """
    cache = ClientCache()
    # Nothing is cached yet, and invalidating an unknown oid is harmless.
    self.assertEqual(cache.load(1, 10), None)
    self.assertEqual(cache.load(1, None), None)
    cache.invalidate(1, 10)
    data = '5', 5, 10
    # 2 identical stores happens if 2 threads got a cache miss at the same time
    cache.store(1, *data)
    cache.store(1, *data)
    self.assertEqual(cache.load(1, 10), data)
    self.assertEqual(cache.load(1, None), None)
    # Store a current revision (no next_tid), then drop it.
    data = '15', 15, None
    cache.store(1, *data)
    self.assertEqual(cache.load(1, None), data)
    self.assertEqual(cache.clear_current(), [1])
    self.assertEqual(cache.load(1, None), None)
    # An invalidated revision is no longer current, so it is kept.
    cache.store(1, *data)
    cache.invalidate(1, 20)
    self.assertEqual(cache.clear_current(), [])
    self.assertEqual(cache.load(1, 20), ('15', 15, 20))
    # Revisions stored out of order are kept sorted by tid.
    cache.store(1, '10', 10, 15)
    cache.store(1, '20', 20, 21)
    self.assertEqual([5, 10, 15, 20], [x.tid for x in cache._oid_dict[1]])
if __name__ == '__main__':
    # Run test() as the runTest method of an anonymous TestCase subclass.
    import unittest
    unittest.TextTestRunner().run(type('', (unittest.TestCase,), {
        'runTest': test})())
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/component.xml 0000664 0000000 0000000 00000003022 12440376027 0027110 0 ustar 00root root 0000000 0000000
A scalable storage for Zope
Give the list of master nodes, like ip:port ip:port...
Give the name of the cluster
If true, data is automatically compressed (unless compressed size is
not smaller). This is the default behaviour.
If true, only reads may be executed against the storage. Note
that the "pack" operation is not considered a write operation
and is still allowed on a read-only neostorage.
Log debugging information to specified SQLite DB.
The file designated by this option contains an updated list of master
nodes which are known to be part of current cluster, so new nodes can
be added/removed without requiring a config change each time.
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/config.py 0000664 0000000 0000000 00000001660 12440376027 0026211 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from ZODB.config import BaseConfig
class NeoStorage(BaseConfig):
    """ZODB configuration handler opening a NEO client Storage."""

    def open(self):
        """Return a Storage built from all attributes of the section."""
        from .Storage import Storage
        section = self.config
        kw = {}
        for name in section.getSectionAttributes():
            kw[name] = getattr(section, name)
        return Storage(**kw)
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/container.py 0000664 0000000 0000000 00000006712 12440376027 0026731 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import threading
from neo.lib.locking import Lock, Empty
from collections import deque
from ZODB.POSException import StorageTransactionError
class SimpleQueue(object):
    """
    Similar to Queue.Queue but with simpler locking scheme, reducing lock
    contention on "put" (benchmark shows 60% less time spent in "put").
    As a result:
    - only a single consumer possible ("get" vs. "get" race condition)
    - only a single producer possible ("put" vs. "put" race condition)
    - no blocking size limit possible
    - no consumer -> producer notifications (task_done/join API)

    Queue is on the critical path: any moment spent here increases client
    application wait for object data, transaction completion, etc.
    As we have a single consumer (client application's thread) and a single
    producer (lib.dispatcher, which can be called from several threads but
    serialises calls internally) for each queue, Queue.Queue's locking scheme
    can be relaxed to reduce latency.
    """
    __slots__ = ('_lock', '_unlock', '_popleft', '_append', '_queue')

    def __init__(self):
        lock = Lock()
        # Bound methods are kept in slots, presumably to shorten lookups
        # on this hot path.
        self._lock = lock.acquire
        self._unlock = lock.release
        self._queue = queue = deque()
        self._popleft = queue.popleft
        self._append = queue.append

    def get(self, block):
        """Pop and return the oldest item.

        If the queue is empty, raise Empty when `block` is false,
        otherwise wait until an item is put.
        """
        if block:
            # Non-blocking acquire: whatever its result, the lock is held
            # afterwards, so the blocking acquire below waits for the
            # release done by put().
            self._lock(False)
        while True:
            try:
                return self._popleft()
            except IndexError:
                if not block:
                    raise Empty
                self._lock()

    def put(self, item):
        """Append `item` and wake up a consumer waiting in get()."""
        self._append(item)
        # Make sure the lock is held before releasing it: the release is
        # what unblocks a waiting get().
        self._lock(False)
        self._unlock()

    def empty(self):
        """Return True when no item is queued."""
        return not self._queue
class ThreadContainer(threading.local):
    """Thread-local holder of an answer slot and a SimpleQueue."""

    def __init__(self):
        self.answer = None
        self.queue = SimpleQueue()
class TransactionContainer(dict):
    """Mapping of transaction contexts, keyed by id() of the transaction."""

    def pop(self, txn):
        """Remove and return the context of `txn`, or None if unknown."""
        key = id(txn)
        return dict.pop(self, key, None)

    def get(self, txn):
        """Return the context of `txn`.

        Raises StorageTransactionError for an unknown transaction.
        """
        key = id(txn)
        try:
            context = self[key]
        except KeyError:
            raise StorageTransactionError("unknown transaction %r" % txn)
        return context

    def new(self, txn):
        """Create, register and return a fresh context for `txn`.

        Raises StorageTransactionError if `txn` already has a context.
        """
        key = id(txn)
        if key in self:
            raise StorageTransactionError("commit of transaction %r"
                                          " already started" % txn)
        context = {
            'queue': SimpleQueue(),
            # Keeping a reference on the transaction object guarantees its
            # id() is not reused while the context exists.
            'txn': txn,
            'ttid': None,
            'data_dict': {},
            'data_size': 0,
            'cache_dict': {},
            'cache_size': 0,
            'object_base_serial_dict': {},
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'resolved_conflict_serial_dict': {},
            'involved_nodes': set(),
        }
        self[key] = context
        return context
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/exception.py 0000664 0000000 0000000 00000002407 12440376027 0026742 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from ZODB import POSException
class NEOStorageError(POSException.StorageError):
    """Base class of the storage errors defined in this module."""
class NEOStorageNotFoundError(NEOStorageError):
    """Some object was not found."""
class NEOStorageDoesNotExistError(NEOStorageNotFoundError):
    """
    Refinement of NEOStorageNotFoundError: not only was some object not
    found, it does not exist at all.
    """
class NEOStorageCreationUndoneError(NEOStorageDoesNotExistError):
    """
    Refinement of NEOStorageDoesNotExistError: some object existed at some
    point, but its creation was undone.
    """
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/handlers/ 0000775 0000000 0000000 00000000000 12440376027 0026167 5 ustar 00root root 0000000 0000000 neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/handlers/__init__.py 0000664 0000000 0000000 00000004617 12440376027 0030310 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from neo.lib.handler import EventHandler
from neo.lib.protocol import ProtocolError, Packets
from ZODB.POSException import StorageError
class BaseHandler(EventHandler):
    """Base class for client-side EventHandler implementations."""

    def __init__(self, app):
        super(BaseHandler, self).__init__(app)
        self.dispatcher = app.dispatcher

    def dispatch(self, conn, packet, kw=None):
        """Dispatch `packet` through the parent class.

        kw: optional dict of extra data; a new empty dict is used when
            omitted, so that no dict is shared between calls.
        The lock of the connection must be owned by the caller.
        """
        if kw is None:
            kw = {}
        assert conn._lock._is_owned()
        super(BaseHandler, self).dispatch(conn, packet, kw)

    def packetReceived(self, conn, packet, kw=None):
        """Redirect all received packet to dispatcher thread.

        Responses (except Pong) are handed to the dispatcher, and a
        ProtocolError is raised when the dispatcher does not accept them.
        Any other packet is dispatched directly.
        kw: optional dict of extra data; a new empty dict is used when
            omitted.
        """
        if kw is None:
            kw = {}
        if packet.isResponse() and type(packet) is not Packets.Pong:
            if not self.dispatcher.dispatch(conn, packet.getId(), packet, kw):
                raise ProtocolError('Unexpected response packet from %r: %r'
                                    % (conn, packet))
        else:
            self.dispatch(conn, packet, kw)

    def connectionLost(self, conn, new_state):
        """Unregister the lost connection from the dispatcher."""
        self.app.dispatcher.unregister(conn)

    def connectionFailed(self, conn):
        """Unregister the failed connection from the dispatcher."""
        self.app.dispatcher.unregister(conn)
def unexpectedInAnswerHandler(*args, **kw):
    """Always raise: used for events an answer handler must never get."""
    message = 'Unexpected event in an answer handler'
    raise Exception(message)
class AnswerBaseHandler(EventHandler):
    """Base class of handlers that only process answer packets.

    Every connection-level event is bound to unexpectedInAnswerHandler,
    which raises.
    """
    connectionStarted = connectionCompleted = connectionFailed = \
        connectionAccepted = timeoutExpired = connectionClosed = \
        packetReceived = peerBroken = unexpectedInAnswerHandler

    def protocolError(self, conn, message):
        """Turn a protocol error notification into a StorageError."""
        raise StorageError("protocol error: %s" % message)
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/handlers/master.py 0000664 0000000 0000000 00000017117 12440376027 0030043 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from neo.lib import logging
from neo.lib.pt import MTPartitionTable as PartitionTable
from neo.lib.protocol import NodeStates, Packets, ProtocolError
from neo.lib.util import dump, add64
from . import BaseHandler, AnswerBaseHandler
from ..exception import NEOStorageError
# Unique marker compared by identity: cache_dict entries holding it are
# skipped when the cache is updated on AnswerTransactionFinished, since they
# only come from a checkCurrentSerialInTransaction call (no data modified).
CHECKED_SERIAL = object()
class PrimaryBootstrapHandler(AnswerBaseHandler):
    """ Bootstrap handler used when looking for the primary master """

    def notReady(self, conn, message):
        """Forget the master being tried when it answers it is not ready."""
        app = self.app
        app.trying_master_node = None

    def _acceptIdentification(self, node, uuid, num_partitions,
            num_replicas, your_uuid, primary, known_master_list):
        """Process the identification answer of a master node.

        Known masters are registered in the node manager. If the peer is
        not the primary master, the connection is closed (and the
        announced primary, if known, is remembered). Otherwise the UUID
        given by the master is adopted and a new partition table is
        created.

        Raises ProtocolError when the primary master supplies no UUID.
        """
        app = self.app

        # Register new master nodes.
        found = False
        conn_address = node.getAddress()
        for node_address, node_uuid in known_master_list:
            if node_address == conn_address:
                # The peer must be listed with the UUID it identified with.
                assert uuid == node_uuid, (dump(uuid), dump(node_uuid))
                found = True
            n = app.nm.getByAddress(node_address)
            if n is None:
                n = app.nm.createMaster(address=node_address)
            if node_uuid is not None and n.getUUID() != node_uuid:
                n.setUUID(node_uuid)
        assert found, (node, dump(uuid), known_master_list)

        conn = node.getConnection()
        if primary is not None:
            primary_node = app.nm.getByAddress(primary)
            if primary_node is None:
                # I don't know such a node. Probably this information
                # is old. So ignore it.
                logging.warning('Unknown primary master: %s. Ignoring.',
                                primary)
                return
            else:
                if app.trying_master_node is not primary_node:
                    # The peer is not the primary: drop this connection.
                    app.trying_master_node = None
                    conn.close()
                app.primary_master_node = primary_node
        else:
            if app.primary_master_node is not None:
                # The primary master node is not a primary master node
                # any longer.
                app.primary_master_node = None

            app.trying_master_node = None
            conn.close()
            return

        # the master must give an UUID
        if your_uuid is None:
            raise ProtocolError('No UUID supplied')
        app.uuid = your_uuid
        logging.info('Got an UUID: %s', dump(app.uuid))

        # Always create partition table
        app.pt = PartitionTable(num_partitions, num_replicas)

    def answerPartitionTable(self, conn, ptid, row_list):
        """Load the received rows into the partition table."""
        assert row_list
        self.app.pt.load(ptid, row_list, self.app.nm)

    def answerNodeInformation(self, conn):
        # Nothing to do for this answer.
        pass

    def answerLastTransaction(self, conn, ltid):
        # Nothing to do for this answer.
        pass
class PrimaryNotificationsHandler(BaseHandler):
    """ Handler that process the notifications from the primary master """

    def packetReceived(self, conn, packet, kw=None):
        """Update last_tid and the cache before regular packet processing.

        - AnswerLastTransaction: when the last tid changed and there is
          no master connection set, current cache entries are dropped and
          the DB (if any) is told to invalidate them.
        - AnswerTransactionFinished: committed objects are invalidated
          and stored in the cache, then the 'callback' taken from `kw` is
          called with the new tid. 'callback' and 'cache_dict' are
          removed from `kw`.

        kw: optional dict of extra data; a new empty dict is used when
            omitted, so that no dict is shared between calls.
        """
        if kw is None:
            kw = {}
        if type(packet) is Packets.AnswerLastTransaction:
            app = self.app
            ltid = packet.decode()[0]
            if app.last_tid != ltid:
                if app.master_conn is None:
                    app._cache_lock_acquire()
                    try:
                        oid_list = app._cache.clear_current()
                        db = app.getDB()
                        if db is not None:
                            db.invalidate(app.last_tid and
                                          add64(app.last_tid, 1), oid_list)
                    finally:
                        app._cache_lock_release()
                app.last_tid = ltid
        elif type(packet) is Packets.AnswerTransactionFinished:
            app = self.app
            app.last_tid = tid = packet.decode()[1]
            callback = kw.pop('callback')
            # Update cache
            cache = app._cache
            app._cache_lock_acquire()
            try:
                for oid, data in kw.pop('cache_dict').iteritems():
                    if data is CHECKED_SERIAL:
                        # this is just a remain of
                        # checkCurrentSerialInTransaction call, ignore (no data
                        # was modified).
                        continue
                    # Update ex-latest value in cache
                    cache.invalidate(oid, tid)
                    if data is not None:
                        # Store in cache with no next_tid
                        cache.store(oid, data, tid, None)
                if callback is not None:
                    callback(tid)
            finally:
                app._cache_lock_release()
        BaseHandler.packetReceived(self, conn, packet, kw)

    def connectionClosed(self, conn):
        """Forget the primary master and flag running transactions.

        When a master connection was set, every transaction context gets
        an 'error' entry.
        """
        app = self.app
        if app.master_conn is not None:
            msg = "connection to primary master node closed"
            logging.critical(msg)
            app.master_conn = None
            for txn_context in app.txn_contexts():
                txn_context['error'] = msg
        app.primary_master_node = None
        super(PrimaryNotificationsHandler, self).connectionClosed(conn)

    def stopOperation(self, conn):
        """Only log that the master asked to stop operation."""
        logging.critical("master node ask to stop operation")

    def invalidateObjects(self, conn, tid, oid_list):
        """Record `tid` as last tid and invalidate `oid_list` in the cache.

        If the oid being loaded is among them, the load is flagged as
        invalidated. The DB (if any) is notified as well, while the cache
        lock is still held.
        """
        app = self.app
        app.last_tid = tid
        app._cache_lock_acquire()
        try:
            invalidate = app._cache.invalidate
            loading = app._loading_oid
            for oid in oid_list:
                invalidate(oid, tid)
                if oid == loading:
                    app._loading_oid = None
                    app._loading_invalidated = tid
            db = app.getDB()
            if db is not None:
                db.invalidate(tid, oid_list)
        finally:
            app._cache_lock_release()

    def notifyPartitionChanges(self, conn, ptid, cell_list):
        """Apply the changes, but only to a filled partition table."""
        if self.app.pt.filled():
            self.app.pt.update(ptid, cell_list, self.app.nm)

    def notifyNodeInformation(self, conn, node_list):
        """Update the node manager, close connections to non-running nodes."""
        nm = self.app.nm
        nm.update(node_list)
        # XXX: 'update' automatically closes DOWN nodes. Do we really want
        #      to do the same thing for nodes in other non-running states ?
        for node_type, addr, uuid, state in node_list:
            if state != NodeStates.RUNNING:
                node = nm.getByUUID(uuid)
                if node and node.isConnected():
                    node.getConnection().close()
class PrimaryAnswersHandler(AnswerBaseHandler):
    """Handler processing the expected answers of the primary master."""

    def answerBeginTransaction(self, conn, ttid):
        """Hand the received ttid over as handler data."""
        app = self.app
        app.setHandlerData(ttid)

    def answerNewOIDs(self, conn, oid_list):
        """Keep the received oids, in reverse order (reversed in place)."""
        oid_list.reverse()
        app = self.app
        app.new_oid_list = oid_list

    def answerTransactionFinished(self, conn, _, tid):
        """Hand the final tid over as handler data."""
        app = self.app
        app.setHandlerData(tid)

    def answerPack(self, conn, status):
        """Raise NEOStorageError when the master reports a false status."""
        if status:
            return
        raise NEOStorageError('Already packing')

    def answerLastTransaction(self, conn, ltid):
        """Nothing is done with this answer here."""
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/handlers/storage.py 0000664 0000000 0000000 00000020210 12440376027 0030200 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from ZODB.TimeStamp import TimeStamp
from ZODB.POSException import ConflictError
from neo.lib import logging
from neo.lib.protocol import LockState, ZERO_TID
from neo.lib.util import dump
from neo.lib.exception import NodeNotReady
from . import BaseHandler, AnswerBaseHandler
from ..exception import NEOStorageError, NEOStorageNotFoundError
from ..exception import NEOStorageDoesNotExistError
class StorageEventHandler(BaseHandler):
    """React to connection-level events on storage node connections."""

    def connectionLost(self, conn, new_state):
        """Remove the lost connection from the pool and the dispatcher.

        The node is looked up in the node manager from the address of
        the connection and is expected to be known.
        """
        application = self.app
        node = application.nm.getByAddress(conn.getAddress())
        assert node is not None
        application.cp.removeConnection(node)
        application.dispatcher.unregister(conn)

    def connectionFailed(self, conn):
        """Remove the node from the pool, then call the parent handler."""
        # Connection to a storage node failed
        application = self.app
        node = application.nm.getByAddress(conn.getAddress())
        assert node is not None
        application.cp.removeConnection(node)
        super(StorageEventHandler, self).connectionFailed(conn)
class StorageBootstrapHandler(AnswerBaseHandler):
    """Handler used while connecting to a storage node."""

    def notReady(self, conn, message):
        """Turn a 'not ready' answer into a NodeNotReady exception."""
        raise NodeNotReady(message)

    def _acceptIdentification(self, node, uuid, num_partitions,
                              num_replicas, your_uuid, primary, master_list):
        """Sanity-check the identification answer of a storage node.

        Only assertions are performed: the primary master announced by
        the storage must be the one we are connected to (when we are
        connected to one), and the announced uuid must match the one
        already known for the node.
        """
        assert (self.app.master_conn is None
                or primary == self.app.master_conn.getAddress()), (
            primary, self.app.master_conn)
        assert uuid == node.getUUID(), (uuid, node.getUUID())
class StorageAnswersHandler(AnswerBaseHandler):
    """ Handle all messages related to ZODB operations """

    def answerObject(self, conn, oid, *args):
        """Store the answer fields following oid as handler data."""
        self.app.setHandlerData(args)

    def answerStoreObject(self, conn, conflicting, oid, serial):
        """Record the outcome of a store request on one storage node.

        The handler data is the transaction context (a dict). When the
        storage reports a conflict which is not already resolved, serial
        is added to txn_context['conflict_serial_dict'][oid]. Otherwise
        the uuid of the storage is added to the set of nodes which
        accepted oid for serial; on the first such answer the pending
        data moves from 'data_dict' to 'cache_dict'.

        Raises NEOStorageError when a storage reports a conflict for a
        serial that other storages already accepted.
        """
        txn_context = self.app.getHandlerData()
        object_stored_counter_dict = txn_context[
            'object_stored_counter_dict'][oid]
        if conflicting:
            # Warning: if a storage (S1) is much faster than another (S2), then
            # we may process entirely a conflict with S1 (i.e. we received the
            # answer to the store of the resolved object on S1) before we
            # receive the conflict answer from the first store on S2.
            logging.info('%r report a conflict for %r with %r',
                         conn, dump(oid), dump(serial))
            # If this conflict is not already resolved, mark it for
            # resolution.
            if serial not in txn_context[
                    'resolved_conflict_serial_dict'].get(oid, ()):
                if serial in object_stored_counter_dict and serial != ZERO_TID:
                    # Build a real list so that the message shows the
                    # dumped uuids whatever map() returns.
                    accepted_by = [dump(uuid)
                        for uuid in object_stored_counter_dict[serial]]
                    raise NEOStorageError('Storages %s accepted object %s'
                        ' for serial %s but %s reports a conflict for it.' % (
                        accepted_by,
                        dump(oid), dump(serial), dump(conn.getUUID())))
                conflict_serial_dict = txn_context['conflict_serial_dict']
                conflict_serial_dict.setdefault(oid, set()).add(serial)
        else:
            uuid_set = object_stored_counter_dict.get(serial)
            if uuid_set is None: # store to first storage node
                object_stored_counter_dict[serial] = uuid_set = set()
                try:
                    data = txn_context['data_dict'].pop(oid)
                except KeyError: # multiple undo
                    assert txn_context['cache_dict'][oid] is None, oid
                else:
                    if type(data) is str:
                        size = len(data)
                        txn_context['data_size'] -= size
                        size += txn_context['cache_size']
                        if size < self.app._cache._max_size:
                            txn_context['cache_size'] = size
                        else:
                            # Do not cache data past cache max size, as it
                            # would just flush it on tpc_finish. This also
                            # prevents memory errors for big transactions.
                            data = None
                    txn_context['cache_dict'][oid] = data
            else: # replica
                assert oid not in txn_context['data_dict'], oid
            uuid_set.add(conn.getUUID())

    # Answers to CheckCurrentSerial requests are processed exactly like
    # answers to store requests.
    answerCheckCurrentSerial = answerStoreObject

    def answerStoreTransaction(self, conn, _):
        """Nothing to do when a storage acknowledges a transaction store."""
        pass

    def answerTIDsFrom(self, conn, tid_list):
        """Log the number of TIDs received and store them as handler data."""
        logging.debug('Get %u TIDs from %r', len(tid_list), conn)
        self.app.setHandlerData(tid_list)

    def answerTransactionInformation(self, conn, tid,
                                     user, desc, ext, packed, oid_list):
        """Store transaction metadata as handler data.

        The handler data is a 2-tuple: a dict describing the transaction
        ('time', 'user_name', 'description', 'id', 'oids', 'packed') and
        ext, passed through unchanged.
        """
        self.app.setHandlerData(({
            'time': TimeStamp(tid).timeTime(),
            'user_name': user,
            'description': desc,
            'id': tid,
            'oids': oid_list,
            'packed': packed,
        }, ext))

    def answerObjectHistory(self, conn, _, history_list):
        """Store the object history as handler data."""
        # history_list is a list of tuple (serial, size)
        self.app.setHandlerData(history_list)

    def oidNotFound(self, conn, message):
        """Raise NEOStorageNotFoundError with the received message."""
        # This can happen either when :
        # - loading an object
        # - asking for history
        raise NEOStorageNotFoundError(message)

    def oidDoesNotExist(self, conn, message):
        """Raise NEOStorageDoesNotExistError with the received message."""
        raise NEOStorageDoesNotExistError(message)

    def tidNotFound(self, conn, message):
        """Raise NEOStorageNotFoundError with the received message."""
        # This can happen when requiring txn informations
        raise NEOStorageNotFoundError(message)

    def answerTIDs(self, conn, tid_list, tid_set):
        """Merge the received TIDs into the caller-provided tid_set."""
        tid_set.update(tid_list)

    def answerObjectUndoSerial(self, conn, object_tid_dict,
                               undo_object_tid_dict):
        """Merge the received mapping into undo_object_tid_dict."""
        undo_object_tid_dict.update(object_tid_dict)

    def answerHasLock(self, conn, oid, status):
        """Process the answer to a HasLock request sent on store timeout.

        The id of the timed-out store request is removed from the
        'timeout_dict' of the transaction context. If the lock is granted
        to another transaction, that store request is forgotten and
        ConflictError is raised; otherwise the delay is only logged.
        """
        store_msg_id = self.app.getHandlerData()['timeout_dict'].pop(oid)
        if status == LockState.GRANTED_TO_OTHER:
            # Stop expecting the timed-out store request.
            self.app.dispatcher.forget(conn, store_msg_id)
            # Object is locked by another transaction, and we have waited until
            # timeout. To avoid a deadlock, abort current transaction (we might
            # be locking objects the other transaction is waiting for).
            raise ConflictError('Lock wait timeout for oid %s on %r' % (
                dump(oid), conn))
        # HasLock design required that storage is multi-threaded so that
        # it can answer to AskHasLock while processing store requests.
        # This means that the 2 cases (granted to us or nobody) are legitimate,
        # either because it gave us the lock but is/was slow to store our data,
        # or because the storage took a lot of time processing a previous
        # store (and did not even consider our lock request).
        # XXX: But storage nodes are still mono-threaded, so they should
        #      only answer with GRANTED_TO_OTHER (if they reply!), except
        #      maybe in very rare cases of race condition. Only log for now.
        #      This also means that most of the time, if the storage is slow
        #      to process some store requests, HasLock will timeout in turn
        #      and the connector will be closed.
        #      Anyway, it's not clear that HasLock requests are useful.
        #      Are store requests potentially long to process ? If not,
        #      we should simply raise a ConflictError on store timeout.
        logging.info('Store of oid %s delayed (storage overload ?)', dump(oid))

    def alreadyPendingError(self, conn, message):
        """Ignore 'already pending' errors."""
        pass
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/iterator.py 0000664 0000000 0000000 00000005120 12440376027 0026570 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage
from neo.lib.protocol import ZERO_TID, MAX_TID
from neo.lib.util import u64, add64
from .exception import NEOStorageCreationUndoneError, NEOStorageNotFoundError
# Value passed by iterator() as third argument of app.transactionLog().
# NOTE(review): presumably the maximum number of transactions fetched per
# call - confirm against transactionLog.
CHUNK_LENGTH = 100
class Record(BaseStorage.DataRecord):
    """ BaseStorage Transaction record yielded by the Transaction object """

    def __str__(self):
        """Return 'Record <oid>:<tid>: <data length> (<data_txn>)'.

        oid and tid are shown as integers. The data length is shown as
        None when the record carries no data, which is how
        Transaction.__iter__ builds records for undone creations.
        """
        data = self.data
        # len(None) would raise TypeError: records without data must
        # still be printable.
        size = None if data is None else len(data)
        return 'Record %s:%s: %s (%s)' % (
            u64(self.oid), u64(self.tid), size, self.data_txn)
class Transaction(BaseStorage.TransactionRecord):
    """Transaction record yielded by the NEO iterator."""

    def __init__(self, app, txn):
        """Build the record from a transaction description.

        app -- client application, used to load object data on iteration
        txn -- mapping providing the 'id', 'user_name', 'description',
               'ext' and 'oids' keys
        """
        # NOTE(review): the second argument is presumably the status of
        # the transaction record - confirm against TransactionRecord.
        super(Transaction, self).__init__(
            txn['id'],
            ' ',
            txn['user_name'],
            txn['description'],
            txn['ext'],
        )
        self.app = app
        self.oid_list = txn['oids']

    def __iter__(self):
        """Yield one Record per object of the transaction.

        Objects whose creation was undone are yielded with neither data
        nor data tid; objects which cannot be found are skipped.
        """
        load_from_storage = self.app._loadFromStorage
        for oid in self.oid_list:
            # The order of the handlers below is kept from the original
            # code and must not be swapped without checking how these
            # exception classes relate to each other.
            try:
                loaded = load_from_storage(oid, self.tid, None)
                data, _, _, data_tid = loaded
            except NEOStorageCreationUndoneError:
                data = None
                data_tid = None
            except NEOStorageNotFoundError:
                # Transactions are not updated after a pack, so their object
                # will not be found in the database. Skip them.
                continue
            yield Record(oid, self.tid, data, data_tid)

    def __str__(self):
        """Return 'Transaction #<tid as integer>: <user> <status>'."""
        fields = (u64(self.tid), self.user, self.status)
        return 'Transaction #%s: %s %s' % fields
def iterator(app, start=None, stop=None):
    """Generate the Transaction records stored between start and stop.

    start -- first TID to consider, ZERO_TID when None
    stop  -- last TID to consider; it is capped by app.lastTransaction(),
             and MAX_TID is used when it is false

    Transactions are fetched chunk by chunk with app.transactionLog();
    iteration ends as soon as an empty chunk is returned.
    """
    if start is None:
        start = ZERO_TID
    upper_bound = stop or MAX_TID
    stop = min(upper_bound, app.lastTransaction())
    while True:
        max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH)
        if not chunk:
            # nothing more
            return
        for txn in chunk:
            yield Transaction(app, txn)
        # Resume right after the last TID covered by this chunk.
        start = add64(max_tid, 1)
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/poll.py 0000664 0000000 0000000 00000007176 12440376027 0025722 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from logging import DEBUG, ERROR
from threading import Thread, Event, enumerate as thread_enum
from neo.lib import logging
from neo.lib.locking import Lock
class _ThreadedPoll(Thread):
"""Polling thread."""
def __init__(self, em, **kw):
Thread.__init__(self, **kw)
self.em = em
self.daemon = True
self._stop = Event()
def run(self):
_log = logging.log
def log(*args, **kw):
# Ignore errors due to garbage collection on exit
try:
_log(*args, **kw)
except:
if not self.stopping():
raise
log(DEBUG, 'Started %s', self)
while not self.stopping():
try:
# XXX: Delay cannot be infinite here, because we need
# to check connection timeout and thread shutdown.
self.em.poll(1)
except:
log(ERROR, 'poll raised, retrying', exc_info=1)
log(DEBUG, 'Threaded poll stopped')
self._stop.clear()
def stop(self):
self._stop.set()
def stopping(self):
return self._stop.isSet()
class ThreadedPoll(object):
    """
    Wrapper for the polling thread, just to be able to start it again
    when it stopped.
    """

    _thread = None
    _started = False

    def __init__(self, *args, **kw):
        """Remember the thread arguments and create a first thread."""
        status_lock = Lock()
        self._status_lock_acquire = status_lock.acquire
        self._status_lock_release = status_lock.release
        self._args = args
        self._kw = kw
        self.newThread()

    def newThread(self):
        """Replace the wrapped thread by a fresh, not started, one."""
        self._thread = _ThreadedPoll(*self._args, **self._kw)

    def start(self):
        """
        Start thread if not started or restart it if it's shutting down.
        """
        # TODO: a refcount-based approach would be better, but more intrusive.
        self._status_lock_acquire()
        try:
            current = self._thread
            if current.stopping():
                # XXX: ideally, we should wake thread up here, to be sure not
                #      to wait forever.
                current.join()
            if not current.is_alive():
                # A thread object cannot be started twice: after the
                # first start, a new one is needed.
                if self._started:
                    self.newThread()
                self._started = True
                self._thread.start()
        finally:
            self._status_lock_release()

    def stop(self):
        """Ask the wrapped thread to stop, under the status lock."""
        self._status_lock_acquire()
        try:
            self._thread.stop()
        finally:
            self._status_lock_release()

    def __getattr__(self, key):
        """Delegate unknown attributes to the wrapped thread."""
        return getattr(self._thread, key)

    def __repr__(self):
        """Return the representation of the wrapped thread."""
        return repr(self._thread)
def psThreadedPoll(log=None):
    """
    Log alive polling threads.

    log -- callable receiving a format string followed by its arguments;
           defaults to logging.debug
    """
    if log is None:
        log = logging.debug
    for thread in thread_enum():
        # threading.enumerate() only returns Thread objects. Polling
        # threads are _ThreadedPoll instances: ThreadedPoll is a plain
        # wrapper object and can never be listed here.
        if not isinstance(thread, _ThreadedPoll):
            continue
        log('Thread %s at 0x%x, %s', thread.getName(), id(thread),
            'stopping' if thread.stopping() else 'running')
neoppod-9bd318038fd4d9697f469abd7f09b12a1fa6e85f-neo-client/neo/client/pool.py 0000664 0000000 0000000 00000015227 12440376027 0025721 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import time
from random import shuffle
from neo.lib import logging
from neo.lib.locking import RLock
from neo.lib.protocol import NodeTypes, Packets
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady
from .exception import NEOStorageError
# How long before we might retry a connection to a node to which connection
# failed in the past. Expressed in seconds: ConnectionPool.notifyFailure adds
# it to time.time().
MAX_FAILURE_AGE = 600
# Cell list sort keys, as returned by ConnectionPool.getCellSortKey: cells
# with the lowest key are tried first.
# We are connected to storage node hosting cell, high priority
CELL_CONNECTED = -1
# normal priority
CELL_GOOD = 0
# Storage node hosting cell failed recently, low priority
CELL_FAILED = 1
class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes."""

    def __init__(self, app, max_pool_size=25):
        """Create an empty pool.

        app           -- client application owning the pool
        max_pool_size -- number of connections above which unused
                         connections are dropped before opening a new one
        """
        self.app = app
        self.max_pool_size = max_pool_size
        # Maps the uuid of a storage node to its connection.
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        l = RLock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release
        # Maps the uuid of a storage node to the time until which it is
        # considered as recently failed.
        self.node_failure_dict = {}

    def _initNodeConnection(self, node):
        """Init a connection to a given storage node.

        Return the identified connection, or None (after recording the
        failure) if the connection got closed or the node is not ready.
        """
        app = self.app
        logging.debug('trying to connect to %s - %s', node, node.getState())
        conn = MTClientConnection(app.em, app.storage_event_handler, node,
            connector=app.connector_handler(), dispatcher=app.dispatcher)
        p = Packets.RequestIdentification(NodeTypes.CLIENT,
            app.uuid, None, app.name)
        try:
            app._ask(conn, p, handler=app.storage_bootstrap_handler)
        except ConnectionClosed:
            logging.error('Connection to %r failed', node)
            self.notifyFailure(node)
            conn = None
        except NodeNotReady:
            logging.info('%r not ready', node)
            self.notifyFailure(node)
            conn = None
        else:
            logging.info('Connected %r', node)
        return conn

    def _dropConnections(self):
        """Close unused connections until the pool fits max_pool_size.

        A connection is considered unused when it has nothing pending
        and is not registered in the dispatcher.
        """
        # Iterate over a snapshot: entries are deleted from the
        # dictionary inside the loop.
        for conn in list(self.connection_dict.values()):
            # Drop first connection which looks not used
            conn.lock()
            try:
                if not conn.pending() and \
                        not self.app.dispatcher.registered(conn):
                    del self.connection_dict[conn.getUUID()]
                    conn.close()
                    logging.debug('_dropConnections: connection to '
                        'storage node %s:%d closed', *conn.getAddress())
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def notifyFailure(self, node):
        """Remember that node failed, for MAX_FAILURE_AGE seconds."""
        self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE

    def getCellSortKey(self, cell):
        """Return the priority of a cell: the lower, the better.

        CELL_CONNECTED if we are connected to the node hosting the cell,
        CELL_FAILED if that node failed recently, CELL_GOOD otherwise.
        """
        uuid = cell.getUUID()
        if uuid in self.connection_dict:
            return CELL_CONNECTED
        failure = self.node_failure_dict.get(uuid)
        if failure is None or failure < time.time():
            return CELL_GOOD
        return CELL_FAILED

    def getConnForCell(self, cell):
        """Return a connection to the node hosting cell (or None)."""
        return self.getConnForNode(cell.getNode())

    def iterateForObject(self, object_id, readable=False):
        """Iterate over (node, connection) pairs for an object.

        object_id -- a partition, or a str from which the partition is
                     computed by the partition table
        readable  -- passed to the partition table's getCellList

        Cells are tried in random order within each priority given by
        getCellSortKey. Running nodes to which no connection could be
        established are retried, after a 1 second pause, as long as they
        are still running. Raises NEOStorageError if the partition table
        returns no cell at all.
        """
        pt = self.app.pt
        if type(object_id) is str:
            object_id = pt.getPartition(object_id)
        cell_list = pt.getCellList(object_id, readable)
        if not cell_list:
            raise NEOStorageError('no storage available')
        getConnForNode = self.getConnForNode
        while True:
            new_cell_list = []
            # Shuffle to randomise node to access...
            shuffle(cell_list)
            # ...and sort with non-unique keys, to prioritise ranges of
            # randomised entries.
            cell_list.sort(key=self.getCellSortKey)
            for cell in cell_list:
                node = cell.getNode()
                if node.isRunning():
                    conn = getConnForNode(node)
                    if conn is not None:
                        yield (node, conn)
                    # Re-check if node is running, as our knowledge of its
                    # state can have changed during connection attempt.
                    elif node.isRunning():
                        new_cell_list.append(cell)
            if not new_cell_list:
                break
            cell_list = new_cell_list
            # wait a bit to avoid a busy loop
            time.sleep(1)

    def getConnForNode(self, node):
        """Return a connection object to a given node.

        If no connection exists, create a new one, dropping unused
        connections first when the pool is over max_pool_size. Return
        None if the node is not running or if connecting failed.
        """
        if node.isRunning():
            uuid = node.getUUID()
            try:
                # Already connected to node
                return self.connection_dict[uuid]
            except KeyError:
                self.connection_lock_acquire()
                try:
                    # Second lookup, if another thread initiated connection
                    # while we were waiting for connection lock.
                    try:
                        return self.connection_dict[uuid]
                    except KeyError:
                        if len(self.connection_dict) > self.max_pool_size:
                            # must drop some unused connections
                            self._dropConnections()
                        # Create new connection to node
                        conn = self._initNodeConnection(node)
                        if conn is not None:
                            self.connection_dict[uuid] = conn
                        return conn
                finally:
                    self.connection_lock_release()

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken."""
        self.connection_dict.pop(node.getUUID(), None)

    def flush(self):
        """Remove all connections"""
        self.connection_dict.clear()