Commit e860404e authored by Nadeem Vawda

Add a function lzma.open(), to match gzip.open() and bz2.open().

parent 6cbb20cd
...@@ -29,6 +29,35 @@ from multiple threads, it is necessary to protect it with a lock. ...@@ -29,6 +29,35 @@ from multiple threads, it is necessary to protect it with a lock.
Reading and writing compressed files Reading and writing compressed files
------------------------------------ ------------------------------------
.. function:: open(filename, mode="rb", \*, format=None, check=-1, preset=None, filters=None, encoding=None, errors=None, newline=None)
Open an LZMA-compressed file in binary or text mode, returning a :term:`file
object`.
The *filename* argument can be either an actual file name (given as a
:class:`str` or :class:`bytes` object), in which case the named file is
opened, or it can be an existing file object to read from or write to.
The *mode* argument can be any of ``"r"``, ``"rb"``, ``"w"``, ``"wb"``,
``"a"`` or ``"ab"`` for binary mode, or ``"rt"``, ``"wt"``, or ``"at"`` for
text mode. The default is ``"rb"``.
When opening a file for reading, the *format* and *filters* arguments have
the same meanings as for :class:`LZMADecompressor`. In this case, the *check*
and *preset* arguments should not be used.
When opening a file for writing, the *format*, *check*, *preset* and
*filters* arguments have the same meanings as for :class:`LZMACompressor`.
For binary mode, this function is equivalent to the :class:`LZMAFile`
constructor: ``LZMAFile(filename, mode, ...)``. In this case, the *encoding*,
*errors* and *newline* arguments must not be provided.
For text mode, a :class:`LZMAFile` object is created, and wrapped in an
:class:`io.TextIOWrapper` instance with the specified encoding, error
handling behavior, and line ending(s).
.. class:: LZMAFile(filename=None, mode="r", \*, format=None, check=-1, preset=None, filters=None) .. class:: LZMAFile(filename=None, mode="r", \*, format=None, check=-1, preset=None, filters=None)
Open an LZMA-compressed file in binary mode. Open an LZMA-compressed file in binary mode.
......
...@@ -18,10 +18,11 @@ __all__ = [ ...@@ -18,10 +18,11 @@ __all__ = [
"MODE_FAST", "MODE_NORMAL", "PRESET_DEFAULT", "PRESET_EXTREME", "MODE_FAST", "MODE_NORMAL", "PRESET_DEFAULT", "PRESET_EXTREME",
"LZMACompressor", "LZMADecompressor", "LZMAFile", "LZMAError", "LZMACompressor", "LZMADecompressor", "LZMAFile", "LZMAError",
"compress", "decompress", "is_check_supported", "open", "compress", "decompress", "is_check_supported",
"encode_filter_properties", "decode_filter_properties", "encode_filter_properties", "decode_filter_properties",
] ]
import builtins
import io import io
from _lzma import * from _lzma import *
...@@ -122,7 +123,7 @@ class LZMAFile(io.BufferedIOBase): ...@@ -122,7 +123,7 @@ class LZMAFile(io.BufferedIOBase):
if isinstance(filename, (str, bytes)): if isinstance(filename, (str, bytes)):
if "b" not in mode: if "b" not in mode:
mode += "b" mode += "b"
self._fp = open(filename, mode) self._fp = builtins.open(filename, mode)
self._closefp = True self._closefp = True
self._mode = mode_code self._mode = mode_code
elif hasattr(filename, "read") or hasattr(filename, "write"): elif hasattr(filename, "read") or hasattr(filename, "write"):
...@@ -370,6 +371,51 @@ class LZMAFile(io.BufferedIOBase): ...@@ -370,6 +371,51 @@ class LZMAFile(io.BufferedIOBase):
return self._pos return self._pos
def open(filename, mode="rb", *,
         format=None, check=-1, preset=None, filters=None,
         encoding=None, errors=None, newline=None):
    """Open an LZMA-compressed file in binary or text mode.

    filename can be either an actual file name (given as a str or bytes
    object), in which case the named file is opened, or it can be an
    existing file object to read from or write to.

    The mode argument can be "r", "rb" (default), "w", "wb", "a", or "ab"
    for binary mode, or "rt", "wt" or "at" for text mode.

    The format, check, preset and filters arguments specify the compression
    settings, as for LZMACompressor, LZMADecompressor and LZMAFile.

    For binary mode, this function is equivalent to the LZMAFile
    constructor: LZMAFile(filename, mode, ...). In this case, the encoding,
    errors and newline arguments must not be provided.

    For text mode, a LZMAFile object is created, and wrapped in an
    io.TextIOWrapper instance with the specified encoding, error handling
    behavior, and line ending(s).

    Raises ValueError if mode contains both "t" and "b", or if encoding,
    errors or newline is given for a binary mode.
    """
    text_mode = "t" in mode
    if text_mode:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    else:
        if encoding is not None:
            raise ValueError("Argument 'encoding' not supported in binary mode")
        if errors is not None:
            raise ValueError("Argument 'errors' not supported in binary mode")
        if newline is not None:
            raise ValueError("Argument 'newline' not supported in binary mode")

    # LZMAFile only deals in bytes, so drop the text-mode flag before
    # handing the mode string to it.
    lz_mode = mode.replace("t", "")
    binary_file = LZMAFile(filename, lz_mode, format=format, check=check,
                           preset=preset, filters=filters)
    if not text_mode:
        return binary_file

    try:
        return io.TextIOWrapper(binary_file, encoding, errors, newline)
    except BaseException:
        # The wrapper could not be built (e.g. unknown encoding), so the
        # caller never receives a handle; close the LZMAFile here rather
        # than leaking it until garbage collection.
        binary_file.close()
        raise
def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None): def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None):
"""Compress a block of data. """Compress a block of data.
......
...@@ -935,6 +935,106 @@ class FileTestCase(unittest.TestCase): ...@@ -935,6 +935,106 @@ class FileTestCase(unittest.TestCase):
self.assertRaises(ValueError, f.tell) self.assertRaises(ValueError, f.tell)
class OpenTestCase(unittest.TestCase):
    """Tests for the module-level lzma.open() function."""

    def test_binary_modes(self):
        # Reading in binary mode yields the decompressed bytes.
        with lzma.open(BytesIO(COMPRESSED_XZ), "rb") as reader:
            self.assertEqual(reader.read(), INPUT)
        with BytesIO() as buf:
            # Write once, then append, checking the buffer after each step.
            with lzma.open(buf, "wb") as writer:
                writer.write(INPUT)
            self.assertEqual(lzma.decompress(buf.getvalue()), INPUT)
            with lzma.open(buf, "ab") as writer:
                writer.write(INPUT)
            self.assertEqual(lzma.decompress(buf.getvalue()), INPUT * 2)

    def test_text_modes(self):
        text = INPUT.decode("ascii")
        # What the text is expected to look like once written with the
        # platform's line separator.
        text_on_disk = text.replace("\n", os.linesep)
        with lzma.open(BytesIO(COMPRESSED_XZ), "rt") as reader:
            self.assertEqual(reader.read(), text)
        with BytesIO() as buf:
            with lzma.open(buf, "wt") as writer:
                writer.write(text)
            decoded = lzma.decompress(buf.getvalue()).decode("ascii")
            self.assertEqual(decoded, text_on_disk)
            with lzma.open(buf, "at") as writer:
                writer.write(text)
            decoded = lzma.decompress(buf.getvalue()).decode("ascii")
            self.assertEqual(decoded, text_on_disk * 2)

    def test_filename(self):
        with TempFile(TESTFN):
            with lzma.open(TESTFN, "wb") as writer:
                writer.write(INPUT)
            # Inspect the raw file contents via the builtin open().
            with open(TESTFN, "rb") as raw:
                contents = lzma.decompress(raw.read())
                self.assertEqual(contents, INPUT)
            with lzma.open(TESTFN, "rb") as reader:
                self.assertEqual(reader.read(), INPUT)
            with lzma.open(TESTFN, "ab") as writer:
                writer.write(INPUT)
            with lzma.open(TESTFN, "rb") as reader:
                self.assertEqual(reader.read(), INPUT * 2)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        for bad_mode in ("", "x", "rbt"):
            with self.assertRaises(ValueError):
                lzma.open(TESTFN, bad_mode)
        # Text-only keyword arguments are rejected in binary mode.
        text_only_kwargs = (
            {"encoding": "utf-8"},
            {"errors": "ignore"},
            {"newline": "\n"},
        )
        for extra in text_only_kwargs:
            with self.assertRaises(ValueError):
                lzma.open(TESTFN, "rb", **extra)

    def test_format_and_filters(self):
        # Test non-default format and filter chain.
        raw_options = {"format": lzma.FORMAT_RAW, "filters": FILTERS_RAW_1}
        with lzma.open(BytesIO(COMPRESSED_RAW_1), "rb", **raw_options) as reader:
            self.assertEqual(reader.read(), INPUT)
        with BytesIO() as buf:
            with lzma.open(buf, "wb", **raw_options) as writer:
                writer.write(INPUT)
            roundtrip = lzma.decompress(buf.getvalue(), **raw_options)
            self.assertEqual(roundtrip, INPUT)

    def test_encoding(self):
        # Test non-default encoding.
        text = INPUT.decode("ascii")
        text_on_disk = text.replace("\n", os.linesep)
        with BytesIO() as buf:
            with lzma.open(buf, "wt", encoding="utf-16-le") as writer:
                writer.write(text)
            decoded = lzma.decompress(buf.getvalue()).decode("utf-16-le")
            self.assertEqual(decoded, text_on_disk)
            buf.seek(0)
            with lzma.open(buf, "rt", encoding="utf-16-le") as reader:
                self.assertEqual(reader.read(), text)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        compressed = lzma.compress(b"foo\xffbar")
        with BytesIO(compressed) as buf:
            with lzma.open(buf, "rt", encoding="ascii", errors="ignore") as reader:
                self.assertEqual(reader.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        text = INPUT.decode("ascii")
        with BytesIO() as buf:
            with lzma.open(buf, "wt", newline="\n") as writer:
                writer.write(text)
            buf.seek(0)
            with lzma.open(buf, "rt", newline="\r") as reader:
                self.assertEqual(reader.readlines(), [text])
class MiscellaneousTestCase(unittest.TestCase): class MiscellaneousTestCase(unittest.TestCase):
def test_is_check_supported(self): def test_is_check_supported(self):
...@@ -1385,6 +1485,7 @@ def test_main(): ...@@ -1385,6 +1485,7 @@ def test_main():
CompressorDecompressorTestCase, CompressorDecompressorTestCase,
CompressDecompressFunctionTestCase, CompressDecompressFunctionTestCase,
FileTestCase, FileTestCase,
OpenTestCase,
MiscellaneousTestCase, MiscellaneousTestCase,
) )
......
...@@ -17,8 +17,8 @@ Library ...@@ -17,8 +17,8 @@ Library
- LZMAFile now accepts the modes "rb"/"wb"/"ab" as synonyms of "r"/"w"/"a". - LZMAFile now accepts the modes "rb"/"wb"/"ab" as synonyms of "r"/"w"/"a".
- The bz2 module now contains an open() function, allowing compressed files to - The bz2 and lzma modules now each contain an open() function, allowing
conveniently be opened in text mode as well as binary mode. compressed files to readily be opened in text mode as well as binary mode.
- BZ2File.__init__() and LZMAFile.__init__() now accept a file object as their - BZ2File.__init__() and LZMAFile.__init__() now accept a file object as their
first argument, rather than requiring a separate "fileobj" argument. first argument, rather than requiring a separate "fileobj" argument.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment