Commit 93d9e125 authored by Jim Fulton's avatar Jim Fulton

Cleaned up the Z309 ZEO protocol, removing versions from arguments and

return values.  This in turn simplified the client and server
software.

Added code to select different client and server stubs and input
handlers depening on whether the Z309 or earlier protocols are used.

ZODB 3.8 clients can now talk to ZODB 3.9 servers and the other way
around.
parent 2bc1fd50
......@@ -109,7 +109,7 @@ class ClientStorage(object):
TransactionBufferClass = TransactionBuffer
ClientCacheClass = ClientCache
ConnectionManagerClass = ConnectionManager
StorageServerStubClass = ServerStub.StorageServer
StorageServerStubClass = ServerStub.stub
def __init__(self, addr, storage='1', cache_size=20 * MB,
name='', client=None, debug=0, var=None,
......@@ -566,7 +566,8 @@ class ClientStorage(object):
ZODB.interfaces.IStorageCurrentRecordIteration,
ZODB.interfaces.IBlobStorage,
):
if (iface.__module__, iface.__name__) in self._info.get('interfaces', ()):
if (iface.__module__, iface.__name__) in self._info.get(
'interfaces', ()):
zope.interface.alsoProvides(self, iface)
def _handle_extensions(self):
......@@ -1208,8 +1209,13 @@ class ClientStorage(object):
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
if self._connection.peer_protocol_version < 'Z309':
client = ClientStorage308Adapter(self)
else:
client = self
# allow incoming invalidations:
self._connection.register_object(self)
self._connection.register_object(client)
# If verify_cache() finishes the cache verification process,
# it should set self._server. If it goes through full cache
......@@ -1279,8 +1285,8 @@ class ClientStorage(object):
server.endZeoVerify()
return "full verification"
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, '') pair.
def invalidateVerify(self, oid):
"""Server callback to invalidate an oid pair.
This is called as part of cache validation.
"""
......@@ -1290,7 +1296,7 @@ class ClientStorage(object):
# This should never happen.
logger.error("%s invalidateVerify with no _pickler", self.__name__)
return
self._pickler.dump((None, [args[0]]))
self._pickler.dump((None, [oid]))
def endVerify(self):
"""Server callback to signal end of cache validation."""
......@@ -1304,10 +1310,7 @@ class ClientStorage(object):
try:
if catch_up:
# process catch-up invalidations
tid, invalidations = catch_up
self._process_invalidations(
tid, (arg[0] for arg in invalidations)
)
self._process_invalidations(*catch_up)
if self._pickler is None:
return
......@@ -1337,7 +1340,7 @@ class ClientStorage(object):
self._pending_server = None
def invalidateTransaction(self, tid, args):
def invalidateTransaction(self, tid, oids):
"""Server callback: Invalidate objects modified by tid."""
self._lock.acquire()
try:
......@@ -1345,14 +1348,13 @@ class ClientStorage(object):
logger.debug(
"%s Transactional invalidation during cache verification",
self.__name__)
self._pickler.dump((tid, [arg[0] for arg in args]))
self._pickler.dump((tid, oids))
else:
self._process_invalidations(tid, (arg[0] for arg in args))
self._process_invalidations(tid, oids)
finally:
self._lock.release()
def _process_invalidations(self, tid, oids):
oids = list(oids)
for oid in oids:
if oid == self._load_oid:
self._load_status = 0
......@@ -1363,8 +1365,8 @@ class ClientStorage(object):
# The following are for compatibility with protocol version 2.0.0
def invalidateTrans(self, args):
return self.invalidateTransaction(None, args)
def invalidateTrans(self, oids):
return self.invalidateTransaction(None, oids)
invalidate = invalidateVerify
end = endVerify
......@@ -1482,3 +1484,17 @@ class RecordIterator(object):
self._completed = True
raise ZODB.interfaces.StorageStopIteration()
return ZODB.BaseStorage.DataRecord(*item)
class ClientStorage308Adapter:
def __init__(self, client):
self.client = client
def invalidateTransaction(self, tid, args):
self.client.invalidateTransaction(tid, [arg[0] for arg in args])
def invalidateVerify(self, arg):
self.client.invalidateVerify(arg[0])
def __getattr__(self, name):
return getattr(self.client, name)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""RPC stubs for interface exported by ClientStorage."""
class ClientStorage:
"""An RPC stub class for the interface exported by ClientStorage.
This is the interface presented by ClientStorage to the
StorageServer; i.e. the StorageServer calls these methods and they
are executed in the ClientStorage.
See the ClientStorage class for documentation on these methods.
It is currently important that all methods here are asynchronous
(meaning they don't have a return value and the caller doesn't
wait for them to complete), *and* that none of them cause any
calls from the client to the storage. This is due to limitations
in the zrpc subpackage.
The on-the-wire names of some of the methods don't match the
Python method names. That's because the on-the-wire protocol was
fixed for ZEO 2 and we don't want to change it. There are some
aliases in ClientStorage.py to make up for this.
"""
def __init__(self, rpc):
"""Constructor.
The argument is a connection: an instance of the
zrpc.connection.Connection class.
"""
self.rpc = rpc
def beginVerify(self):
self.rpc.callAsync('beginVerify')
def invalidateVerify(self, args):
self.rpc.callAsync('invalidateVerify', args)
def endVerify(self):
self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
def info(self, arg):
self.rpc.callAsync('info', arg)
def storeBlob(self, oid, serial, blobfilename):
def store():
yield ('receiveBlobStart', (oid, serial))
f = open(blobfilename, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
break
yield ('receiveBlobChunk', (oid, serial, chunk, ))
f.close()
yield ('receiveBlobStop', (oid, serial))
self.rpc.callAsyncIterator(store())
......@@ -48,15 +48,6 @@ class StorageServer:
"""
self.rpc = rpc
# Wait until we know what version the other side is using.
while rpc.peer_protocol_version is None:
time.sleep(0.1)
if rpc.peer_protocol_version == 'Z200':
self.lastTransaction = lambda: None
self.getInvalidations = lambda tid: None
self.getAuthProtocol = lambda: None
def extensionMethod(self, name):
return ExtensionMethodWrapper(self.rpc, name).call
......@@ -111,15 +102,15 @@ class StorageServer:
return self.rpc.call('getInvalidations', tid)
##
# Check whether serial numbers s and sv are current for oid.
# If one or both of the serial numbers are not current, the
# Check whether a serial number is current for oid.
# If the serial number is not current, the
# server will make an asynchronous invalidateVerify() call.
# @param oid object id
# @param s serial number on non-version data
# @param s serial number
# @defreturn async
def zeoVerify(self, oid, s):
self.rpc.callAsync('zeoVerify', oid, s, None)
self.rpc.callAsync('zeoVerify', oid, s)
##
# Check whether current serial number is valid for oid.
......@@ -130,7 +121,7 @@ class StorageServer:
# @defreturn async
def verify(self, oid, serial):
self.rpc.callAsync('verify', oid, '', serial)
self.rpc.callAsync('verify', oid, serial)
##
# Signal to the server that cache verification is done.
......@@ -183,7 +174,7 @@ class StorageServer:
# @exception KeyError if oid is not found
def loadEx(self, oid):
return self.rpc.call("loadEx", oid, '')[:2]
return self.rpc.call("loadEx", oid)
##
# Return non-current data along with transaction ids that identify
......@@ -207,12 +198,11 @@ class StorageServer:
# @defreturn async
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id)
self.rpc.callAsync('storea', oid, serial, data, id)
def restorea(self, oid, serial, data, prev_txn, id):
self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to real all of
......@@ -228,13 +218,12 @@ class StorageServer:
break
yield ('storeBlobChunk', (chunk, ))
f.close()
yield ('storeBlobEnd', (oid, serial, data, '', id(txn)))
yield ('storeBlobEnd', (oid, serial, data, id(txn)))
self.rpc.callAsyncIterator(store())
def storeBlobShared(self, oid, serial, data, filename, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
'', id)
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, id)
##
# Start two-phase commit for a transaction
......@@ -262,16 +251,13 @@ class StorageServer:
def history(self, oid, length=None):
if length is None:
return self.rpc.call('history', oid, '')
return self.rpc.call('history', oid)
else:
return self.rpc.call('history', oid, '', length)
return self.rpc.call('history', oid, length)
def record_iternext(self, next):
return self.rpc.call('record_iternext', next)
def load(self, oid):
return self.rpc.call('load', oid, '')
def sendBlob(self, oid, serial):
return self.rpc.call('sendBlob', oid, serial)
......@@ -284,9 +270,6 @@ class StorageServer:
def new_oid(self):
return self.rpc.call('new_oid')
def store(self, oid, serial, data, trans):
return self.rpc.call('store', oid, serial, data, '', trans)
def undo(self, trans_id, trans):
return self.rpc.call('undo', trans_id, trans)
......@@ -311,6 +294,90 @@ class StorageServer:
def iterator_gc(self, iids):
return self.rpc.callAsync('iterator_gc', iids)
class StorageServer308(StorageServer):
def __init__(self, rpc):
if rpc.peer_protocol_version == 'Z200':
self.lastTransaction = lambda: None
self.getInvalidations = lambda tid: None
self.getAuthProtocol = lambda: None
StorageServer.__init__(self, rpc)
def history(self, oid, length=None):
if length is None:
return self.rpc.call('history', oid, '')
else:
return self.rpc.call('history', oid, '', length)
def getInvalidations(self, tid):
# Not in protocol version 2.0.0; see __init__()
result = self.rpc.call('getInvalidations', tid)
if result is not None:
result = result[0], [oid for (oid, version) in result[1]]
return result
def verify(self, oid, serial):
self.rpc.callAsync('verify', oid, '', serial)
def loadEx(self, oid):
return self.rpc.call("loadEx", oid, '')[:2]
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id)
def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to real all of
# the data into memory, so we use a message iterator. This
# allows us to read the blob data as needed.
def store():
yield ('storeBlobStart', ())
f = open(blobfilename, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
break
yield ('storeBlobChunk', (chunk, ))
f.close()
yield ('storeBlobEnd', (oid, serial, data, '', id(txn)))
self.rpc.callAsyncIterator(store())
def storeBlobShared(self, oid, serial, data, filename, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
'', id)
def zeoVerify(self, oid, s):
self.rpc.callAsync('zeoVerify', oid, s, None)
def iterator_start(self, start, stop):
raise NotImplementedError
def iterator_next(self, iid):
raise NotImplementedError
def iterator_record_start(self, txn_iid, tid):
raise NotImplementedError
def iterator_record_next(self, iid):
raise NotImplementedError
def iterator_gc(self, iids):
raise NotImplementedError
def stub(client, connection):
# Wait until we know what version the other side is using.
while connection.peer_protocol_version is None:
time.sleep(0.1)
if connection.peer_protocol_version < 'Z309':
return StorageServer308(connection)
return StorageServer(connection)
class ExtensionMethodWrapper:
def __init__(self, rpc, name):
......
......@@ -37,7 +37,6 @@ import ZODB.serialize
import ZEO.zrpc.error
import zope.interface
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
from ZEO.zrpc.server import Dispatcher
......@@ -81,9 +80,6 @@ class StorageServerError(StorageError):
class ZEOStorage:
"""Proxy to underlying storage for a single remote client."""
# Classes we instantiate. A subclass might override.
ClientStorageStubClass = ClientStub.ClientStorage
# A list of extension methods. A subclass with extra methods
# should override.
extensions = []
......@@ -128,7 +124,12 @@ class ZEOStorage:
def notifyConnected(self, conn):
self.connection = conn # For restart_other() below
self.client = self.ClientStorageStubClass(conn)
assert conn.peer_protocol_version is not None
if conn.peer_protocol_version < 'Z309':
self.client = ClientStub308(conn)
conn.register_object(ZEOStorage308Adapter(self))
else:
self.client = ClientStub(conn)
addr = conn.addr
if isinstance(addr, type("")):
label = addr
......@@ -208,7 +209,6 @@ class ZEOStorage:
else:
raise
def _check_tid(self, tid, exc=None):
if self.read_only:
raise ReadOnlyError()
......@@ -288,7 +288,6 @@ class ZEOStorage:
'size': storage.getSize(),
'name': storage.getName(),
'supportsUndo': supportsUndo,
'supportsVersions': False,
'extensionMethods': self.getExtensionMethods(),
'supports_record_iternext': hasattr(self, 'record_iternext'),
'interfaces': tuple(interfaces),
......@@ -302,23 +301,14 @@ class ZEOStorage:
def getExtensionMethods(self):
return self._extensions
def loadEx(self, oid, version=''):
def loadEx(self, oid):
self.stats.loads += 1
if version:
raise StorageServerError("Versions aren't supported.")
data, serial = self.storage.load(oid, '')
return data, serial, ''
return self.storage.load(oid, '')
def loadBefore(self, oid, tid):
self.stats.loads += 1
return self.storage.loadBefore(oid, tid)
def zeoLoad(self, oid):
self.stats.loads += 1
p, s = self.storage.load(oid, '')
return p, s, '', None, None
def getInvalidations(self, tid):
invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
if invtid is None:
......@@ -327,18 +317,16 @@ class ZEOStorage:
% (len(invlist), u64(invtid)))
return invtid, invlist
def verify(self, oid, version, tid):
if version:
raise StorageServerError("Versions aren't supported.")
def verify(self, oid, tid):
try:
t = self.getTid(oid)
except KeyError:
self.client.invalidateVerify((oid, ""))
self.client.invalidateVerify(oid)
else:
if tid != t:
self.client.invalidateVerify((oid, ''))
self.client.invalidateVerify(oid)
def zeoVerify(self, oid, s, sv=None):
def zeoVerify(self, oid, s):
if not self.verifying:
self.verifying = 1
self.stats.verifying_clients += 1
......@@ -350,9 +338,6 @@ class ZEOStorage:
# could be caused by an object uncreation, in which case
# invalidation is right. It could be an application bug
# that left a dangling reference, in which case it's bad.
else:
if sv:
raise StorageServerError("Versions aren't supported.")
else:
if s != os:
self.client.invalidateVerify((oid, ''))
......@@ -483,9 +468,7 @@ class ZEOStorage:
# Most of the real implementations are in methods beginning with
# an _.
def storea(self, oid, serial, data, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
def storea(self, oid, serial, data, id):
self._check_tid(id, exc=StorageTransactionError)
self.stats.stores += 1
self.txnlog.store(oid, serial, data)
......@@ -503,17 +486,13 @@ class ZEOStorage:
def storeBlobChunk(self, chunk):
os.write(self.blob_tempfile[0], chunk)
def storeBlobEnd(self, oid, serial, data, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
def storeBlobEnd(self, oid, serial, data, id):
fd, tempname = self.blob_tempfile
self.blob_tempfile = None
os.close(fd)
self.blob_log.append((oid, serial, data, tempname))
def storeBlobShared(self, oid, serial, data, filename, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
def storeBlobShared(self, oid, serial, data, filename, id):
# Reconstruct the full path from the filename in the OID directory
filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
filename)
......@@ -570,7 +549,7 @@ class ZEOStorage:
newserial = [(oid, err)]
else:
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, ''))
self.invalidated.append(oid)
if isinstance(newserial, str):
newserial = [(oid, newserial)]
......@@ -633,8 +612,7 @@ class ZEOStorage:
def _undo(self, trans_id):
tid, oids = self.storage.undo(trans_id, self.transaction)
inv = [(oid, None) for oid in oids]
self.invalidated.extend(inv)
self.invalidated.extend(oids)
return tid, oids
# When a delayed transaction is restarted, the dance is
......@@ -723,20 +701,6 @@ class ZEOStorage:
else:
return 1
def modifiedInVersion(self, oid):
return ''
def versions(self):
return ()
def versionEmpty(self, version):
return True
def commitVersion(self, *a, **k):
raise NotImplementedError
abortVersion = commitVersion
# IStorageIteration support
def iterator_start(self, start, stop):
......@@ -785,7 +749,6 @@ class ZEOStorage:
item = (info.oid,
info.tid,
info.data,
info.version,
info.data_txn)
return item
......@@ -805,10 +768,7 @@ class StorageServerDB:
if version:
raise StorageServerError("Versions aren't supported.")
storage_id = self.storage_id
self.server.invalidate(
None, storage_id, tid,
[(oid, '') for oid in oids],
)
self.server.invalidate(None, storage_id, tid, oids)
for zeo_server in self.server.connections.get(storage_id, ())[:]:
try:
zeo_server.connection.poll()
......@@ -1081,7 +1041,7 @@ class StorageServer:
This is called from several ZEOStorage methods.
invalidated is a sequence of oid, empty-string pairs.
invalidated is a sequence of oids.
This can do three different things:
......@@ -1306,3 +1266,129 @@ class SlowMethodThread(threading.Thread):
self.delay.error(sys.exc_info())
else:
self.delay.reply(result)
class ClientStub:
def __init__(self, rpc):
self.rpc = rpc
def beginVerify(self):
self.rpc.callAsync('beginVerify')
def invalidateVerify(self, args):
self.rpc.callAsync('invalidateVerify', args)
def endVerify(self):
self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
def info(self, arg):
self.rpc.callAsync('info', arg)
def storeBlob(self, oid, serial, blobfilename):
def store():
yield ('receiveBlobStart', (oid, serial))
f = open(blobfilename, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
break
yield ('receiveBlobChunk', (oid, serial, chunk, ))
f.close()
yield ('receiveBlobStop', (oid, serial))
self.rpc.callAsyncIterator(store())
class ClientStub308(ClientStub):
def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll(
'invalidateTransaction', tid, [(arg, '') for arg in args])
def invalidateVerify(self, oid):
self.rpc.callAsync('invalidateVerify', (oid, ''))
class ZEOStorage308Adapter:
def __init__(self, storage):
self.storage = storage
def getSerial(self, oid):
return self.storage.loadEx(oid)[1] # Z200
def history(self, oid, version, size=1):
if version:
raise ValueError("Versions aren't supported.")
return self.storage.history(oid, size)
def getInvalidations(self, tid):
result = self.storage.getInvalidations(tid)
if result is not None:
result = result[0], [(oid, '') for oid in result[1]]
return result
def verify(self, oid, version, tid):
if version:
raise StorageServerError("Versions aren't supported.")
return self.storage.verify(oid, tid)
def loadEx(self, oid, version=''):
if version:
raise StorageServerError("Versions aren't supported.")
data, serial = self.storage.loadEx(oid)
return data, serial, ''
def storea(self, oid, serial, data, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
self.storage.storea(oid, serial, data, id)
def storeBlobEnd(self, oid, serial, data, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
self.storage.storeBlobEnd(oid, serial, data, id)
def storeBlobShared(self, oid, serial, data, filename, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
self.storage.storeBlobShared(oid, serial, data, filename, id)
def getInfo(self):
result = self.storage.getInfo()
result['supportsVersions'] = False
return result
def zeoVerify(self, oid, s, sv=None):
if sv:
raise StorageServerError("Versions aren't supported.")
self.storage.zeoVerify(oid, s)
def modifiedInVersion(self, oid):
return ''
def versions(self):
return ()
def versionEmpty(self, version):
return True
def commitVersion(self, *a, **k):
raise NotImplementedError
abortVersion = commitVersion
def zeoLoad(self, oid): # Z200
p, s = self.storage,loadEx(oid)
return p, s, '', None, None
def __getattr__(self, name):
return getattr(self.storage, name)
......@@ -620,7 +620,7 @@ class InvqTests(CommonSetupTearDown):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage._server._last_invals,
(revid, [(oid, '')]))
(revid, [oid]))
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
......
......@@ -86,7 +86,7 @@ def encode_format(fmt):
def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
path='Data.fs'):
path='Data.fs', protocol=None):
"""Start a ZEO server in a separate process.
Takes two positional arguments a string containing the storage conf
......@@ -126,6 +126,9 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)]
if keep:
args.append("-k")
if protocol:
args.extend(["-v", protocol])
d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path)
......@@ -276,7 +279,7 @@ def setUp(test):
servers = {}
def start_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
addr=None):
addr=None, path='Data.fs', protocol=None):
"""Start a ZEO server.
Return the server and admin addresses.
......@@ -289,7 +292,7 @@ def setUp(test):
elif addr is not None:
raise TypeError("Can't specify port and addr")
addr, adminaddr, pid, config_path = start_zeo_server(
storage_conf, zeo_conf, port, keep)
storage_conf, zeo_conf, port, keep, path, protocol)
os.remove(config_path)
servers[adminaddr] = pid
return addr, adminaddr
......
Test that multiple protocols are supported
==========================================
A full test of all protocols isn't practical. But we'll do a limited
test that at least the current and previous protocols are supported in
both directions.
Let's start a Z308 server
>>> storage_conf = '''
... <blobstorage>
... blob-dir server-blobs
... <filestorage>
... path Data.fs
... </filestorage>
... </blobstorage>
... '''
>>> addr, admin = start_server(
... storage_conf, dict(invalidation_queue_size=5), protocol='Z308')
A current client should be able to connect to a old server:
>>> import ZEO, ZODB.blob, transaction
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> db.storage._connection.peer_protocol_version
'Z308'
>>> conn = db.open()
>>> conn.root().x = 0
>>> transaction.commit()
>>> len(db.history(conn.root()._p_oid, 99))
2
>>> conn.root()['blob1'] = ZODB.blob.Blob()
>>> conn.root()['blob1'].open('w').write('blob data 1')
>>> transaction.commit()
>>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
>>> wait_connected(db2.storage)
>>> conn2 = db2.open()
>>> for i in range(5):
... conn2.root().x += 1
... transaction.commit()
>>> conn2.root()['blob2'] = ZODB.blob.Blob()
>>> conn2.root()['blob2'].open('w').write('blob data 2')
>>> transaction.commit()
>>> conn.sync()
>>> conn.root().x
5
>>> db.close()
>>> for i in range(2):
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x
7
>>> db.close()
>>> for i in range(10):
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x
17
>>> conn.root()['blob1'].open().read()
'blob data 1'
>>> conn.root()['blob2'].open().read()
'blob data 2'
Note that when taking to a 3.8 server, iteration won't work:
>>> db.storage.iterator()
Traceback (most recent call last):
...
NotImplementedError
>>> db2.close()
>>> db.close()
>>> stop_server(admin)
>>> import os, zope.testing.setupstack
>>> os.remove('client-1.zec')
>>> zope.testing.setupstack.rmtree('blobs')
>>> zope.testing.setupstack.rmtree('server-blobs')
And the other way around:
>>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5))
Note that we'll have to pull some hijinks:
>>> import ZEO.zrpc.connection
>>> old_current_protocol = ZEO.zrpc.connection.Connection.current_protocol
>>> ZEO.zrpc.connection.Connection.current_protocol = 'Z308'
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> db.storage._connection.peer_protocol_version
'Z308'
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x = 0
>>> transaction.commit()
>>> len(db.history(conn.root()._p_oid, 99))
2
>>> conn.root()['blob1'] = ZODB.blob.Blob()
>>> conn.root()['blob1'].open('w').write('blob data 1')
>>> transaction.commit()
>>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
>>> wait_connected(db2.storage)
>>> conn2 = db2.open()
>>> for i in range(5):
... conn2.root().x += 1
... transaction.commit()
>>> conn2.root()['blob2'] = ZODB.blob.Blob()
>>> conn2.root()['blob2'].open('w').write('blob data 2')
>>> transaction.commit()
>>> conn.sync()
>>> conn.root().x
5
>>> db.close()
>>> for i in range(2):
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x
7
>>> db.close()
>>> for i in range(10):
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x
17
>>> conn.root()['blob1'].open().read()
'blob data 1'
>>> conn.root()['blob2'].open().read()
'blob data 2'
>>> db2.close()
>>> db.close()
Undo the hijinks:
>>> ZEO.zrpc.connection.Connection.current_protocol = old_current_protocol
......@@ -9,8 +9,8 @@ multiple storage servers.
We'll create a Faux storage that has a registerDB method.
>>> class FauxStorage:
... invalidations = [('trans0', [('ob0', '')]),
... ('trans1', [('ob0', ''), ('ob1', '')]),
... invalidations = [('trans0', ['ob0']),
... ('trans1', ['ob0', 'ob1']),
... ]
... def registerDB(self, db):
... self.db = db
......@@ -85,24 +85,24 @@ Now, if we call invalidate, we'll see it propigate to the client:
>>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
invalidateTransaction trans2 1
[('ob1', ''), ('ob2', '')]
['ob1', 'ob2']
invalidateTransaction trans2 2
[('ob1', ''), ('ob2', '')]
['ob1', 'ob2']
>>> storage.db.invalidate('trans3', ['ob1', 'ob2'])
invalidateTransaction trans3 1
[('ob1', ''), ('ob2', '')]
['ob1', 'ob2']
invalidateTransaction trans3 2
[('ob1', ''), ('ob2', '')]
['ob1', 'ob2']
The storage servers queue will reflect the invalidations:
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
'trans3' [('ob1', ''), ('ob2', '')]
'trans2' [('ob1', ''), ('ob2', '')]
'trans1' [('ob0', ''), ('ob1', '')]
'trans0' [('ob0', '')]
'trans3' ['ob1', 'ob2']
'trans2' ['ob1', 'ob2']
'trans1' ['ob0', 'ob1']
'trans0' ['ob0']
If we call invalidateCache, the storage server will close each of it's
connections:
......@@ -117,5 +117,5 @@ The servers's invalidation queue will get reset
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
'trans1' [('ob0', ''), ('ob1', '')]
'trans0' [('ob0', '')]
'trans1' ['ob0', 'ob1']
'trans0' ['ob0']
......@@ -818,7 +818,7 @@ class StorageServerWrapper:
return result
def store(self, oid, serial, data, version_ignored, transaction):
self.server.storea(oid, serial, data, '', id(transaction))
self.server.storea(oid, serial, data, id(transaction))
def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction))
......@@ -859,7 +859,7 @@ def multiple_storages_invalidation_queue_is_not_insane():
>>> trans, oids = s1.getInvalidations(last)
>>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for (oid, v) in oids])
>>> sorted([int(u64(oid)) for oid in oids])
[10, 11, 12, 13, 14]
>>> server.close_server()
......@@ -913,7 +913,7 @@ structure using lastTransactions.
>>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for (oid, _) in oids])
>>> sorted([int(u64(oid)) for oid in oids])
[0, 92, 93, 94, 95, 96, 97, 98, 99, 100]
(Note that the fact that we get oids for 92-100 is actually an
......@@ -961,7 +961,7 @@ transaction, we'll get a result:
>>> ntid == last[-1]
True
>>> sorted([int(u64(oid)) for (oid, _) in oids])
>>> sorted([int(u64(oid)) for oid in oids])
[0, 101, 102, 103, 104]
>>> fs.close()
......@@ -970,9 +970,11 @@ transaction, we'll get a result:
def tpc_finish_error():
r"""Server errors in tpc_finish weren't handled properly.
>>> import ZEO.ClientStorage
>>> import ZEO.ClientStorage, ZEO.zrpc.connection
>>> class Connection:
... peer_protocol_version = (
... ZEO.zrpc.connection.Connection.current_protocol)
... def __init__(self, client):
... self.client = client
... def get_addr(self):
......@@ -1127,6 +1129,22 @@ def client_has_newer_data_than_server():
"""
def history_over_zeo():
"""
>>> addr, _ = start_server()
>>> import ZEO, ZODB.blob, transaction
>>> db = ZEO.DB(addr)
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x = 0
>>> transaction.commit()
>>> len(db.history(conn.root()._p_oid, 99))
2
>>> db.close()
"""
slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests,
......@@ -1150,6 +1168,7 @@ def test_suite():
doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt',
'protocols.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
),
)
......
......@@ -13,6 +13,9 @@
##############################################################################
"""Helper file used to launch a ZEO server cross platform"""
from ZEO.StorageServer import StorageServer
from ZEO.runzeo import ZEOOptions
import asyncore
import os
import sys
......@@ -25,10 +28,6 @@ import asyncore
import threading
import logging
from ZEO.StorageServer import StorageServer
from ZEO.runzeo import ZEOOptions
def cleanup(storage):
# FileStorage and the Berkeley storages have this method, which deletes
# all files and directories used by the storage. This prevents @-files
......@@ -155,12 +154,15 @@ def main():
keep = 0
configfile = None
# Parse the arguments and let getopt.error percolate
opts, args = getopt.getopt(sys.argv[1:], 'kC:')
opts, args = getopt.getopt(sys.argv[1:], 'kC:v:')
for opt, arg in opts:
if opt == '-k':
keep = 1
elif opt == '-C':
configfile = arg
elif opt == '-v':
import ZEO.zrpc.connection
ZEO.zrpc.connection.Connection.current_protocol = arg
zo = ZEOOptions()
zo.realize(["-C", configfile])
......
......@@ -350,11 +350,11 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# If we're a client, an exhaustive list of the server protocols we
# can accept.
servers_we_can_talk_to = [current_protocol]
servers_we_can_talk_to = ["Z308", current_protocol]
# If we're a server, an exhaustive list of the client protocols we
# can accept.
clients_we_can_talk_to = ["Z200", "Z201", "Z303", current_protocol]
clients_we_can_talk_to = ["Z200", "Z201", "Z303", "Z308", current_protocol]
# This is pretty excruciating. Details:
#
......@@ -779,24 +779,25 @@ class Connection(smac.SizedMessageAsyncConnection, object):
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
__super_init = Connection.__init__
__super_close = Connection.close
# Servers use a shared server trigger that uses the asyncore socket map
trigger = trigger()
def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr
self.__super_init(sock, addr, obj, 'S')
self.obj.notifyConnected(self)
Connection.__init__(self, sock, addr, obj, 'S')
def handshake(self):
# Send the server's preferred protocol to the client.
self.message_output(self.current_protocol)
def recv_handshake(self, proto):
Connection.recv_handshake(self, proto)
self.obj.notifyConnected(self)
def close(self):
self.obj.notifyDisconnected()
self.__super_close()
Connection.close(self)
class ManagedClientConnection(Connection):
"""Client-side Connection subclass."""
......
......@@ -425,8 +425,7 @@ class DataRecord(object):
version = ''
def __init__(self, oid, tid, data, version, prev):
assert not version, "versions are no-longer supported"
def __init__(self, oid, tid, data, prev):
self.oid = oid
self.tid = tid
self.data = data
......
......@@ -1090,7 +1090,7 @@ class FileStorage(BaseStorage.BaseStorage,
pos = pos - 8 - u64(read(8))
seek(0)
return [(trans.tid, [(r.oid, '') for r in trans])
return [(trans.tid, [r.oid for r in trans])
for trans in FileIterator(self._file_name, pos=pos)]
finally:
self._lock_release()
......@@ -1711,7 +1711,7 @@ class TransactionRecordIterator(FileStorageFormatter):
class Record(BaseStorage.DataRecord):
def __init__(self, oid, tid, data, prev, pos):
super(Record, self).__init__(oid, tid, data, '', prev)
super(Record, self).__init__(oid, tid, data, prev)
self.pos = pos
......
......@@ -332,7 +332,7 @@ class TransactionRecord:
def __iter__(self):
for oid, data in self.data.items():
yield DataRecord(oid, self.tid, data, None)
yield DataRecord(oid, self.tid, data)
def pack(self, oid):
self.status = 'p'
......@@ -345,12 +345,12 @@ class DataRecord(object):
zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
version = ''
data_txn = None
def __init__(self, oid, tid, data, prev):
def __init__(self, oid, tid, data):
self.oid = oid
self.tid = tid
self.data = data
self.data_txn = prev
def DB(*args, **kw):
return ZODB.DB(MappingStorage(), *args, **kw)
......@@ -471,7 +471,7 @@ Now, we can call lastInvalidations on it:
True
>>> from ZODB.utils import u64
>>> [[int(u64(oid)) for (oid, version) in oids]
>>> [[int(u64(oid)) for oid in oids]
... for (i, oids) in invalidations]
... # doctest: +NORMALIZE_WHITESPACE
[[0, 91], [0, 92], [0, 93], [0, 94], [0, 95],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment