Commit fd078caf authored by Marco Mariani's avatar Marco Mariani

Merge remote-tracking branch 'origin/resiliency_annotated'

parents 3cf3f4fa af114351
0.31.2 (Unrelease)
================= 0.31.2 (unreleased)
* No change yet. ===================
* pubsub: support multiple notifications and callbacks. [Marco Mariani]
* pubsub: print/return errors from subprocess or notifications. [Marco Mariani]
0.3.1 (2012-10-02) 0.3.1 (2012-10-02)
================= =================
......
# -*- coding: utf-8 -*-
# vim: set et sts=2:
############################################################################## ##############################################################################
# #
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved. # Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
...@@ -24,23 +26,23 @@ ...@@ -24,23 +26,23 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# #
############################################################################## ##############################################################################
import argparse
import gdbm import gdbm
from json import loads as unjson import json
import logging
import logging.handlers
import os
import Queue import Queue
import select import select
from StringIO import StringIO import StringIO
import socket import socket
import os
import logging
import logging.handlers
import signal import signal
import subprocess import subprocess
import argparse
cleanup_data = {} cleanup_data = {}
def cleanup(signum=None, frame=None): def cleanup(signum=None, frame=None):
global cleanup_data
cleanup_functions = dict( cleanup_functions = dict(
sockets=lambda sock: sock.close(), sockets=lambda sock: sock.close(),
subprocesses=lambda process: process.terminate(), subprocesses=lambda process: process.terminate(),
...@@ -48,6 +50,7 @@ def cleanup(signum=None, frame=None): ...@@ -48,6 +50,7 @@ def cleanup(signum=None, frame=None):
) )
for data, function in cleanup_functions.iteritems(): for data, function in cleanup_functions.iteritems():
for item in cleanup_data.get(data, []): for item in cleanup_data.get(data, []):
# XXX will these lists ever have more than 1 element??
# Swallow everything ! # Swallow everything !
try: try:
function(item) function(item)
...@@ -89,7 +92,6 @@ class TaskRunner(object): ...@@ -89,7 +92,6 @@ class TaskRunner(object):
self._command = None self._command = None
def run(self, command, time): def run(self, command, time):
global cleanup_data
self._time = time self._time = time
self._command = command self._command = command
self._task = subprocess.Popen([command], stdin=subprocess.PIPE, self._task = subprocess.Popen([command], stdin=subprocess.PIPE,
...@@ -106,8 +108,6 @@ class TaskRunner(object): ...@@ -106,8 +108,6 @@ class TaskRunner(object):
return self._task.stdout.fileno() return self._task.stdout.fileno()
def main(): def main():
global cleanup_data
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Run a single threaded execution queue.") description="Run a single threaded execution queue.")
parser.add_argument('--database', nargs=1, required=True, parser.add_argument('--database', nargs=1, required=True,
...@@ -169,7 +169,7 @@ def main(): ...@@ -169,7 +169,7 @@ def main():
conn.settimeout(args.timeout) conn.settimeout(args.timeout)
request_string = StringIO() request_string = StringIO.StringIO()
segment = None segment = None
try: try:
while segment != '': while segment != '':
...@@ -180,7 +180,7 @@ def main(): ...@@ -180,7 +180,7 @@ def main():
command = '127' command = '127'
try: try:
request = unjson(request_string.getvalue()) request = json.loads(request_string.getvalue())
timestamp = request['timestamp'] timestamp = request['timestamp']
command = str(request['command']) command = str(request['command'])
task_queue.put([command, timestamp]) task_queue.put([command, timestamp])
...@@ -231,3 +231,4 @@ def main(): ...@@ -231,3 +231,4 @@ def main():
if __name__ == '__main__': if __name__ == '__main__':
main() main()
from datetime import datetime from datetime import datetime
import csv import csv
import feedparser import feedparser
import io
import socket import socket
import json import json
import time import time
...@@ -82,24 +83,28 @@ def notify(): ...@@ -82,24 +83,28 @@ def notify():
except AttributeError: except AttributeError:
abort(httplib.BAD_REQUEST) abort(httplib.BAD_REQUEST)
with open(callback_filepath, 'r') as callback_file:
callback = callback_file.read()
abort_it = False
timestamp = int(math.floor(time.mktime(feed.feed.updated_parsed))) for callback in io.open(callback_filepath, 'r', encoding='utf8'):
timestamp = int(math.floor(time.mktime(feed.feed.updated_parsed)))
equeue_request = json.dumps(dict( equeue_request = json.dumps({
command=callback, 'command': callback,
timestamp=timestamp, 'timestamp': timestamp,
)) })
equeue_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) equeue_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
equeue_socket.connect(app.config['EQUEUE_SOCKET']) equeue_socket.connect(app.config['EQUEUE_SOCKET'])
equeue_socket.send(equeue_request) equeue_socket.send(equeue_request)
result = equeue_socket.recv(len(callback)) result = equeue_socket.recv(len(callback))
equeue_socket.close() equeue_socket.close()
if result != callback: if result != callback:
abort_it = True
if abort_it:
# XXX if possible, communicate info about the failed callbacks
abort(httplib.INTERNAL_SERVER_ERROR) abort(httplib.INTERNAL_SERVER_ERROR)
return '', httplib.NO_CONTENT return '', httplib.NO_CONTENT
......
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import math import argparse
import subprocess
import os
import uuid
import csv import csv
import time
import urllib2
from urlparse import urlparse
import httplib import httplib
import os
import socket import socket
import subprocess
import sys import sys
import argparse import time
import urllib2
import urlparse
import uuid
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
...@@ -40,13 +40,16 @@ def main(): ...@@ -40,13 +40,16 @@ def main():
command.stdin.flush() command.stdin.flush()
command.stdin.close() command.stdin.close()
if command.wait() != 0: command_failed = (command.wait() != 0)
command_stderr = command.stderr.read()
if command_failed:
content = ("<p>Failed with returncode <em>%d</em>.</p>" content = ("<p>Failed with returncode <em>%d</em>.</p>"
"<p>Standard error output is :</p><pre>%s</pre>") % ( "<p>Standard error output is :</p><pre>%s</pre>") % (
command.poll(), command.poll(),
command.stderr.read().replace('&', '&amp;')\ command_stderr.replace('&', '&amp;')\
.replace('<', '&lt;')\ .replace('<', '&lt;')\
.replace('>', '&gt;'), .replace('>', '&gt;'),
) )
else: else:
content = "<p>Everything went well.</p>" content = "<p>Everything went well.</p>"
...@@ -54,30 +57,40 @@ def main(): ...@@ -54,30 +57,40 @@ def main():
with open(args.logfile[0], 'a') as file_: with open(args.logfile[0], 'a') as file_:
cvsfile = csv.writer(file_) cvsfile = csv.writer(file_)
cvsfile.writerow([ cvsfile.writerow([
int(math.floor(time.time())), # Timestamp int(time.time()),
args.title[0], args.title[0],
content, content,
'slapos:%s' % uuid.uuid4(), 'slapos:%s' % uuid.uuid4(),
]) ])
if command_failed:
sys.stderr.write('%s\n' % command_stderr)
sys.exit(1)
feed = urllib2.urlopen(args.feed_url[0]) feed = urllib2.urlopen(args.feed_url[0])
body = feed.read()
some_notification_failed = False
for notif_url in args.notification_url: for notif_url in args.notification_url:
notification_url = urlparse(notif_url) notification_url = urlparse.urlparse(notif_url)
notification_port = notification_url.port notification_port = notification_url.port
if notification_port is None: if notification_port is None:
notification_port = socket.getservbyname(notification_url.scheme) notification_port = socket.getservbyname(notification_url.scheme)
headers = {'Content-Type': feed.info().getheader('Content-Type')} headers = {'Content-Type': feed.info().getheader('Content-Type')}
notification = httplib.HTTPConnection(notification_url.hostname, notification = httplib.HTTPConnection(notification_url.hostname,
notification_port) notification_port)
notification.request('POST', notification_url.path, feed.read(), headers) notification.request('POST', notification_url.path, body, headers)
response = notification.getresponse() response = notification.getresponse()
if not (200 <= response.status < 300): if not (200 <= response.status < 300):
print >> sys.stderr, "The remote server didn't send a successfull reponse." sys.stderr.write("The remote server at %s didn't send a successful reponse.\n" % notif_url)
print >> sys.stderr, "It's response was %r" % response.reason sys.stderr.write("Its response was %r\n" % response.reason)
return 1 some_notification_failed = True
return 0
if some_notification_failed:
sys.exit(1)
if __name__ == '__main__': if __name__ == '__main__':
main() main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment