__init__.py 8.27 KB
Newer Older
Marco Mariani's avatar
Marco Mariani committed
1
# -*- coding: utf-8 -*-
Antoine Catton's avatar
Antoine Catton committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################
Marco Mariani's avatar
Marco Mariani committed
28 29

import argparse
30
import errno
Antoine Catton's avatar
Antoine Catton committed
31
import gdbm
Marco Mariani's avatar
Marco Mariani committed
32
import json
33
from lockfile import LockFile
Marco Mariani's avatar
Marco Mariani committed
34 35 36
import logging
import logging.handlers
import os
Antoine Catton's avatar
Antoine Catton committed
37
import signal
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
38
import socket
Antoine Catton's avatar
Antoine Catton committed
39
import subprocess
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
40 41 42 43 44
import sys
import SocketServer
import StringIO
import threading

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
# Copied from erp5.util:erp5/util/testnode/ProcessManager.py
def subprocess_capture(p, log, log_prefix, get_output=True):
  def readerthread(input, output, buffer):
    while True:
      data = input.readline()
      if not data:
        break
      if get_output:
        buffer.append(data)
      if log_prefix:
        data = "%s : " % log_prefix +  data
      data = data.rstrip('\n')
      output(data)
  if p.stdout:
    stdout = []
    stdout_thread = threading.Thread(target=readerthread,
                                     args=(p.stdout, log, stdout))
    stdout_thread.daemon = True
    stdout_thread.start()
  if p.stderr:
    stderr = []
    stderr_thread = threading.Thread(target=readerthread,
                                     args=(p.stderr, log, stderr))
    stderr_thread.daemon = True
    stderr_thread.start()
  p.wait()
  if p.stdout:
    stdout_thread.join()
  if p.stderr:
    stderr_thread.join()
  return (p.stdout and ''.join(stdout),
          p.stderr and ''.join(stderr))
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
77 78 79 80 81 82 83 84 85 86 87 88 89

class EqueueServer(SocketServer.ThreadingUnixStreamServer):

  daemon_threads = True

  def __init__(self, *args, **kw):
    self.options = kw.pop('equeue_options')
    SocketServer.ThreadingUnixStreamServer.__init__(self,
                                                    RequestHandlerClass=None,
                                                    *args, **kw)
    # Equeue Specific elements
    self.setLogger(self.options.logfile[0], self.options.loglevel[0])
    self.setDB(self.options.database[0])
90 91
    if getattr(self.options, 'takeover_triggered_file_path', None):
      self.takeover_triggered_file_path = self.options.takeover_triggered_file_path[0]
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
92
    # Lock to only have one command running at the time
93
    self.thread_lock = threading.Lock()
94
    # Lockfile is used by other commands to know if an import is ongoing.
95
    self.lockfile = LockFile(self.options.lockfile)
96
    self.lockfile.break_lock()
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
97 98 99 100 101 102 103 104 105 106 107 108 109 110

  def setLogger(self, logfile, loglevel):
    self.logger = logging.getLogger("EQueue")
    handler = logging.handlers.WatchedFileHandler(logfile, mode='a')
    # Natively support logrotate
    level = logging._levelNames.get(loglevel, logging.INFO)
    self.logger.setLevel(level)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    self.logger.addHandler(handler)

  def setDB(self, database):
    self.db = gdbm.open(database, 'cs', 0700)

111 112 113 114 115 116
  def _hasTakeoverBeenTriggered(self):
    if hasattr(self, 'takeover_triggered_file_path') and \
       os.path.exists(self.takeover_triggered_file_path):
      return True
    return False

117
  def _runCommandIfNeeded(self, command, timestamp):
118 119 120
    if self._hasTakeoverBeenTriggered():
      self.logger.info('Takeover has been triggered, preventing to run import script.')
      return
121
    with self.thread_lock as thread_lock, self.lockfile as lockfile:
122 123 124 125 126 127
      cmd_list = command.split('\0')
      cmd_readable = ' '.join(cmd_list)
      cmd_executable = cmd_list[0]

      if cmd_executable in self.db and timestamp <= int(self.db[cmd_executable]):
        self.logger.info("%s already run.", cmd_readable)
128 129
        return

130
      self.logger.info("Running %s, %s with output:", cmd_readable, timestamp)
131
      try:
132 133 134 135 136 137 138 139 140
        sys.stdout.flush()
        p = subprocess.Popen(cmd_list, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        subprocess_capture(p, self.logger.info, '', True)
        if p.returncode == 0:
          self.logger.info("%s finished successfully.", cmd_readable)
          self.db[cmd_executable] = str(timestamp)
        else:
          self.logger.warning("%s exited with status %s." % (cmd_readable, p.returncode))
141 142
      except subprocess.CalledProcessError as e:
        self.logger.warning("%s exited with status %s. output is: \n %s" % (
143
            cmd_readable,
144 145 146 147
            e.returncode,
            e.output,
        ))

Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
  def process_request_thread(self, request, client_address):
    # Handle request
    self.logger.debug("Connection with file descriptor %d", request.fileno())
    request.settimeout(self.options.timeout)
    request_string = StringIO.StringIO()
    segment = None
    try:
      while segment != '':
        segment = request.recv(1024)
        request_string.write(segment)
    except socket.timeout:
      pass

    command = '127'
    try:
      request_parameters = json.loads(request_string.getvalue())
      timestamp = request_parameters['timestamp']
      command = str(request_parameters['command'])
      self.logger.info("New command %r at %s", command, timestamp)

    except (ValueError, IndexError) :
      self.logger.warning("Error during the unserialization of json "
                          "message of %r file descriptor. The message "
                          "was %r", request.fileno(), request_string.getvalue())

    try:
      request.send(command)
    except:
      self.logger.warning("Couldn't respond to %r", request.fileno())
    self.close_request(request)
178
    self._runCommandIfNeeded(command, timestamp)
179 180 181 182 183 184 185 186
# Well the following function is made for schrodinger's files,
# It will work if the file exists or not
def remove_existing_file(path):
  try:
    os.remove(path)
  except OSError, e:
    if e.errno != errno.ENOENT:
      raise
Antoine Catton's avatar
Antoine Catton committed
187

188 189 190 191 192
def main():
  parser = argparse.ArgumentParser(
    description="Run a single threaded execution queue.")
  parser.add_argument('--database', nargs=1, required=True,
                      help="Path to the database where the last "
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
193
                      "calls are stored")
194 195 196 197 198 199 200
  parser.add_argument('--loglevel', nargs=1,
                      default='INFO',
                      choices=[i for i in logging._levelNames
                               if isinstance(i, str)],
                      required=False)
  parser.add_argument('-l', '--logfile', nargs=1, required=True,
                      help="Path to the log file.")
201 202
  parser.add_argument('-t', '--timeout', nargs=1, required=False,
                      dest='timeout', type=int, default=3)
203 204
  parser.add_argument('--lockfile',
                      help="Path to the lock file created when a command is run")
205 206
  parser.add_argument('--takeover-triggered-file-path', nargs=1, required=False,
                      help="Path to the file created by takeover script to state that it has been triggered.")
207 208 209 210 211
  parser.add_argument('socket', help="Path to the unix socket")

  args = parser.parse_args()

  socketpath = args.socket
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
212 213 214 215

  signal.signal(signal.SIGHUP, lambda *args: sys.exit(-1))
  signal.signal(signal.SIGTERM, lambda *args: sys.exit())

216
  remove_existing_file(socketpath)
217 218 219 220 221
  try:
    server = EqueueServer(socketpath, **{'equeue_options':args})
    server.logger.info("Starting server on %r", socketpath)
    server.serve_forever()
  finally:
222
    remove_existing_file(socketpath)
223
    os.kill(0, 9)
224 225 226

if __name__ == '__main__':
  main()