Commit 2b6f66f8 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Use SQLQueue for Hot Reindexing.

Make sure that all obejcts have uids before starting hot reindexing.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@2308 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 5e93ab86
...@@ -287,10 +287,11 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -287,10 +287,11 @@ class ZCatalog(Folder, Persistent, Implicit):
if o.catalog: if o.catalog:
if object is not None: if object is not None:
LOG('playBackRecordedObjectList, will reindexObject:',0,object.getPhysicalPath()) LOG('playBackRecordedObjectList, will reindexObject:',0,object.getPhysicalPath())
object.reindexObject(sql_catalog_id=sql_catalog_id, activity='SQLQueue')
#self.activate(passive_commit=1, priority=5).reindexObject(object, sql_catalog_id=sql_catalog_id) #self.activate(passive_commit=1, priority=5).reindexObject(object, sql_catalog_id=sql_catalog_id)
# XXX We can't call reindexObject has an activate message, because we can not # XXX We can't call reindexObject has an activate message, because we can not
# do pickle with acquisition wrappers # do pickle with acquisition wrappers
self.reindexObject(object, sql_catalog_id=sql_catalog_id) # Ask YO, we must find something activated #self.reindexObject(object, sql_catalog_id=sql_catalog_id) # Ask YO, we must find something activated
# Make sure that this object has no extra reference. # Make sure that this object has no extra reference.
object = None object = None
else: else:
...@@ -381,6 +382,16 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -381,6 +382,16 @@ class ZCatalog(Folder, Persistent, Implicit):
if self.hot_reindexing_state is not None: if self.hot_reindexing_state is not None:
raise CatalogError, 'hot reindexing process is already running' raise CatalogError, 'hot reindexing process is already running'
# First of all, make sure that all root objects have uids.
# XXX This is a workaround for tools (such as portal_simulation).
for path in self.portal_skins.erp5_core.Catalog_getIndexablePathList():
object = self.resolve_path(path)
if object is None: continue
if getattr(object, 'uid', None) is None:
object.getUid()
object = None
get_transaction().commit() # Should not have references to objects too long.
# Recreate the new database. # Recreate the new database.
LOG('hotReindexObjectList', 0, 'Clearing the new database') LOG('hotReindexObjectList', 0, 'Clearing the new database')
self.manage_catalogClear(sql_catalog_id=destination_sql_catalog_id) self.manage_catalogClear(sql_catalog_id=destination_sql_catalog_id)
...@@ -403,7 +414,7 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -403,7 +414,7 @@ class ZCatalog(Folder, Persistent, Implicit):
id_list = object.objectIds() id_list = object.objectIds()
#LOG('will hotReindex this object:',0,object.getPhysicalPath()) #LOG('will hotReindex this object:',0,object.getPhysicalPath())
#LOG('will hotReindex this object uid:',0,getattr(object,'uid',None)) #LOG('will hotReindex this object uid:',0,getattr(object,'uid',None))
object.activate(passive_commit=1, priority=5).queueCataloggedObject(sql_catalog_id=destination_sql_catalog_id) object.activate(passive_commit=1, priority=5, activity='SQLQueue').queueCataloggedObject(sql_catalog_id=destination_sql_catalog_id, activity='SQLQueue')
object = None object = None
get_transaction().commit() # Should not have references to objects too long. get_transaction().commit() # Should not have references to objects too long.
...@@ -412,18 +423,18 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -412,18 +423,18 @@ class ZCatalog(Folder, Persistent, Implicit):
subpath = '/'.join([path, id]) subpath = '/'.join([path, id])
o = self.resolve_path(subpath) o = self.resolve_path(subpath)
if o is not None: if o is not None:
o.activate(passive_commit=1, priority=5).recursiveQueueCataloggedObject(sql_catalog_id=destination_sql_catalog_id) o.activate(passive_commit=1, priority=5, activity='SQLQueue').recursiveQueueCataloggedObject(sql_catalog_id=destination_sql_catalog_id, activity='SQLQueue')
o = None o = None
get_transaction().commit() # Should not have references to objects too long. get_transaction().commit() # Should not have references to objects too long.
# Make sure that everything is catalogged. # Make sure that everything is catalogged.
LOG('hotReindexObjectList', 0, 'Flushing the queue') LOG('hotReindexObjectList', 0, 'Flushing the queue')
self.activate(broadcast=1, passive_commit=1, after_method_id='immediateQueueCataloggedObject', priority=5).flushQueuedObjectList(sql_catalog_id=destination_sql_catalog_id) self.activate(broadcast=1, passive_commit=1, after_method_id=('recursiveQueueCataloggedObject', 'queueCataloggedObject', 'immediateQueueCataloggedObject'), priority=5, activity='SQLQueue').flushQueuedObjectList(sql_catalog_id=destination_sql_catalog_id)
# Now synchronize this new database with the current one. # Now synchronize this new database with the current one.
# XXX It is necessary to use ActiveObject to wait for queued objects to be flushed. # XXX It is necessary to use ActiveObject to wait for queued objects to be flushed.
LOG('hotReindexObjectList', 0, 'Starting double indexing') LOG('hotReindexObjectList', 0, 'Starting double indexing')
self.activate(passive_commit=1, after_method_id='flushQueuedObjectList', priority=5).setHotReindexingState('double indexing', self.activate(passive_commit=1, after_method_id='flushQueuedObjectList', priority=5, activity='SQLQueue').setHotReindexingState('double indexing',
source_sql_catalog_id=source_sql_catalog_id, source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id) destination_sql_catalog_id=destination_sql_catalog_id)
...@@ -431,15 +442,15 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -431,15 +442,15 @@ class ZCatalog(Folder, Persistent, Implicit):
if source_sql_catalog_id != destination_sql_catalog_id: if source_sql_catalog_id != destination_sql_catalog_id:
# If equals, then this does not make sense to reindex again. # If equals, then this does not make sense to reindex again.
LOG('hotReindexObjectList', 0, 'Playing back records') LOG('hotReindexObjectList', 0, 'Playing back records')
self.activate(passive_commit=1, after_method_id='setHotReindexingState', priority=5).playBackRecordedObjectList(sql_catalog_id=destination_sql_catalog_id) self.activate(passive_commit=1, after_method_id='setHotReindexingState', priority=5, activity='SQLQueue').playBackRecordedObjectList(sql_catalog_id=destination_sql_catalog_id)
# Exchange the databases. # Exchange the databases.
LOG('hotReindexObjectList', 0, 'Exchanging databases') LOG('hotReindexObjectList', 0, 'Exchanging databases')
self.activate(after_method_id=('reindexObject', 'playBackRecordedObjectList', 'setHotReindexingState'), priority=5).exchangeDatabases(source_sql_catalog_id, destination_sql_catalog_id, skin_selection_dict, sql_connection_id_dict) # XXX Never called by activity tool, why ??? XXX self.activate(after_method_id=('reindexObject', 'playBackRecordedObjectList', 'setHotReindexingState'), priority=5, activity='SQLQueue').exchangeDatabases(source_sql_catalog_id, destination_sql_catalog_id, skin_selection_dict, sql_connection_id_dict) # XXX Never called by activity tool, why ??? XXX
finally: finally:
# Finish. # Finish.
LOG('hotReindexObjectList', 0, 'Finishing hot reindexing') LOG('hotReindexObjectList', 0, 'Finishing hot reindexing')
self.activate(passive_commit=1, after_method_id='exchangeDatabases', priority=5).finishHotReindexing() self.activate(passive_commit=1, after_method_id='exchangeDatabases', priority=5, activity='SQLQueue').finishHotReindexing()
if RESPONSE is not None: if RESPONSE is not None:
URL1 = REQUEST.get('URL1') URL1 = REQUEST.get('URL1')
...@@ -624,7 +635,7 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -624,7 +635,7 @@ class ZCatalog(Folder, Persistent, Implicit):
destination_catalog.catalogObject(obj, url, is_object_moved=is_object_moved) destination_catalog.catalogObject(obj, url, is_object_moved=is_object_moved)
security.declarePrivate('queueCataloggedObject') security.declarePrivate('queueCataloggedObject')
def queueCataloggedObject(self, object, sql_catalog_id=None): def queueCataloggedObject(self, object, sql_catalog_id=None, **kw):
""" """
Add an object into the queue for catalogging the object later in a batch form. Add an object into the queue for catalogging the object later in a batch form.
""" """
...@@ -640,7 +651,7 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -640,7 +651,7 @@ class ZCatalog(Folder, Persistent, Implicit):
self.flushQueuedObjectList(sql_catalog_id=sql_catalog_id) self.flushQueuedObjectList(sql_catalog_id=sql_catalog_id)
security.declarePublic('flushQueuedObjectList') security.declarePublic('flushQueuedObjectList')
def flushQueuedObjectList(self, sql_catalog_id=None): def flushQueuedObjectList(self, sql_catalog_id=None, **kw):
""" """
Flush queued objects. Flush queued objects.
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment