Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
T
typon-concurrency
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Xavier Thompson
typon-concurrency
Commits
440c6b05
Commit
440c6b05
authored
Jun 29, 2022
by
Xavier Thompson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Rename Deque into Stack
parent
7522c7e3
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
72 additions
and
72 deletions
+72
-72
rt/include/typon/core/promise.hpp
rt/include/typon/core/promise.hpp
+17
-17
rt/include/typon/core/scheduler.hpp
rt/include/typon/core/scheduler.hpp
+9
-9
rt/include/typon/core/stack.hpp
rt/include/typon/core/stack.hpp
+7
-7
rt/include/typon/core/worker.hpp
rt/include/typon/core/worker.hpp
+39
-39
No files found.
rt/include/typon/core/promise.hpp
View file @
440c6b05
...
...
@@ -8,7 +8,7 @@
#include <typon/fundamental/scope.hpp>
#include <typon/core/
deque
.hpp>
#include <typon/core/
stack
.hpp>
#include <typon/core/scheduler.hpp>
...
...
@@ -45,7 +45,7 @@ namespace typon
auto
state
=
_state
.
exchange
(
ready
,
std
::
memory_order_acq_rel
);
if
(
state
!=
no_waiter
)
{
Scheduler
::
enable
(
reinterpret_cast
<
Deque
*>
(
state
));
Scheduler
::
enable
(
reinterpret_cast
<
Stack
*>
(
state
));
}
}
...
...
@@ -56,11 +56,11 @@ namespace typon
void
await_suspend
(
std
::
coroutine_handle
<>
coroutine
)
noexcept
{
auto
deque
=
Scheduler
::
suspend
(
coroutine
);
auto
state
=
reinterpret_cast
<
std
::
uintptr_t
>
(
deque
);
auto
stack
=
Scheduler
::
suspend
(
coroutine
);
auto
state
=
reinterpret_cast
<
std
::
uintptr_t
>
(
stack
);
if
(
_state
.
exchange
(
state
,
std
::
memory_order_acq_rel
)
==
ready
)
{
Scheduler
::
enable
(
deque
);
Scheduler
::
enable
(
stack
);
}
}
...
...
@@ -92,7 +92,7 @@ namespace typon
auto
state
=
_state
.
exchange
(
ready
,
std
::
memory_order_acq_rel
);
if
(
state
!=
no_waiter
)
{
Scheduler
::
enable
(
reinterpret_cast
<
Deque
*>
(
state
));
Scheduler
::
enable
(
reinterpret_cast
<
Stack
*>
(
state
));
}
}
...
...
@@ -103,11 +103,11 @@ namespace typon
void
await_suspend
(
std
::
coroutine_handle
<>
coroutine
)
noexcept
{
auto
deque
=
Scheduler
::
suspend
(
coroutine
);
auto
state
=
reinterpret_cast
<
std
::
uintptr_t
>
(
deque
);
auto
stack
=
Scheduler
::
suspend
(
coroutine
);
auto
state
=
reinterpret_cast
<
std
::
uintptr_t
>
(
stack
);
if
(
_state
.
exchange
(
state
,
std
::
memory_order_acq_rel
)
==
ready
)
{
Scheduler
::
enable
(
deque
);
Scheduler
::
enable
(
stack
);
}
}
...
...
@@ -133,7 +133,7 @@ namespace typon
auto
state
=
_state
.
exchange
(
ready
,
std
::
memory_order_acq_rel
);
if
(
state
!=
no_waiter
)
{
Scheduler
::
enable
(
reinterpret_cast
<
Deque
*>
(
state
));
Scheduler
::
enable
(
reinterpret_cast
<
Stack
*>
(
state
));
}
}
...
...
@@ -144,11 +144,11 @@ namespace typon
void
await_suspend
(
std
::
coroutine_handle
<>
coroutine
)
noexcept
{
auto
deque
=
Scheduler
::
suspend
(
coroutine
);
auto
state
=
reinterpret_cast
<
std
::
uintptr_t
>
(
deque
);
auto
stack
=
Scheduler
::
suspend
(
coroutine
);
auto
state
=
reinterpret_cast
<
std
::
uintptr_t
>
(
stack
);
if
(
_state
.
exchange
(
state
,
std
::
memory_order_acq_rel
)
==
ready
)
{
Scheduler
::
enable
(
deque
);
Scheduler
::
enable
(
stack
);
}
}
...
...
@@ -172,7 +172,7 @@ namespace typon
auto
state
=
_state
.
exchange
(
ready
,
std
::
memory_order_acq_rel
);
if
(
state
!=
no_waiter
)
{
Scheduler
::
enable
(
reinterpret_cast
<
Deque
*>
(
state
));
Scheduler
::
enable
(
reinterpret_cast
<
Stack
*>
(
state
));
}
}
...
...
@@ -183,11 +183,11 @@ namespace typon
void
await_suspend
(
std
::
coroutine_handle
<>
coroutine
)
noexcept
{
auto
deque
=
Scheduler
::
suspend
(
coroutine
);
auto
state
=
reinterpret_cast
<
std
::
uintptr_t
>
(
deque
);
auto
stack
=
Scheduler
::
suspend
(
coroutine
);
auto
state
=
reinterpret_cast
<
std
::
uintptr_t
>
(
stack
);
if
(
_state
.
exchange
(
state
,
std
::
memory_order_acq_rel
)
==
ready
)
{
Scheduler
::
enable
(
deque
);
Scheduler
::
enable
(
stack
);
}
}
...
...
rt/include/typon/core/scheduler.hpp
View file @
440c6b05
...
...
@@ -13,7 +13,7 @@
#include <typon/fundamental/random.hpp>
#include <typon/core/continuation.hpp>
#include <typon/core/
deque
.hpp>
#include <typon/core/
stack
.hpp>
#include <typon/core/worker.hpp>
...
...
@@ -38,7 +38,7 @@ namespace typon
static
void
schedule
(
std
::
coroutine_handle
<>
task
)
noexcept
{
uint
id
=
fdt
::
random
::
random
()
%
get
().
_concurrency
;
get
().
_worker
[
id
].
add
(
new
Deque
(
task
));
get
().
_worker
[
id
].
add
(
new
Stack
(
task
));
get
().
_stealables
.
fetch_add
(
1
);
get
().
_notifyer
.
notify_one
();
}
...
...
@@ -56,19 +56,19 @@ namespace typon
static
auto
suspend
(
std
::
coroutine_handle
<>
coroutine
)
noexcept
{
Worker
&
worker
=
get
().
_worker
[
thread_id
];
auto
deque
=
worker
.
suspend
(
coroutine
);
auto
stack
=
worker
.
suspend
(
coroutine
);
uint
id
=
fdt
::
random
::
random
()
%
get
().
_concurrency
;
get
().
_worker
[
id
].
add
(
deque
);
return
deque
;
get
().
_worker
[
id
].
add
(
stack
);
return
stack
;
}
static
void
enable
(
Deque
*
deque
)
noexcept
static
void
enable
(
Stack
*
stack
)
noexcept
{
auto
state
=
deque
->
_state
.
exchange
(
Deque
::
Resumable
);
if
(
state
==
Deque
::
Empty
)
auto
state
=
stack
->
_state
.
exchange
(
Stack
::
Resumable
);
if
(
state
==
Stack
::
Empty
)
{
uint
id
=
fdt
::
random
::
random
()
%
get
().
_concurrency
;
get
().
_worker
[
id
].
add
(
deque
);
get
().
_worker
[
id
].
add
(
stack
);
get
().
_stealables
.
fetch_add
(
1
);
}
get
().
_notifyer
.
notify_one
();
...
...
rt/include/typon/core/
deque
.hpp
→
rt/include/typon/core/
stack
.hpp
View file @
440c6b05
#ifndef TYPON_FUNDAMENTAL_
DEQUE
_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_
DEQUE
_HPP_INCLUDED
#ifndef TYPON_FUNDAMENTAL_
STACK
_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_
STACK
_HPP_INCLUDED
#include <atomic>
#include <cstdint>
...
...
@@ -13,7 +13,7 @@
namespace
typon
{
struct
Deque
struct
Stack
{
using
ring_buffer
=
fdt
::
lock_free
::
ring_buffer
<
Continuation
>
;
using
u64
=
ring_buffer
::
u64
;
...
...
@@ -28,14 +28,14 @@ namespace typon
std
::
coroutine_handle
<>
_coroutine
;
std
::
atomic
<
State
>
_state
;
Deque
()
noexcept
{}
Stack
()
noexcept
{}
Deque
(
std
::
coroutine_handle
<>
coroutine
)
noexcept
Stack
(
std
::
coroutine_handle
<>
coroutine
)
noexcept
:
_coroutine
(
coroutine
)
,
_state
(
Resumable
)
{}
~
Deque
()
~
Stack
()
{
delete
_buffer
.
load
(
relaxed
);
}
...
...
@@ -139,4 +139,4 @@ namespace typon
}
#endif // TYPON_FUNDAMENTAL_
DEQUE
_HPP_INCLUDED
#endif // TYPON_FUNDAMENTAL_
STACK
_HPP_INCLUDED
rt/include/typon/core/worker.hpp
View file @
440c6b05
...
...
@@ -28,13 +28,13 @@ namespace typon
State
_state
;
union
{
Deque
*
_deque
;
Stack
*
_stack
;
Continuation
_task
;
};
Work
()
noexcept
:
_state
(
Empty
)
{}
Work
(
Deque
*
deque
)
noexcept
:
_state
(
Resumable
),
_deque
(
deque
)
{}
Work
(
Stack
*
stack
)
noexcept
:
_state
(
Resumable
),
_stack
(
stack
)
{}
Work
(
Continuation
task
)
noexcept
:
_state
(
Stolen
),
_task
(
task
)
{}
...
...
@@ -45,71 +45,71 @@ namespace typon
};
std
::
mutex
_mutex
;
std
::
atomic
<
Deque
*>
_deque
{
nullptr
};
std
::
vector
<
Deque
*>
_pool
;
std
::
atomic
<
Stack
*>
_stack
{
nullptr
};
std
::
vector
<
Stack
*>
_pool
;
std
::
atomic_uint_fast64_t
*
_stealables
;
fdt
::
lock_free
::
garbage_collector
*
_gc
;
~
Worker
()
{
for
(
auto
&
deque
:
_pool
)
for
(
auto
&
stack
:
_pool
)
{
delete
deque
;
delete
stack
;
}
if
(
auto
deque
=
_deque
.
load
())
if
(
auto
stack
=
_stack
.
load
())
{
delete
deque
;
delete
stack
;
}
}
void
add
(
Deque
*
deque
)
noexcept
void
add
(
Stack
*
stack
)
noexcept
{
std
::
lock_guard
lock
(
_mutex
);
_pool
.
push_back
(
deque
);
_pool
.
push_back
(
stack
);
}
bool
try_add
(
Deque
*
deque
)
noexcept
bool
try_add
(
Stack
*
stack
)
noexcept
{
if
(
!
_mutex
.
try_lock
())
{
return
false
;
}
std
::
lock_guard
lock
(
_mutex
,
std
::
adopt_lock
);
_pool
.
push_back
(
deque
);
_pool
.
push_back
(
stack
);
return
true
;
}
auto
suspend
(
std
::
coroutine_handle
<>
coroutine
)
noexcept
{
auto
deque
=
_deque
.
load
();
_
deque
.
store
(
nullptr
);
deque
->
suspend
(
coroutine
);
return
deque
;
auto
stack
=
_stack
.
load
();
_
stack
.
store
(
nullptr
);
stack
->
suspend
(
coroutine
);
return
stack
;
}
void
resume
(
Work
&
work
)
noexcept
{
if
(
work
.
_state
==
Work
::
Resumable
)
{
auto
deque
=
_deque
.
load
();
_
deque
.
store
(
work
.
_deque
);
if
(
deque
)
auto
stack
=
_stack
.
load
();
_
stack
.
store
(
work
.
_stack
);
if
(
stack
)
{
_gc
->
retire
(
deque
);
_gc
->
retire
(
stack
);
}
_stealables
->
fetch_add
(
1
);
work
.
_
deque
->
resume
();
work
.
_
stack
->
resume
();
}
else
{
if
(
!
_
deque
.
load
())
if
(
!
_
stack
.
load
())
{
_
deque
.
store
(
new
Deque
());
_
stack
.
store
(
new
Stack
());
}
_stealables
->
fetch_add
(
1
);
work
.
_task
.
resume
();
}
if
(
_
deque
.
load
())
if
(
_
stack
.
load
())
{
_stealables
->
fetch_sub
(
1
);
}
...
...
@@ -117,14 +117,14 @@ namespace typon
void
push
(
Continuation
task
)
noexcept
{
_
deque
.
load
()
->
push
(
task
);
_
stack
.
load
()
->
push
(
task
);
}
bool
pop
()
noexcept
{
Deque
*
deque
=
_deque
.
load
();
bool
result
=
deque
->
pop
();
if
(
auto
garbage
=
deque
->
reclaim
())
Stack
*
stack
=
_stack
.
load
();
bool
result
=
stack
->
pop
();
if
(
auto
garbage
=
stack
->
reclaim
())
{
_gc
->
retire
(
garbage
);
}
...
...
@@ -138,8 +138,8 @@ namespace typon
return
{};
}
std
::
lock_guard
lock
(
_mutex
,
std
::
adopt_lock
);
auto
deque
=
_deque
.
load
();
auto
total
=
_pool
.
size
()
+
bool
(
deque
);
auto
stack
=
_stack
.
load
();
auto
total
=
_pool
.
size
()
+
bool
(
stack
);
if
(
total
==
0
)
{
return
{};
...
...
@@ -147,25 +147,25 @@ namespace typon
auto
index
=
fdt
::
random
::
random64
()
%
total
;
if
(
index
==
_pool
.
size
())
{
if
(
auto
task
=
deque
->
steal
())
if
(
auto
task
=
stack
->
steal
())
{
task
.
thefts
()
++
;
return
task
;
}
return
{};
}
deque
=
_pool
[
index
];
if
(
deque
->
_state
.
load
()
==
Deque
::
Resumable
)
stack
=
_pool
[
index
];
if
(
stack
->
_state
.
load
()
==
Stack
::
Resumable
)
{
if
(
index
<
_pool
.
size
()
-
1
)
{
_pool
[
index
]
=
_pool
.
back
();
}
_pool
.
pop_back
();
return
deque
;
return
stack
;
}
auto
task
=
deque
->
pop_top
();
if
(
auto
garbage
=
deque
->
reclaim
())
auto
task
=
stack
->
pop_top
();
if
(
auto
garbage
=
stack
->
reclaim
())
{
delete
garbage
;
}
...
...
@@ -179,10 +179,10 @@ namespace typon
_pool
[
index
]
=
_pool
.
back
();
}
_pool
.
pop_back
();
Deque
::
State
expected
=
Deque
::
Suspended
;
if
(
!
deque
->
_state
.
compare_exchange_strong
(
expected
,
Deque
::
Empty
))
Stack
::
State
expected
=
Stack
::
Suspended
;
if
(
!
stack
->
_state
.
compare_exchange_strong
(
expected
,
Stack
::
Empty
))
{
return
deque
;
return
stack
;
}
_stealables
->
fetch_sub
(
1
);
return
{};
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment