Commit 8251f41e authored by Jérome Perrin's avatar Jérome Perrin

CMFActivity py3

parent a767bf42
...@@ -140,10 +140,10 @@ def sqltest_dict(): ...@@ -140,10 +140,10 @@ def sqltest_dict():
if value is None: # XXX: see comment in SQLBase._getMessageList if value is None: # XXX: see comment in SQLBase._getMessageList
return column + b" IS NULL" return column + b" IS NULL"
for x in value: for x in value:
return b"%s IN (%s)" % (column, str2bytes(', '.join(map( return str2bytes("%s IN (%s)" % (column, ', '.join(map(
str if isinstance(x, _SQLTEST_NO_QUOTE_TYPE_SET) else str if isinstance(x, _SQLTEST_NO_QUOTE_TYPE_SET) else
render_datetime if isinstance(x, DateTime) else render_datetime if isinstance(x, DateTime) else
render_string, value)))) lambda v: bytes2str(render_string(v)), value))))
return b"0" return b"0"
sqltest_dict[name] = render sqltest_dict[name] = render
_('active_process_uid') _('active_process_uid')
...@@ -245,7 +245,7 @@ def getNow(db): ...@@ -245,7 +245,7 @@ def getNow(db):
Note that this value is not cached, and is not transactionnal on MySQL Note that this value is not cached, and is not transactionnal on MySQL
side. side.
""" """
return db.query("SELECT UTC_TIMESTAMP(6)", 0)[1][0][0] return db.query(b"SELECT UTC_TIMESTAMP(6)", 0)[1][0][0]
class SQLBase(Queue): class SQLBase(Queue):
""" """
...@@ -283,7 +283,7 @@ CREATE TABLE %s ( ...@@ -283,7 +283,7 @@ CREATE TABLE %s (
db = activity_tool.getSQLConnection() db = activity_tool.getSQLConnection()
create = self.createTableSQL() create = self.createTableSQL()
if clear: if clear:
db.query("DROP TABLE IF EXISTS " + self.sql_table) db.query(str2bytes("DROP TABLE IF EXISTS " + self.sql_table))
db.query(create) db.query(create)
else: else:
src = db.upgradeSchema(create, create_if_not_exists=1, src = db.upgradeSchema(create, create_if_not_exists=1,
...@@ -788,7 +788,7 @@ CREATE TABLE %s ( ...@@ -788,7 +788,7 @@ CREATE TABLE %s (
b" %s%s" b" %s%s"
b" ORDER BY priority, date" b" ORDER BY priority, date"
b" LIMIT %i" b" LIMIT %i"
b")" % args).format(*a, *k)) b")" % args).format(*a, **k))
result = Results(query( result = Results(query(
b"SELECT *" b"SELECT *"
b" FROM (%s) AS t" b" FROM (%s) AS t"
...@@ -832,8 +832,8 @@ CREATE TABLE %s ( ...@@ -832,8 +832,8 @@ CREATE TABLE %s (
""" """
Put messages back in given processing_node. Put messages back in given processing_node.
""" """
db.query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % ( db.query(("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
self.sql_table, state, ','.join(map(str, uid_list)))) self.sql_table, state, ','.join(map(str, uid_list)))).encode())
def getProcessableMessageLoader(self, db, processing_node): def getProcessableMessageLoader(self, db, processing_node):
# do not merge anything # do not merge anything
...@@ -1040,16 +1040,16 @@ CREATE TABLE %s ( ...@@ -1040,16 +1040,16 @@ CREATE TABLE %s (
return not message_list return not message_list
def deleteMessageList(self, db, uid_list): def deleteMessageList(self, db, uid_list):
db.query("DELETE FROM %s WHERE uid IN (%s)" % ( db.query(str2bytes("DELETE FROM %s WHERE uid IN (%s)" % (
self.sql_table, ','.join(map(str, uid_list)))) self.sql_table, ','.join(map(str, uid_list)))))
def reactivateMessageList(self, db, uid_list, delay, retry): def reactivateMessageList(self, db, uid_list, delay, retry):
db.query("UPDATE %s SET" db.query(str2bytes("UPDATE %s SET"
" date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)" " date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
"%s WHERE uid IN (%s)" % ( "%s WHERE uid IN (%s)" % (
self.sql_table, delay, self.sql_table, delay,
", retry = retry + 1" if retry else "", ", retry = retry + 1" if retry else "",
",".join(map(str, uid_list)))) ",".join(map(str, uid_list)))))
def finalizeMessageExecution(self, activity_tool, message_list, def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None): uid_to_duplicate_uid_list_dict=None):
...@@ -1206,8 +1206,8 @@ CREATE TABLE %s ( ...@@ -1206,8 +1206,8 @@ CREATE TABLE %s (
To simulate time shift, we simply substract delay from To simulate time shift, we simply substract delay from
all dates in message(_queue) table all dates in message(_queue) table
""" """
activity_tool.getSQLConnection().query("UPDATE %s SET" activity_tool.getSQLConnection().query(("UPDATE %s SET"
" date = DATE_SUB(date, INTERVAL %s SECOND)" " date = DATE_SUB(date, INTERVAL %s SECOND)"
% (self.sql_table, delay) % (self.sql_table, delay)
+ ('' if processing_node is None else + ('' if processing_node is None else
"WHERE processing_node=%s" % processing_node)) "WHERE processing_node=%s" % processing_node)).encode())
...@@ -142,10 +142,10 @@ class SQLDict(SQLBase): ...@@ -142,10 +142,10 @@ class SQLDict(SQLBase):
if reserve_uid_list: if reserve_uid_list:
self.assignMessageList(db, processing_node, reserve_uid_list) self.assignMessageList(db, processing_node, reserve_uid_list)
else: else:
db.query("COMMIT") # XXX: useful ? db.query(b"COMMIT") # XXX: useful ?
except: except:
self._log(WARNING, 'Failed to reserve duplicates') self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK") db.query(b"ROLLBACK")
raise raise
if uid_list: if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list) self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
...@@ -619,7 +619,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -619,7 +619,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Monkey patch Queue to induce conflict errors artificially. # Monkey patch Queue to induce conflict errors artificially.
def query(self, query_string,*args, **kw): def query(self, query_string,*args, **kw):
# Not so nice, this is specific to zsql method # Not so nice, this is specific to zsql method
if "REPLACE INTO" in query_string: if b"REPLACE INTO" in query_string:
raise OperationalError raise OperationalError
return self.original_query(query_string,*args, **kw) return self.original_query(query_string,*args, **kw)
...@@ -1026,7 +1026,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1026,7 +1026,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
""" """
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
def delete_volatiles(): def delete_volatiles():
for property_id in activity_tool.__dict__.keys(): for property_id in list(six.iterkeys(activity_tool.__dict__)):
if property_id.startswith('_v_'): if property_id.startswith('_v_'):
delattr(activity_tool, property_id) delattr(activity_tool, property_id)
organisation_module = self.getOrganisationModule() organisation_module = self.getOrganisationModule()
...@@ -1142,6 +1142,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1142,6 +1142,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.flushAllActivities(silent=1, loop_size=100) self.flushAllActivities(silent=1, loop_size=100)
# Check there is a traceback in the email notification # Check there is a traceback in the email notification
sender, recipients, mail = message_list.pop() sender, recipients, mail = message_list.pop()
mail = mail.decode()
self.assertIn("Module %s, line %s, in failingMethod" % ( self.assertIn("Module %s, line %s, in failingMethod" % (
__name__, inspect.getsourcelines(failingMethod)[1]), mail) __name__, inspect.getsourcelines(failingMethod)[1]), mail)
self.assertIn("ValueError:", mail) self.assertIn("ValueError:", mail)
...@@ -1237,7 +1238,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1237,7 +1238,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Check that cmf_activity SQL connection still works # Check that cmf_activity SQL connection still works
connection_da = self.portal.cmf_activity_sql_connection() connection_da = self.portal.cmf_activity_sql_connection()
self.assertFalse(connection_da._registered) self.assertFalse(connection_da._registered)
connection_da.query('select 1') connection_da.query(b'select 1')
self.assertTrue(connection_da._registered) self.assertTrue(connection_da._registered)
self.commit() self.commit()
self.assertFalse(connection_da._registered) self.assertFalse(connection_da._registered)
...@@ -1693,7 +1694,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1693,7 +1694,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# This is a one-shot method, revert after execution # This is a one-shot method, revert after execution
SQLDict.dequeueMessage = original_dequeue SQLDict.dequeueMessage = original_dequeue
result = self.dequeueMessage(activity_tool, processing_node, node_family_id_set) result = self.dequeueMessage(activity_tool, processing_node, node_family_id_set)
queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive() queue_tic_test_dict['is_alive'] = process_shutdown_thread.is_alive()
return result return result
SQLDict.dequeueMessage = dequeueMessage SQLDict.dequeueMessage = dequeueMessage
Organisation.waitingActivity = waitingActivity Organisation.waitingActivity = waitingActivity
...@@ -1717,7 +1718,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1717,7 +1718,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.tic() activity_tool.tic()
activity_thread = ActivityThread() activity_thread = ActivityThread()
# Do not try to outlive main thread. # Do not try to outlive main thread.
activity_thread.setDaemon(True) activity_thread.daemon = True
# Call process_shutdown in yet another thread because it will wait for # Call process_shutdown in yet another thread because it will wait for
# running activity to complete before returning, and we need to unlock # running activity to complete before returning, and we need to unlock
# activity *after* calling process_shutdown to make sure the next # activity *after* calling process_shutdown to make sure the next
...@@ -1727,7 +1728,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1727,7 +1728,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.process_shutdown(3, 0) activity_tool.process_shutdown(3, 0)
process_shutdown_thread = ProcessShutdownThread() process_shutdown_thread = ProcessShutdownThread()
# Do not try to outlive main thread. # Do not try to outlive main thread.
process_shutdown_thread.setDaemon(True) process_shutdown_thread.daemon = True
activity_thread.start() activity_thread.start()
# Wait at rendez-vous for activity to arrive. # Wait at rendez-vous for activity to arrive.
...@@ -1746,7 +1747,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1746,7 +1747,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(len(message_list), 1) self.assertEqual(len(message_list), 1)
self.assertEqual(message_list[0].method_id, 'getTitle') self.assertEqual(message_list[0].method_id, 'getTitle')
# Check that process_shutdown_thread was still runing when Queue_tic returned. # Check that process_shutdown_thread was still runing when Queue_tic returned.
self.assertTrue(queue_tic_test_dict.get('isAlive'), repr(queue_tic_test_dict)) self.assertTrue(queue_tic_test_dict.get('is_alive'), repr(queue_tic_test_dict))
# Call tic in foreground. This must not lead to activity execution. # Call tic in foreground. This must not lead to activity execution.
activity_tool.tic() activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 1) self.assertEqual(len(activity_tool.getMessageList()), 1)
...@@ -1894,7 +1895,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1894,7 +1895,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
""" """
original_query = six.get_unbound_function(DB.query) original_query = six.get_unbound_function(DB.query)
def query(self, query_string, *args, **kw): def query(self, query_string, *args, **kw):
if query_string.startswith('INSERT'): if query_string.startswith(b'INSERT'):
insert_list.append(len(query_string)) insert_list.append(len(query_string))
if not n: if not n:
raise Skip raise Skip
...@@ -2490,7 +2491,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2490,7 +2491,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(1, activity_tool.countMessage()) self.assertEqual(1, activity_tool.countMessage())
self.flushAllActivities() self.flushAllActivities()
sender, recipients, mail = message_list.pop() sender, recipients, mail = message_list.pop()
self.assertIn('UID mismatch', mail) self.assertIn(b'UID mismatch', mail)
m, = activity_tool.getMessageList() m, = activity_tool.getMessageList()
self.assertEqual(m.processing_node, INVOKE_ERROR_STATE) self.assertEqual(m.processing_node, INVOKE_ERROR_STATE)
obj.flushActivity() obj.flushActivity()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment