Commit 5de492bf authored by Jim Fulton's avatar Jim Fulton

No longer pass storage to Connection constructor.

Added subtransaction support.
parent 0628569b
...@@ -84,15 +84,16 @@ ...@@ -84,15 +84,16 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.10 1999/06/24 20:05:50 jim Exp $""" $Id: Connection.py,v 1.11 1999/06/29 18:28:16 jim Exp $"""
__version__='$Revision: 1.10 $'[11:-2] __version__='$Revision: 1.11 $'[11:-2]
from cPickleCache import PickleCache from cPickleCache import PickleCache
from POSException import ConflictError, ExportError from POSException import ConflictError, ExportError
from cStringIO import StringIO from cStringIO import StringIO
from cPickle import Unpickler, Pickler from cPickle import Unpickler, Pickler
from ExtensionClass import Base from ExtensionClass import Base
import Transaction, string, ExportImport, sys, traceback import Transaction, string, ExportImport, sys, traceback, TmpStore
from zLOG import LOG, ERROR
ExtensionKlass=Base.__class__ ExtensionKlass=Base.__class__
...@@ -108,12 +109,11 @@ class Connection(ExportImport.ExportImport): ...@@ -108,12 +109,11 @@ class Connection(ExportImport.ExportImport):
The Connection manages movement of objects in and out of object storage. The Connection manages movement of objects in and out of object storage.
""" """
_tmp=None
def __init__(self, storage, version='', cache_size=400, def __init__(self, version='', cache_size=400,
cache_deactivate_after=60): cache_deactivate_after=60):
"""Create a new Connection""" """Create a new Connection"""
self._storage=storage
self.new_oid=storage.new_oid
self._version=version self._version=version
self._cache=cache=PickleCache(self, cache_size, cache_deactivate_after) self._cache=cache=PickleCache(self, cache_size, cache_deactivate_after)
self._incrgc=cache.incrgc self._incrgc=cache.incrgc
...@@ -217,6 +217,8 @@ class Connection(ExportImport.ExportImport): ...@@ -217,6 +217,8 @@ class Connection(ExportImport.ExportImport):
Any objects modified since the last transaction are invalidated. Any objects modified since the last transaction are invalidated.
""" """
self._db=odb self._db=odb
self._storage=s=odb._storage
self.new_oid=s.new_oid
self._cache.invalidate(self._invalidated) self._cache.invalidate(self._invalidated)
return self return self
...@@ -231,7 +233,7 @@ class Connection(ExportImport.ExportImport): ...@@ -231,7 +233,7 @@ class Connection(ExportImport.ExportImport):
def close(self): def close(self):
self._incrgc() self._incrgc()
self._db._closeConnection(self) self._db._closeConnection(self)
del self._db self._db=self._storage=self._tmp=None
def commit(self, object, transaction): def commit(self, object, transaction):
oid=object._p_oid oid=object._p_oid
...@@ -265,8 +267,7 @@ class Connection(ExportImport.ExportImport): ...@@ -265,8 +267,7 @@ class Connection(ExportImport.ExportImport):
if hasattr(klass, '__getinitargs__'): return oid if hasattr(klass, '__getinitargs__'): return oid
try: module=klass.__module__ module=getattr(klass,'__module__','')
except: module=''
if module: klass=module, klass.__name__ if module: klass=module, klass.__name__
return oid, klass return oid, klass
...@@ -285,8 +286,7 @@ class Connection(ExportImport.ExportImport): ...@@ -285,8 +286,7 @@ class Connection(ExportImport.ExportImport):
object=stack[-1] object=stack[-1]
del stack[-1] del stack[-1]
oid=object._p_oid oid=object._p_oid
try: serial=object._p_serial serial=getattr(object, '_p_serial', '\0\0\0\0\0\0\0\0')
except: serial='\0'*8
if invalid(oid): raise ConflictError, oid if invalid(oid): raise ConflictError, oid
klass = object.__class__ klass = object.__class__
...@@ -304,8 +304,7 @@ class Connection(ExportImport.ExportImport): ...@@ -304,8 +304,7 @@ class Connection(ExportImport.ExportImport):
else: else:
args = None # New no-constructor protocol! args = None # New no-constructor protocol!
try: module=klass.__module__ module=getattr(klass,'__module__','')
except: module=''
if module: klass=module, klass.__name__ if module: klass=module, klass.__name__
state=object.__getstate__() state=object.__getstate__()
...@@ -324,6 +323,36 @@ class Connection(ExportImport.ExportImport): ...@@ -324,6 +323,36 @@ class Connection(ExportImport.ExportImport):
return topoid return topoid
def commit_sub(self, t):
tmp=self._tmp
if tmp is None: return
src=self._storage
self._storage=tmp
self._tmp=None
tmp.tpc_begin(t)
load=src.load
store=tmp.store
invalidate=self._invalidating.append
dest=self._version
get=self._cache.get
for oid in src._index.keys():
data, serial = load(oid, src)
s=store(oid, serial, data, dest, t)
o=get(oid, None)
if o is not None: o._p_serial=s
invalidate(oid)
def abort_sub(self, t):
tmp=self._tmp
if tmp is None: return
src=self._storage
self._tmp=None
self._storage=tmp
self._cache.invalidate(src._index.keys())
def db(self): return self._db def db(self): return self._db
def getVersion(self): return self._version def getVersion(self): return self._version
...@@ -357,6 +386,9 @@ class Connection(ExportImport.ExportImport): ...@@ -357,6 +386,9 @@ class Connection(ExportImport.ExportImport):
state = unpickler.load() state = unpickler.load()
except: except:
t, v =sys.exc_info()[:2] t, v =sys.exc_info()[:2]
LOG('ZODB',ERROR,
"Couldn't unnpickle %s" % `oid`,
error=sys.exc_info())
raise raise
if hasattr(object, '__setstate__'): if hasattr(object, '__setstate__'):
...@@ -372,10 +404,20 @@ class Connection(ExportImport.ExportImport): ...@@ -372,10 +404,20 @@ class Connection(ExportImport.ExportImport):
cache.invalidate(self._invalidated) cache.invalidate(self._invalidated)
cache.invalidate(self._invalidating) cache.invalidate(self._invalidating)
def tpc_begin(self, transaction): def tpc_begin(self, transaction, sub=None):
if self._invalid(None): # Some nitwit invalidated everything! if self._invalid(None): # Some nitwit invalidated everything!
raise ConflictError, oid raise ConflictError, oid
self._invalidating=[] self._invalidating=[]
if sub:
# Sub-transaction!
_tmp=self._tmp
if _tmp is None:
_tmp=TmpStore.TmpStore(self._version)
self._tmp=self._storage
self._storage=_tmp
_tmp.registerDB(self._db, 0)
self._storage.tpc_begin(transaction) self._storage.tpc_begin(transaction)
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
...@@ -389,7 +431,11 @@ class Connection(ExportImport.ExportImport): ...@@ -389,7 +431,11 @@ class Connection(ExportImport.ExportImport):
def sync(self): def sync(self):
get_transaction().abort() get_transaction().abort()
self._cache.invalidate(self._invalidated) self._cache.invalidate(self._invalidated)
def modifiedInVersion(self, oid):
try: self._db.modifiedInVersion(oid)
except KeyError:
return self._version
class tConnection(Connection): class tConnection(Connection):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment