diff --git a/product/CMFActivity/Activity/SQLBase.py b/product/CMFActivity/Activity/SQLBase.py index 68975af08a6a31aeafde0dd8565918752c33261f..289f4b3b991c64fe279a2766ba8bb29953812f72 100644 --- a/product/CMFActivity/Activity/SQLBase.py +++ b/product/CMFActivity/Activity/SQLBase.py @@ -42,6 +42,9 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import ( from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH from Products.CMFActivity.Errors import ActivityFlushError +# TODO: Limit by size in bytes instead of number of rows. +MAX_MESSAGE_LIST_SIZE = 100 + def sort_message_key(message): # same sort key as in SQLBase.getMessageList return message.line.priority, message.line.date, message.uid @@ -95,8 +98,7 @@ class SQLBase(Queue): def initialize(self, activity_tool, clear): folder = activity_tool.getPortalObject().portal_skins.activity try: - createMessageTable = getattr(folder, - self.__class__.__name__ + '_createMessageTable') + createMessageTable = folder.SQLBase_createMessageTable except AttributeError: return if clear: @@ -105,7 +107,8 @@ class SQLBase(Queue): column_list = [] try: src = createMessageTable._upgradeSchema(added_list=column_list, - modified_list=column_list) + modified_list=column_list, + table=self.sql_table) except ProgrammingError, e: if e[0] != NO_SUCH_TABLE: raise @@ -118,7 +121,41 @@ class SQLBase(Queue): LOG('CMFActivity', INFO, "%r table upgraded\n%s" % (self.sql_table, src)) return - createMessageTable() + createMessageTable(table=self.sql_table) + + def prepareQueueMessageList(self, activity_tool, message_list): + registered_message_list = [m for m in message_list if m.is_registered] + portal = activity_tool.getPortalObject() + for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE): + message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE] + uid_list = portal.portal_ids.generateNewIdList(self.uid_group, + id_count=len(message_list), id_generator='uid') + path_list = ['/'.join(m.object_path) for m in message_list] + active_process_uid_list = [m.active_process_uid for m in message_list] + method_id_list = [m.method_id for m in message_list] + priority_list = [m.activity_kw.get('priority', 1) for m in message_list] + date_list = [m.activity_kw.get('at_date') for m in message_list] + group_method_id_list = [m.getGroupId() for m in message_list] + tag_list = [m.activity_kw.get('tag', '') for m in message_list] + serialization_tag_list = [m.activity_kw.get('serialization_tag', '') + for m in message_list] + processing_node_list = [] + for m in message_list: + m.order_validation_text = x = self.getOrderValidationText(m) + processing_node_list.append(0 if x == 'none' else -1) + portal.SQLBase_writeMessageList( + table=self.sql_table, + uid_list=uid_list, + path_list=path_list, + active_process_uid_list=active_process_uid_list, + method_id_list=method_id_list, + priority_list=priority_list, + message_list=map(Message.dump, message_list), + group_method_id_list=group_method_id_list, + date_list=date_list, + tag_list=tag_list, + processing_node_list=processing_node_list, + serialization_tag_list=serialization_tag_list) def getNow(self, context): """ diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 559e1ba3359e1e1dcbb62b68bff07b53d0dfb7f4..0d930dd8d4cd60174cfbcb56e5d5ab82c09e0124 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -40,8 +40,6 @@ MAX_VALIDATED_LIMIT = 1000 # Read up to this number of messages to validate. READ_MESSAGE_LIMIT = 1000 -MAX_MESSAGE_LIST_SIZE = 100 - class SQLDict(SQLBase): """ A simple OOBTree based queue. It should be compatible with transactions @@ -49,48 +47,7 @@ class SQLDict(SQLBase): because use of OOBTree. """ sql_table = 'message' - - # Transaction commit methods - def prepareQueueMessageList(self, activity_tool, message_list): - registered_message_list = [m for m in message_list if m.is_registered] - for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE): - message_list = registered_message_list[i:i + MAX_MESSAGE_LIST_SIZE] - path_list = ['/'.join(m.object_path) for m in message_list] - active_process_uid_list = [m.active_process_uid for m in message_list] - method_id_list = [m.method_id for m in message_list] - priority_list = [m.activity_kw.get('priority', 1) for m in message_list] - date_list = [m.activity_kw.get('at_date') for m in message_list] - group_method_id_list = [m.getGroupId() for m in message_list] - tag_list = [m.activity_kw.get('tag', '') for m in message_list] - serialization_tag_list = [m.activity_kw.get('serialization_tag', '') - for m in message_list] - order_validation_text_list = [] - processing_node_list = [] - for m in message_list: - m.order_validation_text = x = self.getOrderValidationText(m) - # BBB: 'order_validation_text' SQL column is now useless. - # If we remove it, 'message' & 'message_queue' can have the same - # schema, and much code can be merged into SQLBase. - order_validation_text_list.append(x) - processing_node_list.append(0 if x == 'none' else -1) - dumped_message_list = map(Message.dump, message_list) - # The uid_list also is store in the ZODB - uid_list = activity_tool.getPortalObject().portal_ids.generateNewIdList( - id_generator='uid', id_group='portal_activity', - id_count=len(message_list)) - activity_tool.SQLDict_writeMessageList( - uid_list=uid_list, - path_list=path_list, - active_process_uid_list=active_process_uid_list, - method_id_list=method_id_list, - priority_list=priority_list, - message_list=dumped_message_list, - date_list=date_list, - group_method_id_list=group_method_id_list, - tag_list=tag_list, - serialization_tag_list=serialization_tag_list, - processing_node_list=processing_node_list, - order_validation_text_list=order_validation_text_list) + uid_group = 'portal_activity' def generateMessageUID(self, m): return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'), m.activity_kw.get('group_id')) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index 2ee52a2006fd8d81e4a08be8470579f8ab2c37bb..5f02c263baf0dda9995dcf963541316c3dad7382 100644 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -38,8 +38,6 @@ MAX_VALIDATED_LIMIT = 1000 # Read this many messages to validate. READ_MESSAGE_LIMIT = 1000 -MAX_MESSAGE_LIST_SIZE = 100 - class SQLQueue(SQLBase): """ A simple OOBTree based queue. It should be compatible with transactions @@ -47,41 +45,7 @@ class SQLQueue(SQLBase): because use of OOBTree. """ sql_table = 'message_queue' - - def prepareQueueMessageList(self, activity_tool, message_list): - registered_message_list = [m for m in message_list if m.is_registered] - for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE): - message_list = registered_message_list[i:i + MAX_MESSAGE_LIST_SIZE] - # The uid_list also is store in the ZODB - uid_list = activity_tool.getPortalObject().portal_ids.generateNewIdList( - id_generator='uid', id_group='portal_activity_queue', - id_count=len(message_list)) - path_list = ['/'.join(m.object_path) for m in message_list] - active_process_uid_list = [m.active_process_uid for m in message_list] - method_id_list = [m.method_id for m in message_list] - priority_list = [m.activity_kw.get('priority', 1) for m in message_list] - date_list = [m.activity_kw.get('at_date') for m in message_list] - group_method_id_list = [m.getGroupId() for m in message_list] - tag_list = [m.activity_kw.get('tag', '') for m in message_list] - serialization_tag_list = [m.activity_kw.get('serialization_tag', '') - for m in message_list] - processing_node_list = [] - for m in message_list: - m.order_validation_text = x = self.getOrderValidationText(m) - processing_node_list.append(0 if x == 'none' else -1) - dumped_message_list = map(Message.dump, message_list) - activity_tool.SQLQueue_writeMessageList( - uid_list=uid_list, - path_list=path_list, - active_process_uid_list=active_process_uid_list, - method_id_list=method_id_list, - priority_list=priority_list, - message_list=dumped_message_list, - group_method_id_list=group_method_id_list, - date_list=date_list, - tag_list=tag_list, - processing_node_list=processing_node_list, - serialization_tag_list=serialization_tag_list) + uid_group = 'portal_activity_queue' def distribute(self, activity_tool, node_count): offset = 0 diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 83529531f601c53ac14dc60e713a869556e4c5a1..c02be6f28b98ea32e4cb063df6b22374a65af68a 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -583,7 +583,7 @@ class ActivityTool (Folder, UniqueObject): distributingNode = '' _nodes = () activity_creation_trace = False - activity_tracking = False + activity_tracking = True activity_timing_log = False cancel_and_invoke_links_hidden = False diff --git a/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql b/product/CMFActivity/skins/activity/SQLBase_createMessageTable.zsql similarity index 95% rename from product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql rename to product/CMFActivity/skins/activity/SQLBase_createMessageTable.zsql index 09e733bdc57af076b0cd5cfccdf45ad1cf502f9f..94410430d607756bf59f31823cc68266dfb778ea 100644 --- a/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql +++ b/product/CMFActivity/skins/activity/SQLBase_createMessageTable.zsql @@ -7,8 +7,8 @@ cache_time:0 class_name: class_file: </dtml-comment> -<params></params> -CREATE TABLE `message_queue` ( +<params>table</params> +CREATE TABLE <dtml-var table> ( `uid` INT UNSIGNED NOT NULL, `date` DATETIME NOT NULL, `path` VARCHAR(255) NOT NULL, diff --git a/product/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql b/product/CMFActivity/skins/activity/SQLBase_writeMessageList.zsql similarity index 96% rename from product/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql rename to product/CMFActivity/skins/activity/SQLBase_writeMessageList.zsql index b1044a014a50ca7044d26f5804a61d1c54a94c2d..ea4880ad84a777dc310919ff2cf41163227eec6f 100644 --- a/product/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql +++ b/product/CMFActivity/skins/activity/SQLBase_writeMessageList.zsql @@ -7,7 +7,8 @@ cache_time:0 class_name: class_file: </dtml-comment> -<params>uid_list +<params>table +uid_list path_list active_process_uid_list method_id_list @@ -19,7 +20,7 @@ group_method_id_list tag_list serialization_tag_list </params> -INSERT INTO message_queue +INSERT INTO <dtml-var table> (uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message) VALUES <dtml-in prefix="loop" expr="_.range(_.len(path_list))"> diff --git a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql deleted file mode 100644 index cceab7fb628ee17265250d0fdd462e41fc9f2475..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql +++ /dev/null @@ -1,38 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1000 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params></params> -CREATE TABLE `message` ( - `uid` INT UNSIGNED NOT NULL, - `date` DATETIME NOT NULL, - `path` VARCHAR(255) NOT NULL, - `active_process_uid` INT UNSIGNED NULL, - `method_id` VARCHAR(255) NOT NULL, - `processing_node` SMALLINT NOT NULL DEFAULT -1, - `processing` TINYINT NOT NULL DEFAULT 0, - `processing_date` DATETIME, - `priority` TINYINT NOT NULL DEFAULT 0, - `group_method_id` VARCHAR(255) NOT NULL DEFAULT '', - `tag` VARCHAR(255) NOT NULL, - `serialization_tag` VARCHAR(255) NOT NULL, - `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0, - `order_validation_text` VARCHAR(255) NOT NULL, - `message` LONGBLOB NOT NULL, - PRIMARY KEY (`uid`), - KEY (`path`), - KEY (`active_process_uid`), - KEY (`method_id`), - KEY `processing_node_processing` (`processing_node`, `processing`), - KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`), - KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`), - KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`), - KEY (`priority`), - KEY (`tag`), - KEY (`order_validation_text`) -) ENGINE=InnoDB diff --git a/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql deleted file mode 100644 index e61558fcf9162e10c6081d0d56135a86e6148bc3..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql +++ /dev/null @@ -1,42 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1000 -max_cache:100 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>uid_list -path_list -active_process_uid_list -method_id_list -message_list -priority_list -date_list -processing_node_list -group_method_id_list -tag_list -serialization_tag_list -order_validation_text_list</params> -INSERT INTO message -(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, order_validation_text, message) -VALUES -<dtml-in prefix="loop" expr="_.range(_.len(path_list))"> -<dtml-if sequence-start><dtml-else>,</dtml-if> -( - <dtml-sqlvar expr="uid_list[loop_item]" type="int">, - <dtml-sqlvar expr="path_list[loop_item]" type="string">, - <dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>, - <dtml-if expr="date_list is not None"><dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else>UTC_TIMESTAMP()</dtml-if><dtml-else>UTC_TIMESTAMP()</dtml-if>, - <dtml-sqlvar expr="method_id_list[loop_item]" type="string">, - <dtml-sqlvar expr="processing_node_list[loop_item]" type="int">, - 0, - <dtml-sqlvar expr="priority_list[loop_item]" type="int">, - <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">, - <dtml-sqlvar expr="tag_list[loop_item]" type="string">, - <dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">, - <dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">, - <dtml-sqlvar expr="message_list[loop_item]" type="string"> -) -</dtml-in> diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index 6de3eaec022d1c1cfacb4184c65eecc7c5c1d15b..13c99f4a980cc8eb7cf072f652f038409ac49491 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -2682,56 +2682,29 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): DB.query = DB.original_query del DB.original_query - def test_MAX_MESSAGE_LIST_SIZE_SQLQueue(self): - from Products.CMFActivity.Activity import SQLQueue - old_MAX_MESSAGE_LIST_SIZE = SQLQueue.MAX_MESSAGE_LIST_SIZE - SQLQueue.MAX_MESSAGE_LIST_SIZE = 3 - - try: - global call_count - call_count = 0 - def dummy_counter(self): - global call_count - call_count += 1 - - Organisation.dummy_counter = dummy_counter - o = self.portal.organisation_module.newContent(portal_type='Organisation',) - - for i in range(10): - o.activate(activity='SQLQueue').dummy_counter() - - self.flushAllActivities() - self.assertEqual(call_count, 10) - finally: - SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE - del Organisation.dummy_counter - - def test_MAX_MESSAGE_LIST_SIZE_SQLDict(self): - from Products.CMFActivity.Activity import SQLDict - old_MAX_MESSAGE_LIST_SIZE = SQLDict.MAX_MESSAGE_LIST_SIZE - SQLDict.MAX_MESSAGE_LIST_SIZE = 3 - + def test_MAX_MESSAGE_LIST_SIZE(self): + from Products.CMFActivity.Activity import SQLBase + MAX_MESSAGE_LIST_SIZE = SQLBase.MAX_MESSAGE_LIST_SIZE try: - global call_count - call_count = 0 - def dummy_counter(self): - global call_count - call_count += 1 - - o = self.portal.organisation_module.newContent(portal_type='Organisation',) + SQLBase.MAX_MESSAGE_LIST_SIZE = 3 + def dummy_counter(o): + self.__call_count += 1 + o = self.portal.organisation_module.newContent(portal_type='Organisation') - for i in range(10): - method_name = 'dummy_counter_%s' % i - setattr(Organisation, method_name, dummy_counter) - getattr(o.activate(activity='SQLDict'), method_name)() - - self.flushAllActivities() - self.assertEqual(call_count, 10) + for activity in "SQLDict", "SQLQueue": + self.__call_count = 0 + try: + for i in xrange(10): + method_name = 'dummy_counter_%s' % i + getattr(o.activate(activity=activity), method_name)() + setattr(Organisation, method_name, dummy_counter) + self.flushAllActivities() + finally: + for i in xrange(10): + delattr(Organisation, 'dummy_counter_%s' % i) + self.assertEqual(self.__call_count, 10) finally: - SQLDict.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE - for i in range(10): - method_name = 'dummy_counter_%s' % i - delattr(Organisation, method_name) + SQLBase.MAX_MESSAGE_LIST_SIZE = MAX_MESSAGE_LIST_SIZE def test_115_TestSerializationTagSQLDictPreventsParallelExecution(self, quiet=0, run=run_all_test): """ @@ -3032,10 +3005,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): When an activity creates several activities, make sure that all newly created activities are not commited if there is ZODB Conflict error """ - from Products.CMFActivity.Activity import SQLQueue - old_MAX_MESSAGE_LIST_SIZE = SQLQueue.MAX_MESSAGE_LIST_SIZE - SQLQueue.MAX_MESSAGE_LIST_SIZE = 1 + from Products.CMFActivity.Activity import SQLBase + MAX_MESSAGE_LIST_SIZE = SQLBase.MAX_MESSAGE_LIST_SIZE try: + SQLBase.MAX_MESSAGE_LIST_SIZE = 1 activity_tool = self.getPortal().portal_activities def doSomething(self): self.serialize() @@ -3057,7 +3030,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): self.assertEqual(['doSomething'],[x.method_id for x in message_list]) activity_tool.manageClearActivities() finally: - SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE + SQLBase.MAX_MESSAGE_LIST_SIZE = MAX_MESSAGE_LIST_SIZE def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self): activity_tool = self.portal.portal_activities diff --git a/product/ERP5Type/patches/DA.py b/product/ERP5Type/patches/DA.py index 3202b078ead12caaf2824f07fb90f3b9799fd57f..4c1f9f52186e587b667f2b2cbd00d007ffe6d9af 100644 --- a/product/ERP5Type/patches/DA.py +++ b/product/ERP5Type/patches/DA.py @@ -276,9 +276,9 @@ _create_search = re.compile(r'\bCREATE\s+TABLE\s+(`?)(\w+)\1\s+', re.I).search _key_search = re.compile(r'\bKEY\s+(`[^`]+`)\s+(.+)').search def DA_upgradeSchema(self, connection_id=None, added_list=None, - modified_list=None, src__=0): + modified_list=None, src__=0, **kw): query = self.getPortalObject()[connection_id or self.connection_id]().query - src = self(src__=1) + src = self(src__=1, **kw) m = _create_search(src) if m is None: return