Commit 100a2198 authored by Jim Fulton

Merged cache fixes from 3.8 branch:

- The cache used an excessive amount of memory, causing applications
  with large caches to exhaust available memory.

- Fixed a number of bugs in the handling of persistent ZEO caches:

- Cache records are written in several steps.  If a process exits
    after writing begins and before it finishes, the cache will be
    corrupt on restart.  The way records are written was changed to
    make cache record updates atomic.

- There was no lock file to prevent opening a cache multiple times
    at once, which would lead to corruption.  Persistent caches now
    use lock files, in the same way that file storages do.

- A bug in the cache-opening logic led to cache failure in the
    unlikely event that a cache has no free blocks.
parent 0d1f0cfa
......@@ -22,568 +22,64 @@ it to map this richer API onto the simple key-based API of the lower-level
FileCache.
"""
from struct import pack, unpack
import bisect
import BTrees.LLBTree
import BTrees.LOBTree
import logging
import os
import struct
import tempfile
import time
from ZODB.utils import z64, u64
import ZODB.fsIndex
import ZODB.lock_file
from ZODB.utils import p64, u64, z64
logger = logging.getLogger("ZEO.cache")
max32 = (1 << 32) - 1
##
# A disk-based cache for ZEO clients.
# <p>
#
# This class provides an interface to a persistent, disk-based cache
# used by ZEO clients to store copies of database records from the
# server.
# <p>
#
# The details of the constructor are unspecified at this point.
# <p>
#
# Each entry in the cache is valid for a particular range of transaction
# ids. The lower bound is the transaction that wrote the data. The
# upper bound is the next transaction that wrote a revision of the
# object. If the data is current, the upper bound is stored as None;
# the data is considered current until an invalidate() call is made.
# <p>
#
# It is an error to call store() twice with the same object without an
# intervening invalidate() to set the upper bound on the first cache
# entry. <em>Perhaps it will be necessary to have a call that removes
# entry. Perhaps it will be necessary to have a call that removes
# something from the cache outright, without keeping a non-current
# entry.</em>
# <h3>Cache verification</h3>
# <p>
# entry.
# Cache verification
#
# When the client is connected to the server, it receives
# invalidations every time an object is modified. When the client is
# disconnected then reconnects, it must perform cache verification to make
# sure its cached data is synchronized with the storage's current state.
# <p>
#
# quick verification
# full verification
# <p>
class ClientCache(object):
"""A simple in-memory cache."""
##
# Do we put the constructor here?
# @param path path of persistent snapshot of cache state (a file path)
# @param size size of cache file, in bytes
# The default size of 200MB makes a lot more sense than the traditional
# default of 20MB. The default here is misleading, though, since
# ClientStorage is the only user of ClientCache, and it always passes an
# explicit size of its own choosing.
def __init__(self, path=None, size=200*1024**2):
    """Create an empty in-memory index on top of a FileCache.

    path -- path of the persistent snapshot of cache state (a file path)
    size -- size of the cache file, in bytes
    """
    # Records are kept in the cache file under (oid, tid) keys, where tid
    # is the id of the transaction that wrote the object.  The two dicts
    # below are auxiliary structures used to work out the right key for
    # a given oid; they can be reconstructed from the stored records.

    # oid -> tid of the current revision.
    self.current = {}

    # oid -> sorted list of (start_tid, end_tid) pairs; used to find the
    # matching key when loading non-current data.
    self.noncurrent = {}

    self.size = size
    self.path = path

    # The FileCache does all the low-level work of storing objects in,
    # and retrieving them from, the cache file.  It is given this
    # instance as its parent.
    self.fc = FileCache(size, path, self)
    self._setup_trace(path)
def open(self):
    """Scan the cache file, indexing every object via self.install."""
    file_cache = self.fc
    file_cache.scan(self.install)
##
# Callback for FileCache.scan(), when a pre-existing file cache is
# used. For each object in the file, `install()` is invoked. `f`
# is the file object, positioned at the start of the serialized Object.
# `ent` is an Entry giving the object's key ((oid, start_tid) pair).
def install(self, f, ent):
    """FileCache.scan() callback: index one object found in the file.

    f   -- the file object, positioned at the start of the serialized
           Object
    ent -- the Entry giving the object's (oid, start_tid) key
    """
    # Only the validity range is needed, so the data itself is skipped.
    obj = Object.fromFile(f, ent.key, skip_data=True)
    if obj is None:
        return

    oid = obj.key[0]
    if obj.end_tid is None:
        # No end tid: this is the current revision of oid.
        self.current[oid] = obj.start_tid
        return

    assert obj.start_tid < obj.end_tid
    span = (obj.start_tid, obj.end_tid)
    spans = self.noncurrent.get(oid)
    if not spans:
        self.noncurrent[oid] = [span]
    else:
        bisect.insort_left(spans, span)
def close(self):
    """Close the file cache and, if tracing is on, the trace file."""
    self.fc.close()
    tracefile = self._tracefile
    if not tracefile:
        return
    # Push the trace records out to disk before letting go of the file.
    sync(tracefile)
    tracefile.close()
    self._tracefile = None
##
# Set the last transaction seen by the cache.
# @param tid a transaction id
# @exception ValueError attempt to set a new tid less than the current tid
def setLastTid(self, tid):
    """Record tid as the last transaction seen, via the FileCache."""
    record_tid = self.fc.settid
    record_tid(tid)
##
# Return the last transaction seen by the cache.
# @return a transaction id
# @defreturn string, or None if no transaction is yet known
def getLastTid(self):
    """Return the last transaction id seen, or None if none is known."""
    # The FileCache reports z64 when no transaction has been recorded.
    if self.fc.tid != z64:
        return self.fc.tid
    return None
##
# Return the current data record for oid.
# @param oid object id
# @return (data record, serial number), or None if the object is not
# in the cache
# @defreturn 2-tuple: (string, string)
def load(self, oid):
    """Return (data, tid) for the current revision of oid, or None.

    None is returned when the cache has no current revision of oid.
    """
    start_tid = self.current.get(oid)
    obj = None
    if start_tid is not None:
        obj = self.fc.access((oid, start_tid))
    if obj is None:
        # Either oid has no current entry, or the file cache has no
        # record under its key.
        self._trace(0x20, oid)
        return None
    self._trace(0x22, oid, obj.start_tid, obj.end_tid, len(obj.data))
    return obj.data, start_tid
##
# Return a non-current revision of oid that was current before tid.
# @param oid object id
# @param tid id of transaction that wrote next revision of oid
# @return data record, start tid, and end tid
# @defreturn 3-tuple: (string, string, string)
def loadBefore(self, oid, tid):
    """Return a cached non-current revision of oid that predates tid.

    oid -- object id
    tid -- transaction id; the revision returned has start_tid < tid

    Returns (data, start_tid, end_tid), or None when no cached revision
    covers tid.
    """
    found = None
    spans = self.noncurrent.get(oid)
    if spans:
        # A 1-tuple sorts before every 2-tuple with the same first item,
        # so (tid,) is a lower bound for all pairs starting at tid.  This
        # avoids depending on how None compares with strings.
        i = bisect.bisect_left(spans, (tid,))
        # Everything in spans[:i] starts before tid, so the largest
        # start_tid < tid is at spans[i-1].  If i is 0 there is no data
        # old enough.
        if i:
            lo, hi = spans[i-1]
            assert lo < tid
            if tid <= hi:
                # May still be None if the file cache has no record for
                # the key; that is treated as a miss, as in load().
                found = self.fc.access((oid, lo))
    if found is None:
        self._trace(0x24, oid, "", tid)
        return None
    self._trace(0x26, oid, "", tid)
    return found.data, found.start_tid, found.end_tid
##
# Store a new data record in the cache.
# @param oid object id
# @param start_tid the id of the transaction that wrote this revision
# @param end_tid the id of the transaction that created the next
# revision of oid. If end_tid is None, the data is
# current.
# @param data the actual data
def store(self, oid, start_tid, end_tid, data):
    """Store a new data record in the cache.

    oid       -- object id
    start_tid -- id of the transaction that wrote this revision
    end_tid   -- id of the transaction that created the next revision of
                 oid, or None if the data is current
    data      -- the actual data

    Raises ValueError if a different current revision of oid is already
    cached.  Duplicate stores, and records the file cache refuses to add,
    are silently ignored.
    """
    key = (oid, start_tid)
    # It's hard for the client to avoid storing the same object more than
    # once.
    if key in self.fc:
        return
    o = Object(key, data, start_tid, end_tid)

    if end_tid is None:
        cur_start = self.current.get(oid)
        if cur_start:
            if cur_start != start_tid:
                raise ValueError("already have current data for oid")
            return
        if not self.fc.add(o):
            return  # too large
        self.current[oid] = start_tid
        self._trace(0x52, oid, start_tid, dlen=len(data))
        return

    span = (start_tid, end_tid)
    # Look the list up without creating it, so that a refused or
    # duplicate store does not leave an empty list in self.noncurrent.
    if span in self.noncurrent.get(oid, ()):
        return  # duplicate store
    if not self.fc.add(o):
        return  # too large
    # Fetch the list only now: adding to the file cache may evict other
    # revisions of this oid, and _evicted() drops the oid's list from
    # self.noncurrent when its last revision goes.  A reference taken
    # before the add could therefore point at a discarded list.
    bisect.insort_left(self.noncurrent.setdefault(oid, []), span)
    self._trace(0x54, oid, start_tid, end_tid, dlen=len(data))
##
# Remove all knowledge of noncurrent revisions of oid, both in
# self.noncurrent and in our FileCache. `tid` is used
# only for trace records.
def _remove_noncurrent_revisions(self, oid, tid):
    """Drop every non-current revision of oid from index and file cache.

    `tid` is used only for trace records.
    """
    spans = self.noncurrent.get(oid)
    if not spans:
        return
    # Take the start tids up front: each fc.remove() calls back into
    # _evicted(), which mutates the list being walked.
    start_tids = [start for start, _end in spans]
    for start in start_tids:
        # 0x1E = invalidate (hit, discarding current or non-current)
        self._trace(0x1E, oid, tid)
        self.fc.remove((oid, start))
    # The _evicted() callbacks should have removed the list itself once
    # the last non-current revision was gone.
    assert oid not in self.noncurrent
##
# If `tid` is None, forget all knowledge of `oid`. (`tid` can be
# None only for invalidations generated by startup cache
# verification.) If `tid` isn't None, and we had current data for
# `oid`, stop believing we have current data, and mark the data we
# had as being valid only up to `tid`. In all other cases, do
# nothing.
# @param oid object id
# @param tid the id of the transaction that wrote a new revision of oid,
# or None to forget all cached info about oid (current
# revision, and non-current revisions)
def invalidate(self, oid, tid):
    """Invalidate the cached current revision of oid.

    oid -- object id
    tid -- id of the transaction that wrote a new revision of oid, or
           None to forget all cached info about oid (current and
           non-current revisions)

    With a non-None tid, current data for oid (if any) is kept but marked
    as valid only up to tid.  In all other cases nothing else happens.
    """
    # Test for None first so that None is never compared with a tid.
    if tid is not None and tid > self.fc.tid:
        self.fc.settid(tid)

    forget_everything = tid is None
    if forget_everything:
        self._remove_noncurrent_revisions(oid, tid)

    # Only current data remains to be handled.
    cur_tid = self.current.get(oid)
    if not cur_tid:
        # 0x10 == invalidate (miss)
        self._trace(0x10, oid, tid)
        return

    # We had current data for oid, but no longer.
    if forget_everything:
        # 0x1E = invalidate (hit, discarding current or non-current)
        self._trace(0x1E, oid, tid)
        self.fc.remove((oid, cur_tid))
        # self.current is keyed by oid, so that is what .remove() must
        # have got rid of.
        assert oid not in self.current
        return

    # Add the data we have to the list of non-current data for oid.
    assert cur_tid <= tid
    # 0x1C = invalidate (hit, saving non-current)
    self._trace(0x1C, oid, tid)
    del self.current[oid]  # because we no longer have current data

    # Update the end_tid half of oid's validity range on disk.
    # TODO: Want to fetch object without marking it as accessed.
    o = self.fc.access((oid, cur_tid))
    assert o is not None
    assert o.end_tid is None  # i.e., o was current
    if o is None:
        # Only reachable when asserts are disabled.
        # TODO: Since we asserted o is not None above, this block
        # should be removed; waiting on time to prove it can't happen.
        return
    o.end_tid = tid
    self.fc.update(o)  # record the new end_tid on disk

    # Add to oid's list of non-current data.
    bisect.insort_left(self.noncurrent.setdefault(oid, []), (cur_tid, tid))
##
# Return the number of object revisions in the cache.
#
# Or maybe better to just return len(self.cache)? Needs clearer use case.
def __len__(self):
    """Return the number of object revisions indexed by the cache.

    That is one per oid with current data, plus one per (start_tid,
    end_tid) pair recorded for non-current data.
    """
    total = len(self.current)
    # Count the revision lists (the dict's values); iterating the dict
    # itself would measure the oid keys instead.
    for spans in self.noncurrent.values():
        total += len(spans)
    return total
##
# Generates (oid, serial) pairs for all objects in the
# cache. This generator is used by cache verification.
def contents(self):
    """Generate an (oid, tid) pair for each entry in the file cache."""
    # Iterates the file cache directly; whether a materialized list is
    # needed depends on whether the caller may change the cache.
    for entry in self.fc:
        entry_oid, entry_tid = entry.key
        yield (entry_oid, entry_tid)
def dump(self):
    """Print the cache contents to stdout, for debugging.

    Prints the cache size, the sorted (oid, tid) pairs, and then each
    entry of the file cache together with its end tid.
    """
    from ZODB.utils import oid_repr
    print("cache size %s" % len(self))
    for oid, tid in sorted(self.contents()):
        print("%s %s" % (oid_repr(oid), oid_repr(tid)))
    print("dll contents")
    for entry in sorted(self.fc, key=lambda e: e.key):
        # Iterating the file cache yields Entry objects, which carry only
        # a key and an offset; the end tid lives on the stored record.
        # NOTE(review): access() also marks the record as accessed.
        record = self.fc.access(entry.key)
        end_tid = (record is not None and record.end_tid) or z64
        print("%s %s %s" % (oid_repr(entry.key[0]),
                            oid_repr(entry.key[1]),
                            oid_repr(end_tid)))
    print("")
def _evicted(self, o):
    """Forget Object o; called by the FileCache when o has been evicted."""
    oid, _start = o.key
    if o.end_tid is None:
        # o was the current revision.
        del self.current[oid]
        return

    spans = self.noncurrent[oid]
    span = (o.start_tid, o.end_tid)
    if len(spans) != 1:
        # The list is kept sorted with bisect, but it is never expected
        # to be long, so a linear remove should normally be fine.
        spans.remove(span)
        return

    # Removing the last pair: drop the list too, so that an oid which is
    # never referenced again does not keep an empty list alive.
    assert spans[0] == span
    del self.noncurrent[oid]
# If `path` isn't None (== we're using a persistent cache file), and
# envar ZEO_CACHE_TRACE is set to a non-empty value, try to open
# path+'.trace' as a trace file, and store the file object in
# self._tracefile. If not, or we can't write to the trace file, disable
# tracing by setting self._trace to a dummy function, and set
# self._tracefile to None.
def _setup_trace(self, path):
    """Enable or disable tracing for this cache.

    If `path` is set and the ZEO_CACHE_TRACE environment variable is
    non-empty, open path + '.trace' for appending, store the file in
    self._tracefile and write an initial trace record.  Otherwise, or if
    the trace file cannot be written, set self._tracefile to None and
    replace self._trace with a do-nothing function.
    """
    import sys

    self._tracefile = None
    if path and os.environ.get("ZEO_CACHE_TRACE"):
        tfn = path + ".trace"
        try:
            self._tracefile = open(tfn, "ab")
            self._trace(0x00)
        except IOError:
            # Fetched this way so the handler parses on old and new
            # Python versions alike.
            msg = sys.exc_info()[1]
            opened = self._tracefile
            self._tracefile = None
            if opened is not None:
                # The open succeeded but the first write failed: don't
                # leave the file handle dangling.
                try:
                    opened.close()
                except IOError:
                    pass  # best effort; tracing is being disabled anyway
            logger.warning("cannot write tracefile %r (%s)", tfn, msg)
        else:
            logger.info("opened tracefile %r", tfn)
    if self._tracefile is None:
        def notrace(*args, **kws):
            pass
        self._trace = notrace
def _trace(self,
           code, oid="", tid=z64, end_tid=z64, dlen=0,
           # The next two are just speed hacks.
           time_time=time.time, struct_pack=struct.pack):
    """Append one record to the trace file.

    code    -- two hex digits; bits 0 and 7 must be zero.  The first
               digit shows the operation, the second the outcome.
    oid     -- object id, written after the fixed-size part of the record
    tid     -- start tid; None is written as z64
    end_tid -- end tid; None is written as z64
    dlen    -- data length, folded into the same field as the code

    Note: when tracing is disabled, this method is hidden by a dummy.
    """
    # `&` binds tighter than `|`: the rounded-up length goes in the upper
    # bits and the code in the low byte.
    encoded = (dlen + 255) & 0x7fffff00 | code
    if tid is None:
        tid = z64
    if end_tid is None:
        end_tid = z64
    try:
        self._tracefile.write(
            struct_pack(">iiH8s8s",
                        # The "i" format wants an integer; time() is a
                        # float.
                        int(time_time()),
                        encoded,
                        len(oid),
                        tid, end_tid) + oid)
    except Exception:
        # Show the offending tids before letting the error propagate.
        print("%r %r" % (tid, end_tid))
        raise
##
# An Object stores the cached data for a single object.
# <p>
# The cached data includes the actual object data, the key, and two
# data fields that describe the validity period of the
# object. The key contains the oid and a redundant start_tid. The
# actual size of an object is variable, depending on the size of the
# data.
# <p>
# The serialized format does not include the key, because it is stored
# in the header used by the cache file's storage format.
# <p>
# Instances of Object are generally short-lived -- they're really a way to
# package data on the way to or from the disk file.
class Object(object):
    """The cached data for a single object revision.

    Instances are generally short-lived: they are a way to package data
    on the way to or from the disk file.  The serialized form does not
    include the key, because the key is stored in the header used by the
    cache file's storage format.
    """

    __slots__ = (# pair (object id, txn id) -- something usable as a dict key;
                 # the second part of the pair is equal to start_tid
                 "key",

                 # string, tid of txn that wrote the data
                 "start_tid",

                 # string, tid of txn that wrote next revision, or None
                 # if the data is current; if not None, end_tid is strictly
                 # greater than start_tid
                 "end_tid",

                 # string, the actual data record for the object, or None
                 # when the object was read with skip_data
                 "data",

                 # total size of serialized object; this includes the
                 # data and all overhead (header) bytes.
                 "size",
                )

    # A serialized Object on disk looks like:
    #
    #         offset             # bytes   value
    #         ------             -------   -----
    #              0                   8   end_tid; string
    #              8                   4   len(data); 4-byte signed int
    #             12           len(data)   the object pickle; string
    #    12+len(data)                  8   oid; string

    # The serialization format uses an end tid of "\0"*8 (z64), the least
    # 8-byte string, to represent None.  It isn't possible for an end_tid
    # to be 0, because it must always be strictly greater than the start_tid.

    fmt = ">8si"  # end_tid, len(self.data)
    FIXED_HEADER_SIZE = struct.calcsize(fmt)
    assert FIXED_HEADER_SIZE == 12
    TOTAL_FIXED_SIZE = FIXED_HEADER_SIZE + 8  # +8 for the oid at the end

    def __init__(self, key, data, start_tid, end_tid):
        """Build an Object; `size` is set only when data is not None."""
        self.key = key
        self.data = data
        self.start_tid = start_tid
        self.end_tid = end_tid
        # The size of the serialized object on disk, including the
        # 12-byte header, the length of data, and a copy of the 8-byte
        # oid.
        if data is not None:
            self.size = self.TOTAL_FIXED_SIZE + len(data)

    def get_header(self):
        """Return the fixed-size header: end_tid and len(data), packed."""
        return struct.pack(self.fmt,
                           self.end_tid or z64,
                           len(self.data))

    def serialize(self, f):
        """Write the serialized form of self to f at its current position."""
        f.writelines([self.get_header(),
                      self.data,
                      self.key[0]])

    def serialize_header(self, f):
        """Write only the fixed-size header to f at its current position.

        The only real use for this is when the current revision of an
        object in cache is invalidated: the end_tid field then gets set to
        the tid of the transaction that caused the invalidation.
        """
        f.write(self.get_header())

    @classmethod
    def fromFile(cls, f, key, skip_data=False):
        """Unserialize an Object from the current position in file f.

        key       -- the (oid, start_tid) pair; the oid must match the
                     serialized oid
        skip_data -- if true, seek past the data instead of reading it
                     and leave .data as None (data can be big)

        Returns None if nothing can be read at the current position.
        Raises ValueError if the data is short or the stored oid does not
        match.  Exclusive access to f for the duration is assumed.
        """
        header = f.read(cls.FIXED_HEADER_SIZE)
        if not header:
            return None
        oid, start_tid = key

        end_tid, dlen = struct.unpack(cls.fmt, header)
        if end_tid == z64:
            end_tid = None

        if skip_data:
            data = None
            f.seek(dlen, 1)
        else:
            data = f.read(dlen)
            if dlen != len(data):
                raise ValueError("corrupted record, data")

        stored_oid = f.read(8)
        if stored_oid != oid:
            raise ValueError("corrupted record, oid")

        obj = cls((oid, start_tid), data, start_tid, end_tid)
        if data is None:
            # The constructor cannot work out the size without the data;
            # the header's data length gives it to us anyway.
            obj.size = cls.TOTAL_FIXED_SIZE + dlen
        return obj
# Entry just associates a key with a file offset. It's used by FileCache.
class Entry(object):
    """Pair an object key with a file offset; used by FileCache."""

    # key    -- object key; something usable as a dict key.
    # offset -- offset from the start of the file to the object's data
    #           record; this includes all overhead bytes (status byte,
    #           size bytes, etc).  The record's size is stored in the file
    #           near the start of the record, but for efficiency it is
    #           also kept in a dict (filemap).
    __slots__ = ('key', 'offset')

    def __init__(self, key=None, offset=None):
        self.offset = offset
        self.key = key
#
##
# FileCache stores a cache in a single on-disk file.
#
# On-disk cache structure.
#
# The file begins with a 12-byte header. The first four bytes are the
# file's magic number - ZEC4 - indicating zeo cache version 4. The
# file's magic number - ZEC3 - indicating zeo cache version 4. The
# next eight bytes are the last transaction id.
magic = "ZEC4"
ZEC4_HEADER_SIZE = 12
ZEC_HEADER_SIZE = 12
# After the header, the file contains a contiguous sequence of blocks. All
# blocks begin with a one-byte status indicator:
......@@ -596,8 +92,8 @@ ZEC4_HEADER_SIZE = 12
# Free. The block is free; the next 8 bytes are >Q format total
# block size.
#
# '1', '2', '3', '4', '5', '6', '7', '8'
# The block is free, and consists of 1-8 bytes total.
# '1', '2', '3', '4'
# The block is free, and consists of 1, 2, 3 or 4 bytes total.
#
# "Total" includes the status byte, and size bytes. There are no
# empty (size 0) blocks.
......@@ -607,11 +103,11 @@ ZEC4_HEADER_SIZE = 12
#
# 1 byte allocation status ('a').
# 4 bytes block size, >I format.
# 16 bytes oid + tid, string.
# size-OBJECT_HEADER_SIZE bytes, the serialization of an Object (see
# class Object for details).
OBJECT_HEADER_SIZE = 1 + 4 + 16
# 8 byte oid
# 8 byte start_tid
# 8 byte end_tid
# 4 byte data size
# data
# The cache's currentofs goes around the file, circularly, forever.
# It's always the starting offset of some block.
......@@ -621,77 +117,64 @@ OBJECT_HEADER_SIZE = 1 + 4 + 16
# blocks needed to make enough room for the new object are evicted,
# starting at currentofs. Exception: if currentofs is close enough
# to the end of the file that the new object can't fit in one
# contiguous chunk, currentofs is reset to ZEC4_HEADER_SIZE first.
# contiguous chunk, currentofs is reset to ZEC_HEADER_SIZE first.
# Do all possible to ensure that the bytes we wrote to file f are really on
# disk.
def sync(f):
    """Flush f and, where os.fsync exists, force its bytes to disk."""
    f.flush()
    fsync = getattr(os, 'fsync', None)
    if fsync is not None:
        fsync(f.fileno())
class FileCache(object):
class ClientCache(object):
"""A simple in-memory cache."""
# The default size of 200MB makes a lot more sense than the traditional
# default of 20MB. The default here is misleading, though, since
# ClientStorage is the only user of ClientCache, and it always passes an
# explicit size of its own choosing.
def __init__(self, path=None, size=200*1024**2):
# - `path`: filepath for the cache file, or None (in which case
# a temp file will be created)
self.path = path
def __init__(self, maxsize, fpath, parent):
# - `maxsize`: total size of the cache file, in bytes; this is
# ignored if path names an existing file; perhaps we should attempt
# to change the cache size in that case
# - `fpath`: filepath for the cache file, or None (in which case
# a temp file will be created)
# - `parent`: the ClientCache instance; its `_evicted()` method
# is called whenever we need to evict an object to make room in
# the file
self.maxsize = maxsize
self.parent = parent
self.maxsize = size
# The number of records in the cache.
self._len = 0
# {oid -> pos}
self.current = ZODB.fsIndex.fsIndex()
# {oid -> {tid->pos}}
# Note that caches in the wild seem to have very little non-current
# data, so this would seem to have little impact on memory consumption.
# I wonder if we even need to store non-current data in the cache.
self.noncurrent = BTrees.LOBTree.LOBTree()
# tid for the most recent transaction we know about. This is also
# stored near the start of the file.
self.tid = None
# There's one Entry instance, kept in memory, for each currently
# allocated block in the file, and there's one allocated block in the
# file per serialized Object. filemap retrieves the Entry given the
# starting offset of a block, and key2entry retrieves the Entry given
# an object revision's key (an (oid, start_tid) pair). From an
# Entry, we can get the Object's key and file offset.
# Map offset in file to pair (data record size, Entry).
# Entry is None iff the block starting at offset is free.
# filemap always contains a complete account of what's in the
# file -- study method _verify_filemap for executable checking
# of the relevant invariants. An offset is at the start of a
# block iff it's a key in filemap. The data record size is
# stored in the file too, so we could just seek to the offset
# and read it up; keeping it in memory is an optimization.
self.filemap = {}
# Map key to Entry. After
# obj = key2entry[key]
# then
# obj.key == key
# is true. An object is currently stored on disk iff its key is in
# key2entry.
self.key2entry = {}
# Always the offset into the file of the start of a block.
# New and relocated objects are always written starting at
# currentofs.
self.currentofs = ZEC4_HEADER_SIZE
self.currentofs = ZEC_HEADER_SIZE
# self.f is the open file object.
# When we're not reusing an existing file, self.f is left None
# here -- the scan() method must be called then to open the file
# (and it sets self.f).
self.fpath = fpath
self.f = None
if fpath and os.path.exists(fpath):
if path:
self._lock_file = ZODB.lock_file.LockFile(path + '.lock')
if path and os.path.exists(path):
# Reuse an existing file. scan() will open & read it.
logger.info("reusing persistent cache file %r", fpath)
elif self.maxsize >= 12:
if fpath:
self.f = open(fpath, 'wb+')
logger.info("created persistent cache file %r", fpath)
self.f = None
logger.info("reusing persistent cache file %r", path)
else:
if path:
self.f = open(path, 'wb+')
logger.info("created persistent cache file %r", path)
else:
self.f = tempfile.TemporaryFile()
logger.info("created temporary cache file %r", self.f.name)
......@@ -704,35 +187,42 @@ class FileCache(object):
self.f.write(magic)
self.f.write(z64)
# and one free block.
self.f.write('f' + struct.pack(">Q", self.maxsize -
ZEC4_HEADER_SIZE))
self.sync()
self.filemap[ZEC4_HEADER_SIZE] = (self.maxsize - ZEC4_HEADER_SIZE,
None)
self.f.write('f' + pack(">Q", self.maxsize - ZEC_HEADER_SIZE))
sync(self.f)
# Statistics: _n_adds, _n_added_bytes,
# _n_evicts, _n_evicted_bytes,
# _n_accesses
self.clearStats()
self._setup_trace(path)
# Backward compatibility. Client code used to have to use the fc
# attr to get to the file cache to get cache stats.
@property
def fc(self):
    """Return self, for callers that still reach the cache through .fc."""
    file_cache = self
    return file_cache
##
# Scan the current contents of the cache file, calling `install`
# for each object found in the cache. This method should only
# be called once to initialize the cache from disk.
def scan(self, install):
def open(self):
if self.f is not None: # we're not (re)using a pre-existing file
return
fsize = os.path.getsize(self.fpath)
fsize = os.path.getsize(self.path)
if fsize != self.maxsize:
logger.warning("existing cache file %r has size %d; "
"requested size %d ignored", self.fpath,
"requested size %d ignored", self.path,
fsize, self.maxsize)
self.maxsize = fsize
self.f = open(self.fpath, 'rb+')
_magic = self.f.read(4)
self.f = open(self.path, 'rb+')
read = self.f.read
seek = self.f.seek
_magic = read(4)
if _magic != magic:
raise ValueError("unexpected magic number: %r" % _magic)
self.tid = self.f.read(8)
self.tid = read(8)
if len(self.tid) != 8:
raise ValueError("cache file too small -- no tid at start")
......@@ -740,38 +230,48 @@ class FileCache(object):
# file, and tell our parent about it too (via the `install` callback).
# Remember the location of the largest free block. That seems a
# decent place to start currentofs.
max_free_size = max_free_offset = 0
ofs = ZEC4_HEADER_SIZE
max_free_size = l = 0
ofs = max_free_offset = ZEC_HEADER_SIZE
current = self.current
while ofs < fsize:
self.f.seek(ofs)
ent = None
status = self.f.read(1)
seek(ofs)
status = read(1)
if status == 'a':
size, rawkey = struct.unpack(">I16s", self.f.read(20))
key = rawkey[:8], rawkey[8:]
assert key not in self.key2entry
self.key2entry[key] = ent = Entry(key, ofs)
install(self.f, ent)
size, oid, start_tid, end_tid = unpack(">I8s8s8s", read(28))
if end_tid == z64:
current[oid] = ofs
else:
assert start_tid < end_tid
self._set_noncurrent(oid, start_tid, ofs)
l += 1
elif status == 'f':
size, = struct.unpack(">Q", self.f.read(8))
size, = unpack(">Q", read(8))
elif status in '12345678':
size = int(status)
else:
raise ValueError("unknown status byte value %s in client "
"cache file" % 0, hex(ord(status)))
self.filemap[ofs] = size, ent
if ent is None and size > max_free_size:
max_free_size, max_free_offset = size, ofs
ofs += size
if ofs != fsize:
raise ValueError("final offset %s != file size %s in client "
"cache file" % (ofs, fsize))
if __debug__:
self._verify_filemap()
self.currentofs = max_free_offset
self._len = l
def _set_noncurrent(self, oid, tid, ofs):
    """Record that the non-current revision (oid, tid) lives at ofs."""
    oid_key = u64(oid)
    revisions = self.noncurrent.get(oid_key)
    if revisions is None:
        # First non-current revision seen for this oid.
        revisions = BTrees.LLBTree.LLBucket()
        self.noncurrent[oid_key] = revisions
    revisions[u64(tid)] = ofs
def _del_noncurrent(self, oid, tid):
    """Forget the non-current revision (oid, tid)."""
    oid_key = u64(oid)
    revisions = self.noncurrent[oid_key]
    del revisions[u64(tid)]
    if revisions:
        return
    # That was the oid's last non-current revision; drop its bucket too.
    del self.noncurrent[oid_key]
def clearStats(self):
self._n_adds = self._n_added_bytes = 0
......@@ -787,37 +287,23 @@ class FileCache(object):
##
# The number of objects currently in the cache.
def __len__(self):
return len(self.key2entry)
##
# Iterate over the objects in the cache, producing an Entry for each.
def __iter__(self):
return self.key2entry.itervalues()
##
# Test whether an (oid, tid) pair is in the cache.
def __contains__(self, key):
return key in self.key2entry
##
# Do all possible to ensure all bytes written to the file so far are
# actually on disk.
def sync(self):
sync(self.f)
return self._len
##
# Close the underlying file. No methods accessing the cache should be
# used after this.
def close(self):
if hasattr(self,'_lock_file'):
self._lock_file.close()
if self.f:
self.sync()
sync(self.f)
self.f.close()
self.f = None
##
# Evict objects as necessary to free up at least nbytes bytes,
# starting at currentofs. If currentofs is closer than nbytes to
# the end of the file, currentofs is reset to ZEC4_HEADER_SIZE first.
# the end of the file, currentofs is reset to ZEC_HEADER_SIZE first.
# The number of bytes actually freed may be (and probably will be)
# greater than nbytes, and is _makeroom's return value. The file is not
# altered by _makeroom. filemap and key2entry are updated to reflect the
......@@ -826,34 +312,170 @@ class FileCache(object):
# freed (starting at currentofs when _makeroom returns, and
# spanning the number of bytes returned by _makeroom).
def _makeroom(self, nbytes):
assert 0 < nbytes <= self.maxsize - ZEC4_HEADER_SIZE
assert nbytes <= max32
assert 0 < nbytes <= self.maxsize - ZEC_HEADER_SIZE
if self.currentofs + nbytes > self.maxsize:
self.currentofs = ZEC4_HEADER_SIZE
self.currentofs = ZEC_HEADER_SIZE
ofs = self.currentofs
seek = self.f.seek
read = self.f.read
current = self.current
while nbytes > 0:
size, e = self.filemap.pop(ofs)
if e is not None:
del self.key2entry[e.key]
seek(ofs)
status = read(1)
if status == 'a':
size, oid, start_tid, end_tid = unpack(">I8s8s8s", read(28))
self._n_evicts += 1
self._n_evicted_bytes += size
# Load the object header into memory so we know how to
# update the parent's in-memory data structures.
self.f.seek(e.offset + OBJECT_HEADER_SIZE)
o = Object.fromFile(self.f, e.key, skip_data=True)
self.parent._evicted(o)
if end_tid == z64:
del current[oid]
else:
self._del_noncurrent(oid, start_tid)
self._len -= 1
else:
if status == 'f':
size = unpack(">Q", read(8))[0]
else:
assert status in '12345678'
size = int(status)
ofs += size
nbytes -= size
return ofs - self.currentofs
##
# Write Object obj, with data, to file starting at currentofs.
# nfreebytes are already available for overwriting, and it's
# guaranteed that's enough. obj.offset is changed to reflect the
# new data record position, and filemap and key2entry are updated to
# match.
def _writeobj(self, obj, nfreebytes):
size = OBJECT_HEADER_SIZE + obj.size
# Update our idea of the most recent tid. This is stored in the
# instance, and also written out near the start of the cache file. The
# new tid must be strictly greater than our current idea of the most
# recent tid.
def setLastTid(self, tid):
    """Set the most recent tid, in memory and near the start of the file.

    Raises ValueError unless tid is strictly greater than the tid
    currently recorded (when there is one).
    """
    previous = self.tid
    if previous is not None and tid <= previous:
        raise ValueError("new last tid (%s) must be greater than "
                         "previous one (%s)" % (u64(tid),
                                                u64(previous)))
    assert isinstance(tid, str) and len(tid) == 8
    self.tid = tid
    # The tid is stored directly after the magic number.
    cache_file = self.f
    cache_file.seek(len(magic))
    cache_file.write(tid)
    cache_file.flush()
##
# Return the last transaction seen by the cache.
# @return a transaction id
# @defreturn string, or None if no transaction is yet known
def getLastTid(self):
    """Return the last transaction id seen, or None if it is still z64."""
    if self.tid != z64:
        return self.tid
    return None
##
# Return the current data record for oid.
# @param oid object id
# @return (data record, tid), or None if the object is not
# in the cache
# @defreturn 2-tuple: (string, string)
def load(self, oid):
    """Return (data, tid) for the current revision of oid, or None.

    None is returned when oid has no current record in the cache.
    """
    ofs = self.current.get(oid)
    if ofs is None:
        self._trace(0x20, oid)
        return None

    self.f.seek(ofs)
    read = self.f.read
    # Every read is done outside the assert that checks it, so that the
    # file position advances the same way when asserts are disabled.
    status = read(1)
    assert status == 'a'
    size, saved_oid, tid, end_tid, ldata = unpack(
        ">I8s8s8sI", read(32))
    assert saved_oid == oid

    data = read(ldata)
    assert len(data) == ldata
    # The record ends with a second copy of the oid.
    trailing_oid = read(8)
    assert trailing_oid == oid

    self._n_accesses += 1
    self._trace(0x22, oid, tid, end_tid, ldata)
    return data, tid
##
# Return a non-current revision of oid that was current before tid.
# @param oid object id
# @param tid id of transaction that wrote next revision of oid
# @return data record, start tid, and end tid
# @defreturn 3-tuple: (string, string, string)
def loadBefore(self, oid, before_tid):
    """Return a non-current revision of oid written before before_tid.

    oid        -- object id
    before_tid -- transaction id; the revision returned has a start tid
                  below it

    Returns (data, start_tid, end_tid), or None when no cached revision
    qualifies.
    """
    noncurrent_for_oid = self.noncurrent.get(u64(oid))
    if noncurrent_for_oid is None:
        self._trace(0x24, oid, "", before_tid)
        return None

    # Revisions with a start tid strictly below before_tid; the last one
    # is the newest of them.
    items = noncurrent_for_oid.items(None, u64(before_tid)-1)
    if not items:
        self._trace(0x24, oid, "", before_tid)
        return None
    tid, ofs = items[-1]

    self.f.seek(ofs)
    read = self.f.read
    # Every read is done outside the assert that checks it, so that the
    # file position advances the same way when asserts are disabled.
    status = read(1)
    assert status == 'a'
    size, saved_oid, saved_tid, end_tid, ldata = unpack(
        ">I8s8s8sI", read(32))
    assert saved_oid == oid
    assert saved_tid == p64(tid)
    assert end_tid != z64

    data = read(ldata)
    assert len(data) == ldata
    # The record ends with a second copy of the oid.
    trailing_oid = read(8)
    assert trailing_oid == oid

    if end_tid < before_tid:
        # The revision stopped being valid before before_tid.
        self._trace(0x24, oid, "", before_tid)
        return None

    self._n_accesses += 1
    self._trace(0x26, oid, "", saved_tid)
    return data, saved_tid, end_tid
##
# Store a new data record in the cache.
# @param oid object id
# @param start_tid the id of the transaction that wrote this revision
# @param end_tid the id of the transaction that created the next
# revision of oid. If end_tid is None, the data is
# current.
# @param data the actual data
def store(self, oid, start_tid, end_tid, data):
seek = self.f.seek
if end_tid is None:
ofs = self.current.get(oid)
if ofs:
seek(ofs)
read = self.f.read
assert read(1) == 'a'
size, saved_oid, saved_tid, end_tid = unpack(
">I8s8s8s", read(28))
assert saved_oid == oid
assert end_tid == z64
if saved_tid == start_tid:
return
raise ValueError("already have current data for oid")
else:
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid and (u64(start_tid) in noncurrent_for_oid):
return
size = 41 + len(data)
# A number of cache simulation experiments all concluded that the
# 2nd-level ZEO cache got a much higher hit rate if "very large"
# objects simply weren't cached. For now, we ignore the request
# only if the entire cache file is too small to hold the object.
if size > self.maxsize - ZEC_HEADER_SIZE:
return
self._n_adds += 1
self._n_added_bytes += size
self._len += 1
nfreebytes = self._makeroom(size)
assert size <= nfreebytes
excess = nfreebytes - size
# If there's any excess (which is likely), we need to record a
......@@ -864,144 +486,158 @@ class FileCache(object):
elif excess < 9:
extra = "012345678"[excess]
else:
extra = 'f' + struct.pack(">Q", excess)
self.f.seek(self.currentofs)
self.f.writelines(('a',
struct.pack(">I8s8s", size,
obj.key[0], obj.key[1])))
obj.serialize(self.f)
self.f.write(extra)
e = Entry(obj.key, self.currentofs)
self.key2entry[obj.key] = e
self.filemap[self.currentofs] = size, e
extra = 'f' + pack(">Q", excess)
ofs = self.currentofs
seek(ofs)
write = self.f.write
# Before writing data, we'll write a free block for the space freed.
# We'll come back with a last atomic write to rewrite the start of the
# allocated-block header.
write('f'+pack(">Q", nfreebytes)+'xxxx')
# Now write the rest of the allocation block header and object data.
write(pack(">8s8sI", start_tid, end_tid or z64, len(data)))
write(data)
write(oid)
write(extra)
# Now, we'll go back and rewrite the beginning of the
# allocated block header.
seek(ofs)
write('a'+pack(">I8s", size, oid))
if end_tid:
self._set_noncurrent(oid, start_tid, ofs)
self._trace(0x54, oid, start_tid, end_tid, dlen=len(data))
else:
self.current[oid] = ofs
self._trace(0x52, oid, start_tid, dlen=len(data))
self.currentofs += size
if excess:
# We need to record the free block in filemap, but there's
# no need to advance currentofs beyond it. Instead it
# gives some breathing room for the next object to get
# written.
self.filemap[self.currentofs] = excess, None
##
# Add Object object to the cache. This may evict existing objects, to
# make room (and almost certainly will, in steady state once the cache
# is first full). The object must not already be in the cache. If the
# object is too large for the cache, False is returned, otherwise True.
def add(self, object):
size = OBJECT_HEADER_SIZE + object.size
# A number of cache simulation experiments all concluded that the
# 2nd-level ZEO cache got a much higher hit rate if "very large"
# objects simply weren't cached. For now, we ignore the request
# only if the entire cache file is too small to hold the object.
if size > self.maxsize - ZEC4_HEADER_SIZE:
return False
assert object.key not in self.key2entry
assert len(object.key[0]) == 8
assert len(object.key[1]) == 8
# If `tid` is None,
# forget all knowledge of `oid`. (`tid` can be None only for
# invalidations generated by startup cache verification.) If `tid`
# isn't None, and we had current
# data for `oid`, stop believing we have current data, and mark the
# data we had as being valid only up to `tid`. In all other cases, do
# nothing.
# @param oid object id
# @param tid the id of the transaction that wrote a new revision of oid,
# or None to forget all cached info about oid.
def invalidate(self, oid, tid):
    """Invalidate the current cached revision of oid.

    If tid is not None and is greater than self.tid, the last tid is
    advanced with setLastTid().  If there is no current record for oid
    the miss is traced and nothing else happens.  Otherwise the record
    is dropped from self.current and then either turned into a free
    block (tid is None) or kept as non-current data ending at tid.
    """
    # Test for None before comparing, so None is never ordered against
    # a tid string.
    if tid is not None and tid > self.tid:
        self.setLastTid(tid)

    ofs = self.current.get(oid)
    if ofs is None:
        # 0x10 == invalidate (miss)
        self._trace(0x10, oid, tid)
        return

    self.f.seek(ofs)
    read = self.f.read
    # Read outside the assert: "python -O" strips asserts, and skipping
    # this one-byte read would misalign the header unpack below.
    status = read(1)
    assert status == 'a'
    size, saved_oid, saved_tid, end_tid = unpack(">I8s8s8s", read(28))
    assert saved_oid == oid
    assert end_tid == z64
    del self.current[oid]
    if tid is None:
        # Overwrite the start of the record with a free-block header.
        self.f.seek(ofs)
        self.f.write('f'+pack(">Q", size))
        # 0x1E = invalidate (hit, discarding current or non-current)
        self._trace(0x1E, oid, tid)
        self._len -= 1
    else:
        # Offset 21 = status (1) + size (4) + oid (8) + start tid (8),
        # i.e. the position of the end_tid field in the record header.
        self.f.seek(ofs+21)
        self.f.write(tid)
        self._set_noncurrent(oid, saved_tid, ofs)
        # 0x1C = invalidate (hit, saving non-current)
        self._trace(0x1C, oid, tid)
##
# Return Object for key, or None if not in cache.
def access(self, key):
self._n_accesses += 1
e = self.key2entry.get(key)
if e is None:
return None
offset = e.offset
size, e2 = self.filemap[offset]
assert e is e2
# Generates (oid, serial) pairs for all objects in the
# cache. This generator is used by cache verification.
def contents(self):
    """Yield (oid, tid) for every record referenced by self.current.

    Each record header is re-read from self.f and sanity-checked
    before the pair is yielded.
    """
    # May need to materialize list instead of iterating;
    # depends on whether the caller may change the cache.
    seek = self.f.seek
    read = self.f.read
    for oid, ofs in self.current.iteritems():
        seek(ofs)
        # Read outside the assert: "python -O" strips asserts, and
        # skipping this one-byte read would misalign the unpack below.
        status = read(1)
        assert status == 'a'
        size, saved_oid, tid, end_tid = unpack(">I8s8s8s", read(28))
        assert saved_oid == oid
        # Records listed in self.current must have no end tid.
        assert end_tid == z64
        yield oid, tid
self.f.seek(offset + OBJECT_HEADER_SIZE)
return Object.fromFile(self.f, key)
def dump(self):
from ZODB.utils import oid_repr
print "cache size", len(self)
L = list(self.contents())
L.sort()
for oid, tid in L:
print oid_repr(oid), oid_repr(tid)
print "dll contents"
L = list(self)
L.sort(lambda x, y: cmp(x.key, y.key))
for x in L:
end_tid = x.end_tid or z64
print oid_repr(x.key[0]), oid_repr(x.key[1]), oid_repr(end_tid)
print
##
# Remove Object for key from cache, if present.
def remove(self, key):
# If an object is being explicitly removed, we need to load
# its header into memory and write a free block marker to the
# disk where the object was stored. We need to load the
# header to update the in-memory data structures held by
# ClientCache.
# We could instead just keep the header in memory at all times.
e = self.key2entry.pop(key, None)
if e is None:
# If `path` isn't None (== we're using a persistent cache file), and
# envar ZEO_CACHE_TRACE is set to a non-empty value, try to open
# path+'.trace' as a trace file, and store the file object in
# self._tracefile. If not, or we can't write to the trace file, disable
# tracing by setting self._trace to a dummy function, and set
# self._tracefile to None.
def _setup_trace(self, path):
_tracefile = None
if path and os.environ.get("ZEO_CACHE_TRACE"):
tfn = path + ".trace"
try:
_tracefile = open(tfn, "ab")
except IOError, msg:
logger.warning("cannot write tracefile %r (%s)", tfn, msg)
else:
logger.info("opened tracefile %r", tfn)
if _tracefile is None:
self._trace = lambda *a, **k: None
return
offset = e.offset
size, e2 = self.filemap[offset]
assert e is e2
self.filemap[offset] = size, None
self.f.seek(offset + OBJECT_HEADER_SIZE)
o = Object.fromFile(self.f, key, skip_data=True)
assert size >= 9 # only free blocks are tiny
self.f.seek(offset)
self.f.write('f' + struct.pack(">Q", size))
self.f.flush()
self.parent._evicted(o)
##
# Update on-disk representation of Object obj.
#
# This method should be called when the object header is modified.
# obj must be in the cache. The only real use for this is during
# invalidation, to set the end_tid field on a revision that was current
# (and so had an end_tid of None, but no longer does).
def update(self, obj):
    """Rewrite the on-disk header of obj, which must be in the cache.

    Seeks to OBJECT_HEADER_SIZE bytes past the start of obj's entry and
    lets obj serialize its header there.  Raises KeyError if obj.key is
    not in self.key2entry.
    """
    entry = self.key2entry[obj.key]
    header_ofs = entry.offset + OBJECT_HEADER_SIZE
    self.f.seek(header_ofs)
    obj.serialize_header(self.f)
now = time.time
def _trace(code, oid="", tid=z64, end_tid=z64, dlen=0):
# The code argument is two hex digits; bits 0 and 7 must be zero.
# The first hex digit shows the operation, the second the outcome.
# This method has been carefully tuned to be as fast as possible.
# Note: when tracing is disabled, this method is hidden by a dummy.
encoded = (dlen + 255) & 0x7fffff00 | code
if tid is None:
tid = z64
if end_tid is None:
end_tid = z64
try:
_tracefile.write(
pack(">iiH8s8s",
now(), encoded, len(oid), tid, end_tid) + oid,
)
except:
print `tid`, `end_tid`
raise
##
# Update our idea of the most recent tid. This is stored in the
# instance, and also written out near the start of the cache file. The
# new tid must be strictly greater than our current idea of the most
# recent tid.
def settid(self, tid):
    """Record tid as the most recent tid, in memory and in the file.

    Raises ValueError when a tid is already set and the new one is not
    strictly greater.  Otherwise stores tid on the instance, writes it
    into the cache file right after the magic bytes, and flushes.
    """
    previous = self.tid
    if previous is not None and tid <= previous:
        raise ValueError("new last tid (%s) must be greater than "
                         "previous one (%s)" % (u64(tid),
                                                u64(previous)))
    assert isinstance(tid, str) and len(tid) == 8
    self.tid = tid
    cache_file = self.f
    cache_file.seek(len(magic))
    cache_file.write(tid)
    cache_file.flush()
self._trace = _trace
_trace(0x00)
##
# This debug method marches over the entire cache file, verifying that
# the current contents match the info in self.filemap and self.key2entry.
def _verify_filemap(self, display=False):
a = ZEC4_HEADER_SIZE
f = self.f
while a < self.maxsize:
f.seek(a)
status = f.read(1)
if status == 'a':
size, = struct.unpack(">I", f.read(4))
elif status == 'f':
size, = struct.unpack(">Q", f.read(8))
else:
size = int(status)
if display:
if a == self.currentofs:
print '*****',
print "%c%d" % (status, size),
size2, obj = self.filemap[a]
assert size == size2
assert (obj is not None) == (status == 'a')
if obj is not None:
assert obj.offset == a
assert self.key2entry[obj.key] is obj
a += size
if display:
print
assert a == self.maxsize
if hasattr(os, 'fsync'):
    def sync(f):
        """Flush f's buffers, then fsync its file descriptor."""
        f.flush()
        os.fsync(f.fileno())
else:
    def sync(f):
        """Flush f's buffers; os.fsync is not available here."""
        f.flush()
====================================
The client cache file implementation
====================================
This test exercises the FileCache implementation which is responsible for
maintaining the ZEO client cache on disk. Specifics of persistent cache files
are not tested.
As the FileCache calls back to the client cache we'll use a dummy to monitor
those calls:
>>> from ZEO.tests.test_cache import ClientCacheDummy, oid
>>> tid = oid
>>> cache_dummy = ClientCacheDummy()
We'll instantiate a FileCache with 200 bytes of space:
>>> from ZEO.cache import FileCache
>>> fc = FileCache(maxsize=200, fpath=None, parent=cache_dummy)
Initially the cache is empty:
>>> len(fc)
0
>>> list(fc)
[]
>>> fc.getStats()
(0, 0, 0, 0, 0)
Basic usage
===========
Objects are represented in the cache using a special `Object` object. Let's
start with an object of the size 100 bytes:
>>> from ZEO.cache import Object
>>> obj1_1 = Object(key=(oid(1), tid(1)), data='#'*100,
... start_tid=tid(1), end_tid=None)
Notice that the actual object size is a bit larger because of the headers that
are written for each object:
>>> obj1_1.size
120
Initially the object is not in the cache:
>>> (oid(1), tid(1)) in fc
False
We can add it to the cache:
>>> fc.add(obj1_1)
True
And now it's in the cache:
>>> (oid(1), tid(1)) in fc
True
>>> len(fc)
1
We can get it back and the object will be equal but not identical to the one we
stored:
>>> obj1_1_copy = fc.access((oid(1), tid(1)))
>>> obj1_1_copy.data == obj1_1.data
True
>>> obj1_1_copy.key == obj1_1.key
True
>>> obj1_1_copy is obj1_1
False
The cache allows us to iterate over all entries in it:
>>> list(fc) # doctest: +ELLIPSIS
[<ZEO.cache.Entry object at 0x...>]
When an object gets superseded we can update it. This only modifies the header,
not the actual data. This is useful when invalidations tell us about the
`end_tid` of an object:
>>> obj1_1.data = '.' * 100
>>> obj1_1.end_tid = tid(2)
>>> fc.update(obj1_1)
When loading it again we can see that the data was not changed:
>>> obj1_1_copy = fc.access((oid(1), tid(1)))
>>> obj1_1_copy.data # doctest: +ELLIPSIS
'#############...################'
>>> obj1_1_copy.end_tid
'\x00\x00\x00\x00\x00\x00\x00\x02'
Objects can be explicitly removed from the cache:
>>> fc.remove((oid(1), tid(1)))
>>> len(fc)
0
>>> (oid(1), tid(1)) in fc
False
Evicting objects
================
When the cached data consumes the whole cache file and more objects need to be
stored the oldest stored objects are evicted until enough space is available.
In the next sections we'll exercise some of the special cases of the file
format and look at the cache after each step.
The current state is a cache with two records: the one object which we removed
from the cache and another free record that reaches to the end of the file.
The first record has a size of 141 bytes:
141 = 1 ('f') + 4 (size) + 8 (OID) + 8 (TID) + 8 (end_tid) +
4 (data length) + 100 (old data) + 8 (OID)
The second record has a size of 47 bytes:
47 = 1 ('f') + 8 (size) + 38 (free space)
Note that the last byte is an 'x' because the initialisation of the cache file
forced the absolute size of the file by seeking to byte 200 and writing an 'x'.
>>> from ZEO.tests.test_cache import hexprint
>>> hexprint(fc.f)
00000000 5a 45 43 34 00 00 00 00 00 00 00 00 66 00 00 00 |ZEC4........f...|
00000010 00 00 00 00 8d 00 00 00 01 00 00 00 00 00 00 00 |................|
00000020 01 00 00 00 00 00 00 00 02 00 00 00 64 23 23 23 |............d###|
00000030 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000040 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000050 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000060 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000070 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000080 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000090 23 00 00 00 00 00 00 00 01 66 00 00 00 00 00 00 |#........f......|
000000a0 00 2f 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |./..............|
000000b0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
000000c0 00 00 00 00 00 00 00 78 |.......x |
Case 1: Allocating a new block that fits after the last used one
>>> obj2_1 = Object(key=(oid(2), tid(1)), data='******',
... start_tid=tid(1), end_tid=None)
>>> fc.add(obj2_1)
True
The new block fits exactly in the remaining 47 bytes (41 bytes header + 6
bytes payload) so the beginning of the data is the same except for the last 47
bytes:
>>> hexprint(fc.f)
00000000 5a 45 43 34 00 00 00 00 00 00 00 00 66 00 00 00 |ZEC4........f...|
00000010 00 00 00 00 8d 00 00 00 01 00 00 00 00 00 00 00 |................|
00000020 01 00 00 00 00 00 00 00 02 00 00 00 64 23 23 23 |............d###|
00000030 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000040 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000050 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000060 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000070 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000080 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 23 |################|
00000090 23 00 00 00 00 00 00 00 01 61 00 00 00 2f 00 00 |#........a.../..|
000000a0 00 00 00 00 00 02 00 00 00 00 00 00 00 01 00 00 |................|
000000b0 00 00 00 00 00 00 00 00 00 06 2a 2a 2a 2a 2a 2a |..........******|
000000c0 00 00 00 00 00 00 00 02 |........ |
Case 2: Allocating a block that wraps around and frees *exactly* one block
>>> obj3_1 = Object(key=(oid(3), tid(1)), data='@'*100,
... start_tid=tid(1), end_tid=None)
>>> fc.add(obj3_1)
True
>>> hexprint(fc.f)
00000000 5a 45 43 34 00 00 00 00 00 00 00 00 61 00 00 00 |ZEC4........a...|
00000010 8d 00 00 00 00 00 00 00 03 00 00 00 00 00 00 00 |................|
00000020 01 00 00 00 00 00 00 00 00 00 00 00 64 40 40 40 |............d@@@|
00000030 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000040 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000050 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000060 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000070 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000080 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000090 40 00 00 00 00 00 00 00 03 61 00 00 00 2f 00 00 |@........a.../..|
000000a0 00 00 00 00 00 02 00 00 00 00 00 00 00 01 00 00 |................|
000000b0 00 00 00 00 00 00 00 00 00 06 2a 2a 2a 2a 2a 2a |..........******|
000000c0 00 00 00 00 00 00 00 02 |........ |
Case 3: Allocating a block that requires 1 byte less than the next block
>>> obj4_1 = Object(key=(oid(4), tid(1)), data='~~~~~',
... start_tid=tid(1), end_tid=None)
>>> fc.add(obj4_1)
True
>>> hexprint(fc.f)
00000000 5a 45 43 34 00 00 00 00 00 00 00 00 61 00 00 00 |ZEC4........a...|
00000010 8d 00 00 00 00 00 00 00 03 00 00 00 00 00 00 00 |................|
00000020 01 00 00 00 00 00 00 00 00 00 00 00 64 40 40 40 |............d@@@|
00000030 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000040 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000050 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000060 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000070 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000080 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 |@@@@@@@@@@@@@@@@|
00000090 40 00 00 00 00 00 00 00 03 61 00 00 00 2e 00 00 |@........a......|
000000a0 00 00 00 00 00 04 00 00 00 00 00 00 00 01 00 00 |................|
000000b0 00 00 00 00 00 00 00 00 00 05 7e 7e 7e 7e 7e 00 |..........~~~~~.|
000000c0 00 00 00 00 00 00 04 31 |.......1 |
Case 4: Allocating a block that requires 2 bytes less than the next block
>>> obj4_1 = Object(key=(oid(5), tid(1)), data='^'*98,
... start_tid=tid(1), end_tid=None)
>>> fc.add(obj4_1)
True
>>> hexprint(fc.f)
00000000 5a 45 43 34 00 00 00 00 00 00 00 00 61 00 00 00 |ZEC4........a...|
00000010 8b 00 00 00 00 00 00 00 05 00 00 00 00 00 00 00 |................|
00000020 01 00 00 00 00 00 00 00 00 00 00 00 62 5e 5e 5e |............b^^^|
00000030 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e |^^^^^^^^^^^^^^^^|
00000040 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e |^^^^^^^^^^^^^^^^|
00000050 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e |^^^^^^^^^^^^^^^^|
00000060 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e |^^^^^^^^^^^^^^^^|
00000070 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e |^^^^^^^^^^^^^^^^|
00000080 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 5e 00 |^^^^^^^^^^^^^^^.|
00000090 00 00 00 00 00 00 05 32 03 61 00 00 00 2e 00 00 |.......2.a......|
000000a0 00 00 00 00 00 04 00 00 00 00 00 00 00 01 00 00 |................|
000000b0 00 00 00 00 00 00 00 00 00 05 7e 7e 7e 7e 7e 00 |..........~~~~~.|
000000c0 00 00 00 00 00 00 04 31 |.......1 |
Case 5: Allocating a block that requires 3 bytes less than the next block
The end of the file is already a bit crowded and would create a rather complex
situation to work on. We create an entry with the size of 95 bytes which will
be inserted at the beginning of the file, leaving a 3 byte free space after
it.
>>> obj4_1 = Object(key=(oid(6), tid(1)), data='+'*95,
... start_tid=tid(1), end_tid=None)
>>> fc.add(obj4_1)
True
>>> hexprint(fc.f)
00000000 5a 45 43 34 00 00 00 00 00 00 00 00 61 00 00 00 |ZEC4........a...|
00000010 88 00 00 00 00 00 00 00 06 00 00 00 00 00 00 00 |................|
00000020 01 00 00 00 00 00 00 00 00 00 00 00 5f 2b 2b 2b |............_+++|
00000030 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b |++++++++++++++++|
00000040 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b |++++++++++++++++|
00000050 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b |++++++++++++++++|
00000060 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b |++++++++++++++++|
00000070 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b |++++++++++++++++|
00000080 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 2b 00 00 00 00 |++++++++++++....|
00000090 00 00 00 06 33 00 05 32 03 61 00 00 00 2e 00 00 |....3..2.a......|
000000a0 00 00 00 00 00 04 00 00 00 00 00 00 00 01 00 00 |................|
000000b0 00 00 00 00 00 00 00 00 00 05 7e 7e 7e 7e 7e 00 |..........~~~~~.|
000000c0 00 00 00 00 00 00 04 31 |.......1 |
Case 6: Allocating a block that requires 6 bytes less than the next block
As in our previous case, we'll write a block that only fits in the first
block's place to avoid dealing with the cluttering at the end of the cache
file.
>>> obj4_1 = Object(key=(oid(7), tid(1)), data='-'*89,
... start_tid=tid(1), end_tid=None)
>>> fc.add(obj4_1)
True
>>> hexprint(fc.f)
00000000 5a 45 43 34 00 00 00 00 00 00 00 00 61 00 00 00 |ZEC4........a...|
00000010 82 00 00 00 00 00 00 00 07 00 00 00 00 00 00 00 |................|
00000020 01 00 00 00 00 00 00 00 00 00 00 00 59 2d 2d 2d |............Y---|
00000030 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d |----------------|
00000040 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d |----------------|
00000050 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d |----------------|
00000060 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d |----------------|
00000070 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d 2d |----------------|
00000080 2d 2d 2d 2d 2d 2d 00 00 00 00 00 00 00 07 36 00 |------........6.|
00000090 00 00 00 06 33 00 05 32 03 61 00 00 00 2e 00 00 |....3..2.a......|
000000a0 00 00 00 00 00 04 00 00 00 00 00 00 00 01 00 00 |................|
000000b0 00 00 00 00 00 00 00 00 00 05 7e 7e 7e 7e 7e 00 |..........~~~~~.|
000000c0 00 00 00 00 00 00 04 31 |.......1 |
Case 7: Allocating a block that requires >= 9 bytes less than the next block
Again, we replace the block at the beginning of the cache.
>>> obj4_1 = Object(key=(oid(8), tid(1)), data='='*80,
... start_tid=tid(1), end_tid=None)
>>> fc.add(obj4_1)
True
>>> hexprint(fc.f)
00000000 5a 45 43 34 00 00 00 00 00 00 00 00 61 00 00 00 |ZEC4........a...|
00000010 79 00 00 00 00 00 00 00 08 00 00 00 00 00 00 00 |y...............|
00000020 01 00 00 00 00 00 00 00 00 00 00 00 50 3d 3d 3d |............P===|
00000030 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d |================|
00000040 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d |================|
00000050 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d |================|
00000060 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d |================|
00000070 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 3d 00 00 00 |=============...|
00000080 00 00 00 00 08 66 00 00 00 00 00 00 00 09 36 00 |.....f........6.|
00000090 00 00 00 06 33 00 05 32 03 61 00 00 00 2e 00 00 |....3..2.a......|
000000a0 00 00 00 00 00 04 00 00 00 00 00 00 00 01 00 00 |................|
000000b0 00 00 00 00 00 00 00 00 00 05 7e 7e 7e 7e 7e 00 |..........~~~~~.|
000000c0 00 00 00 00 00 00 04 31 |.......1 |
Statistic functions
===================
The `getStats` method talks about the added objects, added bytes, evicted
objects, evicted bytes and accesses to the cache:
>>> fc.getStats()
(8, 901, 5, 593, 2)
We can reset the stats by calling the `clearStats` method:
>>> fc.clearStats()
>>> fc.getStats()
(0, 0, 0, 0, 0)
Small file cache sizes
======================
The file cache requires a few bytes at the beginning of the file for itself.
Therefore cache sizes smaller than this threshold do not create a file and
will cause the cache to be disabled.
>>> obj_small = Object(key=(oid(1), tid(1)), data='#',
... start_tid=tid(1), end_tid=None)
>>> sizes = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 53, 54]
>>> for i in sizes: # doctest: +ELLIPSIS
... print "*" * 20
... print "Cache file size", i
... try:
... fc = FileCache(maxsize=i, fpath=None, parent=cache_dummy)
... except Exception, v:
... print i, v
... continue
... print "Added", fc.add(obj_small)
... print "Length", len(fc)
... print "Content", list(fc)
... print "Statistics", fc.getStats()
********************
Cache file size 0
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 1
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 2
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 3
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 4
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 5
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 6
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 7
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 8
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 9
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 10
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 53
Added False
Length 0
Content []
Statistics (0, 0, 0, 0, 0)
********************
Cache file size 54
Added True
Length 1
Content [<ZEO.cache.Entry object at 0x...>]
Statistics (1, 42, 0, 0, 0)
Cleanup
=======
As the cache is non-persistent, its file will be gone from disk after closing
the cache:
>>> fc.f # doctest: +ELLIPSIS
<open file '<fdopen>', mode 'w+b' at 0x...>
>>> fc.close()
>>> fc.f
......@@ -13,17 +13,19 @@
##############################################################################
"""Basic unit tests for a client cache."""
from ZODB.utils import p64, repr_to_oid
from zope.testing import doctest
import os
import random
import tempfile
import unittest
import doctest
import string
import sys
import tempfile
import unittest
import ZEO.cache
from ZODB.utils import p64, repr_to_oid
import zope.testing.setupstack
import ZEO.cache
from ZODB.utils import p64, u64
n1 = p64(1)
n2 = p64(2)
......@@ -53,16 +55,6 @@ def hexprint(file):
offset += 16
class ClientCacheDummy(object):
    """Minimal stand-in for the client cache used by the file cache tests.

    It keeps a dict of objects by key and forgets an object when told
    that it was evicted.
    """

    def __init__(self):
        self.objects = {}

    def _evicted(self, o):
        # Unknown keys are ignored.
        self.objects.pop(o.key, None)
def oid(o):
    """Format o as 16 zero-padded hex digits and pass it to repr_to_oid."""
    return repr_to_oid('%016x' % o)
......@@ -85,9 +77,10 @@ class CacheTests(unittest.TestCase):
self.assertEqual(self.cache.getLastTid(), None)
self.cache.setLastTid(n2)
self.assertEqual(self.cache.getLastTid(), n2)
self.cache.invalidate(None, n1)
self.cache.invalidate(n1, n1)
self.cache.invalidate(n1, n1)
self.assertEqual(self.cache.getLastTid(), n2)
self.cache.invalidate(None, n3)
self.cache.invalidate(n1, n3)
self.assertEqual(self.cache.getLastTid(), n3)
self.assertRaises(ValueError, self.cache.setLastTid, n2)
......@@ -121,6 +114,7 @@ class CacheTests(unittest.TestCase):
self.assertEqual(self.cache.loadBefore(n2, n4), None)
def testException(self):
self.cache.store(n1, n2, None, "data")
self.cache.store(n1, n2, None, "data")
self.assertRaises(ValueError,
self.cache.store,
......@@ -128,95 +122,24 @@ class CacheTests(unittest.TestCase):
def testEviction(self):
# Manually override the current maxsize
maxsize = self.cache.size = self.cache.fc.maxsize = 3295 # 1245
self.cache.fc = ZEO.cache.FileCache(3295, None, self.cache)
cache = ZEO.cache.ClientCache(None, 3295)
# Trivial test of eviction code. Doesn't test non-current
# eviction.
data = ["z" * i for i in range(100)]
for i in range(50):
n = p64(i)
self.cache.store(n, n, None, data[i])
self.assertEquals(len(self.cache), i + 1)
# The cache now uses 1225 bytes. The next insert
cache.store(n, n, None, data[i])
self.assertEquals(len(cache), i + 1)
# The cache now uses 3287 bytes. The next insert
# should delete some objects.
n = p64(50)
self.cache.store(n, n, None, data[51])
self.assert_(len(self.cache) < 51)
cache.store(n, n, None, data[51])
self.assert_(len(cache) < 51)
# TODO: Need to make sure eviction of non-current data
# are handled correctly.
def _run_fuzzing(self):
current_tid = 1
current_oid = 1
def log(*args):
#print args
pass
cache = self.fuzzy_cache
objects = self.fuzzy_cache_client.objects
for operation in xrange(10000):
op = random.choice(['add', 'access', 'remove', 'update', 'settid'])
if not objects:
op = 'add'
log(op)
if op == 'add':
current_oid += 1
key = (oid(current_oid), tid(current_tid))
object = ZEO.cache.Object(
key=key, data='*'*random.randint(1,60*1024),
start_tid=tid(current_tid), end_tid=None)
assert key not in objects
log(key, len(object.data), current_tid)
cache.add(object)
if (object.size + ZEO.cache.OBJECT_HEADER_SIZE >
cache.maxsize - ZEO.cache.ZEC4_HEADER_SIZE):
assert key not in cache
else:
objects[key] = object
assert key in cache, key
elif op == 'access':
key = random.choice(objects.keys())
log(key)
object = objects[key]
found = cache.access(key)
assert object.data == found.data
assert object.key == found.key
assert object.size == found.size == (len(object.data)+object.TOTAL_FIXED_SIZE)
elif op == 'remove':
key = random.choice(objects.keys())
log(key)
cache.remove(key)
assert key not in cache
assert key not in objects
elif op == 'update':
key = random.choice(objects.keys())
object = objects[key]
log(key, object.key)
if not object.end_tid:
object.end_tid = tid(current_tid)
log(key, current_tid)
cache.update(object)
elif op == 'settid':
current_tid += 1
log(current_tid)
cache.settid(tid(current_tid))
cache.close()
def testFuzzing(self):
random.seed()
seed = random.randint(0, sys.maxint)
random.seed(seed)
self.fuzzy_cache_client = ClientCacheDummy()
self.fuzzy_cache = ZEO.cache.FileCache(
random.randint(100, 50*1024), None, self.fuzzy_cache_client)
try:
self._run_fuzzing()
except:
print "Error in fuzzing with seed", seed
hexprint(self.fuzzy_cache.f)
raise
def testSerialization(self):
self.cache.store(n1, n2, None, "data for n1")
self.cache.store(n3, n3, n4, "non-current data for n3")
......@@ -226,9 +149,9 @@ class CacheTests(unittest.TestCase):
# Copy data from self.cache into path, reaching into the cache
# guts to make the copy.
dst = open(path, "wb+")
src = self.cache.fc.f
src = self.cache.f
src.seek(0)
dst.write(src.read(self.cache.fc.maxsize))
dst.write(src.read(self.cache.maxsize))
dst.close()
copy = ZEO.cache.ClientCache(path)
copy.open()
......@@ -238,8 +161,10 @@ class CacheTests(unittest.TestCase):
eq = self.assertEqual
eq(copy.getLastTid(), self.cache.getLastTid())
eq(len(copy), len(self.cache))
eq(copy.current, self.cache.current)
eq(copy.noncurrent, self.cache.noncurrent)
eq(dict(copy.current), dict(self.cache.current))
eq(dict([(k, dict(v)) for (k, v) in copy.noncurrent.items()]),
dict([(k, dict(v)) for (k, v) in self.cache.noncurrent.items()]),
)
def testCurrentObjectLargerThanCache(self):
if self.cache.path:
......@@ -260,20 +185,98 @@ class CacheTests(unittest.TestCase):
def testOldObjectLargerThanCache(self):
if self.cache.path:
os.remove(self.cache.path)
self.cache = ZEO.cache.ClientCache(size=50)
self.cache.open()
cache = ZEO.cache.ClientCache(size=50)
cache.open()
# We store an object that is a bit larger than the cache can handle.
self.cache.store(n1, n2, n3, "x"*64)
cache.store(n1, n2, n3, "x"*64)
# We can see that it was not stored.
self.assertEquals(None, self.cache.load(n1))
self.assertEquals(None, cache.load(n1))
# If an object cannot be stored in the cache, it must not be
# recorded as non-current.
self.assert_((n2, n3) not in self.cache.noncurrent[n1])
self.assert_(1 not in cache.noncurrent)
__test__ = dict(
kill_does_not_cause_cache_corruption =
r"""
If we kill a process while a cache is being written to, the cache
isn't corrupted. To see this, we'll write a little script that
writes records to a cache file repeatedly.
>>> import os, random, sys, time
>>> open('t', 'w').write('''
... import os, random, sys, thread, time
... sys.path = %r
...
... def suicide():
... time.sleep(random.random()/10)
... os._exit(0)
...
... import ZEO.cache
... from ZODB.utils import p64
... cache = ZEO.cache.ClientCache('cache')
... oid = 0
... t = 0
... thread.start_new_thread(suicide, ())
... while 1:
... oid += 1
... t += 1
... data = 'X' * random.randint(5000,25000)
... cache.store(p64(oid), p64(t), None, data)
...
... ''' % sys.path)
>>> for i in range(10):
... _ = os.spawnl(os.P_WAIT, sys.executable, sys.executable, 't')
... if os.path.exists('cache'):
... cache = ZEO.cache.ClientCache('cache')
... cache.open()
... cache.close()
... os.remove('cache')
... os.remove('cache.lock')
""",
full_cache_is_valid =
r"""
If we fill up the cache without any free space, the cache can
still be used.
>>> import ZEO.cache
>>> cache = ZEO.cache.ClientCache('cache', 1000)
>>> data = 'X' * (1000 - ZEO.cache.ZEC_HEADER_SIZE - 41)
>>> cache.store(p64(1), p64(1), None, data)
>>> cache.close()
>>> cache = ZEO.cache.ClientCache('cache', 1000)
>>> cache.open()
>>> cache.store(p64(2), p64(2), None, 'XXX')
>>> cache.close()
""",
cannot_open_same_cache_file_twice =
r"""
>>> import ZEO.cache
>>> cache = ZEO.cache.ClientCache('cache', 1000)
>>> cache2 = ZEO.cache.ClientCache('cache', 1000)
Traceback (most recent call last):
...
LockError: Couldn't lock 'cache.lock'
>>> cache.close()
""",
)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(CacheTests))
suite.addTest(doctest.DocFileSuite('filecache.txt'))
suite.addTest(
doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory,
tearDown=zope.testing.setupstack.tearDown,
)
)
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment