Commit 6d330027 authored by Vincent Pelletier's avatar Vincent Pelletier

Sort cell list after randomising it.

There are 2 objectives:
- Prevent randomly trying to connect to an unresponsive storage node, which
  impairs performances a lot. Note that this happens only when the master
  didn't notice the disconnection, so the node is still in running state in
  the node manager.
- Increase connection reuse, saving the cost of establishing a new
  connection and a slot in connection pool.
Randomisation should be kept to even out storage node use.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2173 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent a29756a3
...@@ -447,6 +447,7 @@ class Application(object): ...@@ -447,6 +447,7 @@ class Application(object):
raise NEOStorageNotFoundError() raise NEOStorageNotFoundError()
shuffle(cell_list) shuffle(cell_list)
cell_list.sort(key=self.cp.getCellSortKey)
self.local_var.asked_object = 0 self.local_var.asked_object = 0
for cell in cell_list: for cell in cell_list:
logging.debug('trying to load %s from %s', logging.debug('trying to load %s from %s',
...@@ -859,6 +860,7 @@ class Application(object): ...@@ -859,6 +860,7 @@ class Application(object):
assert len(cell_list), 'No cell found for transaction %s' % ( assert len(cell_list), 'No cell found for transaction %s' % (
dump(undone_tid), ) dump(undone_tid), )
shuffle(cell_list) shuffle(cell_list)
cell_list.sort(key=self.cp.getCellSortKey)
for cell in cell_list: for cell in cell_list:
conn = self.cp.getConnForCell(cell) conn = self.cp.getConnForCell(cell)
if conn is None: if conn is None:
...@@ -976,6 +978,7 @@ class Application(object): ...@@ -976,6 +978,7 @@ class Application(object):
for tid in ordered_tids: for tid in ordered_tids:
cell_list = self._getCellListForTID(tid, readable=True) cell_list = self._getCellListForTID(tid, readable=True)
shuffle(cell_list) shuffle(cell_list)
cell_list.sort(key=self.cp.getCellSortKey)
for cell in cell_list: for cell in cell_list:
conn = self.cp.getConnForCell(cell) conn = self.cp.getConnForCell(cell)
if conn is not None: if conn is not None:
...@@ -1021,7 +1024,7 @@ class Application(object): ...@@ -1021,7 +1024,7 @@ class Application(object):
# Get history informations for object first # Get history informations for object first
cell_list = self._getCellListForOID(oid, readable=True) cell_list = self._getCellListForOID(oid, readable=True)
shuffle(cell_list) shuffle(cell_list)
cell_list.sort(key=self.cp.getCellSortKey)
for cell in cell_list: for cell in cell_list:
conn = self.cp.getConnForCell(cell) conn = self.cp.getConnForCell(cell)
if conn is None: if conn is None:
...@@ -1060,7 +1063,7 @@ class Application(object): ...@@ -1060,7 +1063,7 @@ class Application(object):
for serial, size in self.local_var.history[1]: for serial, size in self.local_var.history[1]:
self._getCellListForTID(serial, readable=True) self._getCellListForTID(serial, readable=True)
shuffle(cell_list) shuffle(cell_list)
cell_list.sort(key=self.cp.getCellSortKey)
for cell in cell_list: for cell in cell_list:
conn = self.cp.getConnForCell(cell) conn = self.cp.getConnForCell(cell)
if conn is None: if conn is None:
......
...@@ -22,6 +22,19 @@ from neo.protocol import NodeTypes, Packets ...@@ -22,6 +22,19 @@ from neo.protocol import NodeTypes, Packets
from neo.connection import MTClientConnection from neo.connection import MTClientConnection
from neo.client.exception import ConnectionClosed from neo.client.exception import ConnectionClosed
from neo.profiling import profiler_decorator from neo.profiling import profiler_decorator
import time
# How long before we might retry a connection to a node to which connection
# failed in the past.
MAX_FAILURE_AGE = 600
# Cell list sort keys
# We are connected to storage node hosting cell, high priority
CELL_CONNECTED = -1
# normal priority
CELL_GOOD = 0
# Storage node hosting cell failed recently, low priority
CELL_FAILED = 1
class ConnectionPool(object): class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes.""" """This class manages a pool of connections to storage nodes."""
...@@ -36,6 +49,7 @@ class ConnectionPool(object): ...@@ -36,6 +49,7 @@ class ConnectionPool(object):
l = RLock() l = RLock()
self.connection_lock_acquire = l.acquire self.connection_lock_acquire = l.acquire
self.connection_lock_release = l.release self.connection_lock_release = l.release
self.node_failure_dict = {}
@profiler_decorator @profiler_decorator
def _initNodeConnection(self, node): def _initNodeConnection(self, node):
...@@ -59,6 +73,7 @@ class ConnectionPool(object): ...@@ -59,6 +73,7 @@ class ConnectionPool(object):
if conn.getConnector() is None: if conn.getConnector() is None:
# This happens, if a connection could not be established. # This happens, if a connection could not be established.
logging.error('Connection to %r failed', node) logging.error('Connection to %r failed', node)
self.notifyFailure(node)
return None return None
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
...@@ -72,6 +87,7 @@ class ConnectionPool(object): ...@@ -72,6 +87,7 @@ class ConnectionPool(object):
handler=app.storage_bootstrap_handler) handler=app.storage_bootstrap_handler)
except ConnectionClosed: except ConnectionClosed:
logging.error('Connection to %r failed', node) logging.error('Connection to %r failed', node)
self.notifyFailure(node)
return None return None
if app.isNodeReady(): if app.isNodeReady():
...@@ -79,6 +95,7 @@ class ConnectionPool(object): ...@@ -79,6 +95,7 @@ class ConnectionPool(object):
return conn return conn
else: else:
logging.info('%r not ready', node) logging.info('%r not ready', node)
self.notifyFailure(node)
return None return None
@profiler_decorator @profiler_decorator
...@@ -111,6 +128,28 @@ class ConnectionPool(object): ...@@ -111,6 +128,28 @@ class ConnectionPool(object):
self.connection_dict[node.getUUID()] = conn self.connection_dict[node.getUUID()] = conn
return conn return conn
@profiler_decorator
def notifyFailure(self, node):
self._notifyFailure(node.getUUID(), time.time() + MAX_FAILURE_AGE)
def _notifyFailure(self, uuid, at):
self.node_failure_dict[uuid] = at
@profiler_decorator
def getCellSortKey(self, cell):
return self._getCellSortKey(cell.getUUID(), time.time())
def _getCellSortKey(self, uuid, now):
if uuid in self.connection_dict:
result = CELL_CONNECTED
else:
failure = self.node_failure_dict.get(uuid)
if failure is None or failure < now:
result = CELL_GOOD
else:
result = CELL_FAILED
return result
@profiler_decorator @profiler_decorator
def getConnForCell(self, cell): def getConnForCell(self, cell):
return self.getConnForNode(cell.getNode()) return self.getConnForNode(cell.getNode())
......
...@@ -195,21 +195,21 @@ class ClientApplicationTests(NeoTestBase): ...@@ -195,21 +195,21 @@ class ClientApplicationTests(NeoTestBase):
tid = self.makeTID() tid = self.makeTID()
# cache cleared # cache cleared
self.assertTrue(oid not in mq) self.assertTrue(oid not in mq)
app.pt = Mock({ 'getCellListForOID': (), }) app.pt = Mock({ 'getCellListForOID': [], })
app.local_var.history = (oid, [(tid, 0)]) app.local_var.history = (oid, [(tid, 0)])
# If object len is 0, this object doesn't exist anymore because its # If object len is 0, this object doesn't exist anymore because its
# creation has been undone. # creation has been undone.
self.assertRaises(KeyError, app.getSerial, oid) self.assertRaises(KeyError, app.getSerial, oid)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 1) self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 1)
# Otherwise, result from ZODB # Otherwise, result from ZODB
app.pt = Mock({ 'getCellListForOID': (), }) app.pt = Mock({ 'getCellListForOID': [], })
app.local_var.history = (oid, [(tid, 1)]) app.local_var.history = (oid, [(tid, 1)])
self.assertEquals(app.getSerial(oid), tid) self.assertEquals(app.getSerial(oid), tid)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 1) self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 1)
# fill the cache -> hit # fill the cache -> hit
mq.store(oid, (tid, ' ')) mq.store(oid, (tid, ' '))
self.assertTrue(oid in mq) self.assertTrue(oid in mq)
app.pt = Mock({ 'getCellListForOID': (), }) app.pt = Mock({ 'getCellListForOID': [], })
app.getSerial(oid) app.getSerial(oid)
self.assertEquals(app.getSerial(oid), tid) self.assertEquals(app.getSerial(oid), tid)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 0) self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 0)
...@@ -231,7 +231,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -231,7 +231,7 @@ class ClientApplicationTests(NeoTestBase):
'fakeReceived': packet, 'fakeReceived': packet,
}) })
app.local_var.queue = Mock({'get' : (conn, None)}) app.local_var.queue = Mock({'get' : (conn, None)})
app.pt = Mock({ 'getCellListForOID': (cell, ), }) app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn}) app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1 app.local_var.asked_object = -1
Application._waitMessage = self._waitMessage Application._waitMessage = self._waitMessage
...@@ -247,7 +247,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -247,7 +247,7 @@ class ClientApplicationTests(NeoTestBase):
'getAddress': ('127.0.0.1', 0), 'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet, 'fakeReceived': packet,
}) })
app.pt = Mock({ 'getCellListForOID': (cell, ), }) app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn}) app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1 app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.load, oid) self.assertRaises(NEOStorageNotFoundError, app.load, oid)
...@@ -289,7 +289,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -289,7 +289,7 @@ class ClientApplicationTests(NeoTestBase):
'getAddress': ('127.0.0.1', 0), 'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet, 'fakeReceived': packet,
}) })
app.pt = Mock({ 'getCellListForOID': (cell, ), }) app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn}) app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1 app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.loadSerial, oid, tid2) self.assertRaises(NEOStorageNotFoundError, app.loadSerial, oid, tid2)
...@@ -329,7 +329,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -329,7 +329,7 @@ class ClientApplicationTests(NeoTestBase):
'getAddress': ('127.0.0.1', 0), 'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet, 'fakeReceived': packet,
}) })
app.pt = Mock({ 'getCellListForOID': (cell, ), }) app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn}) app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1 app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.loadBefore, oid, tid2) self.assertRaises(NEOStorageNotFoundError, app.loadBefore, oid, tid2)
...@@ -772,8 +772,8 @@ class ClientApplicationTests(NeoTestBase): ...@@ -772,8 +772,8 @@ class ClientApplicationTests(NeoTestBase):
'getState': 'FakeState', 'getState': 'FakeState',
}) })
app.pt = Mock({ app.pt = Mock({
'getCellListForTID': (cell, ), 'getCellListForTID': [cell, ],
'getCellListForOID': (cell, ), 'getCellListForOID': [cell, ],
}) })
app.cp = Mock({'getConnForCell': conn, 'getConnForNode': conn}) app.cp = Mock({'getConnForCell': conn, 'getConnForNode': conn})
def tryToResolveConflict(oid, conflict_serial, serial, data, def tryToResolveConflict(oid, conflict_serial, serial, data,
......
...@@ -48,6 +48,26 @@ class ConnectionPoolTests(NeoTestBase): ...@@ -48,6 +48,26 @@ class ConnectionPoolTests(NeoTestBase):
# TODO: test getConnForNode (requires splitting complex functionalities) # TODO: test getConnForNode (requires splitting complex functionalities)
def test_CellSortKey(self):
pool = ConnectionPool(None)
node_uuid_1 = self.getNewUUID()
node_uuid_2 = self.getNewUUID()
node_uuid_3 = self.getNewUUID()
# We are connected to node 1
pool.connection_dict[node_uuid_1] = None
# A connection to node 3 failed, will be forgotten at 5
pool._notifyFailure(node_uuid_3, 5)
getCellSortKey = pool._getCellSortKey
# At 0, key values are not ambiguous
self.assertTrue(getCellSortKey(node_uuid_1, 0) < getCellSortKey(
node_uuid_2, 0) < getCellSortKey(node_uuid_3, 0))
# At 10, nodes 2 and 3 have the same key value
self.assertTrue(getCellSortKey(node_uuid_1, 10) < getCellSortKey(
node_uuid_2, 10))
self.assertEqual(getCellSortKey(node_uuid_2, 10), getCellSortKey(
node_uuid_3, 10))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
  • That's not ideal. At startup or after nodes are back, this prevents full load balancing until some data is written. And for some not-so-rare use cases, it may be forever.

    That's actually what happened during an upgrade: a migration had to read a lot of data and I noticed that only 1 node was busy.

    Since this commit, the code has changed a lot but the idea of distinguishing connected and other good nodes was kept. I consider dropping it. The code would become:

    --- a/neo/client/pool.py
    +++ b/neo/client/pool.py
    @@ -62,19 +62,16 @@ def _initNodeConnection(self, node):
         def getCellSortKey(self, cell, random=random.random):
             # The use of 'random' suffles cells to randomise node to access.
             uuid = cell.getUUID()
    -        # First, prefer a connected node.
    -        if uuid in self.connection_dict:
    -            return random()
    -        # Then one that didn't fail recently.
    +        # Prefer a node that didn't fail recently.
             failure = self.node_failure_dict.get(uuid)
             if failure:
                 if time.time() < failure:
    -                # At last, order by date of connection failure.
    +                # Or order by date of connection failure.
                     return failure
                 # Do not use 'del' statement: we didn't lock, so another
                 # thread might have removed uuid from node_failure_dict.
                 self.node_failure_dict.pop(uuid, None)
    -        return 1 + random()
    +        return random()
     
         def getConnForNode(self, node):
             """Return a locked connection object to a given node
  • A compromise could be to overlap the 2 cases, e.g. random() vs 0.5 + random(), but I doubt it's worth the complexity.

    Edited by Julien Muchembled
  • It seems that the intention of this commit was also linked to a feature that was removed in 77132157.

  • It is related to the original idea of limiting the number of connections kept opened, yes.

    About having to write for actual balancing to happen, what about ordering cells when there is no connection, and then continuing with this approach ?

    Or, if startup time does not hurt too much from it, what about establishing all connections from the begining ?

  • About having to write for actual balancing to happen, what about ordering cells when there is no connection, and then continuing with this approach ?

    I don't understand.

    Anyway, without any objection on the fact that we don't limit anymore the number of connections, my original suggestion is the simplest so I'll do that.

  • @jm

    Here was my thought process:

    The way I understand "we have to write" affecting node selection for reads is that until a write we will not have connections to all nodes.

    The way I understand "not having connections to all nodes" affecting node selection is that we prefer nodes we are already connected with.

    The way I understand "nodes we are already connected with" as being a load balancing issue is if there is no randomisation before the first connection got established (so all first connections end up on the same node).

    So it looks like randomisation is not applied as it should be: whenever there is no clear outstanding node, which includes the case where no connection exists at all.

    Am I missing/misunderstanding something ? I did not re-read the code beyond above diff nor look for the most recent implementation.

Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment