Commit e0eb5b15 authored by Guido van Rossum's avatar Guido van Rossum

Make test_zipfile pass.

The zipfile module now does all I/O in binary mode using bytes.
(Maybe we should support wrapping a TextIOWrapper around it
when text mode reading is requested?)
Even the password is a bytes array now.
Had to fix py_compile.py to use bytes while I was at it.
The _struct needed a patch to support bytes, str8 and str
for the 's' and 'p' formats.
parent 9c3e7815
...@@ -72,10 +72,10 @@ else: ...@@ -72,10 +72,10 @@ else:
def wr_long(f, x): def wr_long(f, x):
"""Internal; write a 32-bit int to a file in little-endian order.""" """Internal; write a 32-bit int to a file in little-endian order."""
f.write(chr( x & 0xff)) f.write(bytes([x & 0xff,
f.write(chr((x >> 8) & 0xff)) (x >> 8) & 0xff,
f.write(chr((x >> 16) & 0xff)) (x >> 16) & 0xff,
f.write(chr((x >> 24) & 0xff)) (x >> 24) & 0xff]))
def compile(file, cfile=None, dfile=None, doraise=False): def compile(file, cfile=None, dfile=None, doraise=False):
"""Byte-compile one Python source file to Python bytecode. """Byte-compile one Python source file to Python bytecode.
...@@ -133,7 +133,7 @@ def compile(file, cfile=None, dfile=None, doraise=False): ...@@ -133,7 +133,7 @@ def compile(file, cfile=None, dfile=None, doraise=False):
if cfile is None: if cfile is None:
cfile = file + (__debug__ and 'c' or 'o') cfile = file + (__debug__ and 'c' or 'o')
fc = open(cfile, 'wb') fc = open(cfile, 'wb')
fc.write('\0\0\0\0') fc.write(b'\0\0\0\0')
wr_long(fc, timestamp) wr_long(fc, timestamp)
marshal.dump(codeobject, fc) marshal.dump(codeobject, fc)
fc.flush() fc.flush()
......
...@@ -3,10 +3,8 @@ try: ...@@ -3,10 +3,8 @@ try:
import zlib import zlib
except ImportError: except ImportError:
zlib = None zlib = None
import zipfile, os, unittest, sys, shutil, struct, io
import zipfile, os, unittest, sys, shutil, struct
from StringIO import StringIO
from tempfile import TemporaryFile from tempfile import TemporaryFile
from random import randint, random from random import randint, random
...@@ -18,9 +16,10 @@ FIXEDTEST_SIZE = 10 ...@@ -18,9 +16,10 @@ FIXEDTEST_SIZE = 10
class TestsWithSourceFile(unittest.TestCase): class TestsWithSourceFile(unittest.TestCase):
def setUp(self): def setUp(self):
self.line_gen = ("Zipfile test line %d. random float: %f" % (i, random()) self.line_gen = (bytes("Zipfile test line %d. random float: %f" %
for i in range(FIXEDTEST_SIZE)) (i, random()))
self.data = '\n'.join(self.line_gen) + '\n' for i in range(FIXEDTEST_SIZE))
self.data = b'\n'.join(self.line_gen) + b'\n'
# Make a source file with some lines # Make a source file with some lines
fp = open(TESTFN, "wb") fp = open(TESTFN, "wb")
...@@ -45,14 +44,8 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -45,14 +44,8 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(zipfp.read("strfile"), self.data) self.assertEqual(zipfp.read("strfile"), self.data)
# Print the ZIP directory # Print the ZIP directory
fp = StringIO() fp = io.StringIO()
stdout = sys.stdout zipfp.printdir(file=fp)
try:
sys.stdout = fp
zipfp.printdir()
finally:
sys.stdout = stdout
directory = fp.getvalue() directory = fp.getvalue()
lines = directory.splitlines() lines = directory.splitlines()
...@@ -95,7 +88,7 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -95,7 +88,7 @@ class TestsWithSourceFile(unittest.TestCase):
zipfp.close() zipfp.close()
def testStored(self): def testStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipTest(f, zipfile.ZIP_STORED) self.zipTest(f, zipfile.ZIP_STORED)
def zipOpenTest(self, f, compression): def zipOpenTest(self, f, compression):
...@@ -119,12 +112,12 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -119,12 +112,12 @@ class TestsWithSourceFile(unittest.TestCase):
break break
zipdata2.append(read_data) zipdata2.append(read_data)
self.assertEqual(''.join(zipdata1), self.data) self.assertEqual(b''.join(zipdata1), self.data)
self.assertEqual(''.join(zipdata2), self.data) self.assertEqual(b''.join(zipdata2), self.data)
zipfp.close() zipfp.close()
def testOpenStored(self): def testOpenStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipOpenTest(f, zipfile.ZIP_STORED) self.zipOpenTest(f, zipfile.ZIP_STORED)
def zipRandomOpenTest(self, f, compression): def zipRandomOpenTest(self, f, compression):
...@@ -140,11 +133,11 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -140,11 +133,11 @@ class TestsWithSourceFile(unittest.TestCase):
break break
zipdata1.append(read_data) zipdata1.append(read_data)
self.assertEqual(''.join(zipdata1), self.data) self.assertEqual(b''.join(zipdata1), self.data)
zipfp.close() zipfp.close()
def testRandomOpenStored(self): def testRandomOpenStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipRandomOpenTest(f, zipfile.ZIP_STORED) self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
def zipReadlineTest(self, f, compression): def zipReadlineTest(self, f, compression):
...@@ -181,40 +174,40 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -181,40 +174,40 @@ class TestsWithSourceFile(unittest.TestCase):
zipfp.close() zipfp.close()
def testReadlineStored(self): def testReadlineStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipReadlineTest(f, zipfile.ZIP_STORED) self.zipReadlineTest(f, zipfile.ZIP_STORED)
def testReadlinesStored(self): def testReadlinesStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipReadlinesTest(f, zipfile.ZIP_STORED) self.zipReadlinesTest(f, zipfile.ZIP_STORED)
def testIterlinesStored(self): def testIterlinesStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipIterlinesTest(f, zipfile.ZIP_STORED) self.zipIterlinesTest(f, zipfile.ZIP_STORED)
if zlib: if zlib:
def testDeflated(self): def testDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipTest(f, zipfile.ZIP_DEFLATED) self.zipTest(f, zipfile.ZIP_DEFLATED)
def testOpenDeflated(self): def testOpenDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipOpenTest(f, zipfile.ZIP_DEFLATED) self.zipOpenTest(f, zipfile.ZIP_DEFLATED)
def testRandomOpenDeflated(self): def testRandomOpenDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED) self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
def testReadlineDeflated(self): def testReadlineDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipReadlineTest(f, zipfile.ZIP_DEFLATED) self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
def testReadlinesDeflated(self): def testReadlinesDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED) self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
def testIterlinesDeflated(self): def testIterlinesDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED) self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
def testLowCompression(self): def testLowCompression(self):
...@@ -227,8 +220,8 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -227,8 +220,8 @@ class TestsWithSourceFile(unittest.TestCase):
# Get an open object for strfile # Get an open object for strfile
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED)
openobj = zipfp.open("strfile") openobj = zipfp.open("strfile")
self.assertEqual(openobj.read(1), '1') self.assertEqual(openobj.read(1), b'1')
self.assertEqual(openobj.read(1), '2') self.assertEqual(openobj.read(1), b'2')
def testAbsoluteArcnames(self): def testAbsoluteArcnames(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
...@@ -251,8 +244,9 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -251,8 +244,9 @@ class TestZip64InSmallFiles(unittest.TestCase):
self._limit = zipfile.ZIP64_LIMIT self._limit = zipfile.ZIP64_LIMIT
zipfile.ZIP64_LIMIT = 5 zipfile.ZIP64_LIMIT = 5
line_gen = ("Test of zipfile line %d." % i for i in range(0, FIXEDTEST_SIZE)) line_gen = (bytes("Test of zipfile line %d." % i)
self.data = '\n'.join(line_gen) for i in range(0, FIXEDTEST_SIZE))
self.data = b'\n'.join(line_gen)
# Make a source file with some lines # Make a source file with some lines
fp = open(TESTFN, "wb") fp = open(TESTFN, "wb")
...@@ -272,7 +266,7 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -272,7 +266,7 @@ class TestZip64InSmallFiles(unittest.TestCase):
zipfp.close() zipfp.close()
def testLargeFileException(self): def testLargeFileException(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.largeFileExceptionTest(f, zipfile.ZIP_STORED) self.largeFileExceptionTest(f, zipfile.ZIP_STORED)
self.largeFileExceptionTest2(f, zipfile.ZIP_STORED) self.largeFileExceptionTest2(f, zipfile.ZIP_STORED)
...@@ -291,14 +285,8 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -291,14 +285,8 @@ class TestZip64InSmallFiles(unittest.TestCase):
self.assertEqual(zipfp.read("strfile"), self.data) self.assertEqual(zipfp.read("strfile"), self.data)
# Print the ZIP directory # Print the ZIP directory
fp = StringIO() fp = io.StringIO()
stdout = sys.stdout zipfp.printdir(fp)
try:
sys.stdout = fp
zipfp.printdir()
finally:
sys.stdout = stdout
directory = fp.getvalue() directory = fp.getvalue()
lines = directory.splitlines() lines = directory.splitlines()
...@@ -343,13 +331,13 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -343,13 +331,13 @@ class TestZip64InSmallFiles(unittest.TestCase):
zipfp.close() zipfp.close()
def testStored(self): def testStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipTest(f, zipfile.ZIP_STORED) self.zipTest(f, zipfile.ZIP_STORED)
if zlib: if zlib:
def testDeflated(self): def testDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipTest(f, zipfile.ZIP_DEFLATED) self.zipTest(f, zipfile.ZIP_DEFLATED)
def testAbsoluteArcnames(self): def testAbsoluteArcnames(self):
...@@ -440,7 +428,7 @@ class OtherTests(unittest.TestCase): ...@@ -440,7 +428,7 @@ class OtherTests(unittest.TestCase):
os.unlink(TESTFN) os.unlink(TESTFN)
filename = 'testfile.txt' filename = 'testfile.txt'
content = 'hello, world. this is some content.' content = b'hello, world. this is some content.'
try: try:
zf = zipfile.ZipFile(TESTFN, 'a') zf = zipfile.ZipFile(TESTFN, 'a')
...@@ -483,7 +471,7 @@ class OtherTests(unittest.TestCase): ...@@ -483,7 +471,7 @@ class OtherTests(unittest.TestCase):
# This test checks that the is_zipfile function correctly identifies # This test checks that the is_zipfile function correctly identifies
# a file that is a zip file # a file that is a zip file
zipf = zipfile.ZipFile(TESTFN, mode="w") zipf = zipfile.ZipFile(TESTFN, mode="w")
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
zipf.close() zipf.close()
chk = zipfile.is_zipfile(TESTFN) chk = zipfile.is_zipfile(TESTFN)
self.assert_(chk is True) self.assert_(chk is True)
...@@ -504,7 +492,7 @@ class OtherTests(unittest.TestCase): ...@@ -504,7 +492,7 @@ class OtherTests(unittest.TestCase):
def testClosedZipRaisesRuntimeError(self): def testClosedZipRaisesRuntimeError(self):
# Verify that testzip() doesn't swallow inappropriate exceptions. # Verify that testzip() doesn't swallow inappropriate exceptions.
data = StringIO() data = io.BytesIO()
zipf = zipfile.ZipFile(data, mode="w") zipf = zipfile.ZipFile(data, mode="w")
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close() zipf.close()
...@@ -525,15 +513,15 @@ class DecryptionTests(unittest.TestCase): ...@@ -525,15 +513,15 @@ class DecryptionTests(unittest.TestCase):
# ZIP file # ZIP file
data = ( data = (
'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
'\x00\x00L\x00\x00\x00\x00\x00' ) b'\x00\x00L\x00\x00\x00\x00\x00' )
plain = 'zipfile.py encryption test' plain = b'zipfile.py encryption test'
def setUp(self): def setUp(self):
fp = open(TESTFN, "wb") fp = open(TESTFN, "wb")
...@@ -551,18 +539,19 @@ class DecryptionTests(unittest.TestCase): ...@@ -551,18 +539,19 @@ class DecryptionTests(unittest.TestCase):
self.assertRaises(RuntimeError, self.zip.read, "test.txt") self.assertRaises(RuntimeError, self.zip.read, "test.txt")
def testBadPassword(self): def testBadPassword(self):
self.zip.setpassword("perl") self.zip.setpassword(b"perl")
self.assertRaises(RuntimeError, self.zip.read, "test.txt") self.assertRaises(RuntimeError, self.zip.read, "test.txt")
def testGoodPassword(self): def testGoodPassword(self):
self.zip.setpassword("python") self.zip.setpassword(b"python")
self.assertEquals(self.zip.read("test.txt"), self.plain) self.assertEquals(self.zip.read("test.txt"), self.plain)
class TestsWithRandomBinaryFiles(unittest.TestCase): class TestsWithRandomBinaryFiles(unittest.TestCase):
def setUp(self): def setUp(self):
datacount = randint(16, 64)*1024 + randint(1, 1024) datacount = randint(16, 64)*1024 + randint(1, 1024)
self.data = ''.join((struct.pack('<f', random()*randint(-1000, 1000)) for i in range(datacount))) self.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000))
for i in range(datacount))
# Make a source file with some lines # Make a source file with some lines
fp = open(TESTFN, "wb") fp = open(TESTFN, "wb")
...@@ -592,7 +581,7 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -592,7 +581,7 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
zipfp.close() zipfp.close()
def testStored(self): def testStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipTest(f, zipfile.ZIP_STORED) self.zipTest(f, zipfile.ZIP_STORED)
def zipOpenTest(self, f, compression): def zipOpenTest(self, f, compression):
...@@ -616,17 +605,17 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -616,17 +605,17 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
break break
zipdata2.append(read_data) zipdata2.append(read_data)
testdata1 = ''.join(zipdata1) testdata1 = b''.join(zipdata1)
self.assertEqual(len(testdata1), len(self.data)) self.assertEqual(len(testdata1), len(self.data))
self.assertEqual(testdata1, self.data) self.assertEqual(testdata1, self.data)
testdata2 = ''.join(zipdata2) testdata2 = b''.join(zipdata2)
self.assertEqual(len(testdata1), len(self.data)) self.assertEqual(len(testdata1), len(self.data))
self.assertEqual(testdata1, self.data) self.assertEqual(testdata1, self.data)
zipfp.close() zipfp.close()
def testOpenStored(self): def testOpenStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipOpenTest(f, zipfile.ZIP_STORED) self.zipOpenTest(f, zipfile.ZIP_STORED)
def zipRandomOpenTest(self, f, compression): def zipRandomOpenTest(self, f, compression):
...@@ -642,13 +631,13 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -642,13 +631,13 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
break break
zipdata1.append(read_data) zipdata1.append(read_data)
testdata = ''.join(zipdata1) testdata = b''.join(zipdata1)
self.assertEqual(len(testdata), len(self.data)) self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data) self.assertEqual(testdata, self.data)
zipfp.close() zipfp.close()
def testRandomOpenStored(self): def testRandomOpenStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zipRandomOpenTest(f, zipfile.ZIP_STORED) self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
class TestsWithMultipleOpens(unittest.TestCase): class TestsWithMultipleOpens(unittest.TestCase):
...@@ -682,8 +671,8 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -682,8 +671,8 @@ class TestsWithMultipleOpens(unittest.TestCase):
data2 = zopen2.read(500) data2 = zopen2.read(500)
data1 += zopen1.read(500) data1 += zopen1.read(500)
data2 += zopen2.read(500) data2 += zopen2.read(500)
self.assertEqual(data1, '1'*FIXEDTEST_SIZE) self.assertEqual(data1, b'1'*FIXEDTEST_SIZE)
self.assertEqual(data2, '2'*FIXEDTEST_SIZE) self.assertEqual(data2, b'2'*FIXEDTEST_SIZE)
zipf.close() zipf.close()
def testInterleaved(self): def testInterleaved(self):
...@@ -696,8 +685,8 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -696,8 +685,8 @@ class TestsWithMultipleOpens(unittest.TestCase):
data2 = zopen2.read(500) data2 = zopen2.read(500)
data1 += zopen1.read(500) data1 += zopen1.read(500)
data2 += zopen2.read(500) data2 += zopen2.read(500)
self.assertEqual(data1, '1'*FIXEDTEST_SIZE) self.assertEqual(data1, b'1'*FIXEDTEST_SIZE)
self.assertEqual(data2, '2'*FIXEDTEST_SIZE) self.assertEqual(data2, b'2'*FIXEDTEST_SIZE)
zipf.close() zipf.close()
def tearDown(self): def tearDown(self):
...@@ -706,13 +695,19 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -706,13 +695,19 @@ class TestsWithMultipleOpens(unittest.TestCase):
class UniversalNewlineTests(unittest.TestCase): class UniversalNewlineTests(unittest.TestCase):
def setUp(self): def setUp(self):
self.line_gen = ["Test of zipfile line %d." % i for i in range(FIXEDTEST_SIZE)] self.line_gen = [bytes("Test of zipfile line %d." % i)
for i in range(FIXEDTEST_SIZE)]
self.seps = ('\r', '\r\n', '\n') self.seps = ('\r', '\r\n', '\n')
self.arcdata, self.arcfiles = {}, {} self.arcdata, self.arcfiles = {}, {}
for n, s in enumerate(self.seps): for n, s in enumerate(self.seps):
self.arcdata[s] = s.join(self.line_gen) + s b = s.encode("ascii")
self.arcdata[s] = b.join(self.line_gen) + b
self.arcfiles[s] = '%s-%d' % (TESTFN, n) self.arcfiles[s] = '%s-%d' % (TESTFN, n)
file(self.arcfiles[s], "wb").write(self.arcdata[s]) f = open(self.arcfiles[s], "wb")
try:
f.write(self.arcdata[s])
finally:
f.close()
def makeTestArchive(self, f, compression): def makeTestArchive(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
...@@ -741,7 +736,7 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -741,7 +736,7 @@ class UniversalNewlineTests(unittest.TestCase):
zipopen = zipfp.open(fn, "rU") zipopen = zipfp.open(fn, "rU")
for line in self.line_gen: for line in self.line_gen:
linedata = zipopen.readline() linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n') self.assertEqual(linedata, line + b'\n')
zipfp.close() zipfp.close()
...@@ -753,7 +748,7 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -753,7 +748,7 @@ class UniversalNewlineTests(unittest.TestCase):
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
ziplines = zipfp.open(fn, "rU").readlines() ziplines = zipfp.open(fn, "rU").readlines()
for line, zipline in zip(self.line_gen, ziplines): for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + b'\n')
zipfp.close() zipfp.close()
...@@ -764,41 +759,41 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -764,41 +759,41 @@ class UniversalNewlineTests(unittest.TestCase):
zipfp = zipfile.ZipFile(f, "r") zipfp = zipfile.ZipFile(f, "r")
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")): for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + b'\n')
zipfp.close() zipfp.close()
def testReadStored(self): def testReadStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readTest(f, zipfile.ZIP_STORED) self.readTest(f, zipfile.ZIP_STORED)
def testReadlineStored(self): def testReadlineStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readlineTest(f, zipfile.ZIP_STORED) self.readlineTest(f, zipfile.ZIP_STORED)
def testReadlinesStored(self): def testReadlinesStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readlinesTest(f, zipfile.ZIP_STORED) self.readlinesTest(f, zipfile.ZIP_STORED)
def testIterlinesStored(self): def testIterlinesStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.iterlinesTest(f, zipfile.ZIP_STORED) self.iterlinesTest(f, zipfile.ZIP_STORED)
if zlib: if zlib:
def testReadDeflated(self): def testReadDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readTest(f, zipfile.ZIP_DEFLATED) self.readTest(f, zipfile.ZIP_DEFLATED)
def testReadlineDeflated(self): def testReadlineDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readlineTest(f, zipfile.ZIP_DEFLATED) self.readlineTest(f, zipfile.ZIP_DEFLATED)
def testReadlinesDeflated(self): def testReadlinesDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readlinesTest(f, zipfile.ZIP_DEFLATED) self.readlinesTest(f, zipfile.ZIP_DEFLATED)
def testIterlinesDeflated(self): def testIterlinesDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.iterlinesTest(f, zipfile.ZIP_DEFLATED) self.iterlinesTest(f, zipfile.ZIP_DEFLATED)
def tearDown(self): def tearDown(self):
......
""" """
Read and write ZIP files. Read and write ZIP files.
XXX references to utf-8 need further investigation.
""" """
import struct, os, time, sys import struct, os, time, sys
import binascii, io import binascii, io
...@@ -33,15 +35,15 @@ ZIP_DEFLATED = 8 ...@@ -33,15 +35,15 @@ ZIP_DEFLATED = 8
# Here are some struct module formats for reading headers # Here are some struct module formats for reading headers
structEndArchive = "<4s4H2lH" # 9 items, end of archive, 22 bytes structEndArchive = "<4s4H2lH" # 9 items, end of archive, 22 bytes
stringEndArchive = "PK\005\006" # magic number for end of archive record stringEndArchive = b"PK\005\006" # magic number for end of archive record
structCentralDir = "<4s4B4HlLL5HLl"# 19 items, central directory, 46 bytes structCentralDir = "<4s4B4HlLL5HLl"# 19 items, central directory, 46 bytes
stringCentralDir = "PK\001\002" # magic number for central directory stringCentralDir = b"PK\001\002" # magic number for central directory
structFileHeader = "<4s2B4HlLL2H" # 12 items, file header record, 30 bytes structFileHeader = "<4s2B4HlLL2H" # 12 items, file header record, 30 bytes
stringFileHeader = "PK\003\004" # magic number for file header stringFileHeader = b"PK\003\004" # magic number for file header
structEndArchive64Locator = "<4slql" # 4 items, locate Zip64 header, 20 bytes structEndArchive64Locator = "<4slql" # 4 items, locate Zip64 header, 20 bytes
stringEndArchive64Locator = "PK\x06\x07" # magic token for locator header stringEndArchive64Locator = b"PK\x06\x07" # magic token for locator header
structEndArchive64 = "<4sqhhllqqqq" # 10 items, end of archive (Zip64), 56 bytes structEndArchive64 = "<4sqhhllqqqq" # 10 items, end of archive (Zip64), 56 bytes
stringEndArchive64 = "PK\x06\x06" # magic token for Zip64 header stringEndArchive64 = b"PK\x06\x06" # magic token for Zip64 header
# indexes of entries in the central directory structure # indexes of entries in the central directory structure
...@@ -82,7 +84,7 @@ _FH_EXTRA_FIELD_LENGTH = 11 ...@@ -82,7 +84,7 @@ _FH_EXTRA_FIELD_LENGTH = 11
def is_zipfile(filename): def is_zipfile(filename):
"""Quickly see if file is a ZIP file by checking the magic number.""" """Quickly see if file is a ZIP file by checking the magic number."""
try: try:
fpin = open(filename, "rb") fpin = io.open(filename, "rb")
endrec = _EndRecData(fpin) endrec = _EndRecData(fpin)
fpin.close() fpin.close()
if endrec: if endrec:
...@@ -206,8 +208,8 @@ class ZipInfo (object): ...@@ -206,8 +208,8 @@ class ZipInfo (object):
self.date_time = date_time # year, month, day, hour, min, sec self.date_time = date_time # year, month, day, hour, min, sec
# Standard values: # Standard values:
self.compress_type = ZIP_STORED # Type of compression for the file self.compress_type = ZIP_STORED # Type of compression for the file
self.comment = "" # Comment for each file self.comment = b"" # Comment for each file
self.extra = "" # ZIP extra data self.extra = b"" # ZIP extra data
if sys.platform == 'win32': if sys.platform == 'win32':
self.create_system = 0 # System which created ZIP archive self.create_system = 0 # System which created ZIP archive
else: else:
...@@ -257,7 +259,7 @@ class ZipInfo (object): ...@@ -257,7 +259,7 @@ class ZipInfo (object):
self.compress_type, dostime, dosdate, CRC, self.compress_type, dostime, dosdate, CRC,
compress_size, file_size, compress_size, file_size,
len(self.filename), len(extra)) len(self.filename), len(extra))
return header + self.filename + extra return header + self.filename.encode("utf-8") + extra
def _decodeExtra(self): def _decodeExtra(self):
# Try to decode the extra field. # Try to decode the extra field.
...@@ -331,7 +333,7 @@ class _ZipDecrypter: ...@@ -331,7 +333,7 @@ class _ZipDecrypter:
def _crc32(self, ch, crc): def _crc32(self, ch, crc):
"""Compute the CRC32 primitive on one byte.""" """Compute the CRC32 primitive on one byte."""
return ((crc >> 8) & 0xffffff) ^ self.crctable[(crc ^ ord(ch)) & 0xff] return ((crc >> 8) & 0xffffff) ^ self.crctable[(crc ^ ch) & 0xff]
def __init__(self, pwd): def __init__(self, pwd):
self.key0 = 305419896 self.key0 = 305419896
...@@ -344,20 +346,13 @@ class _ZipDecrypter: ...@@ -344,20 +346,13 @@ class _ZipDecrypter:
self.key0 = self._crc32(c, self.key0) self.key0 = self._crc32(c, self.key0)
self.key1 = (self.key1 + (self.key0 & 255)) & 4294967295 self.key1 = (self.key1 + (self.key0 & 255)) & 4294967295
self.key1 = (self.key1 * 134775813 + 1) & 4294967295 self.key1 = (self.key1 * 134775813 + 1) & 4294967295
self.key2 = self._crc32(chr((self.key1 >> 24) & 255), self.key2) self.key2 = self._crc32((self.key1 >> 24) & 255, self.key2)
def __call__(self, c): def __call__(self, c):
"""Decrypt a single character.""" """Decrypt a single character."""
# XXX When this is called with a byte instead of a char, ord() assert isinstance(c, int)
# isn't needed. Don't die in that case. In the future we should
# just leave this out, once we're always using bytes.
try:
c = ord(c)
except TypeError:
pass
k = self.key2 | 2 k = self.key2 | 2
c = c ^ (((k * (k^1)) >> 8) & 255) c = c ^ (((k * (k^1)) >> 8) & 255)
c = chr(c)
self._UpdateKeys(c) self._UpdateKeys(c)
return c return c
...@@ -370,13 +365,13 @@ class ZipExtFile: ...@@ -370,13 +365,13 @@ class ZipExtFile:
self.fileobj = fileobj self.fileobj = fileobj
self.decrypter = decrypt self.decrypter = decrypt
self.bytes_read = 0 self.bytes_read = 0
self.rawbuffer = '' self.rawbuffer = b''
self.readbuffer = '' self.readbuffer = b''
self.linebuffer = '' self.linebuffer = b''
self.eof = False self.eof = False
self.univ_newlines = False self.univ_newlines = False
self.nlSeps = ("\n", ) self.nlSeps = (b"\n", )
self.lastdiscard = '' self.lastdiscard = b''
self.compress_type = zipinfo.compress_type self.compress_type = zipinfo.compress_type
self.compress_size = zipinfo.compress_size self.compress_size = zipinfo.compress_size
...@@ -394,9 +389,9 @@ class ZipExtFile: ...@@ -394,9 +389,9 @@ class ZipExtFile:
self.univ_newlines = univ_newlines self.univ_newlines = univ_newlines
# pick line separator char(s) based on universal newlines flag # pick line separator char(s) based on universal newlines flag
self.nlSeps = ("\n", ) self.nlSeps = (b"\n", )
if self.univ_newlines: if self.univ_newlines:
self.nlSeps = ("\r\n", "\r", "\n") self.nlSeps = (b"\r\n", b"\r", b"\n")
def __iter__(self): def __iter__(self):
return self return self
...@@ -417,7 +412,7 @@ class ZipExtFile: ...@@ -417,7 +412,7 @@ class ZipExtFile:
# ugly check for cases where half of an \r\n pair was # ugly check for cases where half of an \r\n pair was
# read on the last pass, and the \r was discarded. In this # read on the last pass, and the \r was discarded. In this
# case we just throw away the \n at the start of the buffer. # case we just throw away the \n at the start of the buffer.
if (self.lastdiscard, self.linebuffer[0]) == ('\r','\n'): if (self.lastdiscard, self.linebuffer[0]) == (b'\r', b'\n'):
self.linebuffer = self.linebuffer[1:] self.linebuffer = self.linebuffer[1:]
for sep in self.nlSeps: for sep in self.nlSeps:
...@@ -435,7 +430,7 @@ class ZipExtFile: ...@@ -435,7 +430,7 @@ class ZipExtFile:
if size < 0: if size < 0:
size = sys.maxint size = sys.maxint
elif size == 0: elif size == 0:
return '' return b''
# check for a newline already in buffer # check for a newline already in buffer
nl, nllen = self._checkfornewline() nl, nllen = self._checkfornewline()
...@@ -461,7 +456,7 @@ class ZipExtFile: ...@@ -461,7 +456,7 @@ class ZipExtFile:
# so return current buffer # so return current buffer
if nl < 0: if nl < 0:
s = self.linebuffer s = self.linebuffer
self.linebuffer = '' self.linebuffer = b''
return s return s
buf = self.linebuffer[:nl] buf = self.linebuffer[:nl]
...@@ -470,7 +465,7 @@ class ZipExtFile: ...@@ -470,7 +465,7 @@ class ZipExtFile:
# line is always returned with \n as newline char (except possibly # line is always returned with \n as newline char (except possibly
# for a final incomplete line in the file, which is handled above). # for a final incomplete line in the file, which is handled above).
return buf + "\n" return buf + b"\n"
def readlines(self, sizehint = -1): def readlines(self, sizehint = -1):
"""Return a list with all (following) lines. The sizehint parameter """Return a list with all (following) lines. The sizehint parameter
...@@ -516,18 +511,23 @@ class ZipExtFile: ...@@ -516,18 +511,23 @@ class ZipExtFile:
# try to read from file (if necessary) # try to read from file (if necessary)
if bytesToRead > 0: if bytesToRead > 0:
bytes = self.fileobj.read(bytesToRead) data = self.fileobj.read(bytesToRead)
self.bytes_read += len(bytes) self.bytes_read += len(data)
self.rawbuffer += bytes try:
self.rawbuffer += data
except:
print(repr(self.fileobj), repr(self.rawbuffer),
repr(data))
raise
# handle contents of raw buffer # handle contents of raw buffer
if self.rawbuffer: if self.rawbuffer:
newdata = self.rawbuffer newdata = self.rawbuffer
self.rawbuffer = '' self.rawbuffer = b''
# decrypt new data if we were given an object to handle that # decrypt new data if we were given an object to handle that
if newdata and self.decrypter is not None: if newdata and self.decrypter is not None:
newdata = ''.join(map(self.decrypter, newdata)) newdata = bytes(map(self.decrypter, newdata))
# decompress newly read data if necessary # decompress newly read data if necessary
if newdata and self.compress_type == ZIP_DEFLATED: if newdata and self.compress_type == ZIP_DEFLATED:
...@@ -546,13 +546,13 @@ class ZipExtFile: ...@@ -546,13 +546,13 @@ class ZipExtFile:
# return what the user asked for # return what the user asked for
if size is None or len(self.readbuffer) <= size: if size is None or len(self.readbuffer) <= size:
bytes = self.readbuffer data = self.readbuffer
self.readbuffer = '' self.readbuffer = b''
else: else:
bytes = self.readbuffer[:size] data = self.readbuffer[:size]
self.readbuffer = self.readbuffer[size:] self.readbuffer = self.readbuffer[size:]
return bytes return data
class ZipFile: class ZipFile:
...@@ -593,15 +593,16 @@ class ZipFile: ...@@ -593,15 +593,16 @@ class ZipFile:
# Check if we were passed a file-like object # Check if we were passed a file-like object
if isinstance(file, basestring): if isinstance(file, basestring):
# No, it's a filename
self._filePassed = 0 self._filePassed = 0
self.filename = file self.filename = file
modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'} modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'}
try: try:
self.fp = open(file, modeDict[mode]) self.fp = io.open(file, modeDict[mode])
except IOError: except IOError:
if mode == 'a': if mode == 'a':
mode = key = 'w' mode = key = 'w'
self.fp = open(file, modeDict[mode]) self.fp = io.open(file, modeDict[mode])
else: else:
raise raise
else: else:
...@@ -661,7 +662,7 @@ class ZipFile: ...@@ -661,7 +662,7 @@ class ZipFile:
self.start_dir = offset_cd + concat self.start_dir = offset_cd + concat
fp.seek(self.start_dir, 0) fp.seek(self.start_dir, 0)
data = fp.read(size_cd) data = fp.read(size_cd)
fp = io.StringIO(data) fp = io.BytesIO(data)
total = 0 total = 0
while total < size_cd: while total < size_cd:
centdir = fp.read(46) centdir = fp.read(46)
...@@ -673,7 +674,7 @@ class ZipFile: ...@@ -673,7 +674,7 @@ class ZipFile:
print(centdir) print(centdir)
filename = fp.read(centdir[_CD_FILENAME_LENGTH]) filename = fp.read(centdir[_CD_FILENAME_LENGTH])
# Create ZipInfo instance to store file information # Create ZipInfo instance to store file information
x = ZipInfo(filename) x = ZipInfo(str(filename))
x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH]) x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
x.comment = fp.read(centdir[_CD_COMMENT_LENGTH]) x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
total = (total + centdir[_CD_FILENAME_LENGTH] total = (total + centdir[_CD_FILENAME_LENGTH]
...@@ -708,12 +709,16 @@ class ZipFile: ...@@ -708,12 +709,16 @@ class ZipFile:
archive.""" archive."""
return self.filelist return self.filelist
def printdir(self): def printdir(self, file=None):
"""Print a table of contents for the zip file.""" """Print a table of contents for the zip file."""
print("%-46s %19s %12s" % ("File Name", "Modified ", "Size")) if file is None:
file = sys.stdout
print("%-46s %19s %12s" % ("File Name", "Modified ", "Size"),
file=file)
for zinfo in self.filelist: for zinfo in self.filelist:
date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time
print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size)) print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size),
file=file)
def testzip(self): def testzip(self):
"""Read all the files and check the CRC.""" """Read all the files and check the CRC."""
...@@ -730,6 +735,7 @@ class ZipFile: ...@@ -730,6 +735,7 @@ class ZipFile:
def setpassword(self, pwd): def setpassword(self, pwd):
"""Set default password for encrypted files.""" """Set default password for encrypted files."""
assert isinstance(pwd, bytes)
self.pwd = pwd self.pwd = pwd
def read(self, name, pwd=None): def read(self, name, pwd=None):
...@@ -749,7 +755,7 @@ class ZipFile: ...@@ -749,7 +755,7 @@ class ZipFile:
if self._filePassed: if self._filePassed:
zef_file = self.fp zef_file = self.fp
else: else:
zef_file = open(self.filename, 'rb') zef_file = io.open(self.filename, 'rb')
# Get info object for name # Get info object for name
zinfo = self.getinfo(name) zinfo = self.getinfo(name)
...@@ -768,9 +774,9 @@ class ZipFile: ...@@ -768,9 +774,9 @@ class ZipFile:
if fheader[_FH_EXTRA_FIELD_LENGTH]: if fheader[_FH_EXTRA_FIELD_LENGTH]:
zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])
if fname != zinfo.orig_filename: if fname != zinfo.orig_filename.encode("utf-8"):
raise BadZipfile, \ raise BadZipfile, \
'File name in directory "%s" and header "%s" differ.' % ( 'File name in directory %r and header %r differ.' % (
zinfo.orig_filename, fname) zinfo.orig_filename, fname)
# check for encrypted flag & handle password # check for encrypted flag & handle password
...@@ -790,7 +796,7 @@ class ZipFile: ...@@ -790,7 +796,7 @@ class ZipFile:
# and is used to check the correctness of the password. # and is used to check the correctness of the password.
bytes = zef_file.read(12) bytes = zef_file.read(12)
h = map(zd, bytes[0:12]) h = map(zd, bytes[0:12])
if ord(h[11]) != ((zinfo.CRC>>24)&255): if h[11] != ((zinfo.CRC>>24) & 255):
raise RuntimeError, "Bad password for file %s" % name raise RuntimeError, "Bad password for file %s" % name
# build and return a ZipExtFile # build and return a ZipExtFile
...@@ -852,7 +858,7 @@ class ZipFile: ...@@ -852,7 +858,7 @@ class ZipFile:
self._writecheck(zinfo) self._writecheck(zinfo)
self._didModify = True self._didModify = True
fp = open(filename, "rb") fp = io.open(filename, "rb")
# Must overwrite CRC and sizes with correct data later # Must overwrite CRC and sizes with correct data later
zinfo.CRC = CRC = 0 zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0 zinfo.compress_size = compress_size = 0
...@@ -982,7 +988,7 @@ class ZipFile: ...@@ -982,7 +988,7 @@ class ZipFile:
0, zinfo.internal_attr, zinfo.external_attr, 0, zinfo.internal_attr, zinfo.external_attr,
header_offset) header_offset)
self.fp.write(centdir) self.fp.write(centdir)
self.fp.write(zinfo.filename) self.fp.write(zinfo.filename.encode("utf-8"))
self.fp.write(extra_data) self.fp.write(extra_data)
self.fp.write(zinfo.comment) self.fp.write(zinfo.comment)
...@@ -1163,7 +1169,7 @@ def main(args = None): ...@@ -1163,7 +1169,7 @@ def main(args = None):
tgtdir = os.path.dirname(tgt) tgtdir = os.path.dirname(tgt)
if not os.path.exists(tgtdir): if not os.path.exists(tgtdir):
os.makedirs(tgtdir) os.makedirs(tgtdir)
fp = open(tgt, 'wb') fp = io.open(tgt, 'wb')
fp.write(zf.read(path)) fp.write(zf.read(path))
fp.close() fp.close()
zf.close() zf.close()
......
...@@ -1635,27 +1635,57 @@ s_pack_internal(PyStructObject *soself, PyObject *args, int offset, char* buf) ...@@ -1635,27 +1635,57 @@ s_pack_internal(PyStructObject *soself, PyObject *args, int offset, char* buf)
const formatdef *e = code->fmtdef; const formatdef *e = code->fmtdef;
char *res = buf + code->offset; char *res = buf + code->offset;
if (e->format == 's') { if (e->format == 's') {
if (!PyString_Check(v)) { int isstring;
void *p;
if (PyUnicode_Check(v)) {
v = _PyUnicode_AsDefaultEncodedString(v, NULL);
if (v == NULL)
return -1;
}
isstring = PyString_Check(v);
if (!isstring && !PyBytes_Check(v)) {
PyErr_SetString(StructError, PyErr_SetString(StructError,
"argument for 's' must be a string"); "argument for 's' must be a string");
return -1; return -1;
} }
n = PyString_GET_SIZE(v); if (isstring) {
n = PyString_GET_SIZE(v);
p = PyString_AS_STRING(v);
}
else {
n = PyBytes_GET_SIZE(v);
p = PyBytes_AS_STRING(v);
}
if (n > code->size) if (n > code->size)
n = code->size; n = code->size;
if (n > 0) if (n > 0)
memcpy(res, PyString_AS_STRING(v), n); memcpy(res, p, n);
} else if (e->format == 'p') { } else if (e->format == 'p') {
if (!PyString_Check(v)) { int isstring;
void *p;
if (PyUnicode_Check(v)) {
v = _PyUnicode_AsDefaultEncodedString(v, NULL);
if (v == NULL)
return -1;
}
isstring = PyString_Check(v);
if (!isstring && !PyBytes_Check(v)) {
PyErr_SetString(StructError, PyErr_SetString(StructError,
"argument for 'p' must be a string"); "argument for 'p' must be a string");
return -1; return -1;
} }
n = PyString_GET_SIZE(v); if (isstring) {
n = PyString_GET_SIZE(v);
p = PyString_AS_STRING(v);
}
else {
n = PyBytes_GET_SIZE(v);
p = PyBytes_AS_STRING(v);
}
if (n > (code->size - 1)) if (n > (code->size - 1))
n = code->size - 1; n = code->size - 1;
if (n > 0) if (n > 0)
memcpy(res + 1, PyString_AS_STRING(v), n); memcpy(res + 1, p, n);
if (n > 255) if (n > 255)
n = 255; n = 255;
*res = Py_SAFE_DOWNCAST(n, Py_ssize_t, unsigned char); *res = Py_SAFE_DOWNCAST(n, Py_ssize_t, unsigned char);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment