Commit 37d48b2e authored by Tristan Cavelier's avatar Tristan Cavelier

replicate handles cache and delete sync

parent 2abfb7f3
......@@ -41,10 +41,26 @@
var Promise = require('rsvp').Promise,
all = require('rsvp').all,
resolve = require('rsvp').resolve,
addStorageFunction = require('jio').addStorage,
uniqueJSONStringify = require('jio').util.uniqueJSONStringify,
forEach = require('jio').util.forEach,
chain = resolve(),
resolved = chain,
cache = {};
function logandreturn() {
var args = [].slice.call(arguments);
console.log.apply(console, args.map(JSON.stringify));
return args[args.length - 1];
}
function logandthrow() {
var args = [].slice.call(arguments);
console.warn.apply(console, args.map(JSON.stringify));
throw args[args.length - 1];
}
function success(promise) {
return promise.then(null, function (reason) { return reason; });
}
......@@ -155,6 +171,58 @@
// S4() + S4() + S4();
// }
// /**
// * This buffer can queue actions with the `exec` method. If an action is not
// * in execution, then its `exec` method will return the action promise,
// * otherwise it queues the new given action. The first set action will be
// * executed at "t + 1", and after action end, the next action is executed.
// *
// * @class ActionBuffer
// * @constructor
// */
// function ActionBuffer() {
// this._buffers = {};
// }
// ActionBuffer.prototype._shift = function (name) {
// var buffer = this._buffers[name];
// if (buffer) {
// if (buffer.length > 1) {
// buffer.shift();
// return;
// }
// delete this._buffers[name];
// }
// };
// ActionBuffer.prototype.exec = function (name, action) {
// var p, res, buffer = this._buffers[name], this_ = this;
// function shiftAndReturn(answer) {
// this_._shift(name);
// return answer;
// }
// function shiftAndThrow(reason) {
// this_._shift(name);
// throw reason;
// }
// if (buffer) {
// if (buffer.length > 1) {
// return buffer[1];
// }
// buffer.push(buffer[0].then(action).then(shiftAndReturn, shiftAndThrow));
// return buffer[1];
// }
// p = RSVP.resolve();
// res = p.then(function () {
// this_._shift(name);
// return action();
// }).then(shiftAndReturn, shiftAndThrow);
// this._buffers[name] = [null, res];
// return res;
// };
//////////////////////////////////////////////////////////////////////
function ReplicateStorage(spec) {
if (!Array.isArray(spec.storage_list)) {
throw new TypeError("ReplicateStorage(): " +
......@@ -164,19 +232,27 @@
cache[str] = cache[str] || {};
this._cache = cache[str];
this._storage_list = spec.storage_list;
if (typeof spec.cache_storage === "object" && spec.cache_storage !== null) {
this._cache_storage = spec.cache_storage;
}
}
ReplicateStorage.prototype.syncAllDocs = function (command, alldocs) {
if (this._cache.syncAllDocs) {
return this._cache.syncAllDocs;
}
console.log('syncing');
var storage_list = this._storage_list, this_ = this;
var storage_list = this._storage_list, it = this, cache_storage;
if (this._cache_storage) {
cache_storage = command.storage(this._cache_storage);
}
storage_list = storage_list.map(function (description) {
return command.storage(description);
});
function returnThe404ReasonsElseNull(reason) {
if (reason.status === 404) {
return 404;
......@@ -192,28 +268,63 @@
}
function synchronizeDocument(answers) {
return this_.syncGetAnswerList(command, answers);
return it.syncGetAnswerList(command, answers);
}
function checkAnswers(answers) {
var i;
for (i = 0; i < answers.length; i += 1) {
if (answers[i].result !== "success") {
throw answers[i];
function checkAnswers(id, answers) {
if (cache_storage) {
if (answers.every(function (answer) {
return answer.status === 404;
})) {
cache_storage.remove({"_id": id});
} else if (answers.every(function (answer) {
return answer.result === "success";
})) {
cache_storage.remove({"_id": id});
}
}
}
function deleteCache() {
delete this_._cache.syncAllDocs;
delete it._cache.syncAllDocs;
}
/*
* `a` and `b` are storage get responses, `c` is cache storage:
*
* - a 404 -> b 404 > c remove sync
*
* `a` and `b` are storage sync responses, `c` is cache storage:
*
* - a ok > b ok > c remove sync
* - a ko > b ok
* - a ko > b ko
*/
this._cache.syncAllDocs =
jIO.util.forEach(alldocs.data.rows, function (row) {
forEach(alldocs.data.rows, function (row) {
return getSubStoragesDocument(row.id).
then(synchronizeDocument).
then(checkAnswers.bind(null, row.id));
});
if (cache_storage) {
this._cache.syncAllDocs = this._cache.syncAllDocs.then(function () {
return cache_storage.allDocs({"include_docs": true});
}).then(function (answers) {
console.log(answers);
forEach(answers.data.rows, function (row) {
if (row.doc.state === "Deleted") {
return it._remove(command, {"_id": row.id}).
then(null, returnThe404ReasonsElseNull).
then(function () {
cache_storage.remove({"_id": row.id});
}).then(null, function () { return; }); // ignore error
}
return getSubStoragesDocument(row.id).
then(synchronizeDocument).
then(checkAnswers);
then(checkAnswers.bind(null, row.id));
});
}).then(null, function () { return; }); // ignore error
}
this._cache.syncAllDocs.then(deleteCache, deleteCache);
return this._cache.syncAllDocs;
};
......@@ -267,63 +378,168 @@
promise_list.push(success(
command.storage(this._storage_list[i]).put(winner)
));
continue;
}
promise_list.push(resolved);
}
return all(promise_list);
// XXX .then synchronize attachments
};
ReplicateStorage.prototype.post = function (command, metadata, option) {
var promise_list = [], index, length = this._storage_list.length;
// if (!isDate(metadata.modified)) {
// command.error(
// 409,
// "invalid 'modified' metadata",
// "The metadata 'modified' should be a valid date string or date object"
// );
// return;
// }
for (index = 0; index < length; index += 1) {
promise_list[index] =
command.storage(this._storage_list[index]).post(metadata, option);
/**
* Post a document, returns the first response received.
*
* `a` and `b` are storage put responses, `c` is cache storage:
*
* - a ok > b ok
* - a ko > c put sync > b ok
*/
ReplicateStorage.prototype._post = function (command, metadata, option) {
var promises, error_count = 0, thiz = this, cache_promise;
return new Promise(function (done, fail, notify) {
promises = thiz._storage_list.map(function (desc) {
return chain.then(function () {
return command.storage(desc).post(metadata, option);
}).then(function (a) {
if (typeof metadata._id !== "string" || metadata._id === "") {
metadata._id = a.id;
}
done(a);
return a;
}, function (e) {
if (thiz._cache_storage && !cache_promise &&
typeof metadata._id === "string" && metadata._id !== "") {
cache_promise = command.storage(thiz._cache_storage).put({
"_id": metadata._id,
"state": "Updated"
});
}
firstFulfilled(promise_list).
then(command.success, command.error, command.notify);
error_count += 1;
if (error_count === promises.length) {
fail(e);
}
throw e;
}, notify);
});
}, function () {
promises.forEach(function (promise) {
promise.cancel();
});
});
};
ReplicateStorage.prototype.put = function (command, metadata, option) {
var promise_list = [], index, length = this._storage_list.length;
// if (!isDate(metadata.modified)) {
// command.error(
// 409,
// "invalid 'modified' metadata",
// "The metadata 'modified' should be a valid date string or date object"
// );
// return;
// }
for (index = 0; index < length; index += 1) {
promise_list[index] =
command.storage(this._storage_list[index]).put(metadata, option);
/**
* Put a document, returns the first response received.
*
* `a` and `b` are storage put responses, `c` is cache storage:
*
* - a ok > b ok > c remove sync
* - a ko > c put sync > b ok
*/
ReplicateStorage.prototype._put = function (command, metadata, option) {
var promises, error_count = 0, count = 0, thiz = this, cache_promise;
return new Promise(function (done, fail, notify) {
promises = thiz._storage_list.map(function (desc) {
return chain.then(function () {
return command.storage(desc).put(metadata, option);
}).then(function (a) {
if (thiz._cache_storage) {
count += 1;
if (count === promises.length) {
command.storage(thiz._cache_storage).remove({
"_id": metadata._id
});
}
firstFulfilled(promise_list).
then(command.success, command.error, command.notify);
}
done(a);
return a;
}, function (e) {
if (thiz._cache_storage && !cache_promise) {
cache_promise = command.storage(thiz._cache_storage).put({
"_id": metadata._id,
"state": "Updated"
});
}
error_count += 1;
if (error_count === promises.length) {
fail(e);
}
throw e;
}, notify);
});
}, function () {
promises.forEach(function (promise) {
promise.cancel();
});
});
};
ReplicateStorage.prototype.putAttachment = function (command, param, option) {
var promise_list = [], index, length = this._storage_list.length;
for (index = 0; index < length; index += 1) {
promise_list[index] =
command.storage(this._storage_list[index]).putAttachment(param, option);
/**
* Remove a document, returns the first response received.
*
* `a` and `b` are storage put responses, `c` is cache storage:
*
* - a ok > b ok > c remove sync
* - a !404 > c put sync > b ok
*/
ReplicateStorage.prototype._remove = function (command, metadata, option) {
var promises, error_count = 0, count = 0, thiz = this, cache_promise;
return new Promise(function (done, fail, notify) {
promises = thiz._storage_list.map(function (desc) {
return chain.then(function () {
return command.storage(desc).remove(metadata, option);
}).then(function (a) {
if (thiz._cache_storage) {
count += 1;
if (count === promises.length) {
command.storage(thiz._cache_storage).remove({
"_id": metadata._id
});
}
firstFulfilled(promise_list).
}
done(a);
return a;
}, function (e) {
if (e.status !== 404 && thiz._cache_storage && !cache_promise) {
cache_promise = command.storage(thiz._cache_storage).put({
"_id": metadata._id,
"state": "Deleted"
});
}
error_count += 1;
if (error_count === promises.length) {
fail(e);
}
throw e;
}, notify);
});
}, function () {
promises.forEach(function (promise) {
promise.cancel();
});
});
};
ReplicateStorage.prototype.post = function (command, metadata, option) {
return this._post(command, metadata, option).
then(command.success, command.error, command.notify);
};
ReplicateStorage.prototype.put = function (command, metadata, option) {
return this._put(command, metadata, option).
then(command.success, command.error, command.notify);
};
ReplicateStorage.prototype.remove = function (command, param, option) {
return this._remove(command, param, option).
then(command.success, command.error, command.notify);
};
ReplicateStorage.prototype.putAttachment = function (command, param, option) {
var promise_list = [], index, length = this._storage_list.length;
for (index = 0; index < length; index += 1) {
promise_list[index] =
command.storage(this._storage_list[index]).remove(param, option);
command.storage(this._storage_list[index]).putAttachment(param, option);
}
firstFulfilled(promise_list).
then(command.success, command.error, command.notify);
......@@ -344,6 +560,58 @@
then(command.success, command.error, command.notify);
};
// /**
// * Respond with the first get answer received and synchronize the document to
// * the other storages in the background.
// */
// ReplicateStorage.prototype.get = function (command, param, option) {
// var promise_list = [], index, length = this._storage_list.length,
// answer_list = [], this_ = this;
// for (index = 0; index < length; index += 1) {
// promise_list[index] =
// command.storage(this._storage_list[index]).get(param, option);
// }
// new Promise(function (resolve, reject, notify) {
// var count = 0, error_count = 0;
// function resolver(index) {
// return function (answer) {
// count += 1;
// if (count === 1) {
// resolve(answer);
// }
// answer_list[index] = answer;
// if (count + error_count === length && count > 0) {
// this_.syncGetAnswerList(command, answer_list);
// }
// };
// }
// function rejecter(index) {
// return function (reason) {
// error_count += 1;
// if (reason.status === 404) {
// answer_list[index] = 404;
// }
// if (error_count === length) {
// reject(reason);
// }
// if (count + error_count === length && count > 0) {
// this_.syncGetAnswerList(command, answer_list);
// }
// };
// }
// for (index = 0; index < length; index += 1) {
// promise_list[index].then(resolver(index), rejecter(index), notify);
// }
// }, function () {
// for (index = 0; index < length; index += 1) {
// promise_list[index].cancel();
// }
// }).then(command.success, command.error, command.notify);
// };
/**
* Respond with the first get answer received and synchronize the document to
* the other storages in the background.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment