pax_global_header 0000666 0000000 0000000 00000000064 11777665324 0014534 g ustar 00root root 0000000 0000000 52 comment=23fad3af8aeb9cec4382f8d8e08ac3b8c6219364
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/ 0000775 0000000 0000000 00000000000 11777665324 0022465 5 ustar 00root root 0000000 0000000 neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/ 0000775 0000000 0000000 00000000000 11777665324 0023246 5 ustar 00root root 0000000 0000000 neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/ 0000775 0000000 0000000 00000000000 11777665324 0024524 5 ustar 00root root 0000000 0000000 neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/Storage.py 0000664 0000000 0000000 00000023536 11777665324 0026513 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage, ConflictResolution, POSException
from zope.interface import implements
import ZODB.interfaces
from functools import wraps
from neo.lib import logging
from neo.lib.util import add64
from neo.lib.protocol import ZERO_TID
from .app import Application
from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError
def check_read_only(func):
    """Decorate a Storage method so it refuses to run on read-only storages.

    The decorated method's first argument must have an ``_is_read_only``
    attribute. When it is true, POSException.ReadOnlyError is raised and
    func is not called. The wrapper keeps func's metadata (functools.wraps).
    """
    @wraps(func)
    def guarded(self, *args, **kw):
        if not self._is_read_only:
            return func(self, *args, **kw)
        raise POSException.ReadOnlyError()
    return guarded
class Storage(BaseStorage.BaseStorage,
              ConflictResolution.ConflictResolvingStorage):
    """Wrapper class for neoclient.

    Exposes a neo.client.app.Application (``self.app``) through the ZODB
    storage API. Most methods delegate to ``self.app`` and translate NEO
    "not found" exceptions into POSException.POSKeyError (KeyError for
    getTid). Methods decorated with check_read_only raise
    POSException.ReadOnlyError when the storage was opened read-only.
    """

    # Stores the highest TID visible for current transaction.
    # sync() sets this snapshot by asking master node most recent
    # committed TID.
    # As a (positive) side-effect, this forces us to handle all pending
    # invalidations, so we get a very recent view of the database (which is
    # good when multiple databases are used in the same program with some
    # amount of referential integrity).
    # Should remain None when not bound to a connection,
    # so that it always reads the last revision.
    _snapshot_tid = None

    implements(*filter(None, (
        ZODB.interfaces.IStorage,
        # "restore" missing for the moment, but "store" implements this
        # interface.
        # ZODB.interfaces.IStorageRestoreable,
        # XXX: imperfect iterator implementation:
        # - start & stop are not handled (raises if either is not None)
        # - transaction isolation is not done
        # ZODB.interfaces.IStorageIteration,
        ZODB.interfaces.IStorageUndoable,
        ZODB.interfaces.IExternalGC,
        getattr(ZODB.interfaces, 'ReadVerifyingStorage', None), # BBB ZODB 3.9
    )))

    def __init__(self, master_nodes, name, read_only=False,
            compress=None, logfile=None, _app=None,
            dynamic_master_list=None, **kw):
        """
        Do not pass those parameters (used internally):
          _app

        Any other keyword argument is accepted and ignored.
        """
        if compress is None:
            compress = True
        if logfile:
            logging.setup(logfile)
        BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
        # Warning: _is_read_only is used in BaseStorage, do not rename it.
        self._is_read_only = read_only
        if _app is None:
            _app = Application(master_nodes, name, compress=compress,
                dynamic_master_list=dynamic_master_list)
        self.app = _app
        # Used to clone self (see new_instance & IMVCCStorage definition).
        # '_app' is passed along, so clones share the same Application.
        self._init_args = (master_nodes, name)
        self._init_kw = {
            'read_only': read_only,
            'compress': compress,
            'dynamic_master_list': dynamic_master_list,
            '_app': _app,
        }

    @property
    def _cache(self):
        # The client cache lives in the (shared) Application.
        return self.app._cache

    def load(self, oid, version=''):
        """Return (data, serial) of oid as seen by the current snapshot.

        Raises POSException.POSKeyError when the application reports the
        object as not found.
        """
        # In order to know if it was safe to get the last revision of an object
        # instead of using loadBefore(), ZODB.Connection._setstate relies on
        # the fact that retrieving data from a remote storage forces incoming
        # invalidations to be received.
        # But in NEO, invalidations are not received from the same network
        # connection that the one used to retrieve data.
        # So we must implement load() like a loadBefore().
        # XXX: interface definition states that version parameter is
        # mandatory, while some ZODB tests do not provide it. For now, make
        # it optional.
        assert version == '', 'Versions are not supported'
        try:
            return self.app.load(oid, None, self._snapshot_tid)[:2]
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)

    @check_read_only
    def new_oid(self):
        return self.app.new_oid()

    @check_read_only
    def tpc_begin(self, transaction, tid=None, status=' '):
        """
        Note: never blocks in NEO.
        """
        return self.app.tpc_begin(transaction=transaction, tid=tid,
                                  status=status)

    @check_read_only
    def tpc_vote(self, transaction):
        return self.app.tpc_vote(transaction=transaction,
            tryToResolveConflict=self.tryToResolveConflict)

    @check_read_only
    def tpc_abort(self, transaction):
        return self.app.tpc_abort(transaction=transaction)

    def tpc_finish(self, transaction, f=None):
        """Finish the transaction and return its tid.

        When a snapshot is active, it is moved just past the new tid so
        that this storage sees its own commit.
        """
        tid = self.app.tpc_finish(transaction=transaction,
            tryToResolveConflict=self.tryToResolveConflict, f=f)
        # XXX: Note that when undoing changes, the following is useless because
        # a temporary Storage object is used to commit.
        # See also testZODB.NEOZODBTests.checkMultipleUndoInOneTransaction
        if self._snapshot_tid:
            self._snapshot_tid = add64(tid, 1)
        return tid

    @check_read_only
    def store(self, oid, serial, data, version, transaction):
        assert version == '', 'Versions are not supported'
        return self.app.store(oid=oid, serial=serial,
            data=data, version=version, transaction=transaction)

    @check_read_only
    def deleteObject(self, oid, serial, transaction):
        # Storing None as data is how deletion is sent to the application.
        self.app.store(oid, serial, None, None, transaction)

    # multiple revisions
    def loadSerial(self, oid, serial):
        """Return the data of oid at exactly the given serial."""
        try:
            return self.app.load(oid, serial)[0]
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)

    def loadBefore(self, oid, tid):
        """Return (data, serial, next_serial) of the revision before tid.

        Returns None when the object exists but has no revision before
        tid; raises POSException.POSKeyError when it does not exist at all.
        """
        try:
            return self.app.load(oid, None, tid)
        except NEOStorageDoesNotExistError:
            raise POSException.POSKeyError(oid)
        except NEOStorageNotFoundError:
            return None

    def iterator(self, start=None, stop=None):
        # Iterator lives in its own transaction, so get a fresh snapshot.
        snapshot_tid = self.lastTransaction()
        if stop is None:
            stop = snapshot_tid
        else:
            stop = min(snapshot_tid, stop)
        return self.app.iterator(start, stop)

    # undo
    @check_read_only
    def undo(self, transaction_id, txn):
        return self.app.undo(self._snapshot_tid, undone_tid=transaction_id,
            txn=txn, tryToResolveConflict=self.tryToResolveConflict)

    @check_read_only
    def undoLog(self, first=0, last=-20, filter=None):
        return self.app.undoLog(first, last, filter)

    def supportsUndo(self):
        return True

    def supportsTransactionalUndo(self):
        return True

    @check_read_only
    def abortVersion(self, src, transaction):
        return self.app.abortVersion(src, transaction)

    @check_read_only
    def commitVersion(self, src, dest, transaction):
        return self.app.commitVersion(src, dest, transaction)

    def loadEx(self, oid, version):
        """Return (data, serial, '') of oid as seen by the current snapshot."""
        try:
            data, serial, _ = self.app.load(oid, None, self._snapshot_tid)
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)
        return data, serial, ''

    def __len__(self):
        return self.app.getStorageSize()

    def registerDB(self, db, limit=None):
        self.app.registerDB(db, limit)

    def history(self, oid, *args, **kw):
        try:
            return self.app.history(oid, *args, **kw)
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)

    def sync(self, force=True):
        # Increment by one, as we will use this as an excluded upper
        # bound (loadBefore).
        self._snapshot_tid = add64(self.lastTransaction(), 1)

    def copyTransactionsFrom(self, source, verbose=False):
        """ Zope compliant API """
        return self.app.importFrom(source, None, None,
                self.tryToResolveConflict)

    def importFrom(self, source, start=None, stop=None, preindex=None):
        """ Allow import only a part of the source storage """
        return self.app.importFrom(source, start, stop,
                self.tryToResolveConflict, preindex)

    def restore(self, oid, serial, data, version, prev_txn, transaction):
        raise NotImplementedError

    def pack(self, t, referencesf, gc=False):
        if gc:
            logging.warning('Garbage Collection is not available in NEO,'
                ' please use an external tool. Packing without GC.')
        self.app.pack(t)

    def lastSerial(self):
        # seems unused
        raise NotImplementedError

    def lastTransaction(self):
        # Used in ZODB unit tests
        return self.app.lastTransaction()

    def _clear_temp(self):
        raise NotImplementedError

    def set_max_oid(self, possible_new_max_oid):
        # seems used only by FileStorage
        raise NotImplementedError

    def cleanup(self):
        # Used in unit tests to remove local database files.
        # We have no such thing, so make this method a no-op.
        pass

    def close(self):
        self.app.close()

    def getTid(self, oid):
        """Return the last tid of oid; raise KeyError(oid) if not found."""
        try:
            return self.app.getLastTID(oid)
        except NEOStorageNotFoundError:
            # Carry the oid, like the POSKeyError raised by other lookups.
            raise KeyError(oid)

    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
        self.app.checkCurrentSerialInTransaction(oid, serial, transaction)

    def new_instance(self):
        # Clone sharing the same Application (see _init_kw['_app']).
        return Storage(*self._init_args, **self._init_kw)
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/__init__.py 0000664 0000000 0000000 00000012674 11777665324 0026647 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (C) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# At the moment, no ZODB release includes the following patches.
# Later, this must be replaced by some detection mechanism.
if 1:
    # Patch ZODB's Connection class in place when this package is imported.
    from ZODB.Connection import Connection

    # Allow serial to be returned as late as tpc_finish
    #
    # This makes possible for storage to allocate serial inside tpc_finish,
    # removing the requirement to serialise second commit phase (tpc_vote
    # to tpc_finish/tpc_abort).
    def tpc_finish(self, transaction):
        """Indicate confirmation that the transaction is done."""
        def callback(tid):
            if self._mvcc_storage:
                # Inter-connection invalidation is not needed when the
                # storage provides MVCC.
                return
            d = dict.fromkeys(self._modified)
            self._db.invalidate(tid, d, self)
        # It's important that the storage calls the passed function
        # while it still has its lock. We don't want another thread
        # to be able to read any updated data until we've had a chance
        # to send an invalidation message to all of the other
        # connections!
        serial = self._storage.tpc_finish(transaction, callback)
        # When the storage returns the serial from tpc_finish, stamp it on
        # every modified/created object still in the connection cache
        # (ghosts, i.e. _p_changed None, are left alone).
        if serial is not None:
            assert isinstance(serial, str), repr(serial)
            for oid_iterator in (self._modified, self._creating):
                for oid in oid_iterator:
                    obj = self._cache.get(oid, None)
                    # Ignore missing objects and don't update ghosts.
                    if obj is not None and obj._p_changed is not None:
                        obj._p_changed = 0
                        obj._p_serial = serial
        self._tpc_cleanup()
    Connection.tpc_finish = tpc_finish

    ###

    # The patches below are applied only once. _nexedi_fix marks the
    # version of the patch set: if another value is already present, a
    # different patch set was applied and we refuse to continue. If it is
    # already 5, nothing below is re-applied.
    try:
        if Connection._nexedi_fix != 5:
            raise Exception("A different ZODB fix is already applied")
    except AttributeError:
        Connection._nexedi_fix = 5

        # Whenever a connection is opened (and there's usually an existing one
        # in DB pool that can be reused) whereas the transaction is already
        # started, we must make sure that proper storage setup is done by
        # calling Connection.newTransaction.
        # For example, there's no open transaction when a ZPublisher/Publish
        # transaction begins.
        def open(self, *args, **kw):
            # While the original open() runs, an instance attribute
            # _flush_invalidations shadows the class method. Presumably
            # ZODB's Connection.open calls self._flush_invalidations();
            # TODO confirm for the supported ZODB versions.
            def _flush_invalidations():
                # Temporarily release the DB lock (self._db._r) around
                # newTransaction(), then re-acquire it (self._db._a).
                # Releasing a lock we do not hold raises RuntimeError, in
                # which case there is nothing to re-acquire afterwards.
                acquire = self._db._a
                try:
                    self._db._r() # this is a RLock
                except RuntimeError:
                    acquire = lambda: None
                try:
                    # Remove the shadowing attribute during the call, then
                    # reinstall it, whatever happens.
                    del self._flush_invalidations
                    self.newTransaction()
                finally:
                    acquire()
                    self._flush_invalidations = _flush_invalidations
            self._flush_invalidations = _flush_invalidations
            try:
                Connection_open(self, *args, **kw)
            finally:
                # Restore normal method lookup on the class.
                del self._flush_invalidations
        Connection_open = Connection.open
        Connection.open = open

        # Storage.sync usually implements a "network barrier" (at least
        # in NEO, but ZEO should be fixed to do the same), which is quite
        # slow so we prefer to not call it where it's not useful.
        # I don't know any legitimate use of DB access outside a transaction.
        # But old versions of ERP5 (before 2010-10-29 17:15:34) and maybe other
        # applications do not always call 'transaction.begin()' when they should
        # so this patch is disabled as a precaution, at least as long as we
        # support old software. This should also be discussed on zodb-dev ML
        # first.
        # Note: afterCompletion is defined but NOT installed (see the
        # commented-out assignment below).
        def afterCompletion(self, *ignored):
            try:
                self._readCurrent.clear()
            except AttributeError: # BBB: ZODB < 3.10
                pass
            self._flush_invalidations()
        #Connection.afterCompletion = afterCompletion

        class _DB(object):
            """
            Wrapper to DB instance that properly initialize Connection objects
            with NEO storages.

            It forces the connection to always create a new instance of the
            storage, because we don't implement IMVCCStorage completely.
            """

            def __new__(cls, db, connection):
                # Only wrap databases whose storage is a NEO Storage; any
                # other db is returned unchanged.
                if db._storage.__class__.__module__ != 'neo.client.Storage':
                    return db
                self = object.__new__(cls)
                self._db = db
                self._connection = connection
                return self

            def __getattr__(self, attr):
                # Only reached for attributes missing on the wrapper: they
                # are read from the wrapped db. The first access to
                # 'storage' returns (and caches on the wrapper) a new
                # storage instance, and puts the real db back on the
                # connection.
                result = getattr(self._db, attr)
                if attr == 'storage':
                    self.storage = result = result.new_instance()
                    self._connection._db = self._db
                return result

        # Every new Connection receives a _DB wrapper instead of db (which
        # _DB.__new__ turns back into db for non-NEO storages).
        Connection_init = Connection.__init__
        Connection.__init__ = lambda self, db, *args, **kw: \
            Connection_init(self, _DB(db, self), *args, **kw)
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/app.py 0000664 0000000 0000000 00000127766 11777665324 0025701 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from cPickle import dumps, loads
from zlib import compress as real_compress, decompress
from neo.lib.locking import Empty
from random import shuffle
import heapq
import time
import os
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, ZERO_HASH, ZERO_TID
from neo.lib.event import EventManager
from neo.lib.util import makeChecksum as real_makeChecksum, dump
from neo.lib.locking import Lock
from neo.lib.connection import MTClientConnection, OnTimeout, ConnectionClosed
from neo.lib.node import NodeManager
from neo.lib.connector import getConnectorHandler
from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError
from neo.lib.exception import NeoException
from .handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from .poll import ThreadedPoll, psThreadedPoll
from .iterator import Iterator
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import u64, parseMasterList
from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
from neo.lib.debug import register as registerLiveDebugger
from .container import ThreadContainer, TransactionContainer
if not PROFILING_ENABLED:
    # No profiling: bind the original functions directly, so there is no
    # wrapper overhead.
    compress = real_compress
    makeChecksum = real_makeChecksum
else:
    # profiler_decorator needs a "real" Python function to wrap, so the
    # originals are wrapped in thin Python-level functions first.
    @profiler_decorator
    def compress(data):
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        return real_makeChecksum(data)

# Marker stored in a transaction's data_dict in place of object data; the
# conflict handling code checks for it with "is CHECKED_SERIAL".
CHECKED_SERIAL = object()
class Application(object):
"""The client node application."""
def __init__(self, master_nodes, name, compress=True,
        dynamic_master_list=None, **kw):
    # The polling thread is created first: the dispatcher built below is
    # attached to it.
    self.em = EventManager()
    self.poll_thread = ThreadedPoll(self.em, name=name)
    psThreadedPoll()
    # State shared by all threads
    self._db = None
    self.name = name
    master_addresses, connector_name = parseMasterList(master_nodes)
    self.connector_handler = getConnectorHandler(connector_name)
    self.dispatcher = Dispatcher(self.poll_thread)
    self.nm = NodeManager(dynamic_master_list)
    self.cp = ConnectionPool(self)
    self.pt = None
    self.master_conn = None
    self.primary_master_node = None
    self.trying_master_node = None
    # Register every configured master address in the node manager.
    for master_address in master_addresses:
        self.nm.createMaster(address=master_address)
    # The UUID is not chosen here: the primary master supplies it.
    self.uuid = None
    self._cache = ClientCache()
    self.new_oid_list = []
    self.last_oid = '\0' * 8
    # Packet handlers, one per kind of peer/phase.
    self.storage_event_handler = storage.StorageEventHandler(self)
    self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
    self.storage_handler = storage.StorageAnswersHandler(self)
    self.primary_handler = master.PrimaryAnswersHandler(self)
    self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
    self.notifications_handler = master.PrimaryNotificationsHandler(self)
    # Per-thread and per-transaction state
    self._thread_container = ThreadContainer()
    self._txn_container = TransactionContainer()
    # Locks: only their bound acquire/release methods are kept.
    # Makes loading and storing atomic.
    load_lock = Lock()
    self._load_lock_acquire = load_lock.acquire
    self._load_lock_release = load_lock.release
    # Prevents concurrent OID generation.
    oid_lock = Lock()
    self._oid_lock_acquire = oid_lock.acquire
    self._oid_lock_release = oid_lock.release
    # Protects the client cache.
    cache_lock = Lock()
    self._cache_lock_acquire = cache_lock.acquire
    self._cache_lock_release = cache_lock.release
    # Prevents simultaneous connection attempts to the master node.
    master_lock = Lock()
    self._connecting_to_master_node_acquire = master_lock.acquire
    self._connecting_to_master_node_release = master_lock.release
    # Exclusive access to the node manager.
    nm_lock = Lock()
    self._nm_acquire = nm_lock.acquire
    self._nm_release = nm_lock.release
    self.compress = compress
    registerLiveDebugger(on_log=self.log)
def getHandlerData(self):
    # The 'answer' slot of the calling thread's context.
    thread_context = self._thread_container.get()
    return thread_context['answer']
def setHandlerData(self, data):
    # Store data in the 'answer' slot of the calling thread's context.
    thread_context = self._thread_container.get()
    thread_context['answer'] = data
def _getThreadQueue(self):
return self._thread_container.get()['queue']
def log(self):
    """Call log() on the event manager, the node manager and, when one
    exists, the partition table (in that order)."""
    for component in (self.em, self.nm):
        component.log()
    partition_table = self.pt
    if partition_table is not None:
        partition_table.log()
@profiler_decorator
def _handlePacket(self, conn, packet, kw=None, handler=None):
    """
    Dispatch a received packet to a handler while conn is locked.

    conn
        The connection which received the packet (forwarded to handler).
    packet
        The packet to handle.
    kw
        Extra arguments forwarded to handler.dispatch. When omitted (or
        None), a fresh empty dict is used on every call.
    handler
        The handler to use to handle packet.
        If not given, it is guessed from the type of the node on the
        connection: storage -> storage_handler, master -> primary_handler.

    Raises ValueError when no handler is given and the node on the
    connection is unknown or is neither a storage nor a master.
    """
    if kw is None:
        # A fresh dict per call: a shared "kw={}" default would carry over
        # any mutation made to it by a handler into later calls.
        kw = {}
    if handler is None:
        # Guess the handler to use based on the type of node on the
        # connection
        node = self.nm.getByAddress(conn.getAddress())
        if node is None:
            raise ValueError('Expecting an answer from a node '
                'which type is not known... Is this right ?')
        if node.isStorage():
            handler = self.storage_handler
        elif node.isMaster():
            handler = self.primary_handler
        else:
            raise ValueError('Unknown node type: %r' % (node.__class__, ))
    conn.lock()
    try:
        handler.dispatch(conn, packet, kw)
    finally:
        conn.unlock()
@profiler_decorator
def _waitAnyMessage(self, queue, block=True):
    """
    Handle the packets pending on queue.

    block
        If True (default), block until at least one packet was
        received. Once a real packet has been handled, the remaining
        ones are only drained without blocking.

    ConnectionClosed raised while handling a packet is ignored.
    """
    has_pending = self.dispatcher.pending
    handle = self._handlePacket
    while has_pending(queue):
        try:
            conn, packet, kw = queue.get(block)
        except Empty:
            break
        # None: the connection was closed; ForgottenPacket: an answer that
        # is no longer expected. Neither needs handling.
        if packet is not None and not isinstance(packet, ForgottenPacket):
            block = False
            try:
                handle(conn, packet, kw)
            except ConnectionClosed:
                pass
def _waitAnyTransactionMessage(self, txn_context, block=True):
"""
Just like _waitAnyMessage, but for per-transaction exchanges, rather
than per-thread.
"""
queue = txn_context['queue']
self.setHandlerData(txn_context)
try:
self._waitAnyMessage(queue, block=block)
finally:
# Don't leave access to thread context, even if a raise happens.
self.setHandlerData(None)
@profiler_decorator
def _ask(self, conn, packet, handler=None):
    """
    Send packet on conn and block until its answer is handled.

    Other packets arriving on the thread queue in the meantime are handled
    with their default handler. The expected answer is handled with
    handler, and the resulting handler data is returned.
    Raises ConnectionClosed if conn reports a closure before the answer,
    and ValueError if the expected answer arrives as a ForgottenPacket.
    """
    self.setHandlerData(None)
    queue = self._getThreadQueue()
    msg_id = conn.ask(packet, queue=queue)
    handle = self._handlePacket
    while True:
        qconn, qpacket, kw = queue.get(True)
        forgotten = isinstance(qpacket, ForgottenPacket)
        if qconn is conn:
            if qpacket is None:
                # Fake packet: the connection was closed.
                raise ConnectionClosed
            if qpacket.getId() == msg_id:
                if forgotten:
                    raise ValueError('ForgottenPacket for an '
                        'explicitely expected packet.')
                handle(qconn, qpacket, kw, handler)
                return self.getHandlerData()
        if qpacket is not None and not forgotten:
            handle(qconn, qpacket, kw)
@profiler_decorator
def _askStorage(self, conn, packet):
    """Send a request to a storage node and return the handled answer."""
    answers_handler = self.storage_handler
    return self._ask(conn, packet, handler=answers_handler)
@profiler_decorator
def _askPrimary(self, packet):
    """Send a request to the primary master (connecting first if needed)
    and return the handled answer."""
    master_conn = self._getMasterConnection()
    return self._ask(master_conn, packet, handler=self.primary_handler)
@profiler_decorator
def _getMasterConnection(self):
    """
    Return the connection to the primary master, connecting on demand.

    The first read of self.master_conn is done without the lock, for
    speed. When it is None, the value is read again once the lock is held,
    so that threads which were waiting while another thread connected do
    not open a second connection.
    NOTE(review): this relies on self.master_conn being set once the
    connection is established (presumably by a master handler). If it is
    never set, this behaves as before, i.e. it always reconnects.
    """
    result = self.master_conn
    if result is None:
        # Only one thread at a time may connect to the primary.
        self._connecting_to_master_node_acquire()
        try:
            result = self.master_conn
            if result is None:
                # OIDs reserved through a previous connection are dropped.
                self.new_oid_list = []
                result = self._connectToPrimaryNode()
        finally:
            self._connecting_to_master_node_release()
    return result
def getPartitionTable(self):
    """Return the partition table, after making sure a primary master
    connection exists (connecting if needed) so the table is up to date."""
    # Called only for its side effect.
    self._getMasterConnection()
    partition_table = self.pt
    return partition_table
@profiler_decorator
def _connectToPrimaryNode(self):
    """
    Lookup for the current primary master node, connect and identify to
    it, and return the connection once identifyToPrimaryNode reports the
    cluster ready.

    Retries forever. Known masters are tried in turn (sleeping 1s after
    each full pass). If the node manager knows no master at all, the
    IndexError from master_list[0] propagates.
    """
    logging.debug('connecting to primary master...')
    ready = False
    nm = self.nm
    while not ready:
        # Get network connection to primary master
        index = 0
        connected = False
        while not connected:
            if self.primary_master_node is not None:
                # If I know a primary master node, pinpoint it.
                self.trying_master_node = self.primary_master_node
                self.primary_master_node = None
            else:
                # Otherwise, check one by one.
                master_list = nm.getMasterList()
                try:
                    self.trying_master_node = master_list[index]
                except IndexError:
                    # End of list: wait a bit, then start over.
                    time.sleep(1)
                    index = 0
                    self.trying_master_node = master_list[0]
                index += 1
            # Connect to master
            conn = MTClientConnection(self.em,
                    self.notifications_handler,
                    node=self.trying_master_node,
                    connector=self.connector_handler(),
                    dispatcher=self.dispatcher)
            # Query for primary master node
            if conn.getConnector() is None:
                # This happens if a connection could not be established.
                logging.error('Connection to master node %s failed',
                              self.trying_master_node)
                continue
            try:
                # Presumably primary_bootstrap_handler sets
                # self.primary_master_node from the answer; TODO confirm.
                self._ask(conn, Packets.RequestIdentification(
                    NodeTypes.CLIENT, self.uuid, None, self.name),
                    handler=self.primary_bootstrap_handler)
            except ConnectionClosed:
                continue
            # If we reached the primary master node, mark as connected
            connected = self.primary_master_node is not None and \
                    self.primary_master_node is self.trying_master_node
        logging.info('Connected to %s', self.primary_master_node)
        try:
            ready = self.identifyToPrimaryNode(conn)
        except ConnectionClosed:
            # Forget the primary so that the lookup starts over.
            logging.error('Connection to %s lost', self.trying_master_node)
            self.primary_master_node = None
    logging.info("Connected and ready")
    return conn
def identifyToPrimaryNode(self, conn):
    """
    Ask the primary master for the node information and the partition
    table, then return whether the partition table is operational.

    Might raise ConnectionClosed so that the new primary can be
    looked up again.
    """
    logging.info('Initializing from master')
    bootstrap_handler = self.primary_bootstrap_handler
    for packet_type in (Packets.AskNodeInformation,
                        Packets.AskPartitionTable):
        self._ask(conn, packet_type(), handler=bootstrap_handler)
    return self.pt.operational()
def registerDB(self, db, limit):
    """Remember db (returned by getDB). limit is accepted but unused."""
    setattr(self, '_db', db)
def getDB(self):
    """Return the database registered with registerDB (None by default)."""
    registered = self._db
    return registered
@profiler_decorator
def new_oid(self):
    """Return a new OID.

    OIDs are requested from the primary master 100 at a time, so that the
    master is not asked for every single OID. They are then taken in order
    from self.new_oid_list (presumably filled by the master answer
    handler). The OID returned is also remembered as self.last_oid.
    Raises NEOStorageError if the list is still empty after asking.
    """
    self._oid_lock_acquire()
    try:
        if not self.new_oid_list:
            self._askPrimary(Packets.AskNewOIDs(100))
            if not self.new_oid_list:
                raise NEOStorageError('new_oid failed')
        oid = self.new_oid_list.pop(0)
        self.last_oid = oid
        return oid
    finally:
        self._oid_lock_release()
def getStorageSize(self):
    # Approximation only: the integer value of the last OID handed out by
    # new_oid (0 initially).
    last_oid_value = u64(self.last_oid)
    return int(last_oid_value)
@profiler_decorator
def load(self, oid, tid=None, before_tid=None):
    """
    Internal method which manages load, loadSerial and loadBefore.
    OID and TID (serial) parameters are expected packed.

    oid
        OID of object to get.
    tid
        If given, the exact serial at which OID is desired.
        before_tid should be None.
    before_tid
        If given, the excluded upper bound serial at which OID is desired.
        tid should be None.

    Return value: (3-tuple)
    - Object data (None if object creation was undone).
    - Serial of given data.
    - Next serial at which object exists, or None. Only set when tid
      parameter is not None.

    The local cache is tried first. On a miss, the result loaded from a
    storage node is added to the cache.

    Exceptions:
        NEOStorageError
            technical problem
        NEOStorageNotFoundError
            object exists but no data satisfies given parameters
        NEOStorageDoesNotExistError
            object doesn't exist
        NEOStorageCreationUndoneError
            object existed, but its creation was undone

    Note that loadSerial is used during conflict resolution to load
    object's current version, which is not visible to us normally (it was
    committed after our snapshot was taken).
    """
    # TODO:
    # - rename parameters (here? and in handlers & packet definitions)
    self._load_lock_acquire()
    try:
        cached = self._loadFromCache(oid, tid, before_tid)
        if cached:
            return cached
        loaded = self._loadFromStorage(oid, tid, before_tid)
        self._cache_lock_acquire()
        try:
            self._cache.store(oid, *loaded)
        finally:
            self._cache_lock_release()
        return loaded
    finally:
        self._load_lock_release()
@profiler_decorator
def _loadFromStorage(self, oid, at_tid, before_tid):
    """
    Ask the readable storage nodes holding oid, one after the other, until
    one gives a usable answer, and return (data, tid, next_tid) with data
    decompressed.

    Raises NEOStorageCreationUndoneError when a node answers with empty
    data and ZERO_HASH as checksum. Raises NEOStorageError('connection
    failure') when no node gave a usable answer, i.e. every connection
    was closed or every answer had a wrong checksum.
    """
    request = Packets.AskObject(oid, at_tid, before_tid)
    for _node, conn in self.cp.iterateForObject(oid, readable=True):
        try:
            answer = self._askStorage(conn, request)
        except ConnectionClosed:
            continue
        _noid, tid, next_tid, compression, checksum, data = answer
        # No data together with ZERO_HASH marks an undone creation.
        if not data and checksum == ZERO_HASH:
            raise NEOStorageCreationUndoneError(dump(oid))
        if checksum != makeChecksum(data):
            logging.error('wrong checksum from %s for oid %s',
                      conn, dump(oid))
            continue
        if compression:
            data = decompress(data)
        return data, tid, next_tid
    raise NEOStorageError('connection failure')
@profiler_decorator
def _loadFromCache(self, oid, at_tid=None, before_tid=None):
    """
    Load from local cache, return None if not found.

    With at_tid, only a revision whose serial is exactly at_tid is
    accepted (checked by an assertion). Otherwise before_tid is passed to
    the cache as the excluded upper bound.
    """
    self._cache_lock_acquire()
    try:
        if not at_tid:
            return self._cache.load(oid, before_tid)
        # at_tid + '*' sorts just after at_tid. Assuming fixed-length
        # (8-byte) tids, no other tid lies between the two, so using it as
        # an excluded upper bound selects the revision at at_tid.
        # TODO confirm all tids are 8 bytes.
        found = self._cache.load(oid, at_tid + '*')
        assert not found or found[1] == at_tid
        return found
    finally:
        self._cache_lock_release()
@profiler_decorator
def tpc_begin(self, transaction, tid=None, status=' '):
    """Begin a new transaction.

    Creates a transaction context and asks the primary master for a TTID
    (the given tid is sent along in the request). The context records
    'txn' and 'ttid'. status is accepted but unused here.

    Raises StorageTransactionError on a second tpc_begin for the same
    transaction, and NEOStorageError if the master gives no TTID. In the
    latter case, the context created here is left in the container.
    """
    container = self._txn_container
    # Only one tpc_begin per transaction.
    if container.get(transaction) is not None:
        raise StorageTransactionError('Duplicate tpc_begin calls')
    context = container.new(transaction)
    ttid = self._askPrimary(Packets.AskBeginTransaction(tid))
    if ttid is None:
        raise NEOStorageError('tpc_begin failed')
    assert tid in (None, ttid), (tid, ttid)
    context['txn'] = transaction
    context['ttid'] = ttid
@profiler_decorator
def store(self, oid, serial, data, version, transaction):
    """Store object.

    version is ignored. Always returns None. Raises
    StorageTransactionError if tpc_begin was not called for transaction.
    """
    context = self._txn_container.get(transaction)
    if context is None:
        raise StorageTransactionError(self, transaction)
    logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
    self._store(context, oid, serial, data)
def _store(self, txn_context, oid, serial, data, data_serial=None,
        unlock=False):
    """
    Send a store request for oid to every storage node holding it,
    recording the object in txn_context.

    data is None for undo. In that case an empty payload with ZERO_HASH is
    sent, possibly with data_serial as a back-pointer. Otherwise
    data_serial must be None, and the payload is compressed when
    self.compress is set and compression does not grow it.
    Raises NEOStorageError when no node at all is involved in the
    transaction.
    """
    ttid = txn_context['ttid']
    if data is None:
        # This is some undo: either a no-data object (undoing object
        # creation) or a back-pointer to an earlier revision (going back to
        # an older object revision).
        compression = 0
        compressed_data = ''
        checksum = ZERO_HASH
    else:
        assert data_serial is None
        size = len(data)
        if self.compress:
            compressed_data = compress(data)
            if len(compressed_data) > size:
                # Compression made it bigger: send the raw data.
                compressed_data = data
                compression = 0
            else:
                compression = 1
        else:
            compressed_data = data
            compression = self.compress
        checksum = makeChecksum(compressed_data)
        txn_context['data_size'] += size
    on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid)
    # Keep the object in the transaction's temporary cache.
    txn_context['data_dict'][oid] = data
    # Reset the per-node store bookkeeping for this object.
    txn_context['object_stored_counter_dict'][oid] = {}
    base_serial_dict = txn_context['object_base_serial_dict']
    if oid not in base_serial_dict:
        # Only the first store of oid in this transaction sets its base.
        base_serial_dict[oid] = serial
    txn_context['object_serial_dict'][oid] = serial
    txn_queue = txn_context['queue']
    involved_nodes = txn_context['involved_nodes']
    add_node = involved_nodes.add
    request = Packets.AskStoreObject(oid, serial, compression,
        checksum, compressed_data, data_serial, ttid, unlock)
    for node, conn in self.cp.iterateForObject(oid):
        try:
            conn.ask(request, on_timeout=on_timeout, queue=txn_queue)
        except ConnectionClosed:
            continue
        add_node(node)
    # NOTE(review): involved_nodes is transaction-wide, so this only fails
    # when no node was reached during the whole transaction so far. Confirm
    # that a store reaching no node for this oid is detected elsewhere.
    if not involved_nodes:
        raise NEOStorageError("Store failed")
    # While data_size is at least the cache's max size, block on answers
    # (presumably they decrease data_size); then drain what is pending.
    while txn_context['data_size'] >= self._cache._max_size:
        self._waitAnyTransactionMessage(txn_context)
    self._waitAnyTransactionMessage(txn_context, False)
def onStoreTimeout(self, conn, msg_id, txn_context, oid):
    """
    Store-timeout callback: remember the timed-out msg_id for oid in
    txn_context['timeout_dict'], then ask the storage whether the object
    is locked.

    NOTE: this method is called from the poll thread, so it must only use
    txn_context, never thread-specific values.
    """
    timeout_dict = txn_context.setdefault('timeout_dict', {})
    timeout_dict[oid] = msg_id
    # Ask the storage if someone locks the object.
    # Sending a message with a smaller timeout keeps the connection open.
    has_lock = Packets.AskHasLock(txn_context['ttid'], oid)
    conn.ask(has_lock, timeout=5, queue=txn_context['queue'])
@profiler_decorator
def _handleConflicts(self, txn_context, tryToResolveConflict):
    """Process the conflicts reported by storage nodes for a transaction.

    ``txn_context['conflict_serial_dict']`` (oid -> set of conflicting
    serials) is copied and then cleared, so each reported conflict is
    handled once. For every oid:

    - if ZERO_TID is among the serials (storage-side deadlock avoidance),
      ConflictError is raised: the recovery code below is disabled by the
      ``if 1:`` switch;
    - if the oid was only checked (CHECKED_SERIAL placeholder stored by
      _checkCurrentSerialInTransaction), ReadConflictError is raised;
    - if a later serial was already resolved for this oid, it is skipped;
    - otherwise ``tryToResolveConflict(oid, conflict_serial, serial, data)``
      is called. On success the new data is stored again with
      ``conflict_serial`` as base serial and the oid is returned. On failure
      ConflictError is raised.

    :param txn_context: transaction context dict (see TransactionContainer).
    :param tryToResolveConflict: ZODB conflict resolution callable; returns
        the resolved data, or None when it cannot resolve.
    :return: list of oids whose conflict was resolved (and re-stored).
    :raises ReadConflictError: on a conflict for a checked-only oid.
    :raises ConflictError: when a conflict cannot be resolved.
    """
    result = []
    append = result.append
    # Check for conflicts
    data_dict = txn_context['data_dict']
    object_base_serial_dict = txn_context['object_base_serial_dict']
    object_serial_dict = txn_context['object_serial_dict']
    # Snapshot then clear: new conflicts reported while we are resolving
    # (e.g. for re-stored objects) go to the now-empty dict.
    conflict_serial_dict = txn_context['conflict_serial_dict'].copy()
    txn_context['conflict_serial_dict'].clear()
    resolved_conflict_serial_dict = txn_context[
        'resolved_conflict_serial_dict']
    for oid, conflict_serial_set in conflict_serial_dict.iteritems():
        conflict_serial = max(conflict_serial_set)
        serial = object_serial_dict[oid]
        if ZERO_TID in conflict_serial_set:
            if 1:
                # XXX: disable deadlock avoidance code until it is fixed
                logging.info('Deadlock avoidance on %r:%r',
                    dump(oid), dump(serial))
                # 'data' parameter of ConflictError is only used to report the
                # class of the object. It doesn't matter if 'data' is None
                # because the transaction is too big.
                try:
                    data = data_dict[oid]
                except KeyError:
                    data = txn_context['cache_dict'][oid]
                # Falls through to the ConflictError raised after the
                # if/else below.
            else:
                # Storage refused us from taking object lock, to avoid a
                # possible deadlock. TID is actually used for some kind of
                # "locking priority": when a higher value has the lock,
                # this means we stored objects "too late", and we would
                # otherwise cause a deadlock.
                # To recover, we must ask storages to release locks we
                # hold (to let possibly-competing transactions acquire
                # them), and requeue our already-sent store requests.
                # XXX: currently, brute-force is implemented: we send
                # object data again.
                # WARNING: not maintained code
                logging.info('Deadlock avoidance triggered on %r:%r',
                    dump(oid), dump(serial))
                for store_oid, store_data in data_dict.iteritems():
                    store_serial = object_serial_dict[store_oid]
                    if store_data is CHECKED_SERIAL:
                        self._checkCurrentSerialInTransaction(txn_context,
                            store_oid, store_serial)
                    else:
                        if store_data is None:
                            # Some undo
                            logging.warning('Deadlock avoidance cannot reliably'
                                ' work with undo, this must be implemented.')
                            conflict_serial = ZERO_TID
                            break
                        self._store(txn_context, store_oid, store_serial,
                            store_data, unlock=True)
                else:
                    # Every store was re-sent: nothing to raise for this oid.
                    continue
        else:
            data = data_dict.pop(oid)
            if data is CHECKED_SERIAL:
                raise ReadConflictError(oid=oid, serials=(conflict_serial,
                    serial))
            if data: # XXX: can 'data' be None ???
                txn_context['data_size'] -= len(data)
            resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                oid, set())
            if resolved_serial_set and conflict_serial <= max(
                    resolved_serial_set):
                # A later serial has already been resolved, skip.
                resolved_serial_set.update(conflict_serial_set)
                continue
            new_data = tryToResolveConflict(oid, conflict_serial,
                serial, data)
            if new_data is not None:
                logging.info('Conflict resolution succeed for '
                    '%r:%r with %r', dump(oid), dump(serial),
                    dump(conflict_serial))
                # Mark this conflict as resolved
                resolved_serial_set.update(conflict_serial_set)
                # Base serial changes too, as we resolved a conflict
                object_base_serial_dict[oid] = conflict_serial
                # Try to store again
                self._store(txn_context, oid, conflict_serial, new_data)
                append(oid)
                continue
            else:
                logging.info('Conflict resolution failed for '
                    '%r:%r with %r', dump(oid), dump(serial),
                    dump(conflict_serial))
        # Reached for deadlock avoidance and for failed resolution.
        # NOTE(review): ZODB documents ConflictError ``serials`` as
        # (current serial, old serial); passing the ttid here may be
        # deliberate — confirm.
        raise ConflictError(oid=oid, serials=(txn_context['ttid'],
            serial), data=data)
    return result
@profiler_decorator
def waitResponses(self, queue):
    """Block until every request registered on ``queue`` has been answered.

    A request whose connection is detected as closed also counts as
    answered.
    """
    has_pending = self.dispatcher.pending
    wait_one_message = self._waitAnyMessage
    while has_pending(queue):
        wait_one_message(queue)
@profiler_decorator
def waitStoreResponses(self, txn_context, tryToResolveConflict):
    """Wait for all store answers of a transaction, resolving conflicts.

    Loops while requests are pending on the transaction queue or conflicts
    remain. Conflicts are handled by _handleConflicts, which may send new
    store requests; hence the combined loop condition.

    :param txn_context: transaction context dict (see TransactionContainer).
    :param tryToResolveConflict: ZODB conflict resolution callable.
    :return: list of ``(oid, ResolvedSerial)`` for every oid whose conflict
        was resolved.
    :raises NEOStorageError: if some object was not stored on any node.
    """
    queue = txn_context['queue']
    # Same dict object for the whole loop: _handleConflicts clears it in
    # place, and answer handlers fill it again on new conflicts.
    conflict_serial_dict = txn_context['conflict_serial_dict']
    pending = self.dispatcher.pending
    _waitAnyTransactionMessage = self._waitAnyTransactionMessage
    resolved_oid_set = set()
    while pending(queue) or conflict_serial_dict:
        # Note: handler data can be overwritten by _handleConflicts
        # so we must set it for each iteration.
        _waitAnyTransactionMessage(txn_context)
        if conflict_serial_dict:
            resolved_oid_set.update(
                self._handleConflicts(txn_context, tryToResolveConflict))
    # Check for never-stored objects, and update result for all others
    result = []
    for oid, store_dict in \
            txn_context['object_stored_counter_dict'].iteritems():
        if not store_dict:
            logging.error('tpc_store failed')
            raise NEOStorageError('tpc_store failed')
        if oid in resolved_oid_set:
            result.append((oid, ResolvedSerial))
    return result
@profiler_decorator
def tpc_vote(self, transaction, tryToResolveConflict):
    """Store current transaction.

    First waits for all object stores to be answered (resolving conflicts).
    Then sends the transaction metadata (user, description, pickled
    extension, stored oids) to the storage nodes returned by
    ``iterateForObject(ttid)``. Finally checks the master connection.

    :return: the list returned by waitStoreResponses (resolved oids).
    :raises StorageTransactionError: if ``transaction`` is not the one
        started for this context.
    :raises NEOStorageError: if no storage node accepted the transaction.
    """
    txn_context = self._txn_container.get(transaction)
    if txn_context is None or transaction is not txn_context['txn']:
        raise StorageTransactionError(self, transaction)
    result = self.waitStoreResponses(txn_context, tryToResolveConflict)

    ttid = txn_context['ttid']
    # All data must have been moved out of data_dict by store answers.
    assert not txn_context['data_dict'], txn_context
    packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
        str(transaction.description), dumps(transaction._extension),
        txn_context['cache_dict'])
    involved_nodes = txn_context['involved_nodes']
    accepted = False
    for node, conn in self.cp.iterateForObject(ttid):
        logging.debug("voting object %s on %s", dump(ttid),
            dump(conn.getUUID()))
        try:
            self._askStorage(conn, packet)
        except ConnectionClosed:
            continue
        involved_nodes.add(node)
        accepted = True
    if not accepted:
        logging.error('tpc_vote failed')
        raise NEOStorageError('tpc_vote failed')

    # Probing the master here lowers the probability of only detecting a
    # problem in tpc_finish: problems should be found before tpc_finish.
    self._getMasterConnection()
    txn_context['txn_voted'] = True
    return result
@profiler_decorator
def tpc_abort(self, transaction):
    """Abort current transaction.

    Notifies AbortTransaction to every involved storage node that still has
    a connection, then to the primary master. The transaction queue is
    forgotten and the context deleted. Does nothing for an unknown
    transaction.
    """
    txn_container = self._txn_container
    txn_context = txn_container.get(transaction)
    if txn_context is None:
        return
    p = Packets.AbortTransaction(txn_context['ttid'])
    getConnForNode = self.cp.getConnForNode
    # cancel transaction on all those nodes
    for node in txn_context['involved_nodes']:
        conn = getConnForNode(node)
        if conn is None:
            continue
        try:
            conn.notify(p)
        except Exception:
            # Best effort: failing to reach one storage node must not
            # prevent notifying the other ones and the master. Unlike the
            # previous bare except, KeyboardInterrupt/SystemExit propagate.
            logging.exception('Exception in tpc_abort while notifying '
                'storage node %r of abortion, ignoring.', conn)
    self._getMasterConnection().notify(p)
    queue = txn_context['queue']
    # We don't need to flush queue, as it won't be reused by future
    # transactions (deleted on next line & indexed by transaction object
    # instance).
    self.dispatcher.forget_queue(queue, flush_queue=False)
    txn_container.delete(transaction)
@profiler_decorator
def tpc_finish(self, transaction, tryToResolveConflict, f=None):
    """Finish current transaction.

    Votes first if tpc_vote was not called. Then, under the load lock, asks
    the primary master to finish the transaction and calls ``f(tid)`` if
    given. Next it updates the cache under the cache lock: each modified oid
    has its previous revision invalidated at ``tid``, and the new data is
    stored as the current revision. Finally the transaction context is
    deleted.

    :param f: optional callback (ZODB API), called with the committed tid.
    :return: the tid assigned by the primary master.
    :raises StorageTransactionError: if ``transaction`` is unknown.
    """
    txn_container = self._txn_container
    txn_context = txn_container.get(transaction)
    if txn_context is None:
        raise StorageTransactionError('tpc_finish called for wrong '
            'transaction')
    if not txn_context['txn_voted']:
        self.tpc_vote(transaction, tryToResolveConflict)
    # Hold the load lock for the whole finish + cache update sequence.
    # NOTE(review): presumably so that concurrent loads cannot observe the
    # cache between commit and invalidation — confirm with load().
    self._load_lock_acquire()
    try:
        # Call finish on master
        cache_dict = txn_context['cache_dict']
        tid = self._askPrimary(Packets.AskFinishTransaction(
            txn_context['ttid'], cache_dict))

        # Call function given by ZODB
        if f is not None:
            f(tid)

        # Update cache
        self._cache_lock_acquire()
        try:
            cache = self._cache
            for oid, data in cache_dict.iteritems():
                if data is CHECKED_SERIAL:
                    # this is just a remain of
                    # checkCurrentSerialInTransaction call, ignore (no data
                    # was modified).
                    continue
                # Update ex-latest value in cache
                cache.invalidate(oid, tid)
                if data is not None:
                    # Store in cache with no next_tid
                    cache.store(oid, data, tid, None)
        finally:
            self._cache_lock_release()
        txn_container.delete(transaction)
        return tid
    finally:
        self._load_lock_release()
def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict):
    """Undo transaction ``undone_tid`` as part of transaction ``txn``.

    For each oid modified by ``undone_tid``, one readable storage node per
    partition is asked for the serial to go back to.

    - If the undone revision is still current, a back-pointer store (no
      data) is issued.
    - Otherwise the undo conflict is resolved with ``tryToResolveConflict``
      and the resolved data is stored.

    :return: ``(None, oid_list)`` where ``oid_list`` lists the undone oids.
    :raises StorageTransactionError: if ``txn`` is not a started
        transaction.
    :raises UndoError: if the transaction is not undoable (data packed
        away, or a later modification that cannot be resolved).
    """
    txn_context = self._txn_container.get(txn)
    if txn_context is None:
        # Report the unknown transaction itself, as the other
        # StorageTransactionError raised by this class do.
        raise StorageTransactionError(self, txn)
    txn_oid_list = self._getTransactionInformation(undone_tid)[0]['oids']

    # Regroup objects per partition, to ask a minimum set of storage.
    pt = self.getPartitionTable()
    partition_oid_dict = {}
    for oid in txn_oid_list:
        partition_oid_dict.setdefault(pt.getPartition(oid), []).append(oid)

    # Ask storage the undo serial (serial at which object's previous data
    # is)
    getCellList = pt.getCellList
    getCellSortKey = self.cp.getCellSortKey
    getConnForCell = self.cp.getConnForCell
    queue = self._getThreadQueue()
    ttid = txn_context['ttid']
    undo_object_tid_dict = {}
    for partition, oid_list in partition_oid_dict.iteritems():
        cell_list = getCellList(partition, readable=True)
        # We do want to shuffle before getting one with the smallest
        # key, so that all cells with the same (smallest) key has
        # identical chance to be chosen.
        shuffle(cell_list)
        storage_conn = getConnForCell(min(cell_list, key=getCellSortKey))
        storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
            snapshot_tid, undone_tid, oid_list),
            queue=queue, undo_object_tid_dict=undo_object_tid_dict)

    # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
    # meaning that objects in transaction's oid_list do not exist any
    # longer. This is the symptom of a pack, so forbid undoing transaction
    # when it happens.
    try:
        self.waitResponses(queue)
    except NEOStorageNotFoundError:
        self.dispatcher.forget_queue(queue)
        raise UndoError('non-undoable transaction')

    # Send undo data to all storage nodes.
    for oid in txn_oid_list:
        current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
        if is_current:
            data = None
        else:
            # Serial being undone is not the latest version for this
            # object. This is an undo conflict, try to resolve it.
            try:
                # Load the latest version we are supposed to see
                data = self.load(oid, current_serial)[0]
                # Load the version we were undoing to
                undo_data = self.load(oid, undo_serial)[0]
            except NEOStorageNotFoundError:
                raise UndoError('Object not found while resolving undo '
                    'conflict')
            # Resolve conflict
            try:
                data = tryToResolveConflict(oid, current_serial,
                    undone_tid, undo_data, data)
            except ConflictError:
                data = None
            if data is None:
                raise UndoError('Some data were modified by a later '
                    'transaction', oid)
            # Real data is stored now, not a back-pointer.
            undo_serial = None
        self._store(txn_context, oid, current_serial, data, undo_serial)

    return None, txn_oid_list
def _insertMetadata(self, txn_info, extension):
for k, v in loads(extension).items():
txn_info[k] = v
def _getTransactionInformation(self, tid):
    """Ask readable storage nodes for the metadata of transaction ``tid``.

    Nodes are tried in turn. A node whose connection closes, or which does
    not know ``tid``, is skipped.

    :return: ``(txn_info, txn_ext)`` from the first node that answers.
    :raises NEOStorageError: if no node could provide it.
    """
    packet = Packets.AskTransactionInformation(tid)
    for node, conn in self.cp.iterateForObject(tid, readable=True):
        try:
            txn_info, txn_ext = self._askStorage(conn, packet)
        except (ConnectionClosed, NEOStorageNotFoundError):
            # Closed connection, or TID not found on this node.
            continue
        return (txn_info, txn_ext)
    raise NEOStorageError('Transaction %r not found' % (tid, ))
def undoLog(self, first, last, filter=None, block=0):
    """Return the metadata of undoable transactions, most recent first.

    TIDs are collected from all storage nodes with AskTIDs(first, last).
    The metadata of each TID is then fetched, and entries accepted by
    ``filter`` are kept until ``last - first`` entries are found. If none
    is found, the call is retried once with ``last * 5``.

    :param last: if negative, interpreted as in FileStorage
        (``last = first - last``).
    :param filter: optional predicate on the txn_info dict.
    :param block: internal flag preventing more than one retry.
    :return: list of txn_info dicts, without 'packed' and 'oids' and with
        the transaction extension merged in.
    """
    # XXX: undoLog is broken
    if last < 0:
        # See FileStorage.py for explanation
        last = first - last

    # First get a list of transactions from all storage nodes.
    # Each storage node will return TIDs only for UP_TO_DATE state and
    # FEEDING state cells
    pt = self.getPartitionTable()
    storage_node_list = pt.getNodeList()

    queue = self._getThreadQueue()
    packet = Packets.AskTIDs(first, last, INVALID_PARTITION)
    # Filled by the answer handler of every node (passed as kw below).
    tid_set = set()
    for storage_node in storage_node_list:
        conn = self.cp.getConnForNode(storage_node)
        if conn is None:
            continue
        conn.ask(packet, queue=queue, tid_set=tid_set)

    # Wait for answers from all storages.
    self.waitResponses(queue)

    # Reorder tids
    ordered_tids = sorted(tid_set, reverse=True)
    logging.debug("UndoLog tids %s", map(dump, ordered_tids))
    # For each transaction, get info
    undo_info = []
    append = undo_info.append
    for tid in ordered_tids:
        (txn_info, txn_ext) = self._getTransactionInformation(tid)
        if filter is None or filter(txn_info):
            txn_info.pop('packed')
            txn_info.pop("oids")
            self._insertMetadata(txn_info, txn_ext)
            append(txn_info)
            if len(undo_info) >= last - first:
                break
    # Check we return at least one element, otherwise call
    # again but extend offset
    if len(undo_info) == 0 and not block:
        undo_info = self.undoLog(first=first, last=last*5, filter=filter,
            block=1)
    return undo_info
def transactionLog(self, start, stop, limit):
    """Return information about at most ``limit`` transactions between
    ``start`` and ``stop``.

    One readable storage node per partition is asked for its TIDs. Their
    answers are merged with heapq.merge (so presumably ascending — confirm
    with AskTIDsFrom) and truncated to ``limit``. Once ``limit`` TIDs are
    known, ``stop`` is lowered to the last kept one, so later partitions
    are not asked for TIDs that would be discarded anyway.

    :return: ``(tid, txn_list)``, where ``txn_list`` holds the txn_info
        dicts (unpickled extension under 'ext') and ``tid`` is the last TID
        of that list, or None when it is empty.
    :raises NEOStorageError: if some partition has no reachable node.
    """
    if limit <= 0:
        # Nothing can be returned. Without this early exit, an empty list
        # left after truncation made ``tid_list[-1]`` raise IndexError.
        return None, []
    tid_list = []
    # request a tid list for each partition
    for offset in xrange(self.pt.getPartitions()):
        p = Packets.AskTIDsFrom(start, stop, limit, offset)
        for node, conn in self.cp.iterateForObject(offset, readable=True):
            try:
                r = self._askStorage(conn, p)
                break
            except ConnectionClosed:
                pass
        else:
            raise NEOStorageError('transactionLog failed')
        if r:
            tid_list = list(heapq.merge(tid_list, r))
            if len(tid_list) >= limit:
                del tid_list[limit:]
                stop = tid_list[-1]
    # request transactions informations
    txn_list = []
    append = txn_list.append
    tid = None
    for tid in tid_list:
        (txn_info, txn_ext) = self._getTransactionInformation(tid)
        txn_info['ext'] = loads(txn_ext)
        append(txn_info)
    return (tid, txn_list)
def history(self, oid, size=1, filter=None):
    """Return the revision history of ``oid``, most recent first.

    Readable storage nodes are asked in turn until one answers; the
    transaction metadata of each returned revision is then fetched.

    :param size: maximum number of revisions requested.
    :param filter: optional predicate on each history dict; rejected
        entries are not returned.
    :return: list of dicts with 'tid', 'version' (always ''), 'size' and
        the transaction metadata and extension.
    :raises NEOStorageError: if no readable storage node could answer
        (previously the method silently returned None in that case).
    """
    # Get history informations for object first
    packet = Packets.AskObjectHistory(oid, 0, size)
    for node, conn in self.cp.iterateForObject(oid, readable=True):
        try:
            history_list = self._askStorage(conn, packet)
        except ConnectionClosed:
            continue
        break
    else:
        raise NEOStorageError('history failed')
    # Now that we have object informations, get txn informations
    result = []
    # history_list is already sorted descending (by the storage)
    for serial, data_size in history_list:
        txn_info, txn_ext = self._getTransactionInformation(serial)
        # create history dict
        txn_info.pop('id')
        txn_info.pop('oids')
        txn_info.pop('packed')
        txn_info['tid'] = serial
        txn_info['version'] = ''
        txn_info['size'] = data_size
        if filter is None or filter(txn_info):
            result.append(txn_info)
        self._insertMetadata(txn_info, txn_ext)
    return result
@profiler_decorator
def importFrom(self, source, start, stop, tryToResolveConflict,
        preindex=None):
    """Copy the transactions of ``source`` into this storage.

    Each transaction yielded by ``source.iterator(start, stop)`` is replayed
    through tpc_begin / store / tpc_vote / tpc_finish with its original tid
    (asserted after tpc_finish). The source iterator is always closed, even
    when the import fails midway.

    :param preindex: optional dict mapping an oid to the tid of its latest
        imported revision. It gives the ``serial`` of the next store of that
        oid and is updated in place.
    """
    if preindex is None:
        preindex = {}
    transaction_iter = source.iterator(start, stop)
    try:
        for transaction in transaction_iter:
            tid = transaction.tid
            self.tpc_begin(transaction, tid, transaction.status)
            for r in transaction:
                oid = r.oid
                pre = preindex.get(oid)
                # TODO: bypass conflict resolution, locks...
                self.store(oid, pre, r.data, r.version, transaction)
                preindex[oid] = tid
            conflicted = self.tpc_vote(transaction, tryToResolveConflict)
            assert not conflicted, conflicted
            real_tid = self.tpc_finish(transaction, tryToResolveConflict)
            assert real_tid == tid, (real_tid, tid)
    finally:
        transaction_iter.close()
def iterator(self, start, stop):
    """Return an Iterator over transactions from ``start`` to ``stop``.

    A ``start`` of None means "from the very beginning" (ZERO_TID).
    """
    first_tid = ZERO_TID if start is None else start
    return Iterator(self, first_tid, stop)
def lastTransaction(self):
    """Ask the primary master for the last transaction id and return it."""
    request = Packets.AskLastTransaction()
    return self._askPrimary(request)
def abortVersion(self, src, transaction):
    """Versions are not supported: only validate ``transaction``.

    :return: ``('', [])``.
    :raises StorageTransactionError: if ``transaction`` is unknown.
    """
    if self._txn_container.get(transaction) is not None:
        return '', []
    raise StorageTransactionError(self, transaction)
def commitVersion(self, src, dest, transaction):
    """Versions are not supported: only validate ``transaction``.

    :return: ``('', [])``.
    :raises StorageTransactionError: if ``transaction`` is unknown.
    """
    if self._txn_container.get(transaction) is not None:
        return '', []
    raise StorageTransactionError(self, transaction)
def __del__(self):
    """Close all connections and stop the polling thread.

    Also exposed as ``close`` (alias right after this method).
    """
    # Due to bug in ZODB, close is not always called when shutting
    # down zope, so use __del__ to close connections
    for conn in self.em.getConnectionList():
        conn.close()
    # NOTE(review): cp.flush() is defined elsewhere; presumably drops the
    # connection pool's cached connections — confirm.
    self.cp.flush()
    self.master_conn = None
    # Stop polling thread
    logging.debug('Stopping %s', self.poll_thread)
    self.poll_thread.stop()
    # NOTE(review): psThreadedPoll is defined elsewhere; assumed to complete
    # the poll thread shutdown — confirm.
    psThreadedPoll()
# Explicit close() performs exactly the same shutdown.
close = __del__
def pack(self, t):
    """Ask the primary master to pack the database as of time ``t``.

    ``t`` (seconds since the epoch) is converted to a tid through a UTC
    TimeStamp. The local cache is emptied afterwards.

    :raises NEOStorageError: if ``t`` converts to ZERO_TID.
    """
    utc = time.gmtime(t)
    tid = repr(TimeStamp(*utc[:5] + (t % 60, )))
    if tid == ZERO_TID:
        raise NEOStorageError('Invalid pack time')
    self._askPrimary(Packets.AskPack(tid))
    # XXX: clearing the cache only exists to make ZODB unit tests pass.
    # Clients should otherwise be free to keep loading old data from their
    # cache even if a pack pruned it, so other clients are deliberately not
    # invalidated.
    self._cache_lock_acquire()
    try:
        self._cache.clear()
    finally:
        self._cache_lock_release()
def getLastTID(self, oid):
    """Return the serial (second item) of what ``self.load(oid)`` returns."""
    loaded = self.load(oid)
    return loaded[1]
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
    """Ask storages to check, within ``transaction``, that ``serial`` is
    still the current revision of ``oid``.

    A mismatch is reported later as ReadConflictError by the conflict
    handling code.

    :raises StorageTransactionError: if ``transaction`` is unknown.
    """
    txn_context = self._txn_container.get(transaction)
    if txn_context is not None:
        self._checkCurrentSerialInTransaction(txn_context, oid, serial)
        return
    raise StorageTransactionError(self, transaction)
def _checkCurrentSerialInTransaction(self, txn_context, oid, serial):
    """Send AskCheckCurrentSerial for ``oid``/``serial`` to the storage
    nodes of ``oid``.

    Registers ``oid`` in the transaction context the same way a store does.
    A CHECKED_SERIAL placeholder goes in data_dict (unless data is already
    there), so the conflict handling code knows no data was stored.
    """
    ttid = txn_context['ttid']
    txn_context['object_serial_dict'][oid] = serial
    # Placeholders, filled in as storage nodes answer.
    txn_context['object_stored_counter_dict'][oid] = {}
    # ZODB.Connection calls 'checkCurrentSerialInTransaction' after stores,
    # and skips oids that have been successfully stored.
    assert oid not in txn_context['cache_dict'], (oid, txn_context)
    txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL)
    packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
    queue = txn_context['queue']
    for node, conn in self.cp.iterateForObject(oid):
        try:
            conn.ask(packet, queue=queue)
        except ConnectionClosed:
            pass
    self._waitAnyTransactionMessage(txn_context, False)
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/cache.py 0000664 0000000 0000000 00000020237 11777665324 0026145 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2011-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import math
class CacheItem(object):
    """One cached revision of an object.

    Items of the same queue level are chained in a circular doubly-linked
    list through ``prev``/``next`` (see ClientCache).
    """

    __slots__ = ('oid', 'tid', 'next_tid', 'data',
                 'counter', 'level', 'expire',
                 'prev', 'next')

    def __repr__(self):
        # Show only attributes that are set and true. Neighbours and data
        # are elided to keep the output short and non-recursive.
        fields = []
        for name in self.__slots__:
            try:
                value = getattr(self, name)
                if not value:
                    continue
                if name in ('prev', 'next'):
                    fields.append(' %s=<...>' % name)
                else:
                    if name == 'data':
                        value = '...'
                    fields.append(' %s=%r' % (name, value))
            except AttributeError:
                pass
        return '<%s%s>' % (self.__class__.__name__, ''.join(fields))


class ClientCache(object):
    """In-memory pickle cache based on Multi-Queue cache algorithm

    Multi-Queue algorithm for Second Level Buffer Caches:
    http://www.usenix.org/event/usenix01/full_papers/zhou/zhou_html/index.html

    Quick description:
    - There are multiple "regular" queues, plus a history queue
    - The queue to store an object in depends on its access frequency
    - The queue an object is in defines its lifespan (higher-index queue eq.
      longer lifespan)
      -> The more often an object is accessed, the higher lifespan it will
         have
    - Upon cache or history hit, object frequency is increased and object
      might get moved to longer-lived queue
    - Each access "ages" objects in cache, and an aging object is moved to
      shorter-lived queue as it ages without being accessed, or in the
      history queue if it's really too old.

    Implementation notes:
    - ``_queue_list[0]`` is the history queue (items without data); higher
      indexes are regular queues. Each entry is the head of a circular
      doubly-linked list of CacheItem, or None when empty.
    - ``_oid_dict`` maps an oid to the list of its cached revisions, kept
      sorted by tid; the current revision (next_tid None), if any, is last.
    - ``_size`` is the total size of cached data; ``_history_size`` is the
      number of items in the history queue.
    """

    __slots__ = ('_life_time', '_max_history_size', '_max_size',
                 '_queue_list', '_oid_dict', '_time', '_size', '_history_size')

    def __init__(self, life_time=10000, max_history_size=100000,
                       max_size=20*1024*1024):
        self._life_time = life_time
        self._max_history_size = max_history_size
        self._max_size = max_size
        self.clear()

    def clear(self):
        """Reset cache"""
        self._queue_list = [None] # first is history
        self._oid_dict = {}
        self._time = 0
        self._size = 0
        self._history_size = 0

    def _iterQueue(self, level):
        """for debugging purpose"""
        if level < len(self._queue_list):
            item = head = self._queue_list[level]
            if item:
                while 1:
                    yield item
                    item = item.next
                    if item is head:
                        break

    def _add(self, item):
        """Append ``item`` at the tail of queue ``item.level``.

        For a regular queue, the item's expiration time is set. For the
        history queue (level 0), the item's data is dropped, and when the
        history is full its oldest item is evicted completely.
        """
        level = item.level
        try:
            head = self._queue_list[level]
        except IndexError:
            # Levels only grow one at a time, so this is the next level.
            assert len(self._queue_list) == level
            self._queue_list.append(item)
            item.prev = item.next = item
        else:
            if head:
                item.prev = tail = head.prev
                tail.next = head.prev = item
                item.next = head
            else:
                self._queue_list[level] = item
                item.prev = item.next = item
        if level:
            item.expire = self._time + self._life_time
        else:
            self._size -= len(item.data)
            item.data = None
            if self._history_size < self._max_history_size:
                self._history_size += 1
            else:
                # History is full: forget its oldest item entirely.
                self._remove(head)
                item_list = self._oid_dict[head.oid]
                item_list.remove(head)
                if not item_list:
                    del self._oid_dict[head.oid]

    def _remove(self, item):
        """Unlink ``item`` from its queue and decrement ``item.level``.

        Does nothing for an item that is not queued (level None). Otherwise
        returns the item that followed it, or None if the queue is now
        empty.
        """
        level = item.level
        if level is not None:
            item.level = level - 1
            next = item.next
            if next is item:
                self._queue_list[level] = next = None
            else:
                item.prev.next = next
                next.prev = item.prev
                if self._queue_list[level] is item:
                    self._queue_list[level] = next
            return next

    def _fetched(self, item, _log=math.log):
        """Record an access to ``item`` and age the cache by one tick.

        The access counter is incremented and the item moves to the tail of
        queue ``1 + int(log2(counter))``. Then at most one expired queue
        head is moved down one level: _remove decrements the level, so
        re-adding it moves it down, into history from level 1.
        """
        self._remove(item)
        item.counter = counter = item.counter + 1
        # XXX It might be better to adjust the level according to the object
        # size. See commented factor for example.
        item.level = 1 + int(_log(counter, 2)
                             # * (1.01 - float(len(item.data)) / self._max_size)
                            )
        self._add(item)

        self._time = time = self._time + 1
        for head in self._queue_list[1:]:
            if head and head.expire < time:
                self._remove(head)
                self._add(head)
                break

    def _load(self, oid, before_tid=None):
        """Return the item of ``oid`` valid just before ``before_tid``.

        Without ``before_tid``, return the current revision. The item may
        be a history item (no data). Return None if no matching item is
        known.
        """
        item_list = self._oid_dict.get(oid)
        if item_list:
            if before_tid:
                # item_list is sorted by tid, so scan from the newest.
                for item in reversed(item_list):
                    if item.tid < before_tid:
                        next_tid = item.next_tid
                        if next_tid and next_tid < before_tid:
                            break
                        return item
            else:
                item = item_list[-1]
                if not item.next_tid:
                    return item

    def load(self, oid, before_tid=None):
        """Return a revision of oid that was current before given tid

        The result is ``(data, tid, next_tid)``, or None on cache miss
        (history items are misses).
        """
        item = self._load(oid, before_tid)
        if item:
            data = item.data
            if data is not None:
                self._fetched(item)
                return data, item.tid, item.next_tid

    def store(self, oid, data, tid, next_tid):
        """Store a new data record in the cache

        ``data`` is valid from ``tid`` up to ``next_tid`` (None for the
        current revision). Records not smaller than the whole cache are not
        cached. When the cache becomes too big, items are moved to history,
        starting from the lowest-level queue, until it fits.
        """
        size = len(data)
        max_size = self._max_size
        if size < max_size:
            item = self._load(oid, next_tid)
            if item:
                # Only a history item (no data, level 0) can be revived.
                assert not (item.data or item.level)
                assert item.tid == tid and item.next_tid == next_tid
                self._history_size -= 1
            else:
                item = CacheItem()
                item.oid = oid
                item.tid = tid
                item.next_tid = next_tid
                item.counter = 0
                item.level = None
                try:
                    item_list = self._oid_dict[oid]
                except KeyError:
                    self._oid_dict[oid] = [item]
                else:
                    if next_tid:
                        # Keep item_list sorted by tid: insert before the
                        # first newer revision, or append when there is
                        # none. (Previously the item was then inserted
                        # before the last element, breaking the order
                        # _load relies on.)
                        for i, x in enumerate(item_list):
                            if tid < x.tid:
                                break
                        else:
                            i = len(item_list)
                        item_list.insert(i, item)
                    else:
                        # New current revision: inherit the popularity of
                        # the previous one, which moves down to level 1.
                        prev = item_list[-1]
                        item.counter = prev.counter
                        prev.counter = 0
                        if prev.level > 1:
                            self._fetched(prev)
                        item_list.append(item)
            item.data = data
            self._fetched(item)
            self._size += size
            if max_size < self._size:
                for head in self._queue_list[1:]:
                    while head:
                        next = self._remove(head)
                        head.level = 0
                        self._add(head)
                        if self._size <= max_size:
                            return
                        head = next

    def invalidate(self, oid, tid):
        """Mark data record as being valid only up to given tid"""
        try:
            item = self._oid_dict[oid][-1]
        except KeyError:
            pass
        else:
            if item.next_tid is None:
                item.next_tid = tid
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/component.xml 0000664 0000000 0000000 00000003007 11777665324 0027250 0 ustar 00root root 0000000 0000000
A scalable storage for Zope
Give the list of master nodes, as ip:port ip:port...
Give the name of the cluster
If true, enable automatic data compression (compression is only used
when compressed size is smaller).
If true, only reads may be executed against the storage. Note
that the "pack" operation is not considered a write operation
and is still allowed on a read-only neostorage.
Log debugging information to specified SQLite DB.
The file designated by this option contains an updated list of master
nodes which are known to be part of the current cluster, so new nodes can
be added/removed without requiring a config change each time.
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/config.py 0000664 0000000 0000000 00000001672 11777665324 0026351 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from ZODB.config import BaseConfig
class NeoStorage(BaseConfig):
    """ZConfig section factory for a NEO client storage."""

    def open(self):
        """Build a Storage, passing every attribute of the configuration
        section as a keyword argument."""
        from .Storage import Storage
        section = self.config
        options = {}
        for name in section.getSectionAttributes():
            options[name] = getattr(section, name)
        return Storage(**options)
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/container.py 0000664 0000000 0000000 00000004603 11777665324 0027063 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from thread import get_ident
from neo.lib.locking import Queue
class ContainerBase(object):
    """Dictionary of contexts indexed by an id computed from call arguments.

    Subclasses define ``_getID`` (arguments -> key) and ``_new``
    (arguments -> fresh context).
    """

    def __init__(self):
        self._context_dict = {}

    def _getID(self, *args, **kw):
        raise NotImplementedError

    def _new(self, *args, **kw):
        raise NotImplementedError

    def delete(self, *args, **kw):
        """Remove the context matching the arguments (KeyError if absent)."""
        key = self._getID(*args, **kw)
        del self._context_dict[key]

    def get(self, *args, **kw):
        """Return the context matching the arguments, or None."""
        key = self._getID(*args, **kw)
        return self._context_dict.get(key)

    def new(self, *args, **kw):
        """Create, register and return a context for the arguments.

        Any existing context with the same id is replaced.
        """
        context = self._new(*args, **kw)
        self._context_dict[self._getID(*args, **kw)] = context
        return context
class ThreadContainer(ContainerBase):
    """Per-thread contexts, indexed by the current thread identifier."""

    def _getID(self):
        return get_ident()

    def _new(self):
        return dict(queue=Queue(0), answer=None)

    def get(self):
        """
        Return the calling thread's context, implicitly creating it if it
        doesn't exist yet.
        """
        context_dict = self._context_dict
        my_id = self._getID()
        if my_id not in context_dict:
            context_dict[my_id] = self._new()
        return context_dict[my_id]
class TransactionContainer(ContainerBase):
    """Per-transaction contexts, indexed by the identity of the transaction
    object."""

    def _getID(self, txn):
        return id(txn)

    def _new(self, txn):
        # Fresh state for a transaction; every mutable value is new.
        return dict(
            queue=Queue(0),
            txn=txn,
            ttid=None,
            data_dict={},
            data_size=0,
            cache_dict={},
            cache_size=0,
            object_base_serial_dict={},
            object_serial_dict={},
            object_stored_counter_dict={},
            conflict_serial_dict={},
            resolved_conflict_serial_dict={},
            txn_voted=False,
            involved_nodes=set(),
        )
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/exception.py 0000664 0000000 0000000 00000002407 11777665324 0027077 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from ZODB import POSException
class NEOStorageError(POSException.StorageError):
    """Base class of the errors raised by the NEO client storage."""
class NEOStorageNotFoundError(NEOStorageError):
    """Requested data (e.g. a transaction or an object revision) was not
    found."""
class NEOStorageDoesNotExistError(NEOStorageNotFoundError):
    """
    Refinement of NEOStorageNotFoundError: the object was not found, and it
    does not exist at all either.
    """
class NEOStorageCreationUndoneError(NEOStorageDoesNotExistError):
    """
    Refinement of NEOStorageDoesNotExistError: the object did exist at some
    point, but the transaction that created it was undone.
    """
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/handlers/ 0000775 0000000 0000000 00000000000 11777665324 0026324 5 ustar 00root root 0000000 0000000 neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/handlers/__init__.py 0000664 0000000 0000000 00000004720 11777665324 0030440 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from neo.lib.handler import EventHandler
from neo.lib.protocol import ProtocolError, Packets
class BaseHandler(EventHandler):
    """Base class for client-side EventHandler implementations."""

    def __init__(self, app):
        super(BaseHandler, self).__init__(app)
        self.dispatcher = app.dispatcher

    def dispatch(self, conn, packet, kw=None):
        """Dispatch ``packet`` with ``conn`` locked.

        Locking covers the case where the handler sends a response to the
        received packet.

        :param kw: extra keyword data for the handler. Defaults to a fresh
            dict for each call (it used to be a shared mutable default).
        """
        if kw is None:
            kw = {}
        conn.lock()
        try:
            super(BaseHandler, self).dispatch(conn, packet, kw)
        finally:
            conn.release()

    def packetReceived(self, conn, packet, kw=None):
        """Redirect all received packet to dispatcher thread.

        Responses (except Pong) go to the dispatcher, which routes them to
        the waiting request. Any other packet is handled here through
        dispatch().

        :param kw: extra keyword data; defaults to a fresh dict per call.
        :raises ProtocolError: for a response the dispatcher did not expect.
        """
        if kw is None:
            kw = {}
        if packet.isResponse() and type(packet) is not Packets.Pong:
            if not self.dispatcher.dispatch(conn, packet.getId(), packet, kw):
                raise ProtocolError('Unexpected response packet from %r: %r'
                                    % (conn, packet))
        else:
            self.dispatch(conn, packet, kw)

    def connectionLost(self, conn, new_state):
        self.app.dispatcher.unregister(conn)

    def connectionFailed(self, conn):
        self.app.dispatcher.unregister(conn)
def unexpectedInAnswerHandler(*args, **kw):
    """Fail loudly for an event that must never reach an answer handler.

    Accepts and ignores any arguments, so it can replace any EventHandler
    method.
    """
    message = 'Unexpected event in an answer handler'
    raise Exception(message)
class AnswerBaseHandler(EventHandler):
    """Handler meant for answers only.

    Every connection-level event and any packet delivered through
    packetReceived raise an exception (unexpectedInAnswerHandler).
    """

    connectionStarted = connectionCompleted = connectionFailed = \
        connectionAccepted = timeoutExpired = connectionClosed = \
        packetReceived = peerBroken = unexpectedInAnswerHandler
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/handlers/master.py 0000664 0000000 0000000 00000013310 11777665324 0030167 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from neo.lib import logging
from neo.lib.pt import MTPartitionTable as PartitionTable
from neo.lib.protocol import NodeTypes, NodeStates, ProtocolError
from neo.lib.util import dump
from . import BaseHandler, AnswerBaseHandler
from ..exception import NEOStorageError
class PrimaryBootstrapHandler(AnswerBaseHandler):
    """ Bootstrap handler used when looking for the primary master """

    def notReady(self, conn, message):
        # The contacted master is not ready: stop considering it as the
        # node being tried. NOTE(review): the retry policy lives in the
        # caller, which is not visible here.
        app = self.app
        app.trying_master_node = None

    def _acceptIdentification(self, node, uuid, num_partitions,
            num_replicas, your_uuid, primary, known_master_list):
        """Process the identification answer of a master node.

        - Every master in ``known_master_list`` is registered in the node
          manager, with its UUID when given.
        - If the answering master names a primary that is unknown, the
          answer is ignored.
        - If the primary is known, it becomes ``app.primary_master_node``;
          the connection is closed when that primary is not the node being
          tried.
        - If no primary is named, the connection is closed and the
          bootstrap state reset.
        - Otherwise the UUID assigned to us is recorded, a new partition
          table is created and ``app.master_conn`` is set.

        :raises ProtocolError: if the master did not give us a UUID.
        """
        app = self.app

        # Register new master nodes.
        found = False
        conn_address = node.getAddress()
        for node_address, node_uuid in known_master_list:
            if node_address == conn_address:
                # The answering master must list itself with its own UUID.
                assert uuid == node_uuid, (dump(uuid), dump(node_uuid))
                found = True
            n = app.nm.getByAddress(node_address)
            if n is None:
                n = app.nm.createMaster(address=node_address)
            if node_uuid is not None and n.getUUID() != node_uuid:
                n.setUUID(node_uuid)
        assert found, (node, dump(uuid), known_master_list)

        conn = node.getConnection()
        if primary is not None:
            primary_node = app.nm.getByAddress(primary)
            if primary_node is None:
                # I don't know such a node. Probably this information
                # is old. So ignore it.
                logging.warning('Unknown primary master: %s. Ignoring.',
                                primary)
                return
            else:
                if app.trying_master_node is not primary_node:
                    app.trying_master_node = None
                    conn.close()
                    # NOTE(review): execution continues below and sets
                    # app.master_conn to this closed connection; presumably
                    # the caller checks trying_master_node — confirm.
                app.primary_master_node = primary_node
        else:
            if app.primary_master_node is not None:
                # The primary master node is not a primary master node
                # any longer.
                app.primary_master_node = None

            app.trying_master_node = None
            conn.close()
            return

        # the master must give an UUID
        if your_uuid is None:
            raise ProtocolError('No UUID supplied')
        app.uuid = your_uuid
        logging.info('Got an UUID: %s', dump(app.uuid))

        # Always create partition table
        app.pt = PartitionTable(num_partitions, num_replicas)
        app.master_conn = conn

    def answerPartitionTable(self, conn, ptid, row_list):
        # Load the full partition table sent by the primary master.
        assert row_list
        self.app.pt.load(ptid, row_list, self.app.nm)

    def answerNodeInformation(self, conn):
        # Nothing to do: this answer only needs to be received.
        pass
class PrimaryNotificationsHandler(BaseHandler):
    """Handler processing the notifications sent by the primary master."""

    def connectionClosed(self, conn):
        # Forget the primary master when its connection goes away.
        app = self.app
        if app.master_conn is not None:
            logging.critical("connection to primary master node closed")
            app.master_conn = None
            app.primary_master_node = None
        super(PrimaryNotificationsHandler, self).connectionClosed(conn)

    def stopOperation(self, conn):
        # Only logged: no other action is taken here.
        logging.critical("master node ask to stop operation")

    def invalidateObjects(self, conn, tid, oid_list):
        # Invalidate the given oids at tid in the client cache and, when a
        # DB object is attached, in that DB too; all under the cache lock.
        app = self.app
        app._cache_lock_acquire()
        try:
            cache_invalidate = app._cache.invalidate
            for invalidated_oid in oid_list:
                cache_invalidate(invalidated_oid, tid)
            zodb = app.getDB()
            if zodb is not None:
                zodb.invalidate(tid, oid_list)
        finally:
            app._cache_lock_release()

    # For the two methods below, we must not use app._getPartitionTable()
    # to avoid a dead lock. It is safe to not check the master connection
    # because it's in the master handler, so the connection is already
    # established.
    def notifyPartitionChanges(self, conn, ptid, cell_list):
        # Changes are only applied once the table has been filled.
        pt = self.app.pt
        if pt.filled():
            pt.update(ptid, cell_list, self.app.nm)

    def notifyNodeInformation(self, conn, node_list):
        nm = self.app.nm
        nm.update(node_list)
        # XXX: 'update' automatically closes DOWN nodes. Do we really want
        # to do the same thing for nodes in other non-running states ?
        for _, _, node_uuid, node_state in node_list:
            if node_state != NodeStates.RUNNING:
                changed = nm.getByUUID(node_uuid)
                if changed and changed.isConnected():
                    changed.getConnection().close()
class PrimaryAnswersHandler(AnswerBaseHandler):
    """Handler for the answers expected from the primary master."""

    def answerBeginTransaction(self, conn, ttid):
        # Hand the temporary transaction id over as handler data.
        self.app.setHandlerData(ttid)

    def answerNewOIDs(self, conn, oid_list):
        # Keep a private, mutable copy of the allocated oids.
        self.app.new_oid_list = [new_oid for new_oid in oid_list]

    def answerTransactionFinished(self, conn, _, tid):
        self.app.setHandlerData(tid)

    def answerPack(self, conn, status):
        # A false status means a pack is already running.
        if status:
            return
        raise NEOStorageError('Already packing')

    def answerLastTransaction(self, conn, ltid):
        self.app.setHandlerData(ltid)
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/handlers/storage.py 0000664 0000000 0000000 00000020364 11777665324 0030347 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB.TimeStamp import TimeStamp
from ZODB.POSException import ConflictError
from neo.lib import logging
from neo.lib.protocol import NodeTypes, ProtocolError, LockState, ZERO_TID
from neo.lib.util import dump
from neo.lib.exception import NodeNotReady
from . import BaseHandler, AnswerBaseHandler
from ..exception import NEOStorageError, NEOStorageNotFoundError
from ..exception import NEOStorageDoesNotExistError
class StorageEventHandler(BaseHandler):
    """Connection event handler for connections to storage nodes."""

    def _dropNodeFromPool(self, conn):
        # Look up the storage node behind conn and remove its pooled
        # connection.
        app = self.app
        storage_node = app.nm.getByAddress(conn.getAddress())
        assert storage_node is not None
        app.cp.removeConnection(storage_node)

    def connectionLost(self, conn, new_state):
        self._dropNodeFromPool(conn)
        self.app.dispatcher.unregister(conn)

    def connectionFailed(self, conn):
        # Connection to a storage node failed
        self._dropNodeFromPool(conn)
        super(StorageEventHandler, self).connectionFailed(conn)
class StorageBootstrapHandler(AnswerBaseHandler):
    """Answer handler used while identifying to a storage node."""

    def notReady(self, conn, message):
        # Surface the refusal to the code that asked for identification.
        raise NodeNotReady(message)

    def _acceptIdentification(self, node,
            uuid, num_partitions, num_replicas, your_uuid, primary,
            master_list):
        # The storage must agree with us on who the primary master is.
        app = self.app
        assert primary == app.primary_master_node.getAddress(), (
            primary, app.primary_master_node)
        node.setUUID(uuid)
class StorageAnswersHandler(AnswerBaseHandler):
    """Handler for answers to ZODB operations sent to storage nodes."""

    def answerObject(self, conn, oid, start_serial, end_serial,
            compression, checksum, data, data_serial):
        # data_serial is received but not passed on as handler data.
        self.app.setHandlerData((oid, start_serial, end_serial,
                compression, checksum, data))

    def answerStoreObject(self, conn, conflicting, oid, serial):
        txn_context = self.app.getHandlerData()
        # serial -> set of storage UUIDs that accepted the store of oid
        object_stored_counter_dict = txn_context[
            'object_stored_counter_dict'][oid]
        if conflicting:
            # Warning: if a storage (S1) is much faster than another (S2), then
            # we may process entirely a conflict with S1 (i.e. we received the
            # answer to the store of the resolved object on S1) before we
            # receive the conflict answer from the first store on S2.
            logging.info('%r report a conflict for %r with %r',
                         conn, dump(oid), dump(serial))
            # If this conflict is not already resolved, mark it for
            # resolution.
            if serial not in txn_context[
                    'resolved_conflict_serial_dict'].get(oid, ()):
                if serial in object_stored_counter_dict and serial != ZERO_TID:
                    # A list (not a lazy map object) so that the message
                    # shows the UUIDs on both Python 2 and 3.
                    raise NEOStorageError('Storages %s accepted object %s'
                        ' for serial %s but %s reports a conflict for it.' % (
                        [dump(x) for x in object_stored_counter_dict[serial]],
                        dump(oid), dump(serial), dump(conn.getUUID())))
                conflict_serial_dict = txn_context['conflict_serial_dict']
                conflict_serial_dict.setdefault(oid, set()).add(serial)
        else:
            uuid_set = object_stored_counter_dict.get(serial)
            if uuid_set is None: # store to first storage node
                object_stored_counter_dict[serial] = uuid_set = set()
                try:
                    data = txn_context['data_dict'].pop(oid)
                except KeyError: # multiple undo
                    assert txn_context['cache_dict'][oid] is None, oid
                else:
                    if type(data) is str:
                        size = len(data)
                        txn_context['data_size'] -= size
                        size += txn_context['cache_size']
                        if size < self.app._cache._max_size:
                            txn_context['cache_size'] = size
                        else:
                            # Do not cache data past cache max size, as it
                            # would just flush it on tpc_finish. This also
                            # prevents memory errors for big transactions.
                            data = None
                    txn_context['cache_dict'][oid] = data
            else: # replica
                assert oid not in txn_context['data_dict'], oid
            uuid_set.add(conn.getUUID())

    answerCheckCurrentSerial = answerStoreObject

    def answerStoreTransaction(self, conn, _):
        pass

    def answerTIDsFrom(self, conn, tid_list):
        logging.debug('Get %u TIDs from %r', len(tid_list), conn)
        self.app.setHandlerData(tid_list)

    def answerTransactionInformation(self, conn, tid,
                                           user, desc, ext, packed, oid_list):
        self.app.setHandlerData(({
            'time': TimeStamp(tid).timeTime(),
            'user_name': user,
            'description': desc,
            'id': tid,
            'oids': oid_list,
            'packed': packed,
        }, ext))

    def answerObjectHistory(self, conn, _, history_list):
        # history_list is a list of tuple (serial, size)
        self.app.setHandlerData(history_list)

    def oidNotFound(self, conn, message):
        # This can happen either when :
        # - loading an object
        # - asking for history
        raise NEOStorageNotFoundError(message)

    def oidDoesNotExist(self, conn, message):
        raise NEOStorageDoesNotExistError(message)

    def tidNotFound(self, conn, message):
        # This can happen when requiring txn informations
        raise NEOStorageNotFoundError(message)

    def answerTIDs(self, conn, tid_list, tid_set):
        tid_set.update(tid_list)

    def answerObjectUndoSerial(self, conn, object_tid_dict,
            undo_object_tid_dict):
        undo_object_tid_dict.update(object_tid_dict)

    def answerHasLock(self, conn, oid, status):
        store_msg_id = self.app.getHandlerData()['timeout_dict'].pop(oid)
        if status == LockState.GRANTED_TO_OTHER:
            # Stop expecting the timed-out store request.
            self.app.dispatcher.forget(conn, store_msg_id)
            # Object is locked by another transaction, and we have waited until
            # timeout. To avoid a deadlock, abort current transaction (we might
            # be locking objects the other transaction is waiting for).
            raise ConflictError('Lock wait timeout for oid %s on %r' % (
                dump(oid), conn))
        # HasLock design required that storage is multi-threaded so that
        # it can answer to AskHasLock while processing store requests.
        # This means that the 2 cases (granted to us or nobody) are legitimate,
        # either because it gave us the lock but is/was slow to store our data,
        # or because the storage took a lot of time processing a previous
        # store (and did not even consider our lock request).
        # XXX: But storage nodes are still mono-threaded, so they should
        #      only answer with GRANTED_TO_OTHER (if they reply!), except
        #      maybe in very rare cases of race condition. Only log for now.
        #      This also means that most of the time, if the storage is slow
        #      to process some store requests, HasLock will timeout in turn
        #      and the connector will be closed.
        #      Anyway, it's not clear that HasLock requests are useful.
        #      Are store requests potentially long to process ? If not,
        #      we should simply raise a ConflictError on store timeout.
        logging.info('Store of oid %s delayed (storage overload ?)', dump(oid))

    def alreadyPendingError(self, conn, message):
        pass
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/iterator.py 0000664 0000000 0000000 00000011265 11777665324 0026734 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage
from zope.interface import implements
import ZODB.interfaces
from neo.lib.util import u64, add64
from .exception import NEOStorageCreationUndoneError, NEOStorageNotFoundError
# Chunk size that Iterator.next passes to app.transactionLog() (presumably
# the maximum number of transactions fetched per call).
CHUNK_LENGTH = 100
class Record(BaseStorage.DataRecord):
    """ BaseStorage Transaction record yielded by the Transaction object """

    def __init__(self, oid, tid, data, prev):
        BaseStorage.DataRecord.__init__(self, oid, tid, data, prev)

    def __str__(self):
        """Return 'Record <oid>:<tid>: <data size> (<previous tid>)'.

        The size is shown as None when data is None. Transaction.next
        builds such records when the object creation was undone, and
        len(None) would raise TypeError.
        """
        oid = u64(self.oid)
        tid = u64(self.tid)
        data = self.data
        size = None if data is None else len(data)
        args = (oid, tid, size, self.data_txn)
        return 'Record %s:%s: %s (%s)' % args
class Transaction(BaseStorage.TransactionRecord):
    """Transaction record yielded by the NEO iterator.

    Iterating over it yields one Record per oid of oid_list, loaded
    at this transaction's tid.
    """

    def __init__(self, app, tid, status, user, desc, ext, oid_list,
            prev_serial_dict):
        BaseStorage.TransactionRecord.__init__(self, tid, status, user, desc,
            ext)
        self.app = app
        self.oid_list = oid_list
        self.oid_index = 0
        self.history = []
        # oid -> tid of the previous record of that oid (shared with the
        # Iterator that created this transaction)
        self.prev_serial_dict = prev_serial_dict

    def __iter__(self):
        return self

    def next(self):
        """Return the next data record of this transaction.

        Oids whose data cannot be found any more are removed from
        oid_list and skipped. Once all oids are consumed, the position is
        rewound and StopIteration is raised.
        """
        app = self.app
        tid = self.tid
        oid_list = self.oid_list
        position = self.oid_index
        while True:
            if position >= len(oid_list):
                # no more records for this transaction
                self.oid_index = 0
                raise StopIteration
            oid = oid_list[position]
            try:
                data, _, next_tid = app.load(oid, tid)
            except NEOStorageCreationUndoneError:
                data = next_tid = None
            except NEOStorageNotFoundError:
                # Transactions are not updated after a pack, so their object
                # will not be found in the database. Skip them.
                del oid_list[position]
                continue
            break
        self.oid_index = position + 1
        prev_serials = self.prev_serial_dict
        record = Record(oid, tid, data, prev_serials.get(oid))
        if next_tid is None:
            prev_serials.pop(oid, None)
        else:
            prev_serials[oid] = tid
        return record

    def __str__(self):
        return 'Transaction #%s: %s %s' % (u64(self.tid), self.user,
                                           self.status)
class Iterator(object):
    """Iterator over the transactions of a NEO storage.

    Transactions are fetched from the application by chunks of
    CHUNK_LENGTH with app.transactionLog(start, stop, CHUNK_LENGTH), and
    each one is yielded as a Transaction object.
    """

    def __init__(self, app, start, stop):
        self.app = app
        self._txn_list = []
        assert None not in (start, stop)
        self._start = start
        self._stop = stop
        # index of current iteration
        self._index = 0
        self._closed = False
        # OID -> previous TID mapping
        # TODO: prune old entries while walking ?
        self._prev_serial_dict = {}

    def __iter__(self):
        return self

    def __getitem__(self, index):
        """Old-style sequence iteration: only the next index is allowed.

        Raises IndexError for any other index.
        """
        if index != self._index:
            raise IndexError(index)
        return self.next()

    def next(self):
        """Return a Transaction object for the next transaction.

        Raises IOError if the iterator was closed, and StopIteration when
        transactionLog returns an empty chunk.
        """
        if self._closed:
            raise IOError('iterator closed')
        if not self._txn_list:
            (max_tid, chunk) = self.app.transactionLog(self._start, self._stop,
                CHUNK_LENGTH)
            if not chunk:
                # nothing more
                raise StopIteration
            # Resume right after the last transaction of this chunk.
            self._start = add64(max_tid, 1)
            self._txn_list = chunk
        txn = self._txn_list.pop(0)
        self._index += 1
        tid = txn['id']
        user = txn['user_name']
        desc = txn['description']
        oid_list = txn['oids']
        extension = txn['ext']
        txn = Transaction(self.app, tid, ' ', user, desc, extension, oid_list,
            self._prev_serial_dict)
        return txn

    # Python 3 iterator protocol name for next().
    __next__ = next

    def __str__(self):
        return 'NEO transactions iterator'

    def close(self):
        self._closed = True
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/poll.py 0000664 0000000 0000000 00000007176 11777665324 0026057 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from logging import DEBUG, ERROR
from threading import Thread, Event, enumerate as thread_enum
from neo.lib import logging
from neo.lib.locking import Lock
class _ThreadedPoll(Thread):
"""Polling thread."""
def __init__(self, em, **kw):
Thread.__init__(self, **kw)
self.em = em
self.daemon = True
self._stop = Event()
def run(self):
_log = logging.log
def log(*args, **kw):
# Ignore errors due to garbage collection on exit
try:
_log(*args, **kw)
except:
if not self.stopping():
raise
log(DEBUG, 'Started %s', self)
while not self.stopping():
try:
# XXX: Delay cannot be infinite here, because we need
# to check connection timeout and thread shutdown.
self.em.poll(1)
except:
log(ERROR, 'poll raised, retrying', exc_info=1)
log(DEBUG, 'Threaded poll stopped')
self._stop.clear()
def stop(self):
self._stop.set()
def stopping(self):
return self._stop.isSet()
class ThreadedPoll(object):
    """
    Wrapper for polling thread, just to be able to start it again when
    it stopped.
    """
    _thread = None
    _started = False

    def __init__(self, *args, **kw):
        lock = Lock()
        self._status_lock_acquire = lock.acquire
        self._status_lock_release = lock.release
        # Kept to build replacement _ThreadedPoll instances on restart.
        self._args = args
        self._kw = kw
        self.newThread()

    def newThread(self):
        self._thread = _ThreadedPoll(*self._args, **self._kw)

    def start(self):
        """
        Start thread if not started or restart it if it's shutting down.
        """
        # TODO: a refcount-based approach would be better, but more intrusive.
        self._status_lock_acquire()
        try:
            thread = self._thread
            if not self._started:
                self._started = True
                if thread.stopping():
                    # stop() was called before the thread ever ran. Joining
                    # an unstarted thread raises RuntimeError, and starting
                    # it would make it exit at once: use a fresh thread.
                    self.newThread()
                self._thread.start()
                return
            if thread.stopping():
                # XXX: ideally, we should wake thread up here, to be sure not
                #      to wait forever.
                thread.join()
            if not thread.is_alive():
                self.newThread()
                self._thread.start()
        finally:
            self._status_lock_release()

    def stop(self):
        self._status_lock_acquire()
        try:
            self._thread.stop()
        finally:
            self._status_lock_release()

    def __getattr__(self, key):
        # Delegate everything else to the current polling thread.
        return getattr(self._thread, key)

    def __repr__(self):
        return repr(self._thread)
def psThreadedPoll(log=None):
    """
    Logs alive ThreadedPoll threads.

    log defaults to logging.debug and is called with a %-style format and
    its arguments, once per polling thread.
    """
    if log is None:
        log = logging.debug
    for thread in thread_enum():
        # threading.enumerate() yields the real Thread objects, i.e.
        # _ThreadedPoll instances. The ThreadedPoll wrapper is not a Thread
        # and never appears there, so filtering on it matched nothing.
        if not isinstance(thread, _ThreadedPoll):
            continue
        log('Thread %s at 0x%x, %s', thread.name, id(thread),
            'stopping' if thread.stopping() else 'running')
neoppod-23fad3af8aeb9cec4382f8d8e08ac3b8c6219364-neo-client/neo/client/pool.py 0000664 0000000 0000000 00000014737 11777665324 0026063 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2012 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
from random import shuffle
from neo.lib import logging
from neo.lib.locking import RLock
from neo.lib.protocol import NodeTypes, Packets
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.profiling import profiler_decorator
from neo.lib.exception import NodeNotReady
from .exception import NEOStorageError
# Number of seconds during which a storage node whose connection failed
# keeps the low CELL_FAILED priority when cells are sorted.
MAX_FAILURE_AGE = 600

# Sort keys for cell lists (cells with lower keys are tried first):
# - CELL_CONNECTED: already connected to the storage node hosting the cell
# - CELL_GOOD: normal priority
# - CELL_FAILED: storage node hosting the cell failed recently
CELL_CONNECTED, CELL_GOOD, CELL_FAILED = -1, 0, 1
class ConnectionPool(object):
    """Pool of client connections to storage nodes, keyed by node UUID."""

    def __init__(self, app, max_pool_size=25):
        self.app = app
        self.max_pool_size = max_pool_size
        # node UUID -> connection
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        pool_lock = RLock()
        self.connection_lock_acquire = pool_lock.acquire
        self.connection_lock_release = pool_lock.release
        # node UUID -> time until which the node is considered failed
        self.node_failure_dict = {}

    @profiler_decorator
    def _initNodeConnection(self, node):
        """Connect and identify to a storage node.

        Returns the new connection, or None (after recording a failure
        for node) if the connection closed or the node was not ready.
        """
        app = self.app
        logging.debug('trying to connect to %s - %s', node, node.getState())
        conn = MTClientConnection(app.em, app.storage_event_handler, node,
            connector=app.connector_handler(), dispatcher=app.dispatcher)
        request = Packets.RequestIdentification(NodeTypes.CLIENT,
            app.uuid, None, app.name)
        try:
            app._ask(conn, request, handler=app.storage_bootstrap_handler)
        except ConnectionClosed:
            logging.error('Connection to %r failed', node)
        except NodeNotReady:
            logging.info('%r not ready', node)
        else:
            logging.info('Connected %r', node)
            return conn
        self.notifyFailure(node)
        return None

    @profiler_decorator
    def _dropConnections(self):
        """Close idle connections until the pool fits in max_pool_size.

        A connection is idle when it has nothing pending and is not
        registered in the dispatcher.
        """
        for _, conn in list(self.connection_dict.items()):
            conn.lock()
            try:
                if conn.pending() or self.app.dispatcher.registered(conn):
                    continue
                del self.connection_dict[conn.getUUID()]
                conn.close()
                logging.debug('_dropConnections: connection to '
                    'storage node %s:%d closed', *conn.getAddress())
                if len(self.connection_dict) <= self.max_pool_size:
                    break
            finally:
                conn.unlock()

    @profiler_decorator
    def notifyFailure(self, node):
        # Deprioritize node for the next MAX_FAILURE_AGE seconds.
        self._notifyFailure(node.getUUID(), time.time() + MAX_FAILURE_AGE)

    def _notifyFailure(self, uuid, at):
        self.node_failure_dict[uuid] = at

    @profiler_decorator
    def getCellSortKey(self, cell):
        return self._getCellSortKey(cell.getUUID(), time.time())

    def _getCellSortKey(self, uuid, now):
        # Connected nodes first, recently failed nodes last.
        if uuid in self.connection_dict:
            return CELL_CONNECTED
        failed_until = self.node_failure_dict.get(uuid)
        if failed_until is not None and now <= failed_until:
            return CELL_FAILED
        return CELL_GOOD

    @profiler_decorator
    def getConnForCell(self, cell):
        return self.getConnForNode(cell.getNode())

    def iterateForObject(self, object_id, readable=False):
        """Yield (node, connection) pairs for the nodes storing an object.

        object_id is either an oid (str) or a partition number. Raises
        NEOStorageError if the partition has no cell at all. Cells whose
        node is still running but could not be connected are retried after
        a 1 second pause.
        """
        pt = self.app.getPartitionTable()
        if type(object_id) is str:
            object_id = pt.getPartition(object_id)
        pending_cells = pt.getCellList(object_id, readable)
        if not pending_cells:
            raise NEOStorageError('no storage available')
        connect = self.getConnForNode
        while pending_cells:
            retry_cells = []
            pending_cells = [cell for cell in pending_cells
                             if cell.getNode().isRunning()]
            # Shuffle first: the stable sort keeps a random order among
            # cells with the same priority.
            shuffle(pending_cells)
            pending_cells.sort(key=self.getCellSortKey)
            for cell in pending_cells:
                node = cell.getNode()
                conn = connect(node)
                if conn is not None:
                    yield (node, conn)
                elif node.isRunning():
                    retry_cells.append(cell)
            pending_cells = retry_cells
            if retry_cells:
                # wait a bit to avoid a busy loop
                time.sleep(1)

    @profiler_decorator
    def getConnForNode(self, node):
        """Return the pooled connection to node, creating it if needed.

        Returns None if node is not running or if connecting failed.
        """
        if not node.isRunning():
            return None
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            conn = self.connection_dict.get(uuid)
            if conn is None:
                if len(self.connection_dict) > self.max_pool_size:
                    # must drop some unused connections
                    self._dropConnections()
                # Create new connection to node
                conn = self._initNodeConnection(node)
                if conn is not None:
                    self.connection_dict[uuid] = conn
            return conn
        finally:
            self.connection_lock_release()

    @profiler_decorator
    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken."""
        self.connection_dict.pop(node.getUUID(), None)

    def flush(self):
        """Remove all connections"""
        self.connection_dict.clear()