Commit c4340114 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Allow normal users to invoke any alarm manually, if the alarm is enabled and fixit is false.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@28235 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent d2ad0f61
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
import zope.interface import zope.interface
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
from AccessControl import Unauthorized
from Products.CMFCore.utils import getToolByName from Products.CMFCore.utils import getToolByName
from Products.ERP5Type import Permissions, PropertySheet from Products.ERP5Type import Permissions, PropertySheet
from Products.ERP5Type.XMLObject import XMLObject from Products.ERP5Type.XMLObject import XMLObject
...@@ -36,6 +37,9 @@ from DateTime import DateTime ...@@ -36,6 +37,9 @@ from DateTime import DateTime
from Products.ERP5Type.Message import Message from Products.ERP5Type.Message import Message
from Products.ERP5Type.DateUtils import addToDate from Products.ERP5Type.DateUtils import addToDate
from Products.CMFCore.PortalContent import _getViewFor from Products.CMFCore.PortalContent import _getViewFor
from Products.ERP5Security.ERP5UserManager import SUPER_USER
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager, newSecurityManager
class PeriodicityMixin: class PeriodicityMixin:
""" """
...@@ -280,7 +284,7 @@ class Alarm(XMLObject, PeriodicityMixin): ...@@ -280,7 +284,7 @@ class Alarm(XMLObject, PeriodicityMixin):
""" """
return self.hasActivity(only_valid=1) return self.hasActivity(only_valid=1)
security.declareProtected(Permissions.ManagePortal, 'activeSense') security.declareProtected(Permissions.AccessContentsInformation, 'activeSense')
def activeSense(self, fixit=0): def activeSense(self, fixit=0):
""" """
This method launches the sensing process as activities. This method launches the sensing process as activities.
...@@ -291,35 +295,47 @@ class Alarm(XMLObject, PeriodicityMixin): ...@@ -291,35 +295,47 @@ class Alarm(XMLObject, PeriodicityMixin):
The result of the sensing process can be obtained by invoking The result of the sensing process can be obtained by invoking
the sense method or by requesting a report. the sense method or by requesting a report.
""" """
# LOG('activeSense, self.getPath()',0,self.getPath()) portal_membership = self.getPortalObject().portal_membership
if fixit or not self.getEnabled():
# Set the next date at which this method should be invoked checkPermission = portal_membership.checkPermission
self.setNextAlarmDate() if not checkPermission(Permissions.ManagePortal, self):
raise Unauthorized('fixing problems or activating a disabled alarm is not allowed')
# Find the active sensing method and invoke it
# as an activity so that we can benefit from # Switch to the superuser temporarily, so that the behavior would not
# distribution of alarm processing as soon as possible # change even if this method is invoked by random users.
method_id = self.getActiveSenseMethodId() sm = getSecurityManager()
if method_id not in (None, ''): newSecurityManager(None, portal_membership.getMemberById(SUPER_USER))
# A tag is provided as a parameter in order to be try:
# able to notify the user after all processes are ended # Set the next date at which this method should be invoked
# Tag is generated from portal_ids so that it can be retrieved self.setNextAlarmDate()
# later when creating an active process for example
# We do some inspection to keep compatibility # Find the active sensing method and invoke it
# (because fixit and tag were not set previously) # as an activity so that we can benefit from
tag = str(self.portal_ids.generateNewLengthId(id_group=self.getId())) # distribution of alarm processing as soon as possible
kw = {} method_id = self.getActiveSenseMethodId()
method = getattr(self, method_id) if method_id not in (None, ''):
name_list = method.func_code.co_varnames # A tag is provided as a parameter in order to be
if 'fixit' in name_list or (method.func_defaults is not None # able to notify the user after all processes are ended
and len(method.func_defaults) < len(name_list)): # Tag is generated from portal_ids so that it can be retrieved
# New API - also if variable number of named parameters # later when creating an active process for example
getattr(self.activate(tag=tag), method_id)(fixit=fixit, tag=tag) # We do some inspection to keep compatibility
else: # (because fixit and tag were not set previously)
# Old API tag = str(self.portal_ids.generateNewLengthId(id_group=self.getId()))
getattr(self.activate(tag=tag), method_id)() kw = {}
if self.isAlarmNotificationMode(): method = getattr(self, method_id)
self.activate(after_tag=tag).notify(include_active=True) name_list = method.func_code.co_varnames
if 'fixit' in name_list or (method.func_defaults is not None
and len(method.func_defaults) < len(name_list)):
# New API - also if variable number of named parameters
getattr(self.activate(tag=tag), method_id)(fixit=fixit, tag=tag)
else:
# Old API
getattr(self.activate(tag=tag), method_id)()
if self.isAlarmNotificationMode():
self.activate(after_tag=tag).notify(include_active=True)
finally:
# Restore the original user.
setSecurityManager(sm)
security.declareProtected(Permissions.ManagePortal, 'sense') security.declareProtected(Permissions.ManagePortal, 'sense')
def sense(self, process=None): def sense(self, process=None):
......
...@@ -31,7 +31,9 @@ import transaction ...@@ -31,7 +31,9 @@ import transaction
from Testing import ZopeTestCase from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import newSecurityManager, \
getSecurityManager, setSecurityManager
from AccessControl import Unauthorized
from DateTime import DateTime from DateTime import DateTime
from zLOG import LOG from zLOG import LOG
from Products.ERP5Type.DateUtils import addToDate from Products.ERP5Type.DateUtils import addToDate
...@@ -591,7 +593,100 @@ class TestAlarm(ERP5TypeTestCase): ...@@ -591,7 +593,100 @@ class TestAlarm(ERP5TypeTestCase):
else: else:
raise AssertionError, m.method_id raise AssertionError, m.method_id
def test_19_ManualInvocation(self, quiet=0, run=run_all_test):
"""
test if an alarm can be invoked directly by the user securely,
and if the results are identical when allowed.
"""
if not run: return
if not quiet:
message = 'Test manual invocation'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ', 0, message)
alarm = self.newAlarm()
# Create script that generate active process
sense_method_id = 'Alarm_setBogusLocalProperty'
skin_folder_id = 'custom'
skin_folder = self.getPortal().portal_skins[skin_folder_id]
skin_folder.manage_addProduct['PythonScripts']\
.manage_addPythonScript(id=sense_method_id)
skin_folder[sense_method_id].ZPythonScript_edit('*args,**kw',
'context.setProperty("bogus", str(context.showPermissions()))')
# update alarm properties
alarm.edit(active_sense_method_id=sense_method_id,
enabled=False)
transaction.commit()
self.tic()
# Make a normal user.
uf = self.getPortal().acl_users
uf._doAddUser('normal', '', ['Member', 'Auditor'], [])
user = uf.getUserById('normal').__of__(uf)
# Check the pre-conditions.
self.assertEquals(alarm.getProperty('bogus', None), None)
self.assertEquals(alarm.getEnabled(), False)
sm = getSecurityManager()
newSecurityManager(None, user)
# Non-managers must not be able to invoke a disabled alarm.
self.assertRaises(Unauthorized, alarm.activeSense)
self.assertRaises(Unauthorized, alarm.activeSense, fixit=1)
# Non-managers must not be able to invoke the automatic fixation.
setSecurityManager(sm)
alarm.setEnabled(True)
self.assertEquals(alarm.getEnabled(), True)
newSecurityManager(None, user)
self.assertRaises(Unauthorized, alarm.activeSense, fixit=1)
# Now, check that everybody can invoke an enabled alarm manually.
setSecurityManager(sm)
correct_answer = str(alarm.showPermissions())
self.assertNotEquals(correct_answer, None)
alarm.activeSense()
transaction.commit()
self.tic()
self.assertEquals(alarm.getProperty('bogus', None), correct_answer)
alarm.setProperty('bogus', None)
self.assertEquals(alarm.getProperty('bogus', None), None)
newSecurityManager(None, user)
alarm.activeSense()
transaction.commit()
self.tic()
self.assertEquals(alarm.getProperty('bogus', None), correct_answer)
setSecurityManager(sm)
alarm.setProperty('bogus', None)
# Check that Manager can invoke an alarm freely.
alarm.activeSense(fixit=1)
transaction.commit()
self.tic()
self.assertEquals(alarm.getProperty('bogus', None), correct_answer)
alarm.setProperty('bogus', None)
self.assertEquals(alarm.getProperty('bogus', None), None)
alarm.setEnabled(False)
self.assertEquals(alarm.getEnabled(), False)
alarm.activeSense()
transaction.commit()
self.tic()
self.assertEquals(alarm.getProperty('bogus', None), correct_answer)
alarm.setProperty('bogus', None)
self.assertEquals(alarm.getProperty('bogus', None), None)
alarm.activeSense(fixit=1)
transaction.commit()
self.tic()
self.assertEquals(alarm.getProperty('bogus', None), correct_answer)
alarm.setProperty('bogus', None)
self.assertEquals(alarm.getProperty('bogus', None), None)
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment