Commit 6ea7bf25 authored by Alex Brainman's avatar Alex Brainman

net: implement netpoll for windows

Moves the network poller from net package into runtime.

benchmark                           old ns/op    new ns/op    delta
BenchmarkTCP4OneShot                   316386       287061   -9.27%
BenchmarkTCP4OneShot-2                 339822       313424   -7.77%
BenchmarkTCP4OneShot-3                 330057       306589   -7.11%
BenchmarkTCP4OneShotTimeout            341775       287061  -16.01%
BenchmarkTCP4OneShotTimeout-2          380835       295849  -22.32%
BenchmarkTCP4OneShotTimeout-3          398412       328070  -17.66%
BenchmarkTCP4Persistent                 40622        33392  -17.80%
BenchmarkTCP4Persistent-2               44528        35736  -19.74%
BenchmarkTCP4Persistent-3               44919        36907  -17.84%
BenchmarkTCP4PersistentTimeout          45309        33588  -25.87%
BenchmarkTCP4PersistentTimeout-2        50289        38079  -24.28%
BenchmarkTCP4PersistentTimeout-3        51559        37103  -28.04%
BenchmarkTCP6OneShot                   361305       345645   -4.33%
BenchmarkTCP6OneShot-2                 361305       331976   -8.12%
BenchmarkTCP6OneShot-3                 376929       347598   -7.78%
BenchmarkTCP6OneShotTimeout            361305       322212  -10.82%
BenchmarkTCP6OneShotTimeout-2          378882       333928  -11.86%
BenchmarkTCP6OneShotTimeout-3          388647       335881  -13.58%
BenchmarkTCP6Persistent                 47653        35345  -25.83%
BenchmarkTCP6Persistent-2               49215        35736  -27.39%
BenchmarkTCP6Persistent-3               38474        37493   -2.55%
BenchmarkTCP6PersistentTimeout          56637        34369  -39.32%
BenchmarkTCP6PersistentTimeout-2        42575        38079  -10.56%
BenchmarkTCP6PersistentTimeout-3        44137        37689  -14.61%

R=dvyukov
CC=golang-dev
https://golang.org/cl/8670044
parent ae599169
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// +build darwin linux // +build darwin linux windows
package net package net
......
...@@ -74,39 +74,39 @@ type anOpIface interface { ...@@ -74,39 +74,39 @@ type anOpIface interface {
Submit() error Submit() error
} }
// IO completion result parameters.
type ioResult struct {
qty uint32
err error
}
// anOp implements functionality common to all IO operations. // anOp implements functionality common to all IO operations.
// Its beginning must be the same as runtime.net_anOp. Keep these in sync.
type anOp struct { type anOp struct {
// Used by IOCP interface, it must be first field // Used by IOCP interface, it must be first field
// of the struct, as our code rely on it. // of the struct, as our code rely on it.
o syscall.Overlapped o syscall.Overlapped
resultc chan ioResult // fields used by runtime.netpoll
errnoc chan error runtimeCtx uintptr
fd *netFD mode int32
errno int32
qty uint32
errnoc chan error
fd *netFD
} }
func (o *anOp) Init(fd *netFD, mode int) { func (o *anOp) Init(fd *netFD, mode int32) {
o.fd = fd o.fd = fd
var i int o.mode = mode
if mode == 'r' { o.runtimeCtx = fd.pd.runtimeCtx
i = 0 if !canCancelIO {
} else { var i int
i = 1 if mode == 'r' {
} i = 0
if fd.resultc[i] == nil { } else {
fd.resultc[i] = make(chan ioResult, 1) i = 1
} }
o.resultc = fd.resultc[i] if fd.errnoc[i] == nil {
if fd.errnoc[i] == nil { fd.errnoc[i] = make(chan error)
fd.errnoc[i] = make(chan error) }
o.errnoc = fd.errnoc[i]
} }
o.errnoc = fd.errnoc[i]
} }
func (o *anOp) Op() *anOp { func (o *anOp) Op() *anOp {
...@@ -120,7 +120,7 @@ type bufOp struct { ...@@ -120,7 +120,7 @@ type bufOp struct {
buf syscall.WSABuf buf syscall.WSABuf
} }
func (o *bufOp) Init(fd *netFD, buf []byte, mode int) { func (o *bufOp) Init(fd *netFD, buf []byte, mode int32) {
o.anOp.Init(fd, mode) o.anOp.Init(fd, mode)
o.buf.Len = uint32(len(buf)) o.buf.Len = uint32(len(buf))
if len(buf) == 0 { if len(buf) == 0 {
...@@ -130,41 +130,6 @@ func (o *bufOp) Init(fd *netFD, buf []byte, mode int) { ...@@ -130,41 +130,6 @@ func (o *bufOp) Init(fd *netFD, buf []byte, mode int) {
} }
} }
// resultSrv will retrieve all IO completion results from
// iocp and send them to the correspondent waiting client
// goroutine via channel supplied in the request.
type resultSrv struct {
iocp syscall.Handle
}
func runtime_blockingSyscallHint()
func (s *resultSrv) Run() {
var o *syscall.Overlapped
var key uint32
var r ioResult
for {
r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, 0)
if r.err == syscall.Errno(syscall.WAIT_TIMEOUT) && o == nil {
runtime_blockingSyscallHint()
r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, syscall.INFINITE)
}
switch {
case r.err == nil:
// Dequeued successfully completed IO packet.
case r.err == syscall.Errno(syscall.WAIT_TIMEOUT) && o == nil:
// Wait has timed out (should not happen now, but might be used in the future).
panic("GetQueuedCompletionStatus timed out")
case o == nil:
// Failed to dequeue anything -> report the error.
panic("GetQueuedCompletionStatus failed " + r.err.Error())
default:
// Dequeued failed IO packet.
}
(*anOp)(unsafe.Pointer(o)).resultc <- r
}
}
// ioSrv executes net IO requests. // ioSrv executes net IO requests.
type ioSrv struct { type ioSrv struct {
submchan chan anOpIface // submit IO requests submchan chan anOpIface // submit IO requests
...@@ -192,18 +157,14 @@ func (s *ioSrv) ProcessRemoteIO() { ...@@ -192,18 +157,14 @@ func (s *ioSrv) ProcessRemoteIO() {
// ExecIO executes a single IO operation oi. It submits and cancels // ExecIO executes a single IO operation oi. It submits and cancels
// IO in the current thread for systems where Windows CancelIoEx API // IO in the current thread for systems where Windows CancelIoEx API
// is available. Alternatively, it passes the request onto // is available. Alternatively, it passes the request onto
// a special goroutine and waits for completion or cancels request. // runtime netpoll and waits for completion or cancels request.
// deadline is unix nanos. func (s *ioSrv) ExecIO(oi anOpIface) (int, error) {
func (s *ioSrv) ExecIO(oi anOpIface, deadline int64) (int, error) {
var err error var err error
o := oi.Op() o := oi.Op()
// Calculate timeout delta. // Notify runtime netpoll about starting IO.
var delta int64 err = o.fd.pd.Prepare(int(o.mode))
if deadline != 0 { if err != nil {
delta = deadline - time.Now().UnixNano() return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
if delta <= 0 {
return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, errTimeout}
}
} }
// Start IO. // Start IO.
if canCancelIO { if canCancelIO {
...@@ -223,67 +184,56 @@ func (s *ioSrv) ExecIO(oi anOpIface, deadline int64) (int, error) { ...@@ -223,67 +184,56 @@ func (s *ioSrv) ExecIO(oi anOpIface, deadline int64) (int, error) {
default: default:
return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
} }
// Setup timer, if deadline is given.
var timer <-chan time.Time
if delta > 0 {
t := time.NewTimer(time.Duration(delta) * time.Nanosecond)
defer t.Stop()
timer = t.C
}
// Wait for our request to complete. // Wait for our request to complete.
var r ioResult err = o.fd.pd.Wait(int(o.mode))
var cancelled, timeout bool if err == nil {
select { // All is good. Extract our IO results and return.
case r = <-o.resultc: if o.errno != 0 {
case <-timer: err = syscall.Errno(o.errno)
cancelled = true return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
timeout = true
case <-o.fd.closec:
cancelled = true
}
if cancelled {
// Cancel it.
if canCancelIO {
err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o)
// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
if err != nil && err != syscall.ERROR_NOT_FOUND {
// TODO(brainman): maybe do something else, but panic.
panic(err)
}
} else {
s.canchan <- oi
<-o.errnoc
}
// Wait for IO to be canceled or complete successfully.
r = <-o.resultc
if r.err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
if timeout {
r.err = errTimeout
} else {
r.err = errClosing
}
} }
return int(o.qty), nil
} }
if r.err != nil { // IO is interrupted by "close" or "timeout"
err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, r.err} netpollErr := err
switch netpollErr {
case errClosing, errTimeout:
// will deal with those.
default:
panic("net: unexpected runtime.netpoll error: " + netpollErr.Error())
} }
return int(r.qty), err // Cancel our request.
if canCancelIO {
err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o)
// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
if err != nil && err != syscall.ERROR_NOT_FOUND {
// TODO(brainman): maybe do something else, but panic.
panic(err)
}
} else {
s.canchan <- oi
<-o.errnoc
}
// Wait for cancellation to complete.
o.fd.pd.WaitCanceled(int(o.mode))
if o.errno != 0 {
err = syscall.Errno(o.errno)
if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
err = netpollErr
}
return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
}
// We issued cancellation request. But, it seems, IO operation succeeded
// before cancellation request run. We need to treat IO operation as
// succeeded (the bytes are actually sent/recv from network).
return int(o.qty), nil
} }
// Start helper goroutines. // Start helper goroutines.
var resultsrv *resultSrv
var iosrv *ioSrv var iosrv *ioSrv
var onceStartServer sync.Once var onceStartServer sync.Once
func startServer() { func startServer() {
resultsrv = new(resultSrv)
var err error
resultsrv.iocp, err = syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 1)
if err != nil {
panic("CreateIoCompletionPort: " + err.Error())
}
go resultsrv.Run()
iosrv = new(ioSrv) iosrv = new(ioSrv)
if !canCancelIO { if !canCancelIO {
// Only CancelIo API is available. Lets start special goroutine // Only CancelIo API is available. Lets start special goroutine
...@@ -309,38 +259,30 @@ type netFD struct { ...@@ -309,38 +259,30 @@ type netFD struct {
net string net string
laddr Addr laddr Addr
raddr Addr raddr Addr
resultc [2]chan ioResult // read/write completion results errnoc [2]chan error // read/write submit or cancel operation errors
errnoc [2]chan error // read/write submit or cancel operation errors
closec chan bool // used by Close to cancel pending IO
// serialize access to Read and Write methods // serialize access to Read and Write methods
rio, wio sync.Mutex rio, wio sync.Mutex
// read and write deadlines // wait server
rdeadline, wdeadline deadline pd pollDesc
} }
func allocFD(fd syscall.Handle, family, sotype int, net string) *netFD { func newFD(fd syscall.Handle, family, sotype int, net string) (*netFD, error) {
if initErr != nil {
return nil, initErr
}
onceStartServer.Do(startServer)
netfd := &netFD{ netfd := &netFD{
sysfd: fd, sysfd: fd,
family: family, family: family,
sotype: sotype, sotype: sotype,
net: net, net: net,
closec: make(chan bool),
}
return netfd
}
func newFD(fd syscall.Handle, family, proto int, net string) (*netFD, error) {
if initErr != nil {
return nil, initErr
} }
onceStartServer.Do(startServer) if err := netfd.pd.Init(netfd); err != nil {
// Associate our socket with resultsrv.iocp.
if _, err := syscall.CreateIoCompletionPort(syscall.Handle(fd), resultsrv.iocp, 0, 0); err != nil {
return nil, err return nil, err
} }
return allocFD(fd, family, proto, net), nil return netfd, nil
} }
func (fd *netFD) setAddr(laddr, raddr Addr) { func (fd *netFD) setAddr(laddr, raddr Addr) {
...@@ -386,7 +328,7 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { ...@@ -386,7 +328,7 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
var o connectOp var o connectOp
o.Init(fd, 'w') o.Init(fd, 'w')
o.ra = ra o.ra = ra
_, err := iosrv.ExecIO(&o, fd.wdeadline.value()) _, err := iosrv.ExecIO(&o)
if err != nil { if err != nil {
return err return err
} }
...@@ -438,7 +380,7 @@ func (fd *netFD) Close() error { ...@@ -438,7 +380,7 @@ func (fd *netFD) Close() error {
} }
defer fd.decref() defer fd.decref()
// unblock pending reader and writer // unblock pending reader and writer
close(fd.closec) fd.pd.Evict()
// wait for both reader and writer to exit // wait for both reader and writer to exit
fd.rio.Lock() fd.rio.Lock()
defer fd.rio.Unlock() defer fd.rio.Unlock()
...@@ -495,7 +437,7 @@ func (fd *netFD) Read(buf []byte) (int, error) { ...@@ -495,7 +437,7 @@ func (fd *netFD) Read(buf []byte) (int, error) {
defer fd.rio.Unlock() defer fd.rio.Unlock()
var o readOp var o readOp
o.Init(fd, buf, 'r') o.Init(fd, buf, 'r')
n, err := iosrv.ExecIO(&o, fd.rdeadline.value()) n, err := iosrv.ExecIO(&o)
if err == nil && n == 0 { if err == nil && n == 0 {
err = io.EOF err = io.EOF
} }
...@@ -532,7 +474,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { ...@@ -532,7 +474,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
var o readFromOp var o readFromOp
o.Init(fd, buf, 'r') o.Init(fd, buf, 'r')
o.rsan = int32(unsafe.Sizeof(o.rsa)) o.rsan = int32(unsafe.Sizeof(o.rsa))
n, err = iosrv.ExecIO(&o, fd.rdeadline.value()) n, err = iosrv.ExecIO(&o)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
...@@ -564,7 +506,7 @@ func (fd *netFD) Write(buf []byte) (int, error) { ...@@ -564,7 +506,7 @@ func (fd *netFD) Write(buf []byte) (int, error) {
defer fd.wio.Unlock() defer fd.wio.Unlock()
var o writeOp var o writeOp
o.Init(fd, buf, 'w') o.Init(fd, buf, 'w')
return iosrv.ExecIO(&o, fd.wdeadline.value()) return iosrv.ExecIO(&o)
} }
// WriteTo to network. // WriteTo to network.
...@@ -596,7 +538,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { ...@@ -596,7 +538,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
var o writeToOp var o writeToOp
o.Init(fd, buf, 'w') o.Init(fd, buf, 'w')
o.sa = sa o.sa = sa
return iosrv.ExecIO(&o, fd.wdeadline.value()) return iosrv.ExecIO(&o)
} }
// Accept new network connections. // Accept new network connections.
...@@ -631,17 +573,17 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { ...@@ -631,17 +573,17 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
} }
// Associate our new socket with IOCP. // Associate our new socket with IOCP.
onceStartServer.Do(startServer) netfd, err := newFD(s, fd.family, fd.sotype, fd.net)
if _, err := syscall.CreateIoCompletionPort(s, resultsrv.iocp, 0, 0); err != nil { if err != nil {
closesocket(s) closesocket(s)
return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, err} return nil, &OpError{"accept", fd.net, fd.laddr, err}
} }
// Submit accept request. // Submit accept request.
var o acceptOp var o acceptOp
o.Init(fd, 'r') o.Init(fd, 'r')
o.newsock = s o.newsock = s
_, err = iosrv.ExecIO(&o, fd.rdeadline.value()) _, err = iosrv.ExecIO(&o)
if err != nil { if err != nil {
closesocket(s) closesocket(s)
return nil, err return nil, err
...@@ -663,7 +605,6 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { ...@@ -663,7 +605,6 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
lsa, _ := lrsa.Sockaddr() lsa, _ := lrsa.Sockaddr()
rsa, _ := rrsa.Sockaddr() rsa, _ := rrsa.Sockaddr()
netfd := allocFD(s, fd.family, fd.sotype, fd.net)
netfd.setAddr(toAddr(lsa), toAddr(rsa)) netfd.setAddr(toAddr(lsa), toAddr(rsa))
return netfd, nil return netfd, nil
} }
......
...@@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { ...@@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
o.Init(c, 'w') o.Init(c, 'w')
o.n = uint32(n) o.n = uint32(n)
o.src = syscall.Handle(f.Fd()) o.src = syscall.Handle(f.Fd())
done, err := iosrv.ExecIO(&o, 0) done, err := iosrv.ExecIO(&o)
if err != nil { if err != nil {
return 0, err, false return 0, err, false
} }
......
...@@ -9,7 +9,6 @@ package net ...@@ -9,7 +9,6 @@ package net
import ( import (
"os" "os"
"syscall" "syscall"
"time"
) )
func setDefaultSockopts(s syscall.Handle, f, t int, ipv6only bool) error { func setDefaultSockopts(s syscall.Handle, f, t int, ipv6only bool) error {
...@@ -48,21 +47,3 @@ func setDefaultMulticastSockopts(s syscall.Handle) error { ...@@ -48,21 +47,3 @@ func setDefaultMulticastSockopts(s syscall.Handle) error {
} }
return nil return nil
} }
// TODO(dfc) these unused error returns could be removed
func setReadDeadline(fd *netFD, t time.Time) error {
fd.rdeadline.setTime(t)
return nil
}
func setWriteDeadline(fd *netFD, t time.Time) error {
fd.wdeadline.setTime(t)
return nil
}
func setDeadline(fd *netFD, t time.Time) error {
setReadDeadline(fd, t)
setWriteDeadline(fd, t)
return nil
}
...@@ -7,8 +7,8 @@ ...@@ -7,8 +7,8 @@
/* /*
Input to cgo. Input to cgo.
GOARCH=amd64 cgo -cdefs defs.go >amd64/defs.h GOARCH=amd64 go tool cgo -cdefs defs_windows.go > defs_windows_amd64.h
GOARCH=386 cgo -cdefs defs.go >386/defs.h GOARCH=386 go tool cgo -cdefs defs_windows.go > defs_windows_386.h
*/ */
package runtime package runtime
...@@ -57,6 +57,9 @@ const ( ...@@ -57,6 +57,9 @@ const (
EXCEPTION_FLT_UNDERFLOW = C.STATUS_FLOAT_UNDERFLOW EXCEPTION_FLT_UNDERFLOW = C.STATUS_FLOAT_UNDERFLOW
EXCEPTION_INT_DIVIDE_BY_ZERO = C.STATUS_INTEGER_DIVIDE_BY_ZERO EXCEPTION_INT_DIVIDE_BY_ZERO = C.STATUS_INTEGER_DIVIDE_BY_ZERO
EXCEPTION_INT_OVERFLOW = C.STATUS_INTEGER_OVERFLOW EXCEPTION_INT_OVERFLOW = C.STATUS_INTEGER_OVERFLOW
INFINITE = C.INFINITE
WAIT_TIMEOUT = C.WAIT_TIMEOUT
) )
type SystemInfo C.SYSTEM_INFO type SystemInfo C.SYSTEM_INFO
...@@ -64,3 +67,4 @@ type ExceptionRecord C.EXCEPTION_RECORD ...@@ -64,3 +67,4 @@ type ExceptionRecord C.EXCEPTION_RECORD
type FloatingSaveArea C.FLOATING_SAVE_AREA type FloatingSaveArea C.FLOATING_SAVE_AREA
type M128a C.M128A type M128a C.M128A
type Context C.CONTEXT type Context C.CONTEXT
type Overlapped C.OVERLAPPED
...@@ -30,6 +30,9 @@ enum { ...@@ -30,6 +30,9 @@ enum {
EXCEPTION_FLT_UNDERFLOW = 0xc0000093, EXCEPTION_FLT_UNDERFLOW = 0xc0000093,
EXCEPTION_INT_DIVIDE_BY_ZERO = 0xc0000094, EXCEPTION_INT_DIVIDE_BY_ZERO = 0xc0000094,
EXCEPTION_INT_OVERFLOW = 0xc0000095, EXCEPTION_INT_OVERFLOW = 0xc0000095,
INFINITE = 0xffffffff,
WAIT_TIMEOUT = 0x102,
}; };
typedef struct SystemInfo SystemInfo; typedef struct SystemInfo SystemInfo;
...@@ -37,6 +40,7 @@ typedef struct ExceptionRecord ExceptionRecord; ...@@ -37,6 +40,7 @@ typedef struct ExceptionRecord ExceptionRecord;
typedef struct FloatingSaveArea FloatingSaveArea; typedef struct FloatingSaveArea FloatingSaveArea;
typedef struct M128a M128a; typedef struct M128a M128a;
typedef struct Context Context; typedef struct Context Context;
typedef struct Overlapped Overlapped;
#pragma pack on #pragma pack on
...@@ -98,6 +102,12 @@ struct Context { ...@@ -98,6 +102,12 @@ struct Context {
uint32 SegSs; uint32 SegSs;
uint8 ExtendedRegisters[512]; uint8 ExtendedRegisters[512];
}; };
struct Overlapped {
uint32 Internal;
uint32 InternalHigh;
byte anon0[8];
byte *hEvent;
};
#pragma pack off #pragma pack off
...@@ -30,6 +30,9 @@ enum { ...@@ -30,6 +30,9 @@ enum {
EXCEPTION_FLT_UNDERFLOW = 0xc0000093, EXCEPTION_FLT_UNDERFLOW = 0xc0000093,
EXCEPTION_INT_DIVIDE_BY_ZERO = 0xc0000094, EXCEPTION_INT_DIVIDE_BY_ZERO = 0xc0000094,
EXCEPTION_INT_OVERFLOW = 0xc0000095, EXCEPTION_INT_OVERFLOW = 0xc0000095,
INFINITE = 0xffffffff,
WAIT_TIMEOUT = 0x102,
}; };
typedef struct SystemInfo SystemInfo; typedef struct SystemInfo SystemInfo;
...@@ -37,6 +40,7 @@ typedef struct ExceptionRecord ExceptionRecord; ...@@ -37,6 +40,7 @@ typedef struct ExceptionRecord ExceptionRecord;
typedef struct FloatingSaveArea FloatingSaveArea; typedef struct FloatingSaveArea FloatingSaveArea;
typedef struct M128a M128a; typedef struct M128a M128a;
typedef struct Context Context; typedef struct Context Context;
typedef struct Overlapped Overlapped;
#pragma pack on #pragma pack on
...@@ -113,6 +117,12 @@ struct Context { ...@@ -113,6 +117,12 @@ struct Context {
uint64 LastExceptionToRip; uint64 LastExceptionToRip;
uint64 LastExceptionFromRip; uint64 LastExceptionFromRip;
}; };
struct Overlapped {
uint64 Internal;
uint64 InternalHigh;
byte anon0[8];
byte *hEvent;
};
#pragma pack off #pragma pack off
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// +build darwin linux // +build darwin linux windows
package net package net
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// +build freebsd netbsd openbsd plan9 windows // +build freebsd netbsd openbsd plan9
#include "runtime.h" #include "runtime.h"
......
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
#include "runtime.h"
#include "defs_GOOS_GOARCH.h"
#include "os_GOOS.h"
#define DWORD_MAX 0xffffffff
#pragma dynimport runtime·CreateIoCompletionPort CreateIoCompletionPort "kernel32.dll"
#pragma dynimport runtime·GetQueuedCompletionStatus GetQueuedCompletionStatus "kernel32.dll"
extern void *runtime·CreateIoCompletionPort;
extern void *runtime·GetQueuedCompletionStatus;
#define INVALID_HANDLE_VALUE ((uintptr)-1)
// net_anOp must be the same as beginning of net.anOp. Keep these in sync.
typedef struct net_anOp net_anOp;
struct net_anOp
{
// used by windows
Overlapped o;
// used by netpoll
uintptr runtimeCtx;
int32 mode;
int32 errno;
uint32 qty;
};
static uintptr iocphandle = INVALID_HANDLE_VALUE; // completion port io handle
void
runtime·netpollinit(void)
{
iocphandle = (uintptr)runtime·stdcall(runtime·CreateIoCompletionPort, 4, INVALID_HANDLE_VALUE, (uintptr)0, (uintptr)0, (uintptr)DWORD_MAX);
if(iocphandle == 0) {
runtime·printf("netpoll: failed to create iocp handle (errno=%d)\n", runtime·getlasterror());
runtime·throw("netpoll: failed to create iocp handle");
}
return;
}
int32
runtime·netpollopen(uintptr fd, PollDesc *pd)
{
USED(pd);
if(runtime·stdcall(runtime·CreateIoCompletionPort, 4, fd, iocphandle, (uintptr)0, (uintptr)0) == 0)
return -runtime·getlasterror();
return 0;
}
int32
runtime·netpollclose(uintptr fd)
{
// nothing to do
USED(fd);
return 0;
}
// Polls for completed network IO.
// Returns list of goroutines that become runnable.
G*
runtime·netpoll(bool block)
{
uint32 wait, qty, key;
int32 mode, errno;
net_anOp *o;
G *gp;
if(iocphandle == INVALID_HANDLE_VALUE)
return nil;
o = nil;
errno = 0;
qty = 0;
wait = INFINITE;
if(!block)
// TODO(brainman): should use 0 here instead, but scheduler hogs CPU
wait = 1;
// TODO(brainman): Need a loop here to fetch all pending notifications
// (or at least a batch). Scheduler will behave better if is given
// a batch of newly runnable goroutines.
// TODO(brainman): Call GetQueuedCompletionStatusEx() here when possible.
if(runtime·stdcall(runtime·GetQueuedCompletionStatus, 5, iocphandle, &qty, &key, &o, (uintptr)wait) == 0) {
errno = runtime·getlasterror();
if(o == nil && errno == WAIT_TIMEOUT) {
if(!block)
return nil;
runtime·throw("netpoll: GetQueuedCompletionStatus timed out");
}
if(o == nil) {
runtime·printf("netpoll: GetQueuedCompletionStatus failed (errno=%d)\n", errno);
runtime·throw("netpoll: GetQueuedCompletionStatus failed");
}
// dequeued failed IO packet, so report that
}
if(o == nil)
runtime·throw("netpoll: GetQueuedCompletionStatus returned o == nil");
mode = o->mode;
if(mode != 'r' && mode != 'w') {
runtime·printf("netpoll: GetQueuedCompletionStatus returned invalid mode=%d\n", mode);
runtime·throw("netpoll: GetQueuedCompletionStatus returned invalid mode");
}
o->errno = errno;
o->qty = qty;
gp = nil;
runtime·netpollready(&gp, (void*)o->runtimeCtx, mode);
return gp;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment