Commit ae37c472 authored by Vincent Pelletier's avatar Vincent Pelletier

Use a (non-blocking) lock to prevent two simultaneous reconnection attemps to...

Use a (non-blocking) lock to prevent two simultaneous reconnection attemps to master instead of a simple variable.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@266 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 31481811
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from threading import Thread from threading import Thread, Lock
from Queue import Empty, Queue from Queue import Empty, Queue
from neo.protocol import PING, Packet, CLIENT_NODE_TYPE, FINISH_TRANSACTION from neo.protocol import PING, Packet, CLIENT_NODE_TYPE, FINISH_TRANSACTION
...@@ -36,7 +36,7 @@ class Dispatcher(Thread): ...@@ -36,7 +36,7 @@ class Dispatcher(Thread):
# and thus redispatch answer to the original thread # and thus redispatch answer to the original thread
self.message_table = {} self.message_table = {}
# Indicate if we are in process of connection to master node # Indicate if we are in process of connection to master node
self.connecting_to_master_node = 0 self.connecting_to_master_node = Lock()
def run(self): def run(self):
while 1: while 1:
...@@ -66,77 +66,80 @@ class Dispatcher(Thread): ...@@ -66,77 +66,80 @@ class Dispatcher(Thread):
This can be called either at bootstrap or when This can be called either at bootstrap or when
client got disconnected during process""" client got disconnected during process"""
# Indicate we are trying to connect to avoid multiple try a time # Indicate we are trying to connect to avoid multiple try a time
self.connecting_to_master_node = 1 acquired = self.connecting_to_master_node.acquire(blocking=0)
from neo.client.handler import ClientEventHandler if acquired:
if app.pt is not None: try:
app.pt.clear() from neo.client.handler import ClientEventHandler
master_index = 0 if app.pt is not None:
t = 0 app.pt.clear()
conn = None master_index = 0
# Make application execute remaining message if any t = 0
app._waitMessage() conn = None
handler = ClientEventHandler(app, app.dispatcher) # Make application execute remaining message if any
while 1: app._waitMessage()
if t + 1 < time(): handler = ClientEventHandler(app, app.dispatcher)
if app.pt is not None and app.pt.operational(): while 1:
# Connected to primary master node and got all informations if t + 1 < time():
break if app.pt is not None and app.pt.operational():
app.local_var.node_not_ready = 0 # Connected to primary master node and got all informations
if app.primary_master_node is None: break
# Try with master node defined in config app.local_var.node_not_ready = 0
try: if app.primary_master_node is None:
addr, port = app.master_node_list[master_index].split(':') # Try with master node defined in config
except IndexError: try:
master_index = 0 addr, port = app.master_node_list[master_index].split(':')
addr, port = app.master_node_list[master_index].split(':') except IndexError:
port = int(port) master_index = 0
else: addr, port = app.master_node_list[master_index].split(':')
addr, port = app.primary_master_node.getServer() port = int(port)
# Request Node Identification else:
conn = MTClientConnection(app.em, handler, (addr, port), connector_handler=connector) addr, port = app.primary_master_node.getServer()
if app.nm.getNodeByServer((addr, port)) is None: # Request Node Identification
n = MasterNode(server = (addr, port)) conn = MTClientConnection(app.em, handler, (addr, port), connector_handler=connector)
app.nm.add(n) if app.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
app.nm.add(n)
conn.lock() conn.lock()
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, app.uuid, p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, app.uuid,
'0.0.0.0', 0, app.name) '0.0.0.0', 0, app.name)
# Send message # Send message
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.register(conn, msg_id, app.getQueue()) self.register(conn, msg_id, app.getQueue())
finally: finally:
conn.unlock() conn.unlock()
# Wait for answer # Wait for answer
while 1: while 1:
try: try:
self.em.poll(1) self.em.poll(1)
except TypeError: except TypeError:
break break
app._waitMessage() app._waitMessage()
# Now check result # Now check result
if app.primary_master_node is not None: if app.primary_master_node is not None:
if app.primary_master_node == -1: if app.primary_master_node == -1:
# Connection failed, try with another master node # Connection failed, try with another master node
app.primary_master_node = None app.primary_master_node = None
master_index += 1 master_index += 1
break break
elif app.primary_master_node.getServer() != (addr, port): elif app.primary_master_node.getServer() != (addr, port):
# Master node changed, connect to new one # Master node changed, connect to new one
break break
elif app.local_var.node_not_ready: elif app.local_var.node_not_ready:
# Wait a bit and reask again # Wait a bit and reask again
break break
elif app.pt is not None and app.pt.operational(): elif app.pt is not None and app.pt.operational():
# Connected to primary master node # Connected to primary master node
break break
t = time() t = time()
logging.info("connected to primary master node %s:%d" %app.primary_master_node.getServer()) logging.info("connected to primary master node %s:%d" %app.primary_master_node.getServer())
app.master_conn = conn app.master_conn = conn
self.connecting_to_master_node = 0 finally:
self.connecting_to_master_node.release()
...@@ -101,9 +101,7 @@ class ClientEventHandler(EventHandler): ...@@ -101,9 +101,7 @@ class ClientEventHandler(EventHandler):
elif self.app.primary_master_node is not None and uuid == \ elif self.app.primary_master_node is not None and uuid == \
self.app.primary_master_node.getUUID(): self.app.primary_master_node.getUUID():
logging.critical("connection to primary master node failed") logging.critical("connection to primary master node failed")
if self.dispatcher.connecting_to_master_node == 0: self.dispatcher.connectToPrimaryMasterNode(app, conn)
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app, conn)
else: else:
# Connection to a storage node failed # Connection to a storage node failed
node = app.nm.getNodeByServer(conn.getAddress()) node = app.nm.getNodeByServer(conn.getAddress())
...@@ -124,9 +122,7 @@ class ClientEventHandler(EventHandler): ...@@ -124,9 +122,7 @@ class ClientEventHandler(EventHandler):
app.master_conn.close() app.master_conn.close()
app.master_conn = None app.master_conn = None
app.primary_master_node = None app.primary_master_node = None
if self.dispatcher.connecting_to_master_node == 0: self.dispatcher.connectToPrimaryMasterNode(app, conn)
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app, conn)
else: else:
node = app.nm.getNodeByServer(conn.getAddress()) node = app.nm.getNodeByServer(conn.getAddress())
if isinstance(node, StorageNode): if isinstance(node, StorageNode):
...@@ -145,9 +141,7 @@ class ClientEventHandler(EventHandler): ...@@ -145,9 +141,7 @@ class ClientEventHandler(EventHandler):
app.primary_master_node = -1 app.primary_master_node = -1
if app.master_conn is not None and uuid == app.primary_master_node.getUUID(): if app.master_conn is not None and uuid == app.primary_master_node.getUUID():
logging.critical("connection timeout to primary master node expired") logging.critical("connection timeout to primary master node expired")
if self.dispatcher.connecting_to_master_node == 0: self.dispatcher.connectToPrimaryMasterNode(app, conn)
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app, conn)
else: else:
node = app.nm.getNodeByServer(conn.getAddress()) node = app.nm.getNodeByServer(conn.getAddress())
if isinstance(node, StorageNode): if isinstance(node, StorageNode):
...@@ -165,9 +159,7 @@ class ClientEventHandler(EventHandler): ...@@ -165,9 +159,7 @@ class ClientEventHandler(EventHandler):
app.primary_master_node = -1 app.primary_master_node = -1
if app.master_conn is not None and uuid == app.primary_master_node.getUUID(): if app.master_conn is not None and uuid == app.primary_master_node.getUUID():
logging.critical("primary master node is broken") logging.critical("primary master node is broken")
if self.dispatcher.connecting_to_master_node == 0: self.dispatcher.connectToPrimaryMasterNode(app, conn)
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app, conn)
else: else:
node = app.nm.getNodeByServer(conn.getAddress()) node = app.nm.getNodeByServer(conn.getAddress())
if isinstance(node, StorageNode): if isinstance(node, StorageNode):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment