Commit 7badf3b1 authored by Kirill Smelkov

.

parent 9a128aa5
@@ -32,6 +32,7 @@ import (
 	"github.com/pkg/errors"
 	"lab.nexedi.com/kirr/go123/mem"
+	"lab.nexedi.com/kirr/go123/xcontext"
 	"lab.nexedi.com/kirr/go123/xerr"
 	"lab.nexedi.com/kirr/go123/xnet"
 	"lab.nexedi.com/kirr/go123/xsync"
@@ -53,7 +54,7 @@ import (
 type Client struct {
 	node *_MasteredNode
-//	runWG     *xsync.WorkGroup
+	runWG     *xsync.WorkGroup
 	runCancel func()
 	// driver client <- watcher: database commits | errors.
@@ -77,10 +78,15 @@ var _ zodb.IStorageDriver = (*Client)(nil)
 // It will connect to master @masterAddr and identify with specified cluster name.
 // Use Run to actually start running the node.
 func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
-	return &Client{
+	c := &Client{
 		node:     newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
 		at0Ready: make(chan struct{}),
 	}
+
+	var runCtx context.Context
+	runCtx, c.runCancel = context.WithCancel(context.Background())
+	c.runWG = xsync.NewWorkGroup(runCtx)
+	return c
 }

 // Run starts client node and runs it until either ctx is canceled or master
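
Note (not part of the commit): since runCtx, runCancel and runWG are now set up in NewClient, Close has something to cancel and wait on even if Run was never started. A hedged lifecycle sketch follows; it is written as if it sat in the client's own package (package name assumed), and the cluster name, master address and networker are placeholders.

// Hedged lifecycle sketch; the package name, cluster name and master address
// below are assumptions for illustration, not taken from the commit.
package neo

import (
	"context"
	"log"

	"lab.nexedi.com/kirr/go123/xnet"
)

func ExampleClientLifecycle() {
	net := xnet.NetPlain("tcp")                   // plain TCP networker from go123/xnet
	c := NewClient("test", "127.0.0.1:5551", net) // runCtx, runCancel and runWG already exist here

	// Run talks to the master (and redials) until its context is canceled.
	go func() {
		if err := c.Run(context.Background()); err != nil {
			log.Print("run: ", err)
		}
	}()

	// ... use c as a zodb.IStorageDriver ...

	// Close cancels runCtx and waits for the run work group;
	// with this commit that is safe even if Run was never called.
	if err := c.Close(); err != nil {
		log.Print("close: ", err)
	}
}
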
@@ -89,11 +95,11 @@ func (c *Client) Run(ctx context.Context) (err error) {
 	// run process which performs master talk
 	ctx, cancel := context.WithCancel(ctx)
 	c.runCancel = cancel
-//	c.node.OnShutdown = cancel	// XXX ok?
-//	return c.talkMaster(ctx)
-//	c.runWG = xsync.NewWorkGroup(ctx)	// XXX create it in NewClient ? (Close also uses it)
-//	c.runWG.Go(c.node.talkMaster)
-//	c.runWG.Go(c.recvMaster)
+	c.runWG.Go(func(runCtx context.Context) error {
+		ctx, cancel := xcontext.Merge(ctx, runCtx) // TODO -> MergeCancel
+		defer cancel()

 	return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error {
 		// XXX errctx	("on redial"? "connected"?)
@@ -116,14 +122,18 @@ func (c *Client) Run(ctx context.Context) (err error) {
 		return wg.Wait()
 	})
+	})

-//	return c.runWG.Wait()
+	return c.runWG.Wait()
 }

 // Close implements zodb.IStorageDriver.
 func (c *Client) Close() (err error) {
 	c.runCancel()
-//	err = c.runWG.Wait()	XXX reenable
+	err = c.runWG.Wait()
+	if errors.Is(err, context.Canceled) {
+		err = nil // we canceled it
+	}

 	// close networker if configured to do so
 	if c.ownNet {
...
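
Taken together, the Run/Close hunks converge on one shutdown pattern: create the cancel and the work group up front, run everything under the group, and on Close cancel, wait, and swallow the context.Canceled we caused ourselves. A standalone sketch of that pattern with go123/xsync (illustrative names, not the actual client code):

// Standalone sketch of the cancel + WorkGroup shutdown pattern; the names
// service/run/close are illustrative only.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"lab.nexedi.com/kirr/go123/xsync"
)

type service struct {
	runWG     *xsync.WorkGroup
	runCancel func()
}

// newService prepares the run context and work group up front, as NewClient
// now does, so close works even if run was never started.
func newService() *service {
	s := &service{}
	var runCtx context.Context
	runCtx, s.runCancel = context.WithCancel(context.Background())
	s.runWG = xsync.NewWorkGroup(runCtx)
	return s
}

// run executes the main task under the work group (a stand-in for TalkMaster)
// and returns when the run context is canceled or the task fails.
func (s *service) run() error {
	s.runWG.Go(func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	})
	return s.runWG.Wait()
}

// close cancels the run context, waits, and reports only real errors:
// the context.Canceled caused by our own cancel means a clean shutdown.
func (s *service) close() error {
	s.runCancel()
	err := s.runWG.Wait()
	if errors.Is(err, context.Canceled) {
		err = nil // we canceled it
	}
	return err
}

func main() {
	s := newService()
	done := make(chan error, 1)
	go func() { done <- s.run() }()

	time.Sleep(100 * time.Millisecond) // let run start; a real caller would coordinate properly
	fmt.Println("close:", s.close())   // close: <nil>
	fmt.Println("run:  ", <-done)      // run: context canceled
}
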
@@ -120,7 +120,6 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
 		},
 		opReady: make(chan struct{}),
-		rxm:     make(chan _RxM),
 	}

 	return node
@@ -244,11 +243,13 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
 	wg := xsync.NewWorkGroup(ctx)

 	// receive and handle notifications from master
+	node.rxm = make(chan _RxM)
 	wg.Go(func(ctx context.Context) error {
 		defer task.Running(&ctx, "rx prefilter")(&err)
 		for {
 			req, err := mlink.Recv1()
 			if err != nil {
+				close(node.rxm)
 				return err
 			}

 			err = node.recvMaster1(ctx, req) // req ownership is passed in
@@ -305,13 +306,16 @@ func (node *_MasteredNode) recvMaster1(ctx context.Context, req neonet.Request)
 	return nil
 }

+var errMasterDisconect = errors.New("master disconnected")
+
 // RecvM1 receives request from master filtered through δstate handler.
 //
-// XXX link down ?
-// XXX kill err?
+// Must be called only when master link is established - e.g. from under TalkMaster.
 func (node *_MasteredNode) RecvM1() (neonet.Request, error) {
-	rx := <-node.rxm
-	// XXX close -> EOF?
+	rx, ok := <-node.rxm
+	if !ok {
+		return neonet.Request{}, errMasterDisconect
+	}
 	return rx.Req, rx.Err
 }
...
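
These hunks switch master-disconnect signaling to closing the rxm channel: talkMaster1 makes a fresh channel per session, the rx prefilter closes it when Recv1 on the link fails, and RecvM1 maps the closed channel to errMasterDisconect. A standalone sketch of the same comma-ok pattern with generic names (not the NEO/go types):

// Standalone sketch of signaling "link down" by closing the rx channel;
// the message and error names are illustrative, not the NEO/go ones.
package main

import (
	"errors"
	"fmt"
)

var errDisconnected = errors.New("link disconnected")

// recv1 mirrors RecvM1: a closed channel means the producer saw the link go
// down, so report that as a dedicated error instead of blocking forever.
func recv1(rxm <-chan string) (string, error) {
	msg, ok := <-rxm
	if !ok {
		return "", errDisconnected
	}
	return msg, nil
}

func main() {
	rxm := make(chan string, 2) // fresh channel per "session", as in talkMaster1
	rxm <- "notify: partition table"
	rxm <- "notify: cluster state"
	close(rxm) // producer: receive on the link failed

	for {
		msg, err := recv1(rxm)
		if err != nil {
			fmt.Println("stop:", err) // stop: link disconnected
			return
		}
		fmt.Println("got:", msg)
	}
}
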