Commit 8be1ab23 authored by Tetiana Chupryna's avatar Tetiana Chupryna

Merge branch '340364-create-agent-activity-events-with-cleanup' into 'master'

Clean up expired agent activity events

See merge request gitlab-org/gitlab!75709
parents 5119fc16 8aa9f254
......@@ -5,6 +5,7 @@ module Clusters
self.table_name = 'cluster_agents'
INACTIVE_AFTER = 1.hour.freeze
ACTIVITY_EVENT_LIMIT = 200
belongs_to :created_by_user, class_name: 'User', optional: true
belongs_to :project, class_name: '::Project' # Otherwise, it will load ::Clusters::Project
......@@ -39,5 +40,12 @@ module Clusters
def connected?
agent_tokens.active.where("last_used_at > ?", INACTIVE_AFTER.ago).exists?
end
def activity_event_deletion_cutoff
# Order is defined by the association
activity_events
.offset(ACTIVITY_EVENT_LIMIT - 1)
.pick(:recorded_at)
end
end
end
......@@ -56,12 +56,13 @@ module Clusters
end
def log_activity_event!(recorded_at)
agent.activity_events.create!(
Clusters::Agents::CreateActivityEventService.new( # rubocop: disable CodeReuse/ServiceClass
agent,
kind: :agent_connected,
level: :info,
recorded_at: recorded_at,
agent_token: self
)
).execute
end
end
end
......@@ -3,6 +3,7 @@
module Clusters
module Agents
class ActivityEvent < ApplicationRecord
include EachBatch
include NullifyIfBlank
self.table_name = 'agent_activity_events'
......@@ -12,6 +13,7 @@ module Clusters
belongs_to :agent_token, class_name: 'Clusters::AgentToken'
scope :in_timeline_order, -> { order(recorded_at: :desc, id: :desc) }
scope :recorded_before, -> (cutoff) { where('recorded_at < ?', cutoff) }
validates :recorded_at, :kind, :level, presence: true
......
......@@ -30,13 +30,14 @@ module Clusters
end
def log_activity_event!(token)
token.agent.activity_events.create!(
Clusters::Agents::CreateActivityEventService.new(
token.agent,
kind: :token_created,
level: :info,
recorded_at: token.created_at,
user: current_user,
agent_token: token
)
).execute
end
end
end
......
# frozen_string_literal: true
module Clusters
module Agents
class CreateActivityEventService
def initialize(agent, **params)
@agent = agent
@params = params
end
def execute
agent.activity_events.create!(params)
DeleteExpiredEventsWorker.perform_at(schedule_cleanup_at, agent.id)
ServiceResponse.success
end
private
attr_reader :agent, :params
def schedule_cleanup_at
1.hour.from_now.change(min: agent.id % 60)
end
end
end
end
# frozen_string_literal: true
module Clusters
module Agents
class DeleteExpiredEventsService
def initialize(agent)
@agent = agent
end
def execute
agent.activity_events
.recorded_before(remove_events_before)
.each_batch { |batch| batch.delete_all }
end
private
attr_reader :agent
def remove_events_before
agent.activity_event_deletion_cutoff
end
end
end
end
......@@ -129,6 +129,15 @@
:weight: 2
:idempotent:
:tags: []
- :name: cluster_agent:clusters_agents_delete_expired_events
:worker_name: Clusters::Agents::DeleteExpiredEventsWorker
:feature_category: :kubernetes_management
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: container_repository:cleanup_container_repository
:worker_name: CleanupContainerRepositoryWorker
:feature_category: :container_registry
......
# frozen_string_literal: true
module Clusters
module Agents
class DeleteExpiredEventsWorker
include ApplicationWorker
include ClusterAgentQueue
deduplicate :until_executed, including_scheduled: true
idempotent!
data_consistency :always
def perform(agent_id)
if agent = Clusters::Agent.find_by_id(agent_id)
Clusters::Agents::DeleteExpiredEventsService.new(agent).execute
end
end
end
end
end
# frozen_string_literal: true
module ClusterAgentQueue
extend ActiveSupport::Concern
included do
queue_namespace :cluster_agent
feature_category :kubernetes_management
end
end
......@@ -77,6 +77,8 @@
- 1
- - ci_upstream_projects_subscriptions_cleanup
- 1
- - cluster_agent
- 1
- - container_repository
- 1
- - create_commit_signature
......
......@@ -116,4 +116,19 @@ RSpec.describe Clusters::Agent do
it { is_expected.to be_truthy }
end
end
describe '#activity_event_deletion_cutoff' do
let_it_be(:agent) { create(:cluster_agent) }
let_it_be(:event1) { create(:agent_activity_event, agent: agent, recorded_at: 1.hour.ago) }
let_it_be(:event2) { create(:agent_activity_event, agent: agent, recorded_at: 2.hours.ago) }
let_it_be(:event3) { create(:agent_activity_event, agent: agent, recorded_at: 3.hours.ago) }
subject { agent.activity_event_deletion_cutoff }
before do
stub_const("#{described_class}::ACTIVITY_EVENT_LIMIT", 2)
end
it { is_expected.to be_like_time(event2.recorded_at) }
end
end
......@@ -16,11 +16,10 @@ RSpec.describe Clusters::Agents::ActivityEvent do
let_it_be(:agent) { create(:cluster_agent) }
describe '.in_timeline_order' do
let(:recorded_at) { 1.hour.ago }
let!(:event1) { create(:agent_activity_event, agent: agent, recorded_at: recorded_at) }
let!(:event2) { create(:agent_activity_event, agent: agent, recorded_at: Time.current) }
let!(:event3) { create(:agent_activity_event, agent: agent, recorded_at: recorded_at) }
let_it_be(:recorded_at) { 1.hour.ago }
let_it_be(:event1) { create(:agent_activity_event, agent: agent, recorded_at: recorded_at) }
let_it_be(:event2) { create(:agent_activity_event, agent: agent, recorded_at: Time.current) }
let_it_be(:event3) { create(:agent_activity_event, agent: agent, recorded_at: recorded_at) }
subject { described_class.in_timeline_order }
......@@ -28,5 +27,19 @@ RSpec.describe Clusters::Agents::ActivityEvent do
is_expected.to eq([event2, event3, event1])
end
end
describe '.recorded_before' do
let_it_be(:event1) { create(:agent_activity_event, agent: agent, recorded_at: 1.hour.ago) }
let_it_be(:event2) { create(:agent_activity_event, agent: agent, recorded_at: 2.hours.ago) }
let_it_be(:event3) { create(:agent_activity_event, agent: agent, recorded_at: 3.hours.ago) }
let(:cutoff) { event2.recorded_at }
subject { described_class.recorded_before(cutoff) }
it 'returns only events recorded before the cutoff' do
is_expected.to contain_exactly(event3)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Clusters::Agents::CreateActivityEventService do
let_it_be(:agent) { create(:cluster_agent) }
let_it_be(:token) { create(:cluster_agent_token, agent: agent) }
let_it_be(:user) { create(:user) }
describe '#execute' do
let(:params) do
{
kind: :token_created,
level: :info,
recorded_at: token.created_at,
user: user,
agent_token: token
}
end
subject { described_class.new(agent, **params).execute }
it 'creates an activity event record' do
expect { subject }.to change(agent.activity_events, :count).from(0).to(1)
event = agent.activity_events.last
expect(event).to have_attributes(
kind: 'token_created',
level: 'info',
recorded_at: token.reload.created_at,
user: user,
agent_token_id: token.id
)
end
it 'schedules the cleanup worker' do
expect(Clusters::Agents::DeleteExpiredEventsWorker).to receive(:perform_at)
.with(1.hour.from_now.change(min: agent.id % 60), agent.id)
subject
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Clusters::Agents::DeleteExpiredEventsService do
let_it_be(:agent) { create(:cluster_agent) }
describe '#execute' do
let_it_be(:event1) { create(:agent_activity_event, agent: agent, recorded_at: 1.hour.ago) }
let_it_be(:event2) { create(:agent_activity_event, agent: agent, recorded_at: 2.hours.ago) }
let_it_be(:event3) { create(:agent_activity_event, agent: agent, recorded_at: 3.hours.ago) }
let_it_be(:event4) { create(:agent_activity_event, agent: agent, recorded_at: 4.hours.ago) }
let_it_be(:event5) { create(:agent_activity_event, agent: agent, recorded_at: 5.hours.ago) }
let(:deletion_cutoff) { 1.day.ago }
subject { described_class.new(agent).execute }
before do
allow(agent).to receive(:activity_event_deletion_cutoff).and_return(deletion_cutoff)
end
it 'does not delete events if the limit has not been reached' do
expect { subject }.not_to change(agent.activity_events, :count)
end
context 'there are more events than the limit' do
let(:deletion_cutoff) { event3.recorded_at }
it 'removes events to remain at the limit, keeping the most recent' do
expect { subject }.to change(agent.activity_events, :count).from(5).to(3)
expect(agent.activity_events).to contain_exactly(event1, event2, event3)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Clusters::Agents::DeleteExpiredEventsWorker do
let(:agent) { create(:cluster_agent) }
describe '#perform' do
let(:agent_id) { agent.id }
let(:deletion_service) { double(execute: true) }
subject { described_class.new.perform(agent_id) }
it 'calls the deletion service' do
expect(deletion_service).to receive(:execute).once
expect(Clusters::Agents::DeleteExpiredEventsService).to receive(:new)
.with(agent).and_return(deletion_service)
subject
end
context 'agent no longer exists' do
let(:agent_id) { -1 }
it 'completes without raising an error' do
expect { subject }.not_to raise_error
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ClusterAgentQueue do
let(:worker) do
Class.new do
def self.name
'ExampleWorker'
end
include ApplicationWorker
include ClusterAgentQueue
end
end
it { expect(worker.queue).to eq('cluster_agent:example') }
it { expect(worker.get_feature_category).to eq(:kubernetes_management) }
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment