Commit 4fe937d1 authored by Julien Muchembled's avatar Julien Muchembled

admin: simplify code

parent cf55f66e
...@@ -18,8 +18,7 @@ from neo.lib import logging ...@@ -18,8 +18,7 @@ from neo.lib import logging
from neo.lib.app import BaseApplication, buildOptionParser from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.connection import ListeningConnection from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \ from .handler import AdminEventHandler, MasterEventHandler
MasterRequestEventHandler
from neo.lib.bootstrap import BootstrapManager from neo.lib.bootstrap import BootstrapManager
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
...@@ -54,7 +53,6 @@ class Application(BaseApplication): ...@@ -54,7 +53,6 @@ class Application(BaseApplication):
self.pt = None self.pt = None
self.uuid = config.get('nid') self.uuid = config.get('nid')
logging.node(self.name, self.uuid) logging.node(self.name, self.uuid)
self.request_handler = MasterRequestEventHandler(self)
self.master_event_handler = MasterEventHandler(self) self.master_event_handler = MasterEventHandler(self)
self.cluster_state = None self.cluster_state = None
self.reset() self.reset()
......
...@@ -20,28 +20,46 @@ from neo.lib.protocol import uuid_str, Packets ...@@ -20,28 +20,46 @@ from neo.lib.protocol import uuid_str, Packets
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
def check_primary_master(func): def AdminEventHandlerType(name, bases, d):
def wrapper(self, *args, **kw): def check_primary_master(func):
if self.app.master_conn is not None: def wrapper(self, *args, **kw):
return func(self, *args, **kw) if self.app.master_conn is not None:
raise protocol.NotReadyError('Not connected to a primary master.') return func(self, *args, **kw)
return wrapper raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
def forward_ask(klass):
return check_primary_master(lambda self, conn, *args, **kw: def forward_ask(klass):
self.app.master_conn.ask(klass(*args, **kw), return lambda self, conn, *args: self.app.master_conn.ask(
conn=conn, msg_id=conn.getPeerId())) klass(*args), conn=conn, msg_id=conn.getPeerId())
del d['__metaclass__']
for x in (
Packets.AddPendingNodes,
Packets.AskLastIDs,
Packets.AskLastTransaction,
Packets.AskRecovery,
Packets.CheckReplicas,
Packets.Repair,
Packets.SetClusterState,
Packets.SetNodeState,
Packets.SetNumReplicas,
Packets.Truncate,
Packets.TweakPartitionTable,
):
d[x.handler_method_name] = forward_ask(x)
return type(name, bases, {k: v if k[0] == '_' else check_primary_master(v)
for k, v in d.iteritems()})
class AdminEventHandler(EventHandler): class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster.""" """This class deals with events for administrating cluster."""
@check_primary_master __metaclass__ = AdminEventHandlerType
def askPartitionList(self, conn, min_offset, max_offset, uuid): def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s", logging.info("ask partition list from %s to %s for %s",
min_offset, max_offset, uuid_str(uuid)) min_offset, max_offset, uuid_str(uuid))
self.app.sendPartitionTable(conn, min_offset, max_offset, uuid) self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)
@check_primary_master
def askNodeList(self, conn, node_type): def askNodeList(self, conn, node_type):
if node_type is None: if node_type is None:
node_type = 'all' node_type = 'all'
...@@ -54,37 +72,22 @@ class AdminEventHandler(EventHandler): ...@@ -54,37 +72,22 @@ class AdminEventHandler(EventHandler):
p = Packets.AnswerNodeList(node_information_list) p = Packets.AnswerNodeList(node_information_list)
conn.answer(p) conn.answer(p)
@check_primary_master
def askClusterState(self, conn): def askClusterState(self, conn):
conn.answer(Packets.AnswerClusterState(self.app.cluster_state)) conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
@check_primary_master
def askPrimary(self, conn): def askPrimary(self, conn):
master_node = self.app.master_node master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID())) conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
@check_primary_master
def flushLog(self, conn): def flushLog(self, conn):
self.app.master_conn.send(Packets.FlushLog()) self.app.master_conn.send(Packets.FlushLog())
super(AdminEventHandler, self).flushLog(conn) super(AdminEventHandler, self).flushLog(conn)
askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes)
askRecovery = forward_ask(Packets.AskRecovery)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
setNumReplicas = forward_ask(Packets.SetNumReplicas)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
class MasterEventHandler(EventHandler): class MasterEventHandler(EventHandler):
""" This class is just used to dispatch message to right handler""" """ This class is just used to dispatch message to right handler"""
def _connectionLost(self, conn): def connectionClosed(self, conn):
app = self.app app = self.app
if app.listening_conn: # if running if app.listening_conn: # if running
assert app.master_conn in (conn, None) assert app.master_conn in (conn, None)
...@@ -93,38 +96,22 @@ class MasterEventHandler(EventHandler): ...@@ -93,38 +96,22 @@ class MasterEventHandler(EventHandler):
app.uuid = None app.uuid = None
raise PrimaryFailure raise PrimaryFailure
def connectionFailed(self, conn):
self._connectionLost(conn)
def connectionClosed(self, conn):
self._connectionLost(conn)
def dispatch(self, conn, packet, kw={}): def dispatch(self, conn, packet, kw={}):
if 'conn' in kw: forward = kw.get('conn')
# expected answer if forward is None:
if packet.isResponse():
packet.setId(kw['msg_id'])
kw['conn'].answer(packet)
else:
self.app.request_handler.dispatch(conn, packet, kw)
else:
# unexpected answers and notifications
super(MasterEventHandler, self).dispatch(conn, packet, kw) super(MasterEventHandler, self).dispatch(conn, packet, kw)
else:
packet.setId(kw['msg_id'])
forward.answer(packet)
def answerClusterState(self, conn, state): def answerClusterState(self, conn, state):
self.app.cluster_state = state self.app.cluster_state = state
notifyClusterInformation = answerClusterState
def sendPartitionTable(self, conn, ptid, num_replicas, row_list): def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable) pt = self.app.pt = object.__new__(PartitionTable)
pt.load(ptid, num_replicas, row_list, self.app.nm) pt.load(ptid, num_replicas, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list): def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm) self.app.pt.update(ptid, num_replicas, cell_list, self.app.nm)
def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state
class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node"""
# XXX: to be deleted ?
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment