Commit b9c4c3e4 authored by Guido van Rossum

Speed up next() by disabling snapshot updating while iterating.

parent cba608ca
"""New I/O library conforming to PEP 3116.
This is an early prototype; eventually some of this will be
reimplemented in C and the rest may be turned into a package.
This is a prototype; hopefully eventually some of this will be
reimplemented in C.
Conformance of alternative implementations: all arguments are intended
to be positional-only except the arguments of the open() function.
......@@ -11,6 +11,7 @@ a leading underscore are not part of the specification (except "magic"
names like __iter__). Only the top-level names listed in the __all__
variable are part of the specification.
XXX edge cases when switching between reading/writing
XXX need to default buffer size to 1 if isatty()
XXX need to support 1 meaning line-buffered
XXX don't use assert to validate input requirements
......@@ -877,7 +878,7 @@ class TextIOWrapper(TextIOBase):
Character and line based layer over a BufferedIOBase object.
"""
_CHUNK_SIZE = 64
_CHUNK_SIZE = 128
def __init__(self, buffer, encoding=None, newline=None):
if newline not in (None, "\n", "\r\n"):
......@@ -894,7 +895,7 @@ class TextIOWrapper(TextIOBase):
self._decoder_in_rest_pickle = None
self._pending = ""
self._snapshot = None
self._seekable = self.buffer.seekable()
self._seekable = self._telling = self.buffer.seekable()
# A word about _snapshot. This attribute is either None, or a
# tuple (decoder_pickle, readahead, pending) where decoder_pickle
......@@ -908,6 +909,7 @@ class TextIOWrapper(TextIOBase):
def flush(self):
    """Flush the underlying buffer and re-enable position tracking.

    next() clears ``_telling`` to skip snapshot bookkeeping while
    iterating; flushing restores it to the stream's seekability so that
    tell() works again on seekable streams.
    """
    self.buffer.flush()
    # Tracking is only meaningful when the underlying stream can seek.
    self._telling = self._seekable
def close(self):
self.flush()
......@@ -945,7 +947,7 @@ class TextIOWrapper(TextIOBase):
def _read_chunk(self):
assert self._decoder is not None
if not self._seekable:
if not self._telling:
readahead = self.buffer.read(self._CHUNK_SIZE)
pending = self._decoder.decode(readahead, not readahead)
return readahead, pending
......@@ -976,6 +978,8 @@ class TextIOWrapper(TextIOBase):
def tell(self):
if not self._seekable:
raise IOError("Underlying stream is not seekable")
if not self._telling:
raise IOError("Telling position disabled by next() call")
self.flush()
position = self.buffer.tell()
if self._decoder is None or self._snapshot is None:
......@@ -1016,6 +1020,7 @@ class TextIOWrapper(TextIOBase):
(whence,))
if pos < 0:
raise ValueError("Negative seek position %r" % (pos,))
self.flush()
orig_pos = pos
ds, pos = self._decode_decoder_state(pos)
if not ds:
......@@ -1050,6 +1055,15 @@ class TextIOWrapper(TextIOBase):
self._pending = res[n:]
return res[:n]
def next(self) -> str:
    """Return the next line, raising StopIteration at end of input.

    Position tracking (``_telling``) is switched off before reading so
    that iteration does not pay for snapshot updates; while it is off,
    tell() raises IOError.  When the input is exhausted the snapshot is
    discarded and tracking is restored to the stream's seekability.

    Returns:
        The line read by readline(), which is always non-empty here.

    Raises:
        StopIteration: when readline() returns an empty result.
    """
    self._telling = False
    line = self.readline()
    if not line:
        # End of input: drop the stale snapshot and allow tell() again.
        self._snapshot = None
        self._telling = self._seekable
        raise StopIteration
    return line
def readline(self, limit=None):
if limit is not None:
# XXX Hack to support limit argument, for backwards compatibility
......
"""Unit tests for io.py."""
import sys
import time
import unittest
from itertools import chain
from test import test_support
......@@ -549,6 +550,63 @@ class TextIOWrapperTest(unittest.TestCase):
rlines.append((pos, line))
self.assertEquals(rlines, wlines)
def testTelling(self):
    # Check that tell() reports consistent positions after write(),
    # seek() and readline(), that it raises IOError while the file is
    # being iterated, and that it works again once iteration has
    # reached the end of the file.
    f = io.open(test_support.TESTFN, "w+", encoding="utf8")
    try:
        p0 = f.tell()
        f.write(u"\xff\n")
        p1 = f.tell()
        f.write(u"\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEquals(f.tell(), p0)
        self.assertEquals(f.readline(), u"\xff\n")
        self.assertEquals(f.tell(), p1)
        self.assertEquals(f.readline(), u"\xff\n")
        self.assertEquals(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEquals(line, u"\xff\n")
            # Telling is disabled in the middle of iteration.
            self.assertRaises(IOError, f.tell)
        # Exhausting the iterator re-enables telling.
        self.assertEquals(f.tell(), p2)
    finally:
        # Close the file even when an assertion above fails.
        f.close()
def timingTest(self):
    # Manual benchmark: for several chunk sizes, time writing a file of
    # non-ASCII lines, then reading it back by iteration, by readline(),
    # and by readline() followed by tell().  Results are printed only
    # when test_support.verbose is set.
    # NOTE(review): the name lacks the "test" prefix, so presumably this
    # is not collected by the default unittest loader -- confirm.
    timer = time.time
    enc = "utf8"
    line = u"\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        try:
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for i in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # Use a throwaway name: rebinding `line` here would change
            # the payload written on the next chunk-size pass.
            for _ in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
        finally:
            # Close the file even if a read or write raises.
            f.close()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (nlines, nchars, nbytes))
            print("File chunk size: %6s" % chunk_size)
            print("Writing: %6.3f seconds" % (t1-t0))
            print("Reading using iteration: %6.3f seconds" % (t2-t1))
            print("Reading using readline(): %6.3f seconds" % (t3-t2))
            print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
# XXX Tests for open()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment