Commit 3c6b4f57 authored by Hugo Ricateau's avatar Hugo Ricateau

Improves performance when there is only one worker to run the task queue.

parent 692af860
......@@ -185,3 +185,61 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule):
return super(StrategyModule, self).run(iterator, play_context)
finally:
self._remove_wrappers()
def _queue_task(self, host, task, task_vars, play_context):
''' handles queueing the task up to be sent to a worker '''
# method adapted from
# https://github.com/ansible/ansible/blob/b9c94cc0c4f43944867ae4a1c664081217c786a2/lib/ansible/plugins/strategy/__init__.py#L213-L270
# ( ansible stable-2.4 )
display.debug("entering _queue_task() for %s/%s" % (host.name, task.action))
if task.action not in action_write_locks.action_write_locks:
display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = Lock()
# and then queue the new task
try:
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
queued = False
starting_worker = self._cur_worker
while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive():
self._queued_task_cache[(host.name, task._uuid)] = {
'host': host,
'task': task,
'task_vars': task_vars,
'play_context': play_context
}
worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc
# changes to the original code start here
# this hack allows to bypass the forking of a multi-process if there is only one worker; reduces the execution time
if len(self._workers) > 1:
worker_prc.start()
else:
worker_prc.run()
# changes to the original code end here
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
queued = True
self._cur_worker += 1
if self._cur_worker >= len(self._workers):
self._cur_worker = 0
if queued:
break
elif self._cur_worker == starting_worker:
time.sleep(0.0001)
self._pending_results += 1
except (EOFError, IOError, AssertionError) as e:
# most likely an abort
display.debug("got an error while queuing: %s" % e)
return
display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment