Commit 86e7e1c5 authored by Barry Warsaw's avatar Barry Warsaw

Ported to pybsddb 4.1 (experimental), these changes are actually the

right thing to do anyway, and should work just find for earlier
versions of the wrapper.  Specifically,

_doabort(), _docommit(), _mark(), _sweep(), _collect_objs():
Transactionally protect .truncate() and .consume()

_docommit(): Move the setting of the _pending flag to here from
_finish() so we can transactionally protect it more conveniently.
This does't change the semantics and the recovery code in _setupDBs()
doesn't care since the flag will already be set to COMMIT.

_begin(): Transactionally protect the setting of the _pending flag to
ABORT.
parent 4131ccdf
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage without undo or versioning. """Berkeley storage without undo or versioning.
""" """
__version__ = '$Revision: 1.19 $'[-2:][0] __version__ = '$Revision: 1.20 $'[-2:][0]
import time import time
import threading import threading
...@@ -161,12 +161,13 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -161,12 +161,13 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
if cs: cs.close() if cs: cs.close()
# We're done with these tables # We're done with these tables
self._oids.truncate(txn) self._oids.truncate(txn)
self._pending.truncate() self._pending.truncate(txn)
def _abort(self): def _abort(self):
self._withtxn(self._doabort, self._serial) self._withtxn(self._doabort, self._serial)
def _docommit(self, txn, tid): def _docommit(self, txn, tid):
self._pending.put(self._serial, COMMIT, txn)
deltas = {} deltas = {}
co = cs = None co = cs = None
try: try:
...@@ -205,7 +206,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -205,7 +206,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
if cs: cs.close() if cs: cs.close()
# We're done with this table # We're done with this table
self._oids.truncate(txn) self._oids.truncate(txn)
self._pending.truncate() self._pending.truncate(txn)
# Now, to finish up, we need apply the refcount deltas to the # Now, to finish up, we need apply the refcount deltas to the
# refcounts table, and do recursive collection of all refcount == 0 # refcounts table, and do recursive collection of all refcount == 0
# objects. # objects.
...@@ -241,7 +242,14 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -241,7 +242,14 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
# When a transaction begins, we set the pending flag to ABORT, # When a transaction begins, we set the pending flag to ABORT,
# meaning, if we crash between now and the time we vote, all changes # meaning, if we crash between now and the time we vote, all changes
# will be aborted. # will be aborted.
self._pending[self._serial] = ABORT txn = self._env.txn_begin()
try:
self._pending.put(self._serial, ABORT, txn)
except:
txn.abort()
raise
else:
txn.commit()
def _dostore(self, txn, oid, serial, data): def _dostore(self, txn, oid, serial, data):
conflictresolved = False conflictresolved = False
...@@ -280,10 +288,9 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -280,10 +288,9 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
self._lock_release() self._lock_release()
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
# Twiddle the pending flag to COMMIT now since after the vote call, we # _docommit() twiddles the pending flag to COMMIT now since after the
# promise that the changes will be committed, no matter what. The # vote call, we promise that the changes will be committed, no matter
# recovery process will check this. # what. The recovery process will check this.
self._pending[self._serial] = COMMIT
self._withtxn(self._docommit, self._serial) self._withtxn(self._docommit, self._serial)
# #
...@@ -436,7 +443,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -436,7 +443,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
for oid in refdoids: for oid in refdoids:
self._oidqueue.append(oid, txn) self._oidqueue.append(oid, txn)
# Pop the next oid off the queue and do it all again # Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume() rec = self._oidqueue.consume(txn)
oid = rec and rec[1] oid = rec and rec[1]
assert len(self._oidqueue) == 0 assert len(self._oidqueue) == 0
...@@ -457,10 +464,10 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -457,10 +464,10 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
finally: finally:
c.close() c.close()
# We're done with the mark table # We're done with the mark table
self._packmark.truncate(txn=txn) self._packmark.truncate(txn)
def _collect_objs(self, txn): def _collect_objs(self, txn):
orec = self._oidqueue.consume() orec = self._oidqueue.consume(txn)
while orec: while orec:
if self._stop: if self._stop:
raise PackStop, 'stopped in _collect_objs()' raise PackStop, 'stopped in _collect_objs()'
...@@ -511,7 +518,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -511,7 +518,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
c.close() c.close()
# We really do want this down here, since _decrefPickle() could # We really do want this down here, since _decrefPickle() could
# add more items to the queue. # add more items to the queue.
orec = self._oidqueue.consume() orec = self._oidqueue.consume(txn)
assert len(self._oidqueue) == 0 assert len(self._oidqueue) == 0
# #
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage without undo or versioning. """Berkeley storage without undo or versioning.
""" """
__version__ = '$Revision: 1.19 $'[-2:][0] __version__ = '$Revision: 1.20 $'[-2:][0]
import time import time
import threading import threading
...@@ -161,12 +161,13 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -161,12 +161,13 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
if cs: cs.close() if cs: cs.close()
# We're done with these tables # We're done with these tables
self._oids.truncate(txn) self._oids.truncate(txn)
self._pending.truncate() self._pending.truncate(txn)
def _abort(self): def _abort(self):
self._withtxn(self._doabort, self._serial) self._withtxn(self._doabort, self._serial)
def _docommit(self, txn, tid): def _docommit(self, txn, tid):
self._pending.put(self._serial, COMMIT, txn)
deltas = {} deltas = {}
co = cs = None co = cs = None
try: try:
...@@ -205,7 +206,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -205,7 +206,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
if cs: cs.close() if cs: cs.close()
# We're done with this table # We're done with this table
self._oids.truncate(txn) self._oids.truncate(txn)
self._pending.truncate() self._pending.truncate(txn)
# Now, to finish up, we need apply the refcount deltas to the # Now, to finish up, we need apply the refcount deltas to the
# refcounts table, and do recursive collection of all refcount == 0 # refcounts table, and do recursive collection of all refcount == 0
# objects. # objects.
...@@ -241,7 +242,14 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -241,7 +242,14 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
# When a transaction begins, we set the pending flag to ABORT, # When a transaction begins, we set the pending flag to ABORT,
# meaning, if we crash between now and the time we vote, all changes # meaning, if we crash between now and the time we vote, all changes
# will be aborted. # will be aborted.
self._pending[self._serial] = ABORT txn = self._env.txn_begin()
try:
self._pending.put(self._serial, ABORT, txn)
except:
txn.abort()
raise
else:
txn.commit()
def _dostore(self, txn, oid, serial, data): def _dostore(self, txn, oid, serial, data):
conflictresolved = False conflictresolved = False
...@@ -280,10 +288,9 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -280,10 +288,9 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
self._lock_release() self._lock_release()
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
# Twiddle the pending flag to COMMIT now since after the vote call, we # _docommit() twiddles the pending flag to COMMIT now since after the
# promise that the changes will be committed, no matter what. The # vote call, we promise that the changes will be committed, no matter
# recovery process will check this. # what. The recovery process will check this.
self._pending[self._serial] = COMMIT
self._withtxn(self._docommit, self._serial) self._withtxn(self._docommit, self._serial)
# #
...@@ -436,7 +443,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -436,7 +443,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
for oid in refdoids: for oid in refdoids:
self._oidqueue.append(oid, txn) self._oidqueue.append(oid, txn)
# Pop the next oid off the queue and do it all again # Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume() rec = self._oidqueue.consume(txn)
oid = rec and rec[1] oid = rec and rec[1]
assert len(self._oidqueue) == 0 assert len(self._oidqueue) == 0
...@@ -457,10 +464,10 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -457,10 +464,10 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
finally: finally:
c.close() c.close()
# We're done with the mark table # We're done with the mark table
self._packmark.truncate(txn=txn) self._packmark.truncate(txn)
def _collect_objs(self, txn): def _collect_objs(self, txn):
orec = self._oidqueue.consume() orec = self._oidqueue.consume(txn)
while orec: while orec:
if self._stop: if self._stop:
raise PackStop, 'stopped in _collect_objs()' raise PackStop, 'stopped in _collect_objs()'
...@@ -511,7 +518,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -511,7 +518,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
c.close() c.close()
# We really do want this down here, since _decrefPickle() could # We really do want this down here, since _decrefPickle() could
# add more items to the queue. # add more items to the queue.
orec = self._oidqueue.consume() orec = self._oidqueue.consume(txn)
assert len(self._oidqueue) == 0 assert len(self._oidqueue) == 0
# #
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment