Commit 97af23cc authored by Julien Muchembled's avatar Julien Muchembled

Maximize resiliency by taking into account the topology of storage nodes

This commit adds a contraint when tweaking the partition table with replicas,
so that cells of each partition are assigned as far as possible from each
other, e.g. not on the same machine even if each one has several disks, and
in any case not on the same storage device.

Currently, the topology path of each node is automatically calculated by the
storage backend. Both MySQL and SQLite return a 2-tuple (host, st_dev).
To be improved:
- Add a storage option to override the path: the 'tweak' algorithm can already
  handle topology paths of any length, so something like (room, machine, disk)
  could be done easily.
- Write OS-specific code to determine the real hardware behind st_dev
  (e.g. 2 different 'st_dev' values may actually refer to the same disk,
   because of layers like partitioning, device-mapper, loop, btrfs subvolumes,
   and so on).
- Make 'neoctl' report in some way if the PT is optimal. Meanwhile,
  if it isn't, the master only logs a WARNING during tweak.
parent d4ea398d
......@@ -218,7 +218,7 @@ class Application(ThreadedApplication):
node=node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name, None)
NodeTypes.CLIENT, self.uuid, None, self.name, (), None)
try:
ask(conn, p, handler=handler)
except ConnectionClosed:
......
......@@ -47,7 +47,7 @@ class ConnectionPool(object):
conn = MTClientConnection(app, app.storage_event_handler, node,
dispatcher=app.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name, app.id_timestamp)
app.uuid, None, app.name, (), app.id_timestamp)
try:
app._ask(conn, p, handler=app.storage_bootstrap_handler)
except ConnectionClosed:
......
......@@ -26,13 +26,14 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it
"""
def __init__(self, app, node_type, server=None):
def __init__(self, app, node_type, server=None, devpath=()):
"""
Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node
is ready.
"""
self.server = server
self.devpath = devpath
self.node_type = node_type
self.num_replicas = None
self.num_partitions = None
......@@ -43,7 +44,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, None))
self.server, self.app.name, self.devpath, None))
def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn)
......
......@@ -34,6 +34,7 @@ class SocketConnector(object):
is_closed = is_server = None
connect_limit = {}
CONNECT_LIMIT = 1
SOMAXCONN = 5 # for threaded tests
def __new__(cls, addr, s=None):
if s is None:
......@@ -124,7 +125,7 @@ class SocketConnector(object):
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._bind(self.addr)
self.socket.listen(5)
self.socket.listen(self.SOMAXCONN)
except socket.error, e:
self.socket.close()
self._error('listen', e)
......
......@@ -28,6 +28,7 @@ class Node(object):
_connection = None
_identified = False
devpath = ()
id_timestamp = None
def __init__(self, manager, address=None, uuid=None, state=NodeStates.DOWN):
......
......@@ -22,7 +22,7 @@ from struct import Struct
# The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 3
PROTOCOL_VERSION = 4
ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION)
# Avoid memory errors on corrupted data.
......@@ -685,6 +685,7 @@ class RequestIdentification(Packet):
PUUID('uuid'),
PAddress('address'),
PString('name'),
PList('devpath', PString('devid')),
PFloat('id_timestamp'),
)
......
......@@ -24,7 +24,7 @@ from ..app import monotonic_time
class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, id_timestamp):
address, name, devpath, id_timestamp):
app = self.app
self.checkClusterName(name)
if address == app.server:
......@@ -101,6 +101,8 @@ class IdentificationHandler(EventHandler):
uuid=uuid, address=address)
else:
node.setUUID(uuid)
if devpath:
node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time()
node.setState(state)
conn.setHandler(handler)
......@@ -120,7 +122,7 @@ class IdentificationHandler(EventHandler):
class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, id_timestamp):
address, name, devpath, id_timestamp):
app = self.app
self.checkClusterName(name)
if address == app.server:
......
......@@ -38,7 +38,7 @@ class ElectionHandler(MasterHandler):
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election))
app.uuid, app.server, app.name, (), app.election))
def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn)
......
......@@ -178,7 +178,7 @@ class PartitionTable(neo.lib.pt.PartitionTable):
def tweak(self, drop_list=()):
"""Optimize partition table
This reassigns cells in 3 ways:
This reassigns cells in 4 ways:
- Discard cells of nodes listed in 'drop_list'. For partitions with too
few readable cells, some cells are instead marked as FEEDING. This is
a preliminary step to drop these nodes, otherwise the partition table
......@@ -187,6 +187,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
- When a transaction creates new objects (oids are roughly allocated
sequentially), we expect better performance by maximizing the number
of involved nodes (i.e. parallelizing writes).
- For maximum resiliency, cells of each partition are assigned as far
as possible from each other, by checking the topology path of nodes.
Examples of optimal partition tables with np=10, nr=1 and 5 nodes:
......@@ -215,6 +217,17 @@ class PartitionTable(neo.lib.pt.PartitionTable):
U. .U U.
.U U. U.
U. U. .U
For the topology, let's consider an example with paths of the form
(room, machine, disk):
- if there are more rooms than the number of replicas, 2 cells of the
same partition must not be assigned in the same room;
- otherwise, topology paths are checked at a deeper depth,
e.g. not on the same machine and distributed evenly
(off by 1) among rooms.
But the topology is expected to be optimal, otherwise it is ignored.
In some cases, we could fall back to a non-optimal topology but
that would cause extra replication if the user wants to fix it.
"""
# Collect some data in a usable form for the rest of the method.
node_list = {node: {} for node in self.count_dict
......@@ -242,6 +255,67 @@ class PartitionTable(neo.lib.pt.PartitionTable):
i += 1
option_dict = Counter(map(tuple, x))
# Initialize variables/functions to optimize the topology.
devpath_max = []
devpaths = [()] * node_count
if repeats > 1:
_devpaths = [x[0].devpath for x in node_list]
max_depth = min(map(len, _devpaths))
depth = 0
while 1:
if depth < max_depth:
depth += 1
x = Counter(x[:depth] for x in _devpaths)
n = len(x)
x = set(x.itervalues())
# TODO: Prove it works. If the code turns out to be:
# - too pessimistic, the topology is ignored when
# resiliency could be maximized;
# - or worse too optimistic, in which case this
# method raises, possibly after a very long time.
if len(x) == 1 or max(x) * repeats <= node_count:
i, x = divmod(repeats, n)
devpath_max.append((i + 1, x) if x else (i, n))
if n < repeats:
continue
devpaths = [x[:depth] for x in _devpaths]
break
logging.warning("Can't maximize resiliency: fix the topology"
" of your storage nodes and make sure they're all running."
" %s storage device failure(s) may be enough to lose all"
" the database." % (repeats - 1))
break
topology = [{} for _ in xrange(self.np)]
def update_topology():
for offset in option:
n = topology[offset]
for i, (j, k) in zip(devpath, devpath_max):
try:
i, x = n[i]
except KeyError:
n[i] = i, x = [0, {}]
if i == j or i + 1 == j and k == sum(
1 for i in n.itervalues() if i[0] == j):
# Too many cells would be assigned at this topology
# node.
return False
n = x
# The topology may be optimal with this option. Apply it.
for offset in option:
n = topology[offset]
for i in devpath:
n = n[i]
n[0] += 1
n = n[1]
return True
def revert_topology():
for offset in option:
n = topology[offset]
for i in devpath:
n = n[i]
n[0] -= 1
n = n[1]
# Strategies to find the "best" permutation of nodes.
def node_options():
# The second part of the key goes with the above cosmetic sort.
......@@ -291,24 +365,27 @@ class PartitionTable(neo.lib.pt.PartitionTable):
new = [] # the solution
stack = [] # data recursion
def options():
return iter(node_options[len(new)][-1])
x = node_options[len(new)]
return devpaths[x[-2]], iter(x[-1])
for node_options in node_options(): # for each strategy
iter_option = options()
devpath, iter_option = options()
while 1:
try:
option = next(iter_option)
except StopIteration: # 1st strategy only
except StopIteration:
if new:
iter_option = stack.pop()
option_dict[new.pop()] += 1
devpath, iter_option = stack.pop()
option = new.pop()
revert_topology()
option_dict[option] += 1
continue
break
if option_dict[option]:
if option_dict[option] and update_topology():
new.append(option)
if len(new) == len(node_list):
if len(new) == node_count:
break
stack.append(iter_option)
iter_option = options()
stack.append((devpath, iter_option))
devpath, iter_option = options()
option_dict[option] -= 1
if new:
break
......
......@@ -71,6 +71,7 @@ class Application(BaseApplication):
self.dm.setup(reset=config.getReset(), dedup=config.getDedup())
self.loadConfiguration()
self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only
if config.getUUID() is not None:
......@@ -203,7 +204,8 @@ class Application(BaseApplication):
pt = self.pt
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server)
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server,
self.devpath)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
uuid = self.uuid
......
......@@ -51,7 +51,7 @@ class Checker(object):
else:
conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
uuid, app.server, name, app.id_timestamp))
uuid, app.server, name, (), app.id_timestamp))
self.conn_dict[conn] = node.isIdentified()
conn_set = set(self.conn_dict)
conn_set.discard(None)
......
......@@ -383,7 +383,7 @@ class ImporterDatabaseManager(DatabaseManager):
updateCellTID getUnfinishedTIDDict dropUnfinishedData
abortTransaction storeTransaction lockTransaction
loadData storeData getOrphanList _pruneData deferCommit
dropPartitionsTemporary
_getDevPath dropPartitionsTemporary
""".split():
setattr(self, x, getattr(db, x))
if self._writeback:
......
......@@ -167,6 +167,15 @@ class DatabaseManager(object):
raise
sys.exit(self.LOCKED)
def _getDevPath(self):
"""
"""
@requires(_getDevPath)
def getTopologyPath(self):
# On Windows, st_dev only exists since Python 3.4
return socket.gethostname(), str(os.stat(self._getDevPath()).st_dev)
@abstract
def erase(self):
""""""
......
......@@ -226,6 +226,11 @@ class MySQLDatabaseManager(DatabaseManager):
"""Escape special characters in a string."""
return self.conn.escape_string
def _getDevPath(self):
# BBB: MySQL is moving to Performance Schema.
return self.query("SELECT * FROM information_schema.global_variables"
" WHERE variable_name='datadir'")[0][1]
def erase(self):
self.query("DROP TABLE IF EXISTS"
" config, pt, trans, obj, data, bigdata, ttrans, tobj")
......
......@@ -86,6 +86,9 @@ class SQLiteDatabaseManager(DatabaseManager):
q("PRAGMA journal_mode = MEMORY")
self._config = {}
def _getDevPath(self):
return self.db
def _commit(self):
retry_if_locked(self.conn.commit)
......
......@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler):
return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name,
id_timestamp):
devpath, id_timestamp):
self.checkClusterName(name)
app = self.app
# reject any incoming connections if not ready
......
......@@ -350,7 +350,7 @@ class Replicator(object):
try:
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
None if name else app.uuid, app.server, name or app.name,
app.id_timestamp))
(), app.id_timestamp))
except ConnectionClosed:
if previous_node is self.current_node:
return
......
......@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random, time, unittest
from collections import defaultdict
from collections import Counter, defaultdict
from .. import NeoUnitTestBase
from neo.lib import logging
from neo.lib.protocol import NodeStates, CellStates
......@@ -291,13 +291,17 @@ class MasterPartitionTableTests(NeoUnitTestBase):
self.update(pt, self.tweak(pt, sn[:1]))
self.assertPartitionTable(pt, '.U.|..U|.U.|..U|.U.|..U|.U.')
def test_18_tweak(self):
s = repr(time.time())
logging.info("using seed %r", s)
r = random.Random(s)
def test_18_tweakBigPT(self):
seed = repr(time.time())
logging.info("using seed %r", seed)
sn_count = 11
sn = [self.createStorage(None, i + 1, NodeStates.RUNNING)
for i in xrange(sn_count)]
for topo in 0, 1:
r = random.Random(seed)
if topo:
for i, s in enumerate(sn, sn_count):
s.devpath = str(i % 5),
pt = PartitionTable(1000, 2)
pt.setID(1)
for offset in xrange(pt.np):
......@@ -311,6 +315,70 @@ class MasterPartitionTableTests(NeoUnitTestBase):
self.tweak(pt)
self.update(pt)
def test_19_topology(self):
sn_count = 16
sn = [self.createStorage(None, i + 1, NodeStates.RUNNING)
for i in xrange(sn_count)]
pt = PartitionTable(48, 2)
pt.make(sn)
pt.log()
for i, s in enumerate(sn, sn_count):
s.devpath = tuple(bin(i)[3:-1])
self.assertEqual(Counter(x[2] for x in self.tweak(pt)), {
CellStates.OUT_OF_DATE: 96,
CellStates.FEEDING: 96,
})
self.update(pt)
x = lambda n, *x: ('|'.join(x[:1]*n), '|'.join(x[1:]*n))
for even, np, i, topo, expected in (
## Optimal topology.
# All nodes have same number of cells.
(1, 2, 2, ("00", "01", "02", "10", "11", "12"), ('UU...U|..UUU.',
'UU.U..|..U.UU')),
(1, 7, 1, "0001122", (
'U.....U|.U.U...|..U.U..|U....U.|.U....U|..UU...|....UU.',
'U..U...|.U...U.|..U.U..|U.....U|.U.U...|..U..U.|....U.U')),
(1, 4, 1, "00011122", ('U......U|.U.U....|..U.U...|.....UU.',
'U..U....|.U..U...|..U...U.|.....U.U')),
(1, 9, 1, "000111222", ('U.......U|.U.U.....|..U.U....|'
'.....UU..|U......U.|.U......U|'
'..UU.....|....U.U..|.....U.U.',
'U..U.....|.U....U..|..U.U....|'
'.....U.U.|U.......U|.U.U.....|'
'..U...U..|....U..U.|.....U..U')),
# Some nodes have a extra cell.
(0, 8, 1, "0001122", ('U.....U|.U.U...|..U.U..|U....U.|'
'.U....U|..UU...|....UU.|U.....U',
'U..U...|.U...U.|..U.U..|U.....U|'
'.U.U...|..U..U.|....U.U|U..U...')),
## Topology ignored.
(1, 6, 1, ("00", "01", "1"), 'UU.|U.U|.UU|UU.|U.U|.UU'),
(1, 5, 2, "01233", 'UUU..|U..UU|.UUU.|UU..U|..UUU'),
):
assert len(topo) <= sn_count
sn2 = sn[:len(topo)]
for s in sn2:
s.devpath = ()
k = (1,7)[even]
pt = PartitionTable(np*k, i)
pt.make(sn2)
for devpath, s in zip(topo, sn2):
s.devpath = tuple(devpath)
if type(expected) is tuple:
self.assertTrue(self.tweak(pt))
self.update(pt)
self.assertPartitionTable(pt, '|'.join(expected[:1]*k))
pt.clear()
pt.make(sn2)
self.assertPartitionTable(pt, '|'.join(expected[1:]*k))
self.assertFalse(pt.tweak())
else:
expected = '|'.join((expected,)*k)
self.assertFalse(pt.tweak())
self.assertPartitionTable(pt, expected)
pt.clear()
pt.make(sn2)
self.assertPartitionTable(pt, expected)
if __name__ == '__main__':
unittest.main()
......
......@@ -171,6 +171,8 @@ class Serialized(object):
# a single-core CPU, other threads are still busy and haven't
# sent anything yet on the network. This causes tic() to
# return prematurely. Passing a non-zero value is a hack.
# We also increase SocketConnector.SOMAXCONN in tests so that
# a connection attempt is never delayed inside the kernel.
timeout=0):
# If you're in a pdb here, 'n' switches to another thread
# (the following lines are not supposed to be debugged into)
......@@ -612,6 +614,7 @@ class NEOCluster(object):
Patch(BaseConnection, getTimeout=lambda orig, self: None),
Patch(SimpleQueue, __init__=__init__),
Patch(SocketConnector, CONNECT_LIMIT=0),
Patch(SocketConnector, SOMAXCONN=128), # see Serialized.tic comment
Patch(SocketConnector, _bind=lambda orig, self, addr: orig(self, BIND)),
Patch(SocketConnector, _connect = lambda orig, self, addr:
orig(self, ServerNode.resolv(addr))))
......@@ -771,7 +774,7 @@ class NEOCluster(object):
else NodeStates.RUNNING)
for node in self.storage_list if storage_list is None else storage_list:
state = self.getNodeState(node)
assert state == expected_state, (node, state)
assert state == expected_state, (repr(node), state)
def stop(self, clear_database=False, __print_exc=traceback.print_exc, **kw):
if self.started:
......
......@@ -20,10 +20,12 @@ from ZODB.POSException import ReadOnlyError, POSKeyError
import unittest
from collections import defaultdict
from functools import wraps
from itertools import product
from neo.lib import logging
from neo.client.exception import NEOStorageError
from neo.master.handlers.backup import BackupHandler
from neo.storage.checker import CHECK_COUNT
from neo.storage.database.manager import DatabaseManager
from neo.storage import replicator
from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection
......@@ -524,6 +526,29 @@ class ReplicationTests(NEOThreadedTest):
self.assertTrue(s.is_alive())
self.checkReplicas(cluster)
def testTopology(self):
"""
In addition to MasterPartitionTableTests.test_19_topology, this checks
correct propagation of the paths from storage nodes to tweak().
"""
with Patch(DatabaseManager, getTopologyPath=lambda *_: next(topology)):
for topology, expected in (
(iter("0" * 9),
'UU.......|..UU.....|....UU...|'
'......UU.|U.......U|.UU......|'
'...UU....|.....UU..|.......UU'),
(product("012", "012"),
'U..U.....|.U....U..|..U.U....|'
'.....U.U.|U.......U|.U.U.....|'
'..U...U..|....U..U.|.....U..U'),
):
with NEOCluster(replicas=1, partitions=9,
storage_count=9) as cluster:
for i, s in enumerate(cluster.storage_list, 1):
s.uuid = i
cluster.start()
self.assertPartitionTable(cluster, expected)
@with_cluster(start_cluster=0, replicas=1, storage_count=4, partitions=2)
def testTweakVsReplication(self, cluster, done=False):
S = cluster.storage_list
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment