Commit 3d7c9f11 authored by Thong Kuah

Merge branch 'ak/es-client' into 'master'

Query pod logs through Elastic Stack if installed

See merge request gitlab-org/gitlab!18961
parents e980d766 1933d647
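
In short: when the Elastic Stack application is installed on the cluster and the enable_cluster_application_elastic_stack feature flag is on, pod logs are fetched from Elasticsearch instead of the Kubernetes log endpoint. A rough sketch of the resulting flow, using the names introduced in the diff below (the local variables cluster, kubeclient, namespace, pod_name and container_name are assumed to be in scope; this is not a verbatim excerpt of the change):

# Sketch only -- assumes a GitLab Rails context with the objects above available.
es_client = cluster.application_elastic_stack&.elasticsearch_client

logs =
  if ::Feature.enabled?(:enable_cluster_application_elastic_stack) && es_client
    # Logs shipped by filebeat into Elasticsearch, queried through the cluster's API proxy
    ::Gitlab::Elasticsearch::Logs.new(es_client).pod_logs(namespace, pod_name, container_name)
  else
    # Fallback: read the tail of the log straight from the Kubernetes API
    kubeclient.get_pod_log(pod_name, namespace, container: container_name, tail_lines: 500) # tail size limit (LOGS_LIMIT in the real code)
              .body.strip.split("\n")
  end
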
......@@ -5,12 +5,15 @@ module Clusters
class ElasticStack < ApplicationRecord
VERSION = '1.8.0'
ELASTICSEARCH_PORT = 9200
self.table_name = 'clusters_applications_elastic_stacks'
include ::Clusters::Concerns::ApplicationCore
include ::Clusters::Concerns::ApplicationStatus
include ::Clusters::Concerns::ApplicationVersion
include ::Clusters::Concerns::ApplicationData
include ::Gitlab::Utils::StrongMemoize
default_value_for :version, VERSION
......@@ -49,6 +52,28 @@ module Clusters
)
end
def elasticsearch_client
strong_memoize(:elasticsearch_client) do
next unless kube_client
proxy_url = kube_client.proxy_url('service', 'elastic-stack-elasticsearch-client', ::Clusters::Applications::ElasticStack::ELASTICSEARCH_PORT, Gitlab::Kubernetes::Helm::NAMESPACE)
Elasticsearch::Client.new(url: proxy_url) do |faraday|
# ensure headers containing auth data are appended to the original client options
faraday.headers.merge!(kube_client.headers)
# ensure TLS certs are properly verified
faraday.ssl[:verify] = kube_client.ssl_options[:verify_ssl]
faraday.ssl[:cert_store] = kube_client.ssl_options[:cert_store]
end
rescue Kubeclient::HttpError => error
# If users have mistakenly set parameters or removed the dependent clusters,
# `proxy_url` can raise an exception because GitLab cannot communicate with the cluster.
# Downstream callers check for a nil client; the behaviour is then equivalent to an empty state.
log_exception(error, :failed_to_create_elasticsearch_client)
end
end
private
def specification
......@@ -74,6 +99,10 @@ module Clusters
Gitlab::Kubernetes::KubectlCmd.delete("pvc", "--selector", "release=elastic-stack")
].compact
end
def kube_client
cluster&.kubeclient&.core_client
end
end
end
end
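
The client above reaches Elasticsearch through the Kubernetes API server's service proxy rather than connecting to the pod network directly. A hedged sketch of what that construction amounts to (the printed URL is only an example of the shape kubeclient produces; kube_client stands for the cluster's core API client):

require 'elasticsearch'

proxy_url = kube_client.proxy_url('service',
                                  'elastic-stack-elasticsearch-client',
                                  9200,                   # ELASTICSEARCH_PORT
                                  'gitlab-managed-apps')  # Gitlab::Kubernetes::Helm::NAMESPACE
# => e.g. "https://<api-server>/api/v1/namespaces/gitlab-managed-apps/services/elastic-stack-elasticsearch-client:9200/proxy"

es = Elasticsearch::Client.new(url: proxy_url) do |faraday|
  faraday.headers.merge!(kube_client.headers)                     # auth headers for the API server
  faraday.ssl[:verify]     = kube_client.ssl_options[:verify_ssl] # keep TLS verification on
  faraday.ssl[:cert_store] = kube_client.ssl_options[:cert_store]
end

es.cluster.health # any Elasticsearch call is now tunnelled through the Kubernetes API server
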
......@@ -60,6 +60,24 @@ module Clusters
# Override if your application needs any action after
# being uninstalled by Helm
end
def logger
@logger ||= Gitlab::Kubernetes::Logger.build
end
def log_exception(error, event)
logger.error({
exception: error.class.name,
status_code: error.error_code,
cluster_id: cluster&.id,
application_id: id,
class_name: self.class.name,
event: event,
message: error.message
})
Gitlab::Sentry.track_acceptable_exception(error, extra: { cluster_id: cluster&.id, application_id: id })
end
end
end
end
......
......@@ -82,9 +82,11 @@ module EE
private
def pod_logs(pod_name, namespace, container: nil)
logs = kubeclient.get_pod_log(
pod_name, namespace, container: container, tail_lines: LOGS_LIMIT
).body
logs = if ::Feature.enabled?(:enable_cluster_application_elastic_stack) && elastic_stack_client
elastic_stack_pod_logs(namespace, pod_name, container)
else
platform_pod_logs(namespace, pod_name, container)
end
{
logs: logs,
......@@ -94,6 +96,25 @@ module EE
}
end
def platform_pod_logs(namespace, pod_name, container_name)
logs = kubeclient.get_pod_log(
pod_name, namespace, container: container_name, tail_lines: LOGS_LIMIT
).body
logs.strip.split("\n")
end
def elastic_stack_pod_logs(namespace, pod_name, container_name)
client = elastic_stack_client
return [] if client.nil?
::Gitlab::Elasticsearch::Logs.new(client).pod_logs(namespace, pod_name, container_name)
end
def elastic_stack_client
cluster.application_elastic_stack&.elasticsearch_client
end
def handle_exceptions(resource_not_found_error_message, opts, &block)
yield
rescue Kubeclient::ResourceNotFoundError
......
......@@ -16,7 +16,6 @@ class PodLogsService < ::BaseService
:check_pod_names,
:check_pod_name,
:pod_logs,
:split_logs,
:filter_return_keys
def initialize(environment, params: {})
......@@ -92,11 +91,6 @@ class PodLogsService < ::BaseService
end
end
def split_logs(result)
result[:logs] = split_by_newline(result[:logs])
success(result)
end
def filter_return_keys(result)
result.slice(*SUCCESS_RETURN_KEYS)
end
......@@ -105,12 +99,6 @@ class PodLogsService < ::BaseService
params.slice(*PARAMS)
end
def split_by_newline(logs)
return unless logs
logs.strip.split("\n").as_json
end
def namespace
environment.deployment_namespace
end
......
# frozen_string_literal: true
module Gitlab
module Elasticsearch
class Logs
# How many log lines to fetch in a query
LOGS_LIMIT = 500
def initialize(client)
@client = client
end
def pod_logs(namespace, pod_name, container_name = nil)
query = {
bool: {
must: [
{
match_phrase: {
"kubernetes.pod.name" => {
query: pod_name
}
}
},
{
match_phrase: {
"kubernetes.namespace" => {
query: namespace
}
}
}
]
}
}
# A pod can contain multiple containers.
# By default we return logs from every container
unless container_name.nil?
query[:bool][:must] << {
match_phrase: {
"kubernetes.container.name" => {
query: container_name
}
}
}
end
body = {
query: query,
# reverse order so we can query N-most recent records
sort: [
{ "@timestamp": { order: :desc } },
{ "offset": { order: :desc } }
],
# only return the message field in the response
_source: ["message"],
# fixed limit for now, we should support paginated queries
size: ::Gitlab::Elasticsearch::Logs::LOGS_LIMIT
}
response = @client.search body: body
result = response.fetch("hits", {}).fetch("hits", []).map { |h| h["_source"]["message"] }
# we queried for the N-most recent records but we want them ordered oldest to newest
result.reverse
end
end
end
end
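
A short usage sketch of the class above, assuming any reachable Elasticsearch instance that holds filebeat indices (the URL is illustrative; namespace, pod and container names are the ones used in the specs below):

require 'elasticsearch'

client = Elasticsearch::Client.new(url: 'http://localhost:9200') # illustrative endpoint
logs   = Gitlab::Elasticsearch::Logs.new(client)

# up to LOGS_LIMIT (500) message strings, ordered oldest to newest
logs.pod_logs('autodevops-deploy-9-production', 'production-6866bc8974-m4sk4')

# optionally restrict the query to a single container of the pod
logs.pod_logs('autodevops-deploy-9-production', 'production-6866bc8974-m4sk4', 'auto-deploy-app')
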
......@@ -51,7 +51,7 @@ describe 'Environment > Pod Logs', :js do
expect(item.text).to eq(pod_names[i])
end
end
expect(page).to have_content("Log 1\\nLog 2\\nLog 3")
expect(page).to have_content("Log 1 Log 2 Log 3")
end
end
......
{
"took": 7087,
"timed_out": false,
"_shards": {
"total": 151,
"successful": 151,
"skipped": 0,
"failed": 0,
"failures": []
},
"hits": {
"total": 486924,
"max_score": null,
"hits": [
{
"_index": "filebeat-6.7.0-2019.10.25",
"_type": "doc",
"_id": "SkbxAW4BWzhswgK-C5-R",
"_score": null,
"_source": {
"message": "10.8.2.1 - - [25/Oct/2019:08:03:22 UTC] \"GET / HTTP/1.1\" 200 13"
},
"sort": [
9999998,
1571990602947
]
},
{
"_index": "filebeat-6.7.0-2019.10.27",
"_type": "doc",
"_id": "wEigD24BWzhswgK-WUU2",
"_score": null,
"_source": {
"message": "10.8.2.1 - - [27/Oct/2019:23:49:54 UTC] \"GET / HTTP/1.1\" 200 13"
},
"sort": [
9999949,
1572220194500
]
},
{
"_index": "filebeat-6.7.0-2019.11.04",
"_type": "doc",
"_id": "gE6uOG4BWzhswgK-M0x2",
"_score": null,
"_source": {
"message": "10.8.2.1 - - [04/Nov/2019:23:09:24 UTC] \"GET / HTTP/1.1\" 200 13"
},
"sort": [
9999944,
1572908964497
]
},
{
"_index": "filebeat-6.7.0-2019.10.30",
"_type": "doc",
"_id": "0klPHW4BWzhswgK-nfCF",
"_score": null,
"_source": {
"message": "- -\u003e /"
},
"sort": [
9999934,
1572449784442
]
}
]
}
}
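
Given hits shaped like the fixture above, the extraction in Gitlab::Elasticsearch::Logs#pod_logs reduces the response to message strings in chronological order (response is the parsed JSON; output shown for this fixture):

response.fetch("hits", {}).fetch("hits", []).map { |h| h["_source"]["message"] }.reverse
# => ["- -> /",
#     "10.8.2.1 - - [04/Nov/2019:23:09:24 UTC] \"GET / HTTP/1.1\" 200 13",
#     "10.8.2.1 - - [27/Oct/2019:23:49:54 UTC] \"GET / HTTP/1.1\" 200 13",
#     "10.8.2.1 - - [25/Oct/2019:08:03:22 UTC] \"GET / HTTP/1.1\" 200 13"]
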
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Elasticsearch::Logs do
let(:client) { Elasticsearch::Transport::Client }
let(:es_message_1) { "10.8.2.1 - - [25/Oct/2019:08:03:22 UTC] \"GET / HTTP/1.1\" 200 13" }
let(:es_message_2) { "10.8.2.1 - - [27/Oct/2019:23:49:54 UTC] \"GET / HTTP/1.1\" 200 13" }
let(:es_message_3) { "10.8.2.1 - - [04/Nov/2019:23:09:24 UTC] \"GET / HTTP/1.1\" 200 13" }
let(:es_message_4) { "- -\u003e /" }
let(:es_response) { JSON.parse(fixture_file('lib/elasticsearch/logs_response.json', dir: 'ee')) }
subject { described_class.new(client) }
let(:namespace) { "autodevops-deploy-9-production" }
let(:pod_name) { "production-6866bc8974-m4sk4" }
let(:container_name) { "auto-deploy-app" }
let(:body) do
{
query: {
bool: {
must: [
{
match_phrase: {
"kubernetes.pod.name" => {
query: pod_name
}
}
},
{
match_phrase: {
"kubernetes.namespace" => {
query: namespace
}
}
}
]
}
},
sort: [
{
:@timestamp => {
order: :desc
}
},
{
offset: {
order: :desc
}
}
],
_source: [
"message"
],
size: 500
}
end
let(:body_with_container) do
{
query: {
bool: {
must: [
{
match_phrase: {
"kubernetes.pod.name" => {
query: pod_name
}
}
},
{
match_phrase: {
"kubernetes.namespace" => {
query: namespace
}
}
},
{
match_phrase: {
"kubernetes.container.name" => {
query: container_name
}
}
}
]
}
},
sort: [
{
:@timestamp => {
order: :desc
}
},
{
offset: {
order: :desc
}
}
],
_source: [
"message"
],
size: 500
}
end
describe '#pod_logs' do
it 'returns the logs as an array' do
expect(client).to receive(:search).with(body: body).and_return(es_response)
result = subject.pod_logs(namespace, pod_name)
expect(result).to eq([es_message_4, es_message_3, es_message_2, es_message_1])
end
it 'can further filter the logs by container name' do
expect(client).to receive(:search).with(body: body_with_container).and_return(es_response)
result = subject.pod_logs(namespace, pod_name, container_name)
expect(result).to eq([es_message_4, es_message_3, es_message_2, es_message_1])
end
end
end
......@@ -146,7 +146,7 @@ describe Clusters::Platforms::Kubernetes do
shared_examples 'successful log request' do
it do
expect(subject[:logs]).to eq("\"Log 1\\nLog 2\\nLog 3\"")
expect(subject[:logs]).to eq(["Log 1", "Log 2", "Log 3"])
expect(subject[:status]).to eq(:success)
expect(subject[:pod_name]).to eq(pod_name)
expect(subject[:container_name]).to eq(container)
......@@ -165,6 +165,19 @@ describe Clusters::Platforms::Kubernetes do
synchronous_reactive_cache(service)
end
context 'when Elastic Stack is enabled' do
let(:cluster) { create(:cluster, :project, platform_kubernetes: service) }
let!(:elastic_stack) { create(:clusters_applications_elastic_stack, cluster: cluster) }
before do
expect_any_instance_of(::Clusters::Applications::ElasticStack).to receive(:elasticsearch_client).at_least(:once).and_return(Elasticsearch::Transport::Client.new)
expect_any_instance_of(::Gitlab::Elasticsearch::Logs).to receive(:pod_logs).and_return(["Log 1", "Log 2", "Log 3"])
stub_feature_flags(enable_cluster_application_elastic_stack: true)
end
include_examples 'successful log request'
end
context 'when kubernetes responds with valid logs' do
before do
stub_kubeclient_logs(pod_name, namespace, container: container)
......
......@@ -70,7 +70,7 @@ describe PodLogsService do
.with(environment.id, response_pod_name, environment.deployment_namespace, container: container_name)
.and_return({
status: :success,
logs: "Log 1\nLog 2\nLog 3",
logs: ["Log 1", "Log 2", "Log 3"],
pod_name: response_pod_name,
container_name: container_name
})
......
......@@ -3,6 +3,8 @@
require 'spec_helper'
describe Clusters::Applications::ElasticStack do
include KubernetesHelpers
include_examples 'cluster application core specs', :clusters_applications_elastic_stack
include_examples 'cluster application status specs', :clusters_applications_elastic_stack
include_examples 'cluster application version specs', :clusters_applications_elastic_stack
......@@ -110,4 +112,68 @@ describe Clusters::Applications::ElasticStack do
expect(values).to include('ELASTICSEARCH_HOSTS')
end
end
describe '#elasticsearch_client' do
context 'cluster is nil' do
it 'returns nil' do
expect(subject.cluster).to be_nil
expect(subject.elasticsearch_client).to be_nil
end
end
context "cluster doesn't have kubeclient" do
let(:cluster) { create(:cluster) }
subject { create(:clusters_applications_elastic_stack, cluster: cluster) }
it 'returns nil' do
expect(subject.elasticsearch_client).to be_nil
end
end
context 'cluster has kubeclient' do
let(:cluster) { create(:cluster, :project, :provided_by_gcp) }
let(:kubernetes_url) { subject.cluster.platform_kubernetes.api_url }
let(:kube_client) { subject.cluster.kubeclient.core_client }
subject { create(:clusters_applications_elastic_stack, cluster: cluster) }
before do
subject.cluster.platform_kubernetes.namespace = 'a-namespace'
stub_kubeclient_discover(cluster.platform_kubernetes.api_url)
create(:cluster_kubernetes_namespace,
cluster: cluster,
cluster_project: cluster.cluster_project,
project: cluster.cluster_project.project)
end
it 'creates proxy elasticsearch_client' do
expect(subject.elasticsearch_client).to be_instance_of(Elasticsearch::Transport::Client)
end
it 'copies proxy_url, options and headers from kube client to elasticsearch_client' do
expect(Elasticsearch::Client)
.to(receive(:new))
.with(url: a_valid_url)
.and_call_original
client = subject.elasticsearch_client
faraday_connection = client.transport.connections.first.connection
expect(faraday_connection.headers["Authorization"]).to eq(kube_client.headers[:Authorization])
expect(faraday_connection.ssl.cert_store).to be_instance_of(OpenSSL::X509::Store)
expect(faraday_connection.ssl.verify).to eq(1)
end
context 'when cluster is not reachable' do
before do
allow(kube_client).to receive(:proxy_url).and_raise(Kubeclient::HttpError.new(401, 'Unauthorized', nil))
end
it 'returns nil' do
expect(subject.elasticsearch_client).to be_nil
end
end
end
end
end
......@@ -16,7 +16,7 @@ module KubernetesHelpers
end
def kube_logs_response
kube_response(kube_logs_body)
{ body: kube_logs_body }
end
def kube_deployments_response
......