Commit 4be41f5f authored by Cédric Le Ninivin's avatar Cédric Le Ninivin

ReplicateStorage: Add basic support of Attachment

replicatestorage: Add first version of attachment synchronization

ReplicateStorage: add global Signature dict and extended skip

replicate storage: Add support to Attachment list parameter
parent e745c253
......@@ -39,7 +39,13 @@
return rusha.digestFromString(content);
}
function generateHashFromArrayBuffer(content) {
// XXX Improve performance by moving calculation to WebWorker
return rusha.digestFromArrayBuffer(content);
}
function ReplicateStorage(spec) {
var i, i_length;
this._query_options = spec.query || {};
this._local_sub_storage = jIO.createJIO(spec.local_sub_storage);
......@@ -100,6 +106,43 @@
if (this._check_remote_deletion === undefined) {
this._check_remote_deletion = true;
}
this._check_local_attachment_modification =
spec.check_local_attachment_modification;
if (this._check_local_attachment_modification === undefined) {
this._check_local_attachment_modification = true;
}
this._check_local_attachment_creation =
spec.check_local_attachment_creation;
if (this._check_local_attachment_creation === undefined) {
this._check_local_attachment_creation = true;
}
this._check_local_attachment_deletion =
spec.check_local_attachment_deletion;
if (this._check_local_attachment_deletion === undefined) {
this._check_local_attachment_deletion = true;
}
this._check_remote_attachment_modification =
spec.check_remote_attachment_modification;
if (this._check_remote_attachment_modification === undefined) {
this._check_remote_attachment_modification = true;
}
this._check_remote_attachment_creation =
spec.check_remote_attachment_creation;
if (this._check_remote_attachment_creation === undefined) {
this._check_remote_attachment_creation = true;
}
this._check_remote_attachment_deletion =
spec.check_remote_attachment_deletion;
if (this._check_remote_attachment_deletion === undefined) {
this._check_remote_attachment_deletion = true;
}
if (spec.attachment_list) {
i_length = spec.attachment_list.length;
this._attachment_dict = {};
for (i = 0; i < i_length; i += 1) {
this._attachment_dict[spec.attachment_list[i]] = {};
}
}
}
ReplicateStorage.prototype.remove = function (id) {
......@@ -126,6 +169,32 @@
return this._local_sub_storage.get.apply(this._local_sub_storage,
arguments);
};
ReplicateStorage.prototype.getAttachment = function () {
return this._local_sub_storage.getAttachment.apply(this._local_sub_storage,
arguments);
};
ReplicateStorage.prototype.allAttachments = function () {
return this._local_sub_storage.allAttachments.apply(this._local_sub_storage,
arguments);
};
ReplicateStorage.prototype.putAttachment = function (id) {
if (id === this._signature_hash) {
throw new jIO.util.jIOError(this._signature_hash + " is frozen",
403);
}
return this._local_sub_storage.putAttachment.apply(this._local_sub_storage,
arguments);
};
ReplicateStorage.prototype.removeAttachment = function (id) {
if (id === this._signature_hash) {
throw new jIO.util.jIOError(this._signature_hash + " is frozen",
403);
}
return this._local_sub_storage.removeAttachment.apply(
this._local_sub_storage,
arguments
);
};
ReplicateStorage.prototype.hasCapacity = function () {
return this._local_sub_storage.hasCapacity.apply(this._local_sub_storage,
arguments);
......@@ -139,91 +208,537 @@
ReplicateStorage.prototype.repair = function () {
var context = this,
argument_list = arguments,
skip_document_dict = {};
skip_document_dict = {},
signature_dict = {};
// Do not sync the signature document
skip_document_dict[context._signature_hash] = null;
skip_document_dict[context._signature_hash] = {
skip: true,
skip_attachments: true
};
function allAttachmentMethod(storage, id) {
if (context._attachment_dict) {
return new RSVP.Queue()
.push(function () {
return context._attachment_dict;
});
}
return storage.allAttachments(id);
}
function isElementSkippable(id, attachment_id) {
if (attachment_id === undefined) {
return skip_document_dict[id] !== undefined
&& skip_document_dict[id].skip === true;
}
return skip_document_dict[id] !== undefined
&& (skip_document_dict[id].skip_attachments === true
|| (skip_document_dict[id].attachments !== undefined
&& skip_document_dict[id].attachments[attachment_id] === true));
}
function addElementToSkipList(id, attachment_id) {
if (skip_document_dict[id] === undefined) {
skip_document_dict[id] = {
skip: false,
skip_attachments: false,
attachments: {}
};
}
if (attachment_id === undefined) {
skip_document_dict[id].skip = true;
} else {
skip_document_dict[id].attachments[attachment_id] = true;
}
}
function checkLocalAttachmentDeletion(queue, destination,
id, attachment_id,
source) {
var status_hash,
attachment_blob,
hash_document = signature_dict[id],
attachment_signature_dict = hash_document.attachments_hash;
if (isElementSkippable(id, attachment_id)) {
return;
}
queue
.push(function () {
// NOTE: If we get here, it means a signature exists for the
// attachment but it is no longer present in attachment list
status_hash = attachment_signature_dict[attachment_id];
return destination.getAttachment(id, attachment_id)
.push(function (result) {
attachment_blob = result;
// Calculate Attachment Hash
return new RSVP.Queue()
.push(function () {
return jIO.util.readBlobAsArrayBuffer(attachment_blob);
})
.push(function (evt) {
return generateHashFromArrayBuffer(evt.target.result);
})
.push(function (remote_hash) {
if (remote_hash === status_hash) {
// No modification. Attachment can be removed.
return destination.removeAttachment(id, attachment_id)
.push(function () {
hash_document.updated = true;
delete attachment_signature_dict[attachment_id];
addElementToSkipList(id, attachment_id);
});
}
// Modifications on remote side
// Push them locally
return source.putAttachment(
id,
attachment_id,
attachment_blob
).push(function () {
hash_document.updated = true;
attachment_signature_dict[attachment_id] = remote_hash;
addElementToSkipList(id, attachment_id);
});
});
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
// Note that here destination document deletion is not checked
hash_document.updated = true;
delete attachment_signature_dict[attachment_id];
addElementToSkipList(id, attachment_id);
return;
}
throw error;
});
});
}
function updateAttachmentAndHash(queue,
source, destination,
source_id, destination_id,
attachment_id,
hash_document,
garbage_collect,
attachment_blob_dict) {
var attachment_blob;
if (isElementSkippable(destination_id, attachment_id)) {
return;
}
queue
.push(function () {
return source.getAttachment(source_id, attachment_id);
})
.push(function (result) {
attachment_blob = result;
if (attachment_blob_dict !== undefined) {
attachment_blob_dict[attachment_id] = attachment_blob;
}
if (destination !== undefined && destination_id !== undefined) {
return destination.putAttachment(
destination_id,
attachment_id,
attachment_blob
);
}
return;
})
.push(function () {
if (hash_document !== undefined) {
return new RSVP.Queue()
.push(function () {
return jIO.util.readBlobAsArrayBuffer(attachment_blob);
})
.push(function (evt) {
return generateHashFromArrayBuffer(
evt.target.result
);
})
.push(function (hash) {
hash_document.attachments_hash[attachment_id] = hash;
hash_document.updated = true;
return;
});
}
return;
})
.push(function () {
if (garbage_collect === true) {
// XXX Do we need to make our own garbage collect? Shouldn't we
// trust jIO garbage collect?
return source.removeAttachment(source_id, attachment_id);
}
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
// It is not important if the attachment does not exists
return;
}
throw error;
});
}
function propagateModification(source, destination, doc, hash, id,
function copyDocumentAndAttachments(source, destination,
source_id, destination_id,
doc,
hash_document,
garbage_collect) {
if (garbage_collect === undefined) {
garbage_collect = false;
}
return new RSVP.Queue()
.push(function () {
if (doc !== undefined) {
return doc;
}
return source.get(source_id);
})
.push(function (result) {
if (hash_document.hash === undefined) {
hash_document.hash = generateHash(result);
}
return destination.put(destination_id, result);
})
.push(function () {
return allAttachmentMethod(source, source_id);
})
.push(function (attachments_dict) {
var queue = new RSVP.Queue(),
attachment_id;
for (attachment_id in attachments_dict) {
if (attachments_dict.hasOwnProperty(attachment_id)) {
updateAttachmentAndHash(queue,
source, destination,
source_id, destination_id,
attachment_id,
hash_document,
garbage_collect);
}
}
return queue;
})
.push(function () {
if (source_id === destination_id) {
skip_document_dict[destination_id] = {
skip: true,
skip_attachments: true
};
}
if (garbage_collect === true) {
return source.remove(source_id);
}
return;
})
.push(function () {
if (garbage_collect === true) {
delete signature_dict[source_id];
skip_document_dict[source_id] = {
skip: true,
skip_attachments: true
};
return context._signature_sub_storage.remove(source_id);
}
return;
});
}
function propagateModification(source, destination, doc, id, hash,
options) {
var result,
post_id,
to_skip = true;
post_id;
if (options === undefined) {
options = {};
}
if (options.use_post) {
result = destination.post(doc)
.push(function (new_id) {
to_skip = false;
var hash_document = {
hash: hash,
attachments_hash: {},
updated: true
};
post_id = new_id;
return source.put(post_id, doc);
})
.push(function () {
return source.remove(id);
})
.push(function () {
return context._signature_sub_storage.remove(id);
signature_dict[post_id] = hash_document;
return copyDocumentAndAttachments(source, source,
id, post_id,
doc,
hash_document,
true);
})
.push(function () {
to_skip = true;
return context._signature_sub_storage.put(post_id, {
"hash": hash
});
return copyDocumentAndAttachments(source, destination,
post_id, post_id,
doc,
signature_dict[post_id]);
})
.push(function () {
skip_document_dict[post_id] = null;
return post_id;
});
} else {
result = destination.put(id, doc)
.push(function () {
return context._signature_sub_storage.put(id, {
"hash": hash
});
signature_dict[id].hash = hash;
signature_dict[id].updated = true;
addElementToSkipList(id);
return id;
});
}
return result
return result;
}
function pushAttachment(queue, destination,
id, attachment_id, attachment_blob) {
queue
.push(function () {
if (to_skip) {
skip_document_dict[id] = null;
}
return destination.putAttachment(id, attachment_id, attachment_blob);
});
}
function checkLocalDeletion(queue, destination, id, source) {
var status_hash;
var remote_hash_document = {
hash: undefined,
attachments_hash: {}
},
attachment_blob_dict = {};
// XXX Not sure it needs to be checked there
if (isElementSkippable(id)) {
return;
}
queue
.push(function () {
return context._signature_sub_storage.get(id);
})
.push(function (result) {
status_hash = result.hash;
var hash_document = result;
signature_dict[id] = hash_document;
return destination.get(id)
.push(function (doc) {
var remote_hash = generateHash(stringify(doc));
if (remote_hash === status_hash) {
return destination.remove(id)
.push(function () {
return context._signature_sub_storage.remove(id);
})
.push(function () {
skip_document_dict[id] = null;
});
}
// Modifications on remote side
// Push them locally
return propagateModification(destination, source, doc,
remote_hash, id);
// We first fetch the integrality the document to see if it can
// be removed or it needs to be updated on the source
remote_hash_document.hash = generateHash(stringify(doc));
return allAttachmentMethod(destination, id)
.push(function (attachment_dict) {
var attachment_id,
attachment_queue = new RSVP.Queue();
// All attachments are fetch and their hash is calculated
// XXX I do not see anyway to optimize this at the moment
// Case A: No change, everything is fetch to be sure
// Case B: Everything needs to be fetch to be store in the
// source
for (attachment_id in attachment_dict) {
if (attachment_dict.hasOwnProperty(attachment_id)) {
updateAttachmentAndHash(attachment_queue,
destination, undefined,
id, undefined,
attachment_id,
remote_hash_document,
false,
attachment_blob_dict);
}
}
return attachment_queue;
})
.push(function () {
var attachment_id,
attachments_hash = hash_document.attachments_hash,
remote_attachments_hash =
remote_hash_document.attachments_hash,
modified =
remote_hash_document.hash !== hash_document.hash;
// Compare all hash to looking for a change
for (attachment_id in remote_attachments_hash) {
if (remote_attachments_hash.hasOwnProperty(attachment_id)) {
modified =
attachments_hash[attachment_id] !==
remote_attachments_hash[attachment_id];
if (modified === true) {
break;
}
}
}
if (modified === false) {
// No Modification. Remove the destination version
return destination.remove(id)
.push(function () {
delete signature_dict[id];
return context._signature_sub_storage.remove(id);
})
.push(function () {
skip_document_dict[id] = {
skip: true,
skip_attachments: true
};
return;
});
}
// Modifications on destination side
// Push them locally
return propagateModification(destination, source, doc, id,
remote_hash_document.hash)
.push(function () {
var attachment_id,
attachment_dict =
remote_hash_document.attachments_hash,
attachment_queue = new RSVP.Queue();
for (attachment_id in attachment_dict) {
if (attachment_dict.hasOwnProperty(attachment_id)) {
pushAttachment(attachment_queue,
source,
id, attachment_id,
attachment_blob_dict[attachment_id]
);
}
}
return attachment_queue;
})
.push(function () {
signature_dict[id] = remote_hash_document;
signature_dict[id].updated = true;
skip_document_dict[id] = {
skip: true,
skip_attachments: true
};
return;
});
});
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
// Document has also been removed at destination
return context._signature_sub_storage.remove(id)
.push(function () {
skip_document_dict[id] = null;
skip_document_dict[id] = {
skip: true,
skip_attachments: true
};
delete signature_dict[id];
return;
});
}
throw error;
});
})
.push(function () {
if (signature_dict[id] !== undefined
&& signature_dict[id].updated === true) {
delete signature_dict[id].updated;
return context._signature_sub_storage.put(
id,
signature_dict[id]
);
}
});
}
function checkAttachmentSignatureDifference(queue, source, destination,
id, attachment_id,
options) {
var attachment_blob,
hash_document = signature_dict[id],
attachment_signature_dict = hash_document.attachments_hash;
if (isElementSkippable(id, attachment_id)) {
return;
}
queue
.push(function () {
return source.getAttachment(id, attachment_id);
})
.push(function (result) {
attachment_blob = result;
// Calculate Attachment Hash
return new RSVP.Queue()
.push(function () {
return jIO.util.readBlobAsArrayBuffer(attachment_blob);
})
.push(function (evt) {
return generateHashFromArrayBuffer(evt.target.result);
});
})
.push(function (local_hash) {
var status_hash = attachment_signature_dict[attachment_id];
if (local_hash !== status_hash) {
// Local modification
return destination.getAttachment(id, attachment_id,
{format: "array_buffer"})
.push(function (remote_attachment) {
var remote_hash = generateHashFromArrayBuffer(
remote_attachment
);
if (remote_hash !== status_hash) {
// Modification on both Side
if (remote_hash === local_hash) {
// Same modification on both side.
hash_document.updated = true;
attachment_signature_dict[attachment_id] = local_hash;
addElementToSkipList(id, attachment_id);
return;
}
if (options.conflict_ignore === true) {
// There is a conflict, but noone care
return;
}
if (options.conflict_force !== true) {
throw new jIO.util.jIOError("Conflict on '" + id
+ "' with attachment '"
+ attachment_id + "'",
409);
}
}
return destination.putAttachment(id, attachment_id,
attachment_blob)
.push(function () {
hash_document.updated = true;
attachment_signature_dict[attachment_id] = local_hash;
addElementToSkipList(id, attachment_id);
});
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
// Destination attachment do not exists or has been removed
return destination.putAttachment(id, attachment_id,
attachment_blob)
.push(function () {
hash_document.updated = true;
attachment_signature_dict[attachment_id] = local_hash;
addElementToSkipList(id, attachment_id);
}, function (error) {
// Destination document has been removed
// NOTE: We expect 404 error to raise when putting an
// attachment to a non-existent attachment.
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return copyDocumentAndAttachments(
source,
destination,
id,
id,
undefined,
hash_document
);
}
throw error;
});
}
throw error;
});
}
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
// Handle deletion of the attachment if necessary.
if (options.check_attachment_deletion === true) {
var attachment_deletion_queue = new RSVP.Queue();
checkLocalAttachmentDeletion(attachment_deletion_queue,
destination,
id, attachment_id,
source);
return attachment_deletion_queue;
}
return;
}
throw error;
});
}
......@@ -231,13 +746,19 @@
conflict_force, conflict_ignore,
is_creation, is_modification,
getMethod, options) {
var hash_document;
if (skip_document_dict[id] !== undefined
&& skip_document_dict[id].skip === true
&& skip_document_dict[id].skip_attachments === true) {
return;
}
queue
.push(function () {
// Optimisation to save a get call to signature storage
if (is_creation === true) {
return RSVP.all([
getMethod(id),
{hash: undefined}
{hash: undefined, attachments_hash: {}}
]);
}
if (is_modification === true) {
......@@ -252,9 +773,17 @@
})
.push(function (result_list) {
var doc = result_list[0],
local_hash = generateHash(stringify(doc)),
local_hash,
status_hash = result_list[1].hash;
signature_dict[id] = result_list[1];
hash_document = signature_dict[id];
// XXX Hackish
hash_document.updated = false;
if (isElementSkippable(id)) {
// Move directly to checking attachments
return id;
}
local_hash = generateHash(stringify(doc));
if (local_hash !== status_hash) {
// Local modifications
return destination.get(id)
......@@ -264,15 +793,12 @@
// Modifications on both sides
if (local_hash === remote_hash) {
// Same modifications on both side \o/
return context._signature_sub_storage.put(id, {
"hash": local_hash
})
.push(function () {
skip_document_dict[id] = null;
});
hash_document.updated = true;
hash_document.hash = local_hash;
return id;
}
if (conflict_ignore === true) {
return;
return id;
}
if (conflict_force !== true) {
throw new jIO.util.jIOError("Conflict on '" + id + "': " +
......@@ -281,8 +807,8 @@
409);
}
}
return propagateModification(source, destination, doc,
local_hash, id);
return propagateModification(source, destination, doc, id,
local_hash);
}, function (error) {
var use_post;
if ((error instanceof jIO.util.jIOError) &&
......@@ -296,16 +822,67 @@
// modification
use_post = false;
}
return propagateModification(source, destination, doc,
local_hash, id,
return propagateModification(source, destination, doc, id,
local_hash,
{use_post: use_post});
}
throw error;
});
}
return id;
})
.push(function (current_id) {
id = current_id;
hash_document = signature_dict[id];
if (skip_document_dict[id] !== undefined
&& skip_document_dict.skip_attachments === true) {
// No attachments to check
return {};
}
return allAttachmentMethod(source, id);
})
.push(function (local_attachment_dict) {
var attachment_id,
attachment_creation,
attachment_modification,
attachment_signature_dict = hash_document.attachments_hash,
attachment_queue = new RSVP.Queue();
for (attachment_id in local_attachment_dict) {
if (local_attachment_dict.hasOwnProperty(attachment_id)) {
attachment_modification = options.check_attachment_modification
&& attachment_signature_dict.hasOwnProperty(attachment_id);
attachment_creation = options.check_attachment_creation
&& !attachment_signature_dict.hasOwnProperty(attachment_id);
if (attachment_creation || attachment_modification) {
checkAttachmentSignatureDifference(attachment_queue,
source, destination,
id, attachment_id,
options);
}
}
}
if (options.check_attachment_deletion === true) {
for (attachment_id in attachment_signature_dict) {
if (attachment_signature_dict.hasOwnProperty(attachment_id)) {
if (!local_attachment_dict.hasOwnProperty(attachment_id)) {
checkLocalAttachmentDeletion(attachment_queue, destination,
id, attachment_id,
source);
}
}
}
}
return attachment_queue;
})
.push(function () {
if (hash_document.updated === true) {
delete hash_document.updated;
return context._signature_sub_storage.put(id, hash_document);
}
});
}
function checkBulkSignatureDifference(queue, source, destination, id_list,
document_status_list, options,
conflict_force, conflict_ignore) {
......@@ -360,18 +937,10 @@
is_creation,
key;
for (i = 0; i < result_list[0].data.total_rows; i += 1) {
if (!skip_document_dict.hasOwnProperty(
result_list[0].data.rows[i].id
)) {
local_dict[result_list[0].data.rows[i].id] = i;
}
local_dict[result_list[0].data.rows[i].id] = i;
}
for (i = 0; i < result_list[1].data.total_rows; i += 1) {
if (!skip_document_dict.hasOwnProperty(
result_list[1].data.rows[i].id
)) {
signature_dict[result_list[1].data.rows[i].id] = i;
}
signature_dict[result_list[1].data.rows[i].id] = i;
}
for (key in local_dict) {
if (local_dict.hasOwnProperty(key)) {
......@@ -471,7 +1040,13 @@
CONFLICT_KEEP_REMOTE)),
check_modification: context._check_local_modification,
check_creation: context._check_local_creation,
check_deletion: context._check_local_deletion
check_deletion: context._check_local_deletion,
check_attachment_modification:
context._check_local_attachment_modification,
check_attachment_creation:
context._check_local_attachment_creation,
check_attachment_deletion:
context._check_local_attachment_deletion
});
}
})
......@@ -500,7 +1075,13 @@
CONFLICT_CONTINUE),
check_modification: context._check_remote_modification,
check_creation: context._check_remote_creation,
check_deletion: context._check_remote_deletion
check_deletion: context._check_remote_deletion,
check_attachment_modification:
context._check_remote_attachment_modification,
check_attachment_creation:
context._check_remote_attachment_creation,
check_attachment_deletion:
context._check_remote_attachment_deletion
});
}
});
......
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment