Commit 39dbe6d2 authored by Nicolas Wavrant's avatar Nicolas Wavrant

checkfeedaspromise: new python script which can be used to create promises on RSS feeds

parent 8128ac04
......@@ -65,6 +65,7 @@ setup(name=name,
'console_scripts': [
'agent = slapos.agent.agent:main',
'check-web-page-http-cache-hit = slapos.promise.check_web_page_http_cache_hit:main',
'check-feed-as-promise = slapos.checkfeedaspromise:main',
'clouddestroy = slapos.cloudmgr.destroy:main',
'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main',
'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main',
......
# Command line script to test a RSS feed in a promise
# Checks that a given pattern can be found (or not) in the title or the
# description of the latest feed item.
# A time buffer option can be given, to determine if the emitter process is in
# a stalled state, in the case that no OK pattern has been found
import argparse
import datetime
import feedparser
import sys
def parseArguments():
parser = argparse.ArgumentParser()
parser.add_argument('--feed-path', dest='feed_path',
help='Path or Url of the feed to search')
parser.add_argument('--title', dest='title', action='store_true',
help='Patterns should be looked for in feed item\'s title')
parser.add_argument('--description', dest='description', action='store_true',
help='Patterns should be looked for in feed item\'s description')
parser.add_argument('--ok-pattern', dest='ok_pattern_list', action='append',
default=[],
help='If this pattern is found, then promise succeeds')
parser.add_argument('--ko-pattern', dest='ko_pattern_list', action='append',
default=[],
help='If this pattern is found, then promise fails')
parser.add_argument('--time-buffer', dest='time_buffer', type=int,
default=0,
help='Time delta in seconds before the promise really succeeds or fails')
return parser.parse_args()
def containsPattern(string, pattern_list):
for pattern in pattern_list:
if string.find(pattern) >= 0:
return True
return False
def checkFeedAsPromise(feed, option):
feed = feedparser.parse(feed)
if feed.bozo:
return 'Feed malformed'
last_item = feed.entries[-1]
if option.title:
candidate_string = last_item.title
elif option.description:
candidate_string = last_item.description
else:
return 'At least one in [--title|--description] should be provided'
publication_date = datetime.datetime(*last_item.published_parsed[:7])
publication_age = datetime.datetime.now() - publication_date
time_buffer = datetime.timedelta(seconds=option.time_buffer)
ok_pattern_found = containsPattern(candidate_string, option.ok_pattern_list)
ko_pattern_found = containsPattern(candidate_string, option.ko_pattern_list)
if ok_pattern_found and ko_pattern_found:
return 'Both OK and KO patterns found: please check arguments'
# Expectations fulfilled
if ok_pattern_found:
return ''
if ko_pattern_found:
return 'KO pattern found'
if not ok_pattern_found:
if publication_age < time_buffer:
# We have to wait for buffer to expire
return ''
else:
# If time-buffer is out, we are in stalled state
return 'Stalled situation'
# If not ok, and not stalled, what can have possibly happen ?
return 'Something went wrong'
def main():
option = parseArguments()
result = checkFeedAsPromise(option.feed_path, option)
if len(result) > 0:
sys.exit(result)
else:
sys.exit(0)
if __name__ == '__main__':
main()
import datetime
import feedparser
import time
import unittest
import PyRSS2Gen as RSS2
from slapos.checkfeedaspromise import checkFeedAsPromise
class Option(dict):
def __init__(self, **kw):
self.__dict__.update(kw)
def __setitem__(i, y):
self.__dict__[i] = y
class TestCheckFeedAsPromise(unittest.TestCase):
def getOptionObject(self, **kw):
"""
Returns an object containing options as properties, to simulate a call
to the tested script
"""
option = {
'title': False,
'description': False,
'time_buffer': 0,
'ok_pattern_list': [],
'ko_pattern_list': [],
}
option.update(kw)
return Option(**option)
def generateFeed(self, item_list):
return RSS2.RSS2(
title="Feed Title",
link="http://exemple.com",
description="Feed Description",
items=[RSS2.RSSItem(**item) for item in item_list]
).to_xml()
def generateOKFeed(self, extra_item_list=None):
item_list = [{
'title': 'Doing Something',
'description': 'work work work',
'pubDate': datetime.datetime.now(),
}, {
'title': 'Something Finished: OK',
'description': 'OK FINISHED DONE BASTA',
'pubDate': datetime.datetime.now(),
}]
if isinstance(extra_item_list, list):
item_list.append(extra_item_list)
return self.generateFeed(item_list)
def generateKOFeed(self, extra_item_list=None):
item_list = [{
'title': 'Doing Something',
'description': 'work work work',
'pubDate': datetime.datetime.now(),
}, {
'title': 'Something Finished: Error',
'description': 'FAILURE oops Arghh',
'pubDate': datetime.datetime.now(),
}]
if isinstance(extra_item_list, list):
item_list.extend(extra_item_list)
return self.generateFeed(item_list)
def test_ifOKFoundNoErrorReturned(self):
option = self.getOptionObject()
option.title = True
feed = self.generateOKFeed()
option.ok_pattern_list = ['OK']
self.assertEquals(checkFeedAsPromise(feed, option), "")
option.title, option.description = False, True
option.ok_pattern_list = ['DONE', 'OK']
self.assertEquals(checkFeedAsPromise(feed, option), "")
def test_ifKOFoundErrorReturned(self):
option = self.getOptionObject()
option.title = True
feed = self.generateKOFeed()
option.ko_pattern_list = ['Error']
self.assertNotEquals(checkFeedAsPromise(feed, option), "")
option.title, option.description = False, True
option.ko_pattern_list = ['FAILURE', 'Error']
self.assertNotEquals(checkFeedAsPromise(feed, option), "")
def test_ifNoOKPatternFoundErrorIsRaised(self):
option = self.getOptionObject()
option.title = True
feed = self.generateKOFeed()
# If no time buffer, then not OK is always wrong
option.ok_pattern_list = ['OK']
self.assertNotEquals(len(checkFeedAsPromise(feed, option)), 0)
# if time buffer, then not OK is wrong only after buffer expires
extra_item = {
'title': 'Something is Starting',
'description': 'Very long operation, but should last less than 1h',
'pubDate': datetime.datetime.now() - datetime.timedelta(seconds=3600),
}
feed = self.generateKOFeed([extra_item,])
option.time_buffer = 4000
# buffer longer than last item's age
self.assertEquals(checkFeedAsPromise(feed, option), "")
# shorter buffer, we want to raise an error
option.time_buffer = 1800
self.assertNotEquals(len(checkFeedAsPromise(feed, option)), 0)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment