Commit ebac0e6f authored by Dmitriy Vyukov

runtime: convert async semaphores to Go

LGTM=rsc
R=golang-codereviews, rsc
CC=golang-codereviews, khr
https://golang.org/cl/126210046
parent 846db089
@@ -337,6 +337,7 @@ selecttype(int32 size)
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("g")), typenod(ptrto(types[TUINT8]))));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("selectdone")), typenod(ptrto(types[TUINT8]))));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("link")), typenod(ptrto(types[TUINT8]))));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("prev")), typenod(ptrto(types[TUINT8]))));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("elem")), typenod(ptrto(types[TUINT8]))));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("releasetime")), typenod(types[TUINT64])));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("nrelease")), typenod(types[TINT32])));
......
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
const (
cacheLineSize = 64
)
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
const (
cacheLineSize = 64
)
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
const (
cacheLineSize = 64
)
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
const (
cacheLineSize = 32
)
@@ -168,7 +168,7 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = (*uint8)(ep)
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
@@ -257,13 +257,13 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
}
func (q *waitq) enqueue(sgp *sudog) {
sgp.link = nil
sgp.next = nil
if q.first == nil {
q.first = sgp
q.last = sgp
return
}
q.last.link = sgp
q.last.next = sgp
q.last = sgp
}
@@ -273,7 +273,7 @@ func (q *waitq) dequeue() *sudog {
if sgp == nil {
return nil
}
q.first = sgp.link
q.first = sgp.next
if q.last == sgp {
q.last = nil
}
......
@@ -1089,7 +1089,7 @@ loop:
sgp = q->first;
if(sgp == nil)
return nil;
q->first = sgp->link;
q->first = sgp->next;
if(q->last == sgp)
q->last = nil;
@@ -1109,9 +1109,9 @@ dequeueg(WaitQ *q)
SudoG **l, *sgp, *prevsgp;
prevsgp = nil;
for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
for(l=&q->first; (sgp=*l) != nil; l=&sgp->next, prevsgp=sgp) {
if(sgp->g == g) {
*l = sgp->link;
*l = sgp->next;
if(q->last == sgp)
q->last = prevsgp;
break;
@@ -1122,13 +1122,13 @@ dequeueg(WaitQ *q)
static void
enqueue(WaitQ *q, SudoG *sgp)
{
sgp->link = nil;
sgp->next = nil;
if(q->first == nil) {
q->first = sgp;
q->last = sgp;
return;
}
q->last->link = sgp;
q->last->next = sgp;
q->last = sgp;
}
......
@@ -68,7 +68,7 @@ func acquireSudog() *sudog {
c := gomcache()
s := c.sudogcache
if s != nil {
c.sudogcache = s.link
c.sudogcache = s.next
return s
}
return new(sudog)
@@ -77,6 +77,6 @@ func acquireSudog() *sudog {
//go:nosplit
func releaseSudog(s *sudog) {
c := gomcache()
s.link = c.sudogcache
s.next = c.sudogcache
c.sudogcache = s
}
@@ -224,8 +224,9 @@ struct SudoG
{
G* g;
uint32* selectdone;
SudoG* link;
byte* elem; // data element
SudoG* next; // next waiter in the wait list
SudoG* prev; // previous waiter; allows O(1) unlink (used by the async semaphore dequeue)
void* elem; // data element
int64 releasetime;
int32 nrelease; // -1 for acquire
SudoG* waitlink; // G.waiting list
......
@@ -21,6 +21,159 @@ package runtime
import "unsafe"
// Asynchronous semaphore for sync.Mutex.
type semaRoot struct {
lock
head *sudog
tail *sudog
nwait uint32 // Number of waiters. Read w/o the lock.
}
// Prime to not correlate with any user patterns.
const semTabSize = 251
var semtable [semTabSize]struct {
root semaRoot
pad [cacheLineSize - unsafe.Sizeof(semaRoot{})]byte
}
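
Each semtable entry is padded out to a full cache line so that two hot semaphores hashing to adjacent slots do not share a line (false sharing), and the table size is prime so it does not correlate with patterns in user addresses. A minimal, hedged sketch of the same padding idiom in ordinary Go (hypothetical counter type; a 64-byte line is assumed, the most common cacheLineSize value above):

package main

import (
	"fmt"
	"unsafe"
)

const cacheLine = 64 // assumption: 64-byte cache line

type counter struct{ n uint64 }

// Each slot is padded out to a full cache line, mirroring the semtable layout.
var slots [8]struct {
	c   counter
	pad [cacheLine - unsafe.Sizeof(counter{})]byte
}

func main() {
	fmt.Println(unsafe.Sizeof(slots[0])) // prints 64: one slot per cache line
}
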
// Called from sync/net packages.
func asyncsemacquire(addr *uint32) {
semacquire(addr, true)
}
func asyncsemrelease(addr *uint32) {
semrelease(addr)
}
// Called from runtime.
func semacquire(addr *uint32, profile bool) {
// Easy case.
if cansemacquire(addr) {
return
}
// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
if profile && blockprofilerate > 0 {
t0 = gocputicks()
s.releasetime = -1
}
for {
golock(&root.lock)
// Add ourselves to nwait to disable "easy case" in semrelease.
goxadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
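// Undo the nwait increment above; adding ^uint32(0) (all ones, i.e. -1 mod 2^32) decrements by one.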
goxadd(&root.nwait, ^uint32(0))
gounlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s)
goparkunlock(&root.lock, "semacquire")
if cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
goblockevent(int64(s.releasetime)-t0, 4)
}
releaseSudog(s)
}
func semrelease(addr *uint32) {
root := semroot(addr)
goxadd(addr, 1)
// Easy case: no waiters?
// This check must happen after the xadd, to avoid a missed wakeup
// (see loop in semacquire).
if goatomicload(&root.nwait) == 0 {
return
}
// Harder case: search for a waiter and wake it.
golock(&root.lock)
if goatomicload(&root.nwait) == 0 {
// The count is already consumed by another goroutine,
// so no need to wake up another goroutine.
gounlock(&root.lock)
return
}
s := root.head
for ; s != nil; s = s.next {
if s.elem == unsafe.Pointer(addr) {
goxadd(&root.nwait, ^uint32(0))
root.dequeue(s)
break
}
}
gounlock(&root.lock)
if s != nil {
if s.releasetime != 0 {
// TODO: Remove use of unsafe here.
releasetimep := (*int64)(unsafe.Pointer(&s.releasetime))
*releasetimep = gocputicks()
}
goready(s.g)
}
}
func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
func cansemacquire(addr *uint32) bool {
for {
v := goatomicload(addr)
if v == 0 {
return false
}
if gocas(addr, v, v-1) {
return true
}
}
}
func (root *semaRoot) queue(addr *uint32, s *sudog) {
s.g = getg()
s.elem = unsafe.Pointer(addr)
s.next = nil
s.prev = root.tail
if root.tail != nil {
root.tail.next = s
} else {
root.head = s
}
root.tail = s
}
func (root *semaRoot) dequeue(s *sudog) {
if s.next != nil {
s.next.prev = s.prev
} else {
root.tail = s.prev
}
if s.prev != nil {
s.prev.next = s.next
} else {
root.head = s.next
}
s.next = nil
s.prev = nil
}
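
To make the acquire/release protocol above easier to follow outside the runtime, here is a hedged, self-contained sketch of the same idea built only on sync and sync/atomic. The channel-based park/wake and the names are stand-ins for goparkunlock/goready and the sudog queue, not the runtime's API; the point is how the nwait counter plus the re-check of cansemacquire rules out a missed wakeup.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sema mirrors the shape of semaRoot above: an atomic count plus a
// lock-protected wait list and an nwait counter read without the lock.
type sema struct {
	count   uint32          // semaphore value, touched only atomically
	nwait   uint32          // number of queued waiters, read w/o the lock
	mu      sync.Mutex      // guards waiters
	waiters []chan struct{} // stand-in for the sudog list
}

// cansemacquire tries to take one unit without blocking.
func (s *sema) cansemacquire() bool {
	for {
		v := atomic.LoadUint32(&s.count)
		if v == 0 {
			return false
		}
		if atomic.CompareAndSwapUint32(&s.count, v, v-1) {
			return true
		}
	}
}

func (s *sema) acquire() {
	if s.cansemacquire() { // easy case
		return
	}
	for {
		s.mu.Lock()
		atomic.AddUint32(&s.nwait, 1) // disable release's easy case
		if s.cansemacquire() {        // re-check to avoid a missed wakeup
			atomic.AddUint32(&s.nwait, ^uint32(0))
			s.mu.Unlock()
			return
		}
		ch := make(chan struct{})
		s.waiters = append(s.waiters, ch)
		s.mu.Unlock()
		<-ch // "park"; any release after the re-check sees nwait > 0 and wakes us
		if s.cansemacquire() {
			return
		}
	}
}

func (s *sema) release() {
	atomic.AddUint32(&s.count, 1)
	// Easy case: must come after the count increment (see acquire).
	if atomic.LoadUint32(&s.nwait) == 0 {
		return
	}
	// Harder case: wake one waiter, if any is still queued.
	s.mu.Lock()
	if len(s.waiters) == 0 {
		s.mu.Unlock()
		return
	}
	atomic.AddUint32(&s.nwait, ^uint32(0))
	ch := s.waiters[0]
	s.waiters = s.waiters[1:]
	s.mu.Unlock()
	close(ch) // "ready"
}

func main() {
	var s sema
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); s.acquire() }()
	}
	for i := 0; i < 4; i++ {
		s.release()
	}
	wg.Wait()
	fmt.Println("all acquirers satisfied")
}

The ordering argument is the one stated in the comments above: release increments the count before reading nwait, and a waiter increments nwait before its final cansemacquire re-check, so a release that reads nwait == 0 is guaranteed that the waiter's re-check observes the released count (or that some other acquirer already consumed it).
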
// Synchronous semaphore for sync.Cond.
type syncSema struct {
lock lock
@@ -37,7 +190,7 @@ func syncsemacquire(s *syncSema) {
s.head.nrelease--
if s.head.nrelease == 0 {
wake = s.head
s.head = wake.link
s.head = wake.next
if s.head == nil {
s.tail = nil
}
@@ -51,7 +204,7 @@ func syncsemacquire(s *syncSema) {
w := acquireSudog()
w.g = getg()
w.nrelease = -1
w.link = nil
w.next = nil
w.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
@@ -61,7 +214,7 @@ func syncsemacquire(s *syncSema) {
if s.tail == nil {
s.head = w
} else {
s.tail.link = w
s.tail.next = w
}
s.tail = w
goparkunlock(&s.lock, "semacquire")
@@ -78,7 +231,7 @@ func syncsemrelease(s *syncSema, n uint32) {
for n > 0 && s.head != nil && s.head.nrelease < 0 {
// Have pending acquire, satisfy it.
wake := s.head
s.head = wake.link
s.head = wake.next
if s.head == nil {
s.tail = nil
}
@@ -95,12 +248,12 @@ func syncsemrelease(s *syncSema, n uint32) {
w := acquireSudog()
w.g = getg()
w.nrelease = int32(n)
w.link = nil
w.next = nil
w.releasetime = 0
if s.tail == nil {
s.head = w
} else {
s.tail.link = w
s.tail.next = w
}
s.tail = w
goparkunlock(&s.lock, "semarelease")
......
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Semaphore implementation exposed to Go.
// Intended use is to provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// http://swtch.com/semaphore.pdf
package sync
#include "runtime.h"
#include "arch_GOARCH.h"
#include "../../cmd/ld/textflag.h"
typedef struct SemaWaiter SemaWaiter;
struct SemaWaiter
{
uint32 volatile* addr;
G* g;
int64 releasetime;
int32 nrelease; // -1 for acquire
SemaWaiter* prev;
SemaWaiter* next;
};
typedef struct SemaRoot SemaRoot;
struct SemaRoot
{
Lock lock;
SemaWaiter* head;
SemaWaiter* tail;
// Number of waiters. Read w/o the lock.
uint32 volatile nwait;
};
// Prime to not correlate with any user patterns.
#define SEMTABLESZ 251
struct semtable
{
SemaRoot root;
uint8 pad[CacheLineSize-sizeof(SemaRoot)];
};
#pragma dataflag NOPTR /* mark semtable as 'no pointers', hiding from garbage collector */
static struct semtable semtable[SEMTABLESZ];
static SemaRoot*
semroot(uint32 *addr)
{
return &semtable[((uintptr)addr >> 3) % SEMTABLESZ].root;
}
static void
semqueue(SemaRoot *root, uint32 volatile *addr, SemaWaiter *s)
{
s->g = g;
s->addr = addr;
s->next = nil;
s->prev = root->tail;
if(root->tail)
root->tail->next = s;
else
root->head = s;
root->tail = s;
}
static void
semdequeue(SemaRoot *root, SemaWaiter *s)
{
if(s->next)
s->next->prev = s->prev;
else
root->tail = s->prev;
if(s->prev)
s->prev->next = s->next;
else
root->head = s->next;
s->prev = nil;
s->next = nil;
}
static int32
cansemacquire(uint32 *addr)
{
uint32 v;
while((v = runtime·atomicload(addr)) > 0)
if(runtime·cas(addr, v, v-1))
return 1;
return 0;
}
void
runtime·semacquire(uint32 volatile *addr, bool profile)
{
SemaWaiter s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
SemaRoot *root;
int64 t0;
// Easy case.
if(cansemacquire(addr))
return;
// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
root = semroot(addr);
t0 = 0;
s.releasetime = 0;
if(profile && runtime·blockprofilerate > 0) {
t0 = runtime·cputicks();
s.releasetime = -1;
}
for(;;) {
runtime·lock(&root->lock);
// Add ourselves to nwait to disable "easy case" in semrelease.
runtime·xadd(&root->nwait, 1);
// Check cansemacquire to avoid missed wakeup.
if(cansemacquire(addr)) {
runtime·xadd(&root->nwait, -1);
runtime·unlock(&root->lock);
return;
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
semqueue(root, addr, &s);
runtime·parkunlock(&root->lock, runtime·gostringnocopy((byte*)"semacquire"));
if(cansemacquire(addr)) {
if(t0)
runtime·blockevent(s.releasetime - t0, 3);
return;
}
}
}
void
runtime·semrelease(uint32 volatile *addr)
{
SemaWaiter *s;
SemaRoot *root;
root = semroot(addr);
runtime·xadd(addr, 1);
// Easy case: no waiters?
// This check must happen after the xadd, to avoid a missed wakeup
// (see loop in semacquire).
if(runtime·atomicload(&root->nwait) == 0)
return;
// Harder case: search for a waiter and wake it.
runtime·lock(&root->lock);
if(runtime·atomicload(&root->nwait) == 0) {
// The count is already consumed by another goroutine,
// so no need to wake up another goroutine.
runtime·unlock(&root->lock);
return;
}
for(s = root->head; s; s = s->next) {
if(s->addr == addr) {
runtime·xadd(&root->nwait, -1);
semdequeue(root, s);
break;
}
}
runtime·unlock(&root->lock);
if(s) {
if(s->releasetime)
s->releasetime = runtime·cputicks();
runtime·ready(s->g);
}
}
// TODO(dvyukov): move to netpoll.goc once it's used by all OSes.
void net·runtime_Semacquire(uint32 *addr)
{
runtime·semacquire(addr, true);
}
void net·runtime_Semrelease(uint32 *addr)
{
runtime·semrelease(addr);
}
func runtime_Semacquire(addr *uint32) {
runtime·semacquire(addr, true);
}
func runtime_Semrelease(addr *uint32) {
runtime·semrelease(addr);
}
@@ -116,7 +116,9 @@ const (
// Atomic operations to read/write a pointer.
// in stubs.goc
func goatomicload(p *uint32) uint32 // return *p
func goatomicloadp(p unsafe.Pointer) unsafe.Pointer // return *p
func goatomicstore(p *uint32, v uint32) // *p = v
func goatomicstorep(p unsafe.Pointer, v unsafe.Pointer) // *p = v
// in stubs.goc
@@ -124,6 +126,9 @@ func goatomicstorep(p unsafe.Pointer, v unsafe.Pointer) // *p = v
//go:noescape
func gocas(p *uint32, x uint32, y uint32) bool
//go:noescape
func goxadd(p *uint32, x uint32) uint32
//go:noescape
func gocasx(p *uintptr, x uintptr, y uintptr) bool
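
goxadd is declared here as a Go-callable wrapper around the runtime's atomic add; the semaphore code above passes ^uint32(0) to it to decrement. Outside the runtime the equivalent is sync/atomic.AddUint32, and the same all-ones trick applies, as in this small sketch:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var nwait uint32
	atomic.AddUint32(&nwait, 1)          // increment
	atomic.AddUint32(&nwait, ^uint32(0)) // adding 0xFFFFFFFF wraps around: decrement by one
	fmt.Println(nwait)                   // 0
}
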
@@ -151,8 +156,6 @@ func gothrow(s string)
func golock(x *lock)
func gounlock(x *lock)
func semacquire(*uint32, bool)
func semrelease(*uint32)
// Return the Go equivalent of the C Alg structure.
// TODO: at some point Go will hold the truth for the layout
......
@@ -48,16 +48,31 @@ func gonanotime() (r int64) {
r = runtime·nanotime();
}
#pragma textflag NOSPLIT
func goatomicload(p *uint32) (v uint32) {
v = runtime·atomicload(p);
}
#pragma textflag NOSPLIT
func goatomicloadp(p **byte) (v *byte) {
v = runtime·atomicloadp(p);
}
#pragma textflag NOSPLIT
func goatomicstore(p *uint32, v uint32) {
runtime·atomicstore(p, v);
}
#pragma textflag NOSPLIT
func goatomicstorep(p **byte, v *byte) {
runtime·atomicstorep(p, v);
}
#pragma textflag NOSPLIT
func runtime·goxadd(p *uint32, x uint32) (ret uint32) {
ret = runtime·xadd(p, x);
}
#pragma textflag NOSPLIT
func runtime·gocas(p *uint32, x uint32, y uint32) (ret bool) {
ret = runtime·cas(p, x, y);
......
@@ -19,3 +19,15 @@ TEXT sync·runtime_Syncsemrelease(SB),NOSPLIT,$0-0
TEXT sync·runtime_Syncsemcheck(SB),NOSPLIT,$0-0
JMP runtime·syncsemcheck(SB)
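// The new thunks below route the existing sync and net entry points
// (runtime_Semacquire / runtime_Semrelease) to the Go implementations
// asyncsemacquire / asyncsemrelease added in this change.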
TEXT sync·runtime_Semacquire(SB),NOSPLIT,$0-0
JMP runtime·asyncsemacquire(SB)
TEXT sync·runtime_Semrelease(SB),NOSPLIT,$0-0
JMP runtime·asyncsemrelease(SB)
TEXT net·runtime_Semacquire(SB),NOSPLIT,$0-0
JMP runtime·asyncsemacquire(SB)
TEXT net·runtime_Semrelease(SB),NOSPLIT,$0-0
JMP runtime·asyncsemrelease(SB)