Commit 7f06df6f authored by Julien Muchembled

Prevent removing nodes that contain the only readable cells

Contrary to the previous implementation, there is no automatic reallocation
of partitions amongst the remaining nodes. Now only 'tweak' does that.
parent a1539219
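
The rule enforced by this commit: a set of storage nodes may only be dropped if every partition keeps at least one readable cell (UP_TO_DATE or FEEDING) on the remaining nodes; otherwise the master refuses the request, and any rebalancing of the shrunk table is left to an explicit 'tweak'. The toy model below only illustrates that invariant and the dry-run flag; it is not the NEO API, and the name drop_node_list and the plain dict rows are invented for the sketch.

# Illustrative model only -- not NEO code.  A "partition table" is a list of
# rows; each row maps a node name to the state of its cell for that partition.
UP_TO_DATE, OUT_OF_DATE, FEEDING, DISCARDED = 'U', 'O', 'F', 'D'
READABLE = (UP_TO_DATE, FEEDING)

class PartitionTableError(Exception):
    pass

def drop_node_list(partition_list, node_set, simulate=False):
    """Return the (offset, node, new_state) changes implied by dropping
    node_set, refusing if any partition would lose its last readable cell."""
    change_list = []
    new_partition_list = []
    for offset, row in enumerate(partition_list):
        new_row = {}
        dropping_readable = False
        for node, state in row.items():
            if node in node_set:
                change_list.append((offset, node, DISCARDED))
                if state in READABLE:
                    dropping_readable = True
            else:
                new_row[node] = state
        if dropping_readable and not any(
                state in READABLE for state in new_row.values()):
            raise PartitionTableError(
                "refuse to drop the only readable copies of partition %u"
                % offset)
        new_partition_list.append(new_row)
    if not simulate:
        # Apply the shrunk table as-is: no automatic reallocation happens
        # here; rebalancing is a separate 'tweak' step.
        partition_list[:] = new_partition_list
    return change_list

# Two partitions over two nodes; 'a' holds the only readable copy of
# partition 1, so dropping 'b' is allowed but dropping 'a' must be refused.
pt = [{'a': UP_TO_DATE, 'b': UP_TO_DATE},
      {'a': UP_TO_DATE, 'b': OUT_OF_DATE}]
assert drop_node_list(pt, {'b'}, simulate=True) == [
    (0, 'b', DISCARDED), (1, 'b', DISCARDED)]
try:
    drop_node_list(pt, {'a'})
except PartitionTableError as e:
    print(e)
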
@@ -19,6 +19,7 @@ import random
 from . import MasterHandler
 from ..app import StateChangedException
 from neo.lib import logging
+from neo.lib.pt import PartitionTableException
 from neo.lib.protocol import ClusterStates, NodeStates, Packets, ProtocolError
 from neo.lib.protocol import Errors, uuid_str
 from neo.lib.util import dump
@@ -100,21 +101,18 @@ class AdministrationHandler(MasterHandler):
             node.setState(state)
         elif state == NodeStates.DOWN and node.isStorage():
-            # update it's state
+            try:
+                cell_list = app.pt.dropNodeList([node],
+                    not modify_partition_table)
+            except PartitionTableException, e:
+                raise ProtocolError(str(e))
             node.setState(state)
             if node.isConnected():
                 # notify itself so it can shutdown
                 node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
                 # close to avoid handle the closure as a connection lost
                 node.getConnection().abort()
-            # modify the partition table if required
-            cell_list = []
-            if modify_partition_table:
-                # remove from pt
-                cell_list = app.pt.dropNode(node)
-                app.nm.remove(node)
-            else:
-                # outdate node in partition table
+            if not modify_partition_table:
                 cell_list = app.pt.outdate()
             app.broadcastPartitionChanges(cell_list)
...
@@ -98,35 +98,45 @@ class PartitionTable(neo.lib.pt.PartitionTable):
                 min_count = count
         return min_node

-    def dropNode(self, node):
-        cell_list = []
-        uuid = node.getUUID()
-        for offset, row in enumerate(self.partition_list):
-            if row is None:
-                continue
-            for cell in row:
-                if cell.getNode() is node:
-                    if not cell.isFeeding():
-                        # If this cell is not feeding, find another node
-                        # to be added.
-                        node_list = [c.getNode() for c in row]
-                        n = self.findLeastUsedNode(node_list)
-                        if n is not None:
-                            row.append(Cell(n,
-                                CellStates.OUT_OF_DATE))
-                            self.count_dict[n] += 1
-                            cell_list.append((offset, n.getUUID(),
-                                CellStates.OUT_OF_DATE))
-                    row.remove(cell)
-                    cell_list.append((offset, uuid, CellStates.DISCARDED))
-                    break
-        try:
-            del self.count_dict[node]
-        except KeyError:
-            pass
-        return cell_list
+    def dropNodeList(self, node_list, simulate=False):
+        partition_list = []
+        change_list = []
+        feeding_list = []
+        for offset, row in enumerate(self.partition_list):
+            new_row = []
+            partition_list.append(new_row)
+            feeding = None
+            drop_readable = uptodate = False
+            for cell in row:
+                node = cell.getNode()
+                if node in node_list:
+                    change_list.append((offset, node.getUUID(),
+                                        CellStates.DISCARDED))
+                    if cell.isReadable():
+                        drop_readable = True
+                else:
+                    new_row.append(cell)
+                    if cell.isFeeding():
+                        feeding = cell
+                    elif cell.isUpToDate():
+                        uptodate = True
+            if feeding is not None:
+                if len(new_row) < len(row):
+                    change_list.append((offset, feeding.getUUID(),
+                                        CellStates.UP_TO_DATE))
+                    feeding_list.append(feeding)
+            elif drop_readable and not uptodate:
+                raise neo.lib.pt.PartitionTableException(
+                    "Refuse to drop nodes that contain the only readable"
+                    " copies of partition %u" % offset)
+        if not simulate:
+            self.partition_list = partition_list
+            for cell in feeding_list:
+                cell.setState(CellStates.UP_TO_DATE)
+                self.count_dict[cell.getNode()] += 1
+            for node in node_list:
+                self.count_dict.pop(node, None)
+        return change_list

     def load(self, ptid, row_list, nm):
         """
...
@@ -606,7 +606,10 @@ class NEOCluster(object):
     def expectClusterState(self, state, *args, **kw):
         def callback(last_try):
-            current_try = self.neoctl.getClusterState()
+            try:
+                current_try = self.neoctl.getClusterState()
+            except NotReadyException:
+                current_try = None
             return current_try == state, current_try
         self.expectCondition(callback, *args, **kw)
...
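
The expectClusterState change above is a small robustness fix: the polling callback now treats NotReadyException from the admin connection as "not there yet" instead of letting it abort the whole wait. As a generic pattern (this is not NEO's expectCondition API; the helper name poll_until and the exception class are invented for the sketch), it amounts to:

import time

class NotReadyError(Exception):
    """Stand-in for a transient 'service not ready yet' error."""

def poll_until(condition, timeout=30.0, delay=0.5, tolerated=(NotReadyError,)):
    """Call condition() until it returns a truthy value or the timeout
    expires; exceptions listed in 'tolerated' count as 'not ready yet'."""
    deadline = time.time() + timeout
    while True:
        try:
            result = condition()
        except tolerated:
            result = None
        if result:
            return result
        if time.time() >= deadline:
            raise AssertionError("condition not met within %.1fs" % timeout)
        time.sleep(delay)

# Example: a probe that is "not ready" the first two times it is polled.
state = {'calls': 0}
def probe():
    state['calls'] += 1
    if state['calls'] < 3:
        raise NotReadyError
    return 'RUNNING'

assert poll_until(probe, timeout=5, delay=0.01) == 'RUNNING'
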
@@ -335,6 +335,9 @@ class StorageTests(NEOFunctionalTest):
         self.neo.expectStorageNotKnown(started[0])
         self.neo.expectAssignedCells(started[0], 0)
         self.neo.expectAssignedCells(started[1], 10)
+        self.assertRaises(RuntimeError, self.neo.neoctl.dropNode,
+            started[1].getUUID())
+        self.neo.expectClusterRunning()

     def testReplicationThenRunningWithReplicas(self):
         """ Add a replicas to a cluster, wait for the replication to finish,
@@ -393,23 +396,6 @@ class StorageTests(NEOFunctionalTest):
         self.__setup(storage_number=2, partitions=5000, master_count=1)
         self.neo.expectClusterState(ClusterStates.RUNNING)

-    def testDropNodeWithOtherPending(self):
-        """ Ensure we can drop a node """
-        # start with one storage
-        (started, stopped) = self.__setup(storage_number=2, replicas=1,
-            pending_number=1, partitions=10)
-        self.neo.expectRunning(started[0])
-        self.neo.expectStorageNotKnown(stopped[0])
-        self.neo.expectOudatedCells(number=0)
-        self.neo.expectClusterRunning()
-
-        # set the second storage in pending state and drop the first
-        stopped[0].start()
-        self.neo.expectPending(stopped[0])
-        self.neo.neoctl.dropNode(started[0].getUUID())
-        self.neo.expectStorageNotKnown(started[0])
-        self.neo.expectPending(stopped[0])
-
     def testRecoveryWithMultiplePT(self):
         # start a cluster with 2 storages and a replica
         (started, stopped) = self.__setup(storage_number=2, replicas=1,
...
@@ -19,6 +19,7 @@ from collections import defaultdict
 from mock import Mock
 from .. import NeoUnitTestBase
 from neo.lib.protocol import NodeStates, CellStates
+from neo.lib.pt import PartitionTableException
 from neo.master.pt import PartitionTable
 from neo.lib.node import StorageNode
@@ -143,64 +144,35 @@ class MasterPartitionTableTests(NeoUnitTestBase):
         cell = cells[0]
         self.assertEqual(cell.getState(), CellStates.UP_TO_DATE)

-    def test_15_dropNode(self):
-        num_partitions = 4
-        num_replicas = 2
-        pt = PartitionTable(num_partitions, num_replicas)
-        # add nodes
-        uuid1 = self.getStorageUUID()
-        server1 = ("127.0.0.1", 19001)
-        sn1 = StorageNode(Mock(), server1, uuid1, NodeStates.RUNNING)
-        uuid2 = self.getStorageUUID()
-        server2 = ("127.0.0.2", 19002)
-        sn2 = StorageNode(Mock(), server2, uuid2, NodeStates.RUNNING)
-        uuid3 = self.getStorageUUID()
-        server3 = ("127.0.0.3", 19001)
-        sn3 = StorageNode(Mock(), server3, uuid3, NodeStates.RUNNING)
-        uuid4 = self.getStorageUUID()
-        server4 = ("127.0.0.4", 19001)
-        sn4 = StorageNode(Mock(), server4, uuid4, NodeStates.RUNNING)
-        # partition looks like:
-        # 0 : sn1, sn2
-        # 1 : sn1, sn3
-        # 2 : sn1, sn3
-        # 3 : sn1, sn4
-        # node is not feeding, so retrive least use node to replace it
-        # so sn2 must be repaced by sn4 in partition 0
-        pt.setCell(0, sn1, CellStates.UP_TO_DATE)
-        pt.setCell(0, sn2, CellStates.UP_TO_DATE)
-        pt.setCell(1, sn1, CellStates.UP_TO_DATE)
-        pt.setCell(1, sn3, CellStates.UP_TO_DATE)
-        pt.setCell(2, sn1, CellStates.UP_TO_DATE)
-        pt.setCell(2, sn3, CellStates.UP_TO_DATE)
-        pt.setCell(3, sn1, CellStates.UP_TO_DATE)
-        pt.setCell(3, sn4, CellStates.UP_TO_DATE)
-        self.assertEqual(sorted(pt.dropNode(sn2)), [
-            (0, uuid2, CellStates.DISCARDED),
-            (0, uuid4, CellStates.OUT_OF_DATE)])
-        for x in xrange(num_partitions):
-            self.assertEqual(len(pt.getCellList(x)), 2)
-        # same test but with feeding state, no other will be added
-        pt.clear()
-        pt.setCell(0, sn1, CellStates.UP_TO_DATE)
-        pt.setCell(0, sn2, CellStates.FEEDING)
-        pt.setCell(1, sn1, CellStates.UP_TO_DATE)
-        pt.setCell(1, sn3, CellStates.UP_TO_DATE)
-        pt.setCell(2, sn1, CellStates.UP_TO_DATE)
-        pt.setCell(2, sn3, CellStates.UP_TO_DATE)
-        pt.setCell(3, sn1, CellStates.UP_TO_DATE)
-        pt.setCell(3, sn4, CellStates.UP_TO_DATE)
-        cell_list = pt.dropNode(sn2)
-        self.assertEqual(len(cell_list), 1)
-        for offset, uuid, state in cell_list:
-            self.assertEqual(offset, 0)
-            self.assertEqual(state, CellStates.DISCARDED)
-            self.assertEqual(uuid, uuid2)
-        for x in xrange(num_partitions):
-            if x == 0:
-                self.assertEqual(len(pt.getCellList(x)), 1)
-            else:
-                self.assertEqual(len(pt.getCellList(x)), 2)
+    def test_15_dropNodeList(self):
+        sn = [StorageNode(Mock(), None, i + 1, NodeStates.RUNNING)
+              for i in xrange(3)]
+        pt = PartitionTable(3, 0)
+        pt.setCell(0, sn[0], CellStates.OUT_OF_DATE)
+        pt.setCell(1, sn[1], CellStates.FEEDING)
+        pt.setCell(1, sn[2], CellStates.OUT_OF_DATE)
+        pt.setCell(2, sn[0], CellStates.OUT_OF_DATE)
+        pt.setCell(2, sn[1], CellStates.FEEDING)
+        pt.setCell(2, sn[2], CellStates.UP_TO_DATE)
+
+        self.assertEqual(sorted(pt.dropNodeList(sn[:1], True)), [
+            (0, 1, CellStates.DISCARDED),
+            (2, 1, CellStates.DISCARDED),
+            (2, 2, CellStates.UP_TO_DATE)])
+
+        self.assertEqual(sorted(pt.dropNodeList(sn[2:], True)), [
+            (1, 2, CellStates.UP_TO_DATE),
+            (1, 3, CellStates.DISCARDED),
+            (2, 2, CellStates.UP_TO_DATE),
+            (2, 3, CellStates.DISCARDED)])
+
+        self.assertRaises(PartitionTableException, pt.dropNodeList, sn[1:2])
+        pt.setCell(1, sn[2], CellStates.UP_TO_DATE)
+        self.assertEqual(sorted(pt.dropNodeList(sn[1:2])), [
+            (1, 2, CellStates.DISCARDED),
+            (2, 2, CellStates.DISCARDED)])
+
+        self.assertEqual(self.tweak(pt), [(2, 3, CellStates.FEEDING)])

     def test_16_make(self):
         num_partitions = 5
@@ -319,8 +291,10 @@ class MasterPartitionTableTests(NeoUnitTestBase):
         self.update(pt, change_list)
         self.checkPT(pt)

-        for offset in pt.getAssignedPartitionList(sn[1].getUUID()):
-            pt.removeCell(offset, sn[1])
+        self.assertRaises(PartitionTableException, pt.dropNodeList, sn[1:4])
+        self.assertEqual(6, len(pt.dropNodeList(sn[1:3], True)))
+        self.assertEqual(3, len(pt.dropNodeList([sn[1]])))
+        pt.addNodeList([sn[1]])
         change_list = self.tweak(pt)
         self.assertEqual(3, len(change_list))
         self.update(pt, change_list)
...
@@ -302,20 +302,20 @@ class Test(NEOThreadedTest):
     def test_notifyNodeInformation(self):
         # translated from MasterNotificationsHandlerTests
         # (neo.tests.client.testMasterHandler)
-        cluster = NEOCluster()
+        cluster = NEOCluster(replicas=1)
         try:
             cluster.start()
             cluster.db # open DB
             cluster.client.setPoll(0)
-            storage, = cluster.client.nm.getStorageList()
-            conn = storage.getConnection()
+            s0, s1 = cluster.client.nm.getStorageList()
+            conn = s0.getConnection()
             self.assertFalse(conn.isClosed())
             getCellSortKey = cluster.client.cp.getCellSortKey
-            self.assertEqual(getCellSortKey(storage), CELL_CONNECTED)
-            cluster.neoctl.dropNode(cluster.storage.uuid)
-            self.assertFalse(cluster.client.nm.getStorageList())
+            self.assertEqual(getCellSortKey(s0), CELL_CONNECTED)
+            cluster.neoctl.dropNode(s0.getUUID())
+            self.assertEqual([s1], cluster.client.nm.getStorageList())
             self.assertTrue(conn.isClosed())
-            self.assertEqual(getCellSortKey(storage), CELL_GOOD)
+            self.assertEqual(getCellSortKey(s0), CELL_GOOD)
             # XXX: the test originally checked that 'unregister' method
             # was called (even if it's useless in this case),
             # but we would need an API to do that easily.
...