Commit 17596665 authored by Guido van Rossum

New I/O code from Tony Lownds implements the newline feature correctly,

and implements the .newlines attribute in a 2.x-compatible fashion.
parent 384283f1
...@@ -61,10 +61,26 @@ def open(file, mode="r", buffering=None, encoding=None, newline=None): ...@@ -61,10 +61,26 @@ def open(file, mode="r", buffering=None, encoding=None, newline=None):
can be: 0 = unbuffered, 1 = line buffered, can be: 0 = unbuffered, 1 = line buffered,
larger = fully buffered. larger = fully buffered.
encoding: optional string giving the text encoding. encoding: optional string giving the text encoding.
newline: optional newlines specifier; must be None, '\n' or '\r\n'; newline: optional newlines specifier; must be None, '', '\n', '\r'
specifies the line ending expected on input and written on or '\r\n'; all other values are illegal. It controls the
output. If None, use universal newlines on input and handling of line endings. It works as follows:
use os.linesep on output.
* On input, if `newline` is `None`, universal newlines
mode is enabled. Lines in the input can end in `'\n'`,
`'\r'`, or `'\r\n'`, and these are translated into
`'\n'` before being returned to the caller. If it is
`''`, universal newlines mode is enabled, but line endings
are returned to the caller untranslated. If it has any of
the other legal values, input lines are only terminated by
the given string, and the line ending is returned to the
caller untranslated.
* On output, if `newline` is `None`, any `'\n'`
characters written are translated to the system default
line separator, `os.linesep`. If `newline` is `''`,
no translation takes place. If `newline` is any of the
other legal values, any `'\n'` characters written are
translated to the given string.
(*) If a file descriptor is given, it is closed when the returned (*) If a file descriptor is given, it is closed when the returned
I/O object is closed. If you don't want this to happen, use I/O object is closed. If you don't want this to happen, use
...@@ -958,6 +974,17 @@ class TextIOBase(IOBase): ...@@ -958,6 +974,17 @@ class TextIOBase(IOBase):
"""Subclasses should override.""" """Subclasses should override."""
return None return None
@property
def newlines(self):
"""newlines -> None | str | tuple of str. Line endings translated
so far.
Only line endings translated during reading are considered.
Subclasses should override.
"""
return None
class TextIOWrapper(TextIOBase): class TextIOWrapper(TextIOBase):
...@@ -969,7 +996,7 @@ class TextIOWrapper(TextIOBase): ...@@ -969,7 +996,7 @@ class TextIOWrapper(TextIOBase):
_CHUNK_SIZE = 128 _CHUNK_SIZE = 128
def __init__(self, buffer, encoding=None, newline=None): def __init__(self, buffer, encoding=None, newline=None):
if newline not in (None, "\n", "\r\n"): if newline not in (None, "", "\n", "\r", "\r\n"):
raise ValueError("illegal newline value: %r" % (newline,)) raise ValueError("illegal newline value: %r" % (newline,))
if encoding is None: if encoding is None:
try: try:
...@@ -987,8 +1014,12 @@ class TextIOWrapper(TextIOBase): ...@@ -987,8 +1014,12 @@ class TextIOWrapper(TextIOBase):
self.buffer = buffer self.buffer = buffer
self._encoding = encoding self._encoding = encoding
self._newline = newline or os.linesep self._readuniversal = not newline
self._fix_newlines = newline is None self._readtranslate = newline is None
self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
self._seennl = 0
self._decoder = None self._decoder = None
self._pending = "" self._pending = ""
self._snapshot = None self._snapshot = None
...@@ -1032,13 +1063,15 @@ class TextIOWrapper(TextIOBase): ...@@ -1032,13 +1063,15 @@ class TextIOWrapper(TextIOBase):
def write(self, s: str): def write(self, s: str):
if self.closed: if self.closed:
raise ValueError("write to closed file") raise ValueError("write to closed file")
haslf = "\n" in s
if haslf and self._writetranslate and self._writenl != "\n":
s = s.replace("\n", self._writenl)
# XXX What if we were just reading? # XXX What if we were just reading?
b = s.encode(self._encoding) b = s.encode(self._encoding)
if isinstance(b, str): if isinstance(b, str):
b = bytes(b) b = bytes(b)
n = self.buffer.write(b) self.buffer.write(b)
if "\n" in s: if haslf and self.isatty():
# XXX only if isatty
self.flush() self.flush()
self._snapshot = self._decoder = None self._snapshot = self._decoder = None
return len(s) return len(s)
...@@ -1159,7 +1192,7 @@ class TextIOWrapper(TextIOBase): ...@@ -1159,7 +1192,7 @@ class TextIOWrapper(TextIOBase):
res += decoder.decode(self.buffer.read(), True) res += decoder.decode(self.buffer.read(), True)
self._pending = "" self._pending = ""
self._snapshot = None self._snapshot = None
return res.replace("\r\n", "\n") return self._replacenl(res)
else: else:
while len(res) < n: while len(res) < n:
readahead, pending = self._read_chunk() readahead, pending = self._read_chunk()
...@@ -1167,7 +1200,7 @@ class TextIOWrapper(TextIOBase): ...@@ -1167,7 +1200,7 @@ class TextIOWrapper(TextIOBase):
if not readahead: if not readahead:
break break
self._pending = res[n:] self._pending = res[n:]
return res[:n].replace("\r\n", "\n") return self._replacenl(res[:n])
def __next__(self): def __next__(self):
self._telling = False self._telling = False
...@@ -1189,59 +1222,136 @@ class TextIOWrapper(TextIOBase): ...@@ -1189,59 +1222,136 @@ class TextIOWrapper(TextIOBase):
line = self._pending line = self._pending
start = 0 start = 0
cr_eof = False
decoder = self._decoder or self._get_decoder() decoder = self._decoder or self._get_decoder()
pos = endpos = None
ending = None
while True: while True:
if self._readuniversal:
# Universal newline search. Find any of \r, \r\n, \n
# In C we'd look for these in parallel of course. # In C we'd look for these in parallel of course.
nlpos = line.find("\n", start) nlpos = line.find("\n", start)
crpos = line.find("\r", start) crpos = line.find("\r", start)
if nlpos >= 0 and crpos >= 0: if crpos == -1:
endpos = min(nlpos, crpos) if nlpos == -1:
start = len(line)
else: else:
endpos = nlpos if nlpos >= 0 else crpos # Found \n
pos = nlpos
if endpos != -1: endpos = pos + 1
endc = line[endpos] ending = self._LF
if endc == "\n":
ending = "\n"
break break
elif nlpos == -1:
# We've seen \r - is it standalone, \r\n or \r at end of line? if crpos == len(line) - 1:
if endpos + 1 < len(line): # Found \r at end of buffer, must keep reading
if line[endpos+1] == "\n": start = crpos
ending = "\r\n" cr_eof = True
else: else:
ending = "\r" # Found lone \r
ending = self._CR
pos = crpos
endpos = pos + 1
break
elif nlpos < crpos:
# Found \n
pos = nlpos
endpos = pos + 1
ending = self._LF
break
elif nlpos == crpos + 1:
# Found \r\n
ending = self._CRLF
pos = crpos
endpos = pos + 2
break break
# There might be a following \n in the next block of data ...
start = endpos
else: else:
start = len(line) # Found \r
pos = crpos
endpos = pos + 1
ending = self._CR
break
else:
# non-universal
pos = line.find(self._readnl)
if pos >= 0:
endpos = pos+len(self._readnl)
ending = self._nlflag(self._readnl)
break
# No line ending seen yet - get more data # No line ending seen yet - get more data
more_line = ''
while True: while True:
readahead, pending = self._read_chunk() readahead, pending = self._read_chunk()
more_line = pending more_line = pending
if more_line or not readahead: if more_line or not readahead:
break break
if more_line:
if not more_line:
ending = ""
endpos = len(line)
break
line += more_line line += more_line
nextpos = endpos + len(ending)
self._pending = line[nextpos:]
# XXX Update self.newlines here if we want to support that
if self._fix_newlines and ending not in ("\n", ""):
return line[:endpos] + "\n"
else: else:
return line[:nextpos] # end of file
self._pending = ''
self._snapshot = None
if cr_eof:
self._seennl |= self._CR
return line[:-1] + '\n'
else:
return line
self._pending = line[endpos:]
if self._readtranslate:
self._seennl |= ending
if ending != self._LF:
return line[:pos] + '\n'
else:
return line[:endpos]
else:
return line[:endpos]
def _replacenl(self, data):
# Replace newlines in data as needed and record that they have
# been seen.
if not self._readtranslate:
return data
if self._readuniversal:
crlf = data.count('\r\n')
cr = data.count('\r') - crlf
lf = data.count('\n') - crlf
self._seennl |= (lf and self._LF) | (cr and self._CR) \
| (crlf and self._CRLF)
if crlf:
data = data.replace("\r\n", "\n")
if cr:
data = data.replace("\r", "\n")
elif self._readnl == '\n':
# Only need to detect if \n was seen.
if data.count('\n'):
self._seennl |= self._LF
else:
newdata = data.replace(self._readnl, '\n')
if newdata is not data:
self._seennl |= self._nlflag(self._readnl)
data = newdata
return data
_LF = 1
_CR = 2
_CRLF = 4
@property
def newlines(self):
return (None,
"\n",
"\r",
("\r", "\n"),
"\r\n",
("\n", "\r\n"),
("\r", "\r\n"),
("\r", "\n", "\r\n")
)[self._seennl]
def _nlflag(self, nlstr):
return [None, "\n", "\r", None, "\r\n"].index(nlstr)
class StringIO(TextIOWrapper): class StringIO(TextIOWrapper):
......
"""Unit tests for io.py.""" """Unit tests for io.py."""
import os
import sys import sys
import time import time
import array import array
...@@ -481,30 +482,61 @@ class TextIOWrapperTest(unittest.TestCase): ...@@ -481,30 +482,61 @@ class TextIOWrapperTest(unittest.TestCase):
def tearDown(self): def tearDown(self):
test_support.unlink(test_support.TESTFN) test_support.unlink(test_support.TESTFN)
def testNewlinesInput(self):
testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
(None, normalized.decode("ASCII").splitlines(True)),
("", testdata.decode("ASCII").splitlines(True)),
("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
]:
buf = io.BytesIO(testdata)
txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
self.assertEquals(txt.readlines(), expected)
txt.seek(0)
self.assertEquals(txt.read(), "".join(expected))
def testNewlinesOutput(self):
testdict = {
"": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
"\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
"\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
"\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
}
tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
for newline, expected in tests:
buf = io.BytesIO()
txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
txt.write("AAA\nB")
txt.write("BB\nCCC\n")
txt.write("X\rY\r\nZ")
txt.flush()
self.assertEquals(buf.getvalue(), expected)
def testNewlines(self): def testNewlines(self):
input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
tests = [ tests = [
[ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
[ '\n', input_lines ], [ '', input_lines ],
[ '\r\n', input_lines ], [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
[ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
[ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
] ]
encodings = ('utf-8', 'latin-1') encodings = ('utf-8', 'latin-1')
# Try a range of pad sizes to test the case where \r is the last # Try a range of buffer sizes to test the case where \r is the last
# character in TextIOWrapper._pending_line. # character in TextIOWrapper._pending_line.
for encoding in encodings: for encoding in encodings:
for do_reads in (False, True):
for padlen in chain(range(10), range(50, 60)):
pad = '.' * padlen
data_lines = [ pad + line for line in input_lines ]
# XXX: str.encode() should return bytes # XXX: str.encode() should return bytes
data = bytes(''.join(data_lines).encode(encoding)) data = bytes(''.join(input_lines).encode(encoding))
for do_reads in (False, True):
for newline, exp_line_ends in tests: for bufsize in range(1, 10):
exp_lines = [ pad + line for line in exp_line_ends ] for newline, exp_lines in tests:
bufio = io.BufferedReader(io.BytesIO(data)) bufio = io.BufferedReader(io.BytesIO(data), bufsize)
textio = io.TextIOWrapper(bufio, newline=newline, textio = io.TextIOWrapper(bufio, newline=newline,
encoding=encoding) encoding=encoding)
if do_reads: if do_reads:
...@@ -522,6 +554,47 @@ class TextIOWrapperTest(unittest.TestCase): ...@@ -522,6 +554,47 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEquals(got_line, exp_line) self.assertEquals(got_line, exp_line)
self.assertEquals(len(got_lines), len(exp_lines)) self.assertEquals(len(got_lines), len(exp_lines))
def testNewlinesInput(self):
testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
(None, normalized.decode("ASCII").splitlines(True)),
("", testdata.decode("ASCII").splitlines(True)),
("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
]:
buf = io.BytesIO(testdata)
txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
self.assertEquals(txt.readlines(), expected)
txt.seek(0)
self.assertEquals(txt.read(), "".join(expected))
def testNewlinesOutput(self):
import os
orig_linesep = os.linesep
data = "AAA\nBBB\rCCC\n"
data_lf = b"AAA\nBBB\rCCC\n"
data_cr = b"AAA\rBBB\rCCC\r"
data_crlf = b"AAA\r\nBBB\rCCC\r\n"
for os.linesep, newline, expected in [
("\n", None, data_lf),
("\r\n", None, data_crlf),
("\n", "", data_lf),
("\r\n", "", data_lf),
("\n", "\n", data_lf),
("\r\n", "\n", data_lf),
("\n", "\r", data_cr),
("\r\n", "\r", data_cr),
("\n", "\r\n", data_crlf),
("\r\n", "\r\n", data_crlf),
]:
buf = io.BytesIO()
txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
txt.write(data)
txt.close()
self.assertEquals(buf.getvalue(), expected)
# Systematic tests of the text I/O API # Systematic tests of the text I/O API
def testBasicIO(self): def testBasicIO(self):
......
...@@ -12,9 +12,8 @@ FATX = 'x' * (2**14) ...@@ -12,9 +12,8 @@ FATX = 'x' * (2**14)
DATA_TEMPLATE = [ DATA_TEMPLATE = [
"line1=1", "line1=1",
"line2='this is a very long line designed to go past the magic " + "line2='this is a very long line designed to go past any default " +
"hundred character limit that is inside fileobject.c and which " + "buffer limits that exist in io.py but we also want to test " +
"is meant to speed up the common case, but we also want to test " +
"the uncommon case, naturally.'", "the uncommon case, naturally.'",
"def line3():pass", "def line3():pass",
"line4 = '%s'" % FATX, "line4 = '%s'" % FATX,
...@@ -32,7 +31,7 @@ DATA_SPLIT = [x + "\n" for x in DATA_TEMPLATE] ...@@ -32,7 +31,7 @@ DATA_SPLIT = [x + "\n" for x in DATA_TEMPLATE]
class TestGenericUnivNewlines(unittest.TestCase): class TestGenericUnivNewlines(unittest.TestCase):
# use a class variable DATA to define the data to write to the file # use a class variable DATA to define the data to write to the file
# and a class variable NEWLINE to set the expected newlines value # and a class variable NEWLINE to set the expected newlines value
READMODE = 'U' READMODE = 'r'
WRITEMODE = 'wb' WRITEMODE = 'wb'
def setUp(self): def setUp(self):
...@@ -79,12 +78,6 @@ class TestGenericUnivNewlines(unittest.TestCase): ...@@ -79,12 +78,6 @@ class TestGenericUnivNewlines(unittest.TestCase):
self.assertEqual(data, DATA_SPLIT[1:]) self.assertEqual(data, DATA_SPLIT[1:])
class TestNativeNewlines(TestGenericUnivNewlines):
NEWLINE = None
DATA = DATA_LF
READMODE = 'r'
WRITEMODE = 'w'
class TestCRNewlines(TestGenericUnivNewlines): class TestCRNewlines(TestGenericUnivNewlines):
NEWLINE = '\r' NEWLINE = '\r'
DATA = DATA_CR DATA = DATA_CR
...@@ -104,7 +97,6 @@ class TestMixedNewlines(TestGenericUnivNewlines): ...@@ -104,7 +97,6 @@ class TestMixedNewlines(TestGenericUnivNewlines):
def test_main(): def test_main():
test_support.run_unittest( test_support.run_unittest(
TestNativeNewlines,
TestCRNewlines, TestCRNewlines,
TestLFNewlines, TestLFNewlines,
TestCRLFNewlines, TestCRLFNewlines,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment