Commit 16b2e8b8 authored by Cédric Le Ninivin's avatar Cédric Le Ninivin

Introducing watchdog

parent 676e32b8
......@@ -59,6 +59,7 @@ setup(name=name,
'slapproxy = slapos.proxy:main',
'bang = slapos.bang:main',
'slapos = slapos.entry:main',
'watchdog = slapos.grid.watchdog:main',
]
},
test_suite="slapos.tests",
......
......@@ -43,6 +43,7 @@ from exception import BuildoutFailedError, WrongPermissionError, \
PathDoesNotExistError
from networkcache import download_network_cached, upload_network_cached
import tarfile
from watchdog import getWatchdogID
REQUIRED_COMPUTER_PARTITION_PERMISSION = '0750'
......@@ -237,6 +238,7 @@ class Partition(object):
self.software_path = software_path
self.instance_path = instance_path
self.run_path = os.path.join(self.instance_path, 'etc', 'run')
self.service_path = os.path.join(self.instance_path, 'etc', 'service')
self.supervisord_partition_configuration_path = \
supervisord_partition_configuration_path
self.supervisord_socket = supervisord_socket
......@@ -276,6 +278,26 @@ class Partition(object):
gid = stat_info.st_gid
return (uid, gid)
def addServiceToGroup(self, partition_id,
runner_list, path, extension = ''):
uid, gid = self.getUserGroupId()
program_partition_template = pkg_resources.resource_stream(__name__,
'templates/program_partition_supervisord.conf.in').read()
for runner in runner_list:
self.partition_supervisor_configuration += '\n' + \
program_partition_template % dict(
program_id='_'.join([partition_id, runner]),
program_directory=self.instance_path,
program_command=os.path.join(path, runner),
program_name=runner+extension,
instance_path=self.instance_path,
user_id=uid,
group_id=gid,
# As supervisord has no environment to inherit setup minimalistic one
HOME=pwd.getpwuid(uid).pw_dir,
USER=pwd.getpwuid(uid).pw_name,
)
def install(self):
""" Creates configuration file from template in software_path, then
installs the software partition with the help of buildout
......@@ -384,42 +406,35 @@ class Partition(object):
self.logger.info("Generating supervisord config file from template...")
# check if CP/etc/run exists and it is a directory
# iterate over each file in CP/etc/run
# iterate over each file in CP/etc/service adding WatchdogID to their name
# if at least one is not 0750 raise -- partition has something funny
runner_list = []
service_list = []
if os.path.exists(self.run_path):
if os.path.isdir(self.run_path):
runner_list = os.listdir(self.run_path)
if len(runner_list) == 0:
self.logger.warning('No runners found for partition %r' %
if os.path.exists(self.service_path):
if os.path.isdir(self.service_path):
service_list = os.listdir(self.service_path)
if len(runner_list) == 0 and len(service_list) == 0:
self.logger.warning('No runners nor services found for partition %r' %
self.partition_id)
if os.path.exists(self.supervisord_partition_configuration_path):
os.unlink(self.supervisord_partition_configuration_path)
else:
partition_id = self.computer_partition.getId()
program_partition_template = pkg_resources.resource_stream(__name__,
'templates/program_partition_supervisord.conf.in').read()
group_partition_template = pkg_resources.resource_stream(__name__,
'templates/group_partition_supervisord.conf.in').read()
partition_supervisor_configuration = group_partition_template % dict(
self.partition_supervisor_configuration = group_partition_template % dict(
instance_id=partition_id,
program_list=','.join(['_'.join([partition_id, runner])
for runner in runner_list]))
for runner in runner_list:
partition_supervisor_configuration += '\n' + \
program_partition_template % dict(
program_id='_'.join([partition_id, runner]),
program_directory=self.instance_path,
program_command=os.path.join(self.run_path, runner),
program_name=runner,
instance_path=self.instance_path,
user_id=uid,
group_id=gid,
# As supervisord has no environment to inherit setup minimalistic one
HOME=pwd.getpwuid(uid).pw_dir,
USER=pwd.getpwuid(uid).pw_name,
)
for runner in runner_list+service_list]))
# Same method to add to service and run
self.addServiceToGroup(partition_id, runner_list,self.run_path)
self.addServiceToGroup(partition_id, service_list,self.service_path,
extension=getWatchdogID())
utils.updateFile(self.supervisord_partition_configuration_path,
partition_supervisor_configuration)
self.partition_supervisor_configuration)
self.updateSupervisor()
def start(self):
......
......@@ -451,6 +451,17 @@ class Slapgrid(object):
computer_partition_filter_list.split(",")
self.maximum_periodicity = maximum_periodicity
self.force_periodicity = force_periodicity
# XXX hardcoded watchdog_path
self.watchdog_path = '/opt/slapos/bin/watchdog'
def getWatchdogLine(self):
invocation_list = [self.watchdog_path]
invocation_list.append("--master-url '%s' " % self.master_url)
if self.key_file is not None and self.cert_file is not None:
invocation_list.append("--cert-file %s" % self.cert_file)
invocation_list.append("--key-file %s" % self.key_file)
invocation_list.append("--computer-id '%s'" % self.computer_id)
return ' '.join(invocation_list)
def checkEnvironmentAndCreateStructure(self):
"""Checks for software_root and instance_root existence, then creates
......@@ -486,6 +497,7 @@ class Slapgrid(object):
supervisord_pidfile=os.path.abspath(os.path.join(
self.instance_root, 'var', 'run', 'supervisord.pid')),
supervisord_logfile_backups='10',
watchdog_command = self.getWatchdogLine(),
))
except (WrongPermissionError, PathDoesNotExistError) as error:
raise error
......
......@@ -18,3 +18,7 @@ logfile-backups = %(supervisord_logfile_backups)s
[unix_http_server]
file=%(supervisord_socket)s
chmod=0700
[eventlistener:watchdog]
command=%(watchdog_command)s
events=PROCESS_STATE_EXITED, PROCESS_STATE_FATAL
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import argparse
import slapos.slap.slap
import subprocess
from supervisor import childutils
import sys
def getWatchdogID():
return "-on-watch"
def parseArgumentTuple():
"""Parses arguments either from command line, from method parameters or from
config file. Then returns a new instance of slapgrid.Slapgrid with those
parameters. Also returns the options dict and unused variable list, and
configures logger.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--master-url",
help="The master server URL. Mandatory.",
required=True)
parser.add_argument("--computer-id",
help="The computer id defined in the server.",
required=True)
parser.add_argument("--key-file",
help="SSL Authorisation key file.",
default=None)
parser.add_argument("--cert-file",
help="SSL Authorisation certificate file.",
default=None)
option = parser.parse_args()
# Build option_dict
option_dict = {}
for argument_key, argument_value in vars(option).iteritems():
option_dict.update({argument_key: argument_value})
return option_dict
class Watchdog():
process_state_events = ['PROCESS_STATE_EXITED', 'PROCESS_STATE_FATAL']
def __init__(self, option_dict):
for option, value in option_dict.items():
setattr(self, option, value)
self.stdin = sys.stdin
self.stdout = sys.stdout
self.stderr = sys.stderr
self.slap = slapos.slap.slap()
self.slap.initializeConnection(
slapgrid_uri=self.master_url, key_file=self.key_file,
cert_file=self.cert_file)
def write_stdout(self, s):
self.stdout.write(s)
self.stdout.flush()
def write_stderr(self, s):
self.stderr.write(s)
self.stderr.flush()
def run(self):
while 1:
self.write_stdout('READY\n')
line = self.stdin.readline() # read header line from stdin
headers = dict([ x.split(':') for x in line.split() ])
data = sys.stdin.read(int(headers['len'])) # read the event payload
self.handle_event(headers, data)
self.write_stdout('RESULT 2\nOK') # transition from READY to ACKNOWLEDGED
def handle_event(self, headers, payload):
if headers['eventname'] in self.process_state_events:
payload_dict = dict([ x.split(':') for x in payload.split() ])
if getWatchdogID() in payload_dict['processname']:
self.handle_process_state_change_event(headers, payload_dict)
def handle_process_state_change_event(self, headers, payload_dict):
partition_id = payload_dict['groupname']
partition = slapos.slap.ComputerPartition(
computer_id = self.computer_id,
connection_helper = self.slap._connection_helper ,
partition_id = partition_id)
partition.bang("%s process in partition %s encountered a problem"
% (payload_dict['processname'], partition_id))
def main():
watchdog = Watchdog(parseArgumentTuple())
watchdog.run()
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment