Commit a9330b7e authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #44 from zopefoundation/simplify-server-commit-lock-management

Simplify server commit lock management
parents 815f39d1 3d02129b
......@@ -47,7 +47,7 @@ from ZODB.loglevels import BLATHER
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
from ZODB.utils import oid_repr, p64, u64, z64
from ZODB.utils import oid_repr, p64, u64, z64, Lock, RLock
from .asyncio.server import Acceptor
......@@ -98,6 +98,7 @@ class ZEOStorage:
def notify_connected(self, conn):
self.connection = conn
self.call_soon_threadsafe = conn.call_soon_threadsafe
self.connected = True
assert conn.protocol_version is not None
self.log_label = _addr_label(conn.addr)
......@@ -215,6 +216,7 @@ class ZEOStorage:
self.storage = storage
self.setup_delegation()
self.stats = self.server.register_connection(storage_id, self)
self.lock_manager = self.server.lock_managers[storage_id]
def get_info(self):
storage = self.storage
......@@ -353,12 +355,8 @@ class ZEOStorage:
def _clear_transaction(self):
# Common code at end of tpc_finish() and tpc_abort()
if self.locked:
self.server.unlock_storage(self)
self.locked = 0
if self.transaction is not None:
self.server.stop_waiting(self)
self.transaction = None
self.lock_manager.release(self)
self.transaction = None
self.stats.active_txns -= 1
if self.txnlog is not None:
self.txnlog.close()
......@@ -369,98 +367,69 @@ class ZEOStorage:
def vote(self, tid):
self._check_tid(tid, exc=StorageTransactionError)
if self.locked or self.server.already_waiting(self):
raise StorageTransactionError(
'Already voting (%s)' % (self.locked and 'locked' or 'waiting')
)
return self._try_to_vote()
return self.lock_manager.lock(self, self._vote)
def _vote(self, delay=None):
# Called from client thread
def _try_to_vote(self, delay=None):
if not self.connected:
return # We're disconnected
if delay is not None and delay.sent:
# as a consequence of the unlocking strategy, _try_to_vote
# may be called multiple times for delayed
# transactions. The first call will mark the delay as
# sent. We should skip if the delay was already sent.
return
try:
self.log(
"Preparing to commit transaction: %d objects, %d bytes"
% (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction,
self.tid, self.status)
else:
self.storage.tpc_begin(self.transaction)
self.locked, delay = self.server.lock_storage(self, delay)
if self.locked:
result = None
try:
self.log(
"Preparing to commit transaction: %d objects, %d bytes"
% (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction,
self.tid, self.status)
else:
self.storage.tpc_begin(self.transaction)
for op, args in self.txnlog:
getattr(self, op)(*args)
# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename)
if not self.conflicts:
try:
serials = self.storage.tpc_vote(self.transaction)
except ConflictError as err:
if (self.client_conflict_resolution and
err.oid and err.serials and err.data
):
self.conflicts[err.oid] = dict(
oid=err.oid, serials=err.serials, data=err.data)
else:
raise
else:
if serials:
self.serials.extend(serials)
result = self.serials
if self.conflicts:
result = list(self.conflicts.values())
self.storage.tpc_abort(self.transaction)
self.server.unlock_storage(self)
self.locked = False
self.server.stop_waiting(self)
except Exception as err:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
for op, args in self.txnlog:
getattr(self, op)(*args)
# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename)
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error %s" % err, BLATHER)
if not isinstance(err, TransactionError):
logger.exception("While voting")
if delay is not None:
delay.error(sys.exc_info())
if not self.conflicts:
try:
serials = self.storage.tpc_vote(self.transaction)
except ConflictError as err:
if (self.client_conflict_resolution and
err.oid and err.serials and err.data
):
self.conflicts[err.oid] = dict(
oid=err.oid, serials=err.serials, data=err.data)
else:
raise
else:
raise
if serials:
self.serials.extend(serials)
if self.conflicts:
self.storage.tpc_abort(self.transaction)
return list(self.conflicts.values())
else:
if delay is not None:
delay.reply(result)
else:
return result
self.locked = True # signal to lock manager to hold lock
return self.serials
else:
return delay
except Exception as err:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
def _unlock_callback(self, delay):
if self.connected:
self.connection.call_soon_threadsafe(self._try_to_vote, delay)
else:
self.server.stop_waiting(self)
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error %s" % err, BLATHER)
if not isinstance(err, TransactionError):
logger.exception("While voting")
raise
# The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired.
......@@ -741,19 +710,22 @@ class StorageServer:
(self.__class__.__name__, read_only and "RO" or "RW", msg))
self._lock = threading.Lock()
self._commit_locks = {}
self._waiting = dict((name, []) for name in storages)
self._lock = Lock()
self.ssl = ssl # For dev convenience
self.read_only = read_only
self.database = None
# A list, by server, of at most invalidation_queue_size invalidations.
# The list is kept in sorted order with the most recent
# invalidation at the front. The list never has more than
# self.invq_bound elements.
self.invq_bound = invalidation_queue_size
self.invq = {}
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.lock_managers = {} # {storage_id -> LockManager}
self.stats = {} # {storage_id -> StorageStats}
for name, storage in storages.items():
self._setup_invq(name, storage)
storage.registerDB(StorageServerDB(self, name))
......@@ -761,8 +733,19 @@ class StorageServer:
# XXX this may go away later, when storages grow
# configuration for this.
storage.tryToResolveConflict = never_resolve_conflict
self.zeo_storages_by_storage_id[name] = []
self.stats[name] = stats = StorageStats(
self.zeo_storages_by_storage_id[name])
if transaction_timeout is None:
# An object with no-op methods
timeout = StubTimeoutThread()
else:
timeout = TimeoutThread(transaction_timeout)
timeout.setName("TimeoutThread for %s" % name)
timeout.start()
self.lock_managers[name] = LockManager(name, stats, timeout)
self.invalidation_age = invalidation_age
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.client_conflict_resolution = client_conflict_resolution
if addr is not None:
......@@ -774,21 +757,6 @@ class StorageServer:
self.loop = self.acceptor.loop
ZODB.event.notify(Serving(self, address=self.acceptor.addr))
self.stats = {}
self.timeouts = {}
for name in self.storages.keys():
self.zeo_storages_by_storage_id[name] = []
self.stats[name] = StorageStats(
self.zeo_storages_by_storage_id[name])
if transaction_timeout is None:
# An object with no-op methods
timeout = StubTimeoutThread()
else:
timeout = TimeoutThread(transaction_timeout)
timeout.setName("TimeoutThread for %s" % name)
timeout.start()
self.timeouts[name] = timeout
def create_client_handler(self):
return ZEOStorage(self, self.read_only)
......@@ -855,7 +823,7 @@ class StorageServer:
# later transactions since they will have to validate their
# caches anyway.
for zs in self.zeo_storages_by_storage_id[storage_id][:]:
zs.connection.call_soon_threadsafe(zs.connection.close)
zs.call_soon_threadsafe(zs.connection.close)
def invalidate(
self, zeo_storage, storage_id, tid, invalidated=(), info=None):
......@@ -993,8 +961,7 @@ class StorageServer:
for zs in zeo_storages[:]:
try:
logger.debug("Closing %s", zs.connection)
zs.connection.call_soon_threadsafe(
zs.connection.close)
zs.call_soon_threadsafe(zs.connection.close)
except Exception:
logger.exception("closing connection %r", zs)
......@@ -1014,108 +981,12 @@ class StorageServer:
if zeo_storage in zeo_storages:
zeo_storages.remove(zeo_storage)
def lock_storage(self, zeostore, delay):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
if storage_id in self._commit_locks:
# The lock is held by another zeostore
locked = self._commit_locks[storage_id]
assert locked is not zeostore, (storage_id, delay)
if not locked.connected:
locked.log("Still locked after disconnected. Unlocking.",
logging.CRITICAL)
if locked.transaction:
locked.storage.tpc_abort(locked.transaction)
del self._commit_locks[storage_id]
# yuck: have to manipulate lock to appease with :(
self._lock.release()
try:
return self.lock_storage(zeostore, delay)
finally:
self._lock.acquire()
if delay is None:
# New request, queue it
assert not [i for i in waiting if i[0] is zeostore
], "already waiting"
delay = Delay()
waiting.append((zeostore, delay))
zeostore.log("(%r) queue lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
return False, delay
else:
self._commit_locks[storage_id] = zeostore
self.timeouts[storage_id].begin(zeostore)
self.stats[storage_id].lock_time = time.time()
if delay is not None:
# we were waiting, stop
waiting[:] = [i for i in waiting if i[0] is not zeostore]
zeostore.log("(%r) lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
return True, delay
def unlock_storage(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
assert self._commit_locks[storage_id] is zeostore
del self._commit_locks[storage_id]
self.timeouts[storage_id].end(zeostore)
self.stats[storage_id].lock_time = None
callbacks = waiting[:]
if callbacks:
assert not [i for i in waiting if i[0] is zeostore
], "waiting while unlocking"
zeostore.log("(%r) unlock: transactions waiting: %s"
% (storage_id, len(callbacks)),
_level_for_waiting(callbacks)
)
for zeostore, delay in callbacks:
try:
zeostore._unlock_callback(delay)
except (SystemExit, KeyboardInterrupt):
raise
except Exception:
logger.exception("Calling unlock callback")
def stop_waiting(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
new_waiting = [i for i in waiting if i[0] is not zeostore]
if len(new_waiting) == len(waiting):
return
waiting[:] = new_waiting
zeostore.log("(%r) dequeue lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
def already_waiting(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
return bool([i for i in waiting if i[0] is zeostore])
def server_status(self, storage_id):
status = self.stats[storage_id].__dict__.copy()
status['connections'] = len(status['connections'])
status['waiting'] = len(self._waiting[storage_id])
status['timeout-thread-is-alive'] = self.timeouts[storage_id].isAlive()
lock_manager = self.lock_managers[storage_id]
status['waiting'] = len(lock_manager.waiting)
status['timeout-thread-is-alive'] = lock_manager.timeout.isAlive()
last_transaction = self.storages[storage_id].lastTransaction()
last_transaction_hex = codecs.encode(last_transaction, 'hex_codec')
if PY3:
......@@ -1129,14 +1000,6 @@ class StorageServer:
for storage_id in self.storages)
def _level_for_waiting(waiting):
if len(waiting) > 9:
return logging.CRITICAL
if len(waiting) > 3:
return logging.WARNING
else:
return logging.DEBUG
class StubTimeoutThread:
def begin(self, client):
......@@ -1197,8 +1060,7 @@ class TimeoutThread(threading.Thread):
client.log("Transaction timeout after %s seconds" %
self._timeout, logging.CRITICAL)
try:
client.connection.call_soon_threadsafe(
client.connection.close)
client.call_soon_threadsafe(client.connection.close)
except:
client.log("Timeout failure", logging.CRITICAL,
exc_info=sys.exc_info())
......@@ -1311,3 +1173,129 @@ def never_resolve_conflict(oid, committedSerial, oldSerial, newpickle,
committedData=b''):
raise ConflictError(oid=oid, serials=(committedSerial, oldSerial),
data=newpickle)
class LockManager(object):
def __init__(self, storage_id, stats, timeout):
self.storage_id = storage_id
self.stats = stats
self.timeout = timeout
self.locked = None
self.waiting = {} # {ZEOStorage -> (func, delay)}
self._lock = RLock()
def lock(self, zs, func):
"""Call the given function with the commit lock.
If we can get the lock right away, return the result of
calling the function.
If we can't get the lock right away, return a delay
The function must set ``locked`` on the zeo-storage to
indicate that the zeo-storage should be locked. Otherwise,
the lock isn't held pas the call.
"""
with self._lock:
if self._can_lock(zs):
self._locked(zs)
else:
if any(w for w in self.waiting if w is zs):
raise StorageTransactionError("Already voting (waiting)")
delay = Delay()
self.waiting[zs] = (func, delay)
self._log_waiting(
zs, "(%r) queue lock: transactions waiting: %s")
return delay
try:
result = func()
except Exception:
self.release(zs)
raise
else:
if not zs.locked:
self.release(zs)
return result
def _lock_waiting(self, zs):
waiting = None
with self._lock:
if self.locked is zs:
assert zs.locked
return
if self._can_lock(zs):
waiting = self.waiting.pop(zs, None)
if waiting:
self._locked(zs)
if waiting:
func, delay = waiting
try:
result = func()
except Exception:
delay.error(sys.exc_info())
self.release(zs)
else:
delay.reply(result)
if not zs.locked:
self.release(zs)
def release(self, zs):
with self._lock:
locked = self.locked
if locked is zs:
self._unlocked(zs)
for zs in list(self.waiting):
zs.call_soon_threadsafe(self._lock_waiting, zs)
else:
if self.waiting.pop(zs, None):
self._log_waiting(
zs, "(%r) dequeue lock: transactions waiting: %s")
def _log_waiting(self, zs, message):
l = len(self.waiting)
zs.log(message % (self.storage_id, l),
logging.CRITICAL if l > 9 else (
logging.WARNING if l > 3 else logging.DEBUG)
)
def _can_lock(self, zs):
locked = self.locked
if locked is zs:
raise StorageTransactionError("Already voting (locked)")
if locked is not None:
if not locked.connected:
locked.log("Still locked after disconnected. Unlocking.",
logging.CRITICAL)
if locked.transaction:
locked.storage.tpc_abort(locked.transaction)
self._unlocked(locked)
locked = None
else:
assert locked.locked
return locked is None
def _locked(self, zs):
self.locked = zs
self.stats.lock_time = time.time()
self._log_waiting(zs, "(%r) lock: transactions waiting: %s")
self.timeout.begin(zs)
return True
def _unlocked(self, zs):
assert self.locked is zs
self.timeout.end(zs)
self.locked = self.stats.lock_time = None
zs.locked = False
self._log_waiting(zs, "(%r) unlock: transactions waiting: %s")
......@@ -35,7 +35,7 @@ To use this module, replace::
from .asyncio.server import Acceptor
with:
with::
from .asyncio.mtacceptor import Acceptor
......
......@@ -165,12 +165,14 @@ class Delay(object):
def reply(self, obj):
self.sent = 'reply'
self.protocol.send_reply(self.msgid, obj)
if self.protocol:
self.protocol.send_reply(self.msgid, obj)
def error(self, exc_info):
self.sent = 'error'
logger.error("Error raised in delayed method", exc_info=exc_info)
self.protocol.send_error(self.msgid, exc_info[1])
if self.protocol:
self.protocol.send_error(self.msgid, exc_info[1])
def __repr__(self):
return "%s[%s, %r, %r, %r]" % (
......
......@@ -30,6 +30,8 @@ from ZEO._compat import StringIO
logger = logging.getLogger('ZEO.tests.forker')
DEBUG = os.environ.get('ZEO_TEST_SERVER_DEBUG')
class ZEOConfig:
"""Class to generate ZEO configuration file. """
......@@ -88,7 +90,7 @@ def runner(config, qin, qout, timeout=None,
debug=False, name=None,
keep=False, protocol=None):
if debug:
if debug or DEBUG:
debug_logging()
old_protocol = None
......
......@@ -48,6 +48,7 @@ class FakeServer:
'1': FakeStorage(),
'2': FakeStorageBase(),
}
lock_managers = storages
def register_connection(*args):
return None, None
......@@ -58,6 +59,8 @@ class FakeConnection:
protocol_version = b'Z4'
addr = 'test'
call_soon_threadsafe = lambda f, *a: f(*a)
def test_server_record_iternext():
"""
......
......@@ -169,6 +169,7 @@ So, we arrange to get an error in vote:
>>> zs = ZEO.tests.servertesting.client(server, 1)
>>> zs.tpc_begin('0', '', '', {})
>>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
>>> zs.vote('0')
Traceback (most recent call last):
...
......@@ -176,7 +177,7 @@ So, we arrange to get an error in vote:
When we do, the storage server's transaction lock shouldn't be held:
>>> '1' in server._commit_locks
>>> zs.lock_manager.locked is not None
False
Of course, if vote suceeds, the lock will be held:
......@@ -186,7 +187,7 @@ Of course, if vote suceeds, the lock will be held:
>>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
>>> _ = zs.vote('1') # doctest: +ELLIPSIS
>>> '1' in server._commit_locks
>>> zs.lock_manager.locked is not None
True
>>> zs.tpc_abort('1')
......@@ -361,13 +362,13 @@ release the lock and one of the waiting clients will get the lock.
>>> zs2.notify_disconnected() # doctest: +ELLIPSIS
ZEO.StorageServer INFO
(test-addr-2) disconnected during locked transaction
(test-addr-...) disconnected during locked transaction
ZEO.StorageServer CRITICAL
(test-addr-2) ('1') unlock: transactions waiting: 10
(test-addr-...) ('1') unlock: transactions waiting: 10
ZEO.StorageServer WARNING
(test-addr-1) ('1') lock: transactions waiting: 9
(test-addr-...) ('1') lock: transactions waiting: 9
ZEO.StorageServer BLATHER
(test-addr-1) Preparing to commit transaction: 1 objects, ... bytes
(test-addr-...) Preparing to commit transaction: 1 objects, ... bytes
(In practice, waiting clients won't necessarily get the lock in order.)
......@@ -392,45 +393,19 @@ statistics using the server_status method:
If clients disconnect while waiting, they will be dequeued:
>>> for client in clients:
... client.notify_disconnected()
... client.notify_disconnected() # doctest: +ELLIPSIS
ZEO.StorageServer INFO
(test-addr-10) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-10) ('1') dequeue lock: transactions waiting: 8
ZEO.StorageServer INFO
(test-addr-11) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-11) ('1') dequeue lock: transactions waiting: 7
ZEO.StorageServer INFO
(test-addr-12) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-12) ('1') dequeue lock: transactions waiting: 6
ZEO.StorageServer INFO
(test-addr-13) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-13) ('1') dequeue lock: transactions waiting: 5
ZEO.StorageServer INFO
(test-addr-14) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-14) ('1') dequeue lock: transactions waiting: 4
ZEO.StorageServer INFO
(test-addr-15) disconnected during unlocked transaction
ZEO.StorageServer DEBUG
(test-addr-15) ('1') dequeue lock: transactions waiting: 3
ZEO.StorageServer INFO
(test-addr-16) disconnected during unlocked transaction
ZEO.StorageServer DEBUG
(test-addr-16) ('1') dequeue lock: transactions waiting: 2
ZEO.StorageServer INFO
(test-addr-17) disconnected during unlocked transaction
ZEO.StorageServer DEBUG
(test-addr-17) ('1') dequeue lock: transactions waiting: 1
ZEO.StorageServer INFO
(test-addr-18) disconnected during unlocked transaction
ZEO.StorageServer DEBUG
(test-addr-18) ('1') dequeue lock: transactions waiting: 0
...
>>> zs1.server_status()['waiting']
0
>>> zs1.tpc_abort(tid1)
ZEO.StorageServer DEBUG
(test-addr-1) ('1') unlock: transactions waiting: 0
>>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
>>> logging.getLogger('ZEO').removeHandler(handler)
......@@ -494,6 +469,8 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
>>> zs1.connection.connection_lost(None)
ZEO.StorageServer INFO
(test-addr-1) disconnected during locked transaction
ZEO.StorageServer DEBUG
(test-addr-1) ('1') unlock: transactions waiting: 0
>>> zs2 = ZEO.tests.servertesting.client(server, '2')
ZEO.asyncio.base INFO
......@@ -508,6 +485,8 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
(test-addr-2) Preparing to commit transaction: 1 objects, ... bytes
>>> zs2.tpc_abort(tid2)
ZEO.StorageServer DEBUG
(test-addr-2) ('1') unlock: transactions waiting: 0
>>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
>>> logging.getLogger('ZEO').removeHandler(handler)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment