testERP5SyncML.py 57.2 KB
Newer Older
Sebastien Robin's avatar
Sebastien Robin committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
##############################################################################
#
# Copyright (c) 2004 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################



#
# Skeleton ZopeTestCase
#

from random import randint

import os, sys
if __name__ == '__main__':
    execfile(os.path.join(sys.path[0], 'framework.py'))

# Needed in order to have a log file inside the current folder
os.environ['EVENT_LOG_FILE'] = os.path.join(os.getcwd(), 'zLOG.log')
os.environ['EVENT_LOG_SEVERITY'] = '-300'

from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
47 48
from DateTime import DateTime
from Products.ERP5.Document.Person import Person
Sebastien Robin's avatar
Sebastien Robin committed
49 50
from AccessControl.SecurityManagement import newSecurityManager, noSecurityManager
from Products.ERP5SyncML.Conduit.ERP5Conduit import ERP5Conduit
51
from Products.ERP5SyncML.SyncCode import SyncCode
Sebastien Robin's avatar
Sebastien Robin committed
52 53 54 55 56 57
from zLOG import LOG
import time

class TestERP5SyncML(ERP5TypeTestCase):

  # Different variables used for this test
58
  # Default value of the ``run`` argument of every test method
  run_all_test = 1
  # Workflow whose history is inspected on imported objects
  workflow_id = 'edit_workflow'

  # Identity of the three persons used by the tests.
  # At the beginning, iso-8859-15 strings were used, but actually
  # erp5 is using utf-8 everywhere, so the descriptions below hold
  # utf-8 encoded bytes.
  first_name1 = 'Sebastien'
  last_name1 = 'Robin'
  description1 = 'description1 --- $sdfr\xc3\xa7_sdfs\xc3\xa7df_oisfsopf'
  lang1 = 'fr'
  format2 = 'html'
  format3 = 'xml'
  format4 = 'txt'
  first_name2 = 'Jean-Paul'
  last_name2 = 'Smets'
  description2 = 'description2\xc3\xa9\xc3\xa0@  $*< <<<  ----- >>>></title>&oekd'
  lang2 = 'en'
  first_name3 = 'Yoshinori'
  last_name3 = 'Okuji'
  description3 = 'description3 \xc3\xa7sdf__sdf\xc3\xa7\xc3\xa7\xc3\xa7_df___&&\xc3\xa9]]]\xc2\xb0\xc2\xb0\xc2\xb0\xc2\xb0\xc2\xb0\xc2\xb0'
  description4 = 'description4 sdflkmooo^^^^]]]]]{{{{{{{'
  lang3 = 'jp'
  lang4 = 'ca'

  # Name of the method used to export objects as xml
  xml_mapping = 'asXML'
  # Ids of the persons created on the server
  id1 = '170'
  id2 = '171'
  # Ids of the publication and of the two subscriptions
  pub_id = 'Publication'
  sub_id1 = 'Subscription1'
  sub_id2 = 'Subscription2'
  # Expected counts checked by the tests
  nb_subscription = 2
  nb_publication = 1
  nb_synchronization = 3
  nb_message_first_synchronization = 6
  # The synchronization is done through files
  subscription_url1 = 'file://tmp/sync_client1'
  subscription_url2 = 'file://tmp/sync_client2'
  publication_url = 'file://tmp/sync_server'
  # Alternative addresses, currently disabled:
  #publication_url = 'server@localhost'
  #subscription_url1 = 'client1@localhost'
  #subscription_url2 = 'client2@localhost'
Sebastien Robin's avatar
Sebastien Robin committed
99 100 101 102 103

  def getBusinessTemplateList(self):
    """
      Return the tuple of business templates required by this test.

      No business template is requested: the returned tuple is empty.

      The business template sync_crm, which is no longer returned
      (see the disabled line below), gives 3 folders:
      /person_server
      /person_client1 : empty
      /person_client2 : empty
    """
    #return ('sync_crm',)
    business_template_list = ()
    return business_template_list
Sebastien Robin's avatar
Sebastien Robin committed
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126

  def getSynchronizationTool(self):
    """
    Return the 'portal_synchronizations' attribute of the portal,
    or None when the portal has no such attribute.
    """
    portal = self.getPortal()
    return getattr(portal, 'portal_synchronizations', None)

  def getPersonClient1(self):
    """
    Return the 'person_client1' attribute of the portal,
    or None when the portal has no such attribute.
    """
    portal = self.getPortal()
    return getattr(portal, 'person_client1', None)

  def getPersonServer(self):
    """
    Return the 'person_server' attribute of the portal,
    or None when the portal has no such attribute.
    """
    portal = self.getPortal()
    return getattr(portal, 'person_server', None)

  def getPersonClient2(self):
    """
    Return the 'person_client2' attribute of the portal,
    or None when the portal has no such attribute.
    """
    portal = self.getPortal()
    return getattr(portal, 'person_client2', None)

  def getPortalId(self):
    """
    Return the id of the portal, as given by its getId method.
    """
    portal = self.getPortal()
    return portal.getId()

127
  def test_01_HasEverything(self, quiet=0, run=run_all_test):
    """
    Test if portal_synchronizations was created.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately

    The person_server, person_client1 and person_client2 folders are
    not checked here.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Has Everything ')
      LOG('Testing... ',0,'test_01_HasEverything')
    # Identity test against None: '!= None' would go through the
    # comparison hooks of the returned object instead.
    self.failUnless(self.getSynchronizationTool() is not None)
Sebastien Robin's avatar
Sebastien Robin committed
137

138
  def test_02_AddPublication(self, quiet=0, run=run_all_test):
    """
    Add the publication to the synchronization tool and check that it
    can then be retrieved by its id.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Add a Publication ')
      LOG('Testing... ',0,'test_02_AddPublication')
    # Path of the published folder, built from the portal name
    server_path = '/%s/person_server' % self.getPortalName()
    sync_tool = self.getSynchronizationTool()
    sync_tool.manage_addPublication(self.pub_id,
                                    self.publication_url,
                                    server_path,
                                    'objectValues',
                                    self.xml_mapping,
                                    'ERP5Conduit',
                                    '')
    publication = sync_tool.getPublication(self.pub_id)
    self.failUnless(publication is not None)

151
  def test_03_AddSubscription1(self, quiet=0, run=run_all_test):
    """
    Add the first subscription to the synchronization tool and check
    that it can then be retrieved by its id.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Add First Subscription ')
      LOG('Testing... ',0,'test_03_AddSubscription1')
    # Path of the folder receiving the synchronized objects
    client_path = '/%s/person_client1' % self.getPortalId()
    sync_tool = self.getSynchronizationTool()
    sync_tool.manage_addSubscription(self.sub_id1,
                                     self.publication_url,
                                     self.subscription_url1,
                                     client_path,
                                     'objectValues',
                                     self.xml_mapping,
                                     'ERP5Conduit',
                                     '')
    subscription = sync_tool.getSubscription(self.sub_id1)
    self.failUnless(subscription is not None)

164
  def test_04_AddSubscription2(self, quiet=0, run=run_all_test):
    """
    Add the second subscription to the synchronization tool and check
    that it can then be retrieved by its id.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Add Second Subscription ')
      LOG('Testing... ',0,'test_04_AddSubscription2')
    # Path of the folder receiving the synchronized objects
    client_path = '/%s/person_client2' % self.getPortalId()
    sync_tool = self.getSynchronizationTool()
    sync_tool.manage_addSubscription(self.sub_id2,
                                     self.publication_url,
                                     self.subscription_url2,
                                     client_path,
                                     'objectValues',
                                     self.xml_mapping,
                                     'ERP5Conduit',
                                     '')
    subscription = sync_tool.getSubscription(self.sub_id2)
    self.failUnless(subscription is not None)

177
  def login(self, quiet=0):
    """
    Add the users 'seb', 'ERP5TypeTestCase' and 'syncml' to the user
    folder of the portal, each with the Manager role and an empty
    password, then install a security manager for 'seb'.

    quiet -- unused
    """
    user_folder = self.getPortal().acl_users
    for user_name in ('seb', 'ERP5TypeTestCase', 'syncml'):
      user_folder._doAddUser(user_name, '', ['Manager'], [])
    seb = user_folder.getUserById('seb').__of__(user_folder)
    newSecurityManager(None, seb)

185
  def populatePersonServer(self, quiet=0, run=run_all_test):
    """
    Create the three person folders when they are missing, then add
    two persons (ids id1 and id2) to person_server.

    quiet -- when true, nothing is printed nor logged
    run -- when false, nothing is done and None is returned

    Return the number of objects found in person_server, which is
    asserted to be 2.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Populate Person Server ')
      LOG('Testing... ',0,'populatePersonServer')
    self.login()
    portal = self.getPortal()
    # One Person Module for the server side and one per client
    for module_id in ('person_server', 'person_client1', 'person_client2'):
      if not hasattr(portal, module_id):
        portal.portal_types.constructContent(type_name = 'Person Module',
                                             container = portal,
                                             id = module_id)
    person_server = self.getPersonServer()
    person1 = person_server.newContent(id=self.id1,portal_type='Person')
    person1.edit(first_name=self.first_name1,
                 last_name=self.last_name1,
                 description=self.description1)
    person2 = person_server.newContent(id=self.id2,portal_type='Person')
    person2.edit(first_name=self.first_name2,
                 last_name=self.last_name2,
                 description=self.description2)
    nb_person = len(person_server.objectValues())
    # assertEqual reports both values when the count is wrong
    self.assertEqual(nb_person, 2)
    return nb_person

218 219 220 221
  def setupPublicationAndSubscription(self, quiet=0, run=1):
    """
    Create the publication and the two subscriptions by running the
    three corresponding tests quietly.

    quiet, run -- unused; the sub-tests are always run with quiet=1
    and run=1
    """
    setup_step_list = (self.test_02_AddPublication,
                       self.test_03_AddSubscription1,
                       self.test_04_AddSubscription2)
    for setup_step in setup_step_list:
      setup_step(quiet=1,run=1)
222
      
223
  def setupPublicationAndSubscriptionAndGid(self, quiet=0, run=1):
    """
    Create the publication and the two subscriptions, then configure
    each of them with a gid generator returning the title of the
    object and with 'generateNewId' as id generator.

    quiet, run -- unused
    """
    self.setupPublicationAndSubscription(quiet=1,run=1)
    def getGid(object):
      # The gid of an object is its title
      return object.getTitle()
    sync_tool = self.getSynchronizationTool()
    subscription1 = sync_tool.getSubscription(self.sub_id1)
    subscription2 = sync_tool.getSubscription(self.sub_id2)
    publication = sync_tool.getPublication(self.pub_id)
    target_list = (publication, subscription1, subscription2)
    for target in target_list:
      target.setGidGenerator(getGid)
    for target in target_list:
      target.setIdGenerator('generateNewId')
Sebastien Robin's avatar
Sebastien Robin committed
237

238
  def test_05_GetSynchronizationList(self, quiet=0, run=run_all_test):
    """
    Test getSynchronizationList, i.e. check that both the
    subscriptions and the publication are retrieved.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest getSynchronizationList ')
      LOG('Testing... ',0,'test_05_GetSynchronizationList')
    self.setupPublicationAndSubscription(quiet=1,run=1)
    portal_sync = self.getSynchronizationTool()
    synchronization_list = portal_sync.getSynchronizationList()
    # assertEqual reports both values when the count is wrong
    self.assertEqual(len(synchronization_list), self.nb_synchronization)

251
  def test_06_GetObjectList(self, quiet=0, run=run_all_test):
    """
    Test the default getObjectList, i.e. when the query is
    'objectValues', and also test getObjectList after a new method
    was set as the query.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest getObjectList ')
      LOG('Testing... ',0,'test_06_GetObjectList')
    self.login()
    self.setupPublicationAndSubscription(quiet=1,run=1)
    nb_person = self.populatePersonServer(quiet=1,run=1)
    portal_sync = self.getSynchronizationTool()
    publication = portal_sync.getPublicationList()[0]
    self.assertEqual(len(publication.getObjectList()), nb_person)
    # now try to set a different method for query: it only keeps the
    # object whose id is id1
    def query(object):
      return [o for o in object.objectValues() if o.getId()==self.id1]
    publication.setQuery(query)
    self.assertEqual(len(publication.getObjectList()), 1)

281
  def test_07_ExportImport(self, quiet=0, run=run_all_test):
    """
    Export a person with asXML, then add it to another folder
    (person_client1) with a conduit, and check the imported copy.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Export and Import ')
      LOG('Testing... ',0,'test_07_ExportImport')
    self.login()
    self.populatePersonServer(quiet=1,run=1)
    person_server = self.getPersonServer()
    person_client1 = self.getPersonClient1()
    person = person_server._getOb(self.id1)
    xml_output = person.asXML()
    conduit = ERP5Conduit()
    conduit.addNode(object=person_client1,xml=xml_output)
    self.assertEqual(len(person_client1.objectValues()), 1)
    new_object = person_client1._getOb(self.id1)
    self.assertEqual(new_object.getLastName(), self.last_name1)
    self.assertEqual(new_object.getFirstName(), self.first_name1)
    # XXX We should also look at the content of the workflow history;
    # for now only its length is checked
    self.assertEqual(len(new_object.workflow_history[self.workflow_id]), 2)
    # NOTE(review): the local roles compared here are those of the two
    # folders, not those of the two persons -- confirm this is intended
    s_local_role = person_server.get_local_roles()
    c_local_role = person_client1.get_local_roles()
    self.assertEqual(s_local_role,c_local_role)
Sebastien Robin's avatar
Sebastien Robin committed
307

308
  def synchronize(self, id, run=1):
    """
    Run a whole synchronization for one subscription.

    This just defines how we synchronize; it is defined here because
    it is specific to the unit testing: the synchronization is done
    by files, and the subscription and the publication are triggered
    in turn for as long as the subscription reports a response.

    id -- id of the subscription to synchronize
    run -- unused

    Return the number of messages counted during the exchange.
    """
    portal_sync = self.getSynchronizationTool()
    subscription = portal_sync.getSubscription(id)
    # Find the publication this subscription points to. The loop must
    # not reuse the name 'publication' (that would overwrite the None
    # sentinel with whatever comes last in the list), and the
    # publication url has to be compared with the publication url
    # stored on the subscription, not with the subscription's own url.
    publication_url = subscription.getPublicationUrl()
    publication = None
    for candidate in portal_sync.getPublicationList():
      if candidate.getPublicationUrl()==publication_url:
        publication = candidate
        break
    self.failUnless(publication is not None)
    # reset files, because we do sync by files
    # NOTE(review): the publication url names 'sync_server' while the
    # file emptied here is '/tmp/sync' -- confirm which one is right
    for path in ('/tmp/sync_client1', '/tmp/sync_client2', '/tmp/sync'):
      sync_file = open(path,'w')
      try:
        sync_file.write('')
      finally:
        sync_file.close()
    nb_message = 1
    result = portal_sync.SubSync(subscription.getTitle())
    while result['has_response']==1:
      portal_sync.PubSync(publication.getTitle())
      result = portal_sync.SubSync(subscription.getTitle())
      nb_message += 1 + result['has_response']
    return nb_message

339
  def synchronizeWithBrokenMessage(self, id, run=1):
    """
    Run a whole synchronization for one subscription, sending every
    message three times.

    This just defines how we synchronize; it is defined here because
    it is specific to the unit testing: the synchronization is done
    by files. Each step is triggered three times in a row in order to
    test that duplicate messages are managed well.

    id -- id of the subscription to synchronize
    run -- unused

    Return the number of messages counted during the exchange
    (each tripled round counts as a single one).
    """
    portal_sync = self.getSynchronizationTool()
    subscription = portal_sync.getSubscription(id)
    # Find the publication this subscription points to. The loop must
    # not reuse the name 'publication' (that would overwrite the None
    # sentinel with whatever comes last in the list), and the
    # publication url has to be compared with the publication url
    # stored on the subscription, not with the subscription's own url.
    publication_url = subscription.getPublicationUrl()
    publication = None
    for candidate in portal_sync.getPublicationList():
      if candidate.getPublicationUrl()==publication_url:
        publication = candidate
        break
    self.failUnless(publication is not None)
    # reset files, because we do sync by files
    # NOTE(review): the publication url names 'sync_server' while the
    # file emptied here is '/tmp/sync' -- confirm which one is right
    for path in ('/tmp/sync_client1', '/tmp/sync_client2', '/tmp/sync'):
      sync_file = open(path,'w')
      try:
        sync_file.write('')
      finally:
        sync_file.close()
    nb_message = 1
    result = portal_sync.SubSync(subscription.getTitle())
    while result['has_response']==1:
      # We do things three times, so that we will test
      # if we manage well duplicate messages
      for _ in range(3):
        portal_sync.PubSync(publication.getTitle())
      # Only the result of the last call is kept
      for _ in range(3):
        result = portal_sync.SubSync(subscription.getTitle())
      nb_message += 1 + result['has_response']
    return nb_message

376
  def test_08_FirstSynchronization(self, quiet=0, run=run_all_test):
    """
    Populate the folders person_client1 and person_client2 with the
    data from person_server, by synchronizing each subscription once.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest First Synchronization ')
      LOG('Testing... ',0,'test_08_FirstSynchronization')
    self.login()
    self.setupPublicationAndSubscription(quiet=1,run=1)
    nb_person = self.populatePersonServer(quiet=1,run=1)
    portal_sync = self.getSynchronizationTool()

    def checkFirstPerson(person):
      # The person with id id1 must carry the initial names
      self.assertEqual(person.getId(), self.id1)
      self.assertEqual(person.getFirstName(), self.first_name1)
      self.assertEqual(person.getLastName(), self.last_name1)

    # Before any synchronization, every subscription is in slow sync
    for sub in portal_sync.getSubscriptionList():
      self.assertEqual(sub.getSynchronizationType(), SyncCode.SLOW_SYNC)
    # Synchronize the first client: only this one switches to two way
    nb_message1 = self.synchronize(self.sub_id1)
    for sub in portal_sync.getSubscriptionList():
      if sub.getTitle() == self.sub_id1:
        expected_type = SyncCode.TWO_WAY
      else:
        expected_type = SyncCode.SLOW_SYNC
      self.assertEqual(sub.getSynchronizationType(), expected_type)
    self.assertEqual(nb_message1, self.nb_message_first_synchronization)
    # Synchronize the second client
    nb_message2 = self.synchronize(self.sub_id2)
    for sub in portal_sync.getSubscriptionList():
      self.assertEqual(sub.getSynchronizationType(), SyncCode.TWO_WAY)
    self.assertEqual(nb_message2, self.nb_message_first_synchronization)
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    subscription2 = portal_sync.getSubscription(self.sub_id2)
    self.assertEqual(len(subscription1.getObjectList()), nb_person)
    # We also check we don't modify the initial object
    checkFirstPerson(self.getPersonServer()._getOb(self.id1))
    checkFirstPerson(self.getPersonClient1()._getOb(self.id1))
    self.assertEqual(len(subscription2.getObjectList()), nb_person)
    checkFirstPerson(self.getPersonClient2()._getOb(self.id1))

423
  def test_09_FirstSynchronizationWithLongLines(self, quiet=0, run=run_all_test):
    """
    Populate the folder person_client1 with the data from
    person_server, when the first name of a person is a very long
    line (more than 10000 characters).

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest First Synchronization With Long Lines ')
      LOG('Testing... ',0,'test_09_FirstSynchronizationWithLongLines')
    self.login()
    self.setupPublicationAndSubscription(quiet=1,run=1)
    nb_person = self.populatePersonServer(quiet=1,run=1)
    person_server = self.getPersonServer()
    long_line = 'a' * 10000 + ' --- '
    person1_s = person_server._getOb(self.id1)
    person1_s.edit(first_name=long_line)
    # Synchronize the first client
    nb_message1 = self.synchronize(self.sub_id1)
    self.assertEqual(nb_message1, self.nb_message_first_synchronization)
    portal_sync = self.getSynchronizationTool()
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    self.assertEqual(len(subscription1.getObjectList()), nb_person)
    # The server object must be left untouched by the synchronization
    self.assertEqual(person1_s.getId(), self.id1)
    self.assertEqual(person1_s.getFirstName(), long_line)
    self.assertEqual(person1_s.getLastName(), self.last_name1)
    # The client must have received the long first name unchanged
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    self.assertEqual(person1_c.getId(), self.id1)
    self.assertEqual(person1_c.getFirstName(), long_line)
    self.assertEqual(person1_c.getLastName(), self.last_name1)

453
  def test_10_GetObjectFromGid(self, quiet=0, run=run_all_test):
    """
    Get an object from a publication just by giving the gid.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest getObjectFromGid ')
      LOG('Testing... ',0,'test_10_GetObjectFromGid')
    self.login()
    self.setupPublicationAndSubscription(quiet=1,run=1)
    self.populatePersonServer(quiet=1,run=1)
    # By default we can just give the id
    portal_sync = self.getSynchronizationTool()
    publication = portal_sync.getPublication(self.pub_id)
    # Not named 'object', which would hide the builtin
    found_object = publication.getObjectFromGid(self.id1)
    self.failUnless(found_object is not None)
    self.assertEqual(found_object.getId(), self.id1)

470
  def test_11_GetSynchronizationState(self, quiet=0, run=run_all_test):
    """
    Get the state of objects that have just been synchronized.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest getSynchronizationState ')
      LOG('Testing... ',0,'test_11_GetSynchronizationState')
    self.test_08_FirstSynchronization(quiet=1,run=1)
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    state_list_s = portal_sync.getSynchronizationState(person1_s)
    # On the server: one state for each subscriber
    self.assertEqual(len(state_list_s), self.nb_subscription)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    state_list_c = portal_sync.getSynchronizationState(person1_c)
    # On a client: a single state is expected
    self.assertEqual(len(state_list_c), 1)
    self.checkSynchronizationStateIsSynchronized()

491
  def checkSynchronizationStateIsSynchronized(self, quiet=0, run=1):
    """
    Check that every person of the server and of the two clients is
    in the SYNCHRONIZED state, and that no signature keeps pending
    xml.

    quiet, run -- unused
    """
    portal_sync = self.getSynchronizationTool()
    folder_getter_list = (self.getPersonServer,
                          self.getPersonClient1,
                          self.getPersonClient2)
    for get_folder in folder_getter_list:
      for person in get_folder().objectValues():
        for state in portal_sync.getSynchronizationState(person):
          # state[0] provides the SYNCHRONIZED constant that state[1]
          # is compared with
          self.assertEqual(state[1], state[0].SYNCHRONIZED)
    # Check for each signature of the subscriptions that both the
    # tempXML and the partialXML are None
    for sub in portal_sync.getSubscriptionList():
      for m in sub.getSignatureList():
        self.assertEqual(m.getTempXML(), None)
        self.assertEqual(m.getPartialXML(), None)
    # On the publication side only the partialXML is checked
    for pub in portal_sync.getPublicationList():
      for sub in pub.getSubscriberList():
        for m in sub.getSignatureList():
          self.assertEqual(m.getPartialXML(), None)
Sebastien Robin's avatar
Sebastien Robin committed
517

518
  def checkSynchronizationStateIsConflict(self, quiet=0, run=1):
    """
    Check that the person with id id1 is in the CONFLICT state on the
    server and on the two clients, and that a sub object created
    inside the person of the first client is in the same state.

    quiet, run -- unused
    """
    portal_sync = self.getSynchronizationTool()
    folder_getter_list = (self.getPersonServer,
                          self.getPersonClient1,
                          self.getPersonClient2)
    for get_folder in folder_getter_list:
      for person in get_folder().objectValues():
        if person.getId()!=self.id1:
          continue
        for state in portal_sync.getSynchronizationState(person):
          # state[0] provides the CONFLICT constant that state[1]
          # is compared with
          self.assertEqual(state[1], state[0].CONFLICT)
    # make sure sub objects are also in a conflict mode
    person = self.getPersonClient1()._getOb(self.id1)
    sub_person = person.newContent(id=self.id1,portal_type='Person')
    for state in portal_sync.getSynchronizationState(sub_person):
      self.assertEqual(state[1], state[0].CONFLICT)

545
  def test_12_UpdateSimpleData(self, quiet=0, run=run_all_test):
    """
    After a first synchronization, modify simple properties on the
    server only, then on the client only, then on both sides, and
    check after each synchronization that the server and the client
    hold the same values.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Update Simple Data ')
      LOG('Testing... ',0,'test_12_UpdateSimpleData')
    self.test_08_FirstSynchronization(quiet=1,run=1)
    # First we do only modification on server
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person1_s.edit(first_name=self.first_name3,last_name=self.last_name3)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    self.assertEqual(person1_s.getFirstName(), self.first_name3)
    self.assertEqual(person1_s.getLastName(), self.last_name3)
    self.assertEqual(person1_c.getFirstName(), self.first_name3)
    self.assertEqual(person1_c.getLastName(), self.last_name3)
    # Then we do only modification on a client
    person1_c.edit(first_name=self.first_name1,last_name=self.last_name1)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(person1_s.getFirstName(), self.first_name1)
    self.assertEqual(person1_s.getLastName(), self.last_name1)
    self.assertEqual(person1_c.getFirstName(), self.first_name1)
    self.assertEqual(person1_c.getLastName(), self.last_name1)
    # Then we do modification on both the client and the server,
    # on the same object but on different properties
    person1_s.edit(first_name=self.first_name3)
    person1_c.edit(description=self.description3)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(person1_s.getFirstName(), self.first_name3)
    self.assertEqual(person1_s.getDescription(), self.description3)
    self.assertEqual(person1_c.getFirstName(), self.first_name3)
    self.assertEqual(person1_c.getDescription(), self.description3)
Sebastien Robin's avatar
Sebastien Robin committed
589

590
  def test_13_GetConflictList(self, quiet=0, run=run_all_test):
    """
    Generate a conflict and then get it; also make sure it contains
    what we want.

    quiet -- when true, nothing is printed nor logged
    run -- when false, the test returns immediately
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Get Conflict List ')
      LOG('Testing... ',0,'test_13_GetConflictList')
    self.test_08_FirstSynchronization(quiet=1,run=1)
    # Set a different description for the same person on the server
    # and on the first client before synchronizing
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person1_s.setDescription(self.description2)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    person1_c.setDescription(self.description3)
    self.synchronize(self.sub_id1)
    conflict_list = portal_sync.getConflictList()
    self.assertEqual(len(conflict_list), 1)
    conflict = conflict_list[0]
    # Each side must have kept its own value
    self.assertEqual(person1_c.getDescription(), self.description3)
    self.assertEqual(person1_s.getDescription(), self.description2)
    # The conflict must describe both values of the property
    self.assertEqual(conflict.getPropertyId(), 'description')
    self.assertEqual(conflict.getPublisherValue(), self.description2)
    self.assertEqual(conflict.getSubscriberValue(), self.description3)
    subscriber = conflict.getSubscriber()
    self.assertEqual(subscriber.getSubscriptionUrl(), self.subscription_url1)

618
  def test_14_GetPublisherAndSubscriberDocument(self, quiet=0, run=run_all_test):
    """
    We will generate a conflict (by running test_13_GetConflictList)
    and then ask this conflict for the publisher and the subscriber
    documents.

    The publisher document must have description2 and the subscriber
    document must have description3, ie the values that
    test_13_GetConflictList sets on the server and on the first client.
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Get Publisher And Subscriber Document ')
      LOG('Testing... ',0,'test_14_GetPublisherAndSubscriberDocument')
    self.test_13_GetConflictList(quiet=1,run=1)
    portal_sync = self.getSynchronizationTool()
    conflict = portal_sync.getConflictList()[0]
    publisher_document = conflict.getPublisherDocument()
    self.assertEqual(publisher_document.getDescription(), self.description2)
    subscriber_document = conflict.getSubscriberDocument()
    self.assertEqual(subscriber_document.getDescription(), self.description3)

639
  def test_15_ApplyPublisherValue(self, quiet=0, run=run_all_test):
    """
    We will generate a conflict (by running test_13_GetConflictList),
    then solve it by applying the publisher value and synchronize again.

    Both the server and the client must end up with description2 and
    the conflict list must be empty.
    """
    if not run: return
    self.test_13_GetConflictList(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Apply Publisher Value ')
      LOG('Testing... ',0,'test_15_ApplyPublisherValue')
    portal_sync = self.getSynchronizationTool()
    conflict = portal_sync.getConflictList()[0]
    person1_s = self.getPersonServer()._getOb(self.id1)
    person1_c = self.getPersonClient1()._getOb(self.id1)
    conflict.applyPublisherValue()
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(person1_c.getDescription(), self.description2)
    self.assertEqual(person1_s.getDescription(), self.description2)
    self.assertEqual(len(portal_sync.getConflictList()), 0)

662
  def test_16_ApplySubscriberValue(self, quiet=0, run=run_all_test):
    """
    We will generate a conflict (by running test_13_GetConflictList),
    then solve it by applying the subscriber value and synchronize again.

    Both the server and the client must end up with description3 and
    the conflict list must be empty.
    """
    if not run: return
    self.test_13_GetConflictList(quiet=1,run=1)
    portal_sync = self.getSynchronizationTool()
    conflict_list = portal_sync.getConflictList()
    if not quiet:
      ZopeTestCase._print('\nTest Apply Subscriber Value ')
      LOG('Testing... ',0,'test_16_ApplySubscriberValue')
    conflict = conflict_list[0]
    person1_s = self.getPersonServer()._getOb(self.id1)
    person1_c = self.getPersonClient1()._getOb(self.id1)
    conflict.applySubscriberValue()
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(person1_s.getDescription(), self.description3)
    self.assertEqual(person1_c.getDescription(), self.description3)
    self.assertEqual(len(portal_sync.getConflictList()), 0)

685 686 687 688 689 690 691 692
  def populatePersonServerWithSubObject(self, quiet=0, run=run_all_test):
    """
    Before this method, we need to call populatePersonServer
    Then it will give the following tree :
    - person_server :
      - id1
        - id1
          - id2
          - id1
      - id2

    Every new sub-object whose id is id1 gets first_name1, last_name1
    and description1; the new sub-object whose id is id2 gets
    first_name2, last_name2 and description2.
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Populate Person Server With Sub Object ')
      LOG('Testing... ',0,'populatePersonServerWithSubObject')
    person_server = self.getPersonServer()
    person1 = person_server._getOb(self.id1)
    kw1 = {'first_name':self.first_name1,'last_name':self.last_name1,
           'description':self.description1}
    kw2 = {'first_name':self.first_name2,'last_name':self.last_name2,
           'description':self.description2}
    sub_person1 = person1.newContent(id=self.id1,portal_type='Person')
    sub_person1.edit(**kw1)
    sub_sub_person1 = sub_person1.newContent(id=self.id1,portal_type='Person')
    sub_sub_person1.edit(**kw1)
    sub_sub_person2 = sub_person1.newContent(id=self.id2,portal_type='Person')
    sub_sub_person2.edit(**kw2)
    # Once we remove ('','portal...','person_server') from the physical
    # path, both new leaves must be exactly three levels deep
    for leaf in (sub_sub_person1, sub_sub_person2):
      self.assertEqual(len(leaf.getPhysicalPath()) - 3, 3)
719

720
  def test_17_AddSubObject(self, quiet=0, run=run_all_test):
    """
    In this test, we synchronize, then add sub objects on the
    server and then see if the next synchronization will also
    create the sub-objects on the client, at the same depth and
    with the same first name, last name and description.
    """
    if not run: return
    self.test_08_FirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Add Sub Object ')
      LOG('Testing... ',0,'test_17_AddSubObject')
    self.populatePersonServerWithSubObject(quiet=1,run=1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    sub_person1_c = person1_c._getOb(self.id1)
    sub_sub_person1 = sub_person1_c._getOb(self.id1)
    sub_sub_person2 = sub_person1_c._getOb(self.id2)
    # Once we remove ('','portal...','person_client1') from the physical
    # path, both leaves must be exactly three levels deep
    for leaf in (sub_sub_person1, sub_sub_person2):
      self.assertEqual(len(leaf.getPhysicalPath()) - 3, 3)
    self.assertEqual(sub_sub_person1.getDescription(), self.description1)
    self.assertEqual(sub_sub_person1.getFirstName(), self.first_name1)
    self.assertEqual(sub_sub_person1.getLastName(), self.last_name1)
    self.assertEqual(sub_sub_person2.getDescription(), self.description2)
    self.assertEqual(sub_sub_person2.getFirstName(), self.first_name2)
    self.assertEqual(sub_sub_person2.getLastName(), self.last_name2)

752
  def test_18_UpdateSubObject(self, quiet=0, run=run_all_test):
    """
      In this test, we start with a tree of objects already
    synchronized, then we update a sub-object, and we will see
    if it is updated correctly.
      To make this test a bit harder, we update two different
    properties of the same sub-object at the same time : the first
    name on the client and the description on the server. After the
    synchronization both sides must have both new values.
    """
    if not run: return
    self.test_17_AddSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Sub Object ')
      LOG('Testing... ',0,'test_18_UpdateSubObject')
    person_client1 = self.getPersonClient1()
    sub_person1_c = person_client1._getOb(self.id1)._getOb(self.id1)
    sub_sub_person_c = sub_person1_c._getOb(self.id2)
    person_server = self.getPersonServer()
    sub_person1_s = person_server._getOb(self.id1)._getOb(self.id1)
    sub_sub_person_s = sub_person1_s._getOb(self.id2)
    sub_sub_person_c.edit(**{'first_name':self.first_name3})
    sub_sub_person_s.edit(**{'description':self.description3})
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(sub_sub_person_c.getDescription(), self.description3)
    self.assertEqual(sub_sub_person_c.getFirstName(), self.first_name3)
    self.assertEqual(sub_sub_person_s.getDescription(), self.description3)
    self.assertEqual(sub_sub_person_s.getFirstName(), self.first_name3)
781

782
  def test_19_DeleteObject(self, quiet=0, run=run_all_test):
    """
      We will do a first synchronization, then delete one object on
    the server (id1) and another one on the first client (id2), and
    we will see if nothing is left on the server and also
    on the two clients once both subscriptions are synchronized.
    """
    if not run: return
    self.test_08_FirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Delete Object ')
      LOG('Testing... ',0,'test_19_DeleteObject')
    person_server = self.getPersonServer()
    person_server.manage_delObjects(self.id1)
    person_client1 = self.getPersonClient1()
    person_client1.manage_delObjects(self.id2)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    portal_sync = self.getSynchronizationTool()
    publication = portal_sync.getPublication(self.pub_id)
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    subscription2 = portal_sync.getSubscription(self.sub_id2)
    self.assertEqual(len(publication.getObjectList()), 0)
    self.assertEqual(len(subscription1.getObjectList()), 0)
    self.assertEqual(len(subscription2.getObjectList()), 0)

808
  def test_20_DeleteSubObject(self, quiet=0, run=run_all_test):
    """
      We will do a first synchronization, then delete one sub-object on
    the server (id1) and another one on the first client (id2), and we
    will see if nothing is left below id1/id1 on the server and also
    on the two clients
    - before :         after :
      - id1             - id1
        - id1             - id1
          - id2         - id2
          - id1
      - id2
    """
    if not run: return
    self.test_17_AddSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Delete Sub Object ')
      LOG('Testing... ',0,'test_20_DeleteSubObject')
    person_server = self.getPersonServer()
    sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
    sub_object_s.manage_delObjects(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    sub_object_c1.manage_delObjects(self.id2)
    person_client2 = self.getPersonClient2()
    sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    # One assertion per node, so that a failure tells which node still
    # has sub-objects
    self.assertEqual(len(sub_object_s.objectValues()), 0)
    self.assertEqual(len(sub_object_c1.objectValues()), 0)
    self.assertEqual(len(sub_object_c2.objectValues()), 0)

841
  def test_21_GetConflictListOnSubObject(self, quiet=0, run=run_all_test):
    """
    We will change several attributes on a sub object on both the server
    and a client, then we will see if we have correctly the conflict list.

    Two properties (language and description) are changed, so two
    conflicts are expected, both globally and when asking for the
    conflicts of the sub object of the server. Asking for the conflicts
    of the sub object of the client, or of the parent object of the
    server, must return nothing.
    """
    if not run: return
    self.test_17_AddSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Get Conflict List On Sub Object ')
      LOG('Testing... ',0,'test_21_GetConflictListOnSubObject')
    person_server = self.getPersonServer()
    object_s = person_server._getOb(self.id1)
    sub_object_s = object_s._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    # Change values so that we will get conflicts
    kw = {'language':self.lang2,'description':self.description2}
    sub_object_s.edit(**kw)
    kw = {'language':self.lang3,'description':self.description3}
    sub_object_c1.edit(**kw)
    self.synchronize(self.sub_id1)
    portal_sync = self.getSynchronizationTool()
    self.assertEqual(len(portal_sync.getConflictList()), 2)
    self.assertEqual(len(portal_sync.getConflictList(sub_object_c1)), 0)
    self.assertEqual(len(portal_sync.getConflictList(object_s)), 0)
    self.assertEqual(len(portal_sync.getConflictList(sub_object_s)), 2)

874
  def test_22_ApplyPublisherDocumentOnSubObject(self, quiet=0, run=run_all_test):
    """
    There are several conflicts on a sub object (created by running
    test_21_GetConflictListOnSubObject), we will see if we can
    correctly have the publisher version of this document : after
    applying the publisher document and synchronizing both
    subscriptions, the server and the two clients must have
    description2 and lang2.
    """
    if not run: return
    self.test_21_GetConflictListOnSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Apply Publisher Document On Sub Object ')
      LOG('Testing... ',0,'test_22_ApplyPublisherDocumentOnSubObject')
    portal_sync = self.getSynchronizationTool()
    conflict = portal_sync.getConflictList()[0]
    conflict.applyPublisherDocument()
    person_server = self.getPersonServer()
    sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    person_client2 = self.getPersonClient2()
    sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    for sub_object in (sub_object_s, sub_object_c1, sub_object_c2):
      self.assertEqual(sub_object.getDescription(), self.description2)
      self.assertEqual(sub_object.getLanguage(), self.lang2)

904
  def test_23_ApplySubscriberDocumentOnSubObject(self, quiet=0, run=run_all_test):
    """
    There are several conflicts on a sub object (created by running
    test_21_GetConflictListOnSubObject), we will see if we can
    correctly have the subscriber version of this document : after
    applying the subscriber document and synchronizing both
    subscriptions, the server and the two clients must have
    description3 and lang3.
    """
    if not run: return
    self.test_21_GetConflictListOnSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Apply Subscriber Document On Sub Object ')
      LOG('Testing... ',0,'test_23_ApplySubscriberDocumentOnSubObject')
    portal_sync = self.getSynchronizationTool()
    conflict = portal_sync.getConflictList()[0]
    conflict.applySubscriberDocument()
    person_server = self.getPersonServer()
    sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    person_client2 = self.getPersonClient2()
    sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    for sub_object in (sub_object_s, sub_object_c1, sub_object_c2):
      self.assertEqual(sub_object.getDescription(), self.description3)
      self.assertEqual(sub_object.getLanguage(), self.lang3)

934
  def test_24_SynchronizeWithStrangeGid(self, quiet=0, run=run_all_test):
    """
    By default, the synchronization process uses the id in order to
    recognize objects (because by default, getGid==getId). Here, we will
    see if it also works with a somewhat strange getGid.

    The publication and the subscription are set up with
    setupPublicationAndSubscriptionAndGid, then we check in turn that
    adding, updating and deleting an object are synchronized.
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Synchronize With Strange Gid ')
      LOG('Testing... ',0,'test_24_SynchronizeWithStrangeGid')
    self.login()
    self.setupPublicationAndSubscriptionAndGid(quiet=1,run=1)
    nb_person = self.populatePersonServer(quiet=1,run=1)
    # This will test adding object
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    portal_sync = self.getSynchronizationTool()
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    self.assertEqual(len(subscription1.getObjectList()), nb_person)
    publication = portal_sync.getPublication(self.pub_id)
    self.assertEqual(len(publication.getObjectList()), nb_person)
    gid = self.first_name1 +  ' ' + self.last_name1 # ie the title 'Sebastien Robin'
    person_c1 = subscription1.getObjectFromGid(gid)
    id_c1 = person_c1.getId()
    self.failUnless(id_c1 in ('1','2')) # id given by the default generateNewId
    person_s = publication.getObjectFromGid(gid)
    self.assertEqual(person_s.getId(), self.id1)
    # This will test updating object
    person_s.setDescription(self.description3)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(person_s.getDescription(), self.description3)
    self.assertEqual(person_c1.getDescription(), self.description3)
    # This will test deleting object
    person_server = self.getPersonServer()
    person_server.manage_delObjects(self.id2)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(len(subscription1.getObjectList()), nb_person-1)
    self.assertEqual(len(publication.getObjectList()), nb_person-1)
    # The remaining object must still be reachable through its gid
    person_s = publication.getObjectFromGid(gid)
    person_c1 = subscription1.getObjectFromGid(gid)
    self.assertEqual(person_s.getDescription(), self.description3)
    self.assertEqual(person_c1.getDescription(), self.description3)

981
  def test_25_MultiNodeConflict(self, quiet=0, run=run_all_test):
    """
    We will create conflicts with 3 different nodes (the server and
    the two clients), and we will solve them property by property :
    - description : we keep the value of person_server
    - language    : we keep the value of person_client1
    - format      : we keep the value of person_client2
    After a new synchronization of both subscriptions, the three nodes
    must have the same values.
    """
    if not run: return
    self.test_08_FirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Multi Node Conflict ')
      LOG('Testing... ',0,'test_25_MultiNodeConflict')
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    kw = {'language':self.lang2,'description':self.description2,
          'format':self.format2}
    person1_s.edit(**kw)
    person_client1 = self.getPersonClient1()
    person1_c1 = person_client1._getOb(self.id1)
    kw = {'language':self.lang3,'description':self.description3,
          'format':self.format3}
    person1_c1.edit(**kw)
    person_client2 = self.getPersonClient2()
    person1_c2 = person_client2._getOb(self.id1)
    kw = {'language':self.lang4,'description':self.description4,
          'format':self.format4}
    person1_c2.edit(**kw)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    conflict_list = portal_sync.getConflictList()
    # presumably one conflict per modified property and per client
    self.assertEqual(len(conflict_list), 6)
    # check if we have the state conflict on all clients
    self.checkSynchronizationStateIsConflict()

    # For these properties the value of one given subscriber wins;
    # every other conflict is solved with the publisher value
    subscriber_wins = {'language': self.subscription_url1,
                       'format': self.subscription_url2}
    for conflict in conflict_list:
      subscriber = conflict.getSubscriber()
      property_id = conflict.getPropertyId()
      if (property_id in subscriber_wins and
          subscriber.getSubscriptionUrl()==subscriber_wins[property_id]):
        conflict.applySubscriberValue()
      else:
        conflict.applyPublisherValue()
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    node_list = (person1_s, person1_c1, person1_c2)
    for person in node_list:
      self.assertEqual(person.getDescription(), self.description2)
    for person in node_list:
      self.assertEqual(person.getLanguage(), self.lang3)
    for person in node_list:
      self.assertEqual(person.getFormat(), self.format4)
Sebastien Robin's avatar
Sebastien Robin committed
1044
        
1045

1046
  def test_26_SynchronizeWorkflowHistory(self, quiet=0, run=run_all_test):
    """
    We will do a synchronization, then we will edit two times
    the object on the server and two times the object on the
    client, and see if, once synchronized, the workflow history
    of the object has 4 more actions on both sides.
    """
    if not run: return
    self.test_08_FirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Synchronize WorkflowHistory ')
      LOG('Testing... ',0,'test_26_SynchronizeWorkflowHistory')
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    kw1 = {'description':self.description1}
    kw2 = {'description':self.description2}
    # Length of the history on the server before any new edit
    len_wf = len(person1_s.workflow_history[self.workflow_id])
    person1_s.edit(**kw2)
    person1_c.edit(**kw2)
    person1_s.edit(**kw1)
    person1_c.edit(**kw1)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(len(person1_s.workflow_history[self.workflow_id]),
                     len_wf+4)
    self.assertEqual(len(person1_c.workflow_history[self.workflow_id]),
                     len_wf+4)

1073
  def test_27_UpdateLocalRole(self, quiet=0, run=run_all_test):
    """
    After a first synchronization, local roles are set and deleted
    on the two persons of the server; once both subscriptions are
    synchronized, the persons of the first client must report the
    same local roles as the persons of the server.
    """
    if not run:
      return
    self.test_08_FirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Local Role ')
      LOG('Testing... ',0,'test_27_UpdateLocalRole')
    # A second user, 'jp', is needed to give him some local roles
    user_folder = self.getPortal().acl_users
    user_folder._doAddUser('jp', '', ['Manager'], [])
    wrapped_user = user_folder.getUserById('jp').__of__(user_folder)
    # Fetch the two persons on the server and on the first client
    server_folder = self.getPersonServer()
    server_person_list = [server_folder._getOb(person_id)
                          for person_id in (self.id1, self.id2)]
    client_folder = self.getPersonClient1()
    client_person_list = [client_folder._getOb(person_id)
                          for person_id in (self.id1, self.id2)]
    # Roles are only changed on the server side
    first_s, second_s = server_person_list
    first_s.manage_setLocalRoles('seb',['Manager','Owner'])
    second_s.manage_setLocalRoles('jp',['Manager','Owner'])
    second_s.manage_delLocalRoles(['seb'])
    for subscription_id in (self.sub_id1, self.sub_id2):
      self.synchronize(subscription_id)
    # Read every role mapping before comparing anything
    server_role_list = [person.get_local_roles()
                        for person in server_person_list]
    client_role_list = [person.get_local_roles()
                        for person in client_person_list]
    for server_roles, client_roles in zip(server_role_list, client_role_list):
      self.assertEqual(server_roles,client_roles)

1106
  def test_28_PartialData(self, quiet=0, run=run_all_test):
    """
    We will do a first synchronization, then we will do a change, then
    we will modify the SyncCode max_line value so that
    it will generate many messages.

    SyncCode.MAX_LINES is a value shared by every test, so it is
    restored in a finally clause: a failure in this test must not
    change the behaviour of the tests running after it.
    """
    if not run: return
    self.test_08_FirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Partial Data ')
      LOG('Testing... ',0,'test_28_PartialData')
    previous_max_lines = SyncCode.MAX_LINES
    SyncCode.MAX_LINES = 10
    try:
      self.populatePersonServerWithSubObject(quiet=1,run=1)
      self.synchronize(self.sub_id1)
      self.synchronize(self.sub_id2)
      self.checkSynchronizationStateIsSynchronized()
      person_client1 = self.getPersonClient1()
      person1_c = person_client1._getOb(self.id1)
      sub_person1_c = person1_c._getOb(self.id1)
      sub_sub_person1 = sub_person1_c._getOb(self.id1)
      sub_sub_person2 = sub_person1_c._getOb(self.id2)
      # Once we remove ('','portal...','person_client1') from the physical
      # path, both leaves must be exactly three levels deep
      for leaf in (sub_sub_person1, sub_sub_person2):
        self.assertEqual(len(leaf.getPhysicalPath()) - 3, 3)
      self.assertEqual(sub_sub_person1.getDescription(),self.description1)
      self.assertEqual(sub_sub_person1.getFirstName(),self.first_name1)
      self.assertEqual(sub_sub_person1.getLastName(),self.last_name1)
      self.assertEqual(sub_sub_person2.getDescription(),self.description2)
      self.assertEqual(sub_sub_person2.getFirstName(),self.first_name2)
      self.assertEqual(sub_sub_person2.getLastName(),self.last_name2)
    finally:
      SyncCode.MAX_LINES = previous_max_lines

1141
  def test_29_BrokenMessage(self, quiet=0, run=run_all_test):
    """
    With http synchronization, when a message is not well
    received, then we send the message again, we want to
    be sure that in such a case we don't do stupid things.

    If we want to make this test more interesting, it is
    better to split messages, this is why SyncCode.MAX_LINES is
    lowered during the test. It is restored in a finally clause so
    that a failure here does not change the tests running after.
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Broken Message ')
      LOG('Testing... ',0,'test_29_BrokenMessage')
    previous_max_lines = SyncCode.MAX_LINES
    SyncCode.MAX_LINES = 10
    try:
      self.setupPublicationAndSubscription(quiet=1,run=1)
      nb_person = self.populatePersonServer(quiet=1,run=1)
      # Synchronize the first client
      # NOTE: the value returned by synchronizeWithBrokenMessage is
      # not checked by this test
      self.synchronizeWithBrokenMessage(self.sub_id1)
      portal_sync = self.getSynchronizationTool()
      subscription1 = portal_sync.getSubscription(self.sub_id1)
      self.assertEqual(len(subscription1.getObjectList()), nb_person)
      # We also check we don't modify initial ob
      person_server = self.getPersonServer()
      person1_s = person_server._getOb(self.id1)
      self.assertEqual(person1_s.getId(), self.id1)
      self.assertEqual(person1_s.getFirstName(), self.first_name1)
      self.assertEqual(person1_s.getLastName(), self.last_name1)
      person_client1 = self.getPersonClient1()
      person1_c = person_client1._getOb(self.id1)
      self.assertEqual(person1_c.getId(), self.id1)
      self.assertEqual(person1_c.getFirstName(), self.first_name1)
      self.assertEqual(person1_c.getLastName(), self.last_name1)
    finally:
      SyncCode.MAX_LINES = previous_max_lines

1177
  def test_30_GetSynchronizationType(self, quiet=0, run=run_all_test):
    """
    We will check that every subscription reports the TWO WAY
    synchronization type at three moments : right after the first
    synchronization, after a change on the server followed by a change
    on the client, and after changes done on both sides before the
    same synchronization.
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Get Synchronization Type ')
      LOG('Testing... ',0,'test_30_GetSynchronizationType')
    self.test_08_FirstSynchronization(quiet=1,run=1)
    portal_sync = self.getSynchronizationTool()

    def checkSynchronizationTypeIsTwoWay():
      # Check for each subscription that the synchronization type
      # is TWO WAY
      for sub in portal_sync.getSubscriptionList():
        self.assertEqual(sub.getSynchronizationType(),SyncCode.TWO_WAY)

    checkSynchronizationTypeIsTwoWay()
    # First we do only modification on server
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    kw = {'first_name':self.first_name3,'last_name':self.last_name3}
    person1_s.edit(**kw)
    self.synchronize(self.sub_id1)
    # Then we do only modification on a client
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    kw = {'first_name':self.first_name1,'last_name':self.last_name1}
    person1_c.edit(**kw)
    self.synchronize(self.sub_id1)
    checkSynchronizationTypeIsTwoWay()
    # Then we do modification on both the client and the server
    # and of course, on the same object
    kw = {'first_name':self.first_name3}
    person1_s.edit(**kw)
    kw = {'description':self.description3}
    person1_c.edit(**kw)
    self.synchronize(self.sub_id1)
    checkSynchronizationTypeIsTwoWay()

1214
  def test_31_UpdateLocalPermission(self, quiet=0, run=run_all_test):
    """
    We will do a first synchronization, then set local permissions
    on the persons of the server and see if they are correctly
    synchronized on the first client. Then we will change these local
    permissions again (with a shorter list of roles, with None and
    with an empty tuple) and compare both sides a second time.
    """
    if not run: return
    self.test_08_FirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Local Permission ')
      LOG('Testing... ',0,'test_31_UpdateLocalPermission')
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person2_s = person_server._getOb(self.id2)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    person2_c = person_client1._getOb(self.id2)

    def checkLocalPermissionsAreSynchronized():
      # Synchronize both subscriptions, then compare the local
      # permissions of each person of the server with its client copy
      self.synchronize(self.sub_id1)
      self.synchronize(self.sub_id2)
      permission_1_s = person1_s.get_local_permissions()
      permission_2_s = person2_s.get_local_permissions()
      permission_1_c = person1_c.get_local_permissions()
      permission_2_c = person2_c.get_local_permissions()
      self.assertEqual(permission_1_s,permission_1_c)
      self.assertEqual(permission_2_s,permission_2_c)

    # First we create local permissions on the server
    person1_s.manage_setLocalPermissions('View',['Manager','Owner'])
    person2_s.manage_setLocalPermissions('View',['Manager','Owner'])
    person2_s.manage_setLocalPermissions('View management screens',['Owner',])
    checkLocalPermissionsAreSynchronized()
    # Then we update them, still on the server only
    person1_s.manage_setLocalPermissions('View',['Owner'])
    person2_s.manage_setLocalPermissions('View',None)
    person2_s.manage_setLocalPermissions('View management screens',())
    checkLocalPermissionsAreSynchronized()

1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310
  def test_32_AddOneWaySubscription(self, quiet=0, run=1):
    """
    Add the subscription identified by self.sub_id1 to the
    synchronization tool, using the path '/<portal id>/person_client1',
    and check that the tool is then able to return it.

    Nothing is done when run is false; the progress message is
    skipped when quiet is true.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Add One Way Subscription ')
      LOG('Testing... ',0,'test_32_AddOneWaySubscription')
    portal_id = self.getPortalId()
    sync_tool = self.getSynchronizationTool()
    client_path = '/%s/person_client1' % portal_id
    sync_tool.manage_addSubscription(
        self.sub_id1,
        self.publication_url,
        self.subscription_url1,
        client_path,
        'objectValues',
        '',
        'ERP5Conduit',
        '')
    # The subscription must now be known by the tool
    subscription = sync_tool.getSubscription(self.sub_id1)
    self.failUnless(subscription is not None)

  def test_33_OneWaySync(self, quiet=0, run=1):
    """
    We will test if we can synchronize only from the server to the client.
    We want to make sure in this case that all modifications on the client
    will not be taken into account.
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest One Way Sync ')
      LOG('Testing... ',0,'test_33_OneWaySync')
    self.test_02_AddPublication(quiet=1,run=1)
    self.test_32_AddOneWaySubscription(quiet=1,run=1)

    nb_person = self.populatePersonServer(quiet=1,run=1)
    portal_sync = self.getSynchronizationTool()
    # Nothing has been synchronized yet
    for sub in portal_sync.getSubscriptionList():
      self.assertEquals(sub.getSynchronizationType(),SyncCode.SLOW_SYNC)
    # First do the sync from the server to the client
    nb_message1 = self.synchronize(self.sub_id1)
    for sub in portal_sync.getSubscriptionList():
      self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY)
    self.assertEquals(nb_message1,self.nb_message_first_synchronization)
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    self.assertEquals(len(subscription1.getObjectList()),nb_person)
    # We also check we don't modify the initial object on the server.
    # assertEquals is used instead of failUnless(a==b) so that a failure
    # reports both compared values.
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    self.assertEquals(person1_s.getId(),self.id1)
    self.assertEquals(person1_s.getFirstName(),self.first_name1)
    self.assertEquals(person1_s.getLastName(),self.last_name1)
    # The client must have received the same values
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    self.assertEquals(person1_c.getId(),self.id1)
    self.assertEquals(person1_c.getFirstName(),self.first_name1)
    self.assertEquals(person1_c.getLastName(),self.last_name1)
    # Then we change things on both sides and we look if there
    # is synchronization from only one way
    person1_c.setFirstName(self.first_name2)
    person1_s.setLastName(self.last_name2)
    self.synchronize(self.sub_id1)
    # The modification made on the server reached the client ...
    self.assertEquals(person1_c.getLastName(),self.last_name2)
    # ... but the one made on the client did not reach the server
    self.assertEquals(person1_s.getFirstName(),self.first_name1)


1311

Sebastien Robin's avatar
Sebastien Robin committed
1312 1313 1314 1315 1316 1317 1318 1319 1320
if __name__ == '__main__':
    # Executed as a script: start the tests through framework(), which
    # is presumably defined by the framework.py file loaded with
    # execfile() at the top of this module.
    framework()
else:
    import unittest

    def test_suite():
        """Return a test suite holding all the tests of TestERP5SyncML."""
        sync_ml_tests = unittest.makeSuite(TestERP5SyncML)
        return unittest.TestSuite((sync_ml_tests,))