Commit 60feb380 authored by Nicolas Delaby's avatar Nicolas Delaby

- use path instead of object for PubSync, SubSync

- modify source code to use activity
(by fabien)


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@15166 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent d88724c6
......@@ -1105,7 +1105,7 @@ class ERP5Conduit(XMLSyncUtilsMixin):
roles = self.convertXmlValue(xml.childNodes[0].data,data_type='tokens')
user = self.getAttribute(xml,'id')
roles = list(roles) # Needed for CPS, or we have a CPS error
LOG('local_role: ',0,'user: %s roles: %s' % (repr(user),repr(roles)))
#LOG('local_role: ',0,'user: %s roles: %s' % (repr(user),repr(roles)))
#user = roles[0]
#roles = roles[1:]
if xml.nodeName.find(self.local_role_tag)>=0:
......@@ -1122,14 +1122,14 @@ class ERP5Conduit(XMLSyncUtilsMixin):
"""
conflict_list = []
# We want to add a local role
LOG('addLocalPermissionNode, xml',0,xml)
#LOG('addLocalPermissionNode, xml',0,xml)
if len(xml.childNodes)>0:
roles = self.convertXmlValue(xml.childNodes[0].data,data_type='tokens')
roles = list(roles) # Needed for CPS, or we have a CPS error
else:
roles = ()
permission = self.getAttribute(xml,'id')
LOG('local_role: ',0,'permission: %s roles: %s' % (repr(permission),repr(roles)))
#LOG('local_role: ',0,'permission: %s roles: %s' % (repr(permission),repr(roles)))
#user = roles[0]
#roles = roles[1:]
if xml.nodeName.find(self.local_permission_tag)>=0:
......
......@@ -222,12 +222,13 @@ class PublicationSynchronization(XMLSyncUtils):
return {'has_response':1,'xml':xml_a}
def PubSync(self, publication, msg=None, RESPONSE=None, subscriber=None):
def PubSync(self, publication_path, msg=None, RESPONSE=None, subscriber=None):
"""
This is the synchronization method for the server
"""
#LOG('PubSync',0,'Starting... publication: %s' % str(publication))
#LOG('PubSync',0,'Starting... publication: %s' % str(publication_path))
# Read the request from the client
publication = self.unrestrictedTraverse(publication_path)
xml_client = msg
if xml_client is None:
xml_client = self.readResponse(from_url=publication.getPublicationUrl())
......
......@@ -1039,10 +1039,12 @@ class Subscription(Folder, SyncCode):
o_id = signature.getObjectId()
#try with id param too, because gid is not catalogged
object_list = self.getObjectList(gid = b16decode(gid), id = o_id)
#LOG('getObjectFromGid :',0,'object_list=%s, gid=%s, o_id=%s' % (object_list, gid, o_id))
if o is not None and o in object_list:
return o
#XXX Slow !!!
#LOG('entering in the slow loop of getObjectFromGid !!!',0,'')
object_list = self.getObjectList(gid = b16decode(gid))
#LOG('getObjectFromGid :', 0, 'object_list slow loop=%s, gid=%s' % (object_list, gid))
for o in object_list:
o_gid = self.getGidFromObject(o)
if o_gid == gid:
......@@ -1055,7 +1057,6 @@ class Subscription(Folder, SyncCode):
return the object corresponding to the id
"""
object_list = self.getObjectList(id=id)
#XXX very slow with lot of objects
o = None
for object in object_list:
if object.getId() == id:
......
......@@ -79,12 +79,12 @@ class SubscriptionSynchronization(XMLSyncUtils):
return {'has_response':1,'xml':xml_a}
def SubSync(self, subscription, msg=None, RESPONSE=None):
def SubSync(self, subscription_path, msg=None, RESPONSE=None):
"""
This is the synchronization method for the client
"""
response = None #check if subsync replies to this messages
subscription = self.unrestrictedTraverse(subscription_path)
if msg==None and (subscription.getSubscriptionUrl()).find('file')>=0:
msg = self.readResponse(sync_id=subscription.getSourceURI(),
from_url=subscription.getSubscriptionUrl())
......
......@@ -68,7 +68,8 @@ class SyncCode(Persistent):
PUB_CONFLICT_MERGE = 6
PUB_CONFLICT_CLIENT_WIN = 8
MAX_LINES = 1000
MAX_LINES = 5000
MAX_OBJECTS = 100
action_tag = 'workflow_action'
#NOT_EDITABLE_PROPERTY = ('id','object','uid','xupdate:element',action_tag,
......
......@@ -341,7 +341,7 @@ class SynchronizationTool( SubscriptionSynchronization,
"""
reset a subscription
"""
self.SubSync(self.getSubscription(title))
self.SubSync(self.getSubscription(title).getPath())
if RESPONSE is not None:
RESPONSE.redirect('manageSubscriptions')
......@@ -512,6 +512,7 @@ class SynchronizationTool( SubscriptionSynchronization,
#LOG('getSynchronizationState, subscriber_list:',0,subscriber_list)
for subscriber in subscriber_list:
signature = subscriber.getSignatureFromObjectId(o_id)
#XXX check if signature could be not None ...
if signature is not None:
state = signature.getStatus()
#LOG('getSynchronizationState:',0,'sub.dest :%s, state: %s' % \
......@@ -861,16 +862,20 @@ class SynchronizationTool( SubscriptionSynchronization,
if send:
if isinstance(to_url, str):
if to_url.find('http://')==0:
# XXX Make sure this is not a problem
if domain.domain_type == self.PUB:
return None
# we will send an http response
domain = aq_base(domain)
#LOG('domain.domain_type',0,domain.domain_type)
#LOG("getattr(domain, 'getActivityEnabled', None)",0,getattr(domain, 'getActivityEnabled', None))
#LOG("domain.getActivityEnabled()",0,domain.getActivityEnabled())
if domain.domain_type == self.PUB and not domain.getActivityEnabled():
# not use activity
# XXX Make sure this is not a problem
return None
#use activities to send send an http response
#LOG('sendResponse, will start sendHttpResponse, xml',0,'')
self.activate(activity='RAMQueue').sendHttpResponse(sync_id=sync_id,
to_url=to_url,
xml=xml, domain=domain)
return None
xml=xml,
domain_path=domain.getPath())
elif to_url.find('file://')==0:
filename = to_url[len('file:/'):]
stream = file(filename,'w')
......@@ -886,13 +891,15 @@ class SynchronizationTool( SubscriptionSynchronization,
return xml
security.declarePrivate('sendHttpResponse')
def sendHttpResponse(self, to_url=None, sync_id=None, xml=None, domain=None ):
def sendHttpResponse(self, to_url=None, sync_id=None, xml=None,
domain_path=None ):
domain = self.unrestrictedTraverse(domain_path)
#LOG('sendHttpResponse, self.getPhysicalPath: ',0,self.getPhysicalPath())
#LOG('sendHttpResponse, starting with domain:',0,domain)
#LOG('sendHttpResponse, xml:',0,xml)
if domain is not None:
if domain.domain_type == self.PUB:
return xml
if domain.domain_type == self.PUB and not domain.getActivityEnabled():
return xml
# Retrieve the proxy from os variables
proxy_url = ''
if os.environ.has_key('http_proxy'):
......@@ -928,7 +935,7 @@ class SynchronizationTool( SubscriptionSynchronization,
result = urllib2.urlopen(request).read()
except socket.error, msg:
self.activate(activity='RAMQueue').sendHttpResponse(to_url=to_url,
sync_id=sync_id, xml=xml, domain=domain)
sync_id=sync_id, xml=xml, domain_path=domain.getPath())
LOG('sendHttpResponse, socket ERROR:',0,msg)
LOG('sendHttpResponse, url,data',0,(url, data))
return
......@@ -940,17 +947,18 @@ class SynchronizationTool( SubscriptionSynchronization,
#LOG('sendHttpResponse, before result, domain:',0,domain)
if domain is not None:
if domain.domain_type == self.SUB:
gpg_key = domain.getGPGKey()
if result not in (None,''):
#if gpg_key not in ('',None):
# result = self.sendResponse(domain=domain,xml=result,send=0)
#uf = self.acl_users
#user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
#user = uf.getUserById('syncml').__of__(uf)
#newSecurityManager(None, user)
#self.activate(activity='RAMQueue').readResponse(sync_id=sync_id,text=result)
self.readResponse(sync_id=sync_id,text=result)
if domain.domain_type == self.SUB and not domain.getActivityEnabled():
#if we don't use activity :
gpg_key = domain.getGPGKey()
if result not in (None,''):
#if gpg_key not in ('',None):
# result = self.sendResponse(domain=domain,xml=result,send=0)
#uf = self.acl_users
#user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
#user = uf.getUserById('syncml').__of__(uf)
#newSecurityManager(None, user)
#self.activate(activity='RAMQueue').readResponse(sync_id=sync_id,text=result)
self.readResponse(sync_id=sync_id,text=result)
security.declarePublic('sync')
def sync(self):
......@@ -965,8 +973,8 @@ class SynchronizationTool( SubscriptionSynchronization,
#LOG('sync, message_list:',0,message_list)
if len(message_list) == 0:
for subscription in self.getSubscriptionList():
#LOG('sync, subcription:',0,subscription)
self.activate(activity='RAMQueue').SubSync(subscription)
#LOG('sync, type(subcription):',0,type(subscription))
self.activate(activity='RAMQueue').SubSync(subscription.getPath())
security.declarePublic('readResponse')
def readResponse(self, text=None, sync_id=None, to_url=None, from_url=None):
......@@ -1013,24 +1021,33 @@ class SynchronizationTool( SubscriptionSynchronization,
commands.getstatusoutput('rm -f /tmp/%s.gz.gpg' % filename)
# Get the target and then find the corresponding publication or
# Subscription
#LOG('type(text) : ',0,type(text))
xml = Parse(text)
#XXX this function is not very optimized and should be improved
url = self.getTarget(xml)
for publication in self.getPublicationList():
if publication.getPublicationUrl()==url and \
publication.getTitle()==sync_id:
result = self.PubSync(publication,xml)
# Then encrypt the message
xml = result['xml']
#must be commented because this method is alredy called
#xml = self.sendResponse(xml=xml,domain=publication,send=0)
return xml
if publication.getActivityEnabled():
#use activities to send SyncML data.
self.activate(activity='RAMQueue').PubSync(publication.getPath(),
text)
return ' '
else:
result = self.PubSync(publication.getPath(),xml)
# Then encrypt the message
xml = result['xml']
#must be commented because this method is alredy called
#xml = self.sendResponse(xml=xml,domain=publication,send=0)
return xml
for subscription in self.getSubscriptionList():
if subscription.getSubscriptionUrl()==url and \
subscription.getTitle()==sync_id:
result = self.activate(activity='RAMQueue').SubSync(\
self.getSubscription(sync_id), text)
subscription_path = self.getSubscription(sync_id).getPath()
self.activate(activity='RAMQueue').SubSync(subscription_path,
text)
return ' '
#result = self.SubSync(self.getSubscription(sync_id),xml)
# we use from only if we have a file
......
......@@ -29,7 +29,6 @@
import smtplib
from Products.ERP5SyncML.SyncCode import SyncCode
from Products.ERP5SyncML.Subscription import Signature
from DateTime import DateTime
from StringIO import StringIO
from xml.dom.ext import PrettyPrint
from ERP5Diff import ERP5Diff
......@@ -689,6 +688,7 @@ class XMLSyncUtilsMixin(SyncCode):
if object is not None, this usually means we want to set the
actual xupdate on the signature.
"""
#LOG('getSyncMLData starting...',0,'')
local_gid_list = []
syncml_data = kw.get('syncml_data','')
result = {'finished':1}
......@@ -702,6 +702,7 @@ class XMLSyncUtilsMixin(SyncCode):
#object_gid = domain.getGidFromObject(object)
local_gid_list = map(lambda x: domain.getGidFromObject(x),object_list)
# Objects to remove
#LOG('remove object to remove ...',0,'')
for object_gid in subscriber.getGidList():
if not (object_gid in local_gid_list):
# This is an object to remove
......@@ -721,6 +722,7 @@ class XMLSyncUtilsMixin(SyncCode):
local_gid_list = []
loop = 0
for object_path in subscriber.getRemainingObjectPathList():
#LOG('getRemainingObject :',0,[[subscriber.getRemainingObjectPathList()[i][3] for i in range(5)],[subscriber.getRemainingObjectPathList()[-i][3] for i in range(5)]])
if max is not None and loop >= max:
result['finished'] = 0
break
......@@ -777,6 +779,9 @@ class XMLSyncUtilsMixin(SyncCode):
status = self.PARTIAL
signature.setAction('Add')
xml_string = '<!--' + short_string + '-->'
else:#if there is no partial data,
#we could remove the object from the remain list
subscriber.removeRemainingObjectPath(object_path)
gid = signature.getRid()#in fisrt, we try with rid if there is one
if gid == None:
gid = signature.getGid()
......@@ -964,7 +969,6 @@ class XMLSyncUtilsMixin(SyncCode):
if action.nodeName == 'Add':
# Then store the xml of this new subobject
if object is None:
object_id = domain.generateNewIdWithGenerator(object=destination,gid=gid)
#if object_id is not None:
add_data = conduit.addNode(xml=data_subnode,
object=destination, object_id=object_id)
......@@ -979,7 +983,6 @@ class XMLSyncUtilsMixin(SyncCode):
else:
#Object was retrieve but need to be updated without recreated
#usefull when an object is only deleted by workflow.
object_id = domain.generateNewIdWithGenerator(object=destination,gid=gid)
add_data = conduit.addNode(xml=data_subnode,
object=destination,
object_id=object_id,
......@@ -1309,13 +1312,13 @@ class XMLSyncUtils(XMLSyncUtilsMixin):
# Now we should send confirmations
cmd_id_before_getsyncmldata = cmd_id
cmd_id = cmd_id+1
if getattr(domain, 'getActivityEnabled', None) and domain.getActivityEnabled():
if domain.getActivityEnabled():
#use activities to get SyncML data.
if not (isinstance(remote_xml, str) or isinstance(remote_xml, unicode)):
string_io = StringIO()
PrettyPrint(remote_xml,stream=string_io)
remote_xml = string_io.getvalue()
self.activate().SyncModifActivity(
self.activate(activity='RAMQueue').SyncModifActivity(
domain_relative_url = domain.getRelativeUrl(),
remote_xml = remote_xml,
subscriber_relative_url = subscriber.getRelativeUrl(),
......@@ -1332,25 +1335,30 @@ class XMLSyncUtils(XMLSyncUtilsMixin):
remote_xml=remote_xml,
subscriber=subscriber,
cmd_id=cmd_id,xml_confirmation=xml_confirmation,
conduit=conduit)
conduit=conduit,
max=self.MAX_OBJECTS)
syncml_data = result['syncml_data']
xml_confirmation = result['xml_confirmation']
cmd_id = result['cmd_id']
return self.sendSyncModif(syncml_data, cmd_id_before_getsyncmldata,
subscriber, domain, xml_confirmation,
remote_xml, xml_list, has_status_list, has_response)
remote_xml, xml_list, has_status_list,
has_response)
def SyncModifActivity(self, **kw):
domain = self.unrestrictedTraverse(kw['domain_relative_url'])
subscriber = self.unrestrictedTraverse(kw['subscriber_relative_url'])
conduit = subscriber.getConduit()
result = self.getSyncMLData(domain = domain, subscriber = subscriber,
conduit = conduit, max = 100, **kw)
conduit = conduit, max = self.MAX_OBJECTS, **kw)
syncml_data = result['syncml_data']
cmd_id = result['cmd_id']
kw['syncml_data'] = syncml_data
kw['cmd_id'] = cmd_id
finished = result['finished']
#LOG('finished =',0,finished)
if not finished:
self.activate().SyncModifActivity(**kw)
self.activate(activity='RAMQueue').SyncModifActivity(**kw)
else:
xml_confirmation = result['xml_confirmation']
cmd_id = result['cmd_id']
......
......@@ -196,10 +196,10 @@ class TestERP5SyncMLMixin:
file.write('')
file.close()
nb_message = 1
result = portal_sync.SubSync(subscription)
result = portal_sync.SubSync(subscription.getPath())
while result['has_response']==1:
portal_sync.PubSync(publication)
result = portal_sync.SubSync(subscription)
portal_sync.PubSync(publication.getPath())
result = portal_sync.SubSync(subscription.getPath())
nb_message += 1 + result['has_response']
return nb_message
......@@ -227,16 +227,16 @@ class TestERP5SyncMLMixin:
file.write('')
file.close()
nb_message = 1
result = portal_sync.SubSync(subscription)
result = portal_sync.SubSync(subscription.getPath())
while result['has_response']==1:
# We do thing three times, so that we will test
# if we manage well duplicate messages
portal_sync.PubSync(publication)
portal_sync.PubSync(publication)
portal_sync.PubSync(publication)
result = portal_sync.SubSync(subscription)
result = portal_sync.SubSync(subscription)
result = portal_sync.SubSync(subscription)
portal_sync.PubSync(publication.getPath())
portal_sync.PubSync(publication.getPath())
portal_sync.PubSync(publication.getPath())
result = portal_sync.SubSync(subscription.getPath())
result = portal_sync.SubSync(subscription.getPath())
result = portal_sync.SubSync(subscription.getPath())
nb_message += 1 + result['has_response']
return nb_message
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment