Commit 4841f886 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 48faf697
...@@ -113,19 +113,41 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { ...@@ -113,19 +113,41 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
// Run starts client node and runs it until either ctx is canceled or master // Run starts client node and runs it until either ctx is canceled or master
// commands it to shutdown. (TODO verify M->shutdown) // commands it to shutdown. (TODO verify M->shutdown)
func (cli *Client) Run(ctx context.Context) (err error) { func (c *Client) Run(ctx context.Context) (err error) {
defer task.Running(&ctx, "client")(&err) defer task.Running(&ctx, "client")(&err)
// run process which performs master talk // run process which performs master talk
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
cli.runCancel = cancel c.runCancel = cancel
// cli.node.OnShutdown = cancel // XXX ok? // c.node.OnShutdown = cancel // XXX ok?
// return cli.talkMaster(ctx) // return c.talkMaster(ctx)
cli.runWG = xsync.NewWorkGroup(ctx) // XXX create it in NewClient ? (Close also uses it) // c.runWG = xsync.NewWorkGroup(ctx) // XXX create it in NewClient ? (Close also uses it)
cli.runWG.Go(cli.node.talkMaster) // c.runWG.Go(c.node.talkMaster)
cli.runWG.Go(cli.recvMaster) // c.runWG.Go(c.recvMaster)
return cli.runWG.Wait() return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error {
// XXX errctx ("on redial"? "connected"?)
c.head0 = c.head
wg := xsync.NewWorkGroup(ctx)
// launch master notifications receiver
wg.Go(func(ctx context.Context) error {
return c.recvMaster(ctx)
})
// init lastTid from master
// TODO better change protocol for master to send us head via notify
// channel right after identification.
wg.Go(func(ctx context.Context) error {
return c.initFromMaster(ctx, mlink)
})
return wg.Wait()
})
// return c.runWG.Wait()
} }
// Close implements zodb.IStorageDriver. // Close implements zodb.IStorageDriver.
...@@ -368,17 +390,8 @@ func (c *Client) recvMaster(ctx context.Context) (err error) { ...@@ -368,17 +390,8 @@ func (c *Client) recvMaster(ctx context.Context) (err error) {
for { for {
req, err := c.node.RecvM1() req, err := c.node.RecvM1()
if err != nil { if err != nil {
switch {
// reconnected to master
case errors.Is(err, eventMasterReconnect):
c.head0 = c.head
// XXX -> initFromMaster
default:
return err return err
} }
continue
}
err = c.recvMaster1(req.Msg) err = c.recvMaster1(req.Msg)
req.Close() req.Close()
......
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/log" "lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task" "lab.nexedi.com/kirr/neo/go/internal/task"
...@@ -132,17 +133,19 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, ...@@ -132,17 +133,19 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
// talkMaster dials master, identifies to it, and receives master notifications and requests. // TalkMaster dials master, identifies to it, and receives master notifications and requests.
// //
// Notifications to node/partition tables and cluster state are automatically // Notifications to node/partition tables and cluster state are automatically
// handled, while other notifications and requests are passed through to RecvM1. // handled, while other notifications and requests are passed through to RecvM1.
// //
// The connection to master is persisted by redial as needed. // The connection to master is persisted by redial as needed.
func (node *_MasteredNode) talkMaster(ctx context.Context) (err error) { //
// f is called on every reconnection after identification and protocol prologue. XXX
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *neonet.NodeLink) error) (err error) {
defer task.Runningf(&ctx, "talk master(%s)", node.MasterAddr)(&err) defer task.Runningf(&ctx, "talk master(%s)", node.MasterAddr)(&err)
for { for {
err := node.talkMaster1(ctx) err := node.talkMaster1(ctx, f)
log.Warning(ctx, err) // XXX Warning ok? -> Error? log.Warning(ctx, err) // XXX Warning ok? -> Error?
// TODO if err == "reject identification / protocol error" -> shutdown client // TODO if err == "reject identification / protocol error" -> shutdown client
// TODO if err == shutdown -> return // TODO if err == shutdown -> return
...@@ -160,7 +163,7 @@ func (node *_MasteredNode) talkMaster(ctx context.Context) (err error) { ...@@ -160,7 +163,7 @@ func (node *_MasteredNode) talkMaster(ctx context.Context) (err error) {
} }
} }
func (node *_MasteredNode) talkMaster1(ctx context.Context) error { func (node *_MasteredNode) talkMaster1(ctx context.Context, f func(context.Context, *neonet.NodeLink) error) error {
reqID := &proto.RequestIdentification{ reqID := &proto.RequestIdentification{
NodeType: node.myInfo.Type, NodeType: node.myInfo.Type,
NID: node.myInfo.NID, NID: node.myInfo.NID,
...@@ -220,17 +223,9 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) error { ...@@ -220,17 +223,9 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) error {
// XXX update .masterLink + notify waiters // XXX update .masterLink + notify waiters
// RecvM1 <- eventReconnect wg := xsync.NewWorkGroup(ctx)
select {
case <-ctx.Done():
return ctx.Err()
case node.rxm <- _RxM{Err: eventMasterReconnect}:
// ok
}
// receive and handle notifications from master // receive and handle notifications from master
wg.Go(func(ctx context.Context) error {
defer task.Running(&ctx, "rx")(&err) defer task.Running(&ctx, "rx")(&err)
for { for {
req, err := mlink.Recv1() req, err := mlink.Recv1()
...@@ -243,6 +238,13 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) error { ...@@ -243,6 +238,13 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) error {
} }
} }
}) })
// run user code
wg.Go(func(ctx context.Context) error {
return f(ctx, mlink)
})
return wg.Wait()
})
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment