Commit b0506d5b authored by Tristan Cavelier's avatar Tristan Cavelier

jio.js updated

parent fdcffe5b
...@@ -402,6 +402,23 @@ defaults.job_rule_conditions = {}; ...@@ -402,6 +402,23 @@ defaults.job_rule_conditions = {};
methodType(b.method) === 'writer'; methodType(b.method) === 'writer';
} }
/**
* Compare two jobs and test if they use metadata only
*
* @param {Object} a The first job to compare
* @param {Object} b The second job to compare
* @return {Boolean} True if equal, else false
*/
function useMetadataOnly(a, b) {
if (['post', 'put', 'get', 'remove', 'allDocs'].indexOf(a.method) === -1) {
return false;
}
if (['post', 'put', 'get', 'remove', 'allDocs'].indexOf(b.method) === -1) {
return false;
}
return true;
}
/** /**
* Compare two jobs and test if they are readers * Compare two jobs and test if they are readers
* *
...@@ -481,6 +498,7 @@ defaults.job_rule_conditions = {}; ...@@ -481,6 +498,7 @@ defaults.job_rule_conditions = {};
"sameStorageDescription": sameStorageDescription, "sameStorageDescription": sameStorageDescription,
"areWriters": areWriters, "areWriters": areWriters,
"areReaders": areReaders, "areReaders": areReaders,
"useMetadataOnly": useMetadataOnly,
"sameMethod": sameMethod, "sameMethod": sameMethod,
"sameDocumentId": sameDocumentId, "sameDocumentId": sameDocumentId,
"sameParameters": sameParameters, "sameParameters": sameParameters,
...@@ -1189,10 +1207,10 @@ function JIO(storage_spec, options) { ...@@ -1189,10 +1207,10 @@ function JIO(storage_spec, options) {
enableJobMaker(this, shared, options); enableJobMaker(this, shared, options);
enableJobReference(this, shared, options); enableJobReference(this, shared, options);
enableJobRetry(this, shared, options); enableJobRetry(this, shared, options);
enableJobTimeout(this, shared, options);
enableJobChecker(this, shared, options); enableJobChecker(this, shared, options);
enableJobQueue(this, shared, options); enableJobQueue(this, shared, options);
enableJobRecovery(this, shared, options); enableJobRecovery(this, shared, options);
enableJobTimeout(this, shared, options);
enableJobExecuter(this, shared, options); enableJobExecuter(this, shared, options);
shared.emit('load'); shared.emit('load');
...@@ -2306,7 +2324,9 @@ function enableJobChecker(jio, shared, options) { ...@@ -2306,7 +2324,9 @@ function enableJobChecker(jio, shared, options) {
// creates // creates
// - shared.job_rules Array // - shared.job_rules Array
// uses 'job' event // uses 'job:new' event
// emits 'job:modified', 'job:start', 'job:resolved',
// 'job:end', 'job:reject' events
var i; var i;
...@@ -2315,33 +2335,39 @@ function enableJobChecker(jio, shared, options) { ...@@ -2315,33 +2335,39 @@ function enableJobChecker(jio, shared, options) {
shared.job_rule_actions = { shared.job_rule_actions = {
wait: function (original_job, new_job) { wait: function (original_job, new_job) {
original_job.promise.always(function () { original_job.promise.always(function () {
shared.emit('job', new_job); new_job.state = 'ready';
new_job.modified = new Date();
shared.emit('job:modified', new_job);
shared.emit('job:start', new_job);
}); });
new_job.state = 'waiting'; new_job.state = 'waiting';
new_job.modified = new Date(); new_job.modified = new Date();
shared.emit('job:modified', new_job);
}, },
update: function (original_job, new_job) { update: function (original_job, new_job) {
if (!new_job.solver) { if (!new_job.solver) {
// promise associated to the job // promise associated to the job
new_job.state = 'done'; new_job.state = 'done';
shared.emit('jobDone', new_job); shared.emit('job:resolved', new_job, []); // XXX why resolve?
shared.emit('job:end', new_job);
} else { } else {
if (!original_job.solver) { if (!original_job.solver) {
original_job.solver = new_job.solver; original_job.solver = new_job.solver;
} else { } else {
original_job.promise.then( original_job.promise.then(
new_job.command.resolve, new_job.command.resolve,
new_job.command.reject new_job.command.reject,
new_job.command.notify
); );
} }
} }
new_job.state = 'running'; new_job.state = 'running';
new_job.modified = new Date(); new_job.modified = new Date();
shared.emit('job:modified', new_job);
}, },
deny: function (original_job, new_job) { deny: function (original_job, new_job) {
new_job.state = 'fail'; new_job.state = "running";
new_job.modified = new Date(); shared.emit('job:reject', new_job, [
restCommandRejecter(new_job, [
'precondition_failed', 'precondition_failed',
'command denied', 'command denied',
'Command rejected by the job checker.' 'Command rejected by the job checker.'
...@@ -2463,7 +2489,7 @@ function enableJobChecker(jio, shared, options) { ...@@ -2463,7 +2489,7 @@ function enableJobChecker(jio, shared, options) {
} }
} else { } else {
// browsing jobs // browsing jobs
for (j = 0; j < shared.jobs.length; j += 1) { for (j = shared.jobs.length - 1; j >= 0; j -= 1) {
if (shared.jobs[j] !== job) { if (shared.jobs[j] !== job) {
if ( if (
jobsRespectConditions( jobsRespectConditions(
...@@ -2498,10 +2524,11 @@ function enableJobChecker(jio, shared, options) { ...@@ -2498,10 +2524,11 @@ function enableJobChecker(jio, shared, options) {
], ],
"action": "update" "action": "update"
}, { }, {
"code_name": "writers update", "code_name": "metadata writers update",
"conditions": [ "conditions": [
"sameStorageDescription", "sameStorageDescription",
"areWriters", "areWriters",
"useMetadataOnly",
"sameMethod", "sameMethod",
"haveDocumentIds", "haveDocumentIds",
"sameParameters" "sameParameters"
...@@ -2528,7 +2555,7 @@ function enableJobChecker(jio, shared, options) { ...@@ -2528,7 +2555,7 @@ function enableJobChecker(jio, shared, options) {
} }
} }
shared.on('job', checkJob); shared.on('job:new', checkJob);
} }
...@@ -2544,19 +2571,28 @@ function enableJobChecker(jio, shared, options) { ...@@ -2544,19 +2571,28 @@ function enableJobChecker(jio, shared, options) {
function enableJobExecuter(jio, shared) { // , options) { function enableJobExecuter(jio, shared) { // , options) {
// uses 'job', 'jobDone', 'jobFail' and 'jobNotify' events // uses 'job:new' events
// emits 'jobRun' and 'jobEnd' events // uses actions 'job:resolve', 'job:reject' and 'job:notify'
// listeners // emits 'job:modified', 'job:started', 'job:resolved',
// 'job:rejected', 'job:notified' and 'job:end' events
// emits action 'job:start'
shared.on('job', function (param) { function startJobIfReady(job) {
if (job.state === 'ready') {
shared.emit('job:start', job);
}
}
function executeJobIfReady(param) {
var storage; var storage;
if (param.state === 'ready') { if (param.state === 'ready') {
param.tried += 1; param.tried += 1;
param.started = new Date(); param.started = new Date();
param.state = 'running'; param.state = 'running';
param.modified = new Date(); param.modified = new Date();
shared.emit('jobRun', param); shared.emit('job:modified', param);
shared.emit('job:started', param);
try { try {
storage = createStorage(deepClone(param.storage_spec)); storage = createStorage(deepClone(param.storage_spec));
} catch (e) { } catch (e) {
...@@ -2584,35 +2620,49 @@ function enableJobExecuter(jio, shared) { // , options) { ...@@ -2584,35 +2620,49 @@ function enableJobExecuter(jio, shared) { // , options) {
); );
}); });
} }
}); }
shared.on('jobDone', function (param, args) { function endAndResolveIfRunning(job, args) {
if (param.state === 'running') { if (job.state === 'running') {
param.state = 'done'; job.state = 'done';
param.modified = new Date(); job.modified = new Date();
shared.emit('jobEnd', param); shared.emit('job:modified', job);
if (param.solver) { if (job.solver) {
restCommandResolver(param, args); restCommandResolver(job, args);
}
shared.emit('job:resolved', job, args);
shared.emit('job:end', job);
} }
} }
});
shared.on('jobFail', function (param, args) { function endAndRejectIfRunning(job, args) {
if (param.state === 'running') { if (job.state === 'running') {
param.state = 'fail'; job.state = 'fail';
param.modified = new Date(); job.modified = new Date();
shared.emit('jobEnd', param); shared.emit('job:modified', job);
if (param.solver) { if (job.solver) {
restCommandRejecter(param, args); restCommandRejecter(job, args);
}
shared.emit('job:rejected', job, args);
shared.emit('job:end', job);
} }
} }
});
shared.on('jobNotify', function (param, args) { function notifyJobIfRunning(job, args) {
if (param.state === 'running' && param.solver) { if (job.state === 'running' && job.solver) {
param.solver.notify(args[0]); job.solver.notify(args[0]);
shared.emit('job:notified', job, args);
} }
}); }
// listeners
shared.on('job:new', startJobIfReady);
shared.on('job:start', executeJobIfReady);
shared.on('job:resolve', endAndResolveIfRunning);
shared.on('job:reject', endAndRejectIfRunning);
shared.on('job:notify', notifyJobIfRunning);
} }
/*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */ /*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */
...@@ -2637,10 +2687,16 @@ function enableJobMaker(jio, shared, options) { ...@@ -2637,10 +2687,16 @@ function enableJobMaker(jio, shared, options) {
// - param.options object // - param.options object
// - param.command object // - param.command object
// uses method events // list of job events:
// add emits 'job' events // - Job existence -> new, end
// - Job execution -> started, stopped
// - Job resolution -> resolved, rejected, notified, cancelled
// - Job modification -> modified
// the job can emit 'jobDone', 'jobFail' and 'jobNotify' // emits actions 'job:resolve', 'job:reject' and 'job:notify'
// uses `rest method` events
// emits 'job:new' event
shared.job_keys = arrayExtend(shared.job_keys || [], [ shared.job_keys = arrayExtend(shared.job_keys || [], [
"created", "created",
...@@ -2653,55 +2709,56 @@ function enableJobMaker(jio, shared, options) { ...@@ -2653,55 +2709,56 @@ function enableJobMaker(jio, shared, options) {
"options" "options"
]); ]);
function addCommandToJob(param) { function addCommandToJob(job) {
param.command = {}; job.command = {};
param.command.resolve = function () { job.command.resolve = function () {
shared.emit('jobDone', param, arguments); shared.emit('job:resolve', job, arguments);
}; };
param.command.success = param.command.resolve; job.command.success = job.command.resolve;
param.command.reject = function () { job.command.reject = function () {
shared.emit('jobFail', param, arguments); shared.emit('job:reject', job, arguments);
}; };
param.command.error = param.command.reject; job.command.error = job.command.reject;
param.command.notify = function () { job.command.notify = function () {
shared.emit('jobNotify', param, arguments); shared.emit('job:notify', job, arguments);
}; };
param.command.storage = function () { job.command.storage = function () {
return shared.createRestApi.apply(null, arguments); return shared.createRestApi.apply(null, arguments);
}; };
} }
// listeners function createJobFromRest(param) {
shared.rest_method_names.forEach(function (method) {
shared.on(method, function (param) {
if (param.solver) { if (param.solver) {
// params are good // rest parameters are good
shared.emit('job', param); shared.emit('job:new', param);
}
} }
});
});
shared.on('job', function (param) { function initJob(job) {
// new or recovered job job.state = 'ready';
param.state = 'ready'; if (typeof job.tried !== 'number' || !isFinite(job.tried)) {
if (typeof param.tried !== 'number' || !isFinite(param.tried)) { job.tried = 0;
param.tried = 0;
} }
if (!param.created) { if (!job.created) {
param.created = new Date(); job.created = new Date();
} }
if (!param.command) { addCommandToJob(job);
addCommandToJob(param); job.modified = new Date();
} }
param.modified = new Date();
// listeners
shared.rest_method_names.forEach(function (method) {
shared.on(method, createJobFromRest);
}); });
shared.on('job:new', initJob);
} }
/*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */ /*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */
/*global arrayExtend, localStorage, Workspace, uniqueJSONStringify, JobQueue, /*global arrayExtend, localStorage, Workspace, uniqueJSONStringify, JobQueue,
constants, indexOf */ constants, indexOf, setTimeout, clearTimeout */
function enableJobQueue(jio, shared, options) { function enableJobQueue(jio, shared, options) {
...@@ -2717,75 +2774,89 @@ function enableJobQueue(jio, shared, options) { ...@@ -2717,75 +2774,89 @@ function enableJobQueue(jio, shared, options) {
// - shared.workspace Workspace // - shared.workspace Workspace
// - shared.job_queue JobQueue // - shared.job_queue JobQueue
// uses 'job', 'jobRun', 'jobStop', 'jobEnd' events // uses 'job:new', 'job:started', 'job:stopped', 'job:modified',
// emits 'jobEnd' events // 'job:notified', 'job:end' events
if (options.job_management !== false) { // emits 'job:end' event
shared.job_keys = arrayExtend(shared.job_keys || [], ["id"]);
if (typeof options.workspace !== 'object') {
shared.workspace = localStorage;
} else {
shared.workspace = new Workspace(options.workspace);
}
if (!shared.storage_spec_str) { function postJobIfReady(param) {
shared.storage_spec_str = uniqueJSONStringify(shared.storage_spec); if (!param.stored && param.state === 'ready') {
} clearTimeout(param.queue_ident);
delete param.queue_ident;
shared.job_queue = new JobQueue(
shared.workspace,
'jio/jobs/' + shared.storage_spec_str,
shared.job_keys
);
shared.on('job', function (param) {
if (indexOf(param.state, ['fail', 'done']) === -1) {
if (!param.stored) {
shared.job_queue.load(); shared.job_queue.load();
shared.job_queue.post(param); shared.job_queue.post(param);
shared.job_queue.save(); shared.job_queue.save();
param.stored = true; param.stored = true;
} }
} }
});
['jobRun', 'jobStop'].forEach(function (event) { function deferredPutJob(param) {
shared.on(event, function (param) { if (param.queue_ident === undefined) {
param.queue_ident = setTimeout(function () {
delete param.queue_ident;
if (param.stored) { if (param.stored) {
shared.job_queue.load(); shared.job_queue.load();
if (param.state === 'done' || param.state === 'fail') {
if (shared.job_queue.remove(param.id)) {
shared.job_queue.save();
delete param.storad;
}
} else {
shared.job_queue.put(param); shared.job_queue.put(param);
shared.job_queue.save(); shared.job_queue.save();
} }
}
});
}); });
}
}
shared.on('jobEnd', function (param) { function removeJob(param) {
clearTimeout(param.queue_ident);
delete param.queue_ident;
if (param.stored) { if (param.stored) {
shared.job_queue.load(); shared.job_queue.load();
if (shared.job_queue.remove(param.id)) { shared.job_queue.remove(param.id);
shared.job_queue.save(); shared.job_queue.save();
delete param.stored;
delete param.id;
} }
} }
});
}
shared.on('job', function (param) { function initJob(param) {
if (!param.command.end) { if (!param.command.end) {
param.command.end = function () { param.command.end = function () {
shared.emit('jobEnd', param); shared.emit('job:end', param);
}; };
} }
}); }
shared.on('job:new', initJob);
if (options.job_management !== false) {
shared.job_keys = arrayExtend(shared.job_keys || [], ["id"]);
if (typeof options.workspace !== 'object') {
shared.workspace = localStorage;
} else {
shared.workspace = new Workspace(options.workspace);
}
if (!shared.storage_spec_str) {
shared.storage_spec_str = uniqueJSONStringify(shared.storage_spec);
}
shared.job_queue = new JobQueue(
shared.workspace,
'jio/jobs/' + shared.storage_spec_str,
shared.job_keys
);
// Listeners
shared.on('job:new', postJobIfReady);
shared.on('job:started', deferredPutJob);
shared.on('job:stopped', deferredPutJob);
shared.on('job:modified', deferredPutJob);
shared.on('job:notified', deferredPutJob);
shared.on('job:end', removeJob);
}
} }
...@@ -2800,20 +2871,23 @@ function enableJobRecovery(jio, shared, options) { ...@@ -2800,20 +2871,23 @@ function enableJobRecovery(jio, shared, options) {
// uses // uses
// - shared.job_queue JobQueue // - shared.job_queue JobQueue
// emits 'job:new' event
function numberOrDefault(number, default_value) { function numberOrDefault(number, default_value) {
return (typeof number === 'number' && return (typeof number === 'number' &&
isFinite(number) ? number : default_value); isFinite(number) ? number : default_value);
} }
function recoverJob(param) { function recoverJob(param) {
shared.job_queue.load();
shared.job_queue.remove(param.id); shared.job_queue.remove(param.id);
delete param.id; delete param.id;
if (methodType(param.method) === 'writer' || if (methodType(param.method) === 'writer' &&
param.state === 'ready' || (param.state === 'ready' ||
param.state === 'running' || param.state === 'running' ||
param.state === 'waiting') { param.state === 'waiting')) {
shared.job_queue.save(); shared.job_queue.save();
shared.emit('job', param); shared.emit('job:new', param);
} }
} }
...@@ -2870,17 +2944,17 @@ function enableJobReference(jio, shared, options) { ...@@ -2870,17 +2944,17 @@ function enableJobReference(jio, shared, options) {
// creates // creates
// - shared.jobs Object Array // - shared.jobs Object Array
// uses 'job', 'jobEnd' events // uses 'job:new' and 'job:end' events
shared.jobs = []; shared.jobs = [];
var job_references = new ReferenceArray(shared.jobs); var job_references = new ReferenceArray(shared.jobs);
shared.on('job', function (param) { shared.on('job:new', function (param) {
job_references.put(param); job_references.put(param);
}); });
shared.on('jobEnd', function (param) { shared.on('job:end', function (param) {
job_references.remove(param); job_references.remove(param);
}); });
} }
...@@ -2914,9 +2988,9 @@ function enableJobRetry(jio, shared, options) { ...@@ -2914,9 +2988,9 @@ function enableJobRetry(jio, shared, options) {
// - param.options object // - param.options object
// - param.command object // - param.command object
// uses 'job' and 'jobRetry' events // uses 'job:new' and 'job:retry' events
// emits 'job', 'jobFail' and 'jobStateChange' events // emits action 'job:start' event
// job can emit 'jobRetry' // emits 'job:retry', 'job:reject', 'job:modified' and 'job:stopped' events
shared.job_keys = arrayExtend(shared.job_keys || [], ["max_retry"]); shared.job_keys = arrayExtend(shared.job_keys || [], ["max_retry"]);
...@@ -2960,9 +3034,7 @@ function enableJobRetry(jio, shared, options) { ...@@ -2960,9 +3034,7 @@ function enableJobRetry(jio, shared, options) {
2 2
); );
// listeners function initJob(param) {
shared.on('job', function (param) {
if (typeof param.max_retry !== 'number' || param.max_retry < 0) { if (typeof param.max_retry !== 'number' || param.max_retry < 0) {
param.max_retry = positiveNumberOrDefault( param.max_retry = positiveNumberOrDefault(
param.options.max_retry, param.options.max_retry,
...@@ -2971,34 +3043,42 @@ function enableJobRetry(jio, shared, options) { ...@@ -2971,34 +3043,42 @@ function enableJobRetry(jio, shared, options) {
} }
param.command.reject = function (status) { param.command.reject = function (status) {
if (constants.http_action[status || 0] === "retry") { if (constants.http_action[status || 0] === "retry") {
shared.emit('jobRetry', param, arguments); shared.emit('job:retry', param, arguments);
} else { } else {
shared.emit('jobFail', param, arguments); shared.emit('job:reject', param, arguments);
} }
}; };
param.command.retry = function () { param.command.retry = function () {
shared.emit('jobRetry', param, arguments); shared.emit('job:retry', param, arguments);
}; };
}); }
shared.on('jobRetry', function (param, args) { function retryIfRunning(param, args) {
if (param.state === 'running') { if (param.state === 'running') {
if (param.max_retry === undefined || if (param.max_retry === undefined ||
param.max_retry === null || param.max_retry === null ||
param.max_retry >= param.tried) { param.max_retry >= param.tried) {
param.state = 'waiting'; param.state = 'waiting';
param.modified = new Date(); param.modified = new Date();
shared.emit('jobStop', param); shared.emit('job:modified', param);
shared.emit('job:stopped', param);
setTimeout(function () { setTimeout(function () {
param.state = 'ready'; param.state = 'ready';
param.modified = new Date(); param.modified = new Date();
shared.emit('job', param); shared.emit('job:modified', param);
shared.emit('job:start', param);
}, min(10000, param.tried * 2000)); }, min(10000, param.tried * 2000));
} else { } else {
shared.emit('jobFail', param, args); shared.emit('job:reject', param, args);
} }
} }
}); }
// listeners
shared.on('job:new', initJob);
shared.on('job:retry', retryIfRunning);
} }
/*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */ /*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */
...@@ -3016,7 +3096,9 @@ function enableJobTimeout(jio, shared, options) { ...@@ -3016,7 +3096,9 @@ function enableJobTimeout(jio, shared, options) {
// - param.timeout_ident Timeout // - param.timeout_ident Timeout
// - param.state string 'running' // - param.state string 'running'
// uses 'job', 'jobDone', 'jobFail', 'jobRetry' and 'jobNotify' events // uses 'job:new', 'job:stopped', 'job:started',
// 'job:notified' and 'job:end' events
// emits 'job:modified' event
shared.job_keys = arrayExtend(shared.job_keys || [], ["timeout"]); shared.job_keys = arrayExtend(shared.job_keys || [], ["timeout"]);
...@@ -3041,36 +3123,41 @@ function enableJobTimeout(jio, shared, options) { ...@@ -3041,36 +3123,41 @@ function enableJobTimeout(jio, shared, options) {
}; };
} }
// listeners function initJob(job) {
if (typeof job.timeout !== 'number' || job.timeout < 0) {
shared.on('job', function (param) { job.timeout = positiveNumberOrDefault(
if (typeof param.timeout !== 'number' || param.timeout < 0) { job.options.timeout,
param.timeout = positiveNumberOrDefault(
param.options.timeout,
default_timeout default_timeout
); );
} }
param.modified = new Date(); job.modified = new Date();
}); shared.emit('job:modified', job);
}
["jobDone", "jobFail", "jobRetry"].forEach(function (event) { function clearJobTimeout(job) {
shared.on(event, function (param) { clearTimeout(job.timeout_ident);
clearTimeout(param.timeout_ident); delete job.timeout_ident;
delete param.timeout_ident; }
});
});
["jobRun", "jobNotify", "jobEnd"].forEach(function (event) { function restartJobTimeoutIfRunning(job) {
shared.on(event, function (param) { clearTimeout(job.timeout_ident);
clearTimeout(param.timeout_ident); if (job.state === 'running' && job.timeout > 0) {
if (param.state === 'running' && param.timeout > 0) { job.timeout_ident = setTimeout(timeoutReject(job), job.timeout);
param.timeout_ident = setTimeout(timeoutReject(param), param.timeout); job.modified = new Date();
param.modified = new Date();
} else { } else {
delete param.timeout_ident; delete job.timeout_ident;
} }
}); }
});
// listeners
shared.on('job:new', initJob);
shared.on("job:stopped", clearJobTimeout);
shared.on("job:end", clearJobTimeout);
shared.on("job:started", restartJobTimeoutIfRunning);
shared.on("job:notified", restartJobTimeoutIfRunning);
} }
/*jslint indent: 2, maxlen: 80, sloppy: true */ /*jslint indent: 2, maxlen: 80, sloppy: true */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment