Commit 4324c812 authored by Kirill Smelkov's avatar Kirill Smelkov

X restore all Conn functionality

before:

null:00   ; oid=0..16995  nread=68269354  t=481.582632ms (28.335µs / object)  x=zsha1.go
null:00   ; oid=0..16995  nread=68269354  t=473.499859ms (27.859µs / object)  x=zsha1.go
null:00   ; oid=0..16995  nread=68269354  t=471.996668ms (27.771µs / object)  x=zsha1.go
null:00   ; oid=0..16995  nread=68269354  t=478.029272ms (28.125µs / object)  x=zsha1.go

after:

null:00   ; oid=0..16995  nread=68269354  t=709.761334ms (41.76µs / object)  x=zsha1.go
null:00   ; oid=0..16995  nread=68269354  t=704.768088ms (41.466µs / object)  x=zsha1.go
null:00   ; oid=0..16995  nread=68269354  t=720.756186ms (42.407µs / object)  x=zsha1.go
null:00   ; oid=0..16995  nread=68269354  t=693.688744ms (40.814µs / object)  x=zsha1.go

now we'll be teaching Recv1 & friends to do things in optimized way but Conn
functionality must stay working.
parent 47cfe4a7
...@@ -65,8 +65,8 @@ type NodeLink struct { ...@@ -65,8 +65,8 @@ type NodeLink struct {
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
acceptq chan *Conn // queue of incoming connections for Accept acceptq chan *Conn // queue of incoming connections for Accept
// txq chan txReq // tx requests from Conns go via here txq chan txReq // tx requests from Conns go via here
// // (rx packets are routed to Conn.rxq) // (rx packets are routed to Conn.rxq)
axdown chan struct{} // ready when accept is marked as no longer operational axdown chan struct{} // ready when accept is marked as no longer operational
axdown1 sync.Once // CloseAccept may be called severall times axdown1 sync.Once // CloseAccept may be called severall times
...@@ -95,16 +95,16 @@ type Conn struct { ...@@ -95,16 +95,16 @@ type Conn struct {
link *NodeLink link *NodeLink
connId uint32 connId uint32
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
// txerr chan error // transmit results for this Conn go back here txerr chan error // transmit results for this Conn go back here
// txdown chan struct{} // ready when Conn TX is marked as no longer operational txdown chan struct{} // ready when Conn TX is marked as no longer operational
rxdown chan struct{} // ----//---- RX rxdown chan struct{} // ----//---- RX
// txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown
rxdownOnce sync.Once // ----//---- rxdownOnce sync.Once // ----//----
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
rxclosed int32 // whether CloseRecv was called rxclosed int32 // whether CloseRecv was called
// txclosed int32 // whether CloseSend was called txclosed int32 // whether CloseSend was called
errMsg *Error // error message for peer if rx is down errMsg *Error // error message for peer if rx is down
...@@ -176,15 +176,14 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -176,15 +176,14 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
connTab: map[uint32]*Conn{}, connTab: map[uint32]*Conn{},
nextConnId: nextConnId, nextConnId: nextConnId,
acceptq: make(chan *Conn), // XXX +buf acceptq: make(chan *Conn), // XXX +buf
//txq: make(chan txReq), txq: make(chan txReq),
axdown: make(chan struct{}), axdown: make(chan struct{}),
down: make(chan struct{}), down: make(chan struct{}),
} }
if role&linkNoRecvSend == 0 { if role&linkNoRecvSend == 0 {
//nl.serveWg.Add(2) nl.serveWg.Add(2)
nl.serveWg.Add(1)
go nl.serveRecv() go nl.serveRecv()
//go nl.serveSend() go nl.serveSend()
} }
return nl return nl
} }
...@@ -195,8 +194,8 @@ func (nl *NodeLink) newConn(connId uint32) *Conn { ...@@ -195,8 +194,8 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{link: nl, c := &Conn{link: nl,
connId: connId, connId: connId,
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
//txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
//txdown: make(chan struct{}), txdown: make(chan struct{}),
rxdown: make(chan struct{}), rxdown: make(chan struct{}),
} }
nl.connTab[connId] = c nl.connTab[connId] = c
...@@ -328,9 +327,9 @@ func (c *Conn) shutdown() { ...@@ -328,9 +327,9 @@ func (c *Conn) shutdown() {
} }
func (c *Conn) shutdownTX() { func (c *Conn) shutdownTX() {
//c.txdownOnce.Do(func() { c.txdownOnce.Do(func() {
// close(c.txdown) close(c.txdown)
//}) })
} }
// shutdownRX marks .rxq as no loner operational // shutdownRX marks .rxq as no loner operational
...@@ -389,7 +388,7 @@ func (c *Conn) Close() error { ...@@ -389,7 +388,7 @@ func (c *Conn) Close() error {
nl := c.link nl := c.link
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
atomic.StoreInt32(&c.rxclosed, 1) atomic.StoreInt32(&c.rxclosed, 1)
//atomic.StoreInt32(&c.txclosed, 1) atomic.StoreInt32(&c.txclosed, 1)
c.shutdown() c.shutdown()
// adjust link.connTab // adjust link.connTab
...@@ -410,10 +409,11 @@ func (c *Conn) Close() error { ...@@ -410,10 +409,11 @@ func (c *Conn) Close() error {
// ( we cannot reuse same connection since after it is marked as // ( we cannot reuse same connection since after it is marked as
// closed Send refuses to work ) // closed Send refuses to work )
} else { } else {
delete(nl.connTab, c.connId) // delete(nl.connTab, c.connId)
// XXX temp. disabled - costs a lot in 1req=1conn model // XXX vvv was temp. disabled - costs a lot in 1req=1conn model
// // c implicitly goes away from connTab
// tmpclosed = nl.newConn(c.connId) // c implicitly goes away from connTab
tmpclosed = nl.newConn(c.connId)
} }
} }
nl.connMu.Unlock() nl.connMu.Unlock()
...@@ -583,6 +583,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -583,6 +583,7 @@ func (nl *NodeLink) serveRecv() {
// queuing pkt succeeds for incoming connection that is not yet // queuing pkt succeeds for incoming connection that is not yet
// there in acceptq. // there in acceptq.
if !rxdown { if !rxdown {
// XXX can avoid select here: if conn closer cares to drain rxq (?)
select { select {
case <-conn.rxdown: case <-conn.rxdown:
rxdown = true rxdown = true
...@@ -615,6 +616,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -615,6 +616,7 @@ func (nl *NodeLink) serveRecv() {
// put conn to .acceptq // put conn to .acceptq
if !axdown { if !axdown {
// XXX can avoid select here if shutdownAX cares to drain acceptq (?)
select { select {
case <-nl.axdown: case <-nl.axdown:
axdown = true axdown = true
...@@ -655,7 +657,6 @@ type txReq struct { ...@@ -655,7 +657,6 @@ type txReq struct {
errch chan error errch chan error
} }
/*
// errSendShutdown returns appropriate error when c.txdown is found ready in Send // errSendShutdown returns appropriate error when c.txdown is found ready in Send
func (c *Conn) errSendShutdown() error { func (c *Conn) errSendShutdown() error {
switch { switch {
...@@ -673,7 +674,6 @@ func (c *Conn) errSendShutdown() error { ...@@ -673,7 +674,6 @@ func (c *Conn) errSendShutdown() error {
return ErrLinkDown return ErrLinkDown
} }
} }
*/
// sendPkt sends raw packet via connection. // sendPkt sends raw packet via connection.
// //
...@@ -683,12 +683,12 @@ func (c *Conn) sendPkt(pkt *PktBuf) error { ...@@ -683,12 +683,12 @@ func (c *Conn) sendPkt(pkt *PktBuf) error {
return c.err("send", err) return c.err("send", err)
} }
// XXX serveSend is not needed - Conn.Write is already can be used by multiple // XXX serveSend is not needed - Conn.Write already can be used by multiple
// goroutines simultaneously and works atomically; (same for Conn.Read etc - see pool.FD) // goroutines simultaneously and works atomically; (same for Conn.Read etc - see pool.FD)
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109 // https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14 // https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14
///* /*
func (c *Conn) sendPkt2(pkt *PktBuf) error { func (c *Conn) sendPkt2(pkt *PktBuf) error {
// set pkt connId associated with this connection // set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId) pkt.Header().ConnId = hton32(c.connId)
...@@ -705,9 +705,9 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error { ...@@ -705,9 +705,9 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error {
return err return err
} }
//*/ */
/* ///*
func (c *Conn) sendPkt2(pkt *PktBuf) error { func (c *Conn) sendPkt2(pkt *PktBuf) error {
// set pkt connId associated with this connection // set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId) pkt.Header().ConnId = hton32(c.connId)
...@@ -775,7 +775,7 @@ func (nl *NodeLink) serveSend() { ...@@ -775,7 +775,7 @@ func (nl *NodeLink) serveSend() {
} }
} }
} }
*/ //*/
// ---- raw IO ---- // ---- raw IO ----
...@@ -1230,7 +1230,7 @@ func (c *Conn) Send(msg Msg) error { ...@@ -1230,7 +1230,7 @@ func (c *Conn) Send(msg Msg) error {
buf := pktAlloc(pktHeaderLen+l) buf := pktAlloc(pktHeaderLen+l)
h := buf.Header() h := buf.Header()
// h.ConnId will be set by conn.Send // h.ConnId will be set by conn.sendPkt
h.MsgCode = hton16(msg.neoMsgCode()) h.MsgCode = hton16(msg.neoMsgCode())
h.MsgLen = hton32(uint32(l)) // XXX casting: think again h.MsgLen = hton32(uint32(l)) // XXX casting: think again
......
...@@ -170,6 +170,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -170,6 +170,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
//println("000")
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := &xsync.WorkGroup{} wg := &xsync.WorkGroup{}
...@@ -227,6 +228,7 @@ func TestNodeLink(t *testing.T) { ...@@ -227,6 +228,7 @@ func TestNodeLink(t *testing.T) {
} }
xclose(nl1) xclose(nl1)
//println("111")
// Close vs recvPkt on another side // Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{} wg = &xsync.WorkGroup{}
...@@ -256,6 +258,8 @@ func TestNodeLink(t *testing.T) { ...@@ -256,6 +258,8 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
xclose(nl1) xclose(nl1)
//println("222")
// raw exchange // raw exchange
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
...@@ -286,6 +290,7 @@ func TestNodeLink(t *testing.T) { ...@@ -286,6 +290,7 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
xwait(wgclose) xwait(wgclose)
//println("333")
// ---- connections on top of nodelink ---- // ---- connections on top of nodelink ----
...@@ -305,6 +310,8 @@ func TestNodeLink(t *testing.T) { ...@@ -305,6 +310,8 @@ func TestNodeLink(t *testing.T) {
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
//println("444")
// Close vs sendPkt // Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1) c = xnewconn(nl1)
...@@ -320,6 +327,8 @@ func TestNodeLink(t *testing.T) { ...@@ -320,6 +327,8 @@ func TestNodeLink(t *testing.T) {
} }
xwait(wg) xwait(wg)
//println("555")
// NodeLink.Close vs Conn.sendPkt/recvPkt // NodeLink.Close vs Conn.sendPkt/recvPkt
c11 := xnewconn(nl1) c11 := xnewconn(nl1)
c12 := xnewconn(nl1) c12 := xnewconn(nl1)
...@@ -344,6 +353,8 @@ func TestNodeLink(t *testing.T) { ...@@ -344,6 +353,8 @@ func TestNodeLink(t *testing.T) {
xclose(c12) xclose(c12)
xclose(nl2) xclose(nl2)
//println(600)
// NodeLink.Close vs Conn.sendPkt/recvPkt and Accept on another side // NodeLink.Close vs Conn.sendPkt/recvPkt and Accept on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, 0) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, 0)
c21 := xnewconn(nl2) c21 := xnewconn(nl2)
...@@ -381,6 +392,7 @@ func TestNodeLink(t *testing.T) { ...@@ -381,6 +392,7 @@ func TestNodeLink(t *testing.T) {
xclose(nl1) xclose(nl1)
xwait(wg) xwait(wg)
//println(777)
// XXX denoise vvv // XXX denoise vvv
// NewConn after NodeLink shutdown // NewConn after NodeLink shutdown
...@@ -446,6 +458,7 @@ func TestNodeLink(t *testing.T) { ...@@ -446,6 +458,7 @@ func TestNodeLink(t *testing.T) {
t.Fatalf("Accept after NodeLink close: %v", err) t.Fatalf("Accept after NodeLink close: %v", err)
} }
//println(888)
xclose(c21) xclose(c21)
xclose(c22) xclose(c22)
...@@ -463,6 +476,9 @@ func TestNodeLink(t *testing.T) { ...@@ -463,6 +476,9 @@ func TestNodeLink(t *testing.T) {
saveKeepClosed := connKeepClosed saveKeepClosed := connKeepClosed
connKeepClosed = 10*time.Millisecond connKeepClosed = 10*time.Millisecond
//println(999)
//println()
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
nl1.CloseAccept() nl1.CloseAccept()
...@@ -519,7 +535,7 @@ func TestNodeLink(t *testing.T) { ...@@ -519,7 +535,7 @@ func TestNodeLink(t *testing.T) {
}) })
//println("000") //println("aaa")
c1 := xnewconn(nl1) c1 := xnewconn(nl1)
xsendPkt(c1, mkpkt(33, []byte("ping"))) xsendPkt(c1, mkpkt(33, []byte("ping")))
...@@ -574,7 +590,7 @@ func TestNodeLink(t *testing.T) { ...@@ -574,7 +590,7 @@ func TestNodeLink(t *testing.T) {
} }
nl2.connMu.Unlock() nl2.connMu.Unlock()
//println("555") //println("bbb")
xclose(c1) xclose(c1)
xclose(c2) xclose(c2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment