__init__.py 7.5 KB
Newer Older
Marco Mariani's avatar
Marco Mariani committed
1
# -*- coding: utf-8 -*-
Antoine Catton's avatar
Antoine Catton committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################
Marco Mariani's avatar
Marco Mariani committed
28 29

import argparse
30
import errno
Antoine Catton's avatar
Antoine Catton committed
31
import gdbm
Marco Mariani's avatar
Marco Mariani committed
32
import json
33
from lockfile import LockFile
Marco Mariani's avatar
Marco Mariani committed
34 35 36
import logging
import logging.handlers
import os
Antoine Catton's avatar
Antoine Catton committed
37
import signal
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
38
import socket
Antoine Catton's avatar
Antoine Catton committed
39
import subprocess
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
40 41 42 43 44
import sys
import SocketServer
import StringIO
import threading

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
# Copied from erp5.util:erp5/util/testnode/ProcessManager.py
def subprocess_capture(p, log, log_prefix, get_output=True):
  def readerthread(input, output, buffer):
    while True:
      data = input.readline()
      if not data:
        break
      if get_output:
        buffer.append(data)
      if log_prefix:
        data = "%s : " % log_prefix +  data
      data = data.rstrip('\n')
      output(data)
  if p.stdout:
    stdout = []
    stdout_thread = threading.Thread(target=readerthread,
                                     args=(p.stdout, log, stdout))
    stdout_thread.daemon = True
    stdout_thread.start()
  if p.stderr:
    stderr = []
    stderr_thread = threading.Thread(target=readerthread,
                                     args=(p.stderr, log, stderr))
    stderr_thread.daemon = True
    stderr_thread.start()
  p.wait()
  if p.stdout:
    stdout_thread.join()
  if p.stderr:
    stderr_thread.join()
  return (p.stdout and ''.join(stdout),
          p.stderr and ''.join(stderr))
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
77 78 79 80 81 82 83 84 85 86 87 88 89 90

class EqueueServer(SocketServer.ThreadingUnixStreamServer):

  daemon_threads = True

  def __init__(self, *args, **kw):
    self.options = kw.pop('equeue_options')
    SocketServer.ThreadingUnixStreamServer.__init__(self,
                                                    RequestHandlerClass=None,
                                                    *args, **kw)
    # Equeue Specific elements
    self.setLogger(self.options.logfile[0], self.options.loglevel[0])
    self.setDB(self.options.database[0])
    # Lock to only have one command running at the time
91 92
    self.thread_lock = threading.Lock()
    self.lockfile = LockFile(self.options.lockfile)
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
93 94 95 96 97 98 99 100 101 102 103 104 105 106

  def setLogger(self, logfile, loglevel):
    self.logger = logging.getLogger("EQueue")
    handler = logging.handlers.WatchedFileHandler(logfile, mode='a')
    # Natively support logrotate
    level = logging._levelNames.get(loglevel, logging.INFO)
    self.logger.setLevel(level)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    self.logger.addHandler(handler)

  def setDB(self, database):
    self.db = gdbm.open(database, 'cs', 0700)

107
  def _runCommandIfNeeded(self, command, timestamp):
108
    with self.thread_lock as thread_lock, self.lockfile as lockfile:
109 110 111 112 113 114
      cmd_list = command.split('\0')
      cmd_readable = ' '.join(cmd_list)
      cmd_executable = cmd_list[0]

      if cmd_executable in self.db and timestamp <= int(self.db[cmd_executable]):
        self.logger.info("%s already run.", cmd_readable)
115 116
        return

117
      self.logger.info("Running %s, %s with output:", cmd_readable, timestamp)
118
      try:
119 120 121 122 123 124 125 126 127
        sys.stdout.flush()
        p = subprocess.Popen(cmd_list, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        subprocess_capture(p, self.logger.info, '', True)
        if p.returncode == 0:
          self.logger.info("%s finished successfully.", cmd_readable)
          self.db[cmd_executable] = str(timestamp)
        else:
          self.logger.warning("%s exited with status %s." % (cmd_readable, p.returncode))
128 129
      except subprocess.CalledProcessError as e:
        self.logger.warning("%s exited with status %s. output is: \n %s" % (
130
            cmd_readable,
131 132 133 134
            e.returncode,
            e.output,
        ))

Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
  def process_request_thread(self, request, client_address):
    # Handle request
    self.logger.debug("Connection with file descriptor %d", request.fileno())
    request.settimeout(self.options.timeout)
    request_string = StringIO.StringIO()
    segment = None
    try:
      while segment != '':
        segment = request.recv(1024)
        request_string.write(segment)
    except socket.timeout:
      pass

    command = '127'
    try:
      request_parameters = json.loads(request_string.getvalue())
      timestamp = request_parameters['timestamp']
      command = str(request_parameters['command'])
      self.logger.info("New command %r at %s", command, timestamp)

    except (ValueError, IndexError) :
      self.logger.warning("Error during the unserialization of json "
                          "message of %r file descriptor. The message "
                          "was %r", request.fileno(), request_string.getvalue())

    try:
      request.send(command)
    except:
      self.logger.warning("Couldn't respond to %r", request.fileno())
    self.close_request(request)
165
    self._runCommandIfNeeded(command, timestamp)
166 167 168 169 170 171 172 173
# Well the following function is made for schrodinger's files,
# It will work if the file exists or not
def remove_existing_file(path):
  try:
    os.remove(path)
  except OSError, e:
    if e.errno != errno.ENOENT:
      raise
Antoine Catton's avatar
Antoine Catton committed
174

175 176 177 178 179
def main():
  parser = argparse.ArgumentParser(
    description="Run a single threaded execution queue.")
  parser.add_argument('--database', nargs=1, required=True,
                      help="Path to the database where the last "
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
180
                      "calls are stored")
181 182 183 184 185 186 187
  parser.add_argument('--loglevel', nargs=1,
                      default='INFO',
                      choices=[i for i in logging._levelNames
                               if isinstance(i, str)],
                      required=False)
  parser.add_argument('-l', '--logfile', nargs=1, required=True,
                      help="Path to the log file.")
188 189
  parser.add_argument('-t', '--timeout', nargs=1, required=False,
                      dest='timeout', type=int, default=3)
190 191
  parser.add_argument('--lockfile',
                      help="Path to the lock file created when a command is run")
192 193 194 195 196
  parser.add_argument('socket', help="Path to the unix socket")

  args = parser.parse_args()

  socketpath = args.socket
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
197 198 199 200

  signal.signal(signal.SIGHUP, lambda *args: sys.exit(-1))
  signal.signal(signal.SIGTERM, lambda *args: sys.exit())

201
  remove_existing_file(socketpath)
202 203 204 205 206
  try:
    server = EqueueServer(socketpath, **{'equeue_options':args})
    server.logger.info("Starting server on %r", socketpath)
    server.serve_forever()
  finally:
207
    remove_existing_file(socketpath)
208
    os.kill(0, 9)
209 210 211

if __name__ == '__main__':
  main()