Commit 87a6b2d4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 70c2e81a
...@@ -98,8 +98,10 @@ type Conn struct { ...@@ -98,8 +98,10 @@ type Conn struct {
rxdownOnce sync.Once // ----//---- rxdownOnce sync.Once // ----//----
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
closed int32 // 1 if Close was called or "connection closed" entry rxclosed int32 // whether CloseRecv was called
// incremented during every replyNoConn() in progress txclosed int32 // whether CloseSend was called
// closed int32 // 1 if Close was called or "connection closed" entry
// // incremented during every replyNoConn() in progress
errMsg *Error // error message for replyNoConn errMsg *Error // error message for replyNoConn
} }
...@@ -314,25 +316,23 @@ func (c *Conn) shutdownRX(errMsg *Error) { ...@@ -314,25 +316,23 @@ func (c *Conn) shutdownRX(errMsg *Error) {
} }
// time to keep record of a closed connection so that we can properly reply
// "connection closed" if a packet comes in with this connection's connID.
var connKeepClosed = 1*time.Minute var connKeepClosed = 1*time.Minute
// CloseRecv closes reading end of connection. // CloseRecv closes reading end of connection.
// //
// A peer will receive "connection closed" if it tries to send to connection
// with closed reading end.
//
// XXX no race wrt in-progress c.rxq <- ... ?
//
// Any blocked Recv*() will be unblocked and return error. // Any blocked Recv*() will be unblocked and return error.
// The peer will receive "connection closed" if it tries to send anything after.
// //
// It is safe to call CloseRecv several times. // It is safe to call CloseRecv several times.
func (c *Conn) CloseRecv() { func (c *Conn) CloseRecv() {
// XXX c.closedRecv = 1? atomic.StoreInt32(&c.rxclosed, 1)
c.shutdownRX(errConnClosed) // XXX ok? c.shutdownRX(errConnClosed)
// deque all pakcets already queued in c.rxq // dequeue all packets already queued in c.rxq
// (once serveRecv sees c.rxdown it won't try to put new packets into // (once serveRecv sees c.rxdown it won't try to put new packets into
// c.rxq but something finite could be already there) // c.rxq, but something finite could be already there)
i := 0 i := 0
loop: loop:
for { for {
...@@ -392,7 +392,8 @@ func (c *Conn) Close() error { ...@@ -392,7 +392,8 @@ func (c *Conn) Close() error {
} }
nl.connMu.Unlock() nl.connMu.Unlock()
atomic.StoreInt32(&c.closed, 1) atomic.StoreInt32(&c.rxclosed, 1)
atomic.StoreInt32(&c.txclosed, 1)
c.shutdown() c.shutdown()
return nil return nil
} }
...@@ -437,13 +438,9 @@ func (nl *NodeLink) Accept(/*ctx context.Context*/) (c *Conn, err error) { ...@@ -437,13 +438,9 @@ func (nl *NodeLink) Accept(/*ctx context.Context*/) (c *Conn, err error) {
// errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt // errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt
func (c *Conn) errRecvShutdown() error { func (c *Conn) errRecvShutdown() error {
switch { switch {
// XXX adjust for CloseRead (shutdownRX) case atomic.LoadInt32(&c.rxclosed) != 0:
case atomic.LoadInt32(&c.closed) != 0:
return ErrClosedConn return ErrClosedConn
case c.errMsg != nil:
return ErrClosedConn // shutdownRX
case atomic.LoadUint32(&c.nodeLink.closed) != 0: case atomic.LoadUint32(&c.nodeLink.closed) != 0:
return ErrLinkClosed return ErrLinkClosed
...@@ -544,7 +541,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -544,7 +541,7 @@ func (nl *NodeLink) serveRecv() {
// don't even try to `conn.rxq <- ...` if .rxdown is ready // don't even try to `conn.rxq <- ...` if .rxdown is ready
// ( else since select is picking random ready variant Recv/serveRecv // ( else since select is picking random ready variant Recv/serveRecv
// could receve something on rxdown Conn half sometimes ) // could receive something on rxdown Conn half sometimes )
rxdown := false rxdown := false
select { select {
case <-conn.rxdown: case <-conn.rxdown:
...@@ -630,7 +627,7 @@ type txReq struct { ...@@ -630,7 +627,7 @@ type txReq struct {
// errSendShutdown returns appropriate error when c.txdown is found ready in Send // errSendShutdown returns appropriate error when c.txdown is found ready in Send
func (c *Conn) errSendShutdown() error { func (c *Conn) errSendShutdown() error {
switch { switch {
case atomic.LoadInt32(&c.closed) != 0: case atomic.LoadInt32(&c.txclosed) != 0:
return ErrClosedConn return ErrClosedConn
// the only other error possible besides Conn being .Close()'ed is that // the only other error possible besides Conn being .Close()'ed is that
...@@ -1244,7 +1241,7 @@ func (link *NodeLink) Recv1() (Request, error) { ...@@ -1244,7 +1241,7 @@ func (link *NodeLink) Recv1() (Request, error) {
// noone will be reading from conn anymore - shutdown rx so that if // noone will be reading from conn anymore - shutdown rx so that if
// peer sends any another packet with same .ConnID serveRecv does not // peer sends any another packet with same .ConnID serveRecv does not
// deadlock trying to put it to conn.rxq. // deadlock trying to put it to conn.rxq.
conn.CloseRead() conn.CloseRecv()
return Request{Msg: msg, conn: conn}, nil return Request{Msg: msg, conn: conn}, nil
} }
...@@ -1276,7 +1273,7 @@ func (link *NodeLink) Send1(msg Msg) error { ...@@ -1276,7 +1273,7 @@ func (link *NodeLink) Send1(msg Msg) error {
return err return err
} }
// XXX conn.CloseRead() ? // XXX conn.CloseRecv() ?
err1 := conn.Send(msg) err1 := conn.Send(msg)
err2 := conn.Close() err2 := conn.Close()
......
...@@ -45,6 +45,10 @@ Run NEO storage node. ...@@ -45,6 +45,10 @@ Run NEO storage node.
// FIXME use w (see flags.SetOutput) // FIXME use w (see flags.SetOutput)
} }
// TODO set GOMAXPROCS *= N (a lot of file IO) + link
// https://groups.google.com/forum/#!msg/golang-nuts/jPb_h3TvlKE/rQwbg-etCAAJ
// https://github.com/golang/go/issues/6817
func storageMain(argv []string) { func storageMain(argv []string) {
flags := flag.NewFlagSet("", flag.ExitOnError) flags := flag.NewFlagSet("", flag.ExitOnError)
flags.Usage = func() { storageUsage(os.Stderr); flags.PrintDefaults() } // XXX prettify flags.Usage = func() { storageUsage(os.Stderr); flags.PrintDefaults() } // XXX prettify
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment