SynchronizationTool.py 19.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
# -*- coding: utf-8 -*-
## Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

Aurel's avatar
Aurel committed
28 29 30 31 32
from os import path
from lxml import etree
from logging import getLogger, Formatter

from AccessControl import ClassSecurityInfo
33 34 35 36

from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Products.ERP5Type.Globals import InitializeClass
Aurel's avatar
Aurel committed
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
from Products.ERP5SyncML.SyncMLConstant import ACTIVITY_PRIORITY, \
    SynchronizationError
from Products.ERP5SyncML.SyncMLMessage import SyncMLResponse, SyncMLRequest
from Products.ERP5SyncML.Engine.SynchronousEngine import SyncMLSynchronousEngine
from Products.ERP5SyncML.Engine.AsynchronousEngine import SyncMLAsynchronousEngine
from Products.ERP5SyncML.Transport.HTTP import HTTPTransport
from Products.ERP5SyncML.Transport.File import FileTransport
from Products.ERP5SyncML.Transport.Mail import MailTransport
from Products.ERP5.ERP5Site import getSite

synchronous_engine = SyncMLSynchronousEngine()
asynchronous_engine = SyncMLAsynchronousEngine()

transport_scheme_dict = {
  "http" : HTTPTransport(),
  "https" : HTTPTransport(),
  "file" : FileTransport(),
  "mail" : MailTransport(),
  }
56 57 58 59 60

parser = etree.XMLParser(remove_blank_text=True)

# Logging channel definitions
# Main logging channel
Aurel's avatar
Aurel committed
61 62 63 64
syncml_logger = getLogger('ERP5SyncML')
# Direct logging to "[instancehome]/log/ERP5SyncML.log", if this
# directory exists. Otherwise, it will end up in root logging
# facility (ie, event.log).
65 66 67
from App.config import getConfiguration
instancehome = getConfiguration().instancehome
if instancehome is not None:
Aurel's avatar
Aurel committed
68 69
  log_directory = path.join(instancehome, 'log')
  if path.isdir(log_directory):
70 71
    from Signals import Signals
    from ZConfig.components.logger.loghandler import FileHandler
Aurel's avatar
Aurel committed
72 73
    log_file_handler = FileHandler(path.join(log_directory,
                                                'ERP5SyncML.log'))
74 75 76
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
Aurel's avatar
Aurel committed
77 78 79
    log_file_handler.setFormatter(Formatter(
      "%(asctime)s %(levelname)s %(name)s %(message)s",
      "%Y-%m-%dT%H:%M:%S"))
80 81 82 83 84
    Signals.registerZopeSignals([log_file_handler])
    syncml_logger.addHandler(log_file_handler)
    syncml_logger.propagate = 0


Aurel's avatar
Aurel committed
85
def checkAlertCommand(syncml_request):
86
  """
Aurel's avatar
Aurel committed
87 88
  This parse the alert commands received and return a
  dictionnary mapping database to sync mode
89
  """
Aurel's avatar
Aurel committed
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
  database_alert_list = []
  # XXX To be moved on engine
  search = getSite().portal_categories.syncml_alert_code.searchFolder
  for alert in syncml_request.alert_list:
    if alert["data"] == "222":
      # 222 is for asking next message, do not care
      continue
    # Retrieve the category
    # XXX Categories must be redefined, ID must be code, not title so
    # that we can drop the use of searchFolder
    alert_code_category_list = search(reference=alert['data'])
    if len(alert_code_category_list) == 1:
      alert_code_category = alert_code_category_list[0].getId()
    else:
      # Must return (405) Command not allowed
      raise NotImplementedError("Alert code is %s, got %s category" %
                                (alert['data'],
                                 len(alert_code_category_list)))
    # Copy the whole dict & add the category id
    alert["code"] = alert_code_category
    database_alert_list.append(alert)

  return database_alert_list
113 114 115 116



class SynchronizationTool(BaseTool):
Aurel's avatar
Aurel committed
117
  """ This tool implements the SyncML Protocol
118

Aurel's avatar
Aurel committed
119
  SyncML Protocol defines how to synchronize data between clients and server.
120

Aurel's avatar
Aurel committed
121 122 123 124 125 126 127 128 129
  Here is a mapping of the specification with the implementation in this tool :
  - client are subscriptions
  - server are publications
  - change log are managed through the use of signatures. A signature contains
    the last data sent and which was successfully synchronized. When running a
    new synchronization new data is compared with the one stored in signature
    to detect changes.
  """
  id = "portal_synchronizations"
130 131 132 133

  security = ClassSecurityInfo()

  security.declareProtected(Permissions.AccessContentsInformation,
Aurel's avatar
Aurel committed
134
                            'getConflictList')
135 136 137 138 139 140 141 142 143 144 145
  def getConflictList(self, context=None):
    """
    Retrieve the list of all conflicts
    Here the list is as follow :
    [conflict_1,conflict2,...] where conflict_1 is like:
    ['publication',publication_id,object.getPath(),property_id,
    publisher_value,subscriber_value]
    """
    conflict_list = []
    for publication in self.searchFolder(portal_type='SyncML Publication'):
      for result in publication.searchFolder(
Aurel's avatar
Aurel committed
146
          portal_type='SyncML Subscription'):
147 148 149 150 151 152 153 154 155 156 157 158 159 160
        subscriber = result.getObject()
        sub_conflict_list = subscriber.getConflictList()
        for conflict in sub_conflict_list:
          if context is None or conflict.getOriginValue() == context:
            conflict_list.append(conflict.__of__(subscriber))
    for result in self.searchFolder(portal_type='SyncML Subscription'):
      subscription = result.getObject()
      sub_conflict_list = subscription.getConflictList()
      for conflict in sub_conflict_list:
        if context is None or conflict.getOriginValue() == context:
          conflict_list.append(conflict.__of__(subscription))
    return conflict_list

  security.declareProtected(Permissions.AccessContentsInformation,
Aurel's avatar
Aurel committed
161
                             'getDocumentConflictList')
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
  def getDocumentConflictList(self, context=None):
    """
    Retrieve the list of all conflicts for a given document
    Well, this is the same thing as getConflictList with a path
    """
    return self.getConflictList(context)

  security.declareProtected(Permissions.AccessContentsInformation,
                            'getSubscriberDocumentVersion')
  def getSubscriberDocumentVersion(self, conflict, docid):
    """
    Given a 'conflict' and a 'docid' refering to a new version of a
    document, applies the conflicting changes to the document's new
    version. By so, two differents versions of the same document will be
    available.
    Thus, the manager will be able to open both version of the document
    before selecting which one to keep.
    """
    subscriber = conflict.getSubscriber()
    publisher_object = conflict.getOrigineValue()
    publisher_xml = self.getXMLObject(
                       object=publisher_object,
                       xml_mapping=subscriber.getXmlBindingGeneratorMethodId())
    directory = publisher_object.aq_parent
    object_id = docid
    if object_id in directory.objectIds():
Aurel's avatar
Aurel committed
188
      directory._delObject(object_id)  # XXX Why not manage_delObjects ?
189
      # Import the conduit and get it
Aurel's avatar
Aurel committed
190
      conduit = subscriber.getConduit()
191 192 193 194 195 196
      conduit.addNode(xml=publisher_xml, object=directory,
                      object_id=object_id,
                      signature=conflict.getParentValue())
      subscriber_document = directory._getOb(object_id)
      for c in self.getConflictList(conflict.getOriginValue()):
        if c.getSubscriber() == subscriber:
Aurel's avatar
Aurel committed
197
          c.applySubscriberValue(document=subscriber_document)
198 199
      return subscriber_document

Aurel's avatar
Aurel committed
200 201 202
  # XXX- ?
  def _getCopyId(self, document):
    directory = document.aq_inner.aq_parent
203
    if directory.getId() != 'portal_repository':
Aurel's avatar
Aurel committed
204 205 206
      document_id = document.getId() + '_conflict_copy'
      if document_id in directory.objectIds():
        directory._delObject(document_id)  # XXX manage_delObjects ?
207 208
    else:
      repotool = directory
Aurel's avatar
Aurel committed
209 210 211 212 213 214 215 216 217
      docid = repotool.getDocidAndRevisionFromObjectId(document.getId())[0]
      new_rev = repotool.getFreeRevision(docid) + 10  # make sure it's not gonna provoke conflicts
      document_id = repotool._getId(docid, new_rev)
    return document_id

  #
  # XXX-Aurel : the following methods must be moved to a specific part that
  # manages protocols to send/receive messages
  #
218
  security.declarePublic('readResponse')
Aurel's avatar
Aurel committed
219
  def readResponse(self, text='', sync_id=None, from_url=None):
220 221 222 223
    """
    We will look at the url and we will see if we need to send mail, http
    response, or just copy to a file.
    """
Aurel's avatar
Aurel committed
224
    syncml_logger.info('readResponse sync_id %s, text %s' % (sync_id, text))
225 226 227
    if text:
      # we are still anonymous at this time, use unrestrictedSearchResults
      # to fetch the Subcribers
Aurel's avatar
Aurel committed
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
      catalog_tool = self.getPortalObject().portal_catalog.unrestrictedSearchResults
      syncml_request = SyncMLRequest(text)

      # It is assumed that client & server does not share the same database ID
      # (source_reference); this must be checked using constraint
      for publication in catalog_tool(portal_type='SyncML Publication',
                                      source_reference=sync_id,
                                      validation_state='validated'):
        if publication.getUrlString() == syncml_request.header['target']:
          # Do not process in activity first checking, a message ordering
          # is required by protocol specification, only use activity when no
          # race condition can happen (ie no final tag)

          # XXX For now do in activity otherwise we never answer the HTTP request
          # directly and so it ends with client/server stuck waiting for answer and
          # on the other side we are doing an http request to send them
244
          if publication.getIsActivityEnabled():
Aurel's avatar
Aurel committed
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
            return self.activate(
              activity="SQLQueue",
              tag=publication.getRelativeUrl(),
              priority=ACTIVITY_PRIORITY-1).processServerSynchronization(
                publication.getPath(), text)
          else:
            return self.processServerSynchronization(publication.getPath(), text)

      for subscription in catalog_tool(portal_type='SyncML Subscription',
                                       source_reference=sync_id,
                                       validation_state='validated'):
        if subscription.getSubscriptionUrlString() == syncml_request.header['target']:
          if subscription.getIsActivityEnabled():
            return self.activate(activity="SQLQueue",
                                 priority=ACTIVITY_PRIORITY-1).processClientSynchronization(
                                   subscription.getPath(),text)
261
          else:
Aurel's avatar
Aurel committed
262 263 264 265 266 267
            return self.processClientSynchronization(subscription.getPath(), text)

      # XXX maybe it is better to generate a syncml error message
      raise ValueError("Impossible to find a pub/sub to process message %s:%s"
                       % (sync_id, syncml_request.header['target']))

268
    # we use from only if we have a file
Aurel's avatar
Aurel committed
269 270 271 272
    elif isinstance(from_url, basestring):
      if from_url.startswith('file:'):
        filename = from_url[len('file:'):]
        xml = None
273 274 275
        try:
          stream = file(filename, 'r')
        except IOError:
Aurel's avatar
Aurel committed
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
          # XXX-Aurel : Why raising here make unit tests to fail ?
          # raise ValueError("Impossible to read file %s, error is %s"
          #                  % (filename, msg))
          pass
        else:
          xml = stream.read()
          stream.close()
        syncml_logger.debug('readResponse xml from file is %s' % (xml,))
        if xml:
          return xml
  #
  # End of part managing protocols
  #

  #
  # Following methods are related to server (Publication)
  #
  security.declarePrivate('processServerSynchronization')
  def processServerSynchronization(self, publication_path, msg=None):
295 296 297 298 299
    """
      This is the synchronization method for the server
    """
    # Read the request from the client
    publication = self.unrestrictedTraverse(publication_path)
Aurel's avatar
Aurel committed
300 301 302 303
    if publication.getIsActivityEnabled():
      engine = asynchronous_engine
    else:
      engine = synchronous_engine
304

Aurel's avatar
Aurel committed
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
    if msg is None:
      # Read message from file
      msg = self.readResponse(from_url=publication.getUrlString(),
                                     sync_id=publication.getSourceReference())
    if msg is not None:
      syncml_request = SyncMLRequest(msg)
      #syncml_logger.info("\tXML received from client %s" %(str(syncml_request)))

      # Get the subscriber
      subscription_url = syncml_request.header['source']
      subscriber = publication.getSubscriber(subscription_url)  # XXX method to be renamed

      # Alert commands are generated at initialization phase or when client ask
      # for the remaining messages
      database_alert_list = checkAlertCommand(syncml_request)
      assert len(database_alert_list) <= 1, "Multi-databases sync no supported"
      if len(database_alert_list):
        # We are initializing the synchronization
        if subscriber and subscriber.getSynchronizationState() not in \
              ("not_running", "initializing", "finished"):
          syncml_logger.error(
            'Trying to start a synchronization on server side : %s although synchronisation is already running'
            % (subscriber.getPath(),))
          # Prevent initilisation if sync already running
          return
        syncml_response = engine.processServerInitialization(
          publication=publication,
          syncml_request=syncml_request,
          subscriber=subscriber,
          alert_dict=database_alert_list[0])
335
      else:
Aurel's avatar
Aurel committed
336 337
        if not subscriber:
          raise ValueError("First synchronization message must contains alert command")
338
        else:
Aurel's avatar
Aurel committed
339 340 341 342 343
          # Let engine manage the synchronization
          try:
            return engine.processServerSynchronization(subscriber, syncml_request)
          except SynchronizationError:
            return
344
    else:
Aurel's avatar
Aurel committed
345 346
      # This must be implemented following the syncml protocol, not with this hack
      raise NotImplementedError("Starting sync process from server is forbidden")
347

Aurel's avatar
Aurel committed
348 349
    # Return message for unit test purpose
    return str(syncml_response)
350

Aurel's avatar
Aurel committed
351 352 353 354 355 356
  #
  # Following methods are related to client (subscription)
  #
  security.declareProtected(Permissions.ModifyPortalContent,
                            'processClientSynchronization')
  def processClientSynchronization(self, subscription_path, msg=None):
357 358
    """
      This is the synchronization method for the client
Aurel's avatar
Aurel committed
359 360

      This is the first method called to launch a synchronization process
361 362
    """
    subscription = self.unrestrictedTraverse(subscription_path)
Aurel's avatar
Aurel committed
363 364 365 366 367
    if subscription.getIsActivityEnabled():
      engine = asynchronous_engine
    else:
      engine = synchronous_engine

368
    if msg is None and subscription.getSubscriptionUrlString('').find('file') >= 0:
Aurel's avatar
Aurel committed
369
      # XXX This is a hack for unit test only, must be removed
370 371
      msg = self.readResponse(sync_id=subscription.getDestinationReference(),
                              from_url=subscription.getSubscriptionUrlString())
Aurel's avatar
Aurel committed
372 373


374
    if msg is None:
Aurel's avatar
Aurel committed
375 376 377
      # This is a synchronization initialisation call
      # Even if call on asynchronous engine, this will not use activities
      syncml_response = engine.initializeClientSynchronization(subscription)
378
    else:
Aurel's avatar
Aurel committed
379
      syncml_request = SyncMLRequest(msg)
380

Aurel's avatar
Aurel committed
381 382 383 384 385
      if not subscription.checkCorrectRemoteMessageId(
          syncml_request.header['message_id']):
        # Message already processed, resend the response
        # XXX How to make sure we send the good last response ?
        raise NotImplementedError
386
      else:
Aurel's avatar
Aurel committed
387 388 389 390 391 392 393 394 395 396
        return engine.processClientSynchronization(syncml_request, subscription)

    # Send the message
    # XXX This must depends on activity enables property, maybe use engine
    if subscription.getIsActivityEnabled():
      subscription.activate(
        after_tag="%s_reset" %(subscription.getPath(),),
        activity="SQLQueue",
        priority=ACTIVITY_PRIORITY,
        tag=subscription.getRelativeUrl()).sendMessage(str(syncml_response))
397
    else:
Aurel's avatar
Aurel committed
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
      subscription.sendMessage(str(syncml_response))

    return str(syncml_response)

  def applySyncCommand(self, subscription_path, response_message_id,
                       activate_kw, **kw):
    """
    This methods is intented to be called by asynchronous engine in activity to
    apply sync commands for a subset of data
    As engines are not zodb object, the tool acts as a placeholder for method
    that need to be called in activities
    """
    subscription = self.restrictedTraverse(subscription_path)
    assert subscription is not None, "Impossible to find subscription %s" \
        % (subscription_path)
    # Build Message
    if response_message_id:
      syncml_response = SyncMLResponse()
      syncml_response.addHeader(
        session_id=subscription.getSessionId(),
        message_id=response_message_id,
        target=subscription.getUrlString(),
        source=subscription.getSubscriptionUrlString())
      syncml_response.addBody()
422
    else:
Aurel's avatar
Aurel committed
423
      syncml_response = None
424

Aurel's avatar
Aurel committed
425
    subscription.applySyncCommand(syncml_response=syncml_response, **kw)
426

Aurel's avatar
Aurel committed
427 428 429 430 431 432 433 434 435 436
    # Send the message in activity to prevent recomputing data in case of
    # transport failure
    if syncml_response:
      syncml_logger("---- %s sending %s notifications of sync"
                    % (subscription.getTitle(),
                       syncml_response.sync_confirmation_counter))
      subscription.activate(activity="SQLQueue",
                            # group_method_id=None,
                            # group_method_cost=.05,
                            tag=activate_kw).sendMessage(xml=str(syncml_response))
437 438 439



Aurel's avatar
Aurel committed
440 441
  def sendSyncCommand(self, id_list, message_id, subscription_path,
                      activate_kw, is_final_message=False):
442
    """
Aurel's avatar
Aurel committed
443 444 445 446
    This methods is intented to be called by asynchronous engine in activity to
    send sync commands for a subset of data
    As engines are not zodb object, the tool acts as a placeholder for method
    that need to be called in activities
447
    """
Aurel's avatar
Aurel committed
448 449 450 451 452 453 454 455 456 457 458
    subscription = self.restrictedTraverse(subscription_path)
    assert subscription is not None, "Impossible to find subscription %s" \
        % (subscription_path)
    # Build Message
    syncml_response = SyncMLResponse()
    syncml_response.addHeader(
      session_id=subscription.getSessionId(),
      message_id=message_id,
      target=subscription.getUrlString(),
      source=subscription.getSubscriptionUrlString())
    syncml_response.addBody()
459 460


Aurel's avatar
Aurel committed
461 462 463 464
    subscription._getSyncMLData(
      syncml_response=syncml_response,
      id_list=id_list,
      )
465

Aurel's avatar
Aurel committed
466 467 468
    if is_final_message:
      # Notify that all modifications were sent
      syncml_response.addFinal()
469

Aurel's avatar
Aurel committed
470 471 472 473 474
    # Send the message in activity to prevent recomputing data in case of
    # transport failure
    # activate_kw["group_method_id"] = None
    # activate_kw["group_method_cost"] = .05
    subscription.activate(**activate_kw).sendMessage(xml=str(syncml_response))
475 476


Aurel's avatar
Aurel committed
477
InitializeClass(SynchronizationTool)