diff --git a/product/ZSQLCatalog/ZSQLCatalog.py b/product/ZSQLCatalog/ZSQLCatalog.py index d40b1dfc85f3e1428c8e90b43be13912a4e6fb47..4eeedf0a4d57523b50d470805d27c3d4aeb58706 100644 --- a/product/ZSQLCatalog/ZSQLCatalog.py +++ b/product/ZSQLCatalog/ZSQLCatalog.py @@ -28,6 +28,7 @@ from AccessControl.Permission import name_trans from SQLCatalog import Catalog, CatalogError from AccessControl import ClassSecurityInfo, getSecurityManager from AccessControl.DTML import RestrictedDTML +from Products.CMFCore.utils import getToolByName import string, os, sys, types import time import urllib @@ -37,6 +38,10 @@ from zLOG import LOG manage_addZSQLCatalogForm=DTMLFile('dtml/addZSQLCatalog',globals()) +HOT_REINDEXING_FINISHED_STATE = 'finished' +HOT_REINDEXING_RECORDING_STATE = 'recording' +HOT_REINDEXING_DOUBLE_INDEXING_STATE = 'double indexing' + def manage_addZSQLCatalog(self, id, title, vocab_id='create_default_catalog_', REQUEST=None): @@ -234,6 +239,15 @@ class ZCatalog(Folder, Persistent, Implicit): return 0 return len(catalog) + def getHotReindexingState(self): + """ + Return the current hot reindexing state. + """ + value = getattr(self, 'hot_reindexing_state', None) + if value is None: + return HOT_REINDEXING_FINISHED_STATE + return value + def setHotReindexingState(self, state='', source_sql_catalog_id=None, destination_sql_catalog_id=None): """ Set the state of hot reindexing. @@ -244,39 +258,73 @@ class ZCatalog(Folder, Persistent, Implicit): if source_sql_catalog_id is None: source_sql_catalog_id = self.default_sql_catalog_id - if state == 'finished': + if state == HOT_REINDEXING_FINISHED_STATE: self.hot_reindexing_state = None self.source_sql_catalog_id = None self.destination_sql_catalog_id = None - elif state == 'recording' or state == 'double indexing': + elif state == HOT_REINDEXING_RECORDING_STATE or \ + state == HOT_REINDEXING_DOUBLE_INDEXING_STATE: self.hot_reindexing_state = state self.source_sql_catalog_id = source_sql_catalog_id self.destination_sql_catalog_id = destination_sql_catalog_id else: raise CatalogError, 'unknown hot reindexing state %s' % state - # This change must be reflected as soon as possible. - get_transaction().commit() - - def finishHotReindexing(self, **kw): + def finishHotReindexing(self, source_sql_catalog_id, + destination_sql_catalog_id, skin_selection_dict, + sql_connection_id_dict): + """ + Exchange databases and finish reindexing in the same transaction. """ - XXX: This is a workaround for CMFActivity. + self.exchangeDatabases(source_sql_catalog_id=source_sql_catalog_id, + destination_sql_catalog_id=destination_sql_catalog_id, + skin_selection_dict=skin_selection_dict, + sql_connection_id_dict=sql_connection_id_dict) + self.setHotReindexingState(state=HOT_REINDEXING_FINISHED_STATE) + + def cancelHotReindexing(self): """ - self.setHotReindexingState(state='finished', **kw) + Cancel a hot reindexing. + Remove the hot reindexing state and flush related activities. - def playBackRecordedObjectList(self, sql_catalog_id=None): + TODO: Find a safe way to remove activities started by + ERP5Site_reindexAll. """ - Play back must be a distinct method to activate... + if self.getHotReindexingState() == HOT_REINDEXING_FINISHED_STATE: + raise Exception, 'cancelHotReindexing called while no Hot Reindexing '\ + 'was runing. Nothing done.' + # Remove hot reindexing state + self.setHotReindexingState(HOT_REINDEXING_FINISHED_STATE) + portal_activities = getToolByName(self, 'portal_activities') + if portal_activities is not None: + object_path = self.getPhysicalPath() + # Activities must be removed in the reverse order they were inserted + # to make sure removing one does not accidntaly trigger the next one. + method_id_list = ('finishHotReindexing', 'playBackRecordedObjectList', + 'setHotReindexingState') + for method_id in method_id_list: + portal_activities.flush(object_path, method_id=method_id) + + def playBackRecordedObjectList(self, sql_catalog_id, catalog=0): """ - catalog = self.getSQLCatalog(sql_catalog_id) + Play back the actions scheduled while hot reindexing was in "record" + state. - # First, play back unindexing. - while 1: - result = catalog.readRecordedObjectList(catalog=0) - if len(result) == 0: - break - get_transaction().commit() + sql_catalog_id Id of the catalog on which the actions will be played. + catalog 0 : play unindex actions + 1 : play index actions + This function schedules itself for later execution. + This is done in order to avoid accessing "too many" objects in the same + transaction. + """ + if self.getHotReindexingState() != HOT_REINDEXING_DOUBLE_INDEXING_STATE: + raise Exception, 'playBackRecordedObjectList was called while '\ + 'hot_reindexing_state was not "%s". Playback aborted.' \ + % (HOT_REINDEXING_DOUBLE_INDEXING_STATE, ) + catalog_object = self.getSQLCatalog(sql_catalog_id) + result = catalog_object.readRecordedObjectList(catalog=catalog) + if len(result): for o in result: try: obj = self.resolve_path(o.path) @@ -285,31 +333,27 @@ class ZCatalog(Folder, Persistent, Implicit): except: obj = None if obj is None: - self.uncatalog_object(o.path, sql_catalog_id=sql_catalog_id) - - catalog.deleteRecordedObjectList(uid_list=[o.uid for o in result]) - get_transaction().commit() - - # Next, play back indexing. - while 1: - result = catalog.readRecordedObjectList(catalog=1) - if len(result) == 0: - break - get_transaction().commit() - - for o in result: - try: - obj = self.resolve_path(o.path) - except ConflictError: - raise - except: - obj = None - if obj is not None: - obj.reindexObject(sql_catalog_id=sql_catalog_id) - get_transaction().commit() - - catalog.deleteRecordedObjectList(uid_list=[o.uid for o in result]) - get_transaction().commit() + if catalog == 0: + self.uncatalog_object(o.path, sql_catalog_id=sql_catalog_id) + elif catalog == 1: + obj.reindexObject(sql_catalog_id=sql_catalog_id) + else: + raise ValueError, '%s is not a valid value for "catalog".' % (catalog, ) + catalog_object.deleteRecordedObjectList(uid_list=[o.uid for o in result]) + # Re-schedule the same action in case there are remaining rows in the + # table. This can happen if the database connector limits the number + # of rows in the result. + self.activate(passive_commit=1, priority=5).\ + playBackRecordedObjectList(sql_catalog_id=sql_catalog_id, + catalog=catalog) + else: + # If there iss nothing to do, go to next step. + if catalog == 0: + # If we were replaying unindex actions, time to replay index actions. + self.activate(passive_commit=1, priority=5).\ + playBackRecordedObjectList(sql_catalog_id=sql_catalog_id, + catalog=1) + # If we were replaying index actions, there is nothing else to do. def exchangeDatabases(self, source_sql_catalog_id, destination_sql_catalog_id, skin_selection_dict, sql_connection_id_dict): @@ -318,6 +362,10 @@ class ZCatalog(Folder, Persistent, Implicit): """ if self.default_sql_catalog_id == source_sql_catalog_id: self.default_sql_catalog_id = destination_sql_catalog_id + # Insert the latest generated uid. + # This must be done just before swaping the catalogs in case there were + # generated uids since destination catalog was created. + self[destination_sql_catalog_id].insertMaxUid() LOG('exchangeDatabases skin_selection_dict:',0,skin_selection_dict) if skin_selection_dict is not None: @@ -339,31 +387,57 @@ class ZCatalog(Folder, Persistent, Implicit): changeSQLConnectionIds(self.portal_skins) - def manage_hotReindexAll(self, source_sql_catalog_id='erp5_mysql', - destination_sql_catalog_id='erp5_mysql', - source_sql_connection_id_list = None, - destination_sql_connection_id_list = None, - skin_name_list = None, - skin_selection_list = None, + def manage_hotReindexAll(self, source_sql_catalog_id, + destination_sql_catalog_id, + source_sql_connection_id_list=None, + destination_sql_connection_id_list=None, + skin_name_list=None, + skin_selection_list=None, REQUEST=None, RESPONSE=None): """ - Reindex objects from scratch in the background then switch to the newly created database. + Starts a hot reindexing. + + Hot reindexing will create a catalog and sync it with the current one. + Once done, both catalogs will be swapped so that current catalog will + not be used any more and destination catalog will get used "for real". + + source_catalog_id + Id of the SQLCatalog object to use as the source catalog. + WARNING: it is not considered normal to specify a catalog which is not + the current default one. + The feature is still provided, but you'll be on your own if + you try it. + + destination_sql_catalog_id + Id of the SQLCatalog object to use as the new catalog. + + source_sql_connection_id_list + destination_sql_connection_id_list + SQL Methods in portal_skins using source_sql_connection_id_list[n] + connection will use destination_sql_connection_id_list[n] connection + once hot reindexing is over. + + skin_name_list + skin_selection_list + For each skin_name_list[n], skin_selection_list[n] will be set to + replace the existing skin selection on portal_skins. """ - # Get parameters. - #source_sql_catalog_id = REQUEST.get('source_sql_catalog_id') - #destination_sql_catalog_id = REQUEST.get('destination_sql_catalog_id') - #source_sql_connection_id_list = REQUEST.get('source_sql_connection_id_list') - #destination_sql_connection_id_list = REQUEST.get('destination_sql_connection_id_list') - #skin_name_list = REQUEST.get('skin_name_list') - #skin_selection_list = REQUEST.get('skin_selection_list') - LOG('source_sql_catalog_id',0,source_sql_catalog_id) - LOG('destination_sql_catalog_id',0,destination_sql_catalog_id) - LOG('source_sql_connection_id_list',0,source_sql_connection_id_list) - LOG('destination_sql_connection_id_list',0,destination_sql_connection_id_list) - LOG('skin_name_list',0,skin_name_list) - LOG('skin_selection_list',0,skin_selection_list) - - # Construct a mapping for skin selections. + # Hot reindexing can only be runing once at a time on a system. + if self.hot_reindexing_state is not None: + raise CatalogError, 'hot reindexing process is already running' + + if source_sql_catalog_id == destination_sql_catalog_id: + raise CatalogError, 'Hot reindexing cannot be done with the same '\ + 'catalog as both source and destination. What'\ + ' you want to do is a "clear catalog" and an '\ + '"ERP5Site_reindexAll".' + + if source_sql_catalog_id != self.default_sql_catalog_id: + LOG('ZSQLCatalog', 0, 'Warning : Hot reindexing is started with a '\ + 'source catalog which is not the default one.') + + # Construct a mapping for skin selections. It will be used during the + # final hot reindexing step. skin_selection_dict = None if skin_name_list is not None and skin_selection_list is not None: skin_selection_dict = {} @@ -376,67 +450,63 @@ class ZCatalog(Folder, Persistent, Implicit): new_selection_list.append(new_selection) skin_selection_dict[name] = new_selection_list - # Construct a mapping for connection ids. + # Construct a mapping for connection ids. It will be used during the + # final hot reindexing step. sql_connection_id_dict = None - if source_sql_connection_id_list is not None and destination_sql_connection_id_list is not None: + if source_sql_connection_id_list is not None and \ + destination_sql_connection_id_list is not None: sql_connection_id_dict = {} - for source_sql_connection_id, destination_sql_connection_id in zip(source_sql_connection_id_list, destination_sql_connection_id_list): + for source_sql_connection_id, destination_sql_connection_id in \ + zip(source_sql_connection_id_list, + destination_sql_connection_id_list): if source_sql_connection_id != destination_sql_connection_id: - sql_connection_id_dict[source_sql_connection_id] = destination_sql_connection_id - - # Hot reindexing may not run for multiple databases. - if self.hot_reindexing_state is not None: - raise CatalogError, 'hot reindexing process is already running' + sql_connection_id_dict[source_sql_connection_id] = \ + destination_sql_connection_id # First of all, make sure that all root objects have uids. # XXX This is a workaround for tools (such as portal_simulation). portal = self.getPortalObject() for id in portal.objectIds(): - obj = portal[id] - if hasattr(aq_base(obj), 'getUid'): - obj.getUid() - get_transaction().commit() # Should not have references to objects too long. - - # Recreate the new database. - LOG('hotReindexObjectList', 0, 'Clearing the new database') - self.manage_catalogClear(sql_catalog_id=destination_sql_catalog_id) - - try: - # Start recording catalog/uncatalog information. - LOG('hotReindexObjectList', 0, 'Starting recording') - self.setHotReindexingState('recording', - source_sql_catalog_id=source_sql_catalog_id, - destination_sql_catalog_id=destination_sql_catalog_id) - get_transaction().commit() # Should not have references to objects too long. - - # Start reindexing. - self.ERP5Site_reindexAll(sql_catalog_id=destination_sql_catalog_id) - get_transaction().commit() # Should not have references to objects too long. - - # Now synchronize this new database with the current one. - # XXX It is necessary to use ActiveObject to wait for queued objects to be flushed. - LOG('hotReindexObjectList', 0, 'Starting double indexing') - self.activate(passive_commit=1, after_method_id=('reindexObject', 'recursiveReindexObject'), priority=5).setHotReindexingState('double indexing', - source_sql_catalog_id=source_sql_catalog_id, - destination_sql_catalog_id=destination_sql_catalog_id) - - # Play back records. - if source_sql_catalog_id != destination_sql_catalog_id: - # If equals, then this does not make sense to reindex again. - LOG('hotReindexObjectList', 0, 'Playing back records') - self.activate(passive_commit=1, after_method_id='setHotReindexingState', priority=5).playBackRecordedObjectList(sql_catalog_id=destination_sql_catalog_id) - - # Exchange the databases. - LOG('hotReindexObjectList', 0, 'Exchanging databases') - self.activate(after_method_id=('reindexObject', 'playBackRecordedObjectList'), priority=5).exchangeDatabases(source_sql_catalog_id, destination_sql_catalog_id, skin_selection_dict, sql_connection_id_dict) # XXX Never called by activity tool, why ??? XXX - finally: - # Finish. - LOG('hotReindexObjectList', 0, 'Finishing hot reindexing') - self.activate(passive_commit=1, after_method_id='exchangeDatabases', priority=5).finishHotReindexing() - + getUid = getattr(portal[id], 'getUid', None) + if getUid is not None: + getUid() # Trigger the uid generation if none is set. + + # Mark the hot reindex as begun. Each object indexed in the still-current + # catalog will be scheduled for reindex in the future catalog. + LOG('hotReindexObjectList', 0, 'Starting recording') + self.setHotReindexingState(HOT_REINDEXING_RECORDING_STATE, + source_sql_catalog_id=source_sql_catalog_id, + destination_sql_catalog_id=destination_sql_catalog_id) + # Clear the future catalog and start reindexing the site in it. + final_activity_tag = 'hot_reindex_last_ERP5Site_reindexAll_tag' + self.ERP5Site_reindexAll(sql_catalog_id=destination_sql_catalog_id, + final_activity_tag=final_activity_tag, + clear_catalog=1, + passive_commit=1) + # Once reindexing is finished, change the hot reindexing state so that + # new catalog changes are applied in both catalogs. + self.activate(passive_commit=1, + after_tag=final_activity_tag, + priority=5).setHotReindexingState(HOT_REINDEXING_DOUBLE_INDEXING_STATE, + source_sql_catalog_id=source_sql_catalog_id, + destination_sql_catalog_id=destination_sql_catalog_id) + # Once in double-indexing mode, planned reindex can be replayed. + self.activate(passive_commit=1, + after_method_id='setHotReindexingState', + priority=5).playBackRecordedObjectList( + sql_catalog_id=destination_sql_catalog_id) + # Once there is nothing to replay, databases are sync'ed, so the new + # catalog can become current. + self.activate(passive_commit=1, + after_method_id=('playBackRecordedObjectList'), + priority=5).finishHotReindexing( + source_sql_catalog_id=source_sql_catalog_id, + destination_sql_catalog_id=destination_sql_catalog_id, + skin_selection_dict=skin_selection_dict, + sql_connection_id_dict=sql_connection_id_dict) if RESPONSE is not None: URL1 = REQUEST.get('URL1') - RESPONSE.redirect(URL1 + '/manage_catalogHotReindexing?manage_tabs_message=Catalog%20Changed') + RESPONSE.redirect(URL1 + '/manage_catalogHotReindexing?manage_tabs_message=HotReindexing%20Started') def manage_edit(self, RESPONSE, URL1, threshold=1000, REQUEST=None): """ edit the catalog """ diff --git a/product/ZSQLCatalog/dtml/catalogHotReindexing.dtml b/product/ZSQLCatalog/dtml/catalogHotReindexing.dtml index 3b4bc52306417f591689e6958ca0c3f26cda9df8..c8dcb96b65529296da65e0ad2d23295fbd56b602 100644 --- a/product/ZSQLCatalog/dtml/catalogHotReindexing.dtml +++ b/product/ZSQLCatalog/dtml/catalogHotReindexing.dtml @@ -7,13 +7,6 @@ Hot Reindexing allows you to recreate a new SQL Catalog without stopping your sy This requires at least two SQL Catalogs. </p> -<dtml-if "_.hasattr(getPortalObject(), 'portal_activities')"> -<p class="form-help"> -Note that hot reindexing might not be finished even when you get a successful response, -because portal_activities delays the real operations. You can check the real state in properties. -</p> -</dtml-if> - <table cellspacing="0" cellpadding="2" border="0"> <tr class="list-header"> <td colspan="2" align="left">