Commit 3e91643e authored by Barry Warsaw's avatar Barry Warsaw

Based on suggestions by Toby, we're now throwing checkpointing into a

thread, instead of doing it every nth ZODB transaction.  We can
actually provide a base class for both the checkpointing and
autopacking threads here.  Changes here include:

SLEEP_TIME: how often (in seconds) a background thread should wake up
to see if there's work to do.

True, False: Add these for older Pythons.

PackStop: New class that acts as an escape hatch for pack operations.

BerkeleyConfig:
    - change interval from 100 to 0.  Now that this controls the
      checkpointing thread, the current default is to not spawn the
      thread.  I'm probably going to change this once I figure out
      what a good value is.

BerkeleyBase:
    - __init__(): Start the checkpointing thread if interval > 0

    - _setupDB(): Add some additional keyword args so that the QUEUE
      style tables can use this convenience method too.

    - close(): Be sure to stop and join the checkpointing thread

    - _docheckpoint(): Removed

    - _withtxn(): Catch PackStop escape hatch exceptions.  This one
      aborts the current Berkeley transaction but eats the exception.
      Also, don't call _docheckpoint() here.

     - docheckpoint(): New method which the checkpointing threads can
       call.

env_from_string(): use DB_RECOVER_FATAL for autorecovery on open.

_WorkThread: New base class for the checkpointing and autopacking
threads.

_CheckPoint: The common checkpointing thread class.
parent a60ee322
......@@ -14,9 +14,12 @@
"""Base class for BerkeleyStorage implementations.
"""
__version__ = '$Revision: 1.23 $'.split()[-2:][0]
import os
import time
import errno
import threading
from types import StringType
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
......@@ -30,10 +33,27 @@ from ZODB.lock_file import lock_file
from ZODB.BaseStorage import BaseStorage
from ZODB.referencesf import referencesf
import ThreadLock
import zLOG
GBYTES = 1024 * 1024 * 1000
__version__ = '$Revision: 1.22 $'.split()[-2:][0]
# Maximum number of seconds for background thread to sleep before checking to
# see if it's time for another autopack run. Lower numbers mean more
# processing, higher numbers mean less responsiveness to shutdown requests.
# 10 seconds seems like a good compromise. Note that if the check interval is
# less than the sleep time, the minimum will be used.
SLEEP_TIME = 10
try:
True, False
except NameError:
True = 1
False = 0
class PackStop(Exception):
"""Escape hatch for pack operations."""
......@@ -53,9 +73,10 @@ class BerkeleyConfig:
The following checkpointing attributes are supported:
- interval indicates the approximate number of Berkeley transaction
commits and aborts after which a checkpoint is performed. Berkeley
transactions are performed after ZODB aborts, commits, and stores.
- interval indicates how often, in seconds, a Berkeley checkpoint is
performed. If this is non-zero, checkpointing is performed by a
background thread. Otherwise checkpointing will only be done when the
storage is closed. You really want to enable checkpointing. ;)
- kbytes is passed directly to txn_checkpoint()
......@@ -98,7 +119,7 @@ class BerkeleyConfig:
never automatically do classic packs. For Minimal storage, this value
is ignored -- all packs are classic packs.
"""
interval = 100
interval = 0
kbyte = 0
min = 0
logdir = None
......@@ -142,12 +163,10 @@ class BerkeleyBase(BaseStorage):
Optional config must be a BerkeleyConfig instance, or None, which
means to use the default configuration options.
"""
# sanity check arguments
if config is None:
config = BerkeleyConfig()
self._config = config
self._config._counter = 0
if name == '':
raise TypeError, 'database name is empty'
......@@ -167,24 +186,36 @@ class BerkeleyBase(BaseStorage):
# Instantiate a pack lock
self._packlock = ThreadLock.allocate_lock()
self._autopacker = None
self._stop = False
# Initialize a few other things
self._prefix = prefix
# Give the subclasses a chance to interpose into the database setup
# procedure
self._tables = []
self._setupDBs()
# Initialize the object id counter.
self._init_oid()
if config.interval > 0:
self._checkpointer = _Checkpoint(self, config.interval)
self._checkpointer.start()
else:
self._checkpointer = None
def _setupDB(self, name, flags=0):
def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
"""Open an individual database with the given flags.
flags are passed directly to the underlying DB.set_flags() call.
Optional dbtype specifies the type of BerkeleyDB access method to
use. Optional reclen if not None gives the record length.
"""
d = db.DB(self._env)
if flags:
d.set_flags(flags)
# Our storage is based on the underlying BSDDB btree database type.
d.open(self._prefix + name, db.DB_BTREE, db.DB_CREATE)
if reclen is not None:
d.set_re_len(reclen)
d.open(self._prefix + name, dbtype, db.DB_CREATE)
self._tables.append(d)
return d
def _setupDBs(self):
......@@ -270,6 +301,14 @@ class BerkeleyBase(BaseStorage):
"""Close the storage by closing the databases it uses and by closing
its environment.
"""
# Close all the tables
if self._checkpointer:
zLOG.LOG('Full storage', zLOG.INFO,
'stopping checkpointing thread')
self._checkpointer.stop()
self._checkpointer.join(SLEEP_TIME * 2)
for d in self._tables:
d.close()
# As recommended by Keith Bostic @ Sleepycat, we need to do
# two checkpoints just before we close the environment.
# Otherwise, auto-recovery on environment opens can be
......@@ -285,15 +324,6 @@ class BerkeleyBase(BaseStorage):
self._env.close()
os.unlink(lockfile)
def _docheckpoint(self):
# Periodically checkpoint the database. This is called approximately
# once per Berkeley transaction commit or abort.
config = self._config
config._counter += 1
if config._counter > config.interval:
self._env.txn_checkpoint(config.kbyte, config.min)
config._counter = 0
def _update(self, deltas, data, incdec):
refdoids = []
referencesf(data, refdoids)
......@@ -316,16 +346,27 @@ class BerkeleyBase(BaseStorage):
txn = self._env.txn_begin()
try:
ret = meth(txn, *args, **kws)
except PackStop:
# Escape hatch for shutdown during pack. Like the bare except --
# i.e. abort the transaction -- but swallow the exception.
txn.abort()
except:
#import traceback ; traceback.print_exc()
txn.abort()
self._docheckpoint()
raise
else:
txn.commit()
self._docheckpoint()
return ret
def docheckpoint(self):
config = self._config
self._lock_acquire()
try:
if not self._stop:
self._env.txn_checkpoint(config.kbyte, config.min)
finally:
self._lock_release()
def env_from_string(envname, config):
......@@ -356,10 +397,55 @@ def env_from_string(envname, config):
gbytes, bytes = divmod(config.cachesize, GBYTES)
env.set_cachesize(gbytes, bytes)
env.open(envname,
db.DB_CREATE # create underlying files as necessary
| db.DB_RECOVER # run normal recovery before opening
| db.DB_INIT_MPOOL # initialize shared memory buffer pool
| db.DB_INIT_TXN # initialize transaction subsystem
| db.DB_THREAD # we use the environment from other threads
db.DB_CREATE # create underlying files as necessary
| db.DB_RECOVER_FATAL # run normal recovery before opening
| db.DB_INIT_MPOOL # initialize shared memory buffer pool
| db.DB_INIT_TXN # initialize transaction subsystem
| db.DB_THREAD # we use the environment from other threads
)
return env, lockfile
class _WorkThread(threading.Thread):
def __init__(self, storage, checkinterval, name='work'):
threading.Thread.__init__(self)
self._storage = storage
self._interval = checkinterval
self._name = name
# Bookkeeping
self._stop = False
self._nextcheck = checkinterval
# We don't want these threads to hold up process exit. That could
# lead to corrupt databases, but recovery should ultimately save us.
self.setDaemon(True)
def run(self):
name = self._name
zLOG.LOG('Berkeley storage', zLOG.INFO, '%s thread started' % name)
while not self._stop:
now = time.time()
if now > self._nextcheck:
zLOG.LOG('Berkeley storage', zLOG.INFO, 'running %s' % name)
self._dowork(now)
self._nextcheck = now + self._interval
# Now we sleep for a little while before we check again. Sleep
# for the minimum of self._interval and SLEEP_TIME so as to be as
# responsive as possible to .stop() calls.
time.sleep(min(self._interval, SLEEP_TIME))
zLOG.LOG('Berkeley storage', zLOG.INFO, '%s thread finished' % name)
def stop(self):
self._stop = True
def _dowork(self):
pass
class _Checkpoint(_WorkThread):
def __init__(self, storage, interval):
_WorkThread.__init__(self, storage, interval, 'checkpointing')
def _dowork(self, now):
self._storage.docheckpoint()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment