Commit ecb3ce11 authored by Aurel's avatar Aurel

manage connection to primary master node in dispatcher

try reconnection to primary master node when handling connection
failure
some variable fix


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@149 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 475ecad0
......@@ -40,11 +40,14 @@ class NEOStorage(BaseStorage.BaseStorage,
# Create dispatcher thread
dispatcher = Dispatcher(em, message_queue, request_queue)
dispatcher.setDaemon(True)
dispatcher.start()
# Import here to prevent recursive import
from neo.client.app import Application
self.app = Application(master_nodes, name, em, dispatcher,
message_queue, request_queue)
# Connect to primary master node
dispatcher.connectToPrimaryMasterNode(self.app)
# Start dispatcher
dispatcher.start()
def load(self, oid, version=None):
try:
......@@ -114,14 +117,14 @@ class NEOStorage(BaseStorage.BaseStorage,
if self.app.conflict_serial <= self.app.tid:
# Try to resolve conflict only if conflicting serial is older
# than the current transaction ID
new_data = self.tryToResolveConflict(oid,
new_data = self.tryToResolveConflict(oid,
self.app.conflict_serial,
serial, data)
if new_data is not None:
# Try again after conflict resolution
self.store(oid, self.app.conflict_serial,
self.store(oid, self.app.conflict_serial,
new_data, version, transaction)
return ConflictResolution.ResolvedSerial
return ConflictResolution.ResolvedSerial
raise POSException.ConflictError(oid=oid,
serials=(self.app.tid,
serial),data=data)
......
......@@ -127,6 +127,7 @@ class Application(ThreadingMixIn, object):
self.queue = message_queue
self.request_queue = request_queue
self.primary_master_node = None
self.master_node_list = master_nodes.split(' ')
self.master_conn = None
self.uuid = None
self.mq_cache = MQ()
......@@ -165,21 +166,6 @@ class Application(ThreadingMixIn, object):
if uuid != INVALID_UUID:
break
self.uuid = uuid
# Connect to primary master node
self.master_node_list = master_nodes.split(' ')
while 1:
self.node_not_ready = 0
logging.info("trying to connect to primary master...")
self.connectToPrimaryMasterNode()
if not self.node_not_ready and self.pt.filled():
# got a connection and partition table
break
else:
# wait a bit before reasking
t = time()
while time() < t + 1:
pass
logging.info("connected to primary master node")
def _waitMessage(self,block=1):
"""Wait for a message returned by dispatcher in queues."""
......@@ -193,6 +179,8 @@ class Application(ThreadingMixIn, object):
if global_message is not None:
global_message[0].handler.dispatch(global_message[0], global_message[1])
# Next get messages we are waiting for
if not hasattr(self.local_var, 'tmp_q'):
return
message = None
if block:
message = self.local_var.tmp_q.get(True, None)
......@@ -205,59 +193,6 @@ class Application(ThreadingMixIn, object):
if message is not None:
message[0].handler.dispatch(message[0], message[1])
def connectToPrimaryMasterNode(self):
"""Connect to the primary master node."""
addr, port = self.master_node_list[0].split(':')
port = int(port)
handler = ClientEventHandler(self, self.dispatcher)
n = MasterNode(server = (addr, port))
self.nm.add(n)
# Connect to first master node defined and get primary master node
self.local_var.tmp_q = Queue(1)
if self.primary_master_node is None:
conn = ClientConnection(self.em, handler, (addr, port))
msg_id = conn.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
addr, port, self.name)
# send message to dispatcher
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.primary_master_node = None
self.node_not_ready = 0
while 1:
self._waitMessage(block=0)
if self.primary_master_node == -1:
raise NEOStorageError("Unable to initialize connection to master node %s:%d" %(addr, port))
if self.primary_master_node is not None:
break
if self.node_not_ready:
# must wait
return
logging.info('primary master node is %s' %(self.primary_master_node.server,))
# Close connection if not already connected to primary master node
if self.primary_master_node.getServer() != (addr, port):
for conn in self.em.getConnectionList():
conn.close()
# Connect to primary master node
conn = ClientConnection(self.em, handler, self.primary_master_node.server)
msg_id = conn.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
self.primary_master_node.server[0],
self.primary_master_node.server[1] , self.name)
# send message to dispatcher
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.master_conn = conn
# Wait for primary master node information
while 1:
self._waitMessage(block=0)
if self.pt.operational() or self.node_not_ready:
break
def new_oid(self):
"""Get a new OID."""
......@@ -313,7 +248,7 @@ class Application(ThreadingMixIn, object):
shuffle(cell_list)
self.local_var.asked_object = -1
for cell in cell_list:
logging.debug('trying to load %s from %s',
logging.debug('trying to load %s from %s',
dump(oid), dump(cell.getUUID()))
conn = self.cm.getConnForNode(cell)
if conn is None:
......
from threading import Thread
from Queue import Empty, Queue
from neo.protocol import PING, Packet
from neo.protocol import PING, Packet, CLIENT_NODE_TYPE
from neo.connection import ClientConnection
from neo.node import MasterNode
from time import time
import logging
class Dispatcher(Thread):
......@@ -18,6 +21,8 @@ class Dispatcher(Thread):
# This dict is used to associate conn/message id to client thread queue
# and thus redispatch answer to the original thread
self.message_table = {}
# Indicate if we are in process of connection to master node
self.connecting_to_master_node = 0
def run(self):
while 1:
......@@ -35,6 +40,7 @@ class Dispatcher(Thread):
break
# Send message to waiting thread
key = "%s-%s" %(conn.getUUID(),packet.getId())
#logging.info('dispatcher got packet %s' %(key,))
if self.message_table.has_key(key):
tmp_q = self.message_table.pop(key)
tmp_q.put((conn, packet), True)
......@@ -62,5 +68,96 @@ class Dispatcher(Thread):
except Empty:
continue
def connectToPrimaryMasterNode(self, app):
"""Connect to a primary master node.
This can be called either at bootstrap or when
client got disconnected during process"""
# Indicate we are trying to connect to avoid multiple try a time
self.connecting_to_master_node = 1
from neo.client.handler import ClientEventHandler
if app.pt is not None:
app.pt.clear()
master_index = 0
conn = None
# Make application execute remaining message if any
app._waitMessage(block=0)
handler = ClientEventHandler(app, app.dispatcher)
while 1:
if app.pt is not None and app.pt.operational():
# Connected to primary master node and got all informations
break
app.node_not_ready = 0
if app.primary_master_node is None:
# Try with master node defined in config
addr, port = app.master_node_list[master_index].split(':')
port = int(port)
else:
addr, port = app.primary_master_node.getServer()
# Request Node Identification
conn = ClientConnection(app.em, handler, (addr, port))
if app.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
app.nm.add(n)
msg_id = conn.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, app.uuid,
'0.0.0.0', 0, app.name)
# Send message
conn.addPacket(p)
conn.expectMessage(msg_id)
app.local_var.tmp_q = Queue(1)
# Wait for answer
while 1:
try:
self.em.poll(1)
except TypeError:
t = time()
while time() < t + 1:
pass
break
# Check if we got a reply
try:
conn, packet = self.message.get_nowait()
method_type = packet.getType()
if method_type == PING:
# Must answer with no delay
conn.addPacket(Packet().pong(packet.getId()))
break
else:
# Process message by handler
conn.handler.dispatch(conn, packet)
except Empty:
pass
# Now check result
if app.primary_master_node is not None:
if app.primary_master_node == -1:
# Connection failed, try with another master node
app.primary_master_node = None
master_index += 1
break
elif app.primary_master_node.getServer() != (addr, port):
# Master node changed, connect to new one
break
elif app.node_not_ready:
# Wait a bit and reask again
t = time()
while time() < t + 1:
pass
break
elif app.pt is not None and app.pt.operational():
# Connected to primary master node
break
# If nothing, check if we have new message to send
try:
m = self._message_queue.get_nowait()
if m is not None:
tmp_q, msg_id, conn, p = m
conn.addPacket(p)
except Empty:
continue
logging.info("connected to primary master node %s %d" %app.primary_master_node.getServer())
app.master_conn = conn
self.connecting_to_master_node = 0
......@@ -32,9 +32,12 @@ class ClientEventHandler(EventHandler):
if app.primary_master_node is None:
# Failed to connect to a master node
app.primary_master_node = -1
elif uuid == self.app.primary_master_node.getUUID():
elif self.app.primary_master_node is not None and uuid == \
self.app.primary_master_node.getUUID():
logging.critical("connection to primary master node failed")
raise NEOStorageError("connection to primary master node failed")
if self.dispatcher.connecting_to_master_node == 0:
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app)
else:
# Connection to a storage node failed
app.storage_node = -1
......@@ -42,12 +45,17 @@ class ClientEventHandler(EventHandler):
def connectionClosed(self, conn):
uuid = conn.getUUID()
if self.app.master_conn is None:
app = self.app
if app.master_conn is None:
EventHandler.connectionClosed(self, conn)
elif uuid == self.app.master_conn.getUUID():
elif uuid == app.master_conn.getUUID():
logging.critical("connection to primary master node closed")
# FIXME, client must try to connect to master node again
raise NEOStorageError("connection to primary master node closed")
# Close connection
app.master_conn.close()
app.master_conn = None
if self.dispatcher.connecting_to_master_node == 0:
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app)
else:
app = self.app
node = app.nm.getNodeByUUID(uuid)
......@@ -66,13 +74,15 @@ class ClientEventHandler(EventHandler):
app.queue.put((None, msg_id, conn, p), True)
# Remove from pool connection
app.cm.removeConnection(node)
EventHandler.connectionClosed(self, conn)
EventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
uuid = conn.getUUID()
if uuid == self.app.primary_master_node.getUUID():
logging.critical("connection timeout to primary master node expired")
raise NEOStorageError("connection timeout to primary master node expired")
if self.dispatcher.connecting_to_master_node == 0:
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app)
else:
app = self.app
node = app.nm.getNodeByUUID(uuid)
......@@ -94,7 +104,9 @@ class ClientEventHandler(EventHandler):
uuid = conn.getUUID()
if uuid == self.app.primary_master_node.getUUID():
logging.critical("primary master node is broken")
raise NEOStorageError("primary master node is broken")
if self.dispatcher.connecting_to_master_node == 0:
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app)
else:
app = self.app
node = app.nm.getNodeByUUID(uuid)
......@@ -246,13 +258,12 @@ class ClientEventHandler(EventHandler):
nm = app.nm
node = nm.getNodeByUUID(uuid)
# This must be sent only by a primary master node.
# Note that this may be sent before I know that it is
# Note that this may be sent before I know that it is
# a primary master node.
if not isinstance(node, MasterNode):
logging.warn('ignoring notify node information from %s',
logging.warn('ignoring notify node information from %s',
dump(uuid))
return
for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes.
addr = (ip_address, port)
......@@ -354,8 +365,8 @@ class ClientEventHandler(EventHandler):
app._cache_lock_acquire()
try:
for oid in oid_list:
if app.cache.has_key(oid):
del app.cache[oid]
if app.mq_cache.has_key(oid):
del app.mq_cache[oid]
finally:
app._cache_lock_release()
else:
......@@ -371,7 +382,7 @@ class ClientEventHandler(EventHandler):
def handleStopOperation(self, conn, packet):
if isinstance(conn, ClientConnection):
raise NEOStorageError('operation stopped')
logging.critical("master node ask to stop operation")
else:
self.handleUnexpectedPacket(conn, packet)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment