Commit a340846d authored by Grégory Wisniewski's avatar Grégory Wisniewski

Split partition table class in two. One for the admin node, another for other

node types. Add getCellListForID() in which is computed the cell related to an
OID or TID. Add _getPartitionTable() accessor in client application that check
if the master connection is established or not, and reconnect to obtain the last
partition table. Move the connecting_to_master_node lock in the accessors
instead of connectToPrimaryMaster(). This latter method is also set as private.
Note that the connection to primary master is not established at client startup
but when trying to access to the partition table or the primary connection. 
Split tests for the partition table and update others tests altered with changes
above. Fix a database name for mysqldb module, remove some unused import.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@627 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 3776e9d9
......@@ -42,7 +42,6 @@ from neo.event import EventManager
from neo.locking import RLock, Lock
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.utils import p64, u64, oid_repr
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
......@@ -229,6 +228,7 @@ class Application(object):
self.nm = NodeManager()
self.cp = ConnectionPool(self)
self.pt = None
self.master_conn = None
self.primary_master_node = None
self.master_node_list = master_nodes.split(' ')
# no self-assigned UUID, primary master will supply us one
......@@ -268,13 +268,6 @@ class Application(object):
lock = Lock()
self._pt_acquire = lock.acquire
self._pt_release = lock.release
self.master_conn = None
self.num_replicas = None
self.num_partitions = None
self.master_conn = self._getMasterConnection()
assert self.master_conn is not None
assert self.num_partitions is not None
assert self.num_replicas is not None
def _notifyDeadStorage(self, s_node):
""" Notify a storage failure to the primary master """
......@@ -339,10 +332,93 @@ class Application(object):
def _getMasterConnection(self):
""" Connect to the primary master node on demand """
if self.master_conn is None:
self.master_conn = self.connectToPrimaryMasterNode()
return self.master_conn
# acquire the lock to allow only one thread to connect to the primary
lock = self._connecting_to_master_node_acquire(True)
try:
if self.master_conn is None:
self.master_conn = self._connectToPrimaryMasterNode()
return self.master_conn
finally:
self._connecting_to_master_node_release()
def _getPartitionTable(self):
""" Return the partition table manager, reconnect the PMN if needed """
self._pt_acquire(True)
try:
if self.master_conn is None:
self.master_conn = self._connectToPrimaryMasterNode()
assert self.pt is not None
return self.pt
finally:
self._pt_release()
def _connectToPrimaryMasterNode(self):
logging.debug('connecting to primary master...')
master_index = 0
# Make application execute remaining message if any
self._waitMessage()
while True:
self.setNodeReady()
if self.primary_master_node is None:
# Try with master node defined in config
try:
addr, port = self.master_node_list[master_index].split(':')
except IndexError:
master_index = 0
addr, port = self.master_node_list[master_index].split(':')
port = int(port)
else:
addr, port = self.primary_master_node.getServer()
# Request Node Identification
handler = PrimaryBootstrapHandler(self, self.dispatcher)
conn = MTClientConnection(self.em, handler, (addr, port),
connector_handler=self.connector_handler)
self._nm_acquire()
try:
if self.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
self.nm.add(n)
finally:
self._nm_release()
conn.lock()
try:
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name)
msg_id = conn.ask(p)
self.dispatcher.register(conn, msg_id, self.local_var.queue)
finally:
conn.unlock()
# Wait for answer
while 1:
self._waitMessage(handler=handler)
# Now check result
if self.primary_master_node is not None:
if self.primary_master_node == -1:
# Connection failed, try with another master node
self.primary_master_node = None
master_index += 1
break
elif self.primary_master_node.getServer() != (addr, port):
# Master node changed, connect to new one
break
elif not self.isNodeReady():
# Wait a bit and reask again
break
elif self.pt is not None and self.pt.operational():
# Connected to primary master node
break
if self.pt is not None and self.pt.operational() \
and self.uuid != INVALID_UUID:
# Connected to primary master node and got all informations
break
sleep(1)
logging.info("connected to primary master node %s" % self.primary_master_node)
conn.setHandler(PrimaryNotificationsHandler(self, self.dispatcher))
return conn
def registerDB(self, db, limit):
self._db = db
......@@ -385,14 +461,8 @@ class Application(object):
def _load(self, oid, serial = INVALID_TID, tid = INVALID_TID, cache = 0):
"""Internal method which manage load ,loadSerial and loadBefore."""
partition_id = u64(oid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, readable=True)
finally:
self._pt_release()
pt = self._getPartitionTable()
cell_list = pt.getCellListForID(oid, readable=True)
if len(cell_list) == 0:
# No cells available, so why are we running ?
logging.error('oid %s not found because no storage is available for it', dump(oid))
......@@ -526,12 +596,8 @@ class Application(object):
logging.debug('storing oid %s serial %s',
dump(oid), dump(serial))
# Find which storage node to use
partition_id = u64(oid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, writable=True)
finally:
self._pt_release()
pt = self._getPartitionTable()
cell_list = pt.getCellListForID(oid, writable=True)
if len(cell_list) == 0:
# FIXME must wait for cluster to be ready
raise NEOStorageError
......@@ -586,12 +652,8 @@ class Application(object):
ext = dumps(transaction._extension)
oid_list = self.local_var.data_dict.keys()
# Store data on each node
partition_id = u64(self.local_var.tid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, writable=True)
finally:
self._pt_release()
pt = self._getPartitionTable()
cell_list = pt.getCellListForID(self.local_var.tid, writable=True)
self.local_var.voted_counter = 0
for cell in cell_list:
logging.info("voting object %s %s" %(cell.getServer(), cell.getState()))
......@@ -621,19 +683,12 @@ class Application(object):
return
cell_set = set()
self._pt_acquire()
try:
# select nodes where objects were stored
for oid in self.local_var.data_dict.iterkeys():
partition_id = u64(oid) % self.num_partitions
cell_set |= set(self.pt.getCellList(partition_id, writable=True))
# select nodes where transaction was stored
partition_id = u64(self.local_var.tid) % self.num_partitions
cell_set |= set(self.pt.getCellList(partition_id, writable=True))
finally:
self._pt_release()
pt = self._getPartitionTable()
# select nodes where objects were stored
for oid in self.local_var.data_dict.iterkeys():
cell_set |= set(pt.getCellListForID(oid, writable=True))
# select nodes where transaction was stored
cell_set |= set(pt.getCellListForID(self.local_var.tid, writable=True))
# cancel transaction one all those nodes
for cell in cell_set:
......@@ -691,12 +746,8 @@ class Application(object):
raise StorageTransactionError(self, transaction_id)
# First get transaction information from a storage node.
partition_id = u64(transaction_id) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, writable=True)
finally:
self._pt_release()
pt = self._getPartitionTable()
cell_list = pt.getCellListForID(transaction_id, writable=True)
shuffle(cell_list)
for cell in cell_list:
conn = self.cp.getConnForNode(cell)
......@@ -761,11 +812,8 @@ class Application(object):
# First get a list of transactions from all storage nodes.
# Each storage node will return TIDs only for UP_TO_DATE_STATE and
# FEEDING_STATE cells
self._pt_acquire()
try:
storage_node_list = self.pt.getNodeList()
finally:
self._pt_release()
pt = self._getPartitionTable()
storage_node_list = pt.getNodeList()
self.local_var.node_tids = {}
for storage_node in storage_node_list:
......@@ -800,12 +848,7 @@ class Application(object):
# For each transaction, get info
undo_info = []
for tid in ordered_tids:
partition_id = u64(tid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, readable=True)
finally:
self._pt_release()
cell_list = pt.getCellListForID(tid, readable=True)
shuffle(cell_list)
for cell in cell_list:
conn = self.cp.getConnForNode(storage_node)
......@@ -848,12 +891,8 @@ class Application(object):
# FIXME: filter function isn't used
def history(self, oid, version=None, length=1, filter=None, object_only=0):
# Get history informations for object first
partition_id = u64(oid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, readable=True)
finally:
self._pt_release()
pt = self._getPartitionTable()
cell_list = pt.getCellListForID(oid, readable=True)
shuffle(cell_list)
for cell in cell_list:
......@@ -883,12 +922,7 @@ class Application(object):
# Now that we have object informations, get txn informations
history_list = []
for serial, size in self.local_var.history[1]:
partition_id = u64(serial) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, readable=True)
finally:
self._pt_release()
pt.getCellListForID(serial, readable=True)
shuffle(cell_list)
for cell in cell_list:
......@@ -932,83 +966,6 @@ class Application(object):
def sync(self):
self._waitMessage()
def connectToPrimaryMasterNode(self):
logging.debug('connecting to primary master...')
# acquire the lock to allow only one thread to connect to the primary
lock = self._connecting_to_master_node_acquire(1)
try:
if self.pt is not None:
# pt is protected with the master lock
self.pt.clear()
master_index = 0
conn = None
# Make application execute remaining message if any
self._waitMessage()
while True:
self.setNodeReady()
if self.primary_master_node is None:
# Try with master node defined in config
try:
addr, port = self.master_node_list[master_index].split(':')
except IndexError:
master_index = 0
addr, port = self.master_node_list[master_index].split(':')
port = int(port)
else:
addr, port = self.primary_master_node.getServer()
# Request Node Identification
handler = PrimaryBootstrapHandler(self, self.dispatcher)
conn = MTClientConnection(self.em, handler, (addr, port),
connector_handler=self.connector_handler)
self._nm_acquire()
try:
if self.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
self.nm.add(n)
finally:
self._nm_release()
conn.lock()
try:
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name)
msg_id = conn.ask(p)
self.dispatcher.register(conn, msg_id, self.local_var.queue)
finally:
conn.unlock()
# Wait for answer
while 1:
self._waitMessage(handler=handler)
# Now check result
if self.primary_master_node is not None:
if self.primary_master_node == -1:
# Connection failed, try with another master node
self.primary_master_node = None
master_index += 1
break
elif self.primary_master_node.getServer() != (addr, port):
# Master node changed, connect to new one
break
elif not self.isNodeReady():
# Wait a bit and reask again
break
elif self.pt is not None and self.pt.operational():
# Connected to primary master node
break
if self.pt is not None and self.pt.operational() \
and self.uuid != INVALID_UUID:
# Connected to primary master node and got all informations
break
sleep(1)
logging.info("connected to primary master node %s" % self.primary_master_node)
conn.setHandler(PrimaryNotificationsHandler(self, self.dispatcher))
return conn
finally:
self._connecting_to_master_node_release()
def setNodeReady(self):
self.local_var.node_ready = True
......
......@@ -166,11 +166,8 @@ class PrimaryBootstrapHandler(PrimaryHandler):
# got an uuid from the primary master
app.uuid = your_uuid
# Create partition table if necessary
if app.pt is None:
app.pt = PartitionTable(num_partitions, num_replicas)
app.num_partitions = num_partitions
app.num_replicas = num_replicas
# Always create partition table
app.pt = PartitionTable(num_partitions, num_replicas)
# Ask a primary master.
conn.lock()
......
......@@ -28,17 +28,23 @@ from neo.protocol import *
from neo.util import makeChecksum
from neo.pt import PartitionTable
import neo.connection
import os
def _getMasterConnection(self):
self.uuid = 'C' * 16
self.num_partitions = 10
self.num_replicas = 1
self.pt = PartitionTable(self.num_partitions, self.num_replicas)
if self.master_conn is None:
self.uuid = 'C' * 16
self.num_partitions = 10
self.num_replicas = 1
self.pt = Mock({
'getCellListForID': (),
})
self.master_conn = Mock()
return self.master_conn
def _getPartitionTable(self):
if self.pt is None:
self.master_conn = _getMasterConnection(self)
return self.pt
def _waitMessage(self, conn=None, msg_id=None, handler=None):
if conn is not None and handler is not None:
handler.dispatch(conn, conn.fakeReceived())
......@@ -52,13 +58,16 @@ class ClientApplicationTest(NeoTestBase):
# apply monkey patches
self._getMasterConnection = Application._getMasterConnection
self._waitMessage = Application._waitMessage
self._getPartitionTable = Application._getPartitionTable
Application._getMasterConnection = _getMasterConnection
Application._waitMessage = _waitMessage
Application._getPartitionTable = _getPartitionTable
def tearDown(self):
# restore environnement
Application._getMasterConnection = self._getMasterConnection
Application._waitMessage = self._waitMessage
Application._getPartitionTable = self._getPartitionTable
# some helpers
......@@ -96,7 +105,7 @@ class ClientApplicationTest(NeoTestBase):
conn = Mock({ 'getNextId': 1, 'fakeReceived': packet, })
cell = Mock({ 'getServer': 'FakeServer', 'getState': 'FakeState', })
app.cp = Mock({ 'getConnForNode': conn})
app.pt = Mock({ 'getCellList': (cell, cell, ) })
app.pt = Mock({ 'getCellListForID': (cell, cell, ) })
return oid
def voteTransaction(self, app):
......@@ -105,7 +114,7 @@ class ClientApplicationTest(NeoTestBase):
packet = protocol.answerStoreTransaction(tid=tid)
conn = Mock({ 'getNextId': 1, 'fakeReceived': packet, })
cell = Mock({ 'getServer': 'FakeServer', 'getState': 'FakeState', })
app.pt = Mock({ 'getCellList': (cell, cell, ) })
app.pt = Mock({ 'getCellListForID': (cell, cell, ) })
app.cp = Mock({ 'getConnForNode': ReturnValues(None, conn), })
app.tpc_vote(txn)
......@@ -165,17 +174,17 @@ class ClientApplicationTest(NeoTestBase):
tid = self.makeTID()
# cache cleared -> result from ZODB
self.assertTrue(oid not in mq)
app.pt = Mock({ 'getCellList': (), })
app.pt = Mock({ 'getCellListForID': (), })
app.local_var.history = (oid, [(tid, 0)])
self.assertEquals(app.getSerial(oid), tid)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 1)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForID')), 1)
# fill the cache -> hit
mq.store(oid, (tid, ''))
self.assertTrue(oid in mq)
app.pt = Mock({ 'getCellList': (), })
app.pt = Mock({ 'getCellListForID': (), })
app.getSerial(oid)
self.assertEquals(app.getSerial(oid), tid)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForID')), 0)
def test_load(self):
app = self.getApp()
......@@ -193,7 +202,7 @@ class ClientApplicationTest(NeoTestBase):
'fakeReceived': packet,
})
app.local_var.queue = Mock({'get_nowait' : (conn, None)})
app.pt = Mock({ 'getCellList': (cell, ), })
app.pt = Mock({ 'getCellListForID': (cell, ), })
app.cp = Mock({ 'getConnForNode' : conn})
app.local_var.asked_object = -1
Application._waitMessage = self._waitMessage
......@@ -208,7 +217,7 @@ class ClientApplicationTest(NeoTestBase):
'getServer': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.pt = Mock({ 'getCellList': (cell, ), })
app.pt = Mock({ 'getCellListForID': (cell, ), })
app.cp = Mock({ 'getConnForNode' : conn})
app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.load, oid)
......@@ -248,7 +257,7 @@ class ClientApplicationTest(NeoTestBase):
'getServer': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.pt = Mock({ 'getCellList': (cell, ), })
app.pt = Mock({ 'getCellListForID': (cell, ), })
app.cp = Mock({ 'getConnForNode' : conn})
app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.loadSerial, oid, tid2)
......@@ -285,7 +294,7 @@ class ClientApplicationTest(NeoTestBase):
'getServer': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.pt = Mock({ 'getCellList': (cell, ), })
app.pt = Mock({ 'getCellListForID': (cell, ), })
app.cp = Mock({ 'getConnForNode' : conn})
app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.loadBefore, oid, tid2)
......@@ -369,12 +378,12 @@ class ClientApplicationTest(NeoTestBase):
# check partition_id and an empty cell list -> NEOStorageError
app.local_var.txn = txn
app.local_var.tid = tid
app.pt = Mock({ 'getCellList': (), })
app.pt = Mock({ 'getCellListForID': (), })
app.num_partitions = 2
self.assertRaises(NEOStorageError, app.store, oid, tid, '', None, txn)
calls = app.pt.mockGetNamedCalls('getCellList')
calls = app.pt.mockGetNamedCalls('getCellListForID')
self.assertEquals(len(calls), 1)
self.assertEquals(calls[0].getParam(0), 1) # oid=11
self.assertEquals(calls[0].getParam(0), oid) # oid=11
def test_store2(self):
app = self.getApp()
......@@ -393,7 +402,7 @@ class ClientApplicationTest(NeoTestBase):
'getServer': 'FakeServer',
'getState': 'FakeState',
})
app.pt = Mock({ 'getCellList': (cell, cell, )})
app.pt = Mock({ 'getCellListForID': (cell, cell, )})
app.cp = Mock({ 'getConnForNode': ReturnValues(None, conn)})
app.dispatcher = Mock({})
app.local_var.object_stored = (oid, tid)
......@@ -423,7 +432,7 @@ class ClientApplicationTest(NeoTestBase):
'getServer': 'FakeServer',
'getState': 'FakeState',
})
app.pt = Mock({ 'getCellList': (cell, cell, ) })
app.pt = Mock({ 'getCellListForID': (cell, cell, ) })
app.dispatcher = Mock({})
app.conflict_serial = None # reset by hand
app.local_var.object_stored = ()
......@@ -462,7 +471,7 @@ class ClientApplicationTest(NeoTestBase):
'getServer': 'FakeServer',
'getState': 'FakeState',
})
app.pt = Mock({ 'getCellList': (cell, cell, ) })
app.pt = Mock({ 'getCellListForID': (cell, cell, ) })
app.cp = Mock({ 'getConnForNode': ReturnValues(None, conn), })
app.dispatcher = Mock()
app.tpc_begin(txn, tid)
......@@ -490,7 +499,7 @@ class ClientApplicationTest(NeoTestBase):
'getServer': 'FakeServer',
'getState': 'FakeState',
})
app.pt = Mock({ 'getCellList': (cell, cell, ) })
app.pt = Mock({ 'getCellListForID': (cell, cell, ) })
app.cp = Mock({ 'getConnForNode': ReturnValues(None, conn), })
app.dispatcher = Mock()
app.tpc_begin(txn, tid)
......@@ -509,7 +518,7 @@ class ClientApplicationTest(NeoTestBase):
self.assertFalse(app.local_var.txn is txn)
conn = Mock()
cell = Mock()
app.pt = Mock({'getCellList': (cell, cell)})
app.pt = Mock({'getCellListForID': (cell, cell)})
app.cp = Mock({'getConnForNode': ReturnValues(None, cell)})
app.tpc_abort(txn)
# no packet sent
......@@ -532,7 +541,7 @@ class ClientApplicationTest(NeoTestBase):
cell1 = Mock({ 'getNode': 'NODE1', '__hash__': 1 })
cell2 = Mock({ 'getNode': 'NODE2', '__hash__': 2 })
conn1, conn2 = Mock({ 'getNextId': 1, }), Mock({ 'getNextId': 2, })
app.pt = Mock({ 'getCellList': ReturnValues((cell1, ), (cell1, ), (cell1, cell2)), })
app.pt = Mock({ 'getCellListForID': ReturnValues((cell1, ), (cell1, ), (cell1, cell2)), })
app.cp = Mock({ 'getConnForNode': ReturnValues(conn1, conn2), })
# fake data
app.local_var.data_dict = {oid1: '', oid2: ''}
......@@ -557,7 +566,7 @@ class ClientApplicationTest(NeoTestBase):
self.assertFalse(app.local_var.txn is txn)
conn = Mock()
cell = Mock()
app.pt = Mock({'getCellList': (cell, cell)})
app.pt = Mock({'getCellListForID': (cell, cell)})
app.cp = Mock({'getConnForNode': ReturnValues(None, cell)})
app.tpc_finish(txn)
# no packet sent
......@@ -689,7 +698,7 @@ class ClientApplicationTest(NeoTestBase):
'getAddress': ('127.0.0.1', 10010),
})
cell = Mock({ 'getServer': 'FakeServer', 'getState': 'FakeState', })
app.pt = Mock({ 'getCellList': (cell, ) })
app.pt = Mock({ 'getCellListForID': (cell, ) })
app.cp = Mock({ 'getConnForNode': conn})
wrapper = Mock({'tryToResolveConflict': None})
txn4 = self.beginTransaction(app, tid=tid4)
......@@ -723,7 +732,7 @@ class ClientApplicationTest(NeoTestBase):
})
app.pt = Mock({
'getNodeList': (node1, node2, ),
'getCellList': ReturnValues([cell1], [cell2]),
'getCellListForID': ReturnValues([cell1], [cell2]),
})
app.cp = Mock({ 'getConnForNode': conn})
def _waitMessage(self, conn=None, msg_id=None, handler=None):
......@@ -758,7 +767,7 @@ class ClientApplicationTest(NeoTestBase):
object_cells = [ Mock({}), Mock({}) ]
history_cells = [ Mock({}), Mock({}) ]
app.pt = Mock({
'getCellList': ReturnValues(object_cells, history_cells,
'getCellListForID': ReturnValues(object_cells, history_cells,
history_cells),
})
app.cp = Mock({ 'getConnForNode': conn})
......@@ -783,6 +792,9 @@ class ClientApplicationTest(NeoTestBase):
# third iteration : node not ready
def _waitMessage4(app, conn=None, msg_id=None, handler=None):
app.local_var.node_ready = False
app.setNodeReady()
app.pt = Mock({'operational': True})
app.uuid = 'C' * 16
self.all_passed = True
# second iteration : master node changed
def _waitMessage3(app, conn=None, msg_id=None, handler=None):
......@@ -803,13 +815,12 @@ class ClientApplicationTest(NeoTestBase):
# faked environnement
app.connector_handler = DoNothingConnector
app.em = Mock({})
app.pt = Mock({ 'operational': ReturnValues(False, False, True, True)})
app.pt = Mock({ 'operational': False})
self.all_passed = False
try:
app.master_conn = app.connectToPrimaryMasterNode()
app.master_conn = app._connectToPrimaryMasterNode()
finally:
Application._waitMessage = _waitMessage_old
self.assertEquals(len(app.pt.mockGetNamedCalls('clear')), 1)
self.assertTrue(self.all_passed)
self.assertTrue(app.master_conn, neo.connection.MTClientConnection)
self.assertTrue(app.pt.operational())
......
......@@ -23,25 +23,8 @@ from mock import Mock, ReturnValues
from neo.tests.base import NeoTestBase
from neo import protocol
from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_NEW_TID, ANSWER_NEW_TID, \
FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
ASK_OIDS, ANSWER_OIDS, INVALID_PTID, \
NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, MASTER_NODE_TYPE, \
from neo.protocol import ERROR, \
INVALID_PTID, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, DISCARDED_STATE
from neo.exception import ElectionFailure
......
......@@ -18,7 +18,6 @@
import logging
from neo.locking import RLock
import sys
import traceback
from neo import protocol
from neo.protocol import Packet, PacketMalformedError
......
......@@ -37,7 +37,7 @@ from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler
from neo.master.service import ServiceEventHandler
from neo.master.secondary import SecondaryEventHandler
from neo.pt import PartitionTable
from neo.master.pt import PartitionTable
from neo.util import dump
from neo.connector import getConnectorHandler
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
import neo.pt
from neo.protocol import OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE, RUNNING_STATE, BROKEN_STATE
class PartitionTable(neo.pt.PartitionTable):
"""This class manages a partition table for the primary master node"""
def make(self, node_list):
"""Make a new partition table from scratch."""
# First, filter the list of nodes.
node_list = [n for n in node_list \
if n.getState() == RUNNING_STATE and n.getUUID() is not None]
if len(node_list) == 0:
# Impossible.
raise RuntimeError, \
'cannot make a partition table with an empty storage node list'
# Take it into account that the number of storage nodes may be less than the
# number of replicas.
repeats = min(self.nr + 1, len(node_list))
index = 0
for offset in xrange(self.np):
row = []
for i in xrange(repeats):
node = node_list[index]
row.append(neo.pt.Cell(node))
self.count_dict[node] = self.count_dict.get(node, 0) + 1
index += 1
if index == len(node_list):
index = 0
self.partition_list[offset] = row
self.num_filled_rows = self.np
def findLeastUsedNode(self, excluded_node_list = ()):
min_count = self.np + 1
min_node = None
for node, count in self.count_dict.iteritems():
if min_count > count \
and node not in excluded_node_list \
and node.getState() == RUNNING_STATE:
min_node = node
min_count = count
return min_node
def dropNode(self, node):
cell_list = []
uuid = node.getUUID()
for offset, row in enumerate(self.partition_list):
if row is not None:
for cell in row:
if cell.getNode() is node:
if cell.getState() != FEEDING_STATE:
# If this cell is not feeding, find another node
# to be added.
node_list = [c.getNode() for c in row]
n = self.findLeastUsedNode(node_list)
if n is not None:
row.append(neo.pt.Cell(n, OUT_OF_DATE_STATE))
self.count_dict[n] += 1
cell_list.append((offset, n.getUUID(),
OUT_OF_DATE_STATE))
row.remove(cell)
cell_list.append((offset, uuid, DISCARDED_STATE))
break
try:
del self.count_dict[node]
except KeyError:
pass
return cell_list
def addNode(self, node):
"""Add a node. Take it into account that it might not be really a new
node. The strategy is, if a row does not contain a good number of
cells, add this node to the row, unless the node is already present
in the same row. Otherwise, check if this node should replace another
cell."""
cell_list = []
node_count = self.count_dict.get(node, 0)
for offset, row in enumerate(self.partition_list):
feeding_cell = None
max_count = 0
max_cell = None
num_cells = 0
skip = False
for cell in row:
if cell.getNode() == node:
skip = True
break
if cell.getState() == FEEDING_STATE:
feeding_cell = cell
else:
num_cells += 1
count = self.count_dict[cell.getNode()]
if count > max_count:
max_count = count
max_cell = cell
if skip:
continue
if num_cells <= self.nr:
row.append(neo.pt.Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
node_count += 1
else:
if max_count - node_count > 1:
if feeding_cell is not None \
or max_cell.getState() == OUT_OF_DATE_STATE:
# If there is a feeding cell already or it is
# out-of-date, just drop the node.
row.remove(max_cell)
cell_list.append((offset, max_cell.getUUID(),
DISCARDED_STATE))
self.count_dict[max_cell.getNode()] -= 1
else:
# Otherwise, use it as a feeding cell for safety.
max_cell.setState(FEEDING_STATE)
cell_list.append((offset, max_cell.getUUID(),
FEEDING_STATE))
# Don't count a feeding cell.
self.count_dict[max_cell.getNode()] -= 1
row.append(neo.pt.Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(),
OUT_OF_DATE_STATE))
node_count += 1
self.count_dict[node] = node_count
self.log()
return cell_list
def tweak(self):
"""Test if nodes are distributed uniformly. Otherwise, correct the
partition table."""
changed_cell_list = []
for offset, row in enumerate(self.partition_list):
removed_cell_list = []
feeding_cell = None
out_of_date_cell_present = False
out_of_date_cell_list = []
up_to_date_cell_list = []
for cell in row:
if cell.getNodeState() == BROKEN_STATE:
# Remove a broken cell.
removed_cell_list.append(cell)
elif cell.getState() == FEEDING_STATE:
if feeding_cell is None:
feeding_cell = cell
else:
# Remove an excessive feeding cell.
removed_cell_list.append(cell)
elif cell.getState() == OUT_OF_DATE_STATE:
out_of_date_cell_list.append(cell)
else:
up_to_date_cell_list.append(cell)
# If all cells are up-to-date, a feeding cell is not required.
if len(out_of_date_cell_list) == 0 and feeding_cell is not None:
removed_cell_list.append(feeding_cell)
ideal_num = self.nr + 1
while len(out_of_date_cell_list) + len(up_to_date_cell_list) > ideal_num:
# This row contains too many cells.
if len(up_to_date_cell_list) > 1:
# There are multiple up-to-date cells, so choose whatever
# used too much.
cell_list = out_of_date_cell_list + up_to_date_cell_list
else:
# Drop an out-of-date cell.
cell_list = out_of_date_cell_list
max_count = 0
chosen_cell = None
for cell in cell_list:
count = self.count_dict[cell.getNode()]
if max_count < count:
max_count = count
chosen_cell = cell
removed_cell_list.append(chosen_cell)
try:
out_of_date_cell_list.remove(chosen_cell)
except ValueError:
up_to_date_cell_list.remove(chosen_cell)
# Now remove cells really.
for cell in removed_cell_list:
row.remove(cell)
if cell.getState() != FEEDING_STATE:
self.count_dict[cell.getNode()] -= 1
changed_cell_list.append((offset, cell.getUUID(), DISCARDED_STATE))
# Add cells, if a row contains less than the number of replicas.
for offset, row in enumerate(self.partition_list):
num_cells = 0
for cell in row:
if cell.getState() != FEEDING_STATE:
num_cells += 1
while num_cells <= self.nr:
node = self.findLeastUsedNode([cell.getNode() for cell in row])
if node is None:
break
row.append(neo.pt.Cell(node, OUT_OF_DATE_STATE))
changed_cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
self.count_dict[node] += 1
num_cells += 1
# FIXME still not enough. It is necessary to check if it is possible
# to reduce differences between frequently used nodes and rarely used
# nodes by replacing cells.
self.log()
return changed_cell_list
def outdate(self):
"""Outdate all non-working nodes."""
cell_list = []
for offset, row in enumerate(self.partition_list):
for cell in row:
if cell.getNodeState() != RUNNING_STATE \
and cell.getState() != OUT_OF_DATE_STATE:
cell.setState(OUT_OF_DATE_STATE)
cell_list.append((offset, cell.getUUID(), OUT_OF_DATE_STATE))
return cell_list
from neo.master.tests.testMasterApp import MasterAppTests
from neo.master.tests.testMasterPT import MasterPartitionTableTests
from neo.master.tests.testMasterElectionHandler import MasterElectionTests
from neo.master.tests.testMasterRecoveryHandler import MasterRecoveryTests
from neo.master.tests.testMasterService import MasterServiceTests
from neo.master.tests.testMasterVerificationHandler import MasterVerificationeTests
from neo.master.tests.testMasterVerificationHandler import MasterVerificationTests
__all__ = [
'MasterAppTests',
......@@ -10,5 +11,6 @@ __all__ = [
'MasterRecoveryTests',
'MasterServiceTests',
'MasterVerificationeTests',
'MasterPartitionTableTests',
]
#
# Copyright (C) 2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import unittest, os
from mock import Mock
from neo.tests.base import NeoTestBase
from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE, RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
BROKEN_STATE, INVALID_UUID
from neo.pt import Cell
from neo.master.pt import PartitionTable
from neo.node import StorageNode
class MasterPartitionTableTests(NeoTestBase):
def test_02_PartitionTable_creation(self):
num_partitions = 5
num_replicas = 3
pt = PartitionTable(num_partitions, num_replicas)
self.assertEqual(pt.np, num_partitions)
self.assertEqual(pt.nr, num_replicas)
self.assertEqual(pt.num_filled_rows, 0)
partition_list = pt.partition_list
self.assertEqual(len(partition_list), num_partitions)
for x in xrange(num_partitions):
part = partition_list[x]
self.failUnless(isinstance(part, list))
self.assertEqual(len(part), 0)
self.assertEqual(len(pt.count_dict), 0)
# no nodes or cells for now
self.assertEqual(len(pt.getNodeList()), 0)
for x in xrange(num_partitions):
self.assertEqual(len(pt.getCellList(x)), 0)
self.assertEqual(len(pt.getCellList(x, True)), 0)
self.assertEqual(len(pt.getRow(x)), 0)
self.assertFalse(pt.operational())
self.assertFalse(pt.filled())
self.assertRaises(RuntimeError, pt.make, [])
self.assertFalse(pt.operational())
self.assertFalse(pt.filled())
def test_11_findLeastUsedNode(self):
num_partitions = 5
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
# add nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19001)
sn2 = StorageNode(server2, uuid2)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(1, sn2, UP_TO_DATE_STATE)
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19001)
sn3 = StorageNode(server3, uuid3)
pt.setCell(0, sn3, UP_TO_DATE_STATE)
# test
node = pt.findLeastUsedNode()
self.assertEqual(node, sn3)
node = pt.findLeastUsedNode((sn3,))
self.assertEqual(node, sn2)
node = pt.findLeastUsedNode((sn3,sn2))
self.assertEqual(node, sn1)
def test_13_outdate(self):
# create nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19002)
sn2 = StorageNode(server2, uuid2)
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19003)
sn3 = StorageNode(server3, uuid3)
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19004)
sn4 = StorageNode(server4, uuid4)
uuid5 = self.getNewUUID()
server5 = ("127.0.0.5", 19005)
sn5 = StorageNode(server5, uuid5)
# create partition table
num_partitions = 5
num_replicas = 3
pt = PartitionTable(num_partitions, num_replicas)
pt.setCell(0, sn1, OUT_OF_DATE_STATE)
sn1.setState(RUNNING_STATE)
pt.setCell(1, sn2, UP_TO_DATE_STATE)
sn2.setState(TEMPORARILY_DOWN_STATE)
pt.setCell(2, sn3, UP_TO_DATE_STATE)
sn3.setState(DOWN_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
sn4.setState(BROKEN_STATE)
pt.setCell(4, sn5, UP_TO_DATE_STATE)
sn5.setState(RUNNING_STATE)
# outdate nodes
cells_outdated = pt.outdate()
self.assertEqual(len(cells_outdated), 3)
for offset, uuid, state in cells_outdated:
self.failUnless(offset in (1,2,3))
self.failUnless(uuid in (uuid2,uuid3,uuid4))
self.assertEqual(state, OUT_OF_DATE_STATE)
# check each cell
# part 1, already outdated
cells = pt.getCellList(0)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
# part 2, must be outdated
cells = pt.getCellList(1)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
# part 3, must be outdated
cells = pt.getCellList(2)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
# part 4, already outdated
cells = pt.getCellList(3)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
# part 5, remains running
cells = pt.getCellList(4)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
def test_14_addNode(self):
num_partitions = 5
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
# add nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
# add it to an empty pt
cell_list = pt.addNode(sn1)
self.assertEqual(len(cell_list), 5)
# it must be added to all partitions
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 1)
self.assertEqual(pt.getCellList(x)[0].getState(), OUT_OF_DATE_STATE)
self.assertEqual(pt.getCellList(x)[0].getNode(), sn1)
self.assertEqual(pt.count_dict[sn1], 5)
# add same node again, must remain the same
cell_list = pt.addNode(sn1)
self.assertEqual(len(cell_list), 0)
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 1)
self.assertEqual(pt.getCellList(x)[0].getState(), OUT_OF_DATE_STATE)
self.assertEqual(pt.getCellList(x)[0].getNode(), sn1)
self.assertEqual(pt.count_dict[sn1], 5)
# add a second node to fill the partition table
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19002)
sn2 = StorageNode(server2, uuid2)
# add it
cell_list = pt.addNode(sn2)
self.assertEqual(len(cell_list), 5)
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 2)
self.assertEqual(pt.getCellList(x)[0].getState(), OUT_OF_DATE_STATE)
self.failUnless(pt.getCellList(x)[0].getNode() in (sn1, sn2))
# test the most used node is remove from some partition
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19001)
sn3 = StorageNode(server3, uuid3)
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19001)
sn4 = StorageNode(server4, uuid4)
uuid5 = self.getNewUUID()
server5 = ("127.0.0.5", 1900)
sn5 = StorageNode(server5, uuid5)
# partition looks like:
# 0 : sn1, sn2
# 1 : sn1, sn3
# 2 : sn1, sn4
# 3 : sn1, sn5
num_partitions = 4
num_replicas = 1
pt = PartitionTable(num_partitions, num_replicas)
# node most used is out of date, just dropped
pt.setCell(0, sn1, OUT_OF_DATE_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(1, sn1, OUT_OF_DATE_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, OUT_OF_DATE_STATE)
pt.setCell(2, sn4, UP_TO_DATE_STATE)
pt.setCell(3, sn1, OUT_OF_DATE_STATE)
pt.setCell(3, sn5, UP_TO_DATE_STATE)
uuid6 = self.getNewUUID()
server6 = ("127.0.0.6", 19006)
sn6 = StorageNode(server6, uuid6)
cell_list = pt.addNode(sn6)
# sn1 is removed twice and sn6 is added twice
self.assertEqual(len(cell_list), 4)
for offset, uuid, state in cell_list:
if offset in (0,1):
if uuid == uuid1:
self.assertEqual(state, DISCARDED_STATE)
elif uuid == uuid6:
self.assertEqual(state, OUT_OF_DATE_STATE)
else:
self.failUnless(uuid in (uuid1, uuid6))
else:
self.failUnless(offset in (0, 1))
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 2)
# there is a feeding cell, just dropped
pt.clear()
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(0, sn3, FEEDING_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn2, FEEDING_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn4, FEEDING_STATE)
pt.setCell(2, sn5, UP_TO_DATE_STATE)
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
pt.setCell(3, sn5, FEEDING_STATE)
cell_list = pt.addNode(sn6)
# sn1 is removed twice and sn6 is added twice
self.assertEqual(len(cell_list), 4)
for offset, uuid, state in cell_list:
if offset in (0,1):
if uuid == uuid1:
self.assertEqual(state, DISCARDED_STATE)
elif uuid == uuid6:
self.assertEqual(state, OUT_OF_DATE_STATE)
else:
self.failUnless(uuid in (uuid1, uuid6))
else:
self.failUnless(offset in (0, 1))
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 3)
# there is no feeding cell, marked as feeding
pt.clear()
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn4, UP_TO_DATE_STATE)
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn5, UP_TO_DATE_STATE)
cell_list = pt.addNode(sn6)
# sn1 is removed twice and sn6 is added twice
self.assertEqual(len(cell_list), 4)
for offset, uuid, state in cell_list:
if offset in (0,1):
if uuid == uuid1:
self.assertEqual(state, FEEDING_STATE)
elif uuid == uuid6:
self.assertEqual(state, OUT_OF_DATE_STATE)
else:
self.failUnless(uuid in (uuid1, uuid6))
else:
self.failUnless(offset in (0, 1))
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 3)
def test_15_dropNode(self):
num_partitions = 4
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
# add nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19002)
sn2 = StorageNode(server2, uuid2)
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19001)
sn3 = StorageNode(server3, uuid3)
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19001)
sn4 = StorageNode(server4, uuid4)
# partition looks like:
# 0 : sn1, sn2
# 1 : sn1, sn3
# 2 : sn1, sn3
# 3 : sn1, sn4
# node is not feeding, so retrive least use node to replace it
# so sn2 must be repaced by sn4 in partition 0
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn3, UP_TO_DATE_STATE)
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
cell_list = pt.dropNode(sn2)
self.assertEqual(len(cell_list), 2)
for offset, uuid, state in cell_list:
self.assertEqual(offset, 0)
if uuid == uuid2:
self.assertEqual(state, DISCARDED_STATE)
elif uuid == uuid4:
self.assertEqual(state, OUT_OF_DATE_STATE)
else:
self.failUnless(uuid in (uuid2, uuid4))
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 2)
# same test but with feeding state, no other will be added
pt.clear()
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(0, sn2, FEEDING_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn3, UP_TO_DATE_STATE)
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
cell_list = pt.dropNode(sn2)
self.assertEqual(len(cell_list), 1)
for offset, uuid, state in cell_list:
self.assertEqual(offset, 0)
self.assertEqual(state, DISCARDED_STATE)
self.assertEqual(uuid ,uuid2)
for x in xrange(num_replicas):
if x == 0:
self.assertEqual(len(pt.getCellList(x)), 1)
else:
self.assertEqual(len(pt.getCellList(x)), 2)
def test_16_make(self):
num_partitions = 5
num_replicas = 1
pt = PartitionTable(num_partitions, num_replicas)
# add nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
# add not running node
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19001)
sn2 = StorageNode(server2, uuid2)
sn2.setState(TEMPORARILY_DOWN_STATE)
# add node without uuid
server3 = ("127.0.0.3", 19001)
sn3 = StorageNode(server3, None)
# add clear node
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19001)
sn4 = StorageNode(server4, uuid4)
uuid5 = self.getNewUUID()
server5 = ("127.0.0.5", 1900)
sn5 = StorageNode(server5, uuid5)
# make the table
pt.make([sn1, sn2, sn3, sn4, sn5,])
# check it's ok, only running nodes and node with uuid
# must be present
for x in xrange(num_partitions):
cells = pt.getCellList(x)
self.assertEqual(len(cells), 2)
nodes = [x.getNode() for x in cells]
for node in nodes:
self.failUnless(node in (sn1, sn4, sn5))
self.failUnless(node not in (sn2, sn3))
self.assertTrue(pt.filled())
self.assertTrue(pt.operational())
# create a pt with less nodes
pt.clear()
self.assertFalse(pt.filled())
self.assertFalse(pt.operational())
pt.make([sn1,])
# check it's ok
for x in xrange(num_partitions):
cells = pt.getCellList(x)
self.assertEqual(len(cells), 1)
nodes = [x.getNode() for x in cells]
for node in nodes:
self.assertEqual(node, sn1)
self.assertTrue(pt.filled())
self.assertTrue(pt.operational())
def test_17_tweak(self):
# remove broken node
# remove if too many feeding nodes
# remove feeding if all cells are up to date
# if too many cells, remove most used cell
# if not enought cell, add least used node
# create nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19002)
sn2 = StorageNode(server2, uuid2)
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19003)
sn3 = StorageNode(server3, uuid3)
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19004)
sn4 = StorageNode(server4, uuid4)
uuid5 = self.getNewUUID()
server5 = ("127.0.0.5", 19005)
sn5 = StorageNode(server5, uuid5)
# create partition table
# 0 : sn1(discarded), sn2(up), -> sn2 must remain
# 1 : sn1(feeding), sn2(feeding), sn3(up) -> one feeding and sn3 must remain
# 2 : sn1(feeding), sn2(up), sn3(up) -> sn2 and sn3 must remain, feeding must go away
# 3 : sn1(up), sn2(up), sn3(up), sn4(up) -> only 3 cell must remain
# 4 : sn1(up), sn5(up) -> one more cell must be added
num_partitions = 5
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
# part 0
pt.setCell(0, sn1, DISCARDED_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
# part 1
pt.setCell(1, sn1, FEEDING_STATE)
pt.setCell(1, sn2, FEEDING_STATE)
pt.setCell(1, sn3, OUT_OF_DATE_STATE)
# part 2
pt.setCell(2, sn1, FEEDING_STATE)
pt.setCell(2, sn2, UP_TO_DATE_STATE)
pt.setCell(2, sn3, UP_TO_DATE_STATE)
# part 3
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn2, UP_TO_DATE_STATE)
pt.setCell(3, sn3, UP_TO_DATE_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
# part 4
pt.setCell(4, sn1, UP_TO_DATE_STATE)
pt.setCell(4, sn5, UP_TO_DATE_STATE)
# now tweak the table
pt.tweak()
# check part 1
cells = pt.getCellList(0)
self.assertEqual(len(cells), 3)
for cell in cells:
self.assertNotEqual(cell.getState(), DISCARDED_STATE)
if cell.getNode() == sn2:
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
else:
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
self.failUnless(sn2 in [x.getNode() for x in cells])
# check part 2
cells = pt.getCellList(1)
self.assertEqual(len(cells), 4)
for cell in cells:
if cell.getNode() == sn1:
self.assertEqual(cell.getState(), FEEDING_STATE)
else:
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
self.failUnless(sn3 in [x.getNode() for x in cells])
self.failUnless(sn1 in [x.getNode() for x in cells])
# check part 3
cells = pt.getCellList(2)
self.assertEqual(len(cells), 3)
for cell in cells:
if cell.getNode() in (sn2, sn3):
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
else:
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
self.failUnless(sn3 in [x.getNode() for x in cells])
self.failUnless(sn2 in [x.getNode() for x in cells])
# check part 4
cells = pt.getCellList(3)
self.assertEqual(len(cells), 3)
for cell in cells:
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
# check part 5
cells = pt.getCellList(4)
self.assertEqual(len(cells), 3)
for cell in cells:
if cell.getNode() in (sn1, sn5):
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
else:
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
self.failUnless(sn1 in [x.getNode() for x in cells])
self.failUnless(sn5 in [x.getNode() for x in cells])
if __name__ == '__main__':
unittest.main()
......@@ -38,7 +38,7 @@ from neo.master.tests.connector import DoNothingConnector
from neo.connection import ClientConnection
class MasterVerificationeTests(NeoTestBase):
class MasterVerificationTests(NeoTestBase):
def setUp(self):
logging.basicConfig(level = logging.WARNING)
......
......@@ -20,6 +20,7 @@ import logging
from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE, RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
BROKEN_STATE, VALID_CELL_STATE_LIST, HIDDEN_STATE
from ZODB.utils import u64
from neo.util import dump
class Cell(object):
......@@ -66,6 +67,12 @@ class PartitionTable(object):
self.partition_list = [[] for x in xrange(self.np)]
self.count_dict.clear()
def hasOffset(self, offset):
try:
return len(self.partition_list[offset]) > 0
except IndexError:
return False
def getNodeList(self):
"""Return all used nodes."""
node_list = []
......@@ -90,32 +97,8 @@ class PartitionTable(object):
except (TypeError, KeyError):
return []
def make(self, node_list):
"""Make a new partition table from scratch."""
# First, filter the list of nodes.
node_list = [n for n in node_list \
if n.getState() == RUNNING_STATE and n.getUUID() is not None]
if len(node_list) == 0:
# Impossible.
raise RuntimeError, \
'cannot make a partition table with an empty storage node list'
# Take it into account that the number of storage nodes may be less than the
# number of replicas.
repeats = min(self.nr + 1, len(node_list))
index = 0
for offset in xrange(self.np):
row = []
for i in xrange(repeats):
node = node_list[index]
row.append(Cell(node))
self.count_dict[node] = self.count_dict.get(node, 0) + 1
index += 1
if index == len(node_list):
index = 0
self.partition_list[offset] = row
self.num_filled_rows = self.np
def getCellListForID(self, id, readable=False, writable=False):
return self.getCellList(u64(id) % self.np, readable, writable)
def setCell(self, offset, node, state):
assert state in VALID_CELL_STATE_LIST
......@@ -159,12 +142,6 @@ class PartitionTable(object):
def filled(self):
return self.num_filled_rows == self.np
def hasOffset(self, offset):
try:
return len(self.partition_list[offset]) > 0
except IndexError:
return False
def log(self):
"""Help debugging partition table management.
......@@ -240,201 +217,9 @@ class PartitionTable(object):
return True
def findLeastUsedNode(self, excluded_node_list = ()):
min_count = self.np + 1
min_node = None
for node, count in self.count_dict.iteritems():
if min_count > count \
and node not in excluded_node_list \
and node.getState() == RUNNING_STATE:
min_node = node
min_count = count
return min_node
def dropNode(self, node):
cell_list = []
uuid = node.getUUID()
for offset, row in enumerate(self.partition_list):
if row is not None:
for cell in row:
if cell.getNode() is node:
if cell.getState() != FEEDING_STATE:
# If this cell is not feeding, find another node
# to be added.
node_list = [c.getNode() for c in row]
n = self.findLeastUsedNode(node_list)
if n is not None:
row.append(Cell(n, OUT_OF_DATE_STATE))
self.count_dict[n] += 1
cell_list.append((offset, n.getUUID(),
OUT_OF_DATE_STATE))
row.remove(cell)
cell_list.append((offset, uuid, DISCARDED_STATE))
break
try:
del self.count_dict[node]
except KeyError:
pass
return cell_list
def addNode(self, node):
"""Add a node. Take it into account that it might not be really a new
node. The strategy is, if a row does not contain a good number of
cells, add this node to the row, unless the node is already present
in the same row. Otherwise, check if this node should replace another
cell."""
cell_list = []
node_count = self.count_dict.get(node, 0)
for offset, row in enumerate(self.partition_list):
feeding_cell = None
max_count = 0
max_cell = None
num_cells = 0
skip = False
for cell in row:
if cell.getNode() == node:
skip = True
break
if cell.getState() == FEEDING_STATE:
feeding_cell = cell
else:
num_cells += 1
count = self.count_dict[cell.getNode()]
if count > max_count:
max_count = count
max_cell = cell
if skip:
continue
if num_cells <= self.nr:
row.append(Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
node_count += 1
else:
if max_count - node_count > 1:
if feeding_cell is not None \
or max_cell.getState() == OUT_OF_DATE_STATE:
# If there is a feeding cell already or it is
# out-of-date, just drop the node.
row.remove(max_cell)
cell_list.append((offset, max_cell.getUUID(),
DISCARDED_STATE))
self.count_dict[max_cell.getNode()] -= 1
else:
# Otherwise, use it as a feeding cell for safety.
max_cell.setState(FEEDING_STATE)
cell_list.append((offset, max_cell.getUUID(),
FEEDING_STATE))
# Don't count a feeding cell.
self.count_dict[max_cell.getNode()] -= 1
row.append(Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(),
OUT_OF_DATE_STATE))
node_count += 1
self.count_dict[node] = node_count
self.log()
return cell_list
def getRow(self, offset):
row = self.partition_list[offset]
if row is None:
return ()
return [(cell.getUUID(), cell.getState()) for cell in row]
def tweak(self):
"""Test if nodes are distributed uniformly. Otherwise, correct the
partition table."""
changed_cell_list = []
for offset, row in enumerate(self.partition_list):
removed_cell_list = []
feeding_cell = None
out_of_date_cell_present = False
out_of_date_cell_list = []
up_to_date_cell_list = []
for cell in row:
if cell.getNodeState() == BROKEN_STATE:
# Remove a broken cell.
removed_cell_list.append(cell)
elif cell.getState() == FEEDING_STATE:
if feeding_cell is None:
feeding_cell = cell
else:
# Remove an excessive feeding cell.
removed_cell_list.append(cell)
elif cell.getState() == OUT_OF_DATE_STATE:
out_of_date_cell_list.append(cell)
else:
up_to_date_cell_list.append(cell)
# If all cells are up-to-date, a feeding cell is not required.
if len(out_of_date_cell_list) == 0 and feeding_cell is not None:
removed_cell_list.append(feeding_cell)
ideal_num = self.nr + 1
while len(out_of_date_cell_list) + len(up_to_date_cell_list) > ideal_num:
# This row contains too many cells.
if len(up_to_date_cell_list) > 1:
# There are multiple up-to-date cells, so choose whatever
# used too much.
cell_list = out_of_date_cell_list + up_to_date_cell_list
else:
# Drop an out-of-date cell.
cell_list = out_of_date_cell_list
max_count = 0
chosen_cell = None
for cell in cell_list:
count = self.count_dict[cell.getNode()]
if max_count < count:
max_count = count
chosen_cell = cell
removed_cell_list.append(chosen_cell)
try:
out_of_date_cell_list.remove(chosen_cell)
except ValueError:
up_to_date_cell_list.remove(chosen_cell)
# Now remove cells really.
for cell in removed_cell_list:
row.remove(cell)
if cell.getState() != FEEDING_STATE:
self.count_dict[cell.getNode()] -= 1
changed_cell_list.append((offset, cell.getUUID(), DISCARDED_STATE))
# Add cells, if a row contains less than the number of replicas.
for offset, row in enumerate(self.partition_list):
num_cells = 0
for cell in row:
if cell.getState() != FEEDING_STATE:
num_cells += 1
while num_cells <= self.nr:
node = self.findLeastUsedNode([cell.getNode() for cell in row])
if node is None:
break
row.append(Cell(node, OUT_OF_DATE_STATE))
changed_cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
self.count_dict[node] += 1
num_cells += 1
# FIXME still not enough. It is necessary to check if it is possible
# to reduce differences between frequently used nodes and rarely used
# nodes by replacing cells.
self.log()
return changed_cell_list
def outdate(self):
"""Outdate all non-working nodes."""
cell_list = []
for offset, row in enumerate(self.partition_list):
for cell in row:
if cell.getNodeState() != RUNNING_STATE \
and cell.getState() != OUT_OF_DATE_STATE:
cell.setState(OUT_OF_DATE_STATE)
cell_list.append((offset, cell.getUUID(), OUT_OF_DATE_STATE))
return cell_list
......@@ -25,7 +25,7 @@ from neo.tests.base import NeoTestBase
from neo.exception import DatabaseFailure
from neo.storage.mysqldb import MySQLDatabaseManager, p64, u64
NEO_SQL_DATABASE = 'test_mysqldb_1'
NEO_SQL_DATABASE = 'test_mysqldb1'
NEO_SQL_USER = 'test'
class StorageMySQSLdbTests(NeoTestBase):
......
......@@ -22,21 +22,9 @@ from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
BROKEN_STATE, INVALID_UUID
from neo.pt import Cell, PartitionTable
from neo.node import StorageNode
from neo.tests.base import NeoTestBase
class testPartitionTable(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def getNewUUID(self):
uuid = INVALID_UUID
while uuid == INVALID_UUID:
uuid = os.urandom(16)
self.uuid = uuid
return uuid
class testPartitionTable(NeoTestBase):
def test_01_Cell(self):
uuid = self.getNewUUID()
......@@ -59,32 +47,6 @@ class testPartitionTable(unittest.TestCase):
self.assertEquals(cell.getState(), FEEDING_STATE)
def test_02_PartitionTable_creation(self):
num_partitions = 5
num_replicas = 3
pt = PartitionTable(num_partitions, num_replicas)
self.assertEqual(pt.np, num_partitions)
self.assertEqual(pt.nr, num_replicas)
self.assertEqual(pt.num_filled_rows, 0)
partition_list = pt.partition_list
self.assertEqual(len(partition_list), num_partitions)
for x in xrange(num_partitions):
part = partition_list[x]
self.failUnless(isinstance(part, list))
self.assertEqual(len(part), 0)
self.assertEqual(len(pt.count_dict), 0)
# no nodes or cells for now
self.assertEqual(len(pt.getNodeList()), 0)
for x in xrange(num_partitions):
self.assertEqual(len(pt.getCellList(x)), 0)
self.assertEqual(len(pt.getCellList(x, True)), 0)
self.assertEqual(len(pt.getRow(x)), 0)
self.assertFalse(pt.operational())
self.assertFalse(pt.filled())
self.assertRaises(RuntimeError, pt.make, [])
self.assertFalse(pt.operational())
self.assertFalse(pt.filled())
def test_03_setCell(self):
num_partitions = 5
num_replicas = 2
......@@ -414,34 +376,6 @@ class testPartitionTable(unittest.TestCase):
# it's not up to date and running, so not operational
self.assertFalse(pt.operational())
def test_11_findLeastUsedNode(self):
num_partitions = 5
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
# add nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19001)
sn2 = StorageNode(server2, uuid2)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(1, sn2, UP_TO_DATE_STATE)
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19001)
sn3 = StorageNode(server3, uuid3)
pt.setCell(0, sn3, UP_TO_DATE_STATE)
# test
node = pt.findLeastUsedNode()
self.assertEqual(node, sn3)
node = pt.findLeastUsedNode((sn3,))
self.assertEqual(node, sn2)
node = pt.findLeastUsedNode((sn3,sn2))
self.assertEqual(node, sn1)
def test_12_getRow(self):
num_partitions = 5
num_replicas = 2
......@@ -485,429 +419,6 @@ class testPartitionTable(unittest.TestCase):
# unknwon row
self.assertRaises(IndexError, pt.getRow, 5)
def test_13_outdate(self):
# create nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19002)
sn2 = StorageNode(server2, uuid2)
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19003)
sn3 = StorageNode(server3, uuid3)
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19004)
sn4 = StorageNode(server4, uuid4)
uuid5 = self.getNewUUID()
server5 = ("127.0.0.5", 19005)
sn5 = StorageNode(server5, uuid5)
# create partition table
num_partitions = 5
num_replicas = 3
pt = PartitionTable(num_partitions, num_replicas)
pt.setCell(0, sn1, OUT_OF_DATE_STATE)
sn1.setState(RUNNING_STATE)
pt.setCell(1, sn2, UP_TO_DATE_STATE)
sn2.setState(TEMPORARILY_DOWN_STATE)
pt.setCell(2, sn3, UP_TO_DATE_STATE)
sn3.setState(DOWN_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
sn4.setState(BROKEN_STATE)
pt.setCell(4, sn5, UP_TO_DATE_STATE)
sn5.setState(RUNNING_STATE)
# outdate nodes
cells_outdated = pt.outdate()
self.assertEqual(len(cells_outdated), 3)
for offset, uuid, state in cells_outdated:
self.failUnless(offset in (1,2,3))
self.failUnless(uuid in (uuid2,uuid3,uuid4))
self.assertEqual(state, OUT_OF_DATE_STATE)
# check each cell
# part 1, already outdated
cells = pt.getCellList(0)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
# part 2, must be outdated
cells = pt.getCellList(1)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
# part 3, must be outdated
cells = pt.getCellList(2)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
# part 4, already outdated
cells = pt.getCellList(3)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
# part 5, remains running
cells = pt.getCellList(4)
self.assertEqual(len(cells), 1)
cell = cells[0]
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
def test_14_addNode(self):
num_partitions = 5
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
# add nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
# add it to an empty pt
cell_list = pt.addNode(sn1)
self.assertEqual(len(cell_list), 5)
# it must be added to all partitions
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 1)
self.assertEqual(pt.getCellList(x)[0].getState(), OUT_OF_DATE_STATE)
self.assertEqual(pt.getCellList(x)[0].getNode(), sn1)
self.assertEqual(pt.count_dict[sn1], 5)
# add same node again, must remain the same
cell_list = pt.addNode(sn1)
self.assertEqual(len(cell_list), 0)
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 1)
self.assertEqual(pt.getCellList(x)[0].getState(), OUT_OF_DATE_STATE)
self.assertEqual(pt.getCellList(x)[0].getNode(), sn1)
self.assertEqual(pt.count_dict[sn1], 5)
# add a second node to fill the partition table
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19002)
sn2 = StorageNode(server2, uuid2)
# add it
cell_list = pt.addNode(sn2)
self.assertEqual(len(cell_list), 5)
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 2)
self.assertEqual(pt.getCellList(x)[0].getState(), OUT_OF_DATE_STATE)
self.failUnless(pt.getCellList(x)[0].getNode() in (sn1, sn2))
# test the most used node is remove from some partition
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19001)
sn3 = StorageNode(server3, uuid3)
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19001)
sn4 = StorageNode(server4, uuid4)
uuid5 = self.getNewUUID()
server5 = ("127.0.0.5", 1900)
sn5 = StorageNode(server5, uuid5)
# partition looks like:
# 0 : sn1, sn2
# 1 : sn1, sn3
# 2 : sn1, sn4
# 3 : sn1, sn5
num_partitions = 4
num_replicas = 1
pt = PartitionTable(num_partitions, num_replicas)
# node most used is out of date, just dropped
pt.setCell(0, sn1, OUT_OF_DATE_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(1, sn1, OUT_OF_DATE_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, OUT_OF_DATE_STATE)
pt.setCell(2, sn4, UP_TO_DATE_STATE)
pt.setCell(3, sn1, OUT_OF_DATE_STATE)
pt.setCell(3, sn5, UP_TO_DATE_STATE)
uuid6 = self.getNewUUID()
server6 = ("127.0.0.6", 19006)
sn6 = StorageNode(server6, uuid6)
cell_list = pt.addNode(sn6)
# sn1 is removed twice and sn6 is added twice
self.assertEqual(len(cell_list), 4)
for offset, uuid, state in cell_list:
if offset in (0,1):
if uuid == uuid1:
self.assertEqual(state, DISCARDED_STATE)
elif uuid == uuid6:
self.assertEqual(state, OUT_OF_DATE_STATE)
else:
self.failUnless(uuid in (uuid1, uuid6))
else:
self.failUnless(offset in (0, 1))
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 2)
# there is a feeding cell, just dropped
pt.clear()
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(0, sn3, FEEDING_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn2, FEEDING_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn4, FEEDING_STATE)
pt.setCell(2, sn5, UP_TO_DATE_STATE)
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
pt.setCell(3, sn5, FEEDING_STATE)
cell_list = pt.addNode(sn6)
# sn1 is removed twice and sn6 is added twice
self.assertEqual(len(cell_list), 4)
for offset, uuid, state in cell_list:
if offset in (0,1):
if uuid == uuid1:
self.assertEqual(state, DISCARDED_STATE)
elif uuid == uuid6:
self.assertEqual(state, OUT_OF_DATE_STATE)
else:
self.failUnless(uuid in (uuid1, uuid6))
else:
self.failUnless(offset in (0, 1))
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 3)
# there is no feeding cell, marked as feeding
pt.clear()
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn4, UP_TO_DATE_STATE)
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn5, UP_TO_DATE_STATE)
cell_list = pt.addNode(sn6)
# sn1 is removed twice and sn6 is added twice
self.assertEqual(len(cell_list), 4)
for offset, uuid, state in cell_list:
if offset in (0,1):
if uuid == uuid1:
self.assertEqual(state, FEEDING_STATE)
elif uuid == uuid6:
self.assertEqual(state, OUT_OF_DATE_STATE)
else:
self.failUnless(uuid in (uuid1, uuid6))
else:
self.failUnless(offset in (0, 1))
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 3)
def test_15_dropNode(self):
num_partitions = 4
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
# add nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19002)
sn2 = StorageNode(server2, uuid2)
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19001)
sn3 = StorageNode(server3, uuid3)
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19001)
sn4 = StorageNode(server4, uuid4)
# partition looks like:
# 0 : sn1, sn2
# 1 : sn1, sn3
# 2 : sn1, sn3
# 3 : sn1, sn4
# node is not feeding, so retrive least use node to replace it
# so sn2 must be repaced by sn4 in partition 0
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn3, UP_TO_DATE_STATE)
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
cell_list = pt.dropNode(sn2)
self.assertEqual(len(cell_list), 2)
for offset, uuid, state in cell_list:
self.assertEqual(offset, 0)
if uuid == uuid2:
self.assertEqual(state, DISCARDED_STATE)
elif uuid == uuid4:
self.assertEqual(state, OUT_OF_DATE_STATE)
else:
self.failUnless(uuid in (uuid2, uuid4))
for x in xrange(num_replicas):
self.assertEqual(len(pt.getCellList(x)), 2)
# same test but with feeding state, no other will be added
pt.clear()
pt.setCell(0, sn1, UP_TO_DATE_STATE)
pt.setCell(0, sn2, FEEDING_STATE)
pt.setCell(1, sn1, UP_TO_DATE_STATE)
pt.setCell(1, sn3, UP_TO_DATE_STATE)
pt.setCell(2, sn1, UP_TO_DATE_STATE)
pt.setCell(2, sn3, UP_TO_DATE_STATE)
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
cell_list = pt.dropNode(sn2)
self.assertEqual(len(cell_list), 1)
for offset, uuid, state in cell_list:
self.assertEqual(offset, 0)
self.assertEqual(state, DISCARDED_STATE)
self.assertEqual(uuid ,uuid2)
for x in xrange(num_replicas):
if x == 0:
self.assertEqual(len(pt.getCellList(x)), 1)
else:
self.assertEqual(len(pt.getCellList(x)), 2)
def test_16_make(self):
num_partitions = 5
num_replicas = 1
pt = PartitionTable(num_partitions, num_replicas)
# add nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
# add not running node
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19001)
sn2 = StorageNode(server2, uuid2)
sn2.setState(TEMPORARILY_DOWN_STATE)
# add node without uuid
server3 = ("127.0.0.3", 19001)
sn3 = StorageNode(server3, None)
# add clear node
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19001)
sn4 = StorageNode(server4, uuid4)
uuid5 = self.getNewUUID()
server5 = ("127.0.0.5", 1900)
sn5 = StorageNode(server5, uuid5)
# make the table
pt.make([sn1, sn2, sn3, sn4, sn5,])
# check it's ok, only running nodes and node with uuid
# must be present
for x in xrange(num_partitions):
cells = pt.getCellList(x)
self.assertEqual(len(cells), 2)
nodes = [x.getNode() for x in cells]
for node in nodes:
self.failUnless(node in (sn1, sn4, sn5))
self.failUnless(node not in (sn2, sn3))
self.assertTrue(pt.filled())
self.assertTrue(pt.operational())
# create a pt with less nodes
pt.clear()
self.assertFalse(pt.filled())
self.assertFalse(pt.operational())
pt.make([sn1,])
# check it's ok
for x in xrange(num_partitions):
cells = pt.getCellList(x)
self.assertEqual(len(cells), 1)
nodes = [x.getNode() for x in cells]
for node in nodes:
self.assertEqual(node, sn1)
self.assertTrue(pt.filled())
self.assertTrue(pt.operational())
def test_17_tweak(self):
# remove broken node
# remove if too many feeding nodes
# remove feeding if all cells are up to date
# if too many cells, remove most used cell
# if not enought cell, add least used node
# create nodes
uuid1 = self.getNewUUID()
server1 = ("127.0.0.1", 19001)
sn1 = StorageNode(server1, uuid1)
uuid2 = self.getNewUUID()
server2 = ("127.0.0.2", 19002)
sn2 = StorageNode(server2, uuid2)
uuid3 = self.getNewUUID()
server3 = ("127.0.0.3", 19003)
sn3 = StorageNode(server3, uuid3)
uuid4 = self.getNewUUID()
server4 = ("127.0.0.4", 19004)
sn4 = StorageNode(server4, uuid4)
uuid5 = self.getNewUUID()
server5 = ("127.0.0.5", 19005)
sn5 = StorageNode(server5, uuid5)
# create partition table
# 0 : sn1(discarded), sn2(up), -> sn2 must remain
# 1 : sn1(feeding), sn2(feeding), sn3(up) -> one feeding and sn3 must remain
# 2 : sn1(feeding), sn2(up), sn3(up) -> sn2 and sn3 must remain, feeding must go away
# 3 : sn1(up), sn2(up), sn3(up), sn4(up) -> only 3 cell must remain
# 4 : sn1(up), sn5(up) -> one more cell must be added
num_partitions = 5
num_replicas = 2
pt = PartitionTable(num_partitions, num_replicas)
# part 0
pt.setCell(0, sn1, DISCARDED_STATE)
pt.setCell(0, sn2, UP_TO_DATE_STATE)
# part 1
pt.setCell(1, sn1, FEEDING_STATE)
pt.setCell(1, sn2, FEEDING_STATE)
pt.setCell(1, sn3, OUT_OF_DATE_STATE)
# part 2
pt.setCell(2, sn1, FEEDING_STATE)
pt.setCell(2, sn2, UP_TO_DATE_STATE)
pt.setCell(2, sn3, UP_TO_DATE_STATE)
# part 3
pt.setCell(3, sn1, UP_TO_DATE_STATE)
pt.setCell(3, sn2, UP_TO_DATE_STATE)
pt.setCell(3, sn3, UP_TO_DATE_STATE)
pt.setCell(3, sn4, UP_TO_DATE_STATE)
# part 4
pt.setCell(4, sn1, UP_TO_DATE_STATE)
pt.setCell(4, sn5, UP_TO_DATE_STATE)
# now tweak the table
pt.tweak()
# check part 1
cells = pt.getCellList(0)
self.assertEqual(len(cells), 3)
for cell in cells:
self.assertNotEqual(cell.getState(), DISCARDED_STATE)
if cell.getNode() == sn2:
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
else:
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
self.failUnless(sn2 in [x.getNode() for x in cells])
# check part 2
cells = pt.getCellList(1)
self.assertEqual(len(cells), 4)
for cell in cells:
if cell.getNode() == sn1:
self.assertEqual(cell.getState(), FEEDING_STATE)
else:
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
self.failUnless(sn3 in [x.getNode() for x in cells])
self.failUnless(sn1 in [x.getNode() for x in cells])
# check part 3
cells = pt.getCellList(2)
self.assertEqual(len(cells), 3)
for cell in cells:
if cell.getNode() in (sn2, sn3):
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
else:
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
self.failUnless(sn3 in [x.getNode() for x in cells])
self.failUnless(sn2 in [x.getNode() for x in cells])
# check part 4
cells = pt.getCellList(3)
self.assertEqual(len(cells), 3)
for cell in cells:
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
# check part 5
cells = pt.getCellList(4)
self.assertEqual(len(cells), 3)
for cell in cells:
if cell.getNode() in (sn1, sn5):
self.assertEqual(cell.getState(), UP_TO_DATE_STATE)
else:
self.assertEqual(cell.getState(), OUT_OF_DATE_STATE)
self.failUnless(sn1 in [x.getNode() for x in cells])
self.failUnless(sn5 in [x.getNode() for x in cells])
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment