Commit 692130a2 authored by Richard Oudkerk's avatar Richard Oudkerk

Issue #12091: simplify ApplyResult and MapResult with threading.Event

Patch by Charles-François Natali
parent be39cfc9
...@@ -526,32 +526,26 @@ class Pool(object): ...@@ -526,32 +526,26 @@ class Pool(object):
class ApplyResult(object): class ApplyResult(object):
def __init__(self, cache, callback, error_callback): def __init__(self, cache, callback, error_callback):
self._cond = threading.Condition(threading.Lock()) self._event = threading.Event()
self._job = next(job_counter) self._job = next(job_counter)
self._cache = cache self._cache = cache
self._ready = False
self._callback = callback self._callback = callback
self._error_callback = error_callback self._error_callback = error_callback
cache[self._job] = self cache[self._job] = self
def ready(self): def ready(self):
return self._ready return self._event.is_set()
def successful(self): def successful(self):
assert self._ready assert self.ready()
return self._success return self._success
def wait(self, timeout=None): def wait(self, timeout=None):
self._cond.acquire() self._event.wait(timeout)
try:
if not self._ready:
self._cond.wait(timeout)
finally:
self._cond.release()
def get(self, timeout=None): def get(self, timeout=None):
self.wait(timeout) self.wait(timeout)
if not self._ready: if not self.ready():
raise TimeoutError raise TimeoutError
if self._success: if self._success:
return self._value return self._value
...@@ -564,12 +558,7 @@ class ApplyResult(object): ...@@ -564,12 +558,7 @@ class ApplyResult(object):
self._callback(self._value) self._callback(self._value)
if self._error_callback and not self._success: if self._error_callback and not self._success:
self._error_callback(self._value) self._error_callback(self._value)
self._cond.acquire() self._event.set()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job] del self._cache[self._job]
# #
...@@ -586,7 +575,7 @@ class MapResult(ApplyResult): ...@@ -586,7 +575,7 @@ class MapResult(ApplyResult):
self._chunksize = chunksize self._chunksize = chunksize
if chunksize <= 0: if chunksize <= 0:
self._number_left = 0 self._number_left = 0
self._ready = True self._event.set()
else: else:
self._number_left = length//chunksize + bool(length % chunksize) self._number_left = length//chunksize + bool(length % chunksize)
...@@ -599,24 +588,14 @@ class MapResult(ApplyResult): ...@@ -599,24 +588,14 @@ class MapResult(ApplyResult):
if self._callback: if self._callback:
self._callback(self._value) self._callback(self._value)
del self._cache[self._job] del self._cache[self._job]
self._cond.acquire() self._event.set()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
else: else:
self._success = False self._success = False
self._value = result self._value = result
if self._error_callback: if self._error_callback:
self._error_callback(self._value) self._error_callback(self._value)
del self._cache[self._job] del self._cache[self._job]
self._cond.acquire() self._event.set()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
# #
# Class whose instances are returned by `Pool.imap()` # Class whose instances are returned by `Pool.imap()`
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment