Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mitogen
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
nexedi
mitogen
Commits
e3d967eb
Commit
e3d967eb
authored
Sep 26, 2017
by
David Wilson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
issue #20: initial implementation of mitogen.master.Select().
parent
14783c75
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
211 additions
and
12 deletions
+211
-12
docs/api.rst
docs/api.rst
+98
-0
docs/index.rst
docs/index.rst
+27
-0
mitogen/core.py
mitogen/core.py
+26
-12
mitogen/master.py
mitogen/master.py
+60
-0
No files found.
docs/api.rst
View file @
e3d967eb
...
...
@@ -60,6 +60,104 @@ be sent to any context that will be used to establish additional child
contexts.
.. class:: mitogen.master.Select (receivers=(), oneshot=True)
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:py:class:`mitogen.core.Receiver` or :py:class:`mitogen.master.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is ``True``, then remove each receiver as it yields a result;
since :py:meth:`__iter__` terminates once the final receiver is removed
from the select, this makes it convenient to respond to several call
results with minimal effort:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
with mitogen.master.Select(recvs) as select:
for recv, msg in select:
value = msg.unpickle()
print 'Got %s from %s' % (value, recv)
total += value
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
:py:class:`Select` may also be used to drive a long-running scheduler:
.. code-block:: python
with mitogen.master.Select() as select:
while running():
for recv, msg in select:
process_result(recv.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:py:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.master.Select(get_some_work()),
mitogen.master.Select(get_some_work())
]
with mitogen.master.Select(selects, oneshot=False) as select:
while subselects and any(subselects): # Calls __bool__()
print select.get()
.. py:method:: get (timeout=None)
Fetch the next available value from any receiver, or raise
:py:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
:param float timeout:
Timeout in seconds.
:return:
`(receiver, msg)`
.. py:method:: __bool__ ()
Return ``True`` if any receivers are registered with this select.
.. py:method:: close ()
Remove the select's notifier function from each registered receiver.
Necessary to prevent memory leaks in long-running receivers. This is
called automatically when the Python ``with:`` statement is used.
.. py:method:: empty ()
Return ``True`` if no items appear to be queued on this receiver. Like
:py:class:`Queue.Queue`, this function's return value cannot be relied
upon.
.. py:method:: __iter__ (self)
Yield the result of :py:meth:`get` until no receivers remain in the
select, either because `oneshot` is ``True``, or each receiver was
explicitly removed via :py:meth:`remove`.
.. py:method:: add (recv)
Add the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :py:meth:`remove`, then it will
still be returned by a subsequent :py:meth:`get`. This may change in a
future version.
mitogen.fakessh
---------------
...
...
docs/index.rst
View file @
e3d967eb
...
...
@@ -298,6 +298,33 @@ usual into the slave process.
mitogen.utils.run_with_broker(main)
Scatter/Gather Function Calls
#############################
Functions may be invoked asynchronously, with results returned as they become
available.
.. code-block:: python
def disk_usage(path):
return sum((os.path.getsize(os.path.join(dirpath, name))
for dirpath, dirnames, filenames in os.walk(path)
for name in dirnames + filenames), 0)
if __name__ == '__main__' and mitogen.is_master:
contexts = connect_contexts(...)
receivers = [c.call_async(disk_usage, '/tmp') for c in contexts]
total = 0
for recv, msg in mitogen.master.Select(receivers):
value = result.unpickle()
print 'Context %s /tmp usage: %d' % (recv.context, value)
total += value
print 'Total /tmp usage across all contexts: %d' % (total,)
Event-driven IO
###############
...
...
mitogen/core.py
View file @
e3d967eb
...
...
@@ -264,9 +264,28 @@ class Sender(object):
)
def
_queue_interruptible_get
(
queue
,
timeout
=
None
,
block
=
True
):
if
timeout
:
timeout
+=
time
.
time
()
msg
=
None
while
msg
is
None
and
(
timeout
is
None
or
timeout
<
time
.
time
()):
try
:
msg
=
queue
.
get
(
True
,
0.5
,
block
=
block
)
except
Queue
.
Empty
:
if
block
:
break
if
msg
is
None
:
raise
TimeoutError
(
'deadline exceeded.'
)
return
msg
class
Receiver
(
object
):
def
__init__
(
self
,
router
,
handle
=
None
,
persist
=
True
,
respondent
=
None
):
self
.
router
=
router
self
.
notify
=
[]
self
.
handle
=
handle
# Avoid __repr__ crash in add_handler()
self
.
handle
=
router
.
add_handler
(
self
.
_on_receive
,
handle
,
persist
,
respondent
)
...
...
@@ -279,27 +298,22 @@ class Receiver(object):
"""Callback from the Stream; appends data to the internal queue."""
IOLOG
.
debug
(
'%r._on_receive(%r)'
,
self
,
msg
)
self
.
_queue
.
put
(
msg
)
for
func
in
self
.
notify
:
func
(
self
)
def
close
(
self
):
self
.
_queue
.
put
(
_DEAD
)
def
empty
(
self
):
return
self
.
_queue
.
empty
()
def
get
(
self
,
timeout
=
None
):
"""Receive an object, or ``None`` if `timeout` is reached."""
IOLOG
.
debug
(
'%r.on_receive(timeout=%r)'
,
self
,
timeout
)
if
timeout
:
timeout
+=
time
.
time
()
msg
=
None
while
msg
is
None
and
(
timeout
is
None
or
timeout
<
time
.
time
()):
try
:
msg
=
self
.
_queue
.
get
(
True
,
0.5
)
except
Queue
.
Empty
:
continue
if
msg
is
None
:
raise
TimeoutError
(
'deadline exceeded.'
)
msg
=
_queue_interruptible_get
(
self
.
_queue
,
timeout
)
IOLOG
.
debug
(
'%r.on_receive() got %r'
,
self
,
msg
)
if
msg
==
_DEAD
:
raise
ChannelError
(
'Channel closed by local end.'
)
...
...
mitogen/master.py
View file @
e3d967eb
...
...
@@ -231,6 +231,66 @@ def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'),
co
.
co_consts
[
arg2
]
or
())
class
Select
(
object
):
def
__init__
(
self
,
receivers
=
(),
oneshot
=
True
):
self
.
_receivers
=
[]
self
.
_oneshot
=
oneshot
self
.
_queue
=
Queue
.
Queue
()
self
.
_notify
=
[]
for
recv
in
receivers
:
self
.
add
(
recv
)
def
_put
(
self
,
value
):
self
.
_queue
.
put
(
value
)
for
func
in
self
.
_notify
:
func
(
self
)
def
__bool__
(
self
):
return
bool
(
self
.
_receivers
)
def
__enter__
(
self
):
return
self
def
__exit__
(
self
,
e_type
,
e_val
,
e_tb
):
self
.
close
()
def
__iter__
(
self
):
while
self
.
_receivers
:
recv
,
msg
=
self
.
get
()
if
self
.
_oneshot
:
self
.
remove
(
recv
)
yield
recv
,
msg
def
add
(
self
,
recv
):
self
.
_receivers
.
append
(
recv
)
recv
.
notify
.
append
(
self
.
_put
)
# Avoid race by polling once after installation.
if
not
recv
.
empty
():
self
.
_put
(
recv
)
def
remove
(
self
,
recv
):
recv
.
notify
.
remove
(
self
.
_put
)
def
close
(
self
):
for
recv
in
self
.
_receivers
[:]:
self
.
remove
(
recv
)
def
empty
(
self
):
return
self
.
_queue
.
empty
()
def
get
(
self
,
timeout
=
None
):
while
True
:
recv
=
mitogen
.
core
.
_queue_interruptible_get
(
queue
,
timeout
)
try
:
return
recv
,
recv
.
get
(
block
=
False
)
except
mitogen
.
core
.
TimeoutError
:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
continue
class
LogForwarder
(
object
):
def
__init__
(
self
,
router
):
self
.
_router
=
router
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment