Commit 3b9c0e5d authored by Rafael Monnerat's avatar Rafael Monnerat

re6st-registry: Refactor integration with re6st registry

  * Follow up methods changes on re6stnet registry API.
  * getIPv4Information was simplifed on registry, so reimplement it.
  * Remove scripts generated and keep a single script to orchestate the token management.
  * Drop some code duplication
  * Remove babeld, openvpn and miniupnp as they are not required.
  * Remove useless scripts.
  * Remove re6stnet section, extra-eggs is more them enough and prevent duplications.
parent 6902a67f
......@@ -218,24 +218,6 @@ class Recipe(GenericBaseRecipe):
)
path_list.append(request_add)
request_drop = self.createPythonScript(
self.options['drop-service-wrapper'].strip(),
'%s.re6stnet.requestRemoveToken' % __name__, service_dict
)
path_list.append(request_drop)
request_check = self.createPythonScript(
self.options['check-service-wrapper'].strip(),
'%s.re6stnet.checkService' % __name__, service_dict
)
path_list.append(request_check)
revoke_check = self.createPythonScript(
self.options['revoke-service-wrapper'].strip(),
'%s.re6stnet.requestRevoqueCertificate' % __name__, service_dict
)
path_list.append(revoke_check)
# Send connection parameters of slave instances
if token_dict:
self.slap.initializeConnection(self.server_url, self.key_file,
......
# -*- coding: utf-8 -*-
import httplib
import logging
import json
import os
import time
import sqlite3
import slapos
import traceback
import logging
import socket
import select
from re6st import tunnel, ctl, registry, utils, x509
from OpenSSL import crypto
from re6st import registry
log = logging.getLogger('SLAPOS-RE6STNET')
logging.basicConfig(level=logging.INFO)
logging.trace = logging.debug
class iterRoutes(object):
_waiting = True
def __new__(cls, control_socket, network):
self = object.__new__(cls)
c = ctl.Babel(control_socket, self, network)
c.request_dump()
while self._waiting:
args = {}, {}, ()
c.select(*args)
utils.select(*args)
return (prefix
for neigh_routes in c.neighbours.itervalues()
for prefix in neigh_routes[1]
if prefix)
def babel_dump(self):
self._waiting = False
logging.trace = logging.debug
def loadJsonFile(path):
if os.path.exists(path):
......@@ -57,12 +34,11 @@ def readFile(path):
return content
return ''
def getDb(db_path):
db = sqlite3.connect(db_path, isolation_level=None,
check_same_thread=False)
db.text_factory = str
return db.cursor()
def updateFile(file_path, value):
if readFile(file_path) != value:
writeFile(file_path, value)
return True
return False
def bang(args):
computer_guid = args['computer_id']
......@@ -77,64 +53,62 @@ def bang(args):
partition.bang(message='Published parameters changed!')
log.info("Bang with message 'parameters changed'...")
def requestAddToken(args, can_bang=True):
def requestAddToken(client, base_token_path):
time.sleep(3)
registry_url = args['registry_url']
base_token_path = args['token_base_path']
path_list = [x for x in os.listdir(base_token_path) if x.endswith('.add')]
updated = False
log.info("Searching tokens to add at %s and found %s." % (base_token_path, path_list))
if not path_list:
log.info("No new token to add. Exiting...")
return
client = registry.RegistryClient(registry_url)
call_bang = False
for reference_key in path_list:
request_file = os.path.join(base_token_path, reference_key)
token = readFile(request_file)
log.info("Including token %s for %s" % (token, reference_key))
if token :
reference = reference_key.split('.')[0]
# email is unique as reference is also unique
email = '%s@slapos' % reference.lower()
try:
result = client.requestAddToken(token, email)
result = client.addToken(email, token)
except Exception:
log.debug('Request add token fail for %s... \n %s' % (request_file,
log.info('Request add token fail for %s... \n %s' % (request_file,
traceback.format_exc()))
continue
if result and result == token:
# update information
log.info("New token added for slave instance %s. Updating file status..." %
reference)
writeFile(os.path.join(base_token_path, '%s.status' % reference),
'TOKEN_ADDED')
status_file = os.path.join(base_token_path, '%s.status' % reference)
updateFile(status_file, 'TOKEN_ADDED')
os.unlink(request_file)
call_bang = True
updated = True
else:
log.debug('Bad token. Request add token fail for %s...' % request_file)
if can_bang and call_bang:
bang(args)
return updated
def requestRemoveToken(args):
base_token_path = args['token_base_path']
def requestRemoveToken(client, base_token_path):
path_list = [x for x in os.listdir(base_token_path) if x.endswith('.remove')]
if not path_list:
log.info("No token to delete. Exiting...")
return
client = registry.RegistryClient(args['registry_url'])
for reference_key in path_list:
request_file = os.path.join(base_token_path, reference_key)
token = readFile(request_file)
if token :
reference = reference_key.split('.')[0]
try:
result = client.requestDeleteToken(token)
result = client.deleteToken(token)
except httplib.NOTFOUND:
# Token is alread removed.
result = True
except Exception:
log.debug('Request delete token fail for %s... \n %s' % (request_file,
traceback.format_exc()))
......@@ -142,10 +116,12 @@ def requestRemoveToken(args):
else:
# certificate is invalidated, it will be revoked
writeFile(os.path.join(base_token_path, '%s.revoke' % reference), '')
if result == 'True':
if result in (True, 'True'):
# update information
log.info("Token deleted for slave instance %s. Clean up file status..." %
reference)
if result in ['True', 'False']:
os.unlink(request_file)
status_file = os.path.join(base_token_path, '%s.status' % reference)
......@@ -159,198 +135,96 @@ def requestRemoveToken(args):
log.debug('Bad token. Request add token fail for %s...' % request_file)
def requestRevoqueCertificate(args):
base_token_path = args['token_base_path']
db = getDb(args['db'])
path_list = [x for x in os.listdir(base_token_path) if x.endswith('.revoke')]
client = registry.RegistryClient(args['registry_url'])
for reference_key in path_list:
reference = reference_key.split('.')[0]
# XXX - email is always unique
email = '%s@slapos' % reference.lower()
cert_string = ''
try:
cert_string, = db.execute("SELECT cert FROM cert WHERE email = ?",
(email,)).next()
except StopIteration:
# Certificate was not generated yet !!!
pass
try:
if cert_string:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_string)
cn = x509.subnetFromCert(cert)
result = client.revoke(str(cn))
time.sleep(2)
except Exception:
log.debug('Request revoke certificate fail for %s... \n %s' % (reference,
traceback.format_exc()))
continue
else:
if revokeByMail(args['registry_url'],
'%s@slapos' % reference.lower(),
args['db']):
os.unlink(os.path.join(base_token_path, reference_key))
log.info("Certificate revoked for slave instance %s." % reference)
return
log.info("Failed to revoke email for %s" % reference)
def dumpIPv6Network(slave_reference, db, network, ipv6_file):
email = '%s@slapos' % slave_reference.lower()
try:
cert_string, = db.execute("SELECT cert FROM cert WHERE email = ?",
(email,)).next()
except StopIteration:
# Certificate was not generated yet !!!
pass
try:
if cert_string:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_string)
cn = x509.subnetFromCert(cert)
subnet = network + utils.binFromSubnet(cn)
ipv6 = utils.ipFromBin(subnet)
changed = readFile(ipv6_file) != ipv6
writeFile(ipv6_file, ipv6)
return ipv6, utils.binFromSubnet(cn), changed
except Exception:
log.debug('XXX for %s... \n %s' % (slave_reference,
traceback.format_exc()))
def sendto(sock, prefix, code):
return sock.sendto("%s\0%c" % (prefix, code), ('::1', tunnel.PORT))
def recv(sock, code):
try:
prefix, msg = sock.recv(1<<16).split('\0', 1)
int(prefix, 2)
except ValueError:
pass
else:
if msg and ord(msg[0]) == code:
return prefix, msg[1:]
return None, None
def dumpIPv4Network(ipv6_prefix, network, ipv4_file, sock, peer_prefix_list):
try:
if ipv6_prefix == "00000000000000000000000000000000":
# workarround to ignore the first node
ipv4 = "0.0.0.0"
changed = readFile(ipv4_file) != ipv4
writeFile(ipv4_file, ipv4)
return ipv4, changed
peers = []
peer_list = [prefix for prefix in peer_prefix_list if prefix == ipv6_prefix ]
if len(peer_list) == 0:
raise ValueError("Unable to find such prefix on database")
peer = peer_list[0]
sendto(sock, peer, 1)
s = sock,
timeout = 15
end = timeout + time.time()
while select.select(s, (), (), timeout)[0]:
prefix, msg = recv(sock, 1)
if prefix == peer:
break
timeout = max(0, end - time.time())
else:
logging.info("Timeout while querying address for %s/%s", int(peer, 2), len(peer))
msg = ""
if "," in msg:
ipv4 = msg.split(',')[0]
else:
ipv4 = "0.0.0.0"
changed = readFile(ipv4_file) != ipv4
writeFile(ipv4_file, ipv4)
return ipv4, changed
except Exception:
log.info('XXX for %s... \n %s' % (ipv6_prefix,
traceback.format_exc()))
return "0.0.0.0", False
def checkService(args, can_bang=True):
base_token_path = args['token_base_path']
token_dict = loadJsonFile(args['token_json'])
def checkService(client, base_token_path, token_json):
token_dict = loadJsonFile(token_json)
updated = False
if not token_dict:
return
db = getDb(args['db'])
call_bang = False
computer_guid = args['computer_id']
partition_id = args['partition_id']
slap = slapos.slap.slap()
client = registry.RegistryClient(args['registry_url'])
ca = client.getCa()
network = x509.networkFromCa(crypto.load_certificate(crypto.FILETYPE_PEM, ca))
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
peer_prefix_list = [prefix for prefix in
iterRoutes("/var/run/re6stnet/babeld.sock", network)]
# Check token status
for slave_reference, token in token_dict.iteritems():
log.info("%s %s" % (slave_reference, token))
status_file = os.path.join(base_token_path, '%s.status' % slave_reference)
ipv6_file = os.path.join(base_token_path, '%s.ipv6' % slave_reference)
ipv4_file = os.path.join(base_token_path, '%s.ipv4' % slave_reference)
if not os.path.exists(status_file):
# This token is not added yet!
log.info("Token %s dont exist yet." % status_file)
continue
if not client.isToken(str(token)):
# Token is used to register client
updated = True
updateFile(status_file, 'TOKEN_USED')
log.info("Token status of %s updated to 'used'." % slave_reference)
msg = readFile(status_file)
log.info("Token %s has %s State." % (status_file, msg))
if msg == 'TOKEN_USED':
log.info("Dumping ipv6...")
ipv6, ipv6_prefix, ipv6_changed = dumpIPv6Network(slave_reference, db, network, ipv6_file)
log.info("%s, IPV6 = %s, IPV6_PREFIX = %s" % (slave_reference, ipv6, ipv6_prefix))
_, ipv4_changed = dumpIPv4Network(ipv6_prefix, network, ipv4_file, sock, peer_prefix_list)
if ipv4_changed or ipv6_changed:
call_bang = True
continue
# Check if token is not in the database
status = False
try:
token_found, = db.execute("SELECT token FROM token WHERE token = ?",
(token,)).next()
if token_found == token:
status = True
except StopIteration:
pass
if not status:
# Token is used to register client
call_bang = True
if msg == 'TOKEN_USED':
try:
writeFile(status_file, 'TOKEN_USED')
dumpIPv6Network(slave_reference, db, network, ipv6_file)
dumpIPv4Network(ipv6_prefix, network, ipv4_file, sock, peer_prefix_list)
log.info("Token status of %s updated to 'used'." % slave_reference)
log.info("Dumping ipv6...")
email = '%s@slapos' % slave_reference.lower()
try:
ipv6 = client.getIPv6Address(str(email))
ipv6_file = os.path.join(base_token_path, '%s.ipv6' % slave_reference)
ipv6_changed = updateFile(ipv6_file, ipv6)
except Exception:
log.info('Error for dump ipv6 for %s... \n %s' % (slave_reference,
traceback.format_exc()))
continue
log.info("%s, IPV6 = %s" % (slave_reference, ipv6))
log.info("Dumping ipv4...")
try:
ipv4 = client.getIPv4Information(str(email)) or "0.0.0.0"
ipv4_file = os.path.join(base_token_path, '%s.ipv4' % slave_reference)
ipv4_changed = updateFile(ipv4_file, ipv4)
except Exception:
log.info('Error for dump ipv4 for %s... \n %s' % (slave_reference,
traceback.format_exc()))
continue
log.info("%s, IPV4 = %s" % (slave_reference, ipv4))
except IOError:
# XXX- this file should always exists
log.debug('Error when writing in file %s. Clould not update status of %s...' %
(status_file, slave_reference))
log.debug('Error when writing in file %s. Could not update status of %s...' %
(status_file, slave_reference))
if call_bang and can_bang:
bang(args)
if not updated or ipv4_changed or ipv6_changed:
updated = True
return updated
def manage(args, can_bang=True):
client = registry.RegistryClient(args['registry_url'])
base_token_path = args['token_base_path']
token_json = args['token_json']
def manage(args):
# Request Add new tokens
requestAddToken(args)
has_new_token = requestAddToken(client, base_token_path)
# Request delete removed token
requestRemoveToken(args)
requestRemoveToken(client, base_token_path)
# check status of all token
checkService(args)
changed = checkService(client, base_token_path, token_json)
if (has_new_token or changed) and can_bang:
bang(args)
......@@ -138,9 +138,6 @@ command = {{ re6st_registry }}
wrapper = ${directory:services}/re6st-registry
pid-file = ${directory:run}/registry.pid
manager-wrapper = ${directory:bin}/re6stManageToken
check-service-wrapper = ${directory:bin}/re6stCheckService
drop-service-wrapper = ${directory:bin}/re6stManageDeleteToken
revoke-service-wrapper = ${directory:bin}/re6stRevokeCertificate
openssl-bin = {{ openssl_bin }}/openssl
python-bin = {{ python_bin }}
ipv6-prefix = {{ slapparameter_dict.get('ipv6-prefix', '2001:db8:24::/48') }}
......@@ -167,26 +164,12 @@ recipe = slapos.cookbook:wrapper
wrapper-path = ${directory:script}/re6st-token-manager
command-line = "{{ python_bin }}" ${re6st-registry:manager-wrapper}
[cron-entry-re6st-check]
[cron-entry-re6st-manage]
recipe = slapos.cookbook:cron.d
cron-entries = ${cron:cron-entries}
name = re6stnet-check-token
frequency = */5 * * * *
command = {{ python_bin }} ${re6st-registry:check-service-wrapper}
[cron-entry-re6st-revoke]
recipe = slapos.cookbook:cron.d
cron-entries = ${cron:cron-entries}
name = re6stnet-revoke-cert
frequency = */5 * * * *
command = {{ python_bin }} ${re6st-registry:revoke-service-wrapper}
[cron-entry-re6st-drop]
recipe = slapos.cookbook:cron.d
cron-entries = ${cron:cron-entries}
name = re6stnet-drop-token
frequency = */5 * * * *
command = {{ python_bin }} ${re6st-registry:drop-service-wrapper}
command = {{ python_bin }} ${re6st-registry:manager-wrapper}
[logrotate-entry-re6stnet]
< = logrotate-entry-base
......@@ -227,9 +210,7 @@ parts =
logrotate-entry-re6stnet
re6stnet-manage
cron-entry-logrotate
cron-entry-re6st-check
cron-entry-re6st-drop
cron-entry-re6st-revoke
cron-entry-re6st-manage
apache-httpd
apache-httpd-graceful
publish
......
[buildout]
extends =
../../component/re6stnet/buildout.cfg
../../component/dash/buildout.cfg
../../component/dcron/buildout.cfg
../../component/gzip/buildout.cfg
......@@ -23,8 +22,6 @@ parts +=
slapos-cookbook
eggs
dash
babeld
re6stnet
template
[eggs]
......@@ -45,7 +42,6 @@ eggs =
${python-cffi:egg}
${python-cryptography:egg}
pyOpenSSL
miniupnpc
re6stnet
[download-base]
......@@ -87,7 +83,7 @@ extra-context =
[template-re6stnet]
< = download-base
filename = instance-re6stnet.cfg.in
md5sum = 6e9452d283e82e2f512a9f9edb17fe3a
md5sum = 4596d91cef4184b97d257a68478b6330
[template-apache-conf]
< = download-base
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment