Commit dea6b7db authored by Vincent Pelletier's avatar Vincent Pelletier

testCMFActivity: Do not hide actual failure cause.

The main change here is the reduction of try..except scope, so failing the
first assertion does not cause an AttributeError about original_query.
Also, avoid code duplication by just checking final number of messages
outside the try..except block.
Also, improve coding style (spaces around operators and commas).
Also, avoid single-use variable.
Also, fix typo in comment.
parent a0d813b6
......@@ -45,6 +45,8 @@ from zLOG import LOG
from ZODB.POSException import ConflictError
from DateTime import DateTime
from Products.CMFActivity.ActivityTool import Message
from _mysql_exceptions import OperationalError
from Products.ZMySQLDA.db import DB
import gc
import random
import threading
......@@ -616,37 +618,31 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
organisation_module.newContent(id=self.company_id)
o = organisation_module._getOb(self.company_id)
self.commit()
self.flushAllActivities(silent = 1, loop_size = 10)
self.flushAllActivities(silent=1, loop_size=10)
self.assertEqual(len(activity_tool.getMessageList()), 0)
from _mysql_exceptions import OperationalError
# Monkey patch Queue to induce conflict errors artificially.
def query(self, query_string,*args, **kw):
# No so nice, this is specific to zsql method
if query_string.find("REPLACE INTO")>=0:
# Not so nice, this is specific to zsql method
if "REPLACE INTO" in query_string:
raise OperationalError
else:
return self.original_query(query_string,*args, **kw)
from Products.ZMySQLDA.db import DB
return self.original_query(query_string,*args, **kw)
portal = self.getPortal()
# Test some range of conflict error occurences.
organisation_module.recursiveReindexObject()
self.commit()
self.assertTrue(len(activity_tool.getMessageList()), 1)
try:
# Test some range of conflict error occurences.
organisation_module.recursiveReindexObject()
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 1)
DB.original_query = DB.query
DB.query = query
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.commit()
DB.query = DB.original_query
message_list = portal.portal_activities.getMessageList()
self.assertEqual(len(message_list),1)
finally:
DB.query = DB.original_query
del DB.original_query
self.assertEqual(len(portal.portal_activities.getMessageList()), 1)
def checkIsMessageRegisteredMethod(self, activity):
activity_tool = self.getPortal().portal_activities
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment