Commit 250c5485 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b16ca531
......@@ -193,7 +193,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
// XXX dup from Server.talkMaster1
// XXX put logging into Dial?
log.Info(ctx, "connecting ...")
Mconn, accept, err := c.node.Dial(ctx, neo.MASTER, c.node.MasterAddr)
mlink, accept, err := c.node.Dial(ctx, neo.MASTER, c.node.MasterAddr)
if err != nil {
// FIXME it is not only identification - e.g. ECONNREFUSED
log.Info(ctx, "identification rejected") // XXX ok here? (err is logged above)
......@@ -203,11 +203,10 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
_ = accept // XXX
log.Info(ctx, "identification accepted")
Mlink := Mconn.Link()
// set c.mlink and notify waiters
c.mlinkMu.Lock()
c.mlink = Mlink
c.mlink = mlink
ready := c.mlinkReady
c.mlinkReady = make(chan struct{})
c.mlinkMu.Unlock()
......@@ -215,8 +214,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
wg, ctx := errgroup.WithContext(ctx)
// XXX + close Mconn
defer xio.CloseWhenDone(ctx, Mlink)()
defer xio.CloseWhenDone(ctx, mlink)()
// when we are done - reset .mlink
defer func() {
......@@ -227,13 +225,13 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
// launch master notifications receiver
wg.Go(func() error {
return c.recvMaster(ctx, Mlink)
return c.recvMaster(ctx, mlink)
})
// init partition table from master
// XXX is this needed at all or we can expect master sending us pt via notify channel?
wg.Go(func() error {
return c.initFromMaster(ctx, Mlink)
return c.initFromMaster(ctx, mlink)
})
return wg.Wait()
......@@ -308,10 +306,10 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error {
}
}
func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error {
func (c *Client) initFromMaster(ctx context.Context, mlink *neo.NodeLink) error {
// ask M for PT
rpt := neo.AnswerPartitionTable{}
err := Mlink.Ask1(&neo.AskPartitionTable{}, &rpt)
err := mlink.Ask1(&neo.AskPartitionTable{}, &rpt)
if err != nil {
return err
}
......@@ -326,7 +324,7 @@ func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error
// ask M about last_tid
rlastTxn := neo.AnswerLastTransaction{}
err = Mlink.Ask1(&neo.LastTransaction{}, &rlastTxn)
err = mlink.Ask1(&neo.LastTransaction{}, &rlastTxn)
if err != nil {
return err
}
......@@ -338,7 +336,7 @@ func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error
// XXX what next?
return nil
// TODO transaction control? -> better in original goroutines doing the txn (just share Mlink)
// TODO transaction control? -> better in original goroutines doing the txn (just share mlink)
}
// --- user API calls ---
......
......@@ -35,6 +35,7 @@ import (
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/zodb"
)
......@@ -70,28 +71,23 @@ type NodeApp struct {
// Dial connects to another node in the cluster
//
// It handshakes, requests identification and checks peer type. If successful returned are:
// - primary link connection which carried identification
// - established link
// - accept identification reply
func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *Conn, _ *AcceptIdentification, err error) {
func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *NodeLink, _ *AcceptIdentification, err error) {
link, err := DialLink(ctx, n.Net, addr)
if err != nil {
return nil, nil, err
}
defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error return
// FIXME also close link on ctx cancel -> xcontext.WhenDone()
// close link on error or ctx cancel
cleanup := xio.CloseWhenDone(ctx, link)
defer func() {
if err != nil {
link.Close()
cleanup()
}
}()
conn, err := link.NewConn()
if err != nil {
return nil, nil, err
}
req := &RequestIdentification{
NodeType: n.MyInfo.Type,
UUID: n.MyInfo.UUID,
......@@ -100,11 +96,14 @@ func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *
IdTimestamp: n.MyInfo.IdTimestamp, // XXX ok?
}
accept := &AcceptIdentification{}
err = conn.Ask(req, accept)
// FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock)
err = link.Ask1(req, accept)
if err != nil {
return nil, nil, err
}
// XXX vvv move out of here (e.g. to DialPeer) if we are not checking everthing in full here?
if accept.NodeType != peerType {
// XXX send Error to peer?
return nil, nil, fmt.Errorf("accepted, but peer is not %v (identifies as %v)", peerType, accept.NodeType)
......@@ -114,7 +113,7 @@ func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *
// XXX accept.YourUUID // XXX M can tell us to change UUID -> take in effect
// XXX accept.NumPartitions, ... wrt n.node.PartTab
return conn, accept, nil
return link, accept, nil
}
......
......@@ -453,13 +453,11 @@ func (p *Peer) PutConn(c *Conn) {
// XXX p.* reading without lock - ok?
func (p *Node) dial(ctx context.Context) (*NodeLink, error) {
var me *NodeApp // XXX bad -> crashes
conn0, accept, err := me.Dial(ctx, p.Type, p.Addr.String())
link, accept, err := me.Dial(ctx, p.Type, p.Addr.String())
if err != nil {
return nil, err
}
link := conn0.Link()
// verify peer identifies as what we expect
// XXX move to Dial?
switch {
......
......@@ -176,7 +176,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX dup in Client.talkMaster1
// XXX put logging into Dial?
log.Info(ctx, "connecting ...")
Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr)
mlink, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr)
if err != nil {
// FIXME it is not only identification - e.g. ECONNREFUSED
log.Info(ctx, "identification rejected") // XXX ok here? (err is logged above)
......@@ -184,7 +184,6 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
}
log.Info(ctx, "identification accepted")
mlink := Mconn.Link()
defer xio.CloseWhenDone(ctx, mlink)()
// XXX add master UUID -> nodeTab ? or master will notify us with it himself ?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment