Commit 3d644bde authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: automatic migration of queues and removal of button to recreate tables

The action to recreate activity tables while preserving existing messages
was unsafe for 2 reasons:
- if any error happened, messages could be lost
- it relied on Message.reactivate

Which this patch, any instance created after commit d881edd1 (Aug 2010) will
upgrade successfully. For older instances, make sure you have no activity left.

For cases where 'ALTER TABLE' would not work, a better way to implement repair
functionality would be:
- one action to backup all messages in ZODB
- and another to restore them
And maybe a security so that during the backup-clear-restore sequence,
activities can't be created nor processed.

If any column is added in the future, it would still be possible to write code
that fills them by inspecting messages.
parent 7c1ccdb0
......@@ -80,15 +80,8 @@ class Queue(object):
#scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage']
def __init__(self):
self.is_initialized = 0
def initialize(self, activity_tool):
# This is the only moment when
# we can set some global variables related
# to the ZODB context
if not self.is_initialized:
self.is_initialized = 1
def initialize(self, activity_tool, clear):
pass
def deleteMessage(self, activity_tool, m):
if not getattr(m, 'is_deleted', 0):
......
......@@ -28,6 +28,8 @@
import sys
import transaction
from _mysql_exceptions import ProgrammingError
from MySQLdb.constants.ER import NO_SUCH_TABLE
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
......@@ -90,6 +92,33 @@ class SQLBase(Queue):
"""
Define a set of common methods for SQL-based storage of activities.
"""
def initialize(self, activity_tool, clear):
folder = activity_tool.getPortalObject().portal_skins.activity
try:
createMessageTable = getattr(folder,
self.__class__.__name__ + '_createMessageTable')
except AttributeError:
return
if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table)
else:
column_list = []
try:
src = createMessageTable._upgradeSchema(added_list=column_list,
modified_list=column_list)
except ProgrammingError, e:
if e[0] != NO_SUCH_TABLE:
raise
else:
if column_list and self._getMessageList(activity_tool, count=1):
LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
" The following added columns could not be initialized: %s\n%s"
% (self.sql_table, ", ".join(column_list), src))
elif src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
return
createMessageTable()
def getNow(self, context):
"""
......@@ -163,11 +192,6 @@ class SQLBase(Queue):
return result[0].message_count > 0
return 0
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
return [Message.load(line.message, uid=line.uid, line=line)
for line in activity_tool.SQLBase_dumpMessageList(table=self.sql_table)]
def getPriority(self, activity_tool):
result = activity_tool.SQLBase_getPriority(table=self.sql_table)
if result:
......
......@@ -26,14 +26,14 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import Message, registerActivity
from Products.CMFActivity.ActivityTool import Message
import sys
#from time import time
from SQLBase import SQLBase, sort_message_key
import transaction
from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
from zLOG import TRACE, WARNING
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
......@@ -265,5 +265,3 @@ class SQLDict(SQLBase):
if validated_count >= MAX_VALIDATED_LIMIT:
return
offset += READ_MESSAGE_LIMIT
registerActivity(SQLDict)
......@@ -26,15 +26,13 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import Message, registerActivity
from Products.CMFActivity.ActivityTool import Message
from ZODB.POSException import ConflictError
from SQLBase import SQLBase, sort_message_key
from zExceptions import ExceptionFormatter
import transaction
from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
......@@ -135,5 +133,3 @@ class SQLQueue(SQLBase):
if validated_count >= MAX_VALIDATED_LIMIT:
return
offset += READ_MESSAGE_LIMIT
registerActivity(SQLQueue)
......@@ -83,7 +83,6 @@ from traceback import format_list, extract_stack
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
is_initialized = False
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
is_running_lock = threading.Lock()
......@@ -92,9 +91,6 @@ _server_address = None
ROLE_IDLE = 0
ROLE_PROCESSING = 1
# Activity Registration
activity_dict = {}
# Logging channel definitions
import logging
# Main logging channel
......@@ -136,13 +132,6 @@ def activity_timing_method(method, args, kw):
global_activity_buffer = defaultdict(dict)
from thread import get_ident
def registerActivity(activity):
# Must be rewritten to register
# class and create instance for each activity
#LOG('Init Activity', 0, str(activity.__name__))
activity_instance = activity()
activity_dict[activity.__name__] = activity_instance
MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2
......@@ -439,6 +428,14 @@ Named Parameters: %r
def getExecutionState(self):
return self.is_executed
# Activity Registration
def activity_dict():
from Activity import SQLDict, SQLQueue
return {k: getattr(v, k)() for k, v in locals().iteritems()}
activity_dict = activity_dict()
class Method(object):
__slots__ = (
'_portal_activities',
......@@ -624,30 +621,24 @@ class ActivityTool (Folder, UniqueObject):
sql_connection.connection_string)
parent._setObject(connection_id, new_sql_connection)
security.declarePrivate('initialize')
def initialize(self):
global is_initialized
from Activity import SQLQueue, SQLDict
# Initialize each queue
for activity in activity_dict.itervalues():
activity.initialize(self)
self.maybeMigrateConnectionClass()
is_initialized = True
for activity in activity_dict.itervalues():
activity.initialize(self, clear=False)
security.declareProtected(Permissions.manage_properties, 'isSubscribed')
def isSubscribed(self):
"""
return True, if we are subscribed to TimerService.
Otherwise return False.
"""
service = getTimerService(self)
if not service:
LOG('ActivityTool', INFO, 'TimerService not available')
return False
"""
return True, if we are subscribed to TimerService.
Otherwise return False.
"""
service = getTimerService(self)
if service:
path = '/'.join(self.getPhysicalPath())
if path in service.lisSubscriptions():
return True
return False
return path in service.lisSubscriptions()
LOG('ActivityTool', INFO, 'TimerService not available')
return False
security.declareProtected(Permissions.manage_properties, 'subscribe')
def subscribe(self, REQUEST=None, RESPONSE=None):
......@@ -1027,10 +1018,6 @@ class ActivityTool (Folder, UniqueObject):
"""
Distribute load
"""
# Initialize if needed
if not is_initialized:
self.initialize()
# Call distribute on each queue
for activity in activity_dict.itervalues():
activity.distribute(aq_inner(self), node_count)
......@@ -1054,10 +1041,6 @@ class ActivityTool (Folder, UniqueObject):
raise RuntimeError, 'Too many threads'
tic_lock.release()
# Initialize if needed
if not is_initialized:
self.initialize()
inner_self = aq_inner(self)
try:
......@@ -1135,8 +1118,6 @@ class ActivityTool (Folder, UniqueObject):
return buffer
def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, **kw):
if not is_initialized:
self.initialize()
if active_process is None:
active_process_uid = None
elif isinstance(active_process, str):
......@@ -1177,8 +1158,6 @@ class ActivityTool (Folder, UniqueObject):
return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
def flush(self, obj, invoke=0, **kw):
if not is_initialized:
self.initialize()
self.getActivityBuffer()
if isinstance(obj, tuple):
object_path = obj
......@@ -1366,8 +1345,6 @@ class ActivityTool (Folder, UniqueObject):
def newMessage(self, activity, path, active_process,
activity_kw, method_id, *args, **kw):
# Some Security Cheking should be made here XXX
if not is_initialized:
self.initialize()
self.getActivityBuffer()
activity_dict[activity].queueMessage(aq_inner(self),
Message(path, active_process, activity_kw, method_id, args, kw,
......@@ -1427,68 +1404,16 @@ class ActivityTool (Folder, UniqueObject):
security.declareProtected( CMFCorePermissions.ManagePortal,
'manageClearActivities' )
def manageClearActivities(self, keep=1, REQUEST=None):
def manageClearActivities(self, keep=1, RESPONSE=None):
"""
Clear all activities and recreate tables.
Recreate tables, clearing all activities
"""
folder = self.getPortalObject().portal_skins.activity
# Obtain all pending messages.
message_list_dict = {}
if keep:
for activity in activity_dict.itervalues():
if hasattr(activity, 'dumpMessageList'):
try:
message_list_dict[activity.__class__.__name__] =\
activity.dumpMessageList(self)
except ConflictError:
raise
except:
LOG('ActivityTool', WARNING,
'could not dump messages from %s' %
(activity,), error=sys.exc_info())
if getattr(folder, 'SQLDict_createMessageTable', None) is not None:
try:
folder.SQLDict_dropMessageTable()
except ConflictError:
raise
except:
LOG('CMFActivity', WARNING,
'could not drop the message table',
error=sys.exc_info())
folder.SQLDict_createMessageTable()
if getattr(folder, 'SQLQueue_createMessageTable', None) is not None:
try:
folder.SQLQueue_dropMessageTable()
except ConflictError:
raise
except:
LOG('CMFActivity', WARNING,
'could not drop the message queue table',
error=sys.exc_info())
folder.SQLQueue_createMessageTable()
# Reactivate the messages.
for activity, message_list in message_list_dict.iteritems():
for m in message_list:
try:
m.reactivate(aq_inner(self), activity=activity)
except ConflictError:
raise
except:
LOG('ActivityTool', WARNING,
'could not reactivate the message %r, %r' %
(m.object_path, m.method_id), error=sys.exc_info())
for activity in activity_dict.itervalues():
activity.initialize(self, clear=True)
if REQUEST is not None:
message = 'Activities%20Cleared'
if keep:
message = 'Tables%20Recreated'
return REQUEST.RESPONSE.redirect(
'%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
self.absolute_url(), message))
if RESPONSE is not None:
return RESPONSE.redirect(self.absolute_url_path() +
'/manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared')
security.declarePublic('getMessageTempObjectList')
def getMessageTempObjectList(self, **kw):
......@@ -1508,9 +1433,6 @@ class ActivityTool (Folder, UniqueObject):
"""
List messages waiting in queues
"""
# Initialize if needed
if not is_initialized:
self.initialize()
if activity:
return activity_dict[activity].getMessageList(aq_inner(self), **kw)
......@@ -1566,8 +1488,6 @@ class ActivityTool (Folder, UniqueObject):
security.declarePrivate('getDependentMessageList')
def getDependentMessageList(self, message, validator_id, validation_value):
if not is_initialized:
self.initialize()
message_list = []
method_id = "_validate_" + validator_id
for activity in activity_dict.itervalues():
......@@ -1580,8 +1500,6 @@ class ActivityTool (Folder, UniqueObject):
# Required for tests (time shift)
def timeShift(self, delay):
if not is_initialized:
self.initialize()
for activity in activity_dict.itervalues():
activity.timeShift(aq_inner(self), delay)
......
......@@ -75,24 +75,12 @@
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
<p class="form-help">Recreate activity tables, preserving existing messages.</p>
</td>
<td align="right" valign="top">
<form action="&dtml-URL1;">
<input class="form-element" type="submit"
name="manageClearActivities:method" value=" Recreate Tables ">
</form>
</td>
</tr>
<tr>
<td align="left" valign="top">
<p class="form-help">Recreate activity tables, throwing away all contained messages.</p>
</td>
<td align="right" valign="top">
<form action="&dtml-URL1;">
<input type="hidden" name="keep:int" value="0">
<input class="form-element" type="submit"
name="manageClearActivities:method" value=" Clear Activities " style="background-color: #F00">
</form>
......
......@@ -7,5 +7,5 @@ cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
DROP TABLE IF EXISTS message
<params>table</params>
DROP TABLE IF EXISTS <dtml-var table>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
</params>
SELECT * FROM
<dtml-var table>
ORDER BY
uid
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
DROP TABLE IF EXISTS message_queue
......@@ -539,37 +539,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.tic()
self.assertEqual(o.getTitle(), 'last')
def CheckClearActivities(self, activity, activity_count=1):
"""
Check if active objects are held even after clearing the tables.
"""
portal = self.getPortal()
organisation_module = self.getOrganisationModule()
if not organisation_module.hasContent(self.company_id):
organisation_module.newContent(id=self.company_id)
self.tic()
def check(o):
message_list = portal.portal_activities.getMessageList()
self.assertEqual(len(message_list), activity_count)
m = message_list[0]
self.assertEqual(m.object_path, o.getPhysicalPath())
self.assertEqual(m.method_id, '_setTitle')
o = portal.organisation._getOb(self.company_id)
for i in range(activity_count):
o.activate(activity=activity)._setTitle('foo')
self.commit()
check(o)
portal.portal_activities.manageClearActivities()
self.commit()
check(o)
self.tic()
self.assertEqual(o.getTitle(), 'foo')
def CheckCountMessageWithTag(self, activity):
"""
Check countMessageWithTag function.
......@@ -596,7 +565,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
while validating, and check if they are still executed."""
# Make sure that no active object is installed.
activity_tool = self.getPortal().portal_activities
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
# Need an object.
organisation_module = self.getOrganisationModule()
......@@ -639,7 +608,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
while validating, and check if they are still executed."""
# Make sure that no active object is installed.
activity_tool = self.getPortal().portal_activities
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
# Need an object.
organisation_module = self.getOrganisationModule()
......@@ -685,7 +654,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
if not object_a.hasContent(self.company_id):
object_a.newContent(id=self.company_id)
object_b = object_a._getOb(self.company_id)
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
self.commit()
# First case: creating the same activity twice must only register one.
self.assertEqual(len(activity_tool.getMessageList()), 0) # Sanity check
......@@ -693,7 +662,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
object_a.activate(activity=activity).getId()
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 1)
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
self.commit()
# Second case: creating activity with same tag must only register one.
# This behaviour is actually the same as the no-tag behaviour.
......@@ -702,7 +671,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
object_a.activate(activity=activity, tag='foo').getId()
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 1)
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
self.commit()
# Third case: creating activities with different tags must register both.
self.assertEqual(len(activity_tool.getMessageList()), 0) # Sanity check
......@@ -710,7 +679,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
object_a.activate(activity=activity, tag='bar').getId()
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
self.commit()
# Fourth case: creating activities on different objects must register
# both.
......@@ -719,7 +688,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
object_b.activate(activity=activity).getId()
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
self.commit()
# Fifth case: creating activities with different method must register
# both.
......@@ -728,7 +697,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
object_a.activate(activity=activity).getTitle()
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
self.commit()
def test_01_DeferredSetTitleSQLDict(self, quiet=0, run=run_all_test):
......@@ -1057,24 +1026,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message)
self.CheckSchedulingAfterTagList('SQLQueue')
def test_63_CheckClearActivitiesWithSQLDict(self, quiet=0, run=run_all_test):
# Test if clearing tables does not remove active objects with SQLDict
if not run: return
if not quiet:
message = '\nCheck Clearing Activities With SQL Dict'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckClearActivities('SQLDict')
def test_64_CheckClearActivitiesWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if clearing tables does not remove active objects with SQLQueue
if not run: return
if not quiet:
message = '\nCheck Clearing Activities With SQL Queue'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckClearActivities('SQLQueue', activity_count=2)
def flushAllActivities(self, silent=0, loop_size=1000):
"""Executes all messages until the queue only contains failed
messages.
......@@ -1127,7 +1078,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_list = ['SQLQueue', 'SQLDict', ]
for activity in activity_list:
# reset
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
obj.setTitle(original_title)
self.commit()
......@@ -1184,7 +1135,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
ZopeTestCase._print(message)
LOG('Testing... ', 0, message)
activity_tool = self.getPortal().portal_activities
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
original_title = 'something'
obj = self.getPortal().organisation_module.newContent(
......@@ -3020,7 +2971,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises(self, activity):
# Make sure that no active object is installed.
activity_tool = self.getPortal().portal_activities
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
# Need an object.
organisation_module = self.getOrganisationModule()
......@@ -3104,7 +3055,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.tic()
message_list = activity_tool.getMessageList()
self.assertEqual(['doSomething'],[x.method_id for x in message_list])
activity_tool.manageClearActivities(keep=0)
activity_tool.manageClearActivities()
finally:
SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE
......@@ -3177,9 +3128,11 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
)
oldconn = portal.cmf_activity_sql_connection
self.assertEqual(oldconn.meta_type, 'Z MySQL Database Connection')
# de-initialize and check that migration of the connection happens
# force rebootstrap and check that migration of the connection happens
# automatically
Products.CMFActivity.ActivityTool.is_initialized = False
from Products.ERP5Type.dynamic import portal_type_class
portal_type_class._bootstrapped.clear()
portal_type_class.synchronizeDynamicModules(activity_tool, True)
activity_tool.activate(activity='SQLQueue').getId()
self.tic()
newconn = portal.cmf_activity_sql_connection
......
......@@ -2176,7 +2176,7 @@ class ERP5Generator(PortalGenerator):
assert not p.hasObject('portal_activities')
addERP5Tool(p, 'portal_activities', 'Activity Tool')
# Initialize Activities
p.portal_activities.manageClearActivities(keep=0)
p.portal_activities.manageClearActivities()
if not p.hasObject('content_type_registry'):
self.setupMimetypes(p)
......
......@@ -345,6 +345,14 @@ def synchronizeDynamicModules(context, force=False):
continue
tool._bootstrap()
tool.__class__ = getattr(erp5.portal_type, tool.portal_type)
# TODO: Create portal_activities here, and even before portal_types:
# - all code in ActiveObject could assume that it always exists
# - currently, some objects created very early are not indexed
# and this would fix this issue
try:
portal.portal_activities.initialize()
except AttributeError:
pass # no Activity Tool yet
if migrate:
portal.migrateToPortalTypeClass()
......@@ -358,8 +366,8 @@ def synchronizeDynamicModules(context, force=False):
except:
# Required because the exception may be silently dropped by the caller.
transaction.doom()
LOG('ERP5Site', PANIC, "Automatic migration of type and"
" property sheet tool failed", error=sys.exc_info())
LOG('ERP5Site', PANIC, "Automatic migration of core tools failed",
error=sys.exc_info())
raise
LOG("ERP5Type.dynamic", 0, "Resetting dynamic classes")
......
......@@ -275,7 +275,8 @@ def _getTableSchema(query, name,
_create_search = re.compile(r'\bCREATE\s+TABLE\s+(`?)(\w+)\1\s+', re.I).search
_key_search = re.compile(r'\bKEY\s+(`[^`]+`)\s+(.+)').search
def DA_upgradeSchema(self, connection_id=None, src__=0):
def DA_upgradeSchema(self, connection_id=None, added_list=None,
modified_list=None, src__=0):
query = self.getPortalObject()[connection_id or self.connection_id]().query
src = self(src__=1)
m = _create_search(src)
......@@ -313,15 +314,22 @@ def DA_upgradeSchema(self, connection_id=None, src__=0):
else:
q("DROP KEY " + _key_search(key).group(1))
added = str if added_list is None else added_list.append
modified = str if modified_list is None else modified_list.append
pos = 0
where = "FIRST"
for column, spec in new_list:
try:
if old_dict[column] != (pos, spec):
q("MODIFY COLUMN %s %s %s" % (column, spec, where))
pos += 1
old = old_dict[column]
except KeyError:
q("ADD COLUMN %s %s %s" % (column, spec, where))
added(column)
else:
if old != (pos, spec):
q("MODIFY COLUMN %s %s %s" % (column, spec, where))
if old[1] != spec:
modified(column)
pos += 1
where = "AFTER " + column
for key in new_set - old_set:
......
......@@ -1077,7 +1077,7 @@ class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin):
if m.processing_node < -1:
self.abort()
count = portal_activities.countMessage()
portal_activities.manageClearActivities(keep=False)
portal_activities.manageClearActivities()
self.commit()
ZopeTestCase._print(' (dropped %d left-over activity messages) '
% count)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment