Commit d85207d8 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch 'osw-instrument-lazily-consumed-gitaly-streams' into 'master'

Add instrumentation to Gitaly streamed responses

See merge request gitlab-org/gitlab!35283
parents d041ad56 dfa80312
---
title: Add instrumentation to Gitaly streamed responses
merge_request: 35283
author:
type: fixed
......@@ -166,20 +166,7 @@ module Gitlab
# "gitaly-2 is at network address tcp://10.0.1.2:8075".
#
def self.call(storage, service, rpc, request, remote_storage: nil, timeout: default_timeout, &block)
self.measure_timings(service, rpc, request) do
self.execute(storage, service, rpc, request, remote_storage: remote_storage, timeout: timeout, &block)
end
end
# This method is like GitalyClient.call but should be used with
# Gitaly streaming RPCs. It measures how long the the RPC took to
# produce the full response, not just the initial response.
def self.streaming_call(storage, service, rpc, request, remote_storage: nil, timeout: default_timeout)
self.measure_timings(service, rpc, request) do
response = self.execute(storage, service, rpc, request, remote_storage: remote_storage, timeout: timeout)
yield(response)
end
Gitlab::GitalyClient::Call.new(storage, service, rpc, request, remote_storage, timeout).call(&block)
end
def self.execute(storage, service, rpc, request, remote_storage:, timeout:)
......@@ -192,23 +179,6 @@ module Gitlab
stub(service, storage).__send__(rpc, request, kwargs) # rubocop:disable GitlabSecurity/PublicSend
end
def self.measure_timings(service, rpc, request)
start = Gitlab::Metrics::System.monotonic_time
yield
ensure
duration = Gitlab::Metrics::System.monotonic_time - start
request_hash = request.is_a?(Google::Protobuf::MessageExts) ? request.to_h : {}
# Keep track, separately, for the performance bar
self.add_query_time(duration)
if Gitlab::PerformanceBar.enabled_for_request?
add_call_details(feature: "#{service}##{rpc}", duration: duration, request: request_hash, rpc: rpc,
backtrace: Gitlab::BacktraceCleaner.clean_backtrace(caller))
end
end
def self.query_time
query_time = Gitlab::SafeRequestStore[:gitaly_query_time] || 0
query_time.round(Gitlab::InstrumentationHelper::DURATION_PRECISION)
......
......@@ -15,10 +15,9 @@ module Gitlab
oid: oid,
limit: limit
)
GitalyClient.streaming_call(@gitaly_repo.storage_name, :blob_service, :get_blob, request, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :get_blob, request, timeout: GitalyClient.fast_timeout)
consume_blob_response(response)
end
end
def batch_lfs_pointers(blob_ids)
return [] if blob_ids.empty?
......@@ -28,10 +27,9 @@ module Gitlab
blob_ids: blob_ids
)
GitalyClient.streaming_call(@gitaly_repo.storage_name, :blob_service, :get_lfs_pointers, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :get_lfs_pointers, request, timeout: GitalyClient.medium_timeout)
map_lfs_pointers(response)
end
end
def get_blobs(revision_paths, limit = -1)
return [] if revision_paths.empty?
......@@ -46,16 +44,15 @@ module Gitlab
limit: limit
)
GitalyClient.streaming_call(
response = GitalyClient.call(
@gitaly_repo.storage_name,
:blob_service,
:get_blobs,
request,
timeout: GitalyClient.fast_timeout
) do |response|
timeout: GitalyClient.fast_timeout)
GitalyClient::BlobsStitcher.new(response)
end
end
def get_blob_types(revision_paths, limit = -1)
return {} if revision_paths.empty?
......@@ -70,16 +67,15 @@ module Gitlab
limit: limit
)
GitalyClient.streaming_call(
response = GitalyClient.call(
@gitaly_repo.storage_name,
:blob_service,
:get_blobs,
request,
timeout: GitalyClient.fast_timeout
) do |response|
)
map_blob_types(response)
end
end
def get_new_lfs_pointers(revision, limit, not_in, dynamic_timeout = nil)
request = Gitaly::GetNewLFSPointersRequest.new(
......@@ -101,26 +97,24 @@ module Gitlab
GitalyClient.medium_timeout
end
GitalyClient.streaming_call(
response = GitalyClient.call(
@gitaly_repo.storage_name,
:blob_service,
:get_new_lfs_pointers,
request,
timeout: timeout
) do |response|
)
map_lfs_pointers(response)
end
end
def get_all_lfs_pointers
request = Gitaly::GetAllLFSPointersRequest.new(
repository: @gitaly_repo
)
GitalyClient.streaming_call(@gitaly_repo.storage_name, :blob_service, :get_all_lfs_pointers, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :get_all_lfs_pointers, request, timeout: GitalyClient.medium_timeout)
map_lfs_pointers(response)
end
end
private
......
# frozen_string_literal: true
module Gitlab
module GitalyClient
class Call
def initialize(storage, service, rpc, request, remote_storage, timeout)
@storage = storage
@service = service
@rpc = rpc
@request = request
@remote_storage = remote_storage
@timeout = timeout
@duration = 0
end
def call(&block)
response = recording_request do
GitalyClient.execute(@storage, @service, @rpc, @request, remote_storage: @remote_storage, timeout: @timeout, &block)
end
if response.is_a?(Enumerator)
# When the given response is an enumerator (coming from streamed
# responses), we wrap it in order to properly measure the stream
# consumption as it happens.
#
# store_timings is not called in that scenario as needs to be
# handled lazily in the custom Enumerator context.
instrument_stream(response)
else
store_timings
response
end
rescue => err
store_timings
raise err
end
private
def instrument_stream(response)
Enumerator.new do |yielder|
loop do
value = recording_request { response.next }
yielder.yield(value)
end
ensure
store_timings
end
end
def recording_request
start = Gitlab::Metrics::System.monotonic_time
yield
ensure
@duration += Gitlab::Metrics::System.monotonic_time - start
end
def store_timings
GitalyClient.add_query_time(@duration)
return unless Gitlab::PerformanceBar.enabled_for_request?
request_hash = @request.is_a?(Google::Protobuf::MessageExts) ? @request.to_h : {}
GitalyClient.add_call_details(feature: "#{@service}##{@rpc}", duration: @duration, request: request_hash, rpc: @rpc,
backtrace: Gitlab::BacktraceCleaner.clean_backtrace(caller))
end
end
end
end
......@@ -13,16 +13,15 @@ module Gitlab
end
def apply_bfg_object_map_stream(io, &blk)
GitalyClient.streaming_call(
response = GitalyClient.call(
storage,
:cleanup_service,
:apply_bfg_object_map_stream,
build_object_map_enum(io),
timeout: GitalyClient.long_timeout
) do |response|
)
response.each(&blk)
end
end
private
......
......@@ -72,10 +72,9 @@ module Gitlab
def commit_deltas(commit)
request = Gitaly::CommitDeltaRequest.new(diff_from_parent_request_params(commit))
GitalyClient.streaming_call(@repository.storage, :diff_service, :commit_delta, request, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@repository.storage, :diff_service, :commit_delta, request, timeout: GitalyClient.fast_timeout)
response.flat_map { |msg| msg.deltas }
end
end
def tree_entry(ref, path, limit = nil)
if Pathname.new(path).cleanpath.to_s.start_with?('../')
......@@ -202,10 +201,9 @@ module Gitlab
to: to
)
GitalyClient.streaming_call(@repository.storage, :commit_service, :commits_between, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@repository.storage, :commit_service, :commits_between, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
end
def diff_stats(left_commit_sha, right_commit_sha)
request = Gitaly::DiffStatsRequest.new(
......@@ -214,10 +212,9 @@ module Gitlab
right_commit_id: right_commit_sha
)
GitalyClient.streaming_call(@repository.storage, :diff_service, :diff_stats, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@repository.storage, :diff_service, :diff_stats, request, timeout: GitalyClient.medium_timeout)
response.flat_map(&:stats)
end
end
def find_all_commits(opts = {})
request = Gitaly::FindAllCommitsRequest.new(
......@@ -228,19 +225,17 @@ module Gitlab
)
request.order = opts[:order].upcase if opts[:order].present?
GitalyClient.streaming_call(@repository.storage, :commit_service, :find_all_commits, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@repository.storage, :commit_service, :find_all_commits, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
end
def list_commits_by_oid(oids)
return [] if oids.empty?
request = Gitaly::ListCommitsByOidRequest.new(repository: @gitaly_repo, oid: oids)
GitalyClient.streaming_call(@repository.storage, :commit_service, :list_commits_by_oid, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@repository.storage, :commit_service, :list_commits_by_oid, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
rescue GRPC::NotFound # If no repository is found, happens mainly during testing
[]
end
......@@ -256,10 +251,9 @@ module Gitlab
global_options: parse_global_options!(literal_pathspec: literal_pathspec)
)
GitalyClient.streaming_call(@repository.storage, :commit_service, :commits_by_message, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@repository.storage, :commit_service, :commits_by_message, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
end
def languages(ref = nil)
request = Gitaly::CommitLanguagesRequest.new(repository: @gitaly_repo, revision: ref || '')
......@@ -334,10 +328,9 @@ module Gitlab
request.paths = encode_repeated(Array(options[:path])) if options[:path].present?
GitalyClient.streaming_call(@repository.storage, :commit_service, :find_commits, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@repository.storage, :commit_service, :find_commits, request, timeout: GitalyClient.medium_timeout)
consume_commits_response(response)
end
end
def filter_shas_with_signatures(shas)
request = Gitaly::FilterShasWithSignaturesRequest.new(repository: @gitaly_repo)
......@@ -352,12 +345,11 @@ module Gitlab
end
end
GitalyClient.streaming_call(@repository.storage, :commit_service, :filter_shas_with_signatures, enum, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@repository.storage, :commit_service, :filter_shas_with_signatures, enum, timeout: GitalyClient.fast_timeout)
response.flat_map do |msg|
msg.shas.map { |sha| EncodingHelper.encode!(sha) }
end
end
end
def get_commit_signatures(commit_ids)
request = Gitaly::GetCommitSignaturesRequest.new(repository: @gitaly_repo, commit_ids: commit_ids)
......@@ -423,10 +415,9 @@ module Gitlab
request_params.merge!(Gitlab::Git::DiffCollection.limits(options).to_h)
request = Gitaly::CommitDiffRequest.new(request_params)
GitalyClient.streaming_call(@repository.storage, :diff_service, :commit_diff, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@repository.storage, :diff_service, :commit_diff, request, timeout: GitalyClient.medium_timeout)
GitalyClient::DiffStitcher.new(response)
end
end
def diff_from_parent_request_params(commit, options = {})
parent_id = commit.parent_ids.first || Gitlab::Git::EMPTY_TREE_ID
......
......@@ -20,10 +20,9 @@ module Gitlab
our_commit_oid: @our_commit_oid,
their_commit_oid: @their_commit_oid
)
GitalyClient.streaming_call(@repository.storage, :conflicts_service, :list_conflict_files, request, timeout: GitalyClient.long_timeout) do |response|
response = GitalyClient.call(@repository.storage, :conflicts_service, :list_conflict_files, request, timeout: GitalyClient.long_timeout)
GitalyClient::ConflictFilesStitcher.new(response, @gitaly_repo)
end
end
def conflicts?
list_conflict_files.any?
......
......@@ -14,17 +14,15 @@ module Gitlab
def branches
request = Gitaly::FindAllBranchesRequest.new(repository: @gitaly_repo)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_all_branches_response(response)
end
end
def remote_branches(remote_name)
request = Gitaly::FindAllRemoteBranchesRequest.new(repository: @gitaly_repo, remote_name: remote_name)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_remote_branches, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :find_all_remote_branches, request, timeout: GitalyClient.medium_timeout)
consume_find_all_remote_branches_response(remote_name, response)
end
end
def merged_branches(branch_names = [])
request = Gitaly::FindAllBranchesRequest.new(
......@@ -32,10 +30,9 @@ module Gitlab
merged_only: true,
merged_branches: branch_names.map { |s| encode_binary(s) }
)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_all_branches_response(response)
end
end
def default_branch_name
request = Gitaly::FindDefaultBranchNameRequest.new(repository: @gitaly_repo)
......@@ -45,17 +42,15 @@ module Gitlab
def branch_names
request = Gitaly::FindAllBranchNamesRequest.new(repository: @gitaly_repo)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_branch_names, request, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :find_all_branch_names, request, timeout: GitalyClient.fast_timeout)
consume_refs_response(response) { |name| Gitlab::Git.branch_name(name) }
end
end
def tag_names
request = Gitaly::FindAllTagNamesRequest.new(repository: @gitaly_repo)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_tag_names, request, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :find_all_tag_names, request, timeout: GitalyClient.fast_timeout)
consume_refs_response(response) { |name| Gitlab::Git.tag_name(name) }
end
end
def find_ref_name(commit_id, ref_prefix)
request = Gitaly::FindRefNameRequest.new(
......@@ -75,13 +70,12 @@ module Gitlab
commits = []
GitalyClient.streaming_call(@storage, :ref_service, :list_new_commits, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :list_new_commits, request, timeout: GitalyClient.medium_timeout)
response.each do |msg|
msg.commits.each do |c|
commits << Gitlab::Git::Commit.new(@repository, c)
end
end
end
commits
end
......@@ -100,14 +94,13 @@ module Gitlab
GitalyClient.medium_timeout
end
GitalyClient.streaming_call(@storage, :ref_service, :list_new_blobs, request, timeout: timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :list_new_blobs, request, timeout: timeout)
response.flat_map do |msg|
# Returns an Array of Gitaly::NewBlobObject objects
# Available methods are: #size, #oid and #path
msg.new_blob_objects
end
end
end
def count_tag_names
tag_names.count
......@@ -120,17 +113,15 @@ module Gitlab
def local_branches(sort_by: nil)
request = Gitaly::FindLocalBranchesRequest.new(repository: @gitaly_repo)
request.sort_by = sort_by_param(sort_by) if sort_by
GitalyClient.streaming_call(@storage, :ref_service, :find_local_branches, request, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :find_local_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_local_branches_response(response)
end
end
def tags
request = Gitaly::FindAllTagsRequest.new(repository: @gitaly_repo)
GitalyClient.streaming_call(@storage, :ref_service, :find_all_tags, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :find_all_tags, request, timeout: GitalyClient.medium_timeout)
consume_tags_response(response)
end
end
def ref_exists?(ref_name)
request = Gitaly::RefExistsRequest.new(repository: @gitaly_repo, ref: encode_binary(ref_name))
......@@ -174,10 +165,9 @@ module Gitlab
limit: limit
)
GitalyClient.streaming_call(@storage, :ref_service, :list_tag_names_containing_commit, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :list_tag_names_containing_commit, request, timeout: GitalyClient.medium_timeout)
consume_ref_contains_sha_response(response, :tag_names)
end
end
# Limit: 0 implies no limit, thus all tag names will be returned
def branch_names_contains_sha(sha, limit: 0)
......@@ -187,23 +177,21 @@ module Gitlab
limit: limit
)
GitalyClient.streaming_call(@storage, :ref_service, :list_branch_names_containing_commit, request, timeout: GitalyClient.medium_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :list_branch_names_containing_commit, request, timeout: GitalyClient.medium_timeout)
consume_ref_contains_sha_response(response, :branch_names)
end
end
def get_tag_messages(tag_ids)
request = Gitaly::GetTagMessagesRequest.new(repository: @gitaly_repo, tag_ids: tag_ids)
messages = Hash.new { |h, k| h[k] = +''.b }
current_tag_id = nil
GitalyClient.streaming_call(@storage, :ref_service, :get_tag_messages, request, timeout: GitalyClient.fast_timeout) do |response|
response = GitalyClient.call(@storage, :ref_service, :get_tag_messages, request, timeout: GitalyClient.fast_timeout)
response.each do |rpc_message|
current_tag_id = rpc_message.tag_id if rpc_message.tag_id.present?
messages[current_tag_id] << rpc_message.message
end
end
messages
end
......
......@@ -334,10 +334,9 @@ module Gitlab
def search_files_by_content(ref, query, options = {})
request = Gitaly::SearchFilesByContentRequest.new(repository: @gitaly_repo, ref: ref, query: query)
GitalyClient.streaming_call(@storage, :repository_service, :search_files_by_content, request, timeout: GitalyClient.default_timeout) do |response|
response = GitalyClient.call(@storage, :repository_service, :search_files_by_content, request, timeout: GitalyClient.default_timeout)
search_results_from_response(response, options)
end
end
def disconnect_alternates
request = Gitaly::DisconnectGitAlternatesRequest.new(
......@@ -403,16 +402,15 @@ module Gitlab
def gitaly_fetch_stream_to_file(save_path, rpc_name, request_class, timeout)
request = request_class.new(repository: @gitaly_repo)
GitalyClient.streaming_call(
response = GitalyClient.call(
@storage,
:repository_service,
rpc_name,
request,
timeout: timeout
) do |response|
)
write_stream_to_file(response, save_path)
end
end
def write_stream_to_file(response, save_path)
File.open(save_path, 'wb') do |f|
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::GitalyClient::Call do
describe '#call', :request_store do
let(:client) { Gitlab::GitalyClient }
let(:storage) { 'default' }
let(:remote_storage) { nil }
let(:request) { Gitaly::FindLocalBranchesRequest.new }
let(:rpc) { :find_local_branches }
let(:service) { :ref_service }
let(:timeout) { client.long_timeout }
subject do
described_class.new(storage, service, rpc, request, remote_storage, timeout).call
end
before do
allow(client).to receive(:execute) { response }
allow(Gitlab::PerformanceBar).to receive(:enabled_for_request?) { true }
end
def expect_call_details_to_match(duration_higher_than: 0)
expect(client.list_call_details.size).to eq(1)
expect(client.list_call_details.first)
.to match a_hash_including(feature: "#{service}##{rpc}",
duration: a_value > duration_higher_than,
request: an_instance_of(Hash),
rpc: rpc,
backtrace: an_instance_of(Array))
end
context 'when the response is not an enumerator' do
let(:response) do
Gitaly::FindLocalBranchesResponse.new
end
it 'returns the response' do
expect(subject).to eq(response)
end
it 'stores timings and call details' do
subject
expect(client.query_time).to be > 0
expect_call_details_to_match
end
context 'when err' do
before do
allow(client).to receive(:execute).and_raise(StandardError)
end
it 'stores timings and call details' do
expect { subject }.to raise_error(StandardError)
expect(client.query_time).to be > 0
expect_call_details_to_match
end
end
end
context 'when the response is an enumerator' do
let(:response) do
Enumerator.new do |yielder|
yielder << 1
yielder << 2
end
end
it 'returns a consumable enumerator' do
instrumented_response = subject
expect(instrumented_response).to be_a(Enumerator)
expect(instrumented_response.to_a).to eq([1, 2])
end
context 'time measurements' do
let(:response) do
Enumerator.new do |yielder|
sleep 0.1
yielder << 1
sleep 0.2
yielder << 2
end
end
it 'records full rpc stream consumption' do
subject.to_a
expect(client.query_time).to be > 0.3
expect_call_details_to_match(duration_higher_than: 0.3)
end
it 'records partial rpc stream consumption' do
subject.first
expect(client.query_time).to be > 0.1
expect_call_details_to_match(duration_higher_than: 0.1)
end
context 'when err' do
let(:response) do
Enumerator.new do |yielder|
sleep 0.2
yielder << 1
raise StandardError
end
end
it 'records partial rpc stream consumption' do
expect { subject.to_a }.to raise_error(StandardError)
expect(client.query_time).to be > 0.2
expect_call_details_to_match(duration_higher_than: 0.2)
end
end
end
end
end
end
......@@ -22,11 +22,5 @@ RSpec.describe Gitlab::GitalyClient::CleanupService do
client.apply_bfg_object_map_stream(StringIO.new)
end
it 'is wrapped as a streaming call' do
expect(Gitlab::GitalyClient).to receive(:streaming_call).with(anything, :cleanup_service, :apply_bfg_object_map_stream, anything, anything)
client.apply_bfg_object_map_stream(StringIO.new)
end
end
end
......@@ -521,8 +521,6 @@ RSpec.describe Gitlab::GitalyClient do
context 'when the request store is active', :request_store do
it 'records call details if a RPC is called' do
expect(described_class).to receive(:measure_timings).and_call_original
gitaly_server.server_version
expect(described_class.list_call_details).not_to be_empty
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment