Commit e4a7f75c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 599cd283
......@@ -153,10 +153,24 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
}(ctx)
// connect to master and get commands and updates from it
err = stor.talkMaster(ctx)
// err = stor.talkMaster(ctx)
// err = stor.talkMaster(serveCtx) // XXX hack for shutdown
// XXX log err?
err = stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error {
// let master initialize us. If successful this ends with StartOperation command.
reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil {
//log.Error(ctx, err)
return err
}
// we got StartOperation command. Let master drive us during service phase.
err = stor.m1serve(ctx, reqStart)
//log.Error(ctx, err)
return err
})
// we are done - shutdown
// serveCancel()
wg.Wait()
......@@ -274,7 +288,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neonet.NodeLink) (
defer task.Runningf(&ctx, "init %v", mlink)(&err)
for {
req, err := mlink.Recv1() // XXX -> RecvM1
req, err := stor.node.RecvM1()
if err != nil {
return nil, err
}
......@@ -294,7 +308,6 @@ var cmdStart = errors.New("start requested")
// m1initialize1 handles one message from master from under m1initialize
func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) error {
// XXX vvv move Send out of reply preparing logic
var err error
switch msg := req.Msg.(type) {
......@@ -333,23 +346,13 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
err = req.Reply(&proto.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
// XXX -> somehow to common part in Node ?
case *proto.SendPartitionTable:
// TODO M sends us whole PT -> save locally
stor.node.UpdatePartTab(ctx, msg) // XXX lock? XXX handle msg.NumReplicas
case *proto.NotifyPartitionChanges:
// TODO M sends us δPT -> save locally?
case *proto.NotifyNodeInformation:
// XXX check for myNID and consider it a command (like neo/py) does?
stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
case *proto.NotifyClusterState:
stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
// TODO M sends us δPT -> save locally
}
// XXX move req.Reply here and ^^^ only prepare reply
return err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment