Commit 48ff0513 authored by Vincent Pelletier's avatar Vincent Pelletier

client.container: Reduce lock contention on queue.put .

parent 7cdd3f8f
...@@ -15,7 +15,53 @@ ...@@ -15,7 +15,53 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from thread import get_ident from thread import get_ident
from neo.lib.locking import Queue from neo.lib.locking import Lock, Empty
from collections import deque
class SimpleQueue(object):
"""
Similar to Queue.Queue but with simpler locking scheme, reducing lock
contention on "put" (benchmark shows 60% less time spent in "put").
As a result:
- only a single consumer possible ("get" vs. "get" race condition)
- only a single producer possible ("put" vs. "put" race condition)
- no blocking size limit possible
- no consumer -> producer notifications (task_done/join API)
Queue is on the critical path: any moment spent here increases client
application wait for object data, transaction completion, etc.
As we have a single consumer (client application's thread) and a single
producer (lib.dispatcher, which can be called from several threads but
serialises calls internally) for each queue, Queue.Queue's locking scheme
can be relaxed to reduce latency.
"""
__slots__ = ('_lock', '_unlock', '_popleft', '_append', '_queue')
def __init__(self):
lock = Lock()
self._lock = lock.acquire
self._unlock = lock.release
self._queue = queue = deque()
self._popleft = queue.popleft
self._append = queue.append
def get(self, block):
if block:
self._lock(False)
while True:
try:
return self._popleft()
except IndexError:
if not block:
raise Empty
self._lock()
def put(self, item):
self._append(item)
self._lock(False)
self._unlock()
def empty(self):
return not self._queue
class ContainerBase(object): class ContainerBase(object):
def __init__(self): def __init__(self):
...@@ -44,7 +90,7 @@ class ThreadContainer(ContainerBase): ...@@ -44,7 +90,7 @@ class ThreadContainer(ContainerBase):
def _new(self): def _new(self):
return { return {
'queue': Queue(0), 'queue': SimpleQueue(),
'answer': None, 'answer': None,
} }
...@@ -65,7 +111,7 @@ class TransactionContainer(ContainerBase): ...@@ -65,7 +111,7 @@ class TransactionContainer(ContainerBase):
def _new(self, txn): def _new(self, txn):
return { return {
'queue': Queue(0), 'queue': SimpleQueue(),
'txn': txn, 'txn': txn,
'ttid': None, 'ttid': None,
'data_dict': {}, 'data_dict': {},
......
from threading import Lock as threading_Lock from threading import Lock as threading_Lock
from threading import RLock as threading_RLock from threading import RLock as threading_RLock
from threading import currentThread from threading import currentThread
from Queue import Queue as Queue_Queue
from Queue import Empty from Queue import Empty
""" """
...@@ -137,46 +136,9 @@ class VerboseLock(VerboseLockBase): ...@@ -137,46 +136,9 @@ class VerboseLock(VerboseLockBase):
return self.lock.locked() return self.lock.locked()
_locked = locked _locked = locked
class VerboseQueue(Queue_Queue):
def __init__(self, maxsize=0):
if maxsize <= 0:
self.put = self._verbose_put
Queue_Queue.__init__(self, maxsize=maxsize)
def _verbose_note(self, fmt, *args):
sys.stderr.write(fmt % args + '\n')
sys.stderr.flush()
def get(self, block=True, timeout=None):
note = self._verbose_note
me = '[%r]%s.get(block=%r, timeout=%r)' % (LockUser(), self, block, timeout)
note('%s waiting', me)
try:
result = Queue_Queue.get(self, block=block, timeout=timeout)
except Exception, exc:
note('%s got exeption %r', me, exc)
raise
note('%s got item', me)
return result
def _verbose_put(self, item, block=True, timeout=None):
note = self._verbose_note
me = '[%r]%s.put(..., block=%r, timeout=%r)' % (LockUser(), self, block, timeout)
try:
Queue_Queue.put(self, item, block=block, timeout=timeout)
except Exception, exc:
note('%s got exeption %r', me, exc)
raise
note('%s put item', me)
def __repr__(self):
return '<%s@%X>' % (self.__class__.__name__, id(self))
if VERBOSE_LOCKING: if VERBOSE_LOCKING:
Lock = VerboseLock Lock = VerboseLock
RLock = VerboseRLock RLock = VerboseRLock
Queue = VerboseQueue
else: else:
Lock = threading_Lock Lock = threading_Lock
RLock = threading_RLock RLock = threading_RLock
Queue = Queue_Queue
...@@ -27,7 +27,6 @@ from neo.lib.connector import ConnectorException, ConnectorTryAgainException, \ ...@@ -27,7 +27,6 @@ from neo.lib.connector import ConnectorException, ConnectorTryAgainException, \
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets, PACKET_HEADER_FORMAT from neo.lib.protocol import Packets, PACKET_HEADER_FORMAT
from . import NeoUnitTestBase from . import NeoUnitTestBase
from neo.lib.locking import Queue
class ConnectionTests(NeoUnitTestBase): class ConnectionTests(NeoUnitTestBase):
...@@ -850,12 +849,11 @@ class MTConnectionTests(ConnectionTests): ...@@ -850,12 +849,11 @@ class MTConnectionTests(ConnectionTests):
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
def test_MTClientConnectionQueueParameter(self): def test_MTClientConnectionQueueParameter(self):
queue = Queue()
ask = self._makeClientConnection().ask ask = self._makeClientConnection().ask
packet = Packets.AskPrimary() # Any non-Ping simple "ask" packet packet = Packets.AskPrimary() # Any non-Ping simple "ask" packet
# One cannot "ask" anything without a queue # One cannot "ask" anything without a queue
self.assertRaises(TypeError, ask, packet) self.assertRaises(TypeError, ask, packet)
ask(packet, queue=queue) ask(packet, queue=object())
# ... except Ping # ... except Ping
ask(Packets.Ping()) ask(Packets.Ping())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment