Commit a4534105 authored by Jim Fulton's avatar Jim Fulton

Got rid of more version support.

parent eef59eb3
This diff is collapsed.
......@@ -34,8 +34,8 @@ class CommitLog:
def size(self):
return self.file.tell()
def store(self, oid, serial, data, version):
self.pickler.dump((oid, serial, data, version))
def store(self, oid, serial, data):
self.pickler.dump((oid, serial, data))
self.stores += 1
def get_loader(self):
......
......@@ -116,23 +116,21 @@ class StorageServer:
# server will make an asynchronous invalidateVerify() call.
# @param oid object id
# @param s serial number on non-version data
# @param sv serial number of version data or None
# @defreturn async
def zeoVerify(self, oid, s, sv):
self.rpc.callAsync('zeoVerify', oid, s, sv)
def zeoVerify(self, oid, s):
self.rpc.callAsync('zeoVerify', oid, s, None)
##
# Check whether current serial number is valid for oid and version.
# Check whether current serial number is valid for oid.
# If the serial number is not current, the server will make an
# asynchronous invalidateVerify() call.
# @param oid object id
# @param version name of version for oid
# @param serial client's current serial number
# @defreturn async
def verify(self, oid, version, serial):
self.rpc.callAsync('verify', oid, version, serial)
def verify(self, oid, serial):
self.rpc.callAsync('verify', oid, '', serial)
##
# Signal to the server that cache verification is done.
......@@ -166,34 +164,26 @@ class StorageServer:
self.rpc.call('pack', t, wait)
##
# Return current data for oid. Version data is returned if
# present.
# Return current data for oid.
# @param oid object id
# @defreturn 5-tuple
# @return 5-tuple, current non-version data, serial number,
# version name, version data, version data serial number
# @defreturn 2-tuple
# @return 2-tuple, current non-version data, serial number
# @exception KeyError if oid is not found
def zeoLoad(self, oid):
return self.rpc.call('zeoLoad', oid)
return self.rpc.call('zeoLoad', oid)[:2]
##
# Return current data for oid in version, the tid of the
# transaction that wrote the most recent revision, and the name of
# the version for the data returned. Note that if the object
# wasn't modified in the version, then the non-version data is
# returned and the returned version is an empty string.
# Return current data for oid, and the tid of the
# transaction that wrote the most recent revision.
# @param oid object id
# @param version string, name of version
# @defreturn 3-tuple
# @return data, transaction id, version
# where version is the name of the version the data came
# from or "" for non-version data
# @defreturn 2-tuple
# @return data, transaction id
# @exception KeyError if oid is not found
def loadEx(self, oid, version):
return self.rpc.call("loadEx", oid, version)
def loadEx(self, oid):
return self.rpc.call("loadEx", oid, '')[:2]
##
# Return non-current data along with transaction ids that identify
......@@ -213,14 +203,13 @@ class StorageServer:
# @param oid object id
# @param serial serial number that this transaction read
# @param data new data record for oid
# @param version name of version or ""
# @param id id of current transaction
# @defreturn async
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id)
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to real all of
# the data into memory, so we use a message iterator. This
......@@ -235,13 +224,13 @@ class StorageServer:
break
yield ('storeBlobChunk', (chunk, ))
f.close()
yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
yield ('storeBlobEnd', (oid, serial, data, '', id(txn)))
self.rpc.callAsyncIterator(store())
def storeBlobShared(self, oid, serial, data, filename, version, id):
def storeBlobShared(self, oid, serial, data, filename, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
version, id)
'', id)
##
# Start two-phase commit for a transaction
......@@ -267,23 +256,17 @@ class StorageServer:
def tpc_abort(self, id):
self.rpc.callAsync('tpc_abort', id)
def abortVersion(self, src, id):
return self.rpc.call('abortVersion', src, id)
def commitVersion(self, src, dest, id):
return self.rpc.call('commitVersion', src, dest, id)
def history(self, oid, version, length=None):
def history(self, oid, length=None):
if length is None:
return self.rpc.call('history', oid, version)
return self.rpc.call('history', oid, '')
else:
return self.rpc.call('history', oid, version, length)
return self.rpc.call('history', oid, '', length)
def record_iternext(self, next):
return self.rpc.call('record_iternext', next)
def load(self, oid, version):
return self.rpc.call('load', oid, version)
def load(self, oid):
return self.rpc.call('load', oid, '')
def sendBlob(self, oid, serial):
return self.rpc.call('sendBlob', oid, serial)
......@@ -294,14 +277,11 @@ class StorageServer:
def loadSerial(self, oid, serial):
return self.rpc.call('loadSerial', oid, serial)
def modifiedInVersion(self, oid):
return self.rpc.call('modifiedInVersion', oid)
def new_oid(self):
return self.rpc.call('new_oid')
def store(self, oid, serial, data, version, trans):
return self.rpc.call('store', oid, serial, data, version, trans)
def store(self, oid, serial, data, trans):
return self.rpc.call('store', oid, serial, data, '', trans)
def undo(self, trans_id, trans):
return self.rpc.call('undo', trans_id, trans)
......@@ -312,15 +292,6 @@ class StorageServer:
def undoInfo(self, first, last, spec):
return self.rpc.call('undoInfo', first, last, spec)
def versionEmpty(self, vers):
return self.rpc.call('versionEmpty', vers)
def versions(self, max=None):
if max is None:
return self.rpc.call('versions')
else:
return self.rpc.call('versions', max)
class ExtensionMethodWrapper:
def __init__(self, rpc, name):
self.rpc = rpc
......
This diff is collapsed.
......@@ -77,34 +77,28 @@ class TransactionBuffer:
finally:
self.lock.release()
def store(self, oid, version, data):
def store(self, oid, data):
"""Store oid, version, data for later retrieval"""
self.lock.acquire()
try:
self._store(oid, version, data)
finally:
self.lock.release()
def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))
def _store(self, oid, version, data):
"""Store oid, version, data for later retrieval"""
if self.closed:
return
self.pickler.dump((oid, version, data))
self.pickler.dump((oid, data))
self.count += 1
# Estimate per-record cache size
self.size = self.size + len(data) + 31
if version:
# Assume version data has same size as non-version data
self.size = self.size + len(version) + len(data) + 12
finally:
self.lock.release()
def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))
def invalidate(self, oid, version):
def invalidate(self, oid):
self.lock.acquire()
try:
if self.closed:
return
self.pickler.dump((oid, version, None))
self.pickler.dump((oid, None))
self.count += 1
finally:
self.lock.release()
......
......@@ -20,6 +20,3 @@ ZEO is now part of ZODB; ZODB's home on the web is
http://www.zope.org/Wikis/ZODB
"""
# The next line must use double quotes, so release.py recognizes it.
version = "3.7.0b3"
......@@ -41,7 +41,7 @@ class IServeable(zope.interface.Interface):
performed by the most recent transactions.
An iterable of up to size entries must be returned, where each
entry is a transaction id and a sequence of object-id/version
pairs describing the objects and versions written by the
entry is a transaction id and a sequence of object-id/empty-string
pairs describing the objects written by the
transaction, in chronological order.
"""
......@@ -24,6 +24,18 @@ import logging
import ZEO
zeo_version = 'unknown'
try:
import pkg_resources
except ImportError:
pass
else:
zeo_dist = pkg_resources.working_set.find(
pkg_resources.Requirement.parse('ZODB3')
)
if zeo_dist is not None:
zeo_version = zeo_dist.version
class StorageStats:
"""Per-storage usage statistics."""
......@@ -149,7 +161,7 @@ class StatsServer(asyncore.dispatcher):
f.close()
def dump(self, f):
print >> f, "ZEO monitor server version %s" % ZEO.version
print >> f, "ZEO monitor server version %s" % zeo_version
print >> f, time.ctime()
print >> f
......
......@@ -49,14 +49,13 @@ def main():
print "Connected. Now starting a transaction..."
oid = storage.new_oid()
version = ""
revid = ZERO
data = MinPO("timeout.py")
pickled_data = zodb_pickle(data)
t = Transaction()
t.user = "timeout.py"
storage.tpc_begin(t)
storage.store(oid, revid, pickled_data, version, t)
storage.store(oid, revid, pickled_data, '', t)
print "Stored. Now voting..."
storage.tpc_vote(t)
......
......@@ -111,9 +111,17 @@ class CommonSetupTearDown(StorageTestBase):
self._newAddr()
self.startServer()
# self._old_log_level = logging.getLogger().getEffectiveLevel()
# logging.getLogger().setLevel(logging.WARNING)
# self._log_handler = logging.StreamHandler()
# logging.getLogger().addHandler(self._log_handler)
def tearDown(self):
"""Try to cause the tests to halt"""
logging.info("tearDown() %s" % self.id())
# logging.getLogger().setLevel(self._old_log_level)
# logging.getLogger().removeHandler(self._log_handler)
# logging.info("tearDown() %s" % self.id())
for p in self.conf_paths:
os.remove(p)
if getattr(self, '_storage', None) is not None:
......
......@@ -24,8 +24,7 @@ from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
from ZODB.POSException import ReadConflictError, ConflictError
# The tests here let several threads have a go at one or more database
# instances simultaneously. Each thread appends a disjoint (from the
......@@ -433,44 +432,6 @@ class InvalidationTests:
db1.close()
db2.close()
# TODO: Temporarily disabled. I know it fails, and there's no point
# getting an endless number of reports about that.
def xxxcheckConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
transaction.commit()
cn.close()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
cd = {}
t1 = VersionStressThread(db1, stop, 1, cd, 1, 3)
t2 = VersionStressThread(db2, stop, 2, cd, 2, 3, 0.01)
t3 = VersionStressThread(db2, stop, 3, cd, 3, 3, 0.01)
self.go(stop, cd, t1, t2, t3)
while db1.lastTransaction() != db2.lastTransaction():
db1._storage.sync()
db2._storage.sync()
cn = db1.open()
tree = cn.root()["tree"]
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
def checkConcurrentLargeUpdates(self):
# Use 3 threads like the 2StorageMT test above.
self._storage = storage1 = self.openClientStorage()
......
......@@ -81,17 +81,17 @@ Now, if we call invalidate, we'll see it propigate to the client:
invalidateTransaction trans2 2
[('ob1', ''), ('ob2', '')]
>>> storage.db.invalidate('trans3', ['ob1', 'ob2'], 'v')
>>> storage.db.invalidate('trans3', ['ob1', 'ob2'])
invalidateTransaction trans3 1
[('ob1', 'v'), ('ob2', 'v')]
[('ob1', ''), ('ob2', '')]
invalidateTransaction trans3 2
[('ob1', 'v'), ('ob2', 'v')]
[('ob1', ''), ('ob2', '')]
The storage servers queue will reflect the invalidations:
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
'trans3' [('ob1', 'v'), ('ob2', 'v')]
'trans3' [('ob1', ''), ('ob2', '')]
'trans2' [('ob1', ''), ('ob2', '')]
'trans1' [('ob0', ''), ('ob1', '')]
'trans0' [('ob0', '')]
......
......@@ -23,18 +23,18 @@ def random_string(size):
def new_store_data():
"""Return arbitrary data to use as argument to store() method."""
return random_string(8), '', random_string(random.randrange(1000))
return random_string(8), random_string(random.randrange(1000))
def new_invalidate_data():
"""Return arbitrary data to use as argument to invalidate() method."""
return random_string(8), ''
return random_string(8)
class TransBufTests(unittest.TestCase):
def checkTypicalUsage(self):
tbuf = TransactionBuffer()
tbuf.store(*new_store_data())
tbuf.invalidate(*new_invalidate_data())
tbuf.invalidate(new_invalidate_data())
for o in tbuf:
pass
......@@ -45,13 +45,13 @@ class TransBufTests(unittest.TestCase):
tbuf.store(*d)
data.append(d)
d = new_invalidate_data()
tbuf.invalidate(*d)
tbuf.invalidate(d)
data.append(d)
for i, x in enumerate(tbuf):
if x[2] is None:
if x[1] is None:
# the tbuf add a dummy None to invalidates
x = x[:2]
x = x[0]
self.assertEqual(x, data[i])
def checkOrderPreserved(self):
......
......@@ -517,7 +517,6 @@ class CommonBlobTests:
handle_serials
import transaction
version = ''
somedata = 'a' * 10
blob = Blob()
......@@ -680,9 +679,6 @@ class StorageServerWrapper:
def supportsUndo(self):
return False
def supportsVersions(self):
return False
def new_oid(self):
return self.server.new_oids(1)[0]
......@@ -696,8 +692,8 @@ class StorageServerWrapper:
del self.server.client.serials[:]
return result
def store(self, oid, serial, data, version, transaction):
self.server.storea(oid, serial, data, version, id(transaction))
def store(self, oid, serial, data, version_ignored, transaction):
self.server.storea(oid, serial, data, '', id(transaction))
def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction))
......@@ -792,7 +788,7 @@ structure using lastTransactions.
>>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for (oid, version) in oids])
>>> sorted([int(u64(oid)) for (oid, _) in oids])
[0, 92, 93, 94, 95, 96, 97, 98, 99, 100]
(Note that the fact that we get oids for 92-100 is actually an
......@@ -840,7 +836,7 @@ transaction, we'll get a result:
>>> ntid == last[-1]
True
>>> sorted([int(u64(oid)) for (oid, version) in oids])
>>> sorted([int(u64(oid)) for (oid, _) in oids])
[0, 101, 102, 103, 104]
"""
......
......@@ -94,12 +94,10 @@ class CacheTests(unittest.TestCase):
def testLoad(self):
data1 = "data for n1"
self.assertEqual(self.cache.load(n1, ""), None)
self.assertEqual(self.cache.load(n1, "version"), None)
self.cache.store(n1, "", n3, None, data1)
self.assertEqual(self.cache.load(n1, ""), (data1, n3, ""))
# The cache doesn't know whether version exists, because it
# only has non-version data.
self.assertEqual(self.cache.load(n1, "version"), None)
self.assertEqual(self.cache.modifiedInVersion(n1), None)
def testInvalidate(self):
......
......@@ -217,7 +217,7 @@ class IConnection(Interface):
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is a set of oids, represented as a dict with oids as keys.
oids: oids is an iterable of oids.
"""
def root():
......
......@@ -47,7 +47,7 @@ class BasicStorage:
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
0, 1, 2, 3, transaction.Transaction())
0, 1, 2, '', transaction.Transaction())
self._storage.tpc_abort(t)
def checkSerialIsNoneForInitialRevision(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment