Commit 5a6ce3e2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 598538fd
......@@ -40,7 +40,10 @@ import (
// _MasteredNode provides base functioanlity of a NEO node driven by master.
//
// talkMaster persists connection to master node, adn receives update from M
// XXX requests/notifications from master go through filter that handles
// nodeTab/partTab/ClusterState changes ...
//
// talkMaster persists connection to master node, and receives update from M
// about δNodeTab, δPartTab, ClusterState.
//
// XXX how to use
......@@ -110,10 +113,12 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
// talkMaster dials master, identifies to it, and receives master updates to
// node/partition tables and cluster state.
// talkMaster dials master, identifies to it, and receives master notifications and requests.
//
// Notifications to node/partition tables and cluster state are automatically
// handled, while other notifications and requests are passed through to RecvM1.
//
// XXX connection to master is persisted (redial)
// The connection to master is persisted by redial as needed.
func (node *_MasteredNode) talkMaster(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "talk master(%s)", node.MasterAddr)(&err)
......@@ -136,7 +141,7 @@ func (node *_MasteredNode) talkMaster(ctx context.Context) (err error) {
}
}
func (node *_MasteredNode) talkMaster1(ctx context.Context) (err error) {
func (node *_MasteredNode) talkMaster1(ctx context.Context) error {
reqID := &proto.RequestIdentification{
NodeType: node.myInfo.Type,
NID: node.myInfo.NID,
......@@ -151,7 +156,7 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) (err error) {
return err
}
err = xcontext.WithCloseOnErrCancel(ctx, mlink, func() error {
return xcontext.WithCloseOnErrCancel(ctx, mlink, func() (err error) {
if accept.YourNID != node.myInfo.NID {
log.Infof(ctx, "master told us to have nid=%s", accept.YourNID)
node.myInfo.NID = accept.YourNID // XXX locking ?
......@@ -179,35 +184,32 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) (err error) {
// update cluster state
// XXX locking
err := node.updateNodeTab(ctx, &mnt)
if err != nil {
return err
}
node.stateMu.Lock()
err = node.updateNodeTab(ctx, &mnt)
node.state.PartTab = pt
// XXX update "operational"
node.stateMu.Unlock()
if err != nil { // might be command to shutdown
return err
}
// XXX update .masterLink + notify waiters
return nil
// receive and handle notifications from master
defer task.Running(&ctx, "rx")(&err)
for {
req, err := mlink.Recv1()
if err != nil {
return err
}
err = node.recvMaster1(ctx, req.Msg)
req.Close()
if err != nil {
return err
}
}
})
if err != nil {
// XXX
}
// receive and handle notifications from master
// XXX put inside ^^^ ?
defer task.Running(&ctx, "rx")(&err)
for {
req, err := mlink.Recv1()
if err != nil {
return err
}
err = node.recvMaster1(ctx, req.Msg)
req.Close()
if err != nil {
return err
}
}
}
// recvMaster1 handles 1 message from master.
......
......@@ -168,8 +168,8 @@ type NodeLink struct {
// Encoding returns protocol encoding with which the link was handshaked. XXX
// XXX place
func (nl *NodeLink) Encoding() proto.Encoding {
return nl.enc
func (link *NodeLink) Encoding() proto.Encoding {
return link.enc
}
// XXX rx handoff make latency better for serial request-reply scenario but
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment