Commit 030b0056 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e88063c9
...@@ -898,7 +898,7 @@ func (m *Master) serve(ctx context.Context) (err error) { ...@@ -898,7 +898,7 @@ func (m *Master) serve(ctx context.Context) (err error) {
ctlStop = nil ctlStop = nil
nodeComeq = nil nodeComeq = nil
// XXX tell storages to stop // TODO tell storages to stop serving
} }
for inprogress > 0 { for inprogress > 0 {
...@@ -941,7 +941,7 @@ func (m *Master) serve(ctx context.Context) (err error) { ...@@ -941,7 +941,7 @@ func (m *Master) serve(ctx context.Context) (err error) {
return err return err
} }
// storCtlServe drives a storage node during cluster serve state // storCtlServe drives a storage node during cluster serve state.
func storCtlServe(ctx context.Context, stor *_MasteredPeer) (err error) { func storCtlServe(ctx context.Context, stor *_MasteredPeer) (err error) {
defer task.Runningf(&ctx, "%s serve", stor.node.NID)(&err) defer task.Runningf(&ctx, "%s serve", stor.node.NID)(&err)
slink := stor.node.Link() slink := stor.node.Link()
...@@ -959,8 +959,9 @@ func storCtlServe(ctx context.Context, stor *_MasteredPeer) (err error) { ...@@ -959,8 +959,9 @@ func storCtlServe(ctx context.Context, stor *_MasteredPeer) (err error) {
//if err != nil { //if err != nil {
// return err // return err
//} //}
//req.Close() XXX must be after req handling //msg := req.Msg
//switch msg := req.Msg.(type) { //req.Close()
//switch msg := msg.(type) {
//case *proto.NotifyReady: //case *proto.NotifyReady:
// // ok // // ok
//case *proto.Error: //case *proto.Error:
...@@ -974,12 +975,11 @@ func storCtlServe(ctx context.Context, stor *_MasteredPeer) (err error) { ...@@ -974,12 +975,11 @@ func storCtlServe(ctx context.Context, stor *_MasteredPeer) (err error) {
// TODO this should be also controlling transactions // TODO this should be also controlling transactions
for { for {
select { select {
// XXX stub
case <-time.After(1*time.Second): case <-time.After(1*time.Second):
//println(".") //println(".")
case <-ctx.Done(): case <-ctx.Done():
// FIXME also send StopOperation // TODO also send StopOperation
return ctx.Err() return ctx.Err()
} }
} }
...@@ -1024,18 +1024,33 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms ...@@ -1024,18 +1024,33 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// disconnectPeer resets link to the peer and sets its state to DOWN in nodeTab. // disconnectPeer resets link to the peer and sets its state to DOWN in nodeTab.
// other peers are notified with δnodeTab about it. // other peers are notified with δnodeTab about it.
// must be called from main.
// XXX place=?
func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) { func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) {
// XXX log? log.Infof(ctx, "disconnecting %s", peer.node.NID)
peer.node.ResetLink(ctx) peer.node.ResetLink(ctx)
delete(m.peerTab, peer.node.NID) delete(m.peerTab, peer.node.NID)
m.updateNodeState(ctx, peer.node, proto.DOWN) m.updateNodeState(ctx, peer.node, proto.DOWN)
} }
// notifyAll notifies all peers about event. // updateNodeTab = .nodeTab.Update + send δnodeTab to all subscribers.
// must be called from main. // must be called from main.
// XXX place // XXX place
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
node := m.node.State.NodeTab.Update(nodeInfo)
m.notifyAll(ctx, &_ΔNodeTab{nodeInfo})
return node
}
// XXX place
// XXX doc
func (m *Master) updateNodeState(ctx context.Context, node *xneo.PeerNode, state proto.NodeState) {
nodei := node.NodeInfo
// XXX skip if .State == state ?
nodei.State = state
m.updateNodeTab(ctx, nodei)
}
// notifyAll notifies all peers about event.
// XXX place
func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) { func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) {
// XXX locking // XXX locking
for nid, peer := range m.peerTab { for nid, peer := range m.peerTab {
...@@ -1057,24 +1072,6 @@ func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) { ...@@ -1057,24 +1072,6 @@ func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) {
} }
// updateNodeTab = .nodeTab.Update + send δnodeTab to all subscribers.
// must be called from main.
// XXX place
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
node := m.node.State.NodeTab.Update(nodeInfo)
m.notifyAll(ctx, &_ΔNodeTab{nodeInfo})
return node
}
// XXX place
// XXX doc
func (m *Master) updateNodeState(ctx context.Context, node *xneo.PeerNode, state proto.NodeState) {
nodei := node.NodeInfo
// XXX skip if .State == state ?
nodei.State = state
m.updateNodeTab(ctx, nodei)
}
// ---------------------------------------- // ----------------------------------------
...@@ -1336,10 +1333,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) { ...@@ -1336,10 +1333,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
return nil return nil
} }
// allocNID allocates new node ID for a node of kind nodeType. // allocNID allocates new node ID for a node of kind nodeType.
// XXX it is bad idea for master to assign node ID to coming node
// -> better nodes generate really unique UUID themselves and always show with them
func (m *Master) allocNID(nodeType proto.NodeType) proto.NodeID { func (m *Master) allocNID(nodeType proto.NodeType) proto.NodeID {
for num := int32(1); num < 1<<24; num++ { for num := int32(1); num < 1<<24; num++ {
nid := proto.NID(nodeType, num) nid := proto.NID(nodeType, num)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment