Commit 94a98550 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0fdf4c40
......@@ -154,21 +154,16 @@ func (_ *_ΔStateCode) δClusterState() {}
//
// Use Run to actually start running the node.
func NewMaster(clusterName string, net xnet.Networker) *Master {
m := &Master{
return &Master{
node: xneo.NewNode(proto.MASTER, clusterName, net, ""),
ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}),
nodeComeq: make(chan nodeCome),
nodeLeaveq: make(chan nodeLeave),
peerTab: make(map[proto.NodeID]*_MasteredPeer),
monotime: xtime.Mono,
ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}),
nodeComeq: make(chan nodeCome),
nodeLeaveq: make(chan nodeLeave),
peerTab: make(map[proto.NodeID]*_MasteredPeer),
monotime: xtime.Mono,
}
return m
}
var ErrStartNonOperational = errors.New("start: cluster is non-operational")
......@@ -1026,6 +1021,7 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// other peers are notified with δnodeTab about it.
func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) {
log.Infof(ctx, "disconnecting %s", peer.node.NID)
peer.cancel()
peer.node.ResetLink(ctx)
delete(m.peerTab, peer.node.NID)
m.updateNodeState(ctx, peer.node, proto.DOWN)
......@@ -1181,15 +1177,19 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
}
})
// send accept and indicate to run that initial acceptance is done
err := peer.accept(ctx)
if err != nil {
return nil
}
close(peer.acceptSent)
// XXX compensate a bit for lack of ctx handling in Send/Recv
return xxcontext.WithCloseOnErrCancel(ctx, link, func() error {
// proxy δnodeTab,δpartTab/δclusterState from main to the peer
return peer.notify(ctx)
// send accept and indicate to run that initial acceptance is done
err := peer.accept(ctx)
if err != nil {
return nil
}
close(peer.acceptSent)
// proxy δnodeTab,δpartTab/δclusterState from main to the peer
return peer.notify(ctx)
})
})
return peer, true
......@@ -1259,12 +1259,9 @@ func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) {
default:
}
// XXX try to move logging into other place - so that we could remove ctx arg here
log.Warningf(ctx, "peer %s is slow -> detaching it", nid)
close(peer.notifyqOverflow)
// TODO delete(m.peerTab, nid) -> p.cancel()
// XXX what else?
panic("TODO")
delete(m.peerTab, nid)
}
}
......@@ -1273,63 +1270,58 @@ func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) {
func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "notify %s", p.node.NID)(&err)
link := p.node.Link()
stateCode := p.state0.Code
// XXX vvv right?
return xxcontext.WithCloseOnErrCancel(ctx, p.node.Link(), func() error {
for {
var δstate _ΔClusterState
select {
case <-ctx.Done():
return ctx.Err()
case <-p.notifyqOverflow:
// XXX err -> ?
return fmt.Errorf("detaching (peer is too slow to consume updates)")
for {
var δstate _ΔClusterState
case δstate = <-p.notifyq: // XXX could be also closed?
}
select {
case <-ctx.Done():
return ctx.Err()
var msg proto.Msg
switch δstate := δstate.(type) {
case *_ΔNodeTab:
msg = &proto.NotifyNodeInformation{
IdTime: proto.IdTimeNone, // FIXME
NodeList: []proto.NodeInfo{δstate.NodeInfo},
}
case <-p.notifyqOverflow:
return fmt.Errorf("detaching (peer is too slow to consume updates)")
case *_ΔPartTab:
// don't send δpartTab to S unless cluster is RUNNING
if p.node.Type == proto.STORAGE && stateCode != proto.ClusterRunning {
continue
}
msg = &proto.NotifyPartitionChanges{
// XXX
// PTid: ...,
// NumReplicas: ...,
// CellList: ...,
}
panic("TODO") // ^^^
case δstate = <-p.notifyq:
}
case *_ΔStateCode:
stateCode = δstate.ClusterState
msg = &proto.NotifyClusterState{
State: stateCode,
}
var msg proto.Msg
switch δstate := δstate.(type) {
case *_ΔNodeTab:
msg = &proto.NotifyNodeInformation{
IdTime: proto.IdTimeNone, // XXX ok?
NodeList: []proto.NodeInfo{δstate.NodeInfo},
}
default:
panic("bug")
case *_ΔPartTab:
// don't send δpartTab to S unless cluster is RUNNING
if p.node.Type == proto.STORAGE && stateCode != proto.ClusterRunning {
continue
}
msg = &proto.NotifyPartitionChanges{
// XXX
// PTid: ...,
// NumReplicas: ...,
// CellList: ...,
}
panic("TODO") // ^^^
err := p.node.Link().Send1(msg)
if err != nil {
return err
case *_ΔStateCode:
stateCode = δstate.ClusterState
msg = &proto.NotifyClusterState{
State: stateCode,
}
default:
panic("bug")
}
})
return nil
err := link.Send1(msg)
if err != nil {
return err
}
}
}
// allocNID allocates new node ID for a node of kind nodeType.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment