Commit ee2edadb authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: move message serialization code to Message class

Later, we might want to do more processing after loading, or before dumping,
accessing private Message data.
parent e47f2923
......@@ -26,7 +26,7 @@
#
##############################################################################
import cPickle, sys
import sys
from hashlib import sha1
from DateTime import DateTime
from zLOG import LOG, WARNING, ERROR
......@@ -202,14 +202,6 @@ class Queue(object):
def flush(self, activity_tool, object, **kw):
pass
def loadMessage(self, s, **kw):
m = cPickle.load(StringIO(s))
m.__dict__.update(kw)
return m
def dumpMessage(self, m):
return cPickle.dumps(m)
def getOrderValidationText(self, message):
# Return an identifier of validators related to ordering.
order_validation_item_list = []
......
......@@ -33,7 +33,7 @@ from Shared.DC.ZRDB.Results import Results
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE
from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
......@@ -122,7 +122,7 @@ class SQLBase(Queue):
if type(result) is str: # src__ == 1
return result,
class_name = self.__class__.__name__
return [self.loadMessage(line.message,
return [Message.load(line.message,
activity=class_name,
uid=line.uid,
processing_node=line.processing_node,
......@@ -221,7 +221,7 @@ class SQLBase(Queue):
# do not merge anything
def load(line):
uid = line.uid
m = self.loadMessage(line.message, uid=uid, line=line)
m = Message.load(line.message, uid=uid, line=line)
return m, uid, ()
return load
......@@ -546,6 +546,6 @@ class SQLBase(Queue):
**({'method_id': method_id} if method_id else {})):
uid_list.append(line.uid)
if invoke:
invoke(self.loadMessage(line.message, uid=line.uid, line=line))
invoke(Message.load(line.message, uid=line.uid, line=line))
if uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
......@@ -26,7 +26,7 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Products.CMFActivity.ActivityTool import Message, registerActivity
import sys
#from time import time
from SQLBase import SQLBase, sort_message_key
......@@ -73,7 +73,7 @@ class SQLDict(SQLBase):
# schema, and much code can be merged into SQLBase.
order_validation_text_list.append(x)
processing_node_list.append(0 if x == 'none' else -1)
dumped_message_list = map(self.dumpMessage, message_list)
dumped_message_list = map(Message.dump, message_list)
# The uid_list also is store in the ZODB
uid_list = activity_tool.getPortalObject().portal_ids.generateNewIdList(
id_generator='uid', id_group='portal_activity',
......@@ -128,7 +128,7 @@ class SQLDict(SQLBase):
uid = line.uid
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
m = self.loadMessage(line.message, uid=uid, line=line)
m = Message.load(line.message, uid=uid, line=line)
merge_parent = m.activity_kw.get('merge_parent')
try:
if merge_parent:
......@@ -154,7 +154,7 @@ class SQLDict(SQLBase):
line = result[0]
key = line.path, method_id
uid = line.uid
m = self.loadMessage(line.message, uid=uid, line=line)
m = Message.load(line.message, uid=uid, line=line)
# return unreserved similar children
result = activity_tool.SQLDict_selectChildMessageList(
path=line.path,
......@@ -208,7 +208,7 @@ class SQLDict(SQLBase):
if dumpMessageList is not None:
result = dumpMessageList()
for line in result:
m = self.loadMessage(line.message, uid=line.uid, line=line)
m = Message.load(line.message, uid=line.uid, line=line)
message_list.append(m)
return message_list
......@@ -229,7 +229,7 @@ class SQLDict(SQLBase):
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
message = self.loadMessage(line.message, uid=line.uid, line=line)
message = Message.load(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = line.order_validation_text
self.getExecutableMessageList(activity_tool, message, message_dict,
......@@ -307,7 +307,7 @@ class SQLDict(SQLBase):
serialization_tag=serialization_tag)
message_list = []
for line in result:
m = self.loadMessage(line.message,
m = Message.load(line.message,
line=line,
uid=line.uid,
date=line.date,
......
......@@ -26,7 +26,7 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Products.CMFActivity.ActivityTool import Message, registerActivity
from ZODB.POSException import ConflictError
from SQLBase import SQLBase, sort_message_key
from zExceptions import ExceptionFormatter
......@@ -71,7 +71,7 @@ class SQLQueue(SQLBase):
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1)
dumped_message_list = map(self.dumpMessage, message_list)
dumped_message_list = map(Message.dump, message_list)
activity_tool.SQLQueue_writeMessageList(
uid_list=uid_list,
path_list=path_list,
......@@ -127,7 +127,7 @@ class SQLQueue(SQLBase):
if dumpMessageList is not None:
result = dumpMessageList()
for line in result:
m = self.loadMessage(line.message, uid=line.uid, line=line)
m = Message.load(line.message, uid=line.uid, line=line)
message_list.append(m)
return message_list
......@@ -148,7 +148,7 @@ class SQLQueue(SQLBase):
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
message = self.loadMessage(line.message, uid=line.uid, line=line)
message = Message.load(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
......@@ -202,7 +202,7 @@ class SQLQueue(SQLBase):
serialization_tag=serialization_tag)
message_list = []
for line in result:
m = self.loadMessage(line.message,
m = Message.load(line.message,
line=line,
uid=line.uid,
date=line.date,
......
......@@ -32,7 +32,7 @@ import threading
import sys
from types import StringType
import re
from cPickle import dumps, loads
from Products.CMFCore import permissions as CMFCorePermissions
from Products.ERP5Type.Core.Folder import Folder
from Products.CMFActivity.ActiveResult import ActiveResult
......@@ -214,6 +214,14 @@ class Message(BaseMessage):
request.environ['HTTP_ACCEPT_LANGUAGE']
self.request_info['_script'] = list(request._script)
@staticmethod
def load(s, **kw):
self = loads(s)
self.__dict__.update(kw)
return self
dump = dumps
def getGroupId(self):
get = self.activity_kw.get
group_method_id = get('group_method_id', '')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment