Commit 96f819bb authored by Jim Fulton's avatar Jim Fulton

Added a lock for calling cache methods to avoid a race condition

between calls from the storage and from the out-of-band
invalidation message handler.
parent ec9f5c8f
......@@ -144,16 +144,23 @@ file 0 and file 1.
"""
__version__ = "$Revision: 1.11 $"[11:-2]
__version__ = "$Revision: 1.12 $"[11:-2]
import os, tempfile
from struct import pack, unpack
from thread import allocate_lock
magic='ZEC0'
class ClientCache:
def __init__(self, storage='', size=20000000, client=None, var=None):
# Allocate locks:
l=allocate_lock()
self._acquire=l.acquire
self._release=l.release
if client:
# Create a persistent cache
if var is None: var=os.path.join(INSTANCE_HOME,'var')
......@@ -199,75 +206,117 @@ class ClientCache:
self._current=current
def open(self):
self._index=index={}
self._get=index.get
serial={}
f=self._f
current=self._current
if f[not current] is not None:
read_index(index, serial, f[not current], not current)
self._pos=read_index(index, serial, f[current], current)
return serial.items()
self._acquire()
try:
self._index=index={}
self._get=index.get
serial={}
f=self._f
current=self._current
if f[not current] is not None:
read_index(index, serial, f[not current], not current)
self._pos=read_index(index, serial, f[current], current)
return serial.items()
finally: self._release()
def invalidate(self, oid, version):
p=self._get(oid, None)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
f.seek(ap)
h=f.read(8)
if h != oid: return
f.seek(8,1) # Dang, we shouldn't have to do this. Bad Solaris & NT
if version:
f.write('n')
else:
del self._index[oid]
f.write('i')
self._acquire()
try:
p=self._get(oid, None)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
f.seek(ap)
h=f.read(8)
if h != oid: return
f.seek(8,1) # Dang, we shouldn't have to do this. Bad Solaris & NT
if version:
f.write('n')
else:
del self._index[oid]
f.write('i')
finally: self._release()
def load(self, oid, version):
p=self._get(oid, None)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
seek=f.seek
read=f.read
seek(ap)
h=read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else: tlen=-1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
del self._index[oid]
return None
if h[8]=='n':
if version: return None
if not dlen:
self._acquire()
try:
p=self._get(oid, None)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
seek=f.seek
read=f.read
seek(ap)
h=read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else: tlen=-1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
del self._index[oid]
return None
if not vlen or not version:
if dlen: return read(dlen), h[19:]
else: return None
if dlen: seek(dlen, 1)
v=read(vlen)
if version != v:
if dlen:
seek(-dlen-vlen, 1)
return read(dlen), h[19:]
else: None
dlen=unpack(">i", read(4))[0]
return read(dlen), read(8)
return None
if h[8]=='n':
if version: return None
if not dlen:
del self._index[oid]
return None
if not vlen or not version:
if dlen: return read(dlen), h[19:]
else: return None
if dlen: seek(dlen, 1)
v=read(vlen)
if version != v:
if dlen:
seek(-dlen-vlen, 1)
return read(dlen), h[19:]
else: None
dlen=unpack(">i", read(4))[0]
return read(dlen), read(8)
finally: self._release()
def update(self, oid, serial, version, data):
if version:
# We need to find and include non-version data
self._acquire()
try:
if version:
# We need to find and include non-version data
p=self._get(oid, None)
if p is None:
return self._store(oid, '', '', version, data, serial)
f=self._f[p < 0]
ap=abs(p)
seek=f.seek
read=f.read
seek(ap)
h=read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
return self._store(oid, '', '', version, data, serial)
if tlen <= 0 or vlen < 0 or dlen <= 0 or vlen+dlen > tlen:
return self._store(oid, '', '', version, data, serial)
if dlen:
p=read(dlen)
s=h[19:]
else:
return self._store(oid, '', '', version, data, serial)
self._store(oid, p, s, version, data, serial)
else:
# Simple case, just store new data:
self._store(oid, data, serial, '', None, None)
finally: self._release()
def modifiedInVersion(self, oid):
self._acquire()
try:
p=self._get(oid, None)
if p is None:
return self.store(oid, '', '', version, data, serial)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
seek=f.seek
......@@ -276,69 +325,50 @@ class ClientCache:
h=read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
return self.store(oid, '', '', version, data, serial)
if tlen <= 0 or vlen < 0 or dlen <= 0 or vlen+dlen > tlen:
return self.store(oid, '', '', version, data, serial)
if dlen:
p=read(dlen)
s=h[19:]
else:
return self.store(oid, '', '', version, data, serial)
self.store(oid, p, s, version, data, serial)
else:
# Simple case, just store new data:
self.store(oid, data, serial, '', None, None)
else: tlen=-1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
del self._index[oid]
return None
def modifiedInVersion(self, oid):
p=self._get(oid, None)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
seek=f.seek
read=f.read
seek(ap)
h=read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else: tlen=-1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
del self._index[oid]
return None
if h[8]=='n': return None
if h[8]=='n': return None
if not vlen: return ''
seek(dlen, 1)
return read(vlen)
if not vlen: return ''
seek(dlen, 1)
return read(vlen)
finally: self._release()
def checkSize(self, size):
# Make sure we aren't going to exceed the target size.
# If we are, then flip the cache.
if self._pos+size > self._limit:
current=not self._current
self._current=current
if self._p[current] is not None:
# Persistent cache file:
# Note that due to permission madness, waaa,
# we need to remove the old file before
# we open the new one. Waaaaaaaaaa.
if self._f[current] is not None:
self._f[current].close()
try: os.remove(self._p[current])
except: pass
self._f[current]=open(self._p[current],'w+b')
else:
# Temporary cache file:
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic)
self._pos=pos=4
self._acquire()
try:
# Make sure we aren't going to exceed the target size.
# If we are, then flip the cache.
if self._pos+size > self._limit:
current=not self._current
self._current=current
if self._p[current] is not None:
# Persistent cache file:
# Note that due to permission madness, waaa,
# we need to remove the old file before
# we open the new one. Waaaaaaaaaa.
if self._f[current] is not None:
self._f[current].close()
try: os.remove(self._p[current])
except: pass
self._f[current]=open(self._p[current],'w+b')
else:
# Temporary cache file:
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic)
self._pos=pos=4
finally: self._release()
def store(self, oid, p, s, version, pv, sv):
self._acquire()
try: self._store(oid, p, s, version, pv, sv)
finally: self._release()
def _store(self, oid, p, s, version, pv, sv):
if not s:
p=''
s='\0\0\0\0\0\0\0\0'
......@@ -348,7 +378,7 @@ class ClientCache:
vlen=len(version)
else:
vlen=0
pos=self._pos
current=self._current
f=self._f[current]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment