__init__.py 8.48 KB
Newer Older
Marco Mariani's avatar
Marco Mariani committed
1
# -*- coding: utf-8 -*-
Antoine Catton's avatar
Antoine Catton committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################
Marco Mariani's avatar
Marco Mariani committed
28 29

import argparse
30
import errno
31
from six.moves import dbm_gnu as gdbm
Marco Mariani's avatar
Marco Mariani committed
32
import json
33
from lockfile import LockFile
Marco Mariani's avatar
Marco Mariani committed
34 35 36
import logging
import logging.handlers
import os
Antoine Catton's avatar
Antoine Catton committed
37
import signal
38
import socket
Antoine Catton's avatar
Antoine Catton committed
39
import subprocess
40
import sys
41 42
from six.moves import socketserver
import io
43 44
import threading

45 46 47 48 49 50 51 52
# Name -> numeric level mapping used to validate and resolve --loglevel.
# Python 3 exposes it as logging._nameToLevel; older interpreters only have
# logging._levelNames, which mixes both directions (name -> level and
# level -> name), hence the filtering on string keys for the choices.
logging_levels = getattr(logging, '_nameToLevel', None)
if logging_levels is not None:
  logging_choices = logging_levels.keys()
else:
  logging_levels = logging._levelNames
  logging_choices = [level_name for level_name in logging_levels
                     if isinstance(level_name, str)]

53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
# Copied from erp5.util:erp5/util/testnode/ProcessManager.py
def subprocess_capture(p, log, log_prefix, get_output=True):
  """Wait for process ``p`` while forwarding its output, line by line, to ``log``.

  p          -- object with ``stdout``/``stderr`` streams (either may be
                None) and a ``wait()`` method, e.g. a ``subprocess.Popen``.
  log        -- callable invoked once per line, trailing newlines stripped.
  log_prefix -- when truthy, every logged line is prefixed by "<prefix> : ".
  get_output -- when true, the raw lines are also accumulated and returned.

  Returns a ``(stdout, stderr)`` tuple. Each item is the accumulated text of
  the stream (an empty string if ``get_output`` is false), or the falsy
  stream value itself (typically None) when that stream was not captured.
  """
  def pump(stream, emit, collected):
    # Drain ``stream`` until EOF (readline() returning an empty value).
    line = stream.readline()
    while line:
      if get_output:
        collected.append(line)
      if log_prefix:
        line = "%s : " % log_prefix + line
      emit(line.rstrip('\n'))
      line = stream.readline()

  def start_reader(stream):
    # One daemon thread per captured stream, so that neither pipe can fill
    # up and block the child while the other one is being read.
    if not stream:
      return None, None
    collected = []
    reader = threading.Thread(target=pump, args=(stream, log, collected))
    reader.daemon = True
    reader.start()
    return reader, collected

  out_reader, out_lines = start_reader(p.stdout)
  err_reader, err_lines = start_reader(p.stderr)
  p.wait()
  for reader in (out_reader, err_reader):
    if reader is not None:
      reader.join()
  return (p.stdout and ''.join(out_lines),
          p.stderr and ''.join(err_lines))
85

86
class EqueueServer(socketserver.ThreadingUnixStreamServer):
  """Unix-socket server executing the commands it receives one at a time.

  Every connection carries one JSON object with a ``command`` entry (the
  argument vector joined with NUL characters) and a ``timestamp`` entry.
  The command string is sent back to the client as an acknowledgement
  ("127" when the message cannot be decoded), then the command is run
  unless the database already records a successful run of the same
  executable with an equal or newer timestamp.
  """

  # Request threads must not prevent the process from exiting.
  daemon_threads = True

  def __init__(self, *args, **kw):
    """Set up the socket server, the logger, the database and the locks.

    The ``equeue_options`` keyword argument is mandatory: it is the options
    object providing ``logfile``, ``loglevel``, ``database``, ``lockfile``,
    ``timeout`` and, optionally, ``takeover_triggered_file_path``.
    All other arguments are forwarded to ThreadingUnixStreamServer.
    """
    self.options = kw.pop('equeue_options')
    # No handler class: requests are handled by process_request_thread.
    socketserver.ThreadingUnixStreamServer.__init__(self,
                                                    RequestHandlerClass=None,
                                                    *args, **kw)
    # Equeue Specific elements
    self.setLogger(self.options.logfile, self.options.loglevel)
    self.setDB(self.options.database)
    if getattr(self.options, 'takeover_triggered_file_path', None):
      self.takeover_triggered_file_path = self.options.takeover_triggered_file_path
    # Lock to only have one command running at the time
    self.thread_lock = threading.Lock()
    # Lockfile is used by other commands to know if an import is ongoing.
    self.lockfile = LockFile(self.options.lockfile)
    # Presumably clears a lock left behind by a previous instance.
    self.lockfile.break_lock()

  def setLogger(self, logfile, loglevel):
    """Attach a file handler writing to ``logfile`` to the "EQueue" logger.

    ``loglevel`` is a level name; unknown names fall back to INFO.
    """
    self.logger = logging.getLogger("EQueue")
    # Natively support logrotate
    handler = logging.handlers.WatchedFileHandler(logfile, mode='a')
    level = logging_levels.get(loglevel, logging.INFO)
    self.logger.setLevel(level)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    self.logger.addHandler(handler)

  def setDB(self, database):
    """Open (creating it if needed) the gdbm file storing last run timestamps."""
    self.db = gdbm.open(database, 'cs', 0o700)

  def _hasTakeoverBeenTriggered(self):
    """Return True if a takeover marker path is configured and exists."""
    marker_path = getattr(self, 'takeover_triggered_file_path', None)
    return bool(marker_path) and os.path.exists(marker_path)

  def _runCommandIfNeeded(self, command, timestamp):
    """Run ``command`` unless it already ran for this ``timestamp``.

    command   -- argument vector joined with NUL characters.
    timestamp -- compared with the value stored for the executable; stored
                 values are read back with int(), so timestamps are
                 presumably integers.

    Nothing is run if a takeover has been triggered. The timestamp is only
    recorded when the command exits with status 0. Failures (non-zero exit
    status or impossibility to start the command) are logged, not raised.
    """
    with self.thread_lock, self.lockfile:
      if self._hasTakeoverBeenTriggered():
        self.logger.info('Takeover has been triggered, preventing to run import script.')
        return
      cmd_list = command.split('\0')
      cmd_readable = ' '.join(cmd_list)
      cmd_executable = cmd_list[0]

      if cmd_executable in self.db and timestamp <= int(self.db[cmd_executable]):
        self.logger.info("%s already run.", cmd_readable)
        return

      self.logger.info("Running %s, %s with output:", cmd_readable, timestamp)
      sys.stdout.flush()
      try:
        p = subprocess.Popen(cmd_list, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, universal_newlines=True)
      except OSError as e:
        # Popen reports a missing or non-executable program with OSError
        # (it never raises CalledProcessError); log it instead of letting
        # the request thread die with a traceback.
        self.logger.warning("%s could not be started: %s", cmd_readable, e)
        return
      subprocess_capture(p, self.logger.info, '', True)
      if p.returncode == 0:
        self.logger.info("%s finished successfully.", cmd_readable)
        self.db[cmd_executable] = str(timestamp)
      else:
        self.logger.warning("%s exited with status %s.", cmd_readable, p.returncode)

  def _readRequest(self, request):
    """Return the bytes received on ``request`` until EOF or timeout."""
    request.settimeout(self.options.timeout)
    received = io.BytesIO()
    try:
      while 1:
        segment = request.recv(1024)
        if not segment:
          break
        received.write(segment)
    except socket.timeout:
      # The client did not close its side in time: use what was received.
      pass
    return received.getvalue()

  def _parseRequest(self, payload):
    """Decode ``payload``; return (command, timestamp), or None if invalid."""
    try:
      request_parameters = json.loads(payload)
      timestamp = request_parameters['timestamp']
      command = str(request_parameters['command'])
    except (ValueError, KeyError, IndexError, TypeError):
      # ValueError: not JSON; KeyError: missing entry; TypeError/IndexError:
      # the JSON document is not an object.
      return None
    return command, timestamp

  def process_request_thread(self, request, client_address):
    """Handle one connection: read the message, acknowledge it, run it.

    The command string is sent back to the client, or "127" if the message
    could not be decoded, in which case nothing is run. The connection is
    always closed before the command is executed.
    """
    # Handle request
    self.logger.debug("Connection with file descriptor %d", request.fileno())
    try:
      payload = self._readRequest(request)
      parsed = self._parseRequest(payload)
      if parsed is None:
        command = '127'
        self.logger.warning("Error during the unserialization of json "
                            "message of %r file descriptor. The message "
                            "was %r", request.fileno(), payload)
      else:
        command, timestamp = parsed
        self.logger.info("New command %r at %s", command, timestamp)

      try:
        # sendall: a plain send() may transmit only part of the reply.
        request.sendall(command.encode())
      except Exception:
        self.logger.warning("Couldn't respond to %r", request.fileno(), exc_info=True)
    finally:
      self.close_request(request)

    if parsed is not None:
      self._runCommandIfNeeded(command, timestamp)
188 189 190 191 192
def remove_existing_file(path):
  """Remove the file at ``path``, whether or not it currently exists.

  A missing file is not an error. Any other failure (for instance a
  permission problem) is re-raised as the original OSError.
  """
  try:
    os.remove(path)
  except OSError as e:
    # Only "no such file" is expected here; let everything else propagate.
    if e.errno != errno.ENOENT:
      raise
Antoine Catton's avatar
Antoine Catton committed
196

197 198 199
def main():
  """Command line entry point: serve the execution queue on a unix socket.

  Parses the options, removes any file already present at the socket path,
  then runs an EqueueServer on it until the process is interrupted. On the
  way out the socket file is removed and the whole process group is killed.
  """
  parser = argparse.ArgumentParser(
    description="Run a single threaded execution queue.")
  parser.add_argument('--database', required=True,
                      help="Path to the database where the last "
                      "calls are stored")
  parser.add_argument('--loglevel',
                      default='INFO',
                      choices=logging_choices,
                      required=False)
  parser.add_argument('-l', '--logfile', required=True,
                      help="Path to the log file.")
  parser.add_argument('-t', '--timeout', required=False,
                      dest='timeout', type=int, default=3)
  # The server always builds its lock from this path, so a missing value
  # must be rejected here rather than crash once the server is being set up.
  parser.add_argument('--lockfile', required=True,
                      help="Path to the lock file created when a command is run")
  parser.add_argument('--takeover-triggered-file-path', required=False,
                      help="Path to the file created by takeover script to state that it has been triggered.")
  parser.add_argument('socket', help="Path to the unix socket")

  args = parser.parse_args()

  socketpath = args.socket

  # Turn termination signals into SystemExit so that the cleanup in the
  # "finally" clause below is executed.
  signal.signal(signal.SIGHUP, lambda *_: sys.exit(-1))
  signal.signal(signal.SIGTERM, lambda *_: sys.exit())

  remove_existing_file(socketpath)
  try:
    server = EqueueServer(socketpath, equeue_options=args)
    server.logger.info("Starting server on %r", socketpath)
    server.serve_forever()
  finally:
    remove_existing_file(socketpath)
    # pid 0 addresses every process of the current process group: the
    # server itself and, presumably, any command it is still running.
    os.kill(0, signal.SIGKILL)
232 233 234

# Run the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
  main()