Commit f2b9cf4e authored by Serhiy Storchaka's avatar Serhiy Storchaka

Issue #16165: Fix sched.scheduler.run() method was block a scheduler for

other threads.
parent c04957bf
...@@ -128,27 +128,29 @@ class scheduler: ...@@ -128,27 +128,29 @@ class scheduler:
""" """
# localize variable access to minimize overhead # localize variable access to minimize overhead
# and to improve thread safety # and to improve thread safety
with self._lock: lock = self._lock
q = self._queue q = self._queue
delayfunc = self.delayfunc delayfunc = self.delayfunc
timefunc = self.timefunc timefunc = self.timefunc
pop = heapq.heappop pop = heapq.heappop
while q: while True:
time, priority, action, argument, kwargs = checked_event = q[0] with lock:
if not q:
break
time, priority, action, argument, kwargs = q[0]
now = timefunc() now = timefunc()
if now < time: if time > now:
delay = True
else:
delay = False
pop(q)
if delay:
if not blocking: if not blocking:
return time - now return time - now
delayfunc(time - now) delayfunc(time - now)
else: else:
event = pop(q)
# Verify that the event was not removed or altered
# by another thread after we last looked at q[0].
if event is checked_event:
action(*argument, **kwargs) action(*argument, **kwargs)
delayfunc(0) # Let other threads run delayfunc(0) # Let other threads run
else:
heapq.heappush(q, event)
@property @property
def queue(self): def queue(self):
......
...@@ -4,7 +4,10 @@ import sched ...@@ -4,7 +4,10 @@ import sched
import time import time
import unittest import unittest
from test import support from test import support
try:
import threading
except ImportError:
threading = None
class TestCase(unittest.TestCase): class TestCase(unittest.TestCase):
...@@ -26,6 +29,20 @@ class TestCase(unittest.TestCase): ...@@ -26,6 +29,20 @@ class TestCase(unittest.TestCase):
scheduler.run() scheduler.run()
self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_enter_concurrent(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enter(0.03, 1, fun, (0.03,))
t = threading.Thread(target=scheduler.run)
t.start()
for x in [0.05, 0.04, 0.02, 0.01]:
z = scheduler.enter(x, 1, fun, (x,))
scheduler.run()
t.join()
self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
def test_priority(self): def test_priority(self):
l = [] l = []
fun = lambda x: l.append(x) fun = lambda x: l.append(x)
...@@ -50,6 +67,24 @@ class TestCase(unittest.TestCase): ...@@ -50,6 +67,24 @@ class TestCase(unittest.TestCase):
scheduler.run() scheduler.run()
self.assertEqual(l, [0.02, 0.03, 0.04]) self.assertEqual(l, [0.02, 0.03, 0.04])
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_cancel_concurrent(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
now = time.time()
event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
t = threading.Thread(target=scheduler.run)
t.start()
scheduler.cancel(event1)
scheduler.cancel(event5)
t.join()
self.assertEqual(l, [0.02, 0.03, 0.04])
def test_empty(self): def test_empty(self):
l = [] l = []
fun = lambda x: l.append(x) fun = lambda x: l.append(x)
......
...@@ -124,6 +124,9 @@ Core and Builtins ...@@ -124,6 +124,9 @@ Core and Builtins
Library Library
------- -------
- Issue #16165: Fix sched.scheduler.run() method was block a scheduler for
other threads.
- Issue #16641: Fix default values of sched.scheduler.enter arguments were - Issue #16641: Fix default values of sched.scheduler.enter arguments were
modifiable. modifiable.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment