Commit 01bae9f2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d95400c1
......@@ -356,7 +356,7 @@ loop:
// new connection comes in
case n := <-m.nodeCome:
// FIXME if identify=ok -> subscribe to δ(nodeTab) and send initial nodeTab right after accept (accept should do it?)
node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
if node == nil {
goreject(ctx, wg, n.req, resp)
......@@ -369,7 +369,7 @@ loop:
go func() {
defer wg.Done()
err := m.accept(ctx, n.req, resp)
err := m.accept(node, state0, n.req, resp)
if err != nil {
recovery <- storRecovery{stor: node, err: err}
return
......@@ -609,7 +609,7 @@ loop:
for inprogress > 0 {
select {
case n := <-m.nodeCome:
node, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
if node == nil {
goreject(ctx, wg, n.req, resp)
......@@ -622,7 +622,7 @@ loop:
go func() {
defer wg.Done()
err := m.accept(ctx, n.req, resp)
err := m.accept(node, state0, n.req, resp)
if err != nil {
verify <- storVerify{stor: node, err: err}
return
......@@ -822,7 +822,7 @@ loop:
select {
// new connection comes in
case n := <-m.nodeCome:
node, resp := m.identify(ctx, n, /* XXX accept everyone */)
node, state0, resp := m.identify(ctx, n, /* XXX accept everyone */)
if node == nil {
goreject(ctx, wg, n.req, resp)
......@@ -833,7 +833,7 @@ loop:
go func() {
defer wg.Done()
err = m.accept(ctx, n.req, resp)
err = m.accept(node, state0, n.req, resp)
if err != nil {
serviced <- serviceDone{node: node, err: err}
return
......@@ -988,8 +988,8 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// XXX place
// called from main master process. XXX
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) {
m.node.State.NodeTab.Update(nodeInfo)
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
peer := m.node.State.NodeTab.Update(nodeInfo)
event := &_ΔNodeTab{nodeInfo}
// XXX locking
......@@ -1005,6 +1005,8 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) {
log.Warningf(ctx, "peer %s is slow -> detaching it", nid)
// TODO ^^^
}
return peer
}
// XXX place
......@@ -1109,7 +1111,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned.
// Response message is constructed but not send back not to block the caller - it is
// the caller responsibility to send the response to node which requested identification.
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, resp proto.Msg) {
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, resp proto.Msg) {
// XXX also verify ? :
// - NodeType valid
// - IdTime ?
......@@ -1158,7 +1160,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
subj := fmt.Sprintf("identify: %s (%s)", n.req.Link().RemoteAddr(), n.idReq.NID)
if err != nil {
log.Infof(ctx, "%s: rejecting: %s", subj, err)
return nil, err
return nil, nil, err
}
log.Infof(ctx, "%s: accepting as %s", subj, nid)
......@@ -1193,7 +1195,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
node.SetLink(n.req.Link())
// make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates
state0 := m.node.State.Snapshot()
state0 = m.node.State.Snapshot()
// TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details)
notifyq := make(chan _ΔClusterState, 1024)
......@@ -1206,26 +1208,30 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
}()
*/
return node, accept
return node, state0, accept
}
func (m *Master) accept(peer *xneo.PeerNode, idReq *neonet.Request, idResp proto.Msg) error {
// accept sends acceptance to just identified peer, sends nodeTab and partTab
// and spawns task to proxy their updates to the peer. XXX
func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, idReq *neonet.Request, idResp proto.Msg) error {
err := idReq.Reply(idResp)
if err != nil {
return fmt.Errorf("send accept: %w", err)
}
// XXX idReq close?
// send initial state snapshot to accepted node
link := peer.Link() // XXX -> idReq.Link() instead?
// nodeTab
err = link.Send1(state0.nodeTab) // = proto.NotifyNodeInformation{
err = link.Send1(&state0.NodeTab)
if err != nil {
return fmt.Errorf("send nodeTab: %w", err)
}
// partTab (not to S until cluster is RUNNING)
if !(peer.NodeType == proto.STORAGE && cs.StateCode != proto.ClusterRunning) {
err = link.Send1(state0.partTab)
if !(peer.Type == proto.STORAGE && state0.Code != proto.ClusterRunning) {
err = link.Send1(&state0.PartTab)
if err != nil {
return fmt.Errorf("send partTab: %w", err)
}
......@@ -1239,7 +1245,7 @@ func (m *Master) accept(peer *xneo.PeerNode, idReq *neonet.Request, idResp proto
wg.Go(func(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "send cluster updates")(&err)
stateCode == state0.StateCode
stateCode := state0.Code
for {
var δstate _ΔClusterState
......@@ -1255,13 +1261,13 @@ func (m *Master) accept(peer *xneo.PeerNode, idReq *neonet.Request, idResp proto
switch δstate := δstate.(type) {
case *_ΔNodeTab:
msg = &proto.NotifyNodeInformation{
IdTime: ...,
NodeList: []NodeInfo{δstate.NodeInfo},
IdTime: proto.IdTimeNone, // FIXME
NodeList: []proto.NodeInfo{δstate.NodeInfo},
}
case *_ΔPartTab:
// don't send δpartTab to S unless cluster is RUNNING
if peer.NodeType == proto.STORAGE && stateCode != ClusterRunning {
if peer.Type == proto.STORAGE && stateCode != proto.ClusterRunning {
continue
}
msg = &proto.NotifyPartitionChanges{
......@@ -1275,7 +1281,7 @@ func (m *Master) accept(peer *xneo.PeerNode, idReq *neonet.Request, idResp proto
case *_ΔStateCode:
stateCode = δstate.ClusterState
msg = &proto.NotifyClusterState{
State: stateCode
State: stateCode,
}
default:
......@@ -1287,7 +1293,9 @@ func (m *Master) accept(peer *xneo.PeerNode, idReq *neonet.Request, idResp proto
return err
}
}
}()
})
return nil
}
// allocNID allocates new node ID for a node of kind nodeType.
......
......@@ -46,7 +46,7 @@ func (cs *ClusterState) IsOperational() bool {
type ClusterStateSnapshot struct {
NodeTab proto.NotifyNodeInformation
PartTab proto.SendPartitionTable
Code proto.CluterState
Code proto.ClusterState
}
func (cs *ClusterState) Snapshot() *ClusterStateSnapshot {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment