1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
require 'securerandom'
module Geo
# Raised when the primary node has no clone_url_prefix configured.
# The clone_url_prefix is used to build URLs for the Geo synchronization.
class EmptyCloneUrlPrefixError < StandardError
end
class BaseSyncService
# Mixins. Presumably these supply the helpers this class calls but does not
# define itself (try_obtain_lease, log_info, gitlab_shell, delay) - confirm
# against the respective modules.
include ExclusiveLeaseGuard
include ::Gitlab::Geo::ProjectLogHelpers
include ::Gitlab::ShellAdapter
include Delay
# Class-level `type` setting. Instances read it through the private #type
# method to build the lease key, log messages and registry attribute names.
class << self
attr_accessor :type
end
# The project passed to #initialize.
attr_reader :project
# Value returned by #lease_timeout.
LEASE_TIMEOUT = 8.hours.freeze
# First segment of the key built by #lease_key.
LEASE_KEY_PREFIX = 'geo_sync_service'.freeze
# A regular sync is attempted while the retry count is at most this value
# (see #should_be_retried?).
RETRY_BEFORE_REDOWNLOAD = 5
# A redownload is attempted while the retry count lies between
# RETRY_BEFORE_REDOWNLOAD and this value, both inclusive
# (see #should_be_redownloaded?).
RETRY_LIMIT = 8
# Stores the project this service works on; it is exposed through #project.
#
# @param project the project to synchronize (its `id` is used for the lease
#   key and the registry lookup)
def initialize(project)
@project = project
end
# Runs one synchronization attempt inside the block given to
# try_obtain_lease.
#
# - When #should_be_retried? holds, a regular sync is performed.
# - Otherwise, when #should_be_redownloaded? holds, the sync is performed
#   with the redownload flag set.
# - Otherwise the registry record is deleted and the method returns early,
#   without logging the "Finished" message.
def execute
  try_obtain_lease do
    log_info("Started #{type} sync")

    regular_sync = should_be_retried?

    unless regular_sync || should_be_redownloaded?
      # Clean up the state of sync to start a new cycle
      registry.delete
      log_info("Clean up #{type} sync status")
      return
    end

    if regular_sync
      sync_repository
    else
      sync_repository(true)
    end

    log_info("Finished #{type} sync")
  end
end
# Memoized lease key of the form "<LEASE_KEY_PREFIX>:<type>:<project id>".
def lease_key
  @lease_key ||= [LEASE_KEY_PREFIX, type, project.id].join(':')
end
# Returns LEASE_TIMEOUT.
# NOTE(review): presumably read by ExclusiveLeaseGuard when the lease is
# obtained - confirm against that module.
def lease_timeout
LEASE_TIMEOUT
end
# Memoized clone_url_prefix of the Geo primary node, used as the prefix for
# SSH fetch URLs.
#
# Nothing is memoized when the prefix is missing, so the check runs again
# on the next call.
#
# @raise [EmptyCloneUrlPrefixError] when the primary node has no
#   clone_url_prefix
def primary_ssh_path_prefix
  return @primary_ssh_path_prefix if @primary_ssh_path_prefix

  prefix = Gitlab::Geo.primary_node.clone_url_prefix
  raise EmptyCloneUrlPrefixError, 'Missing clone_url_prefix in the primary node' unless prefix.present?

  @primary_ssh_path_prefix = prefix
end
# Memoized URL of the Geo primary node; #fetch_http_geo_mirror uses it as the
# prefix for HTTP fetch URLs.
def primary_http_path_prefix
@primary_http_path_prefix ||= Gitlab::Geo.primary_node.url
end
private
# Number of retries recorded in the registry for this sync type, or 0 when
# the registry holds no value.
def retry_count
  stored_count = registry.public_send("#{type}_retry_count") # rubocop:disable GitlabSecurity/PublicSend

  stored_count || 0
end
# Whether a regular sync should be attempted: never when the registry
# forces a redownload for this type, otherwise while the retry count has not
# exceeded RETRY_BEFORE_REDOWNLOAD.
def should_be_retried?
  forced_redownload = registry.public_send("force_to_redownload_#{type}") # rubocop:disable GitlabSecurity/PublicSend

  !forced_redownload && retry_count <= RETRY_BEFORE_REDOWNLOAD
end
# Whether the sync should redownload: always when the registry forces it
# for this type, otherwise while the retry count lies between
# RETRY_BEFORE_REDOWNLOAD and RETRY_LIMIT (both inclusive).
def should_be_redownloaded?
  forced_redownload = registry.public_send("force_to_redownload_#{type}") # rubocop:disable GitlabSecurity/PublicSend
  return true if forced_redownload

  redownload_window = RETRY_BEFORE_REDOWNLOAD..RETRY_LIMIT
  redownload_window.cover?(retry_count)
end
# Performs the actual synchronization; this base implementation only
# signals that the method has to be overridden.
#
# #execute calls this method both without arguments and as
# `sync_repository(true)`, so the stub accepts an optional argument. That
# way a missing override always fails with NotImplementedError instead of an
# unrelated ArgumentError on the redownload path.
#
# @param _redownload [Boolean] ignored here; `true` when #execute requests
#   a redownload
# @raise [NotImplementedError] always
def sync_repository(_redownload = false)
  raise NotImplementedError, 'This class should implement sync_repository method'
end
# Returns ::Gitlab::Geo.current_node. #fetch_geo_mirror treats the result as
# possibly nil (it uses safe navigation on it).
def current_node
::Gitlab::Geo.current_node
end
# Fetches the given repository using the clone protocol configured on the
# current node.
#
# @param repository the repository to fetch into
# @raise [RuntimeError] when the protocol is neither 'http' nor 'ssh'
#   (including when there is no current node)
def fetch_geo_mirror(repository)
  protocol = current_node&.clone_protocol

  return fetch_http_geo_mirror(repository) if protocol == 'http'
  return fetch_ssh_geo_mirror(repository) if protocol == 'ssh'

  raise "Unknown clone protocol: #{protocol}"
end
# Builds the remote URL "<prefix>/<repository full path>.git", adding the
# separating slash only when the prefix does not already end with one.
# The prefix string itself is not modified.
#
# @param prefix [String] URL prefix of the primary node
# @param repository an object responding to `full_path`
# @return [String]
def build_repository_url(prefix, repository)
  base = prefix.end_with?('/') ? prefix : prefix + '/'

  base + repository.full_path + '.git'
end
# Fetches the repository as a mirror from the primary node over HTTP.
#
# The fetch runs inside `repository.with_config`, which receives an
# `http.<url>.extraHeader` entry carrying the Authorization header, so the
# request is authenticated with a JWT header.
#
# @param repository the repository to fetch into
def fetch_http_geo_mirror(repository)
  remote_url = build_repository_url(primary_http_path_prefix, repository)
  authorization_value = ::Gitlab::Geo::BaseRequest.new.authorization
  git_config = { "http.#{remote_url}.extraHeader" => "Authorization: #{authorization_value}" }

  repository.with_config(git_config) do
    repository.fetch_as_mirror(remote_url, forced: true)
  end
end
# Fetches the repository as a mirror from the primary node, using the URL
# built from the SSH path prefix.
#
# @param repository the repository to fetch into
def fetch_ssh_geo_mirror(repository)
  repository.fetch_as_mirror(
    build_repository_url(primary_ssh_path_prefix, repository),
    forced: true
  )
end
# Memoized Geo::ProjectRegistry record for the project, looked up by
# project_id via find_or_initialize_by.
def registry
@registry ||= Geo::ProjectRegistry.find_or_initialize_by(project_id: project.id)
end
# Records sync progress for this type in the registry.
#
# Does nothing when neither timestamp is given. When both are given, the
# "finished" values for the retry count and retry time overwrite the
# "started" ones.
#
# @param started_at time the sync started; stores it as the last synced
#   time, increments the retry count and schedules the next retry time
# @param finished_at time the sync finished successfully; stores it as the
#   last successful sync time, clears the resync flag and resets the retry
#   count and retry time
def update_registry(started_at: nil, finished_at: nil)
  return if !started_at && !finished_at

  log_info("Updating #{type} sync information")

  changes = {}

  if started_at
    attempts = retry_count + 1

    changes.merge!(
      "last_#{type}_synced_at" => started_at,
      "#{type}_retry_count" => attempts,
      "#{type}_retry_at" => next_retry_time(attempts)
    )
  end

  if finished_at
    changes.merge!(
      "last_#{type}_successful_sync_at" => finished_at,
      "resync_#{type}" => false,
      "#{type}_retry_count" => nil,
      "#{type}_retry_at" => nil
    )
  end

  registry.update!(changes)
end
# The sync type configured on the class (see the class-level `type`
# accessor); interpolated into log messages, the lease key and registry
# attribute names.
def type
self.class.type
end
# Seconds between the project's last repository update and the last
# successful sync, rounded to three decimals.
#
# Returns nil when the project has no last_repository_updated_at.
def update_delay_in_seconds
  # We don't track the last update time of repositories and Wiki
  # separately in the main database
  updated_at = project.last_repository_updated_at
  return unless updated_at

  (last_successful_sync_at.to_f - updated_at.to_f).round(3)
end
# Seconds between the last sync start and the last successful sync, rounded
# to three decimals. A missing timestamp counts as 0 (nil.to_f).
def download_time_in_seconds
  finished = last_successful_sync_at.to_f
  started = last_synced_at.to_f

  (finished - started).round(3)
end
# Reads the registry's "last_<type>_successful_sync_at" attribute, i.e. the
# value #update_registry stores for `finished_at`.
def last_successful_sync_at
registry.public_send("last_#{type}_successful_sync_at") # rubocop:disable GitlabSecurity/PublicSend
end
# Reads the registry's "last_<type>_synced_at" attribute, i.e. the value
# #update_registry stores for `started_at`.
def last_synced_at
registry.public_send("last_#{type}_synced_at") # rubocop:disable GitlabSecurity/PublicSend
end
# Builds "<repository disk path>_<prefix><14 random hex characters>".
#
# @param prefix [String] text placed between the underscore and the random
#   part
# @return [String]
def random_disk_path(prefix)
  suffix = "#{prefix}#{SecureRandom.hex(7)}"

  "#{repository.disk_path}_#{suffix}"
end
# Memoized disk path for the temporary repository: the repository's disk
# path followed by an underscore and a random hex suffix (see
# #random_disk_path, called here with an empty prefix).
def disk_path_temp
@disk_path_temp ||= random_disk_path('')
end
# Memoized disk path the original repository is moved to while the
# temporary one takes its place: "<repository disk path>+failed-geo-sync".
# Note that the value is cached in @deleted_path, not in an instance
# variable named after the method.
# NOTE(review): `repository` is not defined in this class; presumably
# subclasses provide it - confirm.
def deleted_disk_path_temp
@deleted_path ||= "#{repository.disk_path}+failed-geo-sync"
end
# Creates an empty repository at #disk_path_temp and returns a clone of
# `repository` whose disk_path points at it. The original repository object
# is left unchanged.
#
# @raise [Gitlab::Shell::Error] when the temporary repository cannot be
#   created
def build_temporary_repository
  created = gitlab_shell.add_repository(project.repository_storage, disk_path_temp)
  raise Gitlab::Shell::Error, 'Can not create a temporary repository' unless created

  temporary_repository = repository.clone
  temporary_repository.disk_path = disk_path_temp
  temporary_repository
end
# Removes the repository at #disk_path_temp from the project's repository
# storage path. The result of remove_repository is returned as-is and is not
# checked here.
def clean_up_temporary_repository
gitlab_shell.remove_repository(project.repository_storage_path, disk_path_temp)
end
# Replaces the main repository on disk with the temporary one:
#
# 1. removes any leftover repository at #deleted_disk_path_temp,
# 2. moves the current main repository to #deleted_disk_path_temp,
# 3. moves the temporary repository to the main disk path,
# 4. removes the repository moved away in step 2.
#
# There is no rollback: when a later step fails, the earlier steps stay
# applied.
#
# @raise [Gitlab::Shell::Error] when step 2, 3 or 4 fails
def set_temp_repository_as_main
  storage_path = project.repository_storage_path
  temp_path = disk_path_temp
  backup_path = deleted_disk_path_temp
  main_path = repository.disk_path

  log_info(
    "Setting newly downloaded repository as main",
    storage_path: storage_path,
    temp_path: temp_path,
    deleted_disk_path_temp: backup_path,
    disk_path: main_path
  )

  # Remove the deleted path in case it exists, but it may not be there
  gitlab_shell.remove_repository(storage_path, backup_path)

  # Move the original repository out of the way
  moved_away = gitlab_shell.mv_repository(storage_path, main_path, backup_path)
  raise Gitlab::Shell::Error, 'Can not move original repository out of the way' unless moved_away

  promoted = gitlab_shell.mv_repository(storage_path, temp_path, main_path)
  raise Gitlab::Shell::Error, 'Can not move temporary repository' unless promoted

  # Purge the original repository
  purged = gitlab_shell.remove_repository(storage_path, backup_path)
  raise Gitlab::Shell::Error, 'Can not remove outdated main repository' unless purged
end
# Computes when the next retry may happen: now plus the delay for the given
# retry count.
#
# To prevent the retry time from storing invalid dates in the database,
# the result is capped at one week from now plus the delay for a retry
# count of 1, which serves as jitter.
#
# @param retry_count [Integer] retry count passed on to `delay`
# @return [Time] the earlier of the proposed and the capped time
def next_retry_time(retry_count)
  backoff_time = Time.now + delay(retry_count).seconds
  latest_allowed_time = Time.now + 7.days + delay(1).seconds

  [backoff_time, latest_allowed_time].min
end
end
end