Commit 823c9d7a authored by Grégory Wisniewski's avatar Grégory Wisniewski

Split answers and notifications in client handlers.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@576 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent bed32b02
......@@ -70,7 +70,7 @@ class ConnectionPool(object):
while True:
logging.info('trying to connect to %s', node)
app.setNodeReady()
handler = StorageBootstrapEventHandler(app, app.dispatcher)
handler = StorageBootstrapHandler(app, app.dispatcher)
conn = MTClientConnection(app.em, handler, addr,
connector_handler=app.connector_handler)
conn.lock()
......@@ -236,8 +236,9 @@ class Application(object):
self.ptid = INVALID_PTID
self.num_replicas = 0
self.num_partitions = 0
self.primary_handler = PrimaryEventHandler(self, self.dispatcher)
self.storage_handler = StorageEventHandler(self, self.dispatcher)
self.storage_handler = StorageAnswersHandler(self, self.dispatcher)
self.primary_handler = PrimaryAnswersHandler(self, self.dispatcher)
self.notifications_handler = PrimaryNotificationsHandler(self)
# Internal attribute distinct between thread
self.local_var = ThreadContext()
# Lock definition :
......@@ -276,6 +277,9 @@ class Application(object):
"""Wait for a message returned by the dispatcher in queues."""
local_queue = self.local_var.queue
if handler is None:
handler = self.notifications_handler
while 1:
try:
if msg_id is None:
......@@ -862,7 +866,7 @@ class Application(object):
close = __del__
def sync(self):
self._waitMessage(handler=self.storage_handler)
self._waitMessage()
def connectToPrimaryMasterNode(self):
self.master_conn = None
......@@ -880,7 +884,7 @@ class Application(object):
master_index = 0
conn = None
# Make application execute remaining message if any
self._waitMessage(handler=self.storage_handler)
self._waitMessage()
while True:
self.setNodeReady()
if self.primary_master_node is None:
......@@ -894,7 +898,7 @@ class Application(object):
else:
addr, port = self.primary_master_node.getServer()
# Request Node Identification
handler = PrimaryBoostrapEventHandler(self, self.dispatcher)
handler = PrimaryBootstrapHandler(self, self.dispatcher)
conn = MTClientConnection(self.em, handler, (addr, port),
connector_handler=self.connector_handler)
self._nm_acquire()
......@@ -939,7 +943,7 @@ class Application(object):
sleep(1)
logging.info("connected to primary master node %s" % self.primary_master_node)
conn.setHandler(PrimaryEventHandler(self, self.dispatcher))
conn.setHandler(PrimaryAnswersHandler(self, self.dispatcher))
self.master_conn = conn
finally:
......
......@@ -34,13 +34,14 @@ from neo.handler import identification_required, restrict_node_types
from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64
class BaseClientEventHandler(EventHandler):
class BaseHandler(EventHandler):
"""Base class for client-side EventHandler implementations."""
def __init__(self, app, dispatcher):
self.app = app
self.dispatcher = dispatcher
super(BaseClientEventHandler, self).__init__()
super(BaseHandler, self).__init__()
def dispatch(self, conn, packet):
# Before calling superclass's dispatch method, lock the connection.
......@@ -48,7 +49,7 @@ class BaseClientEventHandler(EventHandler):
# packet.
conn.lock()
try:
super(BaseClientEventHandler, self).dispatch(conn, packet)
super(BaseHandler, self).dispatch(conn, packet)
finally:
conn.release()
......@@ -60,56 +61,29 @@ class BaseClientEventHandler(EventHandler):
else:
queue.put((conn, packet))
def _dealWithStorageFailure(self, conn, node, state):
app = self.app
# Remove from pool connection
app.cp.removeConnection(node)
# Put fake packets to task queues.
queue_set = set()
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
for queue in queue_set:
queue.put((conn, None))
# Notify the primary master node of the failure.
conn = app.master_conn
if conn is not None:
conn.lock()
try:
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port,
node.getUUID(), state)]
conn.notify(protocol.notifyNodeInformation(node_list))
finally:
conn.unlock()
class PrimaryBoostrapEventHandler(BaseClientEventHandler):
class PrimaryBootstrapHandler(BaseHandler):
# Bootstrap handler used when looking for the primary master
def connectionFailed(self, conn):
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBoostrapEventHandler, self).connectionFailed(conn)
super(PrimaryBootstrapHandler, self).connectionFailed(conn)
def connectionClosed(self, conn):
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBoostrapEventHandler, self).connectionClosed(conn)
super(PrimaryBootstrapHandler, self).connectionClosed(conn)
def peerBroken(self, conn):
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBoostrapEventHandler, self).peerBroken(conn)
super(PrimaryBootstrapHandler, self).peerBroken(conn)
def timeoutExpired(self, conn):
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBoostrapEventHandler, self).timeoutExpired(conn)
super(PrimaryBootstrapHandler, self).timeoutExpired(conn)
def handleNotReady(self, conn, packet, message):
self.app.setNodeNotReady()
......@@ -238,7 +212,7 @@ class PrimaryBoostrapEventHandler(BaseClientEventHandler):
@identification_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
# This handler is in PrimaryBoostrapEventHandler, since this
# This handler is in PrimaryBootstrapHandler, since this
# basicaly is an answer to askPrimaryMaster.
# Extract from P-NEO-Protocol.Description:
# Connection to primary master node (PMN in service state)
......@@ -272,8 +246,14 @@ class PrimaryBoostrapEventHandler(BaseClientEventHandler):
pt.setCell(offset, node, state)
class PrimaryEventHandler(BaseClientEventHandler):
# Handler used to communicate with the primary master
class PrimaryNotificationsHandler(EventHandler):
""" Handler that process the notifications from the primary master """
# For notifications we do not need a dispatcher
def __init__(self, app):
self.app = app
EventHandler.__init__(self)
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
......@@ -283,17 +263,38 @@ class PrimaryEventHandler(BaseClientEventHandler):
app.master_conn = None
app.primary_master_node = None
app.connectToPrimaryMasterNode()
super(PrimaryEventHandler, self).peerBroken(conn)
EventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
self.app.connectToPrimaryMasterNode()
super(PrimaryEventHandler, self).timeoutExpired(conn)
EventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
self.app.connectToPrimaryMasterNode()
super(PrimaryEventHandler, self).peerBroken(conn)
EventHandler.peerBroken(self, conn)
def handleStopOperation(self, conn, packet):
logging.critical("master node ask to stop operation")
def handleInvalidateObjects(self, conn, packet, oid_list, tid):
app = self.app
app._cache_lock_acquire()
try:
# ZODB required a dict with oid as key, so create it
oids = {}
for oid in oid_list:
oids[oid] = tid
try:
del app.mq_cache[oid]
except KeyError:
pass
db = app.getDB()
if db is not None:
db.invalidate(tid, oids)
finally:
app._cache_lock_release()
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
......@@ -327,6 +328,7 @@ class PrimaryEventHandler(BaseClientEventHandler):
n.setState(state)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
app = self.app
nm = app.nm
......@@ -343,9 +345,32 @@ class PrimaryEventHandler(BaseClientEventHandler):
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
# FIXME: Why FEEDING_STATE cells are kept in the PT ?
pt.setCell(offset, node, state)
class PrimaryAnswersHandler(BaseHandler):
""" Handle that process expected packets from the primary master """
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
# Close connection
app = self.app
app.master_conn.close()
app.master_conn = None
app.primary_master_node = None
app.connectToPrimaryMasterNode()
super(PrimaryAnswersHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
self.app.connectToPrimaryMasterNode()
super(PrimaryAnswersHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
self.app.connectToPrimaryMasterNode()
super(PrimaryAnswersHandler, self).peerBroken(conn)
def handleAnswerNewTID(self, conn, packet, tid):
app = self.app
app.setTID(tid)
......@@ -361,38 +386,68 @@ class PrimaryEventHandler(BaseClientEventHandler):
app.setTransactionFinished()
class StorageBootstrapEventHandler(BaseClientEventHandler):
# Handler used when connecting to a storage node
class StorageBaseHandler(BaseHandler):
def connectionFailed(self, conn):
# Connection to a storage node failed
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageBootstrapEventHandler, self).connectionFailed(conn)
def _dealWithStorageFailure(self, conn, node, state):
app = self.app
# Remove from pool connection
app.cp.removeConnection(node)
# Put fake packets to task queues.
queue_set = set()
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
for queue in queue_set:
queue.put((conn, None))
# Notify the primary master node of the failure.
conn = app.master_conn
if conn is not None:
conn.lock()
try:
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port,
node.getUUID(), state)]
conn.notify(protocol.notifyNodeInformation(node_list))
finally:
conn.unlock()
def connectionClosed(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
logging.info("connection to storage node %s closed", node.getServer())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageBootstrapEventHandler, self).connectionClosed(conn)
super(StorageBaseHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageBootstrapEventHandler, self).timeoutExpired(conn)
super(StorageBaseHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, BROKEN_STATE)
super(StorageBootstrapEventHandler, self).peerBroken(conn)
super(StorageBaseHandler, self).peerBroken(conn)
class StorageBootstrapHandler(StorageBaseHandler):
""" Handler used when connecting to a storage node """
def connectionFailed(self, conn):
# Connection to a storage node failed
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageBootstrapHandler, self).connectionFailed(conn)
def handleNotReady(self, conn, packet, message):
app = self.app
app.setNodeNotReady()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
uuid, ip_address, port, num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
......@@ -412,48 +467,11 @@ class StorageBootstrapEventHandler(BaseClientEventHandler):
node.setUUID(uuid)
class StorageEventHandler(BaseClientEventHandler):
# Handle all messages related to ZODB operations
def connectionClosed(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
logging.info("connection to storage node %s closed", node.getServer())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageEventHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageEventHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, BROKEN_STATE)
super(StorageEventHandler, self).peerBroken(conn)
def handleInvalidateObjects(self, conn, packet, oid_list, tid):
app = self.app
app._cache_lock_acquire()
try:
# ZODB required a dict with oid as key, so create it
oids = {}
for oid in oid_list:
oids[oid] = tid
try:
del app.mq_cache[oid]
except KeyError:
pass
db = app.getDB()
if db is not None:
db.invalidate(tid, oids)
finally:
app._cache_lock_release()
def handleStopOperation(self, conn, packet):
logging.critical("master node ask to stop operation")
class StorageAnswersHandler(StorageBaseHandler):
""" Handle all messages related to ZODB operations """
def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial, compression,
checksum, data):
def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial,
compression, checksum, data):
app = self.app
app.local_var.asked_object = (oid, start_serial, end_serial, compression,
checksum, data)
......@@ -503,3 +521,4 @@ class StorageEventHandler(BaseClientEventHandler):
app = self.app
app.local_var.node_tids[conn.getUUID()] = tid_list
......@@ -45,18 +45,19 @@ from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIF
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, DISCARDED_STATE
from neo.exception import ElectionFailure
from neo.client.handler import BaseClientEventHandler, PrimaryBoostrapEventHandler, \
PrimaryEventHandler, StorageBootstrapEventHandler, StorageEventHandler
from neo.client.handler import BaseHandler, PrimaryBootstrapHandler, \
PrimaryNotificationsHandler, PrimaryAnswersHandler, \
StorageBootstrapHandler, StorageAnswersHandler
from neo.node import StorageNode
from neo.util import dump
MARKER = []
class BaseClientEventHandlerTest(NeoTestBase):
class BaseClientHandlerTest(NeoTestBase):
def setUp(self):
dispatcher = Mock({'getQueue': queue, 'connectToPrimaryMasterNode': None})
self.handler = BaseClientEventHandler(dispatcher)
self.handler = BaseClientHandler(dispatcher)
def getConnection(self, uuid=None, port=10010, next_id=None, ip='127.0.0.1'):
if uuid is None:
......@@ -69,7 +70,7 @@ class BaseClientEventHandlerTest(NeoTestBase):
'unlock': None})
class ClientEventHandlerTest(NeoTestBase):
class ClientHandlerTest(NeoTestBase):
def setUp(self):
# Silence all log messages
......@@ -88,13 +89,19 @@ class ClientEventHandlerTest(NeoTestBase):
def getDispatcher(self, queue=None):
return Mock({'getQueue': queue, 'connectToPrimaryMasterNode': None})
def instanciateHandler(self, handler_class, app, dispatcher):
if handler_class is PrimaryNotificationsHandler:
return handler_class(app)
else:
return handler_class(app, dispatcher)
def test_ping(self):
"""
Simplest test: check that a PING packet is answered by a PONG
packet.
"""
dispatcher = self.getDispatcher()
client_handler = BaseClientEventHandler(None, dispatcher)
client_handler = BaseHandler(None, dispatcher)
conn = self.getConnection()
client_handler.packetReceived(conn, protocol.ping())
self.checkAnswerPacket(conn, protocol.PONG)
......@@ -103,7 +110,7 @@ class ClientEventHandlerTest(NeoTestBase):
class App:
primary_master_node = None
app = App()
method(self.getDispatcher(), app, PrimaryBoostrapEventHandler)
method(self.getDispatcher(), app, PrimaryBootstrapHandler)
self.assertEqual(app.primary_master_node, -1)
def _testMasterWithMethod(self, method, handler_class):
......@@ -167,7 +174,7 @@ class ClientEventHandlerTest(NeoTestBase):
self.assertEqual(len(queue_2.mockGetNamedCalls('put')), 0)
def _testConnectionFailed(self, dispatcher, app, handler_class, uuid=None, conn=None):
client_handler = handler_class(app, dispatcher)
client_handler = self.instanciateHandler(handler_class, app, dispatcher)
if conn is None:
conn = self.getConnection(uuid=uuid)
client_handler.connectionFailed(conn)
......@@ -177,10 +184,10 @@ class ClientEventHandlerTest(NeoTestBase):
def test_storageConnectionFailed(self):
self._testStorageWithMethod(self._testConnectionFailed,
StorageBootstrapEventHandler)
StorageBootstrapHandler)
def _testConnectionClosed(self, dispatcher, app, handler_class, uuid=None, conn=None):
client_handler = handler_class(app, dispatcher)
client_handler = self.instanciateHandler(handler_class, app, dispatcher)
if conn is None:
conn = self.getConnection(uuid=uuid)
client_handler.connectionClosed(conn)
......@@ -190,16 +197,16 @@ class ClientEventHandlerTest(NeoTestBase):
def test_masterConnectionClosed(self):
self._testMasterWithMethod(self._testConnectionClosed,
PrimaryEventHandler)
PrimaryNotificationsHandler)
def test_storageConnectionClosed(self):
self._testStorageWithMethod(self._testConnectionClosed,
StorageBootstrapEventHandler)
StorageBootstrapHandler)
self._testStorageWithMethod(self._testConnectionClosed,
StorageEventHandler)
StorageAnswersHandler)
def _testTimeoutExpired(self, dispatcher, app, handler_class, uuid=None, conn=None):
client_handler = handler_class(app, dispatcher)
client_handler = self.instanciateHandler(handler_class, app, dispatcher)
if conn is None:
conn = self.getConnection(uuid=uuid)
client_handler.timeoutExpired(conn)
......@@ -208,16 +215,16 @@ class ClientEventHandlerTest(NeoTestBase):
self._testInitialMasterWithMethod(self._testTimeoutExpired)
def test_masterTimeoutExpired(self):
self._testMasterWithMethod(self._testTimeoutExpired, PrimaryEventHandler)
self._testMasterWithMethod(self._testTimeoutExpired, PrimaryNotificationsHandler)
def test_storageTimeoutExpired(self):
self._testStorageWithMethod(self._testTimeoutExpired,
StorageEventHandler)
StorageAnswersHandler)
self._testStorageWithMethod(self._testTimeoutExpired,
StorageBootstrapEventHandler)
StorageBootstrapHandler)
def _testPeerBroken(self, dispatcher, app, handler_class, uuid=None, conn=None):
client_handler = handler_class(app, dispatcher)
client_handler = self.instanciateHandler(handler_class, app, dispatcher)
if conn is None:
conn = self.getConnection(uuid=uuid)
client_handler.peerBroken(conn)
......@@ -226,22 +233,22 @@ class ClientEventHandlerTest(NeoTestBase):
self._testInitialMasterWithMethod(self._testPeerBroken)
def test_masterPeerBroken(self):
self._testMasterWithMethod(self._testPeerBroken, PrimaryEventHandler)
self._testMasterWithMethod(self._testPeerBroken, PrimaryNotificationsHandler)
def test_storagePeerBroken(self):
self._testStorageWithMethod(self._testPeerBroken,
StorageBootstrapEventHandler, state=BROKEN_STATE)
StorageBootstrapHandler, state=BROKEN_STATE)
self._testStorageWithMethod(self._testPeerBroken,
StorageEventHandler, state=BROKEN_STATE)
StorageAnswersHandler, state=BROKEN_STATE)
def test_notReady(self):
app = Mock({'setNodeNotReady': None})
dispatcher = self.getDispatcher()
client_handler = PrimaryBoostrapEventHandler(app, dispatcher)
client_handler = PrimaryBootstrapHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleNotReady(conn, None, None)
self.assertEquals(len(app.mockGetNamedCalls('setNodeNotReady')), 1)
client_handler = StorageBootstrapEventHandler(app, dispatcher)
client_handler = StorageBootstrapHandler(app, dispatcher)
client_handler.handleNotReady(conn, None, None)
self.assertEquals(len(app.mockGetNamedCalls('setNodeNotReady')), 2)
......@@ -252,7 +259,7 @@ class ClientEventHandlerTest(NeoTestBase):
pt = None
app = App()
dispatcher = self.getDispatcher()
client_handler = PrimaryBoostrapEventHandler(app, dispatcher)
client_handler = PrimaryBootstrapHandler(app, dispatcher)
conn = self.getConnection()
uuid = self.getNewUUID()
app.uuid = 'C' * 16
......@@ -276,7 +283,7 @@ class ClientEventHandlerTest(NeoTestBase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = PrimaryBoostrapEventHandler(app, dispatcher)
client_handler = PrimaryBootstrapHandler(app, dispatcher)
conn = self.getConnection()
uuid = self.getNewUUID()
your_uuid = 'C' * 16
......@@ -298,7 +305,7 @@ class ClientEventHandlerTest(NeoTestBase):
pt = None
app = App()
dispatcher = self.getDispatcher()
client_handler = StorageBootstrapEventHandler(app, dispatcher)
client_handler = StorageBootstrapHandler(app, dispatcher)
conn = self.getConnection()
uuid = self.getNewUUID()
app.uuid = 'C' * 16
......@@ -316,7 +323,7 @@ class ClientEventHandlerTest(NeoTestBase):
# Master node handler
def test_initialAnswerPrimaryMaster(self):
client_handler = PrimaryBoostrapEventHandler(None, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(None, self.getDispatcher())
conn = Mock({'getUUID': None})
self._testHandleUnexpectedPacketCalledWithMedhod(
client_handler, client_handler.handleAnswerPrimaryMaster,
......@@ -328,7 +335,7 @@ class ClientEventHandlerTest(NeoTestBase):
class App:
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': None, 'add': None})
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
client_handler.handleAnswerPrimaryMaster(conn, None, 0, [])
# Check that nothing happened
......@@ -341,7 +348,7 @@ class ClientEventHandlerTest(NeoTestBase):
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': None, 'add': None})
primary_master_node = None
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
test_master_list = [('127.0.0.1', 10010, self.getNewUUID())]
client_handler.handleAnswerPrimaryMaster(conn, None, INVALID_UUID, test_master_list)
......@@ -370,7 +377,7 @@ class ClientEventHandlerTest(NeoTestBase):
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': node, 'add': None})
primary_master_node = None
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
test_node_uuid = self.getNewUUID()
test_master_list = [('127.0.0.1', 10010, test_node_uuid)]
......@@ -400,7 +407,7 @@ class ClientEventHandlerTest(NeoTestBase):
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': node, 'add': None})
primary_master_node = None
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
test_master_list = [('127.0.0.1', 10010, test_node_uuid)]
client_handler.handleAnswerPrimaryMaster(conn, None, INVALID_UUID, test_master_list)
......@@ -437,7 +444,7 @@ class ClientEventHandlerTest(NeoTestBase):
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': node, 'add': None})
primary_master_node = test_primary_master_node
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
# If primary master is already set *and* is not given primary master
# handle call raises.
......@@ -458,7 +465,7 @@ class ClientEventHandlerTest(NeoTestBase):
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': node, 'add': None})
primary_master_node = node
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
client_handler.handleAnswerPrimaryMaster(conn, None, test_node_uuid, [])
# Check that primary node is (still) node.
......@@ -474,7 +481,7 @@ class ClientEventHandlerTest(NeoTestBase):
nm = Mock({'getNodeByUUID': ReturnValues(node, None), 'getNodeByServer': node, 'add': None})
primary_master_node = None
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
client_handler.handleAnswerPrimaryMaster(conn, None, test_primary_node_uuid, [])
# Test sanity checks
......@@ -492,7 +499,7 @@ class ClientEventHandlerTest(NeoTestBase):
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': node, 'add': None})
primary_master_node = None
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
test_master_list = [('127.0.0.1', 10010, test_node_uuid)]
client_handler.handleAnswerPrimaryMaster(conn, None, test_node_uuid, test_master_list)
......@@ -508,7 +515,7 @@ class ClientEventHandlerTest(NeoTestBase):
self.assertTrue(app.primary_master_node is node)
def test_initialSendPartitionTable(self):
client_handler = PrimaryBoostrapEventHandler(None, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(None, self.getDispatcher())
conn = Mock({'getUUID': None})
self._testHandleUnexpectedPacketCalledWithMedhod(
client_handler, client_handler.handleSendPartitionTable,
......@@ -521,7 +528,7 @@ class ClientEventHandlerTest(NeoTestBase):
nm = Mock({'getNodeByUUID': node})
pt = None
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
client_handler.handleSendPartitionTable(conn, None, 0, [])
# Check that nothing happened
......@@ -535,7 +542,7 @@ class ClientEventHandlerTest(NeoTestBase):
pt = Mock({'clear': None})
ptid = test_ptid
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
client_handler.handleSendPartitionTable(conn, None, test_ptid + 1, [])
# Check that partition table got cleared and ptid got updated
......@@ -551,7 +558,7 @@ class ClientEventHandlerTest(NeoTestBase):
ptid = test_ptid
test_storage_uuid = self.getNewUUID()
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
# TODO: use realistic values
test_row_list = [(0, [(test_storage_uuid, 0)])]
......@@ -576,7 +583,7 @@ class ClientEventHandlerTest(NeoTestBase):
ptid = test_ptid
test_storage_uuid = self.getNewUUID()
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection()
# TODO: use realistic values
test_row_list = [(0, [(test_storage_uuid, 0)])]
......@@ -590,7 +597,7 @@ class ClientEventHandlerTest(NeoTestBase):
test_row_list[0][1][0][1])
def test_initialNotifyNodeInformation(self):
client_handler = PrimaryBoostrapEventHandler(None, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(None, self.getDispatcher())
conn = Mock({'getUUID': None})
self._testHandleUnexpectedPacketCalledWithMedhod(
client_handler, client_handler.handleNotifyNodeInformation,
......@@ -603,7 +610,7 @@ class ClientEventHandlerTest(NeoTestBase):
class App:
nm = Mock({'getNodeByUUID': node})
app = App()
client_handler = PrimaryEventHandler(app, self.getDispatcher())
client_handler = PrimaryNotificationsHandler(app)
conn = self.getConnection(uuid=test_master_uuid)
client_handler.handleNotifyNodeInformation(conn, None, ())
......@@ -617,7 +624,7 @@ class ClientEventHandlerTest(NeoTestBase):
class App:
nm = Mock({'getNodeByUUID': node})
app = App()
client_handler = PrimaryEventHandler(app, self.getDispatcher())
client_handler = PrimaryNotificationsHandler(app)
conn = self.getConnection(uuid=test_master_uuid)
self.assertRaises(TypeError, client_handler.handleNotifyNodeInformation,
conn, None, None)
......@@ -636,8 +643,8 @@ class ClientEventHandlerTest(NeoTestBase):
'add': None,
'remove': None})
app = App()
#client_handler = ClientEventHandler(app, selClientEventHandlerf.getDispatcher())
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
#client_handler = ClientHandler(app, selClientHandlerf.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = self.getConnection(uuid=test_master_uuid)
client_handler.handleNotifyNodeInformation(conn, None, test_node_list)
# Return nm so caller can check handler actions.
......@@ -705,7 +712,7 @@ class ClientEventHandlerTest(NeoTestBase):
pt = None
ptid = INVALID_PTID
app = App()
client_handler = PrimaryBoostrapEventHandler(app, self.getDispatcher())
client_handler = PrimaryBootstrapHandler(app, self.getDispatcher())
conn = Mock({'getUUID': None})
self._testHandleUnexpectedPacketCalledWithMedhod(
client_handler, client_handler.handleNotifyPartitionChanges,
......@@ -721,7 +728,7 @@ class ClientEventHandlerTest(NeoTestBase):
ptid = INVALID_PTID
primary_master_node = node
app = App()
client_handler = PrimaryEventHandler(app, self.getDispatcher())
client_handler = PrimaryNotificationsHandler(app)
conn = self.getConnection(uuid=test_master_uuid)
client_handler.handleNotifyPartitionChanges(conn, None, 0, [])
# Check that nothing happened
......@@ -735,7 +742,7 @@ class ClientEventHandlerTest(NeoTestBase):
ptid = INVALID_PTID
primary_master_node = None
app = App()
client_handler = PrimaryEventHandler(app, self.getDispatcher())
client_handler = PrimaryNotificationsHandler(app)
conn = self.getConnection()
client_handler.handleNotifyPartitionChanges(conn, None, 0, [])
# Check that nothing happened
......@@ -754,7 +761,7 @@ class ClientEventHandlerTest(NeoTestBase):
ptid = INVALID_PTID
primary_master_node = test_master_node
app = App()
client_handler = PrimaryEventHandler(app, self.getDispatcher())
client_handler = PrimaryNotificationsHandler(app)
conn = self.getConnection(uuid=test_sender_uuid)
client_handler.handleNotifyPartitionChanges(conn, None, 0, [])
# Check that nothing happened
......@@ -770,7 +777,7 @@ class ClientEventHandlerTest(NeoTestBase):
primary_master_node = node
ptid = test_ptid
app = App()
client_handler = PrimaryEventHandler(app, self.getDispatcher())
client_handler = PrimaryNotificationsHandler(app)
conn = self.getConnection(uuid=test_master_uuid)
client_handler.handleNotifyPartitionChanges(conn, None, test_ptid, [])
# Check that nothing happened
......@@ -788,7 +795,7 @@ class ClientEventHandlerTest(NeoTestBase):
ptid = test_ptid
uuid = None # XXX: Is it really needed ?
app = App()
client_handler = PrimaryEventHandler(app, self.getDispatcher())
client_handler = PrimaryNotificationsHandler(app)
conn = self.getConnection(uuid=test_master_uuid)
test_storage_uuid = self.getNewUUID()
# TODO: use realistic values
......@@ -818,7 +825,7 @@ class ClientEventHandlerTest(NeoTestBase):
ptid = test_ptid
uuid = uuid4
app = App()
client_handler = PrimaryEventHandler(app, self.getDispatcher())
client_handler = PrimaryNotificationsHandler(app)
conn = self.getConnection(uuid=uuid1)
test_cell_list = [
(0, uuid1, UP_TO_DATE_STATE),
......@@ -835,22 +842,19 @@ class ClientEventHandlerTest(NeoTestBase):
self.assertEquals(calls[2].getParam(0).getUUID(), uuid4)
self.assertEquals(calls[0].getParam(0).getState(), TEMPORARILY_DOWN_STATE)
self.assertEquals(calls[1].getParam(0).getState(), TEMPORARILY_DOWN_STATE)
# check the discarded cell is removed from the pt
calls = app.pt.mockGetNamedCalls('removeCell')
self.assertEquals(len(calls), 1)
self.assertEquals(calls[0].getParam(1).getUUID(), uuid2)
# and the others are updated
self.assertEqual(app.ptid, test_ptid + 1)
calls = app.pt.mockGetNamedCalls('setCell')
self.assertEqual(len(calls), 3)
self.assertEqual(len(calls), 4)
self.assertEquals(calls[0].getParam(1).getUUID(), uuid1)
self.assertEquals(calls[1].getParam(1).getUUID(), uuid3)
self.assertEquals(calls[2].getParam(1).getUUID(), uuid4)
self.assertEquals(calls[1].getParam(1).getUUID(), uuid2)
self.assertEquals(calls[2].getParam(1).getUUID(), uuid3)
self.assertEquals(calls[3].getParam(1).getUUID(), uuid4)
def test_AnswerNewTID(self):
app = Mock({'setTID': None})
dispatcher = self.getDispatcher()
client_handler = PrimaryEventHandler(app, dispatcher)
client_handler = PrimaryAnswersHandler(app, dispatcher)
conn = self.getConnection()
test_tid = 1
client_handler.handleAnswerNewTID(conn, None, test_tid)
......@@ -862,7 +866,7 @@ class ClientEventHandlerTest(NeoTestBase):
test_tid = 1
app = Mock({'getTID': test_tid, 'setTransactionFinished': None})
dispatcher = self.getDispatcher()
client_handler = PrimaryEventHandler(app, dispatcher)
client_handler = PrimaryAnswersHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleNotifyTransactionFinished(conn, None, test_tid)
self.assertEquals(len(app.mockGetNamedCalls('setTransactionFinished')), 1)
......@@ -885,7 +889,7 @@ class ClientEventHandlerTest(NeoTestBase):
mq_cache = Mock({'__delitem__': None})
app = App()
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = PrimaryNotificationsHandler(app )
conn = self.getConnection()
test_tid = 1
test_oid_list = ['\x00\x00\x00\x00\x00\x00\x00\x01', '\x00\x00\x00\x00\x00\x00\x00\x02']
......@@ -915,7 +919,7 @@ class ClientEventHandlerTest(NeoTestBase):
new_oid_list = []
app = App()
dispatcher = self.getDispatcher()
client_handler = PrimaryEventHandler(app, dispatcher)
client_handler = PrimaryAnswersHandler(app, dispatcher)
conn = self.getConnection()
test_oid_list = ['\x00\x00\x00\x00\x00\x00\x00\x01', '\x00\x00\x00\x00\x00\x00\x00\x02']
client_handler.handleAnswerNewOIDs(conn, None, test_oid_list[:])
......@@ -933,7 +937,7 @@ class ClientEventHandlerTest(NeoTestBase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = StorageAnswersHandler(app, dispatcher)
conn = self.getConnection()
# TODO: use realistic values
test_object_data = ('\x00\x00\x00\x00\x00\x00\x00\x01', 0, 0, 0, 0, 'test')
......@@ -942,7 +946,7 @@ class ClientEventHandlerTest(NeoTestBase):
def _testAnswerStoreObject(self, app, conflicting, oid, serial):
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = StorageAnswersHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleAnswerStoreObject(conn, None, conflicting, oid, serial)
......@@ -970,7 +974,7 @@ class ClientEventHandlerTest(NeoTestBase):
test_tid = 10
app = Mock({'getTID': test_tid, 'setTransactionVoted': None})
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = StorageAnswersHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleAnswerStoreTransaction(conn, None, test_tid)
self.assertEquals(len(app.mockGetNamedCalls('setTransactionVoted')), 1)
......@@ -983,7 +987,7 @@ class ClientEventHandlerTest(NeoTestBase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = StorageAnswersHandler(app, dispatcher)
conn = self.getConnection()
tid = '\x00\x00\x00\x00\x00\x00\x00\x01' # TODO: use a more realistic tid
user = 'bar'
......@@ -1005,7 +1009,7 @@ class ClientEventHandlerTest(NeoTestBase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = StorageAnswersHandler(app, dispatcher)
conn = self.getConnection()
test_oid = '\x00\x00\x00\x00\x00\x00\x00\x01'
# TODO: use realistic values
......@@ -1024,7 +1028,7 @@ class ClientEventHandlerTest(NeoTestBase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = StorageAnswersHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleOidNotFound(conn, None, None)
self.assertEquals(app.local_var.asked_object, -1)
......@@ -1037,7 +1041,7 @@ class ClientEventHandlerTest(NeoTestBase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = StorageAnswersHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleTidNotFound(conn, None, None)
self.assertEquals(app.local_var.txn_info, -1)
......@@ -1049,7 +1053,7 @@ class ClientEventHandlerTest(NeoTestBase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = StorageEventHandler(app, dispatcher)
client_handler = StorageAnswersHandler(app, dispatcher)
conn = self.getConnection()
test_tid_list = ['\x00\x00\x00\x00\x00\x00\x00\x01', '\x00\x00\x00\x00\x00\x00\x00\x02']
client_handler.handleAnswerTIDs(conn, None, test_tid_list[:])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment