Commit 82daec9f authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #72 from zopefoundation/clean-up-locking

Modernize and cleanup locking
parents f9f3ddae 0ed7579d
......@@ -41,18 +41,15 @@ class ActivityMonitor:
self.trim(now)
def trim(self, now):
self.trim_lock.acquire()
log = self.log
cutoff = now - self.history_length
n = 0
loglen = len(log)
while n < loglen and log[n][0] < cutoff:
n = n + 1
if n:
del log[:n]
self.trim_lock.release()
with self.trim_lock:
log = self.log
cutoff = now - self.history_length
n = 0
loglen = len(log)
while n < loglen and log[n][0] < cutoff:
n = n + 1
if n:
del log[:n]
def setHistoryLength(self, history_length):
self.history_length = history_length
......
......@@ -85,13 +85,13 @@ class BaseStorage(UndoLogCompatible):
# Allocate locks:
self._lock = utils.RLock()
self.__commit_lock = utils.Lock()
self._commit_lock = utils.Lock()
# Comment out the following 4 lines to debug locking:
# Needed by external storages that use this dumb api :(
self._lock_acquire = self._lock.acquire
self._lock_release = self._lock.release
self._commit_lock_acquire = self.__commit_lock.acquire
self._commit_lock_release = self.__commit_lock.release
self._commit_lock_acquire = self._commit_lock.acquire
self._commit_lock_release = self._commit_lock.release
t = time.time()
t = self._ts = TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
......@@ -128,8 +128,8 @@ class BaseStorage(UndoLogCompatible):
def new_oid(self):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._lock_acquire()
try:
with self._lock:
last = self._oid
d = byte_ord(last[-1])
if d < 255: # fast path for the usual case
......@@ -139,19 +139,14 @@ class BaseStorage(UndoLogCompatible):
last = _structpack(">Q", last_as_long + 1)
self._oid = last
return last
finally:
self._lock_release()
# Update the maximum oid in use, under protection of a lock. The
# maximum-in-use attribute is changed only if possible_new_max_oid is
# larger than its current value.
def set_max_oid(self, possible_new_max_oid):
self._lock_acquire()
try:
with self._lock:
if possible_new_max_oid > self._oid:
self._oid = possible_new_max_oid
finally:
self._lock_release()
def registerDB(self, db):
pass # we don't care
......@@ -160,18 +155,17 @@ class BaseStorage(UndoLogCompatible):
return self._is_read_only
def tpc_abort(self, transaction):
self._lock_acquire()
try:
with self._lock:
if transaction is not self._transaction:
return
try:
self._abort()
self._clear_temp()
self._transaction = None
finally:
self._commit_lock_release()
finally:
self._lock_release()
def _abort(self):
"""Subclasses should redefine this to supply abort actions"""
......@@ -180,14 +174,15 @@ class BaseStorage(UndoLogCompatible):
def tpc_begin(self, transaction, tid=None, status=' '):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._lock_acquire()
try:
with self._lock:
if self._transaction is transaction:
raise POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._lock_release()
self._commit_lock_acquire()
self._lock_acquire()
self._commit_lock.acquire()
with self._lock:
self._transaction = transaction
self._clear_temp()
......@@ -212,8 +207,6 @@ class BaseStorage(UndoLogCompatible):
self._tstatus = status
self._begin(self._tid, user, desc, ext)
finally:
self._lock_release()
def tpc_transaction(self):
return self._transaction
......@@ -224,14 +217,11 @@ class BaseStorage(UndoLogCompatible):
pass
def tpc_vote(self, transaction):
self._lock_acquire()
try:
with self._lock:
if transaction is not self._transaction:
raise POSException.StorageTransactionError(
"tpc_vote called with wrong transaction")
self._vote()
finally:
self._lock_release()
def _vote(self):
"""Subclasses should redefine this to supply transaction vote actions.
......@@ -245,8 +235,7 @@ class BaseStorage(UndoLogCompatible):
# to send an invalidation message to all of the other
# connections!
self._lock_acquire()
try:
with self._lock:
if transaction is not self._transaction:
raise POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
......@@ -259,9 +248,7 @@ class BaseStorage(UndoLogCompatible):
finally:
self._ude = None
self._transaction = None
self._commit_lock_release()
finally:
self._lock_release()
self._commit_lock.release()
def _finish(self, tid, u, d, e):
"""Subclasses should redefine this to supply transaction finish actions
......@@ -273,11 +260,8 @@ class BaseStorage(UndoLogCompatible):
return self._ltid
def getTid(self, oid):
self._lock_acquire()
try:
with self._lock:
return load_current(self, oid)[1]
finally:
self._lock_release()
def loadSerial(self, oid, serial):
raise POSException.Unsupported(
......
......@@ -403,9 +403,7 @@ class DB(object):
"""
# Allocate lock.
x = utils.RLock()
self._a = x.acquire
self._r = x.release
self._lock = utils.RLock()
# pools and cache sizes
self.pool = ConnectionPool(pool_size, pool_timeout)
......@@ -492,8 +490,7 @@ class DB(object):
connection._db must be self on entry.
"""
self._a()
try:
with self._lock:
assert connection._db is self
connection.opened = None
......@@ -501,18 +498,13 @@ class DB(object):
self.historical_pool.repush(connection, connection.before)
else:
self.pool.repush(connection)
finally:
self._r()
def _connectionMap(self, f):
"""Call f(c) for all connections c in all pools, live and historical.
"""
self._a()
try:
with self._lock:
self.pool.map(f)
self.historical_pool.map(f)
finally:
self._r()
def cacheDetail(self):
"""Return information on objects in the various caches
......@@ -716,8 +708,7 @@ class DB(object):
DeprecationWarning, 2)
transaction_manager = None
self._a()
try:
with self._lock:
# result <- a connection
if before is not None:
result = self.historical_pool.pop(before)
......@@ -746,8 +737,6 @@ class DB(object):
self.pool.availableGC()
self.historical_pool.availableGC()
finally:
self._r()
result.open(transaction_manager)
return result
......@@ -814,65 +803,44 @@ class DB(object):
return find_global(modulename, globalname)
def setCacheSize(self, size):
self._a()
try:
with self._lock:
self._cache_size = size
def setsize(c):
c._cache.cache_size = size
self.pool.map(setsize)
finally:
self._r()
def setCacheSizeBytes(self, size):
self._a()
try:
with self._lock:
self._cache_size_bytes = size
def setsize(c):
c._cache.cache_size_bytes = size
self.pool.map(setsize)
finally:
self._r()
def setHistoricalCacheSize(self, size):
self._a()
try:
with self._lock:
self._historical_cache_size = size
def setsize(c):
c._cache.cache_size = size
self.historical_pool.map(setsize)
finally:
self._r()
def setHistoricalCacheSizeBytes(self, size):
self._a()
try:
with self._lock:
self._historical_cache_size_bytes = size
def setsize(c):
c._cache.cache_size_bytes = size
self.historical_pool.map(setsize)
finally:
self._r()
def setPoolSize(self, size):
self._a()
try:
with self._lock:
self.pool.size = size
finally:
self._r()
def setHistoricalPoolSize(self, size):
self._a()
try:
with self._lock:
self.historical_pool.size = size
finally:
self._r()
def setHistoricalTimeout(self, timeout):
self._a()
try:
with self._lock:
self.historical_pool.timeout = timeout
finally:
self._r()
def history(self, *args, **kw):
return self.storage.history(*args, **kw)
......
......@@ -114,7 +114,7 @@ class DemoStorage(ConflictResolvingStorage):
def _copy_methods_from_changes(self, changes):
for meth in (
'_lock_acquire', '_lock_release',
'_lock',
'getSize', 'isReadOnly',
'sortKey', 'tpc_transaction', 'tpc_vote',
):
......@@ -248,22 +248,22 @@ class DemoStorage(ConflictResolvingStorage):
except ZODB.POSException.POSKeyError:
return self.base.loadSerial(oid, serial)
@ZODB.utils.locked
def new_oid(self):
while 1:
oid = ZODB.utils.p64(self._next_oid )
if oid not in self._issued_oids:
try:
load_current(self.changes, oid)
except ZODB.POSException.POSKeyError:
with self._lock:
while 1:
oid = ZODB.utils.p64(self._next_oid )
if oid not in self._issued_oids:
try:
load_current(self.base, oid)
load_current(self.changes, oid)
except ZODB.POSException.POSKeyError:
self._next_oid += 1
self._issued_oids.add(oid)
return oid
try:
load_current(self.base, oid)
except ZODB.POSException.POSKeyError:
self._next_oid += 1
self._issued_oids.add(oid)
return oid
self._next_oid = random.randint(1, 1<<62)
self._next_oid = random.randint(1, 1<<62)
def pack(self, t, referencesf, gc=None):
if gc is None:
......@@ -343,38 +343,39 @@ class DemoStorage(ConflictResolvingStorage):
return self.changes.temporaryDirectory()
raise
@ZODB.utils.locked
def tpc_abort(self, transaction):
if transaction is not self._transaction:
return
self._stored_oids = set()
self._transaction = None
self.changes.tpc_abort(transaction)
self._commit_lock.release()
with self._lock:
if transaction is not self._transaction:
return
self._stored_oids = set()
self._transaction = None
self.changes.tpc_abort(transaction)
self._commit_lock.release()
@ZODB.utils.locked
def tpc_begin(self, transaction, *a, **k):
# The tid argument exists to support testing.
if transaction is self._transaction:
raise ZODB.POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._lock_release()
with self._lock:
# The tid argument exists to support testing.
if transaction is self._transaction:
raise ZODB.POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._commit_lock.acquire()
self._lock_acquire()
self.changes.tpc_begin(transaction, *a, **k)
self._transaction = transaction
self._stored_oids = set()
@ZODB.utils.locked
with self._lock:
self.changes.tpc_begin(transaction, *a, **k)
self._transaction = transaction
self._stored_oids = set()
def tpc_finish(self, transaction, func = lambda tid: None):
if (transaction is not self._transaction):
raise ZODB.POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
self._issued_oids.difference_update(self._stored_oids)
self._stored_oids = set()
self._transaction = None
self.changes.tpc_finish(transaction, func)
self._commit_lock.release()
with self._lock:
if (transaction is not self._transaction):
raise ZODB.POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
self._issued_oids.difference_update(self._stored_oids)
self._stored_oids = set()
self._transaction = None
self.changes.tpc_finish(transaction, func)
self._commit_lock.release()
_temporary_blobdirs = {}
def cleanup_temporary_blobdir(
......
......@@ -747,7 +747,7 @@ class FileStorage(
finally:
self._ude = None
self._transaction = None
self._commit_lock_release()
self._commit_lock.release()
def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
......@@ -918,8 +918,8 @@ class FileStorage(
us.search()
# Give another thread a chance, so that a long undoLog()
# operation doesn't block all other activity.
self._lock_release()
self._lock_acquire()
self._lock.release()
self._lock.acquire()
return us.results
def undo(self, transaction_id, transaction):
......@@ -1153,13 +1153,13 @@ class FileStorage(
# blobs and removing the .old file (see further down).
if self.blob_dir:
self._commit_lock_release()
self._commit_lock.release()
have_commit_lock = False
self._remove_blob_files_tagged_for_removal_during_pack()
finally:
if have_commit_lock:
self._commit_lock_release()
self._commit_lock.release()
with self._lock:
self._pack_is_in_progress = False
......@@ -1195,14 +1195,14 @@ class FileStorage(
removed = False
if level:
self._lock_acquire()
self._lock.acquire()
try:
if not os.listdir(path):
os.rmdir(path)
removed = True
finally:
if level:
self._lock_release()
self._lock.release()
if removed:
maybe_remove_empty_dir_containing(path, level+1)
......
......@@ -334,11 +334,10 @@ class FileStoragePacker(FileStorageFormatter):
# path is the storage file path.
# stop is the pack time, as a TimeStamp.
# la and lr are the acquire() and release() methods of the storage's lock.
# cla and clr similarly, for the storage's commit lock.
# current_size is the storage's _pos. All valid data at the start
# lives before that offset (there may be a checkpoint transaction in
# progress after it).
def __init__(self, storage, referencesf, stop, gc=True):
self._storage = storage
if storage.blob_dir:
......@@ -366,10 +365,8 @@ class FileStoragePacker(FileStorageFormatter):
# The packer needs to acquire the parent's commit lock
# during the copying stage, so the two sets of lock acquire
# and release methods are passed to the constructor.
self._lock_acquire = storage._lock_acquire
self._lock_release = storage._lock_release
self._commit_lock_acquire = storage._commit_lock_acquire
self._commit_lock_release = storage._commit_lock_release
self._lock = storage._lock
self._commit_lock = storage._commit_lock
# The packer will use several indexes.
# index: oid -> pos
......@@ -445,11 +442,10 @@ class FileStoragePacker(FileStorageFormatter):
# pack didn't free any data. there's no point in continuing.
close_files_remove()
return None
self._commit_lock_acquire()
self._commit_lock.acquire()
self.locked = True
try:
self._lock_acquire()
try:
with self._lock:
# Re-open the file in unbuffered mode.
# The main thread may write new transactions to the
......@@ -468,8 +464,7 @@ class FileStoragePacker(FileStorageFormatter):
self._file = open(self._path, "rb", 0)
self._file.seek(0, 2)
self.file_end = self._file.tell()
finally:
self._lock_release()
if ipos < self.file_end:
self.copyRest(ipos)
......@@ -486,11 +481,11 @@ class FileStoragePacker(FileStorageFormatter):
# most probably ran out of disk space or some other IO error
close_files_remove()
if self.locked:
self._commit_lock_release()
self._commit_lock.release()
raise # don't succeed silently
except:
if self.locked:
self._commit_lock_release()
self._commit_lock.release()
raise
def copyToPacktime(self):
......@@ -639,7 +634,7 @@ class FileStoragePacker(FileStorageFormatter):
# The call below will raise CorruptedDataError at EOF.
th = self._read_txn_header(ipos)
# Release commit lock while writing to pack file
self._commit_lock_release()
self._commit_lock.release()
self.locked = False
pos = self._tfile.tell()
self._copier.setTxnPos(pos)
......@@ -668,6 +663,6 @@ class FileStoragePacker(FileStorageFormatter):
self.index.update(self.tindex)
self.tindex.clear()
self._commit_lock_acquire()
self._commit_lock.acquire()
self.locked = True
return ipos
......@@ -30,7 +30,6 @@ class IFileStoragePacker(zope.interface.Interface):
or, of the form:
oid.encode('hex')+'\n'
If packing is unnecessary, or would not change the file, then
no pack or removed files are created None is returned,
......@@ -47,7 +46,7 @@ class IFileStoragePacker(zope.interface.Interface):
- Rename the .pack file, and
- process the blob_dir/.removed file by removing the blobs
corresponding to the file records.
corresponding to the file records.
"""
class IFileStorage(zope.interface.Interface):
......@@ -60,14 +59,10 @@ class IFileStorage(zope.interface.Interface):
"The file object used to access the underlying data."
)
def _lock_acquire():
"Acquire the storage lock"
def _lock_release():
"Release the storage lock"
def _commit_lock_acquire():
"Acquire the storage commit lock"
_lock = zope.interface.Attribute(
"The storage lock."
)
def _commit_lock_release():
"Release the storage commit lock"
_commit_lock = zope.interface.Attribute(
"The storage commit lock."
)
......@@ -39,9 +39,7 @@ class MappingStorage(object):
self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
self._ltid = ZODB.utils.z64
self._last_pack = None
_lock = ZODB.utils.RLock()
self._lock_acquire = _lock.acquire
self._lock_release = _lock.release
self._lock = ZODB.utils.RLock()
self._commit_lock = ZODB.utils.Lock()
self._opened = True
self._transaction = None
......@@ -263,24 +261,28 @@ class MappingStorage(object):
self._commit_lock.release()
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def tpc_begin(self, transaction, tid=None):
# The tid argument exists to support testing.
if transaction is self._transaction:
raise ZODB.POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._lock_release()
with self._lock:
ZODB.utils.check_precondition(self.opened)
# The tid argument exists to support testing.
if transaction is self._transaction:
raise ZODB.POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._commit_lock.acquire()
self._lock_acquire()
self._transaction = transaction
self._tdata = {}
if tid is None:
if self._transactions:
old_tid = self._transactions.maxKey()
else:
old_tid = None
tid = ZODB.utils.newTid(old_tid)
self._tid = tid
with self._lock:
self._transaction = transaction
self._tdata = {}
if tid is None:
if self._transactions:
old_tid = self._transactions.maxKey()
else:
old_tid = None
tid = ZODB.utils.newTid(old_tid)
self._tid = tid
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
......
......@@ -696,8 +696,7 @@ class BlobStorageMixin(object):
return self._tid
def _blob_storeblob(self, oid, serial, blobfilename):
self._lock_acquire()
try:
with self._lock:
self.fshelper.getPathForOID(oid, create=True)
targetname = self.fshelper.getBlobFilename(oid, serial)
rename_or_copy_blob(blobfilename, targetname)
......@@ -705,8 +704,6 @@ class BlobStorageMixin(object):
# if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
......@@ -816,13 +813,10 @@ class BlobStorage(BlobStorageMixin):
def pack(self, packtime, referencesf):
"""Remove all unused OID/TID combinations."""
self._lock_acquire()
try:
with self._lock:
if self._blobs_pack_is_in_progress:
raise BlobStorageError('Already packing')
self._blobs_pack_is_in_progress = True
finally:
self._lock_release()
try:
# Pack the underlying storage, which will allow us to determine
......@@ -836,9 +830,8 @@ class BlobStorage(BlobStorageMixin):
else:
self._packNonUndoing(packtime, referencesf)
finally:
self._lock_acquire()
self._blobs_pack_is_in_progress = False
self._lock_release()
with self._lock:
self._blobs_pack_is_in_progress = False
return result
......@@ -853,9 +846,7 @@ class BlobStorage(BlobStorageMixin):
# (belying the web UI legacy of the ZODB code :-()
serial_id = decodebytes(serial_id + b'\n')
self._lock_acquire()
try:
with self._lock:
# we get all the blob oids on the filesystem related to the
# transaction we want to undo.
for oid in self.fshelper.getOIDsForSerial(serial_id):
......@@ -887,8 +878,6 @@ class BlobStorage(BlobStorageMixin):
utils.cp(orig, new)
self.dirty_oids.append((oid, undo_serial))
finally:
self._lock_release()
return undo_serial, keys
def new_instance(self):
......
......@@ -188,8 +188,7 @@ class MBox:
self._max = max
def next(self):
self._lock.acquire()
try:
with self.lock:
if self._max > 0 and self.number >= self._max:
raise IndexError(self.number + 1)
message = next(self._mbox)
......@@ -199,8 +198,6 @@ class MBox:
message.number = self.number
message.mbox = self.__name__
return message
finally:
self._lock.release()
bins = 9973
#bins = 11
......
......@@ -32,8 +32,7 @@ class MVCCMappingStorage(MappingStorage):
# _polled_tid contains the transaction ID at the last poll.
self._polled_tid = b''
self._data_snapshot = None # {oid->(state, tid)}
self._main_lock_acquire = self._lock_acquire
self._main_lock_release = self._lock_release
self._main_lock = self._lock
def new_instance(self):
"""Returns a storage instance that is a view of the same data.
......@@ -48,8 +47,7 @@ class MVCCMappingStorage(MappingStorage):
inst.pack = self.pack
inst.loadBefore = self.loadBefore
inst._ltid = self._ltid
inst._main_lock_acquire = self._lock_acquire
inst._main_lock_release = self._lock_release
inst._main_lock = self._lock
return inst
@ZODB.utils.locked(MappingStorage.opened)
......@@ -73,8 +71,7 @@ class MVCCMappingStorage(MappingStorage):
"""Poll the storage for changes by other connections.
"""
# prevent changes to _transactions and _data during analysis
self._main_lock_acquire()
try:
with self._main_lock:
if self._transactions:
new_tid = self._transactions.maxKey()
else:
......@@ -110,9 +107,6 @@ class MVCCMappingStorage(MappingStorage):
continue
changed_oids.update(txn.data.keys())
finally:
self._main_lock_release()
self._polled_tid = self._ltid = new_tid
return list(changed_oids)
......@@ -126,8 +120,5 @@ class MVCCMappingStorage(MappingStorage):
def pack(self, t, referencesf, gc=True):
# prevent all concurrent commits during packing
self._commit_lock.acquire()
try:
with self._commit_lock:
MappingStorage.pack(self, t, referencesf, gc)
finally:
self._commit_lock.release()
......@@ -214,7 +214,7 @@ def testSomeDelegation():
... six.print_(self.name, 'closed')
... sortKey = __len__ = getTid = None
... tpc_finish = tpc_vote = tpc_transaction = None
... _lock_acquire = _lock_release = lambda self: None
... _lock = ZODB.utils.Lock()
... getName = lambda self: 'S'
... isReadOnly = tpc_transaction = None
... supportsUndo = undo = undoLog = undoInfo = None
......@@ -240,6 +240,8 @@ def testSomeDelegation():
begin 2 3
>>> storage.tpc_abort(1)
>>>
"""
def blob_pos_key_error_with_non_blob_base():
......
......@@ -99,13 +99,10 @@ class MinimalMemoryStorage(BaseStorage, object):
del self._txn
def _finish(self, tid, u, d, e):
self._lock_acquire()
try:
with self._lock:
self._index.update(self._txn.index)
self._cur.update(self._txn.cur())
self._ltid = self._tid
finally:
self._lock_release()
def loadBefore(self, the_oid, the_tid):
# It's okay if loadBefore() is really expensive, because this
......
......@@ -268,6 +268,12 @@ def mktemp(dir=None, prefix='tmp'):
os.close(handle)
return filename
def check_precondition(precondition):
if not precondition():
raise AssertionError(
"Failed precondition: ",
precondition.__doc__.strip())
class Locked(object):
def __init__(self, func, inst=None, class_=None, preconditions=()):
......@@ -286,8 +292,7 @@ class Locked(object):
inst = args[0]
func = self.__func__.__get__(self.__self__, self.__self_class__)
inst._lock_acquire()
try:
with inst._lock:
for precondition in self.preconditions:
if not precondition(inst):
raise AssertionError(
......@@ -295,8 +300,6 @@ class Locked(object):
precondition.__doc__.strip())
return func(*args, **kw)
finally:
inst._lock_release()
class locked(object):
......
......@@ -98,6 +98,10 @@ we'll create a "lock" type that simply prints when it is called:
... print('acquire')
... def release(self):
... print('release')
... def __enter__(self):
... return self.acquire()
... def __exit__(self, *ignored):
... return self.release()
Now we'll demonstrate the descriptor:
......@@ -150,9 +154,7 @@ supports optional method preconditions [1]_.
>>> class C:
... def __init__(self):
... _lock = Lock()
... self._lock_acquire = _lock.acquire
... self._lock_release = _lock.release
... self._lock = Lock()
... self._opened = True
... self._transaction = None
...
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment