Commit 68e27ab3 authored by Tristan Cavelier's avatar Tristan Cavelier

jio.js updated

parent a35b07ec
......@@ -402,6 +402,23 @@ defaults.job_rule_conditions = {};
methodType(b.method) === 'writer';
}
/**
* Compare two jobs and test if they use metadata only
*
* @param {Object} a The first job to compare
* @param {Object} b The second job to compare
* @return {Boolean} True if equal, else false
*/
function useMetadataOnly(a, b) {
if (['post', 'put', 'get', 'remove', 'allDocs'].indexOf(a.method) === -1) {
return false;
}
if (['post', 'put', 'get', 'remove', 'allDocs'].indexOf(b.method) === -1) {
return false;
}
return true;
}
/**
* Compare two jobs and test if they are readers
*
......@@ -481,6 +498,7 @@ defaults.job_rule_conditions = {};
"sameStorageDescription": sameStorageDescription,
"areWriters": areWriters,
"areReaders": areReaders,
"useMetadataOnly": useMetadataOnly,
"sameMethod": sameMethod,
"sameDocumentId": sameDocumentId,
"sameParameters": sameParameters,
......@@ -1189,10 +1207,10 @@ function JIO(storage_spec, options) {
enableJobMaker(this, shared, options);
enableJobReference(this, shared, options);
enableJobRetry(this, shared, options);
enableJobTimeout(this, shared, options);
enableJobChecker(this, shared, options);
enableJobQueue(this, shared, options);
enableJobRecovery(this, shared, options);
enableJobTimeout(this, shared, options);
enableJobExecuter(this, shared, options);
shared.emit('load');
......@@ -2306,7 +2324,9 @@ function enableJobChecker(jio, shared, options) {
// creates
// - shared.job_rules Array
// uses 'job' event
// uses 'job:new' event
// emits 'job:modified', 'job:start', 'job:resolved',
// 'job:end', 'job:reject' events
var i;
......@@ -2315,33 +2335,39 @@ function enableJobChecker(jio, shared, options) {
shared.job_rule_actions = {
wait: function (original_job, new_job) {
original_job.promise.always(function () {
shared.emit('job', new_job);
new_job.state = 'ready';
new_job.modified = new Date();
shared.emit('job:modified', new_job);
shared.emit('job:start', new_job);
});
new_job.state = 'waiting';
new_job.modified = new Date();
shared.emit('job:modified', new_job);
},
update: function (original_job, new_job) {
if (!new_job.solver) {
// promise associated to the job
new_job.state = 'done';
shared.emit('jobDone', new_job);
shared.emit('job:resolved', new_job, []); // XXX why resolve?
shared.emit('job:end', new_job);
} else {
if (!original_job.solver) {
original_job.solver = new_job.solver;
} else {
original_job.promise.then(
new_job.command.resolve,
new_job.command.reject
new_job.command.reject,
new_job.command.notify
);
}
}
new_job.state = 'running';
new_job.modified = new Date();
shared.emit('job:modified', new_job);
},
deny: function (original_job, new_job) {
new_job.state = 'fail';
new_job.modified = new Date();
restCommandRejecter(new_job, [
new_job.state = "running";
shared.emit('job:reject', new_job, [
'precondition_failed',
'command denied',
'Command rejected by the job checker.'
......@@ -2463,7 +2489,7 @@ function enableJobChecker(jio, shared, options) {
}
} else {
// browsing jobs
for (j = 0; j < shared.jobs.length; j += 1) {
for (j = shared.jobs.length - 1; j >= 0; j -= 1) {
if (shared.jobs[j] !== job) {
if (
jobsRespectConditions(
......@@ -2498,10 +2524,11 @@ function enableJobChecker(jio, shared, options) {
],
"action": "update"
}, {
"code_name": "writers update",
"code_name": "metadata writers update",
"conditions": [
"sameStorageDescription",
"areWriters",
"useMetadataOnly",
"sameMethod",
"haveDocumentIds",
"sameParameters"
......@@ -2528,7 +2555,7 @@ function enableJobChecker(jio, shared, options) {
}
}
shared.on('job', checkJob);
shared.on('job:new', checkJob);
}
......@@ -2544,19 +2571,28 @@ function enableJobChecker(jio, shared, options) {
function enableJobExecuter(jio, shared) { // , options) {
// uses 'job', 'jobDone', 'jobFail' and 'jobNotify' events
// emits 'jobRun' and 'jobEnd' events
// uses 'job:new' events
// uses actions 'job:resolve', 'job:reject' and 'job:notify'
// listeners
// emits 'job:modified', 'job:started', 'job:resolved',
// 'job:rejected', 'job:notified' and 'job:end' events
// emits action 'job:start'
shared.on('job', function (param) {
function startJobIfReady(job) {
if (job.state === 'ready') {
shared.emit('job:start', job);
}
}
function executeJobIfReady(param) {
var storage;
if (param.state === 'ready') {
param.tried += 1;
param.started = new Date();
param.state = 'running';
param.modified = new Date();
shared.emit('jobRun', param);
shared.emit('job:modified', param);
shared.emit('job:started', param);
try {
storage = createStorage(deepClone(param.storage_spec));
} catch (e) {
......@@ -2584,35 +2620,49 @@ function enableJobExecuter(jio, shared) { // , options) {
);
});
}
});
}
shared.on('jobDone', function (param, args) {
if (param.state === 'running') {
param.state = 'done';
param.modified = new Date();
shared.emit('jobEnd', param);
if (param.solver) {
restCommandResolver(param, args);
function endAndResolveIfRunning(job, args) {
if (job.state === 'running') {
job.state = 'done';
job.modified = new Date();
shared.emit('job:modified', job);
if (job.solver) {
restCommandResolver(job, args);
}
shared.emit('job:resolved', job, args);
shared.emit('job:end', job);
}
});
}
shared.on('jobFail', function (param, args) {
if (param.state === 'running') {
param.state = 'fail';
param.modified = new Date();
shared.emit('jobEnd', param);
if (param.solver) {
restCommandRejecter(param, args);
function endAndRejectIfRunning(job, args) {
if (job.state === 'running') {
job.state = 'fail';
job.modified = new Date();
shared.emit('job:modified', job);
if (job.solver) {
restCommandRejecter(job, args);
}
shared.emit('job:rejected', job, args);
shared.emit('job:end', job);
}
});
}
shared.on('jobNotify', function (param, args) {
if (param.state === 'running' && param.solver) {
param.solver.notify(args[0]);
function notifyJobIfRunning(job, args) {
if (job.state === 'running' && job.solver) {
job.solver.notify(args[0]);
shared.emit('job:notified', job, args);
}
});
}
// listeners
shared.on('job:new', startJobIfReady);
shared.on('job:start', executeJobIfReady);
shared.on('job:resolve', endAndResolveIfRunning);
shared.on('job:reject', endAndRejectIfRunning);
shared.on('job:notify', notifyJobIfRunning);
}
/*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */
......@@ -2637,10 +2687,16 @@ function enableJobMaker(jio, shared, options) {
// - param.options object
// - param.command object
// uses method events
// add emits 'job' events
// list of job events:
// - Job existence -> new, end
// - Job execution -> started, stopped
// - Job resolution -> resolved, rejected, notified, cancelled
// - Job modification -> modified
// the job can emit 'jobDone', 'jobFail' and 'jobNotify'
// emits actions 'job:resolve', 'job:reject' and 'job:notify'
// uses `rest method` events
// emits 'job:new' event
shared.job_keys = arrayExtend(shared.job_keys || [], [
"created",
......@@ -2653,55 +2709,56 @@ function enableJobMaker(jio, shared, options) {
"options"
]);
function addCommandToJob(param) {
param.command = {};
param.command.resolve = function () {
shared.emit('jobDone', param, arguments);
function addCommandToJob(job) {
job.command = {};
job.command.resolve = function () {
shared.emit('job:resolve', job, arguments);
};
param.command.success = param.command.resolve;
param.command.reject = function () {
shared.emit('jobFail', param, arguments);
job.command.success = job.command.resolve;
job.command.reject = function () {
shared.emit('job:reject', job, arguments);
};
param.command.error = param.command.reject;
param.command.notify = function () {
shared.emit('jobNotify', param, arguments);
job.command.error = job.command.reject;
job.command.notify = function () {
shared.emit('job:notify', job, arguments);
};
param.command.storage = function () {
job.command.storage = function () {
return shared.createRestApi.apply(null, arguments);
};
}
function createJobFromRest(param) {
if (param.solver) {
// rest parameters are good
shared.emit('job:new', param);
}
}
function initJob(job) {
job.state = 'ready';
if (typeof job.tried !== 'number' || !isFinite(job.tried)) {
job.tried = 0;
}
if (!job.created) {
job.created = new Date();
}
addCommandToJob(job);
job.modified = new Date();
}
// listeners
shared.rest_method_names.forEach(function (method) {
shared.on(method, function (param) {
if (param.solver) {
// params are good
shared.emit('job', param);
}
});
shared.on(method, createJobFromRest);
});
shared.on('job', function (param) {
// new or recovered job
param.state = 'ready';
if (typeof param.tried !== 'number' || !isFinite(param.tried)) {
param.tried = 0;
}
if (!param.created) {
param.created = new Date();
}
if (!param.command) {
addCommandToJob(param);
}
param.modified = new Date();
});
shared.on('job:new', initJob);
}
/*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */
/*global arrayExtend, localStorage, Workspace, uniqueJSONStringify, JobQueue,
constants, indexOf */
constants, indexOf, setTimeout, clearTimeout */
function enableJobQueue(jio, shared, options) {
......@@ -2717,8 +2774,56 @@ function enableJobQueue(jio, shared, options) {
// - shared.workspace Workspace
// - shared.job_queue JobQueue
// uses 'job', 'jobRun', 'jobStop', 'jobEnd' events
// emits 'jobEnd' events
// uses 'job:new', 'job:started', 'job:stopped', 'job:modified',
// 'job:notified', 'job:end' events
// emits 'job:end' event
function postJobIfReady(param) {
if (!param.stored && param.state === 'ready') {
clearTimeout(param.queue_ident);
delete param.queue_ident;
shared.job_queue.load();
shared.job_queue.post(param);
shared.job_queue.save();
param.stored = true;
}
}
function deferredPutJob(param) {
if (param.queue_ident === undefined) {
param.queue_ident = setTimeout(function () {
delete param.queue_ident;
if (param.stored) {
shared.job_queue.load();
shared.job_queue.put(param);
shared.job_queue.save();
}
});
}
}
function removeJob(param) {
clearTimeout(param.queue_ident);
delete param.queue_ident;
if (param.stored) {
shared.job_queue.load();
shared.job_queue.remove(param.id);
shared.job_queue.save();
delete param.stored;
delete param.id;
}
}
function initJob(param) {
if (!param.command.end) {
param.command.end = function () {
shared.emit('job:end', param);
};
}
}
shared.on('job:new', initJob);
if (options.job_management !== false) {
......@@ -2740,52 +2845,18 @@ function enableJobQueue(jio, shared, options) {
shared.job_keys
);
shared.on('job', function (param) {
if (indexOf(param.state, ['fail', 'done']) === -1) {
if (!param.stored) {
shared.job_queue.load();
shared.job_queue.post(param);
shared.job_queue.save();
param.stored = true;
}
}
});
// Listeners
['jobRun', 'jobStop'].forEach(function (event) {
shared.on(event, function (param) {
if (param.stored) {
shared.job_queue.load();
if (param.state === 'done' || param.state === 'fail') {
if (shared.job_queue.remove(param.id)) {
shared.job_queue.save();
delete param.storad;
}
} else {
shared.job_queue.put(param);
shared.job_queue.save();
}
}
});
});
shared.on('job:new', postJobIfReady);
shared.on('jobEnd', function (param) {
if (param.stored) {
shared.job_queue.load();
if (shared.job_queue.remove(param.id)) {
shared.job_queue.save();
}
}
});
shared.on('job:started', deferredPutJob);
shared.on('job:stopped', deferredPutJob);
shared.on('job:modified', deferredPutJob);
shared.on('job:notified', deferredPutJob);
}
shared.on('job:end', removeJob);
shared.on('job', function (param) {
if (!param.command.end) {
param.command.end = function () {
shared.emit('jobEnd', param);
};
}
});
}
}
......@@ -2800,20 +2871,23 @@ function enableJobRecovery(jio, shared, options) {
// uses
// - shared.job_queue JobQueue
// emits 'job:new' event
function numberOrDefault(number, default_value) {
return (typeof number === 'number' &&
isFinite(number) ? number : default_value);
}
function recoverJob(param) {
shared.job_queue.load();
shared.job_queue.remove(param.id);
delete param.id;
if (methodType(param.method) === 'writer' ||
param.state === 'ready' ||
param.state === 'running' ||
param.state === 'waiting') {
if (methodType(param.method) === 'writer' &&
(param.state === 'ready' ||
param.state === 'running' ||
param.state === 'waiting')) {
shared.job_queue.save();
shared.emit('job', param);
shared.emit('job:new', param);
}
}
......@@ -2870,17 +2944,17 @@ function enableJobReference(jio, shared, options) {
// creates
// - shared.jobs Object Array
// uses 'job', 'jobEnd' events
// uses 'job:new' and 'job:end' events
shared.jobs = [];
var job_references = new ReferenceArray(shared.jobs);
shared.on('job', function (param) {
shared.on('job:new', function (param) {
job_references.put(param);
});
shared.on('jobEnd', function (param) {
shared.on('job:end', function (param) {
job_references.remove(param);
});
}
......@@ -2914,9 +2988,9 @@ function enableJobRetry(jio, shared, options) {
// - param.options object
// - param.command object
// uses 'job' and 'jobRetry' events
// emits 'job', 'jobFail' and 'jobStateChange' events
// job can emit 'jobRetry'
// uses 'job:new' and 'job:retry' events
// emits action 'job:start' event
// emits 'job:retry', 'job:reject', 'job:modified' and 'job:stopped' events
shared.job_keys = arrayExtend(shared.job_keys || [], ["max_retry"]);
......@@ -2960,9 +3034,7 @@ function enableJobRetry(jio, shared, options) {
2
);
// listeners
shared.on('job', function (param) {
function initJob(param) {
if (typeof param.max_retry !== 'number' || param.max_retry < 0) {
param.max_retry = positiveNumberOrDefault(
param.options.max_retry,
......@@ -2971,34 +3043,42 @@ function enableJobRetry(jio, shared, options) {
}
param.command.reject = function (status) {
if (constants.http_action[status || 0] === "retry") {
shared.emit('jobRetry', param, arguments);
shared.emit('job:retry', param, arguments);
} else {
shared.emit('jobFail', param, arguments);
shared.emit('job:reject', param, arguments);
}
};
param.command.retry = function () {
shared.emit('jobRetry', param, arguments);
shared.emit('job:retry', param, arguments);
};
});
}
shared.on('jobRetry', function (param, args) {
function retryIfRunning(param, args) {
if (param.state === 'running') {
if (param.max_retry === undefined ||
param.max_retry === null ||
param.max_retry >= param.tried) {
param.state = 'waiting';
param.modified = new Date();
shared.emit('jobStop', param);
shared.emit('job:modified', param);
shared.emit('job:stopped', param);
setTimeout(function () {
param.state = 'ready';
param.modified = new Date();
shared.emit('job', param);
shared.emit('job:modified', param);
shared.emit('job:start', param);
}, min(10000, param.tried * 2000));
} else {
shared.emit('jobFail', param, args);
shared.emit('job:reject', param, args);
}
}
});
}
// listeners
shared.on('job:new', initJob);
shared.on('job:retry', retryIfRunning);
}
/*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */
......@@ -3016,7 +3096,9 @@ function enableJobTimeout(jio, shared, options) {
// - param.timeout_ident Timeout
// - param.state string 'running'
// uses 'job', 'jobDone', 'jobFail', 'jobRetry' and 'jobNotify' events
// uses 'job:new', 'job:stopped', 'job:started',
// 'job:notified' and 'job:end' events
// emits 'job:modified' event
shared.job_keys = arrayExtend(shared.job_keys || [], ["timeout"]);
......@@ -3041,36 +3123,41 @@ function enableJobTimeout(jio, shared, options) {
};
}
// listeners
shared.on('job', function (param) {
if (typeof param.timeout !== 'number' || param.timeout < 0) {
param.timeout = positiveNumberOrDefault(
param.options.timeout,
function initJob(job) {
if (typeof job.timeout !== 'number' || job.timeout < 0) {
job.timeout = positiveNumberOrDefault(
job.options.timeout,
default_timeout
);
}
param.modified = new Date();
});
job.modified = new Date();
shared.emit('job:modified', job);
}
["jobDone", "jobFail", "jobRetry"].forEach(function (event) {
shared.on(event, function (param) {
clearTimeout(param.timeout_ident);
delete param.timeout_ident;
});
});
function clearJobTimeout(job) {
clearTimeout(job.timeout_ident);
delete job.timeout_ident;
}
["jobRun", "jobNotify", "jobEnd"].forEach(function (event) {
shared.on(event, function (param) {
clearTimeout(param.timeout_ident);
if (param.state === 'running' && param.timeout > 0) {
param.timeout_ident = setTimeout(timeoutReject(param), param.timeout);
param.modified = new Date();
} else {
delete param.timeout_ident;
}
});
});
function restartJobTimeoutIfRunning(job) {
clearTimeout(job.timeout_ident);
if (job.state === 'running' && job.timeout > 0) {
job.timeout_ident = setTimeout(timeoutReject(job), job.timeout);
job.modified = new Date();
} else {
delete job.timeout_ident;
}
}
// listeners
shared.on('job:new', initJob);
shared.on("job:stopped", clearJobTimeout);
shared.on("job:end", clearJobTimeout);
shared.on("job:started", restartJobTimeoutIfRunning);
shared.on("job:notified", restartJobTimeoutIfRunning);
}
/*jslint indent: 2, maxlen: 80, sloppy: true */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment