Connection.py 23.5 KB
Newer Older
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1
##############################################################################
matt@zope.com's avatar
matt@zope.com committed
2
#
Guido van Rossum's avatar
Guido van Rossum committed
3 4
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
5
# 
matt@zope.com's avatar
matt@zope.com committed
6 7 8 9 10 11
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
12
# 
Jim Fulton's avatar
alpha1  
Jim Fulton committed
13 14 15
##############################################################################
"""Database connection support

16 17
$Id: Connection.py,v 1.67 2002/06/10 20:20:44 shane Exp $"""
__version__='$Revision: 1.67 $'[11:-2]
Jim Fulton's avatar
alpha1  
Jim Fulton committed
18

19
from cPickleCache import PickleCache, MUCH_RING_CHECKING
20
from POSException import ConflictError, ReadConflictError
21
from ExtensionClass import Base
22
import ExportImport, TmpStore
23
from zLOG import LOG, ERROR, BLATHER, WARNING
Jim Fulton's avatar
 
Jim Fulton committed
24
from coptimizations import new_persistent_id
25
from ConflictResolution import ResolvedSerial
26 27 28 29 30 31

from cPickle import Unpickler, Pickler
from cStringIO import StringIO
import sys
from time import time
from types import StringType, ClassType
32

Shane Hathaway's avatar
Shane Hathaway committed
33 34
# Time of the most recent updateCodeTimestamp() call (0 until the first call).
# Each Connection copies this into its own _code_timestamp; _setDB() compares
# the two and starts a new cache when they differ.
global_code_timestamp = 0

35 36 37 38 39
# Log a warning once, at import time, when cPickleCache reports that it was
# built with its extra (slow) debugging checks enabled.
if MUCH_RING_CHECKING:
  # To get rid of this warning, change the define inside cPickleCache.c and recompile.
  LOG('ZODB',WARNING, 'Using cPickleCache with low performance (but extra debugging checks)')
# The flag is only needed for the check above; remove it from this module's
# namespace.
del MUCH_RING_CHECKING

Shane Hathaway's avatar
Shane Hathaway committed
40 41 42 43 44 45 46 47 48
def updateCodeTimestamp():
    """Record that persistence-based classes have been changed.

    Stores the current time in the module-level global_code_timestamp.
    A connection whose own _code_timestamp no longer matches this value
    re-creates its cache the next time it is reopened.
    """
    global global_code_timestamp
    now = time()
    global_code_timestamp = now

49
# The class of ExtensionClass.Base itself.  Connection compares a record's
# klass against this (in __getitem__, commit and setklassstate) to take a
# separate code path -- presumably for persistent objects that are
# themselves extension classes; confirm against ExtensionClass.
ExtensionKlass=Base.__class__
Jim Fulton's avatar
alpha1  
Jim Fulton committed
50

51
class Connection(ExportImport.ExportImport):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
52 53 54 55 56 57 58 59
    """Object managers for individual object space.

    An object space is a version of a collection of objects.  In a
    multi-threaded application, each thread gets its own object
    space.

    The Connection manages movement of objects in and out of object storage.
    """
60
    _tmp=None
61 62
    _debug_info=()
    _opened=None
Shane Hathaway's avatar
Shane Hathaway committed
63
    _code_timestamp = 0
Jim Fulton's avatar
alpha1  
Jim Fulton committed
64

65 66 67
    # Experimental. Other connections can register to be closed
    # when we close by putting something here.

68
    def __init__(self, version='', cache_size=400,
                 cache_deactivate_after=60):
        """Create a new Connection.

        version -- name of the version this connection works in; the
            empty string means no version.
        cache_size -- passed to the PickleCache constructor.
        cache_deactivate_after -- accepted, but not used by this method.
        """
        self._version = version

        pickle_cache = PickleCache(self, cache_size)
        self._cache = pickle_cache
        if version:
            # Caches for versions end up empty if the version is not
            # used for a while, whereas non-version caches keep their
            # content indefinitely.
            #
            # XXX Why do we want version caches to behave this way?
            pickle_cache.cache_drain_resistance = 100

        # Expose the cache's incremental GC under both names.
        incrgc = pickle_cache.incrgc
        self._incrgc = incrgc
        self.cacheGC = incrgc

        # oids invalidated by other connections; _invalid tests membership.
        invalidated = {}
        self._invalidated = invalidated
        self._invalid = invalidated.has_key

        self._committed = []
        self._code_timestamp = global_code_timestamp
        self._load_count = 0   # Number of objects unghosted
        self._store_count = 0  # Number of objects stored
Jim Fulton's avatar
alpha1  
Jim Fulton committed
88

89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
    def _cache_items(self):
        # find all items on the lru list
        items = self._cache.lru_items()
        # fine everything. some on the lru list, some not
        everything = self._cache.cache_data
        # remove those items that are on the lru list
        for k,v in items:
            del everything[k]
        # return a list of [ghosts....not recently used.....recently used]
        return everything.items() + items

    def __repr__(self):
        if self._version:
            ver = ' (in version %s)' % `self._version`
        else:
            ver = ''
        return '<Connection at %08x%s>' % (id(self),ver)

Jim Fulton's avatar
alpha1  
Jim Fulton committed
107 108 109 110 111
    def _breakcr(self):
        try: del self._cache
        except: pass
        try: del self._incrgc
        except: pass
112 113
        try: del self.cacheGC
        except: pass
Jim Fulton's avatar
alpha1  
Jim Fulton committed
114

115 116 117 118
    def __getitem__(self, oid, tt=type(())):
        obj = self._cache.get(oid, None)
        if obj is not None:
          return obj
Jim Fulton's avatar
alpha1  
Jim Fulton committed
119

120
        __traceback_info__ = (oid)
Jim Fulton's avatar
Jim Fulton committed
121
        p, serial = self._storage.load(oid, self._version)
122
        __traceback_info__ = (oid, p)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
123 124 125 126
        file=StringIO(p)
        unpickler=Unpickler(file)
        unpickler.persistent_load=self._persistent_load

127 128 129 130 131
        try:
            object = unpickler.load()
        except:
            raise "Could not load oid %s, pickled data in traceback info may\
            contain clues" % (oid)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
132

Jim Fulton's avatar
Jim Fulton committed
133
        klass, args = object
134 135 136 137 138

        if type(klass) is tt:
            module, name = klass
            klass=self._db._classFactory(self, module, name)
        
Jim Fulton's avatar
Jim Fulton committed
139 140 141
        if (args is None or
            not args and not hasattr(klass,'__getinitargs__')):
            object=klass.__basicnew__()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
142
        else:
Jim Fulton's avatar
Jim Fulton committed
143
            object=apply(klass,args)
144 145
            if klass is not ExtensionKlass:
                object.__dict__.clear()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
146

Jim Fulton's avatar
Jim Fulton committed
147 148 149 150
        object._p_oid=oid
        object._p_jar=self
        object._p_changed=None
        object._p_serial=serial
Jim Fulton's avatar
alpha1  
Jim Fulton committed
151

152 153 154
        self._cache[oid] = object
        if oid=='\0\0\0\0\0\0\0\0':
          self._root_=object # keep a ref
Jim Fulton's avatar
alpha1  
Jim Fulton committed
155 156 157
        return object

    def _persistent_load(self,oid,
158
                        tt=type(())):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
159 160 161 162 163 164 165

        __traceback_info__=oid

        if type(oid) is tt:
            # Quick instance reference.  We know all we need to know
            # to create the instance wo hitting the db, so go for it!
            oid, klass = oid
166 167 168
            obj = self._cache.get(oid, None)
            if obj is not None:
                return obj
169 170 171 172 173 174 175 176 177 178

            if type(klass) is tt:
                module, name = klass
                try: klass=self._db._classFactory(self, module, name)
                except:
                    # Eek, we couldn't get the class. Hm.
                    # Maybe their's more current data in the
                    # object's actual record!
                    return self[oid]
            
Jim Fulton's avatar
Jim Fulton committed
179 180 181 182
            object=klass.__basicnew__()
            object._p_oid=oid
            object._p_jar=self
            object._p_changed=None
Jim Fulton's avatar
alpha1  
Jim Fulton committed
183
            
184
            self._cache[oid] = object
185

Jim Fulton's avatar
alpha1  
Jim Fulton committed
186 187
            return object

188 189 190
        obj = self._cache.get(oid, None)
        if obj is not None:
            return obj
191
        return self[oid]
Jim Fulton's avatar
alpha1  
Jim Fulton committed
192

193
    def _setDB(self, odb):
        """Attach this connection to the database odb and mark it as opened.

        Takes the storage and the new_oid function from odb.  If the
        global code timestamp changed since this connection's cache was
        created, the cache is replaced; otherwise the objects recorded
        in _invalidated are invalidated in the existing cache.

        Returns self.
        """
        self._db = odb
        storage = odb._storage
        self._storage = storage
        self.new_oid = storage.new_oid

        if self._code_timestamp == global_code_timestamp:
            self._cache.invalidate(self._invalidated)
        else:
            # New code is in place.  Start a new cache.
            self._resetCache()

        self._opened = time()
        return self

Shane Hathaway's avatar
Shane Hathaway committed
210 211 212 213 214 215 216
    def _resetCache(self):
        """Create a new cache, discarding the old one.

        The new PickleCache gets the old cache's cache_size.  The
        connection's code timestamp is brought up to date and the
        recorded invalidations are cleared.  The _incrgc and cacheGC
        hooks are rebound to the new cache; otherwise they would keep
        collecting (and keep alive) the discarded one.
        """
        self._code_timestamp = global_code_timestamp
        self._invalidated.clear()
        orig_cache = self._cache
        self._cache = cache = PickleCache(self, orig_cache.cache_size)
        # Same bindings that __init__ sets up for the first cache.
        self._incrgc = self.cacheGC = cache.incrgc
Shane Hathaway's avatar
Shane Hathaway committed
218

Jim Fulton's avatar
Jim Fulton committed
219 220 221 222 223
    def abort(self, object, transaction):
        """Abort the object in the transaction.

        This just deactivates the thing.
        """
224 225 226 227
        if object is self:
            self._cache.invalidate(self._invalidated)
        else:
            self._cache.invalidate(object._p_oid)
Jim Fulton's avatar
Jim Fulton committed
228

229 230 231 232
    def cacheFullSweep(self, dt=0):
        self._cache.full_sweep(dt)
        
    def cacheMinimize(self, dt=0):
233 234
        # dt is ignored
        self._cache.minimize()
235

236 237
    __onCloseCallbacks = None
    
238
    def onCloseCallback(self, f):
239 240 241
        if self.__onCloseCallbacks is None:
            self.__onCloseCallbacks = []
        self.__onCloseCallbacks.append(f)
242

Jim Fulton's avatar
alpha1  
Jim Fulton committed
243
    def close(self):
Jim Fulton's avatar
Jim Fulton committed
244
        self._incrgc() # This is a good time to do some GC
245
        db=self._db
Jim Fulton's avatar
alpha1  
Jim Fulton committed
246

247
        # Call the close callbacks.
248 249 250 251 252 253 254 255
        if self.__onCloseCallbacks is not None:
            for f in self.__onCloseCallbacks:
                try: f()
                except:
                    f=getattr(f, 'im_self', f)
                    LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
                        error=sys.exc_info())
            self.__onCloseCallbacks = None
256 257 258 259 260
        self._db=self._storage=self._tmp=self.new_oid=self._opened=None
        self._debug_info=()
        # Return the connection to the pool.
        db._closeConnection(self)
                        
261 262
    __onCommitActions = None
    
263
    def onCommitAction(self, method_name, *args, **kw):
        """Queue a method of this connection to be called during commit.

        The (method_name, args, kw) triple is appended to the pending
        actions and the connection registers itself with the current
        transaction; commit() later calls the named method with the
        transaction followed by args and kw.
        """
        pending = self.__onCommitActions
        if pending is None:
            pending = self.__onCommitActions = []
        pending.append((method_name, args, kw))
        get_transaction().register(self)

269
    def commit(self, object, transaction):
        """Store object, and the objects found while pickling it, in the storage.

        When object is this connection itself, the oldest action queued
        by onCommitAction (if any) is executed instead: the named method
        is called with the transaction followed by the queued arguments.

        Otherwise object is treated as new when it has no oid or belongs
        to another jar (it is then given a new oid and this jar), as
        modified when _p_changed is true, and is skipped otherwise.

        Raises ConflictError for a modified object when its oid has been
        invalidated and the object has no _p_resolveConflict attribute,
        or when the None key has been invalidated.
        """
        if object is self:
            # We registered ourself.  Execute a commit action, if any.
            if self.__onCommitActions is not None:
                method_name, args, kw = self.__onCommitActions.pop(0)
                apply(getattr(self, method_name), (transaction,) + args, kw)
            return
        oid=object._p_oid
        invalid=self._invalid
        if oid is None or object._p_jar is not self:
            # new object
            oid = self.new_oid()
            object._p_jar=self
            object._p_oid=oid
            self._creating.append(oid)

        elif object._p_changed:
            if (
                (invalid(oid) and not hasattr(object, '_p_resolveConflict'))
                or
                invalid(None)
                ):
                raise ConflictError(object=object)
            self._invalidating.append(oid)

        else:
            # Nothing to do
            return

        # Work list of objects still to be stored.  The persistent_id
        # hook installed below is given stack.append, presumably to push
        # newly discovered objects (see the Python sketch that follows).
        stack=[object]

        # Create a special persistent_id that passes T and the subobject
        # stack along:
        #
        # def persistent_id(object,
        #                   self=self,
        #                   stackup=stackup, new_oid=self.new_oid):
        #     if (not hasattr(object, '_p_oid') or
        #         type(object) is ClassType): return None
        # 
        #     oid=object._p_oid
        # 
        #     if oid is None or object._p_jar is not self:
        #         oid = self.new_oid()
        #         object._p_jar=self
        #         object._p_oid=oid
        #         stackup(object)
        # 
        #     klass=object.__class__
        # 
        #     if klass is ExtensionKlass: return oid
        #     
        #     if hasattr(klass, '__getinitargs__'): return oid
        # 
        #     module=getattr(klass,'__module__','')
        #     if module: klass=module, klass.__name__
        #     
        #     return oid, klass
        
        # One buffer and one pickler are reused for every object.
        file=StringIO()
        seek=file.seek
        pickler=Pickler(file,1)
        pickler.persistent_id=new_persistent_id(self, stack.append)
        dbstore=self._storage.store
        # From here on 'file' is the buffer's getvalue method, not the
        # buffer itself.
        file=file.getvalue
        cache=self._cache
        # NOTE(review): 'get' is not used below.
        get=cache.get
        dump=pickler.dump
        clear_memo=pickler.clear_memo


        version=self._version
        
        while stack:
            object=stack[-1]
            del stack[-1]
            oid=object._p_oid
            serial=getattr(object, '_p_serial', '\0\0\0\0\0\0\0\0')
            if serial == '\0\0\0\0\0\0\0\0':
                # new object
                # NOTE(review): a new top-level object was already added
                # to _creating above, so its oid is recorded twice.
                self._creating.append(oid)
            else:
                #XXX We should never get here
                if (
                    (invalid(oid) and
                     not hasattr(object, '_p_resolveConflict'))
                    or
                    invalid(None)
                    ):
                    raise ConflictError(object=object)
                self._invalidating.append(oid)
                
            klass = object.__class__
        
            if klass is ExtensionKlass:
                # Yee Ha!
                # The object is itself a class: its record is the
                # (name, bases, dict) triple, with no separate state.
                dict={}
                dict.update(object.__dict__)
                del dict['_p_jar']
                args=object.__name__, object.__bases__, dict
                state=None
            else:
                if hasattr(klass, '__getinitargs__'):
                    args = object.__getinitargs__()
                    len(args) # XXX Assert it's a sequence
                else:
                    args = None # New no-constructor protocol!
        
                module=getattr(klass,'__module__','')
                # Store the class as (module, name) when it has a module.
                if module: klass=module, klass.__name__
                __traceback_info__=klass, oid, self._version
                state=object.__getstate__()
        
            # The record is two pickles written back to back into the
            # rewound buffer: (klass, args), then the state.
            seek(0)
            clear_memo()
            dump((klass,args))
            dump(state)
            # getvalue(1): presumably returns only the data up to the
            # current position, so bytes left over from a longer earlier
            # record are excluded -- confirm against cStringIO.
            p=file(1)
            s=dbstore(oid,serial,p,version,transaction)
            self._store_count = self._store_count + 1
            # Put the object in the cache before handling the
            # response, just in case the response contains the
            # serial number for a newly created object
            try: cache[oid]=object
            except:
                # Dang, I bet its wrapped:
                if hasattr(object, 'aq_base'):
                    cache[oid]=object.aq_base
                else:
                    raise

            self._handle_serial(s, oid)

402
    def commit_sub(self, t):
        """Commit all work done in subtransactions.

        Does nothing when no subtransaction is in progress (_tmp is
        None).  Otherwise the storage saved in _tmp becomes the active
        storage again, a two-phase commit is begun on it for t, and
        every record held by the subtransaction storage is copied into
        it.  The oids involved are added to _invalidating, and the
        subtransaction's created oids to _creating.
        """
        real_storage = self._tmp
        if real_storage is None:
            return
        sub_storage = self._storage

        LOG('ZODB', BLATHER,
            'Commiting subtransaction of size %s' % sub_storage.getSize())

        self._storage = real_storage
        self._tmp = None

        real_storage.tpc_begin(t)

        fetch = sub_storage.load
        save = real_storage.store
        version = self._version
        oids = sub_storage._index.keys()

        # Copy invalidating and creating info from temporary storage:
        self._invalidating.extend(oids)
        self._creating.extend(sub_storage._creating)

        for oid in oids:
            data, serial = fetch(oid, sub_storage)
            result = save(oid, serial, data, version, t)
            # change=0: leave the objects' _p_changed flags alone.
            self._handle_serial(result, oid, change=0)
432 433

    def abort_sub(self, t):
434
        """Abort work done in subtransactions"""
435 436 437 438 439
        tmp=self._tmp
        if tmp is None: return
        src=self._storage
        self._tmp=None
        self._storage=tmp
440

441
        self._cache.invalidate(src._index.keys())
442 443 444 445 446 447 448 449 450 451 452 453 454 455
        self._invalidate_creating(src._creating)

    def _invalidate_creating(self, creating=None):
        """Dissown any objects newly saved in an uncommitted transaction.
        """
        if creating is None:
            creating=self._creating
            self._creating=[]

        cache=self._cache
        cache_get=cache.get
        for oid in creating:
            o=cache_get(oid, None)
            if o is not None:
456
                del cache[oid]
457 458 459 460
                del o._p_jar
                del o._p_oid

    #XXX
461

Jim Fulton's avatar
alpha1  
Jim Fulton committed
462 463 464 465 466 467 468 469 470 471 472 473 474
    def db(self): return self._db

    def getVersion(self): return self._version
        
    def invalidate(self, oid):
        """Invalidate a particular oid

        This marks the oid as invalid, but doesn't actually invalidate
        it.  The object data will be actually invalidated at certain
        transaction boundaries.
        """
        self._invalidated[oid]=1

475 476 477 478
    def modifiedInVersion(self, oid):
        try: return self._db.modifiedInVersion(oid)
        except KeyError:
            return self._version
Jim Fulton's avatar
alpha1  
Jim Fulton committed
479 480 481

    def root(self): return self['\0\0\0\0\0\0\0\0']

482
    def setstate(self, object):
        """Load the stored state of object from the storage into it.

        Raises RuntimeError (after logging) when the connection has no
        storage, i.e. it is closed.  Raises ReadConflictError when the
        object's oid -- or the None key -- has been invalidated and the
        object's class has no _p_independent attribute; raises
        ConflictError when the class has one but _p_independent()
        returns a false value.  In both conflict cases the connection
        registers itself with the current transaction first.

        Any other exception is logged and re-raised.  ConflictError is
        re-raised without being logged.
        """
        oid = object._p_oid

        if self._storage is None:
            msg = "Shouldn't load state for %s when the connection is closed" % repr(oid)
            LOG('ZODB',ERROR, msg)
            raise RuntimeError(msg)

        try:
            p, serial = self._storage.load(oid, self._version)
            self._load_count = self._load_count + 1

            # XXX this is quite conservative!
            # We need, however, to avoid reading data from a transaction
            # that committed after the current "session" started, as
            # that might lead to mixing of cached data from earlier
            # transactions and new inconsistent data.
            #
            # Note that we (carefully) wait until after we call the
            # storage to make sure that we don't miss any invalidation
            # notifications between the time we check and the time we
            # read.
            is_invalid = self._invalid
            stale = is_invalid(oid) or is_invalid(None)
            if stale and not hasattr(object.__class__, '_p_independent'):
                get_transaction().register(self)
                raise ReadConflictError(object=object)

            unpickler = Unpickler(StringIO(p))
            unpickler.persistent_load = self._persistent_load
            # The record holds two pickles; the first one is skipped,
            # the second is the state.
            unpickler.load()
            state = unpickler.load()

            if hasattr(object, '__setstate__'):
                object.__setstate__(state)
            else:
                attrs = object.__dict__
                for name, value in state.items():
                    attrs[name] = value

            object._p_serial = serial

            if stale:
                # Only reached for classes that define _p_independent.
                if object._p_independent():
                    try:
                        del self._invalidated[oid]
                    except KeyError:
                        pass
                else:
                    get_transaction().register(self)
                    raise ConflictError(object=object)

        except ConflictError:
            raise
        except:
            LOG('ZODB',ERROR, "Couldn't load state for %s" % repr(oid),
                error=sys.exc_info())
            raise
541

542 543 544 545 546 547 548 549 550
    def oldstate(self, object, serial):
        oid=object._p_oid
        p = self._storage.loadSerial(oid, serial)
        file=StringIO(p)
        unpickler=Unpickler(file)
        unpickler.persistent_load=self._persistent_load
        unpickler.load()
        return  unpickler.load()

551
    def setklassstate(self, object):
        """Reload the state of object -- a persistent class -- from the storage.

        The object's record is loaded and its (klass, args) pair read.
        When klass is ExtensionKlass, a copy is built from args and its
        __dict__ replaces the contents of object's __dict__; the
        persistence attributes are then restored with _p_changed set to
        0.  Any other klass is logged as an error and the object is
        left untouched.

        Every exception is logged and re-raised.
        """
        try:
            oid = object._p_oid
            __traceback_info__ = oid
            p, serial = self._storage.load(oid, self._version)
            unpickler = Unpickler(StringIO(p))
            unpickler.persistent_load = self._persistent_load

            klass, args = unpickler.load()

            if klass is not ExtensionKlass:
                LOG('ZODB',ERROR,
                    "Unexpected klass when setting class state on %s"
                    % getattr(object,'__name__','(?)'))
                return

            fresh = klass(*args)
            object.__dict__.clear()
            object.__dict__.update(fresh.__dict__)

            # The clear() above wiped these as well; put them back.
            object._p_oid = oid
            object._p_jar = self
            object._p_changed = 0
            object._p_serial = serial
        except:
            LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
            raise
581

Jim Fulton's avatar
alpha1  
Jim Fulton committed
582
    def tpc_abort(self, transaction):
Andreas Jung's avatar
Andreas Jung committed
583
        if self.__onCommitActions is not None:
584
            del self.__onCommitActions
Jim Fulton's avatar
alpha1  
Jim Fulton committed
585 586
        self._storage.tpc_abort(transaction)
        cache=self._cache
587 588
        cache.invalidate(self._invalidated)
        cache.invalidate(self._invalidating)
589
        self._invalidate_creating()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
590

591
    def tpc_begin(self, transaction, sub=None):
592
        if self._invalid(None): # Some nitwit invalidated everything!
593
            raise ConflictError("transaction already invalidated")
Jim Fulton's avatar
alpha1  
Jim Fulton committed
594
        self._invalidating=[]
595
        self._creating=[]
596 597 598 599 600 601 602 603 604 605

        if sub:
            # Sub-transaction!
            _tmp=self._tmp
            if _tmp is None:
                _tmp=TmpStore.TmpStore(self._version)
                self._tmp=self._storage
                self._storage=_tmp
                _tmp.registerDB(self._db, 0)

Jim Fulton's avatar
alpha1  
Jim Fulton committed
606 607
        self._storage.tpc_begin(transaction)

608 609 610 611 612 613 614 615
    def tpc_vote(self, transaction):
        if self.__onCommitActions is not None:
            del self.__onCommitActions
        try:
            vote=self._storage.tpc_vote
        except AttributeError:
            return
        s = vote(transaction)
616 617 618 619 620 621 622 623 624 625 626 627 628
        self._handle_serial(s)

    def _handle_serial(self, store_return, oid=None, change=1):
        """Handle the returns from store() and tpc_vote() calls."""

        # These calls can return different types depending on whether
        # ZEO is used.  ZEO uses asynchronous returns that may be
        # returned in batches by the ClientStorage.  ZEO1 can also
        # return an exception object and expect that the Connection
        # will raise the exception.

        # When commit_sub() exceutes a store, there is no need to
        # update the _p_changed flag, because the subtransaction
Jeremy Hylton's avatar
Jeremy Hylton committed
629
        # tpc_vote() calls already did this.  The change=1 argument
630 631 632 633 634 635 636 637
        # exists to allow commit_sub() to avoid setting the flag
        # again. 
        if not store_return:
            return
        if isinstance(store_return, StringType):
            assert oid is not None
            serial = store_return
            obj = self._cache.get(oid, None)
638 639
            if obj is None:
                return
640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659
            if serial == ResolvedSerial:
                obj._p_changed = None
            else:
                if change:
                    obj._p_changed = 0
                obj._p_serial = serial
        else:
            for oid, serial in store_return:
                if not isinstance(serial, StringType):
                    raise serial
                obj = self._cache.get(oid, None)
                if obj is None:
                    continue
                if serial == ResolvedSerial:
                    obj._p_changed = None
                else:
                    if change:
                        obj._p_changed = 0
                    obj._p_serial = serial

660

Jim Fulton's avatar
alpha1  
Jim Fulton committed
661
    def tpc_finish(self, transaction):
662 663

        # It's important that the storage call the function we pass
664 665 666 667
        # (self._invalidate_invalidating) while it still has it's
        # lock.  We don't want another thread to be able to read any
        # updated data until we've had a chance to send an
        # invalidation message to all of the other connections!
668 669 670 671

        if self._tmp is not None:
            # Commiting a subtransaction!
            # There is no need to invalidate anything.
672
            self._storage.tpc_finish(transaction)
673 674 675
            self._storage._creating[:0]=self._creating
            del self._creating[:]
        else:
676
            self._db.begin_invalidation()
677 678 679
            self._storage.tpc_finish(transaction,
                                     self._invalidate_invalidating)

680
        self._cache.invalidate(self._invalidated)
Jim Fulton's avatar
Jim Fulton committed
681
        self._incrgc() # This is a good time to do some GC
Jim Fulton's avatar
alpha1  
Jim Fulton committed
682

683
    def _invalidate_invalidating(self):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
684
        invalidate=self._db.invalidate
685 686 687
        for oid in self._invalidating:
            invalidate(oid, self)
        self._db.finish_invalidation()
688

689 690
    def sync(self):
        """Abort the current transaction and bring the cache up to date.

        Calls the storage's sync method when it has one, applies the
        recorded invalidations to the cache and runs an incremental GC.
        """
        get_transaction().abort()
        storage_sync = getattr(self._storage, 'sync', 0)
        if storage_sync != 0:
            storage_sync()
        self._cache.invalidate(self._invalidated)
        self._incrgc() # This is a good time to do some GC
695

696 697 698
    def getDebugInfo(self): return self._debug_info
    def setDebugInfo(self, *args): self._debug_info=self._debug_info+args

699 700 701 702 703 704 705 706 707 708 709
    def getTransferCounts(self, clear=0):
        """Returns the number of objects loaded and stored.

        Set the clear argument to reset the counters.
        """
        res = (self._load_count, self._store_count)
        if clear:
            self._load_count = 0
            self._store_count = 0
        return res

710 711 712 713 714 715 716 717 718

    ######################################################################
    # Just plain weird. Don't try this at home kids.
    def exchange(self, old, new):
        """Make new take over the identity (oid) of old in this connection.

        new is given old's oid and this connection as its jar, is marked
        as changed, is registered with the current transaction and
        replaces the cache entry for that oid.  old itself is not
        modified.
        """
        oid=old._p_oid
        new._p_oid=oid
        new._p_jar=self
        # NOTE(review): the oid and jar are assigned before _p_changed;
        # presumably the order matters for persistent objects -- confirm
        # before reordering.
        new._p_changed=1
        get_transaction().register(new)
        self._cache[oid]=new
720
        
Jim Fulton's avatar
alpha1  
Jim Fulton committed
721 722 723 724
class tConnection(Connection):
    """Connection whose close() only drops its cache references."""

    def close(self):
        # Unlike Connection.close(), this runs no callbacks and does not
        # hand the connection back to a database.
        self._breakcr()
725