Commit 866320f5 authored by Jim Fulton's avatar Jim Fulton

Added and tested Packless storage, which paves the way to

new GC strategy.

Meaningless checkpoint of other files.
parent 1f8ea369
This diff is collapsed.
......@@ -5,13 +5,16 @@ from struct import pack, unpack
class Minimal(Base):
def _setupDbs(self):
self._index=self._setupDB('mini')
# Supports Base framework
self._index=self._setupDB('current')
self._setupDB('pickle')
def load(self, oid, version):
self._lock_acquire()
try:
p=self._index[oid]
return p[8:], p[:8] # pickle, serial
s=self._index[oid]
p=self._pickle[oid]
return p, s # pickle, serial
finally: self._lock_release()
def store(self, oid, serial, data, version, transaction):
......@@ -24,8 +27,7 @@ class Minimal(Base):
self._lock_acquire()
try:
if self._index.has_key(oid):
old=self._index[oid]
oserial=old[:8]
oserial=self._index[oid]
if serial != oserial: raise POSException.ConflictError
serial=self._serial
......@@ -35,14 +37,11 @@ class Minimal(Base):
return serial
def tpc_vote(self, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
def _finish(self, tid, u, d, e):
txn = self._env.txn_begin()
try:
txn = self._txn = self._env.txn_begin()
put=self._index.put
serial_put=self._index.put
pickle_put=self._pickle.put
serial=self._serial
tmp=self._tmp
s=tmp.tell()
......@@ -55,20 +54,25 @@ class Minimal(Base):
l=l+ldata+12
if ldata > s:
raise 'Temporary file corrupted'
put(oid, serial+data, txn)
serial_put(oid, serial, txn)
pickle_put(oid, data, txn)
tmp.seek(0)
if s > 999999: tmp.truncate()
finally: self._lock_release()
except:
txn.abort()
raise
else:
txn.commit()
def pack(self, t, referencesf):
self._lock_acquire()
try:
try:
# Build an index of *only* those objects reachable
# from the root.
index=self._index
index=self._pickle
rootl=['\0\0\0\0\0\0\0\0']
pop=rootl.pop
pindex={}
......@@ -78,9 +82,8 @@ class Minimal(Base):
if referenced(oid): continue
# Scan non-version pickle for references
r=index[oid]
pindex[oid]=r
p=r[8:]
p=index[oid]
pindex[oid]=1
referencesf(p, rootl)
# Now delete any unreferenced entries:
......@@ -88,3 +91,12 @@ class Minimal(Base):
if not referenced(oid): del index[oid]
finally: self._lock_release()
This diff is collapsed.
......@@ -5,13 +5,16 @@ from struct import pack, unpack
class Minimal(Base):
def _setupDbs(self):
self._index=self._setupDB('mini')
# Supports Base framework
self._index=self._setupDB('current')
self._setupDB('pickle')
def load(self, oid, version):
self._lock_acquire()
try:
p=self._index[oid]
return p[8:], p[:8] # pickle, serial
s=self._index[oid]
p=self._pickle[oid]
return p, s # pickle, serial
finally: self._lock_release()
def store(self, oid, serial, data, version, transaction):
......@@ -24,8 +27,7 @@ class Minimal(Base):
self._lock_acquire()
try:
if self._index.has_key(oid):
old=self._index[oid]
oserial=old[:8]
oserial=self._index[oid]
if serial != oserial: raise POSException.ConflictError
serial=self._serial
......@@ -35,14 +37,11 @@ class Minimal(Base):
return serial
def tpc_vote(self, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
def _finish(self, tid, u, d, e):
txn = self._env.txn_begin()
try:
txn = self._txn = self._env.txn_begin()
put=self._index.put
serial_put=self._index.put
pickle_put=self._pickle.put
serial=self._serial
tmp=self._tmp
s=tmp.tell()
......@@ -55,20 +54,25 @@ class Minimal(Base):
l=l+ldata+12
if ldata > s:
raise 'Temporary file corrupted'
put(oid, serial+data, txn)
serial_put(oid, serial, txn)
pickle_put(oid, data, txn)
tmp.seek(0)
if s > 999999: tmp.truncate()
finally: self._lock_release()
except:
txn.abort()
raise
else:
txn.commit()
def pack(self, t, referencesf):
self._lock_acquire()
try:
try:
# Build an index of *only* those objects reachable
# from the root.
index=self._index
index=self._pickle
rootl=['\0\0\0\0\0\0\0\0']
pop=rootl.pop
pindex={}
......@@ -78,9 +82,8 @@ class Minimal(Base):
if referenced(oid): continue
# Scan non-version pickle for references
r=index[oid]
pindex[oid]=r
p=r[8:]
p=index[oid]
pindex[oid]=1
referencesf(p, rootl)
# Now delete any unreferenced entries:
......@@ -88,3 +91,12 @@ class Minimal(Base):
if not referenced(oid): del index[oid]
finally: self._lock_release()
from base import Base
from bsddb3 import db
from struct import pack, unpack
class Minimal(Base):
def _setupDbs(self):
# Supports Base framework
self._index=self._setupDB('current')
self._setupDB('pickle')
def load(self, oid, version):
self._lock_acquire()
try:
s=self._index[oid]
p=self._pickle[oid]
return p, s # pickle, serial
finally: self._lock_release()
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
if version:
raise POSException.Unsupported, "Versions aren't supported"
self._lock_acquire()
try:
if self._index.has_key(oid):
oserial=self._index[oid]
if serial != oserial: raise POSException.ConflictError
serial=self._serial
self._tmp.write(oid+pack(">I", len(data)))
self._tmp.write(data)
finally: self._lock_release()
return serial
def _finish(self, tid, u, d, e):
txn = self._env.txn_begin()
try:
serial_put=self._index.put
pickle_put=self._pickle.put
serial=self._serial
tmp=self._tmp
s=tmp.tell()
tmp.seek(0)
read=tmp.read
l=0
while l < s:
oid, ldata = unpack(">8sI", read(12))
data=read(ldata)
l=l+ldata+12
if ldata > s:
raise 'Temporary file corrupted'
serial_put(oid, serial, txn)
pickle_put(oid, data, txn)
tmp.seek(0)
if s > 999999: tmp.truncate()
except:
txn.abort()
raise
else:
txn.commit()
def pack(self, t, referencesf):
self._lock_acquire()
try:
# Build an index of *only* those objects reachable
# from the root.
index=self._pickle
rootl=['\0\0\0\0\0\0\0\0']
pop=rootl.pop
pindex={}
referenced=pindex.has_key
while rootl:
oid=pop()
if referenced(oid): continue
# Scan non-version pickle for references
p=index[oid]
pindex[oid]=1
referencesf(p, rootl)
# Now delete any unreferenced entries:
for oid in index.keys():
if not referenced(oid): del index[oid]
finally: self._lock_release()
from base import Base
from bsddb3 import db
from struct import pack, unpack
from ZODB.referencesf import referencesf
class Packless(Base):
def _setupDbs(self):
# Supports Base framework
self._index=self._setupDB('current')
self._setupDB('referenceCount')
self._setupDB('oreferences', flags=db.DB_DUP)
self._setupDB('opickle')
def _dbnames(self):
return 'current', 'referenceCount', 'oreferences', 'opickle'
def load(self, oid, version):
self._lock_acquire()
try:
s=self._index[oid]
p=self._opickle[oid]
return p, s # pickle, serial
finally: self._lock_release()
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
if version:
raise POSException.Unsupported, "Versions aren't supported"
self._lock_acquire()
try:
if self._index.has_key(oid):
oserial=self._index[oid]
if serial != oserial: raise POSException.ConflictError
serial=self._serial
self._tmp.write(oid+pack(">I", len(data)))
self._tmp.write(data)
finally: self._lock_release()
return serial
def _finish(self, tid, u, d, e):
txn = self._env.txn_begin()
try:
zeros={}
referenceCount=self._referenceCount
referenceCount_get=referenceCount.get
referenceCount_put=referenceCount.put
oreferences=self._oreferences
oreferences_put=oreferences.put
serial_put=self._index.put
opickle_put=self._opickle.put
serial=self._serial
tmp=self._tmp
s=tmp.tell()
tmp.seek(0)
read=tmp.read
l=0
while l < s:
oid, ldata = unpack(">8sI", read(12))
data=read(ldata)
# get references
referencesl=[]
referencesf(data, referencesl)
references={}
for roid in referencesl: references[roid]=1
referenced=references.has_key
# Create refcnt
if not referenceCount_get(oid, txn):
referenceCount_put(oid, '\0\0\0\0', txn)
zeros[oid]=1
# update stored references
c=oreferences.cursor(txn)
try:
try: roid = c.set(oid)
except:
pass
else:
while roid:
roid=roid[1]
if referenced(roid):
# still referenced, so no need to update
del references[roid]
else:
# Delete the stored ref, since we no longer
# have it
c.delete()
# decrement refcnt:
rc=unpack(">i",
referenceCount_get(roid, txn))[0]
rc=rc-1
if rc < 0:
raise "Bad reference count, %s" % (rc+1)
referenceCount_put(roid, pack(">i", rc), txn)
if rc==0: zeros[roid]=1
roid=c.get(db.DB_NEXT_DUP)
finally: c.close()
# Now add any references that weren't already stored:
for roid in references.keys():
oreferences_put(oid, roid, txn)
# Create/update refcnt
rc=referenceCount_get(roid, txn)
if rc:
rc=unpack(">i", rc)[0]
if rc=='\0\0\0\0': del zeros[roid]
referenceCount_put(roid, pack(">i", rc+1), txn)
else:
referenceCount_put(roid, '\0\0\0\1', txn)
l=l+ldata+12
if ldata > s:
raise 'Temporary file corrupted'
serial_put(oid, serial, txn)
opickle_put(oid, data, txn)
if zeros:
for oid in zeros.keys():
if oid == '\0\0\0\0\0\0\0\0': continue
self._takeOutGarbage(oid, txn)
tmp.seek(0)
if s > 999999: tmp.truncate()
except:
txn.abort()
raise
else:
txn.commit()
def _takeOutGarbage(self, oid, txn):
# take out the garbage.
referenceCount=self._referenceCount
referenceCount.delete(oid, txn)
self._opickle.delete(oid, txn)
self._current.delete(oid, txn)
# Remove/decref references
referenceCount_get=referenceCount.get
referenceCount_put=referenceCount.put
c=self._oreferences.cursor(txn)
try:
try: roid = c.set(oid)
except:
pass
else:
while roid:
c.delete()
roid=roid[1]
# decrement refcnt:
rc=referenceCount_get(roid, txn)
if rc:
rc=unpack(">i", rc)[0]-1
if rc < 0:
raise "Bad reference count, %s" % (rc+1)
if rc==0: self._takeOutGarbage(roid, txn)
else: referenceCount_put(roid, pack(">i", rc), txn)
roid=c.get(db.DB_NEXT_DUP)
finally: c.close()
if self._len > 0: self._len=self._len-1
def pack(self, t, referencesf):
self._lock_acquire()
try:
pass
# TBD
finally: self._lock_release()
......@@ -19,21 +19,38 @@ class Base(BaseStorage):
self._init_oid()
def _setupDB(self, name, flags=0):
"""Open an individual database and assign to an "_" attribute.
"""
d=db.Db(self._env)
if flags: db.set_flags(flags)
if flags: d.set_flags(flags)
d.open(self._prefix+name, db.DB_BTREE, db.DB_CREATE)
setattr(self, '_'+name, d)
return d
def _setupDbs(self):
"""Set up the storages databases, typically using '_setupDB'.
"""
def _init_oid(self):
c=self._index.cursor()
v=c.get(db.DB_LAST)
if v: self._oid=v[0]
else: self._oid='\0\0\0\0\0\0\0\0'
_len=-1
def __len__(self):
# TBD
return 0
l=self._len
if l < 0:
l=self._len=len(self._index)
return l
def new_oid(self, last=None):
# increment the cached length:
l=self._len
if l >= 0: self._len=l+1
return BaseStorage.new_oid(self, last)
def getSize(self):
# TBD
......@@ -49,13 +66,20 @@ class Base(BaseStorage):
self._tmp.seek(0)
def close(self):
for name in self._dbnames:
"""Close the storage
by closing the databases it uses and closing it's environment.
"""
for name in self._dbnames():
getattr(self, '_'+name).close()
delattr(self, '_'+name)
self._env.close()
del self._env
def _dbnames(self):
"""Return a list of the names of the databases used by the storage.
"""
return ("index",)
def envFromString(name):
if not os.path.exists(name): os.mkdir(name)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment