Commit 6412b121 authored by Neil Schemenauer's avatar Neil Schemenauer

Remove a number of tests that differ only in input data size. It seems

no bug motivated their inclusion and the chance of them triggering a
problem seems unlikely.  Refactor to reduce code duplication.  Rename
'hamlet_scene' to 'HAMLET_SCENE'.  Test is much faster now.  Closes #960995.
parent 204d7861
...@@ -74,25 +74,12 @@ class ExceptionTestCase(unittest.TestCase): ...@@ -74,25 +74,12 @@ class ExceptionTestCase(unittest.TestCase):
class CompressTestCase(unittest.TestCase): class CompressTestCase(unittest.TestCase):
# Test compression in one go (whole message compression) # Test compression in one go (whole message compression)
def test_speech(self): def test_speech(self):
# decompress(compress(data)) better be data x = zlib.compress(HAMLET_SCENE)
x = zlib.compress(hamlet_scene) self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
self.assertEqual(zlib.decompress(x), hamlet_scene)
def test_speech8(self):
# decompress(compress(data)) better be data -- more compression chances
data = hamlet_scene * 8
x = zlib.compress(data)
self.assertEqual(zlib.decompress(x), data)
def test_speech16(self):
# decompress(compress(data)) better be data -- more compression chances
data = hamlet_scene * 16
x = zlib.compress(data)
self.assertEqual(zlib.decompress(x), data)
def test_speech128(self): def test_speech128(self):
# decompress(compress(data)) better be data -- more compression chances # compress more data
data = hamlet_scene * 8 * 16 data = HAMLET_SCENE * 128
x = zlib.compress(data) x = zlib.compress(data)
self.assertEqual(zlib.decompress(x), data) self.assertEqual(zlib.decompress(x), data)
...@@ -101,22 +88,10 @@ class CompressTestCase(unittest.TestCase): ...@@ -101,22 +88,10 @@ class CompressTestCase(unittest.TestCase):
class CompressObjectTestCase(unittest.TestCase): class CompressObjectTestCase(unittest.TestCase):
# Test compression object # Test compression object
def test_pairsmall(self):
# use compress object in straightforward manner, decompress w/ object
data = hamlet_scene
co = zlib.compressobj()
x1 = co.compress(data)
x2 = co.flush()
self.assertRaises(zlib.error, co.flush) # second flush should not work
dco = zlib.decompressobj()
y1 = dco.decompress(x1 + x2)
y2 = dco.flush()
self.assertEqual(data, y1 + y2)
def test_pair(self): def test_pair(self):
# straightforward compress/decompress objects, more compression # straightforward compress/decompress objects
data = hamlet_scene * 8 * 16 data = HAMLET_SCENE * 128
co = zlib.compressobj(zlib.Z_BEST_COMPRESSION, zlib.DEFLATED) co = zlib.compressobj()
x1 = co.compress(data) x1 = co.compress(data)
x2 = co.flush() x2 = co.flush()
self.assertRaises(zlib.error, co.flush) # second flush should not work self.assertRaises(zlib.error, co.flush) # second flush should not work
...@@ -133,16 +108,16 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -133,16 +108,16 @@ class CompressObjectTestCase(unittest.TestCase):
memlevel = 9 memlevel = 9
strategy = zlib.Z_FILTERED strategy = zlib.Z_FILTERED
co = zlib.compressobj(level, method, wbits, memlevel, strategy) co = zlib.compressobj(level, method, wbits, memlevel, strategy)
x1 = co.compress(hamlet_scene) x1 = co.compress(HAMLET_SCENE)
x2 = co.flush() x2 = co.flush()
dco = zlib.decompressobj(wbits) dco = zlib.decompressobj(wbits)
y1 = dco.decompress(x1 + x2) y1 = dco.decompress(x1 + x2)
y2 = dco.flush() y2 = dco.flush()
self.assertEqual(hamlet_scene, y1 + y2) self.assertEqual(HAMLET_SCENE, y1 + y2)
def test_compressincremental(self): def test_compressincremental(self):
# compress object in steps, decompress object as one-shot # compress object in steps, decompress object as one-shot
data = hamlet_scene * 8 * 16 data = HAMLET_SCENE * 128
co = zlib.compressobj() co = zlib.compressobj()
bufs = [] bufs = []
for i in range(0, len(data), 256): for i in range(0, len(data), 256):
...@@ -155,13 +130,14 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -155,13 +130,14 @@ class CompressObjectTestCase(unittest.TestCase):
y2 = dco.flush() y2 = dco.flush()
self.assertEqual(data, y1 + y2) self.assertEqual(data, y1 + y2)
def test_decompressincremental(self): def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
# compress object in steps, decompress object in steps # compress object in steps, decompress object in steps
data = hamlet_scene * 8 * 16 source = source or HAMLET_SCENE
data = source * 128
co = zlib.compressobj() co = zlib.compressobj()
bufs = [] bufs = []
for i in range(0, len(data), 256): for i in range(0, len(data), cx):
bufs.append(co.compress(data[i:i+256])) bufs.append(co.compress(data[i:i+cx]))
bufs.append(co.flush()) bufs.append(co.flush())
combuf = ''.join(bufs) combuf = ''.join(bufs)
...@@ -169,100 +145,38 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -169,100 +145,38 @@ class CompressObjectTestCase(unittest.TestCase):
dco = zlib.decompressobj() dco = zlib.decompressobj()
bufs = [] bufs = []
for i in range(0, len(combuf), 128): for i in range(0, len(combuf), dcx):
bufs.append(dco.decompress(combuf[i:i+128])) bufs.append(dco.decompress(combuf[i:i+dcx]))
self.assertEqual('', dco.unconsumed_tail, ######## self.assertEqual('', dco.unconsumed_tail, ########
"(A) uct should be '': not %d long" % "(A) uct should be '': not %d long" %
len(dco.unconsumed_tail)) len(dco.unconsumed_tail))
bufs.append(dco.flush()) if flush:
bufs.append(dco.flush())
else:
while True:
chunk = dco.decompress('')
if chunk:
bufs.append(chunk)
else:
break
self.assertEqual('', dco.unconsumed_tail, ######## self.assertEqual('', dco.unconsumed_tail, ########
"(B) uct should be '': not %d long" % "(B) uct should be '': not %d long" %
len(dco.unconsumed_tail)) len(dco.unconsumed_tail))
self.assertEqual(data, ''.join(bufs)) self.assertEqual(data, ''.join(bufs))
# Failure means: "decompressobj with init options failed" # Failure means: "decompressobj with init options failed"
def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): def test_decompincflush(self):
# compress object in steps, decompress object in steps, loop sizes self.test_decompinc(flush=True)
source = source or hamlet_scene
for reps in sizes:
data = source * reps
co = zlib.compressobj()
bufs = []
for i in range(0, len(data), cx):
bufs.append(co.compress(data[i:i+cx]))
bufs.append(co.flush())
combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf))
dco = zlib.decompressobj() def test_decompimax(self, source=None, cx=256, dcx=64):
bufs = [] # compress in steps, decompress in length-restricted steps
for i in range(0, len(combuf), dcx): source = source or HAMLET_SCENE
bufs.append(dco.decompress(combuf[i:i+dcx]))
self.assertEqual('', dco.unconsumed_tail, ########
"(A) uct should be '': not %d long" %
len(dco.unconsumed_tail))
if flush:
bufs.append(dco.flush())
else:
while True:
chunk = dco.decompress('')
if chunk:
bufs.append(chunk)
else:
break
self.assertEqual('', dco.unconsumed_tail, ########
"(B) uct should be '': not %d long" %
len(dco.unconsumed_tail))
self.assertEqual(data, ''.join(bufs))
# Failure means: "decompressobj with init options failed"
def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
# compress in steps, decompress in length-restricted steps, loop sizes
source = source or hamlet_scene
for reps in sizes:
# Check a decompression object with max_length specified
data = source * reps
co = zlib.compressobj()
bufs = []
for i in range(0, len(data), cx):
bufs.append(co.compress(data[i:i+cx]))
bufs.append(co.flush())
combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf),
'compressed data failure')
dco = zlib.decompressobj()
bufs = []
cb = combuf
while cb:
#max_length = 1 + len(cb)//10
chunk = dco.decompress(cb, dcx)
self.failIf(len(chunk) > dcx,
'chunk too big (%d>%d)' % (len(chunk), dcx))
bufs.append(chunk)
cb = dco.unconsumed_tail
if flush:
bufs.append(dco.flush())
else:
while True:
chunk = dco.decompress('', dcx)
self.failIf(len(chunk) > dcx,
'chunk too big in tail (%d>%d)' % (len(chunk), dcx))
if chunk:
bufs.append(chunk)
else:
break
self.assertEqual(len(data), len(''.join(bufs)))
self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
def test_decompressmaxlen(self):
# Check a decompression object with max_length specified # Check a decompression object with max_length specified
data = hamlet_scene * 8 * 16 data = source * 128
co = zlib.compressobj() co = zlib.compressobj()
bufs = [] bufs = []
for i in range(0, len(data), 256): for i in range(0, len(data), cx):
bufs.append(co.compress(data[i:i+256])) bufs.append(co.compress(data[i:i+cx]))
bufs.append(co.flush()) bufs.append(co.flush())
combuf = ''.join(bufs) combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf), self.assertEqual(data, zlib.decompress(combuf),
...@@ -272,20 +186,18 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -272,20 +186,18 @@ class CompressObjectTestCase(unittest.TestCase):
bufs = [] bufs = []
cb = combuf cb = combuf
while cb: while cb:
max_length = 1 + len(cb)//10 #max_length = 1 + len(cb)//10
chunk = dco.decompress(cb, max_length) chunk = dco.decompress(cb, dcx)
self.failIf(len(chunk) > max_length, self.failIf(len(chunk) > dcx,
'chunk too big (%d>%d)' % (len(chunk),max_length)) 'chunk too big (%d>%d)' % (len(chunk), dcx))
bufs.append(chunk) bufs.append(chunk)
cb = dco.unconsumed_tail cb = dco.unconsumed_tail
bufs.append(dco.flush()) bufs.append(dco.flush())
self.assertEqual(len(data), len(''.join(bufs)))
self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
def test_decompressmaxlenflushless(self): def test_decompressmaxlen(self, flush=False):
# identical to test_decompressmaxlen except flush is replaced # Check a decompression object with max_length specified
# with an equivalent. This works and other fails on (eg) 2.2.2 data = HAMLET_SCENE * 128
data = hamlet_scene * 8 * 16
co = zlib.compressobj() co = zlib.compressobj()
bufs = [] bufs = []
for i in range(0, len(data), 256): for i in range(0, len(data), 256):
...@@ -293,7 +205,7 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -293,7 +205,7 @@ class CompressObjectTestCase(unittest.TestCase):
bufs.append(co.flush()) bufs.append(co.flush())
combuf = ''.join(bufs) combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf), self.assertEqual(data, zlib.decompress(combuf),
'compressed data mismatch') 'compressed data failure')
dco = zlib.decompressobj() dco = zlib.decompressobj()
bufs = [] bufs = []
...@@ -305,16 +217,19 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -305,16 +217,19 @@ class CompressObjectTestCase(unittest.TestCase):
'chunk too big (%d>%d)' % (len(chunk),max_length)) 'chunk too big (%d>%d)' % (len(chunk),max_length))
bufs.append(chunk) bufs.append(chunk)
cb = dco.unconsumed_tail cb = dco.unconsumed_tail
if flush:
#bufs.append(dco.flush()) bufs.append(dco.flush())
while len(chunk): else:
chunk = dco.decompress('', max_length) while chunk:
self.failIf(len(chunk) > max_length, chunk = dco.decompress('', max_length)
'chunk too big (%d>%d)' % (len(chunk),max_length)) self.failIf(len(chunk) > max_length,
bufs.append(chunk) 'chunk too big (%d>%d)' % (len(chunk),max_length))
bufs.append(chunk)
self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
def test_decompressmaxlenflush(self):
self.test_decompressmaxlen(flush=True)
def test_maxlenmisc(self): def test_maxlenmisc(self):
# Misc tests of max_length # Misc tests of max_length
dco = zlib.decompressobj() dco = zlib.decompressobj()
...@@ -327,7 +242,7 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -327,7 +242,7 @@ class CompressObjectTestCase(unittest.TestCase):
sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
sync_opt = [getattr(zlib, opt) for opt in sync_opt sync_opt = [getattr(zlib, opt) for opt in sync_opt
if hasattr(zlib, opt)] if hasattr(zlib, opt)]
data = hamlet_scene * 8 data = HAMLET_SCENE * 8
for sync in sync_opt: for sync in sync_opt:
for level in range(10): for level in range(10):
...@@ -349,7 +264,7 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -349,7 +264,7 @@ class CompressObjectTestCase(unittest.TestCase):
# Testing on 17K of "random" data # Testing on 17K of "random" data
# Create compressor and decompressor objects # Create compressor and decompressor objects
co = zlib.compressobj(9) co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
dco = zlib.decompressobj() dco = zlib.decompressobj()
# Try 17K of data # Try 17K of data
...@@ -375,23 +290,6 @@ class CompressObjectTestCase(unittest.TestCase): ...@@ -375,23 +290,6 @@ class CompressObjectTestCase(unittest.TestCase):
# if decompressed data is different from the input data, choke. # if decompressed data is different from the input data, choke.
self.assertEqual(expanded, data, "17K random source doesn't match") self.assertEqual(expanded, data, "17K random source doesn't match")
def test_manydecompinc(self):
# Run incremental decompress test for a large range of sizes
self.test_decompinc(sizes=[1<<n for n in range(8)],
flush=True, cx=32, dcx=4)
def test_manydecompimax(self):
# Run incremental decompress maxlen test for a large range of sizes
# avoid the flush bug
self.test_decompimax(sizes=[1<<n for n in range(8)],
flush=False, cx=32, dcx=4)
def test_manydecompimaxflush(self):
# Run incremental decompress maxlen test for a large range of sizes
# avoid the flush bug
self.test_decompimax(sizes=[1<<n for n in range(8)],
flush=True, cx=32, dcx=4)
def genblock(seed, length, step=1024, generator=random): def genblock(seed, length, step=1024, generator=random):
"""length-byte stream of random data from a seed (in step-byte blocks).""" """length-byte stream of random data from a seed (in step-byte blocks)."""
...@@ -417,7 +315,7 @@ def choose_lines(source, number, seed=None, generator=random): ...@@ -417,7 +315,7 @@ def choose_lines(source, number, seed=None, generator=random):
hamlet_scene = """ HAMLET_SCENE = """
LAERTES LAERTES
O, fear me not. O, fear me not.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment