Commit 620e7bb3 authored by Yorick Peterse's avatar Yorick Peterse

Write to InfluxDB directly via UDP

This removes the need for Sidekiq and any overhead/problems introduced
by TCP. There are a few things to take into account:

1. When writing data to InfluxDB you may still get an error if the
   server becomes unavailable during the write. Because of this we're
   catching all exceptions and just ignore them (for now).
2. Writing via UDP apparently requires the timestamp to be in
   nanoseconds. Without this data either isn't written properly.
3. Due to the restrictions on UDP buffer sizes we're writing metrics one
   by one, instead of writing all of them at once.
parent 03478e6d
......@@ -3,5 +3,5 @@
# lib/support/init.d, which call scripts in bin/ .
#
web: bundle exec unicorn_rails -p ${PORT:="3000"} -E ${RAILS_ENV:="development"} -c ${UNICORN_CONFIG:="config/unicorn.rb"}
worker: bundle exec sidekiq -q post_receive -q mailers -q archive_repo -q system_hook -q project_web_hook -q gitlab_shell -q incoming_email -q runner -q common -q default -q metrics
worker: bundle exec sidekiq -q post_receive -q mailers -q archive_repo -q system_hook -q project_web_hook -q gitlab_shell -q incoming_email -q runner -q common -q default
# mail_room: bundle exec mail_room -q -c config/mail_room.yml
......@@ -69,7 +69,7 @@ class Admin::ApplicationSettingsController < Admin::ApplicationController
:max_artifacts_size,
:metrics_enabled,
:metrics_host,
:metrics_database,
:metrics_port,
:metrics_username,
:metrics_password,
:metrics_pool_size,
......
......@@ -171,12 +171,14 @@
.col-sm-10
= f.text_field :metrics_host, class: 'form-control', placeholder: 'influxdb.example.com'
.form-group
= f.label :metrics_database, 'InfluxDB database', class: 'control-label col-sm-2'
= f.label :metrics_port, 'InfluxDB port', class: 'control-label col-sm-2'
.col-sm-10
= f.text_field :metrics_database, class: 'form-control', placeholder: 'gitlab'
= f.text_field :metrics_port, class: 'form-control', placeholder: '8089'
.help-block
The name of the InfluxDB database to store data in. Users will have to
create this database manually, GitLab does not do so automatically.
The UDP port to use for connecting to InfluxDB. InfluxDB requires that
your server configuration specifies a database to store data in when
sending messages to this port, without it metrics data will not be
saved.
.form-group
= f.label :metrics_username, 'InfluxDB username', class: 'control-label col-sm-2'
.col-sm-10
......
class MetricsWorker
include Sidekiq::Worker
sidekiq_options queue: :metrics
def perform(metrics)
prepared = prepare_metrics(metrics)
Gitlab::Metrics.pool.with do |connection|
connection.write_points(prepared)
end
end
def prepare_metrics(metrics)
metrics.map do |hash|
new_hash = hash.symbolize_keys
new_hash[:tags].each do |key, value|
if value.blank?
new_hash[:tags].delete(key)
else
new_hash[:tags][key] = escape_value(value)
end
end
new_hash
end
end
def escape_value(value)
value.to_s.gsub('=', '\\=')
end
end
class InfluxdbUdpPortSetting < ActiveRecord::Migration
def change
add_column :application_settings, :metrics_port, :integer, default: 8089
end
end
class InfluxdbRemoteDatabaseSetting < ActiveRecord::Migration
def change
remove_column :application_settings, :metrics_database
end
end
......@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20151228175719) do
ActiveRecord::Schema.define(version: 20151229112614) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -33,36 +33,36 @@ ActiveRecord::Schema.define(version: 20151228175719) do
t.datetime "created_at"
t.datetime "updated_at"
t.string "home_page_url"
t.integer "default_branch_protection", default: 2
t.boolean "twitter_sharing_enabled", default: true
t.integer "default_branch_protection", default: 2
t.boolean "twitter_sharing_enabled", default: true
t.text "restricted_visibility_levels"
t.boolean "version_check_enabled", default: true
t.integer "max_attachment_size", default: 10, null: false
t.boolean "version_check_enabled", default: true
t.integer "max_attachment_size", default: 10, null: false
t.integer "default_project_visibility"
t.integer "default_snippet_visibility"
t.text "restricted_signup_domains"
t.boolean "user_oauth_applications", default: true
t.boolean "user_oauth_applications", default: true
t.string "after_sign_out_path"
t.integer "session_expire_delay", default: 10080, null: false
t.integer "session_expire_delay", default: 10080, null: false
t.text "import_sources"
t.text "help_page_text"
t.string "admin_notification_email"
t.boolean "shared_runners_enabled", default: true, null: false
t.integer "max_artifacts_size", default: 100, null: false
t.boolean "shared_runners_enabled", default: true, null: false
t.integer "max_artifacts_size", default: 100, null: false
t.string "runners_registration_token"
t.boolean "require_two_factor_authentication", default: false
t.integer "two_factor_grace_period", default: 48
t.boolean "metrics_enabled", default: false
t.string "metrics_host", default: "localhost"
t.string "metrics_database", default: "gitlab"
t.boolean "require_two_factor_authentication", default: false
t.integer "two_factor_grace_period", default: 48
t.boolean "metrics_enabled", default: false
t.string "metrics_host", default: "localhost"
t.string "metrics_username"
t.string "metrics_password"
t.integer "metrics_pool_size", default: 16
t.integer "metrics_timeout", default: 10
t.integer "metrics_method_call_threshold", default: 10
t.integer "metrics_pool_size", default: 16
t.integer "metrics_timeout", default: 10
t.integer "metrics_method_call_threshold", default: 10
t.boolean "recaptcha_enabled", default: false
t.string "recaptcha_site_key"
t.string "recaptcha_private_key"
t.integer "metrics_port", default: 8089
end
create_table "audit_events", force: :cascade do |t|
......@@ -796,12 +796,12 @@ ActiveRecord::Schema.define(version: 20151228175719) do
add_index "tags", ["name"], name: "index_tags_on_name", unique: true, using: :btree
create_table "users", force: :cascade do |t|
t.string "email", default: "", null: false
t.string "encrypted_password", default: "", null: false
t.string "email", default: "", null: false
t.string "encrypted_password", default: "", null: false
t.string "reset_password_token"
t.datetime "reset_password_sent_at"
t.datetime "remember_created_at"
t.integer "sign_in_count", default: 0
t.integer "sign_in_count", default: 0
t.datetime "current_sign_in_at"
t.datetime "last_sign_in_at"
t.string "current_sign_in_ip"
......@@ -809,22 +809,22 @@ ActiveRecord::Schema.define(version: 20151228175719) do
t.datetime "created_at"
t.datetime "updated_at"
t.string "name"
t.boolean "admin", default: false, null: false
t.integer "projects_limit", default: 10
t.string "skype", default: "", null: false
t.string "linkedin", default: "", null: false
t.string "twitter", default: "", null: false
t.boolean "admin", default: false, null: false
t.integer "projects_limit", default: 10
t.string "skype", default: "", null: false
t.string "linkedin", default: "", null: false
t.string "twitter", default: "", null: false
t.string "authentication_token"
t.integer "theme_id", default: 1, null: false
t.integer "theme_id", default: 1, null: false
t.string "bio"
t.integer "failed_attempts", default: 0
t.integer "failed_attempts", default: 0
t.datetime "locked_at"
t.string "username"
t.boolean "can_create_group", default: true, null: false
t.boolean "can_create_team", default: true, null: false
t.boolean "can_create_group", default: true, null: false
t.boolean "can_create_team", default: true, null: false
t.string "state"
t.integer "color_scheme_id", default: 1, null: false
t.integer "notification_level", default: 1, null: false
t.integer "color_scheme_id", default: 1, null: false
t.integer "notification_level", default: 1, null: false
t.datetime "password_expires_at"
t.integer "created_by_id"
t.datetime "last_credential_check_at"
......@@ -833,23 +833,23 @@ ActiveRecord::Schema.define(version: 20151228175719) do
t.datetime "confirmed_at"
t.datetime "confirmation_sent_at"
t.string "unconfirmed_email"
t.boolean "hide_no_ssh_key", default: false
t.string "website_url", default: "", null: false
t.boolean "hide_no_ssh_key", default: false
t.string "website_url", default: "", null: false
t.string "notification_email"
t.boolean "hide_no_password", default: false
t.boolean "password_automatically_set", default: false
t.boolean "hide_no_password", default: false
t.boolean "password_automatically_set", default: false
t.string "location"
t.string "encrypted_otp_secret"
t.string "encrypted_otp_secret_iv"
t.string "encrypted_otp_secret_salt"
t.boolean "otp_required_for_login", default: false, null: false
t.boolean "otp_required_for_login", default: false, null: false
t.text "otp_backup_codes"
t.string "public_email", default: "", null: false
t.integer "dashboard", default: 0
t.integer "project_view", default: 0
t.string "public_email", default: "", null: false
t.integer "dashboard", default: 0
t.integer "project_view", default: 0
t.integer "consumed_timestep"
t.integer "layout", default: 0
t.boolean "hide_project_limit", default: false
t.integer "layout", default: 0
t.boolean "hide_project_limit", default: false
t.string "unlock_token"
t.datetime "otp_grace_period_started_at"
end
......
......@@ -66,6 +66,39 @@ module Gitlab
end
end
def self.submit_metrics(metrics)
prepared = prepare_metrics(metrics)
pool.with do |connection|
prepared.each do |metric|
begin
connection.write_points([metric])
rescue StandardError
end
end
end
end
def self.prepare_metrics(metrics)
metrics.map do |hash|
new_hash = hash.symbolize_keys
new_hash[:tags].each do |key, value|
if value.blank?
new_hash[:tags].delete(key)
else
new_hash[:tags][key] = escape_value(value)
end
end
new_hash
end
end
def self.escape_value(value)
value.to_s.gsub('=', '\\=')
end
@hostname = Socket.gethostname
# When enabled this should be set before being used as the usual pattern
......@@ -73,11 +106,12 @@ module Gitlab
if enabled?
@pool = ConnectionPool.new(size: pool_size, timeout: timeout) do
host = settings[:metrics_host]
db = settings[:metrics_database]
user = settings[:metrics_username]
pw = settings[:metrics_password]
port = settings[:metrics_port]
InfluxDB::Client.new(db, host: host, username: user, password: pw)
InfluxDB::Client.
new(udp: { host: host, port: port }, username: user, password: pw)
end
end
end
......
......@@ -26,7 +26,7 @@ module Gitlab
process_type: Sidekiq.server? ? 'sidekiq' : 'rails'
),
values: @values,
timestamp: @created_at.to_i
timestamp: @created_at.to_i * 1_000_000_000
}
end
end
......
......@@ -40,7 +40,7 @@ module Gitlab
sql = sql.delete('"')
end
sql.gsub("\n", ' ')
sql.tr("\n", ' ')
end
end
end
......
......@@ -46,7 +46,7 @@ module Gitlab
end
def flush
MetricsWorker.perform_async(@metrics.map(&:to_hash))
Metrics.submit_metrics(@metrics.map(&:to_hash))
end
def sample_memory_usage
......
......@@ -5,13 +5,6 @@ module Gitlab
# This middleware is intended to be used as a server-side middleware.
class SidekiqMiddleware
def call(worker, message, queue)
# We don't want to track the MetricsWorker itself as otherwise we'll end
# up in an infinite loop.
if worker.class == MetricsWorker
yield
return
end
trans = Transaction.new
begin
......
......@@ -59,7 +59,7 @@ module Gitlab
end
def submit
MetricsWorker.perform_async(@metrics.map(&:to_hash))
Metrics.submit_metrics(@metrics.map(&:to_hash))
end
end
end
......
......@@ -38,7 +38,7 @@ describe Gitlab::Metrics::Sampler do
describe '#flush' do
it 'schedules the metrics using Sidekiq' do
expect(MetricsWorker).to receive(:perform_async).
expect(Gitlab::Metrics).to receive(:submit_metrics).
with([an_instance_of(Hash)])
sampler.sample_memory_usage
......
......@@ -11,14 +11,6 @@ describe Gitlab::Metrics::SidekiqMiddleware do
middleware.call(worker, 'test', :test) { nil }
end
it 'does not track jobs of the MetricsWorker' do
worker = MetricsWorker.new
expect(Gitlab::Metrics::Transaction).to_not receive(:new)
middleware.call(worker, 'test', :test) { nil }
end
end
describe '#tag_worker' do
......
......@@ -68,7 +68,7 @@ describe Gitlab::Metrics::Transaction do
it 'submits the metrics to Sidekiq' do
transaction.track_self
expect(MetricsWorker).to receive(:perform_async).
expect(Gitlab::Metrics).to receive(:submit_metrics).
with([an_instance_of(Hash)])
transaction.submit
......
......@@ -33,4 +33,52 @@ describe Gitlab::Metrics do
expect(file).to eq('spec/lib/gitlab/metrics_spec.rb')
end
end
describe '#submit_metrics' do
it 'prepares and writes the metrics to InfluxDB' do
connection = double(:connection)
pool = double(:pool)
expect(pool).to receive(:with).and_yield(connection)
expect(connection).to receive(:write_points).with(an_instance_of(Array))
expect(Gitlab::Metrics).to receive(:pool).and_return(pool)
described_class.submit_metrics([{ 'series' => 'kittens', 'tags' => {} }])
end
end
describe '#prepare_metrics' do
it 'returns a Hash with the keys as Symbols' do
metrics = described_class.
prepare_metrics([{ 'values' => {}, 'tags' => {} }])
expect(metrics).to eq([{ values: {}, tags: {} }])
end
it 'escapes tag values' do
metrics = described_class.prepare_metrics([
{ 'values' => {}, 'tags' => { 'foo' => 'bar=' } }
])
expect(metrics).to eq([{ values: {}, tags: { 'foo' => 'bar\\=' } }])
end
it 'drops empty tags' do
metrics = described_class.prepare_metrics([
{ 'values' => {}, 'tags' => { 'cats' => '', 'dogs' => nil } }
])
expect(metrics).to eq([{ values: {}, tags: {} }])
end
end
describe '#escape_value' do
it 'escapes an equals sign' do
expect(described_class.escape_value('foo=')).to eq('foo\\=')
end
it 'casts values to Strings' do
expect(described_class.escape_value(10)).to eq('10')
end
end
end
require 'spec_helper'
describe MetricsWorker do
let(:worker) { described_class.new }
describe '#perform' do
it 'prepares and writes the metrics to InfluxDB' do
connection = double(:connection)
pool = double(:pool)
expect(pool).to receive(:with).and_yield(connection)
expect(connection).to receive(:write_points).with(an_instance_of(Array))
expect(Gitlab::Metrics).to receive(:pool).and_return(pool)
worker.perform([{ 'series' => 'kittens', 'tags' => {} }])
end
end
describe '#prepare_metrics' do
it 'returns a Hash with the keys as Symbols' do
metrics = worker.prepare_metrics([{ 'values' => {}, 'tags' => {} }])
expect(metrics).to eq([{ values: {}, tags: {} }])
end
it 'escapes tag values' do
metrics = worker.prepare_metrics([
{ 'values' => {}, 'tags' => { 'foo' => 'bar=' } }
])
expect(metrics).to eq([{ values: {}, tags: { 'foo' => 'bar\\=' } }])
end
it 'drops empty tags' do
metrics = worker.prepare_metrics([
{ 'values' => {}, 'tags' => { 'cats' => '', 'dogs' => nil } }
])
expect(metrics).to eq([{ values: {}, tags: {} }])
end
end
describe '#escape_value' do
it 'escapes an equals sign' do
expect(worker.escape_value('foo=')).to eq('foo\\=')
end
it 'casts values to Strings' do
expect(worker.escape_value(10)).to eq('10')
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment