Commit 099bfc29 authored by Kirill Smelkov's avatar Kirill Smelkov

X freelist for PktBuf

before:

BenchmarkLinkNetPipeRTT-4         200000              9763 ns/op            9664 B/op         25 allocs/op
BenchmarkLinkTCPRTT-4             100000             19069 ns/op            9664 B/op         25 allocs/op

after:

BenchmarkLinkNetPipeRTT-4         200000              7872 ns/op            1224 B/op         17 allocs/op
BenchmarkLinkTCPRTT-4             100000             18465 ns/op            1221 B/op         17 allocs/op
parent 66c8e78f
......@@ -188,8 +188,8 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{link: nl,
connId: connId,
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
txdown: make(chan struct{}),
rxdown: make(chan struct{}),
}
......@@ -667,7 +667,9 @@ func (c *Conn) errSendShutdown() error {
}
}
// sendPkt sends raw packet via connection
// sendPkt sends raw packet via connection.
//
// on success pkt is freed.
func (c *Conn) sendPkt(pkt *PktBuf) error {
err := c.sendPkt2(pkt)
return c.err("send", err)
......@@ -746,8 +748,11 @@ func (nl *NodeLink) serveSend() {
const dumpio = false
// sendPkt sends raw packet to peer
// tx error, if any, is returned as is and is analyzed in serveSend
// sendPkt sends raw packet to peer.
//
// tx error, if any, is returned as is and is analyzed in serveSend.
//
// XXX pkt should be freed always or only on error?
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
if dumpio {
// XXX -> log
......@@ -757,18 +762,20 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
// NOTE Write writes data in full, or it is error
_, err := nl.peerLink.Write(pkt.Data)
pkt.Free()
return err
}
var ErrPktTooBig = errors.New("packet too big")
// recvPkt receives raw packet from peer
// recvPkt receives raw packet from peer.
//
// rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// TODO organize rx buffers management (freelist etc)
// first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := &PktBuf{make([]byte, 4096)}
pkt := pktAlloc(4096)
// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, pktHeaderLen)
n, err := io.ReadFull(nl.peerLink, pkt.Data[:pktHeaderLen])
......@@ -784,15 +791,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
return nil, ErrPktTooBig
}
// XXX -> pkt.Data = xbytes.Resize32(pkt.Data[:n], pktLen)
if pktLen > uint32(cap(pkt.Data)) {
// grow rxbuf
rxbuf2 := make([]byte, pktLen)
copy(rxbuf2, pkt.Data[:n])
pkt.Data = rxbuf2
}
// cut .Data len to length of packet
pkt.Data = pkt.Data[:pktLen]
pkt.Resize(int(pktLen))
// read rest of pkt data, if we need to
if n < len(pkt.Data) {
......@@ -1129,11 +1128,11 @@ func (c *Conn) err(op string, e error) error {
// Recv receives message
// it receives packet and decodes message from it
func (c *Conn) Recv() (Msg, error) {
// TODO use freelist for PktBuf
pkt, err := c.recvPkt()
if err != nil {
return nil, err
}
defer pkt.Free()
// decode packet
pkth := pkt.Header()
......@@ -1162,7 +1161,7 @@ func (c *Conn) Send(msg Msg) error {
traceConnSendPre(c, msg)
l := msg.neoMsgEncodedLen()
buf := PktBuf{make([]byte, pktHeaderLen+l)} // TODO -> freelist
buf := pktAlloc(pktHeaderLen+l)
h := buf.Header()
// h.ConnId will be set by conn.Send
......@@ -1171,9 +1170,8 @@ func (c *Conn) Send(msg Msg) error {
msg.neoMsgEncode(buf.Payload())
// XXX why pointer?
// XXX more context in err? (msg type)
return c.sendPkt(&buf)
return c.sendPkt(buf)
}
......@@ -1185,11 +1183,11 @@ func (c *Conn) Send(msg Msg) error {
// on error (-1, err) is returned
func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
// XXX a bit dup wrt Recv
// TODO use freelist for PktBuf
pkt, err := c.recvPkt()
if err != nil {
return -1, err
}
defer pkt.Free()
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
......
......@@ -23,29 +23,60 @@ package neo
import (
"fmt"
"reflect"
"sync"
"unsafe"
)
// TODO organize rx buffers management (freelist etc)
"lab.nexedi.com/kirr/go123/xbytes"
)
// PktBuf is a buffer with full raw packet (header + data)
// PktBuf is a buffer with full raw packet (header + data).
//
// variables of type PktBuf are usually named "pkb" (packet buffer), similar to "skb" in Linux
// variables of type PktBuf are usually named "pkb" (packet buffer), similar to "skb" in Linux.
//
// Allocate PktBuf via allocPkt() and free via PktBuf.Free().
type PktBuf struct {
Data []byte // whole packet data including all headers XXX -> Buf ?
Data []byte // whole packet data including all headers
}
// Header returns pointer to packet header
// Header returns pointer to packet header.
func (pkt *PktBuf) Header() *PktHeader {
// XXX check len(Data) < PktHeader ? -> no, Data has to be allocated with cap >= pktHeaderLen
return (*PktHeader)(unsafe.Pointer(&pkt.Data[0]))
}
// Payload returns []byte representing packet payload
// Payload returns []byte representing packet payload.
func (pkt *PktBuf) Payload() []byte {
return pkt.Data[pktHeaderLen:]
}
// Resize resizes pkt to be of length n.
//
// semantic = xbytes.Resize.
func (pkt *PktBuf) Resize(n int) {
pkt.Data = xbytes.Resize(pkt.Data, n)
}
// ---- PktBuf freelist ----
// pktBufPool is sync.Pool<pktBuf>
var pktBufPool = sync.Pool{New: func() interface{} {
return &PktBuf{Data: make([]byte, 0, 4096)}
}}
// pktAlloc allocates PktBuf with len=n
func pktAlloc(n int) *PktBuf {
pkt := pktBufPool.Get().(*PktBuf)
pkt.Data = xbytes.Realloc(pkt.Data, n)
return pkt
}
// Free marks pkt as no longer needed.
func (pkt *PktBuf) Free() {
pktBufPool.Put(pkt)
}
// ---- PktBuf dump ----
// Strings dumps a packet in human-readable form
func (pkt *PktBuf) String() string {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment