Commit 28b59942 authored by Dave Cheney's avatar Dave Cheney

net: move deadline logic into pollServer

Update #4434.

The proposal attempts to reduce the number of places where fd,{r,w}deadline is checked and updated in preparation for issue 4434. In doing so the deadline logic is simplified by letting the pollster return errTimeout from netFD.Wait{Read,Write} as part of the wakeup logic.

The behaviour of setting n = 0 has been restored to match rev 2a55e349097f, which was the previous change to fd_unix.go before CL 6851096.

R=jsing, bradfitz, mikioh.mikioh, rsc
CC=fullung, golang-dev
https://golang.org/cl/6850110
parent da803e5c
...@@ -181,12 +181,10 @@ func (s *pollServer) CheckDeadlines() { ...@@ -181,12 +181,10 @@ func (s *pollServer) CheckDeadlines() {
delete(s.pending, key) delete(s.pending, key)
if mode == 'r' { if mode == 'r' {
s.poll.DelFD(fd.sysfd, mode) s.poll.DelFD(fd.sysfd, mode)
fd.rdeadline = -1
} else { } else {
s.poll.DelFD(fd.sysfd, mode) s.poll.DelFD(fd.sysfd, mode)
fd.wdeadline = -1
} }
s.WakeFD(fd, mode, nil) s.WakeFD(fd, mode, errTimeout)
} else if nextDeadline == 0 || t < nextDeadline { } else if nextDeadline == 0 || t < nextDeadline {
nextDeadline = t nextDeadline = t
} }
...@@ -329,14 +327,10 @@ func (fd *netFD) name() string { ...@@ -329,14 +327,10 @@ func (fd *netFD) name() string {
func (fd *netFD) connect(ra syscall.Sockaddr) error { func (fd *netFD) connect(ra syscall.Sockaddr) error {
err := syscall.Connect(fd.sysfd, ra) err := syscall.Connect(fd.sysfd, ra)
hadTimeout := fd.wdeadline > 0
if err == syscall.EINPROGRESS { if err == syscall.EINPROGRESS {
if err = fd.pollServer.WaitWrite(fd); err != nil { if err = fd.pollServer.WaitWrite(fd); err != nil {
return err return err
} }
if hadTimeout && fd.wdeadline < 0 {
return errTimeout
}
var e int var e int
e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR) e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
if err != nil { if err != nil {
...@@ -430,20 +424,15 @@ func (fd *netFD) Read(p []byte) (n int, err error) { ...@@ -430,20 +424,15 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
} }
} }
n, err = syscall.Read(int(fd.sysfd), p) n, err = syscall.Read(int(fd.sysfd), p)
if err == syscall.EAGAIN { if err != nil {
n = 0 n = 0
err = errTimeout if err == syscall.EAGAIN {
if fd.rdeadline >= 0 {
if err = fd.pollServer.WaitRead(fd); err == nil { if err = fd.pollServer.WaitRead(fd); err == nil {
continue continue
} }
} }
} }
if err != nil { err = chkReadErr(n, err, fd)
n = 0
} else if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM {
err = io.EOF
}
break break
} }
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
...@@ -467,18 +456,15 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { ...@@ -467,18 +456,15 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
} }
} }
n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0) n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
if err == syscall.EAGAIN { if err != nil {
n = 0 n = 0
err = errTimeout if err == syscall.EAGAIN {
if fd.rdeadline >= 0 {
if err = fd.pollServer.WaitRead(fd); err == nil { if err = fd.pollServer.WaitRead(fd); err == nil {
continue continue
} }
} }
} }
if err != nil { err = chkReadErr(n, err, fd)
n = 0
}
break break
} }
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
...@@ -502,27 +488,30 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S ...@@ -502,27 +488,30 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S
} }
} }
n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0) n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
if err == syscall.EAGAIN { if err != nil {
n = 0 // TODO(dfc) should n and oobn be set to nil
err = errTimeout if err == syscall.EAGAIN {
if fd.rdeadline >= 0 {
if err = fd.pollServer.WaitRead(fd); err == nil { if err = fd.pollServer.WaitRead(fd); err == nil {
continue continue
} }
} }
} }
if err == nil && n == 0 { err = chkReadErr(n, err, fd)
err = io.EOF
}
break break
} }
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
err = &OpError{"read", fd.net, fd.laddr, err} err = &OpError{"read", fd.net, fd.laddr, err}
return
} }
return return
} }
func chkReadErr(n int, err error, fd *netFD) error {
if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
return io.EOF
}
return err
}
func (fd *netFD) Write(p []byte) (int, error) { func (fd *netFD) Write(p []byte) (int, error) {
fd.wio.Lock() fd.wio.Lock()
defer fd.wio.Unlock() defer fd.wio.Unlock()
...@@ -548,11 +537,8 @@ func (fd *netFD) Write(p []byte) (int, error) { ...@@ -548,11 +537,8 @@ func (fd *netFD) Write(p []byte) (int, error) {
break break
} }
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout if err = fd.pollServer.WaitWrite(fd); err == nil {
if fd.wdeadline >= 0 { continue
if err = fd.pollServer.WaitWrite(fd); err == nil {
continue
}
} }
} }
if err != nil { if err != nil {
...@@ -586,11 +572,8 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { ...@@ -586,11 +572,8 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
} }
err = syscall.Sendto(fd.sysfd, p, 0, sa) err = syscall.Sendto(fd.sysfd, p, 0, sa)
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout if err = fd.pollServer.WaitWrite(fd); err == nil {
if fd.wdeadline >= 0 { continue
if err = fd.pollServer.WaitWrite(fd); err == nil {
continue
}
} }
} }
break break
...@@ -619,11 +602,8 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob ...@@ -619,11 +602,8 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob
} }
err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0) err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout if err = fd.pollServer.WaitWrite(fd); err == nil {
if fd.wdeadline >= 0 { continue
if err = fd.pollServer.WaitWrite(fd); err == nil {
continue
}
} }
} }
break break
...@@ -654,11 +634,8 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e ...@@ -654,11 +634,8 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e
if err != nil { if err != nil {
syscall.ForkLock.RUnlock() syscall.ForkLock.RUnlock()
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout if err = fd.pollServer.WaitRead(fd); err == nil {
if fd.rdeadline >= 0 { continue
if err = fd.pollServer.WaitRead(fd); err == nil {
continue
}
} }
} else if err == syscall.ECONNABORTED { } else if err == syscall.ECONNABORTED {
// This means that a socket on the listen queue was closed // This means that a socket on the listen queue was closed
......
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
package net package net
import ( import (
"io"
"syscall"
"testing" "testing"
) )
...@@ -57,3 +59,48 @@ func TestAddFDReturnsError(t *testing.T) { ...@@ -57,3 +59,48 @@ func TestAddFDReturnsError(t *testing.T) {
} }
t.Error("unexpected error:", err) t.Error("unexpected error:", err)
} }
var chkReadErrTests = []struct {
n int
err error
fd *netFD
expected error
}{
{100, nil, &netFD{sotype: syscall.SOCK_STREAM}, nil},
{100, io.EOF, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF},
{100, errClosing, &netFD{sotype: syscall.SOCK_STREAM}, errClosing},
{0, nil, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF},
{0, io.EOF, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF},
{0, errClosing, &netFD{sotype: syscall.SOCK_STREAM}, errClosing},
{100, nil, &netFD{sotype: syscall.SOCK_DGRAM}, nil},
{100, io.EOF, &netFD{sotype: syscall.SOCK_DGRAM}, io.EOF},
{100, errClosing, &netFD{sotype: syscall.SOCK_DGRAM}, errClosing},
{0, nil, &netFD{sotype: syscall.SOCK_DGRAM}, nil},
{0, io.EOF, &netFD{sotype: syscall.SOCK_DGRAM}, io.EOF},
{0, errClosing, &netFD{sotype: syscall.SOCK_DGRAM}, errClosing},
{100, nil, &netFD{sotype: syscall.SOCK_SEQPACKET}, nil},
{100, io.EOF, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF},
{100, errClosing, &netFD{sotype: syscall.SOCK_SEQPACKET}, errClosing},
{0, nil, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF},
{0, io.EOF, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF},
{0, errClosing, &netFD{sotype: syscall.SOCK_SEQPACKET}, errClosing},
{100, nil, &netFD{sotype: syscall.SOCK_RAW}, nil},
{100, io.EOF, &netFD{sotype: syscall.SOCK_RAW}, io.EOF},
{100, errClosing, &netFD{sotype: syscall.SOCK_RAW}, errClosing},
{0, nil, &netFD{sotype: syscall.SOCK_RAW}, nil},
{0, io.EOF, &netFD{sotype: syscall.SOCK_RAW}, io.EOF},
{0, errClosing, &netFD{sotype: syscall.SOCK_RAW}, errClosing},
}
func TestChkReadErr(t *testing.T) {
for _, tt := range chkReadErrTests {
actual := chkReadErr(tt.n, tt.err, tt.fd)
if actual != tt.expected {
t.Errorf("chkReadError(%v, %v, %v): expected %v, actual %v", tt.n, tt.err, tt.fd.sotype, tt.expected, actual)
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment