Commit 4fa53cbe authored by Vincent Pelletier's avatar Vincent Pelletier

Add support for serialization_tag.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@19431 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent b86a8173
...@@ -77,6 +77,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -77,6 +77,7 @@ class SQLDict(RAMDict, SQLBase):
group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')]) group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
for message in registered_message_list] for message in registered_message_list]
tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list] tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
serialization_tag_list = [message.activity_kw.get('serialization_tag', '') for message in registered_message_list]
order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list] order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity',
id_count=len(registered_message_list), store=0) id_count=len(registered_message_list), store=0)
...@@ -88,6 +89,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -88,6 +89,7 @@ class SQLDict(RAMDict, SQLBase):
date_list = date_list, date_list = date_list,
group_method_id_list = group_method_id_list, group_method_id_list = group_method_id_list,
tag_list = tag_list, tag_list = tag_list,
serialization_tag_list = serialization_tag_list,
order_validation_text_list = order_validation_text_list) order_validation_text_list = order_validation_text_list)
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
...@@ -645,7 +647,8 @@ class SQLDict(RAMDict, SQLBase): ...@@ -645,7 +647,8 @@ class SQLDict(RAMDict, SQLBase):
#LOG('SQLDict.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset - READ_MESSAGE_LIMIT + len(result), validated_count)) #LOG('SQLDict.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset - READ_MESSAGE_LIMIT + len(result), validated_count))
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None): def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
serialization_tag=None):
if isinstance(method_id, str): if isinstance(method_id, str):
method_id = [method_id] method_id = [method_id]
if isinstance(path, str): if isinstance(path, str):
...@@ -653,12 +656,13 @@ class SQLDict(RAMDict, SQLBase): ...@@ -653,12 +656,13 @@ class SQLDict(RAMDict, SQLBase):
if isinstance(tag, str): if isinstance(tag, str):
tag = [tag] tag = [tag]
if method_id or message_uid or path or tag: if method_id or message_uid or path or tag or serialization_tag:
validateMessageList = activity_tool.SQLDict_validateMessageList validateMessageList = activity_tool.SQLDict_validateMessageList
result = validateMessageList(method_id=method_id, result = validateMessageList(method_id=method_id,
message_uid=message_uid, message_uid=message_uid,
path=path, path=path,
tag=tag) tag=tag,
serialization_tag=serialization_tag)
message_list = [] message_list = []
for line in result: for line in result:
m = self.loadMessage(line.message, m = self.loadMessage(line.message,
...@@ -698,6 +702,9 @@ class SQLDict(RAMDict, SQLBase): ...@@ -698,6 +702,9 @@ class SQLDict(RAMDict, SQLBase):
return [] return []
return self._validate(activity_tool, tag=value[0], method_id=value[1]) return self._validate(activity_tool, tag=value[0], method_id=value[1])
def _validate_serialization_tag(self, activity_tool, message, value):
return self._validate(activity_tool, serialization_tag=value)
def countMessage(self, activity_tool, tag=None, path=None, def countMessage(self, activity_tool, tag=None, path=None,
method_id=None, message_uid=None, **kw): method_id=None, message_uid=None, **kw):
"""Return the number of messages which match the given parameters. """Return the number of messages which match the given parameters.
......
...@@ -72,13 +72,15 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -72,13 +72,15 @@ class SQLQueue(RAMQueue, SQLBase):
if date is None: if date is None:
date = self.getNow(activity_tool) date = self.getNow(activity_tool)
tag = m.activity_kw.get('tag', '') tag = m.activity_kw.get('tag', '')
serialization_tag = m.activity_kw.get('serialization_tag', '')
activity_tool.SQLQueue_writeMessage(uid=uid, activity_tool.SQLQueue_writeMessage(uid=uid,
path=path, path=path,
method_id=method_id, method_id=method_id,
priority=priority, priority=priority,
message=self.dumpMessage(m), message=self.dumpMessage(m),
date=date, date=date,
tag=tag) tag=tag,
serialization_tag=serialization_tag)
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction # Erase all messages in a single transaction
...@@ -488,7 +490,8 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -488,7 +490,8 @@ class SQLQueue(RAMQueue, SQLBase):
#LOG('SQLQueue.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset + len(result), validated_count)) #LOG('SQLQueue.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset + len(result), validated_count))
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None): def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
serialization_tag=None):
if isinstance(method_id, str): if isinstance(method_id, str):
method_id = [method_id] method_id = [method_id]
if isinstance(path, str): if isinstance(path, str):
...@@ -496,12 +499,13 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -496,12 +499,13 @@ class SQLQueue(RAMQueue, SQLBase):
if isinstance(tag, str): if isinstance(tag, str):
tag = [tag] tag = [tag]
if method_id or message_uid or path or tag: if method_id or message_uid or path or tag or serialization_tag:
validateMessageList = activity_tool.SQLQueue_validateMessageList validateMessageList = activity_tool.SQLQueue_validateMessageList
result = validateMessageList(method_id=method_id, result = validateMessageList(method_id=method_id,
message_uid=message_uid, message_uid=message_uid,
path=path, path=path,
tag=tag) tag=tag,
serialization_tag=serialization_tag)
message_list = [] message_list = []
for line in result: for line in result:
m = self.loadMessage(line.message, m = self.loadMessage(line.message,
...@@ -540,6 +544,9 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -540,6 +544,9 @@ class SQLQueue(RAMQueue, SQLBase):
return [] return []
return self._validate(activity_tool, tag=value[0], method_id=value[1]) return self._validate(activity_tool, tag=value[0], method_id=value[1])
def _validate_serialization_tag(self, activity_tool, message, value):
return self._validate(activity_tool, serialization_tag=value)
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay, processing_node = None): def timeShift(self, activity_tool, delay, processing_node = None):
""" """
......
...@@ -19,6 +19,7 @@ CREATE TABLE `message` ( ...@@ -19,6 +19,7 @@ CREATE TABLE `message` (
`priority` TINYINT NOT NULL DEFAULT 0, `priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '', `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL, `tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0, `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`order_validation_text` VARCHAR(255) NOT NULL, `order_validation_text` VARCHAR(255) NOT NULL,
`message` LONGBLOB NOT NULL, `message` LONGBLOB NOT NULL,
......
...@@ -12,6 +12,7 @@ message_uid ...@@ -12,6 +12,7 @@ message_uid
path path
tag tag
count count
serialization_tag
</params> </params>
SELECT SELECT
<dtml-if count> <dtml-if count>
...@@ -39,3 +40,7 @@ WHERE ...@@ -39,3 +40,7 @@ WHERE
<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in> <dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
) )
</dtml-if> </dtml-if>
<dtml-if serialization_tag>
AND processing_node > -1
AND serialization_tag LIKE <dtml-sqlvar serialization_tag type="string">
</dtml-if>
...@@ -16,9 +16,10 @@ date_list ...@@ -16,9 +16,10 @@ date_list
processing_node_list processing_node_list
group_method_id_list group_method_id_list
tag_list tag_list
serialization_tag_list
order_validation_text_list</params> order_validation_text_list</params>
INSERT INTO message INSERT INTO message
(uid, path, date, method_id, processing_node, processing, priority, group_method_id, tag, order_validation_text, message) (uid, path, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, order_validation_text, message)
VALUES VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))"> <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if> <dtml-if sequence-start><dtml-else>,</dtml-if>
...@@ -32,6 +33,7 @@ VALUES ...@@ -32,6 +33,7 @@ VALUES
<dtml-sqlvar expr="priority_list[loop_item]" type="int">, <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">, <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">, <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">, <dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string"> <dtml-sqlvar expr="message_list[loop_item]" type="string">
) )
......
...@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` ( ...@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
`processing_date` DATETIME, `processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0, `priority` TINYINT NOT NULL DEFAULT 0,
`tag` VARCHAR(255) NOT NULL, `tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`message` LONGBLOB NOT NULL, `message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`), PRIMARY KEY (`uid`),
KEY (`path`), KEY (`path`),
......
...@@ -12,6 +12,7 @@ message_uid ...@@ -12,6 +12,7 @@ message_uid
path path
tag tag
count count
serialization_tag
</params> </params>
SELECT SELECT
<dtml-if count> <dtml-if count>
...@@ -39,3 +40,7 @@ WHERE ...@@ -39,3 +40,7 @@ WHERE
<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in> <dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
) )
</dtml-if> </dtml-if>
<dtml-if serialization_tag>
AND processing_node > -1
AND serialization_tag LIKE <dtml-sqlvar serialization_tag type="string">
</dtml-if>
...@@ -14,7 +14,9 @@ message ...@@ -14,7 +14,9 @@ message
priority priority
processing_node processing_node
date date
tag</params> tag
serialization_tag
</params>
INSERT INTO message_queue INSERT INTO message_queue
SET SET
uid = <dtml-sqlvar uid type="int">, uid = <dtml-sqlvar uid type="int">,
...@@ -27,4 +29,5 @@ SET ...@@ -27,4 +29,5 @@ SET
processing = 0, processing = 0,
priority = <dtml-sqlvar priority type="int">, priority = <dtml-sqlvar priority type="int">,
tag = <dtml-sqlvar tag type="string">, tag = <dtml-sqlvar tag type="string">,
serialization_tag = <dtml-sqlvar serialization_tag type="string">,
message = <dtml-sqlvar message type="string"> message = <dtml-sqlvar message type="string">
...@@ -2735,6 +2735,64 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -2735,6 +2735,64 @@ class TestCMFActivity(ERP5TypeTestCase):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.CheckActivityRuntimeEnvironment('SQLQueue') self.CheckActivityRuntimeEnvironment('SQLQueue')
def CheckSerializationTag(self, activity):
organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
get_transaction().commit()
self.tic()
activity_tool = self.getActivityTool()
result = activity_tool.getMessageList()
self.assertEqual(len(result), 0)
# First scenario: activate, distribute, activate, distribute
# Create first activity and distribute: it must be distributed
organisation.activate(activity=activity, serialization_tag='1').getTitle()
get_transaction().commit()
result = activity_tool.getMessageList()
self.assertEqual(len(result), 1)
activity_tool.distribute()
result = activity_tool.getMessageList()
self.assertEqual(len([x for x in result if x.processing_node == 0]), 1)
# Create second activity and distribute: it must *NOT* be distributed
organisation.activate(activity=activity, serialization_tag='1').getTitle()
get_transaction().commit()
result = activity_tool.getMessageList()
self.assertEqual(len(result), 2)
activity_tool.distribute()
result = activity_tool.getMessageList()
self.assertEqual(len([x for x in result if x.processing_node == 0]), 1) # Distributed message list len is still 1
self.tic()
result = activity_tool.getMessageList()
self.assertEqual(len(result), 0)
# Second scenario: activate, activate, distribute
# Both messages must be distributed (this is different from regular tags)
organisation.activate(activity=activity, serialization_tag='1').getTitle()
# Use a different method just so that SQLDict doesn't merge both activities prior to insertion.
organisation.activate(activity=activity, serialization_tag='1').getId()
get_transaction().commit()
result = activity_tool.getMessageList()
self.assertEqual(len(result), 2)
activity_tool.distribute()
result = activity_tool.getMessageList()
self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
self.tic()
result = activity_tool.getMessageList()
self.assertEqual(len(result), 0)
def test_106_checkSerializationTagSQLDict(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck serialization tag (SQLDict)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckSerializationTag('SQLDict')
def test_107_checkSerializationTagSQLQueue(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck serialization tag (SQLQueue)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckSerializationTag('SQLQueue')
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment