Commit bb83c9f6 authored by Timothée Lacroix's avatar Timothée Lacroix Committed by Cédric de Saint Martin

Merge Timothée's work on resiliency

This is a squash merge of 'resiliency-testing' branch when
the HEAD of this branch was f725c81d7d69.

The merge was done by Antoine Catton.
parent bda7b9c1
......@@ -41,6 +41,7 @@ setup(name=name,
zip_safe=True,
entry_points={
'zc.buildout': [
'addresiliency = slapos.recipe.addres:Recipe',
'agent = slapos.recipe.agent:Recipe',
'apache.frontend = slapos.recipe.apache_frontend:Recipe',
'apachephp = slapos.recipe.apachephp:Recipe',
......@@ -60,6 +61,7 @@ setup(name=name,
'dropbear.add_authorized_key = slapos.recipe.dropbear:AddAuthorizedKey',
'dropbear.client = slapos.recipe.dropbear:Client',
'dropbear = slapos.recipe.dropbear:Recipe',
'dumpmdb = slapos.recipe.dumpmdb:Recipe',
'duplicity = slapos.recipe.duplicity:Recipe',
'egg_test = slapos.recipe.erp5_test:EggTestRecipe',
'equeue = slapos.recipe.equeue:Recipe',
......@@ -83,6 +85,7 @@ setup(name=name,
'gitinit = slapos.recipe.gitinit:Recipe',
'haproxy = slapos.recipe.haproxy:Recipe',
'helloworld = slapos.recipe.helloworld:Recipe',
'importmdb = slapos.recipe.importmdb:Recipe',
'java = slapos.recipe.java:Recipe',
'kumofs = slapos.recipe.kumofs:Recipe',
'kvm.frontend = slapos.recipe.kvm_frontend:Recipe',
......
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.recipe.librecipe import GenericBaseRecipe
import sys
import os
class Recipe(GenericBaseRecipe):
    """Install the resilience (bully election) scripts on the partition.

    The instance parameters fetched from the master must provide the keys
    ``number``, ``ip-list``, ``script``, ``wrapper`` and ``namebase``; the
    recipe options must provide the ``script`` and ``run`` directories.
    """

    def install(self):
        """Render the configuration file, both bully scripts and the wrapper.

        Returns the list of paths that were created.  An IOError raised while
        writing stops the installation early: it is reported on stderr
        (instead of being dropped silently) and the paths created so far are
        still returned.
        """
        path_list = []
        param_dict = self.getComputerPartitionInstanceParameterDict()
        self_id = int(param_dict['number'])
        ip = param_dict['ip-list'].split(' ')
        print('Creating bully script with ips : %s\n' % ip)

        slap_connection = self.buildout['slap-connection']
        script_directory = self.options['script']
        path_conf = os.path.join(script_directory, 'conf.in')
        path_bully = os.path.join(script_directory, param_dict['script'])
        path_bully_new = os.path.join(script_directory, 'new.py')
        path_run = os.path.join(self.options['run'], param_dict['wrapper'])
        print('paths: %s\n%s\n' % (path_run, path_bully))

        # Every template below is rendered with this single mapping.
        bully_conf = dict(self_id=self_id,
                          ip_list=ip,
                          executable=sys.executable,
                          syspath=sys.path,
                          server_url=slap_connection['server-url'],
                          key_file=slap_connection.get('key-file'),
                          cert_file=slap_connection.get('cert-file'),
                          computer_id=slap_connection['computer-id'],
                          partition_id=slap_connection['partition-id'],
                          software=slap_connection['software-release-url'],
                          namebase=param_dict['namebase'],
                          confpath=path_conf)

        def render(template_name):
            # Substitute bully_conf into one of the recipe's templates.
            return self.substituteTemplate(
                self.getTemplateFilename(template_name), bully_conf)

        try:
            path_list.append(
                self.createFile(path_conf, render('conf.in.in')))
            path_list.append(
                self.createExecutable(path_bully, render('bully.py.in')))
            # for testing purposes only
            path_list.append(
                self.createExecutable(path_bully_new,
                                      render('bully_new.py.in')))
            path_list.append(
                self.createPythonScript(
                    path_run,
                    'slapos.recipe.librecipe.execute.execute',
                    [path_bully]))
        except IOError as error:
            # Best effort is kept (no crash), but the failure is now visible.
            sys.stderr.write(
                'Resilience scripts only partially installed: %s\n' % (error,))
        return path_list
#!%(executable)s
import select
import socket
import threading
import time
import sys
sys.path[:] = %(syspath)s
from slapos import slap as slapmodule
# Base TCP port: the node with id N binds to, and is contacted on, port + N.
port = 50000
# Maximum number of bytes read by election() when waiting for a peer's reply.
size = 1024
# While True, failure_detect() sleeps 30 more minutes before its next probe;
# it is set again when a victory is announced or won.
wait = True
def loadConnectionInfos():
    """Parse the generated configuration file and describe the cluster.

    The first line of the file is expected to look like the printed form of
    a list of addresses and the second line to hold this node's id.

    Returns a dict with the keys:
      self_id      this node's id, as an int
      server_list  list of (id, address) tuples, ids being list positions
      self_ip      the address found at position self_id
    """
    # The context manager closes the file even when reading fails.
    with open('%(confpath)s', 'r') as conf_file:
        params = conf_file.read().split('\n')
    ip_list = [x.strip("' ") for x in params[0].strip('[],').split(',')]
    self_id = int(params[1])
    return {
        'self_id': self_id,
        'server_list': list(enumerate(ip_list)),
        'self_ip': ip_list[self_id],
    }
def rename_broken_and_stop():
try:
slap = slapmodule.slap()
slap.initializeConnection('%(server_url)s',
'%(key_file)s',
'%(cert_file)s')
computer_partition = slap.registerComputerPartition('%(computer_id)s',
'%(partition_id)s')
broken = computer_partition.request('%(software)s', 'frozen', '%(namebase)s0')
broken.rename('broken-%%s' %% (time.strftime("%%d-%%b_%%H:%%M:%%S", time.gmtime())))
broken.stopped()
computer_partition.rename('%(namebase)s0')
print 'renaming done\n'
except slapos.slap.slap.ServerError:
print 'Internal server error\n'
def election():
global wait
connection = loadConnectionInfos()
message = "%%s, %%s" %% (connection['self_id'], "Election")
victory = True
for (remote_id, addr) in connection['server_list']:
if remote_id > connection['self_id']:
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((addr, port + remote_id))
s.send(message)
reply = s.recv(size)
if reply == "%%s, %%s" %% (remote_id, "Alive"):
victory = False
except (socket.error, socket.herror, socket.gaierror, socket.timeout):
pass
finally:
s.close()
if victory:
wait = True
for (remote_id, addr) in connection['server_list']:
if remote_id < connection['self_id']:
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((addr, port + remote_id))
s.send("%%s, %%s" %% (connection['self_id'], "Victory"))
except (socket.error, socket.herror, socket.gaierror, socket.timeout):
pass
finally:
s.close()
rename_broken_and_stop()
def failure_detect():
    """Probe the first configured server forever; start an election if down.

    Every 30 seconds (after an extra 30 minutes whenever the global wait
    flag is set) a TCP connection to the first entry of the server list is
    attempted.  A connection failure triggers election().
    """
    global wait
    connection = loadConnectionInfos()
    while True:
        time.sleep(30)
        if wait:
            print('waiting 30 minutes\n')
            time.sleep(30 * 60)
            wait = False
        # NOTE(review): the entry is an (id, address) tuple and therefore
        # always true, so this guard never skips; confirm what was intended.
        if not connection['server_list'][0]:
            continue
        (remote_id, addr) = connection['server_list'][0]
        # Bound up front so that closing is safe even when socket creation
        # itself is what failed.
        s = None
        try:
            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            s.connect((addr, port + remote_id))
        except (socket.error, socket.herror, socket.gaierror, socket.timeout):
            leader_down = True
        else:
            leader_down = False
        finally:
            if s is not None:
                s.close()
        if leader_down:
            election()
failure_detect_thread = threading.Thread(target=failure_detect)
failure_detect_thread.start()
connection = loadConnectionInfos()
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.bind((connection['self_ip'], port + connection['self_id']))
s.listen(5)
#election()
while True:
force_election = False
client, _ = s.accept()
client_message = client.recv(1024)
if client_message:
client_id, message = client_message.split(', ')
client_id = eval(client_id)
if message == "Victory":
wait = True
print "%%s wins" %% client_id
elif message == "Election":
print "%%s starts an election" %% client_id
if client_id < connection['self_id']:
client.send("%%s, %%s" %% (connection['self_id'], "Alive"))
force_election = True
client.close()
if force_election:
election()
#!%(executable)s
import socket
import time
import sys
import thread
import time
import os
sys.path[:] = %(syspath)s
from slapos import slap as slapmodule
# Base TCP port: the node with id N binds to, and is contacted on, port + N.
port = 50000
# NOTE(review): size is not referenced by the visible code of this script,
# which passes the literal 1024 to recv(); confirm before removing.
size = 1024
def rename_broken_and_stop():
try:
slap = slapmodule.slap()
slap.initializeConnection('%(server_url)s',
'%(key_file)s',
'%(cert_file)s')
computer_partition = slap.registerComputerPartition('%(computer_id)s',
'%(partition_id)s')
broken = computer_partition.request('%(software)s', 'frozen', '%(namebase)s0')
broken.rename('broken-%%s' %% (time.strftime("%%d-%%b_%%H:%%M:%%S", time.gmtime())))
broken.stopped()
computer_partition.rename('%(namebase)s0')
print 'renaming done\n'
except slapos.slap.slap.ServerError:
print 'Internal server error\n'
## Leader is always number 0
class ResilientInstance(object):
    """One participant of the election; instance number 0 is the leader.

    comm is the communication object: it must offer send(message, number),
    recv() and canal(accepted_messages).  Messages are plain words: 'ping',
    'halt', 'victory' and 'ok'.

    Text is built with str.format because this file is a template in which
    a literal percent sign would have to be doubled.
    """

    def __init__(self, comm):
        self.comm = comm
        self.id = 0
        self.state = 'normal'
        self.halter = 0
        # Placeholder only: loadConnectionInfos() below stores the real
        # count (this used to read an undefined name).
        self.nbComp = 0
        self.inElection = False
        self.alive = True
        self.active = False
        self.lastPing = time.time()
        self.mainCanal = self.comm.canal(['ping', 'halt',
                                          'victory'])
        self.okCanal = self.comm.canal(['ok'])
        self.loadConnectionInfos()

    def loadConnectionInfos(self):
        """Refresh the instance count and this instance's id from the file."""
        with open('%(confpath)s', 'r') as conf_file:
            params = conf_file.read().split('\n')
        self.nbComp = len(
            [x.strip("' ") for x in params[0].strip('[],').split(',')])
        new_id = int(params[1])
        if self.id != new_id:
            self.halter = new_id
            self.id = new_id

    ## Needs to be changed to use the master
    def aliveManagement(self):
        """Ping instance 0 every 30 minutes; hold an election if it is mute."""
        while self.alive:
            time.sleep(30 * 60)
            if self.id == 0:
                continue
            self.comm.send('ping', 0)
            message, sender = self.okCanal.get()
            if message:
                continue
            self.election()

    def listen(self):
        """Receive incoming messages for as long as the instance is alive."""
        while self.alive:
            self.comm.recv()

    def main(self):
        """Answer the 'ping', 'halt' and 'victory' messages of the peers."""
        while self.alive:
            message, sender = self.mainCanal.get()
            if not message:
                # Nothing arrived before the canal timed out.
                continue
            try:
                # The sender id may arrive as text; it is used as an index.
                sender = int(sender)
            except (TypeError, ValueError):
                continue
            if message == 'ping':
                self.comm.send('ok', sender)
            elif message == 'halt':
                self.state = 'waitingConfirm'
                self.halter = sender
                self.comm.send('ok', sender)
            elif message == 'victory':
                if sender == int(self.halter) and self.state == 'waitingConfirm':
                    print('{0} thinks {1} is the leader\n'.format(self.id, sender))
                    self.comm.send('ok', sender)
                    self.state = 'normal'

    def election(self):
        """Try to become the leader.

        Returns False when a higher instance answered the ping or when this
        instance is no longer alive, True once it has taken the leadership.
        """
        self.inElection = True
        self.loadConnectionInfos()
        #Check if I'm the highest instance alive
        for higher in range(self.id + 1, self.nbComp):
            self.comm.send('ping', higher)
            message, sender = self.okCanal.get()
            if message:
                self.inElection = False
                return False
        if not self.alive:
            self.inElection = False
            return False
        #I should be the new coordinator, halt those below me
        print('Should be ME : {0} \n'.format(self.id))
        self.state = 'election'
        self.halter = self.id
        ups = []
        for lower in range(self.id):
            self.comm.send('halt', lower)
            message, sender = self.okCanal.get()
            if message:
                ups.append(lower)
        #Broadcast Victory
        self.state = 'reorganization'
        for up in ups:
            self.comm.send('victory', up)
            message, sender = self.okCanal.get()
            if message:
                continue
            print('Something is wrong... let\'s start over\n')
            return self.election()
        self.state = 'normal'
        self.active = True
        print('{0} Is THE LEADER \n'.format(self.id))
        rename_broken_and_stop()
        self.inElection = False
        return True
class FilteredCanal(object):
    """Queue shared between threads that only keeps accepted messages.

    accept is the collection of message words to keep; timeout is the
    number of seconds get() waits for a message before giving up.
    """

    # Seconds slept between two looks at the queue while waiting in get().
    poll_interval = 0.05

    def __init__(self, accept, timeout):
        self.accept = accept
        self.list = []
        self.lock = thread.allocate_lock()
        self.timeout = timeout

    def append(self, message, sender):
        """Queue [message, sender] if message is one of the accepted words."""
        if message in self.accept:
            with self.lock:
                self.list.append([message, sender])

    def get(self):
        """Return the oldest queued [message, sender] pair.

        Waits up to timeout seconds of wall-clock time and returns
        [None, None] when nothing arrived.  The queue is read and shortened
        while the lock is held, so a message appended at the same moment
        cannot be lost.
        """
        deadline = time.time() + self.timeout
        while True:
            with self.lock:
                if self.list:
                    return self.list.pop(0)
            remaining = deadline - time.time()
            if remaining <= 0:
                return [None, None]
            # Sleep instead of spinning, without overshooting the deadline.
            time.sleep(min(self.poll_interval, remaining))
class Wrapper(object):
    """Socket based message transport between the instances.

    Each message travels over its own TCP connection as the text
    "<message> <sender id>" followed by a newline.  Received messages are
    offered to every canal created through canal().
    """

    def __init__(self, timeout=20):
        # Kept as empty attributes: they used to be filled from names that
        # are not defined anywhere in this script.
        self.read_pipes = []
        self.write_pipes = []
        self.canals = []
        self.ips = []
        self.id = 0
        self.timeout = timeout
        self.getConnectionInfos()
        self.socket = None

    def getConnectionInfos(self):
        """Reload the address list and this instance's id from the file."""
        with open('%(confpath)s', 'r') as conf_file:
            params = conf_file.read().split('\n')
        self.ips = [x.strip("' ") for x in params[0].strip('[],').split(',')]
        self.id = int(params[1])

    def start(self):
        """Bind the listening socket on this instance's address and port."""
        self.getConnectionInfos()
        self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        self.socket.bind((self.ips[self.id], port + self.id))
        self.socket.listen(5)

    def send(self, message, number):
        """Send message to instance number; unreachable peers are ignored."""
        self.getConnectionInfos()
        # The id may have been read from the network as text.
        number = int(number)
        s = None
        try:
            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            s.connect((self.ips[number], port + number))
            # Plain concatenation: no percent sign to escape in the template.
            s.send(message + ' ' + str(self.id) + '\n')
        except (socket.error, socket.herror, socket.gaierror, socket.timeout):
            pass
        finally:
            if s is not None:
                s.close()

    def canal(self, accept):
        """Create, register and return a canal keeping the accept words."""
        created = FilteredCanal(accept, self.timeout)
        self.canals.append(created)
        return created

    def recv(self):
        """Accept one connection and hand its message to every canal."""
        client, _ = self.socket.accept()
        try:
            client_message = client.recv(1024)
        finally:
            client.close()
        if not client_message:
            return
        try:
            message, sender = client_message.split()
            sender = int(sender)
        except ValueError:
            # Not of the form "<word> <integer id>": drop it.
            return
        for canal in self.canals:
            canal.append(message, sender)
# No createWrapper factory is defined in this script: build the transport
# directly, with the same 20 second timeout.
wrapper = Wrapper(20)
computer = ResilientInstance(wrapper)
#idle waiting for connection infos
while computer.nbComp < 2:
    computer.loadConnectionInfos()
    time.sleep(30)
print('Starting\n')
computer.comm.start()
thread.start_new_thread(computer.listen, ())
thread.start_new_thread(computer.main, ())
thread.start_new_thread(computer.aliveManagement, ())
# Keep the main thread alive for the worker threads, sleeping instead of
# spinning on a full CPU core.
while True:
    time.sleep(60)
......@@ -3,7 +3,6 @@
# Basic server configuration
PidFile "%(pid_file)s"
LockFile "%(lock_file)s"
Listen %(ip)s:%(port)s
ServerAdmin someone@email
DefaultType text/plain
......@@ -22,13 +21,15 @@ CustomLog "%(access_log)s" common
<Directory />
Options FollowSymLinks
AllowOverride None
Order deny,allow
Deny from all
Require all denied
</Directory>
ProxyPass / %(backend_url)s
# List of modules
LoadModule unixd_module modules/mod_unixd.so
LoadModule access_compat_module modules/mod_access_compat.so
LoadModule authz_core_module modules/mod_authz_core.so
LoadModule authz_host_module modules/mod_authz_host.so
LoadModule log_config_module modules/mod_log_config.so
LoadModule setenvif_module modules/mod_setenvif.so
......
......@@ -167,6 +167,7 @@ class AddAuthorizedKey(GenericBaseRecipe):
path_list.append(ssh)
authorized_keys = AuthorizedKeysFile(os.path.join(ssh, 'authorized_keys'))
authorized_keys.append(self.options['key'])
for key in self.options['key'].split(' '):
authorized_keys.append(key)
return path_list
......@@ -31,6 +31,8 @@ import inspect
import re
import urllib
import urlparse
from slapos import slap as slapmodule
from json import loads as unjson
import pkg_resources
import zc.buildout
......@@ -90,7 +92,7 @@ class GenericBaseRecipe(object):
def createPythonScript(self, name, absolute_function, arguments=''):
"""Create a python script using zc.buildout.easy_install.scripts
ok o
* function should look like 'module.function', or only 'function'
if it is a builtin function."""
absolute_function = tuple(absolute_function.rsplit('.', 1))
......@@ -128,6 +130,17 @@ class GenericBaseRecipe(object):
return pkg_resources.resource_filename(name,
'template/%s' % template_name)
def getComputerPartitionInstanceParameterDict(self):
    """Return the instance parameter dict of this computer partition.

    The connection settings are read from the 'slap-connection' buildout
    section; 'key-file' and 'cert-file' are optional there.
    """
    connection_options = self.buildout['slap-connection']
    master = slapmodule.slap()
    master.initializeConnection(
        connection_options['server-url'],
        connection_options.get('key-file'),
        connection_options.get('cert-file'))
    partition = master.registerComputerPartition(
        connection_options['computer-id'],
        connection_options['partition-id'])
    return partition.getInstanceParameterDict()
def generatePassword(self, len_=32):
"""
The purpose of this method is to generate a password which doesn't change
......
......@@ -31,6 +31,7 @@ import subprocess
from slapos.recipe.librecipe import GenericBaseRecipe
from slapos.recipe.librecipe import filehash
class Recipe(GenericBaseRecipe):
def _options(self, options):
......
......@@ -213,6 +213,7 @@ class Recipe(GenericSlapRecipe, Notify, Callback):
for slave in slaves:
path_list.extend(self.add_slave(slave, known_hosts))
else:
command = [self.options['rdiffbackup-binary']]
self.logger.info("Server mode")
......
......@@ -28,6 +28,7 @@ import logging
from slapos import slap as slapmodule
class Recipe(object):
"""
Request a partition to a slap master.
......@@ -98,8 +99,9 @@ class Recipe(object):
options['computer-id'], options['partition-id']).request
isSlave = options.get('slave', '').lower() in ['y', 'yes', 'true', '1']
print '\n Slave : %s \n' % self.isSlave
return_parameters = []
self.return_parameters = []
if 'return' in options:
return_parameters = [str(parameter).strip()
for parameter in options['return'].split()]
......@@ -119,7 +121,10 @@ class Recipe(object):
for config_parameter in options['config'].split():
partition_parameter_kw[config_parameter] = \
options['config-%s' % config_parameter]
print 'from : %s \n' % options['partition-id']
print 'requested %s (%s), parameters : %s \n\n' % (options['name'],
software_type,
partition_parameter_kw)
self.instance = instance = request(software_url, software_type,
name, partition_parameter_kw=partition_parameter_kw,
filter_kw=filter_kw, shared=isSlave)
......@@ -129,9 +134,14 @@ class Recipe(object):
options['connection-%s' % param] = str(
instance.getConnectionParameter(param))
except slapmodule.NotFoundError:
options['connection-%s' % param] = ''
if self.failed is None:
self.failed = param
try:
self.instance._synced = False
options['connection-%s' % param] = str(
self.instance.getConnectionParameter(param))
except slapmodule.NotFoundError:
options['connection-%s' % param] = ''
if self.failed is None:
self.failed = param
def install(self):
if self.failed is not None:
......
[buildout]
extends = ${template-apache-php:output}
${template-pbsready-export:output}
parts =
apache-proxy
logrotate
logrotate-entry-apache
cron
cron-entry-logrotate
sshkeys-authority
sshkeys-dropbear
dropbear-server
dropbear-server-pbs-authorized-key
[apache-proxy]
recipe = slapos.cookbook:apacheproxy
url = $${slap-parameter:proxy-url}
pid-file = $${basedirectory:run}/apache.pid
lock-file = $${basedirectory:run}/apache.lock
ip = $${slap-network-information:global-ipv6}
port = 8080
error-log = $${directory:httpd-log}/error.log
access-log = $${directory:httpd-log}/access.log
httpd-conf = $${rootdirectory:etc}/apache.conf
wrapper = $${basedirectory:services}/apache
promise = $${basedirectory:promises}/apache
httpd-binary = ${apache:location}/bin/httpd
[sshkeys-directory]
recipe = slapos.cookbook:mkdirectory
requests = $${directory:sshkeys}/requests/
keys = $${directory:sshkeys}/keys/
[sshkeys-authority]
recipe = slapos.cookbook:sshkeys_authority
request-directory = $${sshkeys-directory:requests}
keys-directory = $${sshkeys-directory:keys}
wrapper = $${basedirectory:services}/sshkeys_authority
keygen-binary = ${dropbear:location}/bin/dropbearkey
[sshkeys-dropbear]
<= sshkeys-authority
recipe = slapos.cookbook:sshkeys_authority.request
name = dropbear
type = rsa
executable = $${dropbear-server:wrapper}
public-key = $${dropbear-server:rsa-keyfile}.pub
private-key = $${dropbear-server:rsa-keyfile}
wrapper = $${basedirectory:services}/sshd
[dropbear-server]
recipe = slapos.cookbook:dropbear
host = $${slap-network-information:global-ipv6}
port = 2222
home = $${directory:ssh}
wrapper = $${rootdirectory:bin}/raw_sshd
shell = $${rdiff-backup-server:wrapper}
rsa-keyfile = $${directory:ssh}/server_key.rsa
dropbear-binary = ${dropbear:location}/sbin/dropbear
[dropbear-server-pbs-authorized-key]
<= dropbear-server
recipe = slapos.cookbook:dropbear.add_authorized_key
key = $${slap-parameter:authorized-key}
[rdiff-backup-server]
recipe = slapos.cookbook:pbs
client = false
path = $${directory:htdocs}
wrapper = $${rootdirectory:bin}/rdiffbackup-server
rdiffbackup-binary = ${buildout:bin-directory}/rdiff-backup
[logrotate]
recipe = slapos.cookbook:logrotate
# Binaries
logrotate-binary = ${logrotate:location}/usr/sbin/logrotate
gzip-binary = ${gzip:location}/bin/gzip
gunzip-binary = ${gzip:location}/bin/gunzip
# Directories
wrapper = $${rootdirectory:bin}/logrotate
conf = $${rootdirectory:etc}/logrotate.conf
logrotate-entries = $${directory:logrotate-entries}
backup = $${directory:logrotate-backup}
state-file = $${rootdirectory:srv}/logrotate.status
[logrotate-entry-apache]
<= logrotate
recipe = slapos.cookbook:logrotate.d
name = apache
log = $${apache-proxy:error-log} $${apache-proxy:access-log}
frequency = daily
rotate-num = 30
sharedscripts = true
notifempty = true
create = true
[cron]
recipe = slapos.cookbook:cron
dcrond-binary = ${dcron:location}/sbin/crond
cron-entries = $${directory:cron-entries}
crontabs = $${directory:crontabs}
cronstamps = $${directory:cronstamps}
catcher = $${cron-simplelogger:wrapper}
binary = $${basedirectory:services}/crond
[cron-simplelogger]
recipe = slapos.cookbook:simplelogger
wrapper = $${rootdirectory:bin}/cron_simplelogger
log = $${basedirectory:log}/crond.log