Commit fc2049b7 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Take locks around connection's ask/notify instead of app.

Nothing is protected between lock/unlock, only writes to local_var are done and
and does not need exclusive access because local_var is thread-specific.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1926 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent db8db123
...@@ -245,22 +245,14 @@ class Application(object): ...@@ -245,22 +245,14 @@ class Application(object):
@profiler_decorator @profiler_decorator
def _askStorage(self, conn, packet): def _askStorage(self, conn, packet):
""" Send a request to a storage node and process it's answer """ """ Send a request to a storage node and process it's answer """
try: msg_id = conn.ask(packet)
msg_id = conn.ask(packet)
finally:
# assume that the connection was already locked
conn.unlock()
self._waitMessage(conn, msg_id, self.storage_handler) self._waitMessage(conn, msg_id, self.storage_handler)
@profiler_decorator @profiler_decorator
def _askPrimary(self, packet): def _askPrimary(self, packet):
""" Send a request to the primary master and process it's answer """ """ Send a request to the primary master and process it's answer """
conn = self._getMasterConnection() conn = self._getMasterConnection()
conn.lock() msg_id = conn.ask(packet)
try:
msg_id = conn.ask(packet)
finally:
conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler) self._waitMessage(conn, msg_id, self.primary_handler)
@profiler_decorator @profiler_decorator
...@@ -327,16 +319,12 @@ class Application(object): ...@@ -327,16 +319,12 @@ class Application(object):
connector=self.connector_handler(), connector=self.connector_handler(),
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
# Query for primary master node # Query for primary master node
conn.lock() if conn.getConnector() is None:
try: # This happens if a connection could not be established.
if conn.getConnector() is None: logging.error('Connection to master node %s failed',
# This happens if a connection could not be established. self.trying_master_node)
logging.error('Connection to master node %s failed', continue
self.trying_master_node) msg_id = conn.ask(Packets.AskPrimary())
continue
msg_id = conn.ask(Packets.AskPrimary())
finally:
conn.unlock()
try: try:
self._waitMessage(conn, msg_id, self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler) handler=self.primary_bootstrap_handler)
...@@ -349,18 +337,14 @@ class Application(object): ...@@ -349,18 +337,14 @@ class Application(object):
logging.info('connected to a primary master node') logging.info('connected to a primary master node')
# Identify to primary master and request initial data # Identify to primary master and request initial data
while conn.getUUID() is None: while conn.getUUID() is None:
conn.lock() if conn.getConnector() is None:
try: logging.error('Connection to master node %s lost',
if conn.getConnector() is None: self.trying_master_node)
logging.error('Connection to master node %s lost', self.primary_master_node = None
self.trying_master_node) break
self.primary_master_node = None p = Packets.RequestIdentification(NodeTypes.CLIENT,
break self.uuid, None, self.name)
p = Packets.RequestIdentification(NodeTypes.CLIENT, msg_id = conn.ask(p)
self.uuid, None, self.name)
msg_id = conn.ask(p)
finally:
conn.unlock()
try: try:
self._waitMessage(conn, msg_id, self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler) handler=self.primary_bootstrap_handler)
...@@ -371,18 +355,10 @@ class Application(object): ...@@ -371,18 +355,10 @@ class Application(object):
# Node identification was refused by master. # Node identification was refused by master.
sleep(1) sleep(1)
if self.uuid is not None: if self.uuid is not None:
conn.lock() msg_id = conn.ask(Packets.AskNodeInformation())
try:
msg_id = conn.ask(Packets.AskNodeInformation())
finally:
conn.unlock()
self._waitMessage(conn, msg_id, self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler) handler=self.primary_bootstrap_handler)
conn.lock() msg_id = conn.ask(Packets.AskPartitionTable([]))
try:
msg_id = conn.ask(Packets.AskPartitionTable([]))
finally:
conn.unlock()
self._waitMessage(conn, msg_id, self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler) handler=self.primary_bootstrap_handler)
ready = self.uuid is not None and self.pt is not None \ ready = self.uuid is not None and self.pt is not None \
...@@ -603,10 +579,7 @@ class Application(object): ...@@ -603,10 +579,7 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try: try:
try: conn.ask(p)
conn.ask(p)
finally:
conn.unlock()
except ConnectionClosed: except ConnectionClosed:
continue continue
...@@ -749,18 +722,11 @@ class Application(object): ...@@ -749,18 +722,11 @@ class Application(object):
conn = self.cp.getConnForCell(cell) conn = self.cp.getConnForCell(cell)
if conn is None: if conn is None:
continue continue
try: conn.notify(Packets.AbortTransaction(self.local_var.tid))
conn.notify(Packets.AbortTransaction(self.local_var.tid))
finally:
conn.unlock()
# Abort the transaction in the primary master node. # Abort the transaction in the primary master node.
conn = self._getMasterConnection() conn = self._getMasterConnection()
conn.lock() conn.notify(Packets.AbortTransaction(self.local_var.tid))
try:
conn.notify(Packets.AbortTransaction(self.local_var.tid))
finally:
conn.unlock()
self.local_var.clear() self.local_var.clear()
@profiler_decorator @profiler_decorator
...@@ -877,11 +843,7 @@ class Application(object): ...@@ -877,11 +843,7 @@ class Application(object):
conn = self.cp.getConnForNode(storage_node) conn = self.cp.getConnForNode(storage_node)
if conn is None: if conn is None:
continue continue
conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
try:
conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
finally:
conn.unlock()
# Wait for answers from all storages. # Wait for answers from all storages.
while len(self.local_var.node_tids) != len(storage_node_list): while len(self.local_var.node_tids) != len(storage_node_list):
......
...@@ -116,7 +116,6 @@ class ConnectionPool(object): ...@@ -116,7 +116,6 @@ class ConnectionPool(object):
return None return None
self.connection_dict[node.getUUID()] = conn self.connection_dict[node.getUUID()] = conn
conn.lock()
return conn return conn
@profiler_decorator @profiler_decorator
...@@ -135,7 +134,6 @@ class ConnectionPool(object): ...@@ -135,7 +134,6 @@ class ConnectionPool(object):
try: try:
conn = self.connection_dict[uuid] conn = self.connection_dict[uuid]
# Already connected to node # Already connected to node
conn.lock()
return conn return conn
except KeyError: except KeyError:
# Create new connection to node # Create new connection to node
......
...@@ -596,20 +596,26 @@ class MTClientConnection(ClientConnection): ...@@ -596,20 +596,26 @@ class MTClientConnection(ClientConnection):
def analyse(self, *args, **kw): def analyse(self, *args, **kw):
return super(MTClientConnection, self).analyse(*args, **kw) return super(MTClientConnection, self).analyse(*args, **kw)
@lockCheckWrapper
def notify(self, *args, **kw): def notify(self, *args, **kw):
return super(MTClientConnection, self).notify(*args, **kw) self.lock()
try:
return super(MTClientConnection, self).notify(*args, **kw)
finally:
self.unlock()
@lockCheckWrapper
def ask(self, packet, timeout=CRITICAL_TIMEOUT): def ask(self, packet, timeout=CRITICAL_TIMEOUT):
msg_id = self._getNextId() self.lock()
packet.setId(msg_id) try:
self.dispatcher.register(self, msg_id, self._local_var.queue) msg_id = self._getNextId()
self._addPacket(packet) packet.setId(msg_id)
if not self._handlers.isPending(): self.dispatcher.register(self, msg_id, self._local_var.queue)
self._timeout.update(time(), timeout=timeout) self._addPacket(packet)
self._handlers.emit(packet) if not self._handlers.isPending():
return msg_id self._timeout.update(time(), timeout=timeout)
self._handlers.emit(packet)
return msg_id
finally:
self.unlock()
@lockCheckWrapper @lockCheckWrapper
def answer(self, *args, **kw): def answer(self, *args, **kw):
......
...@@ -950,7 +950,6 @@ class ClientApplicationTests(NeoTestBase): ...@@ -950,7 +950,6 @@ class ClientApplicationTests(NeoTestBase):
Application._waitMessage = _waitMessage_old Application._waitMessage = _waitMessage_old
# check packet sent, connection unlocked and dispatcher updated # check packet sent, connection unlocked and dispatcher updated
self.checkAskNewTid(conn) self.checkAskNewTid(conn)
self.assertEquals(len(conn.mockGetNamedCalls('unlock')), 1)
self.checkDispatcherRegisterCalled(app, conn) self.checkDispatcherRegisterCalled(app, conn)
# and _waitMessage called # and _waitMessage called
self.assertTrue(self.test_ok) self.assertTrue(self.test_ok)
...@@ -976,8 +975,6 @@ class ClientApplicationTests(NeoTestBase): ...@@ -976,8 +975,6 @@ class ClientApplicationTests(NeoTestBase):
Application._waitMessage = _waitMessage_old Application._waitMessage = _waitMessage_old
# check packet sent, connection locked during process and dispatcher updated # check packet sent, connection locked during process and dispatcher updated
self.checkAskNewTid(conn) self.checkAskNewTid(conn)
self.assertEquals(len(conn.mockGetNamedCalls('lock')), 1)
self.assertEquals(len(conn.mockGetNamedCalls('unlock')), 1)
self.checkDispatcherRegisterCalled(app, conn) self.checkDispatcherRegisterCalled(app, conn)
# and _waitMessage called # and _waitMessage called
self.assertTrue(self.test_ok) self.assertTrue(self.test_ok)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment