Commit 2ca7c335 authored by Julien Muchembled's avatar Julien Muchembled

master: improve algorithm to tweak the partition table

The most important change is that it does not discard readable cells too
quickly anymore. A partition can now have multiple FEEDING cells, to avoid
going below the wanted level of replication.

The new algorithm is also better at minimizing the amount replication.
parent ca58ccd7
...@@ -174,8 +174,9 @@ class AdministrationHandler(MasterHandler): ...@@ -174,8 +174,9 @@ class AdministrationHandler(MasterHandler):
ClusterStates.BACKINGUP): ClusterStates.BACKINGUP):
raise ProtocolError('Can not tweak partition table in %s state' raise ProtocolError('Can not tweak partition table in %s state'
% state) % state)
app.broadcastPartitionChanges(app.pt.tweak( app.broadcastPartitionChanges(app.pt.tweak([node
map(app.nm.getByUUID, uuid_list))) for node in app.nm.getStorageList()
if node.getUUID() in uuid_list or not node.isRunning()]))
conn.answer(Errors.Ack('')) conn.answer(Errors.Ack(''))
def truncate(self, conn, tid): def truncate(self, conn, tid):
......
This diff is collapsed.
...@@ -190,6 +190,11 @@ class NeoTestBase(unittest.TestCase): ...@@ -190,6 +190,11 @@ class NeoTestBase(unittest.TestCase):
"Mock objects can't be compared with '==' or '!='" "Mock objects can't be compared with '==' or '!='"
return super(NeoTestBase, self).assertEqual(first, second, msg=msg) return super(NeoTestBase, self).assertEqual(first, second, msg=msg)
def assertPartitionTable(self, pt, expected, key=None):
self.assertEqual(
expected if isinstance(expected, str) else '|'.join(expected),
'|'.join(pt._formatRows(sorted(pt.count_dict, key=key))))
class NeoUnitTestBase(NeoTestBase): class NeoUnitTestBase(NeoTestBase):
""" Base class for neo tests, implements common checks """ """ Base class for neo tests, implements common checks """
......
...@@ -14,9 +14,10 @@ ...@@ -14,9 +14,10 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import random, time, unittest
from collections import defaultdict from collections import defaultdict
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib import logging
from neo.lib.protocol import NodeStates, CellStates from neo.lib.protocol import NodeStates, CellStates
from neo.lib.pt import PartitionTableException from neo.lib.pt import PartitionTableException
from neo.master.pt import PartitionTable from neo.master.pt import PartitionTable
...@@ -45,7 +46,7 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -45,7 +46,7 @@ class MasterPartitionTableTests(NeoUnitTestBase):
self.assertEqual(len(pt.getRow(x)), 0) self.assertEqual(len(pt.getRow(x)), 0)
self.assertFalse(pt.operational()) self.assertFalse(pt.operational())
self.assertFalse(pt.filled()) self.assertFalse(pt.filled())
self.assertRaises(RuntimeError, pt.make, []) self.assertRaises(AssertionError, pt.make, [])
self.assertFalse(pt.operational()) self.assertFalse(pt.operational())
self.assertFalse(pt.filled()) self.assertFalse(pt.filled())
...@@ -132,77 +133,35 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -132,77 +133,35 @@ class MasterPartitionTableTests(NeoUnitTestBase):
(1, 2, CellStates.DISCARDED), (1, 2, CellStates.DISCARDED),
(2, 2, CellStates.DISCARDED)]) (2, 2, CellStates.DISCARDED)])
pt._setCell(0, sn[0], CellStates.UP_TO_DATE)
self.assertEqual(self.tweak(pt), [(2, 3, CellStates.FEEDING)]) self.assertEqual(self.tweak(pt), [(2, 3, CellStates.FEEDING)])
def test_16_make(self): def test_16_make(self):
num_partitions = 5 node_list = [self.createStorage(
num_replicas = 1 ("127.0.0.1", 19000 + i), self.getStorageUUID(),
pt = PartitionTable(num_partitions, num_replicas) NodeStates.RUNNING)
# add nodes for i in xrange(4)]
uuid1 = self.getStorageUUID() for np, nr, expected in (
server1 = ("127.0.0.1", 19001) (3, 0, 'U..|.U.|..U'),
sn1 = self.createStorage(server1, uuid1, NodeStates.RUNNING) (5, 1, 'UU..|..UU|UU..|..UU|UU..'),
# add not running node (9, 2, 'UUU.|UU.U|U.UU|.UUU|UUU.|UU.U|U.UU|.UUU|UUU.'),
uuid2 = self.getStorageUUID() ):
server2 = ("127.0.0.2", 19001) pt = PartitionTable(np, nr)
sn2 = self.createStorage(server2, uuid2) pt.make(node_list)
sn2.setState(NodeStates.DOWN) self.assertPartitionTable(pt, expected)
# add node without uuid self.assertTrue(pt.filled())
server3 = ("127.0.0.3", 19001) self.assertTrue(pt.operational())
sn3 = self.createStorage(server3, None, NodeStates.RUNNING) # create a pt with less nodes
# add clear node pt.clear()
uuid4 = self.getStorageUUID() self.assertFalse(pt.filled())
server4 = ("127.0.0.4", 19001) self.assertFalse(pt.operational())
sn4 = self.createStorage(server4, uuid4, NodeStates.RUNNING) pt.make(node_list[:1])
uuid5 = self.getStorageUUID() self.assertPartitionTable(pt, '|'.join('U' * np))
server5 = ("127.0.0.5", 1900) self.assertTrue(pt.filled())
sn5 = self.createStorage(server5, uuid5, NodeStates.RUNNING) self.assertTrue(pt.operational())
# make the table
pt.make([sn1, sn2, sn3, sn4, sn5])
# check it's ok, only running nodes and node with uuid
# must be present
for x in xrange(num_partitions):
cells = pt.getCellList(x)
self.assertEqual(len(cells), 2)
nodes = [x.getNode() for x in cells]
for node in nodes:
self.assertTrue(node in (sn1, sn4, sn5))
self.assertTrue(node not in (sn2, sn3))
self.assertTrue(pt.filled())
self.assertTrue(pt.operational())
# create a pt with less nodes
pt.clear()
self.assertFalse(pt.filled())
self.assertFalse(pt.operational())
pt.make([sn1])
# check it's ok
for x in xrange(num_partitions):
cells = pt.getCellList(x)
self.assertEqual(len(cells), 1)
nodes = [x.getNode() for x in cells]
for node in nodes:
self.assertEqual(node, sn1)
self.assertTrue(pt.filled())
self.assertTrue(pt.operational())
def _pt_states(self, pt):
node_dict = defaultdict(list)
for offset, row in enumerate(pt.partition_list):
for cell in row:
state_list = node_dict[cell.getNode()]
if state_list:
self.assertTrue(state_list[-1][0] < offset)
state_list.append((offset, str(cell.getState())[0]))
return map(dict, sorted(node_dict.itervalues()))
def checkPT(self, pt, exclude_empty=False):
new_pt = PartitionTable(pt.np, pt.nr)
new_pt.make(node for node, count in pt.count_dict.iteritems()
if count or not exclude_empty)
self.assertEqual(self._pt_states(pt), self._pt_states(new_pt))
def update(self, pt, change_list=None): def update(self, pt, change_list=None):
offset_list = range(pt.np) offset_list = xrange(pt.np)
for node in pt.count_dict: for node in pt.count_dict:
pt.updatable(node.getUUID(), offset_list) pt.updatable(node.getUUID(), offset_list)
if change_list is None: if change_list is None:
...@@ -215,9 +174,11 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -215,9 +174,11 @@ class MasterPartitionTableTests(NeoUnitTestBase):
for offset, uuid, state in change_list: for offset, uuid, state in change_list:
if state is CellStates.OUT_OF_DATE: if state is CellStates.OUT_OF_DATE:
pt.setUpToDate(node_dict[uuid], offset) pt.setUpToDate(node_dict[uuid], offset)
pt.log()
def tweak(self, pt, drop_list=()): def tweak(self, pt, drop_list=()):
change_list = pt.tweak(drop_list) change_list = pt.tweak(drop_list)
pt.log()
self.assertFalse(pt.tweak(drop_list)) self.assertFalse(pt.tweak(drop_list))
return change_list return change_list
...@@ -225,6 +186,7 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -225,6 +186,7 @@ class MasterPartitionTableTests(NeoUnitTestBase):
sn = [self.createStorage(None, i + 1, NodeStates.RUNNING) sn = [self.createStorage(None, i + 1, NodeStates.RUNNING)
for i in xrange(5)] for i in xrange(5)]
pt = PartitionTable(5, 2) pt = PartitionTable(5, 2)
pt.setID(1)
# part 0 # part 0
pt._setCell(0, sn[0], CellStates.DISCARDED) pt._setCell(0, sn[0], CellStates.DISCARDED)
pt._setCell(0, sn[1], CellStates.UP_TO_DATE) pt._setCell(0, sn[1], CellStates.UP_TO_DATE)
...@@ -246,45 +208,108 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -246,45 +208,108 @@ class MasterPartitionTableTests(NeoUnitTestBase):
pt._setCell(4, sn[4], CellStates.UP_TO_DATE) pt._setCell(4, sn[4], CellStates.UP_TO_DATE)
count_dict = defaultdict(int) count_dict = defaultdict(int)
self.assertPartitionTable(pt, (
'.U...',
'FFO..',
'FUU..',
'UUUU.',
'U...U'))
change_list = self.tweak(pt) change_list = self.tweak(pt)
self.assertPartitionTable(pt, (
'.UO.O',
'UU.O.',
'UFU.O',
'.UUU.',
'U..OU'))
for offset, uuid, state in change_list: for offset, uuid, state in change_list:
count_dict[state] += 1 count_dict[state] += 1
self.assertEqual(count_dict, {CellStates.DISCARDED: 3, self.assertEqual(count_dict, {CellStates.DISCARDED: 2,
CellStates.FEEDING: 1,
CellStates.OUT_OF_DATE: 5, CellStates.OUT_OF_DATE: 5,
CellStates.UP_TO_DATE: 3}) CellStates.UP_TO_DATE: 3})
self.update(pt, change_list) self.update(pt)
self.checkPT(pt) self.assertPartitionTable(pt, (
'.UU.U',
'UU.U.',
'U.U.U',
'.UUU.',
'U..UU'))
self.assertRaises(PartitionTableException, pt.dropNodeList, sn[1:4]) self.assertRaises(PartitionTableException, pt.dropNodeList, sn[1:4])
self.assertEqual(6, len(pt.dropNodeList(sn[1:3], True))) self.assertEqual(6, len(pt.dropNodeList(sn[1:3], True)))
self.assertEqual(3, len(pt.dropNodeList([sn[1]]))) self.assertEqual(3, len(pt.dropNodeList([sn[1]])))
pt.addNodeList([sn[1]]) pt.addNodeList([sn[1]])
self.assertPartitionTable(pt, (
'..U.U',
'U..U.',
'U.U.U',
'..UU.',
'U..UU'))
change_list = self.tweak(pt) change_list = self.tweak(pt)
self.assertPartitionTable(pt, (
'.OU.U',
'UO.U.',
'U.U.U',
'.OUU.',
'U..UU'))
self.assertEqual(3, len(change_list)) self.assertEqual(3, len(change_list))
self.update(pt, change_list) self.update(pt, change_list)
self.checkPT(pt)
for np, i in (12, 0), (12, 1), (13, 2): for np, i, expected in (
(12, 0, ('U...|.U..|..U.|...U|'
'U...|.U..|..U.|...U|'
'U...|.U..|..U.|...U',)),
(12, 1, ('UU...|..UU.|U...U|.UU..|...UU|'
'UU...|..UU.|U...U|.UU..|...UU|'
'UU...|..UU.',)),
(13, 2, ('U.UU.|.U.UU|UUU..|..UUU|UU..U|'
'U.UU.|.U.UU|UUU..|..UUU|UU..U|'
'U.UU.|.U.UU|UUU..',
'UUU..|U..UU|.UUU.|UU..U|..UUU|'
'UUU..|U..UU|.UUU.|UU..U|..UUU|'
'UUU..|U..UU|.UUU.')),
):
pt = PartitionTable(np, i) pt = PartitionTable(np, i)
i += 1 i += 1
pt.make(sn[:i]) pt.make(sn[:i])
pt.log()
for n in sn[i:i+3]: for n in sn[i:i+3]:
self.assertEqual([n], pt.addNodeList([n])) self.assertEqual([n], pt.addNodeList([n]))
self.update(pt, self.tweak(pt)) self.update(pt, self.tweak(pt))
self.checkPT(pt) self.assertPartitionTable(pt, expected[0])
pt.clear() pt.clear()
pt.make(sn[:i]) pt.make(sn[:i])
for n in sn[i:i+3]: for n in sn[i:i+3]:
self.assertEqual([n], pt.addNodeList([n])) self.assertEqual([n], pt.addNodeList([n]))
self.tweak(pt) self.tweak(pt)
self.update(pt) self.update(pt)
self.checkPT(pt) self.assertPartitionTable(pt, expected[-1])
pt = PartitionTable(7, 0) pt = PartitionTable(7, 0)
pt.make(sn[:1]) pt.make(sn[:1])
pt.addNodeList(sn[1:3]) pt.addNodeList(sn[1:3])
self.assertPartitionTable(pt, 'U..|U..|U..|U..|U..|U..|U..')
self.update(pt, self.tweak(pt, sn[:1])) self.update(pt, self.tweak(pt, sn[:1]))
self.checkPT(pt, True) self.assertPartitionTable(pt, '.U.|..U|.U.|..U|.U.|..U|.U.')
def test_18_tweak(self):
s = repr(time.time())
logging.info("using seed %r", s)
r = random.Random(s)
sn_count = 11
sn = [self.createStorage(None, i + 1, NodeStates.RUNNING)
for i in xrange(sn_count)]
pt = PartitionTable(1000, 2)
pt.setID(1)
for offset in xrange(pt.np):
state = CellStates.UP_TO_DATE
k = r.randrange(1, sn_count)
for s in r.sample(sn, k):
pt._setCell(offset, s, state)
if k * r.random() < 1:
state = CellStates.OUT_OF_DATE
pt.log()
self.tweak(pt)
self.update(pt)
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -1025,11 +1025,11 @@ class NEOThreadedTest(NeoTestBase): ...@@ -1025,11 +1025,11 @@ class NEOThreadedTest(NeoTestBase):
with Patch(client, _getFinalTID=lambda *_: None): with Patch(client, _getFinalTID=lambda *_: None):
self.assertRaises(ConnectionClosed, txn.commit) self.assertRaises(ConnectionClosed, txn.commit)
def assertPartitionTable(self, cluster, stats, pt_node=None): def assertPartitionTable(self, cluster, expected, pt_node=None):
pt = (pt_node or cluster.admin).pt
index = [x.uuid for x in cluster.storage_list].index index = [x.uuid for x in cluster.storage_list].index
self.assertEqual(stats, '|'.join(pt._formatRows(sorted( super(NEOThreadedTest, self).assertPartitionTable(
pt.count_dict, key=lambda x: index(x.getUUID()))))) (pt_node or cluster.admin).pt, expected,
lambda x: index(x.getUUID()))
@staticmethod @staticmethod
def noConnection(jar, storage): def noConnection(jar, storage):
......
...@@ -317,13 +317,13 @@ class ReplicationTests(NEOThreadedTest): ...@@ -317,13 +317,13 @@ class ReplicationTests(NEOThreadedTest):
s2.start() s2.start()
self.tic() self.tic()
cluster.enableStorageList([s2]) cluster.enableStorageList([s2])
# 2 UP_TO_DATE cells should become FEEDING, # 2 UP_TO_DATE cells become FEEDING:
# and be dropped only when the replication is done, # they are dropped only when the replication is done,
# so that 1 storage can still die without data loss. # so that 1 storage can still die without data loss.
with Patch(s0.dm, changePartitionTable=changePartitionTable): with Patch(s0.dm, changePartitionTable=changePartitionTable):
cluster.neoctl.tweakPartitionTable() cluster.neoctl.tweakPartitionTable()
self.tic() self.tic()
expectedFailure(self.assertEqual)(cluster.neoctl.getClusterState(), self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING) ClusterStates.RUNNING)
@with_cluster(start_cluster=0, partitions=3, replicas=1, storage_count=3) @with_cluster(start_cluster=0, partitions=3, replicas=1, storage_count=3)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment