Commit 1b70a832 authored by Grégory Wisniewski's avatar Grégory Wisniewski

As for previous commits, raise notReadyError exception. All error send answer

packets (with the id from the answered packet) when possible.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@507 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent eea5aff2
...@@ -143,7 +143,11 @@ class EventHandler(object): ...@@ -143,7 +143,11 @@ class EventHandler(object):
logging.info('malformed packet from %s:%d: %s', *args) logging.info('malformed packet from %s:%d: %s', *args)
else: else:
logging.info('malformed packet %s from %s:%d: %s', packet.getType(), *args) logging.info('malformed packet %s from %s:%d: %s', packet.getType(), *args)
conn.notify(protocol.protocolError(error_message)) response = protocol.protocolError(error_message)
if packet is not None:
conn.answer(response, packet)
else:
conn.notify(response)
conn.abort() conn.abort()
self.peerBroken(conn) self.peerBroken(conn)
...@@ -154,13 +158,18 @@ class EventHandler(object): ...@@ -154,13 +158,18 @@ class EventHandler(object):
else: else:
message = 'unexpected packet: %s' % message message = 'unexpected packet: %s' % message
logging.info('%s', message) logging.info('%s', message)
conn.notify(protocol.protocolError(message)) conn.answer(protocol.protocolError(message), packet)
conn.abort() conn.abort()
self.peerBroken(conn) self.peerBroken(conn)
def brokenNodeDisallowedError(conn, packet, message=None): def brokenNodeDisallowedError(conn, packet, message=None):
""" Called when a broken node send packets """ """ Called when a broken node send packets """
conn.notify(protocol.brokenNodeDisallowedError('go away')) conn.answer(protocol.brokenNodeDisallowedError('go away'), packet)
conn.abort()
def notReadyError(self, conn, packet, message=None):
""" Called when the node is not ready """
conn.answer(protocol.notReady('retry later'), packet)
conn.abort() conn.abort()
def dispatch(self, conn, packet): def dispatch(self, conn, packet):
...@@ -178,6 +187,8 @@ class EventHandler(object): ...@@ -178,6 +187,8 @@ class EventHandler(object):
self.packetMalformed(conn, packet, msg) self.packetMalformed(conn, packet, msg)
except BrokenNotDisallowedError, msg: except BrokenNotDisallowedError, msg:
self.brokenNodeDisallowedError(conn, packet, msg) self.brokenNodeDisallowedError(conn, packet, msg)
except NotReadyError, msg:
self.notReadyError(conn, packet, msg)
# Packet handlers. # Packet handlers.
......
...@@ -175,9 +175,7 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -175,9 +175,7 @@ class ElectionEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.answer(protocol.notReady('retry later'), packet) raise protocol.NotReadyError
conn.abort()
return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.answer(protocol.protocolError('invalid cluster name'), packet) conn.answer(protocol.protocolError('invalid cluster name'), packet)
......
...@@ -69,9 +69,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -69,9 +69,7 @@ class RecoveryEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
conn.answer(protocol.notReady('retry later'), packet) raise protocol.NotReadyError
conn.abort()
return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.answer(protocol.protocolError('invalid cluster name'), packet) conn.answer(protocol.protocolError('invalid cluster name'), packet)
......
...@@ -146,6 +146,10 @@ server: 127.0.0.1:10023 ...@@ -146,6 +146,10 @@ server: 127.0.0.1:10023
""" Check if the BrokenNotDisallowedError exception wxas raised """ """ Check if the BrokenNotDisallowedError exception wxas raised """
self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs) self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs)
def checkNotReadyErrorRaised(self, method, *args, **kwargs):
""" Check if the NotReadyError exception wxas raised """
self.assertRaises(protocol.NotReadyError, method, *args, **kwargs)
def checkCalledAcceptNodeIdentification(self, conn, packet_number=0): def checkCalledAcceptNodeIdentification(self, conn, packet_number=0):
""" Check Accept Node Identification has been send""" """ Check Accept Node Identification has been send"""
self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1) self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1)
...@@ -528,14 +532,15 @@ server: 127.0.0.1:10023 ...@@ -528,14 +532,15 @@ server: 127.0.0.1:10023
# test connection of a storage node # test connection of a storage node
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None, conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None,
"isServerConnection" : True}) "isServerConnection" : True})
election.handleRequestNodeIdentification(conn, self.checkNotReadyErrorRaised(
packet=packet, election.handleRequestNodeIdentification,
node_type=STORAGE_NODE_TYPE, conn,
uuid=uuid, packet=packet,
ip_address='127.0.0.1', node_type=STORAGE_NODE_TYPE,
port=self.storage_port, uuid=uuid,
name=self.app.name,) ip_address='127.0.0.1',
self.checkCalledAbort(conn) port=self.storage_port,
name=self.app.name,)
# known node # known node
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None, conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None,
......
...@@ -156,6 +156,10 @@ server: 127.0.0.1:10023 ...@@ -156,6 +156,10 @@ server: 127.0.0.1:10023
""" Check if the BrokenNotDisallowedError exception wxas raised """ """ Check if the BrokenNotDisallowedError exception wxas raised """
self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs) self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs)
def checkNotReadyErrorRaised(self, method, *args, **kwargs):
""" Check if the NotReadyError exception wxas raised """
self.assertRaises(protocol.NotReadyError, method, *args, **kwargs)
def checkCalledAcceptNodeIdentification(self, conn, packet_number=0): def checkCalledAcceptNodeIdentification(self, conn, packet_number=0):
""" Check Accept Node Identification has been send""" """ Check Accept Node Identification has been send"""
self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1) self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1)
...@@ -271,14 +275,15 @@ server: 127.0.0.1:10023 ...@@ -271,14 +275,15 @@ server: 127.0.0.1:10023
# test connection from a client node, rejectet # test connection from a client node, rejectet
uuid = self.getNewUUID() uuid = self.getNewUUID()
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
recovery.handleRequestNodeIdentification(conn, self.checkNotReadyErrorRaised(
packet=packet, recovery.handleRequestNodeIdentification,
node_type=CLIENT_NODE_TYPE, conn,
uuid=uuid, packet=packet,
ip_address='127.0.0.1', node_type=CLIENT_NODE_TYPE,
port=self.client_port, uuid=uuid,
name=self.app.name,) ip_address='127.0.0.1',
self.checkCalledAbort(conn) port=self.client_port,
name=self.app.name,)
# 1. unknown storage node with known address, must be rejected # 1. unknown storage node with known address, must be rejected
uuid = self.getNewUUID() uuid = self.getNewUUID()
......
...@@ -135,6 +135,10 @@ server: 127.0.0.1:10023 ...@@ -135,6 +135,10 @@ server: 127.0.0.1:10023
""" Check if the BrokenNotDisallowedError exception wxas raised """ """ Check if the BrokenNotDisallowedError exception wxas raised """
self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs) self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs)
def checkNotReadyErrorRaised(self, method, *args, **kwargs):
""" Check if the NotReadyError exception wxas raised """
self.assertRaises(protocol.NotReadyError, method, *args, **kwargs)
def checkCalledAcceptNodeIdentification(self, conn, packet_number=0): def checkCalledAcceptNodeIdentification(self, conn, packet_number=0):
""" Check Accept Node Identification has been send""" """ Check Accept Node Identification has been send"""
self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1) self.assertEquals(len(conn.mockGetNamedCalls("answer")), 1)
...@@ -294,14 +298,15 @@ server: 127.0.0.1:10023 ...@@ -294,14 +298,15 @@ server: 127.0.0.1:10023
# test connection from a client node, rejectet # test connection from a client node, rejectet
uuid = self.getNewUUID() uuid = self.getNewUUID()
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
verification.handleRequestNodeIdentification(conn, self.checkNotReadyErrorRaised(
packet=packet, verification.handleRequestNodeIdentification,
node_type=CLIENT_NODE_TYPE, conn,
uuid=uuid, packet=packet,
ip_address='127.0.0.1', node_type=CLIENT_NODE_TYPE,
port=self.client_port, uuid=uuid,
name=self.app.name,) ip_address='127.0.0.1',
self.checkCalledAbort(conn) port=self.client_port,
name=self.app.name,)
# 1. unknown storage node with known address, must be rejected # 1. unknown storage node with known address, must be rejected
uuid = self.getNewUUID() uuid = self.getNewUUID()
......
...@@ -93,9 +93,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -93,9 +93,7 @@ class VerificationEventHandler(MasterEventHandler):
app = self.app app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('reject a connection from a client') logging.info('reject a connection from a client')
conn.answer(protocol.notReady('retry later'), packet) raise protocol.NotReadyError
conn.abort()
return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.answer(protocol.protocolError('invalid cluster name'), packet) conn.answer(protocol.protocolError('invalid cluster name'), packet)
......
...@@ -113,9 +113,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -113,9 +113,7 @@ class BootstrapEventHandler(StorageEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.answer(protocol.notReady('retry later'), packet) raise protocol.NotReadyError
conn.abort()
return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.answer(protocol.protocolError('invalid cluster name'), packet) conn.answer(protocol.protocolError('invalid cluster name'), packet)
......
...@@ -147,18 +147,15 @@ class OperationEventHandler(StorageEventHandler): ...@@ -147,18 +147,15 @@ class OperationEventHandler(StorageEventHandler):
addr = (ip_address, port) addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node is None: if node is None:
if node_type == MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If I do not know such a node, and it is not even a master # If I do not know such a node, and it is not even a master
# node, simply reject it. # node, simply reject it.
logging.error('reject an unknown node %s', dump(uuid)) logging.error('reject an unknown node %s', dump(uuid))
conn.answer(protocol.notReady('unknown node'), packet) raise protocol.NotReadyError
conn.abort() node = app.nm.getNodeByServer(addr)
return if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else: else:
# If this node is broken, reject it. # If this node is broken, reject it.
if node.getUUID() == uuid: if node.getUUID() == uuid:
......
...@@ -122,6 +122,10 @@ server: 127.0.0.1:10020 ...@@ -122,6 +122,10 @@ server: 127.0.0.1:10020
""" Check if the BrokenNotDisallowedError exception wxas raised """ """ Check if the BrokenNotDisallowedError exception wxas raised """
self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs) self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs)
def checkNotReadyErrorRaised(self, method, *args, **kwargs):
""" Check if the NotReadyError exception wxas raised """
self.assertRaises(protocol.NotReadyError, method, *args, **kwargs)
# Method to test the kind of packet returned in answer # Method to test the kind of packet returned in answer
def checkCalledRequestNodeIdentification(self, conn, packet_number=0): def checkCalledRequestNodeIdentification(self, conn, packet_number=0):
""" Check Request Node Identification has been send""" """ Check Request Node Identification has been send"""
...@@ -305,7 +309,8 @@ server: 127.0.0.1:10020 ...@@ -305,7 +309,8 @@ server: 127.0.0.1:10020
packet = Packet(msg_type=REQUEST_NODE_IDENTIFICATION) packet = Packet(msg_type=REQUEST_NODE_IDENTIFICATION)
conn = Mock({"isServerConnection": True, conn = Mock({"isServerConnection": True,
"getAddress" : ("127.0.0.1", self.master_port), }) "getAddress" : ("127.0.0.1", self.master_port), })
self.bootstrap.handleRequestNodeIdentification( self.checkNotReadyErrorRaised(
self.bootstrap.handleRequestNodeIdentification,
conn=conn, conn=conn,
uuid=self.getNewUUID(), uuid=self.getNewUUID(),
packet=packet, packet=packet,
...@@ -313,7 +318,6 @@ server: 127.0.0.1:10020 ...@@ -313,7 +318,6 @@ server: 127.0.0.1:10020
node_type=STORAGE_NODE_TYPE, node_type=STORAGE_NODE_TYPE,
ip_address='127.0.0.1', ip_address='127.0.0.1',
name=self.app.name,) name=self.app.name,)
self.checkCalledAbort(conn)
self.assertEquals(len(conn.mockGetNamedCalls("setUUID")), 0) self.assertEquals(len(conn.mockGetNamedCalls("setUUID")), 0)
def test_08_handleRequestNodeIdentification3(self): def test_08_handleRequestNodeIdentification3(self):
......
...@@ -63,6 +63,10 @@ class StorageOperationTests(unittest.TestCase): ...@@ -63,6 +63,10 @@ class StorageOperationTests(unittest.TestCase):
""" Check if the BrokenNotDisallowedError exception wxas raised """ """ Check if the BrokenNotDisallowedError exception wxas raised """
self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs) self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs)
def checkNotReadyErrorRaised(self, method, *args, **kwargs):
""" Check if the NotReadyError exception wxas raised """
self.assertRaises(protocol.NotReadyError, method, *args, **kwargs)
def checkCalledAbort(self, conn, packet_number=0): def checkCalledAbort(self, conn, packet_number=0):
"""Check the abort method has been called and an error packet has been sent""" """Check the abort method has been called and an error packet has been sent"""
# sometimes we answer an error, sometimes we just send it # sometimes we answer an error, sometimes we just send it
...@@ -397,7 +401,8 @@ server: 127.0.0.1:10020 ...@@ -397,7 +401,8 @@ server: 127.0.0.1:10020
"getAddress" : ("127.0.0.1", self.master_port), "getAddress" : ("127.0.0.1", self.master_port),
}) })
count = len(self.app.nm.getNodeList()) count = len(self.app.nm.getNodeList())
self.operation.handleRequestNodeIdentification( self.checkNotReadyErrorRaised(
self.operation.handleRequestNodeIdentification,
conn=conn, conn=conn,
uuid=self.getNewUUID(), uuid=self.getNewUUID(),
packet=packet, packet=packet,
...@@ -405,7 +410,6 @@ server: 127.0.0.1:10020 ...@@ -405,7 +410,6 @@ server: 127.0.0.1:10020
node_type=STORAGE_NODE_TYPE, node_type=STORAGE_NODE_TYPE,
ip_address='192.168.1.1', ip_address='192.168.1.1',
name=self.app.name,) name=self.app.name,)
self.checkCalledAbort(conn)
self.assertEquals(len(self.app.nm.getNodeList()), count) self.assertEquals(len(self.app.nm.getNodeList()), count)
def test_09_handleRequestNodeIdentification5(self): def test_09_handleRequestNodeIdentification5(self):
......
...@@ -139,6 +139,10 @@ server: 127.0.0.1:10020 ...@@ -139,6 +139,10 @@ server: 127.0.0.1:10020
""" Check if the BrokenNotDisallowedError exception wxas raised """ """ Check if the BrokenNotDisallowedError exception wxas raised """
self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs) self.assertRaises(protocol.BrokenNotDisallowedError, method, *args, **kwargs)
def checkNotReadyErrorRaised(self, method, *args, **kwargs):
""" Check if the NotReadyError exception wxas raised """
self.assertRaises(protocol.NotReadyError, method, *args, **kwargs)
def checkCalledAbort(self, conn, packet_number=0): def checkCalledAbort(self, conn, packet_number=0):
"""Check the abort method has been called and an error packet has been sent""" """Check the abort method has been called and an error packet has been sent"""
# sometimes we answer an error, sometimes we just notify it # sometimes we answer an error, sometimes we just notify it
...@@ -230,9 +234,10 @@ server: 127.0.0.1:10020 ...@@ -230,9 +234,10 @@ server: 127.0.0.1:10020
"getAddress" : ("127.0.0.1", self.client_port), "getAddress" : ("127.0.0.1", self.client_port),
"isServerConnection" : True}) "isServerConnection" : True})
p = Packet(msg_type=REQUEST_NODE_IDENTIFICATION) p = Packet(msg_type=REQUEST_NODE_IDENTIFICATION)
self.verification.handleRequestNodeIdentification(conn, p, CLIENT_NODE_TYPE, self.checkNotReadyErrorRaised(
uuid, "127.0.0.1", self.client_port, "zatt") self.verification.handleRequestNodeIdentification,
self.checkCalledAbort(conn) conn, p, CLIENT_NODE_TYPE,
uuid, "127.0.0.1", self.client_port, "zatt")
# not a master node # not a master node
uuid = self.getNewUUID() uuid = self.getNewUUID()
...@@ -240,9 +245,10 @@ server: 127.0.0.1:10020 ...@@ -240,9 +245,10 @@ server: 127.0.0.1:10020
"getAddress" : ("127.0.0.1", self.client_port), "getAddress" : ("127.0.0.1", self.client_port),
"isServerConnection" : True}) "isServerConnection" : True})
p = Packet(msg_type=REQUEST_NODE_IDENTIFICATION) p = Packet(msg_type=REQUEST_NODE_IDENTIFICATION)
self.verification.handleRequestNodeIdentification(conn, p, CLIENT_NODE_TYPE, self.checkNotReadyErrorRaised(
uuid, "127.0.0.1", self.client_port, "zatt") self.verification.handleRequestNodeIdentification,
self.checkCalledAbort(conn) conn, p, CLIENT_NODE_TYPE,
uuid, "127.0.0.1", self.client_port, "zatt")
# bad name # bad name
uuid = self.getNewUUID() uuid = self.getNewUUID()
......
...@@ -69,9 +69,7 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -69,9 +69,7 @@ class VerificationEventHandler(StorageEventHandler):
app = self.app app = self.app
if node_type != MASTER_NODE_TYPE: if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master') logging.info('reject a connection from a non-master')
conn.answer(protocol.notReady('retry later'), packet) raise protocol.NotReadyError
conn.abort()
return
if name != app.name: if name != app.name:
logging.error('reject an alien cluster') logging.error('reject an alien cluster')
conn.answer(protocol.protocolError( conn.answer(protocol.protocolError(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment