Commit 9a0c1f64 authored by Stan Hu's avatar Stan Hu

Merge branch 'sidekiq-interrupt-running-jobs' into 'master'

Allow to interrupt running sidekiq jobs

See merge request gitlab-org/gitlab-ce!31818
parents 5fb18d57 8d17c4da
......@@ -28,11 +28,13 @@ if Rails.env.development?
end
enable_json_logs = Gitlab.config.sidekiq.log_format == 'json'
enable_sidekiq_monitor = ENV.fetch("SIDEKIQ_MONITOR_WORKER", 0).to_i.nonzero?
Sidekiq.configure_server do |config|
config.redis = queues_config_hash
config.server_middleware do |chain|
chain.add Gitlab::SidekiqMiddleware::Monitor if enable_sidekiq_monitor
chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
......@@ -57,6 +59,8 @@ Sidekiq.configure_server do |config|
# Clear any connections that might have been obtained before starting
# Sidekiq (e.g. in an initializer).
ActiveRecord::Base.clear_all_connections!
Gitlab::SidekiqMonitor.instance.start if enable_sidekiq_monitor
end
if enable_reliable_fetch?
......
......@@ -169,3 +169,121 @@ The PostgreSQL wiki has details on the query you can run to see blocking
queries. The query is different based on PostgreSQL version. See
[Lock Monitoring](https://wiki.postgresql.org/wiki/Lock_Monitoring) for
the query details.
## Managing Sidekiq queues
It is possible to use [Sidekiq API](https://github.com/mperham/sidekiq/wiki/API)
to perform a number of troubleshoting on Sidekiq.
These are the administrative commands and it should only be used if currently
admin interface is not suitable due to scale of installation.
All this commands should be run using `gitlab-rails console`.
### View the queue size
```ruby
Sidekiq::Queue.new("pipeline_processing:build_queue").size
```
### Enumerate all enqueued jobs
```ruby
queue = Sidekiq::Queue.new("chaos:chaos_sleep")
queue.each do |job|
# job.klass # => 'MyWorker'
# job.args # => [1, 2, 3]
# job.jid # => jid
# job.queue # => chaos:chaos_sleep
# job["retry"] # => 3
# job.item # => {
# "class"=>"Chaos::SleepWorker",
# "args"=>[1000],
# "retry"=>3,
# "queue"=>"chaos:chaos_sleep",
# "backtrace"=>true,
# "queue_namespace"=>"chaos",
# "jid"=>"39bc482b823cceaf07213523",
# "created_at"=>1566317076.266069,
# "correlation_id"=>"c323b832-a857-4858-b695-672de6f0e1af",
# "enqueued_at"=>1566317076.26761},
# }
# job.delete if job.jid == 'abcdef1234567890'
end
```
### Enumerate currently running jobs
```ruby
workers = Sidekiq::Workers.new
workers.each do |process_id, thread_id, work|
# process_id is a unique identifier per Sidekiq process
# thread_id is a unique identifier per thread
# work is a Hash which looks like:
# {"queue"=>"chaos:chaos_sleep",
# "payload"=>
# { "class"=>"Chaos::SleepWorker",
# "args"=>[1000],
# "retry"=>3,
# "queue"=>"chaos:chaos_sleep",
# "backtrace"=>true,
# "queue_namespace"=>"chaos",
# "jid"=>"b2a31e3eac7b1a99ff235869",
# "created_at"=>1566316974.9215662,
# "correlation_id"=>"e484fb26-7576-45f9-bf21-b99389e1c53c",
# "enqueued_at"=>1566316974.9229589},
# "run_at"=>1566316974}],
end
```
### Remove sidekiq jobs for given parameters (destructive)
```ruby
# for jobs like this:
# RepositoryImportWorker.new.perform_async(100)
id_list = [100]
queue = Sidekiq::Queue.new('repository_import')
queue.each do |job|
job.delete if id_list.include?(job.args[0])
end
```
### Remove specific job ID (destructive)
```ruby
queue = Sidekiq::Queue.new('repository_import')
queue.each do |job|
job.delete if job.jid == 'my-job-id'
end
```
## Canceling running jobs (destructive)
> Introduced in GitLab 12.3.
This is highly risky operation and use it as last resort.
Doing that might result in data corruption, as the job
is interrupted mid-execution and it is not guaranteed
that proper rollback of transactions is implemented.
```ruby
Gitlab::SidekiqMonitor.cancel_job('job-id')
```
> This requires the Sidekiq to be run with `SIDEKIQ_MONITOR_WORKER=1`
> environment variable.
To perform of the interrupt we use `Thread.raise` which
has number of drawbacks, as mentioned in [Why Ruby’s Timeout is dangerous (and Thread.raise is terrifying)](https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/):
> This is where the implications get interesting, and terrifying. This means that an exception can get raised:
>
> * during a network request (ok, as long as the surrounding code is prepared to catch Timeout::Error)
> * during the cleanup for the network request
> * during a rescue block
> * while creating an object to save to the database afterwards
> * in any of your code, regardless of whether it could have possibly raised an exception before
>
> Nobody writes code to defend against an exception being raised on literally any line. That’s not even possible. So Thread.raise is basically like a sneak attack on your code that could result in almost anything. It would probably be okay if it were pure-functional code that did not modify any state. But this is Ruby, so that’s unlikely :)
......@@ -46,7 +46,10 @@ module Gitlab
if thread
thread.wakeup if thread.alive?
thread.join unless Thread.current == thread
begin
thread.join unless Thread.current == thread
rescue Exception # rubocop:disable Lint/RescueException
end
@thread = nil
end
end
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
class Monitor
def call(worker, job, queue)
Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do
yield
end
rescue Gitlab::SidekiqMonitor::CancelledError
# ignore retries
raise Sidekiq::JobRetry::Skip
end
end
end
end
# frozen_string_literal: true
module Gitlab
class SidekiqMonitor < Daemon
include ::Gitlab::Utils::StrongMemoize
NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
CANCEL_DEADLINE = 24.hours.seconds
RECONNECT_TIME = 3.seconds
# We use exception derived from `Exception`
# to consider this as an very low-level exception
# that should not be caught by application
CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
attr_reader :jobs_thread
attr_reader :jobs_mutex
def initialize
super
@jobs_thread = {}
@jobs_mutex = Mutex.new
end
def within_job(jid, queue)
jobs_mutex.synchronize do
jobs_thread[jid] = Thread.current
end
if cancelled?(jid)
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'run',
queue: queue,
jid: jid,
canceled: true
)
raise CancelledError
end
yield
ensure
jobs_mutex.synchronize do
jobs_thread.delete(jid)
end
end
def self.cancel_job(jid)
payload = {
action: 'cancel',
jid: jid
}.to_json
::Gitlab::Redis::SharedState.with do |redis|
redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
redis.publish(NOTIFICATION_CHANNEL, payload)
end
end
private
def start_working
Sidekiq.logger.info(
class: self.class.to_s,
action: 'start',
message: 'Starting Monitor Daemon'
)
while enabled?
process_messages
sleep(RECONNECT_TIME)
end
ensure
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'stop',
message: 'Stopping Monitor Daemon'
)
end
def stop_working
thread.raise(Interrupt) if thread.alive?
end
def process_messages
::Gitlab::Redis::SharedState.with do |redis|
redis.subscribe(NOTIFICATION_CHANNEL) do |on|
on.message do |channel, message|
process_message(message)
end
end
end
rescue Exception => e # rubocop:disable Lint/RescueException
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'exception',
message: e.message
)
# we re-raise system exceptions
raise e unless e.is_a?(StandardError)
end
def process_message(message)
Sidekiq.logger.info(
class: self.class.to_s,
channel: NOTIFICATION_CHANNEL,
message: 'Received payload on channel',
payload: message
)
message = safe_parse(message)
return unless message
case message['action']
when 'cancel'
process_job_cancel(message['jid'])
else
# unknown message
end
end
def safe_parse(message)
JSON.parse(message)
rescue JSON::ParserError
end
def process_job_cancel(jid)
return unless jid
# try to find thread without lock
return unless find_thread_unsafe(jid)
Thread.new do
# try to find a thread, but with guaranteed
# that handle for thread corresponds to actually
# running job
find_thread_with_lock(jid) do |thread|
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'cancel',
message: 'Canceling thread with CancelledError',
jid: jid,
thread_id: thread.object_id
)
thread&.raise(CancelledError)
end
end
end
# This method needs to be thread-safe
# This is why it passes thread in block,
# to ensure that we do process this thread
def find_thread_unsafe(jid)
jobs_thread[jid]
end
def find_thread_with_lock(jid)
# don't try to lock if we cannot find the thread
return unless find_thread_unsafe(jid)
jobs_mutex.synchronize do
find_thread_unsafe(jid).tap do |thread|
yield(thread) if thread
end
end
end
def cancelled?(jid)
::Gitlab::Redis::SharedState.with do |redis|
redis.exists(self.class.cancel_job_key(jid))
end
end
def self.cancel_job_key(jid)
"sidekiq:cancel:#{jid}"
end
end
end
......@@ -34,12 +34,12 @@ describe Gitlab::Daemon do
end
end
describe 'when Daemon is enabled' do
context 'when Daemon is enabled' do
before do
allow(subject).to receive(:enabled?).and_return(true)
end
describe 'when Daemon is stopped' do
context 'when Daemon is stopped' do
describe '#start' do
it 'starts the Daemon' do
expect { subject.start.join }.to change { subject.thread? }.from(false).to(true)
......@@ -57,14 +57,14 @@ describe Gitlab::Daemon do
end
end
describe 'when Daemon is running' do
context 'when Daemon is running' do
before do
subject.start.join
subject.start
end
describe '#start' do
it "doesn't start running Daemon" do
expect { subject.start.join }.not_to change { subject.thread? }
expect { subject.start.join }.not_to change { subject.thread }
expect(subject).to have_received(:start_working).once
end
......@@ -76,11 +76,29 @@ describe Gitlab::Daemon do
expect(subject).to have_received(:stop_working)
end
context 'when stop_working raises exception' do
before do
allow(subject).to receive(:start_working) do
sleep(1000)
end
end
it 'shutdowns Daemon' do
expect(subject).to receive(:stop_working) do
subject.thread.raise(Interrupt)
end
expect(subject.thread).to be_alive
expect { subject.stop }.not_to raise_error
expect(subject.thread).to be_nil
end
end
end
end
end
describe 'when Daemon is disabled' do
context 'when Daemon is disabled' do
before do
allow(subject).to receive(:enabled?).and_return(false)
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::Monitor do
let(:monitor) { described_class.new }
describe '#call' do
let(:worker) { double }
let(:job) { { 'jid' => 'job-id' } }
let(:queue) { 'my-queue' }
it 'calls SidekiqMonitor' do
expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job)
.with('job-id', 'my-queue')
.and_call_original
expect { |blk| monitor.call(worker, job, queue, &blk) }.to yield_control
end
it 'passthroughs the return value' do
result = monitor.call(worker, job, queue) do
'value'
end
expect(result).to eq('value')
end
context 'when cancel happens' do
subject do
monitor.call(worker, job, queue) do
raise Gitlab::SidekiqMonitor::CancelledError
end
end
it 'does skip this job' do
expect { subject }.to raise_error(Sidekiq::JobRetry::Skip)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMonitor do
let(:monitor) { described_class.new }
describe '#within_job' do
it 'tracks thread' do
blk = proc do
expect(monitor.jobs_thread['jid']).not_to be_nil
"OK"
end
expect(monitor.within_job('jid', 'queue', &blk)).to eq("OK")
end
context 'when job is canceled' do
let(:jid) { SecureRandom.hex }
before do
described_class.cancel_job(jid)
end
it 'does not execute a block' do
expect do |blk|
monitor.within_job(jid, 'queue', &blk)
rescue described_class::CancelledError
end.not_to yield_control
end
it 'raises exception' do
expect { monitor.within_job(jid, 'queue') }.to raise_error(
described_class::CancelledError)
end
end
end
describe '#start_working' do
subject { monitor.send(:start_working) }
before do
# we want to run at most once cycle
# we toggle `enabled?` flag after the first call
stub_const('Gitlab::SidekiqMonitor::RECONNECT_TIME', 0)
allow(monitor).to receive(:enabled?).and_return(true, false)
allow(Sidekiq.logger).to receive(:info)
allow(Sidekiq.logger).to receive(:warn)
end
context 'when structured logging is used' do
it 'logs start message' do
expect(Sidekiq.logger).to receive(:info)
.with(
class: described_class.to_s,
action: 'start',
message: 'Starting Monitor Daemon')
expect(::Gitlab::Redis::SharedState).to receive(:with)
subject
end
it 'logs stop message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class.to_s,
action: 'stop',
message: 'Stopping Monitor Daemon')
expect(::Gitlab::Redis::SharedState).to receive(:with)
subject
end
it 'logs StandardError message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class.to_s,
action: 'exception',
message: 'My Exception')
expect(::Gitlab::Redis::SharedState).to receive(:with)
.and_raise(StandardError, 'My Exception')
expect { subject }.not_to raise_error
end
it 'logs and raises Exception message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class.to_s,
action: 'exception',
message: 'My Exception')
expect(::Gitlab::Redis::SharedState).to receive(:with)
.and_raise(Exception, 'My Exception')
expect { subject }.to raise_error(Exception, 'My Exception')
end
end
context 'when StandardError is raised' do
it 'does retry connection' do
expect(::Gitlab::Redis::SharedState).to receive(:with)
.and_raise(StandardError, 'My Exception')
expect(::Gitlab::Redis::SharedState).to receive(:with)
# we expect to run `process_messages` twice
expect(monitor).to receive(:enabled?).and_return(true, true, false)
subject
end
end
context 'when message is published' do
let(:subscribed) { double }
before do
expect_any_instance_of(::Redis).to receive(:subscribe)
.and_yield(subscribed)
expect(subscribed).to receive(:message)
.and_yield(
described_class::NOTIFICATION_CHANNEL,
payload
)
expect(Sidekiq.logger).to receive(:info)
.with(
class: described_class.to_s,
action: 'start',
message: 'Starting Monitor Daemon')
expect(Sidekiq.logger).to receive(:info)
.with(
class: described_class.to_s,
channel: described_class::NOTIFICATION_CHANNEL,
message: 'Received payload on channel',
payload: payload
)
end
context 'and message is valid' do
let(:payload) { '{"action":"cancel","jid":"my-jid"}' }
it 'processes cancel' do
expect(monitor).to receive(:process_job_cancel).with('my-jid')
subject
end
end
context 'and message is not valid json' do
let(:payload) { '{"action"}' }
it 'skips processing' do
expect(monitor).not_to receive(:process_job_cancel)
subject
end
end
end
end
describe '#stop' do
let!(:monitor_thread) { monitor.start }
it 'does stop the thread' do
expect(monitor_thread).to be_alive
expect { monitor.stop }.not_to raise_error
expect(monitor_thread).not_to be_alive
expect { monitor_thread.value }.to raise_error(Interrupt)
end
end
describe '#process_job_cancel' do
subject { monitor.send(:process_job_cancel, jid) }
context 'when jid is missing' do
let(:jid) { nil }
it 'does not run thread' do
expect(subject).to be_nil
end
end
context 'when jid is provided' do
let(:jid) { 'my-jid' }
context 'when jid is not found' do
it 'does not log cancellation message' do
expect(Sidekiq.logger).not_to receive(:warn)
expect(subject).to be_nil
end
end
context 'when jid is found' do
let(:thread) { Thread.new { sleep 1000 } }
before do
monitor.jobs_thread[jid] = thread
end
after do
thread.kill
rescue
end
it 'does log cancellation message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class.to_s,
action: 'cancel',
message: 'Canceling thread with CancelledError',
jid: 'my-jid',
thread_id: thread.object_id)
expect(subject).to be_a(Thread)
subject.join
end
it 'does cancel the thread' do
expect(subject).to be_a(Thread)
subject.join
# we wait for the thread to be cancelled
# by `process_job_cancel`
expect { thread.join(5) }.to raise_error(described_class::CancelledError)
end
end
end
end
describe '.cancel_job' do
subject { described_class.cancel_job('my-jid') }
it 'sets a redis key' do
expect_any_instance_of(::Redis).to receive(:setex)
.with('sidekiq:cancel:my-jid', anything, 1)
subject
end
it 'notifies all workers' do
payload = '{"action":"cancel","jid":"my-jid"}'
expect_any_instance_of(::Redis).to receive(:publish)
.with('sidekiq:cancel:notifications', payload)
subject
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment