Commit 146534b0 authored by Kirill Smelkov

.

parent 915cb69a
@@ -139,7 +139,7 @@ type nodeLeave struct {
// reject sends a rejecting identification response and closes the associated link
func reject(ctx context.Context, req *neo.Request, resp neo.Msg) {
// XXX cancel on ctx?
// XXX log?
// log.Info(ctx, "identification rejected") ?
err1 := req.Reply(resp)
err2 := req.Link().Close()
err := xerr.Merge(err1, err2)
......
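Side note for readers of this hunk: the goreject(...) call used later in this commit's accept loop presumably just runs reject in its own goroutine, accounted on the listener's WaitGroup. A minimal sketch of such a helper, assuming it is defined elsewhere in this package (its definition is not shown in this diff):

// goreject spawns reject in a separate goroutine tracked on wg.
// (illustrative sketch only; the real helper lives outside this diff)
func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp neo.Msg) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		reject(ctx, req, resp)
	}()
}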
@@ -43,7 +43,7 @@ type Storage struct {
// context for providing operational service
// it is renewed every time master tells us StartOperation, so users
// must read it initially only once under opMu via opCtxRead
// must read it initially only once under opMu via withWhileOperational.
opMu sync.Mutex
opCtx context.Context
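To illustrate the comment above: renewing opCtx when the master sends StartOperation would, under these locking rules, look roughly like the following sketch (an assumption for illustration only; the renewal code itself is not part of this diff):

// hypothetical sketch of opCtx renewal on StartOperation
stor.opMu.Lock()
opCtx, opCancel := context.WithCancel(ctx)
stor.opCtx = opCtx
stor.opMu.Unlock()
// ... opCancel() would then be called on StopOperation or on master link failure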
@@ -94,7 +94,7 @@ func (stor *Storage) Run(ctx context.Context) error {
wg.Add(1)
go func(ctx context.Context) (err error) {
defer wg.Done()
defer task.Running(&ctx, "serve")(&err) // XXX or "accept" ?
defer task.Running(&ctx, "accept")(&err)
// XXX dup from master
for {
@@ -108,28 +108,23 @@ func (stor *Storage) Run(ctx context.Context) error {
continue
}
resp, ok := stor.identify(idReq)
if !ok {
goreject(ctx, &wg, req, resp)
continue
}
wg.Add(1)
go func() {
defer wg.Done()
stor.serveLink(ctx, req, idReq) // XXX ignore err?
}()
// handover to main driver
select {
//case stor.nodeCome <- nodeCome{req, idReq}:
// // ok
case <-ctx.Done():
// shutdown
lclose(ctx, req.Link())
continue
}
// // handover to main driver
// select {
// case stor.nodeCome <- nodeCome{req, idReq}:
// // ok
//
// case <-ctx.Done():
// // shutdown
// lclose(ctx, req.Link())
// continue
// }
}
}(serveCtx)
@@ -221,8 +216,8 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// cancelled or some other error.
//
// return error indicates:
// - nil: initialization was ok and a command came from master to start operation
// - !nil: initialization was cancelled or failed somehow
// - nil: initialization was ok and a command came from master to start operation.
// - !nil: initialization was cancelled or failed somehow.
func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (reqStart *neo.Request, err error) {
defer task.Runningf(&ctx, "init %v", mlink)(&err)
@@ -292,9 +287,9 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (req
}
}
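A caller-side sketch of how those return values would be consumed in talkMaster1 (hedged: talkMaster1 and m1serve are only partially visible in this diff, so the exact names and signatures below are assumptions):

// hypothetical flow in talkMaster1
reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil {
	return err // initialization was cancelled or failed
}
// nil error: master told us to start operation
return stor.m1serve(ctx, reqStart) // m1serve signature assumed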
// m1serve drives storage by master messages during service phase
// m1serve drives storage by master messages during service phase.
//
// Service is regular phase serving requests from clients to load/save object,
// Service is regular phase serving requests from clients to load/save objects,
// handling transaction commit (with master) and syncing data with other
// storage nodes (XXX correct?).
//
@@ -362,7 +357,14 @@ func (stor *Storage) identify(idReq *neo.RequestIdentification) (neo.Msg, bool)
return &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"}, false
}
// XXX check operational?
// check operational
stor.opMu.Lock()
operational := (stor.opCtx.Err() == nil)
stor.opMu.Unlock()
if !operational {
return &neo.Error{neo.NOT_READY, "cluster not operational"}, false
}
return &neo.AcceptIdentification{
NodeType: stor.node.MyInfo.Type,
@@ -374,58 +376,62 @@ func (stor *Storage) identify(idReq *neo.RequestIdentification) (neo.Msg, bool)
}
// withWhileOperational derives a new context from ctx which will be cancelled when either
// - ctx is cancelled, or
// - master tells us to stop operational service
func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context, context.CancelFunc) {
stor.opMu.Lock()
opCtx := stor.opCtx
stor.opMu.Unlock()
return xcontext.Merge(ctx, opCtx)
}
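For context, the property relied on here is that the merged context is cancelled as soon as either input context is. An illustrative stand-in for that cancellation behaviour (not the actual xcontext.Merge implementation, which may also merge values and deadlines):

// mergeCancel returns a context that is done when either ctx1 or ctx2 is done.
// (illustration only; the real code uses xcontext.Merge)
func mergeCancel(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
	mctx, cancel := context.WithCancel(ctx1) // ctx1 cancellation propagates via WithCancel
	go func() {
		select {
		case <-ctx2.Done():
		case <-mctx.Done():
		}
		cancel()
	}()
	return mctx, cancel
}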
// ServeLink serves incoming node-node link connection
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) (err error) {
// serveLink serves incoming node-node link connection
func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *neo.RequestIdentification) (err error) {
link := req.Link()
defer task.Runningf(&ctx, "serve %s", link)(&err)
defer xio.CloseWhenDone(ctx, link)()
// XXX only accept clients
// XXX only accept when operational (?)
nodeInfo, err := IdentifyPeer(ctx, link, neo.STORAGE)
// handle identification
idResp, ok := stor.identify(idReq)
if !ok {
reject(ctx, req, idResp) // XXX log?
return nil
}
err = accept(ctx, req, idResp)
if err != nil {
log.Error(ctx, err)
return
return err
}
var serveReq func(context.Context, neo.Request)
switch nodeInfo.NodeType {
case neo.CLIENT:
serveReq = stor.serveClient
// client passed identification, now serve other requests
log.Info(ctx, "identification accepted") // FIXME must be in identify?
default:
// XXX vvv should be reply to peer
log.Errorf(ctx, "%v: unexpected peer type: %v", link, nodeInfo.NodeType)
return
}
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
// identification passed, now serve other requests
wg := sync.WaitGroup{} // XXX -> errgroup?
for {
req, err := link.Recv1()
if err != nil {
log.Error(ctx, err)
break
return err
}
go serveReq(ctx, req)
wg.Add(1)
go func() {
defer wg.Done()
stor.serveClient(ctx, req)
}()
}
// TODO wait all spawned serveConn
wg.Wait()
return nil
}
// withWhileOperational derives a new context from ctx which will be cancelled when either
// - ctx is cancelled, or
// - master tells us to stop operational service
func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context, context.CancelFunc) {
stor.opMu.Lock()
opCtx := stor.opCtx
stor.opMu.Unlock()
return xcontext.Merge(ctx, opCtx)
}
// serveClient serves incoming client request.
//
// XXX +error return?
@@ -433,21 +439,13 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
// XXX version that reuses goroutine to serve next client requests
// XXX for py compatibility (py has no way to tell us Conn is closed)
func (stor *Storage) serveClient(ctx context.Context, req neo.Request) {
// XXX vvv move level-up
//log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
// rederive ctx to be also cancelled if M tells us StopOperation
// XXX level up
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
link := req.Link()
for {
resp := stor.serveClient1(ctx, req.Msg)
err := req.Reply(resp)
if err != nil {
log.Info(ctx, err)
log.Error(ctx, err)
return
}
......