Commit 656c1ae4 authored by Jeremy Hylton's avatar Jeremy Hylton

Add two tests for corrupted .index file

parent 2e5136c1
"""Do some minimal tests of data corruption"""
import os
import random
import stat
import tempfile
import unittest
import ZODB, ZODB.FileStorage
from StorageTestBase import StorageTestBase
class FileStorageCorruptTests(StorageTestBase):
__super_setUp = StorageTestBase.setUp
__super_tearDown = StorageTestBase.tearDown
def setUp(self):
self.path = tempfile.mktemp()
self._storage = ZODB.FileStorage.FileStorage(self.path, create=1)
self.__super_setUp()
def tearDown(self):
self.__super_tearDown()
for ext in '', '.old', '.tmp', '.lock', '.index':
path = self.path + ext
if os.path.exists(path):
os.remove(path)
def _do_stores(self):
oids = []
for i in range(5):
oid = self._storage.new_oid()
revid = self._dostore(oid)
oids.append((oid, revid))
return oids
def _check_stores(self, oids):
for oid, revid in oids:
data, s_revid = self._storage.load(oid, '')
self.assertEqual(s_revid, revid)
def checkTruncatedIndex(self):
oids = self._do_stores()
self._close()
# truncation the index file
path = self.path + '.index'
self.assert_(os.path.exists(path))
f = open(path, 'r+')
f.seek(0, 2)
size = f.tell()
f.seek(size / 2)
f.truncate()
f.close()
self._storage = ZODB.FileStorage.FileStorage(self.path)
self._check_stores(oids)
def checkCorruptedIndex(self):
oids = self._do_stores()
self._close()
# truncation the index file
path = self.path + '.index'
self.assert_(os.path.exists(path))
size = os.stat(path)[stat.ST_SIZE]
f = open(path, 'r+')
while f.tell() < size:
f.seek(random.randrange(1, size / 10), 1)
f.write('\000')
f.close()
self._storage = ZODB.FileStorage.FileStorage(self.path)
self._check_stores(oids)
...@@ -10,6 +10,7 @@ import Synchronization ...@@ -10,6 +10,7 @@ import Synchronization
import ConflictResolution import ConflictResolution
import HistoryStorage import HistoryStorage
import IteratorStorage import IteratorStorage
import Corruption
class FileStorageTests( class FileStorageTests(
StorageTestBase.StorageTestBase, StorageTestBase.StorageTestBase,
...@@ -37,7 +38,11 @@ class FileStorageTests( ...@@ -37,7 +38,11 @@ class FileStorageTests(
os.remove(path) os.remove(path)
def test_suite(): def test_suite():
return unittest.makeSuite(FileStorageTests, 'check') suite = unittest.makeSuite(FileStorageTests, 'check')
suite2 = unittest.makeSuite(Corruption.FileStorageCorruptTests, 'check')
suite.addTest(suite2)
## suite._tests.extend(suite2._tests)
return suite
def main(): def main():
alltests=test_suite() alltests=test_suite()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment