Commit 79a91fb4 authored by Jérome Perrin's avatar Jérome Perrin

grid,manager: close xmlrpc.ServerProxy sockets

by using it getSupervisorRPC in a context manager which on python 3
automatically closes.
This needs fix from https://github.com/Supervisor/supervisor/issues/1184
we assume that this fix is only on python 3.
On python2, we keep this same behavior of not closing socket explicitly
and leaving it to destructors.

Also re-raise a few error cases that were ignored.
parent 0998e7f1
...@@ -708,14 +708,16 @@ class Partition(object): ...@@ -708,14 +708,16 @@ class Partition(object):
"""Asks supervisord to start the instance. If this instance is not """Asks supervisord to start the instance. If this instance is not
installed, we install it. installed, we install it.
""" """
supervisor = self.getSupervisorRPC()
partition_id = self.computer_partition.getId() partition_id = self.computer_partition.getId()
try: try:
supervisor.startProcessGroup(partition_id, False) with self.getSupervisorRPC() as supervisor:
supervisor.startProcessGroup(partition_id, False)
except xmlrpclib.Fault as exc: except xmlrpclib.Fault as exc:
if exc.faultString.startswith('BAD_NAME:'): if exc.faultString.startswith('BAD_NAME:'):
self.logger.info("Nothing to start on %s..." % self.logger.info("Nothing to start on %s..." %
self.computer_partition.getId()) self.computer_partition.getId())
else:
raise
else: else:
self.logger.info("Requested start of %s..." % self.computer_partition.getId()) self.logger.info("Requested start of %s..." % self.computer_partition.getId())
...@@ -723,11 +725,13 @@ class Partition(object): ...@@ -723,11 +725,13 @@ class Partition(object):
"""Asks supervisord to stop the instance.""" """Asks supervisord to stop the instance."""
partition_id = self.computer_partition.getId() partition_id = self.computer_partition.getId()
try: try:
supervisor = self.getSupervisorRPC() with self.getSupervisorRPC() as supervisor:
supervisor.stopProcessGroup(partition_id, False) supervisor.stopProcessGroup(partition_id, False)
except xmlrpclib.Fault as exc: except xmlrpclib.Fault as exc:
if exc.faultString.startswith('BAD_NAME:'): if exc.faultString.startswith('BAD_NAME:'):
self.logger.info('Partition %s not known in supervisord, ignoring' % partition_id) self.logger.info('Partition %s not known in supervisord, ignoring' % partition_id)
else:
raise
else: else:
self.logger.info("Requested stop of %s..." % self.computer_partition.getId()) self.logger.info("Requested stop of %s..." % self.computer_partition.getId())
...@@ -798,15 +802,18 @@ class Partition(object): ...@@ -798,15 +802,18 @@ class Partition(object):
def checkProcessesFromStateList(self, process_list, state_list): def checkProcessesFromStateList(self, process_list, state_list):
"""Asks supervisord to check if one of the processes are in the state_list.""" """Asks supervisord to check if one of the processes are in the state_list."""
supervisor = self.getSupervisorRPC()
for process in process_list: for process in process_list:
try: try:
info = supervisor.getProcessInfo(process) with self.getSupervisorRPC() as supervisor:
info = supervisor.getProcessInfo(process)
if info['statename'] in state_list: if info['statename'] in state_list:
return True return True
except xmlrpclib.Fault as exc: except xmlrpclib.Fault as exc:
self.logger.debug("BAD process name: %r" % process) if exc.faultString.startswith('BAD_NAME:'):
continue self.logger.debug("BAD process name: %r" % process)
continue
else:
raise
return False return False
def cleanupFolder(self, folder_path): def cleanupFolder(self, folder_path):
...@@ -832,43 +839,43 @@ class Partition(object): ...@@ -832,43 +839,43 @@ class Partition(object):
# In future it will not be needed, as update command # In future it will not be needed, as update command
# is going to be implemented on server side. # is going to be implemented on server side.
self.logger.debug('Updating supervisord') self.logger.debug('Updating supervisord')
supervisor = self.getSupervisorRPC() with self.getSupervisorRPC() as supervisor:
# took from supervisord.supervisorctl.do_update # took from supervisord.supervisorctl.do_update
result = supervisor.reloadConfig() result = supervisor.reloadConfig()
added, changed, removed = result[0] added, changed, removed = result[0]
for gname in removed: for gname in removed:
results = supervisor.stopProcessGroup(gname) results = supervisor.stopProcessGroup(gname)
fails = [res for res in results fails = [res for res in results
if res['status'] == xmlrpc.Faults.FAILED] if res['status'] == xmlrpc.Faults.FAILED]
if fails: if fails:
self.logger.warning('Problem while stopping process %r, will try later' % gname) self.logger.warning('Problem while stopping process %r, will try later' % gname)
else: else:
self.logger.info('Stopped %r' % gname)
for i in range(0, 10):
# Some process may be still running, be nice and wait for them to be stopped.
try:
supervisor.removeProcessGroup(gname)
break
except:
if i == 9:
raise
time.sleep(1)
self.logger.info('Removed %r' % gname)
for gname in changed:
results = supervisor.stopProcessGroup(gname)
self.logger.info('Stopped %r' % gname) self.logger.info('Stopped %r' % gname)
for i in range(0, 10):
# Some process may be still running, be nice and wait for them to be stopped. supervisor.removeProcessGroup(gname)
try: supervisor.addProcessGroup(gname)
supervisor.removeProcessGroup(gname) self.logger.info('Updated %r' % gname)
break
except: for gname in added:
if i == 9: supervisor.addProcessGroup(gname)
raise self.logger.info('Updated %r' % gname)
time.sleep(1) self.logger.debug('Supervisord updated')
self.logger.info('Removed %r' % gname)
for gname in changed:
results = supervisor.stopProcessGroup(gname)
self.logger.info('Stopped %r' % gname)
supervisor.removeProcessGroup(gname)
supervisor.addProcessGroup(gname)
self.logger.info('Updated %r' % gname)
for gname in added:
supervisor.addProcessGroup(gname)
self.logger.info('Updated %r' % gname)
self.logger.debug('Supervisord updated')
def _set_ownership(self, path): def _set_ownership(self, path):
""" """
......
...@@ -36,6 +36,7 @@ import stat ...@@ -36,6 +36,7 @@ import stat
import sys import sys
import time import time
from six.moves import xmlrpc_client as xmlrpclib from six.moves import xmlrpc_client as xmlrpclib
import contextlib
from slapos.grid.utils import (createPrivateDirectory, SlapPopen, updateFile) from slapos.grid.utils import (createPrivateDirectory, SlapPopen, updateFile)
from slapos.util import bytes2str from slapos.util import bytes2str
...@@ -43,13 +44,25 @@ from slapos.util import bytes2str ...@@ -43,13 +44,25 @@ from slapos.util import bytes2str
from supervisor import xmlrpc, states from supervisor import xmlrpc, states
@contextlib.contextmanager
def getSupervisorRPC(socket): def getSupervisorRPC(socket):
"""Get a supervisor XML-RPC connection.
Use in a context manager for proper closing of sockets.
"""
supervisor_transport = xmlrpc.SupervisorTransport('', '', supervisor_transport = xmlrpc.SupervisorTransport('', '',
'unix://' + socket) 'unix://' + socket)
server_proxy = xmlrpclib.ServerProxy('http://127.0.0.1', server_proxy = xmlrpclib.ServerProxy('http://127.0.0.1',
supervisor_transport) supervisor_transport)
return getattr(server_proxy, 'supervisor')
# python3's xmlrpc is a closing context manager, python2 is not and cannot be
# just used as a context manager as it would call __enter__ and __exit__ on
# XML-RPC.
if sys.version_info.major == 2:
yield server_proxy.supervisor
else:
with server_proxy as s:
yield s.supervisor
def _getSupervisordSocketPath(instance_root): def _getSupervisordSocketPath(instance_root):
return os.path.join(instance_root, 'supervisord.socket') return os.path.join(instance_root, 'supervisord.socket')
...@@ -116,13 +129,13 @@ def _updateWatchdog(socket): ...@@ -116,13 +129,13 @@ def _updateWatchdog(socket):
Then, when running slapgrid, the real watchdog configuration is generated. Then, when running slapgrid, the real watchdog configuration is generated.
We thus need to reload watchdog configuration if needed and start it. We thus need to reload watchdog configuration if needed and start it.
""" """
supervisor = getSupervisorRPC(socket) with getSupervisorRPC(socket) as supervisor:
if supervisor.getProcessInfo('watchdog')['state'] not in states.RUNNING_STATES: if supervisor.getProcessInfo('watchdog')['state'] not in states.RUNNING_STATES:
# XXX workaround for https://github.com/Supervisor/supervisor/issues/339 # XXX workaround for https://github.com/Supervisor/supervisor/issues/339
# In theory, only reloadConfig is needed. # In theory, only reloadConfig is needed.
supervisor.removeProcessGroup('watchdog') supervisor.removeProcessGroup('watchdog')
supervisor.reloadConfig() supervisor.reloadConfig()
supervisor.addProcessGroup('watchdog') supervisor.addProcessGroup('watchdog')
def launchSupervisord(instance_root, logger, def launchSupervisord(instance_root, logger,
supervisord_additional_argument_list=None): supervisord_additional_argument_list=None):
...@@ -132,13 +145,15 @@ def launchSupervisord(instance_root, logger, ...@@ -132,13 +145,15 @@ def launchSupervisord(instance_root, logger,
trynum = 1 trynum = 1
while trynum < 6: while trynum < 6:
try: try:
supervisor = getSupervisorRPC(socket) with getSupervisorRPC(socket) as supervisor:
status = supervisor.getState() status = supervisor.getState()
except xmlrpclib.Fault as e: except xmlrpclib.Fault as e:
if e.faultCode == 6 and e.faultString == 'SHUTDOWN_STATE': if e.faultCode == 6 and e.faultString == 'SHUTDOWN_STATE':
logger.info('Supervisor in shutdown procedure, will check again later.') logger.info('Supervisor in shutdown procedure, will check again later.')
trynum += 1 trynum += 1
time.sleep(2 * trynum) time.sleep(2 * trynum)
else:
raise
except Exception: except Exception:
# In case if there is problem with connection, assume that supervisord # In case if there is problem with connection, assume that supervisord
# is not running and try to run it # is not running and try to run it
...@@ -187,8 +202,8 @@ def launchSupervisord(instance_root, logger, ...@@ -187,8 +202,8 @@ def launchSupervisord(instance_root, logger,
while trynum < 6: while trynum < 6:
try: try:
socketlib.setdefaulttimeout(current_timeout) socketlib.setdefaulttimeout(current_timeout)
supervisor = getSupervisorRPC(socket) with getSupervisorRPC(socket) as supervisor:
status = supervisor.getState() status = supervisor.getState()
if status['statename'] == 'RUNNING' and status['statecode'] == 1: if status['statename'] == 'RUNNING' and status['statecode'] == 1:
return return
logger.warning('Wrong status name %(statename)r and code ' logger.warning('Wrong status name %(statename)r and code '
......
...@@ -167,13 +167,13 @@ class Manager(object): ...@@ -167,13 +167,13 @@ class Manager(object):
partition.writeSupervisorConfigurationFile() partition.writeSupervisorConfigurationFile()
# Start processes # Start processes
supervisord = partition.getSupervisorRPC() with partition.getSupervisorRPC() as supervisor:
for program in socat_programs: for program in socat_programs:
process_name = '{}:{}'.format(group_id, program['name']) process_name = '{}:{}'.format(group_id, program['name'])
status = supervisord.getProcessInfo(process_name) status = supervisor.getProcessInfo(process_name)
if status['start'] == 0: if status['start'] == 0:
supervisord.startProcess(process_name, False) supervisor.startProcess(process_name, False)
def report(self, partition): def report(self, partition):
"""Method called at `slapos node report` phase. """Method called at `slapos node report` phase.
......
...@@ -80,16 +80,16 @@ class Manager(object): ...@@ -80,16 +80,16 @@ class Manager(object):
partition.writeSupervisorConfigurationFile() partition.writeSupervisorConfigurationFile()
# check the state of all process, if the process is not started yes, start it # check the state of all process, if the process is not started yes, start it
supervisord = partition.getSupervisorRPC() with partition.getSupervisorRPC() as supervisor:
process_list_string = "" process_list_string = ""
for name in wrapper_list: for name in wrapper_list:
process_name = '-'.join([partition_id, group_suffix]) + ':' + name process_name = '-'.join([partition_id, group_suffix]) + ':' + name
process_list_string += '%s\n' % process_name process_list_string += '%s\n' % process_name
status = supervisord.getProcessInfo(process_name) status = supervisor.getProcessInfo(process_name)
if status['start'] == 0: if status['start'] == 0:
# process is not started yet # process is not started yet
logger.info("Starting pre-delete process %r..." % name) logger.info("Starting pre-delete process %r..." % name)
supervisord.startProcess(process_name, False) supervisor.startProcess(process_name, False)
# ask to slapgrid to check theses scripts before destroy partition # ask to slapgrid to check theses scripts before destroy partition
with open(wait_filepath, 'w') as f: with open(wait_filepath, 'w') as f:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment