Commit 50098921 authored by Kirill Smelkov

.

parent 19d95ba9
...@@ -26,8 +26,13 @@ import ( ...@@ -26,8 +26,13 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net/url" "net/url"
"sync"
"time" "time"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
...@@ -40,8 +45,12 @@ import ( ...@@ -40,8 +45,12 @@ import (
type Client struct { type Client struct {
node neo.NodeCommon node neo.NodeCommon
// storLink *neo.NodeLink // link to storage node talkMasterCancel func()
// storConn *neo.Conn // XXX main connection to storage
// link to master - established and maintained by talkMaster
mlinkMu sync.Mutex
mlink *neo.NodeLink
mlinkReady chan struct{} // reinitialized at each new talk cycle
} }
var _ zodb.IStorage = (*Client)(nil) var _ zodb.IStorage = (*Client)(nil)
...@@ -63,25 +72,62 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { ...@@ -63,25 +72,62 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
//NodeTab: &neo.NodeTable{}, //NodeTab: &neo.NodeTable{},
//PartTab: &neo.PartitionTable{}, //PartTab: &neo.PartitionTable{},
}, },
mlinkReady: make(chan struct{}),
} }
// spawn background process which performs master talk // spawn background process which performs master talk
go cli.talkMaster(context.TODO()) // XXX ctx = "client(?)" ctx, cancel := context.WithCancel(context.Background())
cli.talkMasterCancel = cancel
go cli.talkMaster(ctx)
return cli return cli
} }
func (c *Client) Close() error { func (c *Client) Close() error {
panic("TODO") c.talkMasterCancel()
// // NOTE this will abort all currently in-flight IO and close all connections over storLink // XXX wait talkMaster finishes
// err := c.storLink.Close() // XXX what else?
// // XXX also wait for some goroutines to finish ? return nil
// return err
} }
// --- connection with master --- // --- connection with master ---
/*
// mconnected is result of connecting to master
type mconnected struct {
mlink *neo.NodeLink
ready chan struct{}
}
*/
// masterLink waits until a link to the primary master is available and
// returns it.
//
// The current link and the readiness channel are read together under
// mlinkMu. If no link is set yet, the call blocks until either the readiness
// channel it observed is closed (after which the state is re-read) or ctx is
// done, in which case ctx.Err() is returned.
//
// NOTE a non-nil result is only a snapshot: the link may stop being
// operational at any later moment. (per the original note, such cases are
// reported as ErrLinkDown by operations on the link)
func (c *Client) masterLink(ctx context.Context) (*neo.NodeLink, error) {
	// snapshot reads the link together with the channel that signals the
	// next state change, so that the two are consistent with each other.
	snapshot := func() (*neo.NodeLink, <-chan struct{}) {
		c.mlinkMu.Lock()
		defer c.mlinkMu.Unlock()
		return c.mlink, c.mlinkReady
	}

	link, wakeup := snapshot()
	for link == nil {
		select {
		case <-wakeup:
			// state changed - look at the link again below.

		case <-ctx.Done():
			return nil, ctx.Err()
		}

		link, wakeup = snapshot()
	}

	return link, nil
}
// talkMaster connects to master, announces self and receives notifications. // talkMaster connects to master, announces self and receives notifications.
// it tries to persist master link reconnecting as needed. // it tries to persist master link reconnecting as needed.
// //
...@@ -89,7 +135,7 @@ func (c *Client) Close() error { ...@@ -89,7 +135,7 @@ func (c *Client) Close() error {
// //
// XXX always error (dup Storage.talkMaster) ? // XXX always error (dup Storage.talkMaster) ?
func (c *Client) talkMaster(ctx context.Context) (err error) { func (c *Client) talkMaster(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "talk master(%v)", c.node.MasterAddr)(&err) defer task.Runningf(&ctx, "client: talk master(%v)", c.node.MasterAddr)(&err)
// XXX dup wrt Storage.talkMaster // XXX dup wrt Storage.talkMaster
for { for {
...@@ -126,20 +172,36 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) { ...@@ -126,20 +172,36 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
log.Info(ctx, "identification accepted") log.Info(ctx, "identification accepted")
Mlink := Mconn.Link() Mlink := Mconn.Link()
// set c.mlink and notify waiters
c.mlinkMu.Lock()
c.mlink = Mlink
ready := c.mlinkReady
c.mlinkReady = make(chan struct{})
c.mlinkMu.Unlock()
close(ready)
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
// XXX + close Mconn // XXX + close Mconn
defer xio.CloseWhenDone(ctx, Mlink)() defer xio.CloseWhenDone(ctx, Mlink)()
// when we are done - reset .mlink
defer func() {
c.mlinkMu.Lock()
c.mlink = nil
c.mlinkMu.Unlock()
}()
// launch master notifications receiver // launch master notifications receiver
wg.Go(func() error { wg.Go(func() error {
return c.recvMaster(ctx, Mlink) return c.recvMaster(ctx, Mlink)
}) })
// launch process that "drives" master (initiates & tx requests) // init partition table from master
// XXX is this needed or we can expect master sending us pt via notify channel?
wg.Go(func() error { wg.Go(func() error {
return c.ctlMaster(ctx, Mlink) return c.initFromMaster(ctx, Mlink)
}() })
return wg.Wait() return wg.Wait()
...@@ -176,10 +238,10 @@ func (c *Client) recvMaster(ctx context.Context, Mlink *neo.NodeLink) error { ...@@ -176,10 +238,10 @@ func (c *Client) recvMaster(ctx context.Context, Mlink *neo.NodeLink) error {
} }
} }
func (c *Client) ctlMaster(ctx context.Context, Mlink *neo.NodeLink) error { func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error {
// ask M for PT // ask M for PT
rpt := neo.AnswerPartitionTable{} rpt := neo.AnswerPartitionTable{}
err = Mlink.Ask1(&neo.AskPartitionTable{}, &rpt) err := Mlink.Ask1(&neo.AskPartitionTable{}, &rpt)
if err != nil { if err != nil {
return err return err
} }
...@@ -189,6 +251,9 @@ func (c *Client) ctlMaster(ctx context.Context, Mlink *neo.NodeLink) error { ...@@ -189,6 +251,9 @@ func (c *Client) ctlMaster(ctx context.Context, Mlink *neo.NodeLink) error {
// XXX pt -> c.node.PartTab ? // XXX pt -> c.node.PartTab ?
_ = pt _ = pt
/*
XXX don't need in init?
// ask M about last_tid // ask M about last_tid
rlastTxn := neo.AnswerLastTransaction{} rlastTxn := neo.AnswerLastTransaction{}
err = Mlink.Ask1(&neo.LastTransaction{}, &rlastTxn) err = Mlink.Ask1(&neo.LastTransaction{}, &rlastTxn)
...@@ -198,6 +263,7 @@ func (c *Client) ctlMaster(ctx context.Context, Mlink *neo.NodeLink) error { ...@@ -198,6 +263,7 @@ func (c *Client) ctlMaster(ctx context.Context, Mlink *neo.NodeLink) error {
// XXX lock // XXX lock
// XXX rlastTxn.Tid -> c.lastTid // XXX rlastTxn.Tid -> c.lastTid
*/
// XXX what next? // XXX what next?
return nil return nil
...@@ -207,25 +273,20 @@ func (c *Client) ctlMaster(ctx context.Context, Mlink *neo.NodeLink) error { ...@@ -207,25 +273,20 @@ func (c *Client) ctlMaster(ctx context.Context, Mlink *neo.NodeLink) error {
// --- user API calls --- // --- user API calls ---
func (c *Client) LastTid(ctx context.Context) (zodb.Tid, error) { func (c *Client) LastTid(ctx context.Context) (_ zodb.Tid, err error) {
panic("TODO") defer xerr.Context(&err, "client: lastTid")
/*
c.Mlink // XXX check we are connected mlink, err := c.masterLink(ctx)
conn, err := c.Mlink.NewConn()
if err != nil { if err != nil {
// XXX return 0, err
} }
// XXX defer conn.Close
// FIXME do not use global conn (see comment in openClientByURL)
// XXX open new conn for this particular req/reply ?
reply := neo.AnswerLastTransaction{} reply := neo.AnswerLastTransaction{}
err := conn.Ask(&neo.LastTransaction{}, &reply) err = mlink.Ask1(&neo.LastTransaction{}, &reply) // XXX Ask += ctx
if err != nil { if err != nil {
return 0, err // XXX err ctx return 0, err // XXX err ctx
} }
return reply.Tid, nil return reply.Tid, nil
*/
} }
func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) { func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) {
...@@ -234,6 +295,8 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) { ...@@ -234,6 +295,8 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) {
} }
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) { func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
// XXX err context (but keep zodb errors intact ?)
// XXX check pt is operational first? -> no if there is no data - we'll // XXX check pt is operational first? -> no if there is no data - we'll
// just won't find ready cell // just won't find ready cell
// //
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment