Commit 31ab09f1 authored by Romain Courteaud's avatar Romain Courteaud

ReplicateStorage: prototype of bulk usage to speed up first replication

parent cbb8be6e
......@@ -258,6 +258,7 @@
return argument_list[0];
});
declareMethod(JioProxyStorage, "get", checkId);
declareMethod(JioProxyStorage, "bulk");
declareMethod(JioProxyStorage, "remove", checkId, function (argument_list) {
return argument_list[0];
});
......@@ -403,7 +404,11 @@
};
JioProxyStorage.prototype.hasCapacity = function (name) {
var storage_method = this.__storage.hasCapacity;
var storage_method = this.__storage.hasCapacity,
capacity_method = this.__storage[name];
if (capacity_method !== undefined) {
return true;
}
if ((storage_method === undefined) ||
!storage_method.apply(this.__storage, arguments)) {
throw new jIO.util.jIOError(
......
......@@ -167,7 +167,8 @@
});
}
function checkLocalCreation(queue, source, destination, id, options) {
function checkLocalCreation(queue, source, destination, id, options,
getMethod) {
var remote_doc;
queue
.push(function () {
......@@ -187,7 +188,7 @@
.push(function () {
// This document was never synced.
// Push it to the remote storage and store sync information
return source.get(id);
return getMethod(id);
})
.push(function (doc) {
var local_hash = generateHash(JSON.stringify(doc)),
......@@ -213,6 +214,34 @@
});
}
function checkBulkLocalCreation(queue, source, destination, id_list,
options) {
queue
.push(function () {
return source.bulk(id_list);
})
.push(function (result_list) {
var i,
sub_queue = new RSVP.Queue();
function getResult(j) {
return function (id) {
if (id !== id_list[j].parameter_list[0]) {
throw new Error("Does not access expected ID " + id);
}
return result_list[j];
};
}
for (i = 0; i < result_list.length; i += 1) {
checkLocalCreation(sub_queue, source, destination,
id_list[i].parameter_list[0],
options, getResult(i));
}
return sub_queue;
});
}
function checkLocalDeletion(queue, destination, id, source) {
var status_hash;
queue
......@@ -312,6 +341,7 @@
.push(function (result_list) {
var i,
local_dict = {},
new_list = [],
signature_dict = {},
key;
for (i = 0; i < result_list[0].data.total_rows; i += 1) {
......@@ -328,15 +358,28 @@
signature_dict[result_list[1].data.rows[i].id] = i;
}
}
if (options.check_creation === true) {
for (key in local_dict) {
if (local_dict.hasOwnProperty(key)) {
if (!signature_dict.hasOwnProperty(key)) {
checkLocalCreation(queue, source, destination, key, options);
if (options.use_bulk_get === true) {
new_list.push({
method: "get",
parameter_list: [key]
});
} else {
checkLocalCreation(queue, source, destination, key,
options, source.get.bind(source));
}
}
}
}
if ((options.use_bulk_get === true) && (new_list.length !== 0)) {
checkBulkLocalCreation(queue, source, destination, new_list,
options);
}
}
for (key in signature_dict) {
if (signature_dict.hasOwnProperty(key)) {
if (local_dict.hasOwnProperty(key)) {
......@@ -404,11 +447,23 @@
}
})
.push(function () {
// Autoactivate bulk if substorage implements it
// Keep it like this until the bulk API is stabilized
var use_bulk_get = false;
try {
use_bulk_get = context._remote_sub_storage.hasCapacity("bulk");
} catch (error) {
if (!((error instanceof jIO.util.jIOError) &&
(error.status_code === 501))) {
throw error;
}
}
if (context._check_remote_modification ||
context._check_remote_creation ||
context._check_remote_deletion) {
return pushStorage(context._remote_sub_storage,
context._local_sub_storage, {
use_bulk_get: use_bulk_get,
check_modification: context._check_remote_modification,
check_creation: context._check_remote_creation,
check_deletion: context._check_remote_deletion
......
......@@ -1825,4 +1825,84 @@
});
});
test("bulk remote document creation", function () {
stop();
expect(3);
var id,
post_id = "123456789",
context = this;
function Storage200Bulk(spec) {
this._sub_storage = jIO.createJIO(spec.sub_storage);
}
Storage200Bulk.prototype.bulk = function (args) {
deepEqual(args, [{
method: "get",
parameter_list: [post_id]
}]);
return this._sub_storage.get(post_id)
.push(function (doc) {
return [doc];
});
};
Storage200Bulk.prototype.post = function (param) {
return this.put(post_id, param);
};
Storage200Bulk.prototype.put = function () {
return this._sub_storage.put.apply(this._sub_storage, arguments);
};
Storage200Bulk.prototype.hasCapacity = function () {
return this._sub_storage.hasCapacity.apply(this._sub_storage, arguments);
};
Storage200Bulk.prototype.buildQuery = function () {
return this._sub_storage.buildQuery.apply(this._sub_storage, arguments);
};
jIO.addStorage(
'replicatestorage200bulk',
Storage200Bulk
);
this.jio = jIO.createJIO({
type: "replicate",
local_sub_storage: {
type: "memory"
},
remote_sub_storage: {
type: "replicatestorage200bulk",
sub_storage: {
type: "memory"
}
}
});
context.jio.__storage._remote_sub_storage.post({"title": "bar"})
.then(function (result) {
id = result;
return context.jio.repair();
})
.then(function () {
return context.jio.get(id);
})
.then(function (result) {
deepEqual(result, {
title: "bar"
});
})
.then(function () {
return context.jio.__storage._signature_sub_storage.get(id);
})
.then(function (result) {
deepEqual(result, {
hash: "6799f3ea80e325b89f19589282a343c376c1f1af"
});
})
.fail(function (error) {
ok(false, error);
})
.always(function () {
start();
});
});
}(jIO, QUnit));
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment