Commit 23d660e1 authored by Vincent Pelletier's avatar Vincent Pelletier

CMFActivity: Provide automatic migration to named node.

Assumes nodes will not be named after other node's network address.
parent 3a849db9
...@@ -88,6 +88,7 @@ tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent ti ...@@ -88,6 +88,7 @@ tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent ti
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
is_running_lock = threading.Lock() is_running_lock = threading.Lock()
currentNode = None currentNode = None
_server_address = None
ROLE_IDLE = 0 ROLE_IDLE = 0
ROLE_PROCESSING = 1 ROLE_PROCESSING = 1
...@@ -785,7 +786,27 @@ class ActivityTool (Folder, UniqueObject): ...@@ -785,7 +786,27 @@ class ActivityTool (Folder, UniqueObject):
def manage_afterAdd(self, item, container): def manage_afterAdd(self, item, container):
self.subscribe() self.subscribe()
Folder.inheritedAttribute('manage_afterAdd')(self, item, container) Folder.inheritedAttribute('manage_afterAdd')(self, item, container)
def getServerAddress(self):
"""
Backward-compatibility code only.
"""
global _server_address
if _server_address is None:
ip = port = ''
from asyncore import socket_map
for k, v in socket_map.items():
if hasattr(v, 'addr'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
ip, port = v.addr
break
if ip == '0.0.0.0':
ip = socket.gethostbyname(socket.gethostname())
_server_address = '%s:%s' %(ip, port)
return _server_address
def getCurrentNode(self): def getCurrentNode(self):
""" Return current node identifier """ """ Return current node identifier """
global currentNode global currentNode
...@@ -803,18 +824,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -803,18 +824,7 @@ class ActivityTool (Folder, UniqueObject):
'</product-config>\n' '</product-config>\n'
'section in your zope.conf, replacing "..." with a cluster-unique ' 'section in your zope.conf, replacing "..." with a cluster-unique '
'node identifier.', DeprecationWarning) 'node identifier.', DeprecationWarning)
ip = port = '' currentNode = self.getServerAddress()
from asyncore import socket_map
for k, v in socket_map.items():
if hasattr(v, 'addr'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
ip, port = v.addr
break
if ip == '0.0.0.0':
ip = socket.gethostbyname(socket.gethostname())
currentNode = '%s:%s' %(ip, port)
return currentNode return currentNode
security.declarePublic('getDistributingNode') security.declarePublic('getDistributingNode')
...@@ -841,14 +851,19 @@ class ActivityTool (Folder, UniqueObject): ...@@ -841,14 +851,19 @@ class ActivityTool (Folder, UniqueObject):
def registerNode(self, node): def registerNode(self, node):
node_dict = self.getNodeDict() node_dict = self.getNodeDict()
if not node_dict.has_key(node): if node not in node_dict:
if len(node_dict) == 0: # If we are registering the first node, make if node_dict:
# it both the distributing node and a processing # BBB: check if our node was known by address (processing and/or
# node. # distribution), and migrate it.
server_address = self.getServerAddress()
role = node_dict.pop(server_address, ROLE_IDLE)
if self.distributingNode == server_address:
self.distributingNode = node
else:
# We are registering the first node, make
# it both the distributing node and a processing node.
role = ROLE_PROCESSING role = ROLE_PROCESSING
self.distributingNode = node self.distributingNode = node
else:
role = ROLE_IDLE
self.updateNode(node, role) self.updateNode(node, role)
def updateNode(self, node, role): def updateNode(self, node, role):
......
...@@ -43,6 +43,7 @@ import urllib ...@@ -43,6 +43,7 @@ import urllib
last_tic = time.time() last_tic = time.time()
last_tic_lock = threading.Lock() last_tic_lock = threading.Lock()
_check_upgrade = True
class AlarmTool(TimerServiceMixin, BaseTool): class AlarmTool(TimerServiceMixin, BaseTool):
""" """
...@@ -170,6 +171,12 @@ class AlarmTool(TimerServiceMixin, BaseTool): ...@@ -170,6 +171,12 @@ class AlarmTool(TimerServiceMixin, BaseTool):
if now - last_tic >= self.interval: if now - last_tic >= self.interval:
self.tic() self.tic()
last_tic = now last_tic = now
elif _check_upgrade and self.getServerAddress() == alarmNode:
# BBB: check (once per run) if our node was alarm_node by address, and
# migrate it.
global _check_upgrade
_check_upgrade = False
self.setAlarmNode(current_node)
finally: finally:
last_tic_lock.release() last_tic_lock.release()
......
...@@ -80,5 +80,7 @@ class TimerServiceMixin(object): ...@@ -80,5 +80,7 @@ class TimerServiceMixin(object):
security.declarePublic('getCurrentNode') security.declarePublic('getCurrentNode')
getCurrentNode = ActivityTool.getCurrentNode.im_func getCurrentNode = ActivityTool.getCurrentNode.im_func
security.declarePublic('getServerAddress')
getServerAddress = ActivityTool.getServerAddress.im_func
_isValidNodeName = ActivityTool._isValidNodeName.im_func _isValidNodeName = ActivityTool._isValidNodeName.im_func
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment