Commit dfa80312 authored by Oswaldo Ferreira's avatar Oswaldo Ferreira

Add instrumentation to Gitaly streamed responses

This is a stab into fixing the Gitaly timing in logs (gitaly_duration_s)
for streamed responses using the same GitalyClient.call method.

The problem of having a GitalyClient.call for non-streamed responses
and GitalyClient.streaming_call (with a block) for streamed responses
is that we'll need to rely mostly on documentation in order to
get the timings right for new RPCs.

In order to solve that, here we look further into the Gitaly response.
If it's an Enumerator (that's what the Ruby implementation of gRPC
streams return from the server https://grpc.io/docs/languages/ruby/basics/),
we wrap that Enumerator into a custom enumerator, which instruments
that stream consumption.

Another advantage of that over wrapping the whole stream consumption
into a block is that we won't add too much of Ruby CPU time at it,
just the response.next call is measured, which is the point of
contact with Gitaly.
parent 870ed3d9
---
title: Add instrumentation to Gitaly streamed responses
merge_request: 35283
author:
type: fixed
......@@ -166,20 +166,7 @@ module Gitlab
# "gitaly-2 is at network address tcp://10.0.1.2:8075".
#
def self.call(storage, service, rpc, request, remote_storage: nil, timeout: default_timeout, &block)
self.measure_timings(service, rpc, request) do
self.execute(storage, service, rpc, request, remote_storage: remote_storage, timeout: timeout, &block)
end
end
# This method is like GitalyClient.call but should be used with
# Gitaly streaming RPCs. It measures how long the the RPC took to
# produce the full response, not just the initial response.
def self.streaming_call(storage, service, rpc, request, remote_storage: nil, timeout: default_timeout)
self.measure_timings(service, rpc, request) do
response = self.execute(storage, service, rpc, request, remote_storage: remote_storage, timeout: timeout)
yield(response)
end
Gitlab::GitalyClient::Call.new(storage, service, rpc, request, remote_storage, timeout).call(&block)
end
def self.execute(storage, service, rpc, request, remote_storage:, timeout:)
......@@ -192,23 +179,6 @@ module Gitlab
stub(service, storage).__send__(rpc, request, kwargs) # rubocop:disable GitlabSecurity/PublicSend
end
def self.measure_timings(service, rpc, request)
start = Gitlab::Metrics::System.monotonic_time
yield
ensure
duration = Gitlab::Metrics::System.monotonic_time - start
request_hash = request.is_a?(Google::Protobuf::MessageExts) ? request.to_h : {}
# Keep track, separately, for the performance bar
self.add_query_time(duration)
if Gitlab::PerformanceBar.enabled_for_request?
add_call_details(feature: "#{service}##{rpc}", duration: duration, request: request_hash, rpc: rpc,
backtrace: Gitlab::BacktraceCleaner.clean_backtrace(caller))
end
end
def self.query_time
query_time = Gitlab::SafeRequestStore[:gitaly_query_time] || 0
query_time.round(Gitlab::InstrumentationHelper::DURATION_PRECISION)
......
......@@ -15,9 +15,8 @@ module Gitlab
oid: oid,
limit: limit
)
GitalyClient.streaming_call(@gitaly_repo.storage_name, :blob_service, :get_blob, request, timeout: GitalyClient.fast_timeout) do |response|
consume_blob_response(response)
end
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :get_blob, request, timeout: GitalyClient.fast_timeout)
consume_blob_response(response)
end
def batch_lfs_pointers(blob_ids)
......@@ -28,9 +27,8 @@ module Gitlab
blob_ids: blob_ids
)
GitalyClient.streaming_call(@gitaly_repo.storage_name, :blob_service, :get_lfs_pointers, request, timeout: GitalyClient.medium_timeout) do |response|
map_lfs_pointers(response)
end
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :get_lfs_pointers, request, timeout: GitalyClient.medium_timeout)
map_lfs_pointers(response)
end
def get_blobs(revision_paths, limit = -1)
......@@ -46,15 +44,14 @@ module Gitlab
limit: limit
)
GitalyClient.streaming_call(
response = GitalyClient.call(
@gitaly_repo.storage_name,
:blob_service,
:get_blobs,
request,
timeout: GitalyClient.fast_timeout
) do |response|
GitalyClient::BlobsStitcher.new(response)
end
timeout: GitalyClient.fast_timeout)
GitalyClient::BlobsStitcher.new(response)
end
def get_blob_types(revision_paths, limit = -1)
......@@ -70,15 +67,14 @@ module Gitlab
limit: limit
)
GitalyClient.streaming_call(
response = GitalyClient.call(
@gitaly_repo.storage_name,
:blob_service,
:get_blobs,
request,
timeout: GitalyClient.fast_timeout
) do |response|
map_blob_types(response)
end
)
map_blob_types(response)
end
def get_new_lfs_pointers(revision, limit, not_in, dynamic_timeout = nil)
......@@ -101,15 +97,14 @@ module Gitlab
GitalyClient.medium_timeout
end
GitalyClient.streaming_call(
response = GitalyClient.call(
@gitaly_repo.storage_name,
:blob_service,
:get_new_lfs_pointers,
request,
timeout: timeout
) do |response|
map_lfs_pointers(response)
end
)
map_lfs_pointers(response)
end
def get_all_lfs_pointers
......@@ -117,9 +112,8 @@ module Gitlab
repository: @gitaly_repo
)
GitalyClient.streaming_call(@gitaly_repo.storage_name, :blob_service, :get_all_lfs_pointers, request, timeout: GitalyClient.medium_timeout) do |response|
map_lfs_pointers(response)
end
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :get_all_lfs_pointers, request, timeout: GitalyClient.medium_timeout)
map_lfs_pointers(response)
end
private
......
# frozen_string_literal: true
module Gitlab
module GitalyClient
class Call
def initialize(storage, service, rpc, request, remote_storage, timeout)
@storage = storage
@service = service
@rpc = rpc
@request = request
@remote_storage = remote_storage
@timeout = timeout
@duration = 0
end
def call(&block)
response = recording_request do
GitalyClient.execute(@storage, @service, @rpc, @request, remote_storage: @remote_storage, timeout: @timeout, &block)
end
if response.is_a?(Enumerator)
# When the given response is an enumerator (coming from streamed
# responses), we wrap it in order to properly measure the stream
# consumption as it happens.
#
# store_timings is not called in that scenario as needs to be
# handled lazily in the custom Enumerator context.
instrument_stream(response)
else
store_timings
response
end
rescue => err
store_timings
raise err
end
private
def instrument_stream(response)
Enumerator.new do |yielder|
loop do
value = recording_request { response.next }
yielder.yield(value)
end
ensure
store_timings
end
end
def recording_request
start = Gitlab::Metrics::System.monotonic_time
yield
ensure
@duration += Gitlab::Metrics::System.monotonic_time - start
end
def store_timings
GitalyClient.add_query_time(@duration)
return unless Gitlab::PerformanceBar.enabled_for_request?
request_hash = @request.is_a?(Google::Protobuf::MessageExts) ? @request.to_h : {}
GitalyClient.add_call_details(feature: "#{@service}##{@rpc}", duration: @duration, request: request_hash, rpc: @rpc,
backtrace: Gitlab::BacktraceCleaner.clean_backtrace(caller))
end
end
end
end
......@@ -13,15 +13,14 @@ module Gitlab
end
def apply_bfg_object_map_stream(io, &blk)
GitalyClient.streaming_call(
response = GitalyClient.call(
storage,
:cleanup_service,
:apply_bfg_object_map_stream,
build_object_map_enum(io),
timeout: GitalyClient.long_timeout
) do |response|
response.each(&blk)
end
)
response.each(&blk)
end
private
......
......@@ -72,9 +72,8 @@ module Gitlab
def commit_deltas(commit)
request = Gitaly::CommitDeltaRequest.new(diff_from_parent_request_params(commit))
GitalyClient.streaming_call(@repository.storage, :diff_service, :commit_delta, request, timeout: GitalyClient.fast_timeout) do |response|
response.flat_map { |msg| msg.deltas }
end
response = GitalyClient.call(@repository.storage, :diff_service, :commit_delta, request, timeout: GitalyClient.fast_timeout)
response.flat_map { |msg| msg.deltas }
end
def tree_entry(ref, path, limit = nil)
......@@ -202,9 +201,8 @@ module Gitlab
to: to
)
GitalyClient.streaming_call(@repository.storage, :commit_service, :commits_between, request, timeout: GitalyClient.medium_timeout) do |response|
consume_commits_response(response)
end
response = GitalyClient.call(@repository.storage, :commit_service, :commits_between, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
def diff_stats(left_commit_sha, right_commit_sha)
......@@ -214,9 +212,8 @@ module Gitlab
right_commit_id: right_commit_sha
)
GitalyClient.streaming_call(@repository.storage, :diff_service, :diff_stats, request, timeout: GitalyClient.medium_timeout) do |response|
response.flat_map(&:stats)
end
response = GitalyClient.call(@repository.storage, :diff_service, :diff_stats, request, timeout: GitalyClient.medium_timeout)
response.flat_map(&:stats)
end
def find_all_commits(opts = {})
......@@ -228,9 +225,8 @@ module Gitlab
)
request.order = opts[:order].upcase if opts[:order].present?
GitalyClient.streaming_call(@repository.storage, :commit_service, :find_all_commits, request, timeout: GitalyClient.medium_timeout) do |response|
consume_commits_response(response)
end
response = GitalyClient.call(@repository.storage, :commit_service, :find_all_commits, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
def list_commits_by_oid(oids)
......@@ -238,9 +234,8 @@ module Gitlab
request = Gitaly::ListCommitsByOidRequest.new(repository: @gitaly_repo, oid: oids)
GitalyClient.streaming_call(@repository.storage, :commit_service, :list_commits_by_oid, request, timeout: GitalyClient.medium_timeout) do |response|
consume_commits_response(response)
end
response = GitalyClient.call(@repository.storage, :commit_service, :list_commits_by_oid, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
rescue GRPC::NotFound # If no repository is found, happens mainly during testing
[]
end
......@@ -256,9 +251,8 @@ module Gitlab
global_options: parse_global_options!(literal_pathspec: literal_pathspec)
)
GitalyClient.streaming_call(@repository.storage, :commit_service, :commits_by_message, request, timeout: GitalyClient.medium_timeout) do |response|
consume_commits_response(response)
end
response = GitalyClient.call(@repository.storage, :commit_service, :commits_by_message, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
def languages(ref = nil)
......@@ -334,9 +328,8 @@ module Gitlab
request.paths = encode_repeated(Array(options[:path])) if options[:path].present?
GitalyClient.streaming_call(@repository.storage, :commit_service, :find_commits, request, timeout: GitalyClient.medium_timeout) do |response|
consume_commits_response(response)
end
response = GitalyClient.call(@repository.storage, :commit_service, :find_commits, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
def filter_shas_with_signatures(shas)
......@@ -352,10 +345,9 @@ module Gitlab
end
end
GitalyClient.streaming_call(@repository.storage, :commit_service, :filter_shas_with_signatures, enum, timeout: GitalyClient.fast_timeout) do |response|
response.flat_map do |msg|
msg.shas.map { |sha| EncodingHelper.encode!(sha) }
end
response = GitalyClient.call(@repository.storage, :commit_service, :filter_shas_with_signatures, enum, timeout: GitalyClient.fast_timeout)
response.flat_map do |msg|
msg.shas.map { |sha| EncodingHelper.encode!(sha) }
end
end
......@@ -423,9 +415,8 @@ module Gitlab
request_params.merge!(Gitlab::Git::DiffCollection.limits(options).to_h)
request = Gitaly::CommitDiffRequest.new(request_params)
GitalyClient.streaming_call(@repository.storage, :diff_service, :commit_diff, request, timeout: GitalyClient.medium_timeout) do |response|
GitalyClient::DiffStitcher.new(response)
end
response = GitalyClient.call(@repository.storage, :diff_service, :commit_diff, request, timeout: GitalyClient.medium_timeout)
GitalyClient::DiffStitcher.new(response)
end
def diff_from_parent_request_params(commit, options = {})
......
......@@ -20,9 +20,8 @@ module Gitlab
our_commit_oid: @our_commit_oid,
their_commit_oid: @their_commit_oid
)
GitalyClient.streaming_call(@repository.storage, :conflicts_service, :list_conflict_files, request, timeout: GitalyClient.long_timeout) do |response|
GitalyClient::ConflictFilesStitcher.new(response, @gitaly_repo)
end
response = GitalyClient.call(@repository.storage, :conflicts_service, :list_conflict_files, request, timeout: GitalyClient.long_timeout)
GitalyClient::ConflictFilesStitcher.new(response, @gitaly_repo)
end
def conflicts?
......
......@@ -14,16 +14,14 @@ module Gitlab
def branches
request = Gitaly::FindAllBranchesRequest.new(repository: @gitaly_repo)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout) do |response|
consume_find_all_branches_response(response)
end
response = GitalyClient.call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_all_branches_response(response)
end
def remote_branches(remote_name)
request = Gitaly::FindAllRemoteBranchesRequest.new(repository: @gitaly_repo, remote_name: remote_name)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_remote_branches, request, timeout: GitalyClient.medium_timeout) do |response|
consume_find_all_remote_branches_response(remote_name, response)
end
response = GitalyClient.call(@storage, :ref_service, :find_all_remote_branches, request, timeout: GitalyClient.medium_timeout)
consume_find_all_remote_branches_response(remote_name, response)
end
def merged_branches(branch_names = [])
......@@ -32,9 +30,8 @@ module Gitlab
merged_only: true,
merged_branches: branch_names.map { |s| encode_binary(s) }
)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout) do |response|
consume_find_all_branches_response(response)
end
response = GitalyClient.call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_all_branches_response(response)
end
def default_branch_name
......@@ -45,16 +42,14 @@ module Gitlab
def branch_names
request = Gitaly::FindAllBranchNamesRequest.new(repository: @gitaly_repo)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_branch_names, request, timeout: GitalyClient.fast_timeout) do |response|
consume_refs_response(response) { |name| Gitlab::Git.branch_name(name) }
end
response = GitalyClient.call(@storage, :ref_service, :find_all_branch_names, request, timeout: GitalyClient.fast_timeout)
consume_refs_response(response) { |name| Gitlab::Git.branch_name(name) }
end
def tag_names
request = Gitaly::FindAllTagNamesRequest.new(repository: @gitaly_repo)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_tag_names, request, timeout: GitalyClient.fast_timeout) do |response|
consume_refs_response(response) { |name| Gitlab::Git.tag_name(name) }
end
response = GitalyClient.call(@storage, :ref_service, :find_all_tag_names, request, timeout: GitalyClient.fast_timeout)
consume_refs_response(response) { |name| Gitlab::Git.tag_name(name) }
end
def find_ref_name(commit_id, ref_prefix)
......@@ -75,11 +70,10 @@ module Gitlab
commits = []
GitalyClient.streaming_call(@storage, :ref_service, :list_new_commits, request, timeout: GitalyClient.medium_timeout) do |response|
response.each do |msg|
msg.commits.each do |c|
commits << Gitlab::Git::Commit.new(@repository, c)
end
response = GitalyClient.call(@storage, :ref_service, :list_new_commits, request, timeout: GitalyClient.medium_timeout)
response.each do |msg|
msg.commits.each do |c|
commits << Gitlab::Git::Commit.new(@repository, c)
end
end
......@@ -100,12 +94,11 @@ module Gitlab
GitalyClient.medium_timeout
end
GitalyClient.streaming_call(@storage, :ref_service, :list_new_blobs, request, timeout: timeout) do |response|
response.flat_map do |msg|
# Returns an Array of Gitaly::NewBlobObject objects
# Available methods are: #size, #oid and #path
msg.new_blob_objects
end
response = GitalyClient.call(@storage, :ref_service, :list_new_blobs, request, timeout: timeout)
response.flat_map do |msg|
# Returns an Array of Gitaly::NewBlobObject objects
# Available methods are: #size, #oid and #path
msg.new_blob_objects
end
end
......@@ -120,16 +113,14 @@ module Gitlab
def local_branches(sort_by: nil)
request = Gitaly::FindLocalBranchesRequest.new(repository: @gitaly_repo)
request.sort_by = sort_by_param(sort_by) if sort_by
GitalyClient.streaming_call(@storage, :ref_service, :find_local_branches, request, timeout: GitalyClient.fast_timeout) do |response|
consume_find_local_branches_response(response)
end
response = GitalyClient.call(@storage, :ref_service, :find_local_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_local_branches_response(response)
end
def tags
request = Gitaly::FindAllTagsRequest.new(repository: @gitaly_repo)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_tags, request, timeout: GitalyClient.medium_timeout) do |response|
consume_tags_response(response)
end
response = GitalyClient.call(@storage, :ref_service, :find_all_tags, request, timeout: GitalyClient.medium_timeout)
consume_tags_response(response)
end
def ref_exists?(ref_name)
......@@ -174,9 +165,8 @@ module Gitlab
limit: limit
)
GitalyClient.streaming_call(@storage, :ref_service, :list_tag_names_containing_commit, request, timeout: GitalyClient.medium_timeout) do |response|
consume_ref_contains_sha_response(response, :tag_names)
end
response = GitalyClient.call(@storage, :ref_service, :list_tag_names_containing_commit, request, timeout: GitalyClient.medium_timeout)
consume_ref_contains_sha_response(response, :tag_names)
end
# Limit: 0 implies no limit, thus all tag names will be returned
......@@ -187,9 +177,8 @@ module Gitlab
limit: limit
)
GitalyClient.streaming_call(@storage, :ref_service, :list_branch_names_containing_commit, request, timeout: GitalyClient.medium_timeout) do |response|
consume_ref_contains_sha_response(response, :branch_names)
end
response = GitalyClient.call(@storage, :ref_service, :list_branch_names_containing_commit, request, timeout: GitalyClient.medium_timeout)
consume_ref_contains_sha_response(response, :branch_names)
end
def get_tag_messages(tag_ids)
......@@ -197,12 +186,11 @@ module Gitlab
messages = Hash.new { |h, k| h[k] = +''.b }
current_tag_id = nil
GitalyClient.streaming_call(@storage, :ref_service, :get_tag_messages, request, timeout: GitalyClient.fast_timeout) do |response|
response.each do |rpc_message|
current_tag_id = rpc_message.tag_id if rpc_message.tag_id.present?
response = GitalyClient.call(@storage, :ref_service, :get_tag_messages, request, timeout: GitalyClient.fast_timeout)
response.each do |rpc_message|
current_tag_id = rpc_message.tag_id if rpc_message.tag_id.present?
messages[current_tag_id] << rpc_message.message
end
messages[current_tag_id] << rpc_message.message
end
messages
......
......@@ -334,9 +334,8 @@ module Gitlab
def search_files_by_content(ref, query, options = {})
request = Gitaly::SearchFilesByContentRequest.new(repository: @gitaly_repo, ref: ref, query: query)
GitalyClient.streaming_call(@storage, :repository_service, :search_files_by_content, request, timeout: GitalyClient.default_timeout) do |response|
search_results_from_response(response, options)
end
response = GitalyClient.call(@storage, :repository_service, :search_files_by_content, request, timeout: GitalyClient.default_timeout)
search_results_from_response(response, options)
end
def disconnect_alternates
......@@ -403,15 +402,14 @@ module Gitlab
def gitaly_fetch_stream_to_file(save_path, rpc_name, request_class, timeout)
request = request_class.new(repository: @gitaly_repo)
GitalyClient.streaming_call(
response = GitalyClient.call(
@storage,
:repository_service,
rpc_name,
request,
timeout: timeout
) do |response|
write_stream_to_file(response, save_path)
end
)
write_stream_to_file(response, save_path)
end
def write_stream_to_file(response, save_path)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::GitalyClient::Call do
describe '#call', :request_store do
let(:client) { Gitlab::GitalyClient }
let(:storage) { 'default' }
let(:remote_storage) { nil }
let(:request) { Gitaly::FindLocalBranchesRequest.new }
let(:rpc) { :find_local_branches }
let(:service) { :ref_service }
let(:timeout) { client.long_timeout }
subject do
described_class.new(storage, service, rpc, request, remote_storage, timeout).call
end
before do
allow(client).to receive(:execute) { response }
allow(Gitlab::PerformanceBar).to receive(:enabled_for_request?) { true }
end
def expect_call_details_to_match(duration_higher_than: 0)
expect(client.list_call_details.size).to eq(1)
expect(client.list_call_details.first)
.to match a_hash_including(feature: "#{service}##{rpc}",
duration: a_value > duration_higher_than,
request: an_instance_of(Hash),
rpc: rpc,
backtrace: an_instance_of(Array))
end
context 'when the response is not an enumerator' do
let(:response) do
Gitaly::FindLocalBranchesResponse.new
end
it 'returns the response' do
expect(subject).to eq(response)
end
it 'stores timings and call details' do
subject
expect(client.query_time).to be > 0
expect_call_details_to_match
end
context 'when err' do
before do
allow(client).to receive(:execute).and_raise(StandardError)
end
it 'stores timings and call details' do
expect { subject }.to raise_error(StandardError)
expect(client.query_time).to be > 0
expect_call_details_to_match
end
end
end
context 'when the response is an enumerator' do
let(:response) do
Enumerator.new do |yielder|
yielder << 1
yielder << 2
end
end
it 'returns a consumable enumerator' do
instrumented_response = subject
expect(instrumented_response).to be_a(Enumerator)
expect(instrumented_response.to_a).to eq([1, 2])
end
context 'time measurements' do
let(:response) do
Enumerator.new do |yielder|
sleep 0.1
yielder << 1
sleep 0.2
yielder << 2
end
end
it 'records full rpc stream consumption' do
subject.to_a
expect(client.query_time).to be > 0.3
expect_call_details_to_match(duration_higher_than: 0.3)
end
it 'records partial rpc stream consumption' do
subject.first
expect(client.query_time).to be > 0.1
expect_call_details_to_match(duration_higher_than: 0.1)
end
context 'when err' do
let(:response) do
Enumerator.new do |yielder|
sleep 0.2
yielder << 1
raise StandardError
end
end
it 'records partial rpc stream consumption' do
expect { subject.to_a }.to raise_error(StandardError)
expect(client.query_time).to be > 0.2
expect_call_details_to_match(duration_higher_than: 0.2)
end
end
end
end
end
end
......@@ -22,11 +22,5 @@ RSpec.describe Gitlab::GitalyClient::CleanupService do
client.apply_bfg_object_map_stream(StringIO.new)
end
it 'is wrapped as a streaming call' do
expect(Gitlab::GitalyClient).to receive(:streaming_call).with(anything, :cleanup_service, :apply_bfg_object_map_stream, anything, anything)
client.apply_bfg_object_map_stream(StringIO.new)
end
end
end
......@@ -521,8 +521,6 @@ RSpec.describe Gitlab::GitalyClient do
context 'when the request store is active', :request_store do
it 'records call details if a RPC is called' do
expect(described_class).to receive(:measure_timings).and_call_original
gitaly_server.server_version
expect(described_class.list_call_details).not_to be_empty
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment