Commit 19d95ba9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 48c864b0
......@@ -126,27 +126,28 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
log.Info(ctx, "identification accepted")
Mlink := Mconn.Link()
wg, ctx := errgroup.WithContext(ctx)
// XXX + close Mconn
defer xio.CloseWhenDone(ctx, Mlink)()
// XXX .nodeTab.Reset()
// launch master notifications receiver
wg.Go(func() error {
return c.recvMaster(ctx, Mlink)
})
rpt := neo.AnswerPartitionTable{}
err = Mlink.Ask1(&neo.AskPartitionTable{}, &rpt)
if err != nil {
// XXX
}
// launch process that "drives" master (initiates & tx requests)
wg.Go(func() error {
return c.ctlMaster(ctx, Mlink)
}()
pt := neo.PartTabFromDump(rpt.PTid, rpt.RowList)
// XXX pt -> c.node.PartTab ?
_ = pt
return wg.Wait()
rlastTxn := neo.AnswerLastTransaction{}
err = Mlink.Ask1(&neo.LastTransaction{}, &rlastTxn)
if err != nil {
// XXX
}
}
// XXX rlastTxn.Tid -> c.lastTid
// recvMaster receives and handles notifications from master
func (c *Client) recvMaster(ctx context.Context, Mlink *neo.NodeLink) error {
// XXX .nodeTab.Reset()
for {
req, err := Mlink.Recv1()
......@@ -175,6 +176,35 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
}
}
func (c *Client) ctlMaster(ctx context.Context, Mlink *neo.NodeLink) error {
// ask M for PT
rpt := neo.AnswerPartitionTable{}
err = Mlink.Ask1(&neo.AskPartitionTable{}, &rpt)
if err != nil {
return err
}
// XXX lock
pt := neo.PartTabFromDump(rpt.PTid, rpt.RowList)
// XXX pt -> c.node.PartTab ?
_ = pt
// ask M about last_tid
rlastTxn := neo.AnswerLastTransaction{}
err = Mlink.Ask1(&neo.LastTransaction{}, &rlastTxn)
if err != nil {
return err
}
// XXX lock
// XXX rlastTxn.Tid -> c.lastTid
// XXX what next?
return nil
// TODO transaction control? -> better in original goroutines doing the txn (just share Mlink)
}
// --- user API calls ---
func (c *Client) LastTid(ctx context.Context) (zodb.Tid, error) {
......
......@@ -74,7 +74,7 @@ func (n *NodeCommon) Dial(ctx context.Context, peerType NodeType, addr string) (
defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error return
// FIXME also close link on ctx cancel
// FIXME also close link on ctx cancel -> xcontext.WhenDone()
defer func() {
if err != nil {
link.Close()
......@@ -104,8 +104,9 @@ func (n *NodeCommon) Dial(ctx context.Context, peerType NodeType, addr string) (
return nil, nil, fmt.Errorf("accepted, but peer is not %v (identifies as %v)", peerType, accept.NodeType)
}
//accept.MyNodeUUID, link // XXX register .NodeTab? (or better LinkTab as NodeTab is driven by M)
//accept.YourNodeUUID // XXX M can tell us to change UUID -> take in effect
// XXX accept.MyUUID, link // XXX register .NodeTab? (or better LinkTab as NodeTab is driven by M)
// XXX accept.YourUUID // XXX M can tell us to change UUID -> take in effect
// XXX accept.NumPartitions, ... wrt n.node.PartTab
return conn, accept, nil
}
......
......@@ -172,8 +172,8 @@ func Canceled(err error) bool {
}
// WhenDone arranges f to be called either when ctx is cancelled or surrounding
// function returns.
// WhenDone arranges for f to be called either when ctx is cancelled or
// surrounding function returns.
//
// To work as intended it should be called under defer like this:
//
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment