equeue: create a lockfile when running importer command.

It will be used by other components of the import instance to know if
we are in a stable state or if update of backup is ongoing.
parent 6021e54b
...@@ -34,6 +34,7 @@ setup(name=name, ...@@ -34,6 +34,7 @@ setup(name=name,
'atomize', # needed by pubsub 'atomize', # needed by pubsub
'feedparser', # needed by pubsub 'feedparser', # needed by pubsub
'apache_libcloud>=0.4.0', # needed by cloudmgr 'apache_libcloud>=0.4.0', # needed by cloudmgr
'lockfile', # used by equeue
'lxml', # needed for xml parsing 'lxml', # needed for xml parsing
'paramiko', # needed by cloudmgr 'paramiko', # needed by cloudmgr
'psutil', # needed for playing with processes in portable way 'psutil', # needed for playing with processes in portable way
......
...@@ -30,6 +30,7 @@ import argparse ...@@ -30,6 +30,7 @@ import argparse
import errno import errno
import gdbm import gdbm
import json import json
from lockfile import LockFile
import logging import logging
import logging.handlers import logging.handlers
import os import os
...@@ -42,8 +43,6 @@ import StringIO ...@@ -42,8 +43,6 @@ import StringIO
import threading import threading
# I think this is obvious enough to not require any documentation, but I might
# be wrong.
class EqueueServer(SocketServer.ThreadingUnixStreamServer): class EqueueServer(SocketServer.ThreadingUnixStreamServer):
daemon_threads = True daemon_threads = True
...@@ -57,7 +56,8 @@ class EqueueServer(SocketServer.ThreadingUnixStreamServer): ...@@ -57,7 +56,8 @@ class EqueueServer(SocketServer.ThreadingUnixStreamServer):
self.setLogger(self.options.logfile[0], self.options.loglevel[0]) self.setLogger(self.options.logfile[0], self.options.loglevel[0])
self.setDB(self.options.database[0]) self.setDB(self.options.database[0])
# Lock to only have one command running at the time # Lock to only have one command running at the time
self.lock = threading.Lock() self.thread_lock = threading.Lock()
self.lockfile = LockFile(self.options.lockfile)
def setLogger(self, logfile, loglevel): def setLogger(self, logfile, loglevel):
self.logger = logging.getLogger("EQueue") self.logger = logging.getLogger("EQueue")
...@@ -73,7 +73,7 @@ class EqueueServer(SocketServer.ThreadingUnixStreamServer): ...@@ -73,7 +73,7 @@ class EqueueServer(SocketServer.ThreadingUnixStreamServer):
self.db = gdbm.open(database, 'cs', 0700) self.db = gdbm.open(database, 'cs', 0700)
def _runCommandIfNeeded(self, command, timestamp): def _runCommandIfNeeded(self, command, timestamp):
with self.lock: with self.thread_lock as thread_lock, self.lockfile as lockfile:
cmd_list = command.split('\0') cmd_list = command.split('\0')
cmd_readable = ' '.join(cmd_list) cmd_readable = ' '.join(cmd_list)
cmd_executable = cmd_list[0] cmd_executable = cmd_list[0]
...@@ -151,6 +151,8 @@ def main(): ...@@ -151,6 +151,8 @@ def main():
help="Path to the log file.") help="Path to the log file.")
parser.add_argument('-t', '--timeout', nargs=1, required=False, parser.add_argument('-t', '--timeout', nargs=1, required=False,
dest='timeout', type=int, default=3) dest='timeout', type=int, default=3)
parser.add_argument('--lockfile',
help="Path to the lock file created when a command is run")
parser.add_argument('socket', help="Path to the unix socket") parser.add_argument('socket', help="Path to the unix socket")
args = parser.parse_args() args = parser.parse_args()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment