Connection.py 27.6 KB
Newer Older
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1
##############################################################################
matt@zope.com's avatar
matt@zope.com committed
2
#
Guido van Rossum's avatar
Guido van Rossum committed
3 4
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
5
#
matt@zope.com's avatar
matt@zope.com committed
6 7 8 9 10 11
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
12
#
Jim Fulton's avatar
alpha1  
Jim Fulton committed
13 14 15
##############################################################################
"""Database connection support

16
$Id: Connection.py,v 1.122 2004/02/24 22:07:46 jeremy Exp $"""
Jeremy Hylton's avatar
Jeremy Hylton committed
17

Jeremy Hylton's avatar
Jeremy Hylton committed
18
import logging
19
import sys
20
import threading
21
import itertools
22
from time import time
23
from utils import u64
Jeremy Hylton's avatar
Jeremy Hylton committed
24

25 26 27 28 29 30
from persistent import PickleCache
from zLOG import LOG, ERROR, BLATHER, WARNING

from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB.POSException \
31
     import ConflictError, ReadConflictError, InvalidObjectReference
32 33 34
from ZODB.TmpStore import TmpStore
from ZODB.Transaction import Transaction, get_transaction
from ZODB.utils import oid_repr, z64
35
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
36

37 38 39 40
global_reset_counter = 0

def resetCaches():
    """Causes all connection caches to be reset as connections are reopened.
Sidnei da Silva's avatar
Sidnei da Silva committed
41

42 43 44 45 46 47
    Zope's refresh feature uses this.  When you reload Python modules,
    instances of classes continue to use the old class definitions.
    To use the new code immediately, the refresh feature asks ZODB to
    clear caches by calling resetCaches().  When the instances are
    loaded by subsequent connections, they will use the new class
    definitions.
Barry Warsaw's avatar
Typos  
Barry Warsaw committed
48
    """
49 50
    global global_reset_counter
    global_reset_counter += 1
Shane Hathaway's avatar
Shane Hathaway committed
51

52
class Connection(ExportImport, object):
53 54 55 56 57
    """Connection to ZODB for loading and storing objects.

    The Connection object serves as a data manager.  The root() method
    on a Connection returns the root object for the database.  This
    object and all objects reachable from it are associated with the
58 59
    Connection that loaded them.  When a transaction commits, it uses
    the Connection to store modified objects.
60

61
    The typical use of ZODB is for each thread to have its own
62 63 64 65 66 67 68 69 70
    Connection and that no thread should have more than one Connection
    to the same database.  A thread is associated with a Connection by
    loading objects from that Connection.  Objects loaded by one
    thread should not be used by another thread.

    A Connection can be associated with a single version when it is
    created.  By default, a Connection is not associated with a
    version; it uses non-version data.

71 72 73 74 75 76 77 78
    Each Connection provides an isolated, consistent view of the
    database, by managing independent copies of objects in the
    database.  At transaction boundaries, these copies are updated to
    reflect the current state of the database.

    You should not instantiate this class directly; instead call the
    open() method of a DB instance.
    
79 80 81 82 83 84 85 86
    Synchronization

    A Connection instance is not thread-safe.  It is designed to
    support a thread model where each thread has its own transaction.
    If an application has more than one thread that uses the
    connection or the transaction the connection is registered with,
    the application should provide locking.

87 88 89
    XXX We should document an intended API for using a Connection via
    multiple threads.

90
    $Id: Connection.py,v 1.122 2004/02/24 22:07:46 jeremy Exp $
Jim Fulton's avatar
alpha1  
Jim Fulton committed
91
    """
92

Jeremy Hylton's avatar
Jeremy Hylton committed
93 94 95 96
    _tmp = None
    _debug_info = ()
    _opened = None
    _code_timestamp = 0
97
    _transaction = None
98
    _added_during_commit = None
Jim Fulton's avatar
alpha1  
Jim Fulton committed
99

100
    def __init__(self, version='', cache_size=400,
Jeremy Hylton's avatar
Jeremy Hylton committed
101
                 cache_deactivate_after=60, mvcc=True):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
102
        """Create a new Connection"""
Jeremy Hylton's avatar
Jeremy Hylton committed
103 104 105 106

        self._log = logging.getLogger("zodb.conn")

        self._version = version
107
        self._cache = cache = PickleCache(self, cache_size)
108 109 110 111
        if version:
            # Caches for versions end up empty if the version
            # is not used for a while. Non-version caches
            # keep their content indefinitely.
112 113 114

            # XXX Why do we want version caches to behave this way?

115
            self._cache.cache_drain_resistance = 100
116
        self._incrgc = self.cacheGC = cache.incrgc
117 118
        self._committed = []
        self._added = {}
119
        self._reset_counter = global_reset_counter
120 121
        self._load_count = 0   # Number of objects unghosted
        self._store_count = 0  # Number of objects stored
Jim Fulton's avatar
alpha1  
Jim Fulton committed
122

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
        # _invalidated queues invalidate messages delivered from the DB
        # _inv_lock prevents one thread from modifying the set while
        # another is processing invalidations.  All the invalidations
        # from a single transaction should be applied atomically, so
        # the lock must be held when reading _invalidated.

        # XXX It sucks that we have to hold the lock to read
        # _invalidated.  Normally, _invalidated is written by call
        # dict.update, which will execute atomically by virtue of the
        # GIL.  But some storage might generate oids where hash or
        # compare invokes Python code.  In that case, the GIL can't
        # save us.
        self._inv_lock = threading.Lock()
        self._invalidated = d = {}
        self._invalid = d.has_key
        self._conflicts = {}
Jeremy Hylton's avatar
Jeremy Hylton committed
139 140 141 142 143 144 145 146 147 148
        self._noncurrent = {}

        # If MVCC is enabled, then _mvcc is True and _txn_time stores
        # the upper bound on transactions visible to this connection.
        # That is, all object revisions must be written before _txn_time.
        # If it is None, then the current revisions are acceptable.
        # If the connection is in a version, mvcc will be disabled, because
        # loadBefore() only returns non-version data.
        self._mvcc = mvcc and not version
        self._txn_time = None
149

150 151 152 153 154 155
        # To support importFile(), implemented in the ExportImport base
        # class, we need to run _importDuringCommit() from our commit()
        # method.  If _import is not None, it is a two-tuple of arguments
        # to pass to _importDuringCommit().
        self._import = None

156 157 158 159 160 161 162 163 164 165 166 167 168
    def getTransaction(self):
        t = self._transaction
        if t is None:
            # Fall back to thread-bound transactions
            t = get_transaction()
        return t

    def setLocalTransaction(self):
        """Use a transaction bound to the connection rather than the thread"""
        if self._transaction is None:
            self._transaction = Transaction()
        return self._transaction

169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
    def _cache_items(self):
        # find all items on the lru list
        items = self._cache.lru_items()
        # fine everything. some on the lru list, some not
        everything = self._cache.cache_data
        # remove those items that are on the lru list
        for k,v in items:
            del everything[k]
        # return a list of [ghosts....not recently used.....recently used]
        return everything.items() + items

    def __repr__(self):
        if self._version:
            ver = ' (in version %s)' % `self._version`
        else:
            ver = ''
185
        return '<Connection at %08x%s>' % (id(self), ver)
186

187
    def __getitem__(self, oid):
188
        obj = self._cache.get(oid, None)
189 190 191
        if obj is not None:
            return obj
        obj = self._added.get(oid, None)
192
        if obj is not None:
193
            return obj
Jim Fulton's avatar
alpha1  
Jim Fulton committed
194

Jim Fulton's avatar
Jim Fulton committed
195
        p, serial = self._storage.load(oid, self._version)
196
        obj = self._reader.getGhost(p)
197

198 199 200 201
        obj._p_oid = oid
        obj._p_jar = self
        obj._p_changed = None
        obj._p_serial = serial
202

203 204
        self._cache[oid] = obj
        return obj
Jim Fulton's avatar
alpha1  
Jim Fulton committed
205

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
    def add(self, obj):
        marker = object()
        oid = getattr(obj, "_p_oid", marker)
        if oid is marker:
            raise TypeError("Only first-class persistent objects may be"
                            " added to a Connection.", obj)
        elif obj._p_jar is None:
            oid = obj._p_oid = self._storage.new_oid()
            obj._p_jar = self
            self._added[oid] = obj
            if self._added_during_commit is not None:
                self._added_during_commit.append(obj)
        elif obj._p_jar is not self:
            raise InvalidObjectReference(obj, obj._p_jar)

221 222 223 224 225 226 227 228
    def sortKey(self):
        # XXX will raise an exception if the DB hasn't been set
        storage_key = self._sortKey()
        # If two connections use the same storage, give them a
        # consistent order using id().  This is unique for the
        # lifetime of a connection, which is good enough.
        return "%s:%s" % (storage_key, id(self))

229
    def _setDB(self, odb):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
230 231 232
        """Begin a new transaction.

        Any objects modified since the last transaction are invalidated.
Shane Hathaway's avatar
Shane Hathaway committed
233
        """
234 235
        self._db = odb
        self._storage = odb._storage
236
        self._sortKey = odb._storage.sortKey
237
        self.new_oid = odb._storage.new_oid
238
        if self._reset_counter != global_reset_counter:
Shane Hathaway's avatar
Shane Hathaway committed
239 240 241
            # New code is in place.  Start a new cache.
            self._resetCache()
        else:
242
            self._flush_invalidations()
243 244
        self._reader = ConnectionObjectReader(self, self._cache,
                                              self._db._classFactory)
245
        self._opened = time()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
246 247 248

        return self

Shane Hathaway's avatar
Shane Hathaway committed
249
    def _resetCache(self):
250 251 252 253 254
        """Creates a new cache, discarding the old.

        See the docstring for the resetCaches() function.
        """
        self._reset_counter = global_reset_counter
Shane Hathaway's avatar
Shane Hathaway committed
255
        self._invalidated.clear()
256 257
        cache_size = self._cache.cache_size
        self._cache = cache = PickleCache(self, cache_size)
258
        self._incrgc = self.cacheGC = cache.incrgc
Shane Hathaway's avatar
Shane Hathaway committed
259

Jim Fulton's avatar
Jim Fulton committed
260 261 262 263 264
    def abort(self, object, transaction):
        """Abort the object in the transaction.

        This just deactivates the thing.
        """
265
        if object is self:
266
            self._flush_invalidations()
267
        else:
268 269 270 271 272 273 274 275
            oid = object._p_oid
            assert oid is not None
            if oid in self._added:
                del self._added[oid]
                del object._p_jar
                del object._p_oid
            else:
                self._cache.invalidate(object._p_oid)
Jim Fulton's avatar
Jim Fulton committed
276

277 278
    def cacheFullSweep(self, dt=0):
        self._cache.full_sweep(dt)
279

280
    def cacheMinimize(self, dt=0):
281 282
        # dt is ignored
        self._cache.minimize()
283

284
    __onCloseCallbacks = None
285

286
    def onCloseCallback(self, f):
287 288 289
        if self.__onCloseCallbacks is None:
            self.__onCloseCallbacks = []
        self.__onCloseCallbacks.append(f)
290

Jim Fulton's avatar
alpha1  
Jim Fulton committed
291
    def close(self):
292 293
        if self._incrgc is not None:
            self._incrgc() # This is a good time to do some GC
Jim Fulton's avatar
alpha1  
Jim Fulton committed
294

295
        # Call the close callbacks.
296 297
        if self.__onCloseCallbacks is not None:
            for f in self.__onCloseCallbacks:
Jeremy Hylton's avatar
Jeremy Hylton committed
298 299 300 301 302 303
                try:
                    f()
                except: # except what?
                    f = getattr(f, 'im_self', f)
                    self._log.error("Close callback failed for %s", f,
                                    sys.exc_info())
304
            self.__onCloseCallbacks = None
305 306
        self._storage = self._tmp = self.new_oid = self._opened = None
        self._debug_info = ()
307
        # Return the connection to the pool.
308
        self._db._closeConnection(self)
309

310
    def commit(self, object, transaction):
311
        if object is self:
312
            # We registered ourself.  Execute a commit action, if any.
313 314 315
            if self._import:
                self._importDuringCommit(transaction, *self._import)
                self._import = None
316
            return
317

Jeremy Hylton's avatar
Jeremy Hylton committed
318
        oid = object._p_oid
319
        if self._conflicts.has_key(oid):
320
            self.getTransaction().register(object)
321
            raise ReadConflictError(object=object)
322

Jeremy Hylton's avatar
Jeremy Hylton committed
323
        invalid = self._invalid
324 325 326 327 328 329 330

        # XXX In the case of a new object or an object added using add(),
        #     the oid is appended to _creating.
        #     However, this ought to be unnecessary because the _p_serial
        #     of the object will be z64 or None, so it will be appended
        #     to _creating about 30 lines down. The removal from _added
        #     ought likewise to be unnecessary.
Jim Fulton's avatar
 
Jim Fulton committed
331
        if oid is None or object._p_jar is not self:
332
            # new object
Jim Fulton's avatar
 
Jim Fulton committed
333
            oid = self.new_oid()
Jeremy Hylton's avatar
Jeremy Hylton committed
334 335
            object._p_jar = self
            object._p_oid = oid
336 337 338
            self._creating.append(oid) # maybe don't need this
        elif oid in self._added:
            # maybe don't need these
339
            self._creating.append(oid)
340
            del self._added[oid]
Jim Fulton's avatar
 
Jim Fulton committed
341
        elif object._p_changed:
342 343 344 345 346
            if invalid(oid):
                resolve = getattr(object, "_p_resolveConflict", None)
                if resolve is None:
                    raise ConflictError(object=object)
            self._modified.append(oid)
Jim Fulton's avatar
 
Jim Fulton committed
347 348 349 350
        else:
            # Nothing to do
            return

351
        w = ObjectWriter(object)
352 353 354 355 356 357 358 359 360 361 362 363 364
        self._added_during_commit = []
        try:
            for obj in itertools.chain(w, self._added_during_commit):
                oid = obj._p_oid
                serial = getattr(obj, '_p_serial', z64)

                # XXX which one? z64 or None? Why do I have to check both?
                if serial == z64 or serial is None:
                    # new object
                    self._creating.append(oid)
                    # If this object was added, it is now in _creating, so can
                    # be removed from _added.
                    self._added.pop(oid, None)
365
                else:
366 367 368 369 370
                    if (invalid(oid)
                        and not hasattr(object, '_p_resolveConflict')):
                        raise ConflictError(object=obj)
                    self._modified.append(oid)
                p = w.serialize(obj)  # This calls __getstate__ of obj
371

372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
                s = self._storage.store(oid, serial, p, self._version,
                                        transaction)
                self._store_count = self._store_count + 1
                # Put the object in the cache before handling the
                # response, just in case the response contains the
                # serial number for a newly created object
                try:
                    self._cache[oid] = obj
                except:
                    # Dang, I bet its wrapped:
                    if hasattr(obj, 'aq_base'):
                        self._cache[oid] = obj.aq_base
                    else:
                        raise

                self._handle_serial(s, oid)
        finally:
            del self._added_during_commit
390

391
    def commit_sub(self, t):
392
        """Commit all work done in all subtransactions for this transaction"""
393
        tmp=self._tmp
394
        if tmp is None: return
395
        src=self._storage
396

Jeremy Hylton's avatar
Jeremy Hylton committed
397 398
        self._log.debug("Commiting subtransaction of size %s",
                        src.getSize())
399

400
        self._storage=tmp
401
        self._tmp=None
402 403

        tmp.tpc_begin(t)
404

405 406 407
        load=src.load
        store=tmp.store
        dest=self._version
408
        oids=src._index.keys()
409 410

        # Copy invalidating and creating info from temporary storage:
411 412 413
        modified = self._modified
        modified[len(modified):] = oids
        creating = self._creating
414
        creating[len(creating):]=src._creating
415

416
        for oid in oids:
417 418
            data, serial = load(oid, src)
            s=store(oid, serial, data, dest, t)
419
            self._handle_serial(s, oid, change=0)
420 421

    def abort_sub(self, t):
422
        """Abort work done in all subtransactions for this transaction"""
423 424 425 426 427
        tmp=self._tmp
        if tmp is None: return
        src=self._storage
        self._tmp=None
        self._storage=tmp
428

429
        self._cache.invalidate(src._index.keys())
430 431 432 433 434 435 436 437 438 439 440 441 442 443
        self._invalidate_creating(src._creating)

    def _invalidate_creating(self, creating=None):
        """Dissown any objects newly saved in an uncommitted transaction.
        """
        if creating is None:
            creating=self._creating
            self._creating=[]

        cache=self._cache
        cache_get=cache.get
        for oid in creating:
            o=cache_get(oid, None)
            if o is not None:
444
                del cache[oid]
445 446 447
                del o._p_jar
                del o._p_oid

448 449
    def db(self):
        return self._db
450

451 452
    def getVersion(self):
        return self._version
453

454 455 456
    def isReadOnly(self):
        return self._storage.isReadOnly()

Jeremy Hylton's avatar
Jeremy Hylton committed
457
    def invalidate(self, tid, oids):
458
        """Invalidate a set of oids.
Jim Fulton's avatar
alpha1  
Jim Fulton committed
459 460 461 462 463

        This marks the oid as invalid, but doesn't actually invalidate
        it.  The object data will be actually invalidated at certain
        transaction boundaries.
        """
464 465
        self._inv_lock.acquire()
        try:
Jeremy Hylton's avatar
Jeremy Hylton committed
466 467
            if self._txn_time is None:
                self._txn_time = tid
468 469 470 471 472 473 474
            self._invalidated.update(oids)
        finally:
            self._inv_lock.release()

    def _flush_invalidations(self):
        self._inv_lock.acquire()
        try:
475 476
            for oid in self._noncurrent:
                assert oid in self._invalidated
477 478
            self._cache.invalidate(self._invalidated)
            self._invalidated.clear()
479
            self._noncurrent.clear()
Jeremy Hylton's avatar
Jeremy Hylton committed
480
            self._txn_time = None
481 482 483 484
        finally:
            self._inv_lock.release()
        # Now is a good time to collect some garbage
        self._cache.incrgc()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
485

486
    def modifiedInVersion(self, oid):
Jeremy Hylton's avatar
Jeremy Hylton committed
487 488
        try:
            return self._db.modifiedInVersion(oid)
489 490
        except KeyError:
            return self._version
Jim Fulton's avatar
alpha1  
Jim Fulton committed
491

492 493 494 495 496 497 498
    def register(self, object):
        """Register an object with the appropriate transaction manager.

        A subclass could override this method to customize the default
        policy of one transaction manager for each thread.
        """
        assert object._p_jar is self
Jeremy Hylton's avatar
Jeremy Hylton committed
499
        # XXX Figure out why this assert causes test failures
500
        assert object._p_oid is not None
501
        self.getTransaction().register(object)
502 503

    def root(self):
504
        return self[z64]
Jim Fulton's avatar
alpha1  
Jim Fulton committed
505

506 507
    def setstate(self, obj):
        oid = obj._p_oid
508 509

        if self._storage is None:
510
            msg = ("Shouldn't load state for %s "
Jeremy Hylton's avatar
Jeremy Hylton committed
511
                   "when the connection is closed" % oid_repr(oid))
Jeremy Hylton's avatar
Jeremy Hylton committed
512
            self._log.error(msg)
513 514
            raise RuntimeError(msg)

515
        try:
Jeremy Hylton's avatar
Jeremy Hylton committed
516
            self._setstate(obj)
517 518
        except ConflictError:
            raise
519
        except:
Jeremy Hylton's avatar
Jeremy Hylton committed
520 521
            self._log.error("Couldn't load state for %s", oid_repr(oid),
                            exc_info=sys.exc_info())
522
            raise
523

Jeremy Hylton's avatar
Jeremy Hylton committed
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
    def _setstate(self, obj):
        # Helper for setstate(), which provides logging of failures.

        # The control flow is complicated here to avoid loading an
        # object revision that we are sure we aren't going to use.  As
        # a result, invalidation tests occur before and after the
        # load.  We can only be sure about invalidations after the
        # load.

        # If an object has been invalidated, there are several cases
        # to consider:
        # 1. Check _p_independent()
        # 2. Try MVCC
        # 3. Raise ConflictError.

        # Does anything actually use _p_independent()?  It would simplify
        # the code if we could drop support for it.

        # There is a harmless data race with self._invalidated.  A
        # dict update could go on in another thread, but we don't care
        # because we have to check again after the load anyway.
        if (obj._p_oid in self._invalidated
            and not myhasattr(obj, "_p_independent")):
            # If the object has _p_independent(), we will handle it below.
548 549
            self._load_before_or_conflict(obj)
            return
Jeremy Hylton's avatar
Jeremy Hylton committed
550 551 552 553

        p, serial = self._storage.load(obj._p_oid, self._version)
        self._load_count += 1

554 555
        self._inv_lock.acquire()
        try:
Jeremy Hylton's avatar
Jeremy Hylton committed
556
            invalid = obj._p_oid in self._invalidated
557 558 559
        finally:
            self._inv_lock.release()

Jeremy Hylton's avatar
Jeremy Hylton committed
560 561 562 563 564
        if invalid:
            if myhasattr(obj, "_p_independent"):
                # This call will raise a ReadConflictError if something
                # goes wrong
                self._handle_independent(obj)
565 566 567
            else:
                self._load_before_or_conflict(obj)
                return
Jeremy Hylton's avatar
Jeremy Hylton committed
568 569 570 571

        self._reader.setGhostState(obj, p)
        obj._p_serial = serial

572 573
    def _load_before_or_conflict(self, obj):
        """Load non-current state for obj or raise ReadConflictError."""
Tim Peters's avatar
Tim Peters committed
574

575 576
        if not (self._mvcc and self._setstate_noncurrent(obj)):
            self.getTransaction().register(obj)
577
            self._conflicts[obj._p_oid] = True
578 579
            raise ReadConflictError(object=obj)

Jeremy Hylton's avatar
Jeremy Hylton committed
580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
    def _setstate_noncurrent(self, obj):
        """Set state using non-current data.

        Return True if state was available, False if not.
        """
        try:
            # Load data that was current before the commit at txn_time.
            t = self._storage.loadBefore(obj._p_oid, self._txn_time)
        except KeyError:
            return False
        if t is None:
            return False
        data, start, end = t
        # The non-current transaction must have been written before
        # txn_time.  It must be current at txn_time, but could have
        # been modified at txn_time.

597 598 599 600 601 602 603
        # It's possible that end is None.  The _txn_time is set by an
        # invalidation for one specific object, but it used for the
        # load time for all objects.  If an object hasn't been
        # modified since _txn_time, it's end tid will be None.
        assert start < self._txn_time, (u64(start), u64(self._txn_time))
        assert end is None or self._txn_time <= end, \
               (u64(self._txn_time), u64(end))
604 605
        if end is not None:
            self._noncurrent[obj._p_oid] = True
Jeremy Hylton's avatar
Jeremy Hylton committed
606 607
        self._reader.setGhostState(obj, data)
        obj._p_serial = start
608
        return True
Jeremy Hylton's avatar
Jeremy Hylton committed
609

610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626
    def _handle_independent(self, obj):
        # Helper method for setstate() handles possibly independent objects
        # Call _p_independent(), if it returns True, setstate() wins.
        # Otherwise, raise a ConflictError.

        if obj._p_independent():
            self._inv_lock.acquire()
            try:
                try:
                    del self._invalidated[obj._p_oid]
                except KeyError:
                    pass
            finally:
                self._inv_lock.release()
        else:
            self.getTransaction().register(obj)
            raise ReadConflictError(object=obj)
627

628 629 630
    def oldstate(self, obj, serial):
        p = self._storage.loadSerial(obj._p_oid, serial)
        return self._reader.getState(p)
631

632 633 634
    def setklassstate(self, obj):
        # Special case code to handle ZClasses, I think.
        # Called the cache when an object of type type is invalidated.
635
        try:
636
            oid = obj._p_oid
637
            p, serial = self._storage.load(oid, self._version)
638

639 640 641 642 643
            # We call getGhost(), but we actually get a non-ghost back.
            # The object is a class, which can't actually be ghosted.
            copy = self._reader.getGhost(p)
            obj.__dict__.clear()
            obj.__dict__.update(copy.__dict__)
644

645 646 647 648
            obj._p_oid = oid
            obj._p_jar = self
            obj._p_changed = 0
            obj._p_serial = serial
649
        except:
Jeremy Hylton's avatar
Jeremy Hylton committed
650
            self._log.error("setklassstate failed", exc_info=sys.exc_info())
651
            raise
652

Jim Fulton's avatar
alpha1  
Jim Fulton committed
653
    def tpc_abort(self, transaction):
654 655
        if self._import:
            self._import = None
Jim Fulton's avatar
alpha1  
Jim Fulton committed
656
        self._storage.tpc_abort(transaction)
657 658 659
        self._cache.invalidate(self._modified)
        self._flush_invalidations()
        self._conflicts.clear()
660
        self._invalidate_creating()
661 662 663 664
        while self._added:
            oid, obj = self._added.popitem()
            del obj._p_oid
            del obj._p_jar
Jim Fulton's avatar
alpha1  
Jim Fulton committed
665

666
    def tpc_begin(self, transaction, sub=None):
667
        self._modified = []
Jeremy Hylton's avatar
Jeremy Hylton committed
668
        self._creating = []
669 670
        if sub:
            # Sub-transaction!
671
            if self._tmp is None:
672
                _tmp = TmpStore(self._version)
673 674
                self._tmp = self._storage
                self._storage = _tmp
675 676
                _tmp.registerDB(self._db, 0)

677
        self._storage.tpc_begin(transaction)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
678

679 680
    def tpc_vote(self, transaction):
        try:
681
            vote = self._storage.tpc_vote
682 683 684
        except AttributeError:
            return
        s = vote(transaction)
685 686 687 688 689 690 691 692 693 694 695 696 697
        self._handle_serial(s)

    def _handle_serial(self, store_return, oid=None, change=1):
        """Handle the returns from store() and tpc_vote() calls."""

        # These calls can return different types depending on whether
        # ZEO is used.  ZEO uses asynchronous returns that may be
        # returned in batches by the ClientStorage.  ZEO1 can also
        # return an exception object and expect that the Connection
        # will raise the exception.

        # When commit_sub() exceutes a store, there is no need to
        # update the _p_changed flag, because the subtransaction
Jeremy Hylton's avatar
Jeremy Hylton committed
698
        # tpc_vote() calls already did this.  The change=1 argument
699
        # exists to allow commit_sub() to avoid setting the flag
700
        # again.
701 702 703 704 705

        # When conflict resolution occurs, the object state held by
        # the connection does not match what is written to the
        # database.  Invalidate the object here to guarantee that
        # the new state is read the next time the object is used.
706

707 708
        if not store_return:
            return
709
        if isinstance(store_return, str):
710
            assert oid is not None
711
            self._handle_one_serial(oid, store_return, change)
712 713
        else:
            for oid, serial in store_return:
714 715 716
                self._handle_one_serial(oid, serial, change)

    def _handle_one_serial(self, oid, serial, change):
717
        if not isinstance(serial, str):
718 719 720 721 722 723 724 725 726 727
            raise serial
        obj = self._cache.get(oid, None)
        if obj is None:
            return
        if serial == ResolvedSerial:
            del obj._p_changed # transition from changed to ghost
        else:
            if change:
                obj._p_changed = 0 # trans. from changed to uptodate
            obj._p_serial = serial
728

Jim Fulton's avatar
alpha1  
Jim Fulton committed
729
    def tpc_finish(self, transaction):
730
        # It's important that the storage call the function we pass
731 732 733 734
        # while it still has it's lock.  We don't want another thread
        # to be able to read any updated data until we've had a chance
        # to send an invalidation message to all of the other
        # connections!
735 736 737 738

        if self._tmp is not None:
            # Commiting a subtransaction!
            # There is no need to invalidate anything.
739
            self._storage.tpc_finish(transaction)
740 741 742
            self._storage._creating[:0]=self._creating
            del self._creating[:]
        else:
Jeremy Hylton's avatar
Jeremy Hylton committed
743
            def callback(tid):
744 745
                d = {}
                for oid in self._modified:
746
                    d[oid] = 1
Jeremy Hylton's avatar
Jeremy Hylton committed
747
                self._db.invalidate(tid, d, self)
748
            self._storage.tpc_finish(transaction, callback)
749

750 751
        self._conflicts.clear()
        self._flush_invalidations()
752

753
    def sync(self):
754
        self.getTransaction().abort()
755 756 757
        sync = getattr(self._storage, 'sync', 0)
        if sync:
            sync()
758
        self._flush_invalidations()
759

760 761
    def getDebugInfo(self):
        return self._debug_info
762

763 764
    def setDebugInfo(self, *args):
        self._debug_info = self._debug_info + args
765

766 767 768 769 770
    def getTransferCounts(self, clear=0):
        """Returns the number of objects loaded and stored.

        Set the clear argument to reset the counters.
        """
771
        res = self._load_count, self._store_count
772 773 774 775 776
        if clear:
            self._load_count = 0
            self._store_count = 0
        return res

777
    def exchange(self, old, new):
778 779 780 781 782
        # called by a ZClasses method that isn't executed by the test suite
        oid = old._p_oid
        new._p_oid = oid
        new._p_jar = self
        new._p_changed = 1
783
        self.getTransaction().register(new)
784
        self._cache[oid] = new