Commit e024f665 authored by Loic Esteve's avatar Loic Esteve

Improvements in the joblib backend

* Catching exceptions to have error messages inside ZOPE rather than timeout
* Fix timeout bug when timeout is None
* Preparation for the the two-stage computation when the transaction is aborted
  retried when all the jobs have finished running.
parent 445f70ba
...@@ -26,15 +26,33 @@ ...@@ -26,15 +26,33 @@
############################################################################## ##############################################################################
ENABLE_JOBLIB = True ENABLE_JOBLIB = True
import sys
import time
import transaction import transaction
from zLOG import LOG, INFO, WARNING
try: try:
from sklearn.externals.joblib import register_parallel_backend
from sklearn.externals.joblib.parallel import ParallelBackendBase, parallel_backend from sklearn.externals.joblib.parallel import ParallelBackendBase, parallel_backend
from sklearn.externals.joblib.parallel import FallbackToBackend, SequentialBackend
from sklearn.externals.joblib._parallel_backends import SafeFunction
from sklearn.externals.joblib.my_exceptions import TransportableException, WorkerInterrupt
from sklearn.externals.joblib.format_stack import format_exc
except ImportError: except ImportError:
from zLOG import LOG, WARNING
LOG("CMFActivityBackend", WARNING, "CLASS NOT LOADED!!!") LOG("CMFActivityBackend", WARNING, "CLASS NOT LOADED!!!")
ENABLE_JOBLIB = False ENABLE_JOBLIB = False
class MySafeFunction(SafeFunction):
"""Wrapper around a SafeFunction that catches any exception
The exception can be handled in CMFActivityResult.get
"""
def __call__(self, *args, **kwargs):
try:
return super(MySafeFunction, self).__call__(*args, **kwargs)
except Exception as exc:
return exc
class CMFActivityResult(object): class CMFActivityResult(object):
def __init__(self, active_process, callback): def __init__(self, active_process, callback):
...@@ -43,16 +61,19 @@ class CMFActivityResult(object): ...@@ -43,16 +61,19 @@ class CMFActivityResult(object):
def get(self, timeout=None): def get(self, timeout=None):
while not self.active_process.getResultList(): while not self.active_process.getResultList():
time.sleep(1) time.sleep(1)
timeout -= 1 if timeout is not None:
if timeout < 0: timeout -= 1
raise RuntimeError('Timeout reached') if timeout < 0:
raise RuntimeError('Timeout reached')
transaction.commit() transaction.commit()
result = self.active_process.getResultList()[0].result result = self.active_process.getResultList()[0].result
# TODO raise before or after the callback?
if isinstance(result, Exception):
raise result
if self.callback is not None: if self.callback is not None:
self.callback(result) self.callback(result)
return result return result
if ENABLE_JOBLIB: if ENABLE_JOBLIB:
class CMFActivityBackend(ParallelBackendBase): class CMFActivityBackend(ParallelBackendBase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
...@@ -62,21 +83,38 @@ if ENABLE_JOBLIB: ...@@ -62,21 +83,38 @@ if ENABLE_JOBLIB:
return n_jobs return n_jobs
def apply_async(self, batch, callback=None): def apply_async(self, batch, callback=None):
"""Schedule a func to be run""" """Schedule a func to be run"""
active_process = self.zope_context.portal_activities.newActiveProcess() portal_activities = self.zope_context.portal_activities
active_process.batch = batch
active_process.activate(activity='SQLQueue', active_process=active_process).batch() # the creation of activitiy process here, might be removed.
transaction.commit() active_process = portal_activities.newActiveProcess()
return CMFActivityResult(active_process, callback)
# SQLJoblib == JoblibActivity
joblib_result = portal_activities.activate(activity='SQLQueue',
active_process=active_process).Base_callSafeFunction(MySafeFunction(batch))
# While activate() don't return the joblib_result
if joblib_result is None:
# Transaction commit, is a code crime.
transaction.commit()
joblib_result = CMFActivityResult(active_process, callback)
return joblib_result
def configure(self, n_jobs=1, parallel=None, **backend_args): def configure(self, n_jobs=1, parallel=None, **backend_args):
"""Reconfigure the backend and return the number of workers. This """Reconfigure the backend and return the number of workers. This
makes it possible to reuse an existing backend instance for successive makes it possible to reuse an existing backend instance for successive
independent calls to Parallel with different parameters.""" independent calls to Parallel with different parameters."""
# TODO LOG("CMFActivityBackend", INFO, 'n_jobs={}'.format(n_jobs))
LOG("CMFActivityBackend", INFO, 'parallel={}'.format(parallel))
if n_jobs == 1:
raise FallbackToBackend(SequentialBackend())
self.parallel = parallel self.parallel = parallel
# self.zope_context = backend_args['zope_context'] # self.zope_context = backend_args['zope_context']
return self.effective_n_jobs(n_jobs) return self.effective_n_jobs(n_jobs)
register_parallel_backend('CMFActivity', CMFActivityBackend)
else: else:
class CMFActivityBackend(object): class CMFActivityBackend(object):
pass pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment