Commit b63a4b46 authored by Chris McDonough

Merge from 2.7 branch.

New "Transience" implementation.  This implementation offers no new features,
but it is vastly simpler, which makes it easier to maintain.  The older version
used all sorts of (questionable) tricks to attempt to avoid conflicts and to
improve performance, such as using Python Queue-module queues to store action
lists, using an index to quickly look up which "bucket" a transient object
was stored within, and using several other persistent objects that attempted
to keep pointers into the data.  The older version also had a lot of "voodoo"
code in it which papered over problems that were apparently caused by its
complexity.  That code is now removed or replaced, and the implementation is
fairly straightforward.

The newer version is probably much slower (lacking an index, it needs to
scan all "current" buckets to find a value), but it operates reliably under
high load.

This implementation removes backwards compatibility support for transient
object containers persisted via the Zope 2.5.X implementation.  It is possible
to use it against instances created in Zope 2.6.X and later, and it is
possible, after using it against a database created under one of those
versions, to move back to an "older" Zope in this range, although it is likely
that data in the TOC will be silently lost when this is done.
parent efe70c9e
@@ -43,21 +43,8 @@ Data Structures Maintained by a Transient Object Container
"current" bucket, which is the bucket that is contained within the
_data structure with a key equal to the "current" timeslice.
- An "index" which is an OOBTree mapping transient object "key" to
"timeslice", letting us quickly figure out which element in the _data
mapping contains the transient object related to the key. It is
stored as the attribute "_index" of the TOC. When calling code
wants to obtain a Transient Object, its key is looked up in the
index, which returns a timeslice. We ask the _data structure for the
bucket it has stored under that timeslice. Then the bucket is asked
for the object stored under the key. This returns the Transient Object.
- A "last timeslice" integer, which is equal to the "last" timeslice
under which TOC housekeeping operations were performed.
- A "next to deindex" integer, which is a timeslice
representing the next bucket which requires "deindexing"
(the removal of all the keys of the bucket from the index).
- A "max_timeslice" integer, which is equal to the "largest" timeslice
for which there exists a bucket in the _data structure.
When a Transient Object is created via new_or_existing, it is added
to the "current" bucket. As time goes by, the bucket to which the
@@ -69,41 +56,35 @@ Data Structures Maintained by a Transient Object Container
During the course of normal operations, a TransientObject will move
from an "old" bucket to the "current" bucket many times, as long as
it continues to be accessed. It is possible for a TransientObject
to *never* expire, as long as it is called up out of its TOC often
to never expire, as long as it is called up out of its TOC often
enough.
If a TransientObject is not accessed in the period of time defined by
the TOC's "timeout", it is deindexed and eventually garbage collected.
the TOC's "timeout", it is eventually garbage collected.
How the TransientObjectContainer Determines if a TransientObject is "Current"
A TO is current if it has an entry in the "index". When a TO has an
entry in the index, it implies that the TO resides in a bucket that
is no "older" than the TOC timeout period, based on the bucket's
timeslice.
All "current" timeslice buckets (as specified by the timeout) are
searched for the transient object, most recent bucket first.
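The lookup described above can be sketched in a few lines of modern Python.  This is a simplified model, not the actual implementation: `PERIOD` and `TIMEOUT_SLICES` mirror the module's constants only loosely, a plain dict stands in for the IOBTree-of-OOBTrees `_data` structure, and the helper names (`current_timeslice`, `find_item`) are hypothetical.

```python
import time

PERIOD = 20          # seconds per timeslice, mirroring the module constant
TIMEOUT_SLICES = 3   # TOC timeout expressed in timeslices (hypothetical value)

def current_timeslice(now=None):
    # A timeslice is the largest multiple of PERIOD that is <= now.
    if now is None:
        now = time.time()
    return int(now) - (int(now) % PERIOD)

def find_item(data, key, now_ts):
    """Scan the "current" buckets, most recent first, for key.

    `data` is a plain dict mapping timeslice -> bucket (standing in
    for the IOBTree of OOBTrees); returns (timeslice, value) on a hit,
    or (None, None) when the key is in no current bucket.
    """
    for ts in (now_ts - i * PERIOD for i in range(TIMEOUT_SLICES)):
        bucket = data.get(ts)           # most recent bucket first
        if bucket is not None and key in bucket:
            return ts, bucket[key]
    return None, None

# Hypothetical usage: an item last touched two slices ago is still found.
data = {100: {}, 80: {'abc': 'session'}, 60: {}}
found = find_item(data, 'abc', 100)     # -> (80, 'session')
```

On a hit in an older bucket, the real `_move_item` additionally deletes the key from the old bucket and re-inserts it into the current one, which is how frequently accessed objects escape expiration.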
Housekeeping: Finalization, Notification, Garbage Collection, and
Bucket Replentishing
Housekeeping: Notification, Garbage Collection, and Bucket
Replentishing
The TOC performs "deindexing", "notification", "garbage
collection", and "bucket replentishing". It performs these tasks
"in-band". This means that the TOC does not maintain a separate
thread that wakes up every so often to do these housekeeping tasks.
Instead, during the course of normal operations, the TOC
opportunistically performs them.
Deindexing is defined as the act of making an "expired" TO
inaccessible (by deleting it from the "index"). After a TO is
deindexed, it may not be used by application code any longer,
although it may "stick around" in a bucket for a while until the
bucket is eventually garbage collected.
Notification is defined as optionally calling a function at TOC
finalization time. The optional function call is user-defined, but
it is managed by the "notifyDestruct" method of the TOC.
The TOC performs "notification", "garbage collection", and "bucket
replentishing". It performs these tasks "in-band". This means that
the TOC does not maintain a separate thread that wakes up every so
often to do these housekeeping tasks. Instead, during the course of
normal operations, the TOC opportunistically performs them.
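The "opportunistic" trigger can be sketched as a randomized roll, roughly in the spirit of the `_roll` method shown later in this diff.  The standalone-function shape and constant value are assumptions for illustration, not a copy of the method.

```python
import random

PERIOD = 20  # seconds per timeslice, as in the module

def roll(now_ts, max_ts):
    """Decide whether the calling thread should do housekeeping now.

    `max_ts` is the newest timeslice that has a bucket.  As spare
    buckets run out, `high` approaches `low`, so a win is guaranteed
    eventually; meanwhile two threads rarely win at the same moment,
    which keeps ZODB write conflicts down.
    """
    low = now_ts // PERIOD
    high = max_ts // PERIOD
    if high <= low:
        return True  # no spare buckets left: rig the toss
    return random.randrange(low, high) == low
```

With these numbers, `roll(100, 100)` always returns True (no spares left), while a call with a thousand spare slices wins only about one time in a thousand.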
Garbage collection is defined as deleting "expired" buckets in the
_data structure (the _data structure maps a timeslice to a bucket).
Typically this is done by throwing away one or more buckets in the
_data structure after they expire.
Notification is defined as optionally calling a function at TOC
finalization time against individual transient object contained
within a bucket. The optional function call is user-defined, but it
is managed by the "notifyDel" method of the TOC.
Bucket replentishing is defined as the action of (opportunistically)
creating more buckets to insert into the the _data structure,
@@ -112,6 +93,9 @@ Bucket Replentishing
will be immediately created thereafter. We create new buckets in
batches to reduce the possibility of conflicts.
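Batch replenishment can be sketched like this.  Again a simplified model: plain dicts stand in for the BTrees, and the exact batch arithmetic is an assumption rather than a transcription of the `_replentish` method.

```python
PERIOD = 20         # seconds per timeslice
SPARE_BUCKETS = 15  # batch size, mirroring the module constant

def get_timeslices(begin, n):
    # n consecutive timeslice keys starting at `begin`
    return [begin + i * PERIOD for i in range(n)]

def replentish(data, now_ts, max_ts):
    """Top up `data` (timeslice -> bucket) with a batch of empty buckets.

    Returns the new maximum timeslice.  Because a whole batch is
    created at once, most threads find enough spare buckets already
    present and write nothing, reducing the chance of ZODB conflicts.
    """
    if (max_ts - now_ts) // PERIOD >= SPARE_BUCKETS:
        return max_ts  # plenty of spares; nothing to write
    start = now_ts if max_ts < now_ts else max_ts + PERIOD
    for ts in get_timeslices(start, SPARE_BUCKETS):
        data.setdefault(ts, {})  # dict stands in for an OOBTree bucket
    return start + PERIOD * (SPARE_BUCKETS - 1)
```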
Housekeeping is performed on a somewhat random basis to avoid
unnecessary conflicts.
Goals
- A low number of ZODB conflict errors (which reduce performance).
@@ -11,49 +11,42 @@
#
##############################################################################
"""
Transient Object Container Class ('timeslice'-based design).
Transient Object Container Class ('timeslice'-based design, no index).
$Id: Transience.py,v 1.35 2004/01/21 19:59:09 Brian Exp $
$Id: Transience.py,v 1.32.12.3 2004/05/14 22:52:12 chrism Exp $
"""
__version__='$Revision: 1.35 $'[11:-2]
__version__='$Revision: 1.32.12.3 $'[11:-2]
import math
import time
import random
import sys
import os
import thread
from cgi import escape
import Globals
from Globals import HTMLFile
from TransienceInterfaces import Transient, DictionaryLike, ItemWithId,\
TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\
StringKeyedHomogeneousItemContainer, TransientItemContainer
from OFS.SimpleItem import SimpleItem
from BTrees.Length import Length
from BTrees.OOBTree import OOBTree
from BTrees.IOBTree import IOBTree
from ZODB.POSException import ConflictError
from Persistence import Persistent
from Acquisition import Implicit
from OFS.SimpleItem import SimpleItem
from AccessControl import ClassSecurityInfo, getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager, \
setSecurityManager
from AccessControl.User import nobody
from BTrees.OOBTree import OOBTree, OOBucket, OOSet
from BTrees.IOBTree import IOBTree
from BTrees.Length import Length
import os.path
import os
import math, sys, random
import time
import logging
from types import InstanceType
from TransientObject import TransientObject
import thread
import ThreadLock
import Queue
from cgi import escape
from zLOG import LOG, WARNING, BLATHER
_marker = []
DEBUG = os.environ.get('Z_TOC_DEBUG', '')
LOG = logging.getLogger('Zope.Transience')
class MaxTransientObjectsExceeded(Exception): pass
from TransientObject import TransientObject
MIN_SPARE_BUCKETS = 10 # minimum number of transient buckets to keep spare
PERIOD = 20 # attempt housekeeping every PERIOD seconds
ADD_CONTAINER_PERM = 'Add Transient Object Container'
MGMT_SCREEN_PERM = 'View management screens'
ACCESS_CONTENTS_PERM = 'Access contents information'
@@ -61,8 +54,17 @@ CREATE_TRANSIENTS_PERM = 'Create Transient Objects'
ACCESS_TRANSIENTS_PERM = 'Access Transient Objects'
MANAGE_CONTAINER_PERM = 'Manage Transient Object Container'
constructTransientObjectContainerForm = HTMLFile(
'dtml/addTransientObjectContainer', globals())
PERIOD = 20 # signifies "resolution" of transience machinery
SPARE_BUCKETS = 15 # number of buckets to keep spare
STRICT = os.environ.get('Z_TOC_STRICT', '')
DEBUG = int(os.environ.get('Z_TOC_DEBUG', 0))
_marker = []
def setStrict(on=''):
""" Turn on assertions (which may cause conflicts) """
global STRICT
STRICT = on
def TLOG(*args):
sargs = []
@@ -70,7 +72,11 @@ def TLOG(*args):
sargs.append(str(time.time()))
for arg in args:
sargs.append(str(arg))
LOG.debug(' '.join(sargs))
msg = ' '.join(sargs)
LOG('Transience', BLATHER, msg)
constructTransientObjectContainerForm = HTMLFile(
'dtml/addTransientObjectContainer', globals())
def constructTransientObjectContainer(self, id, title='', timeout_mins=20,
addNotification=None, delNotification=None, limit=0, REQUEST=None):
@@ -81,6 +87,8 @@ def constructTransientObjectContainer(self, id, title='', timeout_mins=20,
if REQUEST is not None:
return self.manage_main(self, REQUEST, update_menu=1)
class MaxTransientObjectsExceeded(Exception): pass
class TransientObjectContainer(SimpleItem):
""" Object which contains items that are automatically flushed
after a period of inactivity """
@@ -88,39 +96,6 @@ class TransientObjectContainer(SimpleItem):
meta_type = "Transient Object Container"
icon = "misc_/Transience/datacontainer.gif"
# chrism 6/20/2002
# I was forced to make this a mostly "synchronized" class, using
# a single ThreadLock instance ("lock" below). I realize this
# is paranoid and even a little sloppy. ;-)
#
# Rationale: in high-conflict situations without this lock, the
# index and the "data" (bucket) structure slowly get out of sync with
# one another. I'm only sure about one thing when it comes to this:
# I don't completely understand why. So, I'm not going to worry about
# it (indefinitely) as the locking solves it. "Slow and steady" is better
# than "fast and broken".
lock = ThreadLock.allocate_lock()
# notify_queue is a queue in which deindexed objects are placed
# for later processing by housekeeping, which calls the
# "delete notifier" at appropriate times. As threads pass through
# the housekeeping stage, they pull any unnotified objects from this
# queue and call the delete notifier. We use a queue here in order
# to not "double-notify" when two threads are doing housekeeping
# at the same time. Note that there may be a case where a conflict
# error is raised and the results of a delete notifier are not
# committed, but that is better than calling the delete notifier
# *again* on the retry.
notify_queue = Queue.Queue()
# replentish queue is a size-one queue. It is used as optimization
# to avoid conflicts. If you're running low on buckets, an entry is
# placed in the replentish queue. The next thread that does housekeeping
# to notice the entry will extend the buckets. Because queues are thread-
# safe, more than one thread will not attempt to replentish at the same
# time.
replentish_queue = Queue.Queue(1)
__implements__ = (ItemWithId,
StringKeyedHomogeneousItemContainer,
TransientItemContainer
@@ -160,45 +135,438 @@ class TransientObjectContainer(SimpleItem):
delNotification=None, limit=0):
self.id = id
self.title=title
self._reset()
self._setTimeout(timeout_mins)
self._setLimit(limit)
self._addCallback = None
self._delCallback = None
self.setDelNotificationTarget(delNotification)
self.setAddNotificationTarget(addNotification)
self._reset()
# helpers
def _setTimeout(self, timeout_mins):
if type(timeout_mins) is not type(1):
raise TypeError, (escape(`timeout_mins`), "Must be integer")
self._timeout_secs = t_secs = timeout_mins * 60
# timeout_slices == fewest number of timeslices that's >= t_secs
self._timeout_slices=int(math.ceil(float(t_secs)/PERIOD))
def _setLimit(self, limit):
if type(limit) is not type(1):
raise TypeError, (escape(`limit`), "Must be integer")
self._limit = limit
def _reset(self):
""" Reset ourselves to a sane state (deletes all content) """
# _data contains a mapping of f-of-time(int) (aka "slice") to
# "bucket". Each bucket will contain a set of transient items.
# Transient items move automatically from bucket-to-bucket inside
# of the _data structure based on last access time (e.g.
# "get" calls), escaping destruction only if they move quickly
# enough.
# We make enough buckets initially to last us a while, and
# we subsequently extend _data with fresh buckets and remove old
# buckets as necessary during normal operations (see
# _gc() and _replentish()).
self._data = IOBTree()
# populate _data with some number of buckets, each of which
# is "current" for its timeslice key
if self._timeout_slices:
new_slices = getTimeslices(getCurrentTimeslice(), SPARE_BUCKETS*2)
for i in new_slices:
self._data[i] = OOBTree()
# create an Increaser for max timeslice
self._max_timeslice = Increaser(max(new_slices))
else:
self._data[0] = OOBTree() # sentinel value for non-expiring data
self._max_timeslice = Increaser(0)
# our "_length" is the length of _index.
# we need to maintain the length of the index structure separately
# because getting the length of a BTree is very expensive.
try: self._length.set(0)
except AttributeError: self._length = self.getLen = Length()
def _getCurrentSlices(self, now):
if self._timeout_slices:
begin = now+PERIOD - (PERIOD * self._timeout_slices)
num_slices = self._timeout_slices
else:
return [0] # sentinel for timeout value 0 (don't expire)
DEBUG and TLOG('_getCurrentSlices, begin = %s' % begin)
DEBUG and TLOG('_getCurrentSlices, num_slices = %s' % num_slices)
result = getTimeslices(begin, num_slices)
DEBUG and TLOG('_getCurrentSlices, result = %s' % result)
return result
def _move_item(self, k, current_ts, default=None):
if not getattr(self, '_max_timeslice', None):
# in-place upgrade for old instances; this would usually be
# "evil" but sessions are all about write-on-read anyway,
# so it really doesn't matter.
self._upgrade()
if self._timeout_slices:
if self._roll(current_ts, 'replentish'):
self._replentish(current_ts)
if self._roll(current_ts, 'gc'):
self._gc(current_ts)
STRICT and _assert(self._data.has_key(current_ts))
current = self._getCurrentSlices(current_ts)
found_ts = None
for ts in current:
bucket = self._data.get(ts)
# dont use hasattr here (it hides conflict errors)
if getattr(bucket, 'has_key', None) and bucket.has_key(k):
found_ts = ts
break
if found_ts is None:
return default
bucket = self._data[found_ts]
item = bucket[k]
if current_ts != found_ts:
del bucket[k]
self._data[current_ts][k] = item
else:
# special case for no timeout value
bucket = self._data.get(0)
item = bucket.get(k, default)
# dont use hasattr here (it hides conflict errors)
if getattr(item, 'setLastAccessed', None):
item.setLastAccessed()
return item
def _all(self):
if not getattr(self, '_max_timeslice', None):
# in-place upgrade for old instances
self._upgrade()
if self._timeout_slices:
current_ts = getCurrentTimeslice()
else:
current_ts = 0
if self._roll(current_ts, 'replentish'):
self._replentish(current_ts)
if self._roll(current_ts, 'gc'):
self._gc(current_ts)
STRICT and _assert(self._data.has_key(current_ts))
current = self._getCurrentSlices(current_ts)
current.reverse() # overwrite older with newer
d = {}
for ts in current:
bucket = self._data.get(ts)
if bucket is None:
continue
for k,v in bucket.items():
d[k] = self._wrap(v)
return d
def keys(self):
return self._all().keys()
def rawkeys(self, current_ts):
# for debugging
current = self._getCurrentSlices(current_ts)
current.reverse() # overwrite older with newer
d = {}
for ts in current:
bucket = self._data.get(ts, None)
if bucket is None:
continue
for k,v in bucket.items():
d[k] = self._wrap(v)
return d
def items(self):
return self._all().items()
def values(self):
return self._all().values()
def _wrap(self, item):
# dont use hasattr here (it hides conflict errors)
if getattr(item, '__of__', None):
item = item.__of__(self)
return item
def __getitem__(self, k):
if self._timeout_slices:
current_ts = getCurrentTimeslice()
else:
current_ts = 0
item = self._move_item(k, current_ts, _marker)
STRICT and _assert(self._data.has_key(current_ts))
if item is _marker:
raise KeyError, k
return self._wrap(item)
def __setitem__(self, k, v):
if self._timeout_slices:
current_ts = getCurrentTimeslice()
else:
current_ts = 0
item = self._move_item(k, current_ts, _marker)
STRICT and _assert(self._data.has_key(current_ts))
if item is _marker:
# the key didnt already exist, this is a new item
if self._limit and len(self) >= self._limit:
LOG('Transience', WARNING,
('Transient object container %s max subobjects '
'reached' % self.getId())
)
raise MaxTransientObjectsExceeded, (
"%s exceeds maximum number of subobjects %s" %
(len(self), self._limit))
self._length.change(1)
current_bucket = self._data[current_ts]
current_bucket[k] = v
self.notifyAdd(v)
# change the TO's last accessed time
# dont use hasattr here (it hides conflict errors)
if getattr(v, 'setLastAccessed', None):
v.setLastAccessed()
def __delitem__(self, k):
if self._timeout_slices:
current_ts = getCurrentTimeslice()
else:
current_ts = 0
item = self._move_item(k, current_ts)
STRICT and _assert(self._data.has_key(current_ts))
del self._data[current_ts][k]
self._length.change(-1)
return current_ts, item
def __len__(self):
return self._length()
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'get')
def get(self, k, default=None):
if self._timeout_slices:
current_ts = getCurrentTimeslice()
else:
current_ts = 0
item = self._move_item(k, current_ts, _marker)
STRICT and _assert(self._data.has_key(current_ts))
if item is _marker:
return default
return self._wrap(item)
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'has_key')
def has_key(self, k):
if self._timeout_slices:
current_ts = getCurrentTimeslice()
else:
current_ts = 0
item = self._move_item(k, current_ts, _marker)
STRICT and _assert(self._data.has_key(current_ts))
if item is not _marker:
return True
return False
def _roll(self, now, reason):
"""
Roll the dice to see if we're the lucky thread that does
bucket replentishment or gc. This method is guaranteed to return
true at some point as the difference between high and low naturally
diminishes to zero.
The reason we do the 'random' dance in the last part of this
is to minimize the chance that two threads will attempt to
do housekeeping at the same time (causing conflicts).
"""
low = now/PERIOD
high = self._max_timeslice()/PERIOD
if high <= low:
# we really need to win this roll because we have no
# spare buckets (and no valid values to provide to randrange), so
# we rig the toss.
DEBUG and TLOG('_roll: %s rigged toss' % reason)
return True
else:
# we're not in an emergency bucket shortage, so we can take
# our chances during the roll. It's highly unlikely that two
# threads will win the roll simultaneously, so we avoid a certain
# class of conflicts here.
if random.randrange(low, high) == low: # WINNAH!
DEBUG and TLOG("_roll: %s roll winner" % reason)
return True
DEBUG and TLOG("_roll: %s roll loser" % reason)
return False
def _replentish(self, now):
# available_spares == the number of "spare" buckets that exist in
# "_data"
if not self._timeout_slices:
return # do nothing if no timeout
max_ts = self._max_timeslice()
available_spares = (max_ts-now) / PERIOD
DEBUG and TLOG('_replentish: now = %s' % now)
DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
DEBUG and TLOG('_replentish: available_spares = %s'
% available_spares)
if available_spares < SPARE_BUCKETS:
if max_ts < now:
replentish_start = now
replentish_end = now + (PERIOD * SPARE_BUCKETS)
else:
replentish_start = max_ts + PERIOD
replentish_end = max_ts + (PERIOD * SPARE_BUCKETS)
DEBUG and TLOG('_replentish: replentish_start = %s' %
replentish_start)
DEBUG and TLOG('_replentish: replentish_end = %s'
% replentish_end)
# n is the number of buckets to create
n = (replentish_end - replentish_start) / PERIOD
new_buckets = getTimeslices(replentish_start, n)
new_buckets.reverse()
STRICT and _assert(new_buckets)
DEBUG and TLOG('_replentish: adding %s new buckets' % n)
DEBUG and TLOG('_replentish: buckets to add = %s'
% new_buckets)
for k in new_buckets:
STRICT and _assert(not self._data.has_key(k))
try:
self._data[k] = OOBTree()
except ConflictError:
DEBUG and TLOG('_replentish: conflict when adding %s' % k)
time.sleep(random.choice([0.1, 0.2, 0.3])) # add entropy
raise
self._max_timeslice.set(max(new_buckets))
def _gc(self, now=None):
if not self._timeout_slices:
return # dont do gc if there is no timeout
if now is None:
now = getCurrentTimeslice() # for unit tests
max_ts = now - (PERIOD * (self._timeout_slices + 1))
to_notify = []
for key in list(self._data.keys(None, max_ts)):
assert(key <= max_ts)
STRICT and _assert(self._data.has_key(key))
for v in self._data[key].values():
to_notify.append(v)
self._length.change(-1)
del self._data[key]
for v in to_notify:
self.notifyDel(v)
def notifyAdd(self, item):
DEBUG and TLOG('notifyAdd with %s' % item)
callback = self._getCallback(self._addCallback)
if callback is None:
return
self._notify(item, callback, 'notifyAdd')
def notifyDel(self, item):
DEBUG and TLOG('notifyDel with %s' % item)
callback = self._getCallback(self._delCallback)
if callback is None:
return
self._notify(item, callback, 'notifyDel' )
def _getCallback(self, callback):
if not callback:
return None
if type(callback) is type(''):
try:
method = self.unrestrictedTraverse(callback)
except (KeyError, AttributeError):
path = self.getPhysicalPath()
err = 'No such method %s in %s %s'
LOG('Transience',
WARNING,
err % (callback, '/'.join(path), name),
error=sys.exc_info()
)
return
else:
method = callback
return method
def _notify(self, item, callback, name):
if callable(callback):
sm = getSecurityManager()
try:
user = sm.getUser()
try:
newSecurityManager(None, nobody)
callback(item, self)
except:
# dont raise, just log
path = self.getPhysicalPath()
LOG('Transience',
WARNING,
'%s failed when calling %s in %s' % (name,callback,
'/'.join(path)),
error=sys.exc_info()
)
finally:
setSecurityManager(sm)
else:
err = '%s in %s attempted to call non-callable %s'
path = self.getPhysicalPath()
LOG('Transience',
WARNING,
err % (name, '/'.join(path), callback),
error=sys.exc_info()
)
def getId(self):
return self.id
security.declareProtected(CREATE_TRANSIENTS_PERM, 'new_or_existing')
def new_or_existing(self, key):
self.lock.acquire()
try:
DEBUG and TLOG('new_or_existing called with %s' % key)
notfound = []
item = self.get(key, notfound)
if item is notfound:
# intentionally dont call "new" here in order to avoid another
# call to "get"
item = TransientObject(key)
self[key] = item
self.notifyAdd(item)
return item.__of__(self)
finally:
self.lock.release()
DEBUG and TLOG('new_or_existing called with %s' % key)
item = self.get(key, _marker)
if item is _marker:
item = TransientObject(key)
self[key] = item
item = item.__of__(self)
return item
security.declareProtected(CREATE_TRANSIENTS_PERM, 'new')
def new(self, key):
self.lock.acquire()
try:
if type(key) is not type(''):
raise TypeError, (key, "key is not a string type")
if self.has_key(key):
raise KeyError, "cannot duplicate key %s" % key
item = TransientObject(key)
self[key] = item
self.notifyAdd(item)
return item.__of__(self)
finally:
self.lock.release()
DEBUG and TLOG('new called with %s' % key)
if type(key) is not type(''):
raise TypeError, (key, "key is not a string type")
if self.has_key(key):
raise KeyError, "cannot duplicate key %s" % key
item = TransientObject(key)
self[key] = item
return item.__of__(self)
# TransientItemContainer methods
security.declareProtected(MANAGE_CONTAINER_PERM, 'setTimeoutMinutes')
def setTimeoutMinutes(self, timeout_mins):
@@ -207,7 +575,6 @@ class TransientObjectContainer(SimpleItem):
self._setTimeout(timeout_mins)
self._reset()
security.declareProtected(MGMT_SCREEN_PERM, 'getTimeoutMinutes')
def getTimeoutMinutes(self):
""" """
return self._timeout_secs / 60
@@ -229,8 +596,6 @@ class TransientObjectContainer(SimpleItem):
security.declareProtected(MANAGE_CONTAINER_PERM,'setAddNotificationTarget')
def setAddNotificationTarget(self, f):
# We should assert that the callback function 'f' implements
# the TransientNotification interface
self._addCallback = f
security.declareProtected(MGMT_SCREEN_PERM, 'getDelNotificationTarget')
@@ -239,65 +604,14 @@ class TransientObjectContainer(SimpleItem):
security.declareProtected(MANAGE_CONTAINER_PERM,'setDelNotificationTarget')
def setDelNotificationTarget(self, f):
# We should assert that the callback function 'f' implements
# the TransientNotification interface
self._delCallback = f
def notifyAdd(self, item):
if self._addCallback:
self._notify(item, 'add')
def notifyDestruct(self, item):
if self._delCallback:
self._notify(item, 'destruct')
def _notify(self, items, kind):
if not type(items) in [type([]), type(())]:
items = [items]
if kind =='add':
name = 'notifyAdd'
callback = self._addCallback
else:
name = 'notifyDestruct'
callback = self._delCallback
if type(callback) is type(''):
try:
method = self.unrestrictedTraverse(callback)
except (KeyError, AttributeError):
path = self.getPhysicalPath()
err = 'No such method %s in %s %s'
LOG.warn(err % (callback, '/'.join(path), name),
exc_info=sys.exc_info()
)
return
else:
method = callback
for item in items:
if callable(method):
sm = getSecurityManager()
try:
user = sm.getUser()
try:
newSecurityManager(None, nobody)
method(item, self)
except:
# dont raise, just log
path = self.getPhysicalPath()
LOG.warn(
'%s failed when calling %s in %s' % (name,callback,
'/'.join(path)),
exc_info=sys.exc_info()
)
finally:
setSecurityManager(sm)
else:
err = '%s in %s attempted to call non-callable %s'
path = self.getPhysicalPath()
LOG.warn(err % (name, '/'.join(path), callback),
exc_info=sys.exc_info()
)
security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
def nudge(self):
""" Used by mgmt interface to maybe do housekeeping each time
a screen is shown """
# run garbage collector so view is correct
self._gc()
security.declareProtected(MANAGE_CONTAINER_PERM,
'manage_changeTransientObjectContainer')
@@ -320,593 +634,55 @@ class TransientObjectContainer(SimpleItem):
return self.manage_container(
self, REQUEST, manage_tabs_message='Changes saved.'
)
def _setTimeout(self, timeout_mins):
if type(timeout_mins) is not type(1):
raise TypeError, (escape(`timeout_mins`), "Must be integer")
self._timeout_secs = t_secs = timeout_mins * 60
# timeout_slices == fewest number of timeslices that's >= t_secs
self._timeout_slices=int(math.ceil(float(t_secs)/self._period))
def _setLimit(self, limit):
if type(limit) is not type(1):
raise TypeError, (escape(`limit`), "Must be integer")
self._limit = limit
security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
def nudge(self):
""" Used by mgmt interface to maybe turn the ring each time
a screen is shown """
self._getCurrentBucket()
def _getCurrentTimeslice(self):
"""
Return an integer representing the 'current' timeslice.
The current timeslice is guaranteed to be the same integer
within a 'slice' of time based on a divisor of 'period'.
'period' is the number of seconds in a slice.
"""
period = self._period
now = time.time()
low = int(math.floor(now)) - period + 1
high = int(math.ceil(now)) + 1
for x in range(low, high):
if x % period == 0:
return x
def _getTimeslices(self, begin, n):
""" Get a list of future timeslice integers of 'n' size """
l = []
for x in range(n):
l.append(begin + (x * self._period))
return l
def _getIndex(self):
""" returns the index, a mapping of TOC key to bucket """
self.lock.acquire()
try:
if self._data is None:
# do in-place upgrade of old instances
self._upgrade()
return self._index
finally:
self.lock.release()
def _upgrade(self):
""" upgrade older ring-based (2.5.X) TOC instances """
self.lock.acquire()
try:
self._reset()
timeout_mins = self._timeout_secs / 60
self._setTimeout(timeout_mins)
# iterate over all the buckets in the ring
for bucket, dump_after in self._ring._data:
# get all TOs in the ring and call our __setitem__
for k, v in bucket.items():
self[k] = v
# we probably should delete the old "_ring" attribute here,
# but leave it around in case folks switch back to 2.5.X
finally:
self.lock.release()
def _reset(self):
""" Reset ourselves to a sane state (deletes all content) """
self.lock.acquire()
try:
# set the period (the timeslice length)
self._period = PERIOD
# set the number of minimum spare buckets
self._min_spare_buckets = MIN_SPARE_BUCKETS
# _data contains a mapping of f-of-time(int) (aka "slice") to
# "bucket". Each bucket will contain a set of transient items.
# Transient items move automatically from bucket-to-bucket inside
# of the _data structure based on last access time (e.g.
# "get" calls), escaping destruction only if they move quickly
# enough.
# We make enough buckets initially to last us a while, and
# we subsequently extend _data with fresh buckets and remove old
# buckets as necessary during normal operations (see
# _housekeep()).
self._data = IOBTree()
# populate _data with some number of buckets, each of which
# is "current" for its timeslice key
for i in self._getTimeslices(self._getCurrentTimeslice(),
self._min_spare_buckets*2):
self._data[i] = OOBTree()
# _index is a mapping of transient item key -> slice, letting
# us quickly figure out which bucket in the _data mapping
# contains the transient object related to the key
self._index = OOBTree()
# our "__len__" is the length of _index.
# we need to maintain the length of the index structure separately
# because getting the length of a BTree is very expensive.
# Note that it is a mistake to use the __len__ attr this way,
# because length methods are cached in C slots and our instance
# attr won't be used for len(foo) in new-style classes.
# See the __len__ method below. I (Jim) am not changing this now
# on account of old instances. With some effort, we could fix this,
# but I'm not up for it now.
try:
self.__len__.set(0)
except AttributeError:
self.__len__ = Length()
# set up last_timeslice and deindex_next integer pointers
# we set them to the current timeslice as it's the only sane
# thing to do
self._last_timeslice=Increaser(self._getCurrentTimeslice())
self._deindex_next=Increaser(self._getCurrentTimeslice())
finally:
self.lock.release()
def __len__(self):
return self.__dict__['__len__']()
def getLen(self):
return self.__len__
def _getCurrentBucket(self):
"""
Do housekeeping if necessary, then return the 'current' bucket.
"""
self.lock.acquire()
try:
# do in-place upgrade of old "ring-based" instances if
# we've just upgraded from Zope 2.5.X
if self._data is None:
self._upgrade()
# data is the mapping from timeslice to bucket
data = self._data
# period == number of seconds in a slice
period = self._period
# pnow == the current timeslice
pnow = self._getCurrentTimeslice()
# pprev = the true previous timeslice in relation to pnow
pprev = pnow - period
# plast == the last timeslice under which we did housekeeping
plast = self._last_timeslice()
if not data.has_key(pnow):
# we were asleep a little too long, we don't even have a
# current bucket; we create one for ourselves.
# XXX - currently this ignores going back in time.
DEBUG and TLOG('_getCurrentBucket: creating current bucket!')
data[pnow] = OOBTree()
if pnow <= plast:
# If we went "back in time" or if the timeslice hasn't
# changed, dont try to do housekeeping.
# Instead, just return the current bucket.
return pnow
# the current timeslice has changed since the last time we did
# housekeeping, so we're going to see if we need to finalize
# anything.
DEBUG and TLOG('_getCurrentBucket: new timeslice (pnow) %s' % pnow)
# pmax == the last timeslice integer kept by _data as a key.
pmax = data.maxKey()
# t_slices == this TOC's timeout expressed in slices
# (fewest number of timeslices that's >= t_secs)
t_slices = self._timeout_slices
# deindex_next == the timeslice of the bucket we need to start
# deindexing from
deindex_next = self._deindex_next()
# The ordered set implied by data.keys(deindex_next, pprev) is
# a set of all timeslices that may have entries in the index which
# are known about by _data, starting from "deindex_next" up to
# but not including the current timeslice. We iterate over
# these keys, deindexing buckets as necessary when they're older
# than the timeout.
# XXX - fixme! range search doesn't always work (btrees bug)
for k in list(data.keys(deindex_next, pprev)):
if k < deindex_next:
DEBUG and TLOG(
'broken range search: key %s < min %s'
% (k, deindex_next)
)
continue
if k > pprev:
DEBUG and TLOG(
'broken range search: key %s > max %s'
% (k, pprev)
)
continue
# pthen == the number of seconds elapsed since the timeslice
# implied by k
pthen = pnow - k
# slices_since == the number of slices elapsed since the
# timeslice implied by k
slices_since = pthen / self._period
# if the number of slices since 'k' is less than the number of
# slices that make up the timeout, break out of this loop.
# (remember, this is an ordered set, and following keys are
# bound to be higher, meaning subsequent tests will also fail,
# so we don't even bother checking them)
if slices_since < t_slices:
DEBUG and TLOG(
'_getCurrentBucket: slices_since (%s)<t_slices (%s)' %
(slices_since, t_slices))
break
# if the bucket has keys, deindex them and add them to the
# notify queue (destruction notification happens during
# garbage collection)
bucket = data.get(k, _marker)
if bucket is _marker:
DEBUG and TLOG(
'data IOBTree lied about keys: %s doesnt exist' % k
)
continue
keys = list(bucket.keys())
for key in keys:
ob = bucket.get(key, _marker)
if ob is _marker:
DEBUG and TLOG(
'bucket OOBTree lied about keys: %s doesnt exist' %
key
)
continue
self.notify_queue.put((key, ob))
DEBUG and TLOG(
'_getCurrentBucket: deindexing keys %s' % keys
)
keys and self._deindex(keys)
# set the "last deindexed" pointer to k + period
deindex_next = k+period
self._deindex_next.set(deindex_next)
# housekeep_elected indicates that this thread was elected to do
# housekeeping. We set it off initially and only set it true if
# we "win the roll". The "roll" is necessary to avoid a conflict
# scenario where more than one thread tries to do housekeeping at
# the same time.
housekeep_elected = 0
# We ask this thread to "roll the dice." If it wins, it gets
# elected to do housekeeping
housekeep_elected = self._roll(pnow, pmax)
housekeep_elected and DEBUG and TLOG('housekeep elected')
# if we were elected to do housekeeping, do it now.
if housekeep_elected:
# available_spares == the number of "spare" ("clean", "future")
# buckets that exist in "_data"
available_spares = (pmax-pnow) / period
DEBUG and TLOG(
'_getCurrentBucket: available_spares %s' % available_spares
)
# delete_end == the last bucket we want to destroy
delete_end = deindex_next - period
# min_spares == minimum number of spare buckets acceptable
# by this TOC
min_spares = self._min_spare_buckets
if available_spares < min_spares:
DEBUG and TLOG(
'_getCurrentBucket: available_spares < min_spares'
)
# the first bucket we want to begin creating
replentish_start = pmax + period
try:
self.replentish_queue.put_nowait(replentish_start)
except Queue.Full:
DEBUG and TLOG(
'_getCurrentBucket: replentish queue full'
)
self._housekeep(delete_end)
# finally, bump the last_timeslice housekeeping counter and return
# the current bucket
self._last_timeslice.set(pnow)
return pnow
finally:
self.lock.release()
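The housekeeping walk above relies on a simple mapping of timeslice to bucket. A toy model of that layout, with hypothetical names and plain dicts standing in for the IOBTree/OOBTree pair (a sketch, not the actual implementation):

```python
PERIOD = 20  # assumed slice length in seconds

# _data maps timeslice -> bucket (key -> transient object);
# buckets whose slice is older than the timeout get deindexed,
# and slices beyond "now" are spare ("clean", "future") buckets.
data = {
    100: {'sess-a': object()},
    120: {'sess-b': object()},
    140: {},            # current bucket
    160: {}, 180: {},   # spare buckets
}

def expired_slices(data, pnow, timeout_slices, period=PERIOD):
    # slices old enough that their contents should be deindexed
    return [k for k in sorted(data)
            if (pnow - k) // period >= timeout_slices]
```

With `pnow` at 140 and a one-slice timeout, slices 100 and 120 are expired while the current and spare buckets survive; this mirrors the `slices_since < t_slices` early-break in the loop above.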
def _roll(self, pnow, pmax):
"""
Roll the dice to see if we're the lucky thread that does
housekeeping. This method is guaranteed to return true at
some point as the difference between pnow and pmax naturally
diminishes to zero.
The reason we do the 'random' dance in the last part of this
is to minimize the chance that two threads will attempt to
do housekeeping at the same time (causing conflicts and double-
notifications).
"""
period = self._period
low = pnow/period
high = pmax/period
if high <= low:
# we really need to win this roll because we have no
# spare buckets (and no valid values to provide to randrange), so
# we rig the toss.
DEBUG and TLOG("_roll: rigged toss")
return 1
else:
# we're not in an emergency bucket shortage, so we can take
# our chances during the roll. It's highly unlikely that two
# threads will win the roll simultaneously, so we avoid a certain
# class of conflicts here.
if random.randrange(low, high) == low: # WINNAH!
DEBUG and TLOG("_roll: roll winner")
return 1
DEBUG and TLOG("_roll: roll loser")
return 0
def _housekeep(self, delete_end):
""" do garbage collection, bucket replentishing and notification """
data = self._data
period = self._period
min_spares = self._min_spare_buckets
DEBUG and TLOG(
'_housekeep: current slice %s' % self._getCurrentTimeslice()
)
notify = {}
while 1:
try:
k, v = self.notify_queue.get_nowait()
# duplicates will be ignored
notify[k] = v
except Queue.Empty:
break
to_notify = notify.values()
# if we have transient objects to notify about destruction, notify
# them (only once, that's why we use a queue) ("notification")
if to_notify:
DEBUG and TLOG('_housekeep: notifying: %s' % notify.keys())
self.notifyDestruct(to_notify)
# extend _data with extra buckets if necessary ("bucket replentishing")
try:
replentish_start = self.replentish_queue.get_nowait()
DEBUG and TLOG('_housekeep: replentishing')
new_bucket_keys=self._getTimeslices(replentish_start, min_spares)
DEBUG and TLOG('_housekeep: new_bucket_keys = %s '%new_bucket_keys)
for i in new_bucket_keys:
if data.has_key(i):
continue
data[i] = OOBTree()
except Queue.Empty:
DEBUG and TLOG('replentish queue empty')
# gc the stale buckets at the "beginning" of _data ("garbage collect")
# iterate over the keys in data that have no minimum value and
# a maximum value of delete_end (note: ordered set)
# XXX- fixme. range search doesn't always work (btrees bug)
for k in list(data.keys(None, delete_end)):
if k > delete_end:
DEBUG and TLOG(
'_housekeep: broken range search (key %s > max %s)'
% (k, delete_end)
)
continue
bucket = data.get(k, _marker)
if bucket is _marker:
DEBUG and TLOG(
'bucket OOBTree lied about keys: %s doesnt exist' % k
)
continue
# delete the bucket from _data
del data[k]
DEBUG and TLOG('_housekeep: deleted data[%s]' % k)
def _deindex(self, keys):
""" Iterate over 'keys' and remove any that match from our index """
self.lock.acquire()
try:
index = self._getIndex()
for k in keys:
if index.has_key(k):
DEBUG and TLOG('_deindex: deleting %s' % k)
self.__len__.change(-1)
del index[k]
finally:
self.lock.release()
def __setitem__(self, k, v):
self.lock.acquire()
try:
notfound = []
current = self._getCurrentBucket()
index = self._getIndex()
b = index.get(k, notfound)
if b is notfound:
# if this is a new item, we do OOM protection before actually
# adding it to ourselves.
li = self._limit
if li and len(self) >= li:
LOG('Transience', WARNING,
('Transient object container %s max subobjects '
'reached' % self.id)
)
raise MaxTransientObjectsExceeded, (
"%s exceeds maximum number of subobjects %s" %
(len(self), li))
# do length accounting
try: self.__len__.change(1)
except AttributeError: pass
elif b != current:
# this is an old key that isn't in the current bucket.
if self._data[b].has_key(k):
del self._data[b][k] # delete it from the old bucket
# change the value
DEBUG and TLOG('setitem: setting current[%s]=%s' % (k,v))
self._data[current][k] = v
# change the TO's last accessed time
if hasattr(v, 'setLastAccessed'):
v.setLastAccessed()
# set the index up with the current bucket for this key
index[k] = current
finally:
self.lock.release()
def __getitem__(self, k):
self.lock.acquire()
try:
            # we don't want to call _getCurrentBucket here because we need
            # to be able to raise a KeyError.  The housekeeping steps
            # performed in the _getCurrentBucket method would be ignored
            # if we raised a KeyError.
index = self._getIndex()
# the next line will raise the proper error if the item has expired
b = index[k]
v = self._data[b][k]
if hasattr(v, '__of__'):
return v.__of__(self)
else:
return v
finally:
self.lock.release()
def __delitem__(self, k):
self.lock.acquire()
try:
self._getCurrentBucket()
index = self._getIndex()
b = index[k]
v = self._data[b][k]
del self._data[b][k]
self.__len__.change(-1)
if hasattr(v, '__of__'):
v = v.__of__(self)
del index[k]
finally:
self.lock.release()
self.notifyDestruct(v)
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'get')
def get(self, k, default=_marker):
self.lock.acquire()
try:
DEBUG and TLOG('get: called with k=%s' % k)
notfound = []
current = self._getCurrentBucket()
DEBUG and TLOG('get: current is %s' % current)
if default is _marker: default=None
index = self._getIndex()
b = index.get(k, notfound)
if b is notfound:
# it's not here, this is a genuine miss
DEBUG and TLOG('bucket was notfound for %s' %k)
return default
else:
v = self._data[b].get(k, notfound)
if v is notfound:
DEBUG and TLOG(
'get: %s was not found in index bucket (%s)' % (k, b))
return default
elif b != current:
DEBUG and TLOG('get: b was not current, it was %s' %b)
# we accessed the object, so it becomes current
# by moving it to the current bucket
del self._data[b][k] # delete the item from the old bucket.
self._data[current][k] = v # add the value to the current
self._setLastAccessed(v)
index[k] = current # change the index to the current buck.
if hasattr(v, '__of__'):
v = v.__of__(self)
return v
finally:
self.lock.release()
def _setLastAccessed(self, transientObject):
self.lock.acquire()
try:
sla = getattr(transientObject, 'setLastAccessed', None)
if sla is not None: sla()
finally:
self.lock.release()
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'has_key')
def has_key(self, k):
notfound = []
v = self.get(k, notfound)
if v is notfound: return 0
return 1
def values(self):
# sloppy and loving it!
# we used to use something like:
# [ self[x] for x in self.keys() ]
# but it was causing KeyErrors in getitem's "v = self._data[b][k]"
# due to some synchronization problem that I don't understand.
# since this is a utility method, I don't care too much. -cm
l = []
notfound = []
for k, t in self._index.items():
bucket = self._data.get(t, notfound)
if bucket is notfound:
continue
value = bucket.get(k, notfound)
if value is notfound:
continue
if hasattr(value, '__of__'):
value = value.__of__(self)
l.append(value)
return l
def items(self):
# sloppy and loving it!
# we used to use something like:
# [ (x, self[x]) for x in self.keys() ]
# but it was causing KeyErrors in getitem's "v = self._data[b][k]"
# due to some synchronization problem that I don't understand.
# since this is a utility method, I don't care too much. -cm
l = []
notfound = []
for k, t in self._index.items():
bucket = self._data.get(t, notfound)
if bucket is notfound:
continue
value = bucket.get(k, notfound)
if value is notfound:
continue
if hasattr(value, '__of__'):
value = value.__of__(self)
l.append((k, value))
return l
def true_items(self):
l = []
for bucket in self._data.values():
items = list(bucket.items())
l.extend(items)
return l
def keys(self):
self._getCurrentBucket()
index = self._getIndex()
return list(index.keys())
# proxy security declaration
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'getLen')
# inplace upgrade for versions of Transience in Zope versions less
# than 2.7.1, which used a different transience mechanism. Note:
# this will not work for upgrading versions older than 2.6.0,
# all of which used a very different transience implementation
if not getattr(self, '_max_timeslice', None):
new_slices = getTimeslices(getCurrentTimeslice(), SPARE_BUCKETS*2)
for i in new_slices:
if not self._data.has_key(i):
self._data[i] = OOBTree()
# create an Increaser for max timeslice
self._max_timeslice = Increaser(max(new_slices))
# can't make __len__ an instance variable in new-style classes
if not getattr(self, '_length', None):
length = self.__dict__.get('__len__', Length())
self._length = self.getLen = length
        # we should probably delete older attributes such as
        # '_last_timeslice', '_deindex_next', and '__len__' here but we leave
        # them in order to allow people to switch between 2.6.0->2.7.0 and
        # 2.7.1+ as necessary (although that has not been tested)
def getCurrentTimeslice():
"""
Return an integer representing the 'current' timeslice.
The current timeslice is guaranteed to be the same integer
within a 'slice' of time based on a divisor of 'period'.
'period' is the number of seconds in a slice.
"""
now = time.time()
low = int(math.floor(now)) - PERIOD + 1
high = int(math.ceil(now)) + 1
for x in range(low, high):
if x % PERIOD == 0:
return x
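For non-negative timestamps, the scan loop above is equivalent to rounding `now` down to the nearest multiple of `PERIOD`; a minimal sketch of that equivalence (assuming a `PERIOD` of 20 seconds):

```python
import time

PERIOD = 20  # assumed slice length in seconds

def current_timeslice(now=None):
    # round down to the nearest multiple of PERIOD; all calls made
    # within the same slice of wall-clock time get the same integer
    if now is None:
        now = time.time()
    return int(now) - (int(now) % PERIOD)
```

For example, with `now = 105` and `PERIOD = 20` the current slice is 100, and any `now` in `[100, 120)` maps to the same slice.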
def getTimeslices(begin, n):
""" Get a list of future timeslice integers of 'n' size in descending
order """
l = []
for x in range(n):
l.insert(0, begin + (x * PERIOD))
return l
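A quick standalone sketch of the slice-list helper above, showing the descending order the docstring promises (assuming `PERIOD = 20`):

```python
PERIOD = 20  # assumed slice length in seconds

def get_timeslices(begin, n):
    # n future timeslice keys starting at `begin`, most recent
    # first: each new slice is inserted at the front of the list
    l = []
    for x in range(n):
        l.insert(0, begin + (x * PERIOD))
    return l
```

So `get_timeslices(100, 3)` yields `[140, 120, 100]`; the caller's `max(new_slices)` in the upgrade code above is therefore always the first element.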
def _assert(case):
if not case:
raise AssertionError
class Increaser(Persistent):
"""
......@@ -930,37 +706,7 @@ class Increaser(Persistent):
return self.value
def _p_resolveConflict(self, old, state1, state2):
DEBUG and TLOG('Resolving conflict in Increaser')
if old <= state1 <= state2: return state2
if old <= state2 <= state1: return state1
return old
def _p_independent(self):
return 1
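The conflict resolver shown in the removed lines above keeps whichever committed value moved furthest forward, falling back to the old value if either state went backwards. A minimal standalone illustration of that merge rule (hypothetical function name):

```python
def resolve_increaser_conflict(old, state1, state2):
    # an Increaser only ever grows, so two competing commits can be
    # merged by keeping the larger value; if either state moved
    # backwards relative to `old`, keep `old`
    if old <= state1 <= state2:
        return state2
    if old <= state2 <= state1:
        return state1
    return old
```

This is why concurrent bumps of `_last_timeslice` or `_deindex_next` never need to abort a transaction: both orderings of the two commits resolve to the same (larger) value.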
class Ring(Persistent):
""" ring of buckets. This class is only kept for backwards-compatibility
purposes (Zope 2.5X). """
def __init__(self, l, index):
if not len(l):
raise ValueError, "ring must have at least one element"
DEBUG and TLOG('initial _ring buckets: %s' % map(oid, l))
self._data = l
self._index = index
def __repr__(self):
return repr(self._data)
def __len__(self):
return len(self._data)
def __getitem__(self, i):
return self._data[i]
def turn(self):
last = self._data.pop(-1)
self._data.insert(0, last)
self._p_changed = 1
return max(old, state1, state2)
def _p_independent(self):
return 1
......
......@@ -58,6 +58,7 @@ class TestBase(TestCase):
def setUp(self):
Products.Transience.Transience.time = fauxtime
Products.Transience.TransientObject.time = fauxtime
Products.Transience.Transience.setStrict(1)
self.app = makerequest.makerequest(_getApp())
timeout = self.timeout = 1
sm=TransientObjectContainer(
......@@ -72,6 +73,7 @@ class TestBase(TestCase):
del self.app
Products.Transience.Transience.time = oldtime
Products.Transience.TransientObject.time = oldtime
Products.Transience.Transience.setStrict(0)
class TestLastAccessed(TestBase):
def testLastAccessed(self):
......@@ -92,7 +94,7 @@ class TestLastAccessed(TestBase):
# to get to the next Windows time.time() tick.
fauxtime.sleep(WRITEGRANULARITY + 0.06 * 60)
sdo = self.app.sm.get('TempObject')
assert sdo.getLastAccessed() > la1, (sdo.getLastAccessed(), la1)
self.assert_(sdo.getLastAccessed() > la1)
class TestNotifications(TestBase):
def testAddNotification(self):
......@@ -100,8 +102,8 @@ class TestNotifications(TestBase):
sdo = self.app.sm.new_or_existing('TempObject')
now = fauxtime.time()
k = sdo.get('starttime')
assert type(k) == type(now)
assert k <= now
self.assertEqual(type(k), type(now))
self.assert_(k <= now)
def testDelNotification(self):
self.app.sm.setDelNotificationTarget(delNotificationTarget)
......@@ -110,12 +112,11 @@ class TestNotifications(TestBase):
fauxtime.sleep(timeout + (timeout * .75))
sdo1 = self.app.sm.get('TempObject')
# force the sdm to do housekeeping
self.app.sm._housekeep(self.app.sm._deindex_next() -
self.app.sm._period)
self.app.sm._gc()
now = fauxtime.time()
k = sdo.get('endtime')
assert (type(k) == type(now)), type(k)
assert k <= now, (k, now)
self.assertEqual(type(k), type(now))
self.assert_(k <= now)
def addNotificationTarget(item, context):
item['starttime'] = fauxtime.time()
......
......@@ -25,6 +25,7 @@ class TestTransientObject(TestCase):
def setUp(self):
Products.Transience.Transience.time = fauxtime
Products.Transience.TransientObject.time = fauxtime
Products.Transience.Transience.setStrict(1)
self.errmargin = .20
self.timeout = 60
self.t = TransientObjectContainer('sdc', timeout_mins=self.timeout/60)
......@@ -32,55 +33,56 @@ class TestTransientObject(TestCase):
def tearDown(self):
Products.Transience.Transience.time = oldtime
Products.Transience.TransientObject.time = oldtime
Products.Transience.Transience.setStrict(0)
self.t = None
del self.t
def test_id(self):
t = self.t.new('xyzzy')
assert t.getId() != 'xyzzy'
assert t.getContainerKey() == 'xyzzy'
self.failIfEqual(t.getId(), 'xyzzy') # dont acquire
self.assertEqual(t.getContainerKey(), 'xyzzy')
def test_validate(self):
t = self.t.new('xyzzy')
assert t.isValid()
self.assert_(t.isValid())
t.invalidate()
assert not t.isValid()
self.failIf(t.isValid())
def test_getLastAccessed(self):
t = self.t.new('xyzzy')
ft = fauxtime.time()
assert t.getLastAccessed() <= ft
self.assert_(t.getLastAccessed() <= ft)
def test_getCreated(self):
t = self.t.new('xyzzy')
ft = fauxtime.time()
assert t.getCreated() <= ft
self.assert_(t.getCreated() <= ft)
def test_getLastModifiedUnset(self):
t = self.t.new('xyzzy')
assert t.getLastModified() == None
self.assertEqual(t.getLastModified(), None)
def test_getLastModifiedSet(self):
t = self.t.new('xyzzy')
t['a'] = 1
assert t.getLastModified() is not None
self.failIfEqual(t.getLastModified(), None)
def testSetLastModified(self):
t = self.t.new('xyzzy')
ft = fauxtime.time()
t.setLastModified()
assert t.getLastModified() is not None
self.failIfEqual(t.getLastModified(), None)
def test_setLastAccessed(self):
t = self.t.new('xyzzy')
ft = fauxtime.time()
assert t.getLastAccessed() <= ft
self.assert_(t.getLastAccessed() <= ft)
fauxtime.sleep(self.timeout) # go to sleep past the granularity
ft2 = fauxtime.time()
t.setLastAccessed()
ft3 = fauxtime.time()
assert t.getLastAccessed() <= ft3
assert t.getLastAccessed() >= ft2
self.assert_(t.getLastAccessed() <= ft3)
self.assert_(t.getLastAccessed() >= ft2)
def _genKeyError(self, t):
return t.get('foobie')
......@@ -91,27 +93,27 @@ class TestTransientObject(TestCase):
def test_dictionaryLike(self):
t = self.t.new('keytest')
t.update(data)
assert t.keys() == data.keys()
assert t.values() == data.values()
assert t.items() == data.items()
self.assertEqual(t.keys(), data.keys())
self.assertEqual(t.values(), data.values())
self.assertEqual(t.items(), data.items())
for k in data.keys():
assert t.get(k) == data.get(k)
assert t.get('foobie') is None
self.assertEqual(t.get(k), data.get(k))
self.assertEqual(t.get('foobie'), None)
self.assertRaises(AttributeError, self._genLenError, t)
assert t.get('foobie',None) is None
assert t.has_key('a')
assert not t.has_key('foobie')
self.assertEqual(t.get('foobie',None), None)
self.assert_(t.has_key('a'))
self.failIf(t.has_key('foobie'))
t.clear()
assert not len(t.keys())
self.assertEqual(len(t.keys()), 0)
def test_TTWDictionary(self):
t = self.t.new('mouthfultest')
t.set('foo', 'bar')
assert t['foo'] == 'bar'
assert t.get('foo') == 'bar'
self.assertEqual(t['foo'], 'bar')
self.assertEqual(t.get('foo'), 'bar')
t.set('foobie', 'blech')
t.delete('foobie')
assert t.get('foobie') is None
self.assertEqual(t.get('foobie'), None)
def test_suite():
......
......@@ -30,14 +30,17 @@ class TestBase(TestCase):
def setUp(self):
Products.Transience.Transience.time = fauxtime
Products.Transience.TransientObject.time = fauxtime
Products.Transience.Transience.setStrict(1)
self.errmargin = .20
self.timeout = 60
self.timeout = 120
self.t = TransientObjectContainer('sdc', timeout_mins=self.timeout/60)
def tearDown(self):
self.t = None
Products.Transience.Transience.time = oldtime
Products.Transience.TransientObject.time = oldtime
Products.Transience.Transience.setStrict(0)
class TestTransientObjectContainer(TestBase):
def testGetItemFails(self):
......@@ -47,30 +50,30 @@ class TestTransientObjectContainer(TestBase):
return self.t[10]
def testGetReturnsDefault(self):
assert self.t.get(10) == None
assert self.t.get(10, 'foo') == 'foo'
self.assertEqual(self.t.get(10), None)
self.assertEqual(self.t.get(10, 'foo'), 'foo')
def testSetItemGetItemWorks(self):
self.t[10] = 1
a = self.t[10]
assert a == 1, `a`
self.assertEqual(a, 1)
def testReplaceWorks(self):
self.t[10] = 1
assert self.t[10] == 1
self.assertEqual(self.t[10], 1)
self.t[10] = 2
assert self.t[10] == 2
self.assertEqual(self.t[10], 2)
def testHasKeyWorks(self):
self.t[10] = 1
assert self.t.has_key(10)
self.failUnless(self.t.has_key(10))
def testValuesWorks(self):
for x in range(10, 110):
self.t[x] = x
v = self.t.values()
v.sort()
assert len(v) == 100
self.assertEqual(len(v), 100)
i = 10
for x in v:
assert x == i
......@@ -81,10 +84,10 @@ class TestTransientObjectContainer(TestBase):
self.t[x] = x
v = self.t.keys()
v.sort()
assert len(v) == 100
self.assertEqual(len(v), 100)
i = 10
for x in v:
assert x == i
self.assertEqual(x, i)
i = i + 1
def testItemsWorks(self):
......@@ -92,11 +95,11 @@ class TestTransientObjectContainer(TestBase):
self.t[x] = x
v = self.t.items()
v.sort()
assert len(v) == 100
self.assertEquals(len(v), 100)
i = 10
for x in v:
assert x[0] == i
assert x[1] == i
self.assertEqual(x[0], i)
self.assertEqual(x[1], i)
i = i + 1
def testDeleteInvalidKeyRaisesKeyError(self):
......@@ -105,63 +108,6 @@ class TestTransientObjectContainer(TestBase):
def _deletefail(self):
del self.t[10]
def donttestDeleteNoChildrenWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
del self.t[4]
assert lsubtract(self.t.keys(), [1,2,3,5,6,10]) == [], `self.t.keys()`
def donttestDeleteOneChildWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
del self.t[3]
assert lsubtract(self.t.keys(), [1,2,4,5,6,10]) == [], `self.t.keys()`
def donttestDeleteTwoChildrenNoInorderSuccessorWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
del self.t[2]
assert lsubtract(self.t.keys(),[1,3,4,5,6,10])==[], `self.t.keys()`
def donttestDeleteTwoChildrenInorderSuccessorWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
self.t[2.5] = 150
del self.t[2]
assert lsubtract(self.t.keys(),[1,2.5,3,4,5,6,10])==[], `self.t.keys()`
def donttestDeleteRootWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
self.t[2.5] = 150
del self.t[5]
assert lsubtract(self.t.keys(),[1,2,2.5,3,4,6,10])==[], `self.t.keys()`
def testRandomNonOverlappingInserts(self):
added = {}
r = range(10, 110)
......@@ -172,7 +118,7 @@ class TestTransientObjectContainer(TestBase):
added[k] = 1
addl = added.keys()
addl.sort()
assert lsubtract(self.t.keys(),addl)==[], `self.t.keys()`
self.assertEqual(lsubtract(self.t.keys(),addl), [])
def testRandomOverlappingInserts(self):
added = {}
......@@ -183,7 +129,7 @@ class TestTransientObjectContainer(TestBase):
added[k] = 1
addl = added.keys()
addl.sort()
assert lsubtract(self.t.keys(), addl) ==[]
self.assertEqual(lsubtract(self.t.keys(), addl), [])
def testRandomDeletes(self):
r = range(10, 1010)
......@@ -204,49 +150,48 @@ class TestTransientObjectContainer(TestBase):
for x in deleted:
if self.t.has_key(x):
badones.append(x)
assert badones == [], (badones, added, deleted)
self.assertEqual(badones, [])
def testTargetedDeletes(self):
r = range(10, 1010)
seen = {}
for x in r:
k = random.choice(r)
vals = seen.setdefault(k, [])
vals.append(x)
self.t[k] = x
couldntdelete = {}
weird = []
results = {}
for x in r:
try:
del self.t[x]
except KeyError:
pass
assert self.t.keys() == [], `self.t.keys()`
ts, item = self.t.__delitem__(x)
results[x] = ts, item
except KeyError, v:
if v.args[0] != x:
weird.append(x)
couldntdelete[x] = v.args[0]
self.assertEqual(self.t.keys(), [])
def testPathologicalRightBranching(self):
r = range(10, 1010)
# NOTE: If the process running this test swaps out inside the loop,
# it can easily cause the test to fail, with a prefix of the expected
# keys missing (the keys added before the interruption expire by the
# time they're checked). This can happen with interruptions of less
# than 1 wall-clock second, so can and does happen.
for x in r:
self.t[x] = 1
assert list(self.t.keys()) == r, (self.t.keys(), r)
# NOTE: The next line may fail even if the line above succeeds: if
# the key age is such that keys *start* to expire right after
# list(self.t.keys()) completes, keys can vanish before __delitem__
# gets to them.
map(self.t.__delitem__, r)
assert list(self.t.keys()) == [], self.t.keys()
self.assertEqual(list(self.t.keys()), [])
def testPathologicalLeftBranching(self):
# See notes for testPathologicalRightBranching.
r = range(10, 1010)
revr = r[:]
revr.reverse()
for x in revr:
self.t[x] = 1
assert list(self.t.keys()) == r, (self.t.keys(), r)
self.assertEqual(list(self.t.keys()), r)
map(self.t.__delitem__, revr)
assert list(self.t.keys()) == [], self.t.keys()
self.assertEqual(list(self.t.keys()), [])
def donttestSuccessorChildParentRewriteExerciseCase(self):
def testSuccessorChildParentRewriteExerciseCase(self):
add_order = [
85, 73, 165, 273, 215, 142, 233, 67, 86, 166, 235, 225, 255,
73, 175, 171, 285, 162, 108, 28, 283, 258, 232, 199, 260,
......@@ -290,37 +235,7 @@ class TestTransientObjectContainer(TestBase):
for x in delete_order:
try: del self.t[x]
except KeyError:
if self.t.has_key(x): assert 1==2,"failed to delete %s" % x
def testChangingTimeoutWorks(self):
# 1 minute
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
# 2 minutes
self.t._setTimeout(self.timeout/60*2)
self.t._reset()
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
# 3 minutes
self.t._setTimeout(self.timeout/60*3)
self.t._reset()
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
self.failIf(self.t.has_key(x))
def testItemsGetExpired(self):
for x in range(10, 110):
......@@ -329,10 +244,15 @@ class TestTransientObjectContainer(TestBase):
fauxtime.sleep(self.timeout * (self.errmargin+1))
for x in range(110, 210):
self.t[x] = x
assert len(self.t.keys()) == 100, len(self.t.keys())
self.assertEqual(len(self.t.keys()), 100)
# call _gc just to make sure __len__ gets changed after a gc
self.t._gc()
self.assertEqual(len(self.t), 100)
# we should still have 100 - 199
for x in range(110, 210):
assert self.t[x] == x
self.assertEqual(self.t[x], x)
# but we shouldn't have 0 - 100
for x in range(10, 110):
try: self.t[x]
......@@ -344,7 +264,7 @@ class TestTransientObjectContainer(TestBase):
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
self.assertEqual(len(self.t.keys()), 0)
# 2 minutes
self.t._setTimeout(self.timeout/60*2)
......@@ -352,9 +272,11 @@ class TestTransientObjectContainer(TestBase):
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
self.assertEqual(len(self.t.keys()), 100)
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
self.assertEqual(len(self.t.keys()), 0)
# 3 minutes
self.t._setTimeout(self.timeout/60*3)
......@@ -362,11 +284,11 @@ class TestTransientObjectContainer(TestBase):
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
self.assertEqual(len(self.t.keys()), 100)
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
self.assertEqual(len(self.t.keys()), 100)
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
self.assertEqual(len(self.t.keys()), 0)
def testGetDelaysTimeout(self):
for x in range(10, 110):
......@@ -377,9 +299,9 @@ class TestTransientObjectContainer(TestBase):
for x in range(10, 110):
self.t.get(x)
fauxtime.sleep(self.timeout/2)
assert len(self.t.keys()) == 100, len(self.t.keys())
self.assertEqual(len(self.t.keys()), 100)
for x in range(10, 110):
assert self.t[x] == x
self.assertEqual(self.t[x], x)
def testSetItemDelaysTimeout(self):
for x in range(10, 110):
......@@ -405,19 +327,25 @@ class TestTransientObjectContainer(TestBase):
added[k] = x
self.assertEqual(len(self.t), len(added))
for k in added.keys():
del self.t[k]
self.assertEqual(len(self.t), 0)
def testResetWorks(self):
self.t[10] = 1
self.t._reset()
assert not self.t.get(10)
self.failIf(self.t.get(10))
def testGetTimeoutMinutesWorks(self):
assert self.t.getTimeoutMinutes() == self.timeout / 60
self.assertEqual(self.t.getTimeoutMinutes(), self.timeout / 60)
self.t._setTimeout(10)
assert self.t.getTimeoutMinutes() == 10
self.assertEqual(self.t.getTimeoutMinutes(), 10)
def test_new(self):
t = self.t.new('foobieblech')
assert issubclass(t.__class__, TransientObject)
self.failUnless(issubclass(t.__class__, TransientObject))
def _dupNewItem(self):
t = self.t.new('foobieblech')
......@@ -430,18 +358,23 @@ class TestTransientObjectContainer(TestBase):
t = self.t.new('foobieblech')
t['hello'] = "Here I am!"
t2 = self.t.new_or_existing('foobieblech')
assert t2['hello'] == "Here I am!"
self.assertEqual(t2['hello'], "Here I am!")
def test_getId(self):
assert self.t.getId() == 'sdc'
self.assertEqual(self.t.getId(), 'sdc')
def testSubobjectLimitWorks(self):
self.t = TransientObjectContainer('a', timeout_mins=self.timeout/60,
limit=10)
self.assertRaises(MaxTransientObjectsExceeded, self._maxOut)
def testUnlimitedSubobjectLimitWorks(self):
self._maxOut()
def testZeroTimeoutMeansPersistForever(self):
self.t._setTimeout(0)
self.t._reset()
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(180)
self.assertEqual(len(self.t.keys()), 100)
def _maxOut(self):
for x in range(11):
......@@ -457,7 +390,6 @@ def lsubtract(l1, l2):
def test_suite():
testsuite = makeSuite(TestTransientObjectContainer, 'test')
#testsuite = makeSuite(TestBase, 'test')
alltests = TestSuite((testsuite,))
return alltests
......