connection.go 15.5 KB
Newer Older
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1 2
// Copyright (C) 2016-2017  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
17 18

package neo
19
// Connection management
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
20 21

import (
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
22
	"context"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
23
	"errors"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
24
	"io"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
25
	"net"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
26
	"sync"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
27 28

	"fmt"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
29
	"lab.nexedi.com/kirr/go123/xruntime/debug"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
30 31
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
32
// NodeLink is a node-node link in NEO.
//
// A node-node link represents a bidirectional symmetrical communication
// channel in between 2 NEO nodes. The link provides service for packets
// exchange and for multiplexing several communication connections on
// top of the node-node link.
//
// A new connection can be created with .NewConn() . Once a connection is
// created and data is sent over it, on the peer's side another corresponding
// new connection can be accepted via .Accept(), and all further
// send/receive exchange will be happening in between those 2 connections.
//
// For a node to be able to accept new incoming connections it has to have
// "server" role - see NewNodeLink() for details.
//
// A NodeLink has to be explicitly closed, once it is no longer needed.
//
// It is safe to use NodeLink from multiple goroutines simultaneously.
type NodeLink struct {
	peerLink net.Conn		// raw conn to peer

	// connMu protects connTab, nextConnId and recvErr.
	//connMu     sync.Mutex	// TODO -> RW ?
	connMu     debug.Mutex	// TODO -> RW ?
	connTab    map[uint32]*Conn	// connId -> Conn associated with connId; = nil once NodeLink is closed
	nextConnId uint32		// next connId to use for Conn initiated by us; advanced by 2 to keep parity (even: server, odd: client)

	serveWg sync.WaitGroup		// for serve{Send,Recv}
	acceptq chan *Conn		// queue of incoming connections for Accept
					// = nil if NodeLink is not accepting connections

	txq	chan txReq		// tx requests from Conns go via here; consumed by serveSend

//	errMu   sync.Mutex	-> use connMu
//	sendErr error			// error got from sendPkt, if any
	recvErr	error			// error got from recvPkt, if any

	// once because: NodeLink has to be explicitly closed by user; it can also
	// be "closed" by IO errors on peerLink
	closeOnce sync.Once
	closed    chan struct{}		// closed by close(); Accept and serveSend wait on it to notice the link is closed
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
74 75 76 77 78
// Conn is a connection established over NodeLink.
//
// Data can be sent and received over it.
// Once a connection is no longer needed it has to be closed.
//
// It is safe to use Conn from multiple goroutines simultaneously.
type Conn struct {
	nodeLink  *NodeLink	// link this connection is multiplexed over
	connId    uint32	// id of this connection; Send stamps it into every sent packet header
	rxq	  chan *PktBuf	// received packets for this Conn go here
	txerr     chan error	// transmit errors for this Conn go back here (created with capacity 1 by newConn)

	// once because: Conn has to be explicitly closed by user; it can also
	// be closed by NodeLink.Close .
	closeOnce sync.Once
	closed    chan struct{}	// closed when Conn is closed; Send and Recv wait on it
}

92
// LinkRole is a role an end of NodeLink is intended to play.
//
// The low bits hold the role itself (LinkServer or LinkClient); bits 16 and
// above are reserved for flags (see linkFlagsMask).
type LinkRole int

const (
	LinkServer LinkRole = iota // link created as server
	LinkClient                 // link created as client

	// for testing:
	linkNoRecvSend LinkRole = 1 << 16 // do not spawn serveRecv & serveSend

	// linkFlagsMask covers every flag bit (bit 16 and above), so that
	// role&^linkFlagsMask yields the bare role.
	//
	// NOTE it is spelled ^0<<16 instead of (1<<32-1)<<16: the latter constant
	// does not fit into int on 32-bit platforms and fails to compile there.
	linkFlagsMask LinkRole = ^LinkRole(0) << 16
)

103
// NewNodeLink makes a new NodeLink from already established net.Conn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
104
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
105
// Role specifies how to treat our role on the link - either as client or
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
106
// server. The difference in between client and server roles are in:
107 108 109 110 111 112
//
// 1. how connection ids are allocated for connections initiated at our side:
//    there is no conflict in identifiers if one side always allocates them as
//    even (server) and its peer as odd (client).
//
// 2. NodeLink.Accept() works only on server side.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
113 114 115
//
// Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
116
func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
117
	var nextConnId uint32
118
	var acceptq chan *Conn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
119 120
	switch role&^linkFlagsMask {
	case LinkServer:
121 122
		nextConnId = 0			// all initiated by us connId will be even
		acceptq = make(chan *Conn)	// accept queue; TODO use backlog
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
123
	case LinkClient:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
124
		nextConnId = 1	// ----//---- odd
125
		acceptq = nil	// not accepting incoming connections
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
126 127 128 129
	default:
		panic("invalid conn role")
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
130
	nl := &NodeLink{
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
131 132 133
		peerLink:   conn,
		connTab:    map[uint32]*Conn{},
		nextConnId: nextConnId,
134
		acceptq:    acceptq,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
135
		txq:        make(chan txReq),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
136
		closed:     make(chan struct{}),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
137
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
138
	if role&linkNoRecvSend == 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
139 140 141 142
		nl.serveWg.Add(2)
		go nl.serveRecv()
		go nl.serveSend()
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
143
	return nl
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
144 145
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
146 147 148 149 150 151 152 153 154 155 156 157 158
// close marks every active Conn and the NodeLink itself as closed.
//
// It is the worker behind Close & friends and must be called with connMu
// held. Only the first call has an effect: afterwards connTab is nil and
// nl.closed is closed.
func (nl *NodeLink) close() {
	nl.closeOnce.Do(func() {
		for id := range nl.connTab {
			nl.connTab[id].close() // XXX explicitly pass error here ?
		}
		nl.connTab = nil // clear + mark closed
		close(nl.closed)
	})
}

159
// Close closes node-node link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
160 161
// IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
162 163
	nl.connMu.Lock()
	defer nl.connMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
164 165

	nl.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
166 167 168 169 170 171 172 173

	// close actual link to peer
	// this will wakeup serve{Send,Recv}
	err := nl.peerLink.Close()

	// wait for serve{Send,Recv} to complete
	nl.serveWg.Wait()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
174
	return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
175 176
}

177
// sendPkt sends raw packet to peer
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
178
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
179 180 181 182
	if true {
		// XXX -> log
		fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
183 184 185
	// XXX if nl is closed peerLink will return "io on closed xxx" but
	// maybe better to check explicitly and return ErrClosedLink

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
186
	_, err := nl.peerLink.Write(pkt.Data)	// FIXME write Data in full
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
187
	//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
188
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
189
		// XXX do we need to retry if err is temporary?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
190 191 192 193 194 195
		// TODO data could be written partially and thus the message stream is now broken
		// -> close connection / whole NodeLink ?
	}
	return err
}

196
// recvPkt receives raw packet from peer
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
197
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
198 199 200
	// XXX if nl is closed peerLink will return "io on closed xxx" but
	// maybe better to check explicitly and return ErrClosedLink

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
201
	// TODO organize rx buffers management (freelist etc)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
202
	// TODO cleanup lots of ntoh32(...)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
203 204
	// XXX do we need to retry if err is temporary?
	// TODO on error framing is broken -> close connection / whole NodeLink ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
205 206

	// first read to read pkt header and hopefully up to page of data in 1 syscall
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
207
	pkt := &PktBuf{make([]byte, 4096)}
208
	// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
209 210
	//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
	n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
211
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
212
		return nil, err	// XXX err adjust ? -> (?) framing error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
213 214
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
215
	pkth := pkt.Header()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
216

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
217
	// XXX -> better PktHeader.Decode() ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
218
	if ntoh32(pkth.Len) < PktHeadLen {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
219 220
		// TODO framing error -> nl.CloseWithError(err)
		panic("TODO pkt.Len < PktHeadLen")	// length is a whole packet len with header
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
221
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
222
	if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
223 224
		// TODO framing error -> nl.CloseWithError(err)
		panic("TODO message too big")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
225
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
226

227
	//pkt.Data = xbytes.Resize32(pkt.Data, ntoh32(pkth.Len))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
228
	if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
229
		// grow rxbuf
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
230
		rxbuf2 := make([]byte, ntoh32(pkth.Len))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
231 232
		copy(rxbuf2, pkt.Data[:n])
		pkt.Data = rxbuf2
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
233
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
234 235
	// cut .Data len to length of packet
	pkt.Data = pkt.Data[:ntoh32(pkth.Len)]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
236

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
237
	// read rest of pkt data, if we need to
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
238 239 240
	if n < len(pkt.Data) {
		_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
		if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
241
			return nil, err	// XXX err adjust ? -> (?) framing error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
242
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
243
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
244

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
245 246 247 248 249
	if true {
		// XXX -> log
		fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
250
	return pkt, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
251 252
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
253 254 255

// worker for NewConn() & friends. Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
256
	c := &Conn{nodeLink: nl,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
257
		connId: connId,
258
		rxq: make(chan *PktBuf),    // TODO buffering
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
259
		txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
260 261
		closed: make(chan struct{}),
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
262 263 264 265
	nl.connTab[connId] = c
	return c
}

266
// NewConn creates new connection on top of node-node link
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
267
func (nl *NodeLink) NewConn() *Conn {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
268 269
	nl.connMu.Lock()
	defer nl.connMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
270 271 272
	if nl.connTab == nil {
		panic("NewConn() on closed node-link")
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
273
	c := nl.newConn(nl.nextConnId)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
274
	nl.nextConnId += 2
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
275
	return c
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
276 277
}

278
// ErrLinkClosed is the error indicated for operations on closed NodeLink
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
var ErrLinkClosed   = errors.New("node link closed")
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")

// Accept waits for and accepts incoming connection on top of node-node link
func (nl *NodeLink) Accept() (*Conn, error) {
	// this node link is not accepting connections
	if nl.acceptq == nil {
		return nil, ErrLinkNoListen
	}

	select {
	case <-nl.closed:
		return nil, ErrLinkClosed // XXX + op = Accept ?

	case c := <-nl.acceptq:
		return c, nil
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
298

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
299
// serveRecv handles incoming packets routing them to either appropriate
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
300
// already-established connection or to new handling goroutine.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
301
func (nl *NodeLink) serveRecv() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
302
	defer nl.serveWg.Done()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
303 304
	for {
		// receive 1 packet
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
305
		pkt, err := nl.recvPkt()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
306
		fmt.Printf("recvPkt -> %v, %v\n", pkt, err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
307
		if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
308 309 310
			// on IO error framing over peerLink becomes broken
			// so we are marking node link and all connections as closed

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
311 312
			select {
			case <-nl.closed:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
313
				// error due to closing NodeLink
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
314
				err = ErrLinkClosed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
315
			default:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
316
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
317

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
318 319 320 321 322 323 324
			println("\tzzz")
			nl.connMu.Lock()
			println("\tzzz 2")
			defer nl.connMu.Unlock()

			nl.recvErr = err

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
325 326 327 328 329
			println("\trrr")
			// wake-up all conns & mark node link as closed
			nl.close()
			println("\tsss")
			return
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
330 331
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
332 333 334
		// pkt.ConnId -> Conn
		connId := ntoh32(pkt.Header().ConnId)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
335
		accept := false
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
336 337
		nl.connMu.Lock()
		conn := nl.connTab[connId]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
338
		if conn == nil {
339
			if nl.acceptq != nil {
340
				// we are accepting new incoming connection
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
341
				conn = nl.newConn(connId)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
342
				accept = true
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
343
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
344 345 346 347
		}
		nl.connMu.Unlock()

		// we have not accepted incoming connection - ignore packet
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
348
		if conn == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
349 350 351
			// XXX also log?
			continue
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
352

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
353 354 355 356 357 358
		if accept {
			// XXX what if Accept exited because of just recently close(nl.closed)?
			//     -> check nl.closed here too ?
			nl.acceptq <- conn
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
359
		// route packet to serving goroutine handler
360
		// XXX what if Conn.Recv exited because of just recently close(nl.closed) ?
361
		//     -> check nl.closed here too ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
362 363 364 365
		conn.rxq <- pkt
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
366

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
367
// txReq is request to transmit a packet. Result error goes back to errch.
//
// NOTE Conn.Send builds it positionally (txReq{pkt, c.txerr}), so the field
// order matters.
type txReq struct {
	pkt   *PktBuf    // packet to transmit
	errch chan error // serveSend sends the result of sendPkt (nil on success) here
}

// serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
376
	defer nl.serveWg.Done()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
377 378
	var err error

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
379 380 381
	for {
		select {
		case <-nl.closed:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
382 383 384 385
			return

		case txreq := <-nl.txq:
			err = nl.sendPkt(txreq.pkt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
386 387

			if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
388 389 390 391 392 393 394 395
				// on IO error framing over peerLink becomes broken
				// so we are marking node link and all connections as closed
				select {
				case <-nl.closed:
					// error due to closing NodeLink
					err = ErrLinkClosed
				default:
				}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
396
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
397

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
398
			txreq.errch <- err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
399 400 401 402 403 404 405 406 407 408

			if err != nil {
				nl.connMu.Lock()
				defer nl.connMu.Unlock()

//				nl.sendErr = err

				// wake-up all conns & mark node link as closed
				nl.close()
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
409 410

			return
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
411 412 413 414
		}
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
415

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
416 417 418
// ErrClosedConn is the error indicated for read/write operations on closed Conn.
var ErrClosedConn = errors.New("read/write on closed connection")

// errClosedConn returns err itself when it is non-nil, and ErrClosedConn
// otherwise.
func errClosedConn(err error) error {
	if err == nil {
		return ErrClosedConn
	}
	return err
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
426 427
// Send packet via connection
func (c *Conn) Send(pkt *PktBuf) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
428
	// set pkt connId associated with this connection
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
429
	pkt.Header().ConnId = hton32(c.connId)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
430
	var err error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
431

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
432 433 434
	select {
	case <-c.closed:
		return ErrClosedConn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
435
//		return errClosedConn(c.nodeLink.sendErr)	// XXX locking ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
436

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
437
	case c.nodeLink.txq <- txReq{pkt, c.txerr}:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
438 439 440
		select {
		// tx request was sent to serveSend and is being transmitted on the wire.
		// the transmission may block for indefinitely long though and
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
441
		// we cannot interrupt it as the only way to interrupt is
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
442 443 444 445 446
		// .nodeLink.Close() which will close all other Conns.
		//
		// That's why we are also checking for c.closed while waiting
		// for reply from serveSend (and leave pkt to finish transmitting).
		//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
447 448
		// NOTE after we return straight here serveSend won't be later
		// blocked on c.txerr<- because that backchannel is a non-blocking one.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
449
		case <-c.closed:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
450
			// XXX also poll c.txerr
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
451
			return ErrClosedConn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
452
//			return errClosedConn(c.nodeLink.sendErr)	// XXX locking ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
453 454

		case err = <-c.txerr:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
455 456
			//fmt.Printf("%v <- c.txerr\n", err)
			return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
457 458 459 460
		}
	}

	return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
461
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
462

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
463 464
// Receive packet from connection
func (c *Conn) Recv() (*PktBuf, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
465 466
	select {
	case <-c.closed:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
467 468 469
		// XXX get err from c.nodeLink.recvErr
		// XXX if nil -> ErrClosedConn ?
		return nil, ErrClosedConn	// XXX -> EOF ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
470

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
471
	case pkt := <-c.rxq:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
472
		return pkt, nil	// XXX error = ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
473 474
	}
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
475

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
476 477
// worker for Close() & co
func (c *Conn) close() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
478
	c.closeOnce.Do(func() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
479
		close(c.closed)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
480
	})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
481 482
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
483
// Close closes connection
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
484
// Any blocked Send() or Recv() will be unblocked and return error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
485 486 487
//
// NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break framing.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
488
func (c *Conn) Close() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
489 490 491 492 493
	// adjust nodeLink.connTab
	c.nodeLink.connMu.Lock()
	delete(c.nodeLink.connTab, c.connId)
	c.nodeLink.connMu.Unlock()
	c.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
494
	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
495 496 497
}


498
// for convenience: Dial/Listen
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
499

500
// Dial connects to address on named network and wrap the connection as NodeLink
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523
// TODO +tls.Config
func Dial(ctx context.Context, network, address string) (*NodeLink, error) {
	d := net.Dialer{}
	peerConn, err := d.DialContext(ctx, network, address)
	if err != nil {
		return nil, err
	}
	return NewNodeLink(peerConn, LinkClient), nil
}

// Listener is like net.Listener, but its Accept returns the accepted
// net.Conn wrapped in a NodeLink.
type Listener struct {
	net.Listener
}

// Accept waits for the next incoming network connection and returns it
// wrapped in a NodeLink with server role.
func (l *Listener) Accept() (*NodeLink, error) {
	conn, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	return NewNodeLink(conn, LinkServer), nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
524 525
// TODO +tls.Config
// TODO +ctx		-> no as .Close() will interrupt all .Accept()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
526 527 528 529 530 531 532
func Listen(network, laddr string) (*Listener, error) {
	l, err := net.Listen(network, laddr)
	if err != nil {
		return nil, err
	}
	return &Listener{l}, nil
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
533

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
534

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558


// ----------------------------------------



// XXX ^^^ original description about notify/ask/answer
// All packets are classified to be of one of the following kind:
// - notify:	a packet is sent without expecting any reply
// - ask:	a packet is sent and reply is expected
// - answer:	a packet replying to previous ask
//
// At any time there can be several Ask packets issued by both nodes.
// For an Ask packet a single Answer reply is expected		XXX vs protocol where there is one request and list of replies ?
//
// XXX -> multiple subconnections, explicitly closed, with the ability to
// exchange several packets without spawning goroutines? And for the case where
// only a single answer is expected, implement it so that the (sub-)connection
// is explicitly closed right after ask-send / answer-receive?
//
// XXX it is maybe better to try to avoid multiplexing by hand and let the OS do it?
//
// A reply to particular Ask packet, once received, will be delivered to
// corresponding goroutine which originally issued Ask	XXX this can be put into interface