testERP5SyncML.py 78.2 KB
Newer Older
1
# -*- coding: utf-8 -*-
Sebastien Robin's avatar
Sebastien Robin committed
2
##############################################################################
3
#
Sebastien Robin's avatar
Sebastien Robin committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
# Copyright (c) 2004 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

30
import unittest
Sebastien Robin's avatar
Sebastien Robin committed
31
from Testing import ZopeTestCase
32
from Products.ERP5Type.tests.runUnitTest import tests_home
Sebastien Robin's avatar
Sebastien Robin committed
33
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
34
from AccessControl.SecurityManagement import newSecurityManager
Sebastien Robin's avatar
Sebastien Robin committed
35
from Products.ERP5SyncML.Conduit.ERP5Conduit import ERP5Conduit
36
from Products.ERP5SyncML.SyncCode import SyncCode
Sebastien Robin's avatar
Sebastien Robin committed
37
from zLOG import LOG
38
from base64 import b64encode, b64decode, b16encode, b16decode
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
39 40 41
import transaction
from ERP5Diff import ERP5Diff
from lxml import etree
42

43

44
class TestERP5SyncMLMixin:
Sebastien Robin's avatar
Sebastien Robin committed
45 46

  # Different variables used for this test
47
  workflow_id = 'edit_workflow'
Sebastien Robin's avatar
Sebastien Robin committed
48 49
  first_name1 = 'Sebastien'
  last_name1 = 'Robin'
Sebastien Robin's avatar
Sebastien Robin committed
50 51
  # At the beginning, I was using iso-8859-15 strings, but actually
  # erp5 is using utf-8 everywhere
52
  #description1 = 'description1 --- $sdfrç_sdfsçdf_oisfsopf'
Sebastien Robin's avatar
Sebastien Robin committed
53
  description1 = 'description1 --- $sdfr\xc3\xa7_sdfs\xc3\xa7df_oisfsopf'
54 55 56 57
  lang1 = 'fr'
  format2 = 'html'
  format3 = 'xml'
  format4 = 'txt'
Sebastien Robin's avatar
Sebastien Robin committed
58 59
  first_name2 = 'Jean-Paul'
  last_name2 = 'Smets'
60
  #description2 = 'description2éà@  $*< <<<  ----- >>>></title>&oekd'
Sebastien Robin's avatar
Sebastien Robin committed
61
  description2 = 'description2\xc3\xa9\xc3\xa0@  $*< <<<  ----- >>>></title>&oekd'
62
  lang2 = 'en'
Sebastien Robin's avatar
Sebastien Robin committed
63 64
  first_name3 = 'Yoshinori'
  last_name3 = 'Okuji'
65
  #description3 = 'description3 çsdf__sdfççç_df___&&é]]]°°°°°°'
Sebastien Robin's avatar
Sebastien Robin committed
66 67
  description3 = 'description3 \xc3\xa7sdf__sdf\xc3\xa7\xc3\xa7\xc3\xa7_df___&&\xc3\xa9]]]\xc2\xb0\xc2\xb0\xc2\xb0\xc2\xb0\xc2\xb0\xc2\xb0'
  #description4 = 'description4 sdflkmooo^^^^]]]]]{{{{{{{'
68 69 70
  description4 = 'description4 sdflkmooo^^^^]]]]]{{{{{{{'
  lang3 = 'jp'
  lang4 = 'ca'
Sebastien Robin's avatar
Sebastien Robin committed
71 72 73 74 75 76 77 78 79
  xml_mapping = 'asXML'
  id1 = '170'
  id2 = '171'
  pub_id = 'Publication'
  sub_id1 = 'Subscription1'
  sub_id2 = 'Subscription2'
  nb_subscription = 2
  nb_publication = 1
  nb_synchronization = 3
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
80 81
  nb_message_first_synchronization = 10
  nb_message_first_sync_max_lines = 10
82 83 84 85 86 87 88 89 90
  _subscription_url1 = tests_home + '/sync_client1'
  _subscription_url2 = tests_home + '/sync_client2'
  _publication_url = tests_home + '/sync_server'
  # XXX Why is the prefix not 'file://'? This is inconsistent with urlopen:
  #     urlopen('file://tmp/foo') -> ERROR
  #     urlopen('file:///tmp/foo') -> OK
  subscription_url1 = 'file:/' + _subscription_url1
  subscription_url2 = 'file:/' + _subscription_url2
  publication_url = 'file:/' + _publication_url
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
91
  activity_enabled = False
92 93 94
  #publication_url = 'server@localhost'
  #subscription_url1 = 'client1@localhost'
  #subscription_url2 = 'client2@localhost'
Sebastien Robin's avatar
Sebastien Robin committed
95

Sebastien Robin's avatar
Sebastien Robin committed
96

97 98 99 100 101 102 103 104 105 106 107
  def afterSetUp(self):
    """Log in and disable content type filtering on the Person type.

    The tests create Person documents inside Person documents, so the
    'Person' type information is modified to allow anything inside a
    Person. beforeTearDown sets the flag back to 1.
    """
    self.login()
    person_type_info = self.getTypesTool().getTypeInfo('Person')
    person_type_info.filter_content_types = 0

  def beforeTearDown(self):
    """Turn content type filtering back on for the Person type."""
    person_type_info = self.getTypesTool().getTypeInfo('Person')
    person_type_info.filter_content_types = 1
Sebastien Robin's avatar
Sebastien Robin committed
108

Sebastien Robin's avatar
Sebastien Robin committed
109 110 111 112
  def getBusinessTemplateList(self):
    """
      Return the tuple of business template names for this test case.

      Only 'erp5_base' is listed. The three person modules used as
      synchronization endpoints (person_server, person_client1 and
      person_client2) are created by initPersonModule.
    """
    business_template_list = ('erp5_base',)
    return business_template_list
Sebastien Robin's avatar
Sebastien Robin committed
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134

  def getSynchronizationTool(self):
    """Return the portal's 'portal_synchronizations', or None if absent."""
    portal = self.getPortal()
    return getattr(portal, 'portal_synchronizations', None)

  def getPersonClient1(self):
    """Return the portal's 'person_client1' module, or None if absent."""
    portal = self.getPortal()
    return getattr(portal, 'person_client1', None)

  def getPersonServer(self):
    """Return the portal's 'person_server' module, or None if absent."""
    portal = self.getPortal()
    return getattr(portal, 'person_server', None)

  def getPersonClient2(self):
    """Return the portal's 'person_client2' module, or None if absent."""
    portal = self.getPortal()
    return getattr(portal, 'person_client2', None)

  def getPortalId(self):
    """Return the id of the portal returned by getPortal()."""
    portal = self.getPortal()
    return portal.getId()

135 136 137 138 139 140 141
  def login(self, quiet=0):
    """Create the test users and open a security context as 'fab'.

    Three users with the Manager role are added to acl_users: 'fab',
    'ERP5TypeTestCase' and 'syncml'. The `quiet` argument is not used.
    """
    user_folder = self.getPortal().acl_users
    credential_list = (
      ('fab', 'myPassword'),
      ('ERP5TypeTestCase', ''),
      ('syncml', ''),
    )
    for user_login, user_password in credential_list:
      user_folder._doAddUser(user_login, user_password, ['Manager'], [])
    fab = user_folder.getUserById('fab').__of__(user_folder)
    newSecurityManager(None, fab)
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
142 143
  
  def initPersonModule(self, quiet=0, run=0):
    """Create the three Person Modules used by the synchronization tests.

    Nothing is done when `run` is false. 'person_server',
    'person_client1' and 'person_client2' are each constructed at the
    portal root, unless the portal already has an attribute of that name.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Init Person Module')
      LOG('Testing... ',0,'initPersonModule')
    self.login()
    portal = self.getPortal()
    for module_id in ('person_server', 'person_client1', 'person_client2'):
      if hasattr(portal, module_id):
        continue
      portal.portal_types.constructContent(type_name='Person Module',
                                           container=portal,
                                           id=module_id)
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
162 163 164 165 166 167 168 169 170

  def populatePersonServer(self, quiet=0, run=0):
    """Create two Person documents (ids id1 and id2) in person_server.

    Nothing is done (and None is returned) when `run` is false.
    Otherwise the person modules are created if needed, both persons are
    added and edited, and the number of documents in person_server is
    asserted to be 2 and returned.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Populate Person Server ')
      LOG('Testing... ',0,'populatePersonServer')
    self.login()
    self.initPersonModule(quiet=1, run=1)
    person_server = self.getPersonServer()
    person1 = person_server.newContent(id=self.id1, portal_type='Person')
    person1.edit(first_name=self.first_name1,
                 last_name=self.last_name1,
                 description=self.description1)
    person2 = person_server.newContent(id=self.id2, portal_type='Person')
    person2.edit(first_name=self.first_name2,
                 last_name=self.last_name2,
                 description=self.description2)
    nb_person = len(person_server.objectValues())
    self.assertEqual(nb_person, 2)
    return nb_person

Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
  def populatePersonClient1(self, quiet=0, run=0):
    """Create 59 Person documents (ids 1 to 59) in person_client1.

    Nothing is done (and None is returned) when `run` is false.
    Otherwise every person gets first_name1, last_name1 and description1,
    and the number of documents in person_client1 is asserted to be 59
    and returned.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Populate Person Client 1 ')
      LOG('Testing... ',0,'populatePersonClient1')
    self.login()
    self.initPersonModule(quiet=1, run=1)
    person_client = self.getPersonClient1()
    # `person_id` rather than `id`, to keep the builtin usable.
    for person_id in range(1, 60):
      person = person_client.newContent(id=person_id, portal_type='Person')
      person.edit(first_name=self.first_name1,
                  last_name=self.last_name1,
                  description=self.description1)
    nb_person = len(person_client.objectValues())
    self.assertEqual(nb_person, 59)
    return nb_person

202 203 204 205 206 207 208 209
  def synchronize(self, id, run=1):
    """Run a complete file-based synchronization of one subscription.

    This defines how the unit tests synchronize; it lives here because
    it is specific to unit testing.

    `id` is passed to getSubscription(). The publication whose URL
    equals the subscription's publication URL is looked up (the last
    match wins) and must exist. SubSync and PubSync are then alternated
    for as long as SubSync reports has_response == 1. The `run` argument
    is not used.

    Returns the message counter: it starts at 1 and grows by
    1 + has_response on every loop iteration.
    """
    portal_sync = self.getSynchronizationTool()
    subscription = portal_sync.getSubscription(id)
    publication = None
    for pub in portal_sync.getPublicationList():
      if pub.getPublicationUrl() == subscription.getPublicationUrl():
        publication = pub
    self.assertTrue(publication is not None)
    # Reset the exchange files, because the sync goes through files.
    # Opening in 'w' mode truncates; closing at once leaks no handle.
    for path in (self._subscription_url1,
                 self._subscription_url2,
                 self._publication_url):
      open(path, 'w').close()
    nb_message = 1
    result = portal_sync.SubSync(subscription.getPath())
    while result['has_response'] == 1:
      portal_sync.PubSync(publication.getPath())
      if self.activity_enabled:
        transaction.commit()
        self.tic()
      result = portal_sync.SubSync(subscription.getPath())
      if self.activity_enabled:
        transaction.commit()
        self.tic()
      nb_message += 1 + result['has_response']
    return nb_message

  def synchronizeWithBrokenMessage(self, id, run=1):
    """Synchronize one subscription, replaying every message three times.

    Same procedure as a normal file-based synchronization, except that
    each PubSync and each SubSync call of the loop is issued three times
    in a row, to check that duplicate messages are handled well.

    `id` is passed to getSubscription(). The `run` argument is not used.

    Returns the message counter: it starts at 1 and grows by
    1 + has_response (taken from the last SubSync) on every iteration.
    """
    portal_sync = self.getSynchronizationTool()
    subscription = portal_sync.getSubscription(id)
    publication = None
    for pub in portal_sync.getPublicationList():
      if pub.getPublicationUrl() == subscription.getPublicationUrl():
        publication = pub
    self.assertTrue(publication is not None)
    # Reset the exchange files, because the sync goes through files.
    # Opening in 'w' mode truncates; closing at once leaks no handle.
    for path in (self._subscription_url1,
                 self._subscription_url2,
                 self._publication_url):
      open(path, 'w').close()
    nb_message = 1
    result = portal_sync.SubSync(subscription.getPath())
    while result['has_response'] == 1:
      # Everything is done three times, to test that duplicate messages
      # are managed well.
      for attempt in range(3):
        portal_sync.PubSync(publication.getPath())
        if self.activity_enabled:
          transaction.commit()
          self.tic()
      for attempt in range(3):
        result = portal_sync.SubSync(subscription.getPath())
        if self.activity_enabled:
          transaction.commit()
          self.tic()
      nb_message += 1 + result['has_response']
    return nb_message

  def checkSynchronizationStateIsSynchronized(self, quiet=0, run=1):
    """Assert that every person on every node is in SYNCHRONIZED state.

    For each document of person_server, person_client1 and
    person_client2, each entry returned by getSynchronizationState() is
    checked: entry[1] must equal entry[0].SYNCHRONIZED.

    It then checks that no signature keeps leftover XML: temp and
    partial XML must be None on subscription signatures, and partial XML
    must be None on the signatures of each publication's subscribers.
    The `quiet` and `run` arguments are not used.
    """
    portal_sync = self.getSynchronizationTool()
    module_getter_list = (self.getPersonServer,
                          self.getPersonClient1,
                          self.getPersonClient2)
    for get_module in module_getter_list:
      for person in get_module().objectValues():
        for state in portal_sync.getSynchronizationState(person):
          self.assertEqual(state[1], state[0].SYNCHRONIZED)
    # Check for each signature that the tempXML is None
    for sub in portal_sync.getSubscriptionList():
      for signature in sub.getSignatureList():
        self.assertEqual(signature.getTempXML(), None)
        self.assertEqual(signature.getPartialXML(), None)
    for pub in portal_sync.getPublicationList():
      for sub in pub.getSubscriberList():
        for signature in sub.getSignatureList():
          self.assertEqual(signature.getPartialXML(), None)

  def verifyFirstNameAndLastNameAreNotSynchronized(self, first_name,
      last_name, person_server, person_client):
    """
      Check that the given names did NOT reach the server:
      person_server must have a different first and last name, while
      person_client must have exactly first_name and last_name.
    """
    # server side: values must differ
    self.assertNotEqual(person_server.getFirstName(), first_name)
    self.assertNotEqual(person_server.getLastName(), last_name)
    # client side: values must match
    self.assertEqual(person_client.getFirstName(), first_name)
    self.assertEqual(person_client.getLastName(), last_name)
329 330 331 332 333 334

  def checkFirstSynchronization(self, id=None, nb_person=0):
    """Check the state of the three nodes after a first synchronization.

    `nb_person` is the expected length of getObjectList() on both
    subscriptions. `id` is the id under which the person is looked up in
    person_client1 and person_client2. On the server the person is
    looked up as id1, to check that the initial object was not modified.
    All three documents must carry first_name1 and last_name1.
    """
    sync_tool = self.getSynchronizationTool()
    first_subscription = sync_tool.getSubscription(self.sub_id1)
    second_subscription = sync_tool.getSubscription(self.sub_id2)
    self.assertEqual(len(first_subscription.getObjectList()), nb_person)

    # Server side: the initial object must be untouched.
    server_module = self.getPersonServer()
    server_person = server_module._getOb(self.id1)
    self.assertEqual(server_person.getId(), self.id1)
    self.assertEqual(server_person.getFirstName(), self.first_name1)
    self.assertEqual(server_person.getLastName(), self.last_name1)

    # First client.
    client1_module = self.getPersonClient1()
    client1_person = client1_module._getOb(id)
    self.assertEqual(client1_person.getId(), id)
    self.assertEqual(client1_person.getFirstName(), self.first_name1)
    self.assertEqual(client1_person.getLastName(), self.last_name1)

    # Second client.
    self.assertEqual(len(second_subscription.getObjectList()), nb_person)
    client2_module = self.getPersonClient2()
    client2_person = client2_module._getOb(id)
    self.assertEqual(client2_person.getId(), id)
    self.assertEqual(client2_person.getFirstName(), self.first_name1)
    self.assertEqual(client2_person.getLastName(), self.last_name1)
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
353 354 355 356 357 358 359 360 361 362
  
  def resetSignaturePublicationAndSubscription(self):
    """Reset the publication subscribers and the first subscription.

    Calls resetAllSubscribers() on the publication and
    resetAllSignatures() on subscription 1, then commits the transaction
    and calls tic().
    """
    sync_tool = self.getSynchronizationTool()
    pub = sync_tool.getPublication(self.pub_id)
    sub1 = sync_tool.getSubscription(self.sub_id1)
    # NOTE(review): subscription 2 is looked up but never reset --
    # confirm whether that is intended.
    sub2 = sync_tool.getSubscription(self.sub_id2)
    pub.resetAllSubscribers()
    sub1.resetAllSignatures()
    transaction.commit()
    self.tic()
363

Nicolas Delaby's avatar
Nicolas Delaby committed
364 365
  def assertXMLViewIsEqual(self, sub_id, object_pub=None, object_sub=None,
                                                                  force=False):
    """
      Check the equality between two xml objects with gid as id

      Both gids are computed through the publication and must be equal.
      Each object is then exported with ERP5Conduit (the publication's
      XML mapping for object_pub, the subscription's for object_sub,
      `force` being passed to subscription.getXMLMapping()) and the two
      exports are compared with ERP5Diff. The test fails on the first
      diff node whose 'select' attribute matches no entry of
      discarded_list.
    """
    portal_sync = self.getSynchronizationTool()
    subscription = portal_sync.getSubscription(sub_id)
    publication = portal_sync.getPublication(self.pub_id)
    gid_pub = publication.getGidFromObject(object_pub)
    gid_sub = publication.getGidFromObject(object_sub)
    self.assertEqual(gid_pub, gid_sub)
    conduit = ERP5Conduit()
    xml_pub = conduit.getXMLFromObjectWithGid(object=object_pub, gid=gid_pub,
                                       xml_mapping=publication.getXMLMapping())
    # With One Way From Server there is no xml_mapping for the subscription
    xml_sub = conduit.getXMLFromObjectWithGid(object=object_sub, gid=gid_sub,
                                 xml_mapping=subscription.getXMLMapping(force))
    erp5diff = ERP5Diff()
    erp5diff.compare(xml_pub, xml_sub)
    result = etree.XML(erp5diff.outputString())
    # XXX edit workflow is not replaced, so discard workflow checking
    discarded_list = ('edit_workflow',)
    for update in result:
      select = update.get('select', '')
      if [discarded for discarded in discarded_list if discarded in select]:
        continue
      self.fail('diff between pub:%s and sub:%s \n%s' % (object_pub.getPath(),
                                                         object_sub.getPath(),
                                   etree.tostring(result, pretty_print=True)))
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
399 400 401 402 403 404 405

  def deletePublicationAndSubscription(self):
    """Delete the publication and the test subscriptions.

    The publication and subscription 1 are always deleted. Subscription 2
    is deleted only when getSubscription() returns a true value for it.
    """
    sync_tool = self.getSynchronizationTool()
    sync_tool.manage_deletePublication(title=self.pub_id)
    sync_tool.manage_deleteSubscription(title=self.sub_id1)
    second_subscription = sync_tool.getSubscription(title=self.sub_id2)
    if second_subscription:
      sync_tool.manage_deleteSubscription(title=self.sub_id2)
406 407 408 409 410 411 412 413 414

class TestERP5SyncML(TestERP5SyncMLMixin, ERP5TypeTestCase):
  
  run_all_test = True
  def getTitle(self):
    """Return the title of this test case."""
    title = "ERP5 SyncML"
    return title

Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
  def setupPublicationAndSubscription(self, quiet=0, run=run_all_test):
    """Rebuild the publication and the two subscriptions from scratch.

    When person_server exists, the three person modules are removed from
    the portal and the existing publication and subscriptions are
    deleted. The publication and both subscriptions are then added again
    through test_02, test_03 and test_04. The `quiet` and `run`
    arguments are not used.
    """
    if self.getPersonServer() is not None:
      portal = self.getPortal()
      for module_id in ('person_server', 'person_client1', 'person_client2'):
        portal._delObject(id=module_id)
      self.deletePublicationAndSubscription()
    self.test_02_AddPublication(quiet=1,run=1)
    self.test_03_AddSubscription1(quiet=1,run=1)
    self.test_04_AddSubscription2(quiet=1,run=1)
      
  def setupPublicationAndSubscriptionAndGid(self, quiet=0, run=run_all_test):
    """Set up the publication and subscriptions with the title-gid conduit.

    After the standard setup, both subscriptions and the publication are
    switched to the 'ERP5ConduitTitleGid' conduit and to the
    '_generateNextId' synchronization id generator. The `quiet` and
    `run` arguments are not used.
    """
    self.setupPublicationAndSubscription(quiet=1,run=1)
    sync_tool = self.getSynchronizationTool()
    first_sub = sync_tool.getSubscription(self.sub_id1)
    second_sub = sync_tool.getSubscription(self.sub_id2)
    publication = sync_tool.getPublication(self.pub_id)
    for node in (first_sub, second_sub, publication):
      node.setConduit('ERP5ConduitTitleGid')
    for node in (publication, first_sub, second_sub):
      node.setSynchronizationIdGenerator('_generateNextId')

  def checkSynchronizationStateIsConflict(self, quiet=0, run=1):
    """Assert that the person with id id1 is in CONFLICT state everywhere.

    The check is made on person_server, person_client1 and
    person_client2, and then on a temporary sub-person created inside
    the id1 person of person_client1, to make sure sub-objects are in
    conflict mode as well. The `quiet` and `run` arguments are not used.
    """
    portal_sync = self.getSynchronizationTool()

    def assertConflict(document):
      # Each entry must satisfy entry[1] == entry[0].CONFLICT.
      for state in portal_sync.getSynchronizationState(document):
        self.assertEqual(state[1], state[0].CONFLICT)

    module_getter_list = (self.getPersonServer,
                          self.getPersonClient1,
                          self.getPersonClient2)
    for get_module in module_getter_list:
      for person in get_module().objectValues():
        if person.getId() == self.id1:
          assertConflict(person)
    # make sure sub objects are also in conflict mode
    parent_person = self.getPersonClient1()._getOb(self.id1)
    # use a temp_object to create a non-persistent object in the person
    temp_sub_person = parent_person.newContent(id=self.id1,
                                               portal_type='Person',
                                               temp_object=1)
    assertConflict(temp_sub_person)
  
  def populatePersonServerWithSubObject(self, quiet=0, run=run_all_test):
    """
    Add nested Person documents below person id1 of person_server.

    populatePersonServer must have been called before. The resulting
    tree is:
    - person_server :
      - id1
        - id1
          - id2
          - id1
      - id2

    Nothing is done when `run` is false.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Populate Person Server With Sub Object ')
      LOG('Testing... ',0,'populatePersonServerWithSubObject')
    top_person = self.getPersonServer()._getOb(self.id1)
    child = top_person.newContent(id=self.id1, portal_type='Person')
    child.edit(first_name=self.first_name1,
               last_name=self.last_name1,
               description=self.description1)
    grandchild1 = child.newContent(id=self.id1, portal_type='Person')
    grandchild1.edit(first_name=self.first_name1,
                     last_name=self.last_name1,
                     description=self.description1,
                     default_telephone_text='0689778308')
    grandchild2 = child.newContent(id=self.id2, portal_type='Person')
    grandchild2.edit(first_name=self.first_name2,
                     last_name=self.last_name2,
                     description=self.description2)
    # Drop ('', 'portal...', 'person_server') from the physical path: the
    # three remaining elements are the nested person ids.
    for grandchild in (grandchild1, grandchild2):
      depth = len(grandchild.getPhysicalPath()) - 3
      self.assertEqual(depth, 3)

  def addAuthenticationToPublication(self, publication_id, login, password,
      auth_format, auth_type):
    """
      Store the login, password, authentication format and
      authentication type on the publication named publication_id.
    """
    publication = self.getSynchronizationTool().getPublication(publication_id)
    publication.setLogin(login)
    publication.setPassword(password)
    publication.setAuthenticationFormat(auth_format)
    publication.setAuthenticationType(auth_type)


  def addAuthenticationToSubscription(self, subscription_id, login, password,
      auth_format, auth_type):
    """
      Mark the subscription named subscription_id as not authenticated,
      then store the login, password, authentication format and
      authentication type on it.
    """
    subscription = self.getSynchronizationTool().getSubscription(
                                                            subscription_id)
    subscription.setAuthenticated(False)
    subscription.setLogin(login)
    subscription.setPassword(password)
    subscription.setAuthenticationFormat(auth_format)
    subscription.setAuthenticationType(auth_type)


532
  def test_01_HasEverything(self, quiet=0, run=run_all_test):
    """Check that portal_synchronizations was created."""
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Has Everything ')
      LOG('Testing... ',0,'test_01_HasEverything')
    # The person modules are not checked here.
    sync_tool = self.getSynchronizationTool()
    self.assertNotEqual(sync_tool, None)
Sebastien Robin's avatar
Sebastien Robin committed
542

543
  def test_02_AddPublication(self, quiet=0, run=run_all_test):
    """Add the publication and check that it can be retrieved.

    A publication already registered under pub_id is deleted first. The
    new one points at /<portal>/person_server, uses the ERP5Conduit and
    b64 / syncml:auth-basic authentication. Nothing is done when `run`
    is false.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Add a Publication ')
      LOG('Testing... ',0,'test_02_AddPublication')
    portal_id = self.getPortalName()
    portal_sync = self.getSynchronizationTool()
    if portal_sync.getPublication(self.pub_id) is not None:
      portal_sync.manage_deletePublication(title=self.pub_id)
    portal_sync.manage_addPublication(title=self.pub_id,
        publication_url=self.publication_url,
        destination_path='/%s/person_server' % portal_id,
        source_uri='Person',
        query='objectValues',
        xml_mapping=self.xml_mapping,
        conduit='ERP5Conduit',
        gpg_key='',
        activity_enabled=self.activity_enabled,
        authentication_format='b64',
        authentication_type='syncml:auth-basic')
    pub = portal_sync.getPublication(self.pub_id)
    # assertTrue instead of the deprecated failUnless alias.
    self.assertTrue(pub is not None)

566
  def test_03_AddSubscription1(self, quiet=0, run=run_all_test):
    """Add the first subscription and check that it can be retrieved.

    A subscription already registered under sub_id1 is deleted first.
    The new one points at /<portal>/person_client1, uses the ERP5Conduit
    and the 'fab' credentials. Nothing is done when `run` is false.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Add First Subscription ')
      LOG('Testing... ',0,'test_03_AddSubscription1')
    portal_id = self.getPortalId()
    portal_sync = self.getSynchronizationTool()
    if portal_sync.getSubscription(self.sub_id1) is not None:
      portal_sync.manage_deleteSubscription(title=self.sub_id1)
    portal_sync.manage_addSubscription(title=self.sub_id1,
        publication_url=self.publication_url,
        subscription_url=self.subscription_url1,
        destination_path='/%s/person_client1' % portal_id,
        source_uri='Person',
        target_uri='Person',
        query='objectValues',
        xml_mapping=self.xml_mapping,
        conduit='ERP5Conduit',
        gpg_key='',
        activity_enabled=self.activity_enabled,
        login='fab',
        password='myPassword')
    sub = portal_sync.getSubscription(self.sub_id1)
    # assertTrue instead of the deprecated failUnless alias.
    self.assertTrue(sub is not None)

591
  def test_04_AddSubscription2(self, quiet=0, run=run_all_test):
    """Add the second subscription and check that it can be retrieved.

    A subscription already registered under sub_id2 is deleted first.
    The new one points at /<portal>/person_client2, uses the ERP5Conduit
    and the 'fab' credentials. Nothing is done when `run` is false.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Add Second Subscription ')
      LOG('Testing... ',0,'test_04_AddSubscription2')
    portal_id = self.getPortalId()
    portal_sync = self.getSynchronizationTool()
    if portal_sync.getSubscription(self.sub_id2) is not None:
      portal_sync.manage_deleteSubscription(title=self.sub_id2)
    portal_sync.manage_addSubscription(title=self.sub_id2,
        publication_url=self.publication_url,
        subscription_url=self.subscription_url2,
        destination_path='/%s/person_client2' % portal_id,
        source_uri='Person',
        target_uri='Person',
        query='objectValues',
        xml_mapping=self.xml_mapping,
        conduit='ERP5Conduit',
        gpg_key='',
        activity_enabled=self.activity_enabled,
        login='fab',
        password='myPassword')
    sub = portal_sync.getSubscription(self.sub_id2)
    # assertTrue instead of the deprecated failUnless alias.
    self.assertTrue(sub is not None)

616
  def test_05_GetSynchronizationList(self, quiet=0, run=run_all_test):
    """Check that getSynchronizationList returns every synchronization.

    Both the subscriptions and the publication must be retrieved: the
    list length has to equal nb_synchronization. Nothing is done when
    `run` is false.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest getSynchronizationList ')
      LOG('Testing... ',0,'test_05_GetSynchronizationList')
    self.setupPublicationAndSubscription(quiet=1,run=1)
    sync_tool = self.getSynchronizationTool()
    found_list = sync_tool.getSynchronizationList()
    self.assertEqual(len(found_list), self.nb_synchronization)
Sebastien Robin's avatar
Sebastien Robin committed
628

629
  def test_06_GetObjectList(self, quiet=0, run=run_all_test):
    """
    Exercise getObjectList on the publication with three kinds of query:
    the default 'objectValues' method name, a callable, and a path-like
    query ('person_server/objectValues') relative to the portal.
    Nothing is done when `run` is false.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest getObjectList ')
      LOG('Testing... ',0,'test_06_GetObjectList')
    self.login()
    self.setupPublicationAndSubscription(quiet=1,run=1)
    nb_person = self.populatePersonServer(quiet=1,run=1)
    sync_tool = self.getSynchronizationTool()
    publication = sync_tool.getPublicationList()[0]
    # default query
    self.assertEqual(len(publication.getObjectList()), nb_person)

    # now try to set a different method for query: keep only person id1
    def query(object):
      return [o for o in object.objectValues() if o.getId() == self.id1]
    publication.setQuery(query)
    self.assertEqual(len(publication.getObjectList()), 1)

    # Add the query path
    portal_id = self.getPortalName()
    publication.setDestinationPath('/%s/' % portal_id)
    publication.setQuery('person_server/objectValues')
    self.assertEqual(len(publication.getObjectList()), nb_person)
Sebastien Robin's avatar
Sebastien Robin committed
664

665
  def test_07_ExportImport(self, quiet=0, run=run_all_test):
    """
    Export a person with asXML and import it elsewhere with a conduit.

    The person self.id1 of person_server is exported, then added to
    person_client1 through ERP5Conduit.addNode. The imported copy must
    have the same names, a workflow history of 2 entries for
    self.workflow_id, and both folders must have the same local roles.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Export and Import ')
      LOG('Testing... ',0,'test_07_ExportImport')
    self.login()
    self.populatePersonServer(quiet=1, run=1)
    server_folder = self.getPersonServer()
    client_folder = self.getPersonClient1()
    exported_xml = server_folder._getOb(self.id1).asXML()
    ERP5Conduit().addNode(object=client_folder, xml=exported_xml)
    self.assertEqual(len(client_folder.objectValues()), 1)
    imported_person = client_folder._getOb(self.id1)
    self.assertEqual(imported_person.getLastName(), self.last_name1)
    self.assertEqual(imported_person.getFirstName(), self.first_name1)
    # XXX We should also look at the content of the workflow history,
    # for now only its length is checked
    self.assertEqual(
        len(imported_person.workflow_history[self.workflow_id]), 2)
    self.assertEqual(server_folder.get_local_roles(),
                     client_folder.get_local_roles())
Sebastien Robin's avatar
Sebastien Robin committed
691

692
  def test_08_FirstSynchronization(self, quiet=0, run=run_all_test):
    """
    Populate the client folders with the data from person_server.

    Both subscriptions are synchronized one after the other. The
    synchronization type of every subscription is checked before and
    after each synchronization, as well as the number of messages
    returned by self.synchronize.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest First Synchronization ')
      LOG('Testing... ',0,'test_08_FirstSynchronization')
    self.login()
    self.setupPublicationAndSubscription(quiet=1, run=1)
    nb_person = self.populatePersonServer(quiet=1, run=1)
    portal_sync = self.getSynchronizationTool()
    # Before any synchronization, every subscription is in slow sync
    for sub in portal_sync.getSubscriptionList():
      self.assertEqual(sub.getSynchronizationType(), SyncCode.SLOW_SYNC)
    # Synchronize the first client
    nb_message1 = self.synchronize(self.sub_id1)
    # Only the subscription that was just synchronized must be two way
    for sub in portal_sync.getSubscriptionList():
      if sub.getTitle() == self.sub_id1:
        self.assertEqual(sub.getSynchronizationType(), SyncCode.TWO_WAY)
      else:
        self.assertEqual(sub.getSynchronizationType(), SyncCode.SLOW_SYNC)
    self.assertEqual(nb_message1, self.nb_message_first_synchronization)
    # Synchronize the second client
    nb_message2 = self.synchronize(self.sub_id2)
    for sub in portal_sync.getSubscriptionList():
      self.assertEqual(sub.getSynchronizationType(), SyncCode.TWO_WAY)
    self.assertEqual(nb_message2, self.nb_message_first_synchronization)
    self.checkFirstSynchronization(id=self.id1, nb_person=nb_person)
Sebastien Robin's avatar
Sebastien Robin committed
719

720
  def test_09_FirstSynchronizationWithLongLines(self, quiet=0, run=run_all_test):
    """
    Populate person_client1 from person_server when a property holds a
    very long line (more than 10000 characters in the first name).
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest First Synchronization With Long Lines ')
      LOG('Testing... ',0,'test_09_FirstSynchronizationWithLongLines')
    self.login()
    self.setupPublicationAndSubscription(quiet=1, run=1)
    nb_person = self.populatePersonServer(quiet=1, run=1)
    server_folder = self.getPersonServer()
    long_line = 'a' * 10000 + ' --- '
    server_person = server_folder._getOb(self.id1)
    server_person.edit(first_name=long_line)
    # Synchronize the first client
    nb_message = self.synchronize(self.sub_id1)
    self.assertEqual(nb_message, self.nb_message_first_synchronization)
    subscription = self.getSynchronizationTool().getSubscription(self.sub_id1)
    self.assertEqual(len(subscription.getObjectList()), nb_person)
    # The server document must be left untouched by the synchronization
    self.assertEqual(server_person.getId(), self.id1)
    self.assertEqual(server_person.getFirstName(), long_line)
    self.assertEqual(server_person.getLastName(), self.last_name1)
    client_person = self.getPersonClient1()._getOb(self.id1)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
747

748
  def test_10_GetObjectFromGid(self, quiet=0, run=run_all_test):
    """
    Get an object from a publication just by giving its identifier.

    With the setup used here we can just give the id of the object, so
    the lookup goes through publication.getObjectFromId.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest getObjectFromGid ')
      LOG('Testing... ',0,'test_10_GetObjectFromGid')
    self.login()
    self.setupPublicationAndSubscription(quiet=1, run=1)
    self.populatePersonServer(quiet=1, run=1)
    # By default we can just give the id
    portal_sync = self.getSynchronizationTool()
    publication = portal_sync.getPublication(self.pub_id)
    # named "person" rather than "object" to avoid shadowing the builtin
    person = publication.getObjectFromId(self.id1)
    self.assertTrue(person is not None)
    self.assertEqual(person.getId(), self.id1)
Sebastien Robin's avatar
Sebastien Robin committed
764

765
  def test_11_GetSynchronizationState(self, quiet=0, run=run_all_test):
    """
    Get the synchronization state of objects that are just synchronized.

    The server object must have self.nb_subscription states (one for
    each subscriber) and the object of the first client a single one.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest getSynchronizationState ')
      LOG('Testing... ',0,'test_11_GetSynchronizationState')
    self.test_08_FirstSynchronization(quiet=1, run=1)
    sync_tool = self.getSynchronizationTool()
    server_person = self.getPersonServer()._getOb(self.id1)
    server_state_list = sync_tool.getSynchronizationState(server_person)
    # server side: one state for each subscriber
    self.assertEqual(len(server_state_list), self.nb_subscription)
    client_person = self.getPersonClient1()._getOb(self.id1)
    client_state_list = sync_tool.getSynchronizationState(client_person)
    # client side: a single state
    self.assertEqual(len(client_state_list), 1)
    self.checkSynchronizationStateIsSynchronized()

786
  def test_12_UpdateSimpleData(self, quiet=0, run=run_all_test):
    """
    Update simple properties and check that they are synchronized.

    Three cases are played one after the other on the person self.id1:
    a modification on the server only, a modification on the first
    client only, then modifications of different properties on both
    sides at once.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Update Simple Data ')
      LOG('Testing... ',0,'test_12_UpdateSimpleData')
    self.test_08_FirstSynchronization(quiet=1, run=1)
    # Case 1: modification on the server only
    portal_sync = self.getSynchronizationTool()
    server_folder = self.getPersonServer()
    server_person = server_folder._getOb(self.id1)
    server_person.edit(first_name=self.first_name3,
                       last_name=self.last_name3)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    client_person = self.getPersonClient1()._getOb(self.id1)
    self.assertEqual(server_person.getFirstName(), self.first_name3)
    self.assertEqual(server_person.getLastName(), self.last_name3)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    # Case 2: modification on the client only
    client_person.edit(first_name=self.first_name1,
                       last_name=self.last_name1)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    # read the server person again after the synchronization
    server_person = server_folder._getOb(self.id1)
    self.assertEqual(server_person.getFirstName(), self.first_name1)
    self.assertEqual(server_person.getLastName(), self.last_name1)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    # Case 3: modification on both the client and the server, on the
    # same object but on different properties
    server_person.edit(first_name=self.first_name3)
    client_person.edit(description=self.description3)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(server_person.getFirstName(), self.first_name3)
    self.assertEqual(server_person.getDescription(), self.description3)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
Sebastien Robin's avatar
Sebastien Robin committed
827

828
  def test_13_GetConflictList(self, quiet=0, run=run_all_test):
    """
    Generate a conflict, get it, and make sure it contains what we want.

    The description of the person self.id1 is changed to different
    values on the server and on the first client before synchronizing.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Get Conflict List ')
      LOG('Testing... ',0,'test_13_GetConflictList')
    self.test_08_FirstSynchronization(quiet=1, run=1)
    # Modify the same property on the server and on the client
    sync_tool = self.getSynchronizationTool()
    server_person = self.getPersonServer()._getOb(self.id1)
    server_person.setDescription(self.description2)
    client_person = self.getPersonClient1()._getOb(self.id1)
    client_person.setDescription(self.description3)
    self.synchronize(self.sub_id1)
    conflict_list = sync_tool.getConflictList()
    self.assertEqual(len(conflict_list), 1)
    conflict = conflict_list[0]
    # Each side still has its own value as long as the conflict remains
    self.assertEqual(client_person.getDescription(), self.description3)
    self.assertEqual(server_person.getDescription(), self.description2)
    self.assertEqual(conflict.getPropertyId(), 'description')
    self.assertEqual(conflict.getPublisherValue(), self.description2)
    self.assertEqual(conflict.getSubscriberValue(), self.description3)
    self.assertEqual(conflict.getSubscriber().getSubscriptionUrl(),
                     self.subscription_url1)
855

856
  def test_14_GetPublisherAndSubscriberDocument(self, quiet=0, run=run_all_test):
    """
    Get the publisher and the subscriber documents of a conflict.

    The conflict is the one generated by test_13_GetConflictList; each
    document must carry the description set on its own side.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Get Publisher And Subscriber Document ')
      LOG('Testing... ',0,'test_14_GetPublisherAndSubscriberDocument')
    self.test_13_GetConflictList(quiet=1, run=1)
    sync_tool = self.getSynchronizationTool()
    # These two persons are looked up but not used by the assertions below
    person1_s = self.getPersonServer()._getOb(self.id1)
    person1_c = self.getPersonClient1()._getOb(self.id1)
    conflict = sync_tool.getConflictList()[0]
    self.assertEqual(conflict.getPublisherDocument().getDescription(),
                     self.description2)
    self.assertEqual(conflict.getSubscriberDocument().getDescription(),
                     self.description3)
876

877
  def test_15_ApplyPublisherValue(self, quiet=0, run=run_all_test):
    """
    Solve the conflict of test_13_GetConflictList with the publisher value.

    After the next synchronization the client must have the description
    of the server and no conflict must be left.
    """
    if not run:
      return
    self.test_13_GetConflictList(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Apply Publisher Value ')
      LOG('Testing... ',0,'test_15_ApplyPublisherValue')
    sync_tool = self.getSynchronizationTool()
    conflict = sync_tool.getConflictList()[0]
    server_person = self.getPersonServer()._getOb(self.id1)
    client_person = self.getPersonClient1()._getOb(self.id1)
    conflict.applyPublisherValue()
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(client_person.getDescription(), self.description2)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    self.assertEqual(len(sync_tool.getConflictList()), 0)
899

900
  def test_16_ApplySubscriberValue(self, quiet=0, run=run_all_test):
    """
    Solve the conflict of test_13_GetConflictList with the subscriber value.

    After the next synchronization the server must have the description
    of the client and no conflict must be left.
    """
    if not run:
      return
    self.test_13_GetConflictList(quiet=1, run=1)
    sync_tool = self.getSynchronizationTool()
    pending_conflict_list = sync_tool.getConflictList()
    if not quiet:
      ZopeTestCase._print('\nTest Apply Subscriber Value ')
      LOG('Testing... ',0,'test_16_ApplySubscriberValue')
    conflict = pending_conflict_list[0]
    server_person = self.getPersonServer()._getOb(self.id1)
    client_person = self.getPersonClient1()._getOb(self.id1)
    conflict.applySubscriberValue()
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(server_person.getDescription(), self.description3)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    self.assertEqual(len(sync_tool.getConflictList()), 0)
922

923
  def test_17_AddSubObject(self, quiet=0, run=run_all_test):
    """
    In this test, we synchronize, then add sub objects on the
    server and then see if the next synchronization will also
    create the sub-objects on the client.

    The two sub-sub-objects of the first client are checked (depth and
    properties), then one of them is compared with its server version.
    """
    if not run:
      return
    self.test_08_FirstSynchronization(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Add Sub Object ')
      LOG('Testing... ',0,'test_17_AddSubObject')
    self.populatePersonServerWithSubObject(quiet=1, run=1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    sub_person1_c = person1_c._getOb(self.id1)
    sub_sub_person1 = sub_person1_c._getOb(self.id1)
    sub_sub_person2 = sub_person1_c._getOb(self.id2)
    # The three leading path elements ('', the portal and the
    # synchronized folder) are not counted
    len_path = len(sub_sub_person1.getPhysicalPath()) - 3
    self.assertEqual(len_path, 3)
    len_path = len(sub_sub_person2.getPhysicalPath()) - 3
    self.assertEqual(len_path, 3)
    self.assertEqual(sub_sub_person1.getDescription(), self.description1)
    self.assertEqual(sub_sub_person1.getFirstName(), self.first_name1)
    self.assertEqual(sub_sub_person1.getLastName(), self.last_name1)
    self.assertEqual(sub_sub_person1.getDefaultTelephoneText(), '+(0)-0689778308')
    self.assertEqual(sub_sub_person2.getDescription(), self.description2)
    self.assertEqual(sub_sub_person2.getFirstName(), self.first_name2)
    self.assertEqual(sub_sub_person2.getLastName(), self.last_name2)
    # check the two sides (client, server)
    person_server = self.getPersonServer()
    sub_sub_person_s = person_server._getOb(self.id1)._getOb(self.id1)._getOb(self.id1)
    self.assertXMLViewIsEqual(self.sub_id1, sub_sub_person_s, sub_sub_person1)
959

960
  def test_18_UpdateSubObject(self, quiet=0, run=run_all_test):
    """
    Update a sub-object of a tree of objects already synchronized.

    To make this test a bit harder, the sub-object is updated on both
    the client (first name) and the server (description) at the same
    time, then we check that both versions are equal after the
    synchronization.
    """
    if not run:
      return
    self.test_17_AddSubObject(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Sub Object ')
      LOG('Testing... ',0,'test_18_UpdateSubObject')
    client_folder = self.getPersonClient1()
    client_parent = client_folder._getOb(self.id1)._getOb(self.id1)
    client_leaf = client_parent._getOb(self.id2)
    server_folder = self.getPersonServer()
    server_leaf = server_folder._getOb(self.id1)._getOb(self.id1)._getOb(self.id2)
    client_leaf.edit(first_name=self.first_name3)
    server_leaf.edit(description=self.description3)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    # refresh objects after synchronisation
    client_leaf = client_parent._getOb(self.id2)
    server_leaf = server_folder._getOb(self.id1)._getOb(self.id1)._getOb(self.id2)
    # The description and the first name are not asserted directly
    # (those checks are disabled), only the XML views are compared
    self.assertXMLViewIsEqual(self.sub_id1, server_leaf, client_leaf)
991

992
  def test_19_DeleteObject(self, quiet=0, run=run_all_test):
    """
    Delete objects on both sides after a first synchronization.

    The person self.id1 is deleted on the server and the person self.id2
    on the first client; once both clients are synchronized, nothing
    must be left on the server nor on the two clients.
    """
    if not run:
      return
    self.test_08_FirstSynchronization(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Delete Object ')
      LOG('Testing... ',0,'test_19_DeleteObject')
    self.getPersonServer().manage_delObjects(self.id1)
    self.getPersonClient1().manage_delObjects(self.id2)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    sync_tool = self.getSynchronizationTool()
    publication = sync_tool.getPublication(self.pub_id)
    subscription1 = sync_tool.getSubscription(self.sub_id1)
    subscription2 = sync_tool.getSubscription(self.sub_id2)
    for synchronization in (publication, subscription1, subscription2):
      self.assertEqual(len(synchronization.getObjectList()), 0)
1017

1018
  def test_20_DeleteSubObject(self, quiet=0, run=run_all_test):
    """
      We will do a first synchronization, then delete a sub-object on both
    sides, and we will see if nothing is left on the server and also
    on the two clients
    - before :         after :
      - id1             - id1 
        - id1             - id1
          - id2         - id2
          - id1
      - id2

    The sub-sub-object id1 is deleted on the server and the
    sub-sub-object id2 on the first client.
    """
    if not run:
      return
    self.test_17_AddSubObject(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Delete Sub Object ')
      LOG('Testing... ',0,'test_20_DeleteSubObject')
    person_server = self.getPersonServer()
    sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
    sub_object_s.manage_delObjects(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    sub_object_c1.manage_delObjects(self.id2)
    person_client2 = self.getPersonClient2()
    sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    # refresh objects
    # NOTE(review): the client objects are read again one level above
    # sub_object_s (the person id1, not its sub-object id1) -- confirm
    # that this is what is meant to be compared
    sub_object_c1 = person_client1._getOb(self.id1)
    sub_object_c2 = person_client2._getOb(self.id1)
    len_s = len(sub_object_s.objectValues())
    len_c1 = len(sub_object_c1.objectValues())
    len_c2 = len(sub_object_c2.objectValues())
    # One assertion per side, so that a failure tells which side still
    # has objects and how many
    self.assertEqual(len_s, 0)
    self.assertEqual(len_c1, 0)
    self.assertEqual(len_c2, 0)

1054
  def test_21_GetConflictListOnSubObject(self, quiet=0, run=run_all_test):
    """
    Change several attributes of a sub-object on both the server and a
    client, then check that the conflict list is correct, globally and
    when it is asked for a given object.
    """
    if not run:
      return
    self.test_17_AddSubObject(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Get Conflict List On Sub Object ')
      LOG('Testing... ',0,'test_21_GetConflictListOnSubObject')
    object_s = self.getPersonServer()._getOb(self.id1)
    sub_object_s = object_s._getOb(self.id1)
    sub_object_c1 = self.getPersonClient1()._getOb(self.id1)._getOb(self.id1)
    # The sub-object of the second client is looked up but not modified
    sub_object_c2 = self.getPersonClient2()._getOb(self.id1)._getOb(self.id1)
    # Change values so that we will get conflicts
    sub_object_s.edit(language=self.lang2, description=self.description2)
    sub_object_c1.edit(language=self.lang3, description=self.description3)
    self.synchronize(self.sub_id1)
    sync_tool = self.getSynchronizationTool()
    # Two properties were changed on both sides: two conflicts
    self.assertEqual(len(sync_tool.getConflictList()), 2)
    # Both conflicts are returned for the server sub-object only
    self.assertEqual(len(sync_tool.getConflictList(sub_object_c1)), 0)
    self.assertEqual(len(sync_tool.getConflictList(object_s)), 0)
    self.assertEqual(len(sync_tool.getConflictList(sub_object_s)), 2)
1086

1087
  def test_22_ApplyPublisherDocumentOnSubObject(self, quiet=0, run=run_all_test):
    """
    There are several conflicts on a sub-object (see
    test_21_GetConflictListOnSubObject): check that we can correctly
    get the publisher version of this document everywhere.
    """
    if not run:
      return
    self.test_21_GetConflictListOnSubObject(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Apply Publisher Document On Sub Object ')
      LOG('Testing... ',0,'test_22_ApplyPublisherDocumentOnSubObject')
    sync_tool = self.getSynchronizationTool()
    first_conflict = sync_tool.getConflictList()[0]
    first_conflict.applyPublisherDocument()
    server_sub_object = self.getPersonServer()._getOb(self.id1)._getOb(self.id1)
    client1_sub_object = self.getPersonClient1()._getOb(self.id1)._getOb(self.id1)
    client2_sub_object = self.getPersonClient2()._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    # The server keeps its own values and both clients must match it
    self.assertEqual(server_sub_object.getDescription(), self.description2)
    self.assertEqual(server_sub_object.getLanguage(), self.lang2)
    self.assertXMLViewIsEqual(self.sub_id1, server_sub_object, client1_sub_object)
    self.assertXMLViewIsEqual(self.sub_id2, server_sub_object, client2_sub_object)
1114

1115
  def test_23_ApplySubscriberDocumentOnSubObject(self, quiet=0, run=run_all_test):
    """
    There are several conflicts on a sub-object (see
    test_21_GetConflictListOnSubObject): check that we can correctly
    get the subscriber version of this document everywhere.
    """
    if not run:
      return
    self.test_21_GetConflictListOnSubObject(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Apply Subscriber Document On Sub Object ')
      LOG('Testing... ',0,'test_23_ApplySubscriberDocumentOnSubObject')
    sync_tool = self.getSynchronizationTool()
    first_conflict = sync_tool.getConflictList()[0]
    first_conflict.applySubscriberDocument()
    server_folder = self.getPersonServer()
    server_sub_object = server_folder._getOb(self.id1)._getOb(self.id1)
    client1_folder = self.getPersonClient1()
    client1_sub_object = client1_folder._getOb(self.id1)._getOb(self.id1)
    client2_folder = self.getPersonClient2()
    client2_sub_object = client2_folder._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    # refresh documents
    server_sub_object = server_folder._getOb(self.id1)._getOb(self.id1)
    client1_sub_object = client1_folder._getOb(self.id1)._getOb(self.id1)
    client2_sub_object = client2_folder._getOb(self.id1)._getOb(self.id1)
    # The description and the language of the server are not asserted
    # directly (those checks are disabled), only the XML views are compared
    self.assertXMLViewIsEqual(self.sub_id1, server_sub_object, client1_sub_object)
    self.assertXMLViewIsEqual(self.sub_id2, server_sub_object, client2_sub_object)
1146

1147
  def test_24_SynchronizeWithStrangeGid(self, quiet=0, run=run_all_test):
    """
    Synchronize objects whose gid is not their id.

    By default, the synchronization process uses the id in order to
    recognize objects (because by default, getGid==getId). Here, we will
    see if adding, updating and deleting objects also work with a
    somewhat strange getGid.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Synchronize With Strange Gid ')
      LOG('Testing... ',0,'test_24_SynchronizeWithStrangeGid')
    self.login()
    self.setupPublicationAndSubscriptionAndGid(quiet=1, run=1)
    nb_person = self.populatePersonServer(quiet=1, run=1)
    # This will test adding object
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    portal_sync = self.getSynchronizationTool()
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    self.assertEqual(len(subscription1.getObjectList()), nb_person)
    publication = portal_sync.getPublication(self.pub_id)
    self.assertEqual(len(publication.getObjectList()), nb_person)
    # The gid is the base16 encoded title, ie 'Sebastien Robin'
    gid = self.first_name1 + ' ' + self.last_name1
    gid = b16encode(gid)
    person_c1 = subscription1.getObjectFromGid(gid)
    id_c1 = person_c1.getId()
    # id given by the default generateNewId
    self.assertTrue(id_c1 in ('1', '2'))
    person_s = publication.getSubscriber(self.subscription_url1).getObjectFromGid(gid)
    id_s = person_s.getId()
    self.assertEqual(id_s, self.id1)
    # This will test updating object
    person_s.setDescription(self.description3)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(person_s.getDescription(), self.description3)
    self.assertEqual(person_c1.getDescription(), self.description3)
    self.assertXMLViewIsEqual(self.sub_id1, person_s, person_c1)
    # This will test deleting object
    person_server = self.getPersonServer()
    person_server.manage_delObjects(self.id2)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertEqual(len(subscription1.getObjectList()), (nb_person-1))
    self.assertEqual(len(publication.getObjectList()), (nb_person-1))
    # The remaining person must still be reachable through its gid
    person_s = publication.getSubscriber(self.subscription_url1).getObjectFromGid(gid)
    person_c1 = subscription1.getObjectFromGid(gid)
    self.assertEqual(person_s.getDescription(), self.description3)
    self.assertXMLViewIsEqual(self.sub_id1, person_s, person_c1)
1195

1196
  def test_25_MultiNodeConflict(self, quiet=0, run=run_all_test):
    """
    Edit the same person differently on the server and on both clients,
    synchronize so that conflicts are generated, then solve them property
    by property: language comes from the first client, format from the
    second client and everything else from the server.
    """
    if not run:
      return
    self.test_08_FirstSynchronization(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Multi Node Conflict ')
      LOG('Testing... ', 0, 'test_25_MultiNodeConflict')
    sync_tool = self.getSynchronizationTool()
    # Give each of the three nodes its own version of the same document.
    server_person = self.getPersonServer()._getOb(self.id1)
    server_person.edit(language=self.lang2,
                       description=self.description2,
                       format=self.format2)
    client1_person = self.getPersonClient1()._getOb(self.id1)
    client1_person.edit(language=self.lang3,
                        description=self.description3,
                        format=self.format3)
    client2_person = self.getPersonClient2()._getOb(self.id1)
    client2_person.edit(language=self.lang4,
                        description=self.description4,
                        format=self.format4)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    conflicts = sync_tool.getConflictList()
    self.assertEqual(len(conflicts), 6)
    # check if we have the state conflict on all clients
    self.checkSynchronizationStateIsConflict()
    # we will take :
    # description on person_server
    # language on person_client1
    # format on person_client2
    # Maps a property id to the subscription whose value must win; any
    # conflict not matching an entry is solved with the publisher value.
    subscriber_wins = {
      'language': self.subscription_url1,
      'format': self.subscription_url2,
    }
    for conflict in conflicts:
      subscriber = conflict.getSubscriber()
      property_id = conflict.getPropertyId()
      if property_id in subscriber_wins and \
          subscriber.getSubscriptionUrl() == subscriber_wins[property_id]:
        conflict.applySubscriberValue()
      else:
        conflict.applyPublisherValue()
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    # The first client and the server must both hold the merged version.
    for person in (client1_person, server_person):
      self.assertEqual(person.getDescription(), self.description2)
      self.assertEqual(person.getLanguage(), self.lang3)
      self.assertEqual(person.getFormat(), self.format4)
    self.assertXMLViewIsEqual(self.sub_id2, server_person, client2_person)
    # the workflow has one more "edit_workflow" in the first client's person
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.assertXMLViewIsEqual(self.sub_id2, server_person, client2_person)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client1_person)
1262

1263
  def test_26_SynchronizeWorkflowHistory(self, quiet=0, run=run_all_test):
    """
    After a first synchronization, edit the person twice on the server
    and twice on the client, synchronize, and check that the workflow
    history of both documents has 4 more entries than before.
    """
    if not run:
      return
    self.test_08_FirstSynchronization(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Synchronize WorkflowHistory ')
      LOG('Testing... ', 0, 'test_26_SynchronizeWorkflowHistory')
    server_folder = self.getPersonServer()
    server_person = server_folder._getOb(self.id1)
    client_folder = self.getPersonClient1()
    client_person = client_folder._getOb(self.id1)
    initial_length = len(server_person.workflow_history[self.workflow_id])
    # Two edits per side, alternating server and client.
    for description in (self.description2, self.description1):
      server_person.edit(description=description)
      client_person.edit(description=description)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    # refresh documents
    server_person = server_folder._getOb(self.id1)
    client_person = client_folder._getOb(self.id1)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    expected_length = initial_length + 4
    self.assertEqual(len(server_person.workflow_history[self.workflow_id]),
                     expected_length)
    self.assertEqual(len(client_person.workflow_history[self.workflow_id]),
                     expected_length)
1293

1294
  def test_27_UpdateLocalRole(self, quiet=0, run=run_all_test):
    """
    After a first synchronization, set and delete local roles on the
    server persons, synchronize both subscriptions and check that the
    persons of the first client carry the same local roles.
    """
    if not run:
      return
    self.test_08_FirstSynchronization(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Local Role ')
      LOG('Testing... ', 0, 'test_27_UpdateLocalRole')
    # First, create the 'jp' user that will be granted local roles.
    user_folder = self.getPortal().acl_users
    user_folder._doAddUser('jp', '', ['Manager'], [])
    jp_user = user_folder.getUserById('jp').__of__(user_folder)
    # Fetch the documents on both sides before touching the roles.
    server_folder = self.getPersonServer()
    server_person1 = server_folder._getOb(self.id1)
    server_person2 = server_folder._getOb(self.id2)
    client_folder = self.getPersonClient1()
    client_person1 = client_folder._getOb(self.id1)
    client_person2 = client_folder._getOb(self.id2)
    # Then update, create and delete roles on the server side only.
    server_person1.manage_setLocalRoles('fab', ['Manager', 'Owner'])
    server_person2.manage_setLocalRoles('jp', ['Manager', 'Owner'])
    server_person2.manage_delLocalRoles(['fab'])
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    # refresh documents
    client_person1 = client_folder._getOb(self.id1)
    client_person2 = client_folder._getOb(self.id2)
    self.assertXMLViewIsEqual(self.sub_id1, server_person1, client_person1)
    self.assertXMLViewIsEqual(self.sub_id2, server_person2, client_person2)
    server_roles1 = server_person1.get_local_roles()
    server_roles2 = server_person2.get_local_roles()
    client_roles1 = client_person1.get_local_roles()
    client_roles2 = client_person2.get_local_roles()
    self.assertEqual(server_roles1, client_roles1)
    self.assertEqual(server_roles2, client_roles2)

1332
  def test_28_PartialData(self, quiet=0, run=run_all_test):
    """
    After a first synchronization, lower SyncCode.MAX_LINES so that the
    next synchronization is split into many messages, add sub-objects on
    the server and check that they arrive complete on the first client.

    SyncCode.MAX_LINES is module-wide state: it is always restored, even
    when an assertion fails, so that other tests are not affected.
    """
    if not run:
      return
    self.test_08_FirstSynchronization(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Partial Data ')
      LOG('Testing... ', 0, 'test_28_PartialData')
    previous_max_lines = SyncCode.MAX_LINES
    SyncCode.MAX_LINES = 10
    try:
      self.populatePersonServerWithSubObject(quiet=1, run=1)
      self.synchronize(self.sub_id1)
      self.synchronize(self.sub_id2)
      self.checkSynchronizationStateIsSynchronized()
      person_client1 = self.getPersonClient1()
      person1_c = person_client1._getOb(self.id1)
      sub_person1_c = person1_c._getOb(self.id1)
      sub_sub_person1 = sub_person1_c._getOb(self.id1)
      sub_sub_person2 = sub_person1_c._getOb(self.id2)
      # remove ('','portal...','person_server')
      len_path = len(sub_sub_person1.getPhysicalPath()) - 3
      self.assertEqual(len_path, 3)
      len_path = len(sub_sub_person2.getPhysicalPath()) - 3
      self.assertEqual(len_path, 3)
      self.assertEqual(sub_sub_person1.getDescription(), self.description1)
      self.assertEqual(sub_sub_person1.getFirstName(), self.first_name1)
      self.assertEqual(sub_sub_person1.getLastName(), self.last_name1)
      self.assertEqual(sub_sub_person2.getDescription(), self.description2)
      self.assertEqual(sub_sub_person2.getFirstName(), self.first_name2)
      self.assertEqual(sub_sub_person2.getLastName(), self.last_name2)
    finally:
      # Never leave the reduced limit in place for the following tests.
      SyncCode.MAX_LINES = previous_max_lines

1367
  def test_29_BrokenMessage(self, quiet=0, run=run_all_test):
    """
    With http synchronization, when a message is not well received it
    is sent again; make sure nothing wrong happens in that case.

    Messages are split (SyncCode.MAX_LINES is lowered) to make the test
    more interesting. SyncCode.MAX_LINES is module-wide state: it is
    always restored, even when an assertion fails.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Broken Message ')
      LOG('Testing... ', 0, 'test_29_BrokenMessage')
    previous_max_lines = SyncCode.MAX_LINES
    SyncCode.MAX_LINES = 10
    try:
      self.setupPublicationAndSubscription(quiet=1, run=1)
      nb_person = self.populatePersonServer(quiet=1, run=1)
      # Synchronize the first client; the number of exchanged messages
      # is not asserted here.
      self.synchronizeWithBrokenMessage(self.sub_id1)
      portal_sync = self.getSynchronizationTool()
      subscription1 = portal_sync.getSubscription(self.sub_id1)
      self.assertEqual(len(subscription1.getObjectList()), nb_person)
      # We also check we don't modify the initial object on the server.
      person_server = self.getPersonServer()
      person1_s = person_server._getOb(self.id1)
      person_client1 = self.getPersonClient1()
      person1_c = person_client1._getOb(self.id1)
      self.assertEqual(person1_s.getId(), self.id1)
      self.assertEqual(person1_s.getFirstName(), self.first_name1)
      self.assertEqual(person1_s.getLastName(), self.last_name1)
      self.assertXMLViewIsEqual(self.sub_id1, person1_s, person1_c)
    finally:
      # Never leave the reduced limit in place for the following tests.
      SyncCode.MAX_LINES = previous_max_lines

1401
  def test_30_GetSynchronizationType(self, quiet=0, run=run_all_test):
    """
    Update simple data on the server only, then on the client only, then
    on both sides, and check after each round that every subscription
    reports the TWO_WAY synchronization type.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Get Synchronization Type ')
      LOG('Testing... ', 0, 'test_30_GetSynchronizationType')
    self.test_08_FirstSynchronization(quiet=1, run=1)
    sync_tool = self.getSynchronizationTool()

    def check_every_subscription_is_two_way():
      # Check for each subscription that the synchronization type
      # is TWO WAY
      for subscription in sync_tool.getSubscriptionList():
        self.assertEquals(subscription.getSynchronizationType(),
                          SyncCode.TWO_WAY)

    # First we do only modification on server
    check_every_subscription_is_two_way()
    server_person = self.getPersonServer()._getOb(self.id1)
    server_person.edit(first_name=self.first_name3,
                       last_name=self.last_name3)
    self.synchronize(self.sub_id1)
    # Then we do only modification on a client
    client_person = self.getPersonClient1()._getOb(self.id1)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    client_person.edit(first_name=self.first_name1,
                       last_name=self.last_name1)
    self.synchronize(self.sub_id1)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    check_every_subscription_is_two_way()
    # Then we do modification on both the client and the server
    # and of course, on the same object
    server_person.edit(first_name=self.first_name3)
    client_person.edit(description=self.description3)
    self.synchronize(self.sub_id1)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    check_every_subscription_is_two_way()

1441
  def test_31_UpdateLocalPermission(self, quiet=0, run=run_all_test):
    """
    After a first synchronization, set local permissions on the server
    persons, synchronize and compare with the client; then change and
    clear those permissions, synchronize and compare again.
    """
    if not run:
      return
    self.test_08_FirstSynchronization(quiet=1, run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Local Permission ')
      LOG('Testing... ', 0, 'test_31_UpdateLocalPermission')
    server_folder = self.getPersonServer()
    server_person1 = server_folder._getOb(self.id1)
    server_person2 = server_folder._getOb(self.id2)
    client_folder = self.getPersonClient1()
    client_person1 = client_folder._getOb(self.id1)
    client_person2 = client_folder._getOb(self.id2)

    def refresh_and_compare():
      # Fetch the four documents again, compare local permissions and
      # XML views, and hand back the refreshed server documents.
      refreshed_s1 = server_folder._getOb(self.id1)
      refreshed_s2 = server_folder._getOb(self.id2)
      refreshed_c1 = client_folder._getOb(self.id1)
      refreshed_c2 = client_folder._getOb(self.id2)
      permissions_s1 = refreshed_s1.get_local_permissions()
      permissions_s2 = refreshed_s2.get_local_permissions()
      permissions_c1 = refreshed_c1.get_local_permissions()
      permissions_c2 = refreshed_c2.get_local_permissions()
      self.assertEqual(permissions_s1, permissions_c1)
      self.assertEqual(permissions_s2, permissions_c2)
      self.assertXMLViewIsEqual(self.sub_id1, refreshed_s1, refreshed_c1)
      self.assertXMLViewIsEqual(self.sub_id2, refreshed_s2, refreshed_c2)
      return refreshed_s1, refreshed_s2

    # First round: create local permissions on the server.
    server_person1.manage_setLocalPermissions('View', ['Manager', 'Owner'])
    server_person2.manage_setLocalPermissions('View', ['Manager', 'Owner'])
    server_person2.manage_setLocalPermissions('View management screens',
                                              ['Owner',])
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    server_person1, server_person2 = refresh_and_compare()

    # Second round: restrict and clear the permissions set above.
    server_person1.manage_setLocalPermissions('View', ['Owner'])
    server_person2.manage_setLocalPermissions('View', None)
    server_person2.manage_setLocalPermissions('View management screens', ())
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    refresh_and_compare()
1498

1499
  def test_32_AddOneWaySubscription(self, quiet=0, run=run_all_test):
    """
    (Re)create the first subscription with the ONE_WAY_FROM_SERVER alert
    code and check that it exists and reports itself as one way from
    server.

    An already existing subscription with the same title is deleted
    first.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Add One Way Subscription ')
      LOG('Testing... ', 0, 'test_32_AddOneWaySubscription')
    portal_id = self.getPortalId()
    portal_sync = self.getSynchronizationTool()
    if portal_sync.getSubscription(self.sub_id1) is not None:
      portal_sync.manage_deleteSubscription(title=self.sub_id1)
    portal_sync.manage_addSubscription(title=self.sub_id1,
        publication_url=self.publication_url,
        subscription_url=self.subscription_url1,
        destination_path='/%s/person_client1' % portal_id,
        source_uri='Person',
        target_uri='Person',
        query='objectValues',
        xml_mapping=self.xml_mapping,
        conduit='ERP5Conduit',
        gpg_key='',
        activity_enabled=self.activity_enabled,
        alert_code=SyncCode.ONE_WAY_FROM_SERVER,
        login='fab',
        password='myPassword')
    sub = portal_sync.getSubscription(self.sub_id1)
    # Check existence first so that a missing subscription is reported
    # as a test failure instead of an AttributeError on None.
    self.assertTrue(sub is not None)
    self.assertTrue(sub.isOneWayFromServer())

1526
  def test_33_OneWaySync(self, quiet=0, run=run_all_test):
    """
    Synchronize only from the server to the client and make sure that
    modifications made on the client are not sent to the server.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest One Way Sync ')
      LOG('Testing... ', 0, 'test_33_OneWaySync')
    # Start from scratch when a previous test left its folders behind.
    if self.getPersonServer() is not None:
      portal = self.getPortal()
      for folder_id in ('person_server', 'person_client1', 'person_client2'):
        portal._delObject(id=folder_id)
      self.deletePublicationAndSubscription()
    self.test_02_AddPublication(quiet=1, run=1)
    self.test_32_AddOneWaySubscription(quiet=1, run=1)

    person_count = self.populatePersonServer(quiet=1, run=1)
    sync_tool = self.getSynchronizationTool()
    for subscription in sync_tool.getSubscriptionList():
      self.assertEquals(subscription.getSynchronizationType(),
                        SyncCode.SLOW_SYNC)
    # First do the sync from the server to the client
    message_count = self.synchronize(self.sub_id1)
    for subscription in sync_tool.getSubscriptionList():
      self.assertEquals(subscription.getSynchronizationType(),
                        SyncCode.ONE_WAY_FROM_SERVER)
    self.assertEquals(message_count, self.nb_message_first_synchronization)
    first_subscription = sync_tool.getSubscription(self.sub_id1)
    self.assertEquals(len(first_subscription.getObjectList()), person_count)
    # We also check we don't modify the initial object on the server.
    server_folder = self.getPersonServer()
    server_person = server_folder._getOb(self.id1)
    client_folder = self.getPersonClient1()
    client_person = client_folder._getOb(self.id1)
    self.assertEqual(server_person.getId(), self.id1)
    self.assertEqual(server_person.getFirstName(), self.first_name1)
    self.assertEqual(server_person.getLastName(), self.last_name1)
    self.checkSynchronizationStateIsSynchronized()
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person,
                              force=1)
    # Then we change things on both sides and we look if there
    # is synchronization from only one way
    client_person.setFirstName(self.first_name2)
    server_person.setLastName(self.last_name2)
    message_count = self.synchronize(self.sub_id1)
    # With a one-way-from-server sync the client sends no data: its own
    # first name change stays local, while it receives the server's new
    # last name.
    self.assertEquals(client_person.getFirstName(), self.first_name2)
    self.assertEquals(client_person.getLastName(), self.last_name2)
    self.assertEquals(server_person.getFirstName(), self.first_name1)
    self.assertEquals(server_person.getLastName(), self.last_name2)
    # Reset the signatures to force a refresh sync; after it the client
    # object must match the server again.
    self.resetSignaturePublicationAndSubscription()
    message_count = self.synchronize(self.sub_id1)

    # refresh documents
    server_person = server_folder._getOb(self.id1)
    client_person = client_folder._getOb(self.id1)

    self.assertEquals(server_person.getFirstName(), self.first_name1)
    self.assertEquals(server_person.getLastName(), self.last_name2)
    self.checkSynchronizationStateIsSynchronized()
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person,
                              force=1)
1591

1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612
  def test_34_encoding(self, quiet=0, run=run_all_test):
    """
    We will test if we can encode strings with b64encode to encode
    the login and password for authenticated sessions
    """
    #when there will be other format implemented with encode method,
    #there will be tested here

    if not run: return
    self.test_08_FirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Strings Encoding ')
      LOG('Testing... ',0,'test_34_encoding')
      
    #define some strings :
    python = 'www.python.org'
    awaited_result_python = "d3d3LnB5dGhvbi5vcmc="
    long_string = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNO\
PQRSTUVWXYZéèçà@^~µ&²0123456789!@#0^&*();:<>,. []{}\xc3\xa7sdf__\
sdf\xc3\xa7\xc3\xa7\xc3\xa7_df___&&\xc3\xa9]]]\xc2\xb0\xc2\xb0\xc2\
\xb0\xc2\xb0\xc2\xb0\xc2\xb0"
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
1613 1614
    #= "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZéèçà@^~µ&²012345
    #6789!@#0^&*();:<>,. []{}çsdf__sdfççç_df___&&é]]]°°°°°°'"
1615

Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
1616
    awaited_result_long_string = "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNERUZH\
1617
SElKS0xNTk9QUVJTVFVWV1hZWsOpw6jDp8OgQF5+wrUmwrIwMTIzNDU2Nzg5IUAjMF4mKigpOzo8Pi\
Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
1618
wuIFtde33Dp3NkZl9fc2Rmw6fDp8OnX2RmX19fJibDqV1dXcKwwrDCsMKwwrDCsA=="
1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655
    #test just b64encode
    self.assertEqual(b64encode(python), awaited_result_python)
    self.assertEqual(b64encode(""), "")
    self.assertEqual(b64encode(long_string), awaited_result_long_string)
    
    self.assertEqual(b64decode(awaited_result_python), python)
    self.assertEqual(b64decode(""), "")
    self.assertEqual(b64decode(awaited_result_long_string), long_string)

    # test with the ERP5 functions
    portal_sync = self.getSynchronizationTool()
    publication = portal_sync.getPublication(self.pub_id)
    subscription1 = portal_sync.getSubscription(self.sub_id1)
      
    string_encoded = subscription1.encode('b64', python)
    self.assertEqual(string_encoded, awaited_result_python)
    string_decoded = subscription1.decode('b64', awaited_result_python)
    self.assertEqual(string_decoded, python)
    self.failUnless(subscription1.isDecodeEncodeTheSame(string_encoded, 
      python, 'b64'))
    self.failUnless(subscription1.isDecodeEncodeTheSame(string_encoded, 
      string_decoded, 'b64'))

    string_encoded = subscription1.encode('b64', long_string) 
    self.assertEqual(string_encoded, awaited_result_long_string)
    string_decoded = subscription1.decode('b64', awaited_result_long_string)
    self.assertEqual(string_decoded, long_string)
    self.failUnless(subscription1.isDecodeEncodeTheSame(string_encoded, 
      long_string, 'b64'))
    self.failUnless(subscription1.isDecodeEncodeTheSame(string_encoded, 
      string_decoded, 'b64'))

    self.assertEqual(subscription1.encode('b64', ''), '')
    self.assertEqual(subscription1.decode('b64', ''), '')
    self.failUnless(subscription1.isDecodeEncodeTheSame(
      subscription1.encode('b64', ''), '', 'b64'))

1656
  def test_35_authentication(self, quiet=0, run=run_all_test):
    """
    Check synchronization against a publication with authentication:
    - with wrong credentials on the subscription (unknown login, wrong
      case in the login, wrong case in the password) the client changes
      must not reach the server;
    - with the right credentials they must;
    - a login containing utf-8 characters must be accepted.
    """

    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Authentication ')
      LOG('Testing... ', 0, 'test_35_authentication')

    self.test_08_FirstSynchronization(quiet=1, run=1)
    portal_sync = self.getSynchronizationTool()
    server_person = self.getPersonServer()._getOb(self.id1)
    client_person = self.getPersonClient1()._getOb(self.id1)

    def set_subscription_credentials(login, password):
      # Replace the credentials used by the first subscription.
      self.addAuthenticationToSubscription(self.sub_id1, login, password,
          'b64', 'syncml:auth-basic')

    def set_publication_credentials(login, password):
      # Replace the credentials required by the publication.
      self.addAuthenticationToPublication(self.pub_id, login, password,
          'b64', 'syncml:auth-basic')

    def edit_client(first_name, last_name):
      client_person.edit(first_name=first_name, last_name=last_name)

    def check_not_propagated(first_name, last_name):
      self.verifyFirstNameAndLastNameAreNotSynchronized(first_name,
        last_name, server_person, client_person)

    def check_propagated(first_name, last_name):
      self.checkSynchronizationStateIsSynchronized()
      self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
      self.assertEquals(server_person.getFirstName(), first_name)
      self.assertEquals(server_person.getLastName(), last_name)

    # First we do only modification on client, without authentication.
    edit_client(self.first_name3, self.last_name3)
    # check that it is not synchronized yet
    check_not_propagated(self.first_name3, self.last_name3)
    self.synchronize(self.sub_id1)
    # now it should be synchronized
    check_propagated(self.first_name3, self.last_name3)

    # adding authentication :
    set_publication_credentials('fab', 'myPassword')
    set_subscription_credentials('pouet', 'pouet')
    # try to synchronize with a wrong authentication on the subscription,
    # it should fail
    edit_client(self.first_name2, self.last_name2)
    check_not_propagated(self.first_name2, self.last_name2)
    # here, before and after synchronization, the server person should
    # not have the same name as the client person because the user is
    # not authenticated
    self.synchronize(self.sub_id1)
    check_not_propagated(self.first_name2, self.last_name2)

    # try to synchronize with an authentication on both the client and
    # the server
    set_subscription_credentials('fab', 'myPassword')
    # now it should be correctly synchronized
    self.synchronize(self.sub_id1)
    check_propagated(self.first_name2, self.last_name2)

    # try to synchronize with a bad login and/or password
    # test if login is case sensitive (it should be !)
    set_subscription_credentials('fAb', 'myPassword')
    edit_client(self.first_name1, self.last_name1)
    self.synchronize(self.sub_id1)
    check_not_propagated(self.first_name1, self.last_name1)

    # the password is case sensitive as well
    set_subscription_credentials('fab', 'mypassword')
    edit_client(self.first_name1, self.last_name1)
    self.synchronize(self.sub_id1)
    check_not_propagated(self.first_name1, self.last_name1)

    # with the good password
    set_subscription_credentials('fab', 'myPassword')
    # now it should be correctly synchronized
    self.synchronize(self.sub_id1)
    check_propagated(self.first_name1, self.last_name1)

    # verify that a login with utf8 characters is accepted

    # add a user with an utf8 login
    user_folder = self.getPortal().acl_users
    # \xc3\xa9pouet = épouet
    user_folder._doAddUser('\xc3\xa9pouet', 'ploum', ['Manager'], [])
    utf8_user = user_folder.getUserById('\xc3\xa9pouet').__of__(user_folder)
    newSecurityManager(None, utf8_user)

    set_publication_credentials('\xc3\xa9pouet', 'ploum')
    # first, try with a wrong login :
    set_subscription_credentials('pouet', 'ploum')
    edit_client(self.first_name3, self.last_name3)
    check_not_propagated(self.first_name3, self.last_name3)
    self.synchronize(self.sub_id1)
    check_not_propagated(self.first_name3, self.last_name3)
    # now with the good one :
    set_subscription_credentials('\xc3\xa9pouet', 'ploum')
    self.synchronize(self.sub_id1)
    self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    self.assertEquals(server_person.getFirstName(), self.first_name3)
    self.assertEquals(server_person.getLastName(), self.last_name3)
    self.checkSynchronizationStateIsSynchronized()
Nicolas Delaby's avatar
Nicolas Delaby committed
1775

Danièle Vanbaelinghem's avatar
Danièle Vanbaelinghem committed
1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807
  def test_36_SynchronizationSubscriptionMaxLines(self, quiet=0, run=run_all_test):
    """
    Populate the first client folder, push its content to the server and
    then to the second client, with enough data to go over the maximum
    number of lines of a message.
    """
    if not run:
      return
    if not quiet:
      ZopeTestCase._print('\nTest Synchronization Subscription Max Lines')
      LOG('Testing... ', 0, 'test_36_SynchronizationSubscriptionMaxLines')
    self.login()
    self.setupPublicationAndSubscription(quiet=1, run=1)
    person_count = self.populatePersonClient1(quiet=1, run=1)
    sync_tool = self.getSynchronizationTool()
    for subscription in sync_tool.getSubscriptionList():
      self.assertEquals(subscription.getSynchronizationType(),
                        SyncCode.SLOW_SYNC)
    # Synchronize the first client:
    # data_Sub1 -> Pub (the data is in sub1, the publication is empty)
    first_message_count = self.synchronize(self.sub_id1)
    # Check the number of messages exchanged
    self.assertEqual(first_message_count,
                     self.nb_message_first_sync_max_lines)
    # Synchronize the second client:
    # data_Pub -> data_Sub2 (the data is in the publication, sub2 is
    # empty, which takes 2 more messages)
    second_message_count = self.synchronize(self.sub_id2)
    self.assertEqual(second_message_count,
                     self.nb_message_first_sync_max_lines + 2)
    server_folder = self.getPersonServer()
    client1_folder = self.getPersonClient1()
    client2_folder = self.getPersonClient2()
    for number in range(1, 60):
      person_id = str(number)
      server_person = server_folder._getOb(person_id)
      client_person = client1_folder._getOb(person_id)
      self.assertXMLViewIsEqual(self.sub_id1, server_person, client_person)
    self.assertEqual(person_count, len(server_folder.objectValues()))
    self.assertEqual(person_count, len(client2_folder.objectValues()))
1808

1809 1810 1811 1812
def test_suite():
    """Return a unittest suite holding all the TestERP5SyncML tests."""
    sync_ml_tests = unittest.makeSuite(TestERP5SyncML)
    whole_suite = unittest.TestSuite()
    whole_suite.addTest(sync_ml_tests)
    return whole_suite
Sebastien Robin's avatar
Sebastien Robin committed
1813