Commit 0a1adf68 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: drop RAMDict & RAMQueue

parent ae7a36f3
......@@ -45,7 +45,7 @@ INVALID_ORDER = 2
MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 30 # in seconds
class Queue:
class Queue(object):
"""
Step 1: use lists
......@@ -101,10 +101,10 @@ class Queue:
m.is_deleted = 1
def dequeueMessage(self, activity_tool, processing_node):
pass
raise NotImplementedError
def distribute(self, activity_tool, node_count):
pass
raise NotImplementedError
def validate(self, activity_tool, message, check_order_validation=1, **kw):
"""
......
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED
from Products.CMFActivity.Errors import ActivityFlushError
from Queue import Queue, VALID
from zLOG import LOG
import transaction
class RAMDict(Queue):
"""
A simple RAM based queue. It is not compatible with transactions which
means methods can be called before an object even exists or before
it is modified. This also means there is no garantee on any kind of sequenciality.
Dictionnary is global.
"""
def __init__(self):
Queue.__init__(self)
self.queue_dict = {}
def getDict(self, activity_tool_path):
return self.queue_dict.setdefault(activity_tool_path, {})
def finishQueueMessage(self, activity_tool_path, m):
if m.is_registered:
self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)] = m
def finishDeleteMessage(self, activity_tool_path, message):
for key, m in self.getDict(activity_tool_path).items():
if m.object_path == message.object_path and m.method_id == message.method_id:
del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)]
def registerActivityBuffer(self, activity_buffer):
pass
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id)
def isMessageRegistered(self, activity_buffer, activity_tool, m):
uid_set = activity_buffer.getUidSet(self)
return self.generateMessageUID(m) in uid_set
def registerMessage(self, activity_buffer, activity_tool, m):
message_list = activity_buffer.getMessageList(self)
message_list.append(m)
uid_set = activity_buffer.getUidSet(self)
uid_set.add(self.generateMessageUID(m))
m.is_registered = 1
def dequeueMessage(self, activity_tool, processing_node):
path = activity_tool.getPhysicalPath()
if len(self.getDict(path).keys()) is 0:
return 1 # Go to sleep
for key, m in self.getDict(path).items():
if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m)
if m.getExecutionState() == MESSAGE_EXECUTED:
del self.getDict(path)[key]
transaction.commit()
return 0
else:
# Start a new transaction and keep on to next message
transaction.commit()
return 1
def countMessage(self, activity_tool,path=None,method_id=None,**kw):
tool_path = activity_tool.getPhysicalPath()
count = 0
for (key,m) in self.getDict(tool_path).items():
add = 1
if path is not None:
object_path = '/'.join(m.object_path)
if object_path != path:
add = 0
if method_id is not None:
if m.method_id != method_id:
add = 0
count += add
return count
def hasActivity(self, activity_tool, object, **kw):
if object is not None:
object_path = object.getPhysicalPath()
else:
object_path = None
active_process = kw.get('active_process', None)
path = activity_tool.getPhysicalPath()
for m in self.getDict(path).values():
# Filter active process and path if defined
if active_process is None or m.active_process == active_process:
if object_path is None or m.object_path == object_path:
return 1
return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id)))
method_dict = {}
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if not method_dict.has_key(m.method_id):
if invoke:
# First Validate
if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
else:
method_dict[m.method_id] = 1
activity_tool.unregisterMessage(self, m)
else:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
else:
method_dict[m.method_id] = 1
activity_tool.unregisterMessage(self, m)
else:
method_dict[m.method_id] = 1
activity_tool.unregisterMessage(self, m)
# Parse each message in RAM dict
path = activity_tool.getPhysicalPath()
for key, m in self.getDict(path).items():
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if not method_dict.has_key(m.method_id):
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
if invoke:
activity_tool.invoke(m)
if m.getExecutionState() == MESSAGE_EXECUTED:
method_dict[m.method_id] = 1
self.deleteMessage(activity_tool, m)
else:
method_dict[m.method_id] = 1
self.deleteMessage(activity_tool, m)
else:
self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None,**kw):
new_queue = []
path = activity_tool.getPhysicalPath()
for m in self.getDict(path).values():
m.processing_node = 1
m.priority = 0
new_queue.append(m)
return new_queue
registerActivity(RAMDict)
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED
from Queue import Queue, VALID
import transaction
class RAMQueue(Queue):
"""
A simple RAM based queue
"""
def __init__(self):
Queue.__init__(self)
self.queue_dict = {}
self.last_uid = 0
def getQueue(self, activity_tool_path):
return self.queue_dict.setdefault(activity_tool_path, [])
def finishQueueMessage(self, activity_tool_path, m):
if m.is_registered:
# XXX - Some lock is required on this section
self.last_uid = self.last_uid + 1
m.uid = self.last_uid
self.getQueue(activity_tool_path).append(m)
def finishDeleteMessage(self, activity_tool_path, m):
i = 0
queue = self.getQueue(activity_tool_path)
for my_message in queue:
if my_message.uid == m.uid:
del queue[i]
return
i = i + 1
def dequeueMessage(self, activity_tool, processing_node):
path = activity_tool.getPhysicalPath()
for m in self.getQueue(path):
if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
transaction.commit() # Start a new transaction
return 0 # Keep on ticking
activity_tool.invoke(m)
if m.getExecutionState() == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
transaction.commit() # Start a new transaction
return 0 # Keep on ticking
else:
# Start a new transaction and keep on to next message
transaction.commit()
return 1 # Go to sleep
def countMessage(self, activity_tool,path=None,method_id=None,**kw):
tool_path = activity_tool.getPhysicalPath()
count = 0
for m in self.getQueue(tool_path):
add = 1
if path is not None:
object_path = '/'.join(m.object_path)
if object_path != path:
add = 0
if method_id is not None:
if m.method_id != method_id:
add = 0
count += add
return count
def hasActivity(self, activity_tool, object, **kw):
if object is not None:
object_path = object.getPhysicalPath()
else:
object_path = None
active_process = kw.get('active_process', None)
path = activity_tool.getPhysicalPath()
for m in self.getQueue(path):
# Filter active process and path if defined
if active_process is None or m.active_process == active_process:
if object_path is None or m.object_path == object_path:
return 1
return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if m.validate(self, activity_tool) is not VALID:
activity_tool.unregisterMessage(self, m) # Trash messages which are not validated (no error handling)
else:
if invoke:
activity_tool.invoke(m)
if m.getExecutionState() == MESSAGE_EXECUTED:
activity_tool.unregisterMessage(self, m)
else:
activity_tool.unregisterMessage(self, m)
# Parse each message in queue
path = activity_tool.getPhysicalPath()
for m in self.getQueue(path):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
else:
if invoke:
activity_tool.invoke(m)
if m.getExecutionState() == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Only delete if no error happens
else:
self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None,**kw):
new_queue = []
path = activity_tool.getPhysicalPath()
for m in self.getQueue(path):
m.processing_node = 1
m.priority = 0
new_queue.append(m)
return new_queue
registerActivity(RAMQueue)
......@@ -36,7 +36,7 @@ from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import VALIDATION_ERROR_DELAY
from Queue import Queue, VALIDATION_ERROR_DELAY
def sort_message_key(message):
# same sort key as in SQL{Dict,Queue}_readMessageList
......@@ -44,7 +44,7 @@ def sort_message_key(message):
_DequeueMessageException = Exception()
class SQLBase:
class SQLBase(Queue):
"""
Define a set of common methods for SQL-based storage of activities.
"""
......
......@@ -28,7 +28,6 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH
from RAMDict import RAMDict
from Products.CMFActivity.Errors import ActivityFlushError
import sys
#from time import time
......@@ -45,7 +44,7 @@ READ_MESSAGE_LIMIT = 1000
MAX_MESSAGE_LIST_SIZE = 100
class SQLDict(RAMDict, SQLBase):
class SQLDict(SQLBase):
"""
A simple OOBTree based queue. It should be compatible with transactions
and provide sequentiality. Should not create conflict
......@@ -103,21 +102,19 @@ class SQLDict(RAMDict, SQLBase):
if len(uid_list)>0:
activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
def finishQueueMessage(self, activity_tool_path, m):
# Nothing to do in SQLDict.
pass
def finishDeleteMessage(self, activity_tool_path, m):
# Nothing to do in SQLDict.
pass
# Registration management
def registerActivityBuffer(self, activity_buffer):
pass
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
def isMessageRegistered(self, activity_buffer, activity_tool, m):
return self.generateMessageUID(m) in activity_buffer.getUidSet(self)
def registerMessage(self, activity_buffer, activity_tool, m):
message_list = activity_buffer.getMessageList(self)
message_list.append(m)
uid_set = activity_buffer.getUidSet(self)
uid_set.add(self.generateMessageUID(m))
m.is_registered = 1
def unregisterMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 0 # This prevents from inserting deleted messages into the queue
class_name = self.__class__.__name__
......@@ -158,8 +155,6 @@ class SQLDict(RAMDict, SQLBase):
raise
return uid_list
dequeueMessage = SQLBase.dequeueMessage
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
if hasMessage is not None:
......@@ -244,8 +239,6 @@ class SQLDict(RAMDict, SQLBase):
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=[x.uid for x in uid_list])
getMessageList = SQLBase.getMessageList
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
message_list = []
......@@ -395,7 +388,7 @@ class SQLDict(RAMDict, SQLBase):
def getPriority(self, activity_tool):
method = activity_tool.SQLDict_getPriority
default = RAMDict.getPriority(self, activity_tool)
default = SQLBase.getPriority(self, activity_tool)
return self._getPriority(activity_tool, method, default)
registerActivity(SQLDict)
......@@ -27,7 +27,6 @@
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue
from Queue import VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
......@@ -45,7 +44,7 @@ READ_MESSAGE_LIMIT = 1000
MAX_MESSAGE_LIST_SIZE = 100
class SQLQueue(RAMQueue, SQLBase):
class SQLQueue(SQLBase):
"""
A simple OOBTree based queue. It should be compatible with transactions
and provide sequentiality. Should not create conflict
......@@ -94,14 +93,6 @@ class SQLQueue(RAMQueue, SQLBase):
#LOG("prepareDeleteMessage", 0, str(m.__dict__))
activity_tool.SQLBase_delMessage(table=self.sql_table, uid=[m.uid])
def finishQueueMessage(self, activity_tool_path, m):
# Nothing to do in SQLQueue.
pass
def finishDeleteMessage(self, activity_tool_path, m):
# Nothing to do in SQLQueue.
pass
def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
Reserve unreserved messages matching given line.
......@@ -109,8 +100,6 @@ class SQLQueue(RAMQueue, SQLBase):
"""
return ()
dequeueMessage = SQLBase.dequeueMessage
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
if hasMessage is not None:
......@@ -181,8 +170,6 @@ class SQLQueue(RAMQueue, SQLBase):
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=[line.uid for line in result])
getMessageList = SQLBase.getMessageList
def countMessage(self, activity_tool, tag=None, path=None,
method_id=None, message_uid=None, **kw):
"""Return the number of messages which match the given parameters.
......@@ -308,7 +295,7 @@ class SQLQueue(RAMQueue, SQLBase):
def getPriority(self, activity_tool):
method = activity_tool.SQLQueue_getPriority
default = RAMQueue.getPriority(self, activity_tool)
default = SQLBase.getPriority(self, activity_tool)
return self._getPriority(activity_tool, method, default)
registerActivity(SQLQueue)
......@@ -552,7 +552,7 @@ class ActivityTool (Folder, UniqueObject):
def initialize(self):
global is_initialized
from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
from Activity import SQLQueue, SQLDict
# Initialize each queue
for activity in activity_dict.itervalues():
activity.initialize(self)
......@@ -965,7 +965,7 @@ class ActivityTool (Folder, UniqueObject):
# Loop as long as there are activities. Always process the queue with
# "highest" priority. If several queues have same highest priority, do
# not choose one that has just been processed.
# This algorithm is fair enough because we actually use only 2 queues.
# This algorithm is fair enough because we only have 2 queues.
# Otherwise, a round-robin of highest-priority queues would be required.
# XXX: We always finish by iterating over all queues, in case that
# getPriority does not see messages dequeueMessage would process.
......@@ -974,7 +974,7 @@ class ActivityTool (Folder, UniqueObject):
return activity.getPriority(self), activity is last
while is_running_lock.acquire(0):
try:
for last in sorted(activity_dict.itervalues(), key=sort_key):
for last in sorted(activity_dict.values(), key=sort_key):
# Transaction processing is the responsability of the activity
if not last.dequeueMessage(inner_self, processing_node):
break
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment