Commit 33ced447 authored by Chris McDonough's avatar Chris McDonough

Fix for KeyErrors emanating from TemporaryStorage unexpectedly. A

connection can be holding on to an object with references to an object
that has been recently garbage collected.  When the object is unghosted in
the reference-holding thread, the storage cannot find the oid (because it's
out of storage due to gc).  We work around this by turning KeyErrors that
we believe are due to this scenario into ConflictErrors.
parent 040018ea
...@@ -17,13 +17,13 @@ MappingStorage. Unlike MappingStorage, it needs not be packed to get rid of ...@@ -17,13 +17,13 @@ MappingStorage. Unlike MappingStorage, it needs not be packed to get rid of
non-cyclic garbage and it does rudimentary conflict resolution. This is a non-cyclic garbage and it does rudimentary conflict resolution. This is a
ripoff of Jim's Packless bsddb3 storage. ripoff of Jim's Packless bsddb3 storage.
$Id: TemporaryStorage.py,v 1.5 2004/02/19 18:35:23 jeremy Exp $ $Id: TemporaryStorage.py,v 1.1.2.2 2004/05/16 01:41:34 chrism Exp $
""" """
__version__ ='$Revision: 1.5 $'[11:-2] __version__ ='$Revision: 1.1.2.2 $'[11:-2]
from zLOG import LOG, BLATHER from zLOG import LOG, BLATHER
from ZODB.serialize import referencesf from ZODB.referencesf import referencesf
from ZODB import POSException from ZODB import POSException
from ZODB.BaseStorage import BaseStorage from ZODB.BaseStorage import BaseStorage
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
...@@ -33,6 +33,8 @@ import time ...@@ -33,6 +33,8 @@ import time
CONFLICT_CACHE_MAXAGE = 60 CONFLICT_CACHE_MAXAGE = 60
# garbage collect conflict cache every CONFLICT_CACHE_GCEVERY seconds # garbage collect conflict cache every CONFLICT_CACHE_GCEVERY seconds
CONFLICT_CACHE_GCEVERY = 60 CONFLICT_CACHE_GCEVERY = 60
# keep history of recently gc'ed oids of length RECENTLY_GC_OIDS_LEN
RECENTLY_GC_OIDS_LEN = 200
class ReferenceCountError(POSException.POSError): class ReferenceCountError(POSException.POSError):
""" An error occured while decrementing a reference to an object in """ An error occured while decrementing a reference to an object in
...@@ -55,6 +57,7 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -55,6 +57,7 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
_tmp -- used by 'store' to collect changes before finalization _tmp -- used by 'store' to collect changes before finalization
_conflict_cache -- cache of recently-written object revisions _conflict_cache -- cache of recently-written object revisions
_last_cache_gc -- last time that conflict cache was garbage collected _last_cache_gc -- last time that conflict cache was garbage collected
_recently_gc_oids -- a queue of recently gc'ed oids
""" """
BaseStorage.__init__(self, name) BaseStorage.__init__(self, name)
...@@ -65,6 +68,7 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -65,6 +68,7 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
self._tmp = [] self._tmp = []
self._conflict_cache = {} self._conflict_cache = {}
self._last_cache_gc = 0 self._last_cache_gc = 0
self._recently_gc_oids = [None for x in range (RECENTLY_GC_OIDS_LEN)]
self._oid = '\0\0\0\0\0\0\0\0' self._oid = '\0\0\0\0\0\0\0\0'
def __len__(self): def __len__(self):
...@@ -91,16 +95,26 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -91,16 +95,26 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
def load(self, oid, version): def load(self, oid, version):
self._lock_acquire() self._lock_acquire()
try: try:
s=self._index[oid] try:
p=self._opickle[oid] s=self._index[oid]
return p, s # pickle, serial p=self._opickle[oid]
return p, s # pickle, serial
except KeyError:
# this oid was probably garbage collected while a thread held
# on to an object that had a reference to it; we can probably
# force the loader to sync their connection by raising a
# ConflictError (at least if Zope is the loader, because it
# will resync its connection on a retry). This isn't
# perfect because the length of the recently gc'ed oids list
# is finite and could be overrun through a mass gc, but it
# should be adequate in common-case usage.
if oid in self._recently_gc_oids:
raise POSException.ConflictError(oid=oid)
else:
raise
finally: finally:
self._lock_release() self._lock_release()
def loadEx(self, oid, version):
p, s = self.load(oid, version)
return p, s, s
def loadSerial(self, oid, serial, marker=[]): def loadSerial(self, oid, serial, marker=[]):
""" this is only useful to make conflict resolution work. It """ this is only useful to make conflict resolution work. It
does not actually implement all the semantics that a revisioning does not actually implement all the semantics that a revisioning
...@@ -132,16 +146,13 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -132,16 +146,13 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
if self._index.has_key(oid): if self._index.has_key(oid):
oserial=self._index[oid] oserial=self._index[oid]
if serial != oserial: if serial != oserial:
rdata = self.tryToResolveConflict(oid, oserial, data=self.tryToResolveConflict(oid, oserial, serial, data)
serial, data) if not data:
if rdata is None: raise POSException.ConflictError(oid=oid,
raise POSException.ConflictError( serials=(oserial, serial))
oid=oid, serials=(oserial, serial), data=data)
else:
data = rdata
else: else:
oserial = serial oserial = serial
newserial=self._tid newserial=self._serial
self._tmp.append((oid, data)) self._tmp.append((oid, data))
now = time.time() now = time.time()
self._conflict_cache[(oid, newserial)] = data, now self._conflict_cache[(oid, newserial)] = data, now
...@@ -154,7 +165,7 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -154,7 +165,7 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
referenceCount=self._referenceCount referenceCount=self._referenceCount
referenceCount_get=referenceCount.get referenceCount_get=referenceCount.get
oreferences=self._oreferences oreferences=self._oreferences
serial=self._tid serial=self._serial
index=self._index index=self._index
opickle=self._opickle opickle=self._opickle
...@@ -229,6 +240,10 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -229,6 +240,10 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
# take out the garbage. # take out the garbage.
referenceCount=self._referenceCount referenceCount=self._referenceCount
referenceCount_get=referenceCount.get referenceCount_get=referenceCount.get
self._recently_gc_oids.pop()
self._recently_gc_oids.insert(0, oid)
try: del referenceCount[oid] try: del referenceCount[oid]
except: pass except: pass
try: del self._opickle[oid] try: del self._opickle[oid]
...@@ -236,6 +251,11 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -236,6 +251,11 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
try: del self._index[oid] try: del self._index[oid]
except: pass except: pass
# remove this object from the conflict cache if it exists there
for k in self._conflict_cache.keys():
if k[0] == oid:
del self._conflict_cache[k]
# Remove/decref references # Remove/decref references
roids = self._oreferences.get(oid, []) roids = self._oreferences.get(oid, [])
while roids: while roids:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment