Commit ddc41882 authored by Jim Fulton's avatar Jim Fulton

Got rid of more version support.

parent 81c1fae4
......@@ -265,11 +265,11 @@ class ClientStorage(object):
# _server_addr is used by sortKey()
self._server_addr = None
self._tfile = None
self._pickler = None
self._verification_invalidations = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0}
'supportsUndo':0}
self._tbuf = self.TransactionBufferClass()
self._db = None
......@@ -572,6 +572,13 @@ class ClientStorage(object):
# it should set self._server. If it goes through full cache
# verification, then endVerify() should self._server.
# if not self._cache:
# log2("No verification necessary -- empty cache")
# self._server = server
# self._ready.set()
# return "full verification"
last_inval_tid = self._cache.getLastTid()
if last_inval_tid is not None:
ltid = server.lastTransaction()
......@@ -594,17 +601,15 @@ class ClientStorage(object):
self._server = server
self._ready.set()
return "quick verification"
log2("Verifying cache")
# setup tempfile to hold zeoVerify results
self._tfile = tempfile.TemporaryFile(suffix=".inv")
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
self._verification_invalidations = []
# TODO: should batch these operations for efficiency; would need
# to acquire lock ...
for oid, tid, version in self._cache.contents():
server.verify(oid, version, tid)
server.verify(oid, tid)
self._pending_server = server
server.endZeoVerify()
return "full verification"
......@@ -667,10 +672,6 @@ class ClientStorage(object):
"""Storage API: return whether we support undo."""
return self._info['supportsUndo']
def supportsVersions(self):
"""Storage API: return whether we support versions."""
return self._info['supportsVersions']
def isReadOnly(self):
"""Storage API: return whether we are in read-only mode."""
if self._is_read_only:
......@@ -689,44 +690,10 @@ class ClientStorage(object):
raise POSException.StorageTransactionError(self._transaction,
trans)
def abortVersion(self, version, txn):
"""Storage API: clear any changes made by the given version."""
self._check_trans(txn)
tid, oids = self._server.abortVersion(version, id(txn))
# When a version aborts, invalidate the version and
# non-version data. The non-version data should still be
# valid, but older versions of ZODB will change the
# non-version serialno on an abort version. With those
# versions of ZODB, you'd get a conflict error if you tried to
# commit a transaction with the cached data.
# If we could guarantee that ZODB gave the right answer,
# we could just invalidate the version data.
for oid in oids:
self._tbuf.invalidate(oid, '')
return tid, oids
def commitVersion(self, source, destination, txn):
"""Storage API: commit the source version in the destination."""
self._check_trans(txn)
tid, oids = self._server.commitVersion(source, destination, id(txn))
if destination:
# just invalidate our version data
for oid in oids:
self._tbuf.invalidate(oid, source)
else:
# destination is "", so invalidate version and non-version
for oid in oids:
self._tbuf.invalidate(oid, "")
return tid, oids
def history(self, oid, version, length=1):
def history(self, oid, length=1):
"""Storage API: return a sequence of HistoryEntry objects.
This does not support the optional filter argument defined by
the Storage API.
"""
return self._server.history(oid, version, length)
return self._server.history(oid, length)
def record_iternext(self, next=None):
"""Storage API: get the next database record.
......@@ -743,21 +710,18 @@ class ClientStorage(object):
"""Storage API: load a historical revision of an object."""
return self._server.loadSerial(oid, serial)
def load(self, oid, version):
def load(self, oid, version=''):
"""Storage API: return the data for a given object.
This returns the pickle data and serial number for the object
specified by the given object id and version, if they exist;
specified by the given object id, if they exist;
otherwise a KeyError is raised.
"""
return self.loadEx(oid, version)[:2]
def loadEx(self, oid, version):
self._lock.acquire() # for atomic processing of invalidations
try:
t = self._cache.load(oid, version)
t = self._cache.load(oid, '')
if t:
return t
return t[:2] # XXX strip version
finally:
self._lock.release()
......@@ -773,19 +737,19 @@ class ClientStorage(object):
finally:
self._lock.release()
data, tid, ver = self._server.loadEx(oid, version)
data, tid = self._server.loadEx(oid)
self._lock.acquire() # for atomic processing of invalidations
try:
if self._load_status:
self._cache.store(oid, ver, tid, None, data)
self._cache.store(oid, '', tid, None, data)
self._load_oid = None
finally:
self._lock.release()
finally:
self._load_lock.release()
return data, tid, ver
return data, tid
def loadBefore(self, oid, tid):
self._lock.acquire()
......@@ -823,20 +787,6 @@ class ClientStorage(object):
return data, start, end
def modifiedInVersion(self, oid):
"""Storage API: return the version, if any, that modfied an object.
If no version modified the object, return an empty string.
"""
self._lock.acquire()
try:
v = self._cache.modifiedInVersion(oid)
if v is not None:
return v
finally:
self._lock.release()
return self._server.modifiedInVersion(oid)
def new_oid(self):
"""Storage API: return a new object identifier."""
if self._is_read_only:
......@@ -887,24 +837,25 @@ class ClientStorage(object):
def store(self, oid, serial, data, version, txn):
"""Storage API: store data for an object."""
assert not version
self._check_trans(txn)
self._server.storea(oid, serial, data, version, id(txn))
self._tbuf.store(oid, version, data)
self._server.storea(oid, serial, data, id(txn))
self._tbuf.store(oid, data)
return self._check_serials()
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
"""Storage API: store a blob object."""
serials = self.store(oid, serial, data, version, txn)
assert not version
serials = self.store(oid, serial, data, '', txn)
if self.shared_blob_dir:
self._storeBlob_shared(
oid, serial, data, blobfilename, version, txn)
self._storeBlob_shared(oid, serial, data, blobfilename, txn)
else:
self._server.storeBlob(
oid, serial, data, blobfilename, version, txn)
self._server.storeBlob(oid, serial, data, blobfilename, txn)
self._tbuf.storeBlob(oid, blobfilename)
return serials
def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
def _storeBlob_shared(self, oid, serial, data, filename, txn):
# First, move the blob into the blob directory
dir = self.fshelper.getPathForOID(oid)
if not os.path.exists(dir):
......@@ -924,8 +875,7 @@ class ClientStorage(object):
# Now tell the server where we put it
self._server.storeBlobShared(
oid, serial, data,
os.path.basename(target), version, id(txn))
oid, serial, data, os.path.basename(target), id(txn))
def _have_blob(self, blob_filename, oid, serial):
if os.path.exists(blob_filename):
......@@ -1161,14 +1111,14 @@ class ClientStorage(object):
if self._cache is None:
return
for oid, version, data in self._tbuf:
self._cache.invalidate(oid, version, tid)
for oid, data in self._tbuf:
self._cache.invalidate(oid, '', tid)
# If data is None, we just invalidate.
if data is not None:
s = self._seriald[oid]
if s != ResolvedSerial:
assert s == tid, (s, tid)
self._cache.store(oid, version, s, None, data)
self._cache.store(oid, '', s, None, data)
if self.fshelper is not None:
......@@ -1197,7 +1147,7 @@ class ClientStorage(object):
self._check_trans(txn)
tid, oids = self._server.undo(trans_id, id(txn))
for oid in oids:
self._tbuf.invalidate(oid, '')
self._tbuf.invalidate(oid)
return tid, oids
def undoInfo(self, first=0, last=-20, specification=None):
......@@ -1216,14 +1166,6 @@ class ClientStorage(object):
return []
return self._server.undoLog(first, last)
def versionEmpty(self, version):
"""Storage API: return whether the version has no transactions."""
return self._server.versionEmpty(version)
def versions(self, max=None):
"""Storage API: return a sequence of versions in the storage."""
return self._server.versions(max)
# Below are methods invoked by the StorageServer
def serialnos(self, args):
......@@ -1235,55 +1177,38 @@ class ClientStorage(object):
self._info.update(dict)
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
"""Server callback to invalidate an (oid, '') pair.
This is called as part of cache validation.
"""
# Invalidation as result of verify_cache().
# Queue an invalidate for the end the verification procedure.
if self._pickler is None:
if self._verification_invalidations is None:
# This should never happen. TODO: assert it doesn't, or log
# if it does.
return
self._pickler.dump(args)
def _process_invalidations(self, invs):
# Invalidations are sent by the ZEO server as a sequence of
# oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids.
self._verification_invalidations.append(args[0])
def _process_invalidations(self, tid, oids):
self._lock.acquire()
try:
# versions maps version names to dictionary of invalidations
versions = {}
for oid, version, tid in invs:
for oid in oids:
if oid == self._load_oid:
self._load_status = 0
self._cache.invalidate(oid, version, tid)
oids = versions.get((version, tid))
if not oids:
versions[(version, tid)] = [oid]
else:
oids.append(oid)
self._cache.invalidate(oid, '', tid)
if self._db is not None:
for (version, tid), d in versions.items():
self._db.invalidate(tid, d, version=version)
self._db.invalidate(tid, oids)
finally:
self._lock.release()
def endVerify(self):
"""Server callback to signal end of cache validation."""
if self._pickler is None:
if self._verification_invalidations is None:
return
# write end-of-data marker
self._pickler.dump((None, None))
self._pickler = None
self._tfile.seek(0)
f = self._tfile
self._tfile = None
self._process_invalidations(InvalidationLogIterator(f))
f.close()
self._process_invalidations(None, self._verification_invalidations)
self._verification_invalidations = None
log2("endVerify finishing")
self._server = self._pending_server
......@@ -1293,19 +1218,18 @@ class ClientStorage(object):
def invalidateTransaction(self, tid, args):
"""Invalidate objects modified by tid."""
oids = (a[0] for a in args)
self._lock.acquire()
try:
self._cache.setLastTid(tid)
finally:
self._lock.release()
if self._pickler is not None:
if self._verification_invalidations is not None:
log2("Transactional invalidation during cache verification",
level=BLATHER)
for t in args:
self._pickler.dump(t)
self._verification_invalidations.extend(oids)
return
self._process_invalidations([(oid, version, tid)
for oid, version in args])
self._process_invalidations(tid, list(oids))
# The following are for compatibility with protocol version 2.0.0
......@@ -1316,10 +1240,3 @@ class ClientStorage(object):
end = endVerify
Invalidate = invalidateTrans
def InvalidationLogIterator(fileobj):
unpickler = cPickle.Unpickler(fileobj)
while 1:
oid, version = unpickler.load()
if oid is None:
break
yield oid, version, None
......@@ -34,8 +34,8 @@ class CommitLog:
def size(self):
return self.file.tell()
def store(self, oid, serial, data, version):
self.pickler.dump((oid, serial, data, version))
def store(self, oid, serial, data):
self.pickler.dump((oid, serial, data))
self.stores += 1
def get_loader(self):
......
......@@ -116,23 +116,21 @@ class StorageServer:
# server will make an asynchronous invalidateVerify() call.
# @param oid object id
# @param s serial number on non-version data
# @param sv serial number of version data or None
# @defreturn async
def zeoVerify(self, oid, s, sv):
self.rpc.callAsync('zeoVerify', oid, s, sv)
def zeoVerify(self, oid, s):
self.rpc.callAsync('zeoVerify', oid, s, None)
##
# Check whether current serial number is valid for oid and version.
# Check whether current serial number is valid for oid.
# If the serial number is not current, the server will make an
# asynchronous invalidateVerify() call.
# @param oid object id
# @param version name of version for oid
# @param serial client's current serial number
# @defreturn async
def verify(self, oid, version, serial):
self.rpc.callAsync('verify', oid, version, serial)
def verify(self, oid, serial):
self.rpc.callAsync('verify', oid, '', serial)
##
# Signal to the server that cache verification is done.
......@@ -166,34 +164,26 @@ class StorageServer:
self.rpc.call('pack', t, wait)
##
# Return current data for oid. Version data is returned if
# present.
# Return current data for oid.
# @param oid object id
# @defreturn 5-tuple
# @return 5-tuple, current non-version data, serial number,
# version name, version data, version data serial number
# @defreturn 2-tuple
# @return 2-tuple, current non-version data, serial number
# @exception KeyError if oid is not found
def zeoLoad(self, oid):
return self.rpc.call('zeoLoad', oid)
return self.rpc.call('zeoLoad', oid)[:2]
##
# Return current data for oid in version, the tid of the
# transaction that wrote the most recent revision, and the name of
# the version for the data returned. Note that if the object
# wasn't modified in the version, then the non-version data is
# returned and the returned version is an empty string.
# Return current data for oid, and the tid of the
# transaction that wrote the most recent revision.
# @param oid object id
# @param version string, name of version
# @defreturn 3-tuple
# @return data, transaction id, version
# where version is the name of the version the data came
# from or "" for non-version data
# @defreturn 2-tuple
# @return data, transaction id
# @exception KeyError if oid is not found
def loadEx(self, oid, version):
return self.rpc.call("loadEx", oid, version)
def loadEx(self, oid):
return self.rpc.call("loadEx", oid, '')[:2]
##
# Return non-current data along with transaction ids that identify
......@@ -213,14 +203,13 @@ class StorageServer:
# @param oid object id
# @param serial serial number that this transaction read
# @param data new data record for oid
# @param version name of version or ""
# @param id id of current transaction
# @defreturn async
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id)
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to real all of
# the data into memory, so we use a message iterator. This
......@@ -235,13 +224,13 @@ class StorageServer:
break
yield ('storeBlobChunk', (chunk, ))
f.close()
yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
yield ('storeBlobEnd', (oid, serial, data, '', id(txn)))
self.rpc.callAsyncIterator(store())
def storeBlobShared(self, oid, serial, data, filename, version, id):
def storeBlobShared(self, oid, serial, data, filename, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
version, id)
'', id)
##
# Start two-phase commit for a transaction
......@@ -267,23 +256,17 @@ class StorageServer:
def tpc_abort(self, id):
self.rpc.callAsync('tpc_abort', id)
def abortVersion(self, src, id):
return self.rpc.call('abortVersion', src, id)
def commitVersion(self, src, dest, id):
return self.rpc.call('commitVersion', src, dest, id)
def history(self, oid, version, length=None):
def history(self, oid, length=None):
if length is None:
return self.rpc.call('history', oid, version)
return self.rpc.call('history', oid, '')
else:
return self.rpc.call('history', oid, version, length)
return self.rpc.call('history', oid, '', length)
def record_iternext(self, next):
return self.rpc.call('record_iternext', next)
def load(self, oid, version):
return self.rpc.call('load', oid, version)
def load(self, oid):
return self.rpc.call('load', oid, '')
def sendBlob(self, oid, serial):
return self.rpc.call('sendBlob', oid, serial)
......@@ -294,14 +277,11 @@ class StorageServer:
def loadSerial(self, oid, serial):
return self.rpc.call('loadSerial', oid, serial)
def modifiedInVersion(self, oid):
return self.rpc.call('modifiedInVersion', oid)
def new_oid(self):
return self.rpc.call('new_oid')
def store(self, oid, serial, data, version, trans):
return self.rpc.call('store', oid, serial, data, version, trans)
def store(self, oid, serial, data, trans):
return self.rpc.call('store', oid, serial, data, '', trans)
def undo(self, trans_id, trans):
return self.rpc.call('undo', trans_id, trans)
......@@ -312,15 +292,6 @@ class StorageServer:
def undoInfo(self, first, last, spec):
return self.rpc.call('undoInfo', first, last, spec)
def versionEmpty(self, vers):
return self.rpc.call('versionEmpty', vers)
def versions(self, max=None):
if max is None:
return self.rpc.call('versions')
else:
return self.rpc.call('versions', max)
class ExtensionMethodWrapper:
def __init__(self, rpc, name):
self.rpc = rpc
......
......@@ -162,17 +162,6 @@ class ZEOStorage:
storage = self.storage
info = self.get_info()
if info['supportsVersions']:
self.versionEmpty = storage.versionEmpty
self.versions = storage.versions
self.modifiedInVersion = storage.modifiedInVersion
else:
self.versionEmpty = lambda version: True
self.versions = lambda max=None: ()
self.modifiedInVersion = lambda oid: ''
def commitVersion(*a, **k):
raise NotImplementedError
self.commitVersion = self.abortVersion = commitVersion
if not info['supportsUndo']:
self.undoLog = self.undoInfo = lambda *a,**k: ()
......@@ -276,13 +265,6 @@ class ZEOStorage:
def get_info(self):
storage = self.storage
try:
supportsVersions = storage.supportsVersions
except AttributeError:
supportsVersions = False
else:
supportsVersions = supportsVersions()
try:
supportsUndo = storage.supportsUndo
except AttributeError:
......@@ -294,7 +276,7 @@ class ZEOStorage:
'size': storage.getSize(),
'name': storage.getName(),
'supportsUndo': supportsUndo,
'supportsVersions': supportsVersions,
'supportsVersions': False,
'extensionMethods': self.getExtensionMethods(),
'supports_record_iternext': hasattr(self, 'record_iternext'),
}
......@@ -307,13 +289,10 @@ class ZEOStorage:
def getExtensionMethods(self):
return self._extensions
def loadEx(self, oid, version):
def loadEx(self, oid, version=''):
self.stats.loads += 1
if version:
oversion = self.storage.modifiedInVersion(oid)
if oversion == version:
data, serial = self.storage.load(oid, version)
return data, serial, version
raise StorageServerError("Versions aren't supported.")
data, serial = self.storage.load(oid, '')
return data, serial, ''
......@@ -324,20 +303,8 @@ class ZEOStorage:
def zeoLoad(self, oid):
self.stats.loads += 1
v = self.storage.modifiedInVersion(oid)
if v:
pv, sv = self.storage.load(oid, v)
else:
pv = sv = None
try:
p, s = self.storage.load(oid, '')
except KeyError:
if sv:
# Created in version, no non-version data
p = s = None
else:
raise
return p, s, v, pv, sv
p, s = self.storage.load(oid, '')
return p, s, '', None, None
def getInvalidations(self, tid):
invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
......@@ -348,20 +315,17 @@ class ZEOStorage:
return invtid, invlist
def verify(self, oid, version, tid):
if version:
raise StorageServerError("Versions aren't supported.")
try:
t = self.getTid(oid)
except KeyError:
self.client.invalidateVerify((oid, ""))
else:
if tid != t:
# This will invalidate non-version data when the
# client only has invalid version data. Since this is
# an uncommon case, we avoid the cost of checking
# whether the serial number matches the current
# non-version data.
self.client.invalidateVerify((oid, version))
def zeoVerify(self, oid, s, sv):
self.client.invalidateVerify((oid, ''))
def zeoVerify(self, oid, s, sv=None):
if not self.verifying:
self.verifying = 1
self.stats.verifying_clients += 1
......@@ -374,22 +338,8 @@ class ZEOStorage:
# invalidation is right. It could be an application bug
# that left a dangling reference, in which case it's bad.
else:
# If the client has version data, the logic is a bit more
# complicated. If the current serial number matches the
# client serial number, then the non-version data must
# also be valid. If the current serialno is for a
# version, then the non-version data can't change.
# If the version serialno isn't valid, then the
# non-version serialno may or may not be valid. Rather
# than trying to figure it whether it is valid, we just
# invalidate it. Sending an invalidation for the
# non-version data implies invalidating the version data
# too, since an update to non-version data can only occur
# after the version is aborted or committed.
if sv:
if sv != os:
self.client.invalidateVerify((oid, ''))
raise StorageServerError("Versions aren't supported.")
else:
if s != os:
self.client.invalidateVerify((oid, ''))
......@@ -521,9 +471,11 @@ class ZEOStorage:
# an _.
def storea(self, oid, serial, data, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
self._check_tid(id, exc=StorageTransactionError)
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
self.txnlog.store(oid, serial, data)
def storeBlobStart(self):
assert self.blob_tempfile is None
......@@ -534,16 +486,20 @@ class ZEOStorage:
os.write(self.blob_tempfile[0], chunk)
def storeBlobEnd(self, oid, serial, data, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
fd, tempname = self.blob_tempfile
self.blob_tempfile = None
os.close(fd)
self.blob_log.append((oid, serial, data, tempname, version))
self.blob_log.append((oid, serial, data, tempname))
def storeBlobShared(self, oid, serial, data, filename, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
# Reconstruct the full path from the filename in the OID directory
filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
filename)
self.blob_log.append((oid, serial, data, filename, version))
self.blob_log.append((oid, serial, data, filename))
def sendBlob(self, oid, serial):
self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
......@@ -558,20 +514,6 @@ class ZEOStorage:
else:
return self._wait(lambda: self._vote())
def abortVersion(self, src, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._abortVersion(src)
else:
return self._wait(lambda: self._abortVersion(src))
def commitVersion(self, src, dest, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._commitVersion(src, dest)
else:
return self._wait(lambda: self._commitVersion(src, dest))
def undo(self, trans_id, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
......@@ -585,10 +527,10 @@ class ZEOStorage:
self.stats.lock_time = time.time()
self.storage.tpc_begin(txn, tid, status)
def _store(self, oid, serial, data, version):
def _store(self, oid, serial, data):
err = None
try:
newserial = self.storage.store(oid, serial, data, version,
newserial = self.storage.store(oid, serial, data, '',
self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
......@@ -616,7 +558,7 @@ class ZEOStorage:
newserial = [(oid, err)]
else:
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, version))
self.invalidated.append((oid, ''))
if isinstance(newserial, str):
newserial = [(oid, newserial)]
......@@ -645,21 +587,6 @@ class ZEOStorage:
self.client.serialnos(self.serials)
return
def _abortVersion(self, src):
tid, oids = self.storage.abortVersion(src, self.transaction)
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
return tid, oids
def _commitVersion(self, src, dest):
tid, oids = self.storage.commitVersion(src, dest, self.transaction)
inv = [(oid, dest) for oid in oids]
self.invalidated.extend(inv)
if dest:
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
return tid, oids
def _undo(self, trans_id):
tid, oids = self.storage.undo(trans_id, self.transaction)
inv = [(oid, None) for oid in oids]
......@@ -706,9 +633,9 @@ class ZEOStorage:
# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename, version = self.blob_log.pop()
oid, oldserial, data, blobfilename = self.blob_log.pop()
self.storage.storeBlob(oid, oldserial, data, blobfilename,
version, self.transaction,)
'', self.transaction,)
resp = self._thunk()
if delay is not None:
......@@ -742,6 +669,20 @@ class ZEOStorage:
else:
return 1
def modifiedInVersion(self, oid):
return ''
def versions(self):
return ()
def versionEmpty(self, version):
return True
def commitVersion(self, *a, **k):
raise NotImplementedError
abortVersion = commitVersion
class StorageServerDB:
def __init__(self, server, storage_id):
......@@ -750,10 +691,12 @@ class StorageServerDB:
self.references = ZODB.serialize.referencesf
def invalidate(self, tid, oids, version=''):
if version:
raise StorageServerError("Versions aren't supported.")
storage_id = self.storage_id
self.server.invalidate(
None, storage_id, tid,
[(oid, version) for oid in oids],
[(oid, '') for oid in oids],
)
for zeo_server in self.server.connections.get(storage_id, ())[:]:
try:
......@@ -1026,7 +969,7 @@ class StorageServer:
This is called from several ZEOStorage methods.
invalidated is a sequence of oid, version pairs.
invalidated is a sequence of oid, empty-string pairs.
This can do three different things:
......
......@@ -77,34 +77,28 @@ class TransactionBuffer:
finally:
self.lock.release()
def store(self, oid, version, data):
def store(self, oid, data):
"""Store oid, version, data for later retrieval"""
self.lock.acquire()
try:
self._store(oid, version, data)
if self.closed:
return
self.pickler.dump((oid, data))
self.count += 1
# Estimate per-record cache size
self.size = self.size + len(data) + 31
finally:
self.lock.release()
def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))
def _store(self, oid, version, data):
"""Store oid, version, data for later retrieval"""
if self.closed:
return
self.pickler.dump((oid, version, data))
self.count += 1
# Estimate per-record cache size
self.size = self.size + len(data) + 31
if version:
# Assume version data has same size as non-version data
self.size = self.size + len(version) + len(data) + 12
def invalidate(self, oid, version):
def invalidate(self, oid):
self.lock.acquire()
try:
if self.closed:
return
self.pickler.dump((oid, version, None))
self.pickler.dump((oid, None))
self.count += 1
finally:
self.lock.release()
......
......@@ -20,6 +20,3 @@ ZEO is now part of ZODB; ZODB's home on the web is
http://www.zope.org/Wikis/ZODB
"""
# The next line must use double quotes, so release.py recognizes it.
version = "3.7.0b3"
......@@ -41,7 +41,7 @@ class IServeable(zope.interface.Interface):
performed by the most recent transactions.
An iterable of up to size entries must be returned, where each
entry is a transaction id and a sequence of object-id/version
pairs describing the objects and versions written by the
entry is a transaction id and a sequence of object-id/empty-string
pairs describing the objects written by the
transaction, in chronological order.
"""
......@@ -24,6 +24,18 @@ import logging
import ZEO
zeo_version = 'unknown'
try:
import pkg_resources
except ImportError:
pass
else:
zeo_dist = pkg_resources.working_set.find(
pkg_resources.Requirement.parse('ZODB3')
)
if zeo_dist is not None:
zeo_version = zeo_dist.version
class StorageStats:
"""Per-storage usage statistics."""
......@@ -149,7 +161,7 @@ class StatsServer(asyncore.dispatcher):
f.close()
def dump(self, f):
print >> f, "ZEO monitor server version %s" % ZEO.version
print >> f, "ZEO monitor server version %s" % zeo_version
print >> f, time.ctime()
print >> f
......
......@@ -49,14 +49,13 @@ def main():
print "Connected. Now starting a transaction..."
oid = storage.new_oid()
version = ""
revid = ZERO
data = MinPO("timeout.py")
pickled_data = zodb_pickle(data)
t = Transaction()
t.user = "timeout.py"
storage.tpc_begin(t)
storage.store(oid, revid, pickled_data, version, t)
storage.store(oid, revid, pickled_data, '', t)
print "Stored. Now voting..."
storage.tpc_vote(t)
......
......@@ -111,9 +111,17 @@ class CommonSetupTearDown(StorageTestBase):
self._newAddr()
self.startServer()
# self._old_log_level = logging.getLogger().getEffectiveLevel()
# logging.getLogger().setLevel(logging.WARNING)
# self._log_handler = logging.StreamHandler()
# logging.getLogger().addHandler(self._log_handler)
def tearDown(self):
"""Try to cause the tests to halt"""
logging.info("tearDown() %s" % self.id())
# logging.getLogger().setLevel(self._old_log_level)
# logging.getLogger().removeHandler(self._log_handler)
# logging.info("tearDown() %s" % self.id())
for p in self.conf_paths:
os.remove(p)
if getattr(self, '_storage', None) is not None:
......
......@@ -24,8 +24,7 @@ from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
from ZODB.POSException import ReadConflictError, ConflictError
# The tests here let several threads have a go at one or more database
# instances simultaneously. Each thread appends a disjoint (from the
......@@ -433,44 +432,6 @@ class InvalidationTests:
db1.close()
db2.close()
# TODO: Temporarily disabled. I know it fails, and there's no point
# getting an endless number of reports about that.
def xxxcheckConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
transaction.commit()
cn.close()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
cd = {}
t1 = VersionStressThread(db1, stop, 1, cd, 1, 3)
t2 = VersionStressThread(db2, stop, 2, cd, 2, 3, 0.01)
t3 = VersionStressThread(db2, stop, 3, cd, 3, 3, 0.01)
self.go(stop, cd, t1, t2, t3)
while db1.lastTransaction() != db2.lastTransaction():
db1._storage.sync()
db2._storage.sync()
cn = db1.open()
tree = cn.root()["tree"]
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
def checkConcurrentLargeUpdates(self):
# Use 3 threads like the 2StorageMT test above.
self._storage = storage1 = self.openClientStorage()
......
......@@ -81,17 +81,17 @@ Now, if we call invalidate, we'll see it propigate to the client:
invalidateTransaction trans2 2
[('ob1', ''), ('ob2', '')]
>>> storage.db.invalidate('trans3', ['ob1', 'ob2'], 'v')
>>> storage.db.invalidate('trans3', ['ob1', 'ob2'])
invalidateTransaction trans3 1
[('ob1', 'v'), ('ob2', 'v')]
[('ob1', ''), ('ob2', '')]
invalidateTransaction trans3 2
[('ob1', 'v'), ('ob2', 'v')]
[('ob1', ''), ('ob2', '')]
The storage servers queue will reflect the invalidations:
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
'trans3' [('ob1', 'v'), ('ob2', 'v')]
'trans3' [('ob1', ''), ('ob2', '')]
'trans2' [('ob1', ''), ('ob2', '')]
'trans1' [('ob0', ''), ('ob1', '')]
'trans0' [('ob0', '')]
......
......@@ -23,18 +23,18 @@ def random_string(size):
def new_store_data():
"""Return arbitrary data to use as argument to store() method."""
return random_string(8), '', random_string(random.randrange(1000))
return random_string(8), random_string(random.randrange(1000))
def new_invalidate_data():
"""Return arbitrary data to use as argument to invalidate() method."""
return random_string(8), ''
return random_string(8)
class TransBufTests(unittest.TestCase):
def checkTypicalUsage(self):
tbuf = TransactionBuffer()
tbuf.store(*new_store_data())
tbuf.invalidate(*new_invalidate_data())
tbuf.invalidate(new_invalidate_data())
for o in tbuf:
pass
......@@ -45,13 +45,13 @@ class TransBufTests(unittest.TestCase):
tbuf.store(*d)
data.append(d)
d = new_invalidate_data()
tbuf.invalidate(*d)
tbuf.invalidate(d)
data.append(d)
for i, x in enumerate(tbuf):
if x[2] is None:
if x[1] is None:
# the tbuf add a dummy None to invalidates
x = x[:2]
x = x[0]
self.assertEqual(x, data[i])
def checkOrderPreserved(self):
......
......@@ -517,7 +517,6 @@ class CommonBlobTests:
handle_serials
import transaction
version = ''
somedata = 'a' * 10
blob = Blob()
......@@ -680,9 +679,6 @@ class StorageServerWrapper:
def supportsUndo(self):
return False
def supportsVersions(self):
return False
def new_oid(self):
return self.server.new_oids(1)[0]
......@@ -696,8 +692,8 @@ class StorageServerWrapper:
del self.server.client.serials[:]
return result
def store(self, oid, serial, data, version, transaction):
self.server.storea(oid, serial, data, version, id(transaction))
def store(self, oid, serial, data, version_ignored, transaction):
self.server.storea(oid, serial, data, '', id(transaction))
def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction))
......@@ -792,7 +788,7 @@ structure using lastTransactions.
>>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for (oid, version) in oids])
>>> sorted([int(u64(oid)) for (oid, _) in oids])
[0, 92, 93, 94, 95, 96, 97, 98, 99, 100]
(Note that the fact that we get oids for 92-100 is actually an
......@@ -840,7 +836,7 @@ transaction, we'll get a result:
>>> ntid == last[-1]
True
>>> sorted([int(u64(oid)) for (oid, version) in oids])
>>> sorted([int(u64(oid)) for (oid, _) in oids])
[0, 101, 102, 103, 104]
"""
......
......@@ -94,12 +94,10 @@ class CacheTests(unittest.TestCase):
def testLoad(self):
data1 = "data for n1"
self.assertEqual(self.cache.load(n1, ""), None)
self.assertEqual(self.cache.load(n1, "version"), None)
self.cache.store(n1, "", n3, None, data1)
self.assertEqual(self.cache.load(n1, ""), (data1, n3, ""))
# The cache doesn't know whether version exists, because it
# only has non-version data.
self.assertEqual(self.cache.load(n1, "version"), None)
self.assertEqual(self.cache.modifiedInVersion(n1), None)
def testInvalidate(self):
......
......@@ -217,7 +217,7 @@ class IConnection(Interface):
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is a set of oids, represented as a dict with oids as keys.
oids: oids is an iterable of oids.
"""
def root():
......
......@@ -47,7 +47,7 @@ class BasicStorage:
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
0, 1, 2, 3, transaction.Transaction())
0, 1, 2, '', transaction.Transaction())
self._storage.tpc_abort(t)
def checkSerialIsNoneForInitialRevision(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment