Commit 5c31b6b6 authored by Dylan Griffith

Log extra metadata in the Sidekiq done log for ElasticIndexBulkCronWorker

This introduces a generic worker capability
`ApplicationWorker#log_extra_metadata_on_done(key, value)` to add
metadata that will later be logged with the done job payload. When
called, this method adds the key/value pair to a `Hash` which is then
passed all the way up the middleware chain to the `StructuredLogger`
and appended to the done log statement.
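
For illustration, a minimal sketch of how a worker might use this
capability (`ExampleWorker` and its `records` value are hypothetical,
not part of this MR):

    class ExampleWorker
      include ApplicationWorker

      def perform
        records = [1, 2, 3] # stand-in for the job's real work

        # Stored in a Hash on the worker instance; the new
        # ExtraDoneLogMetadata middleware later copies it into the `job`
        # Hash so the StructuredLogger can append it to the done log.
        log_extra_metadata_on_done(:records_count, records.size)
      end
    end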

This MR also uses this functionality in ElasticIndexBulkCronWorker.

This is an improved way of doing what was implemented in
https://gitlab.com/gitlab-org/gitlab/-/merge_requests/30265, making it
easy to correlate the duration of a job with the size of the job being
processed. This approach is better because Sidekiq done logs already
break out DB time, CPU time and Redis time in detail, so we'd prefer
to build on that existing behaviour when doing our analysis.
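
As the specs below show, `#logging_extras` namespaces the stored keys
with the `extra.` prefix and the underscored worker class name before
they reach the log. Continuing the hypothetical `ExampleWorker` above:

    worker = ExampleWorker.new
    worker.perform

    worker.logging_extras
    # => { "extra.example_worker.records_count" => 3 }
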
parent 495f985c
@@ -11,6 +11,8 @@ module ApplicationWorker
   include WorkerAttributes
   include WorkerContext
 
+  LOGGING_EXTRA_KEY = 'extra'
+
   included do
     set_queue
@@ -24,6 +26,21 @@ module ApplicationWorker
       payload.stringify_keys.merge(context)
     end
 
+    def log_extra_metadata_on_done(key, value)
+      @done_log_extra_metadata ||= {}
+      @done_log_extra_metadata[key] = value
+    end
+
+    def logging_extras
+      return {} unless @done_log_extra_metadata
+
+      # Prefix keys with class name to avoid conflicts in Elasticsearch types.
+      # Also prefix with "extra." so that we know to log these new fields.
+      @done_log_extra_metadata.transform_keys do |k|
+        "#{LOGGING_EXTRA_KEY}.#{self.class.name.gsub("::", "_").underscore}.#{k}"
+      end
+    end
   end
 
   class_methods do
...
@@ -57,7 +57,7 @@ module Elastic
       start_time = Time.now
 
       specs = redis.zrangebyscore(REDIS_SET_KEY, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
-      return if specs.empty?
+      return 0 if specs.empty?
 
       first_score = specs.first.last
       last_score = specs.last.last
@@ -79,14 +79,18 @@ module Elastic
       # Remove all the successes
       redis.zremrangebyscore(REDIS_SET_KEY, first_score, last_score)
 
+      records_count = specs.count
+
       logger.info(
         message: 'bulk_indexing_end',
-        records_count: specs.count,
+        records_count: records_count,
         failures_count: failures.count,
         first_score: first_score,
         last_score: last_score,
         bulk_execution_duration_s: Time.now - start_time
       )
+
+      records_count
     end
 
     def deserialize_all(specs)
...
@@ -14,7 +14,8 @@ class ElasticIndexBulkCronWorker
   def perform
     in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
-      Elastic::ProcessBookkeepingService.new.execute
+      records_count = Elastic::ProcessBookkeepingService.new.execute
+      log_extra_metadata_on_done(:records_count, records_count)
    end
  rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
    # We're scheduled on a cronjob, so nothing to do here
...
@@ -93,6 +93,20 @@ describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do
       expect { described_class.new.execute }.to change(described_class, :queue_size).by(-limit)
     end
 
+    it 'returns the number of documents processed' do
+      described_class.track!(*fake_refs)
+
+      expect_processing(*fake_refs[0...limit])
+
+      expect(described_class.new.execute).to eq(limit)
+    end
+
+    it 'returns 0 without writing to the index when there are no documents' do
+      expect(::Gitlab::Elastic::BulkIndexer).not_to receive(:new)
+
+      expect(described_class.new.execute).to eq(0)
+    end
+
     it 'retries failed documents' do
       described_class.track!(*fake_refs)
       failed = fake_refs[0]
...
@@ -14,5 +14,18 @@ describe ElasticIndexBulkCronWorker do
      described_class.new.perform
    end
+
+    it 'adds the records_count to the done log' do
+      expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
+        expect(service).to receive(:execute).and_return(15)
+      end
+
+      worker = described_class.new
+      worker.perform
+
+      expect(worker.logging_extras).to eq(
+        "#{ApplicationWorker::LOGGING_EXTRA_KEY}.elastic_index_bulk_cron_worker.records_count" => 15
+      )
+    end
   end
 end
@@ -30,6 +30,12 @@ module Gitlab
         output_payload.merge!(job.slice(*::Gitlab::InstrumentationHelper::KEYS))
       end
 
+      def add_logging_extras!(job, output_payload)
+        output_payload.merge!(
+          job.select { |key, _| key.to_s.start_with?("#{ApplicationWorker::LOGGING_EXTRA_KEY}.") }
+        )
+      end
+
       def log_job_start(payload)
         payload['message'] = "#{base_message(payload)}: start"
         payload['job_status'] = 'start'
@@ -43,6 +49,7 @@ module Gitlab
       def log_job_done(job, started_time, payload, job_exception = nil)
         payload = payload.dup
         add_instrumentation_keys!(job, payload)
+        add_logging_extras!(job, payload)
 
         elapsed_time = elapsed(started_time)
         add_time_keys!(elapsed_time, payload)
...
@@ -14,6 +14,7 @@ module Gitlab
         chain.add ::Gitlab::SidekiqMiddleware::ArgumentsLogger if arguments_logger
         chain.add ::Gitlab::SidekiqMiddleware::MemoryKiller if memory_killer
         chain.add ::Gitlab::SidekiqMiddleware::RequestStoreMiddleware
+        chain.add ::Gitlab::SidekiqMiddleware::ExtraDoneLogMetadata
         chain.add ::Gitlab::SidekiqMiddleware::BatchLoader
         chain.add ::Labkit::Middleware::Sidekiq::Server
         chain.add ::Gitlab::SidekiqMiddleware::InstrumentationLogger
...
# frozen_string_literal: true

module Gitlab
  module SidekiqMiddleware
    class ExtraDoneLogMetadata
      def call(worker, job, queue)
        yield

        # We needed a way to pass state from a worker to the
        # Gitlab::SidekiqLogging::StructuredLogger. Unfortunately the
        # StructuredLogger itself is not a middleware, so it cannot access
        # the worker object. We also tried using SafeRequestStore to pass
        # the data up, but that doesn't work either because it is reset in
        # Gitlab::SidekiqMiddleware::RequestStoreMiddleware inside yield for
        # the StructuredLogger, so it's cleared before we get to logging the
        # done statement. As such, the only way to do this is to pass the
        # data up in the `job` object. Since `job` is just a Hash we can add
        # this extra metadata there.
        if worker.respond_to?(:logging_extras)
          job.merge!(worker.logging_extras)
        end
      end
    end
  end
end
@@ -218,6 +218,27 @@ describe Gitlab::SidekiqLogging::StructuredLogger do
         subject.call(job, 'test_queue') { }
       end
     end
 
+    context 'when there is extra metadata set for the done log' do
+      let(:expected_start_payload) { start_payload.except('args') }
+
+      let(:expected_end_payload) do
+        end_payload.except('args').merge("#{ApplicationWorker::LOGGING_EXTRA_KEY}.key1" => 15, "#{ApplicationWorker::LOGGING_EXTRA_KEY}.key2" => 16)
+      end
+
+      it 'logs it in the done log' do
+        Timecop.freeze(timestamp) do
+          expect(logger).to receive(:info).with(expected_start_payload).ordered
+          expect(logger).to receive(:info).with(expected_end_payload).ordered
+
+          subject.call(job, 'test_queue') do
+            job["#{ApplicationWorker::LOGGING_EXTRA_KEY}.key1"] = 15
+            job["#{ApplicationWorker::LOGGING_EXTRA_KEY}.key2"] = 16
+            job['key that will be ignored because it does not start with extra.'] = 17
+          end
+        end
+      end
+    end
   end
 
   describe '#add_time_keys!' do
...
# frozen_string_literal: true

require 'spec_helper'

describe Gitlab::SidekiqMiddleware::ExtraDoneLogMetadata do
  # Cannot use Class.new for this as ApplicationWorker will need the class to
  # have a name during `included do`.
  let(:worker) { AdminEmailWorker.new }

  let(:worker_without_application_worker) do
    Class.new do
    end.new
  end

  subject { described_class.new }

  let(:job) { { 'jid' => 123 } }
  let(:queue) { 'test_queue' }

  describe '#call' do
    it 'merges ApplicationWorker#logging_extras into the job' do
      worker.log_extra_metadata_on_done(:key1, 15)
      worker.log_extra_metadata_on_done(:key2, 16)

      expect { |b| subject.call(worker, job, queue, &b) }.to yield_control

      expect(job).to eq({ 'jid' => 123, 'extra.admin_email_worker.key1' => 15, 'extra.admin_email_worker.key2' => 16 })
    end

    it 'does not raise when the worker does not respond to #logging_extras' do
      expect { |b| subject.call(worker_without_application_worker, job, queue, &b) }.to yield_control

      expect(job).to eq({ 'jid' => 123 })
    end
  end
end
@@ -9,6 +9,7 @@ describe Gitlab::SidekiqMiddleware do
     TestWorker.class_eval do
       include Sidekiq::Worker
+      include ApplicationWorker
 
       def perform(_arg)
         Gitlab::SafeRequestStore['gitaly_call_actual'] = 1
@@ -55,6 +56,7 @@ describe Gitlab::SidekiqMiddleware do
       Gitlab::SidekiqMiddleware::ArgumentsLogger,
       Gitlab::SidekiqMiddleware::MemoryKiller,
       Gitlab::SidekiqMiddleware::RequestStoreMiddleware,
+      Gitlab::SidekiqMiddleware::ExtraDoneLogMetadata,
       Gitlab::SidekiqMiddleware::WorkerContext::Server,
       Gitlab::SidekiqMiddleware::AdminMode::Server,
       Gitlab::SidekiqMiddleware::DuplicateJobs::Server
...
@@ -21,6 +21,21 @@ describe ApplicationWorker do
     end
   end
 
+  describe '#logging_extras' do
+    it 'returns extra data to be logged that was set from #log_extra_metadata_on_done' do
+      instance.log_extra_metadata_on_done(:key1, "value1")
+      instance.log_extra_metadata_on_done(:key2, "value2")
+
+      expect(instance.logging_extras).to eq({ 'extra.gitlab_foo_bar_dummy_worker.key1' => "value1", 'extra.gitlab_foo_bar_dummy_worker.key2' => "value2" })
+    end
+
+    context 'when nothing is set' do
+      it 'returns {}' do
+        expect(instance.logging_extras).to eq({})
+      end
+    end
+  end
+
   describe '#structured_payload' do
     let(:payload) { {} }
...