Commit eb02632c authored by Kirill Smelkov

loadAt

loadAt is a new optional storage interface intended to replace loadBefore
with cleaner and more uniform semantics. Compared to loadBefore, loadAt:

1) returns data=None and the serial of the removal when the loaded object is
   found to be deleted. loadBefore returns only data=None in that case. This
   property of loadAt makes it possible to fix a DemoStorage data corruption
   in which whiteouts in the overlay part were not correctly taken into account.

   https://github.com/zopefoundation/ZODB/issues/318

2) for regular data records, does not require storages to return next_serial
   in addition to (data, serial). The loadBefore requirement to return both
   serial and next_serial constrains storages unnecessarily: while it is
   free to implement for FileStorage, for other storages it is not. For
   example, for NEO and RelStorage, finding next_serial after looking up
   the oid@at data record costs one more SQL query:

   https://lab.nexedi.com/nexedi/neoppod/blob/fb746e6b/neo/storage/database/mysqldb.py#L484-508
   https://lab.nexedi.com/nexedi/neoppod/blob/fb746e6b/neo/storage/database/mysqldb.py#L477-482

   https://github.com/zodb/relstorage/blob/3.1.1-1-ge7628f9/src/relstorage/storage/load.py#L259-L264
   https://github.com/zodb/relstorage/blob/3.1.1-1-ge7628f9/src/relstorage/adapters/mover.py#L177-L199

   next_serial is not only about execution overhead: it is semantically
   redundant and can be removed from the load return. The reason
   I say that next_serial can be removed is that in ZODB/py the only place
   I could find where next_serial is used on the client side is the client
   cache (e.g. the NEO client cache), and that cache can be reworked to
   work without next_serial at all. In simple words, after a

     loadAt(oid, at)  ->  (data, serial)

   query, the cache can remember data for oid in the [serial, at] range.

   Next, when an invalidation message is received from the server, cache
   entries that had at == client_head are extended (at -> new_head) for oids
   that are not present in the invalidation message, while for oids that are
   present in the invalidation message no such extension is done. This keeps
   the cache in a correct state: entries are invalidated when needed, and
   entries that should remain live are not thrown away. This of course
   requires the ZODB server to include both modified and just-created
   objects in invalidation messages

     ( https://github.com/zopefoundation/ZEO/pull/160 ,
       https://github.com/zopefoundation/ZODB/pull/319 ).

   Switching to loadAt should thus allow storages like NEO and, maybe,
   RelStorage, to issue half as many SQL queries on every object access.

   https://github.com/zopefoundation/ZODB/issues/318#issuecomment-657685745
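
The cache scheme described in item 2 can be sketched roughly as follows. This is
only an illustration, not the NEO or ZEO client cache: the names `Entry`,
`LoadAtCache`, `remember`, `lookup` and `invalidate` are hypothetical, tids are
plain integers instead of 8-byte strings, and every load is assumed to be made
with `at <= head`.

```python
# Hypothetical sketch of a client cache that needs no next_serial.
# Assumption: tids are plain ints here; real ZODB tids are 8-byte strings.

class Entry:
    def __init__(self, data, serial, at):
        self.data = data
        self.serial = serial  # tid of the revision that was loaded
        self.at = at          # data is known to be valid for [serial, at]


class LoadAtCache:
    def __init__(self, head):
        self.head = head      # client view of the last committed tid
        self.entries = {}     # oid -> list of Entry

    def remember(self, oid, at, data, serial):
        """Record the result of loadAt(oid, at) -> (data, serial)."""
        self.entries.setdefault(oid, []).append(Entry(data, serial, at))

    def lookup(self, oid, at):
        """Return (data, serial) if a cached range covers `at`, else None."""
        for e in self.entries.get(oid, ()):
            if e.serial <= at <= e.at:
                return e.data, e.serial
        return None

    def invalidate(self, new_head, changed_oids):
        """Process an invalidation message for transaction `new_head`."""
        for oid, entries in self.entries.items():
            if oid in changed_oids:
                continue      # changed: entries stay capped at the old head
            for e in entries:
                if e.at == self.head:
                    e.at = new_head   # unchanged: extend the validity range
        self.head = new_head


cache = LoadAtCache(head=10)
cache.remember("a", 10, b"A1", 5)
cache.remember("b", 10, b"B1", 7)
cache.invalidate(12, {"b"})   # transaction 12 changed only "b"
```

After the invalidation, the entry for "a" covers [5, 12], while the entry for
"b" still covers only [7, 10], so a load of "b" at 12 has to go to the server.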

In other words, loadAt unifies the return signature to always be

   (data, serial)

instead of

   POSKeyError				object does not exist at all
   None					object was removed
   (data, serial, next_serial)		regular data record

used by loadBefore.
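
As an illustration of the difference, the three loadBefore outcomes can be
folded into the single (data, serial) shape roughly like this. The sketch is
self-contained and does not import ZODB: `POSKeyError` below is a local
stand-in for ZODB.POSException.POSKeyError, `Z64` stands in for
ZODB.utils.z64, and `as_load_at_result` and `toy_load_before` are hypothetical
names.

```python
Z64 = b"\x00" * 8  # null tid, stand-in for ZODB.utils.z64


class POSKeyError(KeyError):
    """Local stand-in for ZODB.POSException.POSKeyError."""


def as_load_at_result(load_before, oid, before):
    """Fold the three loadBefore outcomes into one (data, serial) pair."""
    try:
        r = load_before(oid, before)
    except POSKeyError:
        return None, Z64      # object does not exist at all
    if r is None:
        # object was removed; loadBefore cannot say when, so the serial
        # of the deletion is lost and the null tid is returned instead
        return None, Z64
    data, serial, _next_serial = r
    return data, serial       # regular data record, next_serial dropped


def toy_load_before(oid, before):
    """Toy loadBefore: 'live' has data, 'gone' was removed, rest is unknown."""
    if oid == "live":
        return b"data", b"\x00" * 7 + b"\x05", None
    if oid == "gone":
        return None
    raise POSKeyError(oid)
```

Note that with this emulation "removed" and "never existed" become
indistinguishable, which is exactly the information a native loadAt keeps.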

This patch:

- introduces the new interface.
- introduces the ZODB.utils.loadAt helper, which uses storage.loadAt or, if
  the storage does not implement the loadAt interface, tries to mimic
  loadAt semantics via storage.loadBefore to the extent possible and emits a
  corresponding warning.
- converts MVCCAdapter to use loadAt instead of loadBefore.
- changes DemoStorage to use loadAt, and this way fixes the above-mentioned
  data corruption issue; adds a corresponding test; converts
  DemoStorage.loadBefore to be a wrapper around DemoStorage.loadAt.
- adds a loadAt implementation to FileStorage and MappingStorage.
- adapts other tests/code correspondingly.
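
To see why the deletion serial matters for DemoStorage, here is a small
self-contained sketch of the overlay lookup. It mirrors the idea of
DemoStorage.loadAt from this patch but runs against a hypothetical `ToyStorage`
rather than real ZODB storages; `tid`, `ToyStorage` and `demo_load_at` are
illustrative names only.

```python
import struct

Z64 = b"\x00" * 8  # null tid, stand-in for ZODB.utils.z64


def tid(n):
    """Pack an integer into an 8-byte big-endian tid."""
    return struct.pack(">Q", n)


class ToyStorage:
    """Hypothetical in-memory storage: oid -> list of (tid, data)."""

    def __init__(self):
        self.revisions = {}

    def write(self, oid, serial, data):
        # data=None records a deletion (a "whiteout")
        revs = self.revisions.setdefault(oid, [])
        revs.append((serial, data))
        revs.sort(key=lambda r: r[0])

    def loadAt(self, oid, at):
        # most recent revision with tid <= at, else "no data" with null tid
        for serial, data in reversed(self.revisions.get(oid, [])):
            if serial <= at:
                return data, serial
        return None, Z64


def demo_load_at(base, changes, oid, at):
    data, serial = changes.loadAt(oid, at)
    if data is not None or serial != Z64:
        # present in changes, either as data or as a deletion record
        return data, serial
    # not present in changes at all: defer to base
    return base.loadAt(oid, at)


base, changes = ToyStorage(), ToyStorage()
base.write("obj", tid(1), b"v1")
changes.write("obj", tid(2), b"v2")
changes.write("obj", tid(3), None)   # object deleted in the overlay
```

With these toy storages, a load at tid(3) reports the deletion from the overlay
instead of falling through to the stale b"v1" record in base.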

/cc @jimfulton, @jamadden, @vpelletier, @jmuchemb, @arnaud-fontaine, @gidzit, @klawlf82, @hannosch
parent ddfe57eb
...@@ -59,7 +59,7 @@ class BaseStorage(UndoLogCompatible): ...@@ -59,7 +59,7 @@ class BaseStorage(UndoLogCompatible):
If it stores multiple revisions, it should implement If it stores multiple revisions, it should implement
loadSerial() loadSerial()
loadBefore() loadAt()
Each storage will have two locks that are accessed via lock Each storage will have two locks that are accessed via lock
acquire and release methods bound to the instance. (Yuck.) acquire and release methods bound to the instance. (Yuck.)
...@@ -267,9 +267,8 @@ class BaseStorage(UndoLogCompatible): ...@@ -267,9 +267,8 @@ class BaseStorage(UndoLogCompatible):
raise POSException.Unsupported( raise POSException.Unsupported(
"Retrieval of historical revisions is not supported") "Retrieval of historical revisions is not supported")
def loadBefore(self, oid, tid): # do not provide loadAt/loadBefore here in BaseStorage - if child forgets
"""Return most recent revision of oid before tid committed.""" # to override it - storage will always return "no data" instead of failing.
return None
def copyTransactionsFrom(self, other, verbose=0): def copyTransactionsFrom(self, other, verbose=0):
"""Copy transactions from another storage. """Copy transactions from another storage.
......
...@@ -726,8 +726,7 @@ class DB(object): ...@@ -726,8 +726,7 @@ class DB(object):
- `before`: like `at`, but opens the readonly state before the - `before`: like `at`, but opens the readonly state before the
tid or datetime. tid or datetime.
""" """
# `at` is normalized to `before`, since we use storage.loadBefore # `at` is normalized to `before`.
# as the underlying implementation of both.
before = getTID(at, before) before = getTID(at, before)
if (before is not None and if (before is not None and
before > self.lastTransaction() and before > self.lastTransaction() and
......
...@@ -24,6 +24,7 @@ import os ...@@ -24,6 +24,7 @@ import os
import random import random
import weakref import weakref
import tempfile import tempfile
import warnings
import ZODB.BaseStorage import ZODB.BaseStorage
import ZODB.blob import ZODB.blob
import ZODB.interfaces import ZODB.interfaces
...@@ -223,45 +224,55 @@ class DemoStorage(ConflictResolvingStorage): ...@@ -223,45 +224,55 @@ class DemoStorage(ConflictResolvingStorage):
# still want load for old clients (e.g. zeo servers) # still want load for old clients (e.g. zeo servers)
load = load_current load = load_current
def loadBefore(self, oid, tid): def loadAt(self, oid, at):
try: data, serial = ZODB.utils.loadAt(self.changes, oid, at)
result = self.changes.loadBefore(oid, tid) if (data is not None) or (serial != ZODB.utils.z64):
except ZODB.POSException.POSKeyError: # object is present in changes either as data or deletion record.
# The oid isn't in the changes, so defer to base return data, serial
return self.base.loadBefore(oid, tid)
if result is None: # object is not present in changes at all - use base
# The oid *was* in the changes, but there aren't any return ZODB.utils.loadAt(self.base, oid, at)
# earlier records. Maybe there are in the base.
try: def loadBefore(self, oid, before):
result = self.base.loadBefore(oid, tid) warnings.warn("loadBefore is deprecated - use loadAt instead",
except ZODB.POSException.POSKeyError: DeprecationWarning, stacklevel=2)
# The oid isn't in the base, so None will be the right result p64 = ZODB.utils.p64
pass u64 = ZODB.utils.u64
if before in (maxtid, ZODB.utils.z64):
at = before
else:
at = p64(u64(before)-1)
data, serial = self.loadAt(oid, at)
# find out next_serial.
# it is ok to use dumb/slow implementation since loadBefore should not
# be used and is provided only for backward compatibility.
next_serial = maxtid
while 1:
_, s = self.loadAt(oid, p64(u64(next_serial)-1))
assert s >= serial
if s == serial:
# found - next_serial is serial of the next data record
break
next_serial = s
if next_serial == maxtid:
next_serial = None
# next_serial found -> return/raise what loadBefore users expect
if data is None:
if next_serial is None:
# object was never created
raise ZODB.POSException.POSKeyError(oid)
else: else:
if result and not result[-1]: # object was deleted
# The oid is current in the base. We need to find return None
# the end tid in the base by fining the first tid
# in the changes. Unfortunately, there isn't an # regular data record
# api for this, so we have to walk back using return data, serial, next_serial
# loadBefore.
if tid == maxtid:
# Special case: we were looking for the
# current value. We won't find anything in
# changes, so we're done.
return result
end_tid = maxtid
t = self.changes.loadBefore(oid, end_tid)
while t:
end_tid = t[1]
t = self.changes.loadBefore(oid, end_tid)
result = result[:2] + (
end_tid if end_tid != maxtid else None,
)
return result
def loadBlob(self, oid, serial): def loadBlob(self, oid, serial):
try: try:
......
...@@ -21,6 +21,7 @@ import errno ...@@ -21,6 +21,7 @@ import errno
import logging import logging
import os import os
import time import time
import warnings
from struct import pack from struct import pack
from struct import unpack from struct import unpack
...@@ -58,6 +59,7 @@ from ZODB.interfaces import IStorageIteration ...@@ -58,6 +59,7 @@ from ZODB.interfaces import IStorageIteration
from ZODB.interfaces import IStorageRestoreable from ZODB.interfaces import IStorageRestoreable
from ZODB.interfaces import IStorageUndoable from ZODB.interfaces import IStorageUndoable
from ZODB.interfaces import IStorageLastPack from ZODB.interfaces import IStorageLastPack
from ZODB.interfaces import IStorageLoadAt
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from ZODB.POSException import MultipleUndoErrors from ZODB.POSException import MultipleUndoErrors
from ZODB.POSException import POSKeyError from ZODB.POSException import POSKeyError
...@@ -135,6 +137,7 @@ class TempFormatter(FileStorageFormatter): ...@@ -135,6 +137,7 @@ class TempFormatter(FileStorageFormatter):
IExternalGC, IExternalGC,
IStorage, IStorage,
IStorageLastPack, IStorageLastPack,
IStorageLoadAt,
) )
class FileStorage( class FileStorage(
FileStorageFormatter, FileStorageFormatter,
...@@ -566,7 +569,40 @@ class FileStorage( ...@@ -566,7 +569,40 @@ class FileStorage(
else: else:
return self._loadBack_impl(oid, h.back)[0] return self._loadBack_impl(oid, h.back)[0]
def loadAt(self, oid, at):
"""loadAt implements IStorageLoadAt."""
with self._files.get() as _file:
try:
pos = self._lookup_pos(oid)
except POSKeyError:
# object does not exist
return None, z64
while 1:
h = self._read_data_header(pos, oid, _file)
if h.tid <= at:
break
pos = h.prev
if not pos:
# object not yet created as of at
return None, z64
# h is the most recent DataHeader with .tid <= at
if h.plen:
# regular data record
return _file.read(h.plen), h.tid
elif h.back:
# backpointer
data, _, _, _ = self._loadBack_impl(oid, h.back,
fail=False, _file=_file)
return data, h.tid
else:
# deletion
return None, h.tid
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
warnings.warn("loadBefore is deprecated - use loadAt instead",
DeprecationWarning, stacklevel=2)
with self._files.get() as _file: with self._files.get() as _file:
pos = self._lookup_pos(oid) pos = self._lookup_pos(oid)
end_tid = None end_tid = None
......
...@@ -19,6 +19,7 @@ storage without distracting storage details. ...@@ -19,6 +19,7 @@ storage without distracting storage details.
import BTrees import BTrees
import time import time
import warnings
import ZODB.BaseStorage import ZODB.BaseStorage
import ZODB.interfaces import ZODB.interfaces
import ZODB.POSException import ZODB.POSException
...@@ -31,6 +32,7 @@ import zope.interface ...@@ -31,6 +32,7 @@ import zope.interface
ZODB.interfaces.IStorage, ZODB.interfaces.IStorage,
ZODB.interfaces.IStorageIteration, ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageLastPack, ZODB.interfaces.IStorageLastPack,
ZODB.interfaces.IStorageLoadAt,
) )
class MappingStorage(object): class MappingStorage(object):
"""In-memory storage implementation """In-memory storage implementation
...@@ -149,9 +151,26 @@ class MappingStorage(object): ...@@ -149,9 +151,26 @@ class MappingStorage(object):
load = ZODB.utils.load_current load = ZODB.utils.load_current
# ZODB.interfaces.IStorageLoadAt
@ZODB.utils.locked(opened)
def loadAt(self, oid, at):
z64 = ZODB.utils.z64
tid_data = self._data.get(oid)
if not tid_data:
return None, z64
if at == z64:
return None, z64
tids_at = tid_data.keys(None, at)
if not tids_at:
return None, z64
serial = tids_at[-1]
return tid_data[serial], serial
# ZODB.interfaces.IStorage # ZODB.interfaces.IStorage
@ZODB.utils.locked(opened) @ZODB.utils.locked(opened)
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
warnings.warn("loadBefore is deprecated - use loadAt instead",
DeprecationWarning, stacklevel=2)
tid_data = self._data.get(oid) tid_data = self._data.get(oid)
if tid_data: if tid_data:
before = ZODB.utils.u64(tid) before = ZODB.utils.u64(tid)
......
...@@ -866,10 +866,10 @@ class BlobStorage(BlobStorageMixin): ...@@ -866,10 +866,10 @@ class BlobStorage(BlobStorageMixin):
for oid in self.fshelper.getOIDsForSerial(serial_id): for oid in self.fshelper.getOIDsForSerial(serial_id):
# we want to find the serial id of the previous revision # we want to find the serial id of the previous revision
# of this blob object. # of this blob object.
load_result = self.loadBefore(oid, serial_id) at_before = utils.p64(utils.u64(serial_id)-1)
_, serial_before = utils.loadAt(self, oid, at_before)
if load_result is None:
if serial_before == utils.z64:
# There was no previous revision of this blob # There was no previous revision of this blob
# object. The blob was created in the transaction # object. The blob was created in the transaction
# represented by serial_id. We copy the blob data # represented by serial_id. We copy the blob data
...@@ -884,7 +884,6 @@ class BlobStorage(BlobStorageMixin): ...@@ -884,7 +884,6 @@ class BlobStorage(BlobStorageMixin):
# transaction implied by "serial_id". We copy the blob # transaction implied by "serial_id". We copy the blob
# data to a new file that references the undo transaction # data to a new file that references the undo transaction
# in case a user wishes to undo this undo. # in case a user wishes to undo this undo.
data, serial_before, serial_after = load_result
orig_fn = self.fshelper.getBlobFilename(oid, serial_before) orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial) new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
with open(orig_fn, "rb") as orig: with open(orig_fn, "rb") as orig:
......
...@@ -36,7 +36,7 @@ development continues on a "development" head. ...@@ -36,7 +36,7 @@ development continues on a "development" head.
A database can be opened historically ``at`` or ``before`` a given transaction A database can be opened historically ``at`` or ``before`` a given transaction
serial or datetime. Here's a simple example. It should work with any storage serial or datetime. Here's a simple example. It should work with any storage
that supports ``loadBefore``. that supports ``loadAt`` or ``loadBefore``.
We'll begin our example with a fairly standard set up. We We'll begin our example with a fairly standard set up. We
...@@ -138,10 +138,9 @@ root. ...@@ -138,10 +138,9 @@ root.
>>> historical_conn.root()['first']['count'] >>> historical_conn.root()['first']['count']
0 0
In fact, ``at`` arguments are translated into ``before`` values because the In fact, ``at`` arguments are translated into ``before`` values.
underlying mechanism is a storage's loadBefore method. When you look at a When you look at a connection's ``before`` attribute, it is normalized into a
connection's ``before`` attribute, it is normalized into a ``before`` serial, ``before`` serial, no matter what you pass into ``db.open``.
no matter what you pass into ``db.open``.
>>> print(conn.before) >>> print(conn.before)
None None
......
...@@ -701,6 +701,9 @@ class IStorage(Interface): ...@@ -701,6 +701,9 @@ class IStorage(Interface):
def loadBefore(oid, tid): def loadBefore(oid, tid):
"""Load the object data written before a transaction id """Load the object data written before a transaction id
( This method is deprecated and kept for backward-compatibility.
Please use loadAt instead. )
If there isn't data before the object before the given If there isn't data before the object before the given
transaction, then None is returned, otherwise three values are transaction, then None is returned, otherwise three values are
returned: returned:
...@@ -907,6 +910,24 @@ class IStorageLastPack(Interface): ...@@ -907,6 +910,24 @@ class IStorageLastPack(Interface):
to perform round-trip and synchronize with the server. to perform round-trip and synchronize with the server.
""" """
class IStorageLoadAt(Interface):
def loadAt(oid, at): # -> (data, serial)
"""Load object data as observed at given database state.
loadAt returns data for object with given object ID as observed by
database state ≤ at. Two values are returned:
- The data record,
- The transaction ID of the data record.
If the object does not exist, or is deleted as of `at` database state,
loadAt returns data=None, and serial indicates transaction ID of the
most recent deletion done in transaction with ID ≤ at, or null tid if
there is no such deletion.
Note: no POSKeyError is raised even if object id is not in the storage.
"""
class IMultiCommitStorage(IStorage): class IMultiCommitStorage(IStorage):
"""A multi-commit storage can commit multiple transactions at once. """A multi-commit storage can commit multiple transactions at once.
......
...@@ -10,7 +10,7 @@ also simplifies the implementation of the DB and Connection classes. ...@@ -10,7 +10,7 @@ also simplifies the implementation of the DB and Connection classes.
import zope.interface import zope.interface
from . import interfaces, serialize, POSException from . import interfaces, serialize, POSException
from .utils import p64, u64, Lock, oid_repr, tid_repr from .utils import p64, u64, Lock, loadAt, oid_repr, tid_repr
class Base(object): class Base(object):
...@@ -99,7 +99,7 @@ class MVCCAdapterInstance(Base): ...@@ -99,7 +99,7 @@ class MVCCAdapterInstance(Base):
'checkCurrentSerialInTransaction', 'tpc_abort', 'checkCurrentSerialInTransaction', 'tpc_abort',
) )
_start = None # Transaction start time _start = None # Transaction start time (before)
_ltid = b'' # Last storage transaction id _ltid = b'' # Last storage transaction id
def __init__(self, base): def __init__(self, base):
...@@ -151,8 +151,9 @@ class MVCCAdapterInstance(Base): ...@@ -151,8 +151,9 @@ class MVCCAdapterInstance(Base):
def load(self, oid): def load(self, oid):
assert self._start is not None assert self._start is not None
r = self._storage.loadBefore(oid, self._start) at = p64(u64(self._start)-1)
if r is None: data, serial = loadAt(self._storage, oid, at)
if data is None:
# object was deleted or not-yet-created. # object was deleted or not-yet-created.
# raise POSKeyError, or ReadConflictError, if the deletion is # raise POSKeyError, or ReadConflictError, if the deletion is
# potentially due to simultaneous pack: a pack(t+δ) could be # potentially due to simultaneous pack: a pack(t+δ) could be
...@@ -186,7 +187,7 @@ class MVCCAdapterInstance(Base): ...@@ -186,7 +187,7 @@ class MVCCAdapterInstance(Base):
# no simultaneous pack detected, or lastPack was before our view of the database # no simultaneous pack detected, or lastPack was before our view of the database
raise POSException.POSKeyError(oid) raise POSException.POSKeyError(oid)
return r[:2] return data, serial
def prefetch(self, oids): def prefetch(self, oids):
try: try:
...@@ -265,10 +266,11 @@ class HistoricalStorageAdapter(Base): ...@@ -265,10 +266,11 @@ class HistoricalStorageAdapter(Base):
new_oid = pack = store = read_only_writer new_oid = pack = store = read_only_writer
def load(self, oid, version=''): def load(self, oid, version=''):
r = self._storage.loadBefore(oid, self._before) at = p64(u64(self._before)-1)
if r is None: data, serial = loadAt(self._storage, oid, at)
if data is None:
raise POSException.POSKeyError(oid) raise POSException.POSKeyError(oid)
return r[:2] return data, serial
class UndoAdapterInstance(Base): class UndoAdapterInstance(Base):
......
...@@ -16,6 +16,7 @@ A create_storage function is provided that creates a storage. ...@@ -16,6 +16,7 @@ A create_storage function is provided that creates a storage.
>>> transaction.commit() >>> transaction.commit()
>>> oid0 = conn.root()[0]._p_oid >>> oid0 = conn.root()[0]._p_oid
>>> oid1 = conn.root()[1]._p_oid >>> oid1 = conn.root()[1]._p_oid
>>> atLive = conn.root()._p_serial
>>> del conn.root()[0] >>> del conn.root()[0]
>>> del conn.root()[1] >>> del conn.root()[1]
>>> transaction.commit() >>> transaction.commit()
...@@ -66,9 +67,10 @@ Now if we try to load data for the objects, we get a POSKeyError: ...@@ -66,9 +67,10 @@ Now if we try to load data for the objects, we get a POSKeyError:
We can still get the data if we load before the time we deleted. We can still get the data if we load before the time we deleted.
>>> storage.loadBefore(oid0, conn.root()._p_serial) == (p0, s0, tid) >>> from ZODB.utils import loadAt, z64
>>> loadAt(storage, oid0, atLive) == (p0, s0)
True True
>>> storage.loadBefore(oid1, conn.root()._p_serial) == (p1, s1, tid) >>> loadAt(storage, oid1, atLive) == (p1, s1)
True True
>>> with open(storage.loadBlob(oid1, s1)) as fp: fp.read() >>> with open(storage.loadBlob(oid1, s1)) as fp: fp.read()
'some data' 'some data'
...@@ -92,15 +94,11 @@ gone: ...@@ -92,15 +94,11 @@ gone:
... ...
POSKeyError: ... POSKeyError: ...
>>> storage.loadBefore(oid0, conn.root()._p_serial) # doctest: +ELLIPSIS >>> loadAt(storage, oid0, atLive) == (None, z64)
Traceback (most recent call last): True
...
POSKeyError: ...
>>> storage.loadBefore(oid1, conn.root()._p_serial) # doctest: +ELLIPSIS >>> loadAt(storage, oid1, atLive) == (None, z64)
Traceback (most recent call last): True
...
POSKeyError: ...
>>> storage.loadBlob(oid1, s1) # doctest: +ELLIPSIS >>> storage.loadBlob(oid1, s1) # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
......
...@@ -47,6 +47,7 @@ class MVCCMappingStorage(MappingStorage): ...@@ -47,6 +47,7 @@ class MVCCMappingStorage(MappingStorage):
inst.pack = self.pack inst.pack = self.pack
inst.lastPack = self.lastPack inst.lastPack = self.lastPack
inst.loadBefore = self.loadBefore inst.loadBefore = self.loadBefore
inst.loadAt = self.loadAt
inst._ltid = self._ltid inst._ltid = self._ltid
inst._main_lock = self._lock inst._main_lock = self._lock
return inst return inst
......
...@@ -39,6 +39,13 @@ class HexStorage(object): ...@@ -39,6 +39,13 @@ class HexStorage(object):
setattr(self, name, v) setattr(self, name, v)
zope.interface.directlyProvides(self, zope.interface.providedBy(base)) zope.interface.directlyProvides(self, zope.interface.providedBy(base))
if hasattr(base, 'loadAt') and 'loadAt' not in self.copied_methods:
def loadAt(oid, at):
data, serial = self.base.loadAt(oid, at)
if data is not None:
data = unhexlify(data[2:])
return data, serial
self.loadAt = loadAt
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self.base, name) return getattr(self.base, name)
...@@ -130,7 +137,7 @@ class ServerHexStorage(HexStorage): ...@@ -130,7 +137,7 @@ class ServerHexStorage(HexStorage):
""" """
copied_methods = HexStorage.copied_methods + ( copied_methods = HexStorage.copied_methods + (
'load', 'loadBefore', 'loadSerial', 'store', 'restore', 'load', 'loadAt', 'loadBefore', 'loadSerial', 'store', 'restore',
'iterator', 'storeBlob', 'restoreBlob', 'record_iternext', 'iterator', 'storeBlob', 'restoreBlob', 'record_iternext',
) )
......
...@@ -1328,7 +1328,16 @@ class StubStorage(object): ...@@ -1328,7 +1328,16 @@ class StubStorage(object):
raise TypeError('StubStorage does not support versions.') raise TypeError('StubStorage does not support versions.')
return self._data[oid] return self._data[oid]
def loadAt(self, oid, at):
try:
data, serial = self._transdata[oid]
except KeyError:
return None, z64
return data, serial
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
warnings.warn("loadBefore is deprecated - use loadAt instead",
DeprecationWarning, stacklevel=2)
return self._data[oid] + (None, ) return self._data[oid] + (None, )
def store(self, oid, serial, p, version, transaction): def store(self, oid, serial, p, version, transaction):
......
...@@ -23,6 +23,7 @@ from ZODB.tests import ( ...@@ -23,6 +23,7 @@ from ZODB.tests import (
StorageTestBase, StorageTestBase,
Synchronization, Synchronization,
) )
from ZODB.tests.MinPO import MinPO
import os import os
if os.environ.get('USE_ZOPE_TESTING_DOCTEST'): if os.environ.get('USE_ZOPE_TESTING_DOCTEST'):
...@@ -33,7 +34,9 @@ import random ...@@ -33,7 +34,9 @@ import random
import re import re
import transaction import transaction
import unittest import unittest
import ZODB.Connection
import ZODB.DemoStorage import ZODB.DemoStorage
import ZODB.FileStorage
import ZODB.tests.hexstorage import ZODB.tests.hexstorage
import ZODB.tests.util import ZODB.tests.util
import ZODB.utils import ZODB.utils
...@@ -264,6 +267,101 @@ def load_before_base_storage_current(): ...@@ -264,6 +267,101 @@ def load_before_base_storage_current():
>>> base.close() >>> base.close()
""" """
# additional DemoStorage tests that do not fit into common DemoStorageTests setup.
class DemoStorageTests2(ZODB.tests.util.TestCase):
def checkLoadAfterDelete(self):
"""Verify that DemoStorage correctly handles load requests for objects
deleted in read-write part of the storage.
https://github.com/zopefoundation/ZODB/issues/318
"""
FileStorage = ZODB.FileStorage.FileStorage
DemoStorage = ZODB.DemoStorage.DemoStorage
TransactionMetaData = ZODB.Connection.TransactionMetaData
# mkbase prepares base part of the storage.
def mkbase(): # -> zbase
zbase = FileStorage("base.fs")
db = DB(zbase)
conn = db.open()
root = conn.root()
root['obj'] = obj = MinPO(0)
transaction.commit()
obj.value += 1
transaction.commit()
conn.close()
db.close()
zbase.close()
zbase = FileStorage("base.fs", read_only=True)
return zbase
# prepare base + overlay
zbase = mkbase()
zoverlay = FileStorage("overlay.fs")
zdemo = DemoStorage(base=zbase, changes=zoverlay)
# overlay: modify obj and root
db = DB(zdemo)
conn = db.open()
root = conn.root()
obj = root['obj']
oid = obj._p_oid
obj.value += 1
# modify root as well so that there is root revision saved in overlay that points to obj
root['x'] = 1
transaction.commit()
atLive = obj._p_serial
# overlay: delete obj from root making it a garbage
del root['obj']
transaction.commit()
atUnlink = root._p_serial
# unmount DemoStorage
conn.close()
db.close()
zdemo.close() # closes zbase and zoverlay as well
del zbase, zoverlay
# simulate GC on base+overlay
zoverlay = FileStorage("overlay.fs")
txn = transaction.get()
txn_meta = TransactionMetaData(txn.user, txn.description, txn.extension)
zoverlay.tpc_begin(txn_meta)
zoverlay.deleteObject(oid, atLive, txn_meta)
zoverlay.tpc_vote(txn_meta)
atGC = zoverlay.tpc_finish(txn_meta)
# remount base+overlay
zbase = FileStorage("base.fs", read_only=True)
zdemo = ZODB.DemoStorage.DemoStorage(base=zbase, changes=zoverlay)
db = DB(zdemo)
# verify:
# load(obj, atLive) -> 2
# load(obj, atUnlink) -> 2 (garbage, but still in DB)
# load(obj, atGC) -> POSKeyError, not 1 from base
def getObjAt(at):
conn = db.open(at=at)
obj = conn.get(oid)
self.assertIsInstance(obj, MinPO)
v = obj.value
conn.close()
return v
self.assertEqual(getObjAt(atLive), 2)
self.assertEqual(getObjAt(atUnlink), 2)
self.assertRaises(ZODB.POSException.POSKeyError, getObjAt, atGC)
# end
db.close()
zdemo.close() # closes zbase and zoverlay as well
def test_suite(): def test_suite():
suite = unittest.TestSuite(( suite = unittest.TestSuite((
doctest.DocTestSuite( doctest.DocTestSuite(
...@@ -285,4 +383,5 @@ def test_suite(): ...@@ -285,4 +383,5 @@ def test_suite():
'check')) 'check'))
suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundHexMappingStorage, suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundHexMappingStorage,
'check')) 'check'))
suite.addTest(unittest.makeSuite(DemoStorageTests2, 'check'))
return suite return suite
...@@ -20,10 +20,11 @@ storage tests against the test storage. ...@@ -20,10 +20,11 @@ storage tests against the test storage.
""" """
import bisect import bisect
import unittest import unittest
import warnings
from ZODB.BaseStorage import BaseStorage from ZODB.BaseStorage import BaseStorage
from ZODB import POSException from ZODB import POSException
from ZODB.utils import z64 from ZODB.utils import p64, u64, z64
from ZODB.tests import StorageTestBase from ZODB.tests import StorageTestBase
from ZODB.tests import BasicStorage, MTStorage, Synchronization from ZODB.tests import BasicStorage, MTStorage, Synchronization
...@@ -105,6 +106,11 @@ class MinimalMemoryStorage(BaseStorage, object): ...@@ -105,6 +106,11 @@ class MinimalMemoryStorage(BaseStorage, object):
self._ltid = self._tid self._ltid = self._tid
def loadBefore(self, the_oid, the_tid): def loadBefore(self, the_oid, the_tid):
warnings.warn("loadBefore is deprecated - use loadAt instead",
DeprecationWarning, stacklevel=2)
return self._loadBefore(the_oid, the_tid)
def _loadBefore(self, the_oid, the_tid):
# It's okay if loadBefore() is really expensive, because this # It's okay if loadBefore() is really expensive, because this
# storage is just used for testing. # storage is just used for testing.
with self._lock: with self._lock:
...@@ -126,6 +132,17 @@ class MinimalMemoryStorage(BaseStorage, object): ...@@ -126,6 +132,17 @@ class MinimalMemoryStorage(BaseStorage, object):
return self._index[(the_oid, tid)], tid, end_tid return self._index[(the_oid, tid)], tid, end_tid
def loadAt(self, oid, at):
try:
r = self._loadBefore(oid, p64(u64(at)+1))
except KeyError:
return None, z64
if r is None:
# not-yet created (deleteObject not supported -> serial=0)
return None, z64
data, serial, _ = r
return data, serial
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
return self._index[(oid, serial)] return self._index[(oid, serial)]
......
...@@ -35,7 +35,7 @@ MinimalMemoryStorage that implements MVCC support, but not much else. ...@@ -35,7 +35,7 @@ MinimalMemoryStorage that implements MVCC support, but not much else.
***IMPORTANT***: The MVCC approach has changed since these tests were ***IMPORTANT***: The MVCC approach has changed since these tests were
originally written. The new approach is much simpler because we no originally written. The new approach is much simpler because we no
longer call load to get the current state of an object. We call longer call load to get the current state of an object. We call
loadBefore instead, having gotten a transaction time at the start of a loadAt instead, having gotten a transaction time at the start of a
transaction. As a result, the rhythm of the tests is a little odd, transaction. As a result, the rhythm of the tests is a little odd,
because we no longer need to probe a complex dance that doesn't exist any more. because we no longer need to probe a complex dance that doesn't exist any more.
...@@ -290,7 +290,7 @@ first connection's state for b "old". ...@@ -290,7 +290,7 @@ first connection's state for b "old".
Now deactivate "b" in the first connection, and (re)fetch it. The first Now deactivate "b" in the first connection, and (re)fetch it. The first
connection should still see 1, due to MVCC, but to get this old state connection should still see 1, due to MVCC, but to get this old state
TmpStore needs to handle the loadBefore() method. TmpStore needs to handle the loadAt() or loadBefore() methods.
>>> r1["b"]._p_deactivate() >>> r1["b"]._p_deactivate()
...@@ -322,7 +322,7 @@ why ZODB no-longer calls load. :) ...@@ -322,7 +322,7 @@ why ZODB no-longer calls load. :)
Rather than add all the complexity of ZEO to these tests, the Rather than add all the complexity of ZEO to these tests, the
MinimalMemoryStorage has a hook. We'll write a subclass that will MinimalMemoryStorage has a hook. We'll write a subclass that will
deliver an invalidation when it loads (or loadBefore's) an object. deliver an invalidation when it loads (or loadAt's) an object.
The hook allows us to test the Connection code. The hook allows us to test the Connection code.
>>> class TestStorage(MinimalMemoryStorage): >>> class TestStorage(MinimalMemoryStorage):
......
...@@ -17,6 +17,7 @@ import struct ...@@ -17,6 +17,7 @@ import struct
import sys import sys
import time import time
import threading import threading
import warnings
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from tempfile import mkstemp from tempfile import mkstemp
...@@ -382,8 +383,51 @@ def load_current(storage, oid, version=''): ...@@ -382,8 +383,51 @@ def load_current(storage, oid, version=''):
some time in the future. some time in the future.
""" """
assert not version assert not version
r = storage.loadBefore(oid, maxtid) data, serial = loadAt(storage, oid, maxtid)
if r is None: if data is None:
raise ZODB.POSException.POSKeyError(oid) raise ZODB.POSException.POSKeyError(oid)
assert r[2] is None return data, serial
return r[:2]
_loadAtWarned = set() # of storage class
def loadAt(storage, oid, at):
"""loadAt provides IStorageLoadAt semantic for all storages.
Storages that do not implement loadAt are served via loadBefore.
"""
load_at = getattr(storage, 'loadAt', None)
if load_at is not None:
return load_at(oid, at)
# storage does not provide IStorageLoadAt - warn + fall back to loadBefore
if type(storage) not in _loadAtWarned:
# there is potential race around _loadAtWarned access, but due to the
# GIL this race cannot result in that set corruption, and can only lead
# to us emitting the warning twice instead of just once.
# -> do not spend CPU on lock and just ignore it.
warnings.warn(
"FIXME %s does not provide loadAt - emulating it via loadBefore, but ...\n"
"\t... 1) access will be potentially slower, and\n"
"\t... 2) not full semantic of loadAt could be provided.\n"
"\t... this can lead to data corruption.\n"
"\t... -> please see https://github.com/zopefoundation/ZODB/issues/318 for details." %
type(storage), DeprecationWarning)
_loadAtWarned.add(type(storage))
if at == maxtid:
before = at
else:
before = p64(u64(at)+1)
try:
r = storage.loadBefore(oid, before)
except ZODB.POSException.POSKeyError:
return (None, z64) # object does not exist at all
if r is None:
# object was removed; however loadBefore does not tell when.
# return serial=0 - this is the "data corruption" case talked about above.
return (None, z64)
data, serial, next_serial = r
return (data, serial)