Commit c643ba53 authored by Kirill Smelkov's avatar Kirill Smelkov

X Send1: switch to sending directly over link

parent d39a6fc3
...@@ -249,7 +249,7 @@ func (c *Conn) reinit() { ...@@ -249,7 +249,7 @@ func (c *Conn) reinit() {
c.txdownOnce = sync.Once{} // XXX ok? c.txdownOnce = sync.Once{} // XXX ok?
c.txclosed.Set(0) c.txclosed.Set(0)
c.closeOnce = sync.Once{} // XXX ok? c.closeOnce = sync.Once{} // XXX ok?
} }
// ensureOpen make sure *ch stays non-closed chan struct{} for signalling. // ensureOpen make sure *ch stays non-closed chan struct{} for signalling.
...@@ -302,7 +302,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -302,7 +302,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
return c, nil return c, nil
} }
// shutdownAX marks acceptq as no longer operational // shutdownAX marks acceptq as no longer operational and interrupts Accept.
func (link *NodeLink) shutdownAX() { func (link *NodeLink) shutdownAX() {
link.axdown1.Do(func() { link.axdown1.Do(func() {
close(link.axdown) close(link.axdown)
...@@ -364,12 +364,12 @@ func (nl *NodeLink) shutdown() { ...@@ -364,12 +364,12 @@ func (nl *NodeLink) shutdown() {
}) })
} }
// CloseAccept instructs node link to not accept incoming conections anymore. // CloseAccept instructs node link to not accept incoming connections anymore.
// //
// Any blocked Accept() will be unblocked and return error. // Any blocked Accept() will be unblocked and return error.
// The peer will receive "connection refused" if it tries to connect after. // The peer will receive "connection refused" if it tries to connect after.
// //
// It is safet to call CloseAccept several times. // It is safe to call CloseAccept several times.
func (link *NodeLink) CloseAccept() { func (link *NodeLink) CloseAccept() {
link.axclosed.Set(1) link.axclosed.Set(1)
link.shutdownAX() link.shutdownAX()
...@@ -395,6 +395,7 @@ func (c *Conn) shutdown() { ...@@ -395,6 +395,7 @@ func (c *Conn) shutdown() {
c.shutdownRX(errConnClosed) c.shutdownRX(errConnClosed)
} }
// shutdownTX marks TX as no longer operational (?) and interrupts Send.
func (c *Conn) shutdownTX() { func (c *Conn) shutdownTX() {
c.txdownOnce.Do(func() { c.txdownOnce.Do(func() {
close(c.txdown) close(c.txdown)
...@@ -1335,12 +1336,13 @@ func (c *Conn) Recv() (Msg, error) { ...@@ -1335,12 +1336,13 @@ func (c *Conn) Recv() (Msg, error) {
// //
// it encodes message int packet, sets header appropriately and sends it. // it encodes message int packet, sets header appropriately and sends it.
// //
// it is ok to call sendMsg in parallel with serveSend running XXX why // it is ok to call sendMsg in parallel with serveSend. XXX link to sendPktDirect for rationale?
func (link *NodeLink) sendMsg(connId uint32, msg Msg) error { func (link *NodeLink) sendMsg(connId uint32, msg Msg) error {
traceMsgSendPre(link, connId, msg) traceMsgSendPre(link, connId, msg)
buf := msgPack(connId, msg) buf := msgPack(connId, msg)
return link.sendPkt(buf) // XXX more context in err? (msg type) return link.sendPkt(buf) // XXX more context in err? (msg type)
// FIXME ^^^ shutdown whole link on error
} }
// Send sends message. // Send sends message.
...@@ -1353,6 +1355,10 @@ func (c *Conn) Send(msg Msg) error { ...@@ -1353,6 +1355,10 @@ func (c *Conn) Send(msg Msg) error {
return c.sendPkt(buf) // XXX more context in err? (msg type) return c.sendPkt(buf) // XXX more context in err? (msg type)
} }
func (c *Conn) sendMsgDirect(msg Msg) error {
return c.link.sendMsg(c.connId, msg)
}
// Expect receives message and checks it is one of expected types // Expect receives message and checks it is one of expected types
// //
...@@ -1497,9 +1503,9 @@ func (link *NodeLink) Send1(msg Msg) error { ...@@ -1497,9 +1503,9 @@ func (link *NodeLink) Send1(msg Msg) error {
return err return err
} }
conn.downRX(errConnClosed) // FIXME just new conn this way conn.downRX(errConnClosed) // XXX just new conn this way
err = conn.Send(msg) err = conn.sendMsgDirect(msg)
conn.lightClose() conn.lightClose()
return err return err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment