Commit edea5b56 authored by Gary Poster's avatar Gary Poster

make historical connection cache story a bit more predictable: you set a total...

make historical connection cache story a bit more predictable: you set a total number of historical connections, not one per serial.  Also remove "historical future" connections--I should have seen seen/remembered lastTransaction.  Now attempts to create historical future connections raise an error, as they should.
parent 6a548ad6
......@@ -59,8 +59,7 @@ Blobs
- (3.9.0a1) Fixed bug #129921: getSize() function in BlobStorage could not
deal with garbage files
- (unreleased, after 3.9.0a1) Fixed bug in which MVCC would not work for
blobs.
- (3.9.0a1) Fixed bug in which MVCC would not work for blobs.
BTrees
------
......
......@@ -136,7 +136,7 @@ class Connection(ExportImport, object):
# During commit, all objects go to either _modified or _creating:
# Dict of oid->flag of new objects (without serial), either
# added by add() or implicitely added (discovered by the
# added by add() or implicitly added (discovered by the
# serializer during commit). The flag is True for implicit
# adding. Used during abort to remove created objects from the
# _cache, and by persistent_id to check that a new object isn't
......@@ -322,9 +322,8 @@ class Connection(ExportImport, object):
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
if self.before is not None and tid > self.before:
# this is an historical connection, and the tid is after the
# freeze. Invalidations are irrelevant.
if self.before is not None:
# this is an historical connection. Invalidations are irrelevant.
return
self._inv_lock.acquire()
try:
......@@ -824,28 +823,11 @@ class Connection(ExportImport, object):
if self.before is not None:
# Load data that was current before the time we have.
if self._txn_time is not None: # MVCC for readonly future conn.
before = self._txn_time
has_invalidated = True
else:
before = self.before
has_invalidated = False
before = self.before
t = self._storage.loadBefore(obj._p_oid, before)
if t is None:
raise POSKeyError()
raise POSKeyError() # historical connection!
p, serial, end = t
if not has_invalidated and end is None:
# MVCC: make sure another thread has not beaten us to the punch
self._inv_lock.acquire()
try:
txn_time = self._txn_time
finally:
self._inv_lock.release()
if txn_time is not None and txn_time < before:
t = self._storage.loadBefore(obj._p_oid, txn_time)
if t is None:
raise POSKeyError()
p, serial, end = t
else:
# There is a harmless data race with self._invalidated. A
......
......@@ -41,7 +41,7 @@ from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZODB.DB')
class _ConnectionPool(object):
class AbstractConnectionPool(object):
"""Manage a pool of connections.
CAUTION: Methods should be called under the protection of a lock.
......@@ -67,43 +67,58 @@ class _ConnectionPool(object):
connectionDebugInfo() can still gather statistics.
"""
def __init__(self, pool_size, timeout=None):
def __init__(self, size, timeout=None):
# The largest # of connections we expect to see alive simultaneously.
self.pool_size = pool_size
self._size = size
# The minimum number of seconds that an available connection should
# be kept, or None.
self.timeout = timeout
self._timeout = timeout
# A weak set of all connections we've seen. A connection vanishes
# from this set if pop() hands it out, it's not reregistered via
# repush(), and it becomes unreachable.
self.all = WeakSet()
# A stack of connections available to hand out. This is a subset
# of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than pool_size entries
# in this stack. The keys are time.time() values of the push or
# repush calls.
self.available = BTrees.OOBTree.Bucket()
def set_pool_size(self, pool_size):
def setSize(self, size):
"""Change our belief about the expected maximum # of live connections.
If the pool_size is smaller than the current value, this may discard
the oldest available connections.
"""
self.pool_size = pool_size
self._size = size
self._reduce_size()
def set_timeout(self, timeout):
old = self.timeout
self.timeout = timeout
def setTimeout(self, timeout):
old = self._timeout
self._timeout = timeout
if timeout is not None and old != timeout and (
old is None or old > timeout):
self._reduce_size()
def getSize(self):
return self._size
def getTimeout(self):
return self._timeout
timeout = property(getTimeout, setTimeout)
size = property(getSize, setSize)
class ConnectionPool(AbstractConnectionPool):
def __init__(self, size, timeout=None):
super(ConnectionPool, self).__init__(size, timeout)
# A stack of connections available to hand out. This is a subset
# of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than size entries
# in this stack. The keys are time.time() values of the push or
# repush calls.
self.available = BTrees.OOBTree.Bucket()
def push(self, c):
"""Register a new available connection.
......@@ -116,7 +131,7 @@ class _ConnectionPool(object):
self.all.add(c)
self.available[time()] = c
n = len(self.all)
limit = self.pool_size
limit = self.size
if n > limit:
reporter = logger.warn
if n > 2 * limit:
......@@ -144,7 +159,7 @@ class _ConnectionPool(object):
threshhold = None
else:
threshhold = time() - self.timeout
target = self.pool_size
target = self.size
if strictly_less:
target -= 1
for t, c in list(self.available.items()):
......@@ -208,6 +223,109 @@ class _ConnectionPool(object):
else:
c.cacheGC()
class KeyedConnectionPool(AbstractConnectionPool):
# this pool keeps track of keyed connections all together. It makes
# it possible to make assertions about total numbers of keyed connections.
# The keys in this case are "before" TIDs, but this is used by other
# packages as well.
# see the comments in ConnectionPool for method descriptions.
def __init__(self, size, timeout=None):
super(KeyedConnectionPool, self).__init__(size, timeout)
# key: {time.time: connection}
self.available = BTrees.family32.OO.Bucket()
# time.time: key
self.closed = BTrees.family32.OO.Bucket()
def push(self, c, key):
assert c not in self.all
available = self.available.get(key)
if available is None:
available = self.available[key] = BTrees.family32.OO.Bucket()
else:
assert c not in available.values()
self._reduce_size(strictly_less=True)
self.all.add(c)
t = time()
available[t] = c
self.closed[t] = key
n = len(self.all)
limit = self.size
if n > limit:
reporter = logger.warn
if n > 2 * limit:
reporter = logger.critical
reporter("DB.open() has %s open connections with a size "
"of %s", n, limit)
def repush(self, c, key):
assert c in self.all
self._reduce_size(strictly_less=True)
available = self.available.get(key)
if available is None:
available = self.available[key] = BTrees.family32.OO.Bucket()
else:
assert c not in available.values()
t = time()
available[t] = c
self.closed[t] = key
def _reduce_size(self, strictly_less=False):
if self.timeout is None:
threshhold = None
else:
threshhold = time() - self.timeout
target = self.size
if strictly_less:
target -= 1
for t, key in tuple(self.closed.items()):
if (len(self.available) > target or
threshhold is not None and t < threshhold):
del self.closed[t]
c = self.available[key].pop(t)
if not self.available[key]:
del self.available[key]
self.all.remove(c)
c._resetCache()
else:
break
def reduce_size(self):
self._reduce_size()
def pop(self, key):
result = None
available = self.available.get(key)
if available:
t = available.maxKey()
result = available.pop(t)
del self.closed[t]
if not available:
del self.available[key]
assert result in self.all
return result
def map(self, f):
self.all.map(f)
def availableGC(self):
if self.timeout is None:
threshhold = None
else:
threshhold = time() - self.timeout
for t, key in tuple(self.closed.items()):
if threshhold is not None and t < threshhold:
del self.closed[t]
c = self.available[key].pop(t)
if not self.available[key]:
del self.available[key]
self.all.remove(c)
c._resetCache()
else:
self.available[key][t].cacheGC()
def toTimeStamp(dt):
utc_struct = dt.utctimetuple()
# if this is a leapsecond, this will probably fail. That may be a good
......@@ -263,8 +381,8 @@ class DB(object):
- `Inspection Methods`: getName, getSize, objectCount,
getActivityMonitor, setActivityMonitor
- `Connection Pool Methods`: getPoolSize, getHistoricalPoolSize,
removeHistoricalPool, setPoolSize, setHistoricalPoolSize,
getHistoricalTimeout, setHistoricalTimeout
setPoolSize, setHistoricalPoolSize, getHistoricalTimeout,
setHistoricalTimeout
- `Transaction Methods`: invalidate
- `Other Methods`: lastTransaction, connectionDebugInfo
- `Cache Inspection Methods`: cacheDetail, cacheExtremeDetail,
......@@ -292,8 +410,8 @@ class DB(object):
- `storage`: the storage used by the database, e.g. FileStorage
- `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache
- `historical_pool_size`: expected maximum number of connections (per
historical, or transaction, identifier)
- `historical_pool_size`: expected maximum number of total
historical connections
- `historical_cache_size`: target size of Connection object cache for
historical (`at` or `before`) connections
- `historical_timeout`: minimum number of seconds that
......@@ -304,14 +422,12 @@ class DB(object):
self._a = x.acquire
self._r = x.release
# Setup connection pools and cache info
# _pools maps a tid identifier, or '', to a _ConnectionPool object.
self._pools = {}
self._pool_size = pool_size
# pools and cache sizes
self.pool = ConnectionPool(pool_size)
self.historical_pool = KeyedConnectionPool(historical_pool_size,
historical_timeout)
self._cache_size = cache_size
self._historical_pool_size = historical_pool_size
self._historical_cache_size = historical_cache_size
self._historical_timeout = historical_timeout
# Setup storage
self._storage=storage
......@@ -394,20 +510,10 @@ class DB(object):
if am is not None:
am.closedConnection(connection)
before = connection.before or ''
try:
pool = self._pools[before]
except KeyError:
# No such tid. We must have deleted the pool.
# Just let the connection go.
# We need to break circular refs to make it really go.
# TODO: Figure out exactly which objects are involved in the
# cycle.
connection.__dict__.clear()
return
pool.repush(connection)
if connection.before:
self.historical_pool.repush(connection, connection.before)
else:
self.pool.repush(connection)
finally:
self._r()
......@@ -416,8 +522,8 @@ class DB(object):
"""
self._a()
try:
for pool in self._pools.values():
pool.map(f)
self.pool.map(f)
self.historical_pool.map(f)
finally:
self._r()
......@@ -543,19 +649,19 @@ class DB(object):
return self._storage.getName()
def getPoolSize(self):
return self._pool_size
return self.pool.size
def getSize(self):
return self._storage.getSize()
def getHistoricalCacheSize(self):
def getHistoricalCacheSize(self):
return self._historical_cache_size
def getHistoricalPoolSize(self):
return self._historical_pool_size
return self.historical_pool.size
def getHistoricalTimeout(self):
return self._historical_timeout
return self.historical_pool.timeout
def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
......@@ -602,58 +708,42 @@ class DB(object):
# `at` is normalized to `before`, since we use storage.loadBefore
# as the underlying implementation of both.
before = getTID(at, before)
if (before is not None and
before > self.lastTransaction() and
before > getTID(self.lastTransaction(), None)):
raise ValueError(
'cannot open an historical connection in the future.')
self._a()
try:
# pool <- the _ConnectionPool for this `before` tid
pool = self._pools.get(before or '')
if pool is None:
if before is not None:
size = self._historical_pool_size
timeout = self._historical_timeout
else:
size = self._pool_size
timeout = None
self._pools[before or ''] = pool = _ConnectionPool(
size, timeout)
assert pool is not None
# result <- a connection
result = pool.pop()
if result is None:
if before is not None:
size = self._historical_cache_size
else:
size = self._cache_size
c = self.klass(self, size, before)
pool.push(c)
result = pool.pop()
if before is not None:
result = self.historical_pool.pop(before)
if result is None:
c = self.klass(self, self._historical_cache_size, before)
self.historical_pool.push(c, before)
result = self.historical_pool.pop(before)
else:
result = self.pool.pop()
if result is None:
c = self.klass(self, self._cache_size)
self.pool.push(c)
result = self.pool.pop()
assert result is not None
# Tell the connection it belongs to self.
# open the connection.
result.open(transaction_manager)
# A good time to do some cache cleanup.
# (note we already have the lock)
for key, pool in tuple(self._pools.items()):
pool.availableGC()
if not len(pool.available) and not len(pool.all):
del self._pools[key]
self.pool.availableGC()
self.historical_pool.availableGC()
return result
finally:
self._r()
def removeHistoricalPool(self, at=None, before=None):
if at is None and before is None:
raise ValueError('must pass one of `at` or `before`')
before = getTID(at, before)
try:
del self._pools[before]
except KeyError:
pass
def connectionDebugInfo(self):
result = []
t = time()
......@@ -717,11 +807,9 @@ class DB(object):
self._a()
try:
self._cache_size = size
pool = self._pools.get('')
if pool is not None:
def setsize(c):
c._cache.cache_size = size
pool.map(setsize)
def setsize(c):
c._cache.cache_size = size
self.pool.map(setsize)
finally:
self._r()
......@@ -731,38 +819,28 @@ class DB(object):
self._historical_cache_size = size
def setsize(c):
c._cache.cache_size = size
for tid, pool in self._pools.items():
if tid:
pool.map(setsize)
self.historical_pool.map(setsize)
finally:
self._r()
def setPoolSize(self, size):
self._pool_size = size
self._reset_pool_sizes(size, for_historical=False)
def setHistoricalPoolSize(self, size):
self._historical_pool_size = size
self._reset_pool_sizes(size, for_historical=True)
self._a()
try:
self.pool.size = size
finally:
self._r()
def _reset_pool_sizes(self, size, for_historical=False):
def setHistoricalPoolSize(self, size):
self._a()
try:
for tid, pool in self._pools.items():
if (tid != '') == for_historical:
pool.set_pool_size(size)
self.historical_pool.size = size
finally:
self._r()
def setHistoricalTimeout(self, timeout):
self._historical_timeout = timeout
self._a()
try:
for tid, pool in tuple(self._pools.items()):
if tid:
pool.set_timeout(timeout)
if not pool.available and not pool.all:
del self._pools[tid]
self.historical_pool.timeout = timeout
finally:
self._r()
......
......@@ -131,9 +131,9 @@ no matter what you pass into ``db.open``.
Configuration
=============
Like normal connections, the database lets you set how many historical
connections can be active without generating a warning for a given serial, and
how many objects should be kept in each connection's object cache.
Like normal connections, the database lets you set how many total historical
connections can be active without generating a warning, and
how many objects should be kept in each historical connection's object cache.
>>> db.getHistoricalPoolSize()
3
......@@ -182,32 +182,72 @@ Let's actually look at these values at work by shining some light into what
has been a black box up to now. We'll actually do some white box examination
of what is going on in the database, pools and connections.
First we'll clean out all the old historical pools so we have a clean slate.
Historical connections are held in a single connection pool with mappings
from the ``before`` TID to available connections. First we'll put a new
pool on the database so we have a clean slate.
>>> historical_conn.close()
>>> db.removeHistoricalPool(at=now)
>>> db.removeHistoricalPool(at=historical_serial)
>>> db.removeHistoricalPool(before=serial)
>>> from ZODB.DB import KeyedConnectionPool
>>> db.historical_pool = KeyedConnectionPool(
... db.historical_pool.size, db.historical_pool.timeout)
Now lets look what happens to the pools when we create an historical
Now lets look what happens to the pool when we create and close an historical
connection.
>>> pools = db._pools
>>> len(pools)
1
>>> pools.keys()
['']
>>> pool = db.historical_pool
>>> len(pool.all)
0
>>> len(pool.available)
0
>>> historical_conn = db.open(
... transaction_manager=transaction1, before=serial)
>>> len(pools)
2
>>> set(pools.keys()) == set(('', serial))
>>> len(pool.all)
1
>>> len(pool.available)
0
>>> historical_conn in pool.all
True
>>> historical_conn.close()
>>> len(pool.all)
1
>>> len(pool.available)
1
>>> pool.available.keys()[0] == serial
True
>>> len(pool.available.values()[0])
1
Now we'll open and close two for the same serial to see what happens to the
data structures.
>>> historical_conn is db.open(
... transaction_manager=transaction1, before=serial)
True
>>> pool = pools[serial]
>>> len(pool.all)
1
>>> len(pool.available)
0
>>> transaction2 = transaction.TransactionManager()
>>> historical_conn2 = db.open(
... transaction_manager=transaction2, before=serial)
>>> len(pool.all)
2
>>> len(pool.available)
0
>>> historical_conn2.close()
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
1
>>> historical_conn.close()
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
2
If you change the historical cache size, that changes the size of the
persistent cache on our connection.
......@@ -218,26 +258,18 @@ persistent cache on our connection.
>>> historical_conn._cache.cache_size
1500
Now let's look at pool sizes. We'll set it to two, then make and close three
Now let's look at pool sizes. We'll set it to two, then open and close three
connections. We should end up with only two available connections.
>>> db.setHistoricalPoolSize(2)
>>> transaction2 = transaction.TransactionManager()
>>> historical_conn = db.open(
... transaction_manager=transaction1, before=serial)
>>> historical_conn2 = db.open(
... transaction_manager=transaction2, before=serial)
>>> len(pools)
2
>>> len(pool.all)
2
>>> len(pool.available)
0
>>> transaction3 = transaction.TransactionManager()
>>> historical_conn3 = db.open(
... transaction_manager=transaction3, before=serial)
>>> len(pools)
2
... transaction_manager=transaction3, at=historical_serial)
>>> len(pool.all)
3
>>> len(pool.available)
......@@ -248,23 +280,35 @@ connections. We should end up with only two available connections.
3
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
1
>>> historical_conn2.close()
>>> len(pool.all)
3
>>> len(pool.available)
2
>>> len(pool.available.values()[0])
1
>>> len(pool.available.values()[1])
1
>>> historical_conn.close()
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
2
Notice it dumped the one that was closed at the earliest time.
Finally, we'll look at the timeout. We'll need to monkeypatch ``time`` for
this. (The funky __import__ of DB is because some ZODB __init__ shenanigans
make the DB class mask the DB module.)
>>> db.getHistoricalTimeout()
400
>>> import time
>>> delta = 200
>>> def stub_time():
......@@ -276,8 +320,6 @@ make the DB class mask the DB module.)
>>> historical_conn = db.open(before=serial)
>>> len(pools)
2
>>> len(pool.all)
2
>>> len(pool.available)
......@@ -288,35 +330,22 @@ A close or an open will do garbage collection on the timed out connections.
>>> delta += 200
>>> historical_conn.close()
>>> len(pools)
2
>>> len(pool.all)
1
>>> len(pool.available)
1
An open also does garbage collection on the pools themselves.
>>> delta += 400
>>> conn = db.open() # normal connection
>>> len(pools)
>>> len(pool.available.values()[0])
1
>>> len(pool.all)
0
>>> len(pool.available)
0
>>> serial in pools
False
Invalidations
=============
In general, invalidations are ignored for historical connections, assuming
that you have really specified a point in history. This is another white box
Invalidations are ignored for historical connections. This is another white box
test.
>>> historical_conn = db.open(
... transaction_manager=transaction1, at=serial)
>>> conn = db.open()
>>> sorted(conn.root().keys())
['first', 'second']
>>> conn.root()['first']['count']
......@@ -332,35 +361,13 @@ test.
0
>>> historical_conn.close()
If you specify a time in the future, you get a read-only connection that
invalidates, rather than an error. The main reason for this is that, in some
cases, the most recent transaction id is in the future, so there's not an easy
way to reasonably disallow values. Beyond that, it's useful to have readonly
connections, though this spelling isn't quite appealing for the general case.
This "future history" also works with MVCC.
Note that if you try to open an historical connection to a time in the future,
you will get an error.
>>> THE_FUTURE = datetime.datetime(2038, 1, 19)
>>> historical_conn = db.open(
... transaction_manager=transaction1, at=THE_FUTURE)
>>> conn.root()['first']['count'] += 1
>>> conn.root()['fourth'] = persistent.mapping.PersistentMapping()
>>> transaction.commit()
>>> len(historical_conn._invalidated)
2
>>> historical_conn.root()['first']['count'] # MVCC
2
>>> historical_conn.sync()
>>> len(historical_conn._invalidated)
0
>>> historical_conn.root()['first']['count']
3
>>> historical_conn.root()['first']['count'] = 0
>>> transaction1.commit()
>>> historical_conn = db.open(at=datetime.datetime.utcnow())
Traceback (most recent call last):
...
ReadOnlyHistoryError
>>> transaction1.abort()
>>> historical_conn.close()
ValueError: cannot open an historical connection in the future.
Warnings
========
......
......@@ -146,7 +146,7 @@ Reaching into the internals, we can see that db's connection pool now has
two connections available for reuse, and knows about three connections in
all:
>>> pool = db._pools['']
>>> pool = db.pool
>>> len(pool.available)
2
>>> len(pool.all)
......@@ -219,7 +219,7 @@ closed one out of the available connection stack.
>>> conns = [db.open() for dummy in range(6)]
>>> len(handler.records) # 3 warnings for the "excess" connections
3
>>> pool = db._pools['']
>>> pool = db.pool
>>> len(pool.available), len(pool.all)
(0, 6)
......@@ -297,7 +297,7 @@ Now open more connections so that the total exceeds pool_size (2):
>>> conn1 = db.open()
>>> conn2 = db.open()
>>> pool = db._pools['']
>>> pool = db.pool
>>> len(pool.all), len(pool.available) # all Connections are in use
(3, 0)
......
......@@ -62,85 +62,6 @@ class DBTests(unittest.TestCase):
self.db.setCacheSize(15)
self.db.setHistoricalCacheSize(15)
def test_removeHistoricalPool(self):
# Test that we can remove a historical pool
# This is white box because we check some internal data structures
serial1, root_serial1 = self.dowork()
now = datetime.datetime.utcnow()
serial2, root_serial2 = self.dowork()
self.failUnless(root_serial1 < root_serial2)
c1 = self.db.open(at=now)
root = c1.root()
root.keys() # wake up object to get proper serial set
self.assertEqual(root._p_serial, root_serial1)
c1.close() # return to pool
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is c12) # should be same
pools = self.db._pools
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
self.db.removeHistoricalPool(at=now)
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
def _test_for_leak(self):
self.dowork()
now = datetime.datetime.utcnow()
self.dowork()
while 1:
c1 = self.db.open(at=now)
self.db.removeHistoricalPool(at=now)
c1.close() # return to pool
def test_removeHistoricalPool_while_connection_open(self):
# Test that we can remove a version pool
# This is white box because we check some internal data structures
self.dowork()
now = datetime.datetime.utcnow()
self.dowork()
c1 = self.db.open(at=now)
c1.close() # return to pool
c12 = self.db.open(at=now)
self.assert_(c1 is c12) # should be same
pools = self.db._pools
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
self.db.removeHistoricalPool(at=now)
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12.close() # should leave pools alone
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
def test_references(self):
# TODO: For now test that we're using referencesf. We really should
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment