Commit 171c9f09 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e699c83b
......@@ -24,7 +24,6 @@ import (
"context"
"fmt"
"sync"
// "time"
"github.com/pkg/errors"
......@@ -42,6 +41,7 @@ import (
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
)
// Storage is NEO node that keeps data and provides read/write access to it via network.
......@@ -96,25 +96,35 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// wrap listener with link / identificaton hello checker
lli := xneo.NewListener(neonet.NewLinkListener(l))
// start serving incoming connections
wg := sync.WaitGroup{}
wg := xsync.NewWorkGroup(ctx)
/*
serveCtx, serveCancel := context.WithCancel(ctx)
//stor.node.OnShutdown = serveCancel
// XXX hack: until ctx cancel is not handled properly by Recv/Send
// XXX -> xcontext.WithCloseOnRetCancel
stor.node.OnShutdown = func() {
serveCancel()
xio.LClose(ctx, lli)
}
*/
// connect to master and get commands and updates from it
wg.Go(func(ctx context.Context) error {
return stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error {
// XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
// return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
// }
// let master initialize us. If successful this ends with StartOperation command.
reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil {
return err
}
// we got StartOperation command. Let master drive us during service phase.
return stor.m1serve(ctx, reqStart)
})
})
wg.Add(1)
go func(ctx context.Context) (err error) {
defer wg.Done()
// serve incoming connections
wg.Go(func(ctx context.Context) (err error) {
defer task.Running(&ctx, "accept")(&err)
serveWG := sync.WaitGroup{}
defer serveWG.Wait()
// XXX dup from master -> Node.Listen() -> Accept() ?
// XXX ? -> Node.Accept(lli) (it will verify IdTime against Node.nodeTab[nid])
// XXX ? -> Node.Serve(lli -> func(idReq))
......@@ -131,41 +141,15 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
continue
}
wg.Add(1)
serveWG.Add(1)
go func() {
defer wg.Done()
defer serveWG.Done()
stor.serveLink(ctx, req, idReq) // XXX ignore err? -> logged
}()
}
// }(serveCtx)
}(ctx)
// connect to master and get commands and updates from it
// err = stor.talkMaster(ctx)
// err = stor.talkMaster(serveCtx) // XXX hack for shutdown
// XXX log err?
err = stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error {
// XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
// return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
// }
// let master initialize us. If successful this ends with StartOperation command.
reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil {
return err
}
// we got StartOperation command. Let master drive us during service phase.
return stor.m1serve(ctx, reqStart)
})
// we are done - shutdown
// serveCancel()
wg.Wait()
err = wg.Wait()
// XXX should Storage do it, or should it leave back non-closed?
// TODO -> Storage should not close backend.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment