Commit 4ac3ab3e authored by Tim Peters's avatar Tim Peters

Merge tim-simpler_connection branch.

There's no longer a hard limit on # of open connections per DB.

Introduced a sane scheme for raising deprecation warnings.
Sane ==

1. The machinery ensures that a "this will be removed in ZODB 3.6"
   blurb gets attached to all deprecation warnings.

and

2. It will dead easy to find these when it's time for 3.6.
parent 5485da8a
......@@ -2,6 +2,33 @@ What's new in ZODB3 3.4?
========================
Release date: DD-MMM-2004
DB
--
- There is no longer a hard limit on the number of connections that
``DB.open()`` will create. In other words, ``DB.open()`` never blocks
anymore waiting for an earlier connection to close, and ``DB.open()``
always returns a connection now (while it wasn't documented, it was
possible for ``DB.open()`` to return ``None`` before).
``pool_size`` continues to default to 7, but its meaning has changed:
if more than ``pool_size`` connections are obtained from ``DB.open()``
and not closed, a warning is logged; if more than twice ``pool_size``, a
critical problem is logged. ``pool_size`` should be set to the maximum
number of connections from the ``DB`` instance you expect to have open
simultaneously.
In addition, if a connection obtained from ``DB.open()`` becomes
unreachable without having been explicitly closed, when Python's garbage
collection reclaims that connection it no longer counts against the
``pool_size`` thresholds for logging messages.
The following optional arguments to ``DB.open()`` are deprecated:
``transaction``, ``waitflag``, ``force`` and ``temporary``. If one
is specified, its value is ignored, and ``DeprecationWarning`` is
raised. In ZODB 3.6, these optional arguments will be removed.
Tools
-----
......
......@@ -182,6 +182,7 @@ def copy_other_files(cmd, outputbase):
"ZConfig/tests/library/widget",
"ZEO",
"ZODB",
"ZODB/tests",
"zdaemon",
"zdaemon/tests",
]:
......
......@@ -34,6 +34,8 @@ from ZODB.TmpStore import TmpStore
from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
from ZODB.interfaces import IConnection
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements
global_reset_counter = 0
......@@ -262,9 +264,8 @@ class Connection(ExportImport, object):
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
warnings.warn("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.",
DeprecationWarning)
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
......@@ -276,9 +277,8 @@ class Connection(ExportImport, object):
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
warnings.warn("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.",
DeprecationWarning)
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
......@@ -486,14 +486,14 @@ class Connection(ExportImport, object):
def cacheFullSweep(self, dt=None):
# XXX needs doc string
warnings.warn("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.", DeprecationWarning)
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=None):
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
......@@ -503,9 +503,8 @@ class Connection(ExportImport, object):
:Parameters:
- `dt`: ignored. It is provided only for backwards compatibility.
"""
if dt is not None:
warnings.warn("The dt argument to cacheMinimize is ignored.",
DeprecationWarning)
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
def cacheGC(self):
......@@ -781,8 +780,8 @@ class Connection(ExportImport, object):
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
warnings.warn("Assigning to _p_jar is deprecated",
DeprecationWarning)
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
......
......@@ -16,7 +16,7 @@
$Id$"""
import cPickle, cStringIO, sys
from thread import allocate_lock
import threading
from time import time, ctime
import warnings
import logging
......@@ -25,11 +25,117 @@ from ZODB.broken import find_global
from ZODB.utils import z64
from ZODB.Connection import Connection
from ZODB.serialize import referencesf
from ZODB.utils import WeakSet
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
import transaction
logger = logging.getLogger('ZODB.DB')
class _ConnectionPool(object):
"""Manage a pool of connections.
CAUTION: Methods should be called under the protection of a lock.
This class does no locking of its own.
There's no limit on the number of connections this can keep track of,
but a warning is logged if there are more than pool_size active
connections, and a critical problem if more than twice pool_size.
New connections are registered via push(). This will log a message if
"too many" connections are active.
When a connection is explicitly closed, tell the pool via repush().
That adds the connection to a stack of connections available for
reuse, and throws away the oldest stack entries if the stack is too large.
pop() pops this stack.
When a connection is obtained via pop(), the pool holds only a weak
reference to it thereafter. It's not necessary to inform the pool
if the connection goes away. A connection handed out by pop() counts
against pool_size only so long as it exists, and provided it isn't
repush()'ed. A weak reference is retained so that DB methods like
connectionDebugInfo() can still gather statistics.
"""
def __init__(self, pool_size):
# The largest # of connections we expect to see alive simultaneously.
self.pool_size = pool_size
# A weak set of all connections we've seen. A connection vanishes
# from this set if pop() hands it out, it's not reregistered via
# repush(), and it becomes unreachable.
self.all = WeakSet()
# A stack of connections available to hand out. This is a subset
# of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than pool_size entries
# in this stack.
# In Python 2.4, a collections.deque would make more sense than
# a list (we push only "on the right", but may pop from both ends).
self.available = []
# Change our belief about the expected maximum # of live connections.
# If the pool_size is smaller than the current value, this may discard
# the oldest available connections.
def set_pool_size(self, pool_size):
self.pool_size = pool_size
self._reduce_size()
# Register a new available connection. We must not know about c already.
# c will be pushed onto the available stack even if we're over the
# pool size limit.
def push(self, c):
assert c not in self.all
assert c not in self.available
self._reduce_size(strictly_less=True)
self.all.add(c)
self.available.append(c)
n, limit = len(self.all), self.pool_size
if n > limit:
reporter = logger.warn
if n > 2 * limit:
reporter = logger.critical
reporter("DB.open() has %s open connections with a pool_size "
"of %s", n, limit)
# Reregister an available connection formerly obtained via pop(). This
# pushes it on the stack of available connections, and may discard
# older available connections.
def repush(self, c):
assert c in self.all
assert c not in self.available
self._reduce_size(strictly_less=True)
self.available.append(c)
# Throw away the oldest available connections until we're under our
# target size (strictly_less=False) or no more than that (strictly_less=
# True, the default).
def _reduce_size(self, strictly_less=False):
target = self.pool_size - bool(strictly_less)
while len(self.available) > target:
c = self.available.pop(0)
self.all.remove(c)
# Pop an available connection and return it, or return None if none are
# available. In the latter case, the caller should create a new
# connection, register it via push(), and call pop() again. The
# caller is responsible for serializing this sequence.
def pop(self):
result = None
if self.available:
result = self.available.pop()
# Leave it in self.all, so we can still get at it for statistics
# while it's alive.
assert result in self.all
return result
# Return a list of all connections we currently know about.
def all_as_list(self):
return self.all.as_list()
class DB(object):
"""The Object Database
-------------------
......@@ -41,9 +147,9 @@ class DB(object):
The DB instance manages a pool of connections. If a connection is
closed, it is returned to the pool and its object cache is
preserved. A subsequent call to open() will reuse the connection.
There is a limit to the pool size; if all its connections are in
use, calls to open() will block until one of the open connections
is closed.
There is no hard limit on the pool size. If more than `pool_size`
connections are opened, a warning is logged, and if more than twice
that many, a critical problem is logged.
The class variable 'klass' is used by open() to create database
connections. It is set to Connection, but a subclass could override
......@@ -81,41 +187,42 @@ class DB(object):
def __init__(self, storage,
pool_size=7,
cache_size=400,
cache_deactivate_after=None,
cache_deactivate_after=DEPRECATED_ARGUMENT,
version_pool_size=3,
version_cache_size=100,
version_cache_deactivate_after=None,
version_cache_deactivate_after=DEPRECATED_ARGUMENT,
):
"""Create an object database.
:Parameters:
- `storage`: the storage used by the database, e.g. FileStorage
- `pool_size`: maximum number of open connections
- `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache
- `cache_deactivate_after`: ignored
- `version_pool_size`: maximum number of connections (per version)
- `version_pool_size`: expected maximum number of connections (per
version)
- `version_cache_size`: target size of Connection object cache for
version connections
- `cache_deactivate_after`: ignored
- `version_cache_deactivate_after`: ignored
"""
# Allocate locks:
l = allocate_lock()
self._a = l.acquire
self._r = l.release
# Allocate lock.
x = threading.RLock()
self._a = x.acquire
self._r = x.release
# Setup connection pools and cache info
self._pools = {},[]
self._temps = []
# _pools maps a version string to a _ConnectionPool object.
self._pools = {}
self._pool_size = pool_size
self._cache_size = cache_size
self._version_pool_size = version_pool_size
self._version_cache_size = version_cache_size
# warn about use of deprecated arguments
if (cache_deactivate_after is not None or
version_cache_deactivate_after is not None):
warnings.warn("cache_deactivate_after has no effect",
DeprecationWarning)
if cache_deactivate_after is not DEPRECATED_ARGUMENT:
deprecated36("cache_deactivate_after has no effect")
if version_cache_deactivate_after is not DEPRECATED_ARGUMENT:
deprecated36("version_cache_deactivate_after has no effect")
self._miv_cache = {}
......@@ -151,6 +258,7 @@ class DB(object):
if hasattr(storage, 'undoInfo'):
self.undoInfo = storage.undoInfo
# This is called by Connection.close().
def _closeConnection(self, connection):
"""Return a connection to the pool.
......@@ -165,10 +273,10 @@ class DB(object):
am = self._activity_monitor
if am is not None:
am.closedConnection(connection)
version = connection._version
pools, pooll = self._pools
try:
pool, allocated, pool_lock = pools[version]
pool = self._pools[version]
except KeyError:
# No such version. We must have deleted the pool.
# Just let the connection go.
......@@ -177,30 +285,18 @@ class DB(object):
# XXX What objects are involved in the cycle?
connection.__dict__.clear()
return
pool.repush(connection)
pool.append(connection)
if len(pool) == 1:
# Pool now usable again, unlock it.
pool_lock.release()
finally:
self._r()
# Call f(c) for all connections c in all pools in all versions.
def _connectionMap(self, f):
self._a()
try:
pools, pooll = self._pools
for pool, allocated in pooll:
for cc in allocated:
f(cc)
temps = self._temps
if temps:
t = []
rc = sys.getrefcount
for cc in temps:
if rc(cc) > 3:
f(cc)
self._temps = t
for pool in self._pools.values():
for c in pool.all_as_list():
f(c)
finally:
self._r()
......@@ -216,12 +312,12 @@ class DB(object):
"""
detail = {}
def f(con, detail=detail, have_detail=detail.has_key):
def f(con, detail=detail):
for oid, ob in con._cache.items():
module = getattr(ob.__class__, '__module__', '')
module = module and '%s.' % module or ''
c = "%s%s" % (module, ob.__class__.__name__)
if have_detail(c):
if c in detail:
detail[c] += 1
else:
detail[c] = 1
......@@ -276,7 +372,7 @@ class DB(object):
self._connectionMap(lambda c: c._cache.full_sweep())
def cacheLastGCTime(self):
m=[0]
m = [0]
def f(con, m=m):
t = con._cache.cache_last_gc_time
if t > m[0]:
......@@ -289,7 +385,7 @@ class DB(object):
self._connectionMap(lambda c: c._cache.minimize())
def cacheSize(self):
m=[0]
m = [0]
def f(con, m=m):
m[0] += con._cache.cache_non_ghost_count
......@@ -299,9 +395,9 @@ class DB(object):
def cacheDetailSize(self):
m = []
def f(con, m=m):
m.append({'connection':repr(con),
'ngsize':con._cache.cache_non_ghost_count,
'size':len(con._cache)})
m.append({'connection': repr(con),
'ngsize': con._cache.cache_non_ghost_count,
'size': len(con._cache)})
self._connectionMap(f)
m.sort()
return m
......@@ -358,39 +454,24 @@ class DB(object):
if connection is not None:
version = connection._version
# Update modified in version cache
# XXX must make this work with list or dict to backport to 2.6
for oid in oids.keys():
h = hash(oid) % 131
o = self._miv_cache.get(h, None)
if o is not None and o[0]==oid:
del self._miv_cache[h]
# Notify connections
for pool, allocated in self._pools[1]:
for cc in allocated:
if (cc is not connection and
(not version or cc._version==version)):
if sys.getrefcount(cc) <= 3:
cc.close()
cc.invalidate(tid, oids)
if self._temps:
t = []
for cc in self._temps:
if sys.getrefcount(cc) > 3:
if (cc is not connection and
(not version or cc._version == version)):
cc.invalidate(tid, oids)
t.append(cc)
else:
cc.close()
self._temps = t
# Notify connections.
def inval(c):
if (c is not connection and
(not version or c._version == version)):
c.invalidate(tid, oids)
self._connectionMap(inval)
def modifiedInVersion(self, oid):
h = hash(oid) % 131
cache = self._miv_cache
o=cache.get(h, None)
if o and o[0]==oid:
o = cache.get(h, None)
if o and o[0] == oid:
return o[1]
v = self._storage.modifiedInVersion(oid)
cache[h] = oid, v
......@@ -399,202 +480,107 @@ class DB(object):
def objectCount(self):
return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None,
waitflag=1, mvcc=True, txn_mgr=None, synch=True):
def open(self, version='',
transaction=DEPRECATED_ARGUMENT, temporary=DEPRECATED_ARGUMENT,
force=DEPRECATED_ARGUMENT, waitflag=DEPRECATED_ARGUMENT,
mvcc=True, txn_mgr=None, synch=True):
"""Return a database Connection for use by application code.
The optional version argument can be used to specify that a
The optional `version` argument can be used to specify that a
version connection is desired.
The optional transaction argument can be provided to cause the
connection to be automatically closed when a transaction is
terminated. In addition, connections per transaction are
reused, if possible.
Note that the connection pool is managed as a stack, to
increate the likelihood that the connection's stack will
increase the likelihood that the connection's stack will
include useful objects.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `transaction`: XXX
- `temporary`: XXX
- `force`: XXX
- `waitflag`: XXX
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
if temporary is not DEPRECATED_ARGUMENT:
deprecated36("DB.open() temporary= ignored. "
"open() no longer blocks.")
if force is not DEPRECATED_ARGUMENT:
deprecated36("DB.open() force= ignored. "
"open() no longer blocks.")
if waitflag is not DEPRECATED_ARGUMENT:
deprecated36("DB.open() waitflag= ignored. "
"open() no longer blocks.")
if transaction is not DEPRECATED_ARGUMENT:
deprecated36("DB.open() transaction= ignored.")
self._a()
try:
if transaction is not None:
connections = transaction._connections
if connections:
if connections.has_key(version) and not temporary:
return connections[version]
else:
transaction._connections = connections = {}
transaction = transaction._connections
if temporary:
# This is a temporary connection.
# We won't bother with the pools. This will be
# a one-use connection.
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
c._setDB(self)
self._temps.append(c)
if transaction is not None:
transaction[id(c)] = c
return c
pools, pooll = self._pools
# pools is a mapping object:
#
# {version -> (pool, allocated, lock)
#
# where:
#
# pool is the connection pool for the version,
# allocated is a list of all of the allocated
# connections, and
# lock is a lock that is used to block when a pool is
# empty and no more connections can be allocated.
#
# pooll is a list of all of the pools and allocated for
# use in cases where we need to iterate over all
# connections or all inactive connections.
# Pool locks are tricky. Basically, the lock needs to be
# set whenever the pool becomes empty so that threads are
# forced to wait until the pool gets a connection in it.
# The lock is acquired when the (empty) pool is
# created. The lock is acquired just prior to removing
# the last connection from the pool and released just after
# adding a connection to an empty pool.
if pools.has_key(version):
pool, allocated, pool_lock = pools[version]
# pool <- the _ConnectionPool for this version
pool = self._pools.get(version)
if pool is None:
if version:
size = self._version_pool_size
else:
pool, allocated, pool_lock = pools[version] = (
[], [], allocate_lock())
pooll.append((pool, allocated))
pool_lock.acquire()
size = self._pool_size
self._pools[version] = pool = _ConnectionPool(size)
assert pool is not None
if not pool:
c = None
# result <- a connection
result = pool.pop()
if result is None:
if version:
if self._version_pool_size > len(allocated) or force:
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c)
pool.append(c)
elif self._pool_size > len(allocated) or force:
c = self.klass(version=version,
cache_size=self._cache_size,
mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
allocated.append(c)
pool.append(c)
if c is None:
if waitflag:
self._r()
pool_lock.acquire()
self._a()
if len(pool) > 1:
# Note that the pool size will normally be 1 here,
# but it could be higher due to a race condition.
pool_lock.release()
size = self._version_cache_size
else:
return
size = self._cache_size
c = self.klass(version=version, cache_size=size,
mvcc=mvcc, txn_mgr=txn_mgr)
pool.push(c)
result = pool.pop()
assert result is not None
elif len(pool)==1:
# Taking last one, lock the pool.
# Note that another thread might grab the lock
# before us, so we might actually block, however,
# when we get the lock back, there *will* be a
# connection in the pool. OTOH, there's no limit on
# how long we may need to wait: if the other thread
# grabbed the lock in this section too, we'll wait
# here until another connection is closed.
# checkConcurrentUpdates1Storage provoked this frequently
# on a hyperthreaded machine, with its second thread
# timing out after waiting 5 minutes for DB.open() to
# return. So, if we can't get the pool lock immediately,
# now we make a recursive call. This allows the current
# thread to allocate a new connection instead of waiting
# arbitrarily long for the single connection in the pool
# right now.
self._r()
if not pool_lock.acquire(0):
result = DB.open(self, version, transaction, temporary,
force, waitflag)
self._a()
return result
self._a()
if len(pool) > 1:
# Note that the pool size will normally be 1 here,
# but it could be higher due to a race condition.
pool_lock.release()
# Tell the connection it belongs to self.
result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
c = pool.pop()
c._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
for pool, allocated in pooll:
for cc in pool:
cc.cacheGC()
# A good time to do some cache cleanup.
self._connectionMap(lambda c: c.cacheGC())
if transaction is not None:
transaction[version] = c
return c
return result
finally:
self._r()
def removeVersionPool(self, version):
pools, pooll = self._pools
info = pools.get(version)
if info:
del pools[version]
pool, allocated, pool_lock = info
pooll.remove((pool, allocated))
try:
pool_lock.release()
except: # XXX Do we actually expect this to fail?
del self._pools[version]
except KeyError:
pass
del pool[:]
del allocated[:]
def connectionDebugInfo(self):
r = []
pools, pooll = self._pools
result = []
t = time()
for version, (pool, allocated, lock) in pools.items():
for c in allocated:
def f(c):
o = c._opened
d = c._debug_info
if d:
if len(d)==1:
if len(d) == 1:
d = d[0]
else:
d=''
d = ''
d = "%s (%s)" % (d, len(c._cache))
r.append({
result.append({
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
'info': d,
'version': version,
})
return r
self._connectionMap(f)
return result
def getActivityMonitor(self):
return self._activity_monitor
......@@ -623,33 +609,51 @@ class DB(object):
logger.error("packing", exc_info=True)
raise
def setCacheSize(self, v):
self._cache_size = v
d = self._pools[0]
pool_info = d.get('')
if pool_info is not None:
for c in pool_info[1]:
c._cache.cache_size = v
def setActivityMonitor(self, am):
self._activity_monitor = am
def classFactory(self, connection, modulename, globalname):
# Zope will rebind this method to arbitrary user code at runtime.
return find_global(modulename, globalname)
def setPoolSize(self, v):
self._pool_size = v
def setActivityMonitor(self, am):
self._activity_monitor = am
def setCacheSize(self, v):
self._a()
try:
self._cache_size = v
pool = self._pools.get('')
if pool is not None:
for c in pool.all_as_list():
c._cache.cache_size = v
finally:
self._r()
def setVersionCacheSize(self, v):
self._a()
try:
self._version_cache_size = v
for ver in self._pools[0].keys():
if ver:
for c in self._pools[0][ver][1]:
for version, pool in self._pools.items():
if version:
for c in pool.all_as_list():
c._cache.cache_size = v
finally:
self._r()
def setPoolSize(self, size):
self._pool_size = size
self._reset_pool_sizes(size, for_versions=False)
def setVersionPoolSize(self, size):
self._version_pool_size = size
self._reset_pool_sizes(size, for_versions=True)
def setVersionPoolSize(self, v):
self._version_pool_size=v
def _reset_pool_sizes(self, size, for_versions=False):
self._a()
try:
for version, pool in self._pools.items():
if (version != '') == for_versions:
pool.set_pool_size(size)
finally:
self._r()
def undo(self, id, txn=None):
"""Undo a transaction identified by id.
......@@ -679,23 +683,19 @@ class DB(object):
def getCacheDeactivateAfter(self):
"""Deprecated"""
warnings.warn("cache_deactivate_after has no effect",
DeprecationWarning)
deprecated36("getCacheDeactivateAfter has no effect")
def getVersionCacheDeactivateAfter(self):
"""Deprecated"""
warnings.warn("cache_deactivate_after has no effect",
DeprecationWarning)
deprecated36("getVersionCacheDeactivateAfter has no effect")
def setCacheDeactivateAfter(self, v):
"""Deprecated"""
warnings.warn("cache_deactivate_after has no effect",
DeprecationWarning)
deprecated36("setCacheDeactivateAfter has no effect")
def setVersionCacheDeactivateAfter(self, v):
"""Deprecated"""
warnings.warn("cache_deactivate_after has no effect",
DeprecationWarning)
deprecated36("setVersionCacheDeactivateAfter has no effect")
class ResourceManager(object):
"""Transaction participation for a version or undo resource."""
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
Here we exercise the connection management done by the DB class.
>>> from ZODB import DB
>>> from ZODB.MappingStorage import MappingStorage as Storage
Capturing log messages from DB is important for some of the examples:
>>> from zope.testing.loggingsupport import InstalledHandler
>>> handler = InstalledHandler('ZODB.DB')
Create a storage, and wrap it in a DB wrapper:
>>> st = Storage()
>>> db = DB(st)
By default, we can open 7 connections without any log messages:
>>> conns = [db.open() for dummy in range(7)]
>>> handler.records
[]
Open one more, and we get a warning:
>>> conns.append(db.open())
>>> len(handler.records)
1
>>> msg = handler.records[0]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 8 open connections with a pool_size of 7
Open 6 more, and we get 6 more warnings:
>>> conns.extend([db.open() for dummy in range(6)])
>>> len(conns)
14
>>> len(handler.records)
7
>>> msg = handler.records[-1]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 14 open connections with a pool_size of 7
Add another, so that it's more than twice the default, and the level
rises to critical:
>>> conns.append(db.open())
>>> len(conns)
15
>>> len(handler.records)
8
>>> msg = handler.records[-1]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB CRITICAL DB.open() has 15 open connections with a pool_size of 7
While it's boring, it's important to verify that the same relationships
hold if the default pool size is overridden.
>>> handler.clear()
>>> st.close()
>>> st = Storage()
>>> PS = 2 # smaller pool size
>>> db = DB(st, pool_size=PS)
>>> conns = [db.open() for dummy in range(PS)]
>>> handler.records
[]
A warning for opening one more:
>>> conns.append(db.open())
>>> len(handler.records)
1
>>> msg = handler.records[0]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 3 open connections with a pool_size of 2
More warnings through 4 connections:
>>> conns.extend([db.open() for dummy in range(PS-1)])
>>> len(conns)
4
>>> len(handler.records)
2
>>> msg = handler.records[-1]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 4 open connections with a pool_size of 2
And critical for going beyond that:
>>> conns.append(db.open())
>>> len(conns)
5
>>> len(handler.records)
3
>>> msg = handler.records[-1]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB CRITICAL DB.open() has 5 open connections with a pool_size of 2
We can change the pool size on the fly:
>>> handler.clear()
>>> db.setPoolSize(6)
>>> conns.append(db.open())
>>> handler.records # no log msg -- the pool is bigger now
[]
>>> conns.append(db.open()) # but one more and there's a warning again
>>> len(handler.records)
1
>>> msg = handler.records[0]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 7 open connections with a pool_size of 6
Enough of that.
>>> handler.clear()
>>> st.close()
More interesting is the stack-like nature of connection reuse. So long as
we keep opening new connections, and keep them alive, all connections
returned are distinct:
>>> st = Storage()
>>> db = DB(st)
>>> c1 = db.open()
>>> c2 = db.open()
>>> c3 = db.open()
>>> c1 is c2 or c1 is c3 or c2 is c3
False
Let's put some markers on the connections, so we can identify these
specific objects later:
>>> c1.MARKER = 'c1'
>>> c2.MARKER = 'c2'
>>> c3.MARKER = 'c3'
Now explicitly close c1 and c2:
>>> c1.close()
>>> c2.close()
Reaching into the internals, we can see that db's connection pool now has
two connections available for reuse, and knows about three connections in
all:
>>> pool = db._pools['']
>>> len(pool.available)
2
>>> len(pool.all)
3
Since we closed c2 last, it's at the top of the available stack, so will
be reused by the next open():
>>> c1 = db.open()
>>> c1.MARKER
'c2'
>>> len(pool.available), len(pool.all)
(1, 3)
>>> c3.close() # now the stack has c3 on top, then c1
>>> c2 = db.open()
>>> c2.MARKER
'c3'
>>> len(pool.available), len(pool.all)
(1, 3)
>>> c3 = db.open()
>>> c3.MARKER
'c1'
>>> len(pool.available), len(pool.all)
(0, 3)
What about the 3 in pool.all? We've seen that closing connections doesn't
reduce pool.all, and it would be bad if DB kept connections alive forever.
In fact pool.all is a "weak set" of connections -- it holds weak references
to connections. That alone doesn't keep connection objects alive. The
weak set allows DB's statistics methods to return info about connections
that are still alive.
>>> len(db.cacheDetailSize()) # one result for each connection's cache
3
If a connection object is abandoned (it becomes unreachable), then it
will vanish from pool.all automatically. However, connections are
involved in cycles, so exactly when a connection vanishes from pool.all
isn't predictable. It can be forced by running gc.collect():
>>> import gc
>>> dummy = gc.collect()
>>> len(pool.all)
3
>>> c3 = None
>>> dummy = gc.collect() # removes c3 from pool.all
>>> len(pool.all)
2
Note that c3 is really gone; in particular it didn't get added back to
the stack of available connections by magic:
>>> len(pool.available)
0
Nothing in that last block should have logged any msgs:
>>> handler.records
[]
If "too many" connections are open, then closing one may kick an older
closed one out of the available connection stack.
>>> st.close()
>>> st = Storage()
>>> db = DB(st, pool_size=3)
>>> conns = [db.open() for dummy in range(6)]
>>> len(handler.records) # 3 warnings for the "excess" connections
3
>>> pool = db._pools['']
>>> len(pool.available), len(pool.all)
(0, 6)
Let's mark them:
>>> for i, c in enumerate(conns):
... c.MARKER = i
Closing connections adds them to the stack:
>>> for i in range(3):
... conns[i].close()
>>> len(pool.available), len(pool.all)
(3, 6)
>>> del conns[:3] # leave the ones with MARKERs 3, 4 and 5
Closing another one will purge the one with MARKER 0 from the stack
(since it was the first added to the stack):
>>> [c.MARKER for c in pool.available]
[0, 1, 2]
>>> conns[0].close() # MARKER 3
>>> len(pool.available), len(pool.all)
(3, 5)
>>> [c.MARKER for c in pool.available]
[1, 2, 3]
Similarly for the other two:
>>> conns[1].close(); conns[2].close()
>>> len(pool.available), len(pool.all)
(3, 3)
>>> [c.MARKER for c in pool.available]
[3, 4, 5]
Reducing the pool size may also purge the oldest closed connections:
>>> db.setPoolSize(2) # gets rid of MARKER 3
>>> len(pool.available), len(pool.all)
(2, 2)
>>> [c.MARKER for c in pool.available]
[4, 5]
Since MARKER 5 is still the last one added to the stack, it will be the
first popped:
>>> c1 = db.open(); c2 = db.open()
>>> c1.MARKER, c2.MARKER
(5, 4)
>>> len(pool.available), len(pool.all)
(0, 2)
Clean up.
>>> st.close()
>>> handler.uninstall()
......@@ -414,8 +414,9 @@ class UserMethodTests(unittest.TestCase):
>>> len(hook.warnings)
1
>>> message, category, filename, lineno = hook.warnings[0]
>>> message
'The dt argument to cacheMinimize is ignored.'
>>> print message
This will be removed in ZODB 3.6:
cacheMinimize() dt= is ignored.
>>> category.__name__
'DeprecationWarning'
>>> hook.clear()
......@@ -434,8 +435,9 @@ class UserMethodTests(unittest.TestCase):
>>> len(hook.warnings)
2
>>> message, category, filename, lineno = hook.warnings[0]
>>> message
'cacheFullSweep is deprecated. Use cacheMinimize instead.'
>>> print message
This will be removed in ZODB 3.6:
cacheFullSweep is deprecated. Use cacheMinimize instead.
>>> category.__name__
'DeprecationWarning'
>>> message, category, filename, lineno = hook.warnings[1]
......
......@@ -23,6 +23,10 @@ import ZODB.FileStorage
from ZODB.tests.MinPO import MinPO
# Return total number of connections across all pools in a db._pools.
def nconn(pools):
return sum([len(pool.all) for pool in pools.values()])
class DBTests(unittest.TestCase):
def setUp(self):
......@@ -75,22 +79,22 @@ class DBTests(unittest.TestCase):
c12.close() # return to pool
self.assert_(c1 is c12) # should be same
pools, pooll = self.db._pools
pools = self.db._pools
self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3)
self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2)
self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3)
self.assertEqual(nconn(pools), 3)
def _test_for_leak(self):
self.dowork()
......@@ -112,27 +116,27 @@ class DBTests(unittest.TestCase):
c12 = self.db.open('v1')
self.assert_(c1 is c12) # should be same
pools, pooll = self.db._pools
pools = self.db._pools
self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3)
self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2)
self.assertEqual(nconn(pools), 2)
c12.close() # should leave pools alone
self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2)
self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3)
self.assertEqual(nconn(pools), 3)
def test_suite():
......
......@@ -243,9 +243,13 @@ class ZODBTests(unittest.TestCase):
self.assertEqual(r1['item'], 2)
self.assertEqual(r2['item'], 2)
for msg, obj, filename, lineno in hook.warnings:
self.assert_(
msg.startswith("setLocalTransaction() is deprecated.") or
msg.startswith("getTransaction() is deprecated."))
self.assert_(msg in [
"This will be removed in ZODB 3.6:\n"
"setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.",
"This will be removed in ZODB 3.6:\n"
"getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead."])
finally:
conn1.close()
conn2.close()
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.testing.doctestunit import DocFileSuite
def test_suite():
return DocFileSuite("dbopen.txt")
......@@ -18,6 +18,8 @@ from struct import pack, unpack
from binascii import hexlify
import cPickle
import cStringIO
import weakref
import warnings
from persistent.TimeStamp import TimeStamp
......@@ -34,8 +36,27 @@ __all__ = ['z64',
'positive_id',
'get_refs',
'readable_tid_repr',
'WeakSet',
'DEPRECATED_ARGUMENT',
'deprecated36',
]
# A unique marker to give as the default value for a deprecated argument.
# The method should then do a
#
# if that_arg is not DEPRECATED_ARGUMENT:
# complain
#
# dance.
DEPRECATED_ARGUMENT = object()
# Raise DeprecationWarning, noting that the deprecated thing will go
# away in ZODB 3.6. Point to the caller of our caller (i.e., at the
# code using the deprecated thing).
def deprecated36(msg):
warnings.warn("This will be removed in ZODB 3.6:\n%s" % msg,
DeprecationWarning, stacklevel=3)
z64 = '\0'*8
# TODO The purpose of t32 is unclear. Code that uses it is usually
......@@ -164,3 +185,46 @@ def get_refs(pickle):
u.noload() # class info
u.noload() # instance state info
return refs
# A simple implementation of weak sets, supplying just enough of Python's
# sets.Set interface for our needs.
class WeakSet(object):
"""A set of objects that doesn't keep its elements alive.
The objects in the set must be weakly referencable.
The objects need not be hashable, and need not support comparison.
Two objects are considered to be the same iff their id()s are equal.
When the only references to an object are weak references (including
those from WeakSets), the object can be garbage-collected, and
will vanish from any WeakSets it may be a member of at that time.
"""
def __init__(self):
# Map id(obj) to obj. By using ids as keys, we avoid requiring
# that the elements be hashable or comparable.
self.data = weakref.WeakValueDictionary()
def __len__(self):
return len(self.data)
def __contains__(self, obj):
return id(obj) in self.data
# Same as a Set, add obj to the collection.
def add(self, obj):
self.data[id(obj)] = obj
# Same as a Set, remove obj from the collection, and raise
# KeyError if obj not in the collection.
def remove(self, obj):
del self.data[id(obj)]
# Return a list of all the objects in the collection.
# Because a weak dict is used internally, iteration
# is dicey (the underlying dict may change size during
# iteration, due to gc or activity from other threads).
# as_list() attempts to be safe.
def as_list(self):
return self.data.values()
......@@ -11,44 +11,16 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import doctest
import os
import sys
import unittest
import persistent.tests
from persistent import Persistent
from zope.testing.doctestunit import DocFileSuite
class P(Persistent):
def __init__(self):
self.x = 0
def inc(self):
self.x += 1
try:
DocFileSuite = doctest.DocFileSuite # >= Python 2.4.0a2
except AttributeError:
# <= Python 2.4.0a1
def DocFileSuite(path, globs=None):
# It's not entirely obvious how to connection this single string
# with unittest. For now, re-use the _utest() function that comes
# standard with doctest in Python 2.3. One problem is that the
# error indicator doesn't point to the line of the doctest file
# that failed.
path = os.path.join(persistent.tests.__path__[0], path)
source = open(path).read()
if globs is None:
globs = sys._getframe(1).f_globals
t = doctest.Tester(globs=globs)
def runit():
doctest._utest(t, path, source, path, 0)
f = unittest.FunctionTestCase(runit,
description="doctest from %s" % path)
suite = unittest.TestSuite()
suite.addTest(f)
return suite
def test_suite():
return DocFileSuite("persistent.txt", globs={"P": P})
......@@ -18,7 +18,6 @@ are associated with the right transaction.
"""
import thread
import weakref
from transaction._transaction import Transaction
......@@ -28,48 +27,16 @@ from transaction._transaction import Transaction
# practice not to explicitly close Connection objects, and keeping
# a Connection alive keeps a potentially huge number of other objects
# alive (e.g., the cache, and everything reachable from it too).
# Therefore we use "weak sets" internally.
#
# Therefore we use "weak sets" internally. The implementation here
# implements just enough of Python's sets.Set interface for our needs.
class WeakSet(object):
"""A set of objects that doesn't keep its elements alive.
The objects in the set must be weakly referencable.
The objects need not be hashable, and need not support comparison.
Two objects are considered to be the same iff their id()s are equal.
When the only references to an object are weak references (including
those from WeakSets), the object can be garbage-collected, and
will vanish from any WeakSets it may be a member of at that time.
"""
def __init__(self):
# Map id(obj) to obj. By using ids as keys, we avoid requiring
# that the elements be hashable or comparable.
self.data = weakref.WeakValueDictionary()
# Same as a Set, add obj to the collection.
def add(self, obj):
self.data[id(obj)] = obj
# Same as a Set, remove obj from the collection, and raise
# KeyError if obj not in the collection.
def remove(self, obj):
del self.data[id(obj)]
# Return a list of all the objects in the collection.
# Because a weak dict is used internally, iteration
# is dicey (the underlying dict may change size during
# iteration, due to gc or activity from other threads).
# as_list() attempts to be safe.
def as_list(self):
return self.data.values()
# Obscure: because of the __init__.py maze, we can't import WeakSet
# at top level here.
class TransactionManager(object):
def __init__(self):
from ZODB.utils import WeakSet
self._txn = None
self._synchs = WeakSet()
......@@ -135,6 +102,8 @@ class ThreadTransactionManager(object):
del self._txns[tid]
def registerSynch(self, synch):
from ZODB.utils import WeakSet
tid = thread.get_ident()
ws = self._synchs.get(tid)
if ws is None:
......
......@@ -261,9 +261,10 @@ class Transaction(object):
self._resources.append(adapter)
def begin(self):
warnings.warn("Transaction.begin() should no longer be used; use "
"the begin() method of a transaction manager.",
DeprecationWarning, stacklevel=2)
from ZODB.utils import deprecated36
deprecated36("Transaction.begin() should no longer be used; use "
"the begin() method of a transaction manager.")
if (self._resources or
self._sub or
self._nonsub or
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment