Commit a77c935c authored by Nicolas Wavrant's avatar Nicolas Wavrant

generatefeed: new helper script to easily create RSS2 Feeds from a bunch of...

generatefeed: new helper script to easily create RSS2 Feeds from a bunch of well-formated JSON files
parent 6734fca2
......@@ -73,6 +73,7 @@ setup(name=name,
'cloudstart = slapos.cloudmgr.start:main',
'cloudstop = slapos.cloudmgr.stop:main',
'equeue = slapos.equeue:main',
'generatefeed = slapos.generatefeed:main',
'htpasswd = slapos.htpasswd:main',
'is-local-tcp-port-opened = slapos.promise.is_local_tcp_port_opened:main',
'is-process-older-than-dependency-set = slapos.promise.is_process_older_than_dependency_set:main',
# Command-line script to generate a RSS feed from a bunch of well-formated
# JSON items.
# This script tries to be the more generic possible. The items used to generate
# the feed must be JSON-formatted (because of simplicity to read/write them),
# and their keys must follow the names of elements of items as described
# in the RSS2 specification :
import argparse
import collections
import datetime
import json
import os
import PyRSS2Gen as rss
def parseArguments():
parser = argparse.ArgumentParser()
parser.add_argument('--output', dest='output', type=str, required=True,
help='Path where to save the file')
parser.add_argument('--status-item-path', dest='status_item_path',
type=str, required=True,
help='Path where to find feed items')
parser.add_argument('--max-item', dest='max_item', type=int,
default=50, required=False,
help='Maximum number of items in the feed')
parser.add_argument('--title', dest='feed_title', type=str, required=True,
help='Title of the feed')
parser.add_argument('--link', dest='feed_link', type=str, required=True,
help='Link of the feed')
parser.add_argument('--description', dest='feed_description',
type=str, required=False,
help='Description of the feed')
option = parser.parse_args()
if not hasattr(option, 'feed_description'):
option.feed_description = option.feed_title
return option
def deleteFileList(file_list):
for file in file_list:
except OSError:
def getRSSItemListFromItemDict(item_dict):
rss_item_list = []
for item in item_dict:
item_dict[item]['pubDate'] = datetime.datetime.fromtimestamp(item_dict[item]['pubDate'])
return rss_item_list
def generateFeed(option):
item_dict = {} # {file: content}
for filename in os.listdir(option.status_item_path):
file_path = os.path.join(option.status_item_path, filename)
with open(file_path, 'r') as fd:
item_dict[file_path] = json.load(fd)
sorted_item_dict = collections.OrderedDict(
sorted(item_dict.items(), key=lambda x: x[1]['pubDate']))
# Reduces feed if number of items exceeds max_item
if len(item_dict) > option.max_item:
outdated_key_list = sorted_item_dict.keys()[option.max_item:]
for outdated_key in outdated_key_list:
del sorted_item_dict[outdated_key]
# Generate feed
feed = rss.RSS2(
lastBuildDate =,
items = getRSSItemListFromItemDict(sorted_item_dict)
return feed.to_xml()
def main():
option = parseArguments()
feed = generateFeed(option)
open(option.output, 'w').write(feed)
if __name__ == "__main__":
import collections
import datetime
import feedparser
import json
import os
import shutil
import tempfile
import time
import unittest
from slapos.generatefeed import generateFeed
class Option(dict):
def __init__(self, **kw):
def __setitem__(i, y):
self.__dict__[i] = y
class TestGenerateFeed(unittest.TestCase):
def setUp(self):
self.item_directory = tempfile.mkdtemp(dir='.')
self.feed_path = os.path.join(self.item_directory, 'path')
def tearDown(self):
def getOptionObject(self, **kw):
Returns an object containing options as properties, to simulate a call
to the tested script
option = {
'output': self.feed_path,
'status_item_path': self.item_directory,
'max_item': 50,
'feed_title': 'Feed title',
'feed_link': '',
'feed_description': 'Feed description',
return Option(**option)
def saveAsStatusItem(self, filename, content):
Save a JSON at filename in self.item_directory as a status item
path = os.path.join(self.item_directory, filename)
with open(path, 'w') as status_file:
def createItemSample(self):
Populate item_directory with a few sample items
item = [
# Last in alphabet, first in pubDate
{'description': 'description is OK too',
'link': "",
'pubDate': time.mktime(datetime.datetime(2000, 1, 1).timetuple()),
'title': 'everything is OK',
# First in pubDate, last in alphabet
{'description': 'what went wrong ?',
'link': "",
'pubDate': time.mktime(datetime.datetime(2000, 12, 31).timetuple()),
'title': 'I guess we have an ERROR',
for filename, content in item:
self.saveAsStatusItem(filename, content)
def test_feedItemsAreSortedByDate(self):
option = self.getOptionObject()
content_feed = generateFeed(option)
feed = feedparser.parse(content_feed)
start_date = None
for item in feed.entries:
if start_date is None:
start_date = item.published_parsed
self.assertLessEqual(start_date, item.published_parsed)
def test_generateFeedCleanStatusDirectoryIfTooManyItems(self):
option = self.getOptionObject()
option.max_item = 10
# Creates items more than allowed
item_dummy_content = {
'description': 'dummy description',
'link': "",
'pubDate': time.mktime(,
'title': 'dummy title',
for i in range(15):
filename = '%s.item' % i
self.saveAsStatusItem(filename, item_dummy_content)
content_feed = generateFeed(option)
feed = feedparser.parse(content_feed)
# Feed entries number should be limited
self.assertEqual(len(feed.entries), option.max_item)
# Status item directory should have been cleaned
self.assertEqual(len(os.listdir(self.item_directory)), option.max_item)
if __name__ == '__main__':
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment