Commit 8ce1295b authored by Chris McDonough's avatar Chris McDonough

Merge transience changes from chrism-pre27-branch.

parent 7311c6f6
...@@ -42,6 +42,7 @@ Data Structures Maintained by a Transient Object Container ...@@ -42,6 +42,7 @@ Data Structures Maintained by a Transient Object Container
inside of the "_data" structure. There is a concept of a inside of the "_data" structure. There is a concept of a
"current" bucket, which is the bucket that is contained within the "current" bucket, which is the bucket that is contained within the
_data structured with a key equal to the "current" timeslice. _data structured with a key equal to the "current" timeslice.
A current bucket must always exist (this is an invariant).
- A "max_timeslice" integer, which is equal to the "largest" - A "max_timeslice" integer, which is equal to the "largest"
timeslice for which there exists a bucket in the _data structure. timeslice for which there exists a bucket in the _data structure.
...@@ -74,10 +75,13 @@ Housekeeping: Finalization, Garbage Collection, and Bucket ...@@ -74,10 +75,13 @@ Housekeeping: Finalization, Garbage Collection, and Bucket
Replentishing Replentishing
The TOC performs "finalization", "garbage collection", and "bucket The TOC performs "finalization", "garbage collection", and "bucket
replentishing". It performs these tasks "in-band". This means that replentishing". It typically performs these tasks "in-band"
the TOC does not maintain a separate thread that wakes up every so (although it is possible to do the housekeeping tasks "out of band"
often to do these housekeeping tasks. Instead, during the course of as well: see the methods of the Transient Object Container with
normal operations, the TOC opportunistically performs them. "housekeep" in their names). "In band" housekeeping implies that
the TOC does not maintain a separate thread or process that wakes up
every so often to clean up. Instead, during the course of normal
operations, the TOC opportunistically performs housekeeping functions.
Finalization is defined as optionally calling a function at bucket Finalization is defined as optionally calling a function at bucket
expiration time against all transient objects contained within that expiration time against all transient objects contained within that
......
import time
class PreventTransactionCommit(Exception):
    """Exception raised to veto the commit of a transaction.

    ``reason`` is stored on the instance and rendered by ``__str__``.
    """

    def __init__(self, reason):
        # Populate ``args`` too, so the exception reprs and pickles sanely.
        Exception.__init__(self, reason)
        self.reason = reason

    def __str__(self):
        # The format string previously had no conversion specifier, so the
        # ``%`` operator raised TypeError instead of producing a message.
        # The one-element tuple keeps a tuple-valued reason from being
        # unpacked as multiple format arguments.
        return "Uncommittable transaction: %s" % (self.reason,)
class UncommittableJar:
    """ A jar that cannot be committed.

    tpc_begin() and commit() are no-ops; tpc_vote() always raises
    PreventTransactionCommit carrying the reason given at construction.
    """

    def __init__(self, reason):
        self.reason = reason
        # creation timestamp (a float), used as the sort key
        self.time = time.time()

    def sort_key(self):
        # self.time is the float recorded in __init__, not a callable;
        # calling it (as the code used to) raised TypeError.
        return self.time

    def tpc_begin(self, *arg, **kw):
        pass

    def commit(self, obj, transaction):
        pass

    def tpc_vote(self, transaction):
        # voting is where the commit gets refused
        raise PreventTransactionCommit(self.reason)
class makeTransactionUncommittable:
    """
    Registers an uncommittable object with the provided transaction,
    which prevents the commit of that transaction.

    The instance carries an UncommittableJar (built from 'reason') as
    its _p_jar and registers itself with 'transaction'.
    """
    def __init__(self, transaction, reason):
        jar = UncommittableJar(reason)
        self._p_jar = jar
        transaction.register(self)
...@@ -30,13 +30,13 @@ from TransienceInterfaces import Transient, DictionaryLike, ItemWithId,\ ...@@ -30,13 +30,13 @@ from TransienceInterfaces import Transient, DictionaryLike, ItemWithId,\
TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\ TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\
StringKeyedHomogeneousItemContainer, TransientItemContainer StringKeyedHomogeneousItemContainer, TransientItemContainer
from BTrees.Length import Length from BTrees.Length import Length as BTreesLength
from BTrees.OOBTree import OOBTree from BTrees.OOBTree import OOBTree
from BTrees.IOBTree import IOBTree from BTrees.IOBTree import IOBTree
from ZODB.POSException import ConflictError
from Persistence import Persistent from Persistence import Persistent
from OFS.SimpleItem import SimpleItem from OFS.SimpleItem import SimpleItem
from ZPublisher.Publish import Retry
from AccessControl import ClassSecurityInfo, getSecurityManager from AccessControl import ClassSecurityInfo, getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager, \ from AccessControl.SecurityManagement import newSecurityManager, \
setSecurityManager setSecurityManager
...@@ -129,6 +129,7 @@ class TransientObjectContainer(SimpleItem): ...@@ -129,6 +129,7 @@ class TransientObjectContainer(SimpleItem):
_limit = 0 _limit = 0
_data = None _data = None
_inband_housekeeping = True
security.setDefaultAccess('deny') security.setDefaultAccess('deny')
...@@ -206,7 +207,7 @@ class TransientObjectContainer(SimpleItem): ...@@ -206,7 +207,7 @@ class TransientObjectContainer(SimpleItem):
# We make enough buckets initially to last us a while, and # We make enough buckets initially to last us a while, and
# we subsequently extend _data with fresh buckets and remove old # we subsequently extend _data with fresh buckets and remove old
# buckets as necessary during normal operations (see # buckets as necessary during normal operations (see
# _gc() and _replentish()). # _replentish() and _gc()).
self._data = DATA_CLASS() self._data = DATA_CLASS()
# populate _data with some number of buckets, each of which # populate _data with some number of buckets, each of which
...@@ -232,6 +233,10 @@ class TransientObjectContainer(SimpleItem): ...@@ -232,6 +233,10 @@ class TransientObjectContainer(SimpleItem):
# each expired item. # each expired item.
self._last_finalized_timeslice = Increaser(-self._period) self._last_finalized_timeslice = Increaser(-self._period)
# '_last_gc_timeslice' is a value that indicates in which
# timeslice the garbage collection process was last run.
self._last_gc_timeslice = Increaser(-self._period)
# our "_length" is the number of "active" data objects in _data. # our "_length" is the number of "active" data objects in _data.
# it does not include items that are still kept in _data but need to # it does not include items that are still kept in _data but need to
# be garbage collected. # be garbage collected.
...@@ -239,8 +244,10 @@ class TransientObjectContainer(SimpleItem): ...@@ -239,8 +244,10 @@ class TransientObjectContainer(SimpleItem):
# we need to maintain the length of the index structure separately # we need to maintain the length of the index structure separately
# because getting the length of a BTree is very expensive, and it # because getting the length of a BTree is very expensive, and it
# doesn't really tell us which ones are "active" anyway. # doesn't really tell us which ones are "active" anyway.
try: self._length.set(0) try:
except AttributeError: self._length = self.getLen = Length() self._length.set(0)
except AttributeError:
self._length = self.getLen = Length2()
def _getCurrentSlices(self, now): def _getCurrentSlices(self, now):
if self._timeout_slices: if self._timeout_slices:
...@@ -269,15 +276,14 @@ class TransientObjectContainer(SimpleItem): ...@@ -269,15 +276,14 @@ class TransientObjectContainer(SimpleItem):
bucket = self._data.get(0) bucket = self._data.get(0)
return bucket.get(k, default) return bucket.get(k, default)
# always call finalize if self._inband_housekeeping:
self._finalize(current_ts) self._housekeep(current_ts)
# call gc and/or replentish on an only-as needed basis
if self._roll(current_ts, 'replentish'):
self._replentish(current_ts)
if self._roll(current_ts, 'gc'): else:
self._gc(current_ts) # dont allow the TOC to stop working in an emergency bucket
# shortage
if self._in_emergency_bucket_shortage(current_ts):
self._replentish(current_ts)
# SUBTLETY ALERTY TO SELF: do not "improve" the code below # SUBTLETY ALERTY TO SELF: do not "improve" the code below
# unnecessarily, as it will end only in tears. The lack of aliases # unnecessarily, as it will end only in tears. The lack of aliases
...@@ -288,7 +294,8 @@ class TransientObjectContainer(SimpleItem): ...@@ -288,7 +294,8 @@ class TransientObjectContainer(SimpleItem):
found_ts = None found_ts = None
for ts in current_slices: for ts in current_slices:
abucket = self._data.get(ts, None) abucket = self._data.get(ts, None) # XXX ReadConflictError hotspot
if abucket is None: if abucket is None:
DEBUG and TLOG('_move_item: no bucket for ts %s' % ts) DEBUG and TLOG('_move_item: no bucket for ts %s' % ts)
continue continue
...@@ -348,13 +355,12 @@ class TransientObjectContainer(SimpleItem): ...@@ -348,13 +355,12 @@ class TransientObjectContainer(SimpleItem):
else: else:
current_ts = 0 current_ts = 0
self._finalize(current_ts) if self._inband_housekeeping:
self._housekeep(current_ts)
if self._roll(current_ts, 'replentish'): elif self._in_emergency_bucket_shortage(current_ts):
self._replentish(current_ts) # if our scheduler fails, dont allow the TOC to stop working
self._replentish(current_ts, force=True)
if self._roll(current_ts, 'gc'):
self._gc(current_ts)
STRICT and _assert(self._data.has_key(current_ts)) STRICT and _assert(self._data.has_key(current_ts))
current = self._getCurrentSlices(current_ts) current = self._getCurrentSlices(current_ts)
...@@ -374,8 +380,8 @@ class TransientObjectContainer(SimpleItem): ...@@ -374,8 +380,8 @@ class TransientObjectContainer(SimpleItem):
def keys(self): def keys(self):
return self._all().keys() return self._all().keys()
def rawkeys(self, current_ts): def raw(self, current_ts):
# for debugging # for debugging and unit testing
current = self._getCurrentSlices(current_ts) current = self._getCurrentSlices(current_ts)
current.reverse() # overwrite older with newer current.reverse() # overwrite older with newer
...@@ -425,15 +431,20 @@ class TransientObjectContainer(SimpleItem): ...@@ -425,15 +431,20 @@ class TransientObjectContainer(SimpleItem):
STRICT and _assert(self._data.has_key(current_ts)) STRICT and _assert(self._data.has_key(current_ts))
if item is _marker: if item is _marker:
# the key didnt already exist, this is a new item # the key didnt already exist, this is a new item
if self._limit and len(self) >= self._limit:
length = self._length() # XXX ReadConflictError hotspot
if self._limit and length >= self._limit:
LOG('Transience', WARNING, LOG('Transience', WARNING,
('Transient object container %s max subobjects ' ('Transient object container %s max subobjects '
'reached' % self.getId()) 'reached' % self.getId())
) )
raise MaxTransientObjectsExceeded, ( raise MaxTransientObjectsExceeded, (
"%s exceeds maximum number of subobjects %s" % "%s exceeds maximum number of subobjects %s" %
(len(self), self._limit)) (length, self._limit))
self._length.change(1)
self._length.increment(1)
DEBUG and TLOG('__setitem__: placing value for key %s in bucket %s' % DEBUG and TLOG('__setitem__: placing value for key %s in bucket %s' %
(k, current_ts)) (k, current_ts))
current_bucket = self._data[current_ts] current_bucket = self._data[current_ts]
...@@ -460,7 +471,11 @@ class TransientObjectContainer(SimpleItem): ...@@ -460,7 +471,11 @@ class TransientObjectContainer(SimpleItem):
if not issubclass(BUCKET_CLASS, Persistent): if not issubclass(BUCKET_CLASS, Persistent):
# tickle persistence machinery # tickle persistence machinery
self._data[current_ts] = bucket self._data[current_ts] = bucket
self._length.change(-1)
# XXX does increment(-1) make any sense here?
# rationale from dunny: we are removing an item rather than simply
# declaring it to be unused?
self._length.increment(-1)
return current_ts, item return current_ts, item
def __len__(self): def __len__(self):
...@@ -496,78 +511,45 @@ class TransientObjectContainer(SimpleItem): ...@@ -496,78 +511,45 @@ class TransientObjectContainer(SimpleItem):
DEBUG and TLOG('has_key: returning false from for %s' % k) DEBUG and TLOG('has_key: returning false from for %s' % k)
return False return False
def _roll(self, now, reason):
"""
Roll the dice to see if we're the lucky thread that does
bucket replentishment or gc. This method is guaranteed to return
true at some point as the difference between high and low naturally
diminishes to zero.
The reason we do the 'random' dance in the last part of this
is to minimize the chance that two threads will attempt to
do housekeeping at the same time (causing conflicts).
"""
low = now/self._period
high = self._max_timeslice()/self._period
if high <= low:
# we really need to win this roll because we have no
# spare buckets (and no valid values to provide to randrange), so
# we rig the toss.
DEBUG and TLOG('_roll: %s rigged toss' % reason)
return True
else:
# we're not in an emergency bucket shortage, so we can
# take our chances during the roll. It's unlikely that
# two threads will win the roll simultaneously, so we
# avoid a certain class of conflicts here.
if random.randrange(low, high) == low: # WINNAH!
DEBUG and TLOG("_roll: %s roll winner" % reason)
return True
DEBUG and TLOG("_roll: %s roll loser" % reason)
return False
def _get_max_expired_ts(self, now): def _get_max_expired_ts(self, now):
return now - (self._period * (self._timeout_slices + 1)) return now - (self._period * (self._timeout_slices + 1))
def _in_emergency_bucket_shortage(self, now):
max_ts = self._max_timeslice()
low = now/self._period
high = max_ts/self._period
required = high <= low
return required
def _finalize(self, now): def _finalize(self, now):
""" Call finalization handlers for the data in each stale bucket """
if not self._timeout_slices: if not self._timeout_slices:
DEBUG and TLOG('_finalize: doing nothing (no timeout)') DEBUG and TLOG('_finalize: doing nothing (no timeout)')
return # don't do any finalization if there is no timeout return # don't do any finalization if there is no timeout
# The nature of sessioning is that when the timeslice rolls # The nature of sessioning is that when the timeslice rolls
# over, all active threads will try to do a lot of work during # over, all active threads will try to do a lot of work during
# finalization, all but one unnecessarily. We really don't # finalization if inband housekeeping is enabled, all but one
# want more than one thread at a time to try to finalize # unnecessarily. We really don't want more than one thread at
# buckets at the same time so we try to lock. We give up if we # a time to try to finalize buckets at the same time so we try
# can't lock immediately because it doesn't matter if we skip # to lock. We give up if we can't lock immediately because it
# a couple of opportunities for finalization, as long as it # doesn't matter if we skip a couple of opportunities for
# gets done by some thread eventually. A similar pattern # finalization, as long as it gets done by some thread
# exists for _gc and _replentish. # eventually. A similar pattern exists for _gc and
# _replentish.
if not self.finalize_lock.acquire(0): if not self.finalize_lock.acquire(0):
DEBUG and TLOG('_finalize: couldnt acquire lock') DEBUG and TLOG('_finalize: could not acquire lock, returning')
return return
try: try:
DEBUG and TLOG('_finalize: lock acquired successfully') DEBUG and TLOG('_finalize: lock acquired successfully')
last_finalized = self._last_finalized_timeslice()
if now is None:
now = getCurrentTimeslice(self._period) # for unit tests
# we want to start finalizing from one timeslice after the # we want to start finalizing from one timeslice after the
# timeslice which we last finalized. Note that finalizing # timeslice which we last finalized.
# an already-finalized bucket somehow sends persistence
# into a spin with an exception later raised: start_finalize = last_finalized + self._period
# "SystemError: error return without exception set",
# typically coming from
# Products.Sessions.SessionDataManager, line 182, in
# _getSessionDataObject (if getattr(ob, '__of__', None)
# and getattr(ob, 'aq_parent', None)). According to this
# email message from Jim, it may be because the ob is
# ghosted and doesn't have a _p_jar somehow:
#http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
start_finalize = self._last_finalized_timeslice() + self._period
# we want to finalize only up to the maximum expired timeslice # we want to finalize only up to the maximum expired timeslice
max_ts = self._get_max_expired_ts(now) max_ts = self._get_max_expired_ts(now)
...@@ -577,124 +559,221 @@ class TransientObjectContainer(SimpleItem): ...@@ -577,124 +559,221 @@ class TransientObjectContainer(SimpleItem):
'_finalize: start_finalize (%s) >= max_ts (%s), ' '_finalize: start_finalize (%s) >= max_ts (%s), '
'doing nothing' % (start_finalize, max_ts)) 'doing nothing' % (start_finalize, max_ts))
return return
else:
DEBUG and TLOG('_finalize: now is %s' % now) DEBUG and TLOG(
DEBUG and TLOG('_finalize: max_ts is %s' % max_ts) '_finalize: start_finalize (%s) <= max_ts (%s), '
DEBUG and TLOG('_finalize: start_finalize is %s' % start_finalize) 'finalization possible' % (start_finalize, max_ts))
# we don't try to avoid conflicts here by doing a "random"
# dance (ala _replentish and _gc) because it's important that
# buckets are finalized as soon as possible after they've
# expired in order to call the delete notifier "on time".
self._do_finalize_work(now, max_ts, start_finalize)
finally:
self.finalize_lock.release()
to_finalize = list(self._data.keys(start_finalize, max_ts)) def _do_finalize_work(self, now, max_ts, start_finalize):
DEBUG and TLOG('_finalize: to_finalize is %s' % `to_finalize`) # this is only separated from _finalize for readability; it
# should generally not be called by anything but _finalize
DEBUG and TLOG('_do_finalize_work: entering')
DEBUG and TLOG('_do_finalize_work: now is %s' % now)
DEBUG and TLOG('_do_finalize_work: max_ts is %s' % max_ts)
DEBUG and TLOG('_do_finalize_work: start_finalize is %s' %
start_finalize)
delta = 0 to_finalize = list(self._data.keys(start_finalize, max_ts))
DEBUG and TLOG('_do_finalize_work: to_finalize is %s' % `to_finalize`)
for key in to_finalize: delta = 0
assert(start_finalize <= key <= max_ts) for key in to_finalize:
STRICT and _assert(self._data.has_key(key))
values = list(self._data[key].values())
DEBUG and TLOG('_finalize: values to notify from ts %s '
'are %s' % (key, `list(values)`))
delta += len(values) _assert(start_finalize <= key)
_assert(key <= max_ts)
STRICT and _assert(self._data.has_key(key))
values = list(self._data[key].values())
DEBUG and TLOG('_do_finalize_work: values to notify from ts %s '
'are %s' % (key, `list(values)`))
for v in values: delta += len(values)
self.notifyDel(v)
if delta: for v in values:
self._length.change(-delta) self.notifyDel(v)
DEBUG and TLOG('_finalize: setting _last_finalized_timeslice ' if delta:
'to max_ts of %s' % max_ts) self._length.decrement(delta)
self._last_finalized_timeslice.set(max_ts) DEBUG and TLOG('_do_finalize_work: setting _last_finalized_timeslice '
'to max_ts of %s' % max_ts)
finally: self._last_finalized_timeslice.set(max_ts)
self.finalize_lock.release()
def _invoke_finalize_and_gc(self):
    # for unit testing purposes only!
    # Calls the finalize and gc workers directly for the current
    # timeslice; no lock is taken and no dice are rolled in this method.
    last_done = self._last_finalized_timeslice()
    current_ts = getCurrentTimeslice(self._period) # for unit tests
    self._do_finalize_work(
        current_ts,
        self._get_max_expired_ts(current_ts),
        last_done + self._period)
    self._do_gc_work(current_ts)
def _replentish(self, now): def _replentish(self, now):
# available_spares == the number of "spare" buckets that exist in """ Add 'fresh' future or current buckets """
# "_data"
if not self._timeout_slices: if not self._timeout_slices:
return # do nothing if no timeout DEBUG and TLOG('_replentish: no timeout, doing nothing')
if not self.replentish_lock.acquire(0):
DEBUG and TLOG('_replentish: couldnt acquire lock')
return return
# the difference between high and low naturally diminishes to
# zero as now approaches self._max_timeslice() during normal
# operations. If high <= low, it means we have no current bucket,
# so we *really* need to replentish (having a current bucket is
# an invariant for continued operation).
try: required = self._in_emergency_bucket_shortage(now)
max_ts = self._max_timeslice() lock_acquired = self.replentish_lock.acquire(0)
available_spares = (max_ts-now) / self._period
DEBUG and TLOG('_replentish: now = %s' % now)
DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
DEBUG and TLOG('_replentish: available_spares = %s'
% available_spares)
if available_spares >= SPARE_BUCKETS:
DEBUG and TLOG('_replentish: available_spares (%s) >= '
'SPARE_BUCKETS (%s), doing '
'nothing'% (available_spares,
SPARE_BUCKETS))
return
if max_ts < now: try:
replentish_start = now if required:
replentish_end = now + (self._period * SPARE_BUCKETS) # we're in an emergency bucket shortage, we need to
# replentish regardless of whether we got the lock or
# not. (if we didn't get the lock, this transaction
# will likely result in a conflict error, that's ok)
if lock_acquired:
DEBUG and TLOG('_replentish: required, lock acquired)')
else:
DEBUG and TLOG('_replentish: required, lock NOT acquired)')
max_ts = self._max_timeslice()
self._do_replentish_work(now, max_ts)
elif lock_acquired:
# If replentish is optional, minimize the chance that
# two threads will attempt to do replentish work at
# the same time (which causes conflicts) by
# introducing a random element.
DEBUG and TLOG('_replentish: attempting optional replentish '
'(lock acquired)')
max_ts = self._max_timeslice()
low = now/self._period
high = max_ts/self._period
if roll(low, high, 'optional replentish'):
self._do_replentish_work(now, max_ts)
else: else:
replentish_start = max_ts + self._period # This is an optional replentish and we can't acquire
replentish_end = max_ts + (self._period * SPARE_BUCKETS) # the lock, bail.
DEBUG and TLOG('_optional replentish attempt aborted, could '
DEBUG and TLOG('_replentish: replentish_start = %s' % 'not acquire lock.')
replentish_start) return
DEBUG and TLOG('_replentish: replentish_end = %s'
% replentish_end)
# n is the number of buckets to create
n = (replentish_end - replentish_start) / self._period
new_buckets = getTimeslices(replentish_start, n, self._period)
new_buckets.reverse()
STRICT and _assert(new_buckets)
DEBUG and TLOG('_replentish: adding %s new buckets' % n)
DEBUG and TLOG('_replentish: buckets to add = %s'
% new_buckets)
for k in new_buckets:
STRICT and _assert(not self._data.has_key(k))
try:
self._data[k] = BUCKET_CLASS()
except ConflictError:
DEBUG and TLOG('_replentish: conflict when adding %s' % k)
time.sleep(random.uniform(0, 1)) # add entropy
raise
self._max_timeslice.set(max(new_buckets))
finally: finally:
self.replentish_lock.release() if lock_acquired:
self.replentish_lock.release()
def _do_replentish_work(self, now, max_ts):
    """ Add fresh buckets to self._data and record the largest new
    key through self._max_timeslice.set().

    Returns without doing anything when the number of spare buckets
    between 'now' and 'max_ts' is already >= SPARE_BUCKETS. """
    # kept apart from _replentish only for readability; generally
    # nothing but _replentish should call it
    if DEBUG:
        TLOG('_do_replentish_work: entering')

    period = self._period

    # number of "spare" buckets that already exist in "_data"
    spares = (max_ts - now) / period
    if DEBUG:
        TLOG('_do_replentish_work: now = %s' % now)
        TLOG('_do_replentish_work: max_ts = %s' % max_ts)
        TLOG('_do_replentish_work: available_spares = %s'
             % spares)

    if spares >= SPARE_BUCKETS:
        if DEBUG:
            TLOG('_do_replentish_work: available_spares (%s) >= '
                 'SPARE_BUCKETS (%s), doing '
                 'nothing'% (spares,
                             SPARE_BUCKETS))
        return

    if max_ts < now:
        # the newest bucket in self._data is older than now, so start
        # the new run of buckets at 'now' itself
        first_ts = now
        end_ts = now + (period * SPARE_BUCKETS)
    else:
        # otherwise continue one period past the newest bucket
        first_ts = max_ts + period
        end_ts = max_ts + (period * (SPARE_BUCKETS +1))

    if DEBUG:
        TLOG('_do_replentish_work: replentish_start = %s' %
             first_ts)
        TLOG('_do_replentish_work: replentish_end = %s'
             % end_ts)

    # how many buckets to create
    count = (end_ts - first_ts) / period
    fresh = getTimeslices(first_ts, count, period)
    fresh.reverse()
    if STRICT:
        _assert(fresh)
    if DEBUG:
        TLOG('_do_replentish_work: adding %s new buckets' % count)
        TLOG('_do_replentish_work: buckets to add = %s'
             % fresh)

    for ts in fresh:
        if STRICT:
            _assert(not self._data.has_key(ts))
        self._data[ts] = BUCKET_CLASS() # XXX ReadConflictError hotspot

    self._max_timeslice.set(max(fresh))
def _gc(self, now=None): def _gc(self, now=None):
""" Remove stale buckets """
if not self._timeout_slices: if not self._timeout_slices:
return # dont do gc if there is no timeout return # dont do gc if there is no timeout
# give callers a good chance to do nothing (gc isn't as important
# as replentishment or finalization)
if not roll(0, 5, 'gc'):
DEBUG and TLOG('_gc: lost roll, doing nothing')
return
if not self.gc_lock.acquire(0): if not self.gc_lock.acquire(0):
DEBUG and TLOG('_gc: couldnt acquire lock') DEBUG and TLOG('_gc: couldnt acquire lock')
return return
try: try:
if now is None: if now is None:
now = getCurrentTimeslice(self._period) # for unit tests now = getCurrentTimeslice(self._period) # for unit tests
# we want to garbage collect all buckets that have already been run last_gc = self._last_gc_timeslice()
# through finalization gc_every = self._period * round(SPARE_BUCKETS / 2.0)
max_ts = self._last_finalized_timeslice()
DEBUG and TLOG('_gc: now is %s' % now) if (now - last_gc) < gc_every:
DEBUG and TLOG('_gc: max_ts is %s' % max_ts) DEBUG and TLOG('_gc: gc attempt not yet required '
'( (%s - %s) < %s )' % (now, last_gc, gc_every))
return
else:
DEBUG and TLOG(
'_gc: (%s -%s) > %s, gc invoked' % (now, last_gc,
gc_every))
self._do_gc_work(now)
for key in list(self._data.keys(None, max_ts)):
assert(key <= max_ts)
STRICT and _assert(self._data.has_key(key))
DEBUG and TLOG('deleting %s from _data' % key)
del self._data[key]
finally: finally:
self.gc_lock.release() self.gc_lock.release()
def _do_gc_work(self, now):
    """ Delete every bucket in self._data whose key is at or below
    the last finalized timeslice, then record 'now' as the last gc
    timeslice. """
    # kept apart from _gc only for readability; generally nothing but
    # _gc should call it
    if DEBUG:
        TLOG('_do_gc_work: entering')

    # only buckets that have already been through finalization are
    # eligible for collection
    finalized_up_to = self._last_finalized_timeslice()
    if DEBUG:
        TLOG('_do_gc_work: max_ts is %s' % finalized_up_to)

    # snapshot the keys first, since we delete from _data in the loop
    stale = list(self._data.keys(None, finalized_up_to))
    if DEBUG:
        TLOG('_do_gc_work: to_gc is: %s' % str(stale))

    for ts in stale:
        _assert(ts <= finalized_up_to)
        if STRICT:
            _assert(self._data.has_key(ts))
        if DEBUG:
            TLOG('_do_gc_work: deleting %s from _data' % ts)
        del self._data[ts]

    if DEBUG:
        TLOG('_do_gc_work: setting last_gc_timeslice to %s' % now)
    self._last_gc_timeslice.set(now)
def notifyAdd(self, item): def notifyAdd(self, item):
DEBUG and TLOG('notifyAdd with %s' % item) DEBUG and TLOG('notifyAdd with %s' % item)
callback = self._getCallback(self._addCallback) callback = self._getCallback(self._addCallback)
...@@ -830,12 +909,36 @@ class TransientObjectContainer(SimpleItem): ...@@ -830,12 +909,36 @@ class TransientObjectContainer(SimpleItem):
def setDelNotificationTarget(self, f): def setDelNotificationTarget(self, f):
self._delCallback = f self._delCallback = f
security.declareProtected(MGMT_SCREEN_PERM, 'nudge') security.declareProtected(MGMT_SCREEN_PERM, 'disableInbandHousekeeping')
def nudge(self): def disableInbandHousekeeping(self):
""" Used by mgmt interface to maybe do housekeeping each time """ No longer perform inband housekeeping """
a screen is shown """ self._inband_housekeeping = False
# run garbage collector so view is correct
self._gc() security.declareProtected(MGMT_SCREEN_PERM, 'enableInbandHousekeeping')
def enableInbandHousekeeping(self):
    """ Turn inband housekeeping on (again) by setting the
    _inband_housekeeping flag to True """
    self._inband_housekeeping = True
security.declareProtected(MGMT_SCREEN_PERM, 'isInbandHousekeepingEnabled')
def isInbandHousekeepingEnabled(self):
    """ Return the current value of the inband housekeeping flag """
    enabled = self._inband_housekeeping
    return enabled
security.declareProtected('View', 'housekeep')
def housekeep(self):
    """ Perform out of band housekeeping.  Call this from a
    scheduler at least every self._period * (SPARE_BUCKETS - 1)
    seconds. """
    # This method could be protected against being called too often
    # by anonymous users if that becomes necessary; there is already
    # a lot of protection as-is, so it is kept no more complicated
    # than it needs to be for now.
    current_ts = getCurrentTimeslice(self._period)
    self._housekeep(current_ts)
def _housekeep(self, now):
self._finalize(now)
self._replentish(now)
self._gc(now)
security.declareProtected(MANAGE_CONTAINER_PERM, security.declareProtected(MANAGE_CONTAINER_PERM,
'manage_changeTransientObjectContainer') 'manage_changeTransientObjectContainer')
...@@ -868,9 +971,17 @@ class TransientObjectContainer(SimpleItem): ...@@ -868,9 +971,17 @@ class TransientObjectContainer(SimpleItem):
# f/w compat: 2.8 cannot use __len__ as an instance variable # f/w compat: 2.8 cannot use __len__ as an instance variable
if not state.has_key('_length'): if not state.has_key('_length'):
length = state.get('__len__', Length()) length = state.get('__len__', Length2())
self._length = self.getLen = length self._length = self.getLen = length
oldlength = state['_length']
if isinstance(oldlength, BTreesLength):
# TOCS prior to 2.7.3 had a BTrees.Length.Length object as
# the TOC length object, replace it with our own Length2
# that does our conflict resolution correctly:
sz = oldlength()
self._length = self.getLen = Length2(sz)
# TOCs prior to 2.7.1 took their period from a global # TOCs prior to 2.7.1 took their period from a global
if not state.has_key('_period'): if not state.has_key('_period'):
self._period = 20 # this was the default for all prior releases self._period = 20 # this was the default for all prior releases
...@@ -891,6 +1002,10 @@ class TransientObjectContainer(SimpleItem): ...@@ -891,6 +1002,10 @@ class TransientObjectContainer(SimpleItem):
if not state.has_key('_last_finalized_timeslice'): if not state.has_key('_last_finalized_timeslice'):
self._last_finalized_timeslice = Increaser(-self._period) self._last_finalized_timeslice = Increaser(-self._period)
# TOCs prior to 2.7.3 didn't have a _last_gc_timeslice
if not state.has_key('_last_gc_timeslice'):
self._last_gc_timeslice = Increaser(-self._period)
# we should probably delete older attributes from state such as # we should probably delete older attributes from state such as
# '_last_timeslice', '_deindex_next',and '__len__' here but we leave # '_last_timeslice', '_deindex_next',and '__len__' here but we leave
# them in order to allow people to switch between 2.6.0->2.7.0 and # them in order to allow people to switch between 2.6.0->2.7.0 and
...@@ -919,6 +1034,22 @@ def getTimeslices(begin, n, period): ...@@ -919,6 +1034,22 @@ def getTimeslices(begin, n, period):
l.insert(0, begin + (x * period)) l.insert(0, begin + (x * period))
return l return l
def roll(low, high, reason):
    """ Roll the dice: draw from random.randrange(low, high) and
    return True only when the draw equals 'low'.

    An empty range (randrange raising ValueError) always wins.
    'reason' is only used in the debug log message. """
    try:
        drawn = random.randrange(low, high)
    except ValueError:
        # empty range, must win this roll
        drawn = low

    if drawn == low:
        won = True
        template = 'roll: low: %s, high: %s: won with %s (%s)'
    else:
        won = False
        template = 'roll: low: %s, high: %s: lost with %s (%s)'

    if DEBUG:
        TLOG(template % (low, high, drawn, reason))
    return won
def _assert(case): def _assert(case):
if not case: if not case:
raise AssertionError raise AssertionError
...@@ -926,8 +1057,8 @@ def _assert(case): ...@@ -926,8 +1057,8 @@ def _assert(case):
class Increaser(Persistent): class Increaser(Persistent):
""" """
A persistent object representing a typically increasing integer that A persistent object representing a typically increasing integer that
has conflict resolution uses the greatest integer out of the three has conflict resolution which uses the greatest integer out of the three
available states available states.
""" """
def __init__(self, v): def __init__(self, v):
self.value = v self.value = v
...@@ -947,7 +1078,51 @@ class Increaser(Persistent): ...@@ -947,7 +1078,51 @@ class Increaser(Persistent):
def _p_resolveConflict(self, old, state1, state2): def _p_resolveConflict(self, old, state1, state2):
return max(old, state1, state2) return max(old, state1, state2)
def _p_independent(self):
return 1 class Length2(Persistent):
"""
A persistent object responsible for maintaining a representation of
the number of current transient objects.
Conflict resolution is sensitive to which methods are used to
change the length.
"""
def __init__(self, value=0):
self.set(value)
def set(self, value):
self.value = value
self.floor = 0
self.ceiling = value
def increment(self, delta):
"""Increase the length by delta.
Conflict resolution will take the sum of all the increments."""
self.ceiling += delta
self.value += delta
def decrement(self, delta):
"""Decrease the length by delta.
Conflict resolution will take the highest decrement."""
self.floor += delta
self.value -= delta
def __getstate__(self):
return self.__dict__
def __setstate__(self, state):
self.__dict__.update(state)
def __call__(self):
return self.value
def _p_resolveConflict(self, old, saved, new):
new['ceiling'] = saved['ceiling'] + new['ceiling'] - old['ceiling']
new['floor'] = max(old['floor'], saved['floor'], new['floor'])
new['value'] = new['ceiling'] - new['floor']
return new
Globals.InitializeClass(TransientObjectContainer) Globals.InitializeClass(TransientObjectContainer)
...@@ -16,6 +16,8 @@ Simple ZODB-based transient object implementation. ...@@ -16,6 +16,8 @@ Simple ZODB-based transient object implementation.
$Id$ $Id$
""" """
__version__='$Revision: 1.9.68.5 $'[11:-2]
from Persistence import Persistent from Persistence import Persistent
from Acquisition import Implicit from Acquisition import Implicit
import time, random, sys, os import time, random, sys, os
...@@ -192,69 +194,59 @@ class TransientObject(Persistent, Implicit): ...@@ -192,69 +194,59 @@ class TransientObject(Persistent, Implicit):
# Other non interface code # Other non interface code
# #
def _p_independent(self):
# My state doesn't depend on or materially effect the state of
# other objects (eliminates read conflicts).
return 1
def _p_resolveConflict(self, saved, state1, state2):
    """
    Resolve a write conflict between three states of this object.

    Returns one of the three state mappings: an invalidated state if
    there is one, otherwise the most recently modified state, otherwise
    the most recently accessed one.  Raises ConflictError if 'token',
    'id' or '_created' differ between the states.
    """
    DEBUG and TLOG('entering TO _p_rc')
    DEBUG and TLOG('states: sv: %s, s1: %s, s2: %s' % (
        saved, state1, state2))
    states = [saved, state1, state2]

    # We can clearly resolve the conflict if one state is invalid,
    # because it's a terminal state.
    for state in states:
        if state.has_key('_invalid'):
            DEBUG and TLOG('TO _p_rc: a state was invalid')
            return state
    # The only other times we can clearly resolve the conflict is if
    # the token, the id, or the creation time don't differ between
    # the three states, so we check that here.  If any differ, we punt
    # by raising ConflictError.
    attrs = ['token', 'id', '_created']
    for attr in attrs:
        svattr = saved.get(attr)
        s1attr = state1.get(attr)
        s2attr = state2.get(attr)
        DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
                       (attr, svattr, s1attr, s2attr))
        if not svattr==s1attr==s2attr:
            DEBUG and TLOG('TO _p_rc: cant resolve conflict')
            raise ConflictError

    # Now we need to do real work.
    #
    # Data in our _container dictionaries might conflict.  To make
    # things simple, we intentionally create a race condition where the
    # state which was last modified "wins".  It would be preferable to
    # somehow merge our _containers together, but as there's no
    # generally acceptable way to union their states, there's not much
    # we can do about it if we want to be able to resolve this kind of
    # conflict.

    # We return the state which was most recently modified, if
    # possible.
    states.sort(lastmodified_sort)
    if states[0].get('_last_modified'):
        DEBUG and TLOG('TO _p_rc: returning last mod state')
        return states[0]

    # If we can't determine which object to return on the basis
    # of last modification time (no state has been modified), we return
    # the object that was most recently accessed (last pulled out of
    # our parent).  This will return an essentially arbitrary state if
    # all last_accessed values are equal.
    states.sort(lastaccessed_sort)
    DEBUG and TLOG('TO _p_rc: returning last_accessed state')
    return states[0]
getName = getId # this is for SQLSession compatibility getName = getId # this is for SQLSession compatibility
......
...@@ -13,7 +13,7 @@ Transient data will persist, but only for a user-specified period of time ...@@ -13,7 +13,7 @@ Transient data will persist, but only for a user-specified period of time
(the "data object timeout") after which it will be flushed. (the "data object timeout") after which it will be flushed.
</p> </p>
<dtml-call nudge><!-- turn the buckets if necessary --> <dtml-call housekeep><!-- turn the buckets if necessary -->
<p class="form-label"> <p class="form-label">
<font color="green"> <font color="green">
......
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
from unittest import TestCase, TestSuite, makeSuite
from ZODB.POSException import ConflictError
from ZODB.FileStorage import FileStorage
from ZODB.DB import DB
from Products.Transience.Transience import Length2, Increaser
class Base(TestCase):
    """
    Common fixture for the conflict resolution tests.

    openDB() creates a FileStorage-backed DB named after the current
    process id; tearDown() closes the DB and cleans up the storage if
    openDB() was called.
    """

    db = None

    def setUp(self):
        pass

    def tearDown(self):
        # Nothing to release unless the test called openDB().
        if self.db is not None:
            try:
                self.db.close()
            finally:
                # Clean up the storage even when closing the database
                # fails, so a failing test does not leave it behind.
                self.storage.cleanup()

    def openDB(self):
        n = 'fs_tmp__%s' % os.getpid()
        self.storage = FileStorage(n)
        self.db = DB(self.storage)
class TestLength2(Base):
    """Conflict resolution test for Length2 using two connections."""

    def testConflict(self):
        # this test fails on the HEAD (MVCC?)
        self.openDB()
        first = Length2(0)
        root1 = self.db.open().root()
        root1['ob'] = first
        get_transaction().commit()

        # Fetch the same object through a second connection; calling it
        # ensures that the copy is loaded.
        root2 = self.db.open().root()
        second = root2['ob']
        self.assertEqual(second(), 0)

        # First transaction.
        first.increment(10)
        first.decrement(1)
        get_transaction().commit()

        # Second transaction, made through the second connection's copy.
        second.increment(20)
        second.decrement(2)
        get_transaction().commit()

        # Increments are summed; only the highest decrement is applied.
        self.assertEqual(second(), 10+20-max(1,2))
class TestIncreaser(Base):
    """Conflict resolution test for Increaser using two connections."""

    def testConflict(self):
        self.openDB()
        first = Increaser(0)
        root1 = self.db.open().root()
        root1['ob'] = first
        get_transaction().commit()

        # Fetch the same object through a second connection; calling it
        # ensures that the copy is loaded.
        root2 = self.db.open().root()
        second = root2['ob']
        self.assertEqual(second(), 0)

        # First transaction.
        first.set(10)
        get_transaction().commit()

        # Second transaction, made through the second connection's copy.
        second.set(20)
        get_transaction().commit()

        # The greatest of the conflicting values is expected to win.
        self.assertEqual(second(), 20)
def test_suite():
    """Return a suite holding the Length2 and Increaser conflict tests."""
    return TestSuite((makeSuite(TestLength2), makeSuite(TestIncreaser)))
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import sys, os, time, random, unittest
if __name__ == "__main__":
sys.path.insert(0, '../../..')
import ZODB
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
from Products.Transience.TransactionHelper import PreventTransactionCommit, \
makeTransactionUncommittable
class TestTransactionHelper(TestCase):
    """Tests for the helpers in Products.Transience.TransactionHelper."""

    def setUp(self):
        # remember the transaction that is current when the test starts
        self.t = get_transaction()

    def tearDown(self):
        self.t = None

    def testUncommittable(self):
        # Once the transaction is marked uncommittable, committing the
        # current transaction must raise PreventTransactionCommit.
        makeTransactionUncommittable(self.t, "test")
        commit = get_transaction().commit
        self.assertRaises(PreventTransactionCommit, commit)
def test_suite():
    """Return a suite of the 'test'-prefixed TestTransactionHelper methods."""
    return makeSuite(TestTransactionHelper, 'test')
if __name__ == '__main__':
    # When executed as a script, run this module's suite with a text
    # runner at verbosity 9.
    runner = TextTestRunner(verbosity=9)
    runner.run(test_suite())
...@@ -17,7 +17,7 @@ if __name__ == "__main__": ...@@ -17,7 +17,7 @@ if __name__ == "__main__":
import ZODB import ZODB
from Products.Transience.Transience import TransientObjectContainer,\ from Products.Transience.Transience import TransientObjectContainer,\
MaxTransientObjectsExceeded MaxTransientObjectsExceeded, SPARE_BUCKETS, getCurrentTimeslice
from Products.Transience.TransientObject import TransientObject from Products.Transience.TransientObject import TransientObject
import Products.Transience.Transience import Products.Transience.Transience
import Products.Transience.TransientObject import Products.Transience.TransientObject
...@@ -380,6 +380,18 @@ class TestTransientObjectContainer(TestBase): ...@@ -380,6 +380,18 @@ class TestTransientObjectContainer(TestBase):
fauxtime.sleep(180) fauxtime.sleep(180)
self.assertEqual(len(self.t.keys()), 100) self.assertEqual(len(self.t.keys()), 100)
def testGarbageCollection(self):
    """After finalization and gc, every key left in _data must be
    greater than the last finalized timeslice."""
    # this is pretty implementation-dependent :-(
    for x in range(100):
        self.t[x] = x
    # let SPARE_BUCKETS periods elapse before invoking housekeeping
    fauxtime.sleep(self.period * SPARE_BUCKETS)
    self.t._invoke_finalize_and_gc()
    max_ts = self.t._last_finalized_timeslice()
    for k in list(self.t._data.keys()):
        self.assert_(k > max_ts, "k %s < max_ts %s" % (k, max_ts))
def _maxOut(self): def _maxOut(self):
for x in range(11): for x in range(11):
self.t.new(str(x)) self.t.new(str(x))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment