Commit 745961cc authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 94a98550
// Copyright (C) 2016-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neo
// common parts for organizing network servers
// XXX kill?
import (
// "context"
// "fmt"
// "net"
// "sync"
// "lab.nexedi.com/kirr/neo/go/neo/neonet"
// "lab.nexedi.com/kirr/neo/go/neo/proto"
// "lab.nexedi.com/kirr/neo/go/internal/log"
// "lab.nexedi.com/kirr/go123/xerr"
)
/*
// Server is an interface that represents networked server
type Server interface {
// ServeLink serves already established nodelink (connection) in a blocking way.
// ServeLink is usually run in separate goroutine
ServeLink(ctx context.Context, link *neonet.NodeLink)
}
// Serve runs service on a listener
// - accept incoming connection on the listener
// - for every accepted connection spawn srv.ServeLink() in separate goroutine.
//
// the listener is closed when Serve returns.
func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
fmt.Printf("xxx: serving on %s ...\n", l.Addr()) // XXX 'xxx' -> ?
defer xio.CloseWhenDone(ctx, l)()
// main Accept -> ServeLink loop
for {
link, err := l.Accept()
if err != nil {
// TODO err == closed <-> ctx was cancelled
// TODO err -> net.Error && .Temporary() -> some throttling
return err
}
// XXX close link when either cancelling or returning?
// XXX only returning with error!
go srv.ServeLink(ctx, link)
}
}
*/
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
...@@ -286,7 +287,18 @@ func (stor *Storage) serve(ctx context.Context) (err error) { ...@@ -286,7 +287,18 @@ func (stor *Storage) serve(ctx context.Context) (err error) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
stor.serveLink(ctx, req, idReq) // XXX ignore err? -> logged err := stor.serveLink(ctx, req, idReq) // XXX ignore err? -> logged
// XXX log err XXX vvv ok?
switch errors.Cause(err) {
// XXX closed by main or peer down - all logged by main called
// XXX review
case neonet.ErrLinkDown, neonet.ErrLinkClosed:
// ok
default:
log.Error(ctx, err)
}
return
}() }()
} }
} }
...@@ -308,7 +320,7 @@ func (stor *Storage) identify(ctx context.Context, idReq *proto.RequestIdentific ...@@ -308,7 +320,7 @@ func (stor *Storage) identify(ctx context.Context, idReq *proto.RequestIdentific
return idResp, ereject return idResp, ereject
} }
func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *proto.Error) { func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *proto.Error) {
// XXX stub: we accept clients and don't care about their NID // XXX stub: we accept clients and don't care about their NID/IDtime
if idReq.NodeType != proto.CLIENT { if idReq.NodeType != proto.CLIENT {
return nil, &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"} return nil, &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}
} }
...@@ -343,7 +355,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq * ...@@ -343,7 +355,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *
} }
// client passed identification, now serve other requests // client passed identification, now serve other requests
wg := sync.WaitGroup{} // XXX -> errgroup? wg := xsync.NewWorkGroup(ctx)
for { for {
req, err := link.Recv1() req, err := link.Recv1()
if err != nil { if err != nil {
...@@ -360,26 +372,27 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq * ...@@ -360,26 +372,27 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *
return err return err
} }
// XXX this go + link.Recv1() in serveClient arrange for N(goroutine) ↑ // FIXME this go + link.Recv1() in serveClient arrange for N(goroutine) ↑
// with O(1/nreq) rate (i.e. N(goroutine, nreq) ~ ln(nreq)). // with O(1/nreq) rate (i.e. N(goroutine, nreq) ~ ln(nreq)).
wg.Add(1) //
go func() { // TODO -> do what go-fuse does:
defer wg.Done() // - serve request in the goroutine that received it
stor.serveClient(ctx, req) // - spawn another goroutine to continue accept loop
}() // - limit number of such accept-loop goroutines by GOMAXPROC
wg.Go(func(ctx context.Context) error {
return stor.serveClient(ctx, req)
})
} }
wg.Wait() err = wg.Wait()
return nil return err
} }
// serveClient serves incoming client request. // serveClient serves incoming client request.
// //
// XXX +error return?
//
// XXX version that reuses goroutine to serve next client requests // XXX version that reuses goroutine to serve next client requests
// XXX for py compatibility (py has no way to tell us Conn is closed) // XXX for py compatibility (py has no way to tell us Conn is closed)
func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) { func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) error {
link := req.Link() link := req.Link()
for { for {
...@@ -387,11 +400,10 @@ func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) { ...@@ -387,11 +400,10 @@ func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) {
err := req.Reply(resp) err := req.Reply(resp)
req.Close() req.Close()
if err != nil { if err != nil {
log.Error(ctx, err) return err
return
} }
// XXX hack -> resp.Release() // XXX hack -> TODO resp.Release()
// XXX req.Msg release too? // XXX req.Msg release too?
if resp, ok := resp.(*proto.AnswerObject); ok { if resp, ok := resp.(*proto.AnswerObject); ok {
resp.Data.Release() resp.Data.Release()
...@@ -401,16 +413,7 @@ func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) { ...@@ -401,16 +413,7 @@ func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) {
// TODO += timeout -> go away if inactive // TODO += timeout -> go away if inactive
req, err = link.Recv1() req, err = link.Recv1()
if err != nil { if err != nil {
switch errors.Cause(err) { return err
// XXX closed by main or peer down - all logged by main called
// XXX review
case neonet.ErrLinkDown, neonet.ErrLinkClosed:
// ok
default:
log.Error(ctx, err)
}
return
} }
} }
} }
...@@ -462,53 +465,3 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot ...@@ -462,53 +465,3 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
//req.Put(...) //req.Put(...)
} }
// ----------------------------------------
// serveClient serves incoming connection on which peer identified itself as client
// the connection is closed when serveClient returns
// XXX +error return?
//
// XXX version that keeps 1 goroutine per 1 Conn
// XXX unusable until Conn.Close signals peer
/*
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
// main work to serve
done := make(chan error, 1)
go func() {
for {
err := stor.serveClient1(conn)
if err != nil {
done <- err
break
}
}
}()
// close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to
// terminate with an error )
var err error
select {
case <-ctx.Done():
// XXX tell client we are shutting down?
// XXX should we also wait for main work to finish?
err = ctx.Err()
case err = <-done:
}
log.Infof(ctx, "%v: %v", conn, err)
// XXX vvv -> defer ?
log.Infof(ctx, "%v: closing client conn", conn)
conn.Close() // XXX err
}
*/
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment