Commit c825706e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a17bc034
...@@ -28,7 +28,6 @@ import ( ...@@ -28,7 +28,6 @@ import (
"time" "time"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
// "lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/log" "lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task" "lab.nexedi.com/kirr/neo/go/internal/task"
...@@ -57,8 +56,6 @@ import ( ...@@ -57,8 +56,6 @@ import (
// //
// This pipeline is operated by TalkMaster. // This pipeline is operated by TalkMaster.
// The connection to master is persisted by redial as needed. // The connection to master is persisted by redial as needed.
//
// XXX update after introduction of _MasterLink
type _MasteredNode struct { type _MasteredNode struct {
*xneo.Node *xneo.Node
...@@ -74,8 +71,7 @@ type _MasteredNode struct { ...@@ -74,8 +71,7 @@ type _MasteredNode struct {
opReady chan struct{} // reinitialized each time state becomes non-operational opReady chan struct{} // reinitialized each time state becomes non-operational
operational bool // cache for state.IsOperational() operational bool // cache for state.IsOperational()
// rxm chan _RxM // TalkMaster -> RecvM1 rxmFlags _MasteredNodeFlags // if e.g. δPartTab messages should be delivered to mlink.Recv1
rxmFlags _MasteredNodeFlags // if e.g. δPartTab messages should be delivered to RecvM1
// XXX just use `.myInfo.NodeType == STORAGE` instead? // XXX just use `.myInfo.NodeType == STORAGE` instead?
} }
...@@ -87,8 +83,8 @@ type _RxM struct { ...@@ -87,8 +83,8 @@ type _RxM struct {
type _MasteredNodeFlags int type _MasteredNodeFlags int
const ( const (
// δPartTabPassThrough tells RecvM1 not to filter out messages related // δPartTabPassThrough tells mlink.Recv1 not to filter out messages related
// to partition table changes. When RecvM1 receives such messages there // to partition table changes. When mlink.Recv1 receives such messages there
// are already processed internally to update .state.PartTab correspondingly. // are already processed internally to update .state.PartTab correspondingly.
// //
// Storage uses this mode to receive δPartTab notifications to know // Storage uses this mode to receive δPartTab notifications to know
...@@ -110,7 +106,7 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, ...@@ -110,7 +106,7 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
// TalkMaster dials master, identifies to it, and receives master notifications and requests. // TalkMaster dials master, identifies to it, and receives master notifications and requests.
// //
// Notifications to node/partition tables and cluster state are automatically // Notifications to node/partition tables and cluster state are automatically
// handled, while other notifications and requests are passed through to RecvM1. // handled, while other notifications and requests are passed through to _MasterLink.Recv1.
// //
// Master link is persisted by redialing as needed. // Master link is persisted by redialing as needed.
// //
...@@ -214,49 +210,25 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -214,49 +210,25 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
return err return err
} }
/*
wg := xsync.NewWorkGroup(ctx)
// receive and handle notifications from master
// XXX no need to spawn "rx prefilter" - just make MasterLink.Recv1() call mlink.Recv1(), check if message should be prefiltered and call Master.recvMaster1. (-> yes)
node.rxm = make(chan _RxM)
wg.Go(func(ctx context.Context) error {
defer task.Running(&ctx, "rx prefilter")(&err)
for {
req, err := mlink.Recv1()
if err != nil {
close(node.rxm)
return err
}
err = node.recvMaster1(ctx, req) // req ownership is passed in
if err != nil {
return err
}
}
})
*/
// run user code // run user code
// wg.Go(func(ctx context.Context) error {
return f(ctx, &_MasterLink{mlink, node}) return f(ctx, &_MasterLink{mlink, node})
// })
// return wg.Wait()
}) })
} }
// _MasterLink represents NodeLink to master with Recv1 filtered through _MasteredNode. // _MasterLink represents NodeLink to master with Recv1 filtered through _MasteredNode
// δstate handler.
type _MasterLink struct { type _MasterLink struct {
*neonet.NodeLink *neonet.NodeLink
node *_MasteredNode node *_MasteredNode
} }
// RecvM1 receives request from master filtered through _MasteredNode δstate handler. // Recv1 receives request from master filtered through _MasteredNode δstate handler.
// //
// Must be called only when master link is established - e.g. from under TalkMaster. // Must be called only when master link is established - e.g. from under TalkMaster.
func (mlink *_MasterLink) Recv1(ctx context.Context) (neonet.Request, error) { func (mlink *_MasterLink) Recv1(ctx context.Context) (neonet.Request, error) {
for { for {
req, err := mlink.NodeLink.Recv1() // cancel on ctx req, err := mlink.NodeLink.Recv1() // cancel on ctx
if err != nil { if err != nil {
// close(node.rxm)
return neonet.Request{}, err return neonet.Request{}, err
} }
...@@ -291,59 +263,6 @@ func (mlink *_MasterLink) Recv1(ctx context.Context) (neonet.Request, error) { ...@@ -291,59 +263,6 @@ func (mlink *_MasterLink) Recv1(ctx context.Context) (neonet.Request, error) {
} }
} }
/*
// recvMaster1 handles 1 message from master.
func (node *_MasteredNode) recvMaster1(ctx context.Context, req neonet.Request) (err error) {
// messages for state changes are handled internally
δstate := true
switch req.Msg.(type) {
default: δstate = false
case *proto.SendPartitionTable: // whole partTab
case *proto.NotifyPartitionChanges: // δ(partTab)
case *proto.NotifyNodeInformation: // δ(nodeTab)
case *proto.NotifyClusterState:
}
if δstate {
δpt, err := node.recvδstate(ctx, req.Msg)
toRecvM1 := false
if δpt && (node.rxmFlags & δPartTabPassThrough != 0) {
toRecvM1 = true
}
if !toRecvM1 {
req.Close()
return err
}
}
// pass request -> recvM1
// NOTE req ownership is passed into recvM1 caller who becomes responsible to close it
select {
case <-ctx.Done():
req.Close()
return ctx.Err()
case node.rxm <- _RxM{Req: req}:
// ok
}
return nil
}
*/
//var errMasterDisconect = errors.New("master disconnected")
//
// // recvM1 receives request from master filtered through δstate handler.
// //
// // Must be called only when master link is established - e.g. from under TalkMaster.
// func (node *_MasteredNode) recvM1() (neonet.Request, error) {
// rx, ok := <-node.rxm
// if !ok {
// return neonet.Request{}, errMasterDisconect
// }
// return rx.Req, rx.Err
// }
//trace:event traceClusterStateChanged(cs *proto.ClusterState) //trace:event traceClusterStateChanged(cs *proto.ClusterState)
// recvδstate handles reception of δstate messages. // recvδstate handles reception of δstate messages.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment