Commit 87b612aa authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3a9c2cac
......@@ -108,6 +108,8 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
if stor == nil {
panic(0) // XXX
}
// XXX check stor.State == RUNNING
//Slink := c.Connect(stor) // single-flight Dial; puts result into stor.Link (XXX ok?)
//Slink := stor.Connect() // single-flight Dial; puts result into stor.Link (XXX ok?)
Slink := stor.Link // XXX stub
......
......@@ -75,12 +75,120 @@ type NodeTable struct {
//sync.RWMutex XXX needed ?
//storv []*Node // storages
nodev []*Node // all other nodes
nodev []*Node // all other nodes -> *Peer
notifyv []chan NodeInfo // subscribers
//ver int // ↑ for versioning XXX do we need this?
}
// // special error indicating dial is currently in progress
// var errDialInprogress = errors.New("dialing...")
// even if dialing a peer failed, we'll attempt redial after this timeout
const δtRedial = 3 * time.Second
// Peer represents a peer node in the cluster.
type Peer struct {
NodeInfo // .uuid, .addr, ...
// link to this peer
linkMu sync.Mutex
link *NodeLink // link to peer or nil if not connected
// linkErr error // dialing gave this error
dialT time.Time // dialing finished at this time
// linkReady chan struct{} // becomes ready after dial finishes; reinitialized at each redial
dialing *dialReady // dialer notifies waiters via this; reinitialized at each redial; nil while not dialing
}
type dialReady struct {
link *NodeLink
err error
ready chan struct{}
}
// Connect returns link to this peer.
//
// If the link was not yet established Connect dials the peer appropriately,
// handshakes, requests identification and checks that identification reply is
// as expected.
func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) {
// XXX p.State != RUNNING
// XXX p.Addr != ""
p.linkMu.Lock()
// ok if already connected
if link := p.link; link != nil {
p.linkMu.Unlock()
return link, nil
}
// if dial is already in progress - wait for its completion
if dialing := p.dialing; dialing != nil {
p.linkMu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-dialing.ready:
return dialed.link, dialed.err
}
}
// otherwise this goroutine becomes responsible for (re)dialing the peer
dialing = &dialReady{ready: make(chan struct{})}
p.dialing = dialing
// start dialing - in singleflight
p.linkMu.Unlock()
go func() {
// throttle redialing if too fast
δt := time.Now().Sub(dialT)
if δt < δtRedial && !dialT.IsZero() {
select {
case <-ctx.Done():
// XXX -> return nil, ctx.Err()
case <-time.After(δtRedial - δt):
// ok
}
}
conn0, accept, err := Dial(ctx, p.Type, p.Addr)
if err != nil {
// XXX -> return nil, err
}
// XXX accept.NodeType == p.Type
// XXX accept.MyUUID == p.UUID
// XXX accept.YourUUID == (what has been given us by master)
// XXX accept.Num{Partitions,Replicas} == (what is expected - (1,1) currently)
p.link = link
p.linkErr = err
p.dialT = time.Now()
dialing.link = link
dialing.err = err
close(dialing.ready)
}()
<-dialing.ready
return dialing.link, dialing.err
}
//trace:event traceNodeChanged(nt *NodeTable, n *Node)
// Node represents a node entry in NodeTable
......@@ -137,6 +245,7 @@ func (nt *NodeTable) Update(nodeInfo NodeInfo, conn *Conn /*XXX better link *Nod
}
/*
// GetByLink finds node by node-link
// XXX is this a good idea ?
func (nt *NodeTable) GetByLink(link *NodeLink) *Node {
......@@ -148,6 +257,7 @@ func (nt *NodeTable) GetByLink(link *NodeLink) *Node {
}
return nil
}
*/
// XXX doc
func (nt *NodeTable) SetNodeState(node *Node, state NodeState) {
......@@ -156,6 +266,7 @@ func (nt *NodeTable) SetNodeState(node *Node, state NodeState) {
nt.notify(node.NodeInfo)
}
/*
// UpdateLinkDown updates information about corresponding to link node and marks it as down
// it returns corresponding node entry for convenience
// XXX is this a good idea ?
......@@ -169,6 +280,7 @@ func (nt *NodeTable) UpdateLinkDown(link *NodeLink) *Node {
nt.SetNodeState(node, DOWN)
return node
}
*/
// StorageList returns list of all storages in node table
......
......@@ -271,7 +271,7 @@ type CloseClient struct {
// connection. Any -> Any.
type RequestIdentification struct {
NodeType NodeType // XXX name
NodeUUID NodeUUID
UUID NodeUUID
Address Address // where requesting node is also accepting connections
ClusterName string
IdTimestamp float64
......@@ -280,10 +280,10 @@ type RequestIdentification struct {
// XXX -> ReplyIdentification? RequestIdentification.Answer somehow ?
type AcceptIdentification struct {
NodeType NodeType // XXX name
MyNodeUUID NodeUUID
MyUUID NodeUUID
NumPartitions uint32 // PNumber
NumReplicas uint32 // PNumber
YourNodeUUID NodeUUID
YourUUID NodeUUID
}
// Ask current primary master's uuid. CTL -> A.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment