Commit 51bce1a8 by Grégory Wisniewski

Admin node often act as a simple relay, simplify those cases.

......@@ -23,6 +23,21 @@ from neo.protocol import Packets, Errors
from neo.exception import PrimaryFailure
from neo.util import dump
def forward_ask(klass):
def wrapper(self, conn, *args, **kw):
app =
if app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.')
msg_id = app.master_conn.ask(klass(*args, **kw))
app.dispatcher.register(msg_id, conn, {'msg_id': conn.getPeerId()})
return wrapper
def forward_answer(klass):
def wrapper(self, conn, *args, **kw):
packet = klass(*args, **kw)
self._answerNeoCTL(conn, packet)
return wrapper
class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster."""
......@@ -72,22 +87,6 @@ class AdminEventHandler(EventHandler):
msg_id =, conn, {'msg_id' : conn.getPeerId()})
def setClusterState(self, conn, state):
# forward to primary
if is None:
raise protocol.NotReadyError('Not connected to a primary master.')
p = Packets.SetClusterState(state)
msg_id =, conn, {'msg_id' : conn.getPeerId()})
def addPendingNodes(self, conn, uuid_list):
if is None:
raise protocol.NotReadyError('Not connected to a primary master.')'Add nodes %s' % [dump(uuid) for uuid in uuid_list])
# forward the request to primary
msg_id =, conn, {'msg_id' : conn.getPeerId()})
def askClusterState(self, conn):
if is None:
if is None:
......@@ -106,6 +105,10 @@ class AdminEventHandler(EventHandler):
master_node =
conn.answer(Packets.AnswerPrimary(master_node.getUUID(), []))
addPendingNodes = forward_ask(Packets.AddPendingNodes)
setClusterState = forward_ask(Packets.SetClusterState)
class MasterEventHandler(EventHandler):
""" This class is just used to dispacth message to right handler"""
......@@ -188,7 +191,7 @@ class MasterEventHandler(EventHandler):
class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node"""
def __answerNeoCTL(self, conn, packet):
def _answerNeoCTL(self, conn, packet):
msg_id = conn.getPeerId()
client_conn, kw =
......@@ -198,23 +201,13 @@ class MasterRequestEventHandler(EventHandler): = state
self.__answerNeoCTL(conn, Packets.AnswerClusterState(state))
def answerNewNodes(self, conn, uuid_list):"answerNewNodes for a conn")
self.__answerNeoCTL(conn, Packets.AnswerNewNodes(uuid_list))
def answerPartitionTable(self, conn, ptid, row_list):"answerPartitionTable for a conn")
client_conn, kw =
# sent client the partition table
def answerNodeState(self, conn, uuid, state):
Packets.AnswerNodeState(uuid, state))
def ack(self, conn, msg):
self.__answerNeoCTL(conn, Errors.Ack(msg))
def protocolError(self, conn, msg):
self.__answerNeoCTL(conn, Errors.ProtocolError(msg))
answerNewNodes = forward_answer(Packets.AnswerNewNodes)
answerNodeState = forward_answer(Packets.AnswerNodeState)
ack = forward_answer(Errors.Ack)
protocolError = forward_answer(Errors.ProtocolError)
