Commit de0feb4e authored by Julien Muchembled's avatar Julien Muchembled

qa: more Importer tests

parent 34d0725e
......@@ -476,7 +476,7 @@ class NEOCluster(object):
return True, None
self.expectCondition(start)
def stop(self, clients=True):
def stop(self, clients=True, ignore_errors=False):
# Suspend all processes to kill before actually killing them, so that
# nodes don't log errors because they get disconnected from other nodes:
# otherwise, storage nodes would often flush MB of logs just because we
......@@ -503,7 +503,7 @@ class NEOCluster(object):
zodb_storage.close()
self.zodb_storage_list = []
time.sleep(0.5)
if error_list:
if error_list and not ignore_errors:
raise NodeProcessError('\n'.join(error_list))
def waitAll(self):
......
......@@ -46,16 +46,18 @@ class ZODBTestCase(TestCase):
super(ZODBTestCase, self).__init__(methodName)
test = getattr(self, methodName).__func__
def runTest():
failed = True
try:
self.neo.start()
self.open()
test(self)
if not functional:
orphan = self.neo.storage.dm.getOrphanList()
failed = False
finally:
self.close()
if functional:
self.neo.stop()
self.neo.stop(ignore_errors=failed)
else:
self.neo.stop(None)
if functional:
......
......@@ -14,17 +14,66 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import os, unittest
from functools import partial, wraps
from ZODB.DB import DB
from ZODB.FileStorage import FileStorage
from ZODB.tests import testZODB
import ZODB
from neo.storage import database as database_module
from neo.storage.database.importer import ImporterDatabaseManager
from .. import expectedFailure, getTempDirectory, Patch
from . import ZODBTestCase
class NEOZODBTests(ZODBTestCase, testZODB.ZODBTests):
def open(self):
self._open(_db=ZODB.DB(self.neo.getZODBStorage()))
self._open(_db=DB(self.neo.getZODBStorage()))
class DummyImporter(ImporterDatabaseManager):
compress = True
def __init__(self, zodb, getAdapterKlass, name,
database, engine=None, wait=None):
self._conf = conf = {
'adapter': name,
'database': database,
'engine': engine,
'wait': wait,
}
self.zodb = zodb
with Patch(database_module,
getAdapterKlass=lambda orig, name: getAdapterKlass(name)):
super(DummyImporter, self).__init__(None)
_parse = startJobs = lambda *args, **kw: None
class NEOZODBImporterTests(NEOZODBTests):
@classmethod
def setUpClass(cls):
super(NEOZODBImporterTests, cls).setUpClass()
path = os.path.join(getTempDirectory(),
"%s.%s.fs" % (cls.__module__, cls.__name__))
DB(FileStorage(path)).close()
cls._importer_config = ('root', {'storage': """<filestorage>
path %s
</filestorage>""" % path}),
def run(self, *args, **kw):
with Patch(database_module, getAdapterKlass=lambda *args:
partial(DummyImporter, self._importer_config, *args)):
super(ZODBTestCase, self).run(*args, **kw)
checkMultipleUndoInOneTransaction = expectedFailure(IndexError)(
NEOZODBTests.checkMultipleUndoInOneTransaction)
if __name__ == "__main__":
suite = unittest.makeSuite(NEOZODBTests, 'check')
suite = unittest.TestSuite((
unittest.makeSuite(NEOZODBTests, 'check'),
unittest.makeSuite(NEOZODBImporterTests, 'check'),
))
unittest.main(defaultTest='suite')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment