Commit e396c363 authored by Charles-François Natali's avatar Charles-François Natali

Merge.

parents 65708cf5 2955a0bf
......@@ -47,8 +47,9 @@ Run an event loop
Stop running the event loop.
Every callback scheduled before :meth:`stop` is called will run.
Callback scheduled after :meth:`stop` is called won't. However, those
callbacks will run if :meth:`run_forever` is called again later.
Callbacks scheduled after :meth:`stop` is called will not run.
However, those callbacks will run if :meth:`run_forever` is called
again later.
.. method:: BaseEventLoop.is_closed()
......@@ -58,13 +59,11 @@ Run an event loop
.. method:: BaseEventLoop.close()
Close the event loop. The loop should not be running.
Close the event loop. The loop must not be running.
This clears the queues and shuts down the executor, but does not wait for
the executor to finish.
The event loop must not be running.
This is idempotent and irreversible. No other methods should be called after
this one.
......
......@@ -459,7 +459,7 @@ The event loop is running twice. The
example to raise an exception if the server is not listening, instead of
having to write a short coroutine to handle the exception and stop the
running loop. At :meth:`~BaseEventLoop.run_until_complete` exit, the loop is
no more running, so there is no need to stop the loop in case of an error.
no longer running, so there is no need to stop the loop in case of an error.
Echo server
-----------
......
......@@ -261,7 +261,7 @@ Example combining a :class:`Future` and a :ref:`coroutine function
print(future.result())
loop.close()
The coroutine function is responsible of the computation (which takes 1 second)
The coroutine function is responsible for the computation (which takes 1 second)
and it stores the result into the future. The
:meth:`~BaseEventLoop.run_until_complete` method waits for the completion of
the future.
......
......@@ -39,7 +39,7 @@ Here is a more detailed list of the package contents:
you absolutely, positively have to use a library that makes blocking
I/O calls.
Table of content:
Table of contents:
.. toctree::
:maxdepth: 3
......@@ -55,6 +55,6 @@ Table of content:
.. seealso::
The :mod:`asyncio` module was designed in the :PEP:`3156`. For a
The :mod:`asyncio` module was designed in :PEP:`3156`. For a
motivational primer on transports and protocols, see :PEP:`3153`.
......@@ -216,6 +216,10 @@ any that have been added to the map during asynchronous service) is closed.
empty bytes object implies that the channel has been closed from the
other end.
Note that :meth:`recv` may raise :exc:`BlockingIOError` , even though
:func:`select.select` or :func:`select.poll` has reported the socket
ready for reading.
.. method:: listen(backlog)
......
......@@ -398,7 +398,7 @@ For example::
print(res.get(timeout=1)) # prints "100"
# make worker sleep for 10 secs
res = pool.apply_async(sleep, 10)
res = pool.apply_async(sleep, [10])
print(res.get(timeout=1)) # raises multiprocessing.TimeoutError
# exiting the 'with'-block has stopped the pool
......
This diff is collapsed.
......@@ -461,7 +461,7 @@ The :mod:`test.support` module defines the following functions:
.. function:: make_bad_fd()
Create an invalid file descriptor by opening and closing a temporary file,
and returning its descripor.
and returning its descriptor.
.. function:: import_module(name, deprecated=False)
......@@ -554,6 +554,21 @@ The :mod:`test.support` module defines the following functions:
run simultaneously, which is a problem for buildbots.
.. function:: load_package_tests(pkg_dir, loader, standard_tests, pattern)
Generic implementation of the :mod:`unittest` ``load_tests`` protocol for
use in test packages. *pkg_dir* is the root directory of the package;
*loader*, *standard_tests*, and *pattern* are the arguments expected by
``load_tests``. In simple cases, the test package's ``__init__.py``
can be the following::
import os
from test.support import load_package_tests
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
The :mod:`test.support` module defines the following classes:
.. class:: TransientResource(exc, **kwargs)
......
......@@ -115,6 +115,8 @@ class async_chat(asyncore.dispatcher):
try:
data = self.recv(self.ac_in_buffer_size)
except BlockingIOError:
return
except OSError as why:
self.handle_error()
return
......
......@@ -270,9 +270,9 @@ class BaseEventLoop(events.AbstractEventLoop):
def stop(self):
"""Stop running the event loop.
Every callback scheduled before stop() is called will run.
Callback scheduled after stop() is called won't. However,
those callbacks will run if run_*() is called again later.
Every callback scheduled before stop() is called will run. Callbacks
scheduled after stop() is called will not run. However, those callbacks
will run if run_forever is called again later.
"""
self.call_soon(_raise_stop_error)
......
......@@ -44,13 +44,9 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
if self._read_fut is not None:
ov = "pending" if self._read_fut.ov.pending else "completed"
info.append('read=%s' % ov)
info.append('read=%s' % self._read_fut)
if self._write_fut is not None:
if self._write_fut.ov.pending:
info.append("write=pending=%s" % self._pending_write)
else:
info.append("write=completed")
info.append("write=%r" % self._write_fut)
if self._buffer:
bufsize = len(self._buffer)
info.append('write_bufsize=%s' % bufsize)
......
......@@ -74,7 +74,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
# event loop running in another thread cannot add a signal
# handler.
signal.set_wakeup_fd(self._csock.fileno())
except ValueError as exc:
except (ValueError, OSError) as exc:
raise RuntimeError(str(exc))
handle = events.Handle(callback, args, self)
......@@ -93,7 +93,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
if not self._signal_handlers:
try:
signal.set_wakeup_fd(-1)
except ValueError as nexc:
except (ValueError, OSError) as nexc:
logger.info('set_wakeup_fd(-1) failed: %s', nexc)
if exc.errno == errno.EINVAL:
......@@ -138,7 +138,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
if not self._signal_handlers:
try:
signal.set_wakeup_fd(-1)
except ValueError as exc:
except (ValueError, OSError) as exc:
logger.info('set_wakeup_fd(-1) failed: %s', exc)
return True
......
......@@ -38,42 +38,85 @@ class _OverlappedFuture(futures.Future):
def __init__(self, ov, *, loop=None):
super().__init__(loop=loop)
self.ov = ov
if self._source_traceback:
del self._source_traceback[-1]
self._ov = ov
def __repr__(self):
info = [self._state.lower()]
if self.ov.pending:
info.append('overlapped=pending')
else:
info.append('overlapped=completed')
if self._ov is not None:
state = 'pending' if self._ov.pending else 'completed'
info.append('overlapped=<%s, %#x>' % (state, self._ov.address))
if self._state == futures._FINISHED:
info.append(self._format_result())
if self._callbacks:
info.append(self._format_callbacks())
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
def cancel(self):
def _cancel_overlapped(self):
if self._ov is None:
return
try:
self.ov.cancel()
except OSError:
pass
self._ov.cancel()
except OSError as exc:
context = {
'message': 'Cancelling an overlapped future failed',
'exception': exc,
'future': self,
}
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
self._ov = None
def cancel(self):
self._cancel_overlapped()
return super().cancel()
def set_exception(self, exception):
super().set_exception(exception)
self._cancel_overlapped()
class _WaitHandleFuture(futures.Future):
"""Subclass of Future which represents a wait handle."""
def __init__(self, wait_handle, *, loop=None):
def __init__(self, handle, wait_handle, *, loop=None):
super().__init__(loop=loop)
self._handle = handle
self._wait_handle = wait_handle
def cancel(self):
super().cancel()
def _poll(self):
# non-blocking wait: use a timeout of 0 millisecond
return (_winapi.WaitForSingleObject(self._handle, 0) ==
_winapi.WAIT_OBJECT_0)
def __repr__(self):
info = [self._state.lower()]
if self._wait_handle:
state = 'pending' if self._poll() else 'completed'
info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle))
info.append('handle=<%#x>' % self._handle)
if self._state == futures._FINISHED:
info.append(self._format_result())
if self._callbacks:
info.append(self._format_callbacks())
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
def _unregister(self):
if self._wait_handle is None:
return
try:
_overlapped.UnregisterWait(self._wait_handle)
except OSError as e:
if e.winerror != _overlapped.ERROR_IO_PENDING:
raise
# ERROR_IO_PENDING is not an error, the wait was unregistered
self._wait_handle = None
def cancel(self):
self._unregister()
return super().cancel()
class PipeServer(object):
......@@ -208,6 +251,11 @@ class IocpProactor:
self._registered = weakref.WeakSet()
self._stopped_serving = weakref.WeakSet()
def __repr__(self):
return ('<%s overlapped#=%s result#=%s>'
% (self.__class__.__name__, len(self._cache),
len(self._results)))
def set_loop(self, loop):
self._loop = loop
......@@ -350,23 +398,19 @@ class IocpProactor:
ov = _overlapped.Overlapped(NULL)
wh = _overlapped.RegisterWaitWithQueue(
handle, self._iocp, ov.address, ms)
f = _WaitHandleFuture(wh, loop=self._loop)
f = _WaitHandleFuture(handle, wh, loop=self._loop)
def finish_wait_for_handle(trans, key, ov):
if not f.cancelled():
try:
_overlapped.UnregisterWait(wh)
except OSError as e:
if e.winerror != _overlapped.ERROR_IO_PENDING:
raise
# Note that this second wait means that we should only use
# this with handles types where a successful wait has no
# effect. So events or processes are all right, but locks
# or semaphores are not. Also note if the handle is
# signalled and then quickly reset, then we may return
# False even though we have not timed out.
return (_winapi.WaitForSingleObject(handle, 0) ==
_winapi.WAIT_OBJECT_0)
try:
return f._poll()
finally:
f._unregister()
self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
return f
......@@ -455,7 +499,7 @@ class IocpProactor:
def close(self):
# Cancel remaining registered operations.
for address, (f, ov, obj, callback) in list(self._cache.items()):
for address, (fut, ov, obj, callback) in list(self._cache.items()):
if obj is None:
# The operation was started with connect_pipe() which
# queues a task to Windows' thread pool. This cannot
......@@ -463,9 +507,17 @@ class IocpProactor:
del self._cache[address]
else:
try:
ov.cancel()
except OSError:
pass
fut.cancel()
except OSError as exc:
if self._loop is not None:
context = {
'message': 'Cancelling a future failed',
'exception': exc,
'future': fut,
}
if fut._source_traceback:
context['source_traceback'] = fut._source_traceback
self._loop.call_exception_handler(context)
while self._cache:
if not self._poll(1):
......@@ -476,6 +528,9 @@ class IocpProactor:
_winapi.CloseHandle(self._iocp)
self._iocp = None
def __del__(self):
self.close()
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
......
......@@ -42,9 +42,9 @@ class PythonAPITestCase(unittest.TestCase):
# This test is unreliable, because it is possible that code in
# unittest changes the refcount of the '42' integer. So, it
# is disabled by default.
@requires("refcount")
@support.refcount_test
def test_PyLong_Long(self):
requires("refcount")
ref42 = grc(42)
pythonapi.PyLong_FromLong.restype = py_object
self.assertEqual(pythonapi.PyLong_FromLong(42), 42)
......
......@@ -38,8 +38,11 @@ class WindowsTestCase(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", 'Windows-specific test')
class FunctionCallTestCase(unittest.TestCase):
@requires("SEH")
@unittest.skipUnless('MSC' in sys.version, "SEH only supported by MSC")
@unittest.skipIf(sys.executable.endswith('_d.exe'),
"SEH not enabled in debug builds")
def test_SEH(self):
requires("SEH")
# Call functions with invalid arguments, and make sure
# that access violations are trapped and raise an
# exception.
......
......@@ -984,18 +984,16 @@ def load(fp, *, fmt=None, use_builtin_types=True, dict_type=dict):
fp.seek(0)
for info in _FORMATS.values():
if info['detect'](header):
p = info['parser'](
use_builtin_types=use_builtin_types,
dict_type=dict_type,
)
P = info['parser']
break
else:
raise InvalidFileException()
else:
p = _FORMATS[fmt]['parser'](use_builtin_types=use_builtin_types)
P = _FORMATS[fmt]['parser']
p = P(use_builtin_types=use_builtin_types, dict_type=dict_type)
return p.parse(fp)
......
......@@ -85,7 +85,7 @@ __all__ = [
"skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
"bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
"requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
"anticipate_failure",
"anticipate_failure", "load_package_tests",
# sys
"is_jython", "check_impl_detail",
# network
......@@ -188,6 +188,25 @@ def anticipate_failure(condition):
return unittest.expectedFailure
return lambda f: f
def load_package_tests(pkg_dir, loader, standard_tests, pattern):
"""Generic load_tests implementation for simple test packages.
Most packages can implement load_tests using this function as follows:
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
"""
if pattern is None:
pattern = "test*"
top_dir = os.path.dirname( # Lib
os.path.dirname( # test
os.path.dirname(__file__))) # support
package_tests = loader.discover(start_dir=pkg_dir,
top_level_dir=top_dir,
pattern=pattern)
standard_tests.addTests(package_tests)
return standard_tests
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
"""Import and return a module, deliberately bypassing sys.modules.
......
......@@ -7,10 +7,12 @@ thread = support.import_module('_thread')
import asynchat
import asyncore
import errno
import socket
import sys
import time
import unittest
import unittest.mock
try:
import threading
except ImportError:
......@@ -273,6 +275,21 @@ class TestAsynchat_WithPoll(TestAsynchat):
usepoll = True
class TestAsynchatMocked(unittest.TestCase):
def test_blockingioerror(self):
# Issue #16133: handle_read() must ignore BlockingIOError
sock = unittest.mock.Mock()
sock.recv.side_effect = BlockingIOError(errno.EAGAIN)
dispatcher = asynchat.async_chat()
dispatcher.set_socket(sock)
self.addCleanup(dispatcher.del_channel)
with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
dispatcher.handle_read()
self.assertFalse(error.called)
class TestHelperFunctions(unittest.TestCase):
def test_find_prefix_at_end(self):
self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
......
import os
import sys
import unittest
from test.support import run_unittest, import_module
from test.support import load_package_tests, import_module
# Skip tests if we don't have threading.
import_module('threading')
# Skip tests if we don't have concurrent.futures.
import_module('concurrent.futures')
def suite():
tests = unittest.TestSuite()
loader = unittest.TestLoader()
for fn in os.listdir(os.path.dirname(__file__)):
if fn.startswith("test") and fn.endswith(".py"):
mod_name = 'test.test_asyncio.' + fn[:-3]
try:
__import__(mod_name)
except unittest.SkipTest:
pass
else:
mod = sys.modules[mod_name]
tests.addTests(loader.loadTestsFromModule(mod))
return tests
def test_main():
run_unittest(suite())
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
from . import test_main
from . import load_tests
import unittest
if __name__ == '__main__':
test_main()
unittest.main()
......@@ -672,6 +672,8 @@ class SelectorTransportTests(test_utils.TestCase):
def test_connection_lost(self):
exc = OSError()
tr = _SelectorTransport(self.loop, self.sock, self.protocol, None)
self.assertIsNotNone(tr._protocol)
self.assertIsNotNone(tr._loop)
tr._call_connection_lost(exc)
self.protocol.connection_lost.assert_called_with(exc)
......@@ -679,8 +681,6 @@ class SelectorTransportTests(test_utils.TestCase):
self.assertIsNone(tr._sock)
self.assertIsNone(tr._protocol)
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
......
......@@ -42,7 +42,7 @@ class SubprocessMixin:
return (exitcode, data)
task = run(b'some data')
task = asyncio.wait_for(task, 10.0, loop=self.loop)
task = asyncio.wait_for(task, 60.0, loop=self.loop)
exitcode, stdout = self.loop.run_until_complete(task)
self.assertEqual(exitcode, 0)
self.assertEqual(stdout, b'some data')
......@@ -61,7 +61,7 @@ class SubprocessMixin:
return proc.returncode, stdout
task = run(b'some data')
task = asyncio.wait_for(task, 10.0, loop=self.loop)
task = asyncio.wait_for(task, 60.0, loop=self.loop)
exitcode, stdout = self.loop.run_until_complete(task)
self.assertEqual(exitcode, 0)
self.assertEqual(stdout, b'some data')
......
......@@ -435,6 +435,8 @@ class UnixReadPipeTransportTests(test_utils.TestCase):
def test__call_connection_lost(self):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
self.assertIsNotNone(tr._protocol)
self.assertIsNotNone(tr._loop)
err = None
tr._call_connection_lost(err)
......@@ -442,13 +444,13 @@ class UnixReadPipeTransportTests(test_utils.TestCase):
self.pipe.close.assert_called_with()
self.assertIsNone(tr._protocol)
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
def test__call_connection_lost_with_err(self):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
self.assertIsNotNone(tr._protocol)
self.assertIsNotNone(tr._loop)
err = OSError()
tr._call_connection_lost(err)
......@@ -456,9 +458,6 @@ class UnixReadPipeTransportTests(test_utils.TestCase):
self.pipe.close.assert_called_with()
self.assertIsNone(tr._protocol)
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
......@@ -717,6 +716,8 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
def test__call_connection_lost(self):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
self.assertIsNotNone(tr._protocol)
self.assertIsNotNone(tr._loop)
err = None
tr._call_connection_lost(err)
......@@ -724,13 +725,13 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
self.pipe.close.assert_called_with()
self.assertIsNone(tr._protocol)
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
def test__call_connection_lost_with_err(self):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
self.assertIsNotNone(tr._protocol)
self.assertIsNotNone(tr._loop)
err = OSError()
tr._call_connection_lost(err)
......@@ -738,8 +739,6 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
self.pipe.close.assert_called_with()
self.assertIsNone(tr._protocol)
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
def test_close(self):
......
......@@ -94,38 +94,48 @@ class ProactorTests(test_utils.TestCase):
event = _overlapped.CreateEvent(None, True, False, None)
self.addCleanup(_winapi.CloseHandle, event)
# Wait for unset event with 0.2s timeout;
# Wait for unset event with 0.5s timeout;
# result should be False at timeout
f = self.loop._proactor.wait_for_handle(event, 0.2)
fut = self.loop._proactor.wait_for_handle(event, 0.5)
start = self.loop.time()
self.loop.run_until_complete(f)
self.loop.run_until_complete(fut)
elapsed = self.loop.time() - start
self.assertFalse(f.result())
self.assertTrue(0.18 < elapsed < 0.9, elapsed)
self.assertFalse(fut.result())
self.assertTrue(0.48 < elapsed < 0.9, elapsed)
_overlapped.SetEvent(event)
# Wait for for set event;
# result should be True immediately
f = self.loop._proactor.wait_for_handle(event, 10)
fut = self.loop._proactor.wait_for_handle(event, 10)
start = self.loop.time()
self.loop.run_until_complete(f)
self.loop.run_until_complete(fut)
elapsed = self.loop.time() - start
self.assertTrue(f.result())
self.assertTrue(0 <= elapsed < 0.1, elapsed)
self.assertTrue(fut.result())
self.assertTrue(0 <= elapsed < 0.3, elapsed)
_overlapped.ResetEvent(event)
# Tulip issue #195: cancelling a done _WaitHandleFuture must not crash
fut.cancel()
def test_wait_for_handle_cancel(self):
event = _overlapped.CreateEvent(None, True, False, None)
self.addCleanup(_winapi.CloseHandle, event)
# Wait for unset event with a cancelled future;
# CancelledError should be raised immediately
f = self.loop._proactor.wait_for_handle(event, 10)
f.cancel()
fut = self.loop._proactor.wait_for_handle(event, 10)
fut.cancel()
start = self.loop.time()
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(f)
self.loop.run_until_complete(fut)
elapsed = self.loop.time() - start
self.assertTrue(0 <= elapsed < 0.1, elapsed)
# Tulip issue #195: cancelling a _WaitHandleFuture twice must not crash
fut = self.loop._proactor.wait_for_handle(event)
fut.cancel()
fut.cancel()
if __name__ == '__main__':
unittest.main()
import os
import sys
import unittest
import test.support
import collections
import email
from email.message import Message
from email._policybase import compat32
from test.support import load_package_tests
from test.test_email import __file__ as landmark
# Run all tests in package for '-m unittest test.test_email'
def load_tests(loader, standard_tests, pattern):
this_dir = os.path.dirname(__file__)
if pattern is None:
pattern = "test*"
package_tests = loader.discover(start_dir=this_dir, pattern=pattern)
standard_tests.addTests(package_tests)
return standard_tests
# used by regrtest and __main__.
def test_main():
here = os.path.dirname(__file__)
# Unittest mucks with the path, so we have to save and restore
# it to keep regrtest happy.
savepath = sys.path[:]
test.support._run_suite(unittest.defaultTestLoader.discover(here))
sys.path[:] = savepath
# Load all tests in package
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
# helper code used by a number of test modules.
......
from test.test_email import test_main
from test.test_email import load_tests
import unittest
test_main()
unittest.main()
......@@ -77,7 +77,7 @@ class GettextBaseTest(unittest.TestCase):
def tearDown(self):
self.env.__exit__()
del self.env
shutil.rmtree(os.path.split(LOCALEDIR)[0])
support.rmtree(os.path.split(LOCALEDIR)[0])
class GettextTestCase1(GettextBaseTest):
......
import os
import sys
from test import support
import unittest
from test.support import load_package_tests
def test_suite(package=__package__, directory=os.path.dirname(__file__)):
suite = unittest.TestSuite()
for name in os.listdir(directory):
if name.startswith(('.', '__')):
continue
path = os.path.join(directory, name)
if (os.path.isfile(path) and name.startswith('test_') and
name.endswith('.py')):
submodule_name = os.path.splitext(name)[0]
module_name = "{0}.{1}".format(package, submodule_name)
__import__(module_name, level=0)
module_tests = unittest.findTestCases(sys.modules[module_name])
suite.addTest(module_tests)
elif os.path.isdir(path):
package_name = "{0}.{1}".format(package, name)
__import__(package_name, level=0)
package_tests = getattr(sys.modules[package_name], 'test_suite')()
suite.addTest(package_tests)
else:
continue
return suite
def test_main():
start_dir = os.path.dirname(__file__)
top_dir = os.path.dirname(os.path.dirname(start_dir))
test_loader = unittest.TestLoader()
support.run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir))
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
"""Run importlib's test suite.
from . import load_tests
import unittest
Specifying the ``--builtin`` flag will run tests, where applicable, with
builtins.__import__ instead of importlib.__import__.
"""
if __name__ == '__main__':
from . import test_main
test_main()
unittest.main()
from .. import test_suite
import os
from test.support import load_package_tests
def test_suite():
directory = os.path.dirname(__file__)
return test_suite('importlib.test.builtin', directory)
if __name__ == '__main__':
from test.support import run_unittest
run_unittest(test_suite())
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
from . import load_tests
import unittest
unittest.main()
from .. import test_suite
import os.path
import unittest
import os
from test.support import load_package_tests
def test_suite():
directory = os.path.dirname(__file__)
return test_suite('importlib.test.extension', directory)
if __name__ == '__main__':
from test.support import run_unittest
run_unittest(test_suite())
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
from . import load_tests
import unittest
unittest.main()
from .. import test_suite
import os.path
import unittest
import os
from test.support import load_package_tests
def test_suite():
directory = os.path.dirname(__file__)
return test_suite('importlib.test.frozen', directory)
if __name__ == '__main__':
from test.support import run_unittest
run_unittest(test_suite())
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
from . import load_tests
import unittest
unittest.main()
from .. import test_suite
import os.path
import unittest
import os
from test.support import load_package_tests
def test_suite():
directory = os.path.dirname(__file__)
return test_suite('importlib.test.import_', directory)
if __name__ == '__main__':
from test.support import run_unittest
run_unittest(test_suite())
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
from . import load_tests
import unittest
unittest.main()
from .. import test_suite
import os.path
import unittest
import os
from test.support import load_package_tests
def test_suite():
directory = os.path.dirname(__file__)
return test.test_suite('importlib.test.source', directory)
if __name__ == '__main__':
from test.support import run_unittest
run_unittest(test_suite())
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
from . import load_tests
import unittest
unittest.main()
......@@ -42,23 +42,12 @@ class TestCTest(CTest):
'_json')
here = os.path.dirname(__file__)
def load_tests(*args):
suite = additional_tests()
loader = unittest.TestLoader()
for fn in os.listdir(here):
if fn.startswith("test") and fn.endswith(".py"):
modname = "test.test_json." + fn[:-3]
__import__(modname)
module = sys.modules[modname]
suite.addTests(loader.loadTestsFromModule(module))
return suite
def additional_tests():
def load_tests(loader, _, pattern):
suite = unittest.TestSuite()
for mod in (json, json.encoder, json.decoder):
suite.addTest(doctest.DocTestSuite(mod))
suite.addTest(TestPyTest('test_pyjson'))
suite.addTest(TestCTest('test_cjson'))
return suite
pkg_dir = os.path.dirname(__file__)
return support.load_package_tests(pkg_dir, loader, suite, pattern)
......@@ -207,6 +207,9 @@ class TestPlistlib(unittest.TestCase):
for fmt in ALL_FORMATS:
with self.subTest(fmt=fmt):
pl = self._create(fmt=fmt)
pl2 = plistlib.loads(TESTDATA[fmt], fmt=fmt)
self.assertEqual(dict(pl), dict(pl2),
"generated data was not identical to Apple's output")
pl2 = plistlib.loads(TESTDATA[fmt])
self.assertEqual(dict(pl), dict(pl2),
"generated data was not identical to Apple's output")
......@@ -217,6 +220,8 @@ class TestPlistlib(unittest.TestCase):
b = BytesIO()
pl = self._create(fmt=fmt)
plistlib.dump(pl, b, fmt=fmt)
pl2 = plistlib.load(BytesIO(b.getvalue()), fmt=fmt)
self.assertEqual(dict(pl), dict(pl2))
pl2 = plistlib.load(BytesIO(b.getvalue()))
self.assertEqual(dict(pl), dict(pl2))
......
"""
Very minimal unittests for parts of the readline module.
These tests were added to check that the libedit emulation on OSX and
the "real" readline have the same interface for history manipulation. That's
why the tests cover only a small subset of the interface.
"""
import os
import unittest
from test.support import run_unittest, import_module
from test.script_helper import assert_python_ok
# Skip tests if there is no readline module
readline = import_module('readline')
class TestHistoryManipulation (unittest.TestCase):
"""
These tests were added to check that the libedit emulation on OSX and the
"real" readline have the same interface for history manipulation. That's
why the tests cover only a small subset of the interface.
"""
@unittest.skipIf(not hasattr(readline, 'clear_history'),
"The history update test cannot be run because the "
......@@ -40,8 +43,18 @@ class TestHistoryManipulation (unittest.TestCase):
self.assertEqual(readline.get_current_history_length(), 1)
class TestReadline(unittest.TestCase):
def test_init(self):
# Issue #19884: Ensure that the ANSI sequence "\033[1034h" is not
# written into stdout when the readline module is imported and stdout
# is redirected to a pipe.
rc, stdout, stderr = assert_python_ok('-c', 'import readline',
TERM='xterm-256color')
self.assertEqual(stdout, b'')
def test_main():
run_unittest(TestHistoryManipulation)
run_unittest(TestHistoryManipulation, TestReadline)
if __name__ == "__main__":
test_main()
......@@ -21,11 +21,5 @@ def import_tool(toolname):
with support.DirsOnSysPath(scriptsdir):
return importlib.import_module(toolname)
def load_tests(loader, standard_tests, pattern):
this_dir = os.path.dirname(__file__)
if pattern is None:
pattern = "test*"
with support.DirsOnSysPath():
package_tests = loader.discover(start_dir=this_dir, pattern=pattern)
standard_tests.addTests(package_tests)
return standard_tests
def load_tests(*args):
return support.load_package_tests(os.path.dirname(__file__), *args)
......@@ -421,7 +421,10 @@ class Misc:
+ _flatten(args) + _flatten(list(kw.items())))
def tk_menuBar(self, *args):
"""Do not use. Needed in Tk 3.6 and earlier."""
pass # obsolete since Tk 4.0
# obsolete since Tk 4.0
import warnings
warnings.warn('tk_menuBar() does nothing and will be removed in 3.6',
DeprecationWarning, stacklevel=2)
def wait_variable(self, name='PY_VAR'):
"""Wait until the variable is modified.
......@@ -2674,7 +2677,11 @@ class Menu(Widget):
selectcolor, takefocus, tearoff, tearoffcommand, title, type."""
Widget.__init__(self, master, 'menu', cnf, kw)
def tk_bindForTraversal(self):
pass # obsolete since Tk 4.0
# obsolete since Tk 4.0
import warnings
warnings.warn('tk_bindForTraversal() does nothing and '
'will be removed in 3.6',
DeprecationWarning, stacklevel=2)
def tk_mbPost(self):
self.tk.call('tk_mbPost', self._w)
def tk_mbUnpost(self):
......
......@@ -916,6 +916,24 @@ class ScrollbarTest(AbstractWidgetTest, unittest.TestCase):
self.checkEnumParam(widget, 'orient', 'vertical', 'horizontal',
errmsg='bad orientation "{}": must be vertical or horizontal')
def test_activate(self):
sb = self.create()
for e in ('arrow1', 'slider', 'arrow2'):
sb.activate(e)
sb.activate('')
self.assertRaises(TypeError, sb.activate)
self.assertRaises(TypeError, sb.activate, 'arrow1', 'arrow2')
def test_set(self):
sb = self.create()
sb.set(0.2, 0.4)
self.assertEqual(sb.get(), (0.2, 0.4))
self.assertRaises(TclError, sb.set, 'abc', 'def')
self.assertRaises(TclError, sb.set, 0.6, 'def')
self.assertRaises(TclError, sb.set, 0.6, None)
self.assertRaises(TclError, sb.set, 0.6)
self.assertRaises(TclError, sb.set, 0.6, 0.7, 0.8)
@add_standard_options(StandardOptionsTests)
class PanedWindowTest(AbstractWidgetTest, unittest.TestCase):
......
This diff is collapsed.
......@@ -662,6 +662,7 @@ Kurt B. Kaiser
Tamito Kajiyama
Jan Kaliszewski
Peter van Kampen
Jan Kanis
Rafe Kaplan
Jacob Kaplan-Moss
Janne Karila
......@@ -997,7 +998,6 @@ Mike Pall
Todd R. Palmer
Juan David Ibáñez Palomar
Jan Palus
Martin Panter
Mathias Panzenböck
M. Papillon
Peter Parente
......
......@@ -27,8 +27,19 @@ Core and Builtins
Library
-------
- Issue #16133: The asynchat.async_chat.handle_read() method now ignores
BlockingIOError exceptions.
- Issue #19884: readline: Disable the meta modifier key if stdout is not
a terminal to not write the ANSI sequence "\033[1034h" into stdout. This
sequence is used on some terminal (ex: TERM=xterm-256color") to enable
support of 8 bit characters.
- Issue #21888: plistlib's load() and loads() now work if the fmt parameter is
specified.
- Issue #21044: tarfile.open() now handles fileobj with an integer 'name'
attribute. Based on patch by Martin Panter.
attribute. Based on patch by Antoine Pietri.
- Issue #21867: Prevent turtle crash due to invalid undo buffer size.
......@@ -206,6 +217,10 @@ IDLE
Tests
-----
- Issue #22002: Added ``load_package_tests`` function to test.support and used
it to implement/augment test discovery in test_asyncio, test_email,
test_importlib, test_json, and test_tools.
- Issue #21976: Fix test_ssl to accept LibreSSL version strings. Thanks
to William Orr.
......
......@@ -1019,6 +1019,21 @@ setup_readline(readlinestate *mod_state)
mod_state->begidx = PyLong_FromLong(0L);
mod_state->endidx = PyLong_FromLong(0L);
#ifndef __APPLE__
if (!isatty(STDOUT_FILENO)) {
/* Issue #19884: stdout is no a terminal. Disable meta modifier
keys to not write the ANSI sequence "\033[1034h" into stdout. On
terminals supporting 8 bit characters like TERM=xterm-256color
(which is now the default Fedora since Fedora 18), the meta key is
used to enable support of 8 bit characters (ANSI sequence
"\033[1034h").
With libedit, this call makes readline() crash. */
rl_variable_bind ("enable-meta-key", "off");
}
#endif
/* Initialize (allows .inputrc to override)
*
* XXX: A bug in the readline-2.2 library causes a memory leak
......
bits shared by the stringobject and unicodeobject implementations (and
bits shared by the bytesobject and unicodeobject implementations (and
possibly other modules, in a not too distant future).
the stuff in here is included into relevant places; see the individual
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment