Commit 3dd45621 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 336fc1be
...@@ -78,8 +78,9 @@ type Master struct { ...@@ -78,8 +78,9 @@ type Master struct {
nodeLeaveq chan nodeLeave // main <- peerWG.wait "node (should be) disconnected" nodeLeaveq chan nodeLeave // main <- peerWG.wait "node (should be) disconnected"
// in addition to nodeTab (which keeps information about a node) tasks // in addition to nodeTab (which keeps information about a node) tasks
// that are specific to a peer are organized around peerWorkTab[peer.nid]. // and mastering context that are specific to a peer are organized
peerWorkTab map[proto.NodeID]*_MasteredPeer // around peerTab[peer.nid].
peerTab map[proto.NodeID]*_MasteredPeer
// last allocated oid & tid // last allocated oid & tid
...@@ -94,7 +95,7 @@ type Master struct { ...@@ -94,7 +95,7 @@ type Master struct {
ctlShutdown chan chan error // request to shutdown cluster XXX with ctx ? ctlShutdown chan chan error // request to shutdown cluster XXX with ctx ?
// so tests could override // so tests could override
monotime func() float64 xtimeMono func() float64
} }
// nodeCome represents "node connects" event. // nodeCome represents "node connects" event.
...@@ -114,7 +115,9 @@ type nodeLeave struct { ...@@ -114,7 +115,9 @@ type nodeLeave struct {
// .notify // .notify
// .wait (run under mainWG) // .wait (run under mainWG)
type _MasteredPeer struct { type _MasteredPeer struct {
peer *xneo.PeerNode // XXX naming -> node ? node *xneo.PeerNode
accept *proto.AcceptIdentification // identify decided to accept this peer with .accept
// all tasks are spawned under wg. If any task fails - whole wg is canceled. // all tasks are spawned under wg. If any task fails - whole wg is canceled.
wg *xsync.WorkGroup wg *xsync.WorkGroup
...@@ -159,7 +162,7 @@ func NewMaster(clusterName string, net xnet.Networker) *Master { ...@@ -159,7 +162,7 @@ func NewMaster(clusterName string, net xnet.Networker) *Master {
nodeComeq: make(chan nodeCome), nodeComeq: make(chan nodeCome),
// nodeLeave: make(chan nodeLeave), // nodeLeave: make(chan nodeLeave),
monotime: xtime.Mono, xtimeMono: xtime.Mono,
} }
return m return m
...@@ -400,12 +403,8 @@ loop: ...@@ -400,12 +403,8 @@ loop:
select { select {
// new connection comes in // new connection comes in
case n := <-m.nodeComeq: case n := <-m.nodeComeq:
// FIXME if identify=ok -> subscribe to δ(nodeTab) and send initial nodeTab right after accept (accept should do it?) peer, ok := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */) if !ok {
// XXX -> move to identify (spawn under peerWork.wg) ?
if node == nil {
goreject(ctx, wg, n.req, resp)
break break
} }
...@@ -423,7 +422,7 @@ loop: ...@@ -423,7 +422,7 @@ loop:
pt, err = storCtlRecovery(ctx, node) pt, err = storCtlRecovery(ctx, node)
return err return err
}) })
recovery <- storRecovery{stor: node, partTab: pt, err: err} recovery <- storRecovery{stor: peer.node, partTab: pt, err: err}
/* /*
err := m.accept(node, state0, n.req, resp) err := m.accept(node, state0, n.req, resp)
...@@ -881,10 +880,8 @@ loop: ...@@ -881,10 +880,8 @@ loop:
select { select {
// new connection comes in // new connection comes in
case n := <-m.nodeComeq: case n := <-m.nodeComeq:
node, state0, resp := m.identify(ctx, n, /* XXX accept everyone */) peer, ok := m.identify(ctx, n, /* XXX accept everyone */)
if !ok {
if node == nil {
goreject(ctx, wg, n.req, resp)
break break
} }
...@@ -1171,11 +1168,11 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er ...@@ -1171,11 +1168,11 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
// identify processes identification request of just connected node and either accepts or declines it. // identify processes identification request of just connected node and either accepts or declines it.
// //
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned. // If node identification is accepted .nodeTab and .peerTab are updated and
// XXX + state0. XXX +notifyTab. // corresponding peer entry is returned. XXX
// Response message is constructed but not send back not to block the caller - it is // Response message is constructed but not send back not to block the caller - it is
// the caller responsibility to send the response to node which requested identification. XXX via .accept() // the caller responsibility to send the response to node which requested identification. XXX via .accept()
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, resp proto.Msg) { func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredNode, ok bool) {
// XXX also verify ? : // XXX also verify ? :
// - NodeType valid // - NodeType valid
// - IdTime ? // - IdTime ?
...@@ -1224,7 +1221,14 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1224,7 +1221,14 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
subj := fmt.Sprintf("identify: %s (%s)", n.req.Link().RemoteAddr(), n.idReq.NID) subj := fmt.Sprintf("identify: %s (%s)", n.req.Link().RemoteAddr(), n.idReq.NID)
if err != nil { if err != nil {
log.Infof(ctx, "%s: rejecting: %s", subj, err) log.Infof(ctx, "%s: rejecting: %s", subj, err)
return nil, err m.mainWG.Go(func(ctx context.Context) error {
// XXX close link on ctx cancel?
idReq.Reply(err)
idReq.Link.Close()
// XXX log err (if any)
return nil // not to cancel main by a failing reject
})
return nil, false
} }
log.Infof(ctx, "%s: accepting as %s", subj, nid) log.Infof(ctx, "%s: accepting as %s", subj, nid)
...@@ -1251,7 +1255,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1251,7 +1255,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
Addr: n.idReq.Address, Addr: n.idReq.Address,
NID: nid, NID: nid,
State: nodeState, State: nodeState,
IdTime: proto.IdTime(m.monotime()), IdTime: proto.IdTime(m.timeMono()),
} }
node = m.updateNodeTab(ctx, nodeInfo) node = m.updateNodeTab(ctx, nodeInfo)
...@@ -1259,6 +1263,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1259,6 +1263,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
// make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates // make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates
m.peerWorkTab[node.NID] = &_MasteredPeer{ m.peerWorkTab[node.NID] = &_MasteredPeer{
peer: node,
accept: accept,
wg: xsync.NewWorkGroup(m.runCtx), // XXX wrong -> per peer ctx (derived from runCtx) wg: xsync.NewWorkGroup(m.runCtx), // XXX wrong -> per peer ctx (derived from runCtx)
state0: m.node.State.Snapshot(), state0: m.node.State.Snapshot(),
// TODO change limiting by buffer size -> to limiting by time // TODO change limiting by buffer size -> to limiting by time
...@@ -1267,7 +1273,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1267,7 +1273,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
notifyqOverflow: make(chan struct{}), notifyqOverflow: make(chan struct{}),
} }
return node, accept return peer, true
} }
// accept sends acceptance to just identified peer, sends nodeTab and partTab // accept sends acceptance to just identified peer, sends nodeTab and partTab
......
...@@ -25,13 +25,13 @@ import ( ...@@ -25,13 +25,13 @@ import (
"context" "context"
// "fmt" // "fmt"
// "net" // "net"
"sync" // "sync"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/internal/log" // "lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/go123/xerr" // "lab.nexedi.com/kirr/go123/xerr"
) )
/* /*
...@@ -71,6 +71,7 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error { ...@@ -71,6 +71,7 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
// ---------------------------------------- // ----------------------------------------
// XXX move -> master.go // XXX move -> master.go
/*
// reject sends rejective identification response and closes associated link // reject sends rejective identification response and closes associated link
func reject(ctx context.Context, req *neonet.Request, resp proto.Msg) { func reject(ctx context.Context, req *neonet.Request, resp proto.Msg) {
err1 := req.Reply(resp) err1 := req.Reply(resp)
...@@ -88,6 +89,7 @@ func goreject(ctx context.Context, wg *sync.WaitGroup, req *neonet.Request, resp ...@@ -88,6 +89,7 @@ func goreject(ctx context.Context, wg *sync.WaitGroup, req *neonet.Request, resp
defer wg.Done() defer wg.Done()
go reject(ctx, req, resp) go reject(ctx, req, resp)
} }
*/
// accept replies with acceptive identification response // accept replies with acceptive identification response
// XXX spawn ping goroutine from here? // XXX spawn ping goroutine from here?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment