Commit e6fbafe8 authored by Kirill Smelkov

.

parent 620df4c7
@@ -272,7 +272,7 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error {
// XXX msg.IdTimestamp ?
for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "rx node update: %v", nodeInfo)
c.node.NodeTab.Update(nodeInfo, /*XXX conn should not be here*/nil)
c.node.NodeTab.Update(nodeInfo)
}
// FIXME logging under lock
@@ -399,7 +399,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo
// retry from the beginning if all are found to fail?
stor := storv[rand.Intn(len(storv))]
slink, err := stor.Link()
slink, err := stor.Dial(ctx)
if err != nil {
return nil, 0, err // XXX err ctx
}
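For orientation, a condensed sketch of the pattern this hunk switches to, using a hypothetical helper name; it assumes the neo package types and the math/rand import already present in client.go:

// dialRandomStorage picks one of the storages serving the object at random
// and (re)establishes a link to it via the renamed Node.Dial.
func dialRandomStorage(ctx context.Context, storv []*neo.Node) (*neo.NodeLink, error) {
	stor := storv[rand.Intn(len(storv))] // panics on empty storv, as Load would
	return stor.Dial(ctx)
}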
@@ -26,6 +26,8 @@ import (
"fmt"
"sync"
"time"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
)
// NodeTable represents known nodes in a cluster.
@@ -60,28 +62,17 @@ type NodeTable struct {
//sync.RWMutex XXX needed ?
//storv []*Node // storages
nodev []*Node // all other nodes -> *Peer
nodev []*Node // all other nodes
notifyv []chan NodeInfo // subscribers
}
// Len returns N(entries) in the table.
func (nt *NodeTable) Len() int {
return len(nt.nodev)
}
// All returns all entries in the table as one slice.
// XXX -> better iter?
func (nt *NodeTable) All() []*Node {
return nt.nodev
}
// XXX vvv move -> peer.go?
//trace:event traceNodeChanged(nt *NodeTable, n *Node)
// even if dialing a peer failed, we'll attempt redial after this timeout
const δtRedial = 3 * time.Second
// Node represents a peer node in the cluster.
// XXX name as Peer?
type Node struct {
nodeTab *NodeTable // this node is part of
// Peer represents a peer node in the cluster.
type Peer struct {
NodeInfo // .type, .addr, .uuid, ... XXX also protect by mu?
linkMu sync.Mutex
@@ -107,6 +98,237 @@ type Peer struct {
// connPool []*Conn
}
// Len returns N(entries) in the table.
func (nt *NodeTable) Len() int {
return len(nt.nodev)
}
// All returns all entries in the table as one slice.
// XXX -> better iter?
func (nt *NodeTable) All() []*Node {
return nt.nodev
}
// Get finds node by uuid.
func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// FIXME linear scan
for _, node := range nt.nodev {
if node.UUID == uuid {
return node
}
}
return nil
}
// XXX GetByAddress ?
// Update updates information about a node.
//
// It returns the corresponding node entry for convenience.
func (nt *NodeTable) Update(nodeInfo NodeInfo) *Node {
node := nt.Get(nodeInfo.UUID)
if node == nil {
node = &Node{nodeTab: nt}
nt.nodev = append(nt.nodev, node)
}
node.NodeInfo = nodeInfo
/*
node.Conn = conn
if conn != nil {
node.Link = conn.Link()
}
*/
traceNodeChanged(nt, node)
nt.notify(node.NodeInfo)
return node
}
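For illustration, a minimal in-package sketch of the Update/Get contract (helper name is hypothetical): Update acts as an upsert, and the pointer it returns is the entry registered in the table.

// applyNodeInfo records info in the table and demonstrates that a later Get
// by UUID observes the same entry that Update returned.
func applyNodeInfo(nt *NodeTable, info NodeInfo) *Node {
	node := nt.Update(info) // insert-or-update; notifies subscribers
	if nt.Get(info.UUID) != node {
		panic("nodetab: Update did not register the returned entry")
	}
	return node
}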
// StorageList returns the list of all storages in the node table
func (nt *NodeTable) StorageList() []*Node {
// FIXME linear scan
sl := []*Node{}
for _, node := range nt.nodev {
if node.Type == STORAGE {
sl = append(sl, node)
}
}
return sl
}
/* XXX closing .link on .state = DOWN?
func (p *Peer) SetState(state NodeState) {
// XXX lock?
p.State = state
traceNodeChanged(nt, node)
if state == DOWN {
if p.link != nil {
lclose(ctx, p.link)
p.link = nil
// XXX clear p.connPool
}
}
nt.notify(node.NodeInfo)
}
*/
// XXX doc
func (n *Node) SetState(state NodeState) {
n.State = state
traceNodeChanged(n.nodeTab, n)
n.nodeTab.notify(n.NodeInfo)
}
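A sketch of how StorageList and SetState combine (hypothetical helper, same package); it mirrors the master-side change later in this commit where pending storages are promoted after recovery:

// startPendingStorages promotes every storage still in PENDING state to
// RUNNING; each SetState call emits traceNodeChanged and notifies subscribers.
func startPendingStorages(nt *NodeTable) {
	for _, stor := range nt.StorageList() {
		if stor.State == PENDING {
			stor.SetState(RUNNING)
		}
	}
}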
func (nt *NodeTable) String() string {
buf := bytes.Buffer{}
// XXX also for .storv
for _, n := range nt.nodev {
// XXX recheck output
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", n.UUID, n.Type, n.State, n.Addr)
}
return buf.String()
}
// ---- subscription to nodetab updates ----
// notify notifies NodeTable subscribers that nodeInfo was updated
func (nt *NodeTable) notify(nodeInfo NodeInfo) {
// XXX rlock for .notifyv ?
for _, notify := range nt.notifyv {
notify <- nodeInfo
}
}
// Subscribe subscribes to NodeTable updates.
//
// It returns a channel via which updates will be delivered and a function to unsubscribe.
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) {
ch = make(chan NodeInfo) // XXX how to specify ch buf size if needed ?
nt.notifyv = append(nt.notifyv, ch)
unsubscribe = func() {
for i, c := range nt.notifyv {
if c == ch {
nt.notifyv = append(nt.notifyv[:i], nt.notifyv[i+1:]...)
close(ch)
return
}
}
panic("XXX unsubscribe not subscribed channel")
}
return ch, unsubscribe
}
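A usage sketch, assuming the consumer drains the channel promptly (a slow consumer blocks notify and thus Update/SetState); the helper name is hypothetical and log is the package this file already imports:

// watchNodes logs every NodeInfo update until ctx is cancelled, then
// unsubscribes so the table stops sending to it.
func watchNodes(ctx context.Context, nt *NodeTable) {
	ch, unsubscribe := nt.Subscribe()
	defer unsubscribe()

	for {
		select {
		case <-ctx.Done():
			return
		case info := <-ch:
			log.Infof(ctx, "node update: %v", info)
		}
	}
}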
// SubscribeBuffered subscribes to NodeTable updates without blocking updater.
//
// It returns a channel via which updates are delivered and an unsubscribe function.
// The updates will be sent to the destination in a non-blocking way - if the
// destination channel is not ready they will be buffered.
// It is the caller's responsibility to make sure such buffering does not grow
// to infinity - e.g. by detecting stuck connections and unsubscribing on shutdown.
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func()) {
in, unsubscribe := nt.Subscribe()
ch = make(chan []NodeInfo)
go func() {
var updatev []NodeInfo
shutdown := false
for {
out := ch
if len(updatev) == 0 {
if shutdown {
// nothing to send and source channel closed
// -> close destination and stop
close(ch)
break
}
out = nil
}
select {
case update, ok := <-in:
if !ok {
shutdown = true
break
}
// FIXME merge updates as same node could be updated several times
updatev = append(updatev, update)
case out <- updatev:
updatev = nil
}
}
}()
return ch, unsubscribe
}
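The corresponding sketch for the buffered variant: each receive yields the batch accumulated since the previous receive, so the updater is never blocked, at the price of the unbounded buffering noted above (helper name hypothetical):

// watchNodesBatched consumes SubscribeBuffered. A batch may contain several
// entries for the same node (see the FIXME about merging); the last entry wins.
func watchNodesBatched(ctx context.Context, nt *NodeTable) {
	ch, unsubscribe := nt.SubscribeBuffered()
	defer unsubscribe()

	for {
		select {
		case <-ctx.Done():
			return
		case updatev, ok := <-ch:
			if !ok {
				return // source channel closed
			}
			for _, info := range updatev {
				log.Infof(ctx, "node update (batched): %v", info)
			}
		}
	}
}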
// ---- peer link ----
// SetLink sets link to peer node.
// XXX
//
// See also: Link, CloseLink, Dial.
func (p *Node) SetLink(link *NodeLink) {
// XXX see Link about locking - whether it is needed here or not
p.linkMu.Lock()
p.link = link
p.linkMu.Unlock()
}
// Link returns current link to peer node.
//
// If the link is not yet established - Link returns nil.
//
// See also: Dial.
func (p *Node) Link() *NodeLink {
// XXX do we need lock here?
// XXX usages where Link is used (contrary to Dial) there is no need for lock
p.linkMu.Lock()
link := p.link
p.linkMu.Unlock()
return link
}
// CloseLink closes link to peer and sets it to nil.
func (p *Node) CloseLink(ctx context.Context) {
p.linkMu.Lock()
link := p.link
p.link = nil
p.dialing = nil // XXX what if dialing is in progress?
p.linkMu.Unlock()
if link != nil {
log.Infof(ctx, "%v: closing link", link)
err := link.Close()
if err != nil {
log.Error(ctx, err)
}
}
}
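Taken together, SetLink / Link / CloseLink form a small lifecycle for a peer's established link; a hypothetical sketch of how accept-side code could use it, in the same package:

// adoptLink remembers an already-established link in the peer entry and
// returns a function that tears it down again.
func adoptLink(p *Node, link *NodeLink) (release func(ctx context.Context)) {
	p.SetLink(link)
	return func(ctx context.Context) {
		if p.Link() != nil {
			p.CloseLink(ctx) // logs, closes the link and resets it to nil
		}
	}
}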
// even if dialing a peer failed, we'll attempt redial after this timeout
const δtRedial = 3 * time.Second
// dialed is result of dialing a peer.
type dialed struct {
link *NodeLink
@@ -114,18 +336,18 @@ type dialed struct {
ready chan struct{}
}
// Link returns link to peer node.
// Dial returns link to peer node.
//
// If the link was not yet established Link dials the peer appropriately,
// If the link was not yet established Dial dials the peer appropriately,
// handshakes, requests identification and checks that identification reply is
// as expected.
//
// Several Link calls may be done in parallel - in any case only 1 link-level
// Several Dial calls may be done in parallel - in any case only 1 link-level
// dial will be made and the others will share the established link.
//
// In case Link returns an error - future Link will attempt to reconnect with
// In case Dial returns an error - future Dial will attempt to reconnect with
// "don't reconnect too fast" throttling.
func (p *Peer) Link(ctx context.Context) (*NodeLink, error) {
func (p *Node) Dial(ctx context.Context) (*NodeLink, error) {
p.linkMu.Lock()
// ok if already connected
@@ -191,13 +413,13 @@ func (p *Peer) Link(ctx context.Context) (*NodeLink, error) {
return dialing.link, dialing.err
}
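A sketch illustrating the Dial contract stated above: concurrent callers share one link-level dial (hypothetical helper, same package; sync is already imported here):

// dialConcurrently dials the same peer from two goroutines; per the Dial
// contract only one link-level dial is made and both callers get the same
// *NodeLink (or the same error).
func dialConcurrently(ctx context.Context, p *Node) (*NodeLink, error) {
	var l1, l2 *NodeLink
	var err1, err2 error

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); l1, err1 = p.Dial(ctx) }()
	go func() { defer wg.Done(); l2, err2 = p.Dial(ctx) }()
	wg.Wait()

	if err1 != nil {
		return nil, err1
	}
	if err2 != nil {
		return nil, err2
	}
	_ = l2 // l1 == l2: both goroutines share the established link
	return l1, nil
}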
// Conn returns conn to the peer.
// Conn returns conn to the peer. XXX -> DialConn ?
//
// If there is no link established - conn first dials peer (see Link).
// If there is no link established - conn first dials peer (see Dial).
//
// For established link Conn either creates new connection over the link,
// XXX (currently inactive) or gets one from the pool of unused connections (see PutConn).
func (p *Peer) Conn(ctx context.Context) (*Conn, error) {
func (p *Node) Conn(ctx context.Context) (*Conn, error) {
var err error
/*
@@ -216,7 +438,7 @@ func (p *Peer) Conn(ctx context.Context) (*Conn, error) {
// we might need to (re)dial
if link == nil {
link, err = p.Link(ctx)
link, err = p.Dial(ctx)
if err != nil {
return nil, err
}
@@ -235,8 +457,8 @@ func (p *Peer) Conn(ctx context.Context) (*Conn, error) {
func (p *Peer) PutConn(c *Conn) {
p.linkMu.Lock()
// NOTE we can't panic on p.link != c.Link() - reason is: p.link can change on redial
if p.link == c.Link() {
// NOTE we can't panic on p.link != c.Dial() - reason is: p.link can change on redial
if p.link == c.Dial() {
p.connPool = append(p.connPool, c)
}
@@ -245,9 +467,9 @@ func (p *Peer) PutConn(c *Conn) {
*/
// XXX dial does low-level work to dial peer
// dial does low-level work to dial peer
// XXX p.* reading without lock - ok?
func (p *Peer) dial(ctx context.Context) (*NodeLink, error) {
func (p *Node) dial(ctx context.Context) (*NodeLink, error) {
var me *NodeCommon // XXX temp stub
conn0, accept, err := me.Dial(ctx, p.Type, p.Addr.String())
if err != nil {
@@ -280,222 +502,3 @@ func (p *Peer) dial(ctx context.Context) (*NodeLink, error) {
return link, err
}
/* XXX closing .link on .state = DOWN?
func (p *Peer) SetState(state NodeState) {
// XXX lock?
p.State = state
traceNodeChanged(nt, node)
if state == DOWN {
if p.link != nil {
lclose(ctx, p.link)
p.link = nil
// XXX clear p.connPool
}
}
nt.notify(node.NodeInfo)
}
*/
//trace:event traceNodeChanged(nt *NodeTable, n *Node)
// Node represents a node entry in NodeTable
type Node struct {
NodeInfo
// XXX have Node point to -> NodeTable?
// XXX decouple vvv from Node ?
// link to this node; =nil if not connected
Link *NodeLink
// XXX not yet sure it is good idea
Conn *Conn // main connection
}
// Get finds node by uuid.
func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// FIXME linear scan
for _, node := range nt.nodev {
if node.UUID == uuid {
return node
}
}
return nil
}
// XXX GetByAddress ?
// Update updates information about a node.
//
// It returns the corresponding node entry for convenience.
func (nt *NodeTable) Update(nodeInfo NodeInfo, conn *Conn /*XXX better link *NodeLink*/) *Node {
node := nt.Get(nodeInfo.UUID)
if node == nil {
node = &Node{}
nt.nodev = append(nt.nodev, node)
}
node.NodeInfo = nodeInfo
node.Conn = conn
if conn != nil {
node.Link = conn.Link()
}
traceNodeChanged(nt, node)
nt.notify(node.NodeInfo)
return node
}
/*
// GetByLink finds node by node-link
// XXX is this a good idea ?
func (nt *NodeTable) GetByLink(link *NodeLink) *Node {
// FIXME linear scan
for _, node := range nt.nodev {
if node.Link == link {
return node
}
}
return nil
}
*/
// XXX doc
func (nt *NodeTable) SetNodeState(node *Node, state NodeState) {
node.State = state
traceNodeChanged(nt, node)
nt.notify(node.NodeInfo)
}
/*
// UpdateLinkDown updates information about the node corresponding to link and marks it as down.
// It returns the corresponding node entry for convenience.
// XXX is this a good idea ?
func (nt *NodeTable) UpdateLinkDown(link *NodeLink) *Node {
node := nt.GetByLink(link)
if node == nil {
// XXX vvv not good
panic("nodetab: UpdateLinkDown: no corresponding entry")
}
nt.SetNodeState(node, DOWN)
return node
}
*/
// StorageList returns the list of all storages in the node table
func (nt *NodeTable) StorageList() []*Node {
// FIXME linear scan
sl := []*Node{}
for _, node := range nt.nodev {
if node.Type == STORAGE {
sl = append(sl, node)
}
}
return sl
}
func (nt *NodeTable) String() string {
//nt.RLock() // FIXME -> it must be client
//defer nt.RUnlock()
buf := bytes.Buffer{}
// XXX also for .storv
for _, n := range nt.nodev {
// XXX recheck output
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", n.UUID, n.Type, n.State, n.Addr)
}
return buf.String()
}
// notify notifies NodeTable subscribers that nodeInfo was updated
func (nt *NodeTable) notify(nodeInfo NodeInfo) {
// XXX rlock for .notifyv ?
for _, notify := range nt.notifyv {
notify <- nodeInfo
}
}
// Subscribe subscribes to NodeTable updates.
//
// It returns a channel via which updates will be delivered and a function to unsubscribe.
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) {
ch = make(chan NodeInfo) // XXX how to specify ch buf size if needed ?
nt.notifyv = append(nt.notifyv, ch)
unsubscribe = func() {
for i, c := range nt.notifyv {
if c == ch {
nt.notifyv = append(nt.notifyv[:i], nt.notifyv[i+1:]...)
close(ch)
return
}
}
panic("XXX unsubscribe not subscribed channel")
}
return ch, unsubscribe
}
// SubscribeBuffered subscribes to NodeTable updates without blocking updater.
//
// It returns a channel via which updates are delivered and an unsubscribe function.
// The updates will be sent to the destination in a non-blocking way - if the
// destination channel is not ready they will be buffered.
// It is the caller's responsibility to make sure such buffering does not grow
// to infinity - e.g. by detecting stuck connections and unsubscribing on shutdown.
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func()) {
in, unsubscribe := nt.Subscribe()
ch = make(chan []NodeInfo)
go func() {
var updatev []NodeInfo
shutdown := false
for {
out := ch
if len(updatev) == 0 {
if shutdown {
// nothing to send and source channel closed
// -> close destination and stop
close(ch)
break
}
out = nil
}
select {
case update, ok := <-in:
if !ok {
shutdown = true
break
}
// FIXME merge updates as same node could be updated several times
updatev = append(updatev, update)
case out <- updatev:
updatev = nil
}
}
}()
return ch, unsubscribe
}
@@ -176,7 +176,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
}
// update nodeTab with self
m.node.NodeTab.Update(m.node.MyInfo, nil /*XXX ok? we are not connecting to self*/)
m.node.NodeTab.Update(m.node.MyInfo)
// accept incoming connections and pass them to main driver
@@ -368,13 +368,8 @@ loop:
log.Error(ctx, r.err)
if !xcontext.Canceled(errors.Cause(r.err)) {
// XXX dup wrt vvv (loop2)
log.Infof(ctx, "%v: closing link", r.stor.Link)
// close stor link / update .nodeTab
lclose(ctx, r.stor.Link)
// r.stor.SetState(neo.DOWN)
m.node.NodeTab.SetNodeState(r.stor, neo.DOWN)
r.stor.CloseLink(ctx)
r.stor.SetState(neo.DOWN)
}
} else {
@@ -457,12 +452,8 @@ loop2:
log.Error(ctx, r.err)
if !xcontext.Canceled(errors.Cause(r.err)) {
// XXX -> r.stor.CloseLink(ctx) ?
log.Infof(ctx, "%v: closing link", r.stor.Link)
// close stor link / update .nodeTab
lclose(ctx, r.stor.Link)
m.node.NodeTab.SetNodeState(r.stor, neo.DOWN)
r.stor.CloseLink(ctx)
r.stor.SetState(neo.DOWN)
}
case <-done:
@@ -480,7 +471,7 @@ loop2:
// XXX recheck logic is ok for when starting existing cluster
for _, stor := range m.node.NodeTab.StorageList() {
if stor.State == neo.PENDING {
m.node.NodeTab.SetNodeState(stor, neo.RUNNING)
stor.SetState(neo.RUNNING)
}
}
@@ -513,28 +504,19 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
// on error provide feedback to storRecovery chan
res <- storRecovery{stor: stor, err: err}
}()
defer task.Runningf(&ctx, "%s: stor recovery", stor.Link.RemoteAddr())(&err)
conn := stor.Conn
// conn, err := stor.Link.NewConn()
// if err != nil {
// return
// }
// defer func() {
// err2 := conn.Close()
// err = xerr.First(err, err2)
// }()
slink := stor.Link()
defer task.Runningf(&ctx, "%s: stor recovery", slink.RemoteAddr())(&err)
// XXX cancel on ctx
recovery := neo.AnswerRecovery{}
err = conn.Ask(&neo.Recovery{}, &recovery)
err = slink.Ask1(&neo.Recovery{}, &recovery)
if err != nil {
return
}
resp := neo.AnswerPartitionTable{}
err = conn.Ask(&neo.AskPartitionTable{}, &resp)
err = slink.Ask1(&neo.AskPartitionTable{}, &resp)
if err != nil {
return
}
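Condensed, the per-storage exchange above becomes a plain request/reply sequence over the node link; a hypothetical helper in the same package as storCtlRecovery, using only the types visible in this hunk:

// askRecovery asks a storage for its recovery state and last-known partition
// table over its established link.
func askRecovery(stor *neo.Node) (neo.AnswerRecovery, neo.AnswerPartitionTable, error) {
	slink := stor.Link()

	recovery := neo.AnswerRecovery{}
	if err := slink.Ask1(&neo.Recovery{}, &recovery); err != nil {
		return recovery, neo.AnswerPartitionTable{}, err
	}

	pt := neo.AnswerPartitionTable{}
	if err := slink.Ask1(&neo.AskPartitionTable{}, &pt); err != nil {
		return recovery, pt, err
	}
	return recovery, pt, nil
}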
@@ -621,7 +603,7 @@ loop:
}()
case n := <-m.nodeLeave:
m.node.NodeTab.SetNodeState(n.node, neo.DOWN)
n.node.SetState(neo.DOWN)
// if cluster became non-operational - we cancel verification
if !m.node.PartTab.OperationalWith(m.node.NodeTab) {
@@ -643,12 +625,8 @@ loop:
log.Error(ctx, v.err)
if !xcontext.Canceled(errors.Cause(v.err)) {
// XXX dup wrt recovery ^^^
log.Infof(ctx, "%s: closing link", v.stor.Link)
// mark storage as non-working in nodeTab
lclose(ctx, v.stor.Link)
m.node.NodeTab.SetNodeState(v.stor, neo.DOWN)
v.stor.CloseLink(ctx)
v.stor.SetState(neo.DOWN)
}
// check partTab is still operational
@@ -696,11 +674,8 @@ loop2:
log.Error(ctx, v.err)
if !xcontext.Canceled(errors.Cause(v.err)) {
log.Infof(ctx, "%v: closing link", v.stor.Link)
// close stor link / update .nodeTab
lclose(ctx, v.stor.Link)
m.node.NodeTab.SetNodeState(v.stor, neo.DOWN)
v.stor.CloseLink(ctx)
v.stor.SetState(neo.DOWN)
}
case <-done:
@@ -730,12 +705,11 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
res <- storVerify{stor: stor, err: err}
}
}()
defer task.Runningf(&ctx, "%s: stor verify", stor.Link)(&err)
conn := stor.Conn
slink := stor.Link()
defer task.Runningf(&ctx, "%s: stor verify", slink)(&err)
// send just recovered parttab so storage saves it
err = conn.Send(&neo.NotifyPartitionTable{
err = slink.Send1(&neo.NotifyPartitionTable{
PTid: pt.PTid,
RowList: pt.Dump(),
})
@@ -744,7 +718,7 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
}
locked := neo.AnswerLockedTransactions{}
err = conn.Ask(&neo.LockedTransactions{}, &locked)
err = slink.Ask1(&neo.LockedTransactions{}, &locked)
if err != nil {
return
}
@@ -756,7 +730,7 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
}
last := neo.AnswerLastIDs{}
err = conn.Ask(&neo.LastIDs{}, &last)
err = slink.Ask1(&neo.LastIDs{}, &last)
if err != nil {
return
}
@@ -850,7 +824,7 @@ loop:
// XXX who sends here?
case n := <-m.nodeLeave:
m.node.NodeTab.SetNodeState(n.node, neo.DOWN)
n.node.SetState(neo.DOWN)
// if cluster became non-operational - cancel service
if !m.node.PartTab.OperationalWith(m.node.NodeTab) {
@@ -884,15 +858,14 @@ loop:
// storCtlService drives a storage node during cluster service state
func storCtlService(ctx context.Context, stor *neo.Node) (err error) {
defer task.Runningf(&ctx, "%s: stor service", stor.Link.RemoteAddr())(&err)
conn := stor.Conn
slink := stor.Link()
defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err)
// XXX send nodeTab ?
// XXX send clusterInformation ?
ready := neo.NotifyReady{}
err = conn.Ask(&neo.StartOperation{Backup: false}, &ready)
err = slink.Ask1(&neo.StartOperation{Backup: false}, &ready)
if err != nil {
return err
}
@@ -915,12 +888,11 @@ func storCtlService(ctx context.Context, stor *neo.Node) (err error) {
// serveClient serves incoming client link
func (m *Master) serveClient(ctx context.Context, cli *neo.Node) (err error) {
defer task.Runningf(&ctx, "%s: client service", cli.Link.RemoteAddr())(&err)
clink := cli.Link()
defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err)
wg, ctx := errgroup.WithContext(ctx)
clink := cli.Link
defer xio.CloseWhenDone(ctx, clink)()
defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.CloseLink?
// M -> C notifications about cluster state
wg.Go(func() error {
@@ -1128,7 +1100,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
IdTimestamp: m.monotime(),
}
node = m.node.NodeTab.Update(nodeInfo, n.conn) // NOTE this notifies all nodeTab subscribers
node = m.node.NodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
node.SetLink(n.conn.Link())
return node, accept
}
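Since Update no longer carries the connection, registering an identified peer becomes a two-step operation; a minimal sketch mirroring the lines above (helper name hypothetical):

// registerIdentified records node info in the table, which notifies all
// nodeTab subscribers, and then attaches the link the identification request
// arrived over.
func registerIdentified(nt *neo.NodeTable, nodeInfo neo.NodeInfo, link *neo.NodeLink) *neo.Node {
	node := nt.Update(nodeInfo)
	node.SetLink(link)
	return node
}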