Commit d83a1306 authored by Romain Courteaud's avatar Romain Courteaud

ReplicateStorage: add attachment support

Attachments are synchronized only of explicitely activated from the jIO configuration.
Replication deletes a document only if it doesn't contain any attachment.

Thanks to Cedric Leninivin for his work on this topic.

/reviewed-on nexedi/jio!42
parent a6e2e877
......@@ -39,6 +39,11 @@
return rusha.digestFromString(content);
}
function generateHashFromArrayBuffer(content) {
// XXX Improve performance by moving calculation to WebWorker
return rusha.digestFromArrayBuffer(content);
}
function ReplicateStorage(spec) {
this._query_options = spec.query || {};
......@@ -99,6 +104,36 @@
if (this._check_remote_deletion === undefined) {
this._check_remote_deletion = true;
}
this._check_local_attachment_modification =
spec.check_local_attachment_modification;
if (this._check_local_attachment_modification === undefined) {
this._check_local_attachment_modification = false;
}
this._check_local_attachment_creation =
spec.check_local_attachment_creation;
if (this._check_local_attachment_creation === undefined) {
this._check_local_attachment_creation = false;
}
this._check_local_attachment_deletion =
spec.check_local_attachment_deletion;
if (this._check_local_attachment_deletion === undefined) {
this._check_local_attachment_deletion = false;
}
this._check_remote_attachment_modification =
spec.check_remote_attachment_modification;
if (this._check_remote_attachment_modification === undefined) {
this._check_remote_attachment_modification = false;
}
this._check_remote_attachment_creation =
spec.check_remote_attachment_creation;
if (this._check_remote_attachment_creation === undefined) {
this._check_remote_attachment_creation = false;
}
this._check_remote_attachment_deletion =
spec.check_remote_attachment_deletion;
if (this._check_remote_attachment_deletion === undefined) {
this._check_remote_attachment_deletion = false;
}
}
ReplicateStorage.prototype.remove = function (id) {
......@@ -125,6 +160,32 @@
return this._local_sub_storage.get.apply(this._local_sub_storage,
arguments);
};
ReplicateStorage.prototype.getAttachment = function () {
return this._local_sub_storage.getAttachment.apply(this._local_sub_storage,
arguments);
};
ReplicateStorage.prototype.allAttachments = function () {
return this._local_sub_storage.allAttachments.apply(this._local_sub_storage,
arguments);
};
ReplicateStorage.prototype.putAttachment = function (id) {
if (id === this._signature_hash) {
throw new jIO.util.jIOError(this._signature_hash + " is frozen",
403);
}
return this._local_sub_storage.putAttachment.apply(this._local_sub_storage,
arguments);
};
ReplicateStorage.prototype.removeAttachment = function (id) {
if (id === this._signature_hash) {
throw new jIO.util.jIOError(this._signature_hash + " is frozen",
403);
}
return this._local_sub_storage.removeAttachment.apply(
this._local_sub_storage,
arguments
);
};
ReplicateStorage.prototype.hasCapacity = function () {
return this._local_sub_storage.hasCapacity.apply(this._local_sub_storage,
arguments);
......@@ -143,6 +204,333 @@
// Do not sync the signature document
skip_document_dict[context._signature_hash] = null;
function propagateAttachmentDeletion(skip_attachment_dict,
destination,
id, name) {
return destination.removeAttachment(id, name)
.push(function () {
return context._signature_sub_storage.removeAttachment(id, name);
})
.push(function () {
skip_attachment_dict[name] = null;
});
}
function propagateAttachmentModification(skip_attachment_dict,
destination,
blob, hash, id, name) {
return destination.putAttachment(id, name, blob)
.push(function () {
return context._signature_sub_storage.putAttachment(id, name,
JSON.stringify({
hash: hash
}));
})
.push(function () {
skip_attachment_dict[name] = null;
});
}
function checkAndPropagateAttachment(skip_attachment_dict,
status_hash, local_hash, blob,
source, destination, id, name,
conflict_force, conflict_revert,
conflict_ignore) {
var remote_blob;
return destination.getAttachment(id, name)
.push(function (result) {
remote_blob = result;
return jIO.util.readBlobAsArrayBuffer(remote_blob);
})
.push(function (evt) {
return generateHashFromArrayBuffer(
evt.target.result
);
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
remote_blob = null;
return null;
}
throw error;
})
.push(function (remote_hash) {
if (local_hash === remote_hash) {
// Same modifications on both side
if (local_hash === null) {
// Deleted on both side, drop signature
return context._signature_sub_storage.removeAttachment(id, name)
.push(function () {
skip_attachment_dict[id] = null;
});
}
return context._signature_sub_storage.putAttachment(id, name,
JSON.stringify({
hash: local_hash
}))
.push(function () {
skip_document_dict[id] = null;
});
}
if ((remote_hash === status_hash) || (conflict_force === true)) {
// Modified only locally. No conflict or force
if (local_hash === null) {
// Deleted locally
return propagateAttachmentDeletion(skip_attachment_dict,
destination,
id, name);
}
return propagateAttachmentModification(skip_attachment_dict,
destination, blob,
local_hash, id, name);
}
// Conflict cases
if (conflict_ignore === true) {
return;
}
if ((conflict_revert === true) || (local_hash === null)) {
// Automatically resolve conflict or force revert
if (remote_hash === null) {
// Deleted remotely
return propagateAttachmentDeletion(skip_attachment_dict,
source, id, name);
}
return propagateAttachmentModification(
skip_attachment_dict,
source,
remote_blob,
remote_hash,
id,
name
);
}
// Minimize conflict if it can be resolved
if (remote_hash === null) {
// Copy remote modification remotely
return propagateAttachmentModification(skip_attachment_dict,
destination, blob,
local_hash, id, name);
}
throw new jIO.util.jIOError("Conflict on '" + id +
"' with attachment '" +
name + "'",
409);
});
}
function checkAttachmentSignatureDifference(skip_attachment_dict,
queue, source,
destination, id, name,
conflict_force,
conflict_revert,
conflict_ignore,
is_creation, is_modification) {
var blob,
status_hash;
queue
.push(function () {
// Optimisation to save a get call to signature storage
if (is_creation === true) {
return RSVP.all([
source.getAttachment(id, name),
{hash: null}
]);
}
if (is_modification === true) {
return RSVP.all([
source.getAttachment(id, name),
context._signature_sub_storage.getAttachment(
id,
name,
{format: 'json'}
)
]);
}
throw new jIO.util.jIOError("Unexpected call of"
+ " checkAttachmentSignatureDifference",
409);
})
.push(function (result_list) {
blob = result_list[0];
status_hash = result_list[1].hash;
return jIO.util.readBlobAsArrayBuffer(blob);
})
.push(function (evt) {
var array_buffer = evt.target.result,
local_hash = generateHashFromArrayBuffer(array_buffer);
if (local_hash !== status_hash) {
return checkAndPropagateAttachment(skip_attachment_dict,
status_hash, local_hash, blob,
source, destination, id, name,
conflict_force, conflict_revert,
conflict_ignore);
}
});
}
function checkAttachmentLocalDeletion(skip_attachment_dict,
queue, destination, id, name, source,
conflict_force, conflict_revert,
conflict_ignore) {
var status_hash;
queue
.push(function () {
return context._signature_sub_storage.getAttachment(id, name,
{format: 'json'});
})
.push(function (result) {
status_hash = result.hash;
return checkAndPropagateAttachment(skip_attachment_dict,
status_hash, null, null,
source, destination, id, name,
conflict_force, conflict_revert,
conflict_ignore);
});
}
function pushDocumentAttachment(skip_attachment_dict, id, source,
destination, options) {
var queue = new RSVP.Queue();
return queue
.push(function () {
return RSVP.all([
source.allAttachments(id)
.push(undefined, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return {};
}
throw error;
}),
context._signature_sub_storage.allAttachments(id)
.push(undefined, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return {};
}
throw error;
})
]);
})
.push(function (result_list) {
var local_dict = {},
signature_dict = {},
is_modification,
is_creation,
key;
for (key in result_list[0]) {
if (result_list[0].hasOwnProperty(key)) {
if (!skip_attachment_dict.hasOwnProperty(key)) {
local_dict[key] = null;
}
}
}
for (key in result_list[1]) {
if (result_list[1].hasOwnProperty(key)) {
if (!skip_attachment_dict.hasOwnProperty(key)) {
signature_dict[key] = null;
}
}
}
for (key in local_dict) {
if (local_dict.hasOwnProperty(key)) {
is_modification = signature_dict.hasOwnProperty(key)
&& options.check_modification;
is_creation = !signature_dict.hasOwnProperty(key)
&& options.check_creation;
if (is_modification === true || is_creation === true) {
checkAttachmentSignatureDifference(skip_attachment_dict,
queue, source,
destination, id, key,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
is_creation,
is_modification);
}
}
}
if (options.check_deletion === true) {
for (key in signature_dict) {
if (signature_dict.hasOwnProperty(key)) {
if (!local_dict.hasOwnProperty(key)) {
checkAttachmentLocalDeletion(skip_attachment_dict,
queue, destination, id, key,
source,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore);
}
}
}
}
});
}
function repairDocumentAttachment(id) {
var skip_attachment_dict = {};
return new RSVP.Queue()
.push(function () {
if (context._check_local_attachment_modification ||
context._check_local_attachment_creation ||
context._check_local_attachment_deletion) {
return pushDocumentAttachment(
skip_attachment_dict,
id,
context._local_sub_storage,
context._remote_sub_storage,
{
conflict_force: (context._conflict_handling ===
CONFLICT_KEEP_LOCAL),
conflict_revert: (context._conflict_handling ===
CONFLICT_KEEP_REMOTE),
conflict_ignore: (context._conflict_handling ===
CONFLICT_CONTINUE),
check_modification:
context._check_local_attachment_modification,
check_creation: context._check_local_attachment_creation,
check_deletion: context._check_local_attachment_deletion
}
);
}
})
.push(function () {
if (context._check_remote_attachment_modification ||
context._check_remote_attachment_creation ||
context._check_remote_attachment_deletion) {
return pushDocumentAttachment(
skip_attachment_dict,
id,
context._remote_sub_storage,
context._local_sub_storage,
{
use_revert_post: context._use_remote_post,
conflict_force: (context._conflict_handling ===
CONFLICT_KEEP_REMOTE),
conflict_revert: (context._conflict_handling ===
CONFLICT_KEEP_LOCAL),
conflict_ignore: (context._conflict_handling ===
CONFLICT_CONTINUE),
check_modification:
context._check_remote_attachment_modification,
check_creation: context._check_remote_attachment_creation,
check_deletion: context._check_remote_attachment_deletion
}
);
}
});
}
function propagateModification(source, destination, doc, hash, id,
options) {
var result,
......@@ -158,6 +546,33 @@
post_id = new_id;
return source.put(post_id, doc);
})
.push(function () {
// Copy all attachments
// This is not related to attachment replication
// It's just about not losing user data
return source.allAttachments(id);
})
.push(function (attachment_dict) {
var key,
copy_queue = new RSVP.Queue();
function copyAttachment(name) {
copy_queue
.push(function () {
return source.getAttachment(id, name);
})
.push(function (blob) {
return source.putAttachment(post_id, name, blob);
});
}
for (key in attachment_dict) {
if (attachment_dict.hasOwnProperty(key)) {
copyAttachment(key);
}
}
return copy_queue;
})
.push(function () {
return source.remove(id);
})
......@@ -190,9 +605,27 @@
}
function propagateDeletion(destination, id) {
return destination.remove(id)
// Do not delete a document if it has an attachment
// ie, replication should prevent losing user data
// Synchronize attachments before, to ensure
// all of them will be deleted too
return repairDocumentAttachment(id)
.push(function () {
return context._signature_sub_storage.remove(id);
return destination.allAttachments(id);
})
.push(function (attachment_dict) {
if (JSON.stringify(attachment_dict) === "{}") {
return destination.remove(id)
.push(function () {
return context._signature_sub_storage.remove(id);
});
}
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return;
}
throw error;
})
.push(function () {
skip_document_dict[id] = null;
......@@ -551,6 +984,34 @@
check_deletion: context._check_remote_deletion
});
}
})
.push(function () {
if (context._check_local_attachment_modification ||
context._check_local_attachment_creation ||
context._check_local_attachment_deletion ||
context._check_remote_attachment_modification ||
context._check_remote_attachment_creation ||
context._check_remote_attachment_deletion) {
// Attachments are synchronized if and only if their parent document
// has been also marked as synchronized.
return context._signature_sub_storage.allDocs()
.push(function (result) {
var i,
repair_document_queue = new RSVP.Queue();
function repairDocument(id) {
repair_document_queue
.push(function () {
return repairDocumentAttachment(id);
});
}
for (i = 0; i < result.data.total_rows; i += 1) {
repairDocument(result.data.rows[i].id);
}
return repair_document_queue;
});
}
});
};
......
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment