Commit 924991e3 authored by Jim Fulton's avatar Jim Fulton

Defined IStorageDB. As a result:

- Changed the signature of registerDB to omit the unused second
  argument.  DB, the normal caller of registerDB, still works with
  storages that implement the old signature.

- Loosened the input requirements of invalidate so that it no longer
  requires a dictionary with unused keys.

- Added a references function that gives storages a way to extract
  object references from database records; it works even with storage
  adapters that change the record format, for example through
  encryption or compression (see the sketch below).
parent dd581c8c
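As a rough illustration of the last point, here is a minimal sketch (not part
of this commit) of how a hypothetical compressing adapter might implement the
new references hook. The CompressingAdapter class and its use of zlib are
invented for this example; only ZODB.serialize.referencesf comes from the
codebase.

    # Hypothetical sketch; this adapter is not part of the commit.
    import zlib

    import ZODB.serialize

    class CompressingAdapter(object):
        """Invented example of a storage adapter that compresses records."""

        def __init__(self, base):
            self._base = base

        def store(self, oid, serial, data, version, transaction):
            # Records are transformed before they reach the base storage ...
            return self._base.store(oid, serial, zlib.compress(data),
                                    version, transaction)

        def references(self, record, oids=None):
            # ... so reference extraction has to undo that transformation
            # before delegating to the stock extractor.
            return ZODB.serialize.referencesf(zlib.decompress(record), oids)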
@@ -389,7 +389,7 @@ class ClientStorage(object):
         self._rpc_mgr.close()
         self._rpc_mgr = None

-    def registerDB(self, db, limit):
+    def registerDB(self, db):
         """Storage API: register a database for invalidation messages.

         This is called by ZODB.DB (and by some tests).
@@ -1221,7 +1221,11 @@ class ClientStorage(object):
             if oid == self._load_oid:
                 self._load_status = 0
             self._cache.invalidate(oid, version, tid)
-            versions.setdefault((version, tid), {})[oid] = tid
+            oids = versions.get((version, tid))
+            if not oids:
+                versions[(version, tid)] = [oid]
+            else:
+                oids.append(oid)

         if self._db is not None:
             for (version, tid), d in versions.items():
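A small sketch (invented oid, tid, and version values) of the data shape the
new bookkeeping above produces: each (version, tid) key now maps to a plain
list of oids, which can be handed to the database without building a
throwaway dictionary.

    # Sketch with made-up values; mirrors the accumulation logic above.
    versions = {}
    for oid, version, tid in [('oid1', '', 'tid1'),
                              ('oid2', '', 'tid1'),
                              ('oid3', 'v1', 'tid2')]:
        oids = versions.get((version, tid))
        if not oids:
            versions[(version, tid)] = [oid]
        else:
            oids.append(oid)

    # versions == {('', 'tid1'): ['oid1', 'oid2'], ('v1', 'tid2'): ['oid3']}
    # Each list is an acceptable ``oids`` iterable for IStorageDB.invalidate.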
......
@@ -134,7 +134,7 @@ class CommitLockTests:
         # address.
         addr = self._storage._addr
         new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
-        new.registerDB(DummyDB(), None)
+        new.registerDB(DummyDB())
         return new

     def _get_timestamp(self):
......
@@ -184,7 +184,7 @@ class CommonSetupTearDown(StorageTestBase):
                                   username=username,
                                   password=password,
                                   realm=realm)
-        storage.registerDB(DummyDB(), None)
+        storage.registerDB(DummyDB())
         return storage

     def getServerConfig(self, addr, ro_svr):
......
@@ -155,7 +155,7 @@ class GenericTests(
                                      min_disconnect_poll=0.5, wait=1,
                                      wait_timeout=60, blob_dir=self.blob_cache_dir,
                                      blob_cache_writable=self.blob_cache_writable)
-        self._storage.registerDB(DummyDB(), None)
+        self._storage.registerDB(DummyDB())

     def tearDown(self):
         self._storage.close()
@@ -347,7 +347,7 @@ class ConnectionInvalidationOnReconnect(
             pass

         db = DummyDB()
-        storage.registerDB(db, None)
+        storage.registerDB(db)

         base = db._invalidatedCache
@@ -383,7 +383,7 @@ class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
         _base = ClientStorage(zport, '1', cache_size=20000000,
                               min_disconnect_poll=0.5, wait=1,
                               wait_timeout=60)
-        _base.registerDB(DummyDB(), None)
+        _base.registerDB(DummyDB())
         return _base

     def tearDown(self):
......
@@ -30,14 +30,18 @@ from ZODB.UndoLogCompatible import UndoLogCompatible
 log = logging.getLogger("ZODB.BaseStorage")

 class BaseStorage(UndoLogCompatible):
-    """Abstract base class that supports storage implementations.
+    """Base class that supports storage implementations.
+
+    XXX Base classes like this are an attractive nuisance. They often
+    introduce more complexity than they save.  While important logic
+    is implemented here, we should consider exposing it as utility
+    functions or as objects that can be used through composition.

     A subclass must define the following methods:

     load()
     store()
     close()
     cleanup()
-    lastSerial()
     lastTransaction()

     It must override these hooks:
@@ -173,7 +177,7 @@ class BaseStorage(UndoLogCompatible):
         finally:
             self._lock_release()

-    def registerDB(self, db, limit):
+    def registerDB(self, db):
         pass # we don't care

     def isReadOnly(self):
......
@@ -164,7 +164,7 @@ class Connection(ExportImport, object):
         # critical sections (if any -- this needs careful thought).
         self._inv_lock = threading.Lock()
-        self._invalidated = {}
+        self._invalidated = set()

         # Flag indicating whether the cache has been invalidated:
         self._invalidatedCache = False
@@ -488,8 +488,8 @@ class Connection(ExportImport, object):
         # using a class while objects are being invalidated seems
         # small enough to be acceptable.
-        invalidated = self._invalidated
-        self._invalidated = {}
+        invalidated = dict.fromkeys(self._invalidated)
+        self._invalidated = set()
         self._txn_time = None

         if self._invalidatedCache:
             self._invalidatedCache = False
@@ -906,7 +906,7 @@ class Connection(ExportImport, object):
         self._inv_lock.acquire()
         try:
             try:
-                del self._invalidated[obj._p_oid]
+                self._invalidated.remove(obj._p_oid)
             except KeyError:
                 pass
         finally:
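For reference, a short sketch (with invented oids) of the set-based
bookkeeping the hunks above switch to; the dict.fromkeys conversion and the
try/except around remove mirror the diff.

    # Sketch only; the oids are made up.
    invalidated = set()

    # invalidate(): incoming oids are simply added to the set.
    invalidated.update(['oid1', 'oid2'])

    # At the transaction boundary the set is converted for code that still
    # expects a mapping, then reset.
    as_mapping = dict.fromkeys(invalidated)
    invalidated = set()

    # Dropping a single oid: set.remove raises KeyError just as
    # ``del d[oid]`` did, so the surrounding try/except stays the same.
    try:
        invalidated.remove('oid1')
    except KeyError:
        pass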
......
@@ -23,7 +23,7 @@ import logging
 from ZODB.broken import find_global
 from ZODB.utils import z64
 from ZODB.Connection import Connection
-from ZODB.serialize import referencesf
+import ZODB.serialize
 from ZODB.utils import WeakSet

 from zope.interface import implements
@@ -231,7 +231,12 @@ class DB(object):
         # Setup storage
         self._storage=storage
-        storage.registerDB(self, None)
+        self.references = ZODB.serialize.referencesf
+        try:
+            storage.registerDB(self)
+        except TypeError:
+            storage.registerDB(self, None) # Backward compat
+
         if not hasattr(storage, 'tpc_vote'):
             storage.tpc_vote = lambda *args: None
         try:
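The try/except above is what keeps storages with the old two-argument
registerDB working; a minimal sketch of the same fallback in isolation (both
storage classes are invented for illustration):

    # Invented classes; only the fallback pattern is taken from the diff.
    class LegacyStorage(object):
        def registerDB(self, db, limit):
            self.db = db

    class NewStorage(object):
        def registerDB(self, db):
            self.db = db

    def register(storage, db):
        try:
            storage.registerDB(db)          # new one-argument signature
        except TypeError:
            storage.registerDB(db, None)    # backward compatibility

    for storage in (LegacyStorage(), NewStorage()):
        register(storage, db=object())      # both signatures are handled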
@@ -467,7 +472,7 @@ class DB(object):
         if connection is not None:
             version = connection._version
         # Update modified in version cache
-        for oid in oids.keys():
+        for oid in oids:
             h = hash(oid) % 131
             o = self._miv_cache.get(h, None)
             if o is not None and o[0]==oid:
@@ -608,7 +613,7 @@ class DB(object):
         t = time()
         t -= days * 86400
         try:
-            self._storage.pack(t, referencesf)
+            self._storage.pack(t, self.references)
         except:
             logger.error("packing", exc_info=True)
             raise
@@ -685,6 +690,8 @@ class DB(object):
     def versionEmpty(self, version):
         return self._storage.versionEmpty(version)

 resource_counter_lock = threading.Lock()
 resource_counter = 0
......
@@ -89,6 +89,21 @@ from cPickle import loads
 from BTrees import OOBTree

 class DemoStorage(BaseStorage):
+    """Demo storage
+
+    Demo storages provide useful storages for writing tests because
+    they store their data in memory and throw away their data
+    (implicitly) when they are closed.
+
+    They were originally designed to allow demonstrations using base
+    data provided on a CD.  They can optionally wrap an *unchanging*
+    base storage.  It is critical that the base storage does not
+    change.  Using a changing base storage is not just unsupported, it
+    is known not to work and can even lead to serious errors and even
+    core dumps.
+    """

     def __init__(self, name='Demo Storage', base=None, quota=None):
         BaseStorage.__init__(self, name, base)
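A small usage sketch of the throwaway, in-memory behaviour the new docstring
describes (typical test usage; the stored value is invented):

    # Sketch: an in-memory, throwaway database for a test.
    import transaction
    import ZODB
    import ZODB.DemoStorage

    db = ZODB.DB(ZODB.DemoStorage.DemoStorage())
    conn = db.open()
    conn.root()['answer'] = 42
    transaction.commit()
    conn.close()
    db.close()        # all data is discarded with the storage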
@@ -106,14 +121,6 @@ class DemoStorage(BaseStorage):
             raise POSException.StorageError(
                 "Demo base storage has version data")

-    # While we officially don't support wrapping a non-read-only base
-    # storage, it has proved useful for test suites to wrap a ClientStorage
-    # in DemoStorage.  The least we can do to help support that case is
-    # to arrange for invalidations to get delivered to the base storage.
-    def registerDB(self, db, limit):
-        if self._base is not None: # delegate
-            self._base.registerDB(db, limit)
-
     # When DemoStorage needs to create a new oid, and there is a base
     # storage, it must use that storage's new_oid() method.  Else
     # DemoStorage may end up assigning "new" oids that are already in use
......
@@ -292,7 +292,54 @@ class IConnection(Interface):
     """

-class IDatabase(Interface):
+class IStorageDB(Interface):
+    """Database interface exposed to storages
+
+    This interface provides 2 facilities:
+
+    - Out-of-band invalidation support
+
+      A storage can notify its database of object invalidations that
+      don't occur due to direct operations on the storage.  Currently
+      this is only used by ZEO client storages to pass invalidation
+      messages sent from a server.
+
+    - Record-reference extraction
+
+      The references method can be used to extract referenced object
+      IDs from a database record.  This can be used by storages to
+      provide more advanced garbage collection.
+
+    This interface may be implemented by storage adapters or other
+    intermediaries.  For example, a storage adapter that provides
+    encryption and/or compression will apply record transformations
+    in its references method.
+    """
+
+    def invalidateCache():
+        """Discard all cached data
+
+        This can be necessary if there have been major changes to
+        stored data and it is either impractical to enumerate them or
+        there would be so many that it would be inefficient to do so.
+        """
+
+    def references(record, oids=None):
+        """Scan the given record for object ids
+
+        A list of object ids is returned.  If a list is passed in,
+        then it will be used and augmented.  Otherwise, a new list
+        will be created and returned.
+        """
+
+    def invalidate(transaction_id, oids, version=''):
+        """Invalidate object ids committed by the given transaction
+
+        The oids argument is an iterable of object identifiers.
+        """
+
+class IDatabase(IStorageDB):
     """ZODB DB.

     TODO: This interface is incomplete.
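For illustration, a minimal object providing IStorageDB might look like the
sketch below. The class name and method bodies are invented; only the method
signatures and the use of ZODB.serialize.referencesf follow the interface and
the DB changes above.

    # Sketch of an IStorageDB provider; everything here is illustrative.
    import ZODB.serialize

    class MinimalStorageDB(object):

        def invalidateCache(self):
            # A real database would discard all of its cached object state.
            pass

        def invalidate(self, transaction_id, oids, version=''):
            # ``oids`` may be any iterable of object ids.
            for oid in oids:
                pass  # a real database would mark each oid as stale

        def references(self, record, oids=None):
            # Delegate to the stock extractor, as ZODB.DB now does.
            return ZODB.serialize.referencesf(record, oids)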
@@ -399,7 +446,7 @@ class IStorage(Interface):
 ##    def set_max_oid(possible_new_max_oid):
 ##        """TODO"""
 ##
-##    def registerDB(db, limit):
+##    def registerDB(db):
 ##        """TODO"""
 ##
 ##    def isReadOnly():
......
@@ -474,7 +474,7 @@ class InvalidationTests(unittest.TestCase):
     >>> p3._p_state
     0
     >>> cn._invalidated
-    {}
+    set([])

     """
......
@@ -131,6 +131,14 @@ class DBTests(unittest.TestCase):
         self.assertEqual(len(pools), 3)
         self.assertEqual(nconn(pools), 3)

+    def test_references(self):
+
+        # TODO: For now test that we're using referencesf.  We really
+        # should have tests of referencesf.
+
+        import ZODB.serialize
+        self.assert_(self.db.references is ZODB.serialize.referencesf)
+
 def test_invalidateCache():
     """\
......
@@ -323,7 +323,7 @@ to test the Connection code.
     ...         self.hooked = {}
     ...         self.count = 0
     ...         super(TestStorage, self).__init__()
-    ...     def registerDB(self, db, limit):
+    ...     def registerDB(self, db):
     ...         self.db = db
     ...     def hook(self, oid, tid, version):
     ...         if oid in self.hooked:
......