Commit c50a0c1e authored by Marco Mariani's avatar Marco Mariani

Release 0.40.4 (pbs: support for transaction-id)

parent d4a6a9f3
0.40.4 (2014-10-14)
-------------------
* The equeue server now supports additional parameters to the callbacks, separated by \0 in the JSON protocol.
The pubsubnotifier accepts a --transaction-id parameter. If missing, the current timestamp is used.
The pubsubnotifier then provides the transaction-id value in the POST call to /notify.
Together, these three changes allow us to put in the crontab:
0 0 * * * /srv/slapgrid/slappartXX/bin/exporter --transaction-id `date +%s`
and the same timestamp value is reported as transaction id in the equeue logs for BOTH the pull
and push operations.
0.40.3 (2014-10-13) 0.40.3 (2014-10-13)
------------------- -------------------
......
...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages ...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
import glob import glob
import os import os
version = '0.40.3' version = '0.40.4'
name = 'slapos.toolbox' name = 'slapos.toolbox'
long_description = open("README.txt").read() + "\n" + \ long_description = open("README.txt").read() + "\n" + \
open("CHANGES.txt").read() + "\n" open("CHANGES.txt").read() + "\n"
......
...@@ -74,23 +74,27 @@ class EqueueServer(SocketServer.ThreadingUnixStreamServer): ...@@ -74,23 +74,27 @@ class EqueueServer(SocketServer.ThreadingUnixStreamServer):
def _runCommandIfNeeded(self, command, timestamp): def _runCommandIfNeeded(self, command, timestamp):
with self.lock: with self.lock:
if command in self.db and timestamp <= int(self.db[command]): cmd_list = command.split('\0')
self.logger.info("%s already run.", command) cmd_readable = ' '.join(cmd_list)
cmd_executable = cmd_list[0]
if cmd_executable in self.db and timestamp <= int(self.db[cmd_executable]):
self.logger.info("%s already run.", cmd_readable)
return return
self.logger.info("Running %s, %s with output:", command, timestamp) self.logger.info("Running %s, %s with output:", cmd_readable, timestamp)
try: try:
self.logger.info( self.logger.info(
subprocess.check_output([command], stderr=subprocess.STDOUT) subprocess.check_output(cmd_list, stderr=subprocess.STDOUT)
) )
self.logger.info("%s finished successfully.", command) self.logger.info("%s finished successfully.", cmd_readable)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
self.logger.warning("%s exited with status %s. output is: \n %s" % ( self.logger.warning("%s exited with status %s. output is: \n %s" % (
command, cmd_readable,
e.returncode, e.returncode,
e.output, e.output,
)) ))
self.db[command] = str(timestamp) self.db[cmd_executable] = str(timestamp)
def process_request_thread(self, request, client_address): def process_request_thread(self, request, client_address):
# Handle request # Handle request
......
...@@ -68,8 +68,8 @@ def get_feed(feed): ...@@ -68,8 +68,8 @@ def get_feed(feed):
{'Content-Type': 'application/atom+xml'} {'Content-Type': 'application/atom+xml'}
) )
@app.route('/notify', methods=['POST']) @app.route('/notify/<int:transaction_id>', methods=['POST'])
def notify(): def notify(transaction_id):
global app global app
try: try:
feed = feedparser.parse(request.data) feed = feedparser.parse(request.data)
...@@ -94,7 +94,7 @@ def notify(): ...@@ -94,7 +94,7 @@ def notify():
timestamp = int(math.floor(time.mktime(feed.feed.updated_parsed))) timestamp = int(math.floor(time.mktime(feed.feed.updated_parsed)))
equeue_request = json.dumps({ equeue_request = json.dumps({
'command': callback, 'command': '%s\0--transaction-id\0%s' % (callback, transaction_id),
'timestamp': timestamp, 'timestamp': timestamp,
}) })
......
...@@ -28,6 +28,9 @@ def main(): ...@@ -28,6 +28,9 @@ def main():
help="Notification url") help="Notification url")
parser.add_argument('--executable', nargs=1, dest='executable', parser.add_argument('--executable', nargs=1, dest='executable',
help="Executable to wrap") help="Executable to wrap")
parser.add_argument('--transaction-id', nargs=1, dest='transaction_id',
type=int, required=False,
help="Additional parameter for notification-url")
args = parser.parse_args() args = parser.parse_args()
...@@ -75,15 +78,23 @@ def main(): ...@@ -75,15 +78,23 @@ def main():
some_notification_failed = False some_notification_failed = False
for notif_url in args.notification_url: for notif_url in args.notification_url:
notification_url = urlparse.urlparse(notif_url) notification_url = urlparse.urlparse(notif_url)
notification_port = notification_url.port notification_port = notification_url.port
if notification_port is None: if notification_port is None:
notification_port = socket.getservbyname(notification_url.scheme) notification_port = socket.getservbyname(notification_url.scheme)
notification_path = notification_url.path
if not notification_path.endswith('/'):
notification_path += '/'
transaction_id = args.transaction_id[0] if args.transaction_id else int(time.time()*1e6)
notification_path += str(transaction_id)
headers = {'Content-Type': feed.info().getheader('Content-Type')} headers = {'Content-Type': feed.info().getheader('Content-Type')}
try: try:
notification = httplib.HTTPConnection(notification_url.hostname, notification = httplib.HTTPConnection(notification_url.hostname,
notification_port) notification_port)
notification.request('POST', notification_url.path, body, headers) notification.request('POST', notification_path, body, headers)
response = notification.getresponse() response = notification.getresponse()
if not (200 <= response.status < 300): if not (200 <= response.status < 300):
sys.stderr.write("The remote server at %s didn't send a successful reponse.\n" % notif_url) sys.stderr.write("The remote server at %s didn't send a successful reponse.\n" % notif_url)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment