Commit c462fa96 authored by Marco Mariani

refactor: main function, Renamer class, logging

parent 7985acb4
#!%(executable)s #!%(executable)s
import logging
import os
import socket import socket
import time
import sys import sys
import thread import thread
import os import time
sys.path[:] = %(syspath)s sys.path[:] = %(syspath)s
from slapos import slap as slapmodule from slapos import slap as slapmodule
import slapos import slapos
port = 50000 BASE_PORT = 50000
SLEEPING_MINS = 2
arg_server_url = '%(server_url)s' log = logging.getLogger(__name__)
arg_key_file = '%(key_file)s' logging.basicConfig(level=logging.DEBUG)
arg_cert_file = '%(cert_file)s'
arg_computer_id = '%(computer_id)s'
arg_partition_id = '%(partition_id)s'
arg_software = '%(software)s' class Renamer(object):
arg_namebase = '%(namebase)s' def __init__(self, server_url, key_file, cert_file, computer_guid,
arg_confpath = '%(confpath)s' partition_id, software_release, namebase):
self.server_url = server_url
self.key_file = key_file
self.cert_file = cert_file
def rename_broken_and_stop(): self.computer_guid = computer_guid
try: self.partition_id = partition_id
slap = slapmodule.slap() self.software_release = software_release
slap.initializeConnection(arg_server_url, self.namebase = namebase
arg_key_file,
arg_cert_file) def _failover(self):
computer_partition = slap.registerComputerPartition(computer_guid=arg_computer_id, slap = slapmodule.slap()
partition_id=arg_partition_id) slap.initializeConnection(self.server_url,
broken = computer_partition.request(software_release=arg_software, self.key_file,
software_type='frozen', self.cert_file)
partition_reference=arg_namebase+'0') computer_partition = slap.registerComputerPartition(computer_guid=self.computer_guid,
partition_id=self.partition_id)
broken = computer_partition.request(software_release=self.software_release,
software_type='frozen',
partition_reference=self.namebase+'0')
broken.rename('broken-{}'.format(time.strftime("%%d-%%b_%%H:%%M:%%S", time.gmtime())))
broken.stopped()
computer_partition.rename(self.namebase+'0')
def failover(self):
try:
log.info('renaming done')
except slapos.slap.slap.ServerError:
log.info('Internal server error')
broken.rename('broken-{}'.format(time.strftime("%%d-%%b_%%H:%%M:%%S", time.gmtime())))
broken.stopped()
computer_partition.rename(arg_namebase+'0')
print 'renaming done\n'
except slapos.slap.slap.ServerError:
print 'Internal server error\n'
## Leader is always number 0 ## Leader is always number 0
class ResilientInstance(object): class ResilientInstance(object):
def __init__(self, comm): def __init__(self, comm, renamer, confpath):
self.comm = comm self.comm = comm
self.id = 0 self.id = 0
self.state = 'normal' self.state = 'normal'
...@@ -58,15 +67,15 @@ class ResilientInstance(object): ...@@ -58,15 +67,15 @@ class ResilientInstance(object):
self.alive = True self.alive = True
self.lastPing = time.clock() self.lastPing = time.clock()
self.mainCanal = self.comm.canal(['ping', 'halt', self.mainCanal = self.comm.canal(['ping', 'halt', 'victory'])
'victory'])
self.renamer = renamer
self.okCanal = self.comm.canal(['ok']) self.okCanal = self.comm.canal(['ok'])
self.confpath = confpath
self.loadConnectionInfos() self.loadConnectionInfos()
def loadConnectionInfos(self): def loadConnectionInfos(self):
file = open(arg_confpath, 'r') file = open(self.confpath, 'r')
params = file.read().split('\n') params = file.read().split('\n')
file.close() file.close()
self.nbComp = len([x.strip("' ") for x in params[0].strip('[],').split(',')]) self.nbComp = len([x.strip("' ") for x in params[0].strip('[],').split(',')])
...@@ -78,8 +87,8 @@ class ResilientInstance(object): ...@@ -78,8 +87,8 @@ class ResilientInstance(object):
## Needs to be changed to use the master ## Needs to be changed to use the master
def aliveManagement(self): def aliveManagement(self):
while self.alive: while self.alive:
print 'XXX sleeping for 30 minutes' log.info('XXX sleeping for %%d minutes' %% SLEEPING_MINS)
time.sleep(30*60) time.sleep(SLEEPING_MINS*60)
if self.id == 0: if self.id == 0:
continue continue
self.comm.send('ping', 0) self.comm.send('ping', 0)
...@@ -96,7 +105,7 @@ class ResilientInstance(object): ...@@ -96,7 +105,7 @@ class ResilientInstance(object):
while self.alive: while self.alive:
message, sender = self.mainCanal.get() message, sender = self.mainCanal.get()
if message == 'ping': if message == 'ping':
self.comm.send('ok', sender) self.comm.send('ok', sender)
elif message == 'halt': elif message == 'halt':
self.state = 'waitingConfirm' self.state = 'waitingConfirm'
...@@ -105,7 +114,7 @@ class ResilientInstance(object): ...@@ -105,7 +114,7 @@ class ResilientInstance(object):
elif message == 'victory': elif message == 'victory':
if int(sender) == int(self.halter) and self.state == 'waitingConfirm': if int(sender) == int(self.halter) and self.state == 'waitingConfirm':
print '{} thinks {} is the leader\n'.format(self.id, sender) log.info('{} thinks {} is the leader'.format(self.id, sender))
self.comm.send('ok', sender) self.comm.send('ok', sender)
self.state = 'normal' self.state = 'normal'
...@@ -117,7 +126,7 @@ class ResilientInstance(object): ...@@ -117,7 +126,7 @@ class ResilientInstance(object):
self.comm.send('ping', higher) self.comm.send('ping', higher)
message, sender = self.okCanal.get() message, sender = self.okCanal.get()
if message: if message:
#print '{} is alive ({})\n'.format(higher, self.id) log.info('{} is alive ({})'.format(higher, self.id))
self.inElection = False self.inElection = False
return False return False
continue continue
...@@ -126,7 +135,7 @@ class ResilientInstance(object): ...@@ -126,7 +135,7 @@ class ResilientInstance(object):
return False return False
#I should be the new coordinator, halt those below me #I should be the new coordinator, halt those below me
print 'Should be ME : {} \n'.format(self.id) log.info('Should be ME : {}'.format(self.id))
self.state = 'election' self.state = 'election'
self.halter = self.id self.halter = self.id
ups = [] ups = []
...@@ -143,13 +152,13 @@ class ResilientInstance(object): ...@@ -143,13 +152,13 @@ class ResilientInstance(object):
message, sender = self.okCanal.get() message, sender = self.okCanal.get()
if message: if message:
continue continue
print 'Something is wrong... let\'s start over\n' log.info('Something is wrong... let\'s start over')
return self.election() return self.election()
self.state = 'normal' self.state = 'normal'
self.active = True self.active = True
print '{} Is THE LEADER \n'.format(self.id) log.info('{} Is THE LEADER'.format(self.id))
rename_broken_and_stop() self.renamer.failover()
self.inElection = False self.inElection = False
...@@ -183,16 +192,17 @@ class FilteredCanal(object): ...@@ -183,16 +192,17 @@ class FilteredCanal(object):
class Wrapper(object): class Wrapper(object):
def __init__(self, timeout=20): def __init__(self, confpath, timeout=20):
self.canals = [] self.canals = []
self.ips = [] self.ips = []
self.id = 0 self.id = 0
self.timeout = timeout self.timeout = timeout
self.confpath = confpath
self.getConnectionInfos() self.getConnectionInfos()
self.socket = None self.socket = None
def getConnectionInfos(self): def getConnectionInfos(self):
file = open(arg_confpath, 'r') file = open(self.confpath, 'r')
params = file.read().split('\n') params = file.read().split('\n')
file.close() file.close()
self.ips = [x.strip("' ") for x in params[0].strip('[],').split(',')] self.ips = [x.strip("' ") for x in params[0].strip('[],').split(',')]
...@@ -201,14 +211,14 @@ class Wrapper(object): ...@@ -201,14 +211,14 @@ class Wrapper(object):
def start(self): def start(self):
self.getConnectionInfos() self.getConnectionInfos()
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.socket.bind((self.ips[self.id], port + self.id)) self.socket.bind((self.ips[self.id], BASE_PORT + self.id))
self.socket.listen(5) self.socket.listen(5)
def send(self, message, number): def send(self, message, number):
self.getConnectionInfos() self.getConnectionInfos()
try: try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((self.ips[number], port + number)) s.connect((self.ips[number], BASE_PORT + number))
s.send(message + (' {}\n'.format(self.id))) s.send(message + (' {}\n'.format(self.id)))
except (socket.error, socket.herror, socket.gaierror, socket.timeout): except (socket.error, socket.herror, socket.gaierror, socket.timeout):
pass pass
...@@ -229,24 +239,40 @@ class Wrapper(object): ...@@ -229,24 +239,40 @@ class Wrapper(object):
canal.append(message, int(sender)) canal.append(message, int(sender))
wrapper = Wrapper(20)
computer = ResilientInstance(wrapper) def main():
renamer = Renamer(server_url = '%(server_url)s',
key_file = '%(key_file)s',
cert_file = '%(cert_file)s',
computer_guid = '%(computer_id)s',
partition_id = '%(partition_id)s',
software_release = '%(software)s',
namebase = '%(namebase)s')
confpath = '%(confpath)s'
wrapper = Wrapper(confpath=confpath, timeout=20)
computer = ResilientInstance(wrapper, renamer=renamer, confpath=confpath)
#idle waiting for connection infos
while computer.nbComp < 2 :
computer.loadConnectionInfos()
time.sleep(30)
#idle waiting for connection infos log.info('Starting')
while computer.nbComp < 2 :
computer.loadConnectionInfos()
time.sleep(30)
print 'Starting\n' computer.comm.start()
thread.start_new_thread(computer.listen, ())
thread.start_new_thread(computer.main, ())
thread.start_new_thread(computer.aliveManagement, ())
computer.comm.start() while True:
thread.start_new_thread(computer.listen, ()) # XXX tight loop
thread.start_new_thread(computer.main, ()) continue
thread.start_new_thread(computer.aliveManagement, ())
while True:
# XXX tight loop
continue
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment