Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gitlab-ce
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
nexedi
gitlab-ce
Commits
1751cab4
Commit
1751cab4
authored
Feb 26, 2018
by
Douwe Maan
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Extract WaitableWorker out of AuthorizedProjectsWorker
parent
58a312f5
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
145 additions
and
114 deletions
+145
-114
app/workers/authorized_projects_worker.rb
app/workers/authorized_projects_worker.rb
+2
-34
app/workers/concerns/waitable_worker.rb
app/workers/concerns/waitable_worker.rb
+44
-0
lib/gitlab/job_waiter.rb
lib/gitlab/job_waiter.rb
+7
-1
spec/workers/authorized_projects_worker_spec.rb
spec/workers/authorized_projects_worker_spec.rb
+0
-79
spec/workers/concerns/waitable_worker_spec.rb
spec/workers/concerns/waitable_worker_spec.rb
+92
-0
No files found.
app/workers/authorized_projects_worker.rb
View file @
1751cab4
class
AuthorizedProjectsWorker
include
ApplicationWorker
prepend
WaitableWorker
# Schedules multiple jobs and waits for them to be completed.
def
self
.
bulk_perform_and_wait
(
args_list
)
# Short-circuit: it's more efficient to do small numbers of jobs inline
return
bulk_perform_inline
(
args_list
)
if
args_list
.
size
<=
3
waiter
=
Gitlab
::
JobWaiter
.
new
(
args_list
.
size
)
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
# into [[1, "key"], [2, "key"], [3, "key"]]
waiting_args_list
=
args_list
.
map
{
|
args
|
[
*
args
,
waiter
.
key
]
}
bulk_perform_async
(
waiting_args_list
)
waiter
.
wait
end
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so
# they can benefit from retries
def
self
.
bulk_perform_inline
(
args_list
)
failed
=
[]
args_list
.
each
do
|
args
|
begin
new
.
perform
(
*
args
)
rescue
failed
<<
args
end
end
bulk_perform_async
(
failed
)
if
failed
.
present?
end
def
perform
(
user_id
,
notify_key
=
nil
)
def
perform
(
user_id
)
user
=
User
.
find_by
(
id:
user_id
)
user
&
.
refresh_authorized_projects
ensure
Gitlab
::
JobWaiter
.
notify
(
notify_key
,
jid
)
if
notify_key
end
end
app/workers/concerns/waitable_worker.rb
0 → 100644
View file @
1751cab4
module
WaitableWorker
extend
ActiveSupport
::
Concern
module
ClassMethods
# Schedules multiple jobs and waits for them to be completed.
def
bulk_perform_and_wait
(
args_list
)
# Short-circuit: it's more efficient to do small numbers of jobs inline
return
bulk_perform_inline
(
args_list
)
if
args_list
.
size
<=
3
waiter
=
Gitlab
::
JobWaiter
.
new
(
args_list
.
size
)
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
# into [[1, "key"], [2, "key"], [3, "key"]]
waiting_args_list
=
args_list
.
map
{
|
args
|
[
*
args
,
waiter
.
key
]
}
bulk_perform_async
(
waiting_args_list
)
waiter
.
wait
end
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so
# they can benefit from retries
def
bulk_perform_inline
(
args_list
)
failed
=
[]
args_list
.
each
do
|
args
|
begin
new
.
perform
(
*
args
)
rescue
failed
<<
args
end
end
bulk_perform_async
(
failed
)
if
failed
.
present?
end
end
def
perform
(
*
args
)
notify_key
=
args
.
pop
if
Gitlab
::
JobWaiter
.
key?
(
args
.
last
)
super
(
*
args
)
ensure
Gitlab
::
JobWaiter
.
notify
(
notify_key
,
jid
)
if
notify_key
end
end
lib/gitlab/job_waiter.rb
View file @
1751cab4
...
...
@@ -15,16 +15,22 @@ module Gitlab
# push to that array when done. Once the waiter has popped `count` items, it
# knows all the jobs are done.
class
JobWaiter
KEY_PREFIX
=
"gitlab:job_waiter"
.
freeze
def
self
.
notify
(
key
,
jid
)
Gitlab
::
Redis
::
SharedState
.
with
{
|
redis
|
redis
.
lpush
(
key
,
jid
)
}
end
def
self
.
key?
(
key
)
key
.
is_a?
(
String
)
&&
key
=~
/\A
#{
KEY_PREFIX
}
:\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/
end
attr_reader
:key
,
:finished
attr_accessor
:jobs_remaining
# jobs_remaining - the number of jobs left to wait for
# key - The key of this waiter.
def
initialize
(
jobs_remaining
=
0
,
key
=
"
gitlab:job_waiter
:
#{
SecureRandom
.
uuid
}
"
)
def
initialize
(
jobs_remaining
=
0
,
key
=
"
#{
KEY_PREFIX
}
:
#{
SecureRandom
.
uuid
}
"
)
@key
=
key
@jobs_remaining
=
jobs_remaining
@finished
=
[]
...
...
spec/workers/authorized_projects_worker_spec.rb
View file @
1751cab4
require
'spec_helper'
describe
AuthorizedProjectsWorker
do
let
(
:project
)
{
create
(
:project
)
}
def
build_args_list
(
*
ids
,
multiply:
1
)
args_list
=
ids
.
map
{
|
id
|
[
id
]
}
args_list
*
multiply
end
describe
'.bulk_perform_and_wait'
do
it
'schedules the ids and waits for the jobs to complete'
do
args_list
=
build_args_list
(
project
.
owner
.
id
)
project
.
owner
.
project_authorizations
.
delete_all
described_class
.
bulk_perform_and_wait
(
args_list
)
expect
(
project
.
owner
.
project_authorizations
.
count
).
to
eq
(
1
)
end
it
'inlines workloads <= 3 jobs'
do
args_list
=
build_args_list
(
project
.
owner
.
id
,
multiply:
3
)
expect
(
described_class
).
to
receive
(
:bulk_perform_inline
).
with
(
args_list
)
described_class
.
bulk_perform_and_wait
(
args_list
)
end
it
'runs > 3 jobs using sidekiq'
do
project
.
owner
.
project_authorizations
.
delete_all
expect
(
described_class
).
to
receive
(
:bulk_perform_async
).
and_call_original
args_list
=
build_args_list
(
project
.
owner
.
id
,
multiply:
4
)
described_class
.
bulk_perform_and_wait
(
args_list
)
expect
(
project
.
owner
.
project_authorizations
.
count
).
to
eq
(
1
)
end
end
describe
'.bulk_perform_inline'
do
it
'refreshes the authorizations inline'
do
project
.
owner
.
project_authorizations
.
delete_all
expect_any_instance_of
(
described_class
).
to
receive
(
:perform
).
and_call_original
described_class
.
bulk_perform_inline
(
build_args_list
(
project
.
owner
.
id
))
expect
(
project
.
owner
.
project_authorizations
.
count
).
to
eq
(
1
)
end
it
'enqueues jobs if an error is raised'
do
invalid_id
=
-
1
args_list
=
build_args_list
(
project
.
owner
.
id
,
invalid_id
)
allow_any_instance_of
(
described_class
).
to
receive
(
:perform
).
with
(
project
.
owner
.
id
)
allow_any_instance_of
(
described_class
).
to
receive
(
:perform
).
with
(
invalid_id
).
and_raise
(
ArgumentError
)
expect
(
described_class
).
to
receive
(
:bulk_perform_async
).
with
(
build_args_list
(
invalid_id
))
described_class
.
bulk_perform_inline
(
args_list
)
end
end
describe
'.bulk_perform_async'
do
it
"uses it's respective sidekiq queue"
do
args_list
=
build_args_list
(
project
.
owner
.
id
)
push_bulk_args
=
{
'class'
=>
described_class
,
'args'
=>
args_list
}
expect
(
Sidekiq
::
Client
).
to
receive
(
:push_bulk
).
with
(
push_bulk_args
).
once
described_class
.
bulk_perform_async
(
args_list
)
end
end
describe
'#perform'
do
let
(
:user
)
{
create
(
:user
)
}
...
...
@@ -85,12 +12,6 @@ describe AuthorizedProjectsWorker do
job
.
perform
(
user
.
id
)
end
it
'notifies the JobWaiter when done if the key is provided'
do
expect
(
Gitlab
::
JobWaiter
).
to
receive
(
:notify
).
with
(
'notify-key'
,
job
.
jid
)
job
.
perform
(
user
.
id
,
'notify-key'
)
end
context
"when the user is not found"
do
it
"does nothing"
do
expect_any_instance_of
(
User
).
not_to
receive
(
:refresh_authorized_projects
)
...
...
spec/workers/concerns/waitable_worker_spec.rb
0 → 100644
View file @
1751cab4
require
'spec_helper'
describe
WaitableWorker
do
let
(
:worker
)
do
Class
.
new
do
def
self
.
name
'Gitlab::Foo::Bar::DummyWorker'
end
class
<<
self
cattr_accessor
(
:counter
)
{
0
}
end
include
ApplicationWorker
prepend
WaitableWorker
def
perform
(
i
=
0
)
self
.
class
.
counter
+=
i
end
end
end
subject
(
:job
)
{
worker
.
new
}
describe
'.bulk_perform_and_wait'
do
it
'schedules the jobs and waits for them to complete'
do
worker
.
bulk_perform_and_wait
([[
1
],
[
2
]])
expect
(
worker
.
counter
).
to
eq
(
3
)
end
it
'inlines workloads <= 3 jobs'
do
args_list
=
[[
1
],
[
2
],
[
3
]]
expect
(
worker
).
to
receive
(
:bulk_perform_inline
).
with
(
args_list
).
and_call_original
worker
.
bulk_perform_and_wait
(
args_list
)
expect
(
worker
.
counter
).
to
eq
(
6
)
end
it
'runs > 3 jobs using sidekiq'
do
expect
(
worker
).
to
receive
(
:bulk_perform_async
)
worker
.
bulk_perform_and_wait
([[
1
],
[
2
],
[
3
],
[
4
]])
end
end
describe
'.bulk_perform_inline'
do
it
'runs the jobs inline'
do
expect
(
worker
).
not_to
receive
(
:bulk_perform_async
)
worker
.
bulk_perform_inline
([[
1
],
[
2
]])
expect
(
worker
.
counter
).
to
eq
(
3
)
end
it
'enqueues jobs if an error is raised'
do
expect
(
worker
).
to
receive
(
:bulk_perform_async
).
with
([[
'foo'
]])
worker
.
bulk_perform_inline
([[
1
],
[
'foo'
]])
end
end
describe
'#perform'
do
shared_examples
'perform'
do
it
'notifies the JobWaiter when done if the key is provided'
do
key
=
Gitlab
::
JobWaiter
.
new
.
key
expect
(
Gitlab
::
JobWaiter
).
to
receive
(
:notify
).
with
(
key
,
job
.
jid
)
job
.
perform
(
*
args
,
key
)
end
it
'does not notify the JobWaiter when done if no key is provided'
do
expect
(
Gitlab
::
JobWaiter
).
not_to
receive
(
:notify
)
job
.
perform
(
*
args
)
end
end
context
'when the worker takes arguments'
do
let
(
:args
)
{
[
1
]
}
it_behaves_like
'perform'
end
context
'when the worker takes no arguments'
do
let
(
:args
)
{
[]
}
it_behaves_like
'perform'
end
end
end
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment