Commit 0dd84253 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 37dce98f
......@@ -114,7 +114,7 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
// Run starts client node and runs it until either ctx is canceled or master
// commands it to shutdown. (TODO verify M->shutdown)
func (c *Client) Run(ctx context.Context) (err error) {
defer task.Running(&ctx, "client")(&err)
// defer task.Running(&ctx, "client")(&err)
// run process which performs master talk
ctx, cancel := context.WithCancel(ctx)
......
......@@ -24,6 +24,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
......@@ -42,15 +43,15 @@ import (
// _MasteredNode provides base functionality of a NEO node driven by master.
//
// It connects to master, identifies to it, and handles master messages about
// δNodeTab, δPartTab and δClusterState to update local replica of cluster
// δNodeTab, δPartTab and δClusterState to maintain local replica of cluster
// state. The other messages from master are passed through to RecvM1. In other
// words _MasteredNode installs a kind of reception pipeline in between master
// and _MasteredNode user:
//
//
// δNodeTab
// δPartTab
// δClusterState
// δNodeTab
// δPartTab
// δClusterState
// ↑
// RecvM1 ---------------
// user <-------- | _MasteredNode | <- M
......@@ -138,16 +139,30 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
//
// The connection to master is persisted by redial as needed.
//
// f is called on every reconnection after identification and protocol prologue. XXX
// f is called on every reconnection after identification and protocol prologue.
//
// See top-level _MasteredNode overview for details.
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *neonet.NodeLink) error) (err error) {
defer task.Runningf(&ctx, "talk master(%s)", node.MasterAddr)(&err)
// me0 describes local node when it starts connecting to master, e.g. 'client C?'.
// we don't use just NID because it is initially 0 and because master can tell us to change it.
me0 := strings.ToLower(node.myInfo.Type.String())
me0 += " "
mynid0 := node.myInfo.NID
if mynid0 == 0 {
me0 += "?"
} else {
me0 += mynid0.String()
}
ctx0 := ctx
defer task.Runningf(&ctx, "%s: talk master(%s)", me0, node.MasterAddr)(&err)
for {
err := node.talkMaster1(ctx, f)
err := node.talkMaster1(ctx, ctx0, f)
log.Warning(ctx, err) // XXX Warning ok? -> Error?
// TODO if err == "reject identification / protocol error" -> shutdown client
// TODO if err == shutdown -> return
// TODO if err == "not a primary" -> try redirected addresss
// TODO if err == "not a primary" -> try redirected address
// exit on cancel / throttle reconnecting
select {
......@@ -161,7 +176,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
}
}
func (node *_MasteredNode) talkMaster1(ctx context.Context, f func(context.Context, *neonet.NodeLink) error) error {
func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *neonet.NodeLink) error) error {
reqID := &proto.RequestIdentification{
NodeType: node.myInfo.Type,
NID: node.myInfo.NID,
......@@ -182,6 +197,11 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context, f func(context.Conte
node.myInfo.NID = accept.YourNID // XXX locking ?
}
// rebuild nicer task now when we know both our and master NIDs
// e.g. "client ?: talk master(127.0.0.1:21484)" -> "C1: talk M1".
ctx := ctxPreTalkM
defer task.Runningf(&ctx, "%s: talk %s", accept.YourNID, accept.MyNID)(&err)
// master pushes whole nodeTab and partTab to us right after identification
// nodeTab
......@@ -199,7 +219,7 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context, f func(context.Conte
return fmt.Errorf("after identification: %w", err)
}
pt := xneo.PartTabFromDump(mpt.PTid, mpt.RowList) // TODO handle mpt.NumReplicas
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
log.Infof(ctx, "<- parttab:\n%s", pt)
// update cluster state
......@@ -331,14 +351,6 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
traceClusterStateChanged(&node.state.Code)
}
/* XXX kill
if δpt && node.OnNotifyδPartTab != nil {
err = node.OnNotifyδPartTab(node.state.PartTab)
// XXX err -> return without notify?
panic("TODO")
}
*/
// update .operational + notify those who was waiting for it
opready := node.updateOperational()
node.stateMu.Unlock()
......@@ -417,7 +429,7 @@ var cmdShutdown = errors.New("master told us to shutdown")
func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) error {
// XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "node update: %v", nodeInfo)
log.Infof(ctx, "<- node: %v", nodeInfo)
node.state.NodeTab.Update(nodeInfo)
// we have to provide IdTime when requesting identification to other peers
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment