Commit 2d9e5a62 authored by Jason Madden's avatar Jason Madden

documentation [skip ci]

parent 458c6655
...@@ -40,7 +40,8 @@ if not os.path.exists('changelog.rst') and os.path.exists('../changelog.rst'): ...@@ -40,7 +40,8 @@ if not os.path.exists('changelog.rst') and os.path.exists('../changelog.rst'):
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.coverage', 'sphinx.ext.intersphinx', 'mysphinxext', extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.coverage', 'sphinx.ext.intersphinx', 'mysphinxext',
'sphinx.ext.extlinks'] 'sphinx.ext.extlinks']
intersphinx_mapping = {'http://docs.python.org/': None} intersphinx_mapping = {'http://docs.python.org/': None,
'https://greenlet.readthedocs.org/en/latest/': None}
extlinks = {'issue': ('https://github.com/gevent/gevent/issues/%s', extlinks = {'issue': ('https://github.com/gevent/gevent/issues/%s',
'issue #'), 'issue #'),
......
...@@ -9,4 +9,6 @@ ...@@ -9,4 +9,6 @@
.. autofunction:: get_hub .. autofunction:: get_hub
.. autofunction:: reinit
.. autoclass:: Waiter .. autoclass:: Waiter
...@@ -95,7 +95,11 @@ Spawn helpers ...@@ -95,7 +95,11 @@ Spawn helpers
Useful general functions Useful general functions
------------------------ ------------------------
.. autofunction:: getcurrent .. function:: getcurrent()
Return the currently executing greenlet (the one that called this
function). Note that this may be an instance of :class:`Greenlet`
or :class:`greenlet.greenlet`.
.. autofunction:: sleep .. autofunction:: sleep
......
...@@ -2,7 +2,18 @@ ...@@ -2,7 +2,18 @@
:mod:`gevent.threadpool` :mod:`gevent.threadpool`
======================== ========================
.. automodule:: gevent.threadpool .. currentmodule:: gevent.threadpool
:members:
:undoc-members: .. autoclass:: ThreadPool
:inherited-members: :members: imap, imap_unordered, map, map_async, apply_async, kill,
join, spawn
.. method:: apply(func, args=None, kwds=None)
Rough equivalent of the :func:`apply` builtin, blocking until
the result is ready and returning it.
.. warning:: As implemented, attempting to use
:meth:`Threadpool.appy` from inside another function that
was itself spawned in a threadpool (any threadpool) will
lead to the hub throwing LoopExit.
...@@ -28,3 +28,4 @@ API reference ...@@ -28,3 +28,4 @@ API reference
gevent.thread gevent.thread
gevent.util gevent.util
gevent.wsgi gevent.wsgi
gevent.hub
...@@ -382,7 +382,7 @@ class Greenlet(greenlet): ...@@ -382,7 +382,7 @@ class Greenlet(greenlet):
Depending on what this greenlet is executing and the state of the event loop, Depending on what this greenlet is executing and the state of the event loop,
the exception may or may not be raised immediately when this greenlet resumes the exception may or may not be raised immediately when this greenlet resumes
execution. It may be raised an a subsequent green call, or, if this greenlet execution. It may be raised on a subsequent green call, or, if this greenlet
exits before making such a call, it may not be raised at all. As of 1.1, an example exits before making such a call, it may not be raised at all. As of 1.1, an example
where the exception is raised later is if this greenlet had called ``sleep(0)``; an where the exception is raised later is if this greenlet had called ``sleep(0)``; an
example where the exception is raised immediately is if this greenlet had called ``sleep(0.1)``. example where the exception is raised immediately is if this greenlet had called ``sleep(0.1)``.
......
...@@ -115,10 +115,11 @@ def kill(greenlet, exception=GreenletExit): ...@@ -115,10 +115,11 @@ def kill(greenlet, exception=GreenletExit):
.. note:: .. note::
The method :meth:`gevent.Greenlet.kill` method does the same and The method :meth:`Greenlet.kill` method does the same and
more (and the same caveats listed there apply here). However, the MAIN more (and the same caveats listed there apply here). However, the MAIN
greenlet - the one that exists initially - does not have a greenlet - the one that exists initially - does not have a
``kill()`` method so you have to use this function. ``kill()`` method, and neither do any created with :func:`spawn_raw`,
so you have to use this function.
.. versionchanged:: 1.1a2 .. versionchanged:: 1.1a2
If the ``greenlet`` has a ``kill`` method, calls it. This prevents a If the ``greenlet`` has a ``kill`` method, calls it. This prevents a
...@@ -178,7 +179,7 @@ class signal(object): ...@@ -178,7 +179,7 @@ class signal(object):
def reinit(): def reinit():
""" """
Prepare the gevent hub to run in a new process. Prepare the gevent hub to run in a new (forked) process.
This should be called *immediately* after :func:`os.fork` in the This should be called *immediately* after :func:`os.fork` in the
child process. This is done automatically by child process. This is done automatically by
...@@ -191,7 +192,15 @@ def reinit(): ...@@ -191,7 +192,15 @@ def reinit():
.. note:: Registered fork watchers may or may not run before .. note:: Registered fork watchers may or may not run before
this function (and thus ``gevent.os.fork``) return. If they have this function (and thus ``gevent.os.fork``) return. If they have
not run, they will run "soon", after an iteration of the event loop. not run, they will run "soon", after an iteration of the event loop.
You can force this by inserting a few small (but non-zero) calls to :func:`sleep`. You can force this by inserting a few small (but non-zero) calls to :func:`sleep`
after fork returns. (As of gevent 1.1 and before, fork watchers will
not have run, but this may change in the future.)
.. note:: This function may be removed in a future major release
if the fork process can be more smoothly managed.
.. warning:: See remarks in :func:`gevent.os.fork` about greenlets
and libev watchers in the child process.
""" """
# The loop reinit function in turn calls libev's ev_loop_fork # The loop reinit function in turn calls libev's ev_loop_fork
# function. # function.
......
""" """
This module provides cooperative versions of os.read() and os.write(). This module provides cooperative versions of os.read() and os.write().
On Posix platforms this uses non-blocking IO, on Windows a threadpool On Posix platforms this uses non-blocking IO, on Windows a threadpool
is used. is used.
""" """
...@@ -33,6 +34,9 @@ if fcntl: ...@@ -33,6 +34,9 @@ if fcntl:
__extensions__ += ['make_nonblocking', 'nb_read', 'nb_write'] __extensions__ += ['make_nonblocking', 'nb_read', 'nb_write']
def make_nonblocking(fd): def make_nonblocking(fd):
"""Put the file descriptor *fd* into non-blocking mode if possible.
:return: A boolean value that evaluates to True if successful."""
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
if not bool(flags & os.O_NONBLOCK): if not bool(flags & os.O_NONBLOCK):
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
...@@ -81,15 +85,21 @@ if fcntl: ...@@ -81,15 +85,21 @@ if fcntl:
def tp_read(fd, n): def tp_read(fd, n):
"""Read up to `n` bytes from file descriptor `fd`. Return a string """Read up to *n* bytes from file descriptor *fd*. Return a string
containing the bytes read. If end-of-file is reached, an empty string containing the bytes read. If end-of-file is reached, an empty string
is returned.""" is returned.
Reading is done using the threadpool.
"""
return get_hub().threadpool.apply(_read, (fd, n)) return get_hub().threadpool.apply(_read, (fd, n))
def tp_write(fd, buf): def tp_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the """Write bytes from buffer *buf* to file descriptor *fd*. Return the
number of bytes written.""" number of bytes written.
Writing is done using the threadpool.
"""
return get_hub().threadpool.apply(_write, (fd, buf)) return get_hub().threadpool.apply(_write, (fd, buf))
...@@ -170,7 +180,15 @@ if hasattr(os, 'fork'): ...@@ -170,7 +180,15 @@ if hasattr(os, 'fork'):
Fork a child process and start a child watcher for it in the parent process. Fork a child process and start a child watcher for it in the parent process.
This call cooperates with the :func:`gevent.os.waitpid` to enable cooperatively waiting This call cooperates with the :func:`gevent.os.waitpid` to enable cooperatively waiting
for children to finish. for children to finish. When monkey-patching, these functions are patched in as
:func:`os.fork` and :func:`os.waitpid`, respectively.
In the child process, this function calls :func:`gevent.hub.reinit` before returning.
.. warning:: Forking a process that uses greenlets does not eliminate all non-running
greenlets. Any that were scheduled in the hub of the forking thread in the parent
remain scheduled in the child; compare this to how normal threads operate. (This behaviour
may change is a subsequent major release.)
:keyword callback: If given, a callable that will be called with the child watcher :keyword callback: If given, a callable that will be called with the child watcher
when the child finishes. when the child finishes.
......
...@@ -208,6 +208,11 @@ class GroupMappingMixin(object): ...@@ -208,6 +208,11 @@ class GroupMappingMixin(object):
return self.spawn(func, *args, **kwds).get() return self.spawn(func, *args, **kwds).get()
def map(self, func, iterable): def map(self, func, iterable):
"""Return a list made by applying the *func* to each element of
the iterable.
.. seealso:: :meth:`imap`
"""
return list(self.imap(func, iterable)) return list(self.imap(func, iterable))
def map_cb(self, func, iterable, callback=None): def map_cb(self, func, iterable, callback=None):
......
...@@ -97,6 +97,7 @@ class ThreadPool(GroupMappingMixin): ...@@ -97,6 +97,7 @@ class ThreadPool(GroupMappingMixin):
self._init(self._maxsize) self._init(self._maxsize)
def join(self): def join(self):
"""Waits until all outstanding tasks have been completed."""
delay = 0.0005 delay = 0.0005
while self.task_queue.unfinished_tasks > 0: while self.task_queue.unfinished_tasks > 0:
sleep(delay) sleep(delay)
...@@ -144,22 +145,33 @@ class ThreadPool(GroupMappingMixin): ...@@ -144,22 +145,33 @@ class ThreadPool(GroupMappingMixin):
raise raise
def spawn(self, func, *args, **kwargs): def spawn(self, func, *args, **kwargs):
"""
Add a new task to the threadpool that will run ``func(*args, **kwargs)``.
Waits until a slot is available. Creates a new thread if necessary.
:return: A :class:`gevent.event.AsyncResult`.
"""
while True: while True:
semaphore = self._semaphore semaphore = self._semaphore
semaphore.acquire() semaphore.acquire()
if semaphore is self._semaphore: if semaphore is self._semaphore:
break break
thread_result = None
try: try:
task_queue = self.task_queue task_queue = self.task_queue
result = AsyncResult() result = AsyncResult()
thread_result = ThreadResult(result, hub=self.hub) # XXX We're calling the semaphore release function in the hub, otherwise
# we get LoopExit (why?). Previously it was done with a rawlink on the
# AsyncResult and the comment that it is "competing for order with get(); this is not
# good, just make ThreadResult release the semaphore before doing anything else"
thread_result = ThreadResult(result, hub=self.hub, call_when_ready=semaphore.release)
task_queue.put((func, args, kwargs, thread_result)) task_queue.put((func, args, kwargs, thread_result))
self.adjust() self.adjust()
# rawlink() must be the last call
result.rawlink(lambda *args: self._semaphore.release())
# XXX this _semaphore.release() is competing for order with get()
# XXX this is not good, just make ThreadResult release the semaphore before doing anything else
except: except:
if thread_result is not None:
thread_result.destroy()
semaphore.release() semaphore.release()
raise raise
return result return result
...@@ -210,6 +222,10 @@ class ThreadPool(GroupMappingMixin): ...@@ -210,6 +222,10 @@ class ThreadPool(GroupMappingMixin):
self._decrease_size() self._decrease_size()
def apply_e(self, expected_errors, function, args=None, kwargs=None): def apply_e(self, expected_errors, function, args=None, kwargs=None):
"""
.. deprecated:: 1.1a2
Identical to :meth:`apply`; the ``expected_errors`` argument is ignored.
"""
# Deprecated but never documented. In the past, before # Deprecated but never documented. In the past, before
# self.apply() allowed all errors to be raised to the caller, # self.apply() allowed all errors to be raised to the caller,
# expected_errors allowed a caller to specify a set of errors # expected_errors allowed a caller to specify a set of errors
...@@ -233,8 +249,9 @@ class ThreadPool(GroupMappingMixin): ...@@ -233,8 +249,9 @@ class ThreadPool(GroupMappingMixin):
class ThreadResult(object): class ThreadResult(object):
exc_info = () exc_info = ()
_call_when_ready = None
def __init__(self, receiver, hub=None): def __init__(self, receiver, hub=None, call_when_ready=None):
if hub is None: if hub is None:
hub = get_hub() hub = get_hub()
self.receiver = receiver self.receiver = receiver
...@@ -243,6 +260,8 @@ class ThreadResult(object): ...@@ -243,6 +260,8 @@ class ThreadResult(object):
self.context = None self.context = None
self.async = hub.loop.async() self.async = hub.loop.async()
self.async.start(self._on_async) self.async.start(self._on_async)
if call_when_ready:
self._call_when_ready = call_when_ready
@property @property
def exception(self): def exception(self):
...@@ -250,12 +269,18 @@ class ThreadResult(object): ...@@ -250,12 +269,18 @@ class ThreadResult(object):
def _on_async(self): def _on_async(self):
self.async.stop() self.async.stop()
if self._call_when_ready:
# Typically this is pool.semaphore.release and we have to
# call this in the Hub; if we don't we get the dreaded
# LoopExit (XXX: Why?)
self._call_when_ready()
try: try:
if self.exc_info: if self.exc_info:
self.hub.handle_error(self.context, *self.exc_info) self.hub.handle_error(self.context, *self.exc_info)
self.context = None self.context = None
self.async = None self.async = None
self.hub = None self.hub = None
self._call_when_ready = None
if self.receiver is not None: if self.receiver is not None:
self.receiver(self) self.receiver(self)
finally: finally:
...@@ -264,14 +289,27 @@ class ThreadResult(object): ...@@ -264,14 +289,27 @@ class ThreadResult(object):
if self.exc_info: if self.exc_info:
self.exc_info = (self.exc_info[0], self.exc_info[1], None) self.exc_info = (self.exc_info[0], self.exc_info[1], None)
def destroy(self):
if self.async is not None:
self.async.stop()
self.async = None
self.context = None
self.hub = None
self._call_when_ready = None
self.receiver = None
def _ready(self):
if self.async is not None:
self.async.send()
def set(self, value): def set(self, value):
self.value = value self.value = value
self.async.send() self._ready()
def handle_error(self, context, exc_info): def handle_error(self, context, exc_info):
self.context = context self.context = context
self.exc_info = exc_info self.exc_info = exc_info
self.async.send() self._ready()
# link protocol: # link protocol:
def successful(self): def successful(self):
...@@ -279,7 +317,10 @@ class ThreadResult(object): ...@@ -279,7 +317,10 @@ class ThreadResult(object):
def wrap_errors(errors, function, args, kwargs): def wrap_errors(errors, function, args, kwargs):
# Deprecated but never documented. """
.. deprecated:: 1.1a2
Only used by ThreadPool.apply_e.
"""
try: try:
return True, function(*args, **kwargs) return True, function(*args, **kwargs)
except errors as ex: except errors as ex:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment