Commit bc56f502 authored by Jean-Paul Smets's avatar Jean-Paul Smets

Added support for TimerService if available. Added ZMI user interface for load balancing.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@4178 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0e789c37
No related merge requests found
......@@ -26,12 +26,15 @@
#
##############################################################################
import socket, asyncore, urllib
from Products.CMFCore import CMFCorePermissions
from Products.ERP5Type.Document.Folder import Folder
from Products.ERP5Type.Utils import getPath
from Products.ERP5Type.Error import Error
from Products.PythonScripts.Utility import allow_class
from AccessControl import ClassSecurityInfo
from App.ApplicationManager import ApplicationManager
from AccessControl import ClassSecurityInfo, Permissions
from AccessControl.SecurityManagement import newSecurityManager
from Products.CMFCore.utils import UniqueObject, _checkPermission, _getAuthenticatedUser
from Globals import InitializeClass, DTMLFile, get_request
from Acquisition import aq_base
......@@ -44,7 +47,13 @@ import sys
from ZODB.POSException import ConflictError
from OFS.Traversable import NotFound
from zLOG import LOG
from zLOG import LOG, INFO
try:
from Products.TimerService import getTimerService
except ImportError:
def getTimerService():
pass
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
......@@ -218,9 +227,13 @@ class ActivityTool (Folder, UniqueObject):
allowed_types = ( 'CMF Active Process', )
security = ClassSecurityInfo()
_distributingNode = ''
_nodes = ()
manage_options = tuple(
[ { 'label' : 'Overview', 'action' : 'manage_overview' }
, { 'label' : 'Activities', 'action' : 'manageActivities' }
, { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
,
] + list(Folder.manage_options))
......@@ -230,6 +243,12 @@ class ActivityTool (Folder, UniqueObject):
security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
distributingNode = ''
_nodes = ()
def __init__(self):
return Folder.__init__(self, ActivityTool.id)
......@@ -251,6 +270,138 @@ class ActivityTool (Folder, UniqueObject):
activity.initialize(self)
is_initialized = 1
security.declareProtected(Permissions.manage_properties, 'subscribe')
def subscribe(self):
""" subscribe to the global Timer Service """
service = getTimerService(self)
if not service:
raise ValueError, "Can't find event service!"
service.subscribe(self)
return "Subscribed to Timer Service"
security.declareProtected(Permissions.manage_properties, 'unsubscribe')
def unsubscribe(self):
""" unsubscribe from the global Timer Service """
service = getTimerService(self)
if not service:
raise ValueError, "Can't find event service!"
service.unsubscribe(self)
return "Usubscribed from Timer Service"
def manage_beforeDelete(self, item, container):
self.unsubscribe()
Folder.manage_beforeDelete(self, item, container)
def manage_afterAdd(self, item, container):
self.subscribe()
Folder.manage_afterAdd(self, item, container)
def getCurrentNode(self):
""" Return current node in form ip:port """
port = ''
from asyncore import socket_map
for k, v in socket_map.items():
if hasattr(v, 'port'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
port = v.port
break
ip = socket.gethostbyname(socket.gethostname())
currentNode = '%s:%s' %(ip, port)
return currentNode
security.declarePublic('getDistributingNode')
def getDistributingNode(self):
""" Return the distributingNode """
return self.distributingNode
security.declarePublic('getNodeList')
def getNodes(self):
""" Return all nodes """
return self._nodes
security.declarePublic('manage_setDistributingNode')
def manage_setDistributingNode(self, distributingNode, REQUEST=None):
""" set the distributing node """
self.distributingNode = distributingNode
if REQUEST is not None:
REQUEST.RESPONSE.redirect(
REQUEST.URL1 +
'/manageLoadBalancing?manage_tabs_message=' +
urllib.quote("Distributing Node successfully changed."))
security.declarePublic('manage_addNode')
def manage_addNode(self, node, REQUEST=None):
""" add a node """
if node in self._nodes:
if REQUEST is not None:
REQUEST.RESPONSE.redirect(
REQUEST.URL1 +
'/manageLoadBalancing?manage_tabs_message=' +
urllib.quote("Node exists already."))
return
self._nodes = self._nodes + (node,)
if REQUEST is not None:
REQUEST.RESPONSE.redirect(
REQUEST.URL1 +
'/manageLoadBalancing?manage_tabs_message=' +
urllib.quote("Node successfully added."))
security.declarePublic('manage_delNode')
def manage_delNode(self, deleteNodes, REQUEST=None):
""" delete nodes """
nodeList = list(self._nodes)
for node in deleteNodes:
if node in self._nodes:
nodeList.remove(node)
self._nodes = tuple(nodeList)
if REQUEST is not None:
REQUEST.RESPONSE.redirect(
REQUEST.URL1 +
'/manageLoadBalancing?manage_tabs_message=' +
urllib.quote("Node(s) successfully deleted."))
def process_timer(self, tick, interval):
"""
Call distribute() if we are the Distributing Node and call tic()
with our node number.
This method is called by TimerService in the interval given
in zope.conf. The Default is every 5 seconds.
"""
# get owner of portal_catalog, so normally we should be able to
# have the permission to invoke all activities
user = self.portal_catalog.getOwner()
newSecurityManager(self.REQUEST, user)
currentNode = self.getCurrentNode()
# only distribute when we are the distributingNode or if it's empty
if (self.distributingNode == self.getCurrentNode()):
self.distribute(len(self._nodes))
#LOG('CMFActivity:', INFO, 'self.distribute(node_count=%s)' %len(self._nodes))
elif not self.distributingNode:
self.distribute(1)
#LOG('CMFActivity:', INFO, 'distributingNodes empty! Calling distribute(1)')
# call tic for the current processing_node
# the processing_node numbers are the indices of the elements in the node tuple +1
# because processing_node starts form 1
if currentNode in self._nodes:
self.tic(list(self._nodes).index(currentNode)+1)
#LOG('CMFActivity:', INFO, 'self.tic(processing_node=%s)' %str(list(self._nodes).index(currentNode)+1))
elif len(self._nodes) == 0:
self.tic(1)
#LOG('CMFActivity:', INFO, 'Node List is empty! Calling tic(1)')
security.declarePublic('distribute')
def distribute(self, node_count=1):
"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment