Commit e5b116e3 authored by Jean-Paul Smets's avatar Jean-Paul Smets

Initial tests passed


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@663 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent f17a5136
...@@ -76,7 +76,6 @@ class Queue: ...@@ -76,7 +76,6 @@ class Queue:
# we can set some global variables related # we can set some global variables related
# to the ZODB context # to the ZODB context
if not self.is_initialized: if not self.is_initialized:
self.activity_tool = activity_tool
self.is_initialized = 1 self.is_initialized = 1
def queueMessage(self, activity_tool, m): def queueMessage(self, activity_tool, m):
...@@ -169,17 +168,14 @@ class Queue: ...@@ -169,17 +168,14 @@ class Queue:
# Registration Management # Registration Management
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
if not hasattr(activity_buffer, '_%s_message_list' % class_name):
setattr(activity_buffer, '_%s_message_list' % class_name, []) setattr(activity_buffer, '_%s_message_list' % class_name, [])
def isMessageRegistered(self, activity_buffer, activity_tool, m): def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
self.registerActivityBuffer(activity_buffer)
return m in getattr(activity_buffer, '_%s_message_list' % class_name) return m in getattr(activity_buffer, '_%s_message_list' % class_name)
def registerMessage(self, activity_buffer, activity_tool, m): def registerMessage(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
self.registerActivityBuffer(activity_buffer)
getattr(activity_buffer, '_%s_message_list' % class_name).append(m) getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
m.is_registered = 1 m.is_registered = 1
......
...@@ -56,18 +56,15 @@ class RAMDict(Queue): ...@@ -56,18 +56,15 @@ class RAMDict(Queue):
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
if not hasattr(activity_buffer, '_%s_message_list' % class_name):
setattr(activity_buffer, '_%s_message_list' % class_name, []) setattr(activity_buffer, '_%s_message_list' % class_name, [])
setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
def isMessageRegistered(self, activity_buffer, activity_tool, m): def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
self.registerActivityBuffer(activity_buffer)
return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((m.object_path, m.method_id)) return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((m.object_path, m.method_id))
def registerMessage(self, activity_buffer, activity_tool, m): def registerMessage(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
self.registerActivityBuffer(activity_buffer)
getattr(activity_buffer, '_%s_message_list' % class_name).append(m) getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
getattr(activity_buffer, '_%s_uid_dict' % class_name)[(m.object_path, m.method_id)] = 1 getattr(activity_buffer, '_%s_uid_dict' % class_name)[(m.object_path, m.method_id)] = 1
m.is_registered = 1 m.is_registered = 1
...@@ -96,7 +93,7 @@ class RAMDict(Queue): ...@@ -96,7 +93,7 @@ class RAMDict(Queue):
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
self.unregisterMessage(m) activity_tool.unregisterMessage(self, m)
if not method_dict.has_key(method_id): if not method_dict.has_key(method_id):
if invoke: if invoke:
# First Validate # First Validate
......
...@@ -74,7 +74,7 @@ class RAMQueue(Queue): ...@@ -74,7 +74,7 @@ class RAMQueue(Queue):
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if invoke: activity_tool.invoke(m) if invoke: activity_tool.invoke(m)
self.unregisterMessage(m) activity_tool.unregisterMessage(self, m)
# Parse each message in queue # Parse each message in queue
for m in self.queue: for m in self.queue:
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
......
...@@ -68,22 +68,18 @@ class SQLDict(RAMDict): ...@@ -68,22 +68,18 @@ class SQLDict(RAMDict):
# Registration management # Registration management
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
if not hasattr(activity_buffer, '_sqldict_uid_dict'):
activity_buffer._sqldict_uid_dict = {} activity_buffer._sqldict_uid_dict = {}
activity_buffer._sqldict_message_list = [] activity_buffer._sqldict_message_list = []
def isMessageRegistered(self, activity_buffer, activity_tool, m): def isMessageRegistered(self, activity_buffer, activity_tool, m):
self.registerActivityBuffer(activity_buffer)
return activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)) return activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id))
def registerMessage(self, activity_buffer, activity_tool, m): def registerMessage(self, activity_buffer, activity_tool, m):
self.registerActivityBuffer(activity_buffer)
m.is_registered = 1 m.is_registered = 1
activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] = 1 activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] = 1
activity_buffer._sqldict_message_list.append(m) activity_buffer._sqldict_message_list.append(m)
def unregisterMessage(self, activity_buffer, activity_tool, m): def unregisterMessage(self, activity_buffer, activity_tool, m):
self.registerActivityBuffer(activity_buffer)
m.is_registered = 0 # This prevents from inserting deleted messages into the queue m.is_registered = 0 # This prevents from inserting deleted messages into the queue
if activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)): if activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)):
del activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] del activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)]
...@@ -182,7 +178,7 @@ class SQLDict(RAMDict): ...@@ -182,7 +178,7 @@ class SQLDict(RAMDict):
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
self.unregisterMessage(m) activity_tool.unregisterMessage(self, m)
if not method_dict.has_key(method_id): if not method_dict.has_key(method_id):
if invoke: if invoke:
# First Validate # First Validate
......
...@@ -136,7 +136,7 @@ class SQLQueue(RAMQueue): ...@@ -136,7 +136,7 @@ class SQLQueue(RAMQueue):
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if invoke: activity_tool.invoke(m) if invoke: activity_tool.invoke(m)
self.unregisterMessage(m) activity_tool.unregisterMessage(self, m)
# Parse each message in SQL queue # Parse each message in SQL queue
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None)
......
...@@ -42,12 +42,15 @@ class ActivityBuffer(TM): ...@@ -42,12 +42,15 @@ class ActivityBuffer(TM):
# at end of transaction # at end of transaction
def _begin(self, *ignored): def _begin(self, *ignored):
from thread import get_ident from thread import get_ident
from ActivityTool import activity_list
self._tlock.acquire() self._tlock.acquire()
self._tthread = get_ident() self._tthread = get_ident()
self.requires_prepare = 1 self.requires_prepare = 1
try: try:
self.queued_activity = [] self.queued_activity = []
self.flushed_activity = [] self.flushed_activity = []
for activity in activity_list: # Reset registration for each transaction
activity.registerActivityBuffer(self)
except: except:
LOG('ActivityBuffer', ERROR, "exception during _begin", LOG('ActivityBuffer', ERROR, "exception during _begin",
error=sys.exc_info()) error=sys.exc_info())
......
...@@ -316,7 +316,10 @@ class ActivityTool (Folder, UniqueObject): ...@@ -316,7 +316,10 @@ class ActivityTool (Folder, UniqueObject):
self._v_activity_buffer.deferredDeleteMessage(self, activity, message) self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
def getRegisteredMessageList(self, activity): def getRegisteredMessageList(self, activity):
return activity.getRegisteredMessageList(self, self._v_activity_buffer) return activity.getRegisteredMessageList(self._v_activity_buffer, self)
def unregisterMessage(self, activity, message):
return activity.unregisterMessage(self._v_activity_buffer, self, message)
def flush(self, object, invoke=0, **kw): def flush(self, object, invoke=0, **kw):
global is_initialized global is_initialized
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment