DB.py 32.2 KB
Newer Older
Jim Fulton's avatar
Jim Fulton committed
1
##############################################################################
2
#
3 4
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
5
#
matt@zope.com's avatar
matt@zope.com committed
6
# This software is subject to the provisions of the Zope Public License,
Jim Fulton's avatar
Jim Fulton committed
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
matt@zope.com's avatar
matt@zope.com committed
8 9 10 11
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
12
#
Jim Fulton's avatar
Jim Fulton committed
13 14 15
##############################################################################
"""Database objects

16
$Id$"""
Jim Fulton's avatar
Jim Fulton committed
17

18 19
import warnings

Jim Fulton's avatar
Jim Fulton committed
20 21 22
import cPickle
import cStringIO
import sys
23
import threading
24
import logging
25 26
import datetime
import calendar
Jim Fulton's avatar
Jim Fulton committed
27
import time
Jim Fulton's avatar
Jim Fulton committed
28

Jeremy Hylton's avatar
Jeremy Hylton committed
29
from ZODB.broken import find_global
30
from ZODB.utils import z64
Jeremy Hylton's avatar
Jeremy Hylton committed
31
from ZODB.Connection import Connection
32
import ZODB.serialize
33 34

import transaction.weakset
35

36 37 38
from zope.interface import implements
from ZODB.interfaces import IDatabase

39
import BTrees.OOBTree
40 41
import transaction

42 43
from persistent.TimeStamp import TimeStamp

44

45
logger = logging.getLogger('ZODB.DB')
46

47
class AbstractConnectionPool(object):
    """Manage a pool of connections.

    CAUTION:  Methods should be called under the protection of a lock.
    This class does no locking of its own.

    There's no limit on the number of connections this can keep track of,
    but a warning is logged if there are more than pool_size active
    connections, and a critical problem if more than twice pool_size.

    New connections are registered via push().  This will log a message if
    "too many" connections are active.

    When a connection is explicitly closed, tell the pool via repush().
    That adds the connection to a stack of connections available for
    reuse, and throws away the oldest stack entries if the stack is too large.
    pop() pops this stack.

    When a connection is obtained via pop(), the pool holds only a weak
    reference to it thereafter.  It's not necessary to inform the pool
    if the connection goes away.  A connection handed out by pop() counts
    against pool_size only so long as it exists, and provided it isn't
    repush()'ed.  A weak reference is retained so that DB methods like
    connectionDebugInfo() can still gather statistics.
    """

    def __init__(self, size, timeout=None):
        # The largest # of connections we expect to see alive simultaneously.
        self._size = size

        # The minimum number of seconds that an available connection should
        # be kept, or None.
        self._timeout = timeout

        # A weak set of all connections we've seen.  A connection vanishes
        # from this set if pop() hands it out, it's not reregistered via
        # repush(), and it becomes unreachable.
        self.all = transaction.weakset.WeakSet()

    def setSize(self, size):
        """Change our belief about the expected maximum # of live connections.

        If the pool_size is smaller than the current value, this may discard
        the oldest available connections.
        """
        self._size = size
        self._reduce_size()

    def setTimeout(self, timeout):
        """Change the timeout; shrinking it may discard idle connections."""
        previous = self._timeout
        self._timeout = timeout
        # Only a *reduced* (and concrete) timeout can invalidate currently
        # idle connections, so only then do we need to shrink.
        shrank = (timeout is not None
                  and previous != timeout
                  and (previous is None or previous > timeout))
        if shrank:
            self._reduce_size()

    def getSize(self):
        return self._size

    def getTimeout(self):
        return self._timeout

    # The lambdas dispatch through self.setTimeout/self.setSize so that
    # subclass overrides (e.g. KeyedConnectionPool's) take effect when
    # assigning to the properties.
    timeout = property(getTimeout, lambda self, value: self.setTimeout(value))

    size = property(getSize, lambda self, value: self.setSize(value))
111 112 113

class ConnectionPool(AbstractConnectionPool):
    """Pool of standard (current-revision) connections.

    See AbstractConnectionPool for the pooling contract.  All methods
    must be called under the protection of the DB's lock.
    """

    def __init__(self, size, timeout=1 << 31):
        # BUG FIX: the default used to be ``time.time()``, which is
        # evaluated once at class-definition time.  A huge constant is
        # deterministic and has the same practical effect: connections
        # never time out unless a real timeout is configured.
        super(ConnectionPool, self).__init__(size, timeout)

        # A stack of (pushed-at-time, connection) pairs available to hand
        # out.  This is a subset of self.all.  push() and repush() add to
        # this, and may remove the oldest available connections if the
        # pool is too large.  pop() pops this stack.  There are never more
        # than size entries in this stack.
        self.available = []

    def push(self, c):
        """Register a new available connection.

        We must not know about c already.  c will be pushed onto the
        available stack even if we're over the pool size limit.
        """
        assert c not in self.all
        assert c not in self.available
        self._reduce_size(strictly_less=True)
        self.all.add(c)
        self.available.append((time.time(), c))
        n = len(self.all)
        limit = self.size
        if n > limit:
            reporter = logger.warn
            if n > 2 * limit:
                reporter = logger.critical
            reporter("DB.open() has %s open connections with a pool_size "
                     "of %s", n, limit)

    def repush(self, c):
        """Reregister an available connection formerly obtained via pop().

        This pushes it on the stack of available connections, and may
        discard older available connections.
        """
        assert c in self.all
        assert c not in self.available
        self._reduce_size(strictly_less=True)
        self.available.append((time.time(), c))

    def _reduce_size(self, strictly_less=False):
        """Throw away the oldest available connections until we're under our
        target size (strictly_less=False, the default) or no more than that
        (strictly_less=True).
        """
        threshold = time.time() - self.timeout
        target = self.size
        if strictly_less:
            target -= 1

        available = self.available
        # Guard on ``available`` first: with size 0 and strictly_less=True
        # the old condition could pop from an empty list.
        while available and (len(available) > target
                             or available[0][0] < threshold):
            t, c = available.pop(0)
            self.all.remove(c)
            # While application code may still hold a reference to `c`,
            # there's little useful that can be done with this Connection
            # anymore. Its cache may be holding on to limited resources,
            # and we replace the cache with an empty one now so that we
            # don't have to wait for gc to reclaim it. Note that it's not
            # possible for DB.open() to return `c` again: `c` can never be
            # in an open state again.
            # TODO: Perhaps it would be better to break the reference
            # cycles between `c` and `c._cache`, so that refcounting
            # reclaims both right now. But if user code _does_ have a
            # strong reference to `c` now, breaking the cycle would not
            # reclaim `c` now, and `c` would be left in a user-visible
            # crazy state.
            c._resetCache()

    def reduce_size(self):
        """Public entry point for discarding stale available connections."""
        self._reduce_size()

    def pop(self):
        """Pop an available connection and return it.

        Return None if none are available - in this case, the caller should
        create a new connection, register it via push(), and call pop() again.
        The caller is responsible for serializing this sequence.
        """
        result = None
        if self.available:
            _, result = self.available.pop()
            # Leave it in self.all, so we can still get at it for statistics
            # while it's alive.
            assert result in self.all
        return result

    def map(self, f):
        """For every live connection c, invoke f(c)."""
        self.all.map(f)

    def availableGC(self):
        """Perform garbage collection on available connections.

        Connections that have timed out are dropped and their caches are
        reset; the remaining ones get a cache garbage-collection pass.
        """
        threshold = time.time() - self.timeout
        # BUG FIX: the old loop did ``del self.available[t]``, indexing the
        # list with the *timestamp* instead of the position.  Walk indices
        # in reverse so deletions don't shift unvisited entries.
        for i in reversed(range(len(self.available))):
            t, c = self.available[i]
            if t < threshold:
                del self.available[i]
                self.all.remove(c)
                c._resetCache()
            else:
                c.cacheGC()

225 226 227 228 229 230 231 232
class KeyedConnectionPool(AbstractConnectionPool):
    # This pool keeps track of keyed connections all together.  It makes
    # it possible to make assertions about total numbers of keyed
    # connections.  The keys in this case are "before" TIDs, but this is
    # used by other packages as well.

    # See the comments in ConnectionPool for method descriptions.

    def __init__(self, size, timeout=1 << 31):
        # BUG FIX: the default used to be ``time.time()``, evaluated once
        # at class-definition time.  A huge constant is deterministic and
        # has the same practical effect (no timeout unless configured).
        super(KeyedConnectionPool, self).__init__(size, timeout)
        # One ConnectionPool per key.
        self.pools = {}

    def setSize(self, v):
        # Propagate to every per-key sub-pool.
        self._size = v
        for pool in self.pools.values():
            pool.setSize(v)

    def setTimeout(self, v):
        # Propagate to every per-key sub-pool.
        self._timeout = v
        for pool in self.pools.values():
            pool.setTimeout(v)

    def push(self, c, key):
        # Create the per-key pool lazily, inheriting our size/timeout.
        pool = self.pools.get(key)
        if pool is None:
            pool = self.pools[key] = ConnectionPool(self.size, self.timeout)
        pool.push(c)

    def repush(self, c, key):
        self.pools[key].repush(c)

    def _reduce_size(self, strictly_less=False):
        # Snapshot the items: we delete exhausted pools while iterating.
        for key, pool in list(self.pools.items()):
            pool._reduce_size(strictly_less)
            if not pool.all:
                del self.pools[key]

    def reduce_size(self):
        self._reduce_size()

    def pop(self, key):
        # Returns None when no pool exists for `key` (implicit return).
        pool = self.pools.get(key)
        if pool is not None:
            return pool.pop()

    def map(self, f):
        for pool in self.pools.itervalues():
            pool.map(f)

    def availableGC(self):
        # Snapshot the items: availableGC may empty a pool, and we delete
        # exhausted pools as we go.
        for key, pool in list(self.pools.items()):
            pool.availableGC()
            if not pool.all:
                del self.pools[key]

    @property
    def test_all(self):
        # Union of every sub-pool's live-connection set (test helper).
        result = set()
        for pool in self.pools.itervalues():
            result.update(pool.all)
        return frozenset(result)

    @property
    def test_available(self):
        # Concatenation of every sub-pool's available stack (test helper).
        result = []
        for pool in self.pools.itervalues():
            result.extend(pool.available)
        return tuple(result)
295

296 297 298 299 300 301 302 303 304 305 306 307 308
def toTimeStamp(dt):
    """Convert a datetime (interpreted as UTC) to a persistent TimeStamp.

    If dt falls on a leap second, this will probably fail.  That may be a
    good thing: leap seconds are not really accounted for with serials.
    """
    utc = dt.utctimetuple()
    seconds = utc[5] + dt.microsecond / 1000000.0
    return TimeStamp(utc[0], utc[1], utc[2], utc[3], utc[4], seconds)

def getTID(at, before):
    """Normalize the `at`/`before` pair into a `before` TID (or None).

    At most one of `at` and `before` may be given; passing both raises
    ValueError.  datetime values are converted via toTimeStamp; anything
    else is assumed to be raw TimeStamp input.  `at` is translated to the
    first TID strictly later than it.
    """
    if at is not None:
        if before is not None:
            raise ValueError('can only pass zero or one of `at` and `before`')
        if isinstance(at, datetime.datetime):
            stamp = toTimeStamp(at)
        else:
            stamp = TimeStamp(at)
        return repr(stamp.laterThan(stamp))
    if before is None:
        return None
    if isinstance(before, datetime.datetime):
        return repr(toTimeStamp(before))
    return repr(TimeStamp(before))

319

Jeremy Hylton's avatar
Jeremy Hylton committed
320
class DB(object):
    """The Object Database
    -------------------

    The DB class coordinates the activities of multiple database
    Connection instances.  Most of the work is done by the
    Connections created via the open method.

    The DB instance manages a pool of connections.  If a connection is
    closed, it is returned to the pool and its object cache is
    preserved.  A subsequent call to open() will reuse the connection.
    There is no hard limit on the pool size.  If more than `pool_size`
    connections are opened, a warning is logged, and if more than twice
    that many, a critical problem is logged.

    The class variable 'klass' is used by open() to create database
    connections.  It is set to Connection, but a subclass could override
    it to provide a different connection implementation.

    The database provides a few methods intended for application code
    -- open, close, undo, and pack -- and a large collection of
    methods for inspecting the database and its connections' caches.

    :Cvariables:
      - `klass`: Class used by L{open} to create database connections

    :Groups:
      - `User Methods`: __init__, open, close, undo, pack, classFactory
      - `Inspection Methods`: getName, getSize, objectCount,
        getActivityMonitor, setActivityMonitor
      - `Connection Pool Methods`: getPoolSize, getHistoricalPoolSize,
        setPoolSize, setHistoricalPoolSize, getHistoricalTimeout,
        setHistoricalTimeout
      - `Transaction Methods`: invalidate
      - `Other Methods`: lastTransaction, connectionDebugInfo
      - `Cache Inspection Methods`: cacheDetail, cacheExtremeDetail,
        cacheFullSweep, cacheLastGCTime, cacheMinimize, cacheSize,
        cacheDetailSize, getCacheSize, getHistoricalCacheSize, setCacheSize,
        setHistoricalCacheSize
    """
    implements(IDatabase)

    klass = Connection  # Class to use for connections

    # Default activity monitor; `next`/`previous` default to None and are
    # set on instances as needed.
    _activity_monitor = next = previous = None
Jim Fulton's avatar
Jim Fulton committed
364 365 366 367

    def __init__(self, storage,
                 pool_size=7,
                 cache_size=400,
                 cache_size_bytes=0,
                 historical_pool_size=3,
                 historical_cache_size=1000,
                 historical_cache_size_bytes=0,
                 historical_timeout=300,
                 database_name='unnamed',
                 databases=None,
                 ):
        """Create an object database.

        :Parameters:
          - `storage`: the storage used by the database, e.g. FileStorage;
            a string is treated as a FileStorage file name
          - `pool_size`: expected maximum number of open connections
          - `cache_size`: target size of Connection object cache
          - `cache_size_bytes`: target size measured in total estimated size
               of objects in the Connection object cache.
               "0" means unlimited.
          - `historical_pool_size`: expected maximum number of total
            historical connections
          - `historical_cache_size`: target size of Connection object cache for
            historical (`at` or `before`) connections
          - `historical_cache_size_bytes` -- similar to `cache_size_bytes` for
            the historical connection.
          - `historical_timeout`: minimum number of seconds that
            an unused historical connection will be kept, or None.
          - `database_name`: name of this database in a multi-database setup
          - `databases`: mapping of database names to DB instances shared by
            a multi-database setup; None means start a fresh mapping
        """
        if isinstance(storage, basestring):
            from ZODB import FileStorage
            # Use the name actually imported above; the old spelling
            # ``ZODB.FileStorage.FileStorage`` only worked because the
            # from-import set the package attribute as a side effect.
            storage = FileStorage.FileStorage(storage)

        # Allocate lock.  _a/_r are bound methods used as shorthand
        # throughout this class.
        x = threading.RLock()
        self._a = x.acquire
        self._r = x.release

        # pools and cache sizes
        self.pool = ConnectionPool(pool_size)
        self.historical_pool = KeyedConnectionPool(historical_pool_size,
                                                   historical_timeout)
        self._cache_size = cache_size
        self._cache_size_bytes = cache_size_bytes
        self._historical_cache_size = historical_cache_size
        self._historical_cache_size_bytes = historical_cache_size_bytes

        # Setup storage
        self.storage = storage
        self.references = ZODB.serialize.referencesf
        try:
            storage.registerDB(self)
        except TypeError:
            storage.registerDB(self, None) # Backward compat

        if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly():
            warnings.warn(
                "Storage doesn't have a tpc_vote and this violates "
                "the storage API. Violently monkeypatching in a do-nothing "
                "tpc_vote.",
                DeprecationWarning, 2)
            storage.tpc_vote = lambda *args: None

        try:
            storage.load(z64, '')
        except KeyError:
            # Create the database's root in the storage if it doesn't exist
            from persistent.mapping import PersistentMapping
            root = PersistentMapping()
            # Manually create a pickle for the root to put in the storage.
            # The pickle must be in the special ZODB format.
            file = cStringIO.StringIO()
            p = cPickle.Pickler(file, 1)
            p.dump((root.__class__, None))
            p.dump(root.__getstate__())
            t = transaction.Transaction()
            t.description = 'initial database creation'
            storage.tpc_begin(t)
            storage.store(z64, None, file.getvalue(), '', t)
            storage.tpc_vote(t)
            storage.tpc_finish(t)

        # Multi-database setup.
        if databases is None:
            databases = {}
        self.databases = databases
        self.database_name = database_name
        if database_name in databases:
            raise ValueError("database_name %r already in databases" %
                             database_name)
        databases[database_name] = self

        self._setupUndoMethods()
        self.history = storage.history
Jim Fulton's avatar
Jim Fulton committed
459

460
    def _setupUndoMethods(self):
        """Expose the storage's undo API on this DB, or stubs if absent."""
        storage = self.storage
        supports = getattr(storage, 'supportsUndo', None)
        if supports is None:
            supports = lambda: False
        self.supportsUndo = supports

        if supports():
            self.undoLog = storage.undoLog
            if hasattr(storage, 'undoInfo'):
                self.undoInfo = storage.undoInfo
        else:
            # Undo is unsupported: the logs report nothing and undo() fails.
            def _empty_log(*args, **kw):
                return ()
            self.undoLog = self.undoInfo = _empty_log

            def undo(*args, **kw):
                raise NotImplementedError
            self.undo = undo

477 478 479 480
    @property
    def _storage(self):      # Backward compatibility
        """Alias for ``self.storage``, kept for backward compatibility."""
        return self.storage

481
    # This is called by Connection.close().
    def _returnToPool(self, connection):
        """Return a connection to the pool.

        connection._db must be self on entry.
        """

        self._a()
        try:
            assert connection._db is self
            # Mark the connection as closed; open() records the open time
            # here (see connectionDebugInfo).
            connection._opened = None

            # Let the activity monitor, if any, account for the close.
            am = self._activity_monitor
            if am is not None:
                am.closedConnection(connection)

            # Historical connections go back to the keyed pool under their
            # "before" TID; regular connections to the main pool.
            if connection.before:
                self.historical_pool.repush(connection, connection.before)
            else:
                self.pool.repush(connection)
        finally:
            self._r()
503

504 505
    def _connectionMap(self, f):
        """Call f(c) for all connections c in all pools, live and historical.

        The DB lock is held for the duration, so the pools cannot change
        while f is being applied.
        """
        self._a()
        try:
            self.pool.map(f)
            self.historical_pool.map(f)
        finally:
            self._r()
Jim Fulton's avatar
Jim Fulton committed
513 514 515 516

    def cacheDetail(self):
        """Return information on objects in the various caches.

        The result is a sorted list of (class-name, count) pairs,
        aggregated over every live connection's cache.
        """
        counts = {}

        def tally(conn):
            for oid, obj in conn._cache.items():
                module = getattr(obj.__class__, '__module__', '')
                prefix = module and '%s.' % module or ''
                klass = "%s%s" % (prefix, obj.__class__.__name__)
                counts[klass] = counts.get(klass, 0) + 1

        self._connectionMap(tally)
        return sorted(counts.items())

    def cacheExtremeDetail(self):
        """Return a per-object report of every cached object.

        Each entry is a dict with the connection number, oid, an 'id'
        guessed from the object's __dict__, the class name, an adjusted
        refcount, and the object's _p_changed state.
        """
        detail = []
        conn_no = [0]  # A mutable reference to a counter
        def f(con, detail=detail, rc=sys.getrefcount, conn_no=conn_no):
            conn_no[0] += 1
            cn = conn_no[0]
            for oid, ob in con._cache_items():
                id = ''
                # Best-effort human-readable identifier: prefer an 'id'
                # attribute, fall back to '__name__'.
                if hasattr(ob, '__dict__'):
                    d = ob.__dict__
                    if d.has_key('id'):
                        id = d['id']
                    elif d.has_key('__name__'):
                        id = d['__name__']

                module = getattr(ob.__class__, '__module__', '')
                module = module and ('%s.' % module) or ''

                # What refcount ('rc') should we return?  The intent is
                # that we return the true Python refcount, but as if the
                # cache didn't exist.  This routine adds 3 to the true
                # refcount:  1 for binding to name 'ob', another because
                # ob lives in the con._cache_items() list we're iterating
                # over, and calling sys.getrefcount(ob) boosts ob's
                # count by 1 too.  So the true refcount is 3 less than
                # sys.getrefcount(ob) returns.  But, in addition to that,
                # the cache holds an extra reference on non-ghost objects,
                # and we also want to pretend that doesn't exist.
                detail.append({
                    'conn_no': cn,
                    'oid': oid,
                    'id': id,
                    'klass': "%s%s" % (module, ob.__class__.__name__),
                    'rc': rc(ob) - 3 - (ob._p_changed is not None),
                    'state': ob._p_changed,
                    #'references': con.references(oid),
                    })

        self._connectionMap(f)
        return detail

577 578
    def cacheFullSweep(self):
        """Run a full garbage sweep over every connection's object cache."""
        def sweep(conn):
            conn._cache.full_sweep()
        self._connectionMap(sweep)
Jim Fulton's avatar
Jim Fulton committed
579 580

    def cacheLastGCTime(self):
        """Return the most recent cache GC time across all connections."""
        latest = [0]  # mutable cell so the closure can update it

        def track(conn):
            latest[0] = max(latest[0], conn._cache.cache_last_gc_time)

        self._connectionMap(track)
        return latest[0]

590 591
    def cacheMinimize(self):
        """Minimize (ghostify) every connection's object cache."""
        def minimize(conn):
            conn._cache.minimize()
        self._connectionMap(minimize)
Jim Fulton's avatar
Jim Fulton committed
592 593

    def cacheSize(self):
        """Return the total non-ghost object count over all caches."""
        total = [0]  # mutable cell so the closure can update it

        def accumulate(conn):
            total[0] += conn._cache.cache_non_ghost_count

        self._connectionMap(accumulate)
        return total[0]

601
    def cacheDetailSize(self):
        """Return per-connection cache statistics as a sorted list of dicts."""
        stats = []

        def gather(conn):
            stats.append({'connection': repr(conn),
                          'ngsize': conn._cache.cache_non_ghost_count,
                          'size': len(conn._cache)})

        self._connectionMap(gather)
        stats.sort()
        return stats

Jeremy Hylton's avatar
Jeremy Hylton committed
611
    def close(self):
        """Close the database and its underlying storage.

        It is important to close the database, because the storage may
        flush in-memory data structures to disk when it is closed.
        Leaving the storage open with the process exits can cause the
        next open to be slow.

        What effect does closing the database have on existing
        connections?  Technically, they remain open, but their storage
        is closed, so they stop behaving usefully.  Perhaps close()
        should also close all the Connections.
        """
        self.storage.close()
Jim Fulton's avatar
Jim Fulton committed
625

626 627
    def getCacheSize(self):
        """Return the per-connection target object-cache size."""
        return self._cache_size

    def getCacheSizeBytes(self):
        """Return the per-connection byte-based cache target (0 = unlimited)."""
        return self._cache_size_bytes

632
    def lastTransaction(self):
        """Return the storage's most recently committed transaction id."""
        return self.storage.lastTransaction()

    def getName(self):
        """Return the storage's name."""
        return self.storage.getName()
Jim Fulton's avatar
Jim Fulton committed
637

638
    def getPoolSize(self):
        """Return the expected maximum number of open connections."""
        return self.pool.size

    def getSize(self):
        """Return the size of the underlying storage."""
        return self.storage.getSize()
Jim Fulton's avatar
Jim Fulton committed
643

644
    def getHistoricalCacheSize(self):
        """Return the target object-cache size for historical connections."""
        return self._historical_cache_size

    def getHistoricalCacheSizeBytes(self):
        """Return the byte-based cache target for historical connections
        (0 means unlimited)."""
        return self._historical_cache_size_bytes

    def getHistoricalPoolSize(self):
        """Return the expected maximum number of historical connections."""
        return self.historical_pool.size

    def getHistoricalTimeout(self):
        """Return the minimum seconds an unused historical connection is
        kept, or None."""
        return self.historical_pool.timeout
655

656
    def invalidate(self, tid, oids, connection=None, version=''):
        """Invalidate references to a given oid.

        This is used to indicate that one of the connections has committed a
        change to the object.  The connection commiting the change should be
        passed in to prevent useless (but harmless) messages to the
        connection.
        """
        # Storages, esp. ZEO tests, need the version argument still. :-/
        assert version == ''

        def notify(conn):
            # Skip the connection that made the change.
            if conn is not connection:
                conn.invalidate(tid, oids)

        self._connectionMap(notify)
Jim Fulton's avatar
Jim Fulton committed
671

672 673
    def invalidateCache(self):
        """Invalidate the object cache of every live connection."""
        def drop(conn):
            conn.invalidateCache()
        self._connectionMap(drop)

677
    def objectCount(self):
        """Return the number of objects in the storage (its len())."""
        return len(self.storage)
679

680
    def open(self, transaction_manager=None, at=None, before=None):
        """Return a database Connection for use by application code.

        Note that the connection pool is managed as a stack, to
        increase the likelihood that the connection's stack will
        include useful objects.

        :Parameters:
          - `transaction_manager`: transaction manager to use.  None means
            use the default transaction manager.
          - `at`: a datetime.datetime or 8 character transaction id of the
            time to open the database with a read-only connection.  Passing
            both `at` and `before` raises a ValueError, and passing neither
            opens a standard writable transaction of the newest state.
            A timezone-naive datetime.datetime is treated as a UTC value.
          - `before`: like `at`, but opens the readonly state before the
            tid or datetime.
        """
        # `at` is normalized to `before`, since we use storage.loadBefore
        # as the underlying implementation of both.
        before = getTID(at, before)
        if (before is not None and
            before > self.lastTransaction() and
            before > getTID(self.lastTransaction(), None)):
            raise ValueError(
                'cannot open an historical connection in the future.')

        self._a()
        try:
            # result <- a connection
            if before is not None:
                # Historical connections are pooled per `before` TID.
                result = self.historical_pool.pop(before)
                if result is None:
                    # Register a fresh connection with the pool, then pop
                    # it back out so the pool's weak-set bookkeeping holds.
                    c = self.klass(self,
                                   self._historical_cache_size,
                                   before,
                                   self._historical_cache_size_bytes,
                                   )
                    self.historical_pool.push(c, before)
                    result = self.historical_pool.pop(before)
            else:
                result = self.pool.pop()
                if result is None:
                    c = self.klass(self,
                                   self._cache_size,
                                   None,
                                   self._cache_size_bytes,
                                   )
                    self.pool.push(c)
                    result = self.pool.pop()
            assert result is not None

            # open the connection.
            result.open(transaction_manager)

            # A good time to do some cache cleanup.
            # (note we already have the lock)
            self.pool.availableGC()
            self.historical_pool.availableGC()

            return result

        finally:
            self._r()
744 745

    def connectionDebugInfo(self):
        """Return a list of debug-info dicts, one per live connection.

        Each dict reports when the connection was opened (UTC, with how
        long ago in seconds), its debug info plus cache length, and its
        `before` TID (None for regular connections).
        """
        result = []
        now = time.time()

        def get_info(c):
            opened = c._opened
            info = c.getDebugInfo()
            if info:
                if len(info) == 1:
                    info = info[0]
            else:
                info = ''
            info = "%s (%s)" % (info, len(c._cache))

            # output UTC time with the standard Z time zone indicator
            result.append({
                'opened': opened and ("%s (%.2fs)" % (
                    time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(opened)),
                    now - opened)),
                'info': info,
                'before': c.before,
                })

        self._connectionMap(get_info)
        return result
771

772 773
    def getActivityMonitor(self):
        """Return the activity monitor registered with this database, if any."""
        return self._activity_monitor
774

775
    def pack(self, t=None, days=0):
        """Pack the storage, deleting unused object revisions.

        A pack is always performed relative to a particular time, by
        default the current time.  All object revisions that are not
        reachable as of the pack time are deleted from the storage.

        The cost of this operation varies by storage, but it is
        usually an expensive operation.

        There are two optional arguments that can be used to set the
        pack time: t, pack time in seconds since the epoch, and days,
        the number of days to subtract from t or from the current
        time if t is not specified.
        """
        if t is None:
            t = time.time()
        t -= days * 86400
        try:
            # self.references is the reference-extraction callable the
            # storage uses to trace object references while packing.
            self.storage.pack(t, self.references)
        except:
            # Log with traceback before re-raising, so a pack failure is
            # recorded even if the caller swallows the exception.  The
            # bare except is deliberate: the error always propagates.
            logger.error("packing", exc_info=True)
            raise
798

799 800
    def setActivityMonitor(self, am):
        """Register `am` as this database's activity monitor."""
        self._activity_monitor = am
801

802
    def classFactory(self, connection, modulename, globalname):
        """Resolve a pickled global reference to an object.

        Zope will rebind this method to arbitrary user code at runtime;
        the default simply delegates to find_global.
        """
        return find_global(modulename, globalname)
805

Tim Peters's avatar
Brrr.  
Tim Peters committed
806
    def setCacheSize(self, size):
        """Set the object-cache size for all regular (non-historical) connections."""
        self._a()
        try:
            self._cache_size = size
            def apply_size(conn):
                conn._cache.cache_size = size
            self.pool.map(apply_size)
        finally:
            self._r()
815

816 817 818 819 820 821 822 823 824 825
    def setCacheSizeBytes(self, size):
        """Set the byte-based object-cache limit for all regular connections."""
        self._a()
        try:
            self._cache_size_bytes = size
            def apply_size(conn):
                conn._cache.cache_size_bytes = size
            self.pool.map(apply_size)
        finally:
            self._r()

826
    def setHistoricalCacheSize(self, size):
        """Set the object-cache size for all historical connections."""
        self._a()
        try:
            self._historical_cache_size = size
            def apply_size(conn):
                conn._cache.cache_size = size
            self.historical_pool.map(apply_size)
        finally:
            self._r()

836 837 838 839 840 841 842 843 844 845
    def setHistoricalCacheSizeBytes(self, size):
        """Set the byte-based object-cache limit for all historical connections."""
        self._a()
        try:
            self._historical_cache_size_bytes = size
            def apply_size(conn):
                conn._cache.cache_size_bytes = size
            self.historical_pool.map(apply_size)
        finally:
            self._r()

846
    def setPoolSize(self, size):
        """Set the size of the regular connection pool."""
        self._a()
        try:
            self.pool.size = size
        finally:
            self._r()
852

853
    def setHistoricalPoolSize(self, size):
        """Set the size of the historical connection pool."""
        self._a()
        try:
            self.historical_pool.size = size
        finally:
            self._r()
Jim Fulton's avatar
Jim Fulton committed
859

860 861 862
    def setHistoricalTimeout(self, timeout):
        """Set the historical connection pool's timeout value."""
        self._a()
        try:
            self.historical_pool.timeout = timeout
        finally:
            self._r()

867
    def undo(self, id, txn=None):
        """Undo a transaction identified by id.

        A transaction can be undone if all of the objects involved in
        the transaction were not modified subsequently, if any
        modifications can be resolved by conflict resolution, or if
        subsequent changes resulted in the same object state.

        The value of id should be generated by calling undoLog()
        or undoInfo().  The value of id is not the same as a
        transaction id used by other methods; it is unique to undo().

        :Parameters:
          - `id`: a storage-specific transaction identifier
          - `txn`: transaction context to use for undo().
            By default, uses the current transaction.
        """
        if txn is None:
            txn = transaction.get()
        # The actual undo work is deferred to two-phase commit: the
        # TransactionalUndo resource manager calls storage.undo() when
        # the enclosing transaction commits.
        txn.register(TransactionalUndo(self, id))
887

888

889 890 891
# Module-level counter (and its guarding lock) used to stamp each
# ResourceManager with a unique, monotonically increasing creation
# number; ResourceManager.__init__ may run from multiple threads.
resource_counter_lock = threading.Lock()
resource_counter = 0

892
class ResourceManager(object):
    """Transaction participation for an undo resource."""

    # XXX This implementation is broken.  Subclasses invalidate oids
    # in their commit calls. Invalidations should not be sent until
    # tpc_finish is called.  In fact, invalidations should be sent to
    # the db *while* tpc_finish is being called on the storage.

    def __init__(self, db):
        self._db = db
        # The two-phase-commit vote/finish/abort methods are delegated
        # directly to the underlying storage.
        storage = self._db.storage
        self.tpc_vote = storage.tpc_vote
        self.tpc_finish = storage.tpc_finish
        self.tpc_abort = storage.tpc_abort

        # Stamp this instance with the next value of a simple
        # thread-safe counter.  sortKey() incorporates the stamp, so
        # multiple ResourceManagers in one transaction commit in a
        # predictable (creation) order.
        global resource_counter
        resource_counter_lock.acquire()
        try:
            self._count = resource_counter
            resource_counter += 1
        finally:
            resource_counter_lock.release()

    def sortKey(self):
        # Storage sort key first, then the creation stamp as a fixed
        # width hex suffix so string comparison matches numeric order.
        return "%s:%016x" % (self._db.storage.sortKey(), self._count)

    def tpc_begin(self, txn, sub=False):
        # Sub-transactions are not supported by this resource manager.
        if sub:
            raise ValueError("doesn't support sub-transactions")
        self._db.storage.tpc_begin(txn)

    # The object registers itself with the txn manager, so the ob
    # argument to the methods below is self.

    def abort(self, obj, txn):
        raise NotImplementedError

    def commit(self, obj, txn):
        raise NotImplementedError
935

936
class TransactionalUndo(ResourceManager):
    """Resource manager that undoes one stored transaction during 2PC.

    The storage's undo() is invoked from commit(); the surrounding
    two-phase-commit plumbing comes from ResourceManager.
    """

    def __init__(self, db, tid):
        super(TransactionalUndo, self).__init__(db)
        # `tid` is the storage-specific undo identifier (as produced by
        # undoLog()/undoInfo()), not necessarily a transaction id.
        self._tid = tid

    def commit(self, ob, t):
        # XXX see XXX in ResourceManager
        tid, oids = self._db.storage.undo(self._tid, t)
        self._db.invalidate(tid, dict.fromkeys(oids, 1))