Commit 7ffc96fd authored by Julien Muchembled

master: make sure that storage nodes have an up-to-date PT/NM when they're added

This reverts commit bddc1802,
to fix the following storage crash:

  Traceback (most recent call last):
    ...
    File "neo/lib/handler.py", line 72, in dispatch
      method(conn, *args, **kw)
    File "neo/storage/handlers/master.py", line 44, in notifyPartitionChanges
      app.pt.update(ptid, cell_list, app.nm)
    File "neo/lib/pt.py", line 231, in update
      assert node is not None, 'No node found for uuid ' + uuid_str(uuid)
  AssertionError: No node found for uuid S3

Partition table updates must also be processed with InitializationHandler
when nodes remain in PENDING state because they're not added to the cluster.
parent 9e433594
......@@ -256,7 +256,10 @@ class Application(BaseApplication):
# send at most one non-empty notification packet per node
for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType())
if node_list and node.isRunning() and node is not exclude:
# We don't skip pending storage nodes because we don't send them
# the full list of nodes when they're added, and it's also quite
# useful to notify them about new masters.
if node_list and node is not exclude:
node.send(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list):
......@@ -266,7 +269,9 @@ class Application(BaseApplication):
self.pt.logUpdated()
packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList():
if node.isRunning() and not node.isMaster():
# As for broadcastNodesInformation, we don't send the full PT
# when pending storage nodes are added, so keep them notified.
if not node.isMaster():
node.send(packet)
def provideService(self):
......
......@@ -18,7 +18,8 @@ import weakref
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.exception import PrimaryFailure, StoppedOperation
from neo.lib.protocol import uuid_str, NodeStates, NodeTypes, Packets
from neo.lib.protocol import (uuid_str,
NodeStates, NodeTypes, Packets, ProtocolError)
class BaseHandler(EventHandler):
......@@ -66,6 +67,15 @@ class BaseMasterHandler(BaseHandler):
uuid_str(uuid))
self.app.tm.abortFor(uuid)
def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app
if ptid != 1 + app.pt.getID():
raise ProtocolError('wrong partition table id')
app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
def askFinalTID(self, conn, ttid):
conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid)))
......
......@@ -23,7 +23,7 @@ class InitializationHandler(BaseMasterHandler):
def sendPartitionTable(self, conn, ptid, row_list):
app = self.app
pt = app.pt
pt.load(ptid, row_list, self.app.nm)
pt.load(ptid, row_list, app.nm)
if not pt.filled():
raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence.
......
......@@ -32,20 +32,9 @@ class MasterOperationHandler(BaseMasterHandler):
dm.commit()
def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app
if ptid <= app.pt.getID():
# Ignore this packet.
logging.debug('ignoring older partition changes')
return
# update partition table in memory and the database
app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
# Check changes for replications
app.replicator.notifyPartitionChanges(cell_list)
super(MasterOperationHandler, self).notifyPartitionChanges(
conn, ptid, cell_list)
self.app.replicator.notifyPartitionChanges(cell_list)
def askLockInformation(self, conn, ttid, tid):
self.app.tm.lock(ttid, tid)
......
......@@ -50,9 +50,6 @@ class MasterAppTests(NeoUnitTestBase):
master.setConnection(master_conn)
storage.setConnection(storage_conn)
client.setConnection(client_conn)
master.setRunning()
client.setRunning()
storage.setRunning()
self.app.nm.add(storage)
self.app.nm.add(client)
......@@ -87,15 +84,6 @@ class MasterAppTests(NeoUnitTestBase):
self.checkNoPacketSent(master_conn)
self.checkNotifyNodeInformation(storage_conn)
# node not running, don't send informations
client.setPending()
self.app.broadcastNodesInformation([s_node])
# check conn
self.assertFalse(client_conn.mockGetNamedCalls('send'))
self.checkNoPacketSent(master_conn)
self.checkNotifyNodeInformation(storage_conn)
if __name__ == '__main__':
unittest.main()
......@@ -20,7 +20,7 @@ from .. import NeoUnitTestBase
from neo.storage.app import Application
from neo.storage.handlers.master import MasterOperationHandler
from neo.lib.pt import PartitionTable
from neo.lib.protocol import CellStates
from neo.lib.protocol import CellStates, ProtocolError
class StorageMasterHandlerTests(NeoUnitTestBase):
......@@ -55,7 +55,8 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
app.replicator = Mock({})
self.app.pt = Mock({'getID': 1})
count = len(self.app.nm.getList())
self.operation.notifyPartitionChanges(conn, 0, ())
self.assertRaises(ProtocolError, self.operation.notifyPartitionChanges,
conn, 0, ())
self.assertEqual(self.app.pt.getID(), 1)
self.assertEqual(len(self.app.nm.getList()), count)
calls = self.app.replicator.mockGetNamedCalls('removePartition')
......@@ -78,18 +79,18 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
app.nm.createStorage(uuid=uuid1)
app.nm.createStorage(uuid=uuid2)
app.nm.createStorage(uuid=uuid3)
ptid1, ptid2 = (1, 2)
self.assertNotEqual(ptid1, ptid2)
app.pt = PartitionTable(3, 1)
app.pt._id = 1
ptid = 2
app.dm = Mock({ })
app.replicator = Mock({})
self.operation.notifyPartitionChanges(conn, ptid2, cells)
self.operation.notifyPartitionChanges(conn, ptid, cells)
# ptid set
self.assertEqual(app.pt.getID(), ptid2)
self.assertEqual(app.pt.getID(), ptid)
# dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(ptid2, cells)
calls[0].checkArgs(ptid, cells)
if __name__ == "__main__":
unittest.main()
......@@ -1244,6 +1244,7 @@ class Test(NEOThreadedTest):
sys.exit()
def askPartitionTable(orig, self, conn):
p.revert()
del conn._queue[:] # XXX
conn.close()
if 1:
with Patch(cluster.master.pt, make=make), \
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment