Commit e4c8cfe4 authored by Jim Fulton's avatar Jim Fulton

Reimplemented MappingStorage to be more full featured, with a cleaner

and more instructive implementation.
parent 2268f9d3
##############################################################################
#
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
# Copyright (c) Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
......@@ -11,135 +11,346 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Very Simple Mapping ZODB storage
"""A simple in-memory mapping-based ZODB storage
The Mapping storage provides an extremely simple storage implementation that
doesn't provide undo or version support.
This storage provides an example implementation of a fairly full
storage without distracting storage details.
"""
It is meant to illustrate the simplest possible storage.
import BTrees
import cPickle
import time
import threading
import ZODB.interfaces
import ZODB.POSException
import ZODB.TimeStamp
import ZODB.utils
import zope.interface
The Mapping storage uses a single data structure to map object ids to data.
"""
class MappingStorage:
zope.interface.implements(
ZODB.interfaces.IStorage,
ZODB.interfaces.IStorageIteration,
)
import ZODB.BaseStorage
from ZODB.utils import u64, z64
from ZODB import POSException
from persistent.TimeStamp import TimeStamp
def __init__(self, name='MappingStorage'):
self.__name__ = name
self._data = {} # {oid->{tid->pickle}}
self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
self._ltid = None
self._last_pack = None
_lock = threading.RLock()
self._lock_acquire = _lock.acquire
self._lock_release = _lock.release
self._commit_lock = threading.Lock()
self._opened = True
self._transaction = None
self._oid = 0
######################################################################
# Preconditions:
def opened(self):
"""The storage is open
"""
return self._opened
class MappingStorage(ZODB.BaseStorage.BaseStorage):
def not_in_transaction(self):
"""The storage is not committing a transaction
"""
return self._transaction is None
def __init__(self, name='Mapping Storage'):
ZODB.BaseStorage.BaseStorage.__init__(self, name)
# ._index maps an oid to a string s. s[:8] is the tid of the
# transaction that created oid's current state, and s[8:] is oid's
# current state.
self._index = {}
self._clear_temp()
self._ltid = None
# Note: If you subclass this and use a persistent mapping facility
# (e.g. a dbm file), you will need to get the maximum key and save it
# as self._oid. See dbmStorage.
#
######################################################################
def __len__(self):
return len(self._index)
# testing framework (lame)
def cleanup(self):
pass
# ZODB.interfaces.IStorage
@ZODB.utils.locked
def close(self):
self._opened = False
# ZODB.interfaces.IStorage
def getName(self):
return self.__name__
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def getSize(self):
self._lock_acquire()
try:
# These constants are for Python object memory overheads. Heh.
s = 32
for p in self._index.itervalues():
s += 56 + len(p)
return s
finally:
self._lock_release()
def load(self, oid, version):
self._lock_acquire()
try:
size = 0
for oid, tid_data in self._data.items():
size += 50
for tid, pickle in tid_data.items():
size += 100+len(pickle)
return size
# ZEO.interfaces.IServeable
@ZODB.utils.locked(opened)
def getTid(self, oid):
tid_data = self._data.get(oid)
if tid_data:
return tid_data.maxKey()
raise ZODB.POSException.POSKeyError(oid)
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def history(self, oid, size=1):
tid_data = self._data.get(oid)
if not tid_data:
raise ZODB.POSException.POSKeyError(oid)
tids = tid_data.keys()[-size:]
tids.reverse()
return [
dict(
time = ZODB.TimeStamp.TimeStamp(tid),
tid = tid,
serial = tid,
user_name = self._transactions[tid].user,
description = self._transactions[tid].description,
extension = self._transactions[tid].extension,
size = len(tid_data[tid])
)
for tid in tids]
# ZODB.interfaces.IStorage
def isReadOnly(self):
return False
# ZODB.interfaces.IStorageIteration
def iterator(self, start=None, end=None):
for transaction_record in self._transactions.values(start, end):
yield transaction_record
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def lastTransaction(self):
if self._ltid is not None:
return self._ltid
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def __len__(self):
return len(self._data)
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def load(self, oid, version=''):
assert not version, "Versions are not supported"
tid_data = self._data.get(oid)
if tid_data:
tid = tid_data.maxKey()
return tid_data[tid], tid
raise ZODB.POSException.POSKeyError(oid)
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def loadBefore(self, oid, tid):
tid_data = self._data.get(oid)
if tid_data:
before = ZODB.utils.u64(tid)
if not before:
return None
before = ZODB.utils.p64(before-1)
tids_before = tid_data.keys(None, before)
if tids_before:
tids_after = tid_data.keys(tid, None)
tid = tids_before[-1]
return (tid_data[tid], tid,
(tids_after and tids_after[0] or None)
)
else:
raise ZODB.POSException.POSKeyError(oid)
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def loadSerial(self, oid, serial):
tid_data = self._data.get(oid)
if tid_data:
try:
p = self._index[oid]
return p[8:], p[:8] # pickle, serial
return tid_data[serial]
except KeyError:
raise POSException.POSKeyError(oid)
finally:
self._lock_release()
pass
def getTid(self, oid):
self._lock_acquire()
try:
# The tid is the first 8 bytes of the buffer.
return self._index[oid][:8]
finally:
self._lock_release()
raise ZODB.POSException.POSBeforeKeyError(oid)
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def new_oid(self):
self._oid += 1
return ZODB.utils.p64(self._oid)
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def pack(self, t, referencesf, gc=True):
if not self._data:
return
stop = `ZODB.TimeStamp.TimeStamp(*time.gmtime(t)[:5]+(t%60,))`
if self._last_pack is not None and self._last_pack >= stop:
if self._last_pack == stop:
return
raise ValueError("Already packed to a later time")
self._last_pack = stop
transactions = self._transactions
# Step 1, remove old non-current records
for oid, tid_data in self._data.items():
tids_to_remove = tid_data.keys(None, stop)
if tids_to_remove:
tids_to_remove.pop() # Keep the last, if any
if tids_to_remove:
for tid in tids_to_remove:
del tid_data[tid]
if transactions[tid].pack(oid):
del transactions[tid]
if gc:
# Step 2, GC. A simple sweep+copy
new_data = BTrees.OOBTree.OOBTree()
to_copy = set([ZODB.utils.z64])
while to_copy:
oid = to_copy.pop()
tid_data = self._data.pop(oid)
new_data[oid] = tid_data
for pickle in tid_data.values():
for oid in referencesf(pickle):
if oid in new_data:
continue
to_copy.add(oid)
# Remove left over data from transactions
for oid, tid_data in self._data.items():
for tid in tid_data:
if transactions[tid].pack(oid):
del transactions[tid]
self._data = new_data
# ZODB.interfaces.IStorage
def registerDB(self, db):
pass
# ZODB.interfaces.IStorage
def sortKey(self):
return self.__name__
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def store(self, oid, serial, data, version, transaction):
assert not version, "Versions are not supported"
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
raise ZODB.POSException.StorageTransactionError(self, transaction)
if version:
raise POSException.Unsupported("Versions aren't supported")
old_tid = None
tid_data = self._data.get(oid)
if tid_data:
old_tid = tid_data.maxKey()
if serial != old_tid:
raise ZODB.POSException.ConflictError(
oid=oid, serials=(old_tid, serial), data=data)
self._tdata[oid] = data
self._lock_acquire()
try:
if oid in self._index:
oserial = self._index[oid][:8]
if serial != oserial:
raise POSException.ConflictError(oid=oid,
serials=(oserial, serial),
data=data)
self._tindex[oid] = self._tid + data
finally:
self._lock_release()
return self._tid
def _clear_temp(self):
# store() saves data in _tindex; if the transaction completes
# successfully, _finish() merges _tindex into _index.
self._tindex = {}
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def tpc_abort(self, transaction):
if transaction is not self._transaction:
return
self._transaction = None
self._commit_lock.release()
def _finish(self, tid, user, desc, ext):
self._index.update(self._tindex)
self._ltid = self._tid
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def tpc_begin(self, transaction, tid=None):
# The tid argument exists to support testing.
if transaction is self._transaction:
return
self._lock_release()
self._commit_lock.acquire()
self._lock_acquire()
self._transaction = transaction
self._tdata = {}
if tid is None:
tid = ZODB.utils.newTid(self._ltid)
self._tid = tid
def lastTransaction(self):
return self._ltid
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def tpc_finish(self, transaction, func = lambda tid: None):
if (transaction is not self._transaction) or not self._tdata:
return
def pack(self, t, referencesf):
self._lock_acquire()
try:
if not self._index:
return
# Build an index of *only* those objects reachable from the root.
rootl = [z64]
pindex = {}
while rootl:
oid = rootl.pop()
if oid not in pindex:
# Scan non-version pickle for references.
r = self._index[oid]
pindex[oid] = r
referencesf(r[8:], rootl)
self._index = pindex
finally:
self._lock_release()
def _splat(self):
"""Spit out a string showing state."""
o = ['Index:']
keys = self._index.keys()
keys.sort()
for oid in keys:
r = self._index[oid]
o.append(' %s: %s, %s' %
(u64(oid), TimeStamp(r[:8]), repr(r[8:])))
return '\n'.join(o)
tid = self._tid
func(tid)
def cleanup(self):
pass
tdata = self._tdata
for oid in tdata:
tid_data = self._data.get(oid)
if tid_data is None:
tid_data = BTrees.OOBTree.OOBucket()
self._data[oid] = tid_data
tid_data[tid] = tdata[oid]
def close(self):
self._ltid = tid
self._transactions[tid] = TransactionRecord(tid, transaction, tdata)
self._transaction = None
self._commit_lock.release()
# ZEO.interfaces.IServeable
@ZODB.utils.locked(opened)
def tpc_transaction(self):
return self._transaction
# ZODB.interfaces.IStorage
def tpc_vote(self, transaction):
pass
class TransactionRecord:
status = ' '
def __init__(self, tid, transaction, data):
self.tid = tid
self.user = transaction.user
self.description = transaction.description
extension = transaction._extension
self.extension = extension
self.data = data
_extension = property(lambda self: self._extension,
lambda self, v: setattr(self, '_extension', v),
)
def __iter__(self):
for oid, data in self.data.items():
yield DataRecord(oid, self.tid, data, None)
def pack(self, oid):
self.status = 'p'
del self.data[oid]
return not self.data
class DataRecord(object):
"""Abstract base class for iterator protocol"""
zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
version = ''
def __init__(self, oid, tid, data, prev):
self.oid = oid
self.tid = tid
self.data = data
self.data_txn = prev
def DB(*args, **kw):
return ZODB.DB(MappingStorage(), *args, **kw)
......@@ -14,16 +14,30 @@
import ZODB.MappingStorage
import unittest
from ZODB.tests import StorageTestBase
from ZODB.tests import BasicStorage, MTStorage, Synchronization
from ZODB.tests import PackableStorage
class MappingStorageTests(StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
MTStorage.MTStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
):
from ZODB.tests import (
BasicStorage,
HistoryStorage,
IteratorStorage,
MTStorage,
PackableStorage,
RevisionStorage,
StorageTestBase,
Synchronization,
)
class MappingStorageTests(
StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
HistoryStorage.HistoryStorage,
IteratorStorage.ExtendedIteratorStorage,
IteratorStorage.IteratorStorage,
MTStorage.MTStorage,
PackableStorage.PackableStorage,
RevisionStorage.RevisionStorage,
Synchronization.SynchronizedStorage,
):
def setUp(self):
self._storage = ZODB.MappingStorage.MappingStorage()
......@@ -36,7 +50,10 @@ class MappingStorageTests(StorageTestBase.StorageTestBase,
# doesnt support huge transaction metadata. This storage doesnt
# have this limit, so we inhibit this test here.
pass
def checkLoadBeforeUndo(self):
pass # we don't support undo yet
checkUndoZombie = checkLoadBeforeUndo
def test_suite():
suite = unittest.makeSuite(MappingStorageTests, 'check')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment