Commit 38d7c424 authored by Antoine Pitrou's avatar Antoine Pitrou

Merged revisions 65686 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/trunk

........
  r65686 | antoine.pitrou | 2008-08-14 23:04:30 +0200 (jeu., 14 août 2008) | 3 lines

  Issue #3476: make BufferedReader and BufferedWriter thread-safe
........
parent ac984458
...@@ -61,6 +61,7 @@ import sys ...@@ -61,6 +61,7 @@ import sys
import codecs import codecs
import _fileio import _fileio
import warnings import warnings
import threading
# open() uses st_blksize whenever we can # open() uses st_blksize whenever we can
DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes
...@@ -895,6 +896,7 @@ class BufferedReader(_BufferedIOMixin): ...@@ -895,6 +896,7 @@ class BufferedReader(_BufferedIOMixin):
_BufferedIOMixin.__init__(self, raw) _BufferedIOMixin.__init__(self, raw)
self.buffer_size = buffer_size self.buffer_size = buffer_size
self._reset_read_buf() self._reset_read_buf()
self._read_lock = threading.Lock()
def _reset_read_buf(self): def _reset_read_buf(self):
self._read_buf = b"" self._read_buf = b""
...@@ -908,6 +910,10 @@ class BufferedReader(_BufferedIOMixin): ...@@ -908,6 +910,10 @@ class BufferedReader(_BufferedIOMixin):
mode. If n is negative, read until EOF or until read() would mode. If n is negative, read until EOF or until read() would
block. block.
""" """
with self._read_lock:
return self._read_unlocked(n)
def _read_unlocked(self, n=None):
nodata_val = b"" nodata_val = b""
empty_values = (b"", None) empty_values = (b"", None)
buf = self._read_buf buf = self._read_buf
...@@ -960,6 +966,10 @@ class BufferedReader(_BufferedIOMixin): ...@@ -960,6 +966,10 @@ class BufferedReader(_BufferedIOMixin):
do at most one raw read to satisfy it. We never return more do at most one raw read to satisfy it. We never return more
than self.buffer_size. than self.buffer_size.
""" """
with self._read_lock:
return self._peek_unlocked(n)
def _peek_unlocked(self, n=0):
want = min(n, self.buffer_size) want = min(n, self.buffer_size)
have = len(self._read_buf) - self._read_pos have = len(self._read_buf) - self._read_pos
if have < want: if have < want:
...@@ -976,13 +986,16 @@ class BufferedReader(_BufferedIOMixin): ...@@ -976,13 +986,16 @@ class BufferedReader(_BufferedIOMixin):
# only return buffered bytes. Otherwise, we do one raw read. # only return buffered bytes. Otherwise, we do one raw read.
if n <= 0: if n <= 0:
return b"" return b""
self.peek(1) with self._read_lock:
return self.read(min(n, len(self._read_buf) - self._read_pos)) self._peek_unlocked(1)
return self._read_unlocked(
min(n, len(self._read_buf) - self._read_pos))
def tell(self): def tell(self):
return self.raw.tell() - len(self._read_buf) + self._read_pos return self.raw.tell() - len(self._read_buf) + self._read_pos
def seek(self, pos, whence=0): def seek(self, pos, whence=0):
with self._read_lock:
if whence == 1: if whence == 1:
pos -= len(self._read_buf) - self._read_pos pos -= len(self._read_buf) - self._read_pos
pos = self.raw.seek(pos, whence) pos = self.raw.seek(pos, whence)
...@@ -1009,17 +1022,20 @@ class BufferedWriter(_BufferedIOMixin): ...@@ -1009,17 +1022,20 @@ class BufferedWriter(_BufferedIOMixin):
if max_buffer_size is None if max_buffer_size is None
else max_buffer_size) else max_buffer_size)
self._write_buf = bytearray() self._write_buf = bytearray()
self._write_lock = threading.Lock()
def write(self, b): def write(self, b):
if self.closed: if self.closed:
raise ValueError("write to closed file") raise ValueError("write to closed file")
if isinstance(b, str): if isinstance(b, str):
raise TypeError("can't write str to binary stream") raise TypeError("can't write str to binary stream")
# XXX we can implement some more tricks to try and avoid partial writes with self._write_lock:
# XXX we can implement some more tricks to try and avoid
# partial writes
if len(self._write_buf) > self.buffer_size: if len(self._write_buf) > self.buffer_size:
# We're full, so let's pre-flush the buffer # We're full, so let's pre-flush the buffer
try: try:
self.flush() self._flush_unlocked()
except BlockingIOError as e: except BlockingIOError as e:
# We can't accept anything else. # We can't accept anything else.
# XXX Why not just let the exception pass through? # XXX Why not just let the exception pass through?
...@@ -1029,23 +1045,28 @@ class BufferedWriter(_BufferedIOMixin): ...@@ -1029,23 +1045,28 @@ class BufferedWriter(_BufferedIOMixin):
written = len(self._write_buf) - before written = len(self._write_buf) - before
if len(self._write_buf) > self.buffer_size: if len(self._write_buf) > self.buffer_size:
try: try:
self.flush() self._flush_unlocked()
except BlockingIOError as e: except BlockingIOError as e:
if (len(self._write_buf) > self.max_buffer_size): if len(self._write_buf) > self.max_buffer_size:
# We've hit max_buffer_size. We have to accept a partial # We've hit max_buffer_size. We have to accept a
# write and cut back our buffer. # partial write and cut back our buffer.
overage = len(self._write_buf) - self.max_buffer_size overage = len(self._write_buf) - self.max_buffer_size
self._write_buf = self._write_buf[:self.max_buffer_size] self._write_buf = self._write_buf[:self.max_buffer_size]
raise BlockingIOError(e.errno, e.strerror, overage) raise BlockingIOError(e.errno, e.strerror, overage)
return written return written
def truncate(self, pos=None): def truncate(self, pos=None):
self.flush() with self._write_lock:
self._flush_unlocked()
if pos is None: if pos is None:
pos = self.raw.tell() pos = self.raw.tell()
return self.raw.truncate(pos) return self.raw.truncate(pos)
def flush(self): def flush(self):
with self._write_lock:
self._flush_unlocked()
def _flush_unlocked(self):
if self.closed: if self.closed:
raise ValueError("flush of closed file") raise ValueError("flush of closed file")
written = 0 written = 0
...@@ -1064,7 +1085,8 @@ class BufferedWriter(_BufferedIOMixin): ...@@ -1064,7 +1085,8 @@ class BufferedWriter(_BufferedIOMixin):
return self.raw.tell() + len(self._write_buf) return self.raw.tell() + len(self._write_buf)
def seek(self, pos, whence=0): def seek(self, pos, whence=0):
self.flush() with self._write_lock:
self._flush_unlocked()
return self.raw.seek(pos, whence) return self.raw.seek(pos, whence)
...@@ -1155,6 +1177,7 @@ class BufferedRandom(BufferedWriter, BufferedReader): ...@@ -1155,6 +1177,7 @@ class BufferedRandom(BufferedWriter, BufferedReader):
# First do the raw seek, then empty the read buffer, so that # First do the raw seek, then empty the read buffer, so that
# if the raw seek fails, we don't lose buffered data forever. # if the raw seek fails, we don't lose buffered data forever.
pos = self.raw.seek(pos, whence) pos = self.raw.seek(pos, whence)
with self._read_lock:
self._reset_read_buf() self._reset_read_buf()
return pos return pos
...@@ -1192,6 +1215,7 @@ class BufferedRandom(BufferedWriter, BufferedReader): ...@@ -1192,6 +1215,7 @@ class BufferedRandom(BufferedWriter, BufferedReader):
def write(self, b): def write(self, b):
if self._read_buf: if self._read_buf:
# Undo readahead # Undo readahead
with self._read_lock:
self.raw.seek(self._read_pos - len(self._read_buf), 1) self.raw.seek(self._read_pos - len(self._read_buf), 1)
self._reset_read_buf() self._reset_read_buf()
return BufferedWriter.write(self, b) return BufferedWriter.write(self, b)
......
...@@ -3,11 +3,15 @@ ...@@ -3,11 +3,15 @@
# See test_cmd_line_script.py for testing of script execution # See test_cmd_line_script.py for testing of script execution
import test.support, unittest import test.support, unittest
import os
import sys import sys
import subprocess import subprocess
def _spawn_python(*args): def _spawn_python(*args):
cmd_line = [sys.executable, '-E'] cmd_line = [sys.executable]
# When testing -S, we need PYTHONPATH to work (see test_site_flag())
if '-S' not in args:
cmd_line.append('-E')
cmd_line.extend(args) cmd_line.extend(args)
return subprocess.Popen(cmd_line, stdin=subprocess.PIPE, return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
...@@ -59,6 +63,16 @@ class CmdLineTest(unittest.TestCase): ...@@ -59,6 +63,16 @@ class CmdLineTest(unittest.TestCase):
self.verify_valid_flag('-Qwarnall') self.verify_valid_flag('-Qwarnall')
def test_site_flag(self): def test_site_flag(self):
if os.name == 'posix':
# Workaround bug #586680 by adding the extension dir to PYTHONPATH
from distutils.util import get_platform
s = "./build/lib.%s-%.3s" % (get_platform(), sys.version)
if hasattr(sys, 'gettotalrefcount'):
s += '-pydebug'
p = os.environ.get('PYTHONPATH', '')
if p:
p += ':'
os.environ['PYTHONPATH'] = p + s
self.verify_valid_flag('-S') self.verify_valid_flag('-S')
def test_usage(self): def test_usage(self):
......
...@@ -4,8 +4,10 @@ import os ...@@ -4,8 +4,10 @@ import os
import sys import sys
import time import time
import array import array
import threading
import random
import unittest import unittest
from itertools import chain from itertools import chain, cycle
from test import support from test import support
import codecs import codecs
...@@ -390,6 +392,49 @@ class BufferedReaderTest(unittest.TestCase): ...@@ -390,6 +392,49 @@ class BufferedReaderTest(unittest.TestCase):
# this test. Else, write it. # this test. Else, write it.
pass pass
def testThreads(self):
try:
# Write out many bytes with exactly the same number of 0's,
# 1's... 255's. This will help us check that concurrent reading
# doesn't duplicate or forget contents.
N = 1000
l = list(range(256)) * N
random.shuffle(l)
s = bytes(bytearray(l))
with io.open(support.TESTFN, "wb") as f:
f.write(s)
with io.open(support.TESTFN, "rb", buffering=0) as raw:
bufio = io.BufferedReader(raw, 8)
errors = []
results = []
def f():
try:
# Intra-buffer read then buffer-flushing read
for n in cycle([1, 19]):
s = bufio.read(n)
if not s:
break
# list.append() is atomic
results.append(s)
except Exception as e:
errors.append(e)
raise
threads = [threading.Thread(target=f) for x in range(20)]
for t in threads:
t.start()
time.sleep(0.02) # yield
for t in threads:
t.join()
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
s = b''.join(results)
for i in range(256):
c = bytes(bytearray([i]))
self.assertEqual(s.count(c), N)
finally:
support.unlink(support.TESTFN)
class BufferedWriterTest(unittest.TestCase): class BufferedWriterTest(unittest.TestCase):
...@@ -446,6 +491,38 @@ class BufferedWriterTest(unittest.TestCase): ...@@ -446,6 +491,38 @@ class BufferedWriterTest(unittest.TestCase):
self.assertEquals(b"abc", writer._write_stack[0]) self.assertEquals(b"abc", writer._write_stack[0])
def testThreads(self):
# BufferedWriter should not raise exceptions or crash
# when called from multiple threads.
try:
# We use a real file object because it allows us to
# exercise situations where the GIL is released before
# writing the buffer to the raw streams. This is in addition
# to concurrency issues due to switching threads in the middle
# of Python code.
with io.open(support.TESTFN, "wb", buffering=0) as raw:
bufio = io.BufferedWriter(raw, 8)
errors = []
def f():
try:
# Write enough bytes to flush the buffer
s = b"a" * 19
for i in range(50):
bufio.write(s)
except Exception as e:
errors.append(e)
raise
threads = [threading.Thread(target=f) for x in range(20)]
for t in threads:
t.start()
time.sleep(0.02) # yield
for t in threads:
t.join()
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
finally:
support.unlink(support.TESTFN)
class BufferedRWPairTest(unittest.TestCase): class BufferedRWPairTest(unittest.TestCase):
......
...@@ -30,6 +30,9 @@ Core and Builtins ...@@ -30,6 +30,9 @@ Core and Builtins
Library Library
------- -------
- Issue #3476: binary buffered reading through the new "io" library is now
thread-safe.
- Issue #1342811: Fix leak in Tkinter.Menu.delete. Commands associated to - Issue #1342811: Fix leak in Tkinter.Menu.delete. Commands associated to
menu entries were not deleted. menu entries were not deleted.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment