Commit 5a45a355 authored by Vincent Pelletier's avatar Vincent Pelletier

Declare symbolic globals for hot reindexing states and update users.

Define an accessor to get hot reindexing state.
Remove unneeded get_transaction().commit().
Make finishHotReindexing both update hot reindex state and swap the catalogs.
Defne a function to cancel a runing hot reindexing.
Factorize playBackRecordedObjectList code and make it re-scedule its own call to avoid working on many object in one transaction while allowing to remove commit()s.
Insert last reserved uid at reindexing end.
Remove manage_hotReindexAll default values to remove "erp5" reference in this code (which should be ERP5-independant). Note that there are still explicit c
alls to ERP5Site_reindexAll.
Describe manage_hotReindexAll more completely.
Check first if hot reindex is already runing to avoid wasting time generating lists when the function would anyway later fail on this check.
Use introspection to avoid calling getUid on object where it's not available.
Remove a useless try...except (was usefull when there were commit()s).
Update ZMI message telling hot reindex was started.
Update ZMI page to remove an outdated comment.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@12687 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0deb8041
......@@ -28,6 +28,7 @@ from AccessControl.Permission import name_trans
from SQLCatalog import Catalog, CatalogError
from AccessControl import ClassSecurityInfo, getSecurityManager
from AccessControl.DTML import RestrictedDTML
from Products.CMFCore.utils import getToolByName
import string, os, sys, types
import time
import urllib
......@@ -37,6 +38,10 @@ from zLOG import LOG
manage_addZSQLCatalogForm=DTMLFile('dtml/addZSQLCatalog',globals())
HOT_REINDEXING_FINISHED_STATE = 'finished'
HOT_REINDEXING_RECORDING_STATE = 'recording'
HOT_REINDEXING_DOUBLE_INDEXING_STATE = 'double indexing'
def manage_addZSQLCatalog(self, id, title,
vocab_id='create_default_catalog_',
REQUEST=None):
......@@ -234,6 +239,15 @@ class ZCatalog(Folder, Persistent, Implicit):
return 0
return len(catalog)
def getHotReindexingState(self):
"""
Return the current hot reindexing state.
"""
value = getattr(self, 'hot_reindexing_state', None)
if value is None:
return HOT_REINDEXING_FINISHED_STATE
return value
def setHotReindexingState(self, state='', source_sql_catalog_id=None, destination_sql_catalog_id=None):
"""
Set the state of hot reindexing.
......@@ -244,39 +258,73 @@ class ZCatalog(Folder, Persistent, Implicit):
if source_sql_catalog_id is None:
source_sql_catalog_id = self.default_sql_catalog_id
if state == 'finished':
if state == HOT_REINDEXING_FINISHED_STATE:
self.hot_reindexing_state = None
self.source_sql_catalog_id = None
self.destination_sql_catalog_id = None
elif state == 'recording' or state == 'double indexing':
elif state == HOT_REINDEXING_RECORDING_STATE or \
state == HOT_REINDEXING_DOUBLE_INDEXING_STATE:
self.hot_reindexing_state = state
self.source_sql_catalog_id = source_sql_catalog_id
self.destination_sql_catalog_id = destination_sql_catalog_id
else:
raise CatalogError, 'unknown hot reindexing state %s' % state
# This change must be reflected as soon as possible.
get_transaction().commit()
def finishHotReindexing(self, **kw):
def finishHotReindexing(self, source_sql_catalog_id,
destination_sql_catalog_id, skin_selection_dict,
sql_connection_id_dict):
"""
Exchange databases and finish reindexing in the same transaction.
"""
XXX: This is a workaround for CMFActivity.
self.exchangeDatabases(source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id,
skin_selection_dict=skin_selection_dict,
sql_connection_id_dict=sql_connection_id_dict)
self.setHotReindexingState(state=HOT_REINDEXING_FINISHED_STATE)
def cancelHotReindexing(self):
"""
self.setHotReindexingState(state='finished', **kw)
Cancel a hot reindexing.
Remove the hot reindexing state and flush related activities.
def playBackRecordedObjectList(self, sql_catalog_id=None):
TODO: Find a safe way to remove activities started by
ERP5Site_reindexAll.
"""
Play back must be a distinct method to activate...
if self.getHotReindexingState() == HOT_REINDEXING_FINISHED_STATE:
raise Exception, 'cancelHotReindexing called while no Hot Reindexing '\
'was runing. Nothing done.'
# Remove hot reindexing state
self.setHotReindexingState(HOT_REINDEXING_FINISHED_STATE)
portal_activities = getToolByName(self, 'portal_activities')
if portal_activities is not None:
object_path = self.getPhysicalPath()
# Activities must be removed in the reverse order they were inserted
# to make sure removing one does not accidntaly trigger the next one.
method_id_list = ('finishHotReindexing', 'playBackRecordedObjectList',
'setHotReindexingState')
for method_id in method_id_list:
portal_activities.flush(object_path, method_id=method_id)
def playBackRecordedObjectList(self, sql_catalog_id, catalog=0):
"""
catalog = self.getSQLCatalog(sql_catalog_id)
Play back the actions scheduled while hot reindexing was in "record"
state.
# First, play back unindexing.
while 1:
result = catalog.readRecordedObjectList(catalog=0)
if len(result) == 0:
break
get_transaction().commit()
sql_catalog_id Id of the catalog on which the actions will be played.
catalog 0 : play unindex actions
1 : play index actions
This function schedules itself for later execution.
This is done in order to avoid accessing "too many" objects in the same
transaction.
"""
if self.getHotReindexingState() != HOT_REINDEXING_DOUBLE_INDEXING_STATE:
raise Exception, 'playBackRecordedObjectList was called while '\
'hot_reindexing_state was not "%s". Playback aborted.' \
% (HOT_REINDEXING_DOUBLE_INDEXING_STATE, )
catalog_object = self.getSQLCatalog(sql_catalog_id)
result = catalog_object.readRecordedObjectList(catalog=catalog)
if len(result):
for o in result:
try:
obj = self.resolve_path(o.path)
......@@ -285,31 +333,27 @@ class ZCatalog(Folder, Persistent, Implicit):
except:
obj = None
if obj is None:
self.uncatalog_object(o.path, sql_catalog_id=sql_catalog_id)
catalog.deleteRecordedObjectList(uid_list=[o.uid for o in result])
get_transaction().commit()
# Next, play back indexing.
while 1:
result = catalog.readRecordedObjectList(catalog=1)
if len(result) == 0:
break
get_transaction().commit()
for o in result:
try:
obj = self.resolve_path(o.path)
except ConflictError:
raise
except:
obj = None
if obj is not None:
obj.reindexObject(sql_catalog_id=sql_catalog_id)
get_transaction().commit()
catalog.deleteRecordedObjectList(uid_list=[o.uid for o in result])
get_transaction().commit()
if catalog == 0:
self.uncatalog_object(o.path, sql_catalog_id=sql_catalog_id)
elif catalog == 1:
obj.reindexObject(sql_catalog_id=sql_catalog_id)
else:
raise ValueError, '%s is not a valid value for "catalog".' % (catalog, )
catalog_object.deleteRecordedObjectList(uid_list=[o.uid for o in result])
# Re-schedule the same action in case there are remaining rows in the
# table. This can happen if the database connector limits the number
# of rows in the result.
self.activate(passive_commit=1, priority=5).\
playBackRecordedObjectList(sql_catalog_id=sql_catalog_id,
catalog=catalog)
else:
# If there iss nothing to do, go to next step.
if catalog == 0:
# If we were replaying unindex actions, time to replay index actions.
self.activate(passive_commit=1, priority=5).\
playBackRecordedObjectList(sql_catalog_id=sql_catalog_id,
catalog=1)
# If we were replaying index actions, there is nothing else to do.
def exchangeDatabases(self, source_sql_catalog_id, destination_sql_catalog_id,
skin_selection_dict, sql_connection_id_dict):
......@@ -318,6 +362,10 @@ class ZCatalog(Folder, Persistent, Implicit):
"""
if self.default_sql_catalog_id == source_sql_catalog_id:
self.default_sql_catalog_id = destination_sql_catalog_id
# Insert the latest generated uid.
# This must be done just before swaping the catalogs in case there were
# generated uids since destination catalog was created.
self[destination_sql_catalog_id].insertMaxUid()
LOG('exchangeDatabases skin_selection_dict:',0,skin_selection_dict)
if skin_selection_dict is not None:
......@@ -339,31 +387,57 @@ class ZCatalog(Folder, Persistent, Implicit):
changeSQLConnectionIds(self.portal_skins)
def manage_hotReindexAll(self, source_sql_catalog_id='erp5_mysql',
destination_sql_catalog_id='erp5_mysql',
source_sql_connection_id_list = None,
destination_sql_connection_id_list = None,
skin_name_list = None,
skin_selection_list = None,
def manage_hotReindexAll(self, source_sql_catalog_id,
destination_sql_catalog_id,
source_sql_connection_id_list=None,
destination_sql_connection_id_list=None,
skin_name_list=None,
skin_selection_list=None,
REQUEST=None, RESPONSE=None):
"""
Reindex objects from scratch in the background then switch to the newly created database.
Starts a hot reindexing.
Hot reindexing will create a catalog and sync it with the current one.
Once done, both catalogs will be swapped so that current catalog will
not be used any more and destination catalog will get used "for real".
source_catalog_id
Id of the SQLCatalog object to use as the source catalog.
WARNING: it is not considered normal to specify a catalog which is not
the current default one.
The feature is still provided, but you'll be on your own if
you try it.
destination_sql_catalog_id
Id of the SQLCatalog object to use as the new catalog.
source_sql_connection_id_list
destination_sql_connection_id_list
SQL Methods in portal_skins using source_sql_connection_id_list[n]
connection will use destination_sql_connection_id_list[n] connection
once hot reindexing is over.
skin_name_list
skin_selection_list
For each skin_name_list[n], skin_selection_list[n] will be set to
replace the existing skin selection on portal_skins.
"""
# Get parameters.
#source_sql_catalog_id = REQUEST.get('source_sql_catalog_id')
#destination_sql_catalog_id = REQUEST.get('destination_sql_catalog_id')
#source_sql_connection_id_list = REQUEST.get('source_sql_connection_id_list')
#destination_sql_connection_id_list = REQUEST.get('destination_sql_connection_id_list')
#skin_name_list = REQUEST.get('skin_name_list')
#skin_selection_list = REQUEST.get('skin_selection_list')
LOG('source_sql_catalog_id',0,source_sql_catalog_id)
LOG('destination_sql_catalog_id',0,destination_sql_catalog_id)
LOG('source_sql_connection_id_list',0,source_sql_connection_id_list)
LOG('destination_sql_connection_id_list',0,destination_sql_connection_id_list)
LOG('skin_name_list',0,skin_name_list)
LOG('skin_selection_list',0,skin_selection_list)
# Construct a mapping for skin selections.
# Hot reindexing can only be runing once at a time on a system.
if self.hot_reindexing_state is not None:
raise CatalogError, 'hot reindexing process is already running'
if source_sql_catalog_id == destination_sql_catalog_id:
raise CatalogError, 'Hot reindexing cannot be done with the same '\
'catalog as both source and destination. What'\
' you want to do is a "clear catalog" and an '\
'"ERP5Site_reindexAll".'
if source_sql_catalog_id != self.default_sql_catalog_id:
LOG('ZSQLCatalog', 0, 'Warning : Hot reindexing is started with a '\
'source catalog which is not the default one.')
# Construct a mapping for skin selections. It will be used during the
# final hot reindexing step.
skin_selection_dict = None
if skin_name_list is not None and skin_selection_list is not None:
skin_selection_dict = {}
......@@ -376,67 +450,63 @@ class ZCatalog(Folder, Persistent, Implicit):
new_selection_list.append(new_selection)
skin_selection_dict[name] = new_selection_list
# Construct a mapping for connection ids.
# Construct a mapping for connection ids. It will be used during the
# final hot reindexing step.
sql_connection_id_dict = None
if source_sql_connection_id_list is not None and destination_sql_connection_id_list is not None:
if source_sql_connection_id_list is not None and \
destination_sql_connection_id_list is not None:
sql_connection_id_dict = {}
for source_sql_connection_id, destination_sql_connection_id in zip(source_sql_connection_id_list, destination_sql_connection_id_list):
for source_sql_connection_id, destination_sql_connection_id in \
zip(source_sql_connection_id_list,
destination_sql_connection_id_list):
if source_sql_connection_id != destination_sql_connection_id:
sql_connection_id_dict[source_sql_connection_id] = destination_sql_connection_id
# Hot reindexing may not run for multiple databases.
if self.hot_reindexing_state is not None:
raise CatalogError, 'hot reindexing process is already running'
sql_connection_id_dict[source_sql_connection_id] = \
destination_sql_connection_id
# First of all, make sure that all root objects have uids.
# XXX This is a workaround for tools (such as portal_simulation).
portal = self.getPortalObject()
for id in portal.objectIds():
obj = portal[id]
if hasattr(aq_base(obj), 'getUid'):
obj.getUid()
get_transaction().commit() # Should not have references to objects too long.
# Recreate the new database.
LOG('hotReindexObjectList', 0, 'Clearing the new database')
self.manage_catalogClear(sql_catalog_id=destination_sql_catalog_id)
try:
# Start recording catalog/uncatalog information.
LOG('hotReindexObjectList', 0, 'Starting recording')
self.setHotReindexingState('recording',
source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id)
get_transaction().commit() # Should not have references to objects too long.
# Start reindexing.
self.ERP5Site_reindexAll(sql_catalog_id=destination_sql_catalog_id)
get_transaction().commit() # Should not have references to objects too long.
# Now synchronize this new database with the current one.
# XXX It is necessary to use ActiveObject to wait for queued objects to be flushed.
LOG('hotReindexObjectList', 0, 'Starting double indexing')
self.activate(passive_commit=1, after_method_id=('reindexObject', 'recursiveReindexObject'), priority=5).setHotReindexingState('double indexing',
source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id)
# Play back records.
if source_sql_catalog_id != destination_sql_catalog_id:
# If equals, then this does not make sense to reindex again.
LOG('hotReindexObjectList', 0, 'Playing back records')
self.activate(passive_commit=1, after_method_id='setHotReindexingState', priority=5).playBackRecordedObjectList(sql_catalog_id=destination_sql_catalog_id)
# Exchange the databases.
LOG('hotReindexObjectList', 0, 'Exchanging databases')
self.activate(after_method_id=('reindexObject', 'playBackRecordedObjectList'), priority=5).exchangeDatabases(source_sql_catalog_id, destination_sql_catalog_id, skin_selection_dict, sql_connection_id_dict) # XXX Never called by activity tool, why ??? XXX
finally:
# Finish.
LOG('hotReindexObjectList', 0, 'Finishing hot reindexing')
self.activate(passive_commit=1, after_method_id='exchangeDatabases', priority=5).finishHotReindexing()
getUid = getattr(portal[id], 'getUid', None)
if getUid is not None:
getUid() # Trigger the uid generation if none is set.
# Mark the hot reindex as begun. Each object indexed in the still-current
# catalog will be scheduled for reindex in the future catalog.
LOG('hotReindexObjectList', 0, 'Starting recording')
self.setHotReindexingState(HOT_REINDEXING_RECORDING_STATE,
source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id)
# Clear the future catalog and start reindexing the site in it.
final_activity_tag = 'hot_reindex_last_ERP5Site_reindexAll_tag'
self.ERP5Site_reindexAll(sql_catalog_id=destination_sql_catalog_id,
final_activity_tag=final_activity_tag,
clear_catalog=1,
passive_commit=1)
# Once reindexing is finished, change the hot reindexing state so that
# new catalog changes are applied in both catalogs.
self.activate(passive_commit=1,
after_tag=final_activity_tag,
priority=5).setHotReindexingState(HOT_REINDEXING_DOUBLE_INDEXING_STATE,
source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id)
# Once in double-indexing mode, planned reindex can be replayed.
self.activate(passive_commit=1,
after_method_id='setHotReindexingState',
priority=5).playBackRecordedObjectList(
sql_catalog_id=destination_sql_catalog_id)
# Once there is nothing to replay, databases are sync'ed, so the new
# catalog can become current.
self.activate(passive_commit=1,
after_method_id=('playBackRecordedObjectList'),
priority=5).finishHotReindexing(
source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id,
skin_selection_dict=skin_selection_dict,
sql_connection_id_dict=sql_connection_id_dict)
if RESPONSE is not None:
URL1 = REQUEST.get('URL1')
RESPONSE.redirect(URL1 + '/manage_catalogHotReindexing?manage_tabs_message=Catalog%20Changed')
RESPONSE.redirect(URL1 + '/manage_catalogHotReindexing?manage_tabs_message=HotReindexing%20Started')
def manage_edit(self, RESPONSE, URL1, threshold=1000, REQUEST=None):
""" edit the catalog """
......
......@@ -7,13 +7,6 @@ Hot Reindexing allows you to recreate a new SQL Catalog without stopping your sy
This requires at least two SQL Catalogs.
</p>
<dtml-if "_.hasattr(getPortalObject(), 'portal_activities')">
<p class="form-help">
Note that hot reindexing might not be finished even when you get a successful response,
because portal_activities delays the real operations. You can check the real state in properties.
</p>
</dtml-if>
<table cellspacing="0" cellpadding="2" border="0">
<tr class="list-header">
<td colspan="2" align="left">
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment