Commit 6adfec0f authored by Tristan Cavelier's avatar Tristan Cavelier

replicatestorage synchronises alldocs documents

parent c0b1a7fd
......@@ -42,7 +42,8 @@
var Promise = require('rsvp').Promise,
all = require('rsvp').all,
addStorageFunction = require('jio').addStorage,
uniqueJSONStringify = require('jio').util.uniqueJSONStringify;
uniqueJSONStringify = require('jio').util.uniqueJSONStringify,
cache = {};
function success(promise) {
return promise.then(null, function (reason) { return reason; });
......@@ -92,6 +93,48 @@
}, onCancel);
}
function allFulfilled(promises) {
var length = promises.length;
function onCancel() {
var i;
for (i = 0; i < promises.length; i += 1) {
if (typeof promises[i].cancel === "function") {
promises[i].cancel();
}
}
}
if (length === 0) {
return new Promise(function (done) { done([]); });
}
return new Promise(function (resolve, reject, notify) {
var i, count = 0, error_count = 0, results = [];
function resolver(i) {
return function (value) {
count += 1;
results[i] = value;
if (count === length) {
resolve(results);
}
};
}
function rejecter(err) {
error_count += 1;
count += 1;
if (error_count === length) {
reject(err);
}
}
for (i = 0; i < length; i += 1) {
promises[i].then(resolver(i), rejecter, notify);
}
}, onCancel);
}
// //////////////////////////////////////////////////////////////////////
// /**
......@@ -117,9 +160,64 @@
throw new TypeError("ReplicateStorage(): " +
"storage_list is not of type array");
}
var str = uniqueJSONStringify(spec);
cache[str] = cache[str] || {};
this._cache = cache[str];
this._storage_list = spec.storage_list;
}
ReplicateStorage.prototype.syncAllDocs = function (command, alldocs) {
if (this._cache.syncAllDocs) {
return this._cache.syncAllDocs;
}
var storage_list = this._storage_list, this_ = this;
storage_list = storage_list.map(function (description) {
return command.storage(description);
});
function returnThe404ReasonsElseNull(reason) {
if (reason.status === 404) {
return 404;
}
return null;
}
function getSubStoragesDocument(id) {
return all(storage_list.map(function (storage) {
return storage.get({"_id": id}).
then(null, returnThe404ReasonsElseNull);
}));
}
function synchronizeDocument(answers) {
return this_.syncGetAnswerList(command, answers);
}
function checkAnswers(answers) {
var i;
for (i = 0; i < answers.length; i += 1) {
if (answers[i].result !== "success") {
throw answers[i];
}
}
}
function deleteCache() {
delete this_._cache.syncAllDocs;
}
this._cache.syncAllDocs =
jIO.util.forEach(alldocs.data.rows, function (row) {
return getSubStoragesDocument(row.id).
then(synchronizeDocument).
then(checkAnswers);
});
this._cache.syncAllDocs.then(deleteCache, deleteCache);
return this._cache.syncAllDocs;
};
ReplicateStorage.prototype.syncGetAnswerList = function (command,
answer_list) {
var i, l, answer, answer_modified_date, winner, winner_modified_date,
......@@ -251,8 +349,7 @@
* the other storages in the background.
*/
ReplicateStorage.prototype.get = function (command, param, option) {
var promise_list = [], index, length = this._storage_list.length,
answer_list = [], this_ = this;
var promise_list = [], index, length = this._storage_list.length;
for (index = 0; index < length; index += 1) {
promise_list[index] =
command.storage(this._storage_list[index]).get(param, option);
......@@ -260,36 +357,22 @@
new Promise(function (resolve, reject, notify) {
var count = 0, error_count = 0;
function resolver(index) {
return function (answer) {
count += 1;
if (count === 1) {
resolve(answer);
}
answer_list[index] = answer;
if (count + error_count === length && count > 0) {
this_.syncGetAnswerList(command, answer_list);
}
};
function resolver(answer) {
count += 1;
if (count === 1) {
resolve(answer);
}
}
function rejecter(index) {
return function (reason) {
error_count += 1;
if (reason.status === 404) {
answer_list[index] = 404;
}
if (error_count === length) {
reject(reason);
}
if (count + error_count === length && count > 0) {
this_.syncGetAnswerList(command, answer_list);
}
};
function rejecter(reason) {
error_count += 1;
if (error_count === length) {
reject(reason);
}
}
for (index = 0; index < length; index += 1) {
promise_list[index].then(resolver(index), rejecter(index), notify);
promise_list[index].then(resolver, rejecter, notify);
}
}, function () {
for (index = 0; index < length; index += 1) {
......@@ -308,24 +391,26 @@
then(command.success, command.error, command.notify);
};
ReplicateStorage.prototype.allDocs = function (command, param, option) {
ReplicateStorage.prototype._allDocs = function (command, param, option) {
var promise_list = [], index, length = this._storage_list.length;
for (index = 0; index < length; index += 1) {
promise_list[index] =
success(command.storage(this._storage_list[index]).allDocs(option));
command.storage(this._storage_list[index]).allDocs(option);
}
all(promise_list).then(function (answers) {
/*jslint unparam: true */
return allFulfilled(promise_list).then(function (answers) {
/*jslint unparam: false */
// merge responses
var i, j, k, found, rows;
// browsing answers
for (i = 0; i < answers.length; i += 1) {
if (answers[i].result === "success") {
if (answers[i]) {
rows = answers[i].data.rows;
break;
}
}
for (i += 1; i < answers.length; i += 1) {
if (answers[i].result === "success") {
if (answers[i]) {
// browsing answer rows
for (j = 0; j < answers[i].data.rows.length; j += 1) {
found = false;
......@@ -343,8 +428,17 @@
}
}
return {"data": {"total_rows": (rows || []).length, "rows": rows || []}};
}).then(command.success, command.error, command.notify);
/*jslint unparam: true */
});
};
ReplicateStorage.prototype.allDocs = function (command, param, option) {
var this_ = this;
return this._allDocs(command, param, option).
then(function (answer) {
this_.syncAllDocs(command, answer);
return answer;
}).
then(command.success, command.error, command.notify);
};
ReplicateStorage.prototype.check = function (command, param, option) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment