Commit 7e2cc29b authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1511 from gevent/issue1510

Fix Popen.communicate() to raise exceptions from reading the streams. 
parents 785b7b55 3db1b2f6
...@@ -96,6 +96,7 @@ install: ...@@ -96,6 +96,7 @@ install:
- echo $BUILD_RUNTIMES/snakepit/$TRAVIS_PYTHON_VERSION.d - echo $BUILD_RUNTIMES/snakepit/$TRAVIS_PYTHON_VERSION.d
- ls -l $BUILD_RUNTIMES/snakepit/$TRAVIS_PYTHON_VERSION.d - ls -l $BUILD_RUNTIMES/snakepit/$TRAVIS_PYTHON_VERSION.d
- python -c 'import gevent; print(gevent.__version__)' - python -c 'import gevent; print(gevent.__version__)'
- python -c 'from gevent._compat import get_clock_info; print(get_clock_info("perf_counter"))'
script: script:
# The generic script for the matrix expansion is to # The generic script for the matrix expansion is to
# test the defaults. # test the defaults.
......
...@@ -39,7 +39,21 @@ Library and Dependency Updates ...@@ -39,7 +39,21 @@ Library and Dependency Updates
with debugging. The event libraries allocate small amounts of memory with debugging. The event libraries allocate small amounts of memory
at startup. The allocation functions have to take the GIL, but at startup. The allocation functions have to take the GIL, but
because of the limited amount of actual allocation that gets done because of the limited amount of actual allocation that gets done
this is not expected to be a concern. this is not expected to be a bottleneck.
Other
-----
- Make `gevent.subprocess.Popen.communicate` raise exceptions raised
by reading from the process, like the standard library. In
particular, under Python 3, if the process output is being decoded
as text, this can now raise ``UnicodeDecodeError``. Reported in
:issue:`1510` by Ofer Koren.
- Make `gevent.subprocess.Popen.communicate` be more careful about
closing files. Previously if a timeout error happened, a second call
to ``communicate`` might not close the pipe.
1.5a3 (2020-01-01) 1.5a3 (2020-01-01)
================== ==================
......
...@@ -188,7 +188,11 @@ except ImportError: ...@@ -188,7 +188,11 @@ except ImportError:
try: try:
# Python 3.3+ (PEP 418) # Python 3.3+ (PEP 418)
from time import perf_counter from time import perf_counter
from time import get_clock_info
from time import monotonic
perf_counter = perf_counter perf_counter = perf_counter
monotonic = monotonic
get_clock_info = get_clock_info
except ImportError: except ImportError:
import time import time
...@@ -196,7 +200,9 @@ except ImportError: ...@@ -196,7 +200,9 @@ except ImportError:
perf_counter = time.clock # pylint:disable=no-member perf_counter = time.clock # pylint:disable=no-member
else: else:
perf_counter = time.time perf_counter = time.time
monotonic = perf_counter
def get_clock_info(_):
return 'Unknown'
## Monitoring ## Monitoring
def get_this_psutil_process(): def get_this_psutil_process():
......
...@@ -371,7 +371,13 @@ class FileObjectBase(object): ...@@ -371,7 +371,13 @@ class FileObjectBase(object):
return getattr(self._io, name) return getattr(self._io, name)
def __repr__(self): def __repr__(self):
return '<%s _fobj=%r%s>' % (self.__class__.__name__, self.io, self._extra_repr()) return '<%s at 0x%x %s_fobj=%r%s>' % (
self.__class__.__name__,
id(self),
'closed' if self.closed else '',
self.io,
self._extra_repr()
)
def _extra_repr(self): def _extra_repr(self):
return '' return ''
......
...@@ -39,7 +39,6 @@ import sys ...@@ -39,7 +39,6 @@ import sys
import traceback import traceback
from gevent.event import AsyncResult from gevent.event import AsyncResult
from gevent.exceptions import ConcurrentObjectUseError
from gevent.hub import _get_hub_noargs as get_hub from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import linkproxy from gevent.hub import linkproxy
from gevent.hub import sleep from gevent.hub import sleep
...@@ -264,6 +263,13 @@ else: ...@@ -264,6 +263,13 @@ else:
fork = monkey.get_original('os', 'fork') fork = monkey.get_original('os', 'fork')
from gevent.os import fork_and_watch from gevent.os import fork_and_watch
try:
BrokenPipeError
except NameError: # Python 2
class BrokenPipeError(Exception):
"Never raised, never caught."
def call(*popenargs, **kwargs): def call(*popenargs, **kwargs):
""" """
call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None) -> returncode call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None) -> returncode
...@@ -437,6 +443,95 @@ def FileObject(*args, **kwargs): ...@@ -437,6 +443,95 @@ def FileObject(*args, **kwargs):
globals()['FileObject'] = _FileObject globals()['FileObject'] = _FileObject
return _FileObject(*args) return _FileObject(*args)
class _CommunicatingGreenlets(object):
# At most, exactly one of these objects may be created
# for a given Popen object. This ensures that only one background
# greenlet at a time will be reading from the file object. This matters because
# if a timeout exception is raised, the user may call back into communicate() to
# get the output (usually after killing the process; see run()). We must not
# lose output in that case (Python 3 specifically documents that raising a timeout
# doesn't lose output). Also, attempting to read from a pipe while it's already
# being read from results in `RuntimeError: reentrant call in io.BufferedReader`;
# the same thing happens if you attempt to close() it while that's in progress.
__slots__ = (
'stdin',
'stdout',
'stderr',
'_all_greenlets',
)
def __init__(self, popen, input_data):
self.stdin = self.stdout = self.stderr = None
if popen.stdin: # Even if no data, we need to close
self.stdin = spawn(self._write_and_close, popen.stdin, input_data)
# If the timeout parameter is used, and the caller calls back after
# getting a TimeoutExpired exception, we can wind up with multiple
# greenlets trying to run and read from and close stdout/stderr.
# That's bad because it can lead to 'RuntimeError: reentrant call in io.BufferedReader'.
# We can't just kill the previous greenlets when a timeout happens,
# though, because we risk losing the output collected by that greenlet
# (and Python 3, where timeout is an official parameter, explicitly says
# that no output should be lost in the event of a timeout.) Instead, we're
# watching for the exception and ignoring it. It's not elegant,
# but it works
if popen.stdout:
self.stdout = spawn(self._read_and_close, popen.stdout)
if popen.stderr:
self.stderr = spawn(self._read_and_close, popen.stderr)
all_greenlets = []
for g in self.stdin, self.stdout, self.stderr:
if g is not None:
all_greenlets.append(g)
self._all_greenlets = tuple(all_greenlets)
def __iter__(self):
return iter(self._all_greenlets)
def __bool__(self):
return bool(self._all_greenlets)
__nonzero__ = __bool__
def __len__(self):
return len(self._all_greenlets)
@staticmethod
def _write_and_close(fobj, data):
try:
if data:
fobj.write(data)
if hasattr(fobj, 'flush'):
# 3.6 started expecting flush to be called.
fobj.flush()
except (OSError, IOError, BrokenPipeError) as ex:
# Test cases from the stdlib can raise BrokenPipeError
# without setting an errno value. This matters because
# Python 2 doesn't have a BrokenPipeError.
if isinstance(ex, BrokenPipeError) and ex.errno is None:
ex.errno = errno.EPIPE
if ex.errno != errno.EPIPE and ex.errno != errno.EINVAL:
raise
finally:
try:
fobj.close()
except EnvironmentError:
pass
@staticmethod
def _read_and_close(fobj):
try:
return fobj.read()
finally:
try:
fobj.close()
except EnvironmentError:
pass
class Popen(object): class Popen(object):
""" """
The underlying process creation and management in this module is The underlying process creation and management in this module is
...@@ -706,13 +801,17 @@ class Popen(object): ...@@ -706,13 +801,17 @@ class Popen(object):
self._devnull = os.open(os.devnull, os.O_RDWR) self._devnull = os.open(os.devnull, os.O_RDWR)
return self._devnull return self._devnull
_stdout_buffer = None _communicating_greenlets = None
_stderr_buffer = None
def communicate(self, input=None, timeout=None): def communicate(self, input=None, timeout=None):
"""Interact with process: Send data to stdin. Read data from """
stdout and stderr, until end-of-file is reached. Wait for Interact with process and return its output and error.
process to terminate. The optional input argument should be a
- Send *input* data to stdin.
- Read data from stdout and stderr, until end-of-file is reached.
- Wait for process to terminate.
The optional *input* argument should be a
string to be sent to the child process, or None, if no data string to be sent to the child process, or None, if no data
should be sent to the child. should be sent to the child.
...@@ -731,57 +830,9 @@ class Popen(object): ...@@ -731,57 +830,9 @@ class Popen(object):
Honor a *timeout* even if there's no way to communicate with the child Honor a *timeout* even if there's no way to communicate with the child
(stdin, stdout, and stderr are not pipes). (stdin, stdout, and stderr are not pipes).
""" """
greenlets = [] if self._communicating_greenlets is None:
if self.stdin: self._communicating_greenlets = _CommunicatingGreenlets(self, input)
greenlets.append(spawn(write_and_close, self.stdin, input)) greenlets = self._communicating_greenlets
# If the timeout parameter is used, and the caller calls back after
# getting a TimeoutExpired exception, we can wind up with multiple
# greenlets trying to run and read from and close stdout/stderr.
# That's bad because it can lead to 'RuntimeError: reentrant call in io.BufferedReader'.
# We can't just kill the previous greenlets when a timeout happens,
# though, because we risk losing the output collected by that greenlet
# (and Python 3, where timeout is an official parameter, explicitly says
# that no output should be lost in the event of a timeout.) Instead, we're
# watching for the exception and ignoring it. It's not elegant,
# but it works
def _make_pipe_reader(pipe_name):
pipe = getattr(self, pipe_name)
buf_name = '_' + pipe_name + '_buffer'
def _read():
try:
data = pipe.read()
except (
# io.Buffered* can raise RuntimeError: 'reentrant call'
RuntimeError,
# unbuffered Posix IO that we're already waiting on
# can raise this. Closing the pipe will free those greenlets up.
ConcurrentObjectUseError
):
return
if not data:
return
the_buffer = getattr(self, buf_name)
if the_buffer:
the_buffer.append(data)
else:
setattr(self, buf_name, [data])
return _read
if self.stdout:
_read_out = _make_pipe_reader('stdout')
stdout = spawn(_read_out)
greenlets.append(stdout)
else:
stdout = None
if self.stderr:
_read_err = _make_pipe_reader('stderr')
stderr = spawn(_read_err)
greenlets.append(stderr)
else:
stderr = None
# If we were given stdin=stdout=stderr=None, we have no way to # If we were given stdin=stdout=stderr=None, we have no way to
# communicate with the child, and thus no greenlets to wait # communicate with the child, and thus no greenlets to wait
...@@ -793,9 +844,18 @@ class Popen(object): ...@@ -793,9 +844,18 @@ class Popen(object):
self.wait(timeout=timeout, _raise_exc=True) self.wait(timeout=timeout, _raise_exc=True)
done = joinall(greenlets, timeout=timeout) done = joinall(greenlets, timeout=timeout)
if timeout is not None and len(done) != len(greenlets): # Allow finished greenlets, if any, to raise. This takes priority over
# the timeout exception.
for greenlet in done:
greenlet.get()
if timeout is not None and len(done) != len(self._communicating_greenlets):
raise TimeoutExpired(self.args, timeout) raise TimeoutExpired(self.args, timeout)
# Close only after we're sure that everything is done
# (there was no timeout, or there was, but everything finished).
# There should be no greenlets still running, even from a prior
# attempt. If there are, then this can raise RuntimeError: 'reentrant call'.
# So we ensure that previous greenlets are dead.
for pipe in (self.stdout, self.stderr): for pipe in (self.stdout, self.stderr):
if pipe: if pipe:
try: try:
...@@ -805,21 +865,8 @@ class Popen(object): ...@@ -805,21 +865,8 @@ class Popen(object):
self.wait() self.wait()
def _get_output_value(pipe_name): return (None if greenlets.stdout is None else greenlets.stdout.get(),
buf_name = '_' + pipe_name + '_buffer' None if greenlets.stderr is None else greenlets.stderr.get())
buf_value = getattr(self, buf_name)
setattr(self, buf_name, None)
if buf_value:
buf_value = self._communicate_empty_value.join(buf_value)
else:
buf_value = self._communicate_empty_value
return buf_value
stdout_value = _get_output_value('stdout')
stderr_value = _get_output_value('stderr')
return (None if stdout is None else stdout_value,
None if stderr is None else stderr_value)
def poll(self): def poll(self):
"""Check if child process has terminated. Set and return :attr:`returncode` attribute.""" """Check if child process has terminated. Set and return :attr:`returncode` attribute."""
...@@ -1648,22 +1695,6 @@ class Popen(object): ...@@ -1648,22 +1695,6 @@ class Popen(object):
self.send_signal(signal.SIGKILL) self.send_signal(signal.SIGKILL)
def write_and_close(fobj, data):
try:
if data:
fobj.write(data)
if hasattr(fobj, 'flush'):
# 3.6 started expecting flush to be called.
fobj.flush()
except (OSError, IOError) as ex:
if ex.errno != errno.EPIPE and ex.errno != errno.EINVAL:
raise
finally:
try:
fobj.close()
except EnvironmentError:
pass
def _with_stdout_stderr(exc, stderr): def _with_stdout_stderr(exc, stderr):
# Prior to Python 3.5, most exceptions didn't have stdout # Prior to Python 3.5, most exceptions didn't have stdout
# and stderr attributes and can't take the stderr attribute in their # and stderr attributes and can't take the stderr attribute in their
......
...@@ -85,6 +85,7 @@ from .skipping import skipOnPurePython ...@@ -85,6 +85,7 @@ from .skipping import skipOnPurePython
from .skipping import skipWithCExtensions from .skipping import skipWithCExtensions
from .skipping import skipOnLibuvOnTravisOnCPython27 from .skipping import skipOnLibuvOnTravisOnCPython27
from .skipping import skipOnPy37 from .skipping import skipOnPy37
from .skipping import skipOnPy3
from .skipping import skipWithoutResource from .skipping import skipWithoutResource
from .skipping import skipWithoutExternalNetwork from .skipping import skipWithoutExternalNetwork
from .skipping import skipOnPy2 from .skipping import skipOnPy2
......
...@@ -22,16 +22,52 @@ from __future__ import absolute_import, print_function, division ...@@ -22,16 +22,52 @@ from __future__ import absolute_import, print_function, division
import os import os
import unittest import unittest
import re import re
import gc
import functools
from . import sysinfo from . import sysinfo
# Linux/OS X/BSD platforms can implement this by calling out to lsof # Linux/OS X/BSD platforms /can/ implement this by calling out to lsof.
# However, if psutil is available (it is cross-platform) use that.
# It is *much* faster than shelling out to lsof each time
# (Running 14 tests takes 3.964s with lsof and 0.046 with psutil)
# However, it still doesn't completely solve the issue on Windows: fds are reported
# as -1 there, so we can't fully check those.
def _collects(func):
# We've seen OSError: No such file or directory /proc/PID/fd/NUM.
# This occurs in the loop that checks open files. It first does
# listdir() and then tries readlink() on each file. But the file
# went away. This must be because of async GC in PyPy running
# destructors at arbitrary times. This became an issue in PyPy 7.2
# but could theoretically be an issue with any objects caught in a
# cycle. This is one reason we GC before we begin. (The other is
# to clean up outstanding objects that will close files in
# __del__.)
#
# Note that this can hide errors, though, by causing greenlets to get
# collected and drop references and thus close files. We should be deterministic
# and careful about closing things.
@functools.wraps(func)
def f():
gc.collect()
gc.collect()
enabled = gc.isenabled()
gc.disable()
try:
return func()
finally:
if enabled:
gc.enable()
return f
if sysinfo.WIN: if sysinfo.WIN:
def _run_lsof(): def _run_lsof():
raise unittest.SkipTest("lsof not expected on Windows") raise unittest.SkipTest("lsof not expected on Windows")
else: else:
@_collects
def _run_lsof(): def _run_lsof():
import tempfile import tempfile
pid = os.getpid() pid = os.getpid()
...@@ -70,6 +106,7 @@ def default_get_open_files(pipes=False): ...@@ -70,6 +106,7 @@ def default_get_open_files(pipes=False):
results['data'] = data results['data'] = data
return results return results
@_collects
def default_get_number_open_files(): def default_get_number_open_files():
if os.path.exists('/proc/'): if os.path.exists('/proc/'):
# Linux only # Linux only
...@@ -91,12 +128,8 @@ except ImportError: ...@@ -91,12 +128,8 @@ except ImportError:
get_open_files = default_get_open_files get_open_files = default_get_open_files
get_number_open_files = default_get_number_open_files get_number_open_files = default_get_number_open_files
else: else:
# If psutil is available (it is cross-platform) use that.
# It is *much* faster than shelling out to lsof each time
# (Running 14 tests takes 3.964s with lsof and 0.046 with psutil)
# However, it still doesn't completely solve the issue on Windows: fds are reported
# as -1 there, so we can't fully check those.
@_collects
def get_open_files(): def get_open_files():
""" """
Return a list of popenfile and pconn objects. Return a list of popenfile and pconn objects.
...@@ -108,37 +141,26 @@ else: ...@@ -108,37 +141,26 @@ else:
(socket.listen(1)). Unlike the lsof implementation, this will only (socket.listen(1)). Unlike the lsof implementation, this will only
return sockets in a state like that. return sockets in a state like that.
""" """
# We've seen OSError: No such file or directory
# /proc/PID/fd/NUM. This occurs in the loop that checks open
# files. It first does listdir() and then tries readlink() on
# each file. But the file went away. This must be because of
# async GC in PyPy running destructors at arbitrary times.
# This became an issue in PyPy 7.2 but could theoretically be
# an issue with any objects caught in a cycle. Try to clean
# that up before we begin.
import gc
gc.collect()
gc.collect()
results = dict() results = dict()
gc.disable()
try: for _ in range(3):
for _ in range(3): try:
try: process = psutil.Process()
process = psutil.Process() results['data'] = process.open_files() + process.connections('all')
results['data'] = process.open_files() + process.connections('all') break
break except OSError:
except OSError: pass
pass else:
else: # No break executed
# No break executed raise unittest.SkipTest("Unable to read open files")
raise unittest.SkipTest("Unable to read open files")
finally:
gc.enable()
for x in results['data']: for x in results['data']:
results[x.fd] = x results[x.fd] = x
results['data'] += ['From psutil', process] results['data'] += ['From psutil', process]
return results return results
@_collects
def get_number_open_files(): def get_number_open_files():
process = psutil.Process() process = psutil.Process()
try: try:
...@@ -146,3 +168,28 @@ else: ...@@ -146,3 +168,28 @@ else:
except AttributeError: except AttributeError:
# num_fds is unix only. Is num_handles close enough on Windows? # num_fds is unix only. Is num_handles close enough on Windows?
return 0 return 0
class DoesNotLeakFilesMixin(object): # pragma: no cover
"""
A test case mixin that helps find a method that's leaking an
open file.
Only mix this in when needed to debug, it slows tests down.
"""
def setUp(self):
self.__open_files_count = get_number_open_files()
super(DoesNotLeakFilesMixin, self).setUp()
def tearDown(self):
super(DoesNotLeakFilesMixin, self).tearDown()
after = get_number_open_files()
if after > self.__open_files_count:
raise AssertionError(
"Too many open files. Before: %s < After: %s.\n%s" % (
self.__open_files_count,
after,
get_open_files()
)
)
...@@ -674,6 +674,11 @@ if WIN: ...@@ -674,6 +674,11 @@ if WIN:
disabled_tests += [ disabled_tests += [
# Issue with Unix vs DOS newlines in the file vs from the server # Issue with Unix vs DOS newlines in the file vs from the server
'test_ssl.ThreadedTests.test_socketserver', 'test_ssl.ThreadedTests.test_socketserver',
# On appveyor, this sometimes produces 'A non-blocking socket
# operation could not be completed immediately', followed by
# 'No connection could be made because the target machine
# actively refused it'
'test_socket.NonBlockingTCPTests.testAccept',
] ]
# These are a problem on 3.5; on 3.6+ they wind up getting (accidentally) disabled. # These are a problem on 3.5; on 3.6+ they wind up getting (accidentally) disabled.
...@@ -803,6 +808,10 @@ if PY3: ...@@ -803,6 +808,10 @@ if PY3:
'test_subprocess.ProcessTestCaseNoPoll.test_cwd_with_relative_arg', 'test_subprocess.ProcessTestCaseNoPoll.test_cwd_with_relative_arg',
'test_subprocess.ProcessTestCase.test_cwd_with_relative_executable', 'test_subprocess.ProcessTestCase.test_cwd_with_relative_executable',
# In 3.7 and 3.8 on Travis CI, this appears to take the full 3 seconds.
# Can't reproduce it locally. We have our own copy of this that takes
# timing on CI into account.
'test_subprocess.RunFuncTestCase.test_run_with_shell_timeout_and_capture_output',
] ]
disabled_tests += [ disabled_tests += [
......
...@@ -43,6 +43,7 @@ skipOnPyPy3 = _do_not_skip ...@@ -43,6 +43,7 @@ skipOnPyPy3 = _do_not_skip
skipOnPyPyOnWindows = _do_not_skip skipOnPyPyOnWindows = _do_not_skip
skipOnPy2 = unittest.skip if sysinfo.PY2 else _do_not_skip skipOnPy2 = unittest.skip if sysinfo.PY2 else _do_not_skip
skipOnPy3 = unittest.skip if sysinfo.PY3 else _do_not_skip
skipOnPy37 = unittest.skip if sysinfo.PY37 else _do_not_skip skipOnPy37 = unittest.skip if sysinfo.PY37 else _do_not_skip
skipOnPurePython = unittest.skip if sysinfo.PURE_PYTHON else _do_not_skip skipOnPurePython = unittest.skip if sysinfo.PURE_PYTHON else _do_not_skip
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
from __future__ import absolute_import, print_function, division from __future__ import absolute_import, print_function, division
import sys import sys
from time import time
import os.path import os.path
from contextlib import contextmanager from contextlib import contextmanager
from unittest import TestCase as BaseTestCase from unittest import TestCase as BaseTestCase
...@@ -28,6 +27,8 @@ from functools import wraps ...@@ -28,6 +27,8 @@ from functools import wraps
import gevent import gevent
from gevent._util import LazyOnClass from gevent._util import LazyOnClass
from gevent._compat import perf_counter
from gevent._compat import get_clock_info
from . import sysinfo from . import sysinfo
from . import params from . import params
...@@ -63,13 +64,17 @@ class TimeAssertMixin(object): ...@@ -63,13 +64,17 @@ class TimeAssertMixin(object):
fuzzy = expected * 5.0 fuzzy = expected * 5.0
else: else:
fuzzy = expected / 2.0 fuzzy = expected / 2.0
start = time() min_time = expected - fuzzy
yield max_time = expected + fuzzy
elapsed = time() - start start = perf_counter()
yield (min_time, max_time)
elapsed = perf_counter() - start
try: try:
self.assertTrue( self.assertTrue(
expected - fuzzy <= elapsed <= expected + fuzzy, min_time <= elapsed <= max_time,
'Expected: %r; elapsed: %r; fuzzy %r' % (expected, elapsed, fuzzy)) 'Expected: %r; elapsed: %r; fuzzy %r; clock_info: %s' % (
expected, elapsed, fuzzy, get_clock_info('perf_counter')
))
except AssertionError: except AssertionError:
flaky.reraiseFlakyTestRaceCondition() flaky.reraiseFlakyTestRaceCondition()
...@@ -248,6 +253,11 @@ class TestCase(TestCaseMetaClass("NewBase", ...@@ -248,6 +253,11 @@ class TestCase(TestCaseMetaClass("NewBase",
__timeout__ = params.LOCAL_TIMEOUT if not sysinfo.RUNNING_ON_CI else params.CI_TIMEOUT __timeout__ = params.LOCAL_TIMEOUT if not sysinfo.RUNNING_ON_CI else params.CI_TIMEOUT
switch_expected = 'default' switch_expected = 'default'
#: Set this to true to cause errors that get reported to the hub to
#: always get propagated to the main greenlet. This can be done at the
#: class or method level.
#: .. caution:: This can hide errors and make it look like exceptions
#: are propagated even if they're not.
error_fatal = True error_fatal = True
uses_handle_error = True uses_handle_error = True
close_on_teardown = () close_on_teardown = ()
...@@ -257,7 +267,7 @@ class TestCase(TestCaseMetaClass("NewBase", ...@@ -257,7 +267,7 @@ class TestCase(TestCaseMetaClass("NewBase",
# pylint:disable=arguments-differ # pylint:disable=arguments-differ
if self.switch_expected == 'default': if self.switch_expected == 'default':
self.switch_expected = get_switch_expected(self.fullname) self.switch_expected = get_switch_expected(self.fullname)
return BaseTestCase.run(self, *args, **kwargs) return super(TestCase, self).run(*args, **kwargs)
def setUp(self): def setUp(self):
super(TestCase, self).setUp() super(TestCase, self).setUp()
......
...@@ -5,7 +5,6 @@ import sys ...@@ -5,7 +5,6 @@ import sys
import os import os
import glob import glob
import traceback import traceback
import time
import importlib import importlib
from datetime import timedelta from datetime import timedelta
...@@ -135,7 +134,7 @@ class Runner(object): ...@@ -135,7 +134,7 @@ class Runner(object):
def _reap_all(self): def _reap_all(self):
while self._reap() > 0: while self._reap() > 0:
time.sleep(self.TIME_WAIT_REAP) util.sleep(self.TIME_WAIT_REAP)
def _spawn(self, pool, cmd, options): def _spawn(self, pool, cmd, options):
while True: while True:
...@@ -144,7 +143,7 @@ class Runner(object): ...@@ -144,7 +143,7 @@ class Runner(object):
self._running_jobs.append(job) self._running_jobs.append(job)
return return
time.sleep(self.TIME_WAIT_SPAWN) util.sleep(self.TIME_WAIT_SPAWN)
def __call__(self): def __call__(self):
util.log("Running tests in parallel with concurrency %s" % (self._worker_count,),) util.log("Running tests in parallel with concurrency %s" % (self._worker_count,),)
...@@ -153,11 +152,11 @@ class Runner(object): ...@@ -153,11 +152,11 @@ class Runner(object):
# sequentially. # sequentially.
util.BUFFER_OUTPUT = self._worker_count > 1 or self._quiet util.BUFFER_OUTPUT = self._worker_count > 1 or self._quiet
start = time.time() start = util.perf_counter()
try: try:
self._run_tests() self._run_tests()
except KeyboardInterrupt: except KeyboardInterrupt:
self._report(time.time() - start, exit=False) self._report(util.perf_counter() - start, exit=False)
util.log('(partial results)\n') util.log('(partial results)\n')
raise raise
except: except:
...@@ -165,7 +164,7 @@ class Runner(object): ...@@ -165,7 +164,7 @@ class Runner(object):
raise raise
self._reap_all() self._reap_all()
self._report(time.time() - start, exit=True) self._report(util.perf_counter() - start, exit=True)
def _run_tests(self): def _run_tests(self):
"Runs the tests, produces no report." "Runs the tests, produces no report."
...@@ -215,7 +214,7 @@ class TravisFoldingRunner(object): ...@@ -215,7 +214,7 @@ class TravisFoldingRunner(object):
def __init__(self, runner, travis_fold_msg): def __init__(self, runner, travis_fold_msg):
self._runner = runner self._runner = runner
self._travis_fold_msg = travis_fold_msg self._travis_fold_msg = travis_fold_msg
self._travis_fold_name = str(int(time.time())) self._travis_fold_name = str(int(util.perf_counter()))
# A zope-style acquisition proxy would be convenient here. # A zope-style acquisition proxy would be convenient here.
run_tests = runner._run_tests run_tests = runner._run_tests
......
...@@ -18,9 +18,8 @@ ...@@ -18,9 +18,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE. # THE SOFTWARE.
import time
import gevent import gevent
from gevent._compat import perf_counter
from . import sysinfo from . import sysinfo
from . import leakcheck from . import leakcheck
...@@ -69,11 +68,11 @@ class _DelayWaitMixin(object): ...@@ -69,11 +68,11 @@ class _DelayWaitMixin(object):
seconds = getattr(timeout, 'seconds', timeout) seconds = getattr(timeout, 'seconds', timeout)
gevent.get_hub().loop.update_now() gevent.get_hub().loop.update_now()
start = time.time() start = perf_counter()
try: try:
result = self.wait(timeout) result = self.wait(timeout)
finally: finally:
self._check_delay_bounds(seconds, time.time() - start, self._check_delay_bounds(seconds, perf_counter() - start,
self._default_delay_min_adj, self._default_delay_min_adj,
self._default_delay_max_adj) self._default_delay_max_adj)
return result return result
......
...@@ -6,10 +6,11 @@ import traceback ...@@ -6,10 +6,11 @@ import traceback
import unittest import unittest
import threading import threading
import subprocess import subprocess
import time from time import sleep
from . import six from . import six
from gevent._config import validate_bool from gevent._config import validate_bool
from gevent._compat import perf_counter
from gevent.monkey import get_original from gevent.monkey import get_original
# pylint: disable=broad-except,attribute-defined-outside-init # pylint: disable=broad-except,attribute-defined-outside-init
...@@ -391,9 +392,9 @@ def run(command, **kwargs): # pylint:disable=too-many-locals ...@@ -391,9 +392,9 @@ def run(command, **kwargs): # pylint:disable=too-many-locals
name = popen.name name = popen.name
try: try:
time_start = time.time() time_start = perf_counter()
out, err = popen.communicate() out, err = popen.communicate()
took = time.time() - time_start took = perf_counter() - time_start
if popen.was_killed or popen.poll() is None: if popen.was_killed or popen.poll() is None:
result = 'TIMEOUT' result = 'TIMEOUT'
else: else:
...@@ -611,7 +612,7 @@ class TestServer(ExampleMixin, ...@@ -611,7 +612,7 @@ class TestServer(ExampleMixin,
def before(self): def before(self):
if self.before_delay is not None: if self.before_delay is not None:
time.sleep(self.before_delay) sleep(self.before_delay)
self.assertIsNone(self.popen.poll(), self.assertIsNone(self.popen.poll(),
'%s died with code %s' % ( '%s died with code %s' % (
self.example, self.popen.poll(), self.example, self.popen.poll(),
...@@ -619,7 +620,7 @@ class TestServer(ExampleMixin, ...@@ -619,7 +620,7 @@ class TestServer(ExampleMixin,
def after(self): def after(self):
if self.after_delay is not None: if self.after_delay is not None:
time.sleep(self.after_delay) sleep(self.after_delay)
self.assertIsNone(self.popen.poll(), self.assertIsNone(self.popen.poll(),
'%s died with code %s' % ( '%s died with code %s' % (
self.example, self.popen.poll(), self.example, self.popen.poll(),
...@@ -646,6 +647,6 @@ class alarm(threading.Thread): ...@@ -646,6 +647,6 @@ class alarm(threading.Thread):
self.start() self.start()
def run(self): def run(self):
time.sleep(self.timeout) sleep(self.timeout)
sys.stderr.write('Timeout.\n') sys.stderr.write('Timeout.\n')
os._exit(5) os._exit(5)
...@@ -5,37 +5,51 @@ import unittest ...@@ -5,37 +5,51 @@ import unittest
import sys import sys
import gevent.testing as greentest import gevent.testing as greentest
from gevent import core from gevent._config import Loop
available_loops = Loop().get_options()
available_loops.pop('libuv', None)
def not_available(name):
return isinstance(available_loops[name], ImportError)
class TestCore(unittest.TestCase):
def test_get_version(self): class WatcherTestMixin(object):
version = core.get_version() # pylint: disable=no-member kind = None
self.assertIsInstance(version, str)
self.assertTrue(version)
header_version = core.get_header_version() # pylint: disable=no-member
self.assertIsInstance(header_version, str)
self.assertTrue(header_version)
self.assertEqual(version, header_version)
class TestWatchers(unittest.TestCase):
def _makeOne(self): def _makeOne(self):
return core.loop() # pylint:disable=no-member return self.kind(default=False) # pylint:disable=not-callable
def destroyOne(self, loop): def destroyOne(self, loop):
loop.destroy() loop.destroy()
def setUp(self): def setUp(self):
self.loop = self._makeOne() self.loop = self._makeOne()
self.core = sys.modules[self.kind.__module__]
def tearDown(self): def tearDown(self):
self.destroyOne(self.loop) self.destroyOne(self.loop)
del self.loop del self.loop
def test_get_version(self):
version = self.core.get_version() # pylint: disable=no-member
self.assertIsInstance(version, str)
self.assertTrue(version)
header_version = self.core.get_header_version() # pylint: disable=no-member
self.assertIsInstance(header_version, str)
self.assertTrue(header_version)
self.assertEqual(version, header_version)
def test_events_conversion(self):
self.assertEqual(self.core._events_to_str(self.core.READ | self.core.WRITE), # pylint: disable=no-member
'READ|WRITE')
def test_EVENTS(self):
self.assertEqual(str(self.core.EVENTS), # pylint: disable=no-member
'gevent.core.EVENTS')
self.assertEqual(repr(self.core.EVENTS), # pylint: disable=no-member
'gevent.core.EVENTS')
def test_io(self): def test_io(self):
if greentest.WIN: if greentest.WIN:
# libev raises IOError, libuv raises ValueError # libev raises IOError, libuv raises ValueError
...@@ -46,30 +60,65 @@ class TestWatchers(unittest.TestCase): ...@@ -46,30 +60,65 @@ class TestWatchers(unittest.TestCase):
with self.assertRaises(Error): with self.assertRaises(Error):
self.loop.io(-1, 1) self.loop.io(-1, 1)
if hasattr(core, 'TIMER'): if hasattr(self.core, 'TIMER'):
# libev # libev
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.loop.io(1, core.TIMER) # pylint:disable=no-member self.loop.io(1, self.core.TIMER) # pylint:disable=no-member
# Test we can set events and io before it's started # Test we can set events and io before it's started
if not greentest.WIN: if not greentest.WIN:
# We can't do this with arbitrary FDs on windows; # We can't do this with arbitrary FDs on windows;
# see libev_vfd.h # see libev_vfd.h
io = self.loop.io(1, core.READ) # pylint:disable=no-member io = self.loop.io(1, self.core.READ) # pylint:disable=no-member
io.fd = 2 io.fd = 2
self.assertEqual(io.fd, 2) self.assertEqual(io.fd, 2)
io.events = core.WRITE # pylint:disable=no-member io.events = self.core.WRITE # pylint:disable=no-member
if not hasattr(core, 'libuv'): if not hasattr(self.core, 'libuv'):
# libev # libev
# pylint:disable=no-member # pylint:disable=no-member
self.assertEqual(core._events_to_str(io.events), 'WRITE|_IOFDSET') self.assertEqual(self.core._events_to_str(io.events), 'WRITE|_IOFDSET')
else: else:
self.assertEqual(core._events_to_str(io.events), # pylint:disable=no-member self.assertEqual(self.core._events_to_str(io.events), # pylint:disable=no-member
'WRITE') 'WRITE')
io.start(lambda: None) io.start(lambda: None)
io.close() io.close()
def test_timer_constructor(self):
with self.assertRaises(ValueError):
self.loop.timer(1, -1)
def test_signal_constructor(self):
with self.assertRaises(ValueError):
self.loop.signal(1000)
class LibevTestMixin(WatcherTestMixin):
def test_flags_conversion(self):
# pylint: disable=no-member
core = self.core
if not greentest.WIN:
self.assertEqual(core.loop(2, default=False).backend_int, 2)
self.assertEqual(core.loop('select', default=False).backend, 'select')
self.assertEqual(core._flags_to_int(None), 0)
self.assertEqual(core._flags_to_int(['kqueue', 'SELECT']), core.BACKEND_KQUEUE | core.BACKEND_SELECT)
self.assertEqual(core._flags_to_list(core.BACKEND_PORT | core.BACKEND_POLL), ['port', 'poll'])
self.assertRaises(ValueError, core.loop, ['port', 'blabla'])
self.assertRaises(TypeError, core.loop, object())
@unittest.skipIf(not_available('libev-cext'), "Needs libev-cext")
class TestLibevCext(LibevTestMixin, unittest.TestCase):
kind = available_loops['libev-cext']
@unittest.skipIf(not_available('libev-cffi'), "Needs libev-cffi")
class TestLibevCffi(LibevTestMixin, unittest.TestCase):
kind = available_loops['libev-cffi']
@unittest.skipIf(not_available('libuv-cffi'), "Needs libuv-cffi")
class TestLibuvCffi(WatcherTestMixin, unittest.TestCase):
kind = available_loops['libuv-cffi']
@greentest.skipOnLibev("libuv-specific") @greentest.skipOnLibev("libuv-specific")
@greentest.skipOnWindows("Destroying the loop somehow fails") @greentest.skipOnWindows("Destroying the loop somehow fails")
def test_io_multiplex_events(self): def test_io_multiplex_events(self):
...@@ -77,7 +126,7 @@ class TestWatchers(unittest.TestCase): ...@@ -77,7 +126,7 @@ class TestWatchers(unittest.TestCase):
import socket import socket
sock = socket.socket() sock = socket.socket()
fd = sock.fileno() fd = sock.fileno()
core = self.core
read = self.loop.io(fd, core.READ) read = self.loop.io(fd, core.READ)
write = self.loop.io(fd, core.WRITE) write = self.loop.io(fd, core.WRITE)
...@@ -106,70 +155,6 @@ class TestWatchers(unittest.TestCase): ...@@ -106,70 +155,6 @@ class TestWatchers(unittest.TestCase):
write.close() write.close()
sock.close() sock.close()
def test_timer_constructor(self):
with self.assertRaises(ValueError):
self.loop.timer(1, -1)
def test_signal_constructor(self):
with self.assertRaises(ValueError):
self.loop.signal(1000)
class TestWatchersDefault(TestWatchers):
def _makeOne(self):
return core.loop(default=True) # pylint:disable=no-member
def destroyOne(self, loop):
return
# XXX: The crash may be fixed? The hang showed up after the crash was
# reproduced and fixed on linux and OS X.
@greentest.skipOnLibuvOnWin(
"This crashes with PyPy 5.10.0, only on Windows. "
"See https://ci.appveyor.com/project/denik/gevent/build/1.0.1380/job/lrlvid6mkjtyrhn5#L1103 "
"It has also timed out, but only on Appveyor CPython 3.6; local CPython 3.6 does not. "
"See https://ci.appveyor.com/project/denik/gevent/build/1.0.1414/job/yn7yi8b53vtqs8lw#L1523")
@greentest.skipIf(
greentest.LIBUV and greentest.RUNNING_ON_TRAVIS and sys.version_info == (3, 8, 0, 'beta', 4),
"Crashes on 3.8.0b4 on TravisCI. "
"(https://travis-ci.org/gevent/gevent/jobs/582031266#L215) "
"Unable to reproduce locally so far on macOS."
)
class TestWatchersDefaultDestroyed(TestWatchers):
def _makeOne(self):
# pylint: disable=no-member
l = core.loop(default=True)
l.destroy()
del l
return core.loop(default=True)
@greentest.skipOnLibuv("Tests for libev-only functions")
class TestLibev(unittest.TestCase):
def test_flags_conversion(self):
# pylint: disable=no-member
if not greentest.WIN:
self.assertEqual(core.loop(2, default=False).backend_int, 2)
self.assertEqual(core.loop('select', default=False).backend, 'select')
self.assertEqual(core._flags_to_int(None), 0)
self.assertEqual(core._flags_to_int(['kqueue', 'SELECT']), core.BACKEND_KQUEUE | core.BACKEND_SELECT)
self.assertEqual(core._flags_to_list(core.BACKEND_PORT | core.BACKEND_POLL), ['port', 'poll'])
self.assertRaises(ValueError, core.loop, ['port', 'blabla'])
self.assertRaises(TypeError, core.loop, object())
class TestEvents(unittest.TestCase):
def test_events_conversion(self):
self.assertEqual(core._events_to_str(core.READ | core.WRITE), # pylint: disable=no-member
'READ|WRITE')
def test_EVENTS(self):
self.assertEqual(str(core.EVENTS), # pylint: disable=no-member
'gevent.core.EVENTS')
self.assertEqual(repr(core.EVENTS), # pylint: disable=no-member
'gevent.core.EVENTS')
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
...@@ -4,7 +4,6 @@ import errno ...@@ -4,7 +4,6 @@ import errno
import unittest import unittest
import time import time
import gc
import tempfile import tempfile
import gevent.testing as greentest import gevent.testing as greentest
...@@ -34,12 +33,11 @@ python_universal_newlines = hasattr(sys.stdout, 'newlines') ...@@ -34,12 +33,11 @@ python_universal_newlines = hasattr(sys.stdout, 'newlines')
python_universal_newlines_broken = PY3 and subprocess.mswindows python_universal_newlines_broken = PY3 and subprocess.mswindows
@greentest.skipWithoutResource('subprocess') @greentest.skipWithoutResource('subprocess')
class Test(greentest.TestCase): class TestPopen(greentest.TestCase):
def setUp(self): # Use the normal error handling. Make sure that any background greenlets
greentest.TestCase.setUp(self) # subprocess spawns propagate errors as expected.
gc.collect() error_fatal = False
gc.collect()
def test_exit(self): def test_exit(self):
popen = subprocess.Popen([sys.executable, '-c', 'import sys; sys.exit(10)']) popen = subprocess.Popen([sys.executable, '-c', 'import sys; sys.exit(10)'])
...@@ -51,12 +49,10 @@ class Test(greentest.TestCase): ...@@ -51,12 +49,10 @@ class Test(greentest.TestCase):
self.assertEqual(popen.poll(), 11) self.assertEqual(popen.poll(), 11)
def test_child_exception(self): def test_child_exception(self):
try: with self.assertRaises(OSError) as exc:
subprocess.Popen(['*']).wait() subprocess.Popen(['*']).wait()
except OSError as ex:
assert ex.errno == 2, ex self.assertEqual(exc.exception.errno, 2)
else:
raise AssertionError('Expected OSError: [Errno 2] No such file or directory')
def test_leak(self): def test_leak(self):
num_before = greentest.get_number_open_files() num_before = greentest.get_number_open_files()
...@@ -65,9 +61,6 @@ class Test(greentest.TestCase): ...@@ -65,9 +61,6 @@ class Test(greentest.TestCase):
p.wait() p.wait()
p.stdout.close() p.stdout.close()
del p del p
if PYPY:
gc.collect()
gc.collect()
num_after = greentest.get_number_open_files() num_after = greentest.get_number_open_files()
self.assertEqual(num_before, num_after) self.assertEqual(num_before, num_after)
...@@ -106,7 +99,8 @@ class Test(greentest.TestCase): ...@@ -106,7 +99,8 @@ class Test(greentest.TestCase):
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True) universal_newlines=True
)
(stdout, stderr) = p.communicate('banana\r\n\xff\xff\xf2\xf9\r\n') (stdout, stderr) = p.communicate('banana\r\n\xff\xff\xf2\xf9\r\n')
self.assertIsInstance(stdout, str) self.assertIsInstance(stdout, str)
self.assertIsInstance(stderr, str) self.assertIsInstance(stderr, str)
...@@ -116,25 +110,51 @@ class Test(greentest.TestCase): ...@@ -116,25 +110,51 @@ class Test(greentest.TestCase):
self.assertEqual(stderr, self.assertEqual(stderr,
'pineapple\n\xff\xff\xf2\xf9\n') 'pineapple\n\xff\xff\xf2\xf9\n')
@greentest.skipOnWindows("Windows IO is weird; this doesn't raise")
@greentest.skipOnPy2("Only Python 2 decodes")
def test_communicate_undecodable(self):
# If the subprocess writes non-decodable data, `communicate` raises the
# same UnicodeDecodeError that the stdlib does, instead of
# printing it to the hub. This only applies to Python 3, because only it
# will actually use text mode.
# See https://github.com/gevent/gevent/issues/1510
with subprocess.Popen(
[
sys.executable,
'-W', 'ignore',
'-c',
"import os, sys; "
r'os.write(sys.stdout.fileno(), b"\xff")'
],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True, universal_newlines=True
) as p:
with self.assertRaises(UnicodeDecodeError):
p.communicate()
@greentest.skipOnLibuvOnPyPyOnWin("hangs") @greentest.skipOnLibuvOnPyPyOnWin("hangs")
def test_universal1(self): def test_universal1(self):
p = subprocess.Popen([sys.executable, "-c", with subprocess.Popen(
'import sys,os;' + SETBINARY + [
'sys.stdout.write("line1\\n");' sys.executable, "-c",
'sys.stdout.flush();' 'import sys,os;' + SETBINARY +
'sys.stdout.write("line2\\r");' 'sys.stdout.write("line1\\n");'
'sys.stdout.flush();' 'sys.stdout.flush();'
'sys.stdout.write("line3\\r\\n");' 'sys.stdout.write("line2\\r");'
'sys.stdout.flush();' 'sys.stdout.flush();'
'sys.stdout.write("line4\\r");' 'sys.stdout.write("line3\\r\\n");'
'sys.stdout.flush();' 'sys.stdout.flush();'
'sys.stdout.write("\\nline5");' 'sys.stdout.write("line4\\r");'
'sys.stdout.flush();' 'sys.stdout.flush();'
'sys.stdout.write("\\nline6");'], 'sys.stdout.write("\\nline5");'
stdout=subprocess.PIPE, 'sys.stdout.flush();'
universal_newlines=1, 'sys.stdout.write("\\nline6");'
bufsize=1) ],
try: stdout=subprocess.PIPE,
universal_newlines=1,
bufsize=1
) as p:
stdout = p.stdout.read() stdout = p.stdout.read()
if python_universal_newlines: if python_universal_newlines:
# Interpreter with universal newline support # Interpreter with universal newline support
...@@ -149,26 +169,27 @@ class Test(greentest.TestCase): ...@@ -149,26 +169,27 @@ class Test(greentest.TestCase):
# Interpreter without universal newline support # Interpreter without universal newline support
self.assertEqual(stdout, self.assertEqual(stdout,
"line1\nline2\rline3\r\nline4\r\nline5\nline6") "line1\nline2\rline3\r\nline4\r\nline5\nline6")
finally:
p.stdout.close()
@greentest.skipOnLibuvOnPyPyOnWin("hangs") @greentest.skipOnLibuvOnPyPyOnWin("hangs")
def test_universal2(self): def test_universal2(self):
p = subprocess.Popen([sys.executable, "-c", with subprocess.Popen(
'import sys,os;' + SETBINARY + [
'sys.stdout.write("line1\\n");' sys.executable, "-c",
'sys.stdout.flush();' 'import sys,os;' + SETBINARY +
'sys.stdout.write("line2\\r");' 'sys.stdout.write("line1\\n");'
'sys.stdout.flush();' 'sys.stdout.flush();'
'sys.stdout.write("line3\\r\\n");' 'sys.stdout.write("line2\\r");'
'sys.stdout.flush();' 'sys.stdout.flush();'
'sys.stdout.write("line4\\r\\nline5");' 'sys.stdout.write("line3\\r\\n");'
'sys.stdout.flush();' 'sys.stdout.flush();'
'sys.stdout.write("\\nline6");'], 'sys.stdout.write("line4\\r\\nline5");'
stdout=subprocess.PIPE, 'sys.stdout.flush();'
universal_newlines=1, 'sys.stdout.write("\\nline6");'
bufsize=1) ],
try: stdout=subprocess.PIPE,
universal_newlines=1,
bufsize=1
) as p:
stdout = p.stdout.read() stdout = p.stdout.read()
if python_universal_newlines: if python_universal_newlines:
# Interpreter with universal newline support # Interpreter with universal newline support
...@@ -183,60 +204,54 @@ class Test(greentest.TestCase): ...@@ -183,60 +204,54 @@ class Test(greentest.TestCase):
# Interpreter without universal newline support # Interpreter without universal newline support
self.assertEqual(stdout, self.assertEqual(stdout,
"line1\nline2\rline3\r\nline4\r\nline5\nline6") "line1\nline2\rline3\r\nline4\r\nline5\nline6")
finally:
p.stdout.close()
@greentest.skipOnWindows("Uses 'grep' command") @greentest.skipOnWindows("Uses 'grep' command")
def test_nonblock_removed(self): def test_nonblock_removed(self):
# see issue #134 # see issue #134
r, w = os.pipe() r, w = os.pipe()
stdin = subprocess.FileObject(r) stdin = subprocess.FileObject(r)
p = subprocess.Popen(['grep', 'text'], stdin=stdin) with subprocess.Popen(['grep', 'text'], stdin=stdin) as p:
try: try:
# Closing one half of the pipe causes Python 3 on OS X to terminate the # Closing one half of the pipe causes Python 3 on OS X to terminate the
# child process; it exits with code 1 and the assert that p.poll is None # child process; it exits with code 1 and the assert that p.poll is None
# fails. Removing the close lets it pass under both Python 3 and 2.7. # fails. Removing the close lets it pass under both Python 3 and 2.7.
# If subprocess.Popen._remove_nonblock_flag is changed to a noop, then # If subprocess.Popen._remove_nonblock_flag is changed to a noop, then
# the test fails (as expected) even with the close removed # the test fails (as expected) even with the close removed
#os.close(w) #os.close(w)
time.sleep(0.1) time.sleep(0.1)
self.assertEqual(p.poll(), None) self.assertEqual(p.poll(), None)
finally: finally:
if p.poll() is None: if p.poll() is None:
p.kill() p.kill()
stdin.close() stdin.close()
os.close(w) os.close(w)
def test_issue148(self): def test_issue148(self):
for _ in range(7): for _ in range(7):
try: with self.assertRaises(OSError) as exc:
subprocess.Popen('this_name_must_not_exist') with subprocess.Popen('this_name_must_not_exist'):
except OSError as ex: pass
if ex.errno != errno.ENOENT:
raise self.assertEqual(exc.exception.errno, errno.ENOENT)
else:
raise AssertionError('must fail with ENOENT')
@greentest.skipOnLibuvOnPyPyOnWin("hangs") @greentest.skipOnLibuvOnPyPyOnWin("hangs")
def test_check_output_keyword_error(self): def test_check_output_keyword_error(self):
try: with self.assertRaises(subprocess.CalledProcessError) as exc: # pylint:disable=no-member
subprocess.check_output([sys.executable, '-c', 'import sys; sys.exit(44)']) subprocess.check_output([sys.executable, '-c', 'import sys; sys.exit(44)'])
except subprocess.CalledProcessError as e: # pylint:disable=no-member
self.assertEqual(e.returncode, 44)
else:
raise AssertionError('must fail with CalledProcessError')
self.assertEqual(exc.exception.returncode, 44)
@greentest.skipOnPy3("The default buffer changed in Py3")
def test_popen_bufsize(self): def test_popen_bufsize(self):
# Test that subprocess has unbuffered output by default # Test that subprocess has unbuffered output by default
# (as the vanilla subprocess module) # (as the vanilla subprocess module)
if PY3: with subprocess.Popen(
# The default changed under python 3. [sys.executable, '-u', '-c',
return 'import sys; sys.stdout.write(sys.stdin.readline())'],
p = subprocess.Popen([sys.executable, '-u', '-c', stdin=subprocess.PIPE, stdout=subprocess.PIPE
'import sys; sys.stdout.write(sys.stdin.readline())'], ) as p:
stdin=subprocess.PIPE, stdout=subprocess.PIPE) p.stdin.write(b'foobar\n')
p.stdin.write(b'foobar\n') r = p.stdout.readline()
r = p.stdout.readline()
self.assertEqual(r, b'foobar\n') self.assertEqual(r, b'foobar\n')
@greentest.ignores_leakcheck @greentest.ignores_leakcheck
...@@ -253,7 +268,6 @@ class Test(greentest.TestCase): ...@@ -253,7 +268,6 @@ class Test(greentest.TestCase):
def fn(): def fn():
with self.assertRaises(TypeError) as exc: with self.assertRaises(TypeError) as exc:
gevent.subprocess.Popen('echo 123', shell=True) gevent.subprocess.Popen('echo 123', shell=True)
raise AssertionError("Should not be able to construct Popen")
ex.append(exc.exception) ex.append(exc.exception)
thread = Thread(target=fn) thread = Thread(target=fn)
...@@ -267,10 +281,12 @@ class Test(greentest.TestCase): ...@@ -267,10 +281,12 @@ class Test(greentest.TestCase):
@greentest.skipOnLibuvOnPyPyOnWin("hangs") @greentest.skipOnLibuvOnPyPyOnWin("hangs")
def __test_no_output(self, kwargs, kind): def __test_no_output(self, kwargs, kind):
proc = subprocess.Popen([sys.executable, '-c', 'pass'], with subprocess.Popen(
stdout=subprocess.PIPE, [sys.executable, '-c', 'pass'],
**kwargs) stdout=subprocess.PIPE,
stdout, stderr = proc.communicate() **kwargs
) as proc:
stdout, stderr = proc.communicate()
self.assertIsInstance(stdout, kind) self.assertIsInstance(stdout, kind)
self.assertIsNone(stderr) self.assertIsNone(stderr)
...@@ -382,7 +398,7 @@ class TestFDs(unittest.TestCase): ...@@ -382,7 +398,7 @@ class TestFDs(unittest.TestCase):
from_path.assert_called_once_with('/proc/self/fd', [7], 42) from_path.assert_called_once_with('/proc/self/fd', [7], 42)
class RunFuncTestCase(greentest.TestCase): class RunFuncTestCase(greentest.TestCase):
# Based on code from python 3.6 # Based on code from python 3.6+
__timeout__ = greentest.LARGE_TIMEOUT __timeout__ = greentest.LARGE_TIMEOUT
...@@ -486,6 +502,17 @@ class RunFuncTestCase(greentest.TestCase): ...@@ -486,6 +502,17 @@ class RunFuncTestCase(greentest.TestCase):
env=newenv) env=newenv)
self.assertEqual(cp.returncode, 33) self.assertEqual(cp.returncode, 33)
# This test _might_ wind up a bit fragile on loaded build+test machines
# as it depends on the timing with wide enough margins for normal situations
# but does assert that it happened "soon enough" to believe the right thing
# happened.
@greentest.skipOnWindows("requires posix like 'sleep' shell command")
def test_run_with_shell_timeout_and_capture_output(self):
#Output capturing after a timeout mustn't hang forever on open filehandles
with self.runs_in_given_time(0.1):
with self.assertRaises(subprocess.TimeoutExpired):
subprocess.run('sleep 3', shell=True, timeout=0.1,
capture_output=True) # New session unspecified.
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment