Commit e744ca3e authored by Kirill Smelkov

.

parent 6cf05c5e
@@ -844,8 +844,10 @@ func storCtlVerify(ctx context.Context, stor *_MasteredPeer, pt *xneo.PartitionT
 // serviceDone is the error returned after service-phase node handling is finished.
 type serviceDone struct {
-	node *xneo.PeerNode
+	peer *_MasteredPeer
 	err  error
+
+	ack  chan struct{}
 }
 
 // service drives cluster during running state.
@@ -870,19 +872,19 @@ func (m *Master) service(ctx context.Context) (err error) {
 		ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
 		defer cancel()
 
-		switch node.Type {
+		switch peer.node.Type {
 		case proto.STORAGE:
-			err = storCtlService(ctx, node)
+			err = storCtlService(ctx, peer)
 
 		case proto.CLIENT:
-			err = m.serveClient(ctx, node)
+			err = m.serveClient(ctx, peer)
 
 		// XXX ADMIN
 		}
 
 		// XXX do we need vvv ?
 		ack := make(chan struct{})
-		servedq <- serviceDone{stor: stor, err: err, ack: ack}
+		servedq <- serviceDone{peer: peer, err: err, ack: ack}
 		<-ack
 
 		// canceled service does not necessarily mean we should down the peer
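
Side note on the servedq / ack exchange in this hunk: it is the usual Go completion handshake, where the serving goroutine reports its result and then blocks until the main loop acknowledges having processed it (presumably so the loop's peer bookkeeping is not raced by teardown). A minimal self-contained sketch of the pattern, with illustrative names rather than the actual master code:

	package main

	import "fmt"

	// serviceDone mirrors the shape used above: a result plus an ack channel
	// that the sender blocks on until the receiver has processed the event.
	type serviceDone struct {
		name string
		err  error
		ack  chan struct{}
	}

	func main() {
		servedq := make(chan serviceDone)
		done := make(chan struct{})

		// serving goroutine: report completion, then wait to be acknowledged
		go func() {
			defer close(done)
			ack := make(chan struct{})
			servedq <- serviceDone{name: "storage-1", ack: ack}
			<-ack // the receiver has finished updating its state
			fmt.Println("server: acknowledged, continuing teardown")
		}()

		// main loop side: handle the completion event, then release the sender
		d := <-servedq
		fmt.Printf("main: %s finished (err=%v)\n", d.name, d.err)
		close(d.ack)
		<-done
	}
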
@@ -894,10 +896,10 @@ func (m *Master) service(ctx context.Context) (err error) {
 	}
 
 	// spawn peer serve driver (it should be only storages on entry here?)
-	for _, peer := range m.peerTab() {
+	for _, peer := range m.peerTab {
 		// XXX clients? other nodes?
 		// XXX note PENDING - not adding to service; ok?
-		if peer.node.Type == proto.Storage && peer.node.State == proto.RUNNING {
+		if peer.node.Type == proto.STORAGE && peer.node.State == proto.RUNNING {
 			goServe(peer)
 		}
 	}
@@ -919,7 +921,7 @@ loop:
 			break loop
 
 		// peer (should be) disconnected
-		case n := <-m.nodeLeave:
+		case n := <-m.nodeLeaveq:
 			m.disconnectPeer(ctx, n.peer)
 
 			// if cluster became non-operational - cancel service
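
For reference, the loop: label in this hunk's context is the standard way to leave a for/select event loop from inside a case, since a bare break would only exit the select. A rough standalone sketch of that shape, with a hypothetical leaveq standing in for m.nodeLeaveq:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// nodeLeave is a stand-in for the event delivered on the leave queue.
	type nodeLeave struct{ peer string }

	func runService(ctx context.Context, leaveq <-chan nodeLeave) {
	loop:
		for {
			select {
			case <-ctx.Done():
				break loop // a bare break would only leave the select

			// peer (should be) disconnected
			case n := <-leaveq:
				fmt.Println("dropping peer:", n.peer)
			}
		}
		fmt.Println("service loop done:", ctx.Err())
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		leaveq := make(chan nodeLeave, 1)
		leaveq <- nodeLeave{peer: "S1"} // an already-queued leave event

		go func() {
			// later the service gets canceled, e.g. cluster leaves running state
			time.Sleep(50 * time.Millisecond)
			cancel()
		}()

		runService(ctx, leaveq)
	}
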
@@ -1001,7 +1003,7 @@ func storCtlService(ctx context.Context, stor *_MasteredPeer) (err error) {
 // serveClient serves incoming client link.
 func (m *Master) serveClient(ctx context.Context, cli *_MasteredPeer) (err error) {
 	defer task.Runningf(&ctx, "%s: client service", cli.node.NID)(&err)
-	clink := cli.Link()
+	clink := cli.node.Link()
 
 	// wg, ctx := errgroup.WithContext(ctx)  // XXX -> sync.WorkGroup
 	defer xio.CloseWhenDone(ctx, clink)()    // XXX -> cli.ResetLink? (better not here)
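
xio.CloseWhenDone above ties the client link's lifetime to the serve context. As a rough illustration only (a hypothetical helper, not the actual go123/xio implementation), the pattern behind such a call looks like:

	package main

	import (
		"context"
		"fmt"
		"io"
		"strings"
	)

	// closeWhenDone closes c once ctx is canceled. The returned stop function
	// detaches the watcher, which gives the `defer closeWhenDone(ctx, c)()`
	// shape seen in serveClient.
	func closeWhenDone(ctx context.Context, c io.Closer) (stop func()) {
		stopq := make(chan struct{})
		go func() {
			select {
			case <-ctx.Done():
				c.Close() // tear the link down together with the context
			case <-stopq:
				// the caller finished on its own and will close c itself
			}
		}()
		return func() { close(stopq) }
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		link := io.NopCloser(strings.NewReader("hello")) // stand-in for a neonet link
		defer closeWhenDone(ctx, link)()

		cancel() // from here on the watcher closes link
		fmt.Println("link lifetime is tied to ctx")
	}
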
@@ -1209,7 +1211,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
 	// XXX check nid matches NodeType
-	node = m.node.State.NodeTab.Get(nid)
+	node := m.node.State.NodeTab.Get(nid)
 	if node != nil {
 		// reject - nid is already occupied by someone else
 		// XXX check also for down state - it could be the same node reconnecting
...