Commit e1549361 authored by Boxiang Sun, committed by Vincent Pelletier

CMFActivity: Use a random value for activity uids

Sequential number generators stored in a fixed-size format eventually run
out of values. But activity queues only care about what activities are
currently present: any uid can be reused as soon as it is available.
So stop using a sequential id generator for activity uids, and instead use
random values.

Vincent Pelletier:
- Commit message.
- Minor formatting changes.
- Do probability computations, and increase activity uid storage size to
  64-bit integers, up from 32. Table schema migration happens on the first
  activity node that starts on upgraded code.
- Apply to SQLJobLib too.
parent d0472bc2
...@@ -28,6 +28,9 @@ ...@@ -28,6 +28,9 @@
import sys import sys
import transaction import transaction
from random import getrandbits
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
from DateTime import DateTime from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results from Shared.DC.ZRDB.Results import Results
from Shared.DC.ZRDB.DA import DatabaseError from Shared.DC.ZRDB.DA import DatabaseError
...@@ -47,6 +50,19 @@ READ_MESSAGE_LIMIT = 1000 ...@@ -47,6 +50,19 @@ READ_MESSAGE_LIMIT = 1000
# TODO: Limit by size in bytes instead of number of rows. # TODO: Limit by size in bytes instead of number of rows.
MAX_MESSAGE_LIST_SIZE = 100 MAX_MESSAGE_LIST_SIZE = 100
INVOKE_ERROR_STATE = -2 INVOKE_ERROR_STATE = -2
# Activity uids are stored as 64 bits unsigned integers.
# No need to depend on a database that supports unsigned integers.
# Numbers are far big enough without using the MSb. Assuming a busy activity
# table having one million activities, the probability of triggering a conflict
# when inserting one activity with 64 bits uid is 0.5e-13. With 63 bits it
# increases to 1e-13, which is still very low.
UID_SAFE_BITSIZE = 63
# Inserting an activity batch of 100 activities among one million existing
# activities has a probability of failing of 1e-11. While it should be low
# enough, retries can help lower that. Try 10 times, which should be short
# enough while yielding one order of magnitude collision probability
# improvement.
UID_ALLOCATION_TRY_COUNT = 10
def sort_message_key(message): def sort_message_key(message):
# same sort key as in SQLBase.getMessageList # same sort key as in SQLBase.getMessageList
...@@ -126,8 +142,6 @@ class SQLBase(Queue): ...@@ -126,8 +142,6 @@ class SQLBase(Queue):
portal = activity_tool.getPortalObject() portal = activity_tool.getPortalObject()
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE): for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE] message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
uid_list = portal.portal_ids.generateNewIdList(self.uid_group,
id_count=len(message_list), id_generator='uid')
path_list = ['/'.join(m.object_path) for m in message_list] path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list] active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list] method_id_list = [m.method_id for m in message_list]
...@@ -141,19 +155,31 @@ class SQLBase(Queue): ...@@ -141,19 +155,31 @@ class SQLBase(Queue):
for m in message_list: for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m) m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1) processing_node_list.append(0 if x == 'none' else -1)
portal.SQLBase_writeMessageList( for _ in xrange(UID_ALLOCATION_TRY_COUNT):
table=self.sql_table, try:
uid_list=uid_list, portal.SQLBase_writeMessageList(
path_list=path_list, table=self.sql_table,
active_process_uid_list=active_process_uid_list, uid_list=[
method_id_list=method_id_list, getrandbits(UID_SAFE_BITSIZE)
priority_list=priority_list, for _ in xrange(len(message_list))
message_list=map(Message.dump, message_list), ],
group_method_id_list=group_method_id_list, path_list=path_list,
date_list=date_list, active_process_uid_list=active_process_uid_list,
tag_list=tag_list, method_id_list=method_id_list,
processing_node_list=processing_node_list, priority_list=priority_list,
serialization_tag_list=serialization_tag_list) message_list=map(Message.dump, message_list),
group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=processing_node_list,
serialization_tag_list=serialization_tag_list)
except MySQLdb.IntegrityError, (code, _):
if code != DUP_ENTRY:
raise
else:
break
else:
raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
def getNow(self, context): def getNow(self, context):
""" """
......
...@@ -26,10 +26,15 @@ ...@@ -26,10 +26,15 @@
# #
############################################################################## ##############################################################################
from random import getrandbits
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from SQLBase import SQLBase, sort_message_key, MAX_MESSAGE_LIST_SIZE import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
from SQLBase import (
SQLBase, sort_message_key, MAX_MESSAGE_LIST_SIZE,
UID_SAFE_BITSIZE, UID_ALLOCATION_TRY_COUNT,
)
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
from SQLDict import SQLDict from SQLDict import SQLDict
class SQLJoblib(SQLDict): class SQLJoblib(SQLDict):
...@@ -69,8 +74,6 @@ class SQLJoblib(SQLDict): ...@@ -69,8 +74,6 @@ class SQLJoblib(SQLDict):
portal = activity_tool.getPortalObject() portal = activity_tool.getPortalObject()
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE): for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE] message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
uid_list = portal.portal_ids.generateNewIdList(self.uid_group,
id_count=len(message_list), id_generator='uid')
path_list = ['/'.join(m.object_path) for m in message_list] path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list] active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list] method_id_list = [m.method_id for m in message_list]
...@@ -85,19 +88,31 @@ class SQLJoblib(SQLDict): ...@@ -85,19 +88,31 @@ class SQLJoblib(SQLDict):
for m in message_list: for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m) m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1) processing_node_list.append(0 if x == 'none' else -1)
portal.SQLJoblib_writeMessage( for _ in xrange(UID_ALLOCATION_TRY_COUNT):
uid_list=uid_list, try:
path_list=path_list, portal.SQLJoblib_writeMessage(
active_process_uid_list=active_process_uid_list, uid_list=[
method_id_list=method_id_list, getrandbits(UID_SAFE_BITSIZE)
priority_list=priority_list, for _ in xrange(len(message_list))
message_list=map(Message.dump, message_list), ],
group_method_id_list=group_method_id_list, path_list=path_list,
date_list=date_list, active_process_uid_list=active_process_uid_list,
tag_list=tag_list, method_id_list=method_id_list,
processing_node_list=processing_node_list, priority_list=priority_list,
signature_list=signature_list, message_list=map(Message.dump, message_list),
serialization_tag_list=serialization_tag_list) group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=processing_node_list,
signature_list=signature_list,
serialization_tag_list=serialization_tag_list)
except MySQLdb.IntegrityError, (code, _):
if code != DUP_ENTRY:
raise
else:
break
else:
raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
def getProcessableMessageLoader(self, activity_tool, processing_node): def getProcessableMessageLoader(self, activity_tool, processing_node):
path_and_method_id_dict = {} path_and_method_id_dict = {}
......
...@@ -9,7 +9,7 @@ class_file: ...@@ -9,7 +9,7 @@ class_file:
</dtml-comment> </dtml-comment>
<params>table</params> <params>table</params>
CREATE TABLE <dtml-var table> ( CREATE TABLE <dtml-var table> (
`uid` INT UNSIGNED NOT NULL, `uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME NOT NULL, `date` DATETIME NOT NULL,
`path` VARCHAR(255) NOT NULL, `path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL, `active_process_uid` INT UNSIGNED NULL,
......
...@@ -9,7 +9,7 @@ class_file: ...@@ -9,7 +9,7 @@ class_file:
</dtml-comment> </dtml-comment>
<params></params> <params></params>
CREATE TABLE message_job ( CREATE TABLE message_job (
`uid` INT UNSIGNED NOT NULL, `uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME NOT NULL, `date` DATETIME NOT NULL,
`path` VARCHAR(255) NOT NULL, `path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL, `active_process_uid` INT UNSIGNED NULL,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment