Commit 373ef31b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0dd84253
...@@ -114,8 +114,6 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { ...@@ -114,8 +114,6 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
// Run starts client node and runs it until either ctx is canceled or master // Run starts client node and runs it until either ctx is canceled or master
// commands it to shutdown. (TODO verify M->shutdown) // commands it to shutdown. (TODO verify M->shutdown)
func (c *Client) Run(ctx context.Context) (err error) { func (c *Client) Run(ctx context.Context) (err error) {
// defer task.Running(&ctx, "client")(&err)
// run process which performs master talk // run process which performs master talk
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
c.runCancel = cancel c.runCancel = cancel
...@@ -137,11 +135,11 @@ func (c *Client) Run(ctx context.Context) (err error) { ...@@ -137,11 +135,11 @@ func (c *Client) Run(ctx context.Context) (err error) {
return c.recvMaster(ctx) return c.recvMaster(ctx)
}) })
// init lastTid from master // sync lastTid with master
// TODO better change protocol for master to send us head via notify // TODO better change protocol for master to send us head via notify
// channel right after identification. // channel right after identification.
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
return c.initFromMaster(ctx, mlink) return c.syncMaster(ctx, mlink)
}) })
return wg.Wait() return wg.Wait()
...@@ -344,9 +342,9 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) { ...@@ -344,9 +342,9 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
} }
*/ */
// initFromMaster asks M for DB head right after identification. // syncMaster asks M for DB head right after identification.
func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) { func (c *Client) syncMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "init")(&err) defer task.Running(&ctx, "sync")(&err)
// query last_tid // query last_tid
lastTxn := proto.AnswerLastTransaction{} lastTxn := proto.AnswerLastTransaction{}
......
...@@ -244,7 +244,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -244,7 +244,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
wg := xsync.NewWorkGroup(ctx) wg := xsync.NewWorkGroup(ctx)
// receive and handle notifications from master // receive and handle notifications from master
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
defer task.Running(&ctx, "rx")(&err) defer task.Running(&ctx, "rx prefilter")(&err)
for { for {
req, err := mlink.Recv1() req, err := mlink.Recv1()
if err != nil { if err != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment