Commit 616f19cc authored by Jean-Paul Smets's avatar Jean-Paul Smets

new transaction system

new API for distribution of load


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@317 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 05f7e3e6
......@@ -67,8 +67,8 @@ class Queue:
#scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage']
def __init__(self):
self.is_alive = 1
self.is_awake = 0
self.is_alive = {}
self.is_awake = {}
self.is_initialized = 0
def initialize(self, activity_tool):
......@@ -82,23 +82,26 @@ class Queue:
def queueMessage(self, activity_tool, m):
pass
def dequeueMessage(self, activity_tool):
def dequeueMessage(self, activity_tool, processing_node):
pass
def tic(self, activity_tool):
# Tic should return quickly
if self.dequeueMessage(activity_tool):
self.sleep(activity_tool)
def tic(self, activity_tool, processing_node):
# Tic should return quickly to prevent locks or commit transactions at some point
if self.dequeueMessage(activity_tool, processing_node):
self.sleep(activity_tool, processing_node)
def sleep(self, activity_tool):
self.is_awake = 0
def distribute(self, activity_tool, node_count):
pass
def sleep(self, activity_tool, processing_node):
self.is_awake[processing_node] = 0
def wakeup(self, activity_tool):
self.is_awake = 1
def wakeup(self, activity_tool, processing_node):
self.is_awake[processing_node] = 1
def terminate(self, activity_tool):
self.is_awake = 0
self.is_alive = 0
def terminate(self, activity_tool, processing_node):
self.is_awake[processing_node] = 0
self.is_alive[processing_node] = 0
def validate(self, activity_tool, message, wait_for=None, **kw):
try:
......@@ -117,8 +120,8 @@ class Queue:
return 0
return 1
def isAwake(self, activity_tool):
return self.is_awake
def isAwake(self, activity_tool, processing_node):
return self.is_awake[processing_node]
def hasActivity(self, activity_tool, object, **kw):
return 0
......@@ -132,5 +135,5 @@ class Queue:
def dumpMessage(self, m):
return pickle.dumps(m)
def getMessageList(self, activity_tool):
def getMessageList(self, activity_tool, processing_node=None):
return []
......@@ -47,7 +47,7 @@ class RAMDict(Queue):
def queueMessage(self, m):
self.dict[(m.object_path, m.method_id)] = m
def dequeueMessage(self, activity_tool):
def dequeueMessage(self, activity_tool, processing_node):
if len(self.dict.keys()) is 0:
return 1 # Go to sleep
for key, m in self.dict.items():
......@@ -74,7 +74,7 @@ class RAMDict(Queue):
pass
#LOG('CMFActivity RAMDict: ', 0, 'not flushing object %s' % '/'.join(m.object_path))
def getMessageList(self, activity_tool):
def getMessageList(self, activity_tool, processing_node=None):
return self.dict.values()
registerActivity(RAMDict)
......@@ -41,7 +41,7 @@ class RAMQueue(Queue):
def queueMessage(self, m):
self.queue.append(m)
def dequeueMessage(self, activity_tool):
def dequeueMessage(self, activity_tool, processing_node):
if len(self.queue) is 0:
return 1 # Go to sleep
m = self.queue[0]
......@@ -66,7 +66,7 @@ class RAMQueue(Queue):
new_queue.append(m)
self.queue = new_queue
def getMessageList(self, activity_tool):
def getMessageList(self, activity_tool, processing_node=None):
return self.queue
registerActivity(RAMQueue)
......@@ -31,6 +31,10 @@ from RAMDict import RAMDict
from zLOG import LOG
DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2
VALIDATE_ERROR_STATE = -3
class SQLDict(RAMDict):
"""
A simple OOBTree based queue. It should be compatible with transactions
......@@ -41,21 +45,29 @@ class SQLDict(RAMDict):
def queueMessage(self, activity_tool, m):
activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , method_id = m.method_id, message = self.dumpMessage(m))
def dequeueMessage(self, activity_tool):
#activity_tool.SQLDict_lockMessage() # Too slow...
result = activity_tool.SQLDict_readMessage()
def dequeueMessage(self, activity_tool, processing_node):
result = activity_tool.SQLDict_readMessage(processing_node=processing_node)
get_transaction().commit() # Release locks before starting a potentially long calculation
if len(result) > 0:
line = result[0]
path = line.path
method_id = line.method_id
activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node=1)
#activity_tool.SQLDict_unlockMessage() # Too slow...
m = self.loadMessage(line.message)
if m.validate(self, activity_tool):
activity_tool.invoke(m)
activity_tool.SQLDict_delMessage(path=path, method_id=method_id)
activity_tool.invoke(m) # Try to invoke the message
if m.is_executed: # Make sure message could be invoked
activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=processing_node) # Delete it
get_transaction().commit() # If successful, commit
else:
get_transaction().abort() # If not, abort transaction and start a new one
activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
get_transaction().commit() # and commit
else:
activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state
get_transaction().commit() # and commit
return 0
#activity_tool.SQLDict_unlockMessage()
return 1
def hasActivity(self, activity_tool, object, method_id=None, **kw):
......@@ -65,13 +77,20 @@ class SQLDict(RAMDict):
return result[0].message_count > 0
return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
"""
object_path is a tuple
commit allows to choose mode
- if we commit, then we make sure no locks are taken for too long
- if we do not commit, then we can use flush in a larger transaction
commit should in general not be used
"""
path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id)))
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id)
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
if commit: get_transaction().commit() # Release locks before starting a potentially long calculation
method_dict = {}
if invoke:
for line in result:
......@@ -82,15 +101,39 @@ class SQLDict(RAMDict):
method_dict[method_id] = 1
m = self.loadMessage(line.message)
if m.validate(self, activity_tool):
activity_tool.invoke(m)
activity_tool.SQLDict_delMessage(path=path, method_id=method_id)
activity_tool.invoke(m) # Try to invoke the message
if m.is_executed: # Make sure message could be invoked
activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=processing_node) # Delete it
if commit: get_transaction().commit() # If successful, commit
else:
if commit: get_transaction().abort() # If not, abort transaction and start a new one
else:
activity_tool.SQLDict_delMessage(path=path, method_id=method_id) # Delete all
if commit: get_transaction().abort() # Commit flush
def getMessageList(self, activity_tool):
def getMessageList(self, activity_tool, processing_node=None):
message_list = []
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None)
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None)
for line in result:
m = self.loadMessage(line.message)
m.processing_node = line.processing_node
message_list.append(m)
return message_list
def distribute(self, activity_tool, node_count):
processing_node = 1
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation
path_dict = {}
for line in result:
path = line.path
if not path_dict.has_key(path):
# Only assign once (it would be different for a queue)
path_dict[path] = 1
activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node)
get_transaction().commit() # Release locks immediately to allow processing of messages
processing_node = processing_node + 1
if processing_node > node_count:
processing_node = 1 # Round robin
registerActivity(SQLDict)
......@@ -27,41 +27,70 @@
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity
from Queue import Queue
import pickle
from SQLDict import SQLDict
from zLOG import LOG
class SQLQueue(Queue):
class SQLQueue(SQLDict):
"""
A simple RAM based queue
A simple OOBTree based queue. It should be compatible with transactions
and provide sequentiality. Should not create conflict
because use of OOBTree.
"""
def initialize(self, activity_tool):
# This is the only moment when
# we can set some global variables related
# to the ZODB context
if not self.is_initialized:
try:
self.activity_tool = activity_tool
self.sqlWriteMessage = activity_tool.SQLQueue_writeMessage
self.sqlReadMessage = activity_tool.SQLQueue_readMessage
self.sqlDelMessage = activity_tool.SQLQueue_delMessage
self.sqlHasMessage = activity_tool.SQLQueue_hasMessage
self.is_initialized = 1
except:
LOG('ERROR SQLQueue', 100, 'could not initialize SQL methods')
def queueMessage(self, activity_tool, m):
activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , method_id = m.method_id, message = self.dumpMessage(m))
def queueMessage(self, m):
self.sqlWriteMessage(uid = m.object.uid , method_id = m.method_id, message = self.dumpMessage(m))
def dequeueMessage(self, activity_tool, processing_node):
#activity_tool.SQLDict_lockMessage() # Too slow...
result = activity_tool.SQLDict_readMessage()
if len(result) > 0:
line = result[0]
path = line.path
method_id = line.method_id
activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node=1)
#activity_tool.SQLDict_unlockMessage() # Too slow...
m = self.loadMessage(line.message)
if m.validate(self, activity_tool):
activity_tool.invoke(m)
activity_tool.SQLDict_delMessage(message_id = 222) # We will need a message_id
return 0
#activity_tool.SQLDict_unlockMessage()
return 1
def dequeueMessage(self, activity_tool):
return 1 # sleep
m = self.loadMessage(message)
activity_tool.invoke(m)
self.sqlDelMessage(uid = m.object.uid , method_id = m.method_id)
def hasActivity(self, activity_tool, object, method_id=None, **kw):
my_object_path = '/'.join(object.getPhysicalPath())
result = activity_tool.SQLDict_hasMessage(path=my_object_path, method_id=method_id)
if len(result) > 0:
return result[0].message_count > 0
return 0
def hasActivity(self, object):
return self.sqlHasMessage(uid = object.uid).has_activity
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
"""
object_path is a tuple
"""
path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id)))
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id)
method_dict = {}
if invoke:
for line in result:
path = line.path
method_id = line.method_id
if not method_dict.has_key(method_id):
# Only invoke once (it would be different for a queue)
method_dict[method_id] = 1
m = self.loadMessage(line.message)
if m.validate(self, activity_tool):
activity_tool.invoke(m)
activity_tool.SQLDict_delMessage(path=path, method_id=method_id)
registerActivity(SQLQueue)
\ No newline at end of file
def getMessageList(self, activity_tool, processing_node=None):
message_list = []
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None)
for line in result:
m = self.loadMessage(line.message)
message_list.append(m)
return message_list
registerActivity(SQLQueue)
......@@ -60,7 +60,7 @@ class ZODBDict(RAMDict):
message_dict[m.object_path][m.method_id] = m
activable_set.insert(m.object_path) # Add to set
def dequeueMessage(self, activity_tool):
def dequeueMessage(self, activity_tool, processing_node):
message_dict = activity_tool.activity_data.message_dict
activable_set = activity_tool.activity_data.activable_set
# We never erase BTree items a this point
......@@ -105,7 +105,7 @@ class ZODBDict(RAMDict):
if invoke: activity_tool.invoke(m)
del object_dict[key]
def getMessageList(self, activity_tool):
def getMessageList(self, activity_tool, processing_node=None):
message_dict = activity_tool.activity_data.message_dict
activable_set = activity_tool.activity_data.activable_set
result = []
......
......@@ -71,7 +71,7 @@ class Message:
self.method_id = method_id
self.args = args
self.kw = kw
self.__is_executed = 0
self.is_executed = 0
# User Info ? REQUEST Info ?
def __call__(self, activity_tool):
......@@ -85,11 +85,11 @@ class Message:
if REQUEST.active_process is not None:
active_process = activity_tool.getActiveProcess()
active_process.activateResult(result) # XXX Allow other method_id in future
self.__is_executed = 1
self.is_executed = 1
except:
self.is_executed = 0
LOG('WARNING ActivityTool', 0,
'Could not call method %s on object %s' % (self.method_id, self.object_path))
self.__is_executed = 1
def validate(self, activity, activity_tool):
return activity.validate(activity_tool, self, **self.activity_kw)
......@@ -164,10 +164,28 @@ class ActivityTool (Folder, UniqueObject):
activity.initialize(self)
is_initialized = 1
security.declarePublic('distribute')
def distribute(self, node_count=1):
"""
Distribute load
"""
# Initialize if needed
if not is_initialized: self.initialize()
# Call distribute on each queue
for activity in activity_list:
#try:
if 1:
activity.distribute(self, node_count)
#except:
else:
LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % activity)
security.declarePublic('tic')
def tic(self, force=0):
def tic(self, processing_node=1, force=0):
"""
Starts again an activity
processing_node starts from 1 (there is not node 0)
"""
global active_threads, is_initialized
......@@ -189,7 +207,7 @@ class ActivityTool (Folder, UniqueObject):
# Wakeup each queue
for activity in activity_list:
try:
activity.wakeup(self)
activity.wakeup(self, processing_node)
except:
LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)
......@@ -198,12 +216,12 @@ class ActivityTool (Folder, UniqueObject):
while has_awake_activity:
has_awake_activity = 0
for activity in activity_list:
try:
#if 1:
activity.tic(self)
get_transaction().commit()
has_awake_activity = has_awake_activity or activity.isAwake(self)
except:
#try:
if 1:
activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
#except:
else:
LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity)
# decrease the number of active_threads
......@@ -267,6 +285,9 @@ class ActivityTool (Folder, UniqueObject):
"""
List messages waiting in queues
"""
# Initialize if needed
if not is_initialized: self.initialize()
message_list = []
for activity in activity_list:
message_list += activity.getMessageList(self)
......
......@@ -37,6 +37,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
<th align="left" valign="top">Method Id</th>
<th align="left" valign="top">Arguments</th>
<th align="left" valign="top">Named Parameters</th>
<th align="left" valign="top">Processing Node</th>
</tr>
<dtml-in getMessageList>
<tr>
......@@ -45,6 +46,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
<td align="left" valign="top"><dtml-var method_id></td>
<td align="left" valign="top"><dtml-var "','.join(args)"></td>
<td align="left" valign="top"><dtml-var "_.str(kw)[1:-1]"></td>
<td align="left" valign="top"><dtml-var processing_node></td>
</tr>
</dtml-in>
</table>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment