Commit 573ebb44 authored by Grégory Wisniewski

Admin node uses the bootstrap manager to connect to the primary master.

Fix reconnection to the primary master when the connection is lost.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@853 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ed6e83e2
...@@ -25,8 +25,10 @@ from neo.event import EventManager ...@@ -25,8 +25,10 @@ from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection from neo.connection import ListeningConnection, ClientConnection
from neo.exception import PrimaryFailure from neo.exception import PrimaryFailure
from neo.admin.handler import MasterMonitoringEventHandler, AdminEventHandler, \ from neo.admin.handler import MasterMonitoringEventHandler, AdminEventHandler, \
MasterBootstrapEventHandler, MasterRequestEventHandler MasterEventHandler, MasterRequestEventHandler
from neo.connector import getConnectorHandler, ConnectorConnectionClosedException from neo.connector import getConnectorHandler, ConnectorConnectionClosedException
from neo.bootstrap import BootstrapManager
from neo.pt import PartitionTable
from neo import protocol from neo import protocol
class Dispatcher: class Dispatcher:
...@@ -117,60 +119,32 @@ class Application(object): ...@@ -117,60 +119,32 @@ class Application(object):
Note that I do not accept any connection from non-master nodes Note that I do not accept any connection from non-master nodes
at this stage.""" at this stage."""
logging.info('connecting to a primary master node')
handler = MasterBootstrapEventHandler(self)
em = self.em
nm = self.nm
# First of all, make sure that I have no connection. # First of all, make sure that I have no connection.
for conn in em.getConnectionList(): for conn in self.em.getConnectionList():
if not conn.isListeningConnection(): if not conn.isListeningConnection():
conn.close() conn.close()
index = 0 # search, find, connect and identify to the primary master
self.trying_master_node = None bootstrap = BootstrapManager(self, self.name, protocol.ADMIN_NODE_TYPE,
self.primary_master_node = None self.uuid, self.server)
self.master_conn = None data = bootstrap.getPrimaryConnection(self.connector_handler)
t = 0 (node, conn, uuid, num_partitions, num_replicas) = data
while 1: self.master_node = node
try: self.master_conn = conn
em.poll(1) self.uuid = uuid
except ConnectorConnectionClosedException:
self.primary_master_node = None if self.num_partitions is None:
continue self.num_partitions = num_partitions
if self.primary_master_node is not None: self.num_replicas = num_replicas
# If I know which is a primary master node, check if self.pt = PartitionTable(num_partitions, num_replicas)
# I have a connection to it already. elif self.num_partitions != num_partitions:
for conn in em.getConnectionList(): raise RuntimeError('the number of partitions is inconsistent')
if not conn.isListeningConnection() and not conn.isServerConnection(): elif self.num_replicas != num_replicas:
uuid = conn.getUUID() raise RuntimeError('the number of replicas is inconsistent')
if uuid is not None:
node = nm.getNodeByUUID(uuid) # passive handler
if node is self.primary_master_node: self.master_conn.setHandler(MasterEventHandler(self))
logging.info("connected to primary master node %s:%d" % node.getServer())
self.master_conn = conn
# Yes, I have.
return
if self.trying_master_node is None and t + 1 < time():
# Choose a master node to connect to.
if self.primary_master_node is not None:
# If I know a primary master node, pinpoint it.
self.trying_master_node = self.primary_master_node
else:
# Otherwise, check one by one.
master_list = nm.getMasterNodeList()
try:
self.trying_master_node = master_list[index]
except IndexError:
index = 0
self.trying_master_node = master_list[0]
index += 1
ClientConnection(em, handler, \
addr = self.trying_master_node.getServer(),
connector_handler = self.connector_handler)
t = time()
def sendPartitionTable(self, conn, min_offset, max_offset, uuid, msg_id): def sendPartitionTable(self, conn, min_offset, max_offset, uuid, msg_id):
# we have a pt # we have a pt
......
...@@ -118,29 +118,8 @@ class AdminEventHandler(EventHandler): ...@@ -118,29 +118,8 @@ class AdminEventHandler(EventHandler):
class MasterEventHandler(EventHandler): class MasterEventHandler(EventHandler):
""" This class is just used to dispacth message to right handler""" """ This class is just used to dispacth message to right handler"""
def dispatch(self, conn, packet):
if self.app.dispatcher.registered(packet.getId()):
# answer to a request
self.app.request_handler.dispatch(conn, packet)
else:
# monitoring phase
self.app.monitoring_handler.dispatch(conn, packet)
class MasterBaseEventHandler(EventHandler):
""" This is the base class for connection to primary master node"""
def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted."""
raise UnexpectedPacketError
def _connectionLost(self, conn): def _connectionLost(self, conn):
app = self.app raise PrimaryFailure
if app.primary_master_node and conn.getUUID() == app.primary_master_node.getUUID():
raise PrimaryFailure
if app.trying_master_node is app.primary_master_node:
app.primary_master_node = None
app.trying_master_node = None
def connectionFailed(self, conn): def connectionFailed(self, conn):
self._connectionLost(conn) self._connectionLost(conn)
...@@ -158,6 +137,18 @@ class MasterBaseEventHandler(EventHandler): ...@@ -158,6 +137,18 @@ class MasterBaseEventHandler(EventHandler):
self._connectionLost(conn) self._connectionLost(conn)
EventHandler.peerBroken(self, conn) EventHandler.peerBroken(self, conn)
def dispatch(self, conn, packet):
if self.app.dispatcher.registered(packet.getId()):
# answer to a request
self.app.request_handler.dispatch(conn, packet)
else:
# monitoring phase
self.app.monitoring_handler.dispatch(conn, packet)
class MasterBaseEventHandler(EventHandler):
""" This is the base class for connection to primary master node"""
@decorators.identification_required @decorators.identification_required
def handleNotifyClusterInformation(self, con, packet, cluster_state): def handleNotifyClusterInformation(self, con, packet, cluster_state):
self.app.cluster_state = cluster_state self.app.cluster_state = cluster_state
...@@ -262,117 +253,6 @@ class MasterRequestEventHandler(MasterBaseEventHandler): ...@@ -262,117 +253,6 @@ class MasterRequestEventHandler(MasterBaseEventHandler):
client_conn.notify(p, kw['msg_id']) client_conn.notify(p, kw['msg_id'])
class MasterBootstrapEventHandler(MasterBaseEventHandler):
"""This class manage the bootstrap part to the primary master node"""
def connectionCompleted(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection completed while not trying to connect')
# Ask a primary master.
conn.ask(protocol.askPrimaryMaster())
EventHandler.connectionCompleted(self, conn)
def handleNotReady(self, conn, packet, message):
app = self.app
if app.trying_master_node is not None:
app.trying_master_node = None
conn.close()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
if node_type != MASTER_NODE_TYPE:
# The peer is not a master node!
logging.error('%s:%d is not a master node', ip_address, port)
app.nm.remove(node)
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
app.nm.remove(node)
conn.close()
return
if app.num_partitions is None:
app.num_partitions = num_partitions
app.num_replicas = num_replicas
app.pt = PartitionTable(num_partitions, num_replicas)
elif app.num_partitions != num_partitions:
raise RuntimeError('the number of partitions is inconsistent')
elif app.num_replicas != num_replicas:
raise RuntimeError('the number of replicas is inconsistent')
conn.setUUID(uuid)
node.setUUID(uuid)
if your_uuid != INVALID_UUID:
# got an uuid from the primary master
app.uuid = your_uuid
conn.ask(protocol.askNodeInformation())
conn.ask(protocol.askPartitionTable([]))
logging.info("changing handler for master conn")
conn.setHandler(MasterEventHandler(self.app))
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
app = self.app
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
else:
n.setUUID(INVALID_UUID)
if primary_uuid != INVALID_UUID:
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
app.primary_master_node = primary_node
if app.trying_master_node is primary_node:
# I am connected to the right one.
logging.info('connected to a primary master node')
# This is a workaround to prevent handling of
# packets for the verification phase.
else:
app.trying_master_node = None
conn.close()
else:
if app.primary_master_node is not None:
# The primary master node is not a primary master node
# any longer.
app.primary_master_node = None
app.trying_master_node = None
conn.close()
p = protocol.requestNodeIdentification(ADMIN_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.name)
conn.ask(p)
class MasterMonitoringEventHandler(MasterBaseEventHandler): class MasterMonitoringEventHandler(MasterBaseEventHandler):
"""This class deals with events for monitoring cluster.""" """This class deals with events for monitoring cluster."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment