Commit 64c62847 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 60abb484
...@@ -20,6 +20,12 @@ ...@@ -20,6 +20,12 @@
package neo package neo
// master node // master node
// XXX master organization
//
// - main goroutine that is the only mutator of nodeTab, partTab, etc
// - per peer workers are spawned that interact with main via channels
// - δnodeTab, δpartTab updates are proxied to peer by another per-peer goroutine
import ( import (
"context" "context"
stderrors "errors" stderrors "errors"
...@@ -47,9 +53,10 @@ type Master struct { ...@@ -47,9 +53,10 @@ type Master struct {
node *xneo.Node node *xneo.Node
// master manages node and partition tables and broadcast their updates // master manages node and partition tables and broadcast their updates
// to all nodes in the cluster // to all connected nodes. δnodeTab/δpartTab updates are proxied to
notifyWG sync.WaitGroup // nodeTab/partTab updates are proxied by per-peer goroutine // a peer by per-peer goroutine reading from .notifyTab[peer.nid] channel.
notifyTab map[proto.NodeID]chan _ΔClusterState // registered in notifyTab XXX ^^^ notifyWG sync.WaitGroup
notifyTab map[proto.NodeID]chan _ΔClusterState // XXX naming -> δStateNotifyTab?
// last allocated oid & tid // last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ? // XXX how to start allocating oid from 0, not 1 ?
...@@ -883,9 +890,6 @@ func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) { ...@@ -883,9 +890,6 @@ func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) {
slink := stor.Link() slink := stor.Link()
defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err) defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err)
// XXX send nodeTab ? -> yes
// XXX send clusterInformation ?
// XXX current neo/py does StartOperation / NotifyReady as separate // XXX current neo/py does StartOperation / NotifyReady as separate
// sends, not exchange on the same conn. - fixed // sends, not exchange on the same conn. - fixed
ready := proto.NotifyReady{} ready := proto.NotifyReady{}
...@@ -934,7 +938,7 @@ func (m *Master) serveClient(ctx context.Context, cli *xneo.PeerNode) (err error ...@@ -934,7 +938,7 @@ func (m *Master) serveClient(ctx context.Context, cli *xneo.PeerNode) (err error
defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.ResetLink? defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.ResetLink?
// FIXME send initial nodeTab and partTab before starting serveClient1 // FIXME send initial nodeTab and partTab before starting serveClient1
// (move those initial sends from keepPeerUpdated to here) // (move those initial sends from keepPeerUpdated to .accept)
// M -> C notifications about cluster state // M -> C notifications about cluster state
wg.Go(func() error { wg.Go(func() error {
......
...@@ -71,8 +71,6 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error { ...@@ -71,8 +71,6 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
// ---------------------------------------- // ----------------------------------------
// XXX move -> master.go // XXX move -> master.go
// reject sends rejective identification response and closes associated link // reject sends rejective identification response and closes associated link
func reject(ctx context.Context, req *neonet.Request, resp proto.Msg) { func reject(ctx context.Context, req *neonet.Request, resp proto.Msg) {
err1 := req.Reply(resp) err1 := req.Reply(resp)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment