Commit d64887cb authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: drop DTML completely and use consecutive uids when possible

This moves the remaining DTML queries to Python, dropping the 'activity' skin.

Dealing with conflicts of uids is easier if the inserted uids are consecutive:
now, only 1 random value is generated, as base uid. This also preserves the
order of insertion, which is wanted for performance reasons:
- No more random write in the primary index.
- When modifying several lines of several documents, 1 document being processed
  at a time, we'd like that any grouped activity (usually indexation) follows
  the same order, so that a processing node prefer many lines from a few
  documents instead of mixing lines from too many documents at the same time.
  This is usually better for caches.
parent bcaf44a4
......@@ -33,7 +33,6 @@ import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
from Shared.DC.ZRDB.DA import DatabaseError
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
......@@ -70,13 +69,14 @@ def sort_message_key(message):
_DequeueMessageException = Exception()
def render_datetime(x):
return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % x.toZone('UTC').parts()[:6]
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
sqltest_dict = {}
no_quote_type = int, float, long
def render_datetime(x):
return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % x.toZone('UTC').parts()[:6]
def _(name, column=None, op="="):
if column is None:
column = name
......@@ -126,22 +126,41 @@ class SQLBase(Queue):
"""
Define a set of common methods for SQL-based storage of activities.
"""
_createMessageTable = 'SQLBase_createMessageTable'
def createTableSQL(self):
return """\
CREATE TABLE %s (
`uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME(6) NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=InnoDB""" % self.sql_table
def initialize(self, activity_tool, clear):
folder = activity_tool.getPortalObject().portal_skins.activity
try:
createMessageTable = getattr(folder, self._createMessageTable)
except AttributeError:
return
db = activity_tool.getSQLConnection()
create = self.createTableSQL()
if clear:
activity_tool.getSQLConnection().query(
"DROP TABLE IF EXISTS " + self.sql_table)
createMessageTable(table=self.sql_table)
db.query("DROP TABLE IF EXISTS " + self.sql_table)
db.query(create)
else:
src = createMessageTable._upgradeSchema(create_if_not_exists=1,
initialize=self._initialize,
table=self.sql_table)
src = db.upgradeSchema(create, create_if_not_exists=1,
initialize=self._initialize)
if src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
......@@ -152,48 +171,56 @@ class SQLBase(Queue):
% (self.sql_table, ", ".join(column_list)))
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject()
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
date_list = [m.activity_kw.get('at_date') for m in message_list]
group_method_id_list = [m.getGroupId() for m in message_list]
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1)
db = activity_tool.getSQLConnection()
quote = db.string_literal
def insert(reset_uid):
values = "),\n(".join(values_list)
del values_list[:]
for _ in xrange(UID_ALLOCATION_TRY_COUNT):
if reset_uid:
reset_uid = False
# Overflow will result into IntegrityError.
db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
try:
portal.SQLBase_writeMessageList(
table=self.sql_table,
uid_list=[
getrandbits(UID_SAFE_BITSIZE)
for _ in xrange(len(message_list))
],
path_list=path_list,
active_process_uid_list=active_process_uid_list,
method_id_list=method_id_list,
priority_list=priority_list,
message_list=map(Message.dump, message_list),
group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=processing_node_list,
serialization_tag_list=serialization_tag_list)
db.query("INSERT INTO %s (uid,"
" path, active_process_uid, date, method_id, processing_node,"
" priority, group_method_id, tag, serialization_tag,"
" message) VALUES\n(%s)" % (self.sql_table, values))
except MySQLdb.IntegrityError, (code, _):
if code != DUP_ENTRY:
raise
reset_uid = True
else:
break
else:
raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
raise RuntimeError("Maximum retry for prepareQueueMessageList reached")
i = 0
reset_uid = True
values_list = []
for m in message_list:
if m.is_registered:
active_process_uid = m.active_process_uid
order_validation_text = m.order_validation_text = \
self.getOrderValidationText(m)
date = m.activity_kw.get('at_date')
values_list.append(','.join((
'@uid+%s' % i,
quote('/'.join(m.object_path)),
'NULL' if active_process_uid is None else str(active_process_uid),
"UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
quote(m.method_id),
'0' if order_validation_text == 'none' else '-1',
str(m.activity_kw.get('priority', 1)),
quote(m.getGroupId()),
quote(m.activity_kw.get('tag', '')),
quote(m.activity_kw.get('serialization_tag', '')),
quote(Message.dump(m)))))
i += 1
if not i % MAX_MESSAGE_LIST_SIZE:
insert(reset_uid)
reset_uid = False
if values_list:
insert(reset_uid)
def _getMessageList(self, db, count=1000, src__=0, **kw):
# XXX: Because most columns have NOT NULL constraint, conditions with None
......
......@@ -45,56 +45,89 @@ class SQLJoblib(SQLDict):
sql_table = 'message_job'
uid_group = 'portal_activity_job'
_createMessageTable = 'SQLJoblib_createMessageTable'
def createTableSQL(self):
return """\
CREATE TABLE %s (
`uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME(6) NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`signature` BINARY(16) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=InnoDB""" % self.sql_table
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject()
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
date_list = [m.activity_kw.get('at_date') for m in message_list]
group_method_id_list = [m.getGroupId() for m in message_list]
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
signature_list=[m.activity_kw.get('signature', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1)
db = activity_tool.getSQLConnection()
quote = db.string_literal
def insert(reset_uid):
values = "),\n(".join(values_list)
del values_list[:]
for _ in xrange(UID_ALLOCATION_TRY_COUNT):
if reset_uid:
reset_uid = False
# Overflow will result into IntegrityError.
db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
try:
portal.SQLJoblib_writeMessage(
uid_list=[
getrandbits(UID_SAFE_BITSIZE)
for _ in xrange(len(message_list))
],
path_list=path_list,
active_process_uid_list=active_process_uid_list,
method_id_list=method_id_list,
priority_list=priority_list,
message_list=map(Message.dump, message_list),
group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=processing_node_list,
signature_list=signature_list,
serialization_tag_list=serialization_tag_list)
db.query("INSERT INTO %s (uid,"
" path, active_process_uid, date, method_id, processing_node,"
" priority, group_method_id, tag, signature, serialization_tag,"
" message) VALUES\n(%s)" % (self.sql_table, values))
except MySQLdb.IntegrityError, (code, _):
if code != DUP_ENTRY:
raise
reset_uid = True
else:
break
else:
raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
raise ValueError("Maximum retry for prepareQueueMessageList reached")
i = 0
reset_uid = True
values_list = []
for m in message_list:
if m.is_registered:
active_process_uid = m.active_process_uid
order_validation_text = m.order_validation_text = \
self.getOrderValidationText(m)
date = m.activity_kw.get('at_date')
values_list.append(','.join((
'@uid+%s' % i,
quote('/'.join(m.object_path)),
'NULL' if active_process_uid is None else str(active_process_uid),
"UTC_TIMESTAMP(6)" if date is None else render_datetime(date),
quote(m.method_id),
'0' if order_validation_text == 'none' else '-1',
str(m.activity_kw.get('priority', 1)),
quote(m.getGroupId()),
quote(m.activity_kw.get('tag', '')),
quote(m.activity_kw.get('signature', '')),
quote(m.activity_kw.get('serialization_tag', '')),
quote(Message.dump(m)))))
i += 1
if not i % MAX_MESSAGE_LIST_SIZE:
insert(reset_uid)
reset_uid = False
if values_list:
insert(reset_uid)
def getProcessableMessageLoader(self, db, processing_node):
path_and_method_id_dict = {}
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table</params>
CREATE TABLE <dtml-var table> (
`uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME(6) NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=InnoDB
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid_list
path_list
active_process_uid_list
method_id_list
message_list
priority_list
processing_node_list
date_list
group_method_id_list
tag_list
serialization_tag_list
</params>
INSERT INTO <dtml-var table>
(uid, path, active_process_uid, date, method_id, processing_node, priority, group_method_id, tag, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
(
<dtml-sqlvar expr="uid_list[loop_item]" type="int">,
<dtml-sqlvar expr="path_list[loop_item]" type="string">,
<dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>,
<dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime(6)"><dtml-else>UTC_TIMESTAMP(6)</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
)
</dtml-in>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
CREATE TABLE message_job (
`uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME(6) NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`signature` BINARY(16) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=InnoDB
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
uid_list
path_list
active_process_uid_list
method_id_list
message_list
priority_list
processing_node_list
date_list
group_method_id_list
tag_list
signature_list
serialization_tag_list
</params>
INSERT INTO message_job
(uid, path, active_process_uid, date, method_id, processing_node, priority, group_method_id, tag, signature, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
(
<dtml-sqlvar expr="uid_list[loop_item]" type="int">,
<dtml-sqlvar expr="path_list[loop_item]" type="string">,
<dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>,
<dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime(6)"><dtml-else>UTC_TIMESTAMP(6)</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="signature_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
)
</dtml-in>
......@@ -2203,11 +2203,8 @@ class ERP5Generator(PortalGenerator):
createDirectoryView(ps, reg_key)
def setupDefaultSkins(self, p):
from Products.CMFCore.DirectoryView import addDirectoryViews
from Products.CMFActivity import cmfactivity_globals
ps = p.portal_skins
self.addCMFDefaultDirectoryViews(p)
addDirectoryViews(ps, 'skins', cmfactivity_globals)
ps.manage_addProduct['OFSP'].manage_addFolder(id='external_method')
ps.manage_addProduct['OFSP'].manage_addFolder(id='custom')
# Set the 'custom' layer a high priority, so it remains the first
......@@ -2215,7 +2212,6 @@ class ERP5Generator(PortalGenerator):
ps['custom'].manage_addProperty("business_template_skin_layer_priority", 100.0, "float")
skin_folder_list = [ 'custom'
, 'external_method'
, 'activity'
] + self.CMFDEFAULT_FOLDER_LIST
skin_folders = ', '.join(skin_folder_list)
ps.addSkinSelection( 'View'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment