Commit 37bc1b1b authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add more tests for neo storage side, factorize better the code.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1216 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5a2a8dc8
......@@ -36,18 +36,38 @@ OBJECT_NUMBER = 100
class StorageTests(unittest.TestCase):
def setUp(self):
pass
self.neo = None
def tearDown(self):
pass
if self.neo is not None:
self.neo.stop()
def queryCount(self, db, query):
db.query(query)
result = db.store_result().fetch_row()[0][0]
return result
def populate(self, neo):
db, conn = neo.getConnection()
def __setup(self, storage_number=2, pending_number=0, replicas=1, partitions=10):
# create a neo cluster
storage_number = ['test_neo%d' % i for i in xrange(storage_number)]
self.neo = NEOCluster(storage_number, port_base=20000,
master_node_count=2,
partitions=10, replicas=replicas,
)
self.neo.setupDB()
# too many pending storage nodes requested
if pending_number > storage_number:
pending_number = storage_number
storage_processes = self.neo.getStorageProcessList()
start_storage_number = len(storage_processes) - pending_number
# return a tuple of storage processes lists
started_processes = storage_processes[:start_storage_number]
stopped_processes = storage_processes[start_storage_number:]
self.neo.start(except_storages=stopped_processes)
return (started_processes, stopped_processes)
def __populate(self):
db, conn = self.neo.getConnection()
root = conn.root()
for i in xrange(OBJECT_NUMBER):
root[i] = PObject(i)
......@@ -55,13 +75,13 @@ class StorageTests(unittest.TestCase):
conn.close()
db.close()
def checkDatabase(self, neo, db_name):
def __checkDatabase(self, db_name):
db = MySQLdb.connect(db=db_name, user='test')
# wait for the sql transaction to be commited
def callback(last_try):
object_number = self.queryCount(db, 'select count(*) from obj')
return object_number == OBJECT_NUMBER + 2, last_try
neo.expectCondition(callback, 0, 1)
return object_number == OBJECT_NUMBER + 2, object_number
self.neo.expectCondition(callback, 0, 1)
# no more temporarily objects
t_objects = self.queryCount(db, 'select count(*) from tobj')
self.assertEqual(t_objects, 0)
......@@ -73,56 +93,134 @@ class StorageTests(unittest.TestCase):
objects = self.queryCount(db, query)
self.assertEqual(objects, OBJECT_NUMBER + 1)
def __checkReplicationDone(self, neo, databases):
def __checkReplicationDone(self):
# wait for replication to finish
neo.expectNoOudatedCells(timeout=10)
self.neo.expectOudatedCells(number=0, timeout=10)
# check databases
for db_name in databases:
self.checkDatabase(neo, db_name)
for db_name in self.neo.db_list:
self.__checkDatabase(db_name)
# check storages state
self.assertEqual(len(neo.getStorageNodeList(protocol.RUNNING_STATE)), 2)
storage_list = self.neo.getStorageNodeList(protocol.RUNNING_STATE)
self.assertEqual(len(storage_list), 2)
def __expectRunning(self, process):
self.neo.expectStorageState(process.getUUID(), protocol.RUNNING_STATE)
def __expectPending(self, process):
self.neo.expectStorageState(process.getUUID(), protocol.PENDING_STATE)
def __expectUnavailable(self, process):
self.neo.expectStorageState(process.getUUID(),
protocol.TEMPORARILY_DOWN_STATE)
def testReplicationWithoutBreak(self):
# create a neo cluster
databases = ['test_neo1', 'test_neo2']
neo = NEOCluster(databases, port_base=20000,
master_node_count=2,
partitions=10, replicas=1,
)
neo.setupDB()
neo.start()
# populate the cluster then check the databases
self.__setup(storage_number=2, replicas=1)
self.__populate()
self.__checkReplicationDone()
# populate the cluster and check
self.populate(neo)
self.__checkReplicationDone(neo, databases)
def testNewNodesInPendingState(self):
def testReplicationWithNewStorage(self):
# start with the first storage
processes = self.__setup(storage_number=3, replicas=1, pending_number=2)
started, stopped = processes
self.__expectRunning(started[0])
self.neo.expectClusterRunning()
# create a neo cluster
databases = ['test_neo1', 'test_neo2']
neo = NEOCluster(databases, port_base=20000,
master_node_count=2,
partitions=10, replicas=1,
)
neo.setupDB()
# start the second then the third
stopped[0].start()
self.__expectPending(stopped[0])
self.neo.expectClusterRunning()
stopped[1].start()
self.__expectPending(stopped[1])
self.neo.expectClusterRunning()
def testReplicationWithNewStorage(self):
# populate one storage
new_storage = neo.getStorageProcessList()[-1]
neo.start(except_storages=[new_storage])
self.populate(neo)
processes = self.__setup(storage_number=2, replicas=1, pending_number=1)
started, stopped = processes
self.__populate()
self.neo.expectClusterRunning()
# start the second
new_storage.start()
neo.expectStorageState(new_storage.getUUID(), protocol.PENDING_STATE)
stopped[0].start()
self.__expectPending(stopped[0])
self.neo.expectClusterRunning()
# add it to the partition table
neo.neoctl.enableStorageList([new_storage.getUUID()])
neo.expectStorageState(new_storage.getUUID(), protocol.RUNNING_STATE)
self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
self.__expectRunning(stopped[0])
self.neo.expectClusterRunning()
# wait for replication to finish then check
self.__checkReplicationDone(neo, databases)
self.__checkReplicationDone()
self.neo.expectClusterRunning()
def testOudatedCellsOnDownStorage(self):
# populate the two storages
(started, _) = self.__setup(storage_number=2, replicas=1)
self.__populate()
self.__checkReplicationDone()
self.neo.expectClusterRunning()
# stop one storage and check outdated cells
started[0].stop()
self.neo.expectOudatedCells(number=10)
self.neo.expectClusterRunning()
def testVerificationTriggered(self):
# start neo with one storages
(started, _) = self.__setup(replicas=0, storage_number=1)
self.__expectRunning(started[0])
# stop it, the cluster must switch to verification
started[0].stop()
self.__expectUnavailable(started[0])
self.neo.expectClusterVeryfing()
# restart it, the cluster must come back to running state
started[0].start()
self.__expectRunning(started[0])
self.neo.expectClusterRunning()
def testSequentialStorageKill(self):
# start neo with three storages / two replicas
(started, _) = self.__setup(replicas=2, storage_number=3, partitions=10)
self.__expectRunning(started[0])
self.__expectRunning(started[1])
self.__expectRunning(started[2])
self.neo.expectOudatedCells(number=0)
self.neo.expectClusterRunning()
# stop one storage, cluster must remains running
started[0].stop()
self.__expectUnavailable(started[0])
self.__expectRunning(started[1])
self.__expectRunning(started[2])
self.neo.expectOudatedCells(number=10)
self.neo.expectClusterRunning()
# stop a second storage, cluster is still running
started[1].stop()
self.__expectUnavailable(started[0])
self.__expectUnavailable(started[1])
self.__expectRunning(started[2])
self.neo.expectOudatedCells(number=20)
self.neo.expectClusterRunning()
# stop the last, cluster died
started[2].stop()
self.__expectUnavailable(started[0])
self.__expectUnavailable(started[1])
self.__expectUnavailable(started[2])
self.neo.expectOudatedCells(number=20)
self.neo.expectClusterVeryfing()
if __name__ == "__main__":
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment