Commit 8f404296 authored by Yury Selivanov's avatar Yury Selivanov Committed by GitHub

bpo-33792: Add selector and proactor windows policies (GH-7487)

parent 52698c7a
......@@ -733,6 +733,10 @@ include:
* Exceptions occurring in cancelled tasks are no longer logged.
(Contributed by Yury Selivanov in :issue:`30508`.)
* New ``WindowsSelectorEventLoopPolicy`` and
``WindowsProactorEventLoopPolicy`` classes.
(Contributed by Yury Selivanov in :issue:`33792`.)
Several ``asyncio`` APIs have been
:ref:`deprecated <whatsnew37-asyncio-deprecated>`.
......
......@@ -21,7 +21,8 @@ from .log import logger
__all__ = (
'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
'DefaultEventLoopPolicy',
'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
'WindowsProactorEventLoopPolicy',
)
......@@ -801,8 +802,12 @@ class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
SelectorEventLoop = _WindowsSelectorEventLoop
class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory = SelectorEventLoop
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory = ProactorEventLoop
DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
......@@ -166,5 +166,36 @@ class ProactorTests(test_utils.TestCase):
fut.cancel()
class WinPolicyTests(test_utils.TestCase):
def test_selector_win_policy(self):
async def main():
self.assertIsInstance(
asyncio.get_running_loop(),
asyncio.SelectorEventLoop)
old_policy = asyncio.get_event_loop_policy()
try:
asyncio.set_event_loop_policy(
asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
finally:
asyncio.set_event_loop_policy(old_policy)
def test_proactor_win_policy(self):
async def main():
self.assertIsInstance(
asyncio.get_running_loop(),
asyncio.ProactorEventLoop)
old_policy = asyncio.get_event_loop_policy()
try:
asyncio.set_event_loop_policy(
asyncio.WindowsProactorEventLoopPolicy())
asyncio.run(main())
finally:
asyncio.set_event_loop_policy(old_policy)
if __name__ == '__main__':
unittest.main()
Add asyncio.WindowsSelectorEventLoopPolicy and
asyncio.WindowsProactorEventLoopPolicy.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment