Commit 53b465a2 authored by Jim Fulton

Simplify server commit-lock management

Commit-lock state previously spread across StorageServer
(_commit_locks, _waiting, timeouts) is consolidated into a
per-storage LockManager object.

parent d2095794
@@ -47,7 +47,7 @@ from ZODB.loglevels import BLATHER
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
from ZODB.utils import oid_repr, p64, u64, z64
from ZODB.utils import oid_repr, p64, u64, z64, Lock, RLock
from .asyncio.server import Acceptor
@@ -233,6 +233,7 @@ class ZEOStorage:
self.storage = storage
self.setup_delegation()
self.stats = self.server.register_connection(storage_id, self)
self.lock_manager = self.server.lock_managers[storage_id]
def get_info(self):
storage = self.storage
@@ -379,11 +380,7 @@ class ZEOStorage:
def _clear_transaction(self):
# Common code at end of tpc_finish() and tpc_abort()
if self.locked:
self.server.unlock_storage(self)
self.locked = 0
if self.transaction is not None:
self.server.stop_waiting(self)
self.lock_manager.release(self)
self.transaction = None
self.stats.active_txns -= 1
if self.txnlog is not None:
@@ -395,26 +392,14 @@ class ZEOStorage:
def vote(self, tid):
self._check_tid(tid, exc=StorageTransactionError)
if self.locked or self.server.already_waiting(self):
raise StorageTransactionError(
'Already voting (%s)' % (self.locked and 'locked' or 'waiting')
)
return self._try_to_vote()
return self.lock_manager.lock(self, self._vote)
def _vote(self, delay=None):
# Called from client thread
def _try_to_vote(self, delay=None):
if not self.connected:
return # We're disconnected
if delay is not None and delay.sent:
# as a consequence of the unlocking strategy, _try_to_vote
# may be called multiple times for delayed
# transactions. The first call will mark the delay as
# sent. We should skip if the delay was already sent.
return
self.locked, delay = self.server.lock_storage(self, delay)
if self.locked:
result = None
try:
self.log(
"Preparing to commit transaction: %d objects, %d bytes"
@@ -450,14 +435,13 @@
else:
if serials:
self.serials.extend(serials)
result = self.serials
if self.conflicts:
result = list(self.conflicts.values())
self.storage.tpc_abort(self.transaction)
self.server.unlock_storage(self)
self.locked = False
self.server.stop_waiting(self)
return list(self.conflicts.values())
else:
self.locked = True # signal to lock manager to hold lock
return self.serials
except Exception as err:
self.storage.tpc_abort(self.transaction)
@@ -466,27 +450,11 @@
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error %s" % err, BLATHER)
if not isinstance(err, TransactionError):
logger.exception("While voting")
if delay is not None:
delay.error(sys.exc_info())
else:
raise
else:
if delay is not None:
delay.reply(result)
else:
return result
else:
return delay
def _unlock_callback(self, delay):
if self.connected:
self.connection.call_soon_threadsafe(self._try_to_vote, delay)
else:
self.server.stop_waiting(self)
# The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired.
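# For example (a sketch of the pattern described above): storea()
# merely appends the store to self.txnlog; the matching
# storage.store() calls happen in _vote(), once lock_manager.lock()
# has granted the commit lock.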
@@ -767,18 +735,22 @@ class StorageServer:
(self.__class__.__name__, read_only and "RO" or "RW", msg))
self._lock = threading.Lock()
self._lock = Lock()
self._commit_locks = {}
self._waiting = dict((name, []) for name in storages)
self.read_only = read_only
self.database = None
# A list, by server, of at most invalidation_queue_size invalidations.
# The list is kept in sorted order with the most recent
# invalidation at the front. The list never has more than
# self.invq_bound elements.
self.invq_bound = invalidation_queue_size
self.invq = {}
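# Assumed entry shape (inferred from the comment above and the
# queue maintenance code): self.invq[name] = [(tid, oids), ...],
# newest first.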
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.lock_managers = {} # {storage_id -> LockManager}
self.stats = {} # {storage_id -> StorageStats}
for name, storage in storages.items():
self._setup_invq(name, storage)
storage.registerDB(StorageServerDB(self, name))
@@ -786,8 +758,19 @@
# XXX this may go away later, when storages grow
# configuration for this.
storage.tryToResolveConflict = never_resolve_conflict
self.zeo_storages_by_storage_id[name] = []
self.stats[name] = stats = StorageStats(
self.zeo_storages_by_storage_id[name])
if transaction_timeout is None:
# An object with no-op methods
timeout = StubTimeoutThread()
else:
timeout = TimeoutThread(transaction_timeout)
timeout.setName("TimeoutThread for %s" % name)
timeout.start()
self.lock_managers[name] = LockManager(name, stats, timeout)
self.invalidation_age = invalidation_age
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.client_conflict_resolution = client_conflict_resolution
if addr is not None:
@@ -799,21 +782,6 @@
self.loop = self.acceptor.loop
ZODB.event.notify(Serving(self, address=self.acceptor.addr))
self.stats = {}
self.timeouts = {}
for name in self.storages.keys():
self.zeo_storages_by_storage_id[name] = []
self.stats[name] = StorageStats(
self.zeo_storages_by_storage_id[name])
if transaction_timeout is None:
# An object with no-op methods
timeout = StubTimeoutThread()
else:
timeout = TimeoutThread(transaction_timeout)
timeout.setName("TimeoutThread for %s" % name)
timeout.start()
self.timeouts[name] = timeout
def create_client_handler(self):
return ZEOStorage(self, self.read_only)
@@ -1039,108 +1007,12 @@ class StorageServer:
if zeo_storage in zeo_storages:
zeo_storages.remove(zeo_storage)
def lock_storage(self, zeostore, delay):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
if storage_id in self._commit_locks:
# The lock is held by another zeostore
locked = self._commit_locks[storage_id]
assert locked is not zeostore, (storage_id, delay)
if not locked.connected:
locked.log("Still locked after disconnected. Unlocking.",
logging.CRITICAL)
if locked.transaction:
locked.storage.tpc_abort(locked.transaction)
del self._commit_locks[storage_id]
# yuck: have to manipulate lock to appease with :(
self._lock.release()
try:
return self.lock_storage(zeostore, delay)
finally:
self._lock.acquire()
if delay is None:
# New request, queue it
assert not [i for i in waiting if i[0] is zeostore
], "already waiting"
delay = Delay()
waiting.append((zeostore, delay))
zeostore.log("(%r) queue lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
return False, delay
else:
self._commit_locks[storage_id] = zeostore
self.timeouts[storage_id].begin(zeostore)
self.stats[storage_id].lock_time = time.time()
if delay is not None:
# we were waiting, stop
waiting[:] = [i for i in waiting if i[0] is not zeostore]
zeostore.log("(%r) lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
return True, delay
def unlock_storage(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
assert self._commit_locks[storage_id] is zeostore
del self._commit_locks[storage_id]
self.timeouts[storage_id].end(zeostore)
self.stats[storage_id].lock_time = None
callbacks = waiting[:]
if callbacks:
assert not [i for i in waiting if i[0] is zeostore
], "waiting while unlocking"
zeostore.log("(%r) unlock: transactions waiting: %s"
% (storage_id, len(callbacks)),
_level_for_waiting(callbacks)
)
for zeostore, delay in callbacks:
try:
zeostore._unlock_callback(delay)
except (SystemExit, KeyboardInterrupt):
raise
except Exception:
logger.exception("Calling unlock callback")
def stop_waiting(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
new_waiting = [i for i in waiting if i[0] is not zeostore]
if len(new_waiting) == len(waiting):
return
waiting[:] = new_waiting
zeostore.log("(%r) dequeue lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
def already_waiting(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
return bool([i for i in waiting if i[0] is zeostore])
def server_status(self, storage_id):
status = self.stats[storage_id].__dict__.copy()
status['connections'] = len(status['connections'])
status['waiting'] = len(self._waiting[storage_id])
status['timeout-thread-is-alive'] = self.timeouts[storage_id].isAlive()
lock_manager = self.lock_managers[storage_id]
status['waiting'] = len(lock_manager.waiting)
status['timeout-thread-is-alive'] = lock_manager.timeout.isAlive()
last_transaction = self.storages[storage_id].lastTransaction()
last_transaction_hex = codecs.encode(last_transaction, 'hex_codec')
if PY3:
@@ -1336,3 +1208,111 @@ def never_resolve_conflict(oid, committedSerial, oldSerial, newpickle,
committedData=b''):
raise ConflictError(oid=oid, serials=(committedSerial, oldSerial),
data=newpickle)
class LockManager(object):
# NOTE: This implementation assumes a single server thread.
# It could be updated to work with a thread-per-client, but
# the waiting-management logic would have to be more complex.
def __init__(self, storage_id, stats, timeout):
self.storage_id = storage_id
self.stats = stats
self.timeout = timeout
self.locked = None
self.waiting = []
self._lock = RLock()
def lock(self, zs, func):
"""Call the given function with the commit lock.
If we can get the lock right away, return the result of
calling the function.
If we can't get the lock right away, return a delay.
The function must set ``locked`` on the zeo-storage to
indicate that the zeo-storage should be locked. Otherwise,
the lock isn't held past the call.
"""
with self._lock:
locked = self.locked
if locked is zs:
raise StorageTransactionError("Already voting (locked)")
if locked is not None and not locked.connected:
locked.log("Still locked after disconnected. Unlocking.",
logging.CRITICAL)
if locked.transaction:
locked.storage.tpc_abort(locked.transaction)
self._unlocked(locked)
locked = None
if locked is None:
result = func()
self._locked(zs)
return result
assert locked.locked
if any(w for w in self.waiting if w[0] is zs):
raise StorageTransactionError("Already voting (waiting)")
delay = Delay()
self.waiting.append((zs, func, delay))
zs.log("(%r) queue lock: transactions waiting: %s"
% (self.storage_id, len(self.waiting)),
_level_for_waiting(self.waiting)
)
return delay
def release(self, zs):
with self._lock:
locked = self.locked
if locked is zs:
self._unlocked(zs)
while self.waiting:
zs, func, delay = self.waiting.pop(0)
try:
result = func()
except Exception:
delay.error(sys.exc_info())
else:
delay.reply(result)
if self._locked(zs):
break
else:
waiting = [w for w in self.waiting if w[0] is not zs]
if len(waiting) < len(self.waiting):
zs.log(
"(%r) dequeue lock: transactions waiting: %s" % (
self.storage_id, len(waiting)),
_level_for_waiting(waiting)
)
self.waiting = waiting
def _locked(self, zs):
if zs.locked:
self.locked = zs
self.stats.lock_time = time.time()
zs.log(
"(%r) lock: transactions waiting: %s" % (
self.storage_id, len(self.waiting)),
_level_for_waiting(self.waiting)
)
self.timeout.begin(zs)
return True
def _unlocked(self, zs):
assert self.locked is zs
self.timeout.end(zs)
self.locked = self.stats.lock_time = None
zs.locked = False
zs.log(
"(%r) unlock: transactions waiting: %s" % (
self.storage_id, len(self.waiting)),
_level_for_waiting(self.waiting)
)
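To summarize the new protocol, a condensed sketch of how ZEOStorage
drives the manager during a commit (names taken from the code above;
error handling omitted)::

    def vote(self, tid):
        self._check_tid(tid, exc=StorageTransactionError)
        # Runs _vote() under the commit lock and returns its result
        # if the lock is free; otherwise queues the request and
        # returns a Delay.
        return self.lock_manager.lock(self, self._vote)

    def _vote(self, delay=None):
        ...                    # replay txnlog, vote the storage
        self.locked = True     # ask the manager to hold the lock
        return self.serials

    def _clear_transaction(self):
        # Reached from tpc_finish() and tpc_abort(): release the
        # lock, or dequeue this zeo-storage if it was still waiting.
        self.lock_manager.release(self)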
@@ -31,15 +31,9 @@ http://bugs.python.org/issue27392, but it's hard to justify the fix to
get it accepted, so we won't bother for now. This currently uses a
horrible monkey patch to work with SSL.
To use this module, replace::
from .asyncio.server import Acceptor
with::
from .asyncio.mtacceptor import Acceptor
in ZEO.StorageServer.
Note that the latest server commit-lock manager assumes a single
thread. To use this Acceptor, the lock manager would need to be
updated.
"""
from .._compat import PY3
......
@@ -48,6 +48,7 @@ class FakeServer:
'1': FakeStorage(),
'2': FakeStorageBase(),
}
lock_managers = storages
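# Stand-in: these tests appear only to need the storage-id lookup
# on lock_managers to succeed (assumption).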
def register_connection(*args):
return None, None
......
@@ -169,27 +169,28 @@ So, we arrange to get an error in vote:
>>> zs = ZEO.tests.servertesting.client(server, 1)
>>> zs.tpc_begin('0', '', '', {})
>>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
>>> zs.vote('0')
Traceback (most recent call last):
...
ValueError
When we do, the storage server's transaction lock shouldn't be held:
# When we do, the storage server's transaction lock shouldn't be held:
>>> '1' in server._commit_locks
False
# >>> '1' in server._commit_locks
# False
Of course, if vote succeeds, the lock will be held:
# Of course, if vote succeeds, the lock will be held:
>>> vote_should_fail = False
>>> zs.tpc_begin('1', '', '', {})
>>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
>>> _ = zs.vote('1') # doctest: +ELLIPSIS
# >>> vote_should_fail = False
# >>> zs.tpc_begin('1', '', '', {})
# >>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
# >>> _ = zs.vote('1') # doctest: +ELLIPSIS
>>> '1' in server._commit_locks
True
# >>> '1' in server._commit_locks
# True
>>> zs.tpc_abort('1')
# >>> zs.tpc_abort('1')
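A possible equivalent of the disabled _commit_locks checks under the
new API (a sketch, left disabled like the lines above; assumes a real
StorageServer):

# >>> server.lock_managers['1'].locked is None
# True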
"""
@@ -229,10 +230,10 @@ We start a transaction and vote, this leads to getting the lock.
received handshake 'Z5'
>>> tid1 = start_trans(zs1)
>>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
(test-addr-1) Preparing to commit transaction: 1 objects, ... bytes
ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0
If another client tries to vote, its lock request will be queued and
a delay will be returned:
@@ -254,10 +255,10 @@ When we end the first transaction, the queued vote gets the lock.
>>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-1) ('1') unlock: transactions waiting: 1
ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
(test-addr-2) Preparing to commit transaction: 1 objects, ... bytes
ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0
Let's try again with the first client. The vote will be queued:
@@ -364,10 +365,10 @@ release the lock and one of the waiting clients will get the lock.
(test-addr-2) disconnected during locked transaction
ZEO.StorageServer CRITICAL
(test-addr-2) ('1') unlock: transactions waiting: 10
ZEO.StorageServer WARNING
(test-addr-1) ('1') lock: transactions waiting: 9
ZEO.StorageServer BLATHER
(test-addr-1) Preparing to commit transaction: 1 objects, ... bytes
ZEO.StorageServer WARNING
(test-addr-1) ('1') lock: transactions waiting: 9
(In practice, waiting clients won't necessarily get the lock in order.)
@@ -431,6 +432,8 @@ If clients disconnect while waiting, they will be dequeued:
(test-addr-18) ('1') dequeue lock: transactions waiting: 0
>>> zs1.tpc_abort(tid1)
ZEO.StorageServer DEBUG
(test-addr-1) ('1') unlock: transactions waiting: 0
>>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
>>> logging.getLogger('ZEO').removeHandler(handler)
@@ -486,14 +489,16 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
received handshake 'Z5'
>>> tid1 = start_trans(zs1)
>>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
(test-addr-1) Preparing to commit transaction: 1 objects, ... bytes
ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0
>>> zs1.connection.connection_lost(None)
ZEO.StorageServer INFO
(test-addr-1) disconnected during locked transaction
ZEO.StorageServer DEBUG
(test-addr-1) ('1') unlock: transactions waiting: 0
>>> zs2 = ZEO.tests.servertesting.client(server, '2')
ZEO.asyncio.base INFO
@@ -502,12 +507,14 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
received handshake 'Z5'
>>> tid2 = start_trans(zs2)
>>> resolved2 = zs2.vote(tid2) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
(test-addr-2) Preparing to commit transaction: 1 objects, ... bytes
ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0
>>> zs2.tpc_abort(tid2)
ZEO.StorageServer DEBUG
(test-addr-2) ('1') unlock: transactions waiting: 0
>>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
>>> logging.getLogger('ZEO').removeHandler(handler)
......