Commit 1679c570 authored by Gabriel Mazetto's avatar Gabriel Mazetto

GeoLogCursor processing repository_update events for repos and wiki.

parent 8ef83018
class Geo::EventLogState < Geo::BaseRegistry
self.primary_key = :event_id
def self.last_processed
order(event_id: :desc).first
end
end
#!/usr/bin/env ruby
# vim: ft=ruby
require 'rubygems'
require 'bundler/setup'
# loads rails environment / initializers
require "#{File.dirname(__FILE__)}/../config/environment"
require 'optparse'
class GeoLogCursorOptionParser
def self.parse(argv)
options = { full_scan: false }
version = Gitlab::Geo::LogCursor::Daemon::VERSION
op = OptionParser.new
op.banner = 'GitLab Geo: Log Cursor'
op.separator ''
op.separator 'Usage: ./geo_log_cursor [options]'
op.separator ''
op.on('-f', '--full-scan', 'Performs full-scan to lookup for un-replicated data') { options[:full_scan] = true }
op.separator 'Common options:'
op.on('-h', '--help') do
puts op.to_s
exit
end
op.on('-v', '--version') do
puts version
exit
end
op.separator ''
op.parse!(argv)
options
end
end
if $0 == __FILE__
options = GeoLogCursorOptionParser.parse(ARGV)
Gitlab::Geo::LogCursor::Daemon.new(options).run!
end
---
title: GeoLogCursor is part of a new experimental Geo replication system
merge_request: 1988
author:
class CreateEventLogState < ActiveRecord::Migration
def change
create_table :event_log_states, id: false do |t|
t.primary_key :event_id, :bigserial
end
end
end
class AddNeedsResyncToProjectRegistry < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_column_with_default(:project_registry, :resync_repository, :boolean, default: true)
add_column_with_default(:project_registry, :resync_wiki, :boolean, default: true)
end
def down
remove_columns :project_registry, :resync_repository, :resync_wiki
end
end
...@@ -11,11 +11,14 @@ ...@@ -11,11 +11,14 @@
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20170526214010) do ActiveRecord::Schema.define(version: 20170606155045) do
# These are extensions that must be enabled in order to support this database # These are extensions that must be enabled in order to support this database
enable_extension "plpgsql" enable_extension "plpgsql"
create_table "event_log_states", primary_key: "event_id", force: :cascade do |t|
end
create_table "file_registry", force: :cascade do |t| create_table "file_registry", force: :cascade do |t|
t.string "file_type", null: false t.string "file_type", null: false
t.integer "file_id", null: false t.integer "file_id", null: false
...@@ -32,7 +35,10 @@ ActiveRecord::Schema.define(version: 20170526214010) do ...@@ -32,7 +35,10 @@ ActiveRecord::Schema.define(version: 20170526214010) do
t.datetime "last_repository_synced_at" t.datetime "last_repository_synced_at"
t.datetime "last_repository_successful_sync_at" t.datetime "last_repository_successful_sync_at"
t.datetime "created_at", null: false t.datetime "created_at", null: false
t.boolean "resync_repository", default: true, null: false
t.boolean "resync_wiki", default: true, null: false
end end
add_index "project_registry", ["project_id"], name: "index_project_registry_on_project_id", using: :btree add_index "project_registry", ["project_id"], name: "index_project_registry_on_project_id", using: :btree
end end
module Gitlab
module Geo
module LogCursor
class Daemon
VERSION = '0.1.0'.freeze
POOL_WAIT = 5.seconds.freeze
BATCH_SIZE = 250
attr_reader :options
def initialize(options = {})
@options = options
@exit = false
end
def run!
trap_signals
full_scan! if options[:full_scan]
until exit?
Events.fetch_in_batches do |batch|
handle_events(batch)
end
return if exit?
# When no new event is found sleep for a few moments
sleep(POOL_WAIT)
end
end
# Execute routines to verify the required initial data is available
# and mark non-replicated data as requiring replication.
def full_scan!
# This is slow and can be improved in the future by using PostgreSQL FDW
# so we can query with a LEFT JOIN and have a list of
# Projects without corresponding ProjectRegistry in the DR database
# See: https://robots.thoughtbot.com/postgres-foreign-data-wrapper (requires PG 9.6)
$stdout.print 'Searching for non replicated projects...'
Project.select(:id).find_in_batches(batch_size: BATCH_SIZE) do |batch|
$stdout.print '.'
project_ids = batch.map(&:id)
existing = ::Geo::ProjectRegistry.where(project_id: project_ids).pluck(:project_id)
missing_projects = project_ids - existing
Rails.logger.debug("Missing projects: #{missing_projects}")
missing_projects.each do |id|
::Geo::ProjectRegistry.create(project_id: id)
end
end
$stdout.puts 'Done!'
puts
end
def handle_events(batch)
batch.each do |event|
# Update repository
if event.repository_updated_event
handle_repository_update(event.repository_updated_event)
end
end
end
private
def trap_signals
trap(:TERM) do
quit!
end
trap(:INT) do
quit!
end
end
# Safe shutdown
def quit!
$stdout.puts 'Exiting...'
@exit = true
end
def handle_repository_update(updated_event)
registry = ::Geo::ProjectRegistry.find_or_create_by(project_id: project_id)
case updated_event.source
when 'repository'
registry.resync_repository = true
when 'wiki'
registry.resync_wiki = true
end
registry.save!
end
def exit?
@exit
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
# Manages events from primary database and store state in the DR database
class Events
BATCH_SIZE = 50
NAMESPACE = 'geo:gitlab'.freeze
# fetches up to BATCH_SIZE next events and keep track of batches
def self.fetch_in_batches
::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: BATCH_SIZE) do |batch|
yield batch
save_processed(batch.last.id)
end
end
# saves last replicated event
def self.save_processed(event_id)
::Geo::EventLogState.create!(event_id: event_id)
::Geo::EventLogState.where('event_id < ?', event_id).delete_all
end
# @return [Integer] id of last replicated event
def self.last_processed
last = ::Geo::EventLogState.last_processed.try(:id)
return last if last
::Geo::EventLog.any? ? ::Geo::EventLog.last.id : -1
end
end
end
end
end
require 'spec_helper'
load File.expand_path('../../bin/geo_log_cursor', __dir__)
describe 'scripts/geo_log_cursor' do
describe GeoLogCursorOptionParser do
it 'parses -f and --full-scan' do
%w[-f --full-scan].each do |flag|
options = described_class.parse(%W[foo #{flag} bar])
expect(options[:full_scan]).to eq true
end
end
end
end
FactoryGirl.define do
factory :geo_event_log, class: Geo::EventLog do
repository_updated_event factory: :geo_repository_update_event
end
factory :geo_repository_update_event, class: Geo::RepositoryUpdatedEvent do
source 0
branches_affected 0
tags_affected 0
project factory: :empty_project
end
end
FactoryGirl.define do
factory :geo_event_log_state, class: Geo::EventLogState do
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Daemon, lib: true do
describe '#run!' do
before do
allow(subject).to receive(:exit?) { true }
end
it 'traps signals' do
allow(subject).to receive(:exit?) { true }
expect(subject).to receive(:trap_signals)
subject.run!
end
context 'when the command-line defines full_scan: true' do
subject { described_class.new(full_scan: true) }
it 'executes a full-scan' do
expect(subject).to receive(:full_scan!)
subject.run!
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events, lib: true do
describe '.fetch_in_batches' do
let!(:event_log) { create(:geo_event_log) }
before do
allow(described_class).to receive(:last_processed) { -1 }
end
it 'yields a group of events' do
expect { |b| described_class.fetch_in_batches(&b) }.to yield_with_args([event_log])
end
it 'saves processed files after yielding' do
expect(described_class).to receive(:save_processed)
described_class.fetch_in_batches { |batch| batch }
end
end
describe '.save_processed' do
it 'saves a new entry in geo_event_log_state' do
expect { described_class.save_processed(1) }.to change(Geo::EventLogState, :count).by(1)
expect(Geo::EventLogState.last.event_id).to eq(1)
end
it 'removes older entries from geo_event_log_state' do
create(:geo_event_log_state)
expect { described_class.save_processed(2) }.to change(Geo::EventLogState, :count).by(0)
expect(Geo::EventLogState.last.event_id).to eq(2)
end
end
describe '.last_processed' do
context 'when system has not generated any event yet' do
it 'returns -1' do
expect(described_class.last_processed).to eq(-1)
end
end
context 'when there are existing events already but no event_log_state' do
let!(:event_log) { create(:geo_event_log) }
it 'returns last event id' do
expect(described_class.last_processed).to eq(event_log.id)
end
end
context 'when there is already an event_log_state' do
let!(:event_log_state) { create(:geo_event_log_state) }
it 'returns last event from event_log_state' do
expect(described_class.last_processed).to eq(event_log_state.id)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment