Commit 5dc1f06c by Julien Muchembled

Several improvements to verbose locks

All these changes were useful to debug deadlocks in threaded tests:
- New verbose Semaphore.
- Logs with numerical 'ident' were too annoying to read so revert to thread
  name (before commit 5b69d553), with an
  exception for threaded tests. There remains one case where the result is not
  unique: when several client apps are instantiated.
- Make deadlock detection optional.
- Make it possible to name locks.
- Make output more compact.
- Remove useless 'debug_lock' option.
- Add timing information.
- Make exception more verbose when an un-acquired lock is released.

Here is how I used 'locking':

--- a/neo/tests/threaded/
+++ b/neo/tests/threaded/
@@ -37,0 +38 @@
+from neo.lib.locking import VerboseSemaphore
@@ -71 +72,2 @@ def init(cls):
-        cls._global_lock = threading.Semaphore(0)
+        cls._global_lock = VerboseSemaphore(0, check_owner=False,
+                                            name="Serialized._global_lock")
@@ -265 +267,2 @@ def start(self):
-        self.em._lock = l = threading.Semaphore(0)
+        self.em._lock = l = VerboseSemaphore(0, check_owner=False,
+                                             name=self.node_name)
@@ -346 +349,2 @@ def __init__(self, master_nodes, name, **kw):
-        self.em._lock = threading.Semaphore(0)
+        self.em._lock = VerboseSemaphore(0, check_owner=False,
+                                         name=repr(self))
1 parent 0b93b1fb
from threading import Lock as threading_Lock
from threading import RLock as threading_RLock
from threading import currentThread
import os
import sys
import threading
import traceback
from time import time
from Queue import Empty
......@@ -20,20 +22,14 @@ from Queue import Empty
__all__ = ['Lock', 'RLock', 'Queue', 'Empty']
import traceback
import sys
import os
class LockUser(object):
def __init__(self, level=0):
t = currentThread()
self.ident = t.getName()
if self.ident != 'MainThread':
self.ident = t.ident
def __init__(self, message, level=0):
t = threading.currentThread()
ident = getattr(t, 'node_name',
# This class is instanciated from a place desiring to known what
# called it.
# limit=1 would return execution position in this method
......@@ -41,59 +37,57 @@ class LockUser(object):
# limit=3 returns execution position in caller's caller
# Additionnal level value (should be positive only) can be used when
# more intermediate calls are involved
self.stack = stack = traceback.extract_stack()[:-(2 + level)]
self.stack = stack = traceback.extract_stack()[:-2-level]
path, line_number, func_name, line = stack[-1]
# Simplify path. Only keep 3 last path elements. It is enough for
# current Neo directory structure.
path = os.path.join('...', *path.split(os.path.sep)[-3:])
self.caller = (path, line_number, func_name, line)
self.time = time()
self.ident = "%s@%r %s:%s %s" % (
ident, self.time, path, line_number, line)
self.ident = ident
def __eq__(self, other):
return isinstance(other, self.__class__) and self.ident == other.ident
def __repr__(self):
return '%s@%s:%s %s' % (self.ident, self.caller[0], self.caller[1],
return "%s@%r" % (self.ident, self.time)
def formatStack(self):
return ''.join(traceback.format_list(self.stack))
def note():
write = sys.stderr.write
flush = sys.stderr.flush
def note(self, message):
write("[%s] %s\n" % (self.ident, message))
return note
note = note()
class VerboseLockBase(object):
def __init__(self, reentrant=False, debug_lock=False):
self.reentrant = reentrant
self.debug_lock = debug_lock
self.owner = None
self.waiting = []
self._note('%s@%X created by %r', self.__class__.__name__, id(self),
def _note(self, fmt, *args):
sys.stderr.write(fmt % args + '\n')
_error_class = threading.ThreadError
_release_error = 'release unlocked lock'
def _getOwner(self):
if self._locked():
owner = self.owner
owner = None
return owner
def __init__(self, check_owner, name=None, verbose=None):
self._check_owner = check_owner
self._name = name or '<%s@%X>' % (self.__class__.__name__, id(self))
self.owner = None
self.waiting = []
LockUser(repr(self) + " created", 1)
def acquire(self, blocking=1):
me = LockUser()
owner = self._getOwner()
self._note('[%r]%s.acquire(%s) Waiting for lock. Owned by:%r ' \
'Waiting:%r', me, self, blocking, owner, self.waiting)
if (self.debug_lock and owner is not None) or \
(not self.reentrant and blocking and me == owner):
if me == owner:
self._note('[%r]%s.acquire(%s): Deadlock detected: ' \
' I already own this lock:%r', me, self, blocking, owner)
self._note('[%r]%s.acquire(%s): debug lock triggered: %r',
me, self, blocking, owner)
self._note('[%r] Owner traceback:\n%s', me, owner.formatStack())
self._note('[%r] My traceback:\n%s', me, me.formatStack())
owner = self.owner if self._locked() else None
me = LockUser("%s.acquire(%s). Owned by %r. Waiting: %r"
% (self, blocking, owner, self.waiting))
if blocking:
if self._check_owner and me == owner:
me.note("I already own this lock: %r" % owner)
me.note("Owner traceback:\n%s" % owner.formatStack())
me.note("My traceback:\n%s" % me.formatStack())
locked = self.lock.acquire(blocking)
......@@ -102,16 +96,20 @@ class VerboseLockBase(object):
if locked:
self.owner = me
self._note('[%r]%s.acquire(%s) Lock granted. Waiting: %r',
me, self, blocking, self.waiting)
me.note("Lock granted. Waiting: " + repr(self.waiting))
return locked
__enter__ = acquire
def release(self):
me = LockUser()
self._note('[%r]%s.release() Waiting: %r', me, self, self.waiting)
return self.lock.release()
me = LockUser("%s.release(). Waiting: %r" % (self, self.waiting))
return self.lock.release()
except self._error_class:
t, v, tb = sys.exc_info()
if str(v) == self._release_error:
raise t, "%s %s (%s)" % (v, self, me), tb
def __exit__(self, t, v, tb):
......@@ -120,13 +118,17 @@ class VerboseLockBase(object):
raise NotImplementedError
def __repr__(self):
return '<%s@%X>' % (self.__class__.__name__, id(self))
return self._name
class VerboseRLock(VerboseLockBase):
def __init__(self, verbose=None, debug_lock=False):
super(VerboseRLock, self).__init__(reentrant=True,
self.lock = threading_RLock()
_error_class = RuntimeError
_release_error = 'cannot release un-acquired lock'
def __init__(self, **kw):
super(VerboseRLock, self).__init__(check_owner=False, **kw)
self.lock = threading.RLock()
def _locked(self):
return self.lock._RLock__block.locked()
......@@ -135,17 +137,30 @@ class VerboseRLock(VerboseLockBase):
return self.lock._is_owned()
class VerboseLock(VerboseLockBase):
def __init__(self, verbose=None, debug_lock=False):
super(VerboseLock, self).__init__(debug_lock=debug_lock)
self.lock = threading_Lock()
def __init__(self, check_owner=True, **kw):
super(VerboseLock, self).__init__(check_owner, **kw)
self.lock = threading.Lock()
def locked(self):
return self.lock.locked()
_locked = locked
class VerboseSemaphore(VerboseLockBase):
def __init__(self, value=1, check_owner=True, **kw):
super(VerboseSemaphore, self).__init__(check_owner, **kw)
self.lock = threading.Semaphore(value)
def _locked(self):
return not self.lock._Semaphore__value
Lock = VerboseLock
RLock = VerboseRLock
Semaphore = VerboseSemaphore
Lock = threading_Lock
RLock = threading_RLock
Lock = threading.Lock
RLock = threading.RLock
Semaphore = threading.Semaphore
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!