Commit 73520d57 authored by Giampaolo Rodola's avatar Giampaolo Rodola

Fix #8684: make sched.scheduler class thread-safe

parent a23d65cc
......@@ -27,6 +27,9 @@ scheduler:
.. versionchanged:: 3.3
*timefunc* and *delayfunc* parameters are optional.
.. versionchanged:: 3.3
:class:`scheduler` class can be safely used in multi-threaded
environments.
Example::
......@@ -47,33 +50,6 @@ Example::
From print_time 930343700.273
930343700.276
In multi-threaded environments, the :class:`scheduler` class has limitations
with respect to thread-safety, inability to insert a new task before
the one currently pending in a running scheduler, and holding up the main
thread until the event queue is empty. Instead, the preferred approach
is to use the :class:`threading.Timer` class instead.
Example::
>>> import time
>>> from threading import Timer
>>> def print_time():
... print("From print_time", time.time())
...
>>> def print_some_times():
... print(time.time())
... Timer(5, print_time, ()).start()
... Timer(10, print_time, ()).start()
... time.sleep(11) # sleep while time-delay events execute
... print(time.time())
...
>>> print_some_times()
930343690.257
From print_time 930343695.274
From print_time 930343700.273
930343701.301
.. _scheduler-objects:
Scheduler Objects
......
......@@ -662,6 +662,10 @@ should be used. For example, this will send a ``'HEAD'`` request::
sched
-----
* :class:`~sched.scheduler` class can now be safely used in multi-threaded
environments. (Contributed by Josiah Carlson and Giampaolo Rodolà in
:issue:`8684`)
* *timefunc* and *delayfunct* parameters of :class:`~sched.scheduler` class
constructor are now optional and defaults to :func:`time.time` and
:func:`time.sleep` respectively. (Contributed by Chris Clark in
......
......@@ -30,6 +30,7 @@ has another way to reference private data (besides global variables).
import time
import heapq
import threading
from collections import namedtuple
__all__ = ["scheduler"]
......@@ -48,6 +49,7 @@ class scheduler:
"""Initialize a new instance, passing the time and delay
functions"""
self._queue = []
self._lock = threading.RLock()
self.timefunc = timefunc
self.delayfunc = delayfunc
......@@ -58,9 +60,10 @@ class scheduler:
if necessary.
"""
event = Event(time, priority, action, argument, kwargs)
heapq.heappush(self._queue, event)
return event # The ID
with self._lock:
event = Event(time, priority, action, argument, kwargs)
heapq.heappush(self._queue, event)
return event # The ID
def enter(self, delay, priority, action, argument=[], kwargs={}):
"""A variant that specifies the time as a relative time.
......@@ -68,8 +71,9 @@ class scheduler:
This is actually the more commonly used interface.
"""
time = self.timefunc() + delay
return self.enterabs(time, priority, action, argument, kwargs)
with self._lock:
time = self.timefunc() + delay
return self.enterabs(time, priority, action, argument, kwargs)
def cancel(self, event):
"""Remove an event from the queue.
......@@ -78,12 +82,14 @@ class scheduler:
If the event is not in the queue, this raises ValueError.
"""
self._queue.remove(event)
heapq.heapify(self._queue)
with self._lock:
self._queue.remove(event)
heapq.heapify(self._queue)
def empty(self):
"""Check whether the queue is empty."""
return not self._queue
with self._lock:
return not self._queue
def run(self):
"""Execute events until the queue is empty.
......@@ -108,24 +114,25 @@ class scheduler:
"""
# localize variable access to minimize overhead
# and to improve thread safety
q = self._queue
delayfunc = self.delayfunc
timefunc = self.timefunc
pop = heapq.heappop
while q:
time, priority, action, argument, kwargs = checked_event = q[0]
now = timefunc()
if now < time:
delayfunc(time - now)
else:
event = pop(q)
# Verify that the event was not removed or altered
# by another thread after we last looked at q[0].
if event is checked_event:
action(*argument, **kwargs)
delayfunc(0) # Let other threads run
with self._lock:
q = self._queue
delayfunc = self.delayfunc
timefunc = self.timefunc
pop = heapq.heappop
while q:
time, priority, action, argument, kwargs = checked_event = q[0]
now = timefunc()
if now < time:
delayfunc(time - now)
else:
heapq.heappush(q, event)
event = pop(q)
# Verify that the event was not removed or altered
# by another thread after we last looked at q[0].
if event is checked_event:
action(*argument, **kwargs)
delayfunc(0) # Let other threads run
else:
heapq.heappush(q, event)
@property
def queue(self):
......@@ -138,5 +145,6 @@ class scheduler:
# Use heapq to sort the queue rather than using 'sorted(self._queue)'.
# With heapq, two events scheduled at the same time will show in
# the actual order they would be retrieved.
events = self._queue[:]
return map(heapq.heappop, [events]*len(events))
with self._lock:
events = self._queue[:]
return map(heapq.heappop, [events]*len(events))
......@@ -409,6 +409,9 @@ Core and Builtins
Library
-------
- Issue #8684 sched.scheduler class can be safely used in multi-threaded
environments.
- Alias resource.error to OSError ala PEP 3151.
- Issue #5689: Add support for lzma compression to the tarfile module.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment