Commit 1b6f936f authored by Marco Mariani's avatar Marco Mariani

removed unused bully script - resilience now simpler and faster.

parent ee253abb
......@@ -40,44 +40,8 @@ class Recipe(GenericSlapRecipe):
def _install(self):
path_list = []
confpath = os.path.join(self.options['etc'], 'bully.conf')
ip_list = self.parameter_dict['ip-list']
print 'Creating bully configuration with ips : %s\n' % ip_list
conf = self.createFile(confpath,
self.substituteTemplate(
self.getTemplateFilename('bully.conf.in'),
{
'self_id': int(self.parameter_dict['number']),
'ip_list': ip_list
}
))
path_list.append(conf)
slap_connection = self.buildout['slap-connection']
if self.optionIsTrue('enable-bully-service', default=False):
bully_dir = self.options['services']
else:
bully_dir = self.options['bin']
bully_wrapper = self.createPythonScript(
name=os.path.join(bully_dir, self.options['wrapper-bully']),
absolute_function='slapos.recipe.addresiliency.bully.run',
arguments={
'confpath': confpath,
'server_url': slap_connection['server-url'],
'key_file': slap_connection.get('key-file'),
'cert_file': slap_connection.get('cert-file'),
'computer_id': slap_connection['computer-id'],
'partition_id': slap_connection['partition-id'],
'software': slap_connection['software-release-url'],
'namebase': self.parameter_dict['namebase'],
})
path_list.append(bully_wrapper)
takeover_wrapper = self.createPythonScript(
name=os.path.join(self.options['bin'], self.options['wrapper-takeover']),
absolute_function='slapos.recipe.addresiliency.takeover.run',
......
# -*- coding: utf-8 -*-
import logging
import Queue
import socket
import thread
import time
import slapos.recipe.addresiliency.renamer
import slapos
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
BASE_PORT = 50000
SLEEPING_MINS = 2 # XXX was 30, increase after testing
MSG_PING = 'ping'
MSG_HALT = 'halt'
MSG_VICTORY = 'victory'
MSG_OK = 'ok'
STATE_NORMAL = 'normal'
STATE_WAITINGCONFIRM = 'waitingConfirm'
STATE_ELECTION = 'election'
STATE_REORGANIZATION = 'reorganization'
## Leader is always number 0
class ResilientInstance(object):
def __init__(self, comm, renamer, confpath):
self.comm = comm
self.participant_id = 0
self.state = STATE_NORMAL
self.halter_id = 0
self.inElection = False
self.alive = True
self.mainCanal = self.comm.create_canal([MSG_PING, MSG_HALT, MSG_VICTORY])
self.renamer = renamer
self.okCanal = self.comm.create_canal([MSG_OK])
self.confpath = confpath
self.loadConnectionInfo()
def loadConnectionInfo(self):
params = open(self.confpath, 'r').readlines()
self.total_participants = len(params[0].split())
new_id = int(params[1])
if self.participant_id != new_id:
self.halter_id = new_id
self.participant_id = new_id
log.debug('I am {} of {}'.format(self.participant_id, self.total_participants))
## Needs to be changed to use the master
def aliveManagement(self):
while self.alive:
log.info('XXX sleeping for %d minutes' % SLEEPING_MINS)
time.sleep(SLEEPING_MINS*60)
if self.participant_id == 0:
continue
self.comm.send(MSG_PING, 0)
message, sender = self.okCanal.get()
if message:
continue
self.election()
def listen(self):
while self.alive:
self.comm.recv()
def main(self):
while self.alive:
message, sender = self.mainCanal.get()
if message == MSG_PING:
self.comm.send(MSG_OK, sender)
elif message == MSG_HALT:
self.state = STATE_WAITINGCONFIRM
self.halter_id = int(sender)
self.comm.send(MSG_OK, sender)
elif message == MSG_VICTORY:
if int(sender) == self.halter_id and self.state == STATE_WAITINGCONFIRM:
log.info('{} thinks {} is the leader'.format(self.participant_id, sender))
self.comm.send(MSG_OK, sender)
self.state = STATE_NORMAL
def election(self):
self.inElection = True
self.loadConnectionInfo()
# Check if I'm the highest instance alive
for higher in range(self.participant_id + 1, self.total_participants):
self.comm.send(MSG_PING, higher)
message, sender = self.okCanal.get()
if message:
log.info('{} is alive ({})'.format(higher, self.participant_id))
self.inElection = False
return False
continue
if not self.alive:
return False
# I should be the new coordinator, halt those below me
log.info('Should be ME : {}'.format(self.participant_id))
self.state = STATE_ELECTION
self.halter_id = self.participant_id
ups = []
for lower in range(self.participant_id):
self.comm.send(MSG_HALT, lower)
message, sender = self.okCanal.get()
if message:
ups.append(lower)
#Broadcast Victory
self.state = STATE_REORGANIZATION
for up in ups:
self.comm.send(MSG_VICTORY, up)
message, sender = self.okCanal.get()
if message:
continue
log.info('Something is wrong... let\'s start over')
return self.election()
self.state = STATE_NORMAL
self.active = True
log.info('{} Is THE LEADER'.format(self.participant_id))
self.renamer.failover()
self.inElection = False
return True
class FilteredCanal(object):
def __init__(self, accept, timeout):
self.accept = accept
self.queue = Queue.Queue()
self.timeout = timeout
def append(self, message, sender):
if message in self.accept:
self.queue.put([message, sender])
def get(self):
try:
return self.queue.get(timeout=self.timeout)
except Queue.Empty:
return [None, None]
class Wrapper(object):
def __init__(self, confpath, timeout=20):
self.canals = []
self.ips = []
self.participant_id = 0
self.timeout = timeout
self.confpath = confpath
self.getConnectionInfo()
self.socket = None
def getConnectionInfo(self):
params = open(self.confpath, 'r').readlines()
self.ips = params[0].split()
self.participant_id = int(params[1])
log.debug('I am {} of {}'.format(self.participant_id, self.ips))
def start(self):
self.getConnectionInfo()
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.socket.bind((self.ips[self.participant_id], BASE_PORT + self.participant_id))
self.socket.listen(5)
def send(self, message, number):
self.getConnectionInfo()
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((self.ips[number], BASE_PORT + number))
s.send(message + (' {}\n'.format(self.participant_id)))
except (socket.error, socket.herror, socket.gaierror, socket.timeout):
pass
finally:
s.close()
def create_canal(self, accept):
created = FilteredCanal(accept, self.timeout)
self.canals.append(created)
return created
def recv(self):
client, _ = self.socket.accept()
client_message = client.recv(1024)
if client_message:
message, sender = client_message.split()
for canal in self.canals:
canal.append(message, int(sender))
def run(args):
confpath = args.pop('confpath')
renamer = slapos.recipe.addresiliency.renamer.Renamer(server_url = args.pop('server_url'),
key_file = args.pop('key_file'),
cert_file = args.pop('cert_file'),
computer_guid = args.pop('computer_id'),
partition_id = args.pop('partition_id'),
software_release = args.pop('software'),
namebase = args.pop('namebase'))
if args:
raise ValueError('Unknown arguments: %s' % ', '.join(args))
wrapper = Wrapper(confpath=confpath, timeout=20)
computer = ResilientInstance(wrapper, renamer=renamer, confpath=confpath)
# idle waiting for connection infos
while computer.total_participants < 2:
computer.loadConnectionInfo()
time.sleep(30)
log.info('Starting')
computer.comm.start()
thread.start_new_thread(computer.listen, ())
thread.start_new_thread(computer.aliveManagement, ())
computer.main()
#!%(executable)s
import select
import socket
import threading
import time
import sys
sys.path[:] = %(syspath)s
import slapos
from slapos import slap as slapmodule
port = 50000
size = 1024
wait = True
def loadConnectionInfos():
connectionInfos = {}
file = open('%(confpath)s', 'r')
params = file.read().split('\n')
file.close()
ip_list = [x.strip("' ") for x in params[0].strip('[],').split(',')]
connectionInfos['self_id'] = int(params[1])
connectionInfos['server_list'] = \
[(i, ip_list[i]) for i in range(len(ip_list))]
connectionInfos['self_ip'] = ip_list[connectionInfos['self_id']]
return connectionInfos
def rename_broken_and_stop():
try:
slap = slapmodule.slap()
slap.initializeConnection('%(server_url)s',
'%(key_file)s',
'%(cert_file)s')
computer_partition = slap.registerComputerPartition(computer_guid='%(computer_id)s',
partition_id='%(partition_id)s')
broken = computer_partition.request(software_release='%(software)s',
software_type='frozen',
partition_reference='%(namebase)s0')
broken.rename('broken-%%s' %% (time.strftime("%%d-%%b_%%H:%%M:%%S", time.gmtime())))
broken.stopped()
computer_partition.rename('%(namebase)s0')
print 'renaming done\n'
except slapos.slap.slap.ServerError:
print 'Internal server error\n'
def election():
global wait
connection = loadConnectionInfos()
message = "%%s, %%s" %% (connection['self_id'], "Election")
victory = True
for (remote_id, addr) in connection['server_list']:
if remote_id > connection['self_id']:
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((addr, port + remote_id))
s.send(message)
reply = s.recv(size)
if reply == "%%s, %%s" %% (remote_id, "Alive"):
victory = False
except (socket.error, socket.herror, socket.gaierror, socket.timeout):
pass
finally:
s.close()
if victory:
wait = True
for (remote_id, addr) in connection['server_list']:
if remote_id < connection['self_id']:
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((addr, port + remote_id))
s.send("%%s, %%s" %% (connection['self_id'], "Victory"))
except (socket.error, socket.herror, socket.gaierror, socket.timeout):
pass
finally:
s.close()
rename_broken_and_stop()
def failure_detect():
global wait
connection = loadConnectionInfos()
while True:
time.sleep(30)
if wait:
print 'waiting 30 minutes\n'
time.sleep(30 * 60)
wait = False
if not connection['server_list'][0]:
continue
(remote_id, addr) = connection['server_list'][0]
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((addr, port + remote_id))
s.close()
except (socket.error, socket.herror, socket.gaierror, socket.timeout):
s.close()
election()
failure_detect_thread = threading.Thread(target=failure_detect)
failure_detect_thread.start()
connection = loadConnectionInfos()
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.bind((connection['self_ip'], port + connection['self_id']))
s.listen(5)
#election()
while True:
force_election = False
client, _ = s.accept()
client_message = client.recv(1024)
if client_message:
client_id, message = client_message.split(', ')
client_id = eval(client_id)
if message == "Victory":
wait = True
print "%%s wins" %% client_id
elif message == "Election":
print "%%s starts an election" %% client_id
if client_id < connection['self_id']:
client.send("%%s, %%s" %% (connection['self_id'], "Alive"))
force_election = True
client.close()
if force_election:
election()
#!%(executable)s
import logging
import os
import socket
import sys
import thread
import time
sys.path[:] = %(syspath)s
from slapos import slap as slapmodule
import slapos
BASE_PORT = 50000
SLEEPING_MINS = 2
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
class Renamer(object):
def __init__(self, server_url, key_file, cert_file, computer_guid,
partition_id, software_release, namebase):
self.server_url = server_url
self.key_file = key_file
self.cert_file = cert_file
self.computer_guid = computer_guid
self.partition_id = partition_id
self.software_release = software_release
self.namebase = namebase
def _failover(self):
slap = slapmodule.slap()
slap.initializeConnection(self.server_url,
self.key_file,
self.cert_file)
computer_partition = slap.registerComputerPartition(computer_guid=self.computer_guid,
partition_id=self.partition_id)
broken = computer_partition.request(software_release=self.software_release,
software_type='frozen',
partition_reference=self.namebase+'0')
broken.rename('broken-{}'.format(time.strftime("%%d-%%b_%%H:%%M:%%S", time.gmtime())))
broken.stopped()
computer_partition.rename(self.namebase+'0')
def failover(self):
try:
log.info('renaming done')
except slapos.slap.slap.ServerError:
log.info('Internal server error')
## Leader is always number 0
class ResilientInstance(object):
def __init__(self, comm, renamer, confpath):
self.comm = comm
self.id = 0
self.state = 'normal'
self.halter = 0
self.inElection = False
self.alive = True
self.lastPing = time.clock()
self.mainCanal = self.comm.canal(['ping', 'halt', 'victory'])
self.renamer = renamer
self.okCanal = self.comm.canal(['ok'])
self.confpath = confpath
self.loadConnectionInfos()
def loadConnectionInfos(self):
file = open(self.confpath, 'r')
params = file.read().split('\n')
file.close()
self.nbComp = len([x.strip("' ") for x in params[0].strip('[],').split(',')])
new_id = int(params[1])
if self.id != new_id:
self.halter = new_id
self.id = new_id
## Needs to be changed to use the master
def aliveManagement(self):
while self.alive:
log.info('XXX sleeping for %%d minutes' %% SLEEPING_MINS)
time.sleep(SLEEPING_MINS*60)
if self.id == 0:
continue
self.comm.send('ping', 0)
message, sender = self.okCanal.get()
if message:
continue
self.election()
def listen(self):
while self.alive:
self.comm.recv()
def main(self):
while self.alive:
message, sender = self.mainCanal.get()
if message == 'ping':
self.comm.send('ok', sender)
elif message == 'halt':
self.state = 'waitingConfirm'
self.halter = sender
self.comm.send('ok', sender)
elif message == 'victory':
if int(sender) == int(self.halter) and self.state == 'waitingConfirm':
log.info('{} thinks {} is the leader'.format(self.id, sender))
self.comm.send('ok', sender)
self.state = 'normal'
def election(self):
self.inElection = True
self.loadConnectionInfos()
#Check if I'm the highest instance alive
for higher in range(self.id + 1, self.nbComp):
self.comm.send('ping', higher)
message, sender = self.okCanal.get()
if message:
log.info('{} is alive ({})'.format(higher, self.id))
self.inElection = False
return False
continue
if not self.alive:
return False
#I should be the new coordinator, halt those below me
log.info('Should be ME : {}'.format(self.id))
self.state = 'election'
self.halter = self.id
ups = []
for lower in range(self.id):
self.comm.send('halt', lower)
message, sender = self.okCanal.get()
if message:
ups.append(lower)
#Broadcast Victory
self.state = 'reorganization'
for up in ups:
self.comm.send('victory', up)
message, sender = self.okCanal.get()
if message:
continue
log.info('Something is wrong... let\'s start over')
return self.election()
self.state = 'normal'
self.active = True
log.info('{} Is THE LEADER'.format(self.id))
self.renamer.failover()
self.inElection = False
return True
class FilteredCanal(object):
def __init__(self, accept, timeout):
self.accept = accept
self.list = []
self.lock = thread.allocate_lock()
self.timeout = timeout
def append(self, message, sender):
if message in self.accept:
self.lock.acquire()
self.list.append([message, sender])
self.lock.release()
def get(self):
start = time.clock()
while (time.clock() - start < self.timeout):
self.lock.acquire()
if self.list:
self.lock.release()
return self.list.pop(0)
self.lock.release()
return [None, None]
class Wrapper(object):
def __init__(self, confpath, timeout=20):
self.canals = []
self.ips = []
self.id = 0
self.timeout = timeout
self.confpath = confpath
self.getConnectionInfos()
self.socket = None
def getConnectionInfos(self):
file = open(self.confpath, 'r')
params = file.read().split('\n')
file.close()
self.ips = [x.strip("' ") for x in params[0].strip('[],').split(',')]
self.id = int(params[1])
def start(self):
self.getConnectionInfos()
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.socket.bind((self.ips[self.id], BASE_PORT + self.id))
self.socket.listen(5)
def send(self, message, number):
self.getConnectionInfos()
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((self.ips[number], BASE_PORT + number))
s.send(message + (' {}\n'.format(self.id)))
except (socket.error, socket.herror, socket.gaierror, socket.timeout):
pass
finally:
s.close()
def canal(self, accept):
created = FilteredCanal(accept, self.timeout)
self.canals.append(created)
return created
def recv(self):
client, _ = self.socket.accept()
client_message = client.recv(1024)
if client_message:
message, sender = client_message.split()
for canal in self.canals:
canal.append(message, int(sender))
def main():
renamer = Renamer(server_url = '%(server_url)s',
key_file = '%(key_file)s',
cert_file = '%(cert_file)s',
computer_guid = '%(computer_id)s',
partition_id = '%(partition_id)s',
software_release = '%(software)s',
namebase = '%(namebase)s')
confpath = '%(confpath)s'
wrapper = Wrapper(confpath=confpath, timeout=20)
computer = ResilientInstance(wrapper, renamer=renamer, confpath=confpath)
#idle waiting for connection infos
while computer.nbComp < 2 :
computer.loadConnectionInfos()
time.sleep(30)
log.info('Starting')
computer.comm.start()
thread.start_new_thread(computer.listen, ())
thread.start_new_thread(computer.main, ())
thread.start_new_thread(computer.aliveManagement, ())
while True:
# XXX tight loop
continue
if __name__ == '__main__':
main()
......@@ -112,7 +112,7 @@ context = key buildout buildout:bin-directory
import-list = file parts template-parts:destination
file replicated template-replicated:destination
md5sum = f6dc9290194ad1fbe1ced553f5668922
md5sum = 1640ebe7f8ced607ce1ec93c4f6e5413
mode = 0644
[instance-mariadb]
......
......@@ -19,7 +19,7 @@ parts +=
[request-apache]
return = url ssh-public-key ssh-url notification-id ip url backend_url
# XXX: hardcoded values
config = domain number authorized-key notify ip-list namebase mariadb1-computer-guid pbs-mariadb1-computer-guid mariadb2-computer-guid pbs-mariadb2-computer-guid mariadb3-computer-guid pbs-mariadb3-computer-guid
config = domain number authorized-key notify namebase mariadb1-computer-guid pbs-mariadb1-computer-guid mariadb2-computer-guid pbs-mariadb2-computer-guid mariadb3-computer-guid pbs-mariadb3-computer-guid
config-mariadb1-computer-guid = ${slap-parameter:mariadb1-computer-guid}
config-pbs-mariadb1-computer-guid = ${slap-parameter:pbs-mariadb1-computer-guid}
config-mariadb2-computer-guid = ${slap-parameter:mariadb2-computer-guid}
......
......@@ -100,7 +100,7 @@ context = key buildout buildout:bin-directory
import-list = file parts template-parts:destination
file replicated template-replicated:destination
md5sum = ef38aa9810ce20960382261f235abfcd
md5sum = 26da0631906c2267b82703c4d4f4a911
mode = 0644
[instance-postgres]
......