Commit a2606bc7 authored by Vincent Pelletier's avatar Vincent Pelletier

Store existing nodes as a BTree:

 - limits conflict errors (the list is automaticaly modified, though rarely)
 - allows to store key, value pairs
Cache processingNode computation result.
Add accessors to node btree.
Remove manage_addNode as it's now automaticaly done.
Update iser interface and add new required methods.
Automaticaly register self as an available processing node when process_timer is called.
Use distributing node accessor instead of accessing the value directly.
Empty processing node list and no defined distributing node means now that the feature is disabled, not that every node distributes and process.
Display the ip:port value which identifies current node on the user interface.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@16940 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 098eeed3
...@@ -49,6 +49,7 @@ from Acquisition import aq_inner ...@@ -49,6 +49,7 @@ from Acquisition import aq_inner
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from ActivityBuffer import ActivityBuffer from ActivityBuffer import ActivityBuffer
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
from BTrees.OIBTree import OIBTree
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from Products.MailHost.MailHost import MailHostError from Products.MailHost.MailHost import MailHostError
...@@ -72,6 +73,9 @@ is_initialized = 0 ...@@ -72,6 +73,9 @@ is_initialized = 0
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
first_run = 1 first_run = 1
currentNode = None
ROLE_IDLE = 0
ROLE_PROCESSING = 1
# Activity Registration # Activity Registration
activity_dict = {} activity_dict = {}
...@@ -301,9 +305,6 @@ class ActivityTool (Folder, UniqueObject): ...@@ -301,9 +305,6 @@ class ActivityTool (Folder, UniqueObject):
allowed_types = ( 'CMF Active Process', ) allowed_types = ( 'CMF Active Process', )
security = ClassSecurityInfo() security = ClassSecurityInfo()
_distributingNode = ''
_nodes = ()
manage_options = tuple( manage_options = tuple(
[ { 'label' : 'Overview', 'action' : 'manage_overview' } [ { 'label' : 'Overview', 'action' : 'manage_overview' }
, { 'label' : 'Activities', 'action' : 'manageActivities' } , { 'label' : 'Activities', 'action' : 'manageActivities' }
...@@ -402,17 +403,19 @@ class ActivityTool (Folder, UniqueObject): ...@@ -402,17 +403,19 @@ class ActivityTool (Folder, UniqueObject):
def getCurrentNode(self): def getCurrentNode(self):
""" Return current node in form ip:port """ """ Return current node in form ip:port """
port = '' global currentNode
from asyncore import socket_map if currentNode is None:
for k, v in socket_map.items(): port = ''
if hasattr(v, 'port'): from asyncore import socket_map
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self) for k, v in socket_map.items():
type = str(getattr(v, '__class__', 'unknown')) if hasattr(v, 'port'):
if type == 'ZServer.HTTPServer.zhttp_server': # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
port = v.port type = str(getattr(v, '__class__', 'unknown'))
break if type == 'ZServer.HTTPServer.zhttp_server':
ip = socket.gethostbyname(socket.gethostname()) port = v.port
currentNode = '%s:%s' %(ip, port) break
ip = socket.gethostbyname(socket.gethostname())
currentNode = '%s:%s' %(ip, port)
return currentNode return currentNode
security.declarePublic('getDistributingNode') security.declarePublic('getDistributingNode')
...@@ -420,11 +423,46 @@ class ActivityTool (Folder, UniqueObject): ...@@ -420,11 +423,46 @@ class ActivityTool (Folder, UniqueObject):
""" Return the distributingNode """ """ Return the distributingNode """
return self.distributingNode return self.distributingNode
security.declarePublic('getNodeList getNodes') def getNodeList(self, role=None):
def getNodes(self): node_dict = self.getNodeDict()
""" Return all nodes """ if role is None:
return self._nodes result = [x for x in node_dict.keys()]
getNodeList = getNodes else:
result = [node_id for node_id, node_role in node_dict.items() if node_role == role]
result.sort()
return result
def getNodeDict(self):
nodes = self._nodes
if isinstance(nodes, tuple):
new_nodes = OIBTree()
new_nodes.update([(x, ROLE_PROCESSING) for x in self._nodes])
self._nodes = nodes = new_nodes
return nodes
def registerNode(self, node):
node_dict = self.getNodeDict()
if not node_dict.has_key(node):
if len(node_dict) == 0: # If we are registering the first node, make
# it both the distributing node and a processing
# node.
role = ROLE_PROCESSING
self.distributingNode = node
else:
role = ROLE_IDLE
self.updateNode(node, role)
def updateNode(self, node, role):
node_dict = self.getNodeDict()
node_dict[node] = role
security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
def getProcessingNodeList(self):
return self.getNodeList(role=ROLE_PROCESSING)
security.declareProtected(CMFCorePermissions.ManagePortal, 'getUnusedNodeList')
def getIdleNodeList(self):
return self.getNodeList(role=ROLE_IDLE)
def _isValidNodeName(self, node_name) : def _isValidNodeName(self, node_name) :
"""Check we have been provided a good node name""" """Check we have been provided a good node name"""
...@@ -447,46 +485,64 @@ class ActivityTool (Folder, UniqueObject): ...@@ -447,46 +485,64 @@ class ActivityTool (Folder, UniqueObject):
'/manageLoadBalancing?manage_tabs_message=' + '/manageLoadBalancing?manage_tabs_message=' +
urllib.quote("Malformed Distributing Node.")) urllib.quote("Malformed Distributing Node."))
security.declarePublic('manage_addNode') security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
def manage_addNode(self, node, REQUEST=None): def manage_delNode(self, unused_node_list=None, REQUEST=None):
""" add a node """ """ delete selected unused nodes """
if not self._isValidNodeName(node) : processing_node = self.getDistributingNode()
if REQUEST is not None: updated_processing_node = False
REQUEST.RESPONSE.redirect( if unused_node_list is not None:
REQUEST.URL1 + node_dict = self.getNodeDict()
'/manageLoadBalancing?manage_tabs_message=' + for node in unused_node_list:
urllib.quote("Malformed node.")) if node in node_dict:
return del node_dict[node]
if node == processing_node:
if node in self._nodes: self.processing_node = ''
if REQUEST is not None: updated_processing_node = True
REQUEST.RESPONSE.redirect( if REQUEST is not None:
REQUEST.URL1 + if unused_node_list is None:
'/manageLoadBalancing?manage_tabs_message=' + message = "No unused node selected, nothing deleted."
urllib.quote("Node exists already.")) else:
return message = "Deleted nodes %r." % (unused_node_list, )
if updated_processing_node:
self._nodes = self._nodes + (node,) message += "Disabled distributing node because it was deleted."
REQUEST.RESPONSE.redirect(
if REQUEST is not None: REQUEST.URL1 +
REQUEST.RESPONSE.redirect( '/manageLoadBalancing?manage_tabs_message=' +
REQUEST.URL1 + urllib.quote(message))
'/manageLoadBalancing?manage_tabs_message=' +
urllib.quote("Node successfully added.")) security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
security.declarePublic('manage_delNode') """ Change one or more idle nodes into processing nodes """
def manage_delNode(self, deleteNodes, REQUEST=None): if unused_node_list is not None:
""" delete nodes """ node_dict = self.getNodeDict()
nodeList = list(self._nodes) for node in unused_node_list:
for node in deleteNodes: self.updateNode(node, ROLE_PROCESSING)
if node in self._nodes: if REQUEST is not None:
nodeList.remove(node) if unused_node_list is None:
self._nodes = tuple(nodeList) message = "No unused node selected, nothing done."
if REQUEST is not None: else:
REQUEST.RESPONSE.redirect( message = "Nodes now procesing: %r." % (unused_node_list, )
REQUEST.URL1 + REQUEST.RESPONSE.redirect(
'/manageLoadBalancing?manage_tabs_message=' + REQUEST.URL1 +
urllib.quote("Node(s) successfully deleted.")) '/manageLoadBalancing?manage_tabs_message=' +
urllib.quote(message))
security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
""" Change one or more procesing nodes into idle nodes """
if processing_node_list is not None:
node_dict = self.getNodeDict()
for node in processing_node_list:
self.updateNode(node, ROLE_IDLE)
if REQUEST is not None:
if processing_node_list is None:
message = "No used node selected, nothing done."
else:
message = "Nodes now unused %r." % (processing_node_list, )
REQUEST.RESPONSE.redirect(
REQUEST.URL1 +
'/manageLoadBalancing?manage_tabs_message=' +
urllib.quote(message))
def process_timer(self, tick, interval, prev="", next=""): def process_timer(self, tick, interval, prev="", next=""):
""" """
...@@ -508,13 +564,12 @@ class ActivityTool (Folder, UniqueObject): ...@@ -508,13 +564,12 @@ class ActivityTool (Folder, UniqueObject):
newSecurityManager(self.REQUEST, user) newSecurityManager(self.REQUEST, user)
currentNode = self.getCurrentNode() currentNode = self.getCurrentNode()
self.registerNode(currentNode)
processing_node_list = self.getNodeList(role=ROLE_PROCESSING)
# only distribute when we are the distributingNode or if it's empty # only distribute when we are the distributingNode or if it's empty
if (self.distributingNode == currentNode): if (self.getDistributingNode() == currentNode):
self.distribute(len(self._nodes)) self.distribute(len(processing_node_list))
elif not self.distributingNode:
self.distribute(1)
# SkinsTool uses a REQUEST cache to store skin objects, as # SkinsTool uses a REQUEST cache to store skin objects, as
# with TimerService we have the same REQUEST over multiple # with TimerService we have the same REQUEST over multiple
...@@ -527,11 +582,8 @@ class ActivityTool (Folder, UniqueObject): ...@@ -527,11 +582,8 @@ class ActivityTool (Folder, UniqueObject):
# call tic for the current processing_node # call tic for the current processing_node
# the processing_node numbers are the indices of the elements in the node tuple +1 # the processing_node numbers are the indices of the elements in the node tuple +1
# because processing_node starts form 1 # because processing_node starts form 1
if currentNode in self._nodes: if currentNode in processing_node_list:
self.tic(list(self._nodes).index(currentNode)+1) self.tic(processing_node_list.index(currentNode)+1)
elif len(self._nodes) == 0:
self.tic(1)
finally: finally:
timerservice_lock.release() timerservice_lock.release()
......
...@@ -44,130 +44,137 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. ...@@ -44,130 +44,137 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
Zope Configuration File <i>zope.conf</i>. Zope Configuration File <i>zope.conf</i>.
</p> </p>
<p class="form-help">
The <i>Distributing node</i> is responsible for the Load Distribution. The default value is empty
which means that every existing node will try to do the distribution, which is fine when there
is only one ZServer running.
To change the Distributing Node, edit the value and click &quot;Change&quot;.
</p>
<form action="&dtml-URL1;"> <form action="&dtml-URL1;">
<table cellspacing="0" cellpadding="2" border="0" width="100%"> <table cellspacing="0" cellpadding="2" border="0" width="100%">
<tr class="list-header"> <tr class="list-header">
<td align="left" valign="top" colspan=2> <td align="left" valign="top" colspan=2>
<div class="form-label">Distributing Node</div> <div class="form-label">This node</div>
</td> </td>
</tr> </tr>
<tr> <tr>
<td> <td colspan=2>Current node is <b><dtml-var getCurrentNode></b></td>
<div class="form-label">IP:Port</div>
</td>
<td align="left">
<div class="form-item">
<input type="text" name="distributingNode" value="&dtml-getDistributingNode;" size="19" />
</div>
</td>
</tr>
<tr>
<td>&nbsp;</td>
<td align="left">
<div class="form-element">
<input type="submit" class="form-element" name="manage_setDistributingNode:method" value=" Change ">
</div>
</td>
</tr> </tr>
<tr> <tr>
<td>&nbsp;</td> <td>&nbsp;</td>
<td>&nbsp;</td> <td>&nbsp;</td>
</tr> </tr>
<tr>
<td align="left" colspan=2>
<p class="form-help">
The following list defines the nodes the Load is currently distributed to. Per default this list is empty
which means that every existing node will try to process all activities, which is fine, when ther is only one
ZServer running.
To delete a node from the list, activate the checkbox on the left of that node and click the &quot;Delete&quot; button.
</p>
</td>
</tr> </tr>
<tr class="list-header"> <tr class="list-header">
<td align="left" valign="top" colspan=2> <td align="left" valign="top" colspan=2>
<div class="form-label">Existing Nodes</div> <div class="form-label">Distributing Node</div>
</td>
<dtml-in getNodes>
<dtml-let domain=sequence-key
node=sequence-item
index=sequence-index>
<tr class="row-normal">
<td align="left" valign="top">
<input type="checkbox" name="deleteNodes:list" value="&dtml-node;">
</td>
<td align="left" valign="top">
<dtml-if "domain == _.None">
<p>Default</p>
<dtml-else>
<p><dtml-var node></p>
</dtml-if>
</td> </td>
</tr> </tr>
</dtml-let>
</dtml-in>
<tr> <tr>
<td align="left" colspan="2"> <td>
<div class="form-element"> <select name="distributingNode">
<input type="submit" class="form-element" name="manage_delNode:method" value=" Delete " /> <option value="">(disabled)</option>
</div> <dtml-in getNodeList prefix="node">
<dtml-if expr="node_item == getDistributingNode()">
<option selected="selected" value="<dtml-var sequence-item>">
<dtml-var sequence-item>
</option>
<dtml-else>
<option value="<dtml-var sequence-item>">
<dtml-var sequence-item>
</option>
</dtml-if>
</dtml-in>
</select>
<input type="submit" class="form-element" name="manage_setDistributingNode:method" value=" Change ">
</td>
<td>
<p class="form-help">
The <i>Distributing node</i> is responsible for the Load Distribution.
Only one node can be <i>Distributing node</i> at any given time.
It is also possible to interupt activity distribution by selecting the
<i>(disabled)</i> value. Activity nodes will receive no more activity
when <i>Distributing node</i> is disabled.
</p>
</td> </td>
</tr> </tr>
<tr> <tr>
<td>&nbsp;</td> <td>&nbsp;</td>
<td>&nbsp;</td> <td>&nbsp;</td>
</tr> </tr>
<tr> <tr class="list-header">
<td align="left" valign="top" colspan=2> <td align="left" valign="top" colspan=2>
<p class="form-help"> <div class="form-label">Existing Nodes</div>
To add a new node, enter the Ip-Address and Port-Number for the new node
and click the &quot;Add&quot; button.
</p>
</td> </td>
</tr> </tr>
<tr> <tr>
<td> <td>
<div class="form-label">IP:Port</div> <table>
</td> <tr>
<td align="left"> <td rowspan="2">
<div class="form-item"><input type="text" name="node" size="19" /></div> <div class="form-label">Idle&nbsp;nodes</div>
<select name="unused_node_list:list" size="10" multiple="multiple" style="width:100%">
<dtml-in getIdleNodeList>
<option value="<dtml-var sequence-item>"><dtml-var sequence-item></option>
</dtml-in>
</select>
</td>
<td>
<input type="submit" name="manage_addToProcessingList:method" value="&gt;"/>
</td>
<td rowspan="2">
<div class="form-label">Processing&nbsp;nodes</div>
<select name="processing_node_list:list" size="10" multiple="multiple" style="width:100%">
<dtml-in getProcessingNodeList prefix="node">
<option value="<dtml-var node_item>"><dtml-var node_item> (#<dtml-var expr="node_index + 1">)</option>
</dtml-in>
</select>
</td>
</tr>
<tr>
<td>
<input type="submit" name="manage_removeFromProcessingList:method" value="&lt;"/>
</td>
</tr>
<tr>
<td align="left" colspan="2">
<div class="form-element">
<input type="submit" class="form-element" name="manage_delNode:method" value=" Delete " />
</div>
</td>
</tr>
</table>
</td> </td>
</tr> <td>
<tr> <p class="form-help">
<td>&nbsp;</td> Every node sharing the same ZODB will register itself to the idle
<td align="left"> node list - except the first one which will be automaticaly declared
<div class="form-element"> both <i>Processing node</i> and <i>Distributing node</i>. Registered
<input type="submit" class="form-element" name="manage_addNode:method" value=" Add "> nodes can then be made <i>Processing nodes</i> or <i>Idle nodes</i>.
</div> </p>
<p class="form-help">
<b>Important note:</b> Nodes can register themselves, but can not
unregister themselves (for example, a node which IP has changed will
be present twice in the list). It is up to the user to manualy prune
obsolete nodes. If non-existant nodes are present in the
<i>Processing node</i> list, activities will get balanced to those
nodes and never be executed.
</p>
</td> </td>
</tr> </tr>
<tr> <tr>
<td>&nbsp;</td> <td>&nbsp;</td>
<td>&nbsp;</td> <td>&nbsp;</td>
</tr>
<tr>
<td align="left" colspan=2>
<p class="form-help">
Subscribe/Unsubscribe from Timer Service
</p>
</td>
</tr> </tr>
<tr class="list-header"> <tr class="list-header">
<td align="left" valign="top" colspan=2> <td align="left" valign="top" colspan=2>
<div class="form-label">Subscribe/Unsubscribe from Timer Service</div>
</td>
</tr>
<tr>
<td colspan=2>
<div class="form-label"> <div class="form-label">
Status: Status:
<dtml-if isSubscribed> <dtml-if isSubscribed>
Subscribed Subscribed
<dtml-else> <dtml-else>
Not Subscribed Not Subscribed
</dtml-if> </dtml-if>
</div> </div>
</td> </td>
</tr> </tr>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment