Commit 47865933 authored by Serhiy Storchaka's avatar Serhiy Storchaka

Issue #28664: test_bz2 now works on non-Windows platforms without bunzip2

(e.g. on Android).
parents dff60387 5f832ae5
...@@ -8,6 +8,7 @@ import pickle ...@@ -8,6 +8,7 @@ import pickle
import glob import glob
import pathlib import pathlib
import random import random
import shutil
import subprocess import subprocess
import sys import sys
from test.support import unlink from test.support import unlink
...@@ -22,6 +23,16 @@ except ImportError: ...@@ -22,6 +23,16 @@ except ImportError:
bz2 = support.import_module('bz2') bz2 = support.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
has_cmdline_bunzip2 = None
def ext_decompress(data):
global has_cmdline_bunzip2
if has_cmdline_bunzip2 is None:
has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
if has_cmdline_bunzip2:
return subprocess.check_output(['bunzip2'], input=data)
else:
return bz2.decompress(data)
class BaseTest(unittest.TestCase): class BaseTest(unittest.TestCase):
"Base for other testcases." "Base for other testcases."
...@@ -74,24 +85,6 @@ class BaseTest(unittest.TestCase): ...@@ -74,24 +85,6 @@ class BaseTest(unittest.TestCase):
if os.path.isfile(self.filename): if os.path.isfile(self.filename):
os.unlink(self.filename) os.unlink(self.filename)
if sys.platform == "win32":
# bunzip2 isn't available to run on Windows.
def decompress(self, data):
return bz2.decompress(data)
else:
def decompress(self, data):
pop = subprocess.Popen("bunzip2", shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
pop.stdin.write(data)
pop.stdin.close()
ret = pop.stdout.read()
pop.stdout.close()
if pop.wait() != 0:
ret = bz2.decompress(data)
return ret
class BZ2FileTest(BaseTest): class BZ2FileTest(BaseTest):
"Test the BZ2File class." "Test the BZ2File class."
...@@ -252,7 +245,7 @@ class BZ2FileTest(BaseTest): ...@@ -252,7 +245,7 @@ class BZ2FileTest(BaseTest):
self.assertRaises(TypeError, bz2f.write) self.assertRaises(TypeError, bz2f.write)
bz2f.write(self.TEXT) bz2f.write(self.TEXT)
with open(self.filename, 'rb') as f: with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT) self.assertEqual(ext_decompress(f.read()), self.TEXT)
def testWriteChunks10(self): def testWriteChunks10(self):
with BZ2File(self.filename, "w") as bz2f: with BZ2File(self.filename, "w") as bz2f:
...@@ -264,7 +257,7 @@ class BZ2FileTest(BaseTest): ...@@ -264,7 +257,7 @@ class BZ2FileTest(BaseTest):
bz2f.write(str) bz2f.write(str)
n += 1 n += 1
with open(self.filename, 'rb') as f: with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT) self.assertEqual(ext_decompress(f.read()), self.TEXT)
def testWriteNonDefaultCompressLevel(self): def testWriteNonDefaultCompressLevel(self):
expected = bz2.compress(self.TEXT, compresslevel=5) expected = bz2.compress(self.TEXT, compresslevel=5)
...@@ -281,7 +274,7 @@ class BZ2FileTest(BaseTest): ...@@ -281,7 +274,7 @@ class BZ2FileTest(BaseTest):
# should raise an exception. # should raise an exception.
self.assertRaises(ValueError, bz2f.writelines, ["a"]) self.assertRaises(ValueError, bz2f.writelines, ["a"])
with open(self.filename, 'rb') as f: with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT) self.assertEqual(ext_decompress(f.read()), self.TEXT)
def testWriteMethodsOnReadOnlyFile(self): def testWriteMethodsOnReadOnlyFile(self):
with BZ2File(self.filename, "w") as bz2f: with BZ2File(self.filename, "w") as bz2f:
...@@ -299,7 +292,7 @@ class BZ2FileTest(BaseTest): ...@@ -299,7 +292,7 @@ class BZ2FileTest(BaseTest):
self.assertRaises(TypeError, bz2f.write) self.assertRaises(TypeError, bz2f.write)
bz2f.write(self.TEXT) bz2f.write(self.TEXT)
with open(self.filename, 'rb') as f: with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT * 2) self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)
def testSeekForward(self): def testSeekForward(self):
self.createTempFile() self.createTempFile()
...@@ -602,7 +595,7 @@ class BZ2FileTest(BaseTest): ...@@ -602,7 +595,7 @@ class BZ2FileTest(BaseTest):
with BZ2File(bio, "w") as bz2f: with BZ2File(bio, "w") as bz2f:
self.assertRaises(TypeError, bz2f.write) self.assertRaises(TypeError, bz2f.write)
bz2f.write(self.TEXT) bz2f.write(self.TEXT)
self.assertEqual(self.decompress(bio.getvalue()), self.TEXT) self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
self.assertFalse(bio.closed) self.assertFalse(bio.closed)
def testSeekForwardBytesIO(self): def testSeekForwardBytesIO(self):
...@@ -639,7 +632,7 @@ class BZ2CompressorTest(BaseTest): ...@@ -639,7 +632,7 @@ class BZ2CompressorTest(BaseTest):
self.assertRaises(TypeError, bz2c.compress) self.assertRaises(TypeError, bz2c.compress)
data = bz2c.compress(self.TEXT) data = bz2c.compress(self.TEXT)
data += bz2c.flush() data += bz2c.flush()
self.assertEqual(self.decompress(data), self.TEXT) self.assertEqual(ext_decompress(data), self.TEXT)
def testCompressEmptyString(self): def testCompressEmptyString(self):
bz2c = BZ2Compressor() bz2c = BZ2Compressor()
...@@ -658,7 +651,7 @@ class BZ2CompressorTest(BaseTest): ...@@ -658,7 +651,7 @@ class BZ2CompressorTest(BaseTest):
data += bz2c.compress(str) data += bz2c.compress(str)
n += 1 n += 1
data += bz2c.flush() data += bz2c.flush()
self.assertEqual(self.decompress(data), self.TEXT) self.assertEqual(ext_decompress(data), self.TEXT)
@bigmemtest(size=_4G + 100, memuse=2) @bigmemtest(size=_4G + 100, memuse=2)
def testCompress4G(self, size): def testCompress4G(self, size):
...@@ -838,7 +831,7 @@ class BZ2DecompressorTest(BaseTest): ...@@ -838,7 +831,7 @@ class BZ2DecompressorTest(BaseTest):
class CompressDecompressTest(BaseTest): class CompressDecompressTest(BaseTest):
def testCompress(self): def testCompress(self):
data = bz2.compress(self.TEXT) data = bz2.compress(self.TEXT)
self.assertEqual(self.decompress(data), self.TEXT) self.assertEqual(ext_decompress(data), self.TEXT)
def testCompressEmptyString(self): def testCompressEmptyString(self):
text = bz2.compress(b'') text = bz2.compress(b'')
...@@ -888,14 +881,14 @@ class OpenTest(BaseTest): ...@@ -888,14 +881,14 @@ class OpenTest(BaseTest):
with self.open(self.filename, mode) as f: with self.open(self.filename, mode) as f:
f.write(self.TEXT) f.write(self.TEXT)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()) file_data = ext_decompress(f.read())
self.assertEqual(file_data, self.TEXT) self.assertEqual(file_data, self.TEXT)
with self.open(self.filename, "rb") as f: with self.open(self.filename, "rb") as f:
self.assertEqual(f.read(), self.TEXT) self.assertEqual(f.read(), self.TEXT)
with self.open(self.filename, "ab") as f: with self.open(self.filename, "ab") as f:
f.write(self.TEXT) f.write(self.TEXT)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()) file_data = ext_decompress(f.read())
self.assertEqual(file_data, self.TEXT * 2) self.assertEqual(file_data, self.TEXT * 2)
def test_implicit_binary_modes(self): def test_implicit_binary_modes(self):
...@@ -906,14 +899,14 @@ class OpenTest(BaseTest): ...@@ -906,14 +899,14 @@ class OpenTest(BaseTest):
with self.open(self.filename, mode) as f: with self.open(self.filename, mode) as f:
f.write(self.TEXT) f.write(self.TEXT)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()) file_data = ext_decompress(f.read())
self.assertEqual(file_data, self.TEXT) self.assertEqual(file_data, self.TEXT)
with self.open(self.filename, "r") as f: with self.open(self.filename, "r") as f:
self.assertEqual(f.read(), self.TEXT) self.assertEqual(f.read(), self.TEXT)
with self.open(self.filename, "a") as f: with self.open(self.filename, "a") as f:
f.write(self.TEXT) f.write(self.TEXT)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()) file_data = ext_decompress(f.read())
self.assertEqual(file_data, self.TEXT * 2) self.assertEqual(file_data, self.TEXT * 2)
def test_text_modes(self): def test_text_modes(self):
...@@ -925,14 +918,14 @@ class OpenTest(BaseTest): ...@@ -925,14 +918,14 @@ class OpenTest(BaseTest):
with self.open(self.filename, mode) as f: with self.open(self.filename, mode) as f:
f.write(text) f.write(text)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()).decode("ascii") file_data = ext_decompress(f.read()).decode("ascii")
self.assertEqual(file_data, text_native_eol) self.assertEqual(file_data, text_native_eol)
with self.open(self.filename, "rt") as f: with self.open(self.filename, "rt") as f:
self.assertEqual(f.read(), text) self.assertEqual(f.read(), text)
with self.open(self.filename, "at") as f: with self.open(self.filename, "at") as f:
f.write(text) f.write(text)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()).decode("ascii") file_data = ext_decompress(f.read()).decode("ascii")
self.assertEqual(file_data, text_native_eol * 2) self.assertEqual(file_data, text_native_eol * 2)
def test_x_mode(self): def test_x_mode(self):
...@@ -973,7 +966,7 @@ class OpenTest(BaseTest): ...@@ -973,7 +966,7 @@ class OpenTest(BaseTest):
with self.open(self.filename, "wt", encoding="utf-16-le") as f: with self.open(self.filename, "wt", encoding="utf-16-le") as f:
f.write(text) f.write(text)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()).decode("utf-16-le") file_data = ext_decompress(f.read()).decode("utf-16-le")
self.assertEqual(file_data, text_native_eol) self.assertEqual(file_data, text_native_eol)
with self.open(self.filename, "rt", encoding="utf-16-le") as f: with self.open(self.filename, "rt", encoding="utf-16-le") as f:
self.assertEqual(f.read(), text) self.assertEqual(f.read(), text)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment