Commit 185fa3b3 authored by Vincent Pelletier's avatar Vincent Pelletier

Move connectToPrimaryMasterNode from Dispatcher class to App class since it...

Move connectToPrimaryMasterNode from Dispatcher class to App class since it does not functionally belong to dispatcher.
This simplifies connectToPrimaryMasterNode code and startup code a bit:
 - connectToPrimaryMasterNode does not need to manually poll to get a response (we use normal code for this)
   This removes a difference between initial master connection and a reconnection.
 - make App instance get a workable dispatcher from the beginning
 - make App handle initial master connection instead of Storage
 - rename "connector" parameter of connectToPrimaryMasterNode into "connector_handler", since it's what it really is
Update caller (ClientEventHandler class).


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@320 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 186f65da
...@@ -44,11 +44,8 @@ class Storage(BaseStorage.BaseStorage, ...@@ -44,11 +44,8 @@ class Storage(BaseStorage.BaseStorage,
# Create dispatcher thread # Create dispatcher thread
dispatcher = Dispatcher(em) dispatcher = Dispatcher(em)
dispatcher.setDaemon(True) dispatcher.setDaemon(True)
self.app = Application(master_nodes, name, em, dispatcher, connector)
# Connect to primary master node
dispatcher.connectToPrimaryMasterNode(self.app, self.app.connector_handler)
# Start dispatcher
dispatcher.start() dispatcher.start()
self.app = Application(master_nodes, name, em, dispatcher, connector)
def load(self, oid, version=None): def load(self, oid, version=None):
try: try:
......
...@@ -213,6 +213,8 @@ class Application(object): ...@@ -213,6 +213,8 @@ class Application(object):
# _oid_lock is used in order to not call multiple oid # _oid_lock is used in order to not call multiple oid
# generation at the same time # generation at the same time
# _cache_lock is used for the client cache # _cache_lock is used for the client cache
# _connecting_to_master_node is used to prevent simultaneous master
# node connection attemps
lock = Lock() lock = Lock()
self._load_lock_acquire = lock.acquire self._load_lock_acquire = lock.acquire
self._load_lock_release = lock.release self._load_lock_release = lock.release
...@@ -222,6 +224,9 @@ class Application(object): ...@@ -222,6 +224,9 @@ class Application(object):
lock = Lock() lock = Lock()
self._cache_lock_acquire = lock.acquire self._cache_lock_acquire = lock.acquire
self._cache_lock_release = lock.release self._cache_lock_release = lock.release
lock = Lock()
self._connecting_to_master_node_acquire = lock.acquire
self._connecting_to_master_node_release = lock.release
# XXX Generate an UUID for self. For now, just use a random string. # XXX Generate an UUID for self. For now, just use a random string.
# Avoid an invalid UUID. # Avoid an invalid UUID.
if self.uuid is None: if self.uuid is None:
...@@ -230,6 +235,8 @@ class Application(object): ...@@ -230,6 +235,8 @@ class Application(object):
if uuid != INVALID_UUID: if uuid != INVALID_UUID:
break break
self.uuid = uuid self.uuid = uuid
# Connect to master node
self.connectToPrimaryMasterNode(self.connector_handler)
def getQueue(self): def getQueue(self):
try: try:
...@@ -896,3 +903,79 @@ class Application(object): ...@@ -896,3 +903,79 @@ class Application(object):
def sync(self): def sync(self):
self._waitMessage() self._waitMessage()
def connectToPrimaryMasterNode(self, connector_handler):
"""Connect to a primary master node.
This can be called either at bootstrap or when
client got disconnected during process"""
# Indicate we are trying to connect to avoid multiple try a time
acquired = self._connecting_to_master_node_acquire(0)
if acquired:
try:
if self.pt is not None:
self.pt.clear()
master_index = 0
conn = None
# Make application execute remaining message if any
self._waitMessage()
handler = ClientEventHandler(self, self.dispatcher)
while 1:
self.local_var.node_not_ready = 0
if self.primary_master_node is None:
# Try with master node defined in config
try:
addr, port = self.master_node_list[master_index].split(':')
except IndexError:
master_index = 0
addr, port = self.master_node_list[master_index].split(':')
port = int(port)
else:
addr, port = self.primary_master_node.getServer()
# Request Node Identification
conn = MTClientConnection(self.em, handler, (addr, port), connector_handler=connector_handler)
if self.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
self.nm.add(n)
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
'0.0.0.0', 0, self.name)
# Send message
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue())
finally:
conn.unlock()
# Wait for answer
while 1:
self._waitMessage()
# Now check result
if self.primary_master_node is not None:
if self.primary_master_node == -1:
# Connection failed, try with another master node
self.primary_master_node = None
master_index += 1
break
elif self.primary_master_node.getServer() != (addr, port):
# Master node changed, connect to new one
break
elif self.local_var.node_not_ready:
# Wait a bit and reask again
break
elif self.pt is not None and self.pt.operational():
# Connected to primary master node
break
if self.pt is not None and self.pt.operational():
# Connected to primary master node and got all informations
break
sleep(1)
logging.info("connected to primary master node %s:%d" % self.primary_master_node.getServer())
self.master_conn = conn
finally:
self._connecting_to_master_node_release()
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from threading import Thread, Lock from threading import Thread
from Queue import Empty, Queue from Queue import Empty, Queue
from neo.protocol import PING, Packet, CLIENT_NODE_TYPE, FINISH_TRANSACTION from neo.protocol import PING, Packet, CLIENT_NODE_TYPE, FINISH_TRANSACTION
...@@ -23,7 +23,6 @@ from neo.connection import MTClientConnection ...@@ -23,7 +23,6 @@ from neo.connection import MTClientConnection
from neo.node import MasterNode from neo.node import MasterNode
from neo.client.handler import ClientEventHandler from neo.client.handler import ClientEventHandler
import time
import logging import logging
class Dispatcher(Thread): class Dispatcher(Thread):
...@@ -35,8 +34,6 @@ class Dispatcher(Thread): ...@@ -35,8 +34,6 @@ class Dispatcher(Thread):
# This dict is used to associate conn/message id to client thread queue # This dict is used to associate conn/message id to client thread queue
# and thus redispatch answer to the original thread # and thus redispatch answer to the original thread
self.message_table = {} self.message_table = {}
# Indicate if we are in process of connection to master node
self.connecting_to_master_node = Lock()
def run(self): def run(self):
while 1: while 1:
...@@ -65,82 +62,3 @@ class Dispatcher(Thread): ...@@ -65,82 +62,3 @@ class Dispatcher(Thread):
return True return True
return False return False
def connectToPrimaryMasterNode(self, app, connector):
"""Connect to a primary master node.
This can be called either at bootstrap or when
client got disconnected during process"""
# Indicate we are trying to connect to avoid multiple try a time
acquired = self.connecting_to_master_node.acquire(0)
if acquired:
try:
if app.pt is not None:
app.pt.clear()
master_index = 0
conn = None
# Make application execute remaining message if any
app._waitMessage()
handler = ClientEventHandler(app, app.dispatcher)
while 1:
app.local_var.node_not_ready = 0
if app.primary_master_node is None:
# Try with master node defined in config
try:
addr, port = app.master_node_list[master_index].split(':')
except IndexError:
master_index = 0
addr, port = app.master_node_list[master_index].split(':')
port = int(port)
else:
addr, port = app.primary_master_node.getServer()
# Request Node Identification
conn = MTClientConnection(app.em, handler, (addr, port), connector_handler=connector)
if app.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
app.nm.add(n)
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, app.uuid,
'0.0.0.0', 0, app.name)
# Send message
conn.addPacket(p)
conn.expectMessage(msg_id)
self.register(conn, msg_id, app.getQueue())
finally:
conn.unlock()
# Wait for answer
while 1:
try:
self.em.poll(1)
except TypeError:
break
app._waitMessage()
# Now check result
if app.primary_master_node is not None:
if app.primary_master_node == -1:
# Connection failed, try with another master node
app.primary_master_node = None
master_index += 1
break
elif app.primary_master_node.getServer() != (addr, port):
# Master node changed, connect to new one
break
elif app.local_var.node_not_ready:
# Wait a bit and reask again
break
elif app.pt is not None and app.pt.operational():
# Connected to primary master node
break
if app.pt is not None and app.pt.operational():
# Connected to primary master node and got all informations
break
time.sleep(1)
logging.info("connected to primary master node %s:%d" %app.primary_master_node.getServer())
app.master_conn = conn
finally:
self.connecting_to_master_node.release()
...@@ -103,7 +103,7 @@ class ClientEventHandler(BaseClientEventHandler): ...@@ -103,7 +103,7 @@ class ClientEventHandler(BaseClientEventHandler):
elif self.app.primary_master_node is not None and uuid == \ elif self.app.primary_master_node is not None and uuid == \
self.app.primary_master_node.getUUID(): self.app.primary_master_node.getUUID():
logging.critical("connection to primary master node failed") logging.critical("connection to primary master node failed")
self.dispatcher.connectToPrimaryMasterNode(app, conn) app.connectToPrimaryMasterNode(conn)
else: else:
# Connection to a storage node failed # Connection to a storage node failed
node = app.nm.getNodeByServer(conn.getAddress()) node = app.nm.getNodeByServer(conn.getAddress())
...@@ -124,7 +124,7 @@ class ClientEventHandler(BaseClientEventHandler): ...@@ -124,7 +124,7 @@ class ClientEventHandler(BaseClientEventHandler):
app.master_conn.close() app.master_conn.close()
app.master_conn = None app.master_conn = None
app.primary_master_node = None app.primary_master_node = None
self.dispatcher.connectToPrimaryMasterNode(app, conn) app.connectToPrimaryMasterNode(conn)
else: else:
node = app.nm.getNodeByServer(conn.getAddress()) node = app.nm.getNodeByServer(conn.getAddress())
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
...@@ -143,7 +143,7 @@ class ClientEventHandler(BaseClientEventHandler): ...@@ -143,7 +143,7 @@ class ClientEventHandler(BaseClientEventHandler):
app.primary_master_node = -1 app.primary_master_node = -1
elif app.master_conn is not None and uuid == app.primary_master_node.getUUID(): elif app.master_conn is not None and uuid == app.primary_master_node.getUUID():
logging.critical("connection timeout to primary master node expired") logging.critical("connection timeout to primary master node expired")
self.dispatcher.connectToPrimaryMasterNode(app, conn) app.connectToPrimaryMasterNode(conn)
else: else:
node = app.nm.getNodeByServer(conn.getAddress()) node = app.nm.getNodeByServer(conn.getAddress())
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
...@@ -161,7 +161,7 @@ class ClientEventHandler(BaseClientEventHandler): ...@@ -161,7 +161,7 @@ class ClientEventHandler(BaseClientEventHandler):
app.primary_master_node = -1 app.primary_master_node = -1
elif app.master_conn is not None and uuid == app.primary_master_node.getUUID(): elif app.master_conn is not None and uuid == app.primary_master_node.getUUID():
logging.critical("primary master node is broken") logging.critical("primary master node is broken")
self.dispatcher.connectToPrimaryMasterNode(app, conn) app.connectToPrimaryMasterNode(conn)
else: else:
node = app.nm.getNodeByServer(conn.getAddress()) node = app.nm.getNodeByServer(conn.getAddress())
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment