// fd_windows.go
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package net

import (
	"os"
	"runtime"
	"sync"
	"syscall"
	"time"
	"unsafe"
)

// ioResult carries the outcome of one IO request, as dequeued from the
// completion port by pollServer.getCompletedIO.
type ioResult struct {
	key   uint32 // completion key reported for the request
	qty   uint32 // number of bytes transferred
	errno int    // 0 on success, otherwise the error code of the failed request
}

// Network file descriptor.
type netFD struct {
	// locking/lifetime of sysfd
	sysmu   sync.Mutex
	sysref  int
	closing bool

	// immutable until Close
31 32 33 34 35 36 37 38
	sysfd  int
	family int
	proto  int
	cr     chan *ioResult
	cw     chan *ioResult
	net    string
	laddr  Addr
	raddr  Addr
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80

	// owned by client
	rdeadline_delta int64
	rdeadline       int64
	rio             sync.Mutex
	wdeadline_delta int64
	wdeadline       int64
	wio             sync.Mutex
}

// InvalidConnError describes an invalid net.Conn. It is never temporary
// and never a timeout.
type InvalidConnError struct{}

// String returns the error text.
func (e *InvalidConnError) String() string {
	return "invalid net.Conn"
}

// Temporary always reports false.
func (e *InvalidConnError) Temporary() bool {
	return false
}

// Timeout always reports false.
func (e *InvalidConnError) Timeout() bool {
	return false
}

// pollServer waits for IO completion packets on a single IO completion
// port. Each request carries a channel, and the server sends the result on
// that channel to tell the IO owner that its request has completed.
type pollServer struct {
	iocp int32 // completion port handle
}

// newPollServer creates the completion port and starts the goroutine that
// drains it. If the port cannot be created, it returns a syscall error and
// starts nothing.
func newPollServer() (s *pollServer, err os.Error) {
	port, errno := syscall.CreateIoCompletionPort(-1, 0, 0, 1)
	if errno != 0 {
		return nil, os.NewSyscallError("CreateIoCompletionPort", errno)
	}
	s = &pollServer{iocp: port}
	go s.Run()
	return s, nil
}

type ioPacket struct {
	// Used by IOCP interface,
	// it must be first field of the struct,
	// as our code rely on it.
	o syscall.Overlapped

	// Link to the io owner.
	c chan *ioResult
Wei Guangjing's avatar
Wei Guangjing committed
81 82

	w *syscall.WSABuf
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
}

// getCompletedIO blocks until a completion packet can be dequeued from the
// port.
//
// A dequeued packet is returned with a nil err even when the IO it reports
// failed. In that case the failure code is stored in result.errno. A
// non-nil err means nothing was dequeued at all, and ov is nil. That case
// covers both a wait timeout and any other dequeue failure.
func (s *pollServer) getCompletedIO() (ov *syscall.Overlapped, result *ioResult, err os.Error) {
	var res ioResult
	var overlapped *syscall.Overlapped
	_, errno := syscall.GetQueuedCompletionStatus(s.iocp, &res.qty, &res.key, &overlapped, syscall.INFINITE)
	if errno == 0 {
		// The packet completed successfully.
		return overlapped, &res, nil
	}
	if overlapped == nil {
		// Nothing was dequeued, so report the dequeue error. This includes
		// WAIT_TIMEOUT, which the INFINITE wait should rule out for now.
		return nil, &res, os.NewSyscallError("GetQueuedCompletionStatus", errno)
	}
	// A packet was dequeued, but the IO it describes failed.
	res.errno = errno
	return overlapped, &res, nil
}

// Run dispatches completion packets forever. Each dequeued result is sent
// on the channel of the ioPacket whose Overlapped it carries. A failure to
// dequeue anything is fatal.
func (s *pollServer) Run() {
	for {
		ov, res, err := s.getCompletedIO()
		if err != nil {
			panic("Run pollServer: " + err.String() + "\n")
		}
		// ov is the first field of an ioPacket, so the cast recovers the
		// request that owns it.
		pckt := (*ioPacket)(unsafe.Pointer(ov))
		pckt.c <- res
	}
}

// Network FD methods.
// All the network FDs use a single pollServer.

var pollserver *pollServer
122
var onceStartServer sync.Once
123 124 125 126 127 128 129

func startServer() {
	p, err := newPollServer()
	if err != nil {
		panic("Start pollServer: " + err.String() + "\n")
	}
	pollserver = p
Wei Guangjing's avatar
Wei Guangjing committed
130 131

	go timeoutIO()
132 133 134 135 136 137 138 139
}

var initErr os.Error

func newFD(fd, family, proto int, net string, laddr, raddr Addr) (f *netFD, err os.Error) {
	if initErr != nil {
		return nil, initErr
	}
140
	onceStartServer.Do(startServer)
141 142 143 144 145 146 147 148
	// Associate our socket with pollserver.iocp.
	if _, e := syscall.CreateIoCompletionPort(int32(fd), pollserver.iocp, 0, 0); e != 0 {
		return nil, &OpError{"CreateIoCompletionPort", net, laddr, os.Errno(e)}
	}
	f = &netFD{
		sysfd:  fd,
		family: family,
		proto:  proto,
Wei Guangjing's avatar
Wei Guangjing committed
149 150
		cr:     make(chan *ioResult, 1),
		cw:     make(chan *ioResult, 1),
151 152 153 154
		net:    net,
		laddr:  laddr,
		raddr:  raddr,
	}
155
	runtime.SetFinalizer(f, (*netFD).Close)
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
	return f, nil
}

// incref registers one more in-flight user of fd.sysfd. While a reference
// is held, decref will not close the socket.
func (fd *netFD) incref() {
	fd.sysmu.Lock()
	defer fd.sysmu.Unlock()
	fd.sysref++
}

// Remove a reference to this FD and close if we've been asked to do so (and
// there are no references left.
func (fd *netFD) decref() {
	fd.sysmu.Lock()
	fd.sysref--
	if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 {
		// In case the user has set linger, switch to blocking mode so
		// the close blocks.  As long as this doesn't happen often, we
		// can handle the extra OS processes.  Otherwise we'll need to
		// use the pollserver for Close too.  Sigh.
		syscall.SetNonblock(fd.sysfd, false)
177
		closesocket(fd.sysfd)
178
		fd.sysfd = -1
179 180
		// no need for a finalizer anymore
		runtime.SetFinalizer(fd, nil)
181 182 183 184 185
	}
	fd.sysmu.Unlock()
}

func (fd *netFD) Close() os.Error {
186
	if fd == nil || fd.sysfd == -1 {
187 188 189 190 191 192 193 194 195 196 197
		return os.EINVAL
	}

	fd.incref()
	syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR)
	fd.closing = true
	fd.decref()
	return nil
}

func newWSABuf(p []byte) *syscall.WSABuf {
198 199 200 201 202
	var p0 *byte
	if len(p) > 0 {
		p0 = (*byte)(unsafe.Pointer(&p[0]))
	}
	return &syscall.WSABuf{uint32(len(p)), p0}
203 204
}

Wei Guangjing's avatar
Wei Guangjing committed
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
func waitPacket(fd *netFD, pckt *ioPacket, mode int) (r *ioResult) {
	var delta int64
	if mode == 'r' {
		delta = fd.rdeadline_delta
	}
	if mode == 'w' {
		delta = fd.wdeadline_delta
	}
	if delta <= 0 {
		return <-pckt.c
	}

	select {
	case r = <-pckt.c:
	case <-time.After(delta):
		a := &arg{f: cancel, fd: fd, pckt: pckt, c: make(chan int)}
		ioChan <- a
		<-a.c
		r = <-pckt.c
		if r.errno == 995 { // IO Canceled
			r.errno = syscall.EWOULDBLOCK
		}
	}
	return r
}

// Request kinds understood by timeoutIO, stored in the f field of arg.
const (
	read     = iota // WSARecv
	readfrom        // WSARecvFrom
	write           // WSASend
	writeto         // WSASendto
	cancel          // CancelIo
)

// arg is a request for the timeoutIO thread. timeoutIO issues operation f
// on fd using pckt and sends the resulting error code back on c. The
// pointer fields are the per-operation parameters; only those used by f
// need to be set.
type arg struct {
	f     int // one of read, readfrom, write, writeto, cancel
	fd    *netFD
	pckt  *ioPacket
	done  *uint32
	flags *uint32
	rsa   *syscall.RawSockaddrAny
	size  *int32
	sa    *syscall.Sockaddr
	c     chan int
}

// ioChan carries requests to the timeoutIO thread. It is unbuffered, so a
// send blocks until timeoutIO receives the request.
var ioChan = make(chan *arg)

// timeoutIO runs forever on a locked OS thread and issues every IO request
// that has a deadline. CancelIo cancels only the pending I/O that the
// calling thread issued for a socket, not I/O issued by other threads. For
// that reason, deadline-bound requests and their cancellations all go
// through this single thread. The Winsock error code of each call is sent
// back on the request's channel.
func timeoutIO() {
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()
	for {
		req := <-ioChan
		var errno int
		switch req.f {
		case read:
			errno = syscall.WSARecv(uint32(req.fd.sysfd), req.pckt.w, 1, req.done, req.flags, &req.pckt.o, nil)
		case readfrom:
			errno = syscall.WSARecvFrom(uint32(req.fd.sysfd), req.pckt.w, 1, req.done, req.flags, req.rsa, req.size, &req.pckt.o, nil)
		case write:
			errno = syscall.WSASend(uint32(req.fd.sysfd), req.pckt.w, 1, req.done, uint32(0), &req.pckt.o, nil)
		case writeto:
			errno = syscall.WSASendto(uint32(req.fd.sysfd), req.pckt.w, 1, req.done, 0, *req.sa, &req.pckt.o, nil)
		case cancel:
			_, errno = syscall.CancelIo(uint32(req.fd.sysfd))
		}
		req.c <- errno
	}
}

279 280 281 282 283 284 285 286
func (fd *netFD) Read(p []byte) (n int, err os.Error) {
	if fd == nil {
		return 0, os.EINVAL
	}
	fd.rio.Lock()
	defer fd.rio.Unlock()
	fd.incref()
	defer fd.decref()
287
	if fd.sysfd == -1 {
288 289 290 291 292
		return 0, os.EINVAL
	}
	// Submit receive request.
	var pckt ioPacket
	pckt.c = fd.cr
Wei Guangjing's avatar
Wei Guangjing committed
293
	pckt.w = newWSABuf(p)
294 295
	var done uint32
	flags := uint32(0)
Wei Guangjing's avatar
Wei Guangjing committed
296 297 298 299 300 301 302 303
	var e int
	if fd.rdeadline_delta > 0 {
		a := &arg{f: read, fd: fd, pckt: &pckt, done: &done, flags: &flags, c: make(chan int)}
		ioChan <- a
		e = <-a.c
	} else {
		e = syscall.WSARecv(uint32(fd.sysfd), pckt.w, 1, &done, &flags, &pckt.o, nil)
	}
304 305 306 307 308 309 310 311 312
	switch e {
	case 0:
		// IO completed immediately, but we need to get our completion message anyway.
	case syscall.ERROR_IO_PENDING:
		// IO started, and we have to wait for it's completion.
	default:
		return 0, &OpError{"WSARecv", fd.net, fd.laddr, os.Errno(e)}
	}
	// Wait for our request to complete.
Wei Guangjing's avatar
Wei Guangjing committed
313
	r := waitPacket(fd, &pckt, 'r')
314 315 316 317
	if r.errno != 0 {
		err = &OpError{"WSARecv", fd.net, fd.laddr, os.Errno(r.errno)}
	}
	n = int(r.qty)
318 319 320
	if err == nil && n == 0 {
		err = os.EOF
	}
321 322 323 324
	return
}

func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) {
325 326 327 328 329 330 331 332 333 334
	if fd == nil {
		return 0, nil, os.EINVAL
	}
	if len(p) == 0 {
		return 0, nil, nil
	}
	fd.rio.Lock()
	defer fd.rio.Unlock()
	fd.incref()
	defer fd.decref()
335
	if fd.sysfd == -1 {
336 337 338 339 340
		return 0, nil, os.EINVAL
	}
	// Submit receive request.
	var pckt ioPacket
	pckt.c = fd.cr
Wei Guangjing's avatar
Wei Guangjing committed
341
	pckt.w = newWSABuf(p)
342 343 344 345
	var done uint32
	flags := uint32(0)
	var rsa syscall.RawSockaddrAny
	l := int32(unsafe.Sizeof(rsa))
Wei Guangjing's avatar
Wei Guangjing committed
346 347 348 349 350 351 352 353
	var e int
	if fd.rdeadline_delta > 0 {
		a := &arg{f: readfrom, fd: fd, pckt: &pckt, done: &done, flags: &flags, rsa: &rsa, size: &l, c: make(chan int)}
		ioChan <- a
		e = <-a.c
	} else {
		e = syscall.WSARecvFrom(uint32(fd.sysfd), pckt.w, 1, &done, &flags, &rsa, &l, &pckt.o, nil)
	}
354 355 356 357 358 359 360 361 362
	switch e {
	case 0:
		// IO completed immediately, but we need to get our completion message anyway.
	case syscall.ERROR_IO_PENDING:
		// IO started, and we have to wait for it's completion.
	default:
		return 0, nil, &OpError{"WSARecvFrom", fd.net, fd.laddr, os.Errno(e)}
	}
	// Wait for our request to complete.
Wei Guangjing's avatar
Wei Guangjing committed
363
	r := waitPacket(fd, &pckt, 'r')
364 365 366 367 368 369
	if r.errno != 0 {
		err = &OpError{"WSARecvFrom", fd.net, fd.laddr, os.Errno(r.errno)}
	}
	n = int(r.qty)
	sa, _ = rsa.Sockaddr()
	return
370 371 372 373 374 375 376 377 378 379
}

func (fd *netFD) Write(p []byte) (n int, err os.Error) {
	if fd == nil {
		return 0, os.EINVAL
	}
	fd.wio.Lock()
	defer fd.wio.Unlock()
	fd.incref()
	defer fd.decref()
380
	if fd.sysfd == -1 {
381 382 383 384 385
		return 0, os.EINVAL
	}
	// Submit send request.
	var pckt ioPacket
	pckt.c = fd.cw
Wei Guangjing's avatar
Wei Guangjing committed
386
	pckt.w = newWSABuf(p)
387
	var done uint32
Wei Guangjing's avatar
Wei Guangjing committed
388 389 390 391 392 393 394 395
	var e int
	if fd.wdeadline_delta > 0 {
		a := &arg{f: write, fd: fd, pckt: &pckt, done: &done, c: make(chan int)}
		ioChan <- a
		e = <-a.c
	} else {
		e = syscall.WSASend(uint32(fd.sysfd), pckt.w, 1, &done, uint32(0), &pckt.o, nil)
	}
396 397 398 399 400 401 402 403 404
	switch e {
	case 0:
		// IO completed immediately, but we need to get our completion message anyway.
	case syscall.ERROR_IO_PENDING:
		// IO started, and we have to wait for it's completion.
	default:
		return 0, &OpError{"WSASend", fd.net, fd.laddr, os.Errno(e)}
	}
	// Wait for our request to complete.
Wei Guangjing's avatar
Wei Guangjing committed
405
	r := waitPacket(fd, &pckt, 'w')
406 407 408 409 410 411 412 413
	if r.errno != 0 {
		err = &OpError{"WSASend", fd.net, fd.laddr, os.Errno(r.errno)}
	}
	n = int(r.qty)
	return
}

func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) {
414 415 416 417 418 419 420 421 422 423
	if fd == nil {
		return 0, os.EINVAL
	}
	if len(p) == 0 {
		return 0, nil
	}
	fd.wio.Lock()
	defer fd.wio.Unlock()
	fd.incref()
	defer fd.decref()
424
	if fd.sysfd == -1 {
425 426 427 428 429
		return 0, os.EINVAL
	}
	// Submit send request.
	var pckt ioPacket
	pckt.c = fd.cw
Wei Guangjing's avatar
Wei Guangjing committed
430
	pckt.w = newWSABuf(p)
431
	var done uint32
Wei Guangjing's avatar
Wei Guangjing committed
432 433 434 435 436 437 438 439
	var e int
	if fd.wdeadline_delta > 0 {
		a := &arg{f: writeto, fd: fd, pckt: &pckt, done: &done, sa: &sa, c: make(chan int)}
		ioChan <- a
		e = <-a.c
	} else {
		e = syscall.WSASendto(uint32(fd.sysfd), pckt.w, 1, &done, 0, sa, &pckt.o, nil)
	}
440 441 442 443 444 445 446 447 448
	switch e {
	case 0:
		// IO completed immediately, but we need to get our completion message anyway.
	case syscall.ERROR_IO_PENDING:
		// IO started, and we have to wait for it's completion.
	default:
		return 0, &OpError{"WSASendTo", fd.net, fd.laddr, os.Errno(e)}
	}
	// Wait for our request to complete.
Wei Guangjing's avatar
Wei Guangjing committed
449
	r := waitPacket(fd, &pckt, 'w')
450 451 452 453 454
	if r.errno != 0 {
		err = &OpError{"WSASendTo", fd.net, fd.laddr, os.Errno(r.errno)}
	}
	n = int(r.qty)
	return
455 456 457
}

func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) {
458
	if fd == nil || fd.sysfd == -1 {
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475
		return nil, os.EINVAL
	}
	fd.incref()
	defer fd.decref()

	// Get new socket.
	// See ../syscall/exec.go for description of ForkLock.
	syscall.ForkLock.RLock()
	s, e := syscall.Socket(fd.family, fd.proto, 0)
	if e != 0 {
		syscall.ForkLock.RUnlock()
		return nil, os.Errno(e)
	}
	syscall.CloseOnExec(s)
	syscall.ForkLock.RUnlock()

	// Associate our new socket with IOCP.
476
	onceStartServer.Do(startServer)
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
	if _, e = syscall.CreateIoCompletionPort(int32(s), pollserver.iocp, 0, 0); e != 0 {
		return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, os.Errno(e)}
	}

	// Submit accept request.
	// Will use new unique channel here, because, unlike Read or Write,
	// Accept is expected to be executed by many goroutines simultaniously.
	var pckt ioPacket
	pckt.c = make(chan *ioResult)
	attrs, e := syscall.AcceptIOCP(fd.sysfd, s, &pckt.o)
	switch e {
	case 0:
		// IO completed immediately, but we need to get our completion message anyway.
	case syscall.ERROR_IO_PENDING:
		// IO started, and we have to wait for it's completion.
	default:
493
		closesocket(s)
494 495 496 497 498 499
		return nil, &OpError{"AcceptEx", fd.net, fd.laddr, os.Errno(e)}
	}

	// Wait for peer connection.
	r := <-pckt.c
	if r.errno != 0 {
500
		closesocket(s)
501 502 503 504 505 506
		return nil, &OpError{"AcceptEx", fd.net, fd.laddr, os.Errno(r.errno)}
	}

	// Inherit properties of the listening socket.
	e = syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, fd.sysfd)
	if e != 0 {
507
		closesocket(s)
508 509 510 511 512 513 514 515 516 517 518 519 520 521
		return nil, &OpError{"Setsockopt", fd.net, fd.laddr, os.Errno(r.errno)}
	}

	// Get local and peer addr out of AcceptEx buffer.
	lsa, rsa := syscall.GetAcceptIOCPSockaddrs(attrs)

	// Create our netFD and return it for further use.
	laddr := toAddr(lsa)
	raddr := toAddr(rsa)

	f := &netFD{
		sysfd:  s,
		family: fd.family,
		proto:  fd.proto,
Wei Guangjing's avatar
Wei Guangjing committed
522 523
		cr:     make(chan *ioResult, 1),
		cw:     make(chan *ioResult, 1),
524 525 526 527
		net:    fd.net,
		laddr:  laddr,
		raddr:  raddr,
	}
528
	runtime.SetFinalizer(f, (*netFD).Close)
529 530 531
	return f, nil
}

532 533 534 535
func closesocket(s int) (errno int) {
	return syscall.Closesocket(int32(s))
}

536 537 538 539 540 541 542
func init() {
	var d syscall.WSAData
	e := syscall.WSAStartup(uint32(0x101), &d)
	if e != 0 {
		initErr = os.NewSyscallError("WSAStartup", e)
	}
}
543 544 545 546 547

func (fd *netFD) dup() (f *os.File, err os.Error) {
	// TODO: Implement this
	return nil, os.NewSyscallError("dup", syscall.EWINDOWS)
}
548 549 550 551 552 553 554 555

// ReadMsg is not supported by this implementation. It reads nothing and
// always fails with os.EAFNOSUPPORT.
func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) {
	err = os.EAFNOSUPPORT
	return
}

// WriteMsg is not supported by this implementation. It sends nothing and
// always fails with os.EAFNOSUPPORT.
func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) {
	err = os.EAFNOSUPPORT
	return
}