Commit c2c9e99d authored by Julien Muchembled

Better error reporting from the master to neoctl for denied requests

This stops abusing ProtocolError, which disconnects the admin node needlessly.

The many 'if ... raise RuntimeError' in neo/neoctl/neoctl.py
could be turned into assertions.
parent 21190ee7
...@@ -26,6 +26,9 @@ from .protocol import (NodeStates, NodeTypes, Packets, uuid_str, ...@@ -26,6 +26,9 @@ from .protocol import (NodeStates, NodeTypes, Packets, uuid_str,
from .util import cached_property from .util import cached_property
class AnswerDenied(Exception):
"""Helper exception to stop packet processing and answer a Denied error"""
class DelayEvent(Exception): class DelayEvent(Exception):
pass pass
...@@ -98,6 +101,8 @@ class EventHandler(object): ...@@ -98,6 +101,8 @@ class EventHandler(object):
% (m.im_class.__module__, m.im_class.__name__, m.__name__))) % (m.im_class.__module__, m.im_class.__name__, m.__name__)))
except NonReadableCell, e: except NonReadableCell, e:
conn.answer(Errors.NonReadableCell()) conn.answer(Errors.NonReadableCell())
except AnswerDenied, e:
conn.answer(Errors.Denied(str(e)))
except AssertionError: except AssertionError:
e = sys.exc_info() e = sys.exc_info()
try: try:
......
...@@ -62,6 +62,7 @@ class Enum(tuple): ...@@ -62,6 +62,7 @@ class Enum(tuple):
@Enum @Enum
def ErrorCodes(): def ErrorCodes():
ACK ACK
DENIED
NOT_READY NOT_READY
OID_NOT_FOUND OID_NOT_FOUND
TID_NOT_FOUND TID_NOT_FOUND
......
...@@ -21,9 +21,10 @@ from . import MasterHandler ...@@ -21,9 +21,10 @@ from . import MasterHandler
from ..app import monotonic_time, StateChangedException from ..app import monotonic_time, StateChangedException
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import StoppedOperation from neo.lib.exception import StoppedOperation
from neo.lib.handler import AnswerDenied
from neo.lib.pt import PartitionTableException from neo.lib.pt import PartitionTableException
from neo.lib.protocol import ClusterStates, Errors, \ from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, ProtocolError, uuid_str NodeStates, NodeTypes, Packets, uuid_str
from neo.lib.util import dump from neo.lib.util import dump
CLUSTER_STATE_WORKFLOW = { CLUSTER_STATE_WORKFLOW = {
...@@ -44,8 +45,8 @@ def check_state(*states): ...@@ -44,8 +45,8 @@ def check_state(*states):
def wrapper(self, *args): def wrapper(self, *args):
state = self.app.getClusterState() state = self.app.getClusterState()
if state not in states: if state not in states:
raise ProtocolError('%s RPC can not be used in %s state' raise AnswerDenied('%s RPC can not be used in %s state'
% (wrapped.__name__, state)) % (wrapped.__name__, state))
wrapped(self, *args) wrapped(self, *args)
return wraps(wrapped)(wrapper) return wraps(wrapped)(wrapper)
return decorator return decorator
...@@ -75,30 +76,28 @@ class AdministrationHandler(MasterHandler): ...@@ -75,30 +76,28 @@ class AdministrationHandler(MasterHandler):
# check request # check request
try: try:
if app.cluster_state not in CLUSTER_STATE_WORKFLOW[state]: if app.cluster_state not in CLUSTER_STATE_WORKFLOW[state]:
raise ProtocolError('Can not switch to this state') raise AnswerDenied('Can not switch to this state')
except KeyError: except KeyError:
if state != ClusterStates.STOPPING: if state != ClusterStates.STOPPING:
raise ProtocolError('Invalid state requested') raise AnswerDenied('Invalid state requested')
# change state # change state
if state == ClusterStates.VERIFYING: if state == ClusterStates.VERIFYING:
storage_list = app.nm.getStorageList(only_identified=True) storage_list = app.nm.getStorageList(only_identified=True)
if not storage_list: if not storage_list:
raise ProtocolError('Cannot exit recovery without any ' raise AnswerDenied(
'storage node') 'Cannot exit recovery without any storage node')
for node in storage_list: for node in storage_list:
assert node.isPending(), node assert node.isPending(), node
if node.getConnection().isPending(): if node.getConnection().isPending():
# XXX: It's wrong to use ProtocolError here. We must reply raise AnswerDenied(
# less aggressively because the admin has no way to 'Cannot exit recovery now: node %r is entering cluster'
# know that there's still pending activity. % node,)
raise ProtocolError('Cannot exit recovery now: node %r is '
'entering cluster' % (node, ))
app._startup_allowed = True app._startup_allowed = True
state = app.cluster_state state = app.cluster_state
elif state == ClusterStates.STARTING_BACKUP: elif state == ClusterStates.STARTING_BACKUP:
if app.tm.hasPending() or app.nm.getClientList(True): if app.tm.hasPending() or app.nm.getClientList(True):
raise ProtocolError("Can not switch to %s state with pending" raise AnswerDenied("Can not switch to %s state with pending"
" transactions or connected clients" % state) " transactions or connected clients" % state)
conn.answer(Errors.Ack('Cluster state changed')) conn.answer(Errors.Ack('Cluster state changed'))
...@@ -110,11 +109,11 @@ class AdministrationHandler(MasterHandler): ...@@ -110,11 +109,11 @@ class AdministrationHandler(MasterHandler):
app = self.app app = self.app
node = app.nm.getByUUID(uuid) node = app.nm.getByUUID(uuid)
if node is None: if node is None:
raise ProtocolError('unknown node') raise AnswerDenied('unknown node')
if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()): if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()):
raise ProtocolError('can not switch node to this state') raise AnswerDenied('can not switch node to %s state' % state)
if uuid == app.uuid: if uuid == app.uuid:
raise ProtocolError('can not kill primary master node') raise AnswerDenied('can not kill primary master node')
state_changed = state != node.getState() state_changed = state != node.getState()
message = ('state changed' if state_changed else message = ('state changed' if state_changed else
...@@ -124,7 +123,7 @@ class AdministrationHandler(MasterHandler): ...@@ -124,7 +123,7 @@ class AdministrationHandler(MasterHandler):
try: try:
cell_list = app.pt.dropNodeList([node], keep) cell_list = app.pt.dropNodeList([node], keep)
except PartitionTableException, e: except PartitionTableException, e:
raise ProtocolError(str(e)) raise AnswerDenied(str(e))
node.setState(state) node.setState(state)
if node.isConnected(): if node.isConnected():
# notify itself so it can shutdown # notify itself so it can shutdown
...@@ -183,7 +182,7 @@ class AdministrationHandler(MasterHandler): ...@@ -183,7 +182,7 @@ class AdministrationHandler(MasterHandler):
for uuid in uuid_list: for uuid in uuid_list:
node = getByUUID(uuid) node = getByUUID(uuid)
if node is None or not (node.isStorage() and node.isIdentified()): if node is None or not (node.isStorage() and node.isIdentified()):
raise ProtocolError("invalid storage node %s" % uuid_str(uuid)) raise AnswerDenied("invalid storage node %s" % uuid_str(uuid))
node_list.append(node) node_list.append(node)
repair = Packets.NotifyRepair(*args) repair = Packets.NotifyRepair(*args)
for node in node_list: for node in node_list:
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import ErrorCodes, Packets from neo.lib.protocol import ErrorCodes, Packets
...@@ -44,8 +45,8 @@ class CommandEventHandler(EventHandler): ...@@ -44,8 +45,8 @@ class CommandEventHandler(EventHandler):
def ack(self, conn, msg): def ack(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.ACK, msg)) self.__respond((Packets.Error, ErrorCodes.ACK, msg))
def protocolError(self, conn, msg): def denied(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.PROTOCOL_ERROR, msg)) sys.exit(msg)
def notReady(self, conn, msg): def notReady(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.NOT_READY, msg)) self.__respond((Packets.Error, ErrorCodes.NOT_READY, msg))
......
...@@ -433,7 +433,7 @@ class NEOCluster(object): ...@@ -433,7 +433,7 @@ class NEOCluster(object):
pending_count += 1 pending_count += 1
if pending_count == target[0]: if pending_count == target[0]:
neoctl.startCluster() neoctl.startCluster()
except (NotReadyException, RuntimeError): except (NotReadyException, SystemExit):
pass pass
if not pdb.wait(test, MAX_START_TIME): if not pdb.wait(test, MAX_START_TIME):
raise AssertionError('Timeout when starting cluster') raise AssertionError('Timeout when starting cluster')
...@@ -445,7 +445,7 @@ class NEOCluster(object): ...@@ -445,7 +445,7 @@ class NEOCluster(object):
def start(last_try): def start(last_try):
try: try:
self.neoctl.startCluster() self.neoctl.startCluster()
except (NotReadyException, RuntimeError), e: except (NotReadyException, SystemExit), e:
return False, e return False, e
return True, None return True, None
self.expectCondition(start) self.expectCondition(start)
......
...@@ -47,7 +47,7 @@ class MasterTests(NEOFunctionalTest): ...@@ -47,7 +47,7 @@ class MasterTests(NEOFunctionalTest):
break break
neoctl.killNode(uuid) neoctl.killNode(uuid)
self.neo.expectDead(master) self.neo.expectDead(master)
self.assertRaises(RuntimeError, neoctl.killNode, primary_uuid) self.assertRaises(SystemExit, neoctl.killNode, primary_uuid)
def testStoppingPrimaryWithTwoSecondaries(self): def testStoppingPrimaryWithTwoSecondaries(self):
# Wait for masters to stabilize # Wait for masters to stabilize
......
...@@ -172,7 +172,7 @@ class StorageTests(NEOFunctionalTest): ...@@ -172,7 +172,7 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectOudatedCells(2) self.neo.expectOudatedCells(2)
self.neo.expectClusterRunning() self.neo.expectClusterRunning()
self.assertRaises(RuntimeError, self.neo.neoctl.killNode, self.assertRaises(SystemExit, self.neo.neoctl.killNode,
started[1].getUUID()) started[1].getUUID())
started[1].stop() started[1].stop()
# Cluster not operational anymore. Only cells of second storage that # Cluster not operational anymore. Only cells of second storage that
...@@ -323,7 +323,7 @@ class StorageTests(NEOFunctionalTest): ...@@ -323,7 +323,7 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectStorageUnknown(started[0]) self.neo.expectStorageUnknown(started[0])
self.neo.expectAssignedCells(started[0], 0) self.neo.expectAssignedCells(started[0], 0)
self.neo.expectAssignedCells(started[1], 10) self.neo.expectAssignedCells(started[1], 10)
self.assertRaises(RuntimeError, self.neo.neoctl.dropNode, self.assertRaises(SystemExit, self.neo.neoctl.dropNode,
started[1].getUUID()) started[1].getUUID())
self.neo.expectClusterRunning() self.neo.expectClusterRunning()
......
...@@ -934,7 +934,7 @@ class NEOCluster(object): ...@@ -934,7 +934,7 @@ class NEOCluster(object):
def startCluster(self): def startCluster(self):
try: try:
self.neoctl.startCluster() self.neoctl.startCluster()
except RuntimeError: except SystemExit:
Serialized.tic() Serialized.tic()
if self.neoctl.getClusterState() not in ( if self.neoctl.getClusterState() not in (
ClusterStates.BACKINGUP, ClusterStates.BACKINGUP,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment