Commit e4673df6 authored by Guido van Rossum's avatar Guido van Rossum

Implement long positioning (Unix only, probably).

Etc., etc.
parent 8292c734
"""New I/O library.
"""New I/O library conforming to PEP 3116.
This is an early prototype; eventually some of this will be
reimplemented in C and the rest may be turned into a package.
See PEP 3116.
Conformance of alternative implementations: all arguments are intended
to be positional-only except the arguments of the open() function.
Argument names except those of the open() function are not part of the
specification. Instance variables and methods whose name starts with
a leading underscore are not part of the specification (except "magic"
names like __iter__). Only the top-level names listed in the __all__
variable are part of the specification.
XXX need to default buffer size to 1 if isatty()
XXX need to support 1 meaning line-buffered
......@@ -142,6 +148,9 @@ class IOBase:
This does not define read(), readinto() and write(), nor
readline() and friends, since their signatures vary per layer.
Not that calling any method (even inquiries) on a closed file is
undefined. Implementations may raise IOError in this case.
"""
### Internal ###
......@@ -153,19 +162,20 @@ class IOBase:
### Positioning ###
def seek(self, pos: int, whence: int = 0) -> None:
"""seek(pos: int, whence: int = 0) -> None. Change stream position.
def seek(self, pos: int, whence: int = 0) -> int:
"""seek(pos: int, whence: int = 0) -> int. Change stream position.
Seek to byte offset pos relative to position indicated by whence:
0 Start of stream (the default). pos should be >= 0;
1 Current position - whence may be negative;
2 End of stream - whence usually negative.
Returns the new absolute position.
"""
self._unsupported("seek")
def tell(self) -> int:
"""tell() -> int. Return current stream position."""
self._unsupported("tell")
return self.seek(0, 1)
def truncate(self, pos: int = None) -> None:
"""truncate(size: int = None) -> None. Truncate file to size bytes.
......@@ -432,7 +442,7 @@ class _BufferedIOMixin(BufferedIOBase):
### Positioning ###
def seek(self, pos, whence=0):
self.raw.seek(pos, whence)
return self.raw.seek(pos, whence)
def tell(self):
return self.raw.tell()
......@@ -515,6 +525,7 @@ class _MemoryIOMixin(BufferedIOBase):
self._pos = max(0, len(self._buffer) + pos)
else:
raise IOError("invalid whence value")
return self._pos
def tell(self):
return self._pos
......@@ -620,8 +631,9 @@ class BufferedReader(_BufferedIOMixin):
def seek(self, pos, whence=0):
if whence == 1:
pos -= len(self._read_buf)
self.raw.seek(pos, whence)
pos = self.raw.seek(pos, whence)
self._read_buf = b""
return pos
class BufferedWriter(_BufferedIOMixin):
......@@ -679,7 +691,7 @@ class BufferedWriter(_BufferedIOMixin):
def seek(self, pos, whence=0):
self.flush()
self.raw.seek(pos, whence)
return self.raw.seek(pos, whence)
class BufferedRWPair(BufferedIOBase):
......@@ -750,13 +762,9 @@ class BufferedRandom(BufferedWriter, BufferedReader):
self.flush()
# First do the raw seek, then empty the read buffer, so that
# if the raw seek fails, we don't lose buffered data forever.
self.raw.seek(pos, whence)
pos = self.raw.seek(pos, whence)
self._read_buf = b""
# XXX I suppose we could implement some magic here to move through the
# existing read buffer in the case of seek(<some small +ve number>, 1)
# XXX OTOH it might be good to *guarantee* that the buffer is
# empty after a seek or flush; for small relative forward
# seeks one might as well use small reads instead.
return pos
def tell(self):
if (self._write_buf):
......
......@@ -4,10 +4,10 @@ import unittest
from itertools import chain
from test import test_support
import io
import io # The module under test
class MockIO(io.RawIOBase):
class MockRawIO(io.RawIOBase):
def __init__(self, read_stack=()):
self._read_stack = list(read_stack)
......@@ -56,13 +56,13 @@ class MockFileIO(io.BytesIO):
class MockNonBlockWriterIO(io.RawIOBase):
def __init__(self, blockingScript):
self.bs = list(blockingScript)
def __init__(self, blocking_script):
self._blocking_script = list(blocking_script)
self._write_stack = []
def write(self, b):
self._write_stack.append(b[:])
n = self.bs.pop(0)
n = self._blocking_script.pop(0)
if (n < 0):
raise io.BlockingIOError(0, "test blocking", -n)
else:
......@@ -90,6 +90,23 @@ class IOTest(unittest.TestCase):
f.seek(-2, 2)
f.truncate()
def large_file_ops(self, f):
assert f.readable()
assert f.writable()
self.assertEqual(f.seek(2**32), 2**32)
self.assertEqual(f.tell(), 2**32)
self.assertEqual(f.write(b"xxx"), 3)
self.assertEqual(f.tell(), 2**32 + 3)
self.assertEqual(f.seek(-1, 1), 2**32 + 2)
f.truncate()
self.assertEqual(f.tell(), 2**32 + 2)
self.assertEqual(f.seek(0, 2), 2**32 + 2)
f.truncate(2**32 + 1)
self.assertEqual(f.tell(), 2**32 + 1)
self.assertEqual(f.seek(0, 2), 2**32 + 1)
self.assertEqual(f.seek(-1, 2), 2**32)
self.assertEqual(f.read(2), b"x")
def read_ops(self, f):
data = f.read(5)
self.assertEqual(data, b"hello")
......@@ -130,19 +147,9 @@ class IOTest(unittest.TestCase):
f = io.BytesIO(data)
self.read_ops(f)
def test_fileio_FileIO(self):
import _fileio
f = _fileio._FileIO(test_support.TESTFN, "w")
self.assertEqual(f.readable(), False)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
self.write_ops(f)
f.close()
f = _fileio._FileIO(test_support.TESTFN, "r")
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), False)
self.assertEqual(f.seekable(), True)
self.read_ops(f)
def test_large_file_ops(self):
f = io.open(test_support.TESTFN, "w+b", buffering=0)
self.large_file_ops(f)
f.close()
......@@ -205,7 +212,7 @@ class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
class BufferedReaderTest(unittest.TestCase):
def testRead(self):
rawio = MockIO((b"abc", b"d", b"efg"))
rawio = MockRawIO((b"abc", b"d", b"efg"))
bufio = io.BufferedReader(rawio)
self.assertEquals(b"abcdef", bufio.read(6))
......@@ -231,7 +238,7 @@ class BufferedReaderTest(unittest.TestCase):
def testReadNonBlocking(self):
# Inject some None's in there to simulate EWOULDBLOCK
rawio = MockIO((b"abc", b"d", None, b"efg", None, None))
rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
bufio = io.BufferedReader(rawio)
self.assertEquals(b"abcd", bufio.read(6))
......@@ -241,19 +248,19 @@ class BufferedReaderTest(unittest.TestCase):
self.assertEquals(b"", bufio.read())
def testReadToEof(self):
rawio = MockIO((b"abc", b"d", b"efg"))
rawio = MockRawIO((b"abc", b"d", b"efg"))
bufio = io.BufferedReader(rawio)
self.assertEquals(b"abcdefg", bufio.read(9000))
def testReadNoArgs(self):
rawio = MockIO((b"abc", b"d", b"efg"))
rawio = MockRawIO((b"abc", b"d", b"efg"))
bufio = io.BufferedReader(rawio)
self.assertEquals(b"abcdefg", bufio.read())
def testFileno(self):
rawio = MockIO((b"abc", b"d", b"efg"))
rawio = MockRawIO((b"abc", b"d", b"efg"))
bufio = io.BufferedReader(rawio)
self.assertEquals(42, bufio.fileno())
......@@ -268,7 +275,7 @@ class BufferedWriterTest(unittest.TestCase):
def testWrite(self):
# Write to the buffered IO but don't overflow the buffer.
writer = MockIO()
writer = MockRawIO()
bufio = io.BufferedWriter(writer, 8)
bufio.write(b"abc")
......@@ -276,7 +283,7 @@ class BufferedWriterTest(unittest.TestCase):
self.assertFalse(writer._write_stack)
def testWriteOverflow(self):
writer = MockIO()
writer = MockRawIO()
bufio = io.BufferedWriter(writer, 8)
bufio.write(b"abc")
......@@ -305,13 +312,13 @@ class BufferedWriterTest(unittest.TestCase):
# later.
def testFileno(self):
rawio = MockIO((b"abc", b"d", b"efg"))
rawio = MockRawIO((b"abc", b"d", b"efg"))
bufio = io.BufferedWriter(rawio)
self.assertEquals(42, bufio.fileno())
def testFlush(self):
writer = MockIO()
writer = MockRawIO()
bufio = io.BufferedWriter(writer, 8)
bufio.write(b"abc")
......@@ -323,8 +330,8 @@ class BufferedWriterTest(unittest.TestCase):
class BufferedRWPairTest(unittest.TestCase):
def testRWPair(self):
r = MockIO(())
w = MockIO()
r = MockRawIO(())
w = MockRawIO()
pair = io.BufferedRWPair(r, w)
# XXX need implementation
......@@ -333,7 +340,7 @@ class BufferedRWPairTest(unittest.TestCase):
class BufferedRandomTest(unittest.TestCase):
def testReadAndWrite(self):
raw = MockIO((b"asdf", b"ghjk"))
raw = MockRawIO((b"asdf", b"ghjk"))
rw = io.BufferedRandom(raw, 8, 12)
self.assertEqual(b"as", rw.read(2))
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment