Commit b9b47dd3 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add a second test for storage replication, factorise the first and add an helper

in NeoCluster class.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1201 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ef9fcb16
......@@ -412,6 +412,16 @@ class NEOCluster(object):
return uuid is None or uuid == current_try, current_try
self.expectCondition(callback, timeout, delay)
def expectNoOudatedCells(self, timeout=0, delay=1):
def callback(last_try):
row_list = self.neoctl.getPartitionRowList()[1]
for row in row_list:
for cell in row[1]:
if cell[1] != protocol.UP_TO_DATE_STATE:
return False, last_try
return True, last_try
self.expectCondition(callback, timeout, delay)
def expectClusterState(self, state, timeout=0, delay=1):
def callback(last_try):
current_try = self.neoctl.getClusterState()
......
......@@ -15,18 +15,15 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os
import time
import ZODB
import MySQLdb
import unittest
import tempfile
import transaction
from ZODB.FileStorage import FileStorage
from Persistence import Persistent
from neo.tests.functional import NEOCluster
from neo.client.Storage import Storage as NEOStorage
from neo import protocol
class PObject(Persistent):
......@@ -34,11 +31,12 @@ class PObject(Persistent):
self.value = value
class ImportExportTests(unittest.TestCase):
OBJECT_NUMBER = 100
class StorageTests(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp(prefix='neo_import_export_')
print "using the temp directory %s" % self.temp_dir
pass
def tearDown(self):
pass
......@@ -47,11 +45,53 @@ class ImportExportTests(unittest.TestCase):
db.query(query)
result = db.store_result().fetch_row()[0][0]
return result
def populate(self, storage):
db = ZODB.DB(storage=storage)
conn = db.open()
root = conn.root()
for i in xrange(OBJECT_NUMBER):
root[i] = PObject(i)
transaction.commit()
conn.close()
storage.close()
def getNeoStorage(self, neo):
return NEOStorage(master_nodes=neo.master_nodes,
connector='SocketConnector',
name=neo.cluster_name,
)
def checkDatabase(self, neo, db_name):
db = MySQLdb.connect(db=db_name, user='test')
# wait for the sql transaction to be commited
def callback(last_try):
object_number = self.queryCount(db, 'select count(*) from obj')
return object_number == OBJECT_NUMBER + 2, last_try
neo.expectCondition(callback, 0, 1)
# no more temporarily objects
t_objects = self.queryCount(db, 'select count(*) from tobj')
self.assertEqual(t_objects, 0)
# One revision per object and two for the root, before and after
revisions = self.queryCount(db, 'select count(*) from obj')
self.assertEqual(revisions, OBJECT_NUMBER + 2)
# One object more for the root
query = 'select count(*) from (select * from obj group by oid) as t'
objects = self.queryCount(db, query)
self.assertEqual(objects, OBJECT_NUMBER + 1)
def __checkReplicationDone(self, neo, databases):
# wait for replication to finish
neo.expectNoOudatedCells(timeout=10)
# check databases
for db_name in databases:
self.checkDatabase(neo, db_name)
# check storages state
self.assertEqual(len(neo.getStorageNodeList(protocol.RUNNING_STATE)), 2)
def testReplicationWithoutBreak(self):
OBJECT_NUMBER = 100
# create a neo cluster
databases = ['test_neo1', 'test_neo2']
neo = NEOCluster(databases, port_base=20000,
......@@ -61,34 +101,35 @@ class ImportExportTests(unittest.TestCase):
neo.setupDB()
neo.start()
# create a neo storage
args = {'connector': 'SocketConnector', 'name': neo.cluster_name}
storage = NEOStorage(master_nodes=neo.master_nodes, **args)
db = ZODB.DB(storage=storage)
# populate the cluster
conn = db.open()
root = conn.root()
for i in xrange(OBJECT_NUMBER):
root[i] = PObject(i)
transaction.commit()
conn.close()
storage.close()
# XXX: replace this sleep by a callback as done in testMaster
time.sleep(1)
# populate the cluster and check
self.populate(self.getNeoStorage(neo))
self.__checkReplicationDone(neo, databases)
# check databases
for index, db_name in enumerate(databases):
db = MySQLdb.connect(db=db_name, user='test')
# One revision per object and two for the root, before and after
revisions = self.queryCount(db, 'select count(*) from obj')
self.assertEqual(revisions, OBJECT_NUMBER + 2)
# One object more for the root
query = 'select count(*) from (select * from obj group by oid) as t'
objects = self.queryCount(db, query)
self.assertEqual(objects, OBJECT_NUMBER + 1)
def testReplicationWithNewStorage(self):
# create a neo cluster
databases = ['test_neo1', 'test_neo2']
neo = NEOCluster(databases, port_base=20000,
master_node_count=2,
partitions=10, replicas=1,
)
neo.setupDB()
# populate one storage
new_storage = neo.getStorageProcessList()[-1]
neo.start(except_storages=[new_storage])
self.populate(self.getNeoStorage(neo))
# start the second
new_storage.start()
neo.expectStorageState(new_storage.getUUID(), protocol.PENDING_STATE)
# add it to the partition table
neo.neoctl.enableStorageList([new_storage.getUUID()])
neo.expectStorageState(new_storage.getUUID(), protocol.RUNNING_STATE)
# wait for replication to finish then check
self.__checkReplicationDone(neo, databases)
if __name__ == "__main__":
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment