Commit 5c41439c authored by Grégory Wisniewski's avatar Grégory Wisniewski

Bug fix: Client nodes fail reconnect to primary master. Now the master connection

is initiated at client loading to retreive the partition table, then on demand
if lost.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@590 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 2b25ca85
...@@ -268,10 +268,7 @@ class Application(object): ...@@ -268,10 +268,7 @@ class Application(object):
lock = Lock() lock = Lock()
self._pt_acquire = lock.acquire self._pt_acquire = lock.acquire
self._pt_release = lock.release self._pt_release = lock.release
# Connect to master node self.master_conn = self._getMasterConnection()
self.connectToPrimaryMasterNode()
if self.uuid == INVALID_UUID:
raise NEOStorageError('No UUID given from the primary master')
def _waitMessage(self, target_conn = None, msg_id = None, handler=None): def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
"""Wait for a message returned by the dispatcher in queues.""" """Wait for a message returned by the dispatcher in queues."""
...@@ -310,9 +307,7 @@ class Application(object): ...@@ -310,9 +307,7 @@ class Application(object):
def _askPrimary(self, packet, timeout=5, additional_timeout=30): def _askPrimary(self, packet, timeout=5, additional_timeout=30):
""" Send a request to the primary master and process it's answer """ """ Send a request to the primary master and process it's answer """
if self.master_conn is None: conn = self._getMasterConnection()
raise NEOStorageError("Connection to master node failed")
conn = self.master_conn
conn.lock() conn.lock()
try: try:
msg_id = conn.ask(packet, timeout, additional_timeout) msg_id = conn.ask(packet, timeout, additional_timeout)
...@@ -321,6 +316,12 @@ class Application(object): ...@@ -321,6 +316,12 @@ class Application(object):
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler) self._waitMessage(conn, msg_id, self.primary_handler)
def _getMasterConnection(self):
""" Connect to the primary master node on demand """
if self.master_conn is None:
self.master_conn = self.connectToPrimaryMasterNode()
return self.master_conn
def registerDB(self, db, limit): def registerDB(self, db, limit):
self._db = db self._db = db
...@@ -626,7 +627,7 @@ class Application(object): ...@@ -626,7 +627,7 @@ class Application(object):
conn.unlock() conn.unlock()
# Abort the transaction in the primary master node. # Abort the transaction in the primary master node.
conn = self.master_conn conn = self._getMasterConnection()
conn.lock() conn.lock()
try: try:
conn.notify(protocol.abortTransaction(self.local_var.tid)) conn.notify(protocol.abortTransaction(self.local_var.tid))
...@@ -913,15 +914,10 @@ class Application(object): ...@@ -913,15 +914,10 @@ class Application(object):
self._waitMessage() self._waitMessage()
def connectToPrimaryMasterNode(self): def connectToPrimaryMasterNode(self):
self.master_conn = None
logging.debug('connecting to primary master...') logging.debug('connecting to primary master...')
# acquire the lock to allow only one thread to connect to the primary # acquire the lock to allow only one thread to connect to the primary
lock = self._connecting_to_master_node_acquire(1) lock = self._connecting_to_master_node_acquire(1)
try: try:
if self.master_conn is not None:
# another thread has done the job
logging.debug('already connected')
return
if self.pt is not None: if self.pt is not None:
# pt is protected with the master lock # pt is protected with the master lock
self.pt.clear() self.pt.clear()
...@@ -981,14 +977,15 @@ class Application(object): ...@@ -981,14 +977,15 @@ class Application(object):
elif self.pt is not None and self.pt.operational(): elif self.pt is not None and self.pt.operational():
# Connected to primary master node # Connected to primary master node
break break
if self.pt is not None and self.pt.operational(): if self.pt is not None and self.pt.operational() \
and self.uuid != INVALID_UUID:
# Connected to primary master node and got all informations # Connected to primary master node and got all informations
break break
sleep(1) sleep(1)
logging.info("connected to primary master node %s" % self.primary_master_node) logging.info("connected to primary master node %s" % self.primary_master_node)
conn.setHandler(PrimaryNotificationsHandler(self, self.dispatcher)) conn.setHandler(PrimaryNotificationsHandler(self, self.dispatcher))
self.master_conn = conn return conn
finally: finally:
self._connecting_to_master_node_release() self._connecting_to_master_node_release()
......
...@@ -266,17 +266,14 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -266,17 +266,14 @@ class PrimaryNotificationsHandler(BaseHandler):
app.master_conn.close() app.master_conn.close()
app.master_conn = None app.master_conn = None
app.primary_master_node = None app.primary_master_node = None
app.connectToPrimaryMasterNode()
BaseHandler.connectionClosed(self, conn) BaseHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired") logging.critical("connection timeout to primary master node expired")
self.app.connectToPrimaryMasterNode()
BaseHandler.timeoutExpired(self, conn) BaseHandler.timeoutExpired(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
logging.critical("primary master node is broken") logging.critical("primary master node is broken")
self.app.connectToPrimaryMasterNode()
BaseHandler.peerBroken(self, conn) BaseHandler.peerBroken(self, conn)
def handleStopOperation(self, conn, packet): def handleStopOperation(self, conn, packet):
...@@ -362,17 +359,14 @@ class PrimaryAnswersHandler(BaseHandler): ...@@ -362,17 +359,14 @@ class PrimaryAnswersHandler(BaseHandler):
app.master_conn.close() app.master_conn.close()
app.master_conn = None app.master_conn = None
app.primary_master_node = None app.primary_master_node = None
app.connectToPrimaryMasterNode()
super(PrimaryAnswersHandler, self).connectionClosed(conn) super(PrimaryAnswersHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired") logging.critical("connection timeout to primary master node expired")
self.app.connectToPrimaryMasterNode()
super(PrimaryAnswersHandler, self).timeoutExpired(conn) super(PrimaryAnswersHandler, self).timeoutExpired(conn)
def peerBroken(self, conn): def peerBroken(self, conn):
logging.critical("primary master node is broken") logging.critical("primary master node is broken")
self.app.connectToPrimaryMasterNode()
super(PrimaryAnswersHandler, self).peerBroken(conn) super(PrimaryAnswersHandler, self).peerBroken(conn)
def handleAnswerNewTID(self, conn, packet, tid): def handleAnswerNewTID(self, conn, packet, tid):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment