Commit f44c7e89 authored by Ka-Ping Yee's avatar Ka-Ping Yee

Make TextIOWrapper's seek/tell work properly with stateful decoders;

document and rename things to make seek/tell workings a little clearer.

Add a weird decoder for testing TextIOWrapper's seek/tell methods.

Document the getstate/setstate protocol conventions for IncrementalDecoders.
parent b5dc90b5
......@@ -237,7 +237,7 @@ class IncrementalDecoder(object):
"""
def __init__(self, errors='strict'):
"""
Creates a IncrementalDecoder instance.
Create an IncrementalDecoder instance.
The IncrementalDecoder may use different error handling schemes by
providing the errors keyword argument. See the module docstring
......@@ -247,28 +247,35 @@ class IncrementalDecoder(object):
def decode(self, input, final=False):
    """
    Decode input and return the resulting object.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError
def reset(self):
    """
    Reset the decoder to the initial state.
    """
def getstate(self):
    """
    Return the current state of the decoder.

    This must be a (buffered_input, additional_state_info) tuple.
    buffered_input must be a bytes object containing bytes that
    were passed to decode() that have not yet been converted.
    additional_state_info must be a non-negative integer
    representing the state of the decoder WITHOUT yet having
    processed the contents of buffered_input.  In the initial state
    and after reset(), getstate() must return (b"", 0).
    """
    return (b"", 0)
def setstate(self, state):
    """
    Set the current state of the decoder.

    state must have been returned by getstate().  The effect of
    setstate((b"", 0)) must be equivalent to reset().
    """
class BufferedIncrementalDecoder(IncrementalDecoder):
......
This diff is collapsed.
......@@ -8,6 +8,7 @@ import unittest
from itertools import chain
from test import test_support
import codecs
import io # The module under test
......@@ -486,6 +487,122 @@ class BufferedRandomTest(unittest.TestCase):
self.assertEquals(b"fl", rw.read(11))
self.assertRaises(TypeError, rw.seek, 0.0)
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
# - A single output character can correspond to many bytes of input.
# - The number of input bytes to complete the character can be
# undetermined until the last input byte is received.
# - The number of input bytes can vary depending on previous input.
# - A single input byte can correspond to many characters of output.
# - The number of output characters can be undetermined until the
# last input byte is received.
# - The number of output characters can vary depending on previous input.
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A control word is consumed when it is
    processed, so a new I or O applies to the bytes that follow it.  EOF
    (final=True) also terminates the last word.
    """

    def __init__(self, errors='strict'):
        """Initialise the base decoder and start in the reset state."""
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = 1, O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered_bytes, flags) with flags = (I^1)*100 + (O^1).

        I and O are XORed with 1 so that the flags value is 0 in the
        reset state (I = O = 1).
        """
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered_bytes, flags) pair produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        # Undo the XOR applied by getstate().
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed bytes to the decoder and return the text completed so far.

        input is iterated byte by byte (as integers).  When final is true,
        any word still sitting in the buffer is flushed as if terminated.
        """
        output = ''
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    # An empty buffer means an extra period: ignore it.
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        Control words ('i...' / 'o...') update I or O and produce no
        output; a missing number counts as 0.  The buffer must be
        non-empty when this is called.
        """
        output = ''
        head = self.buffer[0]
        if head == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif head == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then truncate, to exactly O characters.
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag for decode(), expected output).
    test_cases = [
        # I=1 fixed-length mode
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0, variable-length mode
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0, variable-length mode, should ignore extra periods
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6
        (b'i.o6.xyz.', False, 'xyz---.'),
        # I=2, O=6
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=0, O=3
        (b'i.o3.x.xyz.toolong.', False, 'x--.xyz.too.'),
        # I=6, O=3
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.')
    ]

    def testDecoder(self):
        """Decode each test case in one shot, then test a deferred EOF."""
        # Try a few one-shot test cases.
        for data, eof, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
class TextIOWrapperTest(unittest.TestCase):
......@@ -765,6 +882,60 @@ class TextIOWrapperTest(unittest.TestCase):
f.readline()
f.tell()
def testSeekAndTell(self):
    """Test seek/tell using the StatefulIncrementalDecoder."""

    def lookupTestDecoder(name):
        # Codec search function: answers for 'test_decoder' only while
        # this test has the codec enabled; otherwise returns None.
        if self.codecEnabled and name == 'test_decoder':
            return codecs.CodecInfo(
                name='test_decoder', encode=None, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=StatefulIncrementalDecoder)

    def testSeekAndTellWithData(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # 'with' closes each file even when an assertion fails part-way.
        with io.open(test_support.TESTFN, 'wb') as f:
            f.write(data)
        with io.open(test_support.TESTFN, encoding='test_decoder') as f:
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with io.open(test_support.TESTFN,
                             encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the same text.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Register a special incremental decoder for testing.
    codecs.register(lookupTestDecoder)
    self.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            testSeekAndTellWithData(data)

        # Position each test case so that it crosses a chunk boundary.
        CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            # In the decoder's initial state (I=1, O=1) each prefix byte
            # decodes to two characters, hence offset*2.
            min_pos = offset*2
            testSeekAndTellWithData(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        self.codecEnabled = 0
def testEncodedWrites(self):
data = "1234567890"
tests = ("utf-16",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment