Commit 76abb6ef authored by Julien Muchembled's avatar Julien Muchembled

admin: abort all unanswered requests when connection to master is lost

Admin aborts by sending an error packet for each request.
Without this, the neoctl would wait forever.
parent 34c276b8
...@@ -29,29 +29,6 @@ from neo.lib.pt import PartitionTable ...@@ -29,29 +29,6 @@ from neo.lib.pt import PartitionTable
from neo.lib.protocol import NodeTypes, NodeStates, Packets, Errors from neo.lib.protocol import NodeTypes, NodeStates, Packets, Errors
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
class Dispatcher:
"""Dispatcher use to redirect master request to handler"""
def __init__(self):
# associate conn/message_id to dispatch
# message to connection
self.message_table = {}
def register(self, msg_id, conn, kw=None):
self.message_table[msg_id] = conn, kw
def pop(self, msg_id):
return self.message_table.pop(msg_id)
def registered(self, msg_id):
return self.message_table.has_key(msg_id)
def clear(self):
"""
Unregister packet expected for a given connection
"""
self.message_table.clear()
class Application(object): class Application(object):
"""The storage node application.""" """The storage node application."""
...@@ -74,7 +51,6 @@ class Application(object): ...@@ -74,7 +51,6 @@ class Application(object):
self.primary_master_node = None self.primary_master_node = None
self.request_handler = MasterRequestEventHandler(self) self.request_handler = MasterRequestEventHandler(self)
self.master_event_handler = MasterEventHandler(self) self.master_event_handler = MasterEventHandler(self)
self.dispatcher = Dispatcher()
self.cluster_state = None self.cluster_state = None
self.reset() self.reset()
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
...@@ -122,7 +98,6 @@ class Application(object): ...@@ -122,7 +98,6 @@ class Application(object):
except PrimaryFailure: except PrimaryFailure:
neo.lib.logging.error('primary master is down') neo.lib.logging.error('primary master is down')
def connectToPrimary(self): def connectToPrimary(self):
"""Find a primary master node, and connect to it. """Find a primary master node, and connect to it.
...@@ -161,6 +136,7 @@ class Application(object): ...@@ -161,6 +136,7 @@ class Application(object):
# passive handler # passive handler
self.master_conn.setHandler(self.master_event_handler) self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(Packets.AskClusterState())
self.master_conn.ask(Packets.AskNodeInformation()) self.master_conn.ask(Packets.AskNodeInformation())
self.master_conn.ask(Packets.AskPartitionTable()) self.master_conn.ask(Packets.AskPartitionTable())
...@@ -175,16 +151,12 @@ class Application(object): ...@@ -175,16 +151,12 @@ class Application(object):
row = [] row = []
try: try:
for cell in self.pt.getCellList(offset): for cell in self.pt.getCellList(offset):
if uuid is not None and cell.getUUID() != uuid: if uuid is None or cell.getUUID() == uuid:
continue
else:
row.append((cell.getUUID(), cell.getState())) row.append((cell.getUUID(), cell.getState()))
except TypeError: except TypeError:
pass pass
row_list.append((offset, row)) row_list.append((offset, row))
except IndexError: except IndexError:
p = Errors.ProtocolError('invalid partition table offset') conn.notify(Errors.ProtocolError('invalid partition table offset'))
conn.notify(p) else:
return conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
p = Packets.AnswerPartitionList(self.pt.getID(), row_list)
conn.answer(p)
...@@ -25,44 +25,24 @@ from neo.lib.util import dump ...@@ -25,44 +25,24 @@ from neo.lib.util import dump
def check_primary_master(func): def check_primary_master(func):
def wrapper(self, *args, **kw): def wrapper(self, *args, **kw):
if self.app.master_conn is None: if self.app.bootstrapped:
raise protocol.NotReadyError('Not connected to a primary master.')
return func(self, *args, **kw) return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper return wrapper
def forward_ask(klass): def forward_ask(klass):
@check_primary_master return check_primary_master(lambda self, conn, *args, **kw:
def wrapper(self, conn, *args, **kw): self.app.master_conn.ask(klass(*args, **kw),
app = self.app conn=conn, msg_id=conn.getPeerId()))
msg_id = app.master_conn.ask(klass(*args, **kw))
app.dispatcher.register(msg_id, conn, {'msg_id': conn.getPeerId()})
return wrapper
def forward_answer(klass):
def wrapper(self, conn, *args, **kw):
packet = klass(*args, **kw)
self._answerNeoCTL(conn, packet)
return wrapper
class AdminEventHandler(EventHandler): class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster.""" """This class deals with events for administrating cluster."""
@check_primary_master @check_primary_master
def askPartitionList(self, conn, min_offset, max_offset, uuid): def askPartitionList(self, conn, min_offset, max_offset, uuid):
neo.lib.logging.info("ask partition list from %s to %s for %s" % neo.lib.logging.info("ask partition list from %s to %s for %s",
(min_offset, max_offset, dump(uuid))) min_offset, max_offset, dump(uuid))
app = self.app self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)
# check we have one pt otherwise ask it to PMN
if app.pt is None:
msg_id = self.app.master_conn.ask(Packets.AskPartitionTable())
app.dispatcher.register(msg_id, conn,
{'min_offset' : min_offset,
'max_offset' : max_offset,
'uuid' : uuid,
'msg_id' : conn.getPeerId()})
else:
app.sendPartitionTable(conn, min_offset, max_offset, uuid)
@check_primary_master @check_primary_master
def askNodeList(self, conn, node_type): def askNodeList(self, conn, node_type):
...@@ -79,7 +59,7 @@ class AdminEventHandler(EventHandler): ...@@ -79,7 +59,7 @@ class AdminEventHandler(EventHandler):
@check_primary_master @check_primary_master
def setNodeState(self, conn, uuid, state, modify_partition_table): def setNodeState(self, conn, uuid, state, modify_partition_table):
neo.lib.logging.info("set node state for %s-%s" %(dump(uuid), state)) neo.lib.logging.info("set node state for %s-%s", dump(uuid), state)
node = self.app.nm.getByUUID(uuid) node = self.app.nm.getByUUID(uuid)
if node is None: if node is None:
raise protocol.ProtocolError('invalid uuid') raise protocol.ProtocolError('invalid uuid')
...@@ -90,17 +70,10 @@ class AdminEventHandler(EventHandler): ...@@ -90,17 +70,10 @@ class AdminEventHandler(EventHandler):
return return
# forward to primary master node # forward to primary master node
p = Packets.SetNodeState(uuid, state, modify_partition_table) p = Packets.SetNodeState(uuid, state, modify_partition_table)
msg_id = self.app.master_conn.ask(p) self.app.master_conn.ask(p, conn=conn, msg_id=conn.getPeerId())
self.app.dispatcher.register(msg_id, conn, {'msg_id' : conn.getPeerId()})
@check_primary_master @check_primary_master
def askClusterState(self, conn): def askClusterState(self, conn):
if self.app.cluster_state is None:
# required it from PMN first
msg_id = self.app.master_conn.ask(Packets.AskClusterState())
self.app.dispatcher.register(msg_id, conn,
{'msg_id' : conn.getPeerId()})
else:
conn.answer(Packets.AnswerClusterState(self.app.cluster_state)) conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
@check_primary_master @check_primary_master
...@@ -119,7 +92,7 @@ class MasterEventHandler(EventHandler): ...@@ -119,7 +92,7 @@ class MasterEventHandler(EventHandler):
app = self.app app = self.app
if app.listening_conn: # if running if app.listening_conn: # if running
assert app.master_conn in (conn, None) assert app.master_conn in (conn, None)
app.dispatcher.clear() conn.cancelRequests("connection to master lost")
app.reset() app.reset()
app.uuid = None app.uuid = None
raise PrimaryFailure raise PrimaryFailure
...@@ -131,14 +104,20 @@ class MasterEventHandler(EventHandler): ...@@ -131,14 +104,20 @@ class MasterEventHandler(EventHandler):
self._connectionLost(conn) self._connectionLost(conn)
def dispatch(self, conn, packet, kw={}): def dispatch(self, conn, packet, kw={}):
if packet.isResponse() and \ if 'conn' in kw:
self.app.dispatcher.registered(packet.getId()):
# expected answer # expected answer
if packet.isError():
packet.setId(kw['msg_id'])
kw['conn'].answer(packet)
else:
self.app.request_handler.dispatch(conn, packet, kw) self.app.request_handler.dispatch(conn, packet, kw)
else: else:
# unexpectexd answers and notifications # unexpected answers and notifications
super(MasterEventHandler, self).dispatch(conn, packet, kw) super(MasterEventHandler, self).dispatch(conn, packet, kw)
def answerClusterState(self, conn, state):
self.app.cluster_state = state
def answerNodeInformation(self, conn): def answerNodeInformation(self, conn):
# XXX: This will no more exists when the initialization module will be # XXX: This will no more exists when the initialization module will be
# implemented for factorize code (as done for bootstrap) # implemented for factorize code (as done for bootstrap)
...@@ -159,27 +138,7 @@ class MasterEventHandler(EventHandler): ...@@ -159,27 +138,7 @@ class MasterEventHandler(EventHandler):
self.app.cluster_state = cluster_state self.app.cluster_state = cluster_state
def notifyNodeInformation(self, conn, node_list): def notifyNodeInformation(self, conn, node_list):
app = self.app self.app.nm.update(node_list)
app.nm.update(node_list)
class MasterRequestEventHandler(EventHandler): class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node""" """ This class handle all answer from primary master node"""
def _answerNeoCTL(self, conn, packet):
msg_id = conn.getPeerId()
client_conn, kw = self.app.dispatcher.pop(msg_id)
client_conn.answer(packet)
def answerClusterState(self, conn, state):
neo.lib.logging.info("answerClusterState for a conn")
self.app.cluster_state = state
self._answerNeoCTL(conn, Packets.AnswerClusterState(state))
def answerPartitionTable(self, conn, ptid, row_list):
neo.lib.logging.info("answerPartitionTable for a conn")
client_conn, kw = self.app.dispatcher.pop(conn.getPeerId())
# sent client the partition table
self.app.sendPartitionTable(client_conn)
ack = forward_answer(Errors.Ack)
protocolError = forward_answer(Errors.ProtocolError)
...@@ -26,7 +26,7 @@ from .connector import ConnectorException, ConnectorTryAgainException, \ ...@@ -26,7 +26,7 @@ from .connector import ConnectorException, ConnectorTryAgainException, \
from .locking import RLock from .locking import RLock
from .logger import PACKET_LOGGER from .logger import PACKET_LOGGER
from .profiling import profiler_decorator from .profiling import profiler_decorator
from .protocol import PacketMalformedError, Packets, ParserState from .protocol import Errors, PacketMalformedError, Packets, ParserState
from .util import dump, ReadBuffer from .util import dump, ReadBuffer
CRITICAL_TIMEOUT = 30 CRITICAL_TIMEOUT = 30
...@@ -99,6 +99,19 @@ class HandlerSwitcher(object): ...@@ -99,6 +99,19 @@ class HandlerSwitcher(object):
def isPending(self): def isPending(self):
return bool(self._pending[0][0]) return bool(self._pending[0][0])
def cancelRequests(self, conn, message):
if self.isPending():
p = Errors.ProtocolError(message)
while True:
request_dict, handler = self._pending[0]
while request_dict:
msg_id, request = request_dict.popitem()
p.setId(msg_id)
handler.packetReceived(conn, p, request[3])
if len(self._pending) == 1:
break
del self._pending[0]
def getHandler(self): def getHandler(self):
return self._pending[0][1] return self._pending[0][1]
...@@ -244,8 +257,12 @@ class BaseConnection(object): ...@@ -244,8 +257,12 @@ class BaseConnection(object):
self._handlers = HandlerSwitcher(handler) self._handlers = HandlerSwitcher(handler)
event_manager.register(self) event_manager.register(self)
def isPending(self): getHandler = property(lambda self: self._handlers.getHandler)
return self._handlers.isPending() getLastHandler = property(lambda self: self._handlers.getLastHandler)
isPending = property(lambda self: self._handlers.isPending)
def cancelRequests(self, *args, **kw):
return self._handlers.cancelRequests(self, *args, **kw)
def updateTimeout(self, t=None): def updateTimeout(self, t=None):
if not self._queue: if not self._queue:
...@@ -313,9 +330,6 @@ class BaseConnection(object): ...@@ -313,9 +330,6 @@ class BaseConnection(object):
__del__ = close __del__ = close
def getHandler(self):
return self._handlers.getHandler()
def setHandler(self, handler): def setHandler(self, handler):
if self._handlers.setHandler(handler): if self._handlers.setHandler(handler):
neo.lib.logging.debug('Set handler %r on %r', handler, self) neo.lib.logging.debug('Set handler %r on %r', handler, self)
...@@ -532,12 +546,12 @@ class Connection(BaseConnection): ...@@ -532,12 +546,12 @@ class Connection(BaseConnection):
self._on_close = None self._on_close = None
del self.write_buf[:] del self.write_buf[:]
self.read_buf.clear() self.read_buf.clear()
self._handlers.clear()
if self.connecting: if self.connecting:
handler.connectionFailed(self) handler.connectionFailed(self)
self.connecting = False self.connecting = False
else: else:
handler.connectionClosed(self) handler.connectionClosed(self)
self._handlers.clear()
def _closure(self): def _closure(self):
assert self.connector is not None, self.whoSetConnector() assert self.connector is not None, self.whoSetConnector()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment