Commit c8e3741d authored by Jim Fulton's avatar Jim Fulton

Fixed bug: The new option to drop the cache on verify didn't leave the

cache or the client storage in a valid state.

Also, changed the drop-cache feature to:

- Log a critical message and to
- Publish an event.  Some applications might want to captre the event
  and exit the process or take some drastic action when the cahe needs
  to be dropped.
parent fbf46870
...@@ -260,13 +260,14 @@ setup(name="ZODB3", ...@@ -260,13 +260,14 @@ setup(name="ZODB3",
'zdaemon', 'zdaemon',
], ],
install_requires = [ install_requires = [
'transaction',
'zc.lockfile',
'ZConfig',
'zdaemon',
'zope.event',
'zope.interface', 'zope.interface',
'zope.proxy', 'zope.proxy',
'zope.testing', 'zope.testing',
'ZConfig',
'zdaemon',
'transaction',
'zc.lockfile',
], ],
zip_safe = False, zip_safe = False,
entry_points = """ entry_points = """
......
...@@ -19,7 +19,19 @@ ClientStorage -- the main class, implementing the Storage API ...@@ -19,7 +19,19 @@ ClientStorage -- the main class, implementing the Storage API
""" """
from persistent.TimeStamp import TimeStamp
from ZEO.auth import get_module
from ZEO.cache import ClientCache
from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from ZEO import ServerStub
from ZEO.TransactionBuffer import TransactionBuffer
from ZEO.zrpc.client import ConnectionManager
from ZODB.blob import rename_or_copy_blob
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
import cPickle import cPickle
import logging
import os import os
import socket import socket
import stat import stat
...@@ -28,24 +40,13 @@ import tempfile ...@@ -28,24 +40,13 @@ import tempfile
import threading import threading
import time import time
import types import types
import logging
import weakref import weakref
import zope.interface
from ZEO import ServerStub
from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
import ZODB.interfaces
import zc.lockfile import zc.lockfile
import ZEO.interfaces
import ZODB.BaseStorage import ZODB.BaseStorage
from ZODB import POSException import ZODB.interfaces
from ZODB import utils import zope.event
from ZODB.blob import rename_or_copy_blob import zope.interface
from persistent.TimeStamp import TimeStamp
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -112,7 +113,7 @@ class ClientStorage(object): ...@@ -112,7 +113,7 @@ class ClientStorage(object):
def __init__(self, addr, storage='1', cache_size=20 * MB, def __init__(self, addr, storage='1', cache_size=20 * MB,
name='', client=None, debug=0, var=None, name='', client=None, debug=0, var=None,
min_disconnect_poll=5, max_disconnect_poll=300, min_disconnect_poll=1, max_disconnect_poll=30,
wait_for_server_on_startup=None, # deprecated alias for wait wait_for_server_on_startup=None, # deprecated alias for wait
wait=None, wait_timeout=None, wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0, read_only=0, read_only_fallback=0,
...@@ -224,12 +225,6 @@ class ClientStorage(object): ...@@ -224,12 +225,6 @@ class ClientStorage(object):
"%s ClientStorage(): debug argument is no longer used", "%s ClientStorage(): debug argument is no longer used",
self.__name__) self.__name__)
# Remember some parameters for "_setupCache"
self._var_ = var
self._storage_ = storage
self._client_ = client
self._cache_size_ = cache_size
self._drop_cache_rather_verify = drop_cache_rather_verify self._drop_cache_rather_verify = drop_cache_rather_verify
# wait defaults to True, but wait_for_server_on_startup overrides # wait defaults to True, but wait_for_server_on_startup overrides
...@@ -357,7 +352,13 @@ class ClientStorage(object): ...@@ -357,7 +352,13 @@ class ClientStorage(object):
else: else:
self.fshelper = None self.fshelper = None
self._setupCache() if client is not None:
dir = var or os.getcwd()
cache_path = os.path.join(dir, "%s-%s.zec" % (client, storage))
else:
cache_path = None
self._cache = self.ClientCacheClass(cache_path, size=cache_size)
self._rpc_mgr = self.ConnectionManagerClass(addr, self, self._rpc_mgr = self.ConnectionManagerClass(addr, self,
tmin=min_disconnect_poll, tmin=min_disconnect_poll,
...@@ -372,19 +373,6 @@ class ClientStorage(object): ...@@ -372,19 +373,6 @@ class ClientStorage(object):
if not self._rpc_mgr.attempt_connect(): if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect() self._rpc_mgr.connect()
def _setupCache(self):
'''create and open the cache.'''
# Decide whether to use non-temporary files
storage = self._storage_
client = self._client_
cache_size = self._cache_size_
if client is not None:
dir = self._var_ or os.getcwd()
cache_path = os.path.join(dir, "%s-%s.zec" % (client, storage))
else:
cache_path = None
self._cache = self.ClientCacheClass(cache_path, size=cache_size)
def _wait(self, timeout=None): def _wait(self, timeout=None):
if timeout is not None: if timeout is not None:
deadline = time.time() + timeout deadline = time.time() + timeout
...@@ -1276,11 +1264,12 @@ class ClientStorage(object): ...@@ -1276,11 +1264,12 @@ class ClientStorage(object):
self._db.invalidateCache() self._db.invalidateCache()
if self._cache and self._drop_cache_rather_verify: if self._cache and self._drop_cache_rather_verify:
logger.info("%s dropping cache", self.__name__) logger.critical("%s dropping stale cache", self.__name__)
self._cache.close() zope.event.notify(ZEO.interfaces.CacheDroppedEvent())
self._setupCache() # creates a new cache self._cache.clear()
self._server = server if ltid:
self._ready.set() self._cache.setLastTid(ltid)
self.finish_verification()
return "cache dropped" return "cache dropped"
logger.info("%s Verifying cache", self.__name__) logger.info("%s Verifying cache", self.__name__)
......
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
import zope.interface import zope.interface
class CacheDroppedEvent(object):
"""A ZEO Cache file was dropped to avoid verification
"""
class IServeable(zope.interface.Interface): class IServeable(zope.interface.Interface):
"""Interface provided by storages that can be served by ZEO """Interface provided by storages that can be served by ZEO
""" """
......
Avoiding cache verifification
=============================
For large databases it is common to also use very large ZEO cache
files. If a client has beed disconnected for too long, cache verification
might be necessary, but cache verification can be very hard on the
storage server.
ClientStorage provides an option to drop it's cache rather than doing
verification. When this option is used, and verification would be
necessary, ClientStorage:
- Invalidates all object caches
- Drops or clears it's client cache. (The end result is that the cache
is working but empty.)
- Logs a CRITICAL message.
- Publishes a ZEO.interfaces.CacheDroppedEvent event.
Here's an example that shows that this is actually what happens.
Start a server, create a cient to it and commit some data
>>> addr, admin = start_server(keep=1)
>>> import ZEO, transaction
>>> db = ZEO.DB(addr, drop_cache_rather_verify=True, client='cache',
... name='test')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root()[1] = conn.root().__class__()
>>> conn.root()[1].x = 1
>>> transaction.commit()
>>> len(db.storage._cache)
3
Now, we'll stop the server and restart with a different address:
>>> stop_server(admin)
>>> addr2, admin = start_server(keep=1)
And create another client and write some data to it:
>>> db2 = ZEO.DB(addr2)
>>> wait_connected(db2.storage)
>>> conn2 = db2.open()
>>> for i in range(5):
... conn2.root()[1].x += 1
... transaction.commit()
>>> db2.close()
>>> stop_server(admin)
Now, we'll restart the server. Before we do that, we'll capture
logging and event data:
>>> import logging, zope.testing.loggingsupport, zope.event
>>> handler = zope.testing.loggingsupport.InstalledHandler(
... 'ZEO.ClientStorage', level=logging.ERROR)
>>> events = []
>>> zope.event.subscribers.append(events.append)
Now, we'll restart the server on the original address:
>>> _, admin = start_server(zeo_conf=dict(invalidation_queue_size=1),
... addr=addr, keep=1)
>>> wait_connected(db.storage)
Now, let's verify our assertions above:
- Drops or clears it's client cache. (The end result is that the cache
is working but empty.)
>>> len(db.storage._cache)
0
- Invalidates all object caches
>>> transaction.abort()
>>> conn.root()._p_changed
- Logs a CRITICAL message.
>>> print handler
ZEO.ClientStorage CRITICAL
test dropping stale cache
>>> handler.clear()
- Publishes a cache-dropped event.
>>> for e in events:
... print e.__class__.__name__
CacheDroppedEvent
>>> del events[:]
If we access the root object, it'll be loaded from the server:
>>> conn.root()[1].x
6
>>> len(db.storage._cache)
2
Similarly, if we simply disconnect the client, and write data from
another client:
>>> db.close()
>>> db2 = ZEO.DB(addr)
>>> wait_connected(db2.storage)
>>> conn2 = db2.open()
>>> for i in range(5):
... conn2.root()[1].x += 1
... transaction.commit()
>>> db2.close()
>>> db = ZEO.DB(addr, drop_cache_rather_verify=True, client='cache',
... name='test')
>>> wait_connected(db.storage)
- Drops or clears it's client cache. (The end result is that the cache
is working but empty.)
>>> len(db.storage._cache)
1
(When a database is created, it checks to make sure the root object is
in the database, which is why we get 1, rather than 0 objects in the cache.)
- Logs a CRITICAL message.
>>> print handler
ZEO.ClientStorage CRITICAL
test dropping stale cache
>>> handler.clear()
- Publishes a cache-dropped event.
>>> for e in events:
... print e.__class__.__name__
CacheDroppedEvent
If we access the root object, it'll be loaded from the server:
>>> conn = db.open()
>>> conn.root()[1].x
11
>>> db.close()
...@@ -148,35 +148,8 @@ class MiscZEOTests: ...@@ -148,35 +148,8 @@ class MiscZEOTests:
self.assertNotEquals(ZODB.utils.z64, storage3.lastTransaction()) self.assertNotEquals(ZODB.utils.z64, storage3.lastTransaction())
storage3.close() storage3.close()
def checkDropCacheRatherVerifyImplementation(self):
# As it is quite difficult to set things up such that the verification
# optimizations do not step in, we emulate both the cache
# as well as the server.
from ZODB.TimeStamp import TimeStamp
class CacheEmulator(object):
# the settings below would be inconsitent for a normal cache
# but they are sufficient for our test setup
def __len__(self): return 1 # claim not to be empty
def contents(self): return () # do not invalidate anything
def getLastTid(self): return
def close(self): pass
class ServerEmulator(object):
def verify(*unused): pass
def endZeoVerify(*unused): pass
def lastTransaction(*unused): pass
storage = self._storage
storage._cache = cache = CacheEmulator()
server = ServerEmulator()
# test the standard behaviour
self.assertEqual(storage.verify_cache(server), "full verification")
# test the "drop cache rather verify" behaviour
storage._drop_cache_rather_verify = True
self.assertEqual(storage.verify_cache(server), "cache dropped")
# verify that we got a new cache
self.assert_(cache != storage._cache)
class ConfigurationTests(unittest.TestCase): class ConfigurationTests(unittest.TestCase):
def checkDropCacheRatherVerifyConfiguration(self): def checkDropCacheRatherVerifyConfiguration(self):
from ZODB.config import storageFromString from ZODB.config import storageFromString
# the default is to do verification and not drop the cache # the default is to do verification and not drop the cache
...@@ -1118,6 +1091,7 @@ def test_suite(): ...@@ -1118,6 +1091,7 @@ def test_suite():
zeo.addTest( zeo.addTest(
doctest.DocFileSuite( doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test', 'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown, setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
), ),
) )
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment