Connection.py 26.1 KB
Newer Older
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1
##############################################################################
matt@zope.com's avatar
matt@zope.com committed
2
#
Guido van Rossum's avatar
Guido van Rossum committed
3 4
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
5
#
matt@zope.com's avatar
matt@zope.com committed
6 7 8 9 10 11
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
12
#
Jim Fulton's avatar
alpha1  
Jim Fulton committed
13 14 15
##############################################################################
"""Database connection support

16
$Id: Connection.py,v 1.119 2004/02/23 08:23:46 stevea Exp $"""
Jeremy Hylton's avatar
Jeremy Hylton committed
17

Jeremy Hylton's avatar
Jeremy Hylton committed
18
import logging
19
import sys
20
import threading
21
import itertools
22
from time import time
23
from utils import u64
Jeremy Hylton's avatar
Jeremy Hylton committed
24

25 26 27 28 29 30
from persistent import PickleCache
from zLOG import LOG, ERROR, BLATHER, WARNING

from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB.POSException \
31
     import ConflictError, ReadConflictError, InvalidObjectReference
32 33 34
from ZODB.TmpStore import TmpStore
from ZODB.Transaction import Transaction, get_transaction
from ZODB.utils import oid_repr, z64
35
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
36

37 38 39 40
global_reset_counter = 0

def resetCaches():
    """Causes all connection caches to be reset as connections are reopened.
Sidnei da Silva's avatar
Sidnei da Silva committed
41

42 43 44 45 46 47
    Zope's refresh feature uses this.  When you reload Python modules,
    instances of classes continue to use the old class definitions.
    To use the new code immediately, the refresh feature asks ZODB to
    clear caches by calling resetCaches().  When the instances are
    loaded by subsequent connections, they will use the new class
    definitions.
Barry Warsaw's avatar
Typos  
Barry Warsaw committed
48
    """
49 50
    global global_reset_counter
    global_reset_counter += 1
Shane Hathaway's avatar
Shane Hathaway committed
51

52
class Connection(ExportImport, object):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
53 54 55
    """Object managers for individual object space.

    An object space is a version of collection of objects.  In a
Barry Warsaw's avatar
Typos  
Barry Warsaw committed
56
    multi-threaded application, each thread gets its own object space.
Jim Fulton's avatar
alpha1  
Jim Fulton committed
57 58 59

    The Connection manages movement of objects in and out of object storage.
    """
Jeremy Hylton's avatar
Jeremy Hylton committed
60 61 62 63
    _tmp = None
    _debug_info = ()
    _opened = None
    _code_timestamp = 0
64
    _transaction = None
65
    _added_during_commit = None
Jim Fulton's avatar
alpha1  
Jim Fulton committed
66

67
    def __init__(self, version='', cache_size=400,
Jeremy Hylton's avatar
Jeremy Hylton committed
68
                 cache_deactivate_after=60, mvcc=True):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
69
        """Create a new Connection"""
Jeremy Hylton's avatar
Jeremy Hylton committed
70 71 72 73

        self._log = logging.getLogger("zodb.conn")

        self._version = version
74
        self._cache = cache = PickleCache(self, cache_size)
75 76 77 78
        if version:
            # Caches for versions end up empty if the version
            # is not used for a while. Non-version caches
            # keep their content indefinitely.
79 80 81

            # XXX Why do we want version caches to behave this way?

82
            self._cache.cache_drain_resistance = 100
83
        self._incrgc = self.cacheGC = cache.incrgc
84 85
        self._committed = []
        self._added = {}
86
        self._reset_counter = global_reset_counter
87 88
        self._load_count = 0   # Number of objects unghosted
        self._store_count = 0  # Number of objects stored
Jim Fulton's avatar
alpha1  
Jim Fulton committed
89

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
        # _invalidated queues invalidate messages delivered from the DB
        # _inv_lock prevents one thread from modifying the set while
        # another is processing invalidations.  All the invalidations
        # from a single transaction should be applied atomically, so
        # the lock must be held when reading _invalidated.

        # XXX It sucks that we have to hold the lock to read
        # _invalidated.  Normally, _invalidated is written by call
        # dict.update, which will execute atomically by virtue of the
        # GIL.  But some storage might generate oids where hash or
        # compare invokes Python code.  In that case, the GIL can't
        # save us.
        self._inv_lock = threading.Lock()
        self._invalidated = d = {}
        self._invalid = d.has_key
        self._conflicts = {}
Jeremy Hylton's avatar
Jeremy Hylton committed
106 107 108 109 110 111 112 113 114 115
        self._noncurrent = {}

        # If MVCC is enabled, then _mvcc is True and _txn_time stores
        # the upper bound on transactions visible to this connection.
        # That is, all object revisions must be written before _txn_time.
        # If it is None, then the current revisions are acceptable.
        # If the connection is in a version, mvcc will be disabled, because
        # loadBefore() only returns non-version data.
        self._mvcc = mvcc and not version
        self._txn_time = None
116

117 118 119 120 121 122
        # To support importFile(), implemented in the ExportImport base
        # class, we need to run _importDuringCommit() from our commit()
        # method.  If _import is not None, it is a two-tuple of arguments
        # to pass to _importDuringCommit().
        self._import = None

123 124 125 126 127 128 129 130 131 132 133 134 135
    def getTransaction(self):
        t = self._transaction
        if t is None:
            # Fall back to thread-bound transactions
            t = get_transaction()
        return t

    def setLocalTransaction(self):
        """Use a transaction bound to the connection rather than the thread"""
        if self._transaction is None:
            self._transaction = Transaction()
        return self._transaction

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
    def _cache_items(self):
        # find all items on the lru list
        items = self._cache.lru_items()
        # fine everything. some on the lru list, some not
        everything = self._cache.cache_data
        # remove those items that are on the lru list
        for k,v in items:
            del everything[k]
        # return a list of [ghosts....not recently used.....recently used]
        return everything.items() + items

    def __repr__(self):
        if self._version:
            ver = ' (in version %s)' % `self._version`
        else:
            ver = ''
152
        return '<Connection at %08x%s>' % (id(self), ver)
153

154
    def __getitem__(self, oid):
155
        obj = self._cache.get(oid, None)
156 157 158
        if obj is not None:
            return obj
        obj = self._added.get(oid, None)
159
        if obj is not None:
160
            return obj
Jim Fulton's avatar
alpha1  
Jim Fulton committed
161

Jim Fulton's avatar
Jim Fulton committed
162
        p, serial = self._storage.load(oid, self._version)
163
        obj = self._reader.getGhost(p)
164

165 166 167 168
        obj._p_oid = oid
        obj._p_jar = self
        obj._p_changed = None
        obj._p_serial = serial
169

170 171
        self._cache[oid] = obj
        return obj
Jim Fulton's avatar
alpha1  
Jim Fulton committed
172

173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
    def add(self, obj):
        marker = object()
        oid = getattr(obj, "_p_oid", marker)
        if oid is marker:
            raise TypeError("Only first-class persistent objects may be"
                            " added to a Connection.", obj)
        elif obj._p_jar is None:
            oid = obj._p_oid = self._storage.new_oid()
            obj._p_jar = self
            self._added[oid] = obj
            if self._added_during_commit is not None:
                self._added_during_commit.append(obj)
        elif obj._p_jar is not self:
            raise InvalidObjectReference(obj, obj._p_jar)

188 189 190 191 192 193 194 195
    def sortKey(self):
        # XXX will raise an exception if the DB hasn't been set
        storage_key = self._sortKey()
        # If two connections use the same storage, give them a
        # consistent order using id().  This is unique for the
        # lifetime of a connection, which is good enough.
        return "%s:%s" % (storage_key, id(self))

196
    def _setDB(self, odb):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
197 198 199
        """Begin a new transaction.

        Any objects modified since the last transaction are invalidated.
Shane Hathaway's avatar
Shane Hathaway committed
200
        """
201 202
        self._db = odb
        self._storage = odb._storage
203
        self._sortKey = odb._storage.sortKey
204
        self.new_oid = odb._storage.new_oid
205
        if self._reset_counter != global_reset_counter:
Shane Hathaway's avatar
Shane Hathaway committed
206 207 208
            # New code is in place.  Start a new cache.
            self._resetCache()
        else:
209
            self._flush_invalidations()
210 211
        self._reader = ConnectionObjectReader(self, self._cache,
                                              self._db._classFactory)
212
        self._opened = time()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
213 214 215

        return self

Shane Hathaway's avatar
Shane Hathaway committed
216
    def _resetCache(self):
217 218 219 220 221
        """Creates a new cache, discarding the old.

        See the docstring for the resetCaches() function.
        """
        self._reset_counter = global_reset_counter
Shane Hathaway's avatar
Shane Hathaway committed
222
        self._invalidated.clear()
223 224
        cache_size = self._cache.cache_size
        self._cache = cache = PickleCache(self, cache_size)
225
        self._incrgc = self.cacheGC = cache.incrgc
Shane Hathaway's avatar
Shane Hathaway committed
226

Jim Fulton's avatar
Jim Fulton committed
227 228 229 230 231
    def abort(self, object, transaction):
        """Abort the object in the transaction.

        This just deactivates the thing.
        """
232
        if object is self:
233
            self._flush_invalidations()
234
        else:
235 236 237 238 239 240 241 242
            oid = object._p_oid
            assert oid is not None
            if oid in self._added:
                del self._added[oid]
                del object._p_jar
                del object._p_oid
            else:
                self._cache.invalidate(object._p_oid)
Jim Fulton's avatar
Jim Fulton committed
243

244 245
    def cacheFullSweep(self, dt=0):
        self._cache.full_sweep(dt)
246

247
    def cacheMinimize(self, dt=0):
248 249
        # dt is ignored
        self._cache.minimize()
250

251
    __onCloseCallbacks = None
252

253
    def onCloseCallback(self, f):
254 255 256
        if self.__onCloseCallbacks is None:
            self.__onCloseCallbacks = []
        self.__onCloseCallbacks.append(f)
257

Jim Fulton's avatar
alpha1  
Jim Fulton committed
258
    def close(self):
259 260
        if self._incrgc is not None:
            self._incrgc() # This is a good time to do some GC
Jim Fulton's avatar
alpha1  
Jim Fulton committed
261

262
        # Call the close callbacks.
263 264
        if self.__onCloseCallbacks is not None:
            for f in self.__onCloseCallbacks:
Jeremy Hylton's avatar
Jeremy Hylton committed
265 266 267 268 269 270
                try:
                    f()
                except: # except what?
                    f = getattr(f, 'im_self', f)
                    self._log.error("Close callback failed for %s", f,
                                    sys.exc_info())
271
            self.__onCloseCallbacks = None
272 273
        self._storage = self._tmp = self.new_oid = self._opened = None
        self._debug_info = ()
274
        # Return the connection to the pool.
275
        self._db._closeConnection(self)
276

277
    def commit(self, object, transaction):
278
        if object is self:
279
            # We registered ourself.  Execute a commit action, if any.
280 281 282
            if self._import:
                self._importDuringCommit(transaction, *self._import)
                self._import = None
283
            return
284

Jeremy Hylton's avatar
Jeremy Hylton committed
285
        oid = object._p_oid
286
        if self._conflicts.has_key(oid):
287
            self.getTransaction().register(object)
288
            raise ReadConflictError(object=object)
289

Jeremy Hylton's avatar
Jeremy Hylton committed
290
        invalid = self._invalid
291 292 293 294 295 296 297

        # XXX In the case of a new object or an object added using add(),
        #     the oid is appended to _creating.
        #     However, this ought to be unnecessary because the _p_serial
        #     of the object will be z64 or None, so it will be appended
        #     to _creating about 30 lines down. The removal from _added
        #     ought likewise to be unnecessary.
Jim Fulton's avatar
 
Jim Fulton committed
298
        if oid is None or object._p_jar is not self:
299
            # new object
Jim Fulton's avatar
 
Jim Fulton committed
300
            oid = self.new_oid()
Jeremy Hylton's avatar
Jeremy Hylton committed
301 302
            object._p_jar = self
            object._p_oid = oid
303 304 305
            self._creating.append(oid) # maybe don't need this
        elif oid in self._added:
            # maybe don't need these
306
            self._creating.append(oid)
307
            del self._added[oid]
Jim Fulton's avatar
 
Jim Fulton committed
308
        elif object._p_changed:
309 310 311 312 313
            if invalid(oid):
                resolve = getattr(object, "_p_resolveConflict", None)
                if resolve is None:
                    raise ConflictError(object=object)
            self._modified.append(oid)
Jim Fulton's avatar
 
Jim Fulton committed
314 315 316 317
        else:
            # Nothing to do
            return

318
        w = ObjectWriter(object)
319 320 321 322 323 324 325 326 327 328 329 330 331
        self._added_during_commit = []
        try:
            for obj in itertools.chain(w, self._added_during_commit):
                oid = obj._p_oid
                serial = getattr(obj, '_p_serial', z64)

                # XXX which one? z64 or None? Why do I have to check both?
                if serial == z64 or serial is None:
                    # new object
                    self._creating.append(oid)
                    # If this object was added, it is now in _creating, so can
                    # be removed from _added.
                    self._added.pop(oid, None)
332
                else:
333 334 335 336 337
                    if (invalid(oid)
                        and not hasattr(object, '_p_resolveConflict')):
                        raise ConflictError(object=obj)
                    self._modified.append(oid)
                p = w.serialize(obj)  # This calls __getstate__ of obj
338

339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
                s = self._storage.store(oid, serial, p, self._version,
                                        transaction)
                self._store_count = self._store_count + 1
                # Put the object in the cache before handling the
                # response, just in case the response contains the
                # serial number for a newly created object
                try:
                    self._cache[oid] = obj
                except:
                    # Dang, I bet its wrapped:
                    if hasattr(obj, 'aq_base'):
                        self._cache[oid] = obj.aq_base
                    else:
                        raise

                self._handle_serial(s, oid)
        finally:
            del self._added_during_commit
357

358
    def commit_sub(self, t):
359
        """Commit all work done in all subtransactions for this transaction"""
360
        tmp=self._tmp
361
        if tmp is None: return
362
        src=self._storage
363

Jeremy Hylton's avatar
Jeremy Hylton committed
364 365
        self._log.debug("Commiting subtransaction of size %s",
                        src.getSize())
366

367
        self._storage=tmp
368
        self._tmp=None
369 370

        tmp.tpc_begin(t)
371

372 373 374
        load=src.load
        store=tmp.store
        dest=self._version
375
        oids=src._index.keys()
376 377

        # Copy invalidating and creating info from temporary storage:
378 379 380
        modified = self._modified
        modified[len(modified):] = oids
        creating = self._creating
381
        creating[len(creating):]=src._creating
382

383
        for oid in oids:
384 385
            data, serial = load(oid, src)
            s=store(oid, serial, data, dest, t)
386
            self._handle_serial(s, oid, change=0)
387 388

    def abort_sub(self, t):
389
        """Abort work done in all subtransactions for this transaction"""
390 391 392 393 394
        tmp=self._tmp
        if tmp is None: return
        src=self._storage
        self._tmp=None
        self._storage=tmp
395

396
        self._cache.invalidate(src._index.keys())
397 398 399 400 401 402 403 404 405 406 407 408 409 410
        self._invalidate_creating(src._creating)

    def _invalidate_creating(self, creating=None):
        """Dissown any objects newly saved in an uncommitted transaction.
        """
        if creating is None:
            creating=self._creating
            self._creating=[]

        cache=self._cache
        cache_get=cache.get
        for oid in creating:
            o=cache_get(oid, None)
            if o is not None:
411
                del cache[oid]
412 413 414
                del o._p_jar
                del o._p_oid

415 416
    def db(self):
        return self._db
417

418 419
    def getVersion(self):
        return self._version
420

421 422 423
    def isReadOnly(self):
        return self._storage.isReadOnly()

Jeremy Hylton's avatar
Jeremy Hylton committed
424
    def invalidate(self, tid, oids):
425
        """Invalidate a set of oids.
Jim Fulton's avatar
alpha1  
Jim Fulton committed
426 427 428 429 430

        This marks the oid as invalid, but doesn't actually invalidate
        it.  The object data will be actually invalidated at certain
        transaction boundaries.
        """
431 432
        self._inv_lock.acquire()
        try:
Jeremy Hylton's avatar
Jeremy Hylton committed
433 434
            if self._txn_time is None:
                self._txn_time = tid
435 436 437 438 439 440 441
            self._invalidated.update(oids)
        finally:
            self._inv_lock.release()

    def _flush_invalidations(self):
        self._inv_lock.acquire()
        try:
442 443
            for oid in self._noncurrent:
                assert oid in self._invalidated
444 445
            self._cache.invalidate(self._invalidated)
            self._invalidated.clear()
446
            self._noncurrent.clear()
Jeremy Hylton's avatar
Jeremy Hylton committed
447
            self._txn_time = None
448 449 450 451
        finally:
            self._inv_lock.release()
        # Now is a good time to collect some garbage
        self._cache.incrgc()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
452

453
    def modifiedInVersion(self, oid):
Jeremy Hylton's avatar
Jeremy Hylton committed
454 455
        try:
            return self._db.modifiedInVersion(oid)
456 457
        except KeyError:
            return self._version
Jim Fulton's avatar
alpha1  
Jim Fulton committed
458

459 460 461 462 463 464 465
    def register(self, object):
        """Register an object with the appropriate transaction manager.

        A subclass could override this method to customize the default
        policy of one transaction manager for each thread.
        """
        assert object._p_jar is self
Jeremy Hylton's avatar
Jeremy Hylton committed
466 467
        # XXX Figure out why this assert causes test failures
        # assert object._p_oid is not None
468
        self.getTransaction().register(object)
469 470

    def root(self):
471
        return self[z64]
Jim Fulton's avatar
alpha1  
Jim Fulton committed
472

473 474
    def setstate(self, obj):
        oid = obj._p_oid
475 476

        if self._storage is None:
477
            msg = ("Shouldn't load state for %s "
Jeremy Hylton's avatar
Jeremy Hylton committed
478
                   "when the connection is closed" % oid_repr(oid))
Jeremy Hylton's avatar
Jeremy Hylton committed
479
            self._log.error(msg)
480 481
            raise RuntimeError(msg)

482
        try:
Jeremy Hylton's avatar
Jeremy Hylton committed
483
            self._setstate(obj)
484 485
        except ConflictError:
            raise
486
        except:
Jeremy Hylton's avatar
Jeremy Hylton committed
487 488
            self._log.error("Couldn't load state for %s", oid_repr(oid),
                            exc_info=sys.exc_info())
489
            raise
490

Jeremy Hylton's avatar
Jeremy Hylton committed
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
    def _setstate(self, obj):
        # Helper for setstate(), which provides logging of failures.

        # The control flow is complicated here to avoid loading an
        # object revision that we are sure we aren't going to use.  As
        # a result, invalidation tests occur before and after the
        # load.  We can only be sure about invalidations after the
        # load.

        # If an object has been invalidated, there are several cases
        # to consider:
        # 1. Check _p_independent()
        # 2. Try MVCC
        # 3. Raise ConflictError.

        # Does anything actually use _p_independent()?  It would simplify
        # the code if we could drop support for it.

        # There is a harmless data race with self._invalidated.  A
        # dict update could go on in another thread, but we don't care
        # because we have to check again after the load anyway.
        if (obj._p_oid in self._invalidated
            and not myhasattr(obj, "_p_independent")):
            # If the object has _p_independent(), we will handle it below.
515 516
            self._load_before_or_conflict(obj)
            return
Jeremy Hylton's avatar
Jeremy Hylton committed
517 518 519 520

        p, serial = self._storage.load(obj._p_oid, self._version)
        self._load_count += 1

521 522
        self._inv_lock.acquire()
        try:
Jeremy Hylton's avatar
Jeremy Hylton committed
523
            invalid = obj._p_oid in self._invalidated
524 525 526
        finally:
            self._inv_lock.release()

Jeremy Hylton's avatar
Jeremy Hylton committed
527 528 529 530 531
        if invalid:
            if myhasattr(obj, "_p_independent"):
                # This call will raise a ReadConflictError if something
                # goes wrong
                self._handle_independent(obj)
532 533 534
            else:
                self._load_before_or_conflict(obj)
                return
Jeremy Hylton's avatar
Jeremy Hylton committed
535 536 537 538

        self._reader.setGhostState(obj, p)
        obj._p_serial = serial

539 540
    def _load_before_or_conflict(self, obj):
        """Load non-current state for obj or raise ReadConflictError."""
Tim Peters's avatar
Tim Peters committed
541

542 543
        if not (self._mvcc and self._setstate_noncurrent(obj)):
            self.getTransaction().register(obj)
544
            self._conflicts[obj._p_oid] = True
545 546
            raise ReadConflictError(object=obj)

Jeremy Hylton's avatar
Jeremy Hylton committed
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
    def _setstate_noncurrent(self, obj):
        """Set state using non-current data.

        Return True if state was available, False if not.
        """
        try:
            # Load data that was current before the commit at txn_time.
            t = self._storage.loadBefore(obj._p_oid, self._txn_time)
        except KeyError:
            return False
        if t is None:
            return False
        data, start, end = t
        # The non-current transaction must have been written before
        # txn_time.  It must be current at txn_time, but could have
        # been modified at txn_time.

564 565 566 567 568 569 570
        # It's possible that end is None.  The _txn_time is set by an
        # invalidation for one specific object, but it used for the
        # load time for all objects.  If an object hasn't been
        # modified since _txn_time, it's end tid will be None.
        assert start < self._txn_time, (u64(start), u64(self._txn_time))
        assert end is None or self._txn_time <= end, \
               (u64(self._txn_time), u64(end))
571 572
        if end is not None:
            self._noncurrent[obj._p_oid] = True
Jeremy Hylton's avatar
Jeremy Hylton committed
573 574
        self._reader.setGhostState(obj, data)
        obj._p_serial = start
575
        return True
Jeremy Hylton's avatar
Jeremy Hylton committed
576

577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593
    def _handle_independent(self, obj):
        # Helper method for setstate() handles possibly independent objects
        # Call _p_independent(), if it returns True, setstate() wins.
        # Otherwise, raise a ConflictError.

        if obj._p_independent():
            self._inv_lock.acquire()
            try:
                try:
                    del self._invalidated[obj._p_oid]
                except KeyError:
                    pass
            finally:
                self._inv_lock.release()
        else:
            self.getTransaction().register(obj)
            raise ReadConflictError(object=obj)
594

595 596 597
    def oldstate(self, obj, serial):
        p = self._storage.loadSerial(obj._p_oid, serial)
        return self._reader.getState(p)
598

599 600 601
    def setklassstate(self, obj):
        # Special case code to handle ZClasses, I think.
        # Called the cache when an object of type type is invalidated.
602
        try:
603
            oid = obj._p_oid
604
            p, serial = self._storage.load(oid, self._version)
605

606 607 608 609 610
            # We call getGhost(), but we actually get a non-ghost back.
            # The object is a class, which can't actually be ghosted.
            copy = self._reader.getGhost(p)
            obj.__dict__.clear()
            obj.__dict__.update(copy.__dict__)
611

612 613 614 615
            obj._p_oid = oid
            obj._p_jar = self
            obj._p_changed = 0
            obj._p_serial = serial
616
        except:
Jeremy Hylton's avatar
Jeremy Hylton committed
617
            self._log.error("setklassstate failed", exc_info=sys.exc_info())
618
            raise
619

Jim Fulton's avatar
alpha1  
Jim Fulton committed
620
    def tpc_abort(self, transaction):
621 622
        if self._import:
            self._import = None
Jim Fulton's avatar
alpha1  
Jim Fulton committed
623
        self._storage.tpc_abort(transaction)
624 625 626
        self._cache.invalidate(self._modified)
        self._flush_invalidations()
        self._conflicts.clear()
627
        self._invalidate_creating()
628 629 630 631
        while self._added:
            oid, obj = self._added.popitem()
            del obj._p_oid
            del obj._p_jar
Jim Fulton's avatar
alpha1  
Jim Fulton committed
632

633
    def tpc_begin(self, transaction, sub=None):
634
        self._modified = []
Jeremy Hylton's avatar
Jeremy Hylton committed
635
        self._creating = []
636 637
        if sub:
            # Sub-transaction!
638
            if self._tmp is None:
639
                _tmp = TmpStore(self._version)
640 641
                self._tmp = self._storage
                self._storage = _tmp
642 643
                _tmp.registerDB(self._db, 0)

644
        self._storage.tpc_begin(transaction)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
645

646 647
    def tpc_vote(self, transaction):
        try:
648
            vote = self._storage.tpc_vote
649 650 651
        except AttributeError:
            return
        s = vote(transaction)
652 653 654 655 656 657 658 659 660 661 662 663 664
        self._handle_serial(s)

    def _handle_serial(self, store_return, oid=None, change=1):
        """Handle the returns from store() and tpc_vote() calls."""

        # These calls can return different types depending on whether
        # ZEO is used.  ZEO uses asynchronous returns that may be
        # returned in batches by the ClientStorage.  ZEO1 can also
        # return an exception object and expect that the Connection
        # will raise the exception.

        # When commit_sub() exceutes a store, there is no need to
        # update the _p_changed flag, because the subtransaction
Jeremy Hylton's avatar
Jeremy Hylton committed
665
        # tpc_vote() calls already did this.  The change=1 argument
666
        # exists to allow commit_sub() to avoid setting the flag
667
        # again.
668 669 670 671 672

        # When conflict resolution occurs, the object state held by
        # the connection does not match what is written to the
        # database.  Invalidate the object here to guarantee that
        # the new state is read the next time the object is used.
673

674 675
        if not store_return:
            return
676
        if isinstance(store_return, str):
677
            assert oid is not None
678
            self._handle_one_serial(oid, store_return, change)
679 680
        else:
            for oid, serial in store_return:
681 682 683
                self._handle_one_serial(oid, serial, change)

    def _handle_one_serial(self, oid, serial, change):
684
        if not isinstance(serial, str):
685 686 687 688 689 690 691 692 693 694
            raise serial
        obj = self._cache.get(oid, None)
        if obj is None:
            return
        if serial == ResolvedSerial:
            del obj._p_changed # transition from changed to ghost
        else:
            if change:
                obj._p_changed = 0 # trans. from changed to uptodate
            obj._p_serial = serial
695

Jim Fulton's avatar
alpha1  
Jim Fulton committed
696
    def tpc_finish(self, transaction):
697
        # It's important that the storage call the function we pass
698 699 700 701
        # while it still has it's lock.  We don't want another thread
        # to be able to read any updated data until we've had a chance
        # to send an invalidation message to all of the other
        # connections!
702 703 704 705

        if self._tmp is not None:
            # Commiting a subtransaction!
            # There is no need to invalidate anything.
706
            self._storage.tpc_finish(transaction)
707 708 709
            self._storage._creating[:0]=self._creating
            del self._creating[:]
        else:
Jeremy Hylton's avatar
Jeremy Hylton committed
710
            def callback(tid):
711 712
                d = {}
                for oid in self._modified:
713
                    d[oid] = 1
Jeremy Hylton's avatar
Jeremy Hylton committed
714
                self._db.invalidate(tid, d, self)
715
            self._storage.tpc_finish(transaction, callback)
716

717 718
        self._conflicts.clear()
        self._flush_invalidations()
719

720
    def sync(self):
721
        self.getTransaction().abort()
722 723 724
        sync = getattr(self._storage, 'sync', 0)
        if sync:
            sync()
725
        self._flush_invalidations()
726

727 728
    def getDebugInfo(self):
        return self._debug_info
729

730 731
    def setDebugInfo(self, *args):
        self._debug_info = self._debug_info + args
732

733 734 735 736 737
    def getTransferCounts(self, clear=0):
        """Returns the number of objects loaded and stored.

        Set the clear argument to reset the counters.
        """
738
        res = self._load_count, self._store_count
739 740 741 742 743
        if clear:
            self._load_count = 0
            self._store_count = 0
        return res

744
    def exchange(self, old, new):
745 746 747 748 749
        # called by a ZClasses method that isn't executed by the test suite
        oid = old._p_oid
        new._p_oid = oid
        new._p_jar = self
        new._p_changed = 1
750
        self.getTransaction().register(new)
751
        self._cache[oid] = new