// Copyright (C) 2016  Nexedi SA and Contributors.
//                     Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 2, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.

// NEO | Connection management

package neo

import (
	"errors"
	"io"
	"net"
	"sync"
	"unsafe"

	//"fmt"
)

// NodeLink is a node-node link in NEO
//
// A node-node link represents a bidirectional, symmetrical communication
// channel between 2 NEO nodes. The link provides a service for packet
// exchange and for multiplexing several communication connections on
// top of the node-node link.
//
// A new connection can be created with .NewConn() . Once a connection is
// created and data is sent over it, a corresponding new connection is
// created on the peer's side - accepting the first packet as a "request" -
// and all further send/receive exchange happens between those 2 connections.
//
// For a node to be able to accept new incoming connections it has to register
// a corresponding handler with .HandleNewConn() . Without such a handler
// registered the node will only be able to initiate new connections, not
// accept new ones from its peer.
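//
// For example, a minimal usage sketch for the initiating side could look
// like this (conn is assumed to be an already established net.Conn, e.g.
// from net.Dial, and pkt a prepared *PktBuf; error handling is elided):
//
//	link := NewNodeLink(conn, ConnClient)
//	defer link.Close()
//
//	c := link.NewConn()
//	defer c.Close()
//	err := c.Send(pkt)
//	reply, err := c.Recv()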
//
// A NodeLink has to be explicitly closed once it is no longer needed.
//
// It is safe to use NodeLink from multiple goroutines simultaneously.
type NodeLink struct {
	peerLink net.Conn		// raw conn to peer

	connMu   sync.Mutex	// TODO -> RW ?
	connTab  map[uint32]*Conn	// connId -> Conn associated with connId
	nextConnId uint32		// next connId to use for Conn initiated by us

	serveWg sync.WaitGroup

	handleNewConn func(conn *Conn)	// handler for new connections

	txreq	chan txReq		// tx requests from Conns go via here
	closed  chan struct{}
}

// Conn is a connection established over NodeLink
//
// Data can be sent and received over it.
// Once a connection is no longer needed it has to be closed.
//
// It is safe to use Conn from multiple goroutines simultaneously.
type Conn struct {
	nodeLink  *NodeLink
	connId    uint32
	rxq	  chan *PktBuf	// received packets for this Conn go here
	txerr     chan error	// transmit errors for this Conn go back here

	// Conn has to be explicitly closed by user; it can also be closed by NodeLink.Close
	closeOnce sync.Once
	closed    chan struct{}
}

// Buffer with packet data
// XXX move me out of here
type PktBuf struct {
	Data	[]byte	// whole packet data including all headers	XXX -> Buf ?
}

// Get pointer to packet header
func (pkt *PktBuf) Header() *PktHead {
	// XXX check len(Data) < PktHead ? -> no, Data has to be allocated with cap >= PktHeadLen
	return (*PktHead)(unsafe.Pointer(&pkt.Data[0]))
}

// Get packet payload
func (pkt *PktBuf) Payload() []byte {
	return pkt.Data[PktHeadLen:]
}
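
// For example, a header-only packet could be prepared like this (a sketch;
// hton32 converts to network byte order; note that Len counts the whole
// packet, header included):
//
//	pkt := &PktBuf{Data: make([]byte, PktHeadLen)}
//	h := pkt.Header()
//	h.Len = hton32(PktHeadLen)
//	h.ConnId = hton32(0)	// set by Conn.Send for outgoing packets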


type ConnRole int
const (
	ConnServer ConnRole = iota	// connection created as server
	ConnClient			// connection created as client

	// for testing:
	connNoRecvSend ConnRole = 1<<16	// do not spawn serveRecv & serveSend
	connFlagsMask  ConnRole = (1<<32 - 1) << 16
)

// Make a new NodeLink from an already established net.Conn
//
// role specifies how to treat conn - as either the server or the client side.
// The only difference between the two roles is the parity of connIds used
// for connections initiated locally: a server uses even connIds and a
// client uses odd ones, so the two sides never pick the same connId.
//
// Usually the server role should be used for connections created via
// net.Listen/net.Accept, and the client role for connections created via
// net.Dial.
func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink {
	var nextConnId uint32
	switch role&^connFlagsMask {
	case ConnServer:
		nextConnId = 0	// connIds initiated by us will be even
	case ConnClient:
		nextConnId = 1	// ----//---- odd
	default:
		panic("invalid conn role")
	}

	nl := NodeLink{
		peerLink:   conn,
		connTab:    map[uint32]*Conn{},
		nextConnId: nextConnId,
		txreq:      make(chan txReq),
		closed:     make(chan struct{}),
	}
	if role&connNoRecvSend == 0 {
		nl.serveWg.Add(2)
		go nl.serveRecv()
		go nl.serveSend()
	}
	return &nl
}
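
// For example, the two ends of a link would typically be set up like this
// (a sketch; ln is assumed to be a net.Listener and addr the peer address,
// with error handling elided):
//
//	// on the listening side:
//	conn, _ := ln.Accept()
//	link := NewNodeLink(conn, ConnServer)
//
//	// on the dialing side:
//	conn, _ := net.Dial("tcp", addr)
//	link := NewNodeLink(conn, ConnClient)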

// Close node-node link.
// IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error {
	// mark all active Conns as closed
	nl.connMu.Lock()
	defer nl.connMu.Unlock()
	for _, conn := range nl.connTab {
		conn.close()
	}
	nl.connTab = nil	// clear + mark closed

	// close actual link to peer
	// this will wakeup serve{Send,Recv}
	close(nl.closed)
	err := nl.peerLink.Close()

	// wait for serve{Send,Recv} to complete
	//fmt.Printf("%p serveWg.Wait ...\n", nl)
	nl.serveWg.Wait()
	//fmt.Printf("%p\t (wait) -> woken up\n", nl)

	return err
}

// send raw packet to peer
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
	_, err := nl.peerLink.Write(pkt.Data)	// FIXME write Data in full
	if err != nil {
		// XXX do we need to retry if err is temporary?
		// TODO data could be written partially and thus the message stream is now broken
		// -> close connection / whole NodeLink ?
	}
	return err
}

// receive raw packet from peer
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
	// TODO organize rx buffers management (freelist etc)
	// TODO cleanup lots of ntoh32(...)
	// XXX do we need to retry if err is temporary?
	// TODO on error framing is broken -> close connection / whole NodeLink ?

	// first read - fetch pkt header and, hopefully, up to a page of data in 1 syscall
	pkt := &PktBuf{make([]byte, 4096)}
	n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
	if err != nil {
		return nil, err	// XXX err adjust ?
	}

	pkth := pkt.Header()

	// XXX -> better PktHeader.Decode() ?
	if ntoh32(pkth.Len) < PktHeadLen {
		panic("TODO pkt.Len < PktHeadLen")	// XXX err	(length is a whole packet len with header)
	}
	if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
		panic("TODO message too big")	// XXX err
	}

	if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
		// grow rxbuf
		rxbuf2 := make([]byte, ntoh32(pkth.Len))
		copy(rxbuf2, pkt.Data[:n])
		pkt.Data = rxbuf2
	}
	// cut .Data len to length of packet
	pkt.Data = pkt.Data[:ntoh32(pkth.Len)]

	// read rest of pkt data, if we need to
	if n < len(pkt.Data) {
		_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
		if err != nil {
			return nil, err	// XXX err adjust ?
		}
	}

	return pkt, nil
}


// worker for NewConn() & friends. Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
	c := &Conn{nodeLink: nl,
		connId: connId,
		rxq: make(chan *PktBuf),
		txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
		closed: make(chan struct{}),
	}
	nl.connTab[connId] = c
	return c
}

// Create a connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn {
	nl.connMu.Lock()
	defer nl.connMu.Unlock()
	if nl.connTab == nil {
		panic("NewConn() on closed node-link")
	}
	c := nl.newConn(nl.nextConnId)
	nl.nextConnId += 2
	return c
}


// serveRecv handles incoming packets, routing them either to an appropriate
// already-established connection or to a new serving goroutine.
func (nl *NodeLink) serveRecv() {
	defer nl.serveWg.Done()
	for {
		// receive 1 packet
		//println(nl, "serveRecv -> recv...")
		pkt, err := nl.recvPkt()
		//fmt.Printf("%p\t (recv) -> %v\n", nl, err)
		if err != nil {
			// this might be just an error on close - simply stop in such case
			select {
			case <-nl.closed:
				// XXX check err is actually what we get on interrupt?
				return
			default:
			}
			panic(err)	// XXX err
		}

		// pkt.ConnId -> Conn
		connId := ntoh32(pkt.Header().ConnId)
		var handleNewConn func(conn *Conn)

		nl.connMu.Lock()
		conn := nl.connTab[connId]
		if conn == nil {
			handleNewConn = nl.handleNewConn
			if handleNewConn != nil {
				conn = nl.newConn(connId)
			}
		}
		nl.connMu.Unlock()

		// we have not accepted incoming connection - ignore packet
		if conn == nil {
			// XXX also log?
			continue
		}

		// we are accepting new incoming connection - spawn
		// connection-serving goroutine
		if handleNewConn != nil {
			// TODO avoid spawning goroutine for each new Ask request -
			//	- by keeping pool of read inactive goroutine / conn pool ?
			go handleNewConn(conn)
		}

		// route packet to serving goroutine handler
		conn.rxq <- pkt
	}
}


// request to transmit a packet. Result error goes back to errch
type txReq struct {
	pkt *PktBuf
	errch chan error
}

// serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
	defer nl.serveWg.Done()
runloop:
	for {
		//fmt.Printf("%p serveSend -> select ...\n", nl)
		select {
		case <-nl.closed:
			//fmt.Printf("%p\t (send) -> closed\n", nl)
			break runloop

		case txreq := <-nl.txreq:
			//fmt.Printf("%p\t (send) -> txreq\n", nl)
			err := nl.sendPkt(txreq.pkt)
			//fmt.Printf("%p\t (send) -> err: %v\n", nl, err)
			if err != nil {
				// XXX also close whole nodeLink since tx framing now can be broken?
			}
			txreq.errch <- err
		}
	}

	//fmt.Printf("%p\t (send) -> exit\n", nl)
}

// XXX move to NodeLink ctor ?
// Set handler for new incoming connections
func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
	nl.connMu.Lock()
	defer nl.connMu.Unlock()
	nl.handleNewConn = h	// NOTE can change handler at runtime XXX do we need this?
}
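
// For example, an echo-style handler could be registered like this (a
// sketch; the handler serves one request per accepted connection):
//
//	link.HandleNewConn(func(c *Conn) {
//		defer c.Close()
//		pkt, err := c.Recv()
//		if err != nil {
//			return
//		}
//		c.Send(pkt)	// error ignored in this sketch
//	})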


// ErrClosedConn is the error returned for read/write operations on a closed Conn
var ErrClosedConn = errors.New("read/write on closed connection")

// Send packet via connection
func (c *Conn) Send(pkt *PktBuf) error {
	// set pkt connId associated with this connection
	pkt.Header().ConnId = hton32(c.connId)
	var err error

	select {
	case <-c.closed:
		return ErrClosedConn

	case c.nodeLink.txreq <- txReq{pkt, c.txerr}:
		select {
		// tx request was sent to serveSend and is being transmitted on the wire.
		// the transmission may block indefinitely long, though, and we
		// cannot interrupt it, as the only way to interrupt is
		// .nodeLink.Close() which would close all other Conns too.
		//
		// That's why we also check for c.closed while waiting for the
		// reply from serveSend (and leave pkt to finish transmitting).
		//
		// NOTE if we return straight from here, serveSend won't be blocked on
		// c.txerr<- because that backchannel is non-blocking (see newConn).
		case <-c.closed:
			return ErrClosedConn

		case err = <-c.txerr:
		}
	}

	// if we got a transmission error, chances are it was due to the underlying
	// NodeLink being closed. If our Conn was also requested to be closed,
	// adjust err to ErrClosedConn along the way.
	//
	// ( reaching here is theoretically possible if both c.closed and
	//   c.txerr are ready above )
	if err != nil {
		select {
		case <-c.closed:
			err = ErrClosedConn
		default:
		}
	}

	return err
}

// Receive packet from connection
func (c *Conn) Recv() (*PktBuf, error) {
	select {
	case <-c.closed:
		// XXX closed c.rxq might be just indicator for this
		return nil, ErrClosedConn

	case pkt, ok := <-c.rxq:
		if !ok {			// see ^^^
			return nil, io.EOF	// XXX check erroring & other errors?
		}
		return pkt, nil
	}
}

// worker for Close() & co
func (c *Conn) close() {
	c.closeOnce.Do(func() {
		//fmt.Printf("%p Conn.close\n", c)
		close(c.closed)	// XXX better just close c.rxq + ??? for tx
	})
}

// Close connection
// Any blocked Send() or Recv() will be unblocked and return error
// XXX Send() - if started - will first complete (not to break framing)
func (c *Conn) Close() error {	// XXX do we need error here?
	// adjust nodeLink.connTab
	c.nodeLink.connMu.Lock()
	delete(c.nodeLink.connTab, c.connId)
	c.nodeLink.connMu.Unlock()
	c.close()
	return nil
}



// TODO
//func Dial(ctx context.Context, network, address string) (*NodeLink, error)	// + tls.Config
//func Listen(network, laddr string) (net.Listener, error)	// + tls.Config
//	ln.Accept -> will return net.Conn wrapped in NodeLink



// ----------------------------------------



// XXX ^^^ original description about notify/ask/answer
// All packets are classified as being of one of the following kinds:
// - notify:	a packet is sent without expecting any reply
// - ask:	a packet is sent and a reply is expected
// - answer:	a packet replying to a previous ask
//
// At any time there can be several Ask packets in flight, issued by both nodes.
// For an Ask packet a single Answer reply is expected		XXX vs protocol where there is one request and list of replies ?
//
// XXX -> multiple subconnections, explicitly closed, with the ability to
// exchange multiple packets without spawning goroutines? And "only a single
// answer expected" implemented such that after one ask-send / answer-receive
// the (sub-)connection is explicitly closed?
//
// XXX maybe it is better to try to avoid multiplexing by hand and let the OS do it?
//
// A reply to a particular Ask packet, once received, will be delivered to
// the corresponding goroutine which originally issued the Ask	XXX this can be put into interface



// // Send notify packet to peer
// func (c *NodeLink) Notify(pkt XXX) error {
// 	// TODO
// }
//
// // Send packet and wait for replied answer packet
// func (c *NodeLink) Ask(pkt XXX) (answer Pkt, err error) {
// 	// TODO
// }
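//
// A possible Ask, built on top of NewConn/Send/Recv, could look like this
// (a sketch only - answer verification and error handling are not worked out):
//
// func (nl *NodeLink) Ask(pkt *PktBuf) (*PktBuf, error) {
// 	c := nl.NewConn()
// 	defer c.Close()
// 	if err := c.Send(pkt); err != nil {
// 		return nil, err
// 	}
// 	return c.Recv()
// }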