Commit 728f5faa authored by Tim Peters's avatar Tim Peters

Merge pycon-multidb branch (-r 29573:29605).

This introduces a "multi-database" concept (a simplification
of Jim's Wiki proposal), and adds many interface definitions.

Work done during the PyCon 2005 ZODB sprint, by Christian
Theune, Jim Fulton and Tim Peters.
parent f0498037
......@@ -252,6 +252,12 @@ class BaseStorage(UndoLogCompatible):
pass
def tpc_finish(self, transaction, f=None):
# It's important that the storage calls the function we pass
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._lock_acquire()
try:
if transaction is not self._transaction:
......
......@@ -23,6 +23,12 @@ from time import time
from persistent import PickleCache
# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from transaction.interfaces import IDataManager
from zope.interface import implements
import transaction
from ZODB.ConflictResolution import ResolvedSerial
......@@ -31,12 +37,9 @@ from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference, \
ConnectionStateError
from ZODB.TmpStore import TmpStore
from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
from ZODB.interfaces import IConnection
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements
from ZODB.utils import u64, oid_repr, z64, positive_id, \
DEPRECATED_ARGUMENT, deprecated36
global_reset_counter = 0
......@@ -54,127 +57,19 @@ def resetCaches():
global_reset_counter += 1
class Connection(ExportImport, object):
"""Connection to ZODB for loading and storing objects.
The Connection object serves as a data manager. The root() method
on a Connection returns the root object for the database. This
object and all objects reachable from it are associated with the
Connection that loaded them. When a transaction commits, it uses
the Connection to store modified objects.
Typical use of ZODB is for each thread to have its own
Connection and that no thread should have more than one Connection
to the same database. A thread is associated with a Connection by
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
database. At transaction boundaries, these copies are updated to
reflect the current state of the database.
You should not instantiate this class directly; instead call the
open() method of a DB instance.
In many applications, root() is the only method of the Connection
that you will need to use.
Synchronization
---------------
A Connection instance is not thread-safe. It is designed to
support a thread model where each thread has its own transaction.
If an application has more than one thread that uses the
connection or the transaction the connection is registered with,
the application should provide locking.
The Connection manages movement of objects in and out of object
storage.
TODO: We should document an intended API for using a Connection via
multiple threads.
TODO: We should explain that the Connection has a cache and that
multiple calls to get() will return a reference to the same
object, provided that one of the earlier objects is still
referenced. Object identity is preserved within a connection, but
not across connections.
TODO: Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. By default, the mvcc and synch
options are enabled by default.
If you pass mvcc=True to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
:Groups:
- `User Methods`: root, get, add, close, db, sync, isReadOnly,
cacheGC, cacheFullSweep, cacheMinimize, getVersion,
modifiedInVersion
- `Experimental Methods`: setLocalTransaction, getTransaction,
onCloseCallbacks
- `Transaction Data Manager Methods`: tpc_begin, tpc_vote,
tpc_finish, tpc_abort, sortKey, abort, commit, commit_sub,
abort_sub
- `Database Invalidation Methods`: invalidate, _setDB
- `IPersistentDataManager Methods`: setstate, register,
setklassstate
- `Other Methods`: oldstate, exchange, getDebugInfo, setDebugInfo,
getTransferCounts
"""Connection to ZODB for loading and storing objects."""
"""
implements(IConnection)
implements(IConnection, IDataManager, IPersistentDataManager)
_tmp = None
_code_timestamp = 0
# ZODB.IConnection
def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
instance that it is connected to.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `cache_size`: the target size of the in-memory object
cache, measured in objects.
- `cache_deactivate_after`: deprecated, ignored
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
"""Create a new Connection."""
self._log = logging.getLogger("ZODB.Connection")
self._storage = None
self._debug_info = ()
......@@ -220,7 +115,7 @@ class Connection(ExportImport, object):
# from a single transaction should be applied atomically, so
# the lock must be held when reading _invalidated.
# It sucks that we have to hold the lock to read _invalidated.
# It sucks that we have to hold the lock to read _invalidated.
# Normally, _invalidated is written by calling dict.update, which
# will execute atomically by virtue of the GIL. But some storage
# might generate oids where hash or compare invokes Python code. In
......@@ -253,79 +148,20 @@ class Connection(ExportImport, object):
# to pass to _importDuringCommit().
self._import = None
def getTransaction(self):
"""Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread.
:deprecated:
Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def _cache_items(self):
# find all items on the lru list
items = self._cache.lru_items()
# fine everything. some on the lru list, some not
everything = self._cache.cache_data
# remove those items that are on the lru list
for k,v in items:
del everything[k]
# return a list of [ghosts....not recently used.....recently used]
return everything.items() + items
self.connections = None
def __repr__(self):
if self._version:
ver = ' (in version %s)' % `self._version`
else:
ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver)
def get_connection(self, database_name):
"""Return a Connection for the named database."""
connection = self.connections.get(database_name)
if connection is None:
new_con = self._db.databases[database_name].open()
self.connections.update(new_con.connections)
new_con.connections = self.connections
connection = new_con
return connection
def get(self, oid):
"""Return the persistent object with oid 'oid'.
If the object was not in the cache and the object's class is
ghostable, then a ghost will be returned. If the object is
already in the cache, a reference to the cached object will be
returned.
Applications seldom need to call this method, because objects
are loaded transparently during attribute lookup.
:return: persistent object corresponding to `oid`
:Parameters:
- `oid`: an object id
:Exceptions:
- `KeyError`: if oid does not exist. It is possible that an
object does not exist as of the current transaction, but
existed in the past. It may even exist again in the
future, if the transaction that removed it is undone.
- `ConnectionStateError`: if the connection is closed.
"""
"""Return the persistent object with oid 'oid'."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
......@@ -347,33 +183,8 @@ class Connection(ExportImport, object):
self._cache[oid] = obj
return obj
# deprecate this method?
__getitem__ = get
def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid.
A persistent object is normally added to the database and
assigned an oid when it becomes reachable to an object already in
the database. In some cases, it is useful to create a new
object and use its oid (_p_oid) in a single transaction.
This method assigns a new oid regardless of whether the object
is reachable.
The object is added when the transaction commits. The object
must implement the IPersistent interface and must not
already be associated with a Connection.
:Parameters:
- `obj`: a Persistent object
:Exceptions:
- `TypeError`: if obj is not a persistent object.
- `InvalidObjectReference`: if obj is already associated
with another connection.
- `ConnectionStateError`: if the connection is closed.
"""
"""Add a new object 'obj' to the database and assign it an oid."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
......@@ -397,72 +208,11 @@ class Connection(ExportImport, object):
raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self):
# If two connections use the same storage, give them a
# consistent order using id(). This is unique for the
# lifetime of a connection, which is good enough.
return "%s:%s" % (self._sortKey(), id(self))
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.
If the global module function resetCaches() was called, the
cache will be cleared.
:Parameters:
- `odb`: database that owns the Connection
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
self._db = odb
self._storage = odb._storage
self._sortKey = odb._storage.sortKey
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
def _resetCache(self):
"""Creates a new cache, discarding the old one.
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
"""Return a consistent sort key for this connection."""
return "%s:%s" % (self._storage.sortKey(), id(self))
def abort(self, transaction):
"""Abort modifications to registered objects.
This tells the cache to invalidate changed objects. _p_jar
and _p_oid are deleted from new objects.
"""
"""Abort a transaction and forget all changes."""
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
......@@ -475,70 +225,22 @@ class Connection(ExportImport, object):
self._tpc_cleanup()
# Should there be a way to call incrgc directly?
# Perhaps "full sweep" should do that?
# TODO: we should test what happens when these methods are called
# mid-transaction.
def cacheFullSweep(self, dt=None):
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
it into a ghost. It is possible for individual objects to
remain active.
:Parameters:
- `dt`: ignored. It is provided only for backwards compatibility.
"""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
# TODO: we should test what happens when cacheGC is called mid-transaction.
def cacheGC(self):
"""Reduce cache size to target size.
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
"""
"""Reduce cache size to target size."""
self._cache.incrgc()
__onCloseCallbacks = None
def onCloseCallback(self, f):
"""Register a callable, f, to be called by close().
The callable, f, will be called at most once, the next time
the Connection is closed.
:Parameters:
- `f`: object that will be called on `close`
"""
"""Register a callable, f, to be called by close()."""
if self.__onCloseCallbacks is None:
self.__onCloseCallbacks = []
self.__onCloseCallbacks.append(f)
def close(self):
"""Close the Connection.
A closed Connection should not be used by client code. It
can't load or store objects. Objects in the cache are not
freed, because Connections are re-used and the cache are
expected to be useful to the next client.
When the Connection is closed, all callbacks registered by
onCloseCallback() are invoked and the cache is scanned for
old objects.
"""
"""Close the Connection."""
if not self._needs_to_join:
# We're currently joined to a transaction.
raise ConnectionStateError("Cannot close a connection joined to "
......@@ -575,7 +277,10 @@ class Connection(ExportImport, object):
# assert that here, because self may have been reused (by
# another thread) by the time we get back here.
# transaction.interfaces.IDataManager
def commit(self, transaction):
"""Commit changes to an object"""
if self._import:
# TODO: This code seems important for Zope, but needs docs
# to explain why.
......@@ -653,7 +358,8 @@ class Connection(ExportImport, object):
self._handle_serial(s, oid)
def commit_sub(self, t):
"""Commit all work done in all subtransactions for this transaction."""
"""Commit all changes made in subtransactions and begin 2-phase commit
"""
if self._tmp is None:
return
src = self._storage
......@@ -674,7 +380,7 @@ class Connection(ExportImport, object):
self._handle_serial(s, oid, change=False)
def abort_sub(self, t):
"""Abort work done in all subtransactions for this transaction."""
"""Discard all subtransaction data."""
if self._tmp is None:
return
src = self._storage
......@@ -685,7 +391,7 @@ class Connection(ExportImport, object):
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None):
"""Dissown any objects newly saved in an uncommitted transaction."""
"""Disown any objects newly saved in an uncommitted transaction."""
if creating is None:
creating = self._creating
self._creating = []
......@@ -697,42 +403,6 @@ class Connection(ExportImport, object):
del o._p_jar
del o._p_oid
def db(self):
return self._db
def getVersion(self):
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def isReadOnly(self):
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects is accessed by
the current transaction, the revision written before C{tid}
will be used.
The DB calls this method, even when the Connection is closed.
:Parameters:
- `tid`: the storage-level id of the transaction that committed
- `oids`: oids is a set of oids, represented as a dict with oids
as keys.
"""
self._inv_lock.acquire()
try:
if self._txn_time is None:
self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
# The next two methods are callbacks for transaction synchronization.
def beforeCompletion(self, txn):
......@@ -753,213 +423,35 @@ class Connection(ExportImport, object):
# Now is a good time to collect some garbage
self._cache.incrgc()
def modifiedInVersion(self, oid):
try:
return self._db.modifiedInVersion(oid)
except KeyError:
return self._version
def register(self, obj):
"""Register obj with the current transaction manager.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
obj must be an object loaded from this Connection.
"""
assert obj._p_jar is self
if obj._p_oid is None:
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
# The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
def root(self):
"""Return the database root object.
The root is a persistent.mapping.PersistentMapping.
"""
"""Return the database root object."""
return self.get(z64)
def setstate(self, obj):
oid = obj._p_oid
def db(self):
"""Returns a handle to the database this connection belongs to."""
return self._db
def isReadOnly(self):
"""Returns True if the storage for this connection is read only."""
if self._storage is None:
msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid))
self._log.error(msg)
raise ConnectionStateError(msg)
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
self._inv_lock.acquire()
try:
self._setstate(obj)
except ConflictError:
raise
except:
self._log.error("Couldn't load state for %s", oid_repr(oid),
exc_info=sys.exc_info())
raise
def _setstate(self, obj):
# Helper for setstate(), which provides logging of failures.
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
if self._txn_time is None:
self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
# If an object has been invalidated, there are several cases
# to consider:
# 1. Check _p_independent()
# 2. Try MVCC
# 3. Raise ConflictError.
# IDataManager
# Does anything actually use _p_independent()? It would simplify
# the code if we could drop support for it.
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if (obj._p_oid in self._invalidated
and not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(obj._p_oid, self._version)
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
return True
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
# Call _p_independent(), if it returns True, setstate() wins.
# Otherwise, raise a ConflictError.
if obj._p_independent():
self._inv_lock.acquire()
try:
try:
del self._invalidated[obj._p_oid]
except KeyError:
pass
finally:
self._inv_lock.release()
else:
self._conflicts[obj._p_oid] = 1
self._register(obj)
raise ReadConflictError(object=obj)
def oldstate(self, obj, tid):
"""Return copy of obj that was written by tid.
The returned object does not have the typical metadata
(_p_jar, _p_oid, _p_serial) set. I'm not sure how references
to other peristent objects are handled.
:return: a persistent object
:Parameters:
- `obj`: a persistent object from this Connection.
- `tid`: id of a transaction that wrote an earlier revision.
:Exceptions:
- `KeyError`: if tid does not exist or if tid deleted a revision
of obj.
"""
assert obj._p_jar is self
p = self._storage.loadSerial(obj._p_oid, tid)
return self._reader.getState(p)
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def tpc_begin(self, transaction, sub=False):
self._modified = []
def tpc_begin(self, transaction, sub=False):
"""Begin commit of a transaction, starting the two-phase commit."""
self._modified = []
# _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts.
......@@ -972,6 +464,7 @@ class Connection(ExportImport, object):
self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction):
"""Verify that a data manager can commit the transaction."""
try:
vote = self._storage.tpc_vote
except AttributeError:
......@@ -1022,12 +515,7 @@ class Connection(ExportImport, object):
obj._p_serial = serial
def tpc_finish(self, transaction):
# It's important that the storage calls the function we pass
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
"""Indicate confirmation that the transaction is done."""
if self._tmp is not None:
# Commiting a subtransaction!
# There is no need to invalidate anything.
......@@ -1044,6 +532,7 @@ class Connection(ExportImport, object):
self._tpc_cleanup()
def tpc_abort(self, transaction):
"""Abort a transaction."""
if self._import:
self._import = None
self._storage.tpc_abort(transaction)
......@@ -1055,16 +544,16 @@ class Connection(ExportImport, object):
del obj._p_jar
self._tpc_cleanup()
# Common cleanup actions after tpc_finish/tpc_abort.
def _tpc_cleanup(self):
"""Performs cleanup operations to support tpc_finish and tpc_abort."""
self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
self._needs_to_join = True
self._registered_objects = []
def sync(self):
"""Manually update the view on the database."""
self._txn_mgr.get().abort()
sync = getattr(self._storage, 'sync', 0)
if sync:
......@@ -1072,22 +561,304 @@ class Connection(ExportImport, object):
self._flush_invalidations()
def getDebugInfo(self):
"""Returns a tuple with different items for debugging the
connection.
"""
return self._debug_info
def setDebugInfo(self, *args):
"""Add the given items to the debug information of this connection."""
self._debug_info = self._debug_info + args
def getTransferCounts(self, clear=False):
"""Returns the number of objects loaded and stored.
If clear is True, reset the counters.
"""
"""Returns the number of objects loaded and stored."""
res = self._load_count, self._store_count
if clear:
self._load_count = 0
self._store_count = 0
return res
##############################################
# persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid):
"""Return copy of 'obj' that was written by transaction 'tid'."""
assert obj._p_jar is self
p = self._storage.loadSerial(obj._p_oid, tid)
return self._reader.getState(p)
def setstate(self, obj):
"""Turns the ghost 'obj' into a real object by loading it's from the
database."""
oid = obj._p_oid
if self._storage is None:
msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid))
self._log.error(msg)
raise ConnectionStateError(msg)
try:
self._setstate(obj)
except ConflictError:
raise
except:
self._log.error("Couldn't load state for %s", oid_repr(oid),
exc_info=sys.exc_info())
raise
def _setstate(self, obj):
# Helper for setstate(), which provides logging of failures.
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, there are several cases
# to consider:
# 1. Check _p_independent()
# 2. Try MVCC
# 3. Raise ConflictError.
# Does anything actually use _p_independent()? It would simplify
# the code if we could drop support for it.
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if (obj._p_oid in self._invalidated
and not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(obj._p_oid, self._version)
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
return True
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
# Call _p_independent(), if it returns True, setstate() wins.
# Otherwise, raise a ConflictError.
if obj._p_independent():
self._inv_lock.acquire()
try:
try:
del self._invalidated[obj._p_oid]
except KeyError:
pass
finally:
self._inv_lock.release()
else:
self._conflicts[obj._p_oid] = 1
self._register(obj)
raise ReadConflictError(object=obj)
def register(self, obj):
"""Register obj with the current transaction manager.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
obj must be an object loaded from this Connection.
"""
assert obj._p_jar is self
if obj._p_oid is None:
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
# The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
# PROTECTED stuff (used by e.g. ZODB.DB.DB)
def _cache_items(self):
# find all items on the lru list
items = self._cache.lru_items()
# fine everything. some on the lru list, some not
everything = self._cache.cache_data
# remove those items that are on the lru list
for k,v in items:
del everything[k]
# return a list of [ghosts....not recently used.....recently used]
return everything.items() + items
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.
If the global module function resetCaches() was called, the
cache will be cleared.
Parameters:
odb: database that owns the Connection
mvcc: boolean indicating whether MVCC is enabled
txn_mgr: transaction manager to use. None means
used the default transaction manager.
synch: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
self._db = odb
self._storage = odb._storage
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
# Multi-database support
self.connections = {self._db.database_name: self}
def _resetCache(self):
"""Creates a new cache, discarding the old one.
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
# Python protocol
def __repr__(self):
if self._version:
ver = ' (in version %s)' % `self._version`
else:
ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver)
# DEPRECATION candidates
__getitem__ = get
def modifiedInVersion(self, oid):
"""Returns the version the object with the given oid was modified in.
If it wasn't modified in a version, the current version of this
connection is returned.
"""
try:
return self._db.modifiedInVersion(oid)
except KeyError:
import pdb; pdb.set_trace()
return self.getVersion()
def getVersion(self):
"""Returns the version this connection is attached to."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid
......@@ -1096,3 +867,52 @@ class Connection(ExportImport, object):
new._p_changed = 1
self._register(new)
self._cache[oid] = new
# DEPRECATED methods
def getTransaction(self):
"""Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread.
:deprecated:
Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def cacheFullSweep(self, dt=None):
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache."""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
......@@ -27,6 +27,9 @@ from ZODB.serialize import referencesf
from ZODB.utils import WeakSet
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements
from ZODB.interfaces import IDatabase
import transaction
logger = logging.getLogger('ZODB.DB')
......@@ -178,6 +181,7 @@ class DB(object):
setCacheDeactivateAfter,
getVersionCacheDeactivateAfter, setVersionCacheDeactivateAfter
"""
implements(IDatabase)
klass = Connection # Class to use for connections
_activity_monitor = None
......@@ -188,6 +192,8 @@ class DB(object):
cache_deactivate_after=DEPRECATED_ARGUMENT,
version_pool_size=3,
version_cache_size=100,
database_name='unnamed',
databases=None,
version_cache_deactivate_after=DEPRECATED_ARGUMENT,
):
"""Create an object database.
......@@ -248,6 +254,16 @@ class DB(object):
storage.tpc_vote(t)
storage.tpc_finish(t)
# Multi-database setup.
if databases is None:
databases = {}
self.databases = databases
self.database_name = database_name
if database_name in databases:
raise ValueError("database_name %r already in databases" %
database_name)
databases[database_name] = self
# Pass through methods:
for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
'versionEmpty', 'versions']:
......@@ -565,7 +581,7 @@ class DB(object):
def get_info(c):
# `result`, `time` and `version` are lexically inherited.
o = c._opened
d = c._debug_info
d = c.getDebugInfo()
if d:
if len(d) == 1:
d = d[0]
......
......@@ -547,6 +547,7 @@ class FileStorage(BaseStorage.BaseStorage,
self._lock_release()
def load(self, oid, version):
"""Return pickle data and serial number."""
self._lock_acquire()
try:
pos = self._lookup_pos(oid)
......
......@@ -68,16 +68,16 @@
#
# - 8-byte data length
#
# ? 8-byte position of non-version data
# ? 8-byte position of non-version data record
# (if version length > 0)
#
# ? 8-byte position of previous record in this version
# (if version length > 0)
#
# ? version string
# ? version string
# (if version length > 0)
#
# ? data
# ? data
# (data length > 0)
#
# ? 8-byte position of data record containing data
......
......@@ -61,6 +61,9 @@ class TmpStore:
serial = h[:8]
return self._file.read(size), serial
def sortKey(self):
return self._storage.sortKey()
# TODO: clarify difference between self._storage & self._db._storage
def modifiedInVersion(self, oid):
......
......@@ -16,14 +16,122 @@
$Id$
"""
import zope.interface
from zope.interface import Interface, Attribute
class IConnection(zope.interface.Interface):
"""ZODB connection.
class IConnection(Interface):
"""Connection to ZODB for loading and storing objects.
TODO: This interface is incomplete.
The Connection object serves as a data manager. The root() method
on a Connection returns the root object for the database. This
object and all objects reachable from it are associated with the
Connection that loaded them. When a transaction commits, it uses
the Connection to store modified objects.
Typical use of ZODB is for each thread to have its own
Connection and that no thread should have more than one Connection
to the same database. A thread is associated with a Connection by
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
database. At transaction boundaries, these copies are updated to
reflect the current state of the database.
You should not instantiate this class directly; instead call the
open() method of a DB instance.
In many applications, root() is the only method of the Connection
that you will need to use.
Synchronization
---------------
A Connection instance is not thread-safe. It is designed to
support a thread model where each thread has its own transaction.
If an application has more than one thread that uses the
connection or the transaction the connection is registered with,
the application should provide locking.
The Connection manages movement of objects in and out of object
storage.
TODO: We should document an intended API for using a Connection via
multiple threads.
TODO: We should explain that the Connection has a cache and that
multiple calls to get() will return a reference to the same
object, provided that one of the earlier objects is still
referenced. Object identity is preserved within a connection, but
not across connections.
TODO: Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. By default, the mvcc and synch
options are enabled by default.
If you pass mvcc=True to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
Groups of methods:
User Methods:
root, get, add, close, db, sync, isReadOnly, cacheGC, cacheFullSweep,
cacheMinimize, getVersion, modifiedInVersion
Experimental Methods:
onCloseCallbacks
Database Invalidation Methods:
invalidate
Other Methods: exchange, getDebugInfo, setDebugInfo,
getTransferCounts
"""
def __init__(version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
instance that it is connected to.
Parameters:
version: the "version" that all changes will be made in, defaults
to no version.
cache_size: the target size of the in-memory object cache, measured
in objects.
mvcc: boolean indicating whether MVCC is enabled
txn_mgr: transaction manager to use. None means used the default
transaction manager.
synch: boolean indicating whether Connection should register for
afterCompletion() calls.
"""
def add(ob):
"""Add a new object 'obj' to the database and assign it an oid.
......@@ -38,4 +146,330 @@ class IConnection(zope.interface.Interface):
The object is added when the transaction commits. The object
must implement the IPersistent interface and must not
already be associated with a Connection.
Parameters:
obj: a Persistent object
Raises TypeError if obj is not a persistent object.
Raises InvalidObjectReference if obj is already associated with another
connection.
Raises ConnectionStateError if the connection is closed.
"""
def get(oid):
"""Return the persistent object with oid 'oid'.
If the object was not in the cache and the object's class is
ghostable, then a ghost will be returned. If the object is
already in the cache, a reference to the cached object will be
returned.
Applications seldom need to call this method, because objects
are loaded transparently during attribute lookup.
Parameters:
oid: an object id
Raises KeyError if oid does not exist.
It is possible that an object does not exist as of the current
transaction, but existed in the past. It may even exist again in
the future, if the transaction that removed it is undone.
Raises ConnectionStateError if the connection is closed.
"""
def cacheMinimize():
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
it into a ghost. It is possible for individual objects to
remain active.
"""
def cacheGC():
"""Reduce cache size to target size.
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
"""
def onCloseCallback(f):
"""Register a callable, f, to be called by close().
f will be called with no arguments before the Connection is closed.
Parameters:
f: method that will be called on `close`
"""
def close():
"""Close the Connection.
When the Connection is closed, all callbacks registered by
onCloseCallback() are invoked and the cache is garbage collected.
A closed Connection should not be used by client code. It can't load
or store objects. Objects in the cache are not freed, because
Connections are re-used and the cache is expected to be useful to the
next client.
"""
def db():
"""Returns a handle to the database this connection belongs to."""
def isReadOnly():
"""Returns True if the storage for this connection is read only."""
def invalidate(tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects are accessed by the
current transaction, the revision written before Connection.tid will be
used.
The DB calls this method, even when the Connection is closed.
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is a set of oids, represented as a dict with oids as keys.
"""
def root():
"""Return the database root object.
The root is a persistent.mapping.PersistentMapping.
"""
def getVersion():
"""Returns the version this connection is attached to."""
# Multi-database support.
connections = Attribute("""\
A mapping from database name to a Connection to that database.
In multi-database use, the Connections of all members of a database
collection share the same .connections object.
In single-database use, of course this mapping contains a single
entry.
""")
# TODO: should this accept all the arguments one may pass to DB.open()?
def get_connection(database_name):
"""Return a Connection for the named database.
This is intended to be called from an open Connection associated with
a multi-database. In that case, database_name must be the name of a
database within the database collection (probably the name of a
different database than is associated with the calling Connection
instance, but it's fine to use the name of the calling Connection
object's database). A Connection for the named database is
returned. If no connection to that database is already open, a new
Connection is opened. So long as the multi-database remains open,
passing the same name to get_connection() multiple times returns the
same Connection object each time.
"""
def sync():
"""Manually update the view on the database.
This includes aborting the current transaction, getting a fresh and
consistent view of the data (synchronizing with the storage if possible)
and call cacheGC() for this connection.
This method was especially useful in ZODB 3.2 to better support
read-only connections that were affected by a couple of problems.
"""
# Debug information
def getDebugInfo():
"""Returns a tuple with different items for debugging the connection.
Debug information can be added to a connection by using setDebugInfo.
"""
def setDebugInfo(*items):
"""Add the given items to the debug information of this connection."""
def getTransferCounts(clear=False):
"""Returns the number of objects loaded and stored.
If clear is True, reset the counters.
"""
class IDatabase(Interface):
"""ZODB DB.
TODO: This interface is incomplete.
"""
def __init__(storage,
pool_size=7,
cache_size=400,
version_pool_size=3,
version_cache_size=100,
database_name='unnamed',
databases=None,
):
"""Create an object database.
storage: the storage used by the database, e.g. FileStorage
pool_size: expected maximum number of open connections
cache_size: target size of Connection object cache, in number of
objects
version_pool_size: expected maximum number of connections (per
version)
version_cache_size: target size of Connection object cache for
version connections, in number of objects
database_name: when using a multi-database, the name of this DB
within the database group. It's a (detected) error if databases
is specified too and database_name is already a key in it.
This becomes the value of the DB's database_name attribute.
databases: when using a multi-database, a mapping to use as the
binding of this DB's .databases attribute. It's intended
that the second and following DB's added to a multi-database
pass the .databases attribute set on the first DB added to the
collection.
"""
databases = Attribute("""\
A mapping from database name to DB (database) object.
In multi-database use, all DB members of a database collection share
the same .databases object.
In single-database use, of course this mapping contains a single
entry.
""")
class IStorage(Interface):
"""A storage is responsible for storing and retrieving data of objects.
"""
def load(oid, version):
"""XXX"""
def close():
"""XXX"""
def cleanup():
"""XXX"""
def lastSerial():
"""XXX"""
def lastTransaction():
"""XXX"""
def lastTid(oid):
"""Return last serialno committed for object oid."""
def loadSerial(oid, serial):
"""XXX"""
def loadBefore(oid, tid):
"""XXX"""
def iterator(start=None, stop=None):
"""XXX"""
def sortKey():
"""XXX"""
def getName():
"""XXX"""
def getSize():
"""XXX"""
def history(oid, version, length=1, filter=None):
"""XXX"""
def new_oid(last=None):
"""XXX"""
def set_max_oid(possible_new_max_oid):
"""XXX"""
def registerDB(db, limit):
"""XXX"""
def isReadOnly():
"""XXX"""
def supportsUndo():
"""XXX"""
def supportsVersions():
"""XXX"""
def tpc_abort(transaction):
"""XXX"""
def tpc_begin(transaction):
"""XXX"""
def tpc_vote(transaction):
"""XXX"""
def tpc_finish(transaction, f=None):
"""XXX"""
def getSerial(oid):
"""XXX"""
def loadSerial(oid, serial):
"""XXX"""
def loadBefore(oid, tid):
"""XXX"""
def getExtensionMethods():
"""XXX"""
def copyTransactionsFrom():
"""XXX"""
def store(oid, oldserial, data, version, transaction):
"""
may return the new serial or not
"""
class IUndoableStorage(IStorage):
def undo(transaction_id, txn):
"""XXX"""
def undoInfo():
"""XXX"""
def undoLog(first, last, filter=None):
"""XXX"""
def pack(t, referencesf):
"""XXX"""
class IVersioningStorage(IStorage):
def abortVersion(src, transaction):
"""XXX"""
def commitVersion(src, dest, transaction):
"""XXX"""
def modifiedInVersion(oid):
"""XXX"""
def versionEmpty(version):
"""XXX"""
def versions(max=None):
"""XXX"""
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Multi-database tests
====================
Multi-database support adds the ability to tie multiple databases into a
collection. The original proposal is in the fishbowl:
http://www.zope.org/Wikis/ZODB/MultiDatabases/
It was implemented during the PyCon 2005 sprints, but in a simpler form,
by Jim Fulton, Christian Theune,and Tim Peters. Overview:
No private attributes were added, and one new method was introduced.
DB:
- a new .database_name attribute holds the name of this database
- a new .databases attribute maps from database name to DB object; all DBs
in a multi-database collection share the same .databases object
- the DB constructor has new optional arguments with the same names
(database_name= and databases=).
Connection:
- a new .connections attribute maps from database name to a Connection for
the database with that name; the .connections mapping object is also
shared among databases in a collection
- a new .get_connection(database_name) method returns a Connection for a
database in the collection; if a connection is already open, it's returned
(this is the value .connections[database_name]), else a new connection is
opened (and stored as .connections[database_name])
Creating a multi-database starts with creating a named DB:
>>> from ZODB.tests.test_storage import MinimalMemoryStorage
>>> from ZODB import DB
>>> dbmap = {}
>>> db = DB(MinimalMemoryStorage(), database_name='root', databases=dbmap)
The database name is accessible afterwards and in a newly created collection:
>>> db.database_name
'root'
>>> db.databases # doctest: +ELLIPSIS
{'root': <ZODB.DB.DB object at ...>}
>>> db.databases is dbmap
True
Adding another database to the collection works like this:
>>> db2 = DB(MinimalMemoryStorage(),
... database_name='notroot',
... databases=dbmap)
The new db2 now shares the 'databases' dictionary with db and has two entries:
>>> db2.databases is db.databases is dbmap
True
>>> len(db2.databases)
2
>>> names = dbmap.keys(); names.sort(); print names
['notroot', 'root']
It's an error to try to insert a database with a name already in use:
>>> db3 = DB(MinimalMemoryStorage(),
... database_name='root',
... databases=dbmap)
Traceback (most recent call last):
...
ValueError: database_name 'root' already in databases
Because that failed, db.databases wasn't changed:
>>> len(db.databases) # still 2
2
You can (still) get a connection to a database this way:
>>> cn = db.open()
>>> cn # doctest: +ELLIPSIS
<Connection at ...>
This is the only connection in this collection right now:
>>> cn.connections # doctest: +ELLIPSIS
{'root': <Connection at ...>}
Getting a connection to a different database from an existing connection in the
same database collection (this enables 'connection binding' within a given
thread/transaction/context ...):
>>> cn2 = cn.get_connection('notroot')
>>> cn2 # doctest: +ELLIPSIS
<Connection at ...>
Now there are two connections in that collection:
>>> cn2.connections is cn.connections
True
>>> len(cn2.connections)
2
>>> names = cn.connections.keys(); names.sort(); print names
['notroot', 'root']
So long as this database group remains open, the same Connection objects
are returned:
>>> cn.get_connection('root') is cn
True
>>> cn.get_connection('notroot') is cn2
True
>>> cn2.get_connection('root') is cn
True
>>> cn2.get_connection('notroot') is cn2
True
Of course trying to get a connection for a database not in the group raises
an exception:
>>> cn.get_connection('no way')
Traceback (most recent call last):
...
KeyError: 'no way'
Clean up:
>>> for a_db in dbmap.values():
... a_db.close()
......@@ -647,6 +647,8 @@ class StubDatabase:
self._storage = StubStorage()
classFactory = None
database_name = 'stubdatabase'
databases = {'stubdatabase': database_name}
def invalidate(self, transaction, dict_with_oid_keys, connection):
pass
......
......@@ -15,4 +15,6 @@
from zope.testing.doctestunit import DocFileSuite
def test_suite():
return DocFileSuite("dbopen.txt")
return DocFileSuite("dbopen.txt",
"multidb.txt",
)
......@@ -257,18 +257,35 @@ class IPersistentDataManager(Interface):
def setstate(object):
"""Load the state for the given object.
The object should be in the ghost state.
The object's state will be set and the object will end up
in the saved state.
The object should be in the ghost state. The object's state will be
set and the object will end up in the saved state.
The object must provide the IPersistent interface.
"""
def oldstate(obj, tid):
"""Return copy of 'obj' that was written by transaction 'tid'.
The returned object does not have the typical metadata (_p_jar, _p_oid,
_p_serial) set. I'm not sure how references to other peristent objects
are handled.
Parameters
obj: a persistent object from this Connection.
tid: id of a transaction that wrote an earlier revision.
Raises KeyError if tid does not exist or if tid deleted a revision of
obj.
"""
def register(object):
"""Register an IPersistent with the current transaction.
This method must be called when the object transitions to
the changed state.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
"""
def mtime(object):
......
......@@ -18,104 +18,7 @@ $Id$
import zope.interface
class IResourceManager(zope.interface.Interface):
"""Objects that manage resources transactionally.
These objects may manage data for other objects, or they may manage
non-object storages, such as relational databases.
IDataManagerOriginal is the interface currently provided by ZODB
database connections, but the intent is to move to the newer
IDataManager.
"""
# Two-phase commit protocol. These methods are called by the
# ITransaction object associated with the transaction being
# committed.
def tpc_begin(transaction):
"""Begin two-phase commit, to save data changes.
An implementation should do as much work as possible without
making changes permanent. Changes should be made permanent
when tpc_finish is called (or aborted if tpc_abort is called).
The work can be divided between tpc_begin() and tpc_vote(), and
the intent is that tpc_vote() be as fast as possible (to minimize
the period of uncertainty).
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_vote(transaction):
"""Verify that a resource manager can commit the transaction.
This is the last chance for a resource manager to vote 'no'. A
resource manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_finish(transaction):
"""Indicate confirmation that the transaction is done.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail. If this raises an exception, the
database is not expected to maintain consistency; it's a
serious error.
"""
def tpc_abort(transaction):
"""Abort a transaction.
transaction is the ITransaction instance associated with the
transaction being committed.
All changes made by the current transaction are aborted. Note
that this includes all changes stored in any savepoints that may
be associated with the current transaction.
tpc_abort() can be called at any time, either in or out of the
two-phase commit.
This should never fail.
"""
# The savepoint/rollback API.
def savepoint(transaction):
"""Save partial transaction changes.
There are two purposes:
1) To allow discarding partial changes without discarding all
dhanges.
2) To checkpoint changes to disk that would otherwise live in
memory for the duration of the transaction.
Returns an object implementing ISavePoint2 that can be used
to discard changes made since the savepoint was captured.
An implementation that doesn't support savepoints should implement
this method by returning a savepoint object that raises an
exception when its rollback method is called. The savepoint method
shouldn't raise an error. This way, transactions that create
savepoints can proceed as long as an attempt is never made to roll
back a savepoint.
"""
def discard(transaction):
"""Discard changes within the transaction since the last savepoint.
That means changes made since the last savepoint if one exists, or
since the start of the transaction.
"""
class IDataManagerOriginal(zope.interface.Interface):
class IDataManager(zope.interface.Interface):
"""Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage
......@@ -155,7 +58,7 @@ class IDataManagerOriginal(zope.interface.Interface):
has been called; this is only used when the transaction is
being committed.
This call also implied the beginning of 2-phase commit.
This call also implies the beginning of 2-phase commit.
"""
# Two-phase commit protocol. These methods are called by the
......@@ -180,10 +83,12 @@ class IDataManagerOriginal(zope.interface.Interface):
"""
def tpc_abort(transaction):
"""Abort a transaction.
This is called by a transaction manager to end a two-phase commit on
the data manager.
This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the
......@@ -202,6 +107,11 @@ class IDataManagerOriginal(zope.interface.Interface):
database is not expected to maintain consistency; it's a
serious error.
It's important that the storage calls the passed function
while it still has its lock. We don't want another thread
to be able to read any updated data until we've had a chance
to send an invalidation message to all of the other
connections!
"""
def tpc_vote(transaction):
......@@ -214,125 +124,46 @@ class IDataManagerOriginal(zope.interface.Interface):
transaction being committed.
"""
def commit(object, transaction):
"""CCCommit changes to an object
def commit(transaction):
"""Commit modifications to registered objects.
Save the object as part of the data to be made persistent if
the transaction commits.
"""
def abort(object, transaction):
"""Abort changes to an object
Only changes made since the last transaction or
sub-transaction boundary are discarded.
This method may be called either:
o Outside of two-phase commit, or
o In the first phase of two-phase commit
"""
def sortKey():
"""
Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
class IDataManager(zope.interface.Interface):
"""Data management interface for storing objects transactionally.
ZODB database connections currently provides the older
IDataManagerOriginal interface, but the intent is to move to this newer
IDataManager interface.
Our hope is that this interface will evolve and become the standard
interface. There are some issues to be resolved first, like:
- Probably want separate abort methods for use in and out of
two-phase commit.
- The savepoint api may need some more thought.
"""
def prepare(transaction):
"""Perform the first phase of a 2-phase commit
The data manager prepares for commit any changes to be made
persistent. A normal return from this method indicated that
the data manager is ready to commit the changes.
The data manager must raise an exception if it is not prepared
to commit the transaction after executing prepare().
The transaction must match that used for preceeding
savepoints, if any.
This includes conflict detection and handling. If no conflicts or
errors occur it saves the objects in the storage.
"""
# This is equivalent to zodb3's tpc_begin, commit, and
# tpc_vote combined.
def abort(transaction):
"""Abort changes made by transaction
This may be called before two-phase commit or in the second
phase of two-phase commit.
The transaction must match that used for preceeding
savepoints, if any.
"""
# This is equivalent to *both* zodb3's abort and tpc_abort
# calls. This should probably be split into 2 methods.
def commit(transaction):
"""Finish two-phase commit
The prepare method must be called, with the same transaction,
before calling commit.
"""
# This is equivalent to zodb3's tpc_finish
def savepoint(transaction):
"""Do tentative commit of changes to this point.
Should return an object implementing IRollback that can be used
to rollback to the savepoint.
Note that (unlike zodb3) this doesn't use a 2-phase commit
protocol. If this call fails, or if a rollback call on the
result fails, the (containing) transaction should be
aborted. Aborting the containing transaction is *not* the
responsibility of the data manager, however.
"""Abort a transaction and forget all changes.
An implementation that doesn't support savepoints should
implement this method by returning a rollback implementation
that always raises an error when it's rollback method is
called. The savepoing method shouldn't raise an error. This
way, transactions that create savepoints can proceed as long
as an attempt is never made to roll back a savepoint.
Abort must be called outside of a two-phase commit.
Abort is called by the transaction manager to abort transactions
that are not yet in a two-phase commit.
"""
def sortKey():
"""
Return a key to use for ordering registered DataManagers
"""Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
# XXX: Alternate version:
#"""Return a consistent sort key for this connection.
#
#This allows ordering multiple connections that use the same storage in
#a consistent manner. This is unique for the lifetime of a connection,
#which is good enough to avoid ZEO deadlocks.
#"""
def beforeCompletion(transaction):
"""Hook that is called by the transaction before completing a commit"""
def afterCompletion(transaction):
"""Hook that is called by the transaction after completing a commit"""
class ITransaction(zope.interface.Interface):
"""Object representing a running transaction.
......@@ -414,34 +245,6 @@ class ITransaction(zope.interface.Interface):
# Unsure: is this allowed to cause an exception here, during
# the two-phase commit, or can it toss data silently?
class ISavePoint(zope.interface.Interface):
"""ISavePoint objects represent partial transaction changes.
Sequences of savepoint objects are associated with transactions,
and with IResourceManagers.
"""
def rollback():
"""Discard changes made after this savepoint.
This includes discarding (call the discard method on) all
subsequent savepoints.
"""
def discard():
"""Discard changes saved by this savepoint.
That means changes made since the immediately preceding
savepoint if one exists, or since the start of the transaction,
until this savepoint.
Once a savepoint has been discarded, it's an error to attempt
to rollback or discard it again.
"""
next_savepoint = zope.interface.Attribute(
"""The next savepoint (later in time), or None if self is the
most recent savepoint.""")
class IRollback(zope.interface.Interface):
......@@ -457,3 +260,4 @@ class IRollback(zope.interface.Interface):
- The transaction has ended.
"""
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment