Commit 34ecf915 authored by Tristan Cavelier's avatar Tristan Cavelier

replicate synchronizes on allDocs

parent 1f85aad6
......@@ -44,9 +44,7 @@
resolve = require('rsvp').resolve,
addStorageFunction = require('jio').addStorage,
uniqueJSONStringify = require('jio').util.uniqueJSONStringify,
forEach = require('jio').util.forEach,
chain = resolve(),
resolved = chain,
cache = {};
function logandreturn() {
......@@ -65,6 +63,52 @@
return promise.then(null, function (reason) { return reason; });
}
//////////////////////////////////////////////////////////////////////
function RowFIFO() {
this.index = 0;
this.end = 0;
this.length = 0;
this.ids = {};
}
RowFIFO.prototype.extend = function (array) {
var i, l, value;
for (i = 0, l = array.length; i < l; i += 1) {
value = array[i];
if (this.ids[value.id]) { return; }
this.ids[value.id] = true;
this[this.end] = value;
this.end += 1;
this.length += 1;
}
return this;
};
RowFIFO.prototype.push = function () {
this.extend([].slice.call(arguments));
return this.length;
};
RowFIFO.prototype.shift = function () {
if (this.index >= this.end) { return; }
this.length -= 1;
var val = this[this.index];
delete this[this.index];
delete this.ids[val.id];
this.index += 1;
return val;
};
function exportAllDocsRowsToFIFO(this_storage, allDocs) {
var fifo;
fifo = this_storage._cache.rowsToSynchronize =
this_storage._cache.rowsToSynchronize || new RowFIFO();
fifo.extend(allDocs.data.rows);
}
//////////////////////////////////////////////////////////////////////
/**
* firstFulfilled(promises): promises< last_fulfilment_value >
*
......@@ -151,6 +195,36 @@
}, onCancel);
}
function arrayShifter(array, callback) {
var cancelled, p1 = resolve(), p2;
return new Promise(function (done, fail, notify) {
var value;
function next() {
if (array.length) {
try {
value = callback.call(null, array.shift(), array);
} catch (e) {
return fail(e);
}
if (cancelled) { return; }
if (value && typeof value.then === "function") {
p1 = value;
p2 = value.then(next, fail, notify);
} else {
p2 = p2.then(next, fail, notify);
}
return;
}
done();
}
p2 = p1.then(next);
}, function () {
cancelled = true;
if (typeof p1.cancel === "function") { p1.cancel(); }
if (typeof p2.cancel === "function") { p2.cancel(); }
});
}
// //////////////////////////////////////////////////////////////////////
// /**
......@@ -237,13 +311,12 @@
}
}
ReplicateStorage.prototype.syncAllDocs = function (command, alldocs) {
if (this._cache.syncAllDocs) {
return this._cache.syncAllDocs;
ReplicateStorage.prototype.syncRowFIFO = function (command) {
if (this._cache.syncRowFIFO) {
return this._cache.syncRowFIFO;
}
console.log('syncing');
var storage_list = this._storage_list, it = this, cache_storage;
var storage_list = this._storage_list, it = this, cache_storage, p;
if (this._cache_storage) {
cache_storage = command.storage(this._cache_storage);
}
......@@ -252,18 +325,19 @@
return command.storage(description);
});
function returnThe404ReasonsElseNull(reason) {
if (reason.status === 404) {
return 404;
}
return null;
function doNothing() {
return;
}
function getSubStoragesDocument(id) {
return all(storage_list.map(function (storage) {
return storage.get({"_id": id}).
then(null, returnThe404ReasonsElseNull);
return success(storage.get({"_id": id}));
}));
}
function removeSubStorageDocuments(id) {
return all(storage_list.map(function (storage) {
return success(storage.remove({"_id": id}));
}));
}
......@@ -271,22 +345,34 @@
return it.syncGetAnswerList(command, answers);
}
function is404Answer(answer) {
return answer.status === 404;
}
function isSuccessAnswer(answer) {
return answer.result === "success";
}
function checkAnswers(id, answers) {
if (cache_storage) {
if (answers.every(function (answer) {
return answer.status === 404;
})) {
if (answers.every(is404Answer)) {
cache_storage.remove({"_id": id});
} else if (answers.every(function (answer) {
return answer.result === "success";
})) {
} else if (answers.every(isSuccessAnswer)) {
cache_storage.remove({"_id": id});
}
}
}
function deleteCache() {
delete it._cache.syncAllDocs;
delete it._cache.syncRowFIFO;
}
if (cache_storage) {
p = cache_storage.allDocs().then(function (answer) {
exportAllDocsRowsToFIFO(it, answer);
});
} else {
p = chain;
}
/*
......@@ -300,33 +386,34 @@
* - a ko > b ok
* - a ko > b ko
*/
this._cache.syncAllDocs =
forEach(alldocs.data.rows, function (row) {
p = p.then(function () {
return arrayShifter(it._cache.rowsToSynchronize, function (row) {
if (cache_storage) {
return cache_storage.get({"_id": row.id}).then(function (answer) {
if (answer.data.state === "Deleted") {
return removeSubStorageDocuments(row.id).
then(checkAnswers.bind(null, row.id), doNothing);
}
return getSubStoragesDocument(row.id).
then(synchronizeDocument).
then(checkAnswers.bind(null, row.id), doNothing);
}, function (reason) {
if (reason.status === 404) {
return getSubStoragesDocument(row.id).
then(synchronizeDocument).
then(checkAnswers.bind(null, row.id), doNothing);
}
throw reason;
});
}
return getSubStoragesDocument(row.id).
then(synchronizeDocument).
then(checkAnswers.bind(null, row.id));
then(null, doNothing);
});
if (cache_storage) {
this._cache.syncAllDocs = this._cache.syncAllDocs.then(function () {
return cache_storage.allDocs({"include_docs": true});
}).then(function (answers) {
console.log(answers);
forEach(answers.data.rows, function (row) {
if (row.doc.state === "Deleted") {
return it._remove(command, {"_id": row.id}).
then(null, returnThe404ReasonsElseNull).
then(function () {
cache_storage.remove({"_id": row.id});
}).then(null, function () { return; }); // ignore error
}
return getSubStoragesDocument(row.id).
then(synchronizeDocument).
then(checkAnswers.bind(null, row.id));
});
}).then(null, function () { return; }); // ignore error
}
this._cache.syncAllDocs.then(deleteCache, deleteCache);
return this._cache.syncAllDocs;
});
p.then(deleteCache, deleteCache);
this._cache.syncRowFIFO = p;
return p;
};
ReplicateStorage.prototype.syncGetAnswerList = function (command,
......@@ -336,7 +423,7 @@
/*jslint continue: true */
for (i = 0, l = answer_list.length; i < l; i += 1) {
answer = answer_list[i];
if (!answer || answer === 404) { continue; }
if (!answer || answer.result !== "success") { continue; }
if (!winner) {
winner = answer;
winner_index = i;
......@@ -361,9 +448,15 @@
// document synchronisation
for (i = 0, l = answer_list.length; i < l; i += 1) {
answer = answer_list[i];
if (!answer) { continue; }
if (i === winner_index) { continue; }
if (answer === 404) {
if (!answer) {
promise_list.push(resolve({"status": 0}));
continue;
}
if (i === winner_index) {
promise_list.push(resolve({"result": "success"}));
continue;
}
if (answer.status === 404) {
delete winner._id;
promise_list.push(success(
command.storage(this._storage_list[i]).post(winner)
......@@ -373,14 +466,18 @@
// resolving the get method.
continue;
}
delete answer._attachments;
if (uniqueJSONStringify(answer.data) !== winner_str) {
promise_list.push(success(
command.storage(this._storage_list[i]).put(winner)
));
if (answer.result === "success") {
delete answer._attachments;
if (uniqueJSONStringify(answer.data) !== winner_str) {
promise_list.push(success(
command.storage(this._storage_list[i]).put(winner)
));
continue;
}
promise_list.push(resolve({"result": "success"}));
continue;
}
promise_list.push(resolved);
promise_list.push(resolve({"status": 0}));
}
return all(promise_list);
// XXX .then synchronize attachments
......@@ -404,6 +501,13 @@
if (typeof metadata._id !== "string" || metadata._id === "") {
metadata._id = a.id;
}
if (thiz._cache_storage && cache_promise === 0) {
// the metadata is set, but the cache needs to be updated
cache_promise = command.storage(thiz._cache_storage).put({
"_id": metadata._id,
"state": "Updated"
});
}
done(a);
return a;
}, function (e) {
......@@ -413,6 +517,8 @@
"_id": metadata._id,
"state": "Updated"
});
} else {
cache_promise = 0;
}
error_count += 1;
if (error_count === promises.length) {
......@@ -660,7 +766,7 @@
};
ReplicateStorage.prototype._allDocs = function (command, param, option) {
var promise_list = [], index, length = this._storage_list.length;
var promise_list = [], index, me = this, length = me._storage_list.length;
for (index = 0; index < length; index += 1) {
promise_list[index] =
command.storage(this._storage_list[index]).allDocs(option);
......@@ -696,16 +802,15 @@
}
}
return {"data": {"total_rows": (rows || []).length, "rows": rows || []}};
}).then(function (answer) {
exportAllDocsRowsToFIFO(me, answer);
me.syncRowFIFO(command);
return answer;
});
};
ReplicateStorage.prototype.allDocs = function (command, param, option) {
var this_ = this;
return this._allDocs(command, param, option).
then(function (answer) {
this_.syncAllDocs(command, answer);
return answer;
}).
then(command.success, command.error, command.notify);
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment