Commit d3526b09 authored by Jim Fulton's avatar Jim Fulton

Updated to support the registerDB framework which allows storages to

generate it's own invalidations.  Also updated to honor the storage
APIs more carefully.  These changes together allow a ClientStorage to
be served by a storage server.
parent 9efd46d7
...@@ -31,6 +31,9 @@ import warnings ...@@ -31,6 +31,9 @@ import warnings
import transaction import transaction
import ZODB.serialize
import ZEO.zrpc.error
from ZEO import ClientStub from ZEO import ClientStub
from ZEO.CommitLog import CommitLog from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer from ZEO.monitor import StorageStats, StatsServer
...@@ -625,24 +628,37 @@ class ZEOStorage: ...@@ -625,24 +628,37 @@ class ZEOStorage:
self.log(msg, logging.ERROR) self.log(msg, logging.ERROR)
err = StorageServerError(msg) err = StorageServerError(msg)
# The exception is reported back as newserial for this oid # The exception is reported back as newserial for this oid
newserial = err newserial = [(oid, err)]
else: else:
if serial != "\0\0\0\0\0\0\0\0": if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, version)) self.invalidated.append((oid, version))
if newserial == ResolvedSerial:
self.stats.conflicts_resolved += 1 if isinstance(newserial, str):
self.log("conflict resolved oid=%s" % oid_repr(oid), BLATHER) newserial = [(oid, newserial)]
self.serials.append((oid, newserial))
if newserial:
for oid, s in newserial:
if s == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.log("conflict resolved oid=%s"
% oid_repr(oid), BLATHER)
self.serials.append((oid, s))
return err is None return err is None
def _vote(self): def _vote(self):
if not self.store_failed:
# Only call tpc_vote of no store call failed, otherwise
# the serialnos() call will deliver an exception that will be
# handled by the client in its tpc_vote() method.
serials = self.storage.tpc_vote(self.transaction)
if serials:
self.serials.extend(serials)
self.client.serialnos(self.serials) self.client.serialnos(self.serials)
# If a store call failed, then return to the client immediately. return
# The serialnos() call will deliver an exception that will be
# handled by the client in its tpc_vote() method.
if self.store_failed:
return
return self.storage.tpc_vote(self.transaction)
def _abortVersion(self, src): def _abortVersion(self, src):
tid, oids = self.storage.abortVersion(src, self.transaction) tid, oids = self.storage.abortVersion(src, self.transaction)
...@@ -741,6 +757,30 @@ class ZEOStorage: ...@@ -741,6 +757,30 @@ class ZEOStorage:
else: else:
return 1 return 1
class StorageServerDB:
def __init__(self, server, storage_id):
self.server = server
self.storage_id = storage_id
self.references = ZODB.serialize.referencesf
def invalidate(self, tid, oids, version=''):
storage_id = self.storage_id
self.server.invalidate(
None, storage_id, tid,
[(oid, version) for oid in oids],
)
for zeo_server in self.server.connections.get(storage_id, ())[:]:
try:
zeo_server.connection.poll()
except ZEO.zrpc.error.DisconnectedError:
pass
else:
break # We only need to pull one :)
def invalidateCache(self):
self.server._invalidateCache(self.storage_id)
class StorageServer: class StorageServer:
...@@ -845,17 +885,12 @@ class StorageServer: ...@@ -845,17 +885,12 @@ class StorageServer:
# The list is kept in sorted order with the most recent # The list is kept in sorted order with the most recent
# invalidation at the front. The list never has more than # invalidation at the front. The list never has more than
# self.invq_bound elements. # self.invq_bound elements.
self.invq_bound = invalidation_queue_size
self.invq = {} self.invq = {}
for name, storage in storages.items(): for name, storage in storages.items():
lastInvalidations = getattr(storage, 'lastInvalidations', None) self._setup_invq(name, storage)
if lastInvalidations is None: storage.registerDB(StorageServerDB(self, name))
self.invq[name] = [(storage.lastTransaction(), None)]
else:
self.invq[name] = list(
lastInvalidations(invalidation_queue_size)
)
self.invq[name].reverse()
self.invq_bound = invalidation_queue_size
self.connections = {} self.connections = {}
self.dispatcher = self.DispatcherClass(addr, self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection) factory=self.new_connection)
...@@ -875,6 +910,17 @@ class StorageServer: ...@@ -875,6 +910,17 @@ class StorageServer:
else: else:
self.monitor = None self.monitor = None
def _setup_invq(self, name, storage):
lastInvalidations = getattr(storage, 'lastInvalidations', None)
if lastInvalidations is None:
self.invq[name] = [(storage.lastTransaction(), None)]
else:
self.invq[name] = list(
lastInvalidations(self.invq_bound)
)
self.invq[name].reverse()
def _setup_auth(self, protocol): def _setup_auth(self, protocol):
# Can't be done in global scope, because of cyclic references # Can't be done in global scope, because of cyclic references
from ZEO.auth import get_module from ZEO.auth import get_module
...@@ -947,6 +993,49 @@ class StorageServer: ...@@ -947,6 +993,49 @@ class StorageServer:
stats.clients += 1 stats.clients += 1
return self.timeouts[storage_id], stats return self.timeouts[storage_id], stats
def _invalidateCache(self, storage_id):
"""We need to invalidate any caches we have.
This basically means telling our clients to
invalidate/revalidate their caches. We do this by closing them
and making them reconnect.
"""
# This method can be called from foreign threads. We have to
# worry about interaction with the main thread.
# 1. We modify self.invq which is read by get_invalidations
# below. This is why get_invalidations makes a copy of
# self.invq.
# 2. We access connections. There are two dangers:
#
# a. We miss a new connection. This is not a problem because
# if a client connects after we get the list of connections,
# then it will have to read the invalidation queue, which
# has already been reset.
#
# b. A connection is closes while we are iterating. This
# doesn't matter, bacause we can call should_close on a closed
# connection.
# Rebuild invq
self._setup_invq(storage_id, self.storages[storage_id])
connections = self.connections.get(storage_id, ())
# Make a copy since we are going to be mutating the
# connections indirectoy by closing them. We don't care about
# later transactions since they will have to validate their
# caches anyway.
connections = connections[:]
for p in connections:
try:
p.connection.should_close()
except ZEO.zrpc.error.DisconnectedError:
pass
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None): def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients. """Internal: broadcast info and invalidations to clients.
...@@ -972,6 +1061,27 @@ class StorageServer: ...@@ -972,6 +1061,27 @@ class StorageServer:
""" """
# This method can be called from foreign threads. We have to
# worry about interaction with the main thread.
# 1. We modify self.invq which is read by get_invalidations
# below. This is why get_invalidations makes a copy of
# self.invq.
# 2. We access connections. There are two dangers:
#
# a. We miss a new connection. This is not a problem because
# we are called while the storage lock is held. A new
# connection that tries to read data won't read committed
# data without first recieving an invalidation. Also, if a
# client connects after getting the list of connections,
# then it will have to read the invalidation queue, which
# has been updated to reflect the invalidations.
#
# b. A connection is closes while we are iterating. We'll need
# to cactch and ignore Disconnected errors.
if invalidated: if invalidated:
invq = self.invq[storage_id] invq = self.invq[storage_id]
if len(invq) >= self.invq_bound: if len(invq) >= self.invq_bound:
...@@ -980,7 +1090,11 @@ class StorageServer: ...@@ -980,7 +1090,11 @@ class StorageServer:
for p in self.connections.get(storage_id, ()): for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn: if invalidated and p is not conn:
p.client.invalidateTransaction(tid, invalidated) try:
p.client.invalidateTransaction(tid, invalidated)
except ZEO.zrpc.error.DisconnectedError:
pass
elif info is not None: elif info is not None:
p.client.info(info) p.client.info(info)
...@@ -994,7 +1108,13 @@ class StorageServer: ...@@ -994,7 +1108,13 @@ class StorageServer:
do full cache verification. do full cache verification.
""" """
invq = self.invq[storage_id] invq = self.invq[storage_id]
# We make a copy of invq because it might be modified by a
# foreign (other than main thread) calling invalidate above.
invq = invq[:]
if not invq: if not invq:
log("invq empty") log("invq empty")
return None, [] return None, []
......
Storage Servers should call registerDB on storages to propigate invalidations
=============================================================================
Storages servers propagate invalidations from their storages. Among
other things, this allows client storages to be used in storage
servers, allowing storage-server fan out, spreading read load over
multiple storage servers.
We'll create a Faux storage that has a registerDB method.
>>> class FauxStorage:
... invalidations = [('trans0', [('ob0', '')]),
... ('trans1', [('ob0', ''), ('ob1', '')]),
... ]
... def registerDB(self, db):
... self.db = db
... def isReadOnly(self):
... return False
... def getName(self):
... return 'faux'
... def lastTransaction(self):
... return self.invq[0][0]
... def lastInvalidations(self, size):
... return list(self.invalidations)
We dont' want the storage server to try to bind to a socket. We'll
subclass it and give it a do-nothing dispatcher "class":
>>> import ZEO.StorageServer
>>> class StorageServer(ZEO.StorageServer.StorageServer):
... DispatcherClass = lambda *a, **k: None
We'll create a storage instance and a storage server using it:
>>> storage = FauxStorage()
>>> server = StorageServer('addr', dict(t=storage))
Our storage now has a db attribute that provides IStorageDB. It's
references method is just the referencesf function from ZODB.Serialize
>>> import ZODB.serialize
>>> storage.db.references is ZODB.serialize.referencesf
True
To see the effects of the invalidation messages, we'll create a client
stub that implements the client invalidation calls:
>>> class Client:
... def __init__(self, name):
... self.name = name
... def invalidateTransaction(self, tid, invalidated):
... print 'invalidateTransaction', tid, self.name
... print invalidated
>>> class Connection:
... def __init__(self, mgr, obj):
... self.mgr = mgr
... self.obj = obj
... def should_close(self):
... print 'closed', self.obj.name
... self.mgr.close_conn(self)
... def poll(self):
... pass
>>> class ZEOStorage:
... def __init__(self, server, name):
... self.name = name
... self.connection = Connection(server, self)
... self.client = Client(name)
Now, we'll register the client with the storage server:
>>> _ = server.register_connection('t', ZEOStorage(server, 1))
>>> _ = server.register_connection('t', ZEOStorage(server, 2))
Now, if we call invalidate, we'll see it propigate to the client:
>>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
invalidateTransaction trans2 1
[('ob1', ''), ('ob2', '')]
invalidateTransaction trans2 2
[('ob1', ''), ('ob2', '')]
>>> storage.db.invalidate('trans3', ['ob1', 'ob2'], 'v')
invalidateTransaction trans3 1
[('ob1', 'v'), ('ob2', 'v')]
invalidateTransaction trans3 2
[('ob1', 'v'), ('ob2', 'v')]
The storage servers queue will reflect the invalidations:
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
'trans3' [('ob1', 'v'), ('ob2', 'v')]
'trans2' [('ob1', ''), ('ob2', '')]
'trans1' [('ob0', ''), ('ob1', '')]
'trans0' [('ob0', '')]
If we call invalidateCache, the storage server will close each of it's
connections:
>>> storage.db.invalidateCache()
closed 1
closed 2
The connections will then reopen and revalidate their caches.
The servers's invalidation queue will get reset
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
'trans1' [('ob0', ''), ('ob1', '')]
'trans0' [('ob0', '')]
...@@ -26,6 +26,9 @@ import time ...@@ -26,6 +26,9 @@ import time
import unittest import unittest
import shutil import shutil
import zope.testing.setupstack
from zope.testing import doctest
# ZODB test support # ZODB test support
import ZODB import ZODB
import ZODB.tests.util import ZODB.tests.util
...@@ -150,11 +153,13 @@ class GenericTests( ...@@ -150,11 +153,13 @@ class GenericTests(
self._servers = [adminaddr] self._servers = [adminaddr]
self._conf_path = path self._conf_path = path
if not self.blob_cache_dir: if not self.blob_cache_dir:
self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage # This is the blob cache for ClientStorage
self._storage = ClientStorage(zport, '1', cache_size=20000000, self.blob_cache_dir = tempfile.mkdtemp()
min_disconnect_poll=0.5, wait=1, self._storage = ClientStorage(
wait_timeout=60, blob_dir=self.blob_cache_dir, zport, '1', cache_size=20000000,
blob_cache_writable=self.blob_cache_writable) min_disconnect_poll=0.5, wait=1,
wait_timeout=60, blob_dir=self.blob_cache_dir,
blob_cache_writable=self.blob_cache_writable)
self._storage.registerDB(DummyDB()) self._storage.registerDB(DummyDB())
def tearDown(self): def tearDown(self):
...@@ -816,10 +821,20 @@ test_classes = [FileStorageTests, MappingStorageTests, DemoStorageTests, ...@@ -816,10 +821,20 @@ test_classes = [FileStorageTests, MappingStorageTests, DemoStorageTests,
BlobAdaptedFileStorageTests, BlobWritableCacheTests] BlobAdaptedFileStorageTests, BlobWritableCacheTests]
def zeoFanOutSetup(test):
zope.testing.setupstack.setUpDirectory(test)
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp, suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown)) tearDown=ZODB.tests.util.tearDown))
suite.addTest(doctest.DocFileSuite('registerDB.test'))
suite.addTest(
doctest.DocFileSuite('zeo-fan-out.test',
setUp=zeoFanOutSetup,
tearDown=zope.testing.setupstack.tearDown,
),
)
for klass in test_classes: for klass in test_classes:
sub = unittest.makeSuite(klass, "check") sub = unittest.makeSuite(klass, "check")
suite.addTest(sub) suite.addTest(sub)
......
ZEO Fan Out
===========
We should be able to set up ZEO servers with ZEO clients. Let's see
if we can make it work.
We'll use some helper functions. The first is a helpter that starts
ZEO servers for us and another one that picks ports.
We'll start the first server:
>>> import ZEO.tests.forker, ZEO.tests.testZEO
>>> port0 = ZEO.tests.testZEO.get_port()
>>> zconf0 = ZEO.tests.forker.ZEOConfig(('', port0))
>>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
... '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0)
Then we''ll start 2 others that use this one:
>>> port1 = ZEO.tests.testZEO.get_port()
>>> zconf1 = ZEO.tests.forker.ZEOConfig(('', port1))
>>> zport1, adminaddr1, pid1, path1 = ZEO.tests.forker.start_zeo_server(
... '<zeoclient 1>\n server %s\n</zeoclient>\n' % port0,
... zconf1, port1)
>>> port2 = ZEO.tests.testZEO.get_port()
>>> zconf2 = ZEO.tests.forker.ZEOConfig(('', port2))
>>> zport2, adminaddr2, pid2, path2 = ZEO.tests.forker.start_zeo_server(
... '<zeoclient 1>\n server %s\n</zeoclient>\n' % port0,
... zconf2, port2)
Now, let's create some client storages that connect to these:
>>> import ZEO.ClientStorage
>>> cs1 = ZEO.ClientStorage.ClientStorage(('', port1), '1')
>>> cs2 = ZEO.ClientStorage.ClientStorage(('', port2), '1')
And some databases and connections around these:
>>> from ZODB.DB import DB
>>> import transaction
>>> db1 = DB(cs1)
>>> tm1 = transaction.TransactionManager()
>>> c1 = db1.open(transaction_manager=tm1)
>>> r1 = c1.root()
>>> r1
{}
>>> db2 = DB(cs2)
>>> tm2 = transaction.TransactionManager()
>>> c2 = db2.open(transaction_manager=tm2)
>>> r2 = c2.root()
>>> r2
{}
>>> db2 = DB(cs2)
>>> tm2 = transaction.TransactionManager()
>>> c2 = db2.open(transaction_manager=tm2)
>>> r2 = c2.root()
>>> r2
{}
If we update c1, we'll eventually see the change in c2:
>>> import persistent.mapping
>>> r1[1] = persistent.mapping.PersistentMapping()
>>> r1[1].v = 1000
>>> r1[2] = persistent.mapping.PersistentMapping()
>>> r1[2].v = -1000
>>> tm1.commit()
>>> import time
>>> for i in range(100):
... t = tm2.begin()
... if 1 in r2:
... break
... time.sleep(0.01)
>>> r2[1].v
1000
>>> r2[2].v
-1000
Now, let's see if we can break it. :)
>>> def f():
... for i in range(100):
... r1[1].v -= 1
... r1[2].v += 1
... tm1.commit()
... time.sleep(0.01)
>>> import threading
>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> for i in range(1000):
... t = tm2.begin()
... if r2[1].v + r2[2].v:
... print 'oops', r2[1], r2[2]
... if r1[1].v == 900:
... break # we caught up
... time.sleep(0.01)
>>> thread.join()
If we shutdown and restart the source server, the variables will be
invalidated:
>>> ZEO.tests.forker.shutdown_zeo_server(adminaddr0)
>>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
... '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0)
>>> for i in range(1000):
... c1.sync()
... c2.sync()
... if (
... (r1[1]._p_changed is None)
... and
... (r1[2]._p_changed is None)
... and
... (r2[1]._p_changed is None)
... and
... (r2[2]._p_changed is None)
... ):
... print 'Cool'
... break
... time.sleep(0.01)
... else:
... print 'Dang'
Cool
Cleanup:
>>> db1.close()
>>> db2.close()
>>> ZEO.tests.forker.shutdown_zeo_server(adminaddr0)
>>> ZEO.tests.forker.shutdown_zeo_server(adminaddr1)
>>> ZEO.tests.forker.shutdown_zeo_server(adminaddr2)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment