From ecb3ce1137f22367222971133f20a5df4d5d7a11 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aur=C3=A9lien=20Calonne?= <aurel@nexedi.com>
Date: Mon, 5 Feb 2007 15:58:10 +0000
Subject: [PATCH] manage connection to primary master node in dispatcher try
 reconnection to primary master node when handling connection failure some
 variable fix

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@149 71dcc9de-d417-0410-9af5-da40c76e7ee4
---
 neo/client/NEOStorage.py | 11 +++--
 neo/client/app.py        | 73 ++---------------------------
 neo/client/dispatcher.py | 99 +++++++++++++++++++++++++++++++++++++++-
 neo/client/handler.py    | 41 +++++++++++------
 4 files changed, 135 insertions(+), 89 deletions(-)

diff --git a/neo/client/NEOStorage.py b/neo/client/NEOStorage.py
index a0468606..097b88ec 100644
--- a/neo/client/NEOStorage.py
+++ b/neo/client/NEOStorage.py
@@ -40,11 +40,14 @@ class NEOStorage(BaseStorage.BaseStorage,
         # Create dispatcher thread
         dispatcher = Dispatcher(em, message_queue, request_queue)
         dispatcher.setDaemon(True)
-        dispatcher.start()
         # Import here to prevent recursive import
         from neo.client.app import Application
         self.app = Application(master_nodes, name, em, dispatcher,
                                message_queue, request_queue)
+        # Connect to primary master node
+        dispatcher.connectToPrimaryMasterNode(self.app)
+        # Start dispatcher
+        dispatcher.start()
 
     def load(self, oid, version=None):
         try:
@@ -114,14 +117,14 @@ class NEOStorage(BaseStorage.BaseStorage,
             if self.app.conflict_serial <= self.app.tid:
                 # Try to resolve conflict only if conflicting serial is older
                 # than the current transaction ID
-                new_data = self.tryToResolveConflict(oid, 
+                new_data = self.tryToResolveConflict(oid,
                                                      self.app.conflict_serial,
                                                      serial, data)
                 if new_data is not None:
                     # Try again after conflict resolution
-                    self.store(oid, self.app.conflict_serial, 
+                    self.store(oid, self.app.conflict_serial,
                                new_data, version, transaction)
-                    return ConflictResolution.ResolvedSerial 
+                    return ConflictResolution.ResolvedSerial
             raise POSException.ConflictError(oid=oid,
                                              serials=(self.app.tid,
                                                       serial),data=data)
diff --git a/neo/client/app.py b/neo/client/app.py
index ed5f2c00..fb90f1f1 100644
--- a/neo/client/app.py
+++ b/neo/client/app.py
@@ -127,6 +127,7 @@ class Application(ThreadingMixIn, object):
         self.queue = message_queue
         self.request_queue = request_queue
         self.primary_master_node = None
+        self.master_node_list = master_nodes.split(' ')
         self.master_conn = None
         self.uuid = None
         self.mq_cache = MQ()
@@ -165,21 +166,6 @@ class Application(ThreadingMixIn, object):
                 if uuid != INVALID_UUID:
                     break
             self.uuid = uuid
-        # Connect to primary master node
-        self.master_node_list = master_nodes.split(' ')
-        while 1:
-            self.node_not_ready = 0
-            logging.info("trying to connect to primary master...")
-            self.connectToPrimaryMasterNode()
-            if not self.node_not_ready and self.pt.filled():
-                # got a connection and partition table
-                break
-            else:
-                # wait a bit before reasking
-                t = time()
-                while time() < t + 1:
-                    pass
-        logging.info("connected to primary master node")
 
     def _waitMessage(self,block=1):
         """Wait for a message returned by dispatcher in queues."""
@@ -193,6 +179,8 @@ class Application(ThreadingMixIn, object):
             if global_message is not None:
                 global_message[0].handler.dispatch(global_message[0], global_message[1])
         # Next get messages we are waiting for
+        if not hasattr(self.local_var, 'tmp_q'):
+            return
         message = None
         if block:
             message = self.local_var.tmp_q.get(True, None)
@@ -205,59 +193,6 @@ class Application(ThreadingMixIn, object):
         if message is not None:
             message[0].handler.dispatch(message[0], message[1])
 
-    def connectToPrimaryMasterNode(self):
-        """Connect to the primary master node."""
-        addr, port = self.master_node_list[0].split(':')
-        port = int(port)
-        handler = ClientEventHandler(self, self.dispatcher)
-        n = MasterNode(server = (addr, port))
-        self.nm.add(n)
-
-        # Connect to first master node defined and get primary master node
-        self.local_var.tmp_q = Queue(1)
-        if self.primary_master_node is None:
-            conn = ClientConnection(self.em, handler, (addr, port))
-            msg_id = conn.getNextId()
-            p = Packet()
-            p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
-                                        addr, port, self.name)
-            # send message to dispatcher
-            self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
-            self.primary_master_node = None
-            self.node_not_ready = 0
-
-            while 1:
-                self._waitMessage(block=0)
-                if self.primary_master_node == -1:
-                    raise NEOStorageError("Unable to initialize connection to master node %s:%d" %(addr, port))
-                if self.primary_master_node is not None:
-                    break
-                if self.node_not_ready:
-                    # must wait
-                    return
-        logging.info('primary master node is %s' %(self.primary_master_node.server,))
-        # Close connection if not already connected to primary master node
-        if self.primary_master_node.getServer() !=  (addr, port):
-            for conn in self.em.getConnectionList():
-                conn.close()
-
-            # Connect to primary master node
-            conn = ClientConnection(self.em, handler, self.primary_master_node.server)
-            msg_id = conn.getNextId()
-            p = Packet()
-            p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
-                                        self.primary_master_node.server[0],
-                                        self.primary_master_node.server[1] , self.name)
-            # send message to dispatcher
-            self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
-
-        self.master_conn = conn
-        # Wait for primary master node information
-        while 1:
-            self._waitMessage(block=0)
-            if self.pt.operational() or self.node_not_ready:
-                break
-
 
     def new_oid(self):
         """Get a new OID."""
@@ -313,7 +248,7 @@ class Application(ThreadingMixIn, object):
         shuffle(cell_list)
         self.local_var.asked_object = -1
         for cell in cell_list:
-            logging.debug('trying to load %s from %s', 
+            logging.debug('trying to load %s from %s',
                           dump(oid), dump(cell.getUUID()))
             conn = self.cm.getConnForNode(cell)
             if conn is None:
diff --git a/neo/client/dispatcher.py b/neo/client/dispatcher.py
index 7069dd42..56e4a675 100644
--- a/neo/client/dispatcher.py
+++ b/neo/client/dispatcher.py
@@ -1,8 +1,11 @@
 from threading import Thread
 from Queue import Empty, Queue
 
-from neo.protocol import PING, Packet
+from neo.protocol import PING, Packet, CLIENT_NODE_TYPE
+from neo.connection import ClientConnection
+from neo.node import MasterNode
 
+from time import time
 import logging
 
 class Dispatcher(Thread):
@@ -18,6 +21,8 @@ class Dispatcher(Thread):
         # This dict is used to associate conn/message id to client thread queue
         # and thus redispatch answer to the original thread
         self.message_table = {}
+        # Indicate if we are in process of connection to master node
+        self.connecting_to_master_node = 0
 
     def run(self):
         while 1:
@@ -35,6 +40,7 @@ class Dispatcher(Thread):
                     break
                 # Send message to waiting thread
                 key = "%s-%s" %(conn.getUUID(),packet.getId())
+                #logging.info('dispatcher got packet %s' %(key,))
                 if self.message_table.has_key(key):
                     tmp_q = self.message_table.pop(key)
                     tmp_q.put((conn, packet), True)
@@ -62,5 +68,96 @@ class Dispatcher(Thread):
             except Empty:
                 continue
 
+    def connectToPrimaryMasterNode(self, app):
+        """Connect to a primary master node.
+        This can be called either at bootstrap or when
+        client got disconnected during process"""
+        # Indicate we are trying to connect to avoid multiple try a time
+        self.connecting_to_master_node = 1
+        from neo.client.handler import ClientEventHandler
+        if app.pt is not None:
+            app.pt.clear()
+        master_index = 0
+        conn = None
+        # Make application execute remaining message if any
+        app._waitMessage(block=0)
+        handler = ClientEventHandler(app, app.dispatcher)
+        while 1:
+            if app.pt is not None and app.pt.operational():
+                # Connected to primary master node and got all informations
+                break
+            app.node_not_ready = 0
+            if app.primary_master_node is None:
+                # Try with master node defined in config
+                addr, port = app.master_node_list[master_index].split(':')
+                port = int(port)
+            else:
+                addr, port = app.primary_master_node.getServer()
+            # Request Node Identification
+            conn = ClientConnection(app.em, handler, (addr, port))
+            if app.nm.getNodeByServer((addr, port)) is None:
+                n = MasterNode(server = (addr, port))
+                app.nm.add(n)
+            msg_id = conn.getNextId()
+            p = Packet()
+            p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, app.uuid,
+                                        '0.0.0.0', 0, app.name)
+            # Send message
+            conn.addPacket(p)
+            conn.expectMessage(msg_id)
+            app.local_var.tmp_q = Queue(1)
+            # Wait for answer
+            while 1:
+                try:
+                    self.em.poll(1)
+                except TypeError:
+                    t = time()
+                    while time() < t + 1:
+                        pass
+                    break
+                # Check if we got a reply
+                try:
+                    conn, packet =  self.message.get_nowait()
+                    method_type = packet.getType()
+                    if method_type == PING:
+                        # Must answer with no delay
+                        conn.addPacket(Packet().pong(packet.getId()))
+                        break
+                    else:
+                        # Process message by handler
+                        conn.handler.dispatch(conn, packet)
+                except Empty:
+                    pass
 
+                # Now check result
+                if app.primary_master_node is not None:
+                    if app.primary_master_node == -1:
+                        # Connection failed, try with another master node
+                        app.primary_master_node = None
+                        master_index += 1
+                        break
+                    elif app.primary_master_node.getServer() != (addr, port):
+                        # Master node changed, connect to new one
+                        break
+                    elif app.node_not_ready:
+                        # Wait a bit and reask again
+                        t = time()
+                        while time() < t + 1:
+                            pass
+                        break
+                    elif app.pt is not None and app.pt.operational():
+                        # Connected to primary master node
+                        break
+
+                # If nothing, check if we have new message to send
+                try:
+                    m = self._message_queue.get_nowait()
+                    if m is not None:
+                        tmp_q, msg_id, conn, p = m
+                        conn.addPacket(p)
+                except Empty:
+                    continue
 
+        logging.info("connected to primary master node %s %d" %app.primary_master_node.getServer())
+        app.master_conn = conn
+        self.connecting_to_master_node = 0
diff --git a/neo/client/handler.py b/neo/client/handler.py
index ce1c6d8b..7d71a110 100644
--- a/neo/client/handler.py
+++ b/neo/client/handler.py
@@ -32,9 +32,12 @@ class ClientEventHandler(EventHandler):
         if app.primary_master_node is None:
             # Failed to connect to a master node
             app.primary_master_node = -1
-        elif uuid == self.app.primary_master_node.getUUID():
+        elif self.app.primary_master_node is not None and uuid == \
+                 self.app.primary_master_node.getUUID():
             logging.critical("connection to primary master node failed")
-            raise NEOStorageError("connection to primary master node failed")
+            if self.dispatcher.connecting_to_master_node == 0:
+                logging.critical("trying reconnection to master node...")
+                self.dispatcher.connectToPrimaryMasterNode(app)
         else:
             # Connection to a storage node failed
             app.storage_node = -1
@@ -42,12 +45,17 @@ class ClientEventHandler(EventHandler):
 
     def connectionClosed(self, conn):
         uuid = conn.getUUID()
-        if self.app.master_conn is None:
+        app = self.app
+        if app.master_conn is None:
             EventHandler.connectionClosed(self, conn)
-        elif uuid == self.app.master_conn.getUUID():
+        elif uuid == app.master_conn.getUUID():
             logging.critical("connection to primary master node closed")
-            # FIXME, client must try to connect to master node again
-            raise NEOStorageError("connection to primary master node closed")
+            # Close connection
+            app.master_conn.close()
+            app.master_conn = None
+            if self.dispatcher.connecting_to_master_node == 0:
+                logging.critical("trying reconnection to master node...")
+                self.dispatcher.connectToPrimaryMasterNode(app)
         else:
             app = self.app
             node = app.nm.getNodeByUUID(uuid)
@@ -66,13 +74,15 @@ class ClientEventHandler(EventHandler):
                 app.queue.put((None, msg_id, conn, p), True)
                 # Remove from pool connection
                 app.cm.removeConnection(node)
-        EventHandler.connectionClosed(self, conn)
+            EventHandler.connectionClosed(self, conn)
 
     def timeoutExpired(self, conn):
         uuid = conn.getUUID()
         if uuid == self.app.primary_master_node.getUUID():
             logging.critical("connection timeout to primary master node expired")
-            raise NEOStorageError("connection timeout to primary master node expired")
+            if self.dispatcher.connecting_to_master_node == 0:
+                logging.critical("trying reconnection to master node...")
+                self.dispatcher.connectToPrimaryMasterNode(app)
         else:
             app = self.app
             node = app.nm.getNodeByUUID(uuid)
@@ -94,7 +104,9 @@ class ClientEventHandler(EventHandler):
         uuid = conn.getUUID()
         if uuid == self.app.primary_master_node.getUUID():
             logging.critical("primary master node is broken")
-            raise NEOStorageError("primary master node is broken")
+            if self.dispatcher.connecting_to_master_node == 0:
+                logging.critical("trying reconnection to master node...")
+                self.dispatcher.connectToPrimaryMasterNode(app)
         else:
             app = self.app
             node = app.nm.getNodeByUUID(uuid)
@@ -246,13 +258,12 @@ class ClientEventHandler(EventHandler):
             nm = app.nm
             node = nm.getNodeByUUID(uuid)
             # This must be sent only by a primary master node.
-            # Note that this may be sent before I know that it is 
+            # Note that this may be sent before I know that it is
             # a primary master node.
             if not isinstance(node, MasterNode):
-                logging.warn('ignoring notify node information from %s', 
+                logging.warn('ignoring notify node information from %s',
                              dump(uuid))
                 return
-
             for node_type, ip_address, port, uuid, state in node_list:
                 # Register new nodes.
                 addr = (ip_address, port)
@@ -354,8 +365,8 @@ class ClientEventHandler(EventHandler):
             app._cache_lock_acquire()
             try:
                 for oid in oid_list:
-                    if app.cache.has_key(oid):
-                        del app.cache[oid]
+                    if app.mq_cache.has_key(oid):
+                        del app.mq_cache[oid]
             finally:
                 app._cache_lock_release()
         else:
@@ -371,7 +382,7 @@ class ClientEventHandler(EventHandler):
 
     def handleStopOperation(self, conn, packet):
         if isinstance(conn, ClientConnection):
-            raise NEOStorageError('operation stopped')
+            logging.critical("master node ask to stop operation")
         else:
             self.handleUnexpectedPacket(conn, packet)
 
-- 
2.30.9