##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Database connection support

$Id$"""

import logging
import sys
import tempfile
import threading
import warnings
import os
import time

from persistent import PickleCache

# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer
from zope.interface import implementer

import transaction

import ZODB
from ZODB.blob import SAVEPOINT_SUFFIX
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.POSException import Unsupported, ReadOnlyHistoryError
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils
import six

global_reset_counter = 0


def resetCaches():
    """Causes all connection caches to be reset as connections are reopened.

    Zope's refresh feature uses this.  When you reload Python modules,
    instances of classes continue to use the old class definitions.
    To use the new code immediately, the refresh feature asks ZODB to
    clear caches by calling resetCaches().  When the instances are
    loaded by subsequent connections, they will use the new class
    definitions.
    """
    global global_reset_counter
    global_reset_counter += 1


def className(obj):
    cls = type(obj)
    return "%s.%s" % (cls.__module__, cls.__name__)


@implementer(IConnection,
             ISavepointDataManager,
             IPersistentDataManager,
             ISynchronizer)
class Connection(ExportImport, object):
    """Connection to ZODB for loading and storing objects."""

    _code_timestamp = 0

    ##########################################################################
    # Connection methods, ZODB.IConnection

    def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
        """Create a new Connection."""

        self._log = logging.getLogger('ZODB.Connection')
        self._debug_info = ()

        self._db = db
        self.large_record_size = db.large_record_size

        # historical connection
        self.before = before

        # Multi-database support
        self.connections = {self._db.database_name: self}

        storage = db.storage
        if IMVCCStorage.providedBy(storage):
            # Use a connection-specific storage instance.
            self._mvcc_storage = True
            storage = storage.new_instance()
        else:
            self._mvcc_storage = False

        self._normal_storage = self._storage = storage
        self.new_oid = db.new_oid
        self._savepoint_storage = None

        # Do we need to join a txn manager?
        self._needs_to_join = True
        self.transaction_manager = None
        self.opened = None  # time.time() when DB.open() opened us

        self._reset_counter = global_reset_counter
        self._load_count = 0   # Number of objects unghosted
        self._store_count = 0  # Number of objects stored

        # Cache which can ghostify (forget the state of) objects not
        # recently used. Its API is roughly that of a dict, with
        # additional gc-related and invalidation-related methods.
        self._cache = PickleCache(self, cache_size, cache_size_bytes)

        # The pre-cache is used by get to avoid infinite loops when
        # objects immediately load their state when they get their
        # persistent data set.
        self._pre_cache = {}

        # List of all objects (not oids) registered as modified by the
        # persistence machinery, or by add(), or whose access caused a
        # ReadConflictError (just to be able to clean them up from the
        # cache on abort with the other modified objects). All objects
        # in this list are either in _cache or in _added.
        self._registered_objects = []

        # ids and serials of objects for which readCurrent was called
        # in a transaction.
        self._readCurrent = {}

        # Dict of oid->obj added explicitly through add(). Used as a
        # preliminary cache until commit time when objects are all moved
        # to the real _cache. The objects are moved to _creating at
        # commit time.
        self._added = {}

        # During commit this is turned into a list, which receives
        # objects added as a side-effect of storing a modified object.
        self._added_during_commit = None

        # During commit, all objects go to either _modified or _creating:

        # Dict of oid->flag of new objects (without serial), either
        # added by add() or implicitly added (discovered by the
        # serializer during commit). The flag is True for implicit
        # adding. Used during abort to remove created objects from the
        # _cache, and by persistent_id to check that a new object isn't
        # reachable from multiple databases.
        self._creating = {}

        # List of oids of modified objects, which have to be invalidated
        # in the cache on abort and in other connections on finish.
        self._modified = []

        # _invalidated queues invalidate messages delivered from the DB.
        # _inv_lock prevents one thread from modifying the set while
        # another is processing invalidations.  All the invalidations
        # from a single transaction should be applied atomically, so
        # the lock must be held when reading _invalidated.

        # It sucks that we have to hold the lock to read _invalidated.
        # Normally, _invalidated is written by calling dict.update, which
        # will execute atomically by virtue of the GIL.  But some storage
        # might generate oids where hash or compare invokes Python code.  In
        # that case, the GIL can't save us.
        # Note:  since that was written, it was officially declared that the
        # type of an oid is str.  TODO:  remove the related now-unnecessary
        # critical sections (if any -- this needs careful thought).

        self._inv_lock = threading.Lock()
        self._invalidated = set()

        # Flag indicating whether the cache has been invalidated:
        self._invalidatedCache = False

        # We intend to prevent committing a transaction in which
        # ReadConflictError occurs.  _conflicts is the set of oids that
        # experienced ReadConflictError.  Any time we raise ReadConflictError,
        # the oid should be added to this set, and we should be sure that the
        # object is registered.  Because it's registered, Connection.commit()
        # will raise ReadConflictError again (because the oid is in
        # _conflicts).
        self._conflicts = {}

        # _txn_time stores the upper bound on transactions visible to
        # this connection. That is, all object revisions must be
        # written before _txn_time. If it is None, then the current
        # revisions are acceptable.
        self._txn_time = None

        # To support importFile(), implemented in the ExportImport base
        # class, we need to run _importDuringCommit() from our commit()
        # method.  If _import is not None, it is a two-tuple of arguments
        # to pass to _importDuringCommit().
        self._import = None

        self._reader = ObjectReader(self, self._cache, self._db.classFactory)

    def add(self, obj):
        """Add a new object 'obj' to the database and assign it an oid."""
        if self.opened is None:
            raise ConnectionStateError("The database connection is closed")

        marker = object()
        oid = getattr(obj, "_p_oid", marker)
        if oid is marker:
            raise TypeError("Only first-class persistent objects may be"
                            " added to a Connection.", obj)
        elif obj._p_jar is None:
            assert obj._p_oid is None
            oid = obj._p_oid = self.new_oid()
            obj._p_jar = self
            if self._added_during_commit is not None:
                self._added_during_commit.append(obj)
            self._register(obj)
            # Add to _added after calling register(), so that _added
            # can be used as a test for whether the object has been
            # registered with the transaction.
            self._added[oid] = obj
        elif obj._p_jar is not self:
            raise InvalidObjectReference(obj, obj._p_jar)
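
    # A minimal usage sketch for add() (illustrative only; ``MyThing`` is
    # a hypothetical persistent class, and ``conn`` an open connection):
    #
    #     obj = MyThing()
    #     conn.add(obj)               # assigns obj._p_oid, sets obj._p_jar
    #     conn.root()['thing'] = obj
    #     transaction.commit()        # the object is actually stored here
    #
    # add() is handy when an oid is needed before commit; objects merely
    # reachable from the root are added implicitly at commit time.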

    def get(self, oid):
        """Return the persistent object with oid 'oid'."""
        if self.opened is None:
            raise ConnectionStateError("The database connection is closed")

        obj = self._cache.get(oid, None)
        if obj is not None:
            return obj
        obj = self._added.get(oid, None)
        if obj is not None:
            return obj
        obj = self._pre_cache.get(oid, None)
        if obj is not None:
            return obj

        p, serial = self._storage.load(oid, '')
        obj = self._reader.getGhost(p)

        # Avoid an infinite loop if obj tries to load its state before
        # it is added to the cache and its state refers to it.
        # (This will typically be the case for non-ghostifyable objects,
        # like persistent caches.)
        self._pre_cache[oid] = obj
        self._cache.new_ghost(oid, obj)
        self._pre_cache.pop(oid)
        return obj

    def cacheMinimize(self):
        """Deactivate all unmodified objects in the cache.
        """
        for connection in six.itervalues(self.connections):
            connection._cache.minimize()

    # TODO: we should test what happens when cacheGC is called mid-transaction.
    def cacheGC(self):
        """Reduce cache size to target size.
        """
        for connection in six.itervalues(self.connections):
            connection._cache.incrgc()

    __onCloseCallbacks = None
    def onCloseCallback(self, f):
        """Register a callable, f, to be called by close()."""
        if self.__onCloseCallbacks is None:
            self.__onCloseCallbacks = []
        self.__onCloseCallbacks.append(f)
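
    # Usage sketch for onCloseCallback() (illustrative; the callback is
    # hypothetical):
    #
    #     def release_session():
    #         ...   # e.g. drop an external resource tied to this connection
    #
    #     conn.onCloseCallback(release_session)  # runs when close() is called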

    def close(self, primary=True):
        """Close the Connection."""
        if not self._needs_to_join:
            # We're currently joined to a transaction.
            raise ConnectionStateError("Cannot close a connection joined to "
                                       "a transaction")

        if self._cache is not None:
            self._cache.incrgc()  # This is a good time to do some GC

        # Call the close callbacks.
        if self.__onCloseCallbacks is not None:
            for f in self.__onCloseCallbacks:
                try:
                    f()
                except:  # except what?
                    f = getattr(f, 'im_self', f)
                    self._log.exception("Close callback failed for %s", f)
            self.__onCloseCallbacks = None

        self._debug_info = ()

        if self.opened:
            self.transaction_manager.unregisterSynch(self)

        if self._mvcc_storage:
            self._storage.sync(force=False)

        if primary:
            for connection in self.connections.values():
                if connection is not self:
                    connection.close(False)

            # Return the connection to the pool.
            if self.opened is not None:
                self._db._returnToPool(self)

                # _returnToPool() set self.opened to None.
                # However, we can't assert that here, because self may
                # have been reused (by another thread) by the time we
                # get back here.
        else:
            self.opened = None

        am = self._db._activity_monitor
        if am is not None:
            am.closedConnection(self)

    def db(self):
        """Returns a handle to the database this connection belongs to."""
        return self._db

    def isReadOnly(self):
        """Returns True if this connection is read only."""
        if self.opened is None:
            raise ConnectionStateError("The database connection is closed")
        return self.before is not None or self._storage.isReadOnly()

    def invalidate(self, tid, oids):
        """Notify the Connection that transaction 'tid' invalidated oids."""
        if self.before is not None:
            # This is a historical connection.  Invalidations are irrelevant.
            return
        self._inv_lock.acquire()
        try:
            if self._txn_time is None:
                self._txn_time = tid
            elif (tid is not None) and (tid < self._txn_time):
                raise AssertionError("invalidations out of order, %r < %r"
                                     % (tid, self._txn_time))

            self._invalidated.update(oids)
        finally:
            self._inv_lock.release()

    def invalidateCache(self):
        self._inv_lock.acquire()
        try:
            self._invalidatedCache = True
        finally:
            self._inv_lock.release()
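
    # Historical-connection sketch: when ``before`` is set (e.g. via
    # ``db.open(before=tid)``), the connection offers a read-only view as
    # of that transaction id and, as invalidate() above shows,
    # invalidations are simply ignored.  Illustrative only:
    #
    #     conn = db.open(before=old_tid)  # old_tid: some earlier tid
    #     conn.isReadOnly()               # True for historical connections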

    @property
    def root(self):
        """Return the database root object."""
        return RootConvenience(self.get(z64))
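
    # The root property wraps the root mapping in a RootConvenience
    # (defined at the end of this module), so both spellings below work.
    # Illustrative only; 'app' is a hypothetical key:
    #
    #     conn.root()['app'] = app   # mapping style, via __call__
    #     conn.root.app = app        # attribute style, via the wrapper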

    def get_connection(self, database_name):
        """Return a Connection for the named database."""
        connection = self.connections.get(database_name)
        if connection is None:
            new_con = self._db.databases[database_name].open(
                transaction_manager=self.transaction_manager,
                before=self.before,
                )
            self.connections.update(new_con.connections)
            new_con.connections = self.connections
            connection = new_con
        return connection
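
    # Multi-database sketch: the returned connection shares this one's
    # transaction manager, so one commit spans both databases.
    # Illustrative; 'catalog' is a hypothetical name in db.databases:
    #
    #     other = conn.get_connection('catalog')
    #     other.root()['index'] = index
    #     transaction.commit()   # commits changes in both databases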

    def _implicitlyAdding(self, oid):
        """Are we implicitly adding an object within the current transaction?

        This is used in a check to avoid implicitly adding an object
        to a database in a multi-database situation.
        See serialize.ObjectWriter.persistent_id.

        """
        return (self._creating.get(oid, 0)
                or
                ((self._savepoint_storage is not None)
                 and
                 self._savepoint_storage.creating.get(oid, 0)
                 )
                )

    def sync(self):
        """Manually update the view on the database."""
        self.transaction_manager.abort()
        self._storage_sync()

    def getDebugInfo(self):
        """Returns a tuple with different items for debugging the
        connection.
        """
        return self._debug_info

    def setDebugInfo(self, *args):
        """Add the given items to the debug information of this connection."""
        self._debug_info = self._debug_info + args

    def getTransferCounts(self, clear=False):
        """Returns the number of objects loaded and stored."""
        res = self._load_count, self._store_count
        if clear:
            self._load_count = 0
            self._store_count = 0
        return res

    # Connection methods
    ##########################################################################

    ##########################################################################
    # Data manager (ISavepointDataManager) methods

    def abort(self, transaction):
        """Abort a transaction and forget all changes."""

        # The order is important here.  We want to abort registered
        # objects before we process the cache.  Otherwise, we may un-add
        # objects added in savepoints.  If they've been modified since
        # the savepoint, then they won't have _p_oid or _p_jar after
        # they've been unadded. This will make the code in _abort
        # confused.

        self._abort()

        if self._savepoint_storage is not None:
            self._abort_savepoint()

        self._invalidate_creating()
        self._tpc_cleanup()

    def _abort(self):
        """Abort a transaction and forget all changes."""

        for obj in self._registered_objects:
            oid = obj._p_oid
            assert oid is not None
            if oid in self._added:
                del self._added[oid]
                if self._cache.get(oid) is not None:
                    del self._cache[oid]
                del obj._p_jar
                del obj._p_oid
                if obj._p_changed:
                    obj._p_changed = False
            else:

                # Note: If we invalidate a non-ghostifiable object
                # (i.e. a persistent class), the object will
                # immediately reread its state.  That means that the
                # following call could result in a call to
                # self.setstate, which, of course, must succeed.
                # In general, it would be better if the read could be
                # delayed until the start of the next transaction.  If
                # we read at the end of a transaction and if the
                # object was invalidated during this transaction, then
                # we'll read non-current data, which we'll discard
                # later in transaction finalization.  Unfortunately, we
                # can only delay the read if this abort corresponds to
                # a top-level-transaction abort.  We can't tell if
                # this is a top-level-transaction abort, so we have to
                # go ahead and invalidate now.  Fortunately, it's
                # pretty unlikely that the object we are invalidating
                # was invalidated by another thread, so the risk of a
                # reread is pretty low.

                self._cache.invalidate(oid)

    def _tpc_cleanup(self):
        """Performs cleanup operations to support tpc_finish and tpc_abort."""
        self._conflicts.clear()
        self._needs_to_join = True
        self._registered_objects = []
        self._creating.clear()

    # Process pending invalidations.
    def _flush_invalidations(self):
        if self._mvcc_storage:
            # Poll the storage for invalidations.
            invalidated = self._storage.poll_invalidations()
            if invalidated is None:
                # special value: the transaction is so old that
                # we need to flush the whole cache.
                self._cache.invalidate(list(self._cache.cache_data.keys()))
            elif invalidated:
                self._cache.invalidate(invalidated)

        self._inv_lock.acquire()
        try:
            # Non-ghostifiable objects may need to read when they are
            # invalidated, so we'll quickly just replace the
            # invalidating dict with a new one.  We'll then process
            # the invalidations after freeing the lock *and* after
            # resetting the time.  This means that invalidations will
            # happen after the start of the transactions.  They are
            # subject to conflict errors and to reading old data.

            # TODO: There is a potential problem lurking for persistent
            # classes.  Suppose we have an invalidation of a persistent
            # class and of an instance.  If the instance is
            # invalidated first and if the invalidation logic uses
            # data read from the class, then the invalidation could
            # be performed with stale data.  Or, suppose that there
            # are instances of the class that are freed as a result of
            # invalidating some object.  Perhaps code in their __del__
            # uses class data.  Really, the only way to properly fix
            # this is to, in fact, make classes ghostifiable.  Then
            # we'd have to reimplement attribute lookup to check the
            # class state and, if necessary, activate the class.  It's
            # much worse than that though, because we'd also need to
            # deal with slots.  When a class is ghostified, we'd need
            # to replace all of the slot operations with versions that
            # reloaded the object when called. It's hard to say which
            # is better or worse.  For now, the risk of using a class
            # while objects are being invalidated seems small enough
            # to be acceptable.

            invalidated = dict.fromkeys(self._invalidated)
            self._invalidated = set()
            self._txn_time = None
            if self._invalidatedCache:
                self._invalidatedCache = False
                invalidated = self._cache.cache_data.copy()
        finally:
            self._inv_lock.release()

        self._cache.invalidate(invalidated)

        # Now is a good time to collect some garbage.
        self._cache.incrgc()

    def tpc_begin(self, transaction):
        """Begin commit of a transaction, starting the two-phase commit."""
        self._modified = []

        # _creating is a list of oids of new objects, which is used to
        # remove them from the cache if a transaction aborts.
        self._creating.clear()
        self._normal_storage.tpc_begin(transaction)

    def commit(self, transaction):
        """Commit changes to an object"""

        if self._savepoint_storage is not None:

            # We first checkpoint the current changes to the savepoint
            self.savepoint()

            # then commit all of the savepoint changes at once
            self._commit_savepoint(transaction)

            # No need to call _commit since savepoint did.

        else:
            self._commit(transaction)

        for oid, serial in six.iteritems(self._readCurrent):
            try:
                self._storage.checkCurrentSerialInTransaction(
                    oid, serial, transaction)
            except ConflictError:
                self._cache.invalidate(oid)
                raise

    def _commit(self, transaction):
        """Commit changes to an object"""

        if self.before is not None:
            raise ReadOnlyHistoryError()

        if self._import:
            # We are importing an export file. We always do this
            # while making a savepoint so we can copy export data
            # directly to our storage, typically a TmpStore.
            self._importDuringCommit(transaction, *self._import)
            self._import = None

        # Just in case an object is added as a side-effect of storing
        # a modified object.  If, for example, a __getstate__() method
        # calls add(), the newly added objects will show up in
        # _added_during_commit.  This sounds insane, but has actually
        # happened.

        self._added_during_commit = []

        if self._invalidatedCache:
            raise ConflictError()

        for obj in self._registered_objects:
            oid = obj._p_oid
            assert oid
            if oid in self._conflicts:
                raise ReadConflictError(object=obj)

            if obj._p_jar is not self:
                raise InvalidObjectReference(obj, obj._p_jar)
            elif oid in self._added:
                assert obj._p_serial == z64
            elif obj._p_changed:
                if oid in self._invalidated:
                    resolve = getattr(obj, "_p_resolveConflict", None)
                    if resolve is None:
                        raise ConflictError(object=obj)
                self._modified.append(oid)
            else:
                # Nothing to do.  It's been said that it's legal, e.g., for
                # an object to set _p_changed to false after it's been
                # changed and registered.
                continue

            self._store_objects(ObjectWriter(obj), transaction)

        for obj in self._added_during_commit:
            self._store_objects(ObjectWriter(obj), transaction)
        self._added_during_commit = None

    def _store_objects(self, writer, transaction):
        for obj in writer:
            oid = obj._p_oid
            serial = getattr(obj, "_p_serial", z64)

            if ((serial == z64)
                and
                ((self._savepoint_storage is None)
                 or (oid not in self._savepoint_storage.creating)
                 or self._savepoint_storage.creating[oid]
                 )
                ):

                # obj is a new object

                # Because obj was added, it is now in _creating, so it
                # can be removed from _added.  If oid wasn't in
                # adding, then we are adding it implicitly.

                implicitly_adding = self._added.pop(oid, None) is None

                self._creating[oid] = implicitly_adding

            else:
                if (oid in self._invalidated
                    and not hasattr(obj, '_p_resolveConflict')):
                    raise ConflictError(object=obj)
                self._modified.append(oid)

            p = writer.serialize(obj)  # This calls __getstate__ of obj
            if len(p) >= self.large_record_size:
                warnings.warn(large_object_message % (obj.__class__, len(p)))

            if isinstance(obj, Blob):
                if not IBlobStorage.providedBy(self._storage):
                    raise Unsupported(
                        "Storing Blobs in %s is not supported." %
                        repr(self._storage))
                if obj.opened():
                    raise ValueError("Can't commit with opened blobs.")
                blobfilename = obj._uncommitted()
                if blobfilename is None:
                    assert serial is not None  # See _uncommitted
                    self._modified.pop()  # not modified
                    continue
                s = self._storage.storeBlob(oid, serial, p, blobfilename,
                                            '', transaction)
                # We invalidate the object here in order to ensure that
                # the next attribute access of its name unghostifies it,
                # which will cause its blob data to be reattached
                # "cleanly".
                obj._p_invalidate()
            else:
                s = self._storage.store(oid, serial, p, '', transaction)

            self._store_count += 1
            # Put the object in the cache before handling the
            # response, just in case the response contains the
            # serial number for a newly created object
            try:
                self._cache[oid] = obj
            except:
                # Dang, I bet it's wrapped:
                # TODO:  Deprecate, then remove, this.
                if hasattr(obj, 'aq_base'):
                    self._cache[oid] = obj.aq_base
                else:
                    raise

            self._cache.update_object_size_estimation(oid, len(p))
            obj._p_estimated_size = len(p)

            self._handle_serial(oid, s)

    def _handle_serial(self, oid, serial, change=True):

        # if we write an object, we don't want to check if it was read
        # while current.  This is a convenient choke point to do this.
        self._readCurrent.pop(oid, None)

        if not serial:
            return
        if not isinstance(serial, bytes):
            raise serial
        obj = self._cache.get(oid, None)
        if obj is None:
            return
        if serial == ResolvedSerial:
            del obj._p_changed  # transition from changed to ghost
        else:
            if change:
                obj._p_changed = 0  # transition from changed to up-to-date
            obj._p_serial = serial

    def tpc_abort(self, transaction):
        if self._import:
            self._import = None

        if self._savepoint_storage is not None:
            self._abort_savepoint()

        self._storage.tpc_abort(transaction)

        # Note: If we invalidate a non-ghostifiable object (i.e. a
        # persistent class), the object will immediately reread its
        # state.  That means that the following call could result in a
        # call to self.setstate, which, of course, must succeed.  In
        # general, it would be better if the read could be delayed
        # until the start of the next transaction.  If we read at the
        # end of a transaction and if the object was invalidated
        # during this transaction, then we'll read non-current data,
        # which we'll discard later in transaction finalization.  We
        # could, theoretically queue this invalidation by calling
        # self.invalidate.  Unfortunately, attempts to make that
        # change resulted in mysterious test failures.  It's pretty
        # unlikely that the object we are invalidating was invalidated
        # by another thread, so the risk of a reread is pretty low.
        # It's really not worth the effort to pursue this.

        self._cache.invalidate(self._modified)
        self._invalidate_creating()
        while self._added:
            oid, obj = self._added.popitem()
            if obj._p_changed:
                obj._p_changed = False
            del obj._p_oid
            del obj._p_jar
        self._tpc_cleanup()

    def _invalidate_creating(self, creating=None):
        """Disown any objects newly saved in an uncommitted transaction."""
        if creating is None:
            creating = self._creating
            self._creating = {}

        for oid in creating:
            o = self._cache.get(oid)
            if o is not None:
                del self._cache[oid]
                if o._p_changed:
                    o._p_changed = False
                del o._p_jar
                del o._p_oid


    def tpc_vote(self, transaction):
        """Verify that a data manager can commit the transaction."""
        try:
            vote = self._storage.tpc_vote
        except AttributeError:
            return
        try:
            s = vote(transaction)
        except ReadConflictError as v:
            if v.oid:
                self._cache.invalidate(v.oid)
            raise

        if s:
            for oid, serial in s:
                self._handle_serial(oid, serial)

    def tpc_finish(self, transaction):
        """Indicate confirmation that the transaction is done."""

        def callback(tid):
            if self._mvcc_storage:
                # Inter-connection invalidation is not needed when the
                # storage provides MVCC.
                return
            d = dict.fromkeys(self._modified)
            self._db.invalidate(tid, d, self)

        # It's important that the storage calls the passed function
        # while it still has its lock.  We don't want another thread
        # to be able to read any updated data until we've had a chance
        # to send an invalidation message to all of the other
        # connections!
        self._storage.tpc_finish(transaction, callback)
        self._tpc_cleanup()
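
    # For orientation, the transaction machinery drives the data-manager
    # methods above in roughly this order (a sketch; applications don't
    # call these directly):
    #
    #     conn.tpc_begin(txn)
    #     conn.commit(txn)      # serialize and store modified objects
    #     conn.tpc_vote(txn)    # storage may hand back resolved serials
    #     conn.tpc_finish(txn)  # or conn.tpc_abort(txn) on failure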

    def sortKey(self):
        """Return a consistent sort key for this connection."""
        return "%s:%s" % (self._storage.sortKey(), id(self))

    # Data manager (ISavepointDataManager) methods
    ##########################################################################

    ##########################################################################
    # Transaction-manager synchronization -- ISynchronizer

    def beforeCompletion(self, txn):
        # We don't do anything before a commit starts.
        pass

    # Call the underlying storage's sync() method (if any), and process
    # pending invalidations regardless.  Of course this should only be
    # called at transaction boundaries.
    def _storage_sync(self, *ignored):
        self._readCurrent.clear()
        sync = getattr(self._storage, 'sync', 0)
        if sync:
            sync()
        self._flush_invalidations()

    afterCompletion = _storage_sync
    newTransaction = _storage_sync

    # Transaction-manager synchronization -- ISynchronizer
    ##########################################################################

    ##########################################################################
    # persistent.interfaces.IPersistentDatamanager

    def oldstate(self, obj, tid):
        """Return copy of 'obj' that was written by transaction 'tid'."""
        assert obj._p_jar is self
        p = self._storage.loadSerial(obj._p_oid, tid)
        return self._reader.getState(p)

    def setstate(self, obj):
        """Turns the ghost 'obj' into a real object by loading its state from
        the database."""
        oid = obj._p_oid

        if self.opened is None:
            msg = ("Shouldn't load state for %s %s "
                   "when the connection is closed"
                   % (className(obj), oid_repr(oid)))
            try:
                raise ConnectionStateError(msg)
            except:
                self._log.exception(msg)
                raise

        try:
            self._setstate(obj)
        except ConflictError:
            raise
        except:
            self._log.exception("Couldn't load state for %s %s",
                                className(obj), oid_repr(oid))
            raise

    def _setstate(self, obj):
        # Helper for setstate(), which provides logging of failures.

        # The control flow is complicated here to avoid loading an
        # object revision that we are sure we aren't going to use.  As
        # a result, invalidation tests occur before and after the
        # load.  We can only be sure about invalidations after the
        # load.

        # If an object has been invalidated, among the cases to consider:
        # - Try MVCC
        # - Raise ConflictError.

        if self.before is not None:
            # Load data that was current before the time we have.
            before = self.before
            t = self._storage.loadBefore(obj._p_oid, before)
            if t is None:
                raise POSKeyError()  # historical connection!
            p, serial, end = t

        else:
            # There is a harmless data race with self._invalidated.  A
            # dict update could go on in another thread, but we don't care
            # because we have to check again after the load anyway.

            if self._invalidatedCache:
                raise ReadConflictError()

            if (obj._p_oid in self._invalidated):
                self._load_before_or_conflict(obj)
                return

            p, serial = self._storage.load(obj._p_oid, '')
            self._load_count += 1

            self._inv_lock.acquire()
            try:
                invalid = obj._p_oid in self._invalidated
            finally:
                self._inv_lock.release()

            if invalid:
                self._load_before_or_conflict(obj)
                return

        self._reader.setGhostState(obj, p)
        obj._p_serial = serial
        self._cache.update_object_size_estimation(obj._p_oid, len(p))
        obj._p_estimated_size = len(p)

        # Blob support
        if isinstance(obj, Blob):
            obj._p_blob_uncommitted = None
            obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, serial)

    def _load_before_or_conflict(self, obj):
        """Load non-current state for obj or raise ReadConflictError."""
        if not self._setstate_noncurrent(obj):
            self._register(obj)
            self._conflicts[obj._p_oid] = True
            raise ReadConflictError(object=obj)

    def _setstate_noncurrent(self, obj):
        """Set state using non-current data.

        Return True if state was available, False if not.
        """
        try:
            # Load data that was current before the commit at txn_time.
            t = self._storage.loadBefore(obj._p_oid, self._txn_time)
        except KeyError:
            return False
        if t is None:
            return False
        data, start, end = t
        # The non-current transaction must have been written before
        # txn_time.  It must be current at txn_time, but could have
        # been modified at txn_time.

        assert start < self._txn_time, (u64(start), u64(self._txn_time))
        assert end is not None
        assert self._txn_time <= end, (u64(self._txn_time), u64(end))
        self._reader.setGhostState(obj, data)
        obj._p_serial = start

        # MVCC Blob support
        if isinstance(obj, Blob):
            obj._p_blob_uncommitted = None
            obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, start)

        return True

    def register(self, obj):
        """Register obj with the current transaction manager.

        A subclass could override this method to customize the default
        policy of one transaction manager for each thread.

        obj must be an object loaded from this Connection.
        """
        assert obj._p_jar is self
        if obj._p_oid is None:
            # The actual complaint here is that an object without
            # an oid is being registered.  I can't think of any way to
            # achieve that without assignment to _p_jar.  If there is
            # a way, this will be a very confusing exception.
            raise ValueError("assigning to _p_jar is not supported")
        elif obj._p_oid in self._added:
            # It was registered before it was added to _added.
            return
        self._register(obj)

    def _register(self, obj=None):

        # The order here is important.  We need to join before
        # registering the object, because joining may take a
        # savepoint, and the savepoint should not reflect the change
        # to the object.

        if self._needs_to_join:
            self.transaction_manager.get().join(self)
            self._needs_to_join = False

        if obj is not None:
            self._registered_objects.append(obj)

    def readCurrent(self, ob):
        assert ob._p_jar is self
        assert ob._p_oid is not None and ob._p_serial is not None
        if ob._p_serial != z64:
            self._readCurrent[ob._p_oid] = ob._p_serial
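
    # Usage sketch for readCurrent(): call it when this transaction's
    # writes depend on an object that is only read, so that commit()
    # (via checkCurrentSerialInTransaction) fails with a conflict if
    # that object changes concurrently.  Illustrative:
    #
    #     conn.readCurrent(index)  # our writes assume ``index`` is current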

    # persistent.interfaces.IPersistentDatamanager
    ##########################################################################

    ##########################################################################
    # PROTECTED stuff (used by e.g. ZODB.DB.DB)

    def _cache_items(self):
        # find all items on the lru list
        items = self._cache.lru_items()
        # find everything: some on the lru list, some not
        everything = self._cache.cache_data
        # remove those items that are on the lru list (which may not actually
        # be in the full cache, under the Python implementation)
        for k, v in items:
            everything.pop(k, None)
        # return a list of [ghosts....not recently used.....recently used]
        return list(everything.items()) + items

    def open(self, transaction_manager=None, delegate=True):
        """Open this connection for use.

        This method is called by the DB every time a Connection
        is opened.  Any invalidations received while the Connection
        was closed will be processed.

        If the global module function resetCaches() was called, the
        cache will be cleared.

        Parameters:
        transaction_manager: transaction manager to use.  None means
            use the default transaction manager.
        delegate: if true, also open this connection's secondary
            connections and register for afterCompletion() calls.
        """

        self.opened = time.time()

        if transaction_manager is None:
            transaction_manager = transaction.manager

        self.transaction_manager = transaction_manager

        if self._reset_counter != global_reset_counter:
            # New code is in place.  Start a new cache.
            self._resetCache()
        else:
            self._flush_invalidations()

        transaction_manager.registerSynch(self)

        if self._cache is not None:
            self._cache.incrgc()  # This is a good time to do some GC

        if delegate:
            # delegate open to secondary connections
            for connection in self.connections.values():
                if connection is not self:
                    connection.open(transaction_manager, False)

    def _resetCache(self):
        """Creates a new cache, discarding the old one.

        See the docstring for the resetCaches() function.
        """
        self._reset_counter = global_reset_counter
        self._invalidated.clear()
        self._invalidatedCache = False
        cache_size = self._cache.cache_size
        cache_size_bytes = self._cache.cache_size_bytes
        self._cache = cache = PickleCache(self, cache_size, cache_size_bytes)
        if getattr(self, '_reader', None) is not None:
            self._reader._cache = cache

    def _release_resources(self):
        for c in six.itervalues(self.connections):
            if c._mvcc_storage:
                c._storage.release()
            c._storage = c._normal_storage = None
            c._cache = PickleCache(self, 0, 0)

    ##########################################################################
    # Python protocol

    def __repr__(self):
        return '<Connection at %08x>' % (positive_id(self),)

    # Python protocol
    ##########################################################################

    ##########################################################################
    # DEPRECATION candidates

    __getitem__ = get

    def exchange(self, old, new):
        # called by a ZClasses method that isn't executed by the test suite
        oid = old._p_oid
        new._p_oid = oid
        new._p_jar = self
        new._p_changed = 1
        self._register(new)
        self._cache[oid] = new

    # DEPRECATION candidates
    ##########################################################################

    ##########################################################################
    # DEPRECATED methods

    # None at present.

    # DEPRECATED methods
    ##########################################################################

    #####################################################################
    # Savepoint support

    def savepoint(self):
        if self._savepoint_storage is None:
            tmpstore = TmpStore(self._normal_storage)
            self._savepoint_storage = tmpstore
            self._storage = self._savepoint_storage

        self._creating.clear()
        self._commit(None)
        self._storage.creating.update(self._creating)
        self._creating.clear()
        self._registered_objects = []

        state = (self._storage.position,
                 self._storage.index.copy(),
                 self._storage.creating.copy(),
                 )
        result = Savepoint(self, state)
        # While the interface doesn't guarantee this, savepoints are
        # sometimes used just to "break up" very long transactions, and as
        # a pragmatic matter this is a good time to reduce the cache
        # memory burden.
        self.cacheGC()
        return result
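
    # Savepoints are normally reached through the transaction API rather
    # than by calling this method directly.  Illustrative sketch:
    #
    #     sp = transaction.savepoint()  # ends up calling savepoint() above
    #     ...                           # make further changes
    #     sp.rollback()                 # discards changes made after sp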

    def _rollback(self, state):
        self._abort()
        self._registered_objects = []
        src = self._storage

        # Invalidate objects created *after* the savepoint.
        self._invalidate_creating((oid for oid in src.creating
                                   if oid not in state[2]))
        index = src.index
        src.reset(*state)
        self._cache.invalidate(index)

    def _commit_savepoint(self, transaction):
        """Commit all changes made in savepoints and begin 2-phase commit
        """
        src = self._savepoint_storage
        self._storage = self._normal_storage
        self._savepoint_storage = None
        try:
            self._log.debug("Committing savepoints of size %s", src.getSize())
            oids = sorted(src.index.keys())

            # Copy invalidating and creating info from temporary storage:
            self._modified.extend(oids)
            self._creating.update(src.creating)

            for oid in oids:
                data, serial = src.load(oid, src)
                obj = self._cache.get(oid, None)
                if obj is not None:
                    self._cache.update_object_size_estimation(
                        obj._p_oid, len(data))
                    obj._p_estimated_size = len(data)
                if isinstance(self._reader.getGhost(data), Blob):
                    blobfilename = src.loadBlob(oid, serial)
                    s = self._storage.storeBlob(
                        oid, serial, data, blobfilename,
                        '', transaction)
                    # we invalidate the object here in order to ensure
                    # that the next attribute access of its name
                    # unghostifies it, which will cause its blob data
                    # to be reattached "cleanly"
                    self.invalidate(None, (oid, ))
                else:
                    s = self._storage.store(oid, serial, data,
                                            '', transaction)

                self._handle_serial(oid, s, change=False)
        finally:
            src.close()

    def _abort_savepoint(self):
        """Discard all savepoint data."""
        src = self._savepoint_storage
        self._invalidate_creating(src.creating)
        self._storage = self._normal_storage
        self._savepoint_storage = None

        # Note: If we invalidate a non-ghostifiable object (i.e. a
        # persistent class), the object will immediately reread its
        # state.  That means that the following call could result in a
        # call to self.setstate, which, of course, must succeed.  In
        # general, it would be better if the read could be delayed
        # until the start of the next transaction.  If we read at the
        # end of a transaction and if the object was invalidated
        # during this transaction, then we'll read non-current data,
        # which we'll discard later in transaction finalization.  We
        # could, theoretically queue this invalidation by calling
        # self.invalidate.  Unfortunately, attempts to make that
        # change resulted in mysterious test failures.  It's pretty
        # unlikely that the object we are invalidating was invalidated
        # by another thread, so the risk of a reread is pretty low.
        # It's really not worth the effort to pursue this.

        # Note that we do this *after* resetting the storage so that, if
        # data are read, we read it from the reset storage!

        self._cache.invalidate(src.index)

        src.close()

    # Savepoint support
    #####################################################################

@implementer(IDataManagerSavepoint)
class Savepoint:

    def __init__(self, datamanager, state):
        self.datamanager = datamanager
        self.state = state

    def rollback(self):
        self.datamanager._rollback(self.state)

@implementer(IBlobStorage)
class TmpStore:
    """A storage-like thing to support savepoints."""

    def __init__(self, storage):
        self._storage = storage
        for method in (
            'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
            'isReadOnly'
            ):
            setattr(self, method, getattr(storage, method))

        self._file = tempfile.TemporaryFile(prefix='TmpStore')
        # position: current file position
        # _tpos: file position at last commit point
        self.position = 0
        # index: map oid to pos of last committed version
        self.index = {}
        self.creating = {}

        self._blob_dir = None

    def __len__(self):
        return len(self.index)

    def close(self):
        self._file.close()
        if self._blob_dir is not None:
            remove_committed_dir(self._blob_dir)
            self._blob_dir = None

    def load(self, oid, version):
        pos = self.index.get(oid)
        if pos is None:
            return self._storage.load(oid, '')
        self._file.seek(pos)
        h = self._file.read(8)
        oidlen = u64(h)
        read_oid = self._file.read(oidlen)
        if read_oid != oid:
            raise POSException.StorageSystemError('Bad temporary storage')
        h = self._file.read(16)
        size = u64(h[8:])
        serial = h[:8]
        return self._file.read(size), serial

    def store(self, oid, serial, data, version, transaction):
        # we have this funny signature so we can reuse the normal non-commit
        # commit logic
        assert version == ''
        self._file.seek(self.position)
        l = len(data)
        if serial is None:
            serial = z64
        header = p64(len(oid)) + oid + serial + p64(l)
        self._file.write(header)
        self._file.write(data)
        self.index[oid] = self.position
        self.position += l + len(header)
        return serial
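
    # For reference, store() and load() above use this on-file record
    # layout, with all integers 8 bytes big-endian (p64/u64):
    #
    #     [oid length][oid][serial (8 bytes)][data length][data]
    #
    # ``index`` maps each oid to the file position of its latest record.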

    def storeBlob(self, oid, serial, data, blobfilename, version,
                  transaction):
        assert version == ''
        serial = self.store(oid, serial, data, '', transaction)

        targetpath = self._getBlobPath()
        if not os.path.exists(targetpath):
            os.makedirs(targetpath, 0o700)

        targetname = self._getCleanFilename(oid, serial)
        rename_or_copy_blob(blobfilename, targetname, chmod=False)

    def loadBlob(self, oid, serial):
        """Return the filename where the blob file can be found.
        """
        if not IBlobStorage.providedBy(self._storage):
            raise Unsupported(
                "Blobs are not supported by the underlying storage %r." %
                self._storage)
        filename = self._getCleanFilename(oid, serial)
        if not os.path.exists(filename):
            return self._storage.loadBlob(oid, serial)
        return filename

    def openCommittedBlobFile(self, oid, serial, blob=None):
        blob_filename = self.loadBlob(oid, serial)
        if blob is None:
            return open(blob_filename, 'rb')
        else:
            return ZODB.blob.BlobFile(blob_filename, 'r', blob)

    def _getBlobPath(self):
        blob_dir = self._blob_dir
        if blob_dir is None:
            blob_dir = tempfile.mkdtemp(dir=self.temporaryDirectory(),
                                        prefix='savepoints')
            self._blob_dir = blob_dir
        return blob_dir

    def _getCleanFilename(self, oid, tid):
        return os.path.join(
            self._getBlobPath(),
            "%s-%s%s" % (utils.oid_repr(oid), utils.tid_repr(tid),
                         SAVEPOINT_SUFFIX,)
            )

    def temporaryDirectory(self):
        return self._storage.temporaryDirectory()

    def reset(self, position, index, creating):
        self._file.truncate(position)
        self.position = position
        # Caution:  We're typically called as part of a savepoint rollback.
        # Other machinery remembers the index to restore, and passes it to
        # us.  If we simply bind self.index to `index`, then if the caller
        # didn't pass a copy of the index, the caller's index will mutate
        # when self.index mutates.  This can be a disaster if the caller is a
        # savepoint to which the user rolls back again later (the savepoint
        # loses the original index it passed).  Therefore, to be safe, we make
        # a copy of the index here.  An alternative would be to ensure that
        # all callers pass copies.  As is, our callers do not make copies.
        self.index = index.copy()
        self.creating = creating

class RootConvenience(object):

    def __init__(self, root):
        self.__dict__['_root'] = root

    def __getattr__(self, name):
        try:
            return self._root[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, v):
        self._root[name] = v

    def __delattr__(self, name):
        try:
            del self._root[name]
        except KeyError:
            raise AttributeError(name)

    def __call__(self):
        return self._root

    def __repr__(self):
        names = " ".join(sorted(self._root))
        if len(names) > 60:
            names = names[:57].rsplit(' ', 1)[0] + ' ...'
        return "<root: %s>" % names

large_object_message = """The %s
object you're saving is large. (%s bytes.)

Perhaps you're storing media which should be stored in blobs.

Perhaps you're using a non-scalable data structure, such as a
PersistentMapping or PersistentList.

Perhaps you're storing data in objects that aren't persistent at
all. In cases like that, the data is stored in the record of the
containing persistent object.

In any case, storing records this big is probably a bad idea.

If you insist and want to get rid of this warning, use the
large_record_size option of the ZODB.DB constructor (or the
large-record-size option in a configuration file) to specify a larger
size.
"""