Commit 23c69cc6 authored by Aurel's avatar Aurel

make deletion scalable

parent bd0f9813
...@@ -45,6 +45,7 @@ from Products.ERP5SyncML.XMLSyncUtils import getConduitByName, \ ...@@ -45,6 +45,7 @@ from Products.ERP5SyncML.XMLSyncUtils import getConduitByName, \
buildAnchorFromDate buildAnchorFromDate
from Products.ERP5SyncML.SyncMLConstant import MAX_OBJECTS, ACTIVITY_PRIORITY,\ from Products.ERP5SyncML.SyncMLConstant import MAX_OBJECTS, ACTIVITY_PRIORITY,\
NULL_ANCHOR NULL_ANCHOR
from Products.ERP5SyncML.SyncMLMessage import SyncMLResponse
from Products.ERP5SyncML.Transport.HTTP import HTTPTransport from Products.ERP5SyncML.Transport.HTTP import HTTPTransport
from Products.ERP5SyncML.Transport.File import FileTransport from Products.ERP5SyncML.Transport.File import FileTransport
from Products.ERP5SyncML.Transport.Mail import MailTransport from Products.ERP5SyncML.Transport.Mail import MailTransport
...@@ -143,6 +144,7 @@ class SyncMLSubscription(XMLObject): ...@@ -143,6 +144,7 @@ class SyncMLSubscription(XMLObject):
generated_other_activity = True generated_other_activity = True
r = [x.getId() for x in r] r = [x.getId() for x in r]
message_id_list = self.getNextMessageIdList(id_count=result_count) message_id_list = self.getNextMessageIdList(id_count=result_count)
# XXX maybe (result_count / packet_size) + 1 instead of result_count
message_id_list.reverse() # We pop each id in the following loop message_id_list.reverse() # We pop each id in the following loop
activate = self.getPortalObject().portal_synchronizations.activate activate = self.getPortalObject().portal_synchronizations.activate
callback_method = getattr(activate(**activate_kw), callback) callback_method = getattr(activate(**activate_kw), callback)
...@@ -180,7 +182,7 @@ class SyncMLSubscription(XMLObject): ...@@ -180,7 +182,7 @@ class SyncMLSubscription(XMLObject):
# First register sent message in case we received same message multiple time # First register sent message in case we received same message multiple time
# XXX-must be check according to specification # XXX-must be check according to specification
# XXX-performance killer in scalable environment # XXX-performance killer in scalable environment
# XXX must use memcached instead # XXX maybe use memcached instead for this ?
# self.setLastSentMessage(xml) # self.setLastSentMessage(xml)
# XXX must review all of this # XXX must review all of this
...@@ -465,17 +467,89 @@ class SyncMLSubscription(XMLObject): ...@@ -465,17 +467,89 @@ class SyncMLSubscription(XMLObject):
syncml_response=syncml_response, syncml_response=syncml_response,
simulate=simulate) simulate=simulate)
def _getDeletedData(self, syncml_response): def _getDeletedData(self, syncml_response=None):
""" """
Add delete command to syncml resposne Add delete command to syncml resposne
XXX Delete signature from database when we receive confirmation message
""" """
# XXX not efficient at all, must be review later if self.getIsActivityEnabled():
id_list = [x.getId() for x in self.contentValues() if x.getValidationState() == "not_synchronized"] self.recurseCallMethod(
method_id="getId",
min_depth=1,
max_depth=1,
activate_kw={'priority': ACTIVITY_PRIORITY,
'group_method_id' : "%s/checkAndSendDeleteMessage"
% (self.getRelativeUrl()),
'tag' : "%s_delete" % self.getRelativeUrl()})
self.activate(after_tag="%s_delete" %(self.getRelativeUrl()),
priority=ACTIVITY_PRIORITY+1,
)._sendFinalMessage()
else:
# XXX not efficient at all but must not be used (former way)
syncml_logger.warning("Using non-efficient way to retrieve delete object on %s"
% (self.getRelativeUrl(),))
id_list = [x.getId() for x in self.contentValues() if \
x.getValidationState() == "not_synchronized"]
for gid in id_list: for gid in id_list:
syncml_response.addDeleteCommand(gid=gid) syncml_response.addDeleteCommand(gid=gid)
def _sendFinalMessage(self):
"""
Send an empty message containing the final tag to notify the end of
the "sending_modification" stage of the synchronization
"""
syncml_response = SyncMLResponse()
syncml_response.addHeader(
session_id=self.getSessionId(),
message_id=self.getNextMessageId(),
target=self.getUrlString(),
source=self.getSubscriptionUrlString())
syncml_response.addBody()
syncml_response.addFinal()
final_activate_kw = {
'after_method_id' : ("processServerSynchronization",
"processClientSynchronization"),
'priority' :ACTIVITY_PRIORITY + 1,
'tag' : "%s_delete" %(self.getRelativeUrl(),)
}
syncml_logger.warning("Sending final message for modificationson on %s"
% (self.getRelativeUrl(),))
self.activate(**final_activate_kw).sendMessage(xml=str(syncml_response))
def checkAndSendDeleteMessage(self, message_list):
"""
This is a group method that will be invoked for a message list
It check signature synchronization state to know which one has
to be deleted and send the syncml message
"""
syncml_logger.warning("Checking deleted signature on %s"
% (self.getRelativeUrl(),))
to_delete_id_list = []
for m in message_list:
if m[0].getValidationState() == "not_synchronized":
to_delete_id_list.append(m[0].getId())
syncml_logger.warning("\tdeleted object is %s"
% (to_delete_id_list,))
if len(to_delete_id_list):
syncml_response = SyncMLResponse()
syncml_response.addHeader(
session_id=self.getSessionId(),
message_id=self.getNextMessageId(),
target=self.getUrlString(),
source=self.getSubscriptionUrlString())
syncml_response.addBody()
for gid in to_delete_id_list:
syncml_response.addDeleteCommand(gid=gid)
syncml_logger.info("%s sendDeleteCommand for %s"
% (self.getRelativeUrl(), to_delete_id_list))
self.activate(activity="SQLQueue",
tag="%s_delete" % (self.getRelativeUrl(),),
priority=ACTIVITY_PRIORITY).sendMessage(xml=str(syncml_response))
def _getSyncMLData(self, syncml_response, id_list=None): def _getSyncMLData(self, syncml_response, id_list=None):
""" """
XXX Comment to be fixed XXX Comment to be fixed
......
...@@ -253,13 +253,6 @@ class SyncMLAsynchronousEngine(EngineMixin): ...@@ -253,13 +253,6 @@ class SyncMLAsynchronousEngine(EngineMixin):
'tag' :tag, 'tag' :tag,
'priority' :ACTIVITY_PRIORITY 'priority' :ACTIVITY_PRIORITY
} }
final_activate_kw = {
'activity' : 'SQLQueue',
'after_tag' : tag,
'after_method_id' : ("processServerSynchronization",
"processClientSynchronization"),
'priority' :ACTIVITY_PRIORITY + 1
}
method_kw = { method_kw = {
'subscription_path' : subscription.getRelativeUrl(), 'subscription_path' : subscription.getRelativeUrl(),
} }
...@@ -272,12 +265,8 @@ class SyncMLAsynchronousEngine(EngineMixin): ...@@ -272,12 +265,8 @@ class SyncMLAsynchronousEngine(EngineMixin):
activity_count=pref.getPreferredRetrievalActivityCount(), activity_count=pref.getPreferredRetrievalActivityCount(),
) )
# Then get deleted document # Then get deleted document
# this will act as the final message of this sync part # this will send also the final message of this sync part
activate = subscription.getPortalObject().portal_synchronizations.activate subscription.activate(after_tag=tag)._getDeletedData()
callback_method = getattr(activate(**activate_kw), "sendDeleteCommand")
callback_method(message_id=subscription.getNextMessageId(),
activate_kw=activate_kw,
**method_kw)
return True return True
......
...@@ -474,38 +474,4 @@ class SynchronizationTool(BaseTool): ...@@ -474,38 +474,4 @@ class SynchronizationTool(BaseTool):
subscription.activate(**activate_kw).sendMessage(xml=str(syncml_response)) subscription.activate(**activate_kw).sendMessage(xml=str(syncml_response))
def sendDeleteCommand(self, message_id, subscription_path, activate_kw):
"""
This methods is intented to be called by asynchronous engine in activity to
send delete sync commands for a set of data
As engines are not zodb object, the tool acts as a placeholder for method
that need to be called in activities
"""
subscription = self.restrictedTraverse(subscription_path)
assert subscription is not None, "Impossible to find subscription %s" \
% (subscription_path)
# Build Message
syncml_response = SyncMLResponse()
syncml_response.addHeader(
session_id=subscription.getSessionId(),
message_id=message_id,
target=subscription.getUrlString(),
source=subscription.getSubscriptionUrlString())
syncml_response.addBody()
subscription._getDeletedData(
syncml_response=syncml_response,
)
# Notify that all modifications were sent
syncml_response.addFinal()
# Send the message in activity to prevent recomputing data in case of
# transport failure
syncml_logger.info("%s sendDeleteCommand with final tag"
% (subscription.getRelativeUrl()))
subscription.activate(**activate_kw).sendMessage(xml=str(syncml_response))
InitializeClass(SynchronizationTool) InitializeClass(SynchronizationTool)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment