Commit 641b1326 authored by Kirill Smelkov

.

parent 7864d9cb
@@ -295,7 +295,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
}()
// Retrieve storages we might need to access.
storv := make([]*xneo.Node, 0, 1)
storv := make([]*xneo.PeerNode, 0, 1)
err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
for _, cell := range cs.PartTab.Get(xid.Oid) {
if cell.Readable() {
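To picture the hunk above: Client.Load walks the partition-table row for the oid and keeps only those storages whose cell is readable. A minimal compilable sketch of that selection follows; Cell, PartitionTable, PeerNode and CellUP are reduced stand-ins, not the real xneo API.

package main

import "fmt"

type CellState int

const (
    CellUP CellState = iota
    CellOUT
)

type PeerNode struct{ NID int }

type Cell struct {
    Node  *PeerNode
    State CellState
}

func (c Cell) Readable() bool { return c.State == CellUP }

// PartitionTable maps an oid to the row of cells that may hold its data.
type PartitionTable struct{ tab [][]Cell }

func (pt *PartitionTable) Get(oid uint64) []Cell {
    return pt.tab[oid%uint64(len(pt.tab))]
}

func main() {
    pt := &PartitionTable{tab: [][]Cell{{
        {Node: &PeerNode{NID: 1}, State: CellUP},
        {Node: &PeerNode{NID: 2}, State: CellOUT},
    }}}

    // as in Client.Load: retrieve storages we might need to access
    storv := make([]*PeerNode, 0, 1)
    for _, cell := range pt.Get(0) {
        if cell.Readable() {
            storv = append(storv, cell.Node)
        }
    }
    fmt.Println("readable storages:", len(storv)) // -> readable storages: 1
}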
@@ -269,16 +269,16 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// - start is also allowed if storages are connected and say there is no partition
// table saved to them (empty new cluster case).
// storRecovery is result of 1 storage node passing recovery phase
// storRecovery is result of 1 storage node passing recovery phase.
type storRecovery struct {
stor *xneo.Node
stor *xneo.PeerNode
partTab *xneo.PartitionTable
err error
// XXX + backup_tid, truncate_tid ?
}
// recovery drives cluster during recovery phase
// recovery drives cluster during recovery phase.
//
// when recovery finishes error indicates:
// - nil: recovery was ok and a command came for cluster to start
@@ -460,7 +460,7 @@ loop2:
// if we are starting for new cluster - create partition table
if m.node.PartTab.PTid == 0 {
// XXX -> m.nodeTab.StorageList(State > DOWN)
storv := []*xneo.Node{}
storv := []*xneo.PeerNode{}
for _, stor := range m.node.NodeTab.StorageList() {
if stor.State > proto.DOWN {
storv = append(storv, stor)
@@ -474,9 +474,9 @@ loop2:
return nil
}
// storCtlRecovery drives a storage node during cluster recovering state
// storCtlRecovery drives a storage node during cluster recovering state.
// it retrieves various ids and the partition table as stored on the storage.
func storCtlRecovery(ctx context.Context, stor *xneo.Node, res chan storRecovery) {
func storCtlRecovery(ctx context.Context, stor *xneo.PeerNode, res chan storRecovery) {
var err error
defer func() {
if err == nil {
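The recovery code above follows a fan-out/fan-in shape: the master launches storCtlRecovery once per storage, and each goroutine reports a storRecovery on a shared channel from its defer, so exactly one answer arrives per storage even on error. A reduced sketch with stand-in types (not the real master/xneo ones):

package main

import (
    "context"
    "fmt"
)

type PeerNode struct{ Name string }

// storRecovery mirrors the shape above: which storage answered, and how.
type storRecovery struct {
    stor *PeerNode
    err  error
}

// storCtlRecovery always reports its outcome on res, whatever happened -
// the guarantee the defer in the real code provides.
func storCtlRecovery(ctx context.Context, stor *PeerNode, res chan storRecovery) {
    var err error
    defer func() {
        res <- storRecovery{stor: stor, err: err}
    }()
    // ... ask the storage for its ids and saved partition table ...
}

func main() {
    storv := []*PeerNode{{Name: "S1"}, {Name: "S2"}}
    res := make(chan storRecovery)
    for _, stor := range storv {
        go storCtlRecovery(context.Background(), stor, res)
    }
    // fan-in: collect exactly one answer per launched goroutine
    for range storv {
        r := <-res
        fmt.Printf("%s: err=%v\n", r.stor.Name, r.err)
    }
}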
@@ -526,7 +526,7 @@ var errClusterDegraded = stderrors.New("cluster became non-operational")
// - once we are done without losing too many storages in the process (so that
// partition table is still operational) we are ready to enter servicing state.
// verify drives cluster during verification phase
// verify drives cluster during verification phase.
//
// when verify finishes error indicates:
// - nil: verification completed ok; cluster is ready to enter running state
@@ -671,16 +671,16 @@ loop2:
return err
}
// storVerify is result of a storage node passing verification phase
// storVerify is result of a storage node passing verification phase.
type storVerify struct {
stor *xneo.Node
stor *xneo.PeerNode
lastOid zodb.Oid
lastTid zodb.Tid
err error
}
// storCtlVerify drives a storage node during cluster verifying (= starting) state
func storCtlVerify(ctx context.Context, stor *xneo.Node, pt *xneo.PartitionTable, res chan storVerify) {
// storCtlVerify drives a storage node during cluster verifying (= starting) state.
func storCtlVerify(ctx context.Context, stor *xneo.PeerNode, pt *xneo.PartitionTable, res chan storVerify) {
// XXX link.Close on err -> = xcontext.WithCloseOnErrCancel
// XXX cancel on ctx -> = ^^^
@@ -736,13 +736,13 @@ func storCtlVerify(ctx context.Context, stor *xneo.Node, pt *xneo.PartitionTable
//
// TODO also plan data movement on new storage nodes appearing
// serviceDone is the error returned after service-phase node handling is finished
// serviceDone is the error returned after service-phase node handling is finished.
type serviceDone struct {
node *xneo.Node
node *xneo.PeerNode
err error
}
// service drives cluster during running state
// service drives cluster during running state.
//
// TODO document error meanings on return
//
@@ -845,7 +845,7 @@ loop:
}
// storCtlService drives a storage node during cluster service state
func storCtlService(ctx context.Context, stor *xneo.Node) (err error) {
func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) {
slink := stor.Link()
defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err)
@@ -891,8 +891,8 @@ func storCtlService(ctx context.Context, stor *xneo.Node) (err error) {
}
}
// serveClient serves incoming client link
func (m *Master) serveClient(ctx context.Context, cli *xneo.Node) (err error) {
// serveClient serves incoming client link.
func (m *Master) serveClient(ctx context.Context, cli *xneo.PeerNode) (err error) {
clink := cli.Link()
defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err)
@@ -927,7 +927,7 @@ func (m *Master) serveClient(ctx context.Context, cli *xneo.Node) (err error) {
return wg.Wait()
}
// serveClient1 prepares response for 1 request from client
// serveClient1 prepares response for 1 request from client.
func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Msg) {
switch req := req.(type) {
case *proto.LastTransaction:
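serveClient1 is a plain type switch over decoded protocol messages: one case per understood request, with an error answer as the fallback. A toy version; Msg and the message types here are local stand-ins, and only the LastTransaction request/answer pair mirrors a real NEO exchange.

package main

import "fmt"

type Msg interface{}

type LastTransaction struct{}
type AnswerLastTransaction struct{ Tid uint64 }
type Error struct{ Text string }

// serveClient1 prepares the response for one client request.
func serveClient1(req Msg) (resp Msg) {
    switch req.(type) {
    case *LastTransaction:
        return &AnswerLastTransaction{Tid: 42} // the real code answers with the last committed tid
    default:
        return &Error{Text: fmt.Sprintf("unexpected message %T", req)}
    }
}

func main() {
    fmt.Printf("%+v\n", serveClient1(&LastTransaction{})) // &{Tid:42}
    fmt.Printf("%+v\n", serveClient1("bogus"))            // &{Text:unexpected message string}
}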
@@ -941,7 +941,7 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// ----------------------------------------
// keepPeerUpdated sends cluster state updates to peer on the link
// keepPeerUpdated sends cluster state updates to peer on the link.
func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (err error) {
// link should be already in parent ctx (XXX and closed on cancel ?)
defer task.Runningf(&ctx, "keep updated")(&err)
@@ -1033,7 +1033,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned.
// Response message is constructed but not sent back, so as not to block the caller - it is
// the caller's responsibility to send the response to the node which requested identification.
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.Node, resp proto.Msg) {
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, resp proto.Msg) {
// XXX also verify ? :
// - NodeType valid
// - IdTime ?
@@ -1117,7 +1117,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.Node, res
return node, accept
}
// allocNID allocates new node ID for a node of kind nodeType
// allocNID allocates new node ID for a node of kind nodeType.
// XXX it is bad idea for master to assign node ID to coming node
// -> better nodes generate really unique UUID themselves and always show with them
func (m *Master) allocNID(nodeType proto.NodeType) proto.NodeID {
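allocNID's body is not part of this hunk, and the XXX above already questions master-side assignment. Purely for illustration, here is one assumed shape - a per-type sequential counter; the real proto.NodeID packing is not reproduced here.

package main

import "fmt"

type NodeType int

const (
    STORAGE NodeType = iota
    CLIENT
)

// NodeID is a stand-in; real NEO node ids are a single packed integer.
type NodeID struct {
    Type NodeType
    Num  int32
}

// idAlloc hands out sequential ids per node type - an assumed sketch of
// what allocNID could do, not the actual implementation.
type idAlloc struct {
    last map[NodeType]int32
}

func (a *idAlloc) allocNID(t NodeType) NodeID {
    if a.last == nil {
        a.last = map[NodeType]int32{}
    }
    a.last[t]++
    return NodeID{Type: t, Num: a.last[t]}
}

func main() {
    var a idAlloc
    fmt.Println(a.allocNID(STORAGE)) // {0 1}
    fmt.Println(a.allocNID(STORAGE)) // {0 2}
    fmt.Println(a.allocNID(CLIENT))  // {1 1}
}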
@@ -131,7 +131,7 @@ func (t *TraceCollector) traceClusterState(cs *proto.ClusterState) {
t.rx.RxEvent(&eventClusterState{where, *cs})
}
func (t *TraceCollector) traceNode(nt *xneo.NodeTable, n *xneo.Node) {
func (t *TraceCollector) traceNode(nt *xneo.NodeTable, n *xneo.PeerNode) {
//t.rx.RxEvent(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo})
where := t.nodeTab2Owner[nt]
t.rx.RxEvent(&eventNodeTab{where, n.NodeInfo})
@@ -35,11 +35,17 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/xio"
)
// NodeTable represents known nodes in a cluster.
// TODO
//
// - Node <- instead of NodeApp
// + PeerNode <- instead of Node
// - NodeTable <- remains + comment that each entry is "node information" + link
// NodeTable represents known nodes in a cluster. XXX + "containing" parent node
//
// It is
//
// NID -> *Node ; = (.laddr, .state, ...) + .link
// NID -> *PeerNode ; = (.laddr, .state, ...) + .link
//
// mapping listing known nodes and associating their node ID with information
// about a node.
@@ -65,20 +71,19 @@ import (
//
// XXX users have to care locking explicitly
type NodeTable struct {
// XXX for Node.Dial to work. see also comments vvv near "peer link"
// XXX for PeerNode.Dial to work. see also comments vvv near "peer link"
// XXX move pointer to local node to PeerNode instead?
nodeApp *NodeApp
nodev []*Node // all nodes
nodev []*PeerNode // all nodes
notifyv []chan proto.NodeInfo // subscribers
}
//trace:event traceNodeChanged(nt *NodeTable, n *Node)
//trace:event traceNodeChanged(nt *NodeTable, n *PeerNode)
// Node represents a peer node in the cluster.
//
// XXX name as Peer? PeerNode?
type Node struct {
nodeTab *NodeTable // this node is part of
// PeerNode represents a peer node in the cluster.
type PeerNode struct {
nodeTab *NodeTable // this node is part of // XXX -> change to `local *Node` ?
proto.NodeInfo // (.type, .laddr, .nid, .state, .idtime) XXX also protect by mu?
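The renamed struct keeps a two-part layout: descriptive state comes from the embedded proto.NodeInfo (so n.NID, n.State work directly on the node), while the mutable peer link sits behind its own mutex. A reduced sketch with stand-in NodeInfo/NodeLink types:

package main

import (
    "fmt"
    "sync"
)

// NodeInfo and NodeLink stand in for proto.NodeInfo and neonet.NodeLink.
type NodeInfo struct {
    NID   int
    State string
}

type NodeLink struct{ addr string }

// PeerNode: embedded descriptive info + separately-locked link.
type PeerNode struct {
    NodeInfo

    linkMu sync.Mutex
    link   *NodeLink
}

func (p *PeerNode) SetLink(link *NodeLink) {
    p.linkMu.Lock()
    p.link = link
    p.linkMu.Unlock()
}

func (p *PeerNode) Link() *NodeLink {
    p.linkMu.Lock()
    defer p.linkMu.Unlock()
    return p.link
}

func main() {
    n := &PeerNode{NodeInfo: NodeInfo{NID: 1, State: "RUNNING"}}
    n.SetLink(&NodeLink{addr: "127.0.0.1:5551"})
    fmt.Println(n.NID, n.State, n.Link() != nil) // 1 RUNNING true
}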
@@ -108,13 +113,12 @@ func (nt *NodeTable) Len() int {
}
// All returns all entries in the table as one slice.
// XXX -> better iter?
func (nt *NodeTable) All() []*Node {
func (nt *NodeTable) All() []*PeerNode {
return nt.nodev
}
// Get finds node by node ID.
func (nt *NodeTable) Get(nid proto.NodeID) *Node {
func (nt *NodeTable) Get(nid proto.NodeID) *PeerNode {
// FIXME linear scan
for _, node := range nt.nodev {
if node.NID == nid {
@@ -129,20 +133,14 @@ func (nt *NodeTable) Get(nid proto.NodeID) *Node {
// Update updates information about a node.
//
// it returns corresponding node entry for convenience.
func (nt *NodeTable) Update(nodeInfo proto.NodeInfo) *Node {
func (nt *NodeTable) Update(nodeInfo proto.NodeInfo) *PeerNode {
node := nt.Get(nodeInfo.NID)
if node == nil {
node = &Node{nodeTab: nt}
node = &PeerNode{nodeTab: nt}
nt.nodev = append(nt.nodev, node)
}
node.NodeInfo = nodeInfo
/*
node.Conn = conn
if conn != nil {
node.Link = conn.Link()
}
*/
// XXX close link if .state becomes DOWN ?
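Update is get-or-create followed by an unconditional overwrite of the embedded NodeInfo, so repeated updates for one NID keep reusing the same *PeerNode - which is why returning the entry "for convenience" is safe. A toy version with simplified types:

package main

import "fmt"

type NodeInfo struct {
    NID   int
    State string
}

type PeerNode struct {
    nodeTab *NodeTable // table this node is part of
    NodeInfo
}

type NodeTable struct {
    nodev []*PeerNode
}

func (nt *NodeTable) Get(nid int) *PeerNode {
    for _, node := range nt.nodev { // linear scan, as the FIXME notes
        if node.NID == nid {
            return node
        }
    }
    return nil
}

// Update implements the get-or-create-then-overwrite semantics shown above.
func (nt *NodeTable) Update(info NodeInfo) *PeerNode {
    node := nt.Get(info.NID)
    if node == nil {
        node = &PeerNode{nodeTab: nt}
        nt.nodev = append(nt.nodev, node)
    }
    node.NodeInfo = info
    return node
}

func main() {
    nt := &NodeTable{}
    nt.Update(NodeInfo{NID: 1, State: "PENDING"})
    nt.Update(NodeInfo{NID: 1, State: "RUNNING"}) // same entry, new state
    fmt.Println(len(nt.nodev), nt.Get(1).State)   // 1 RUNNING
}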
@@ -152,13 +150,13 @@ func (nt *NodeTable) Update(nodeInfo proto.NodeInfo) *Node {
return node
}
// StorageList returns list of all storages in node table
func (nt *NodeTable) StorageList() []*Node {
// StorageList returns list of all storages in the node table.
func (nt *NodeTable) StorageList() []*PeerNode {
// FIXME linear scan
sl := []*Node{}
for _, node := range nt.nodev {
if node.Type == proto.STORAGE {
sl = append(sl, node)
sl := []*PeerNode{}
for _, nodev := range nt.nodev {
if nodev.Type == proto.STORAGE {
sl = append(sl, nodev)
}
}
return sl
@@ -166,7 +164,7 @@ func (nt *NodeTable) StorageList() []*Node {
// XXX doc
func (n *Node) SetState(state proto.NodeState) {
func (n *PeerNode) SetState(state proto.NodeState) {
n.State = state
traceNodeChanged(n.nodeTab, n)
n.nodeTab.notify(n.NodeInfo)
@@ -179,6 +177,7 @@ func (nt *NodeTable) String() string {
for _, n := range nt.nodev {
// XXX recheck output
// XXX +link ?
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\t@ %s\n", n.NID, n.Type, n.State, n.Addr, n.IdTime)
}
@@ -283,7 +282,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe
// XXX
//
// See also: Link, ResetLink, Dial.
func (p *Node) SetLink(link *neonet.NodeLink) {
func (p *PeerNode) SetLink(link *neonet.NodeLink) {
// XXX see Link about locking - whether it is needed here or not
p.linkMu.Lock()
p.link = link
@@ -295,7 +294,7 @@ func (p *Node) SetLink(link *neonet.NodeLink) {
// If the link is not yet established - Link returns nil.
//
// See also: Dial.
func (p *Node) Link() *neonet.NodeLink {
func (p *PeerNode) Link() *neonet.NodeLink {
// XXX do we need lock here?
// XXX usages where Link is used (contrary to Dial) there is no need for lock
p.linkMu.Lock()
@@ -305,7 +304,7 @@ func (p *Node) Link() *neonet.NodeLink {
}
// ResetLink closes link to peer and sets it to nil.
func (p *Node) ResetLink(ctx context.Context) {
func (p *PeerNode) ResetLink(ctx context.Context) {
p.linkMu.Lock()
link := p.link
p.link = nil
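ResetLink detaches the link under the mutex and - the hunk is truncated before this point, so it is an assumption here - closes it outside the critical section, keeping potentially slow IO out of the lock. A sketch under that assumption:

package main

import (
    "context"
    "fmt"
    "sync"
)

type NodeLink struct{ addr string }

func (l *NodeLink) Close() error {
    fmt.Println("closed link to", l.addr)
    return nil
}

type PeerNode struct {
    linkMu sync.Mutex
    link   *NodeLink
}

func (p *PeerNode) ResetLink(ctx context.Context) {
    p.linkMu.Lock()
    link := p.link
    p.link = nil
    p.linkMu.Unlock()

    if link != nil {
        _ = link.Close() // potentially blocking - done outside the lock
    }
}

func main() {
    p := &PeerNode{link: &NodeLink{addr: "127.0.0.1:5551"}}
    p.ResetLink(context.Background())
}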
@@ -324,7 +323,7 @@ func (p *Node) ResetLink(ctx context.Context) {
// dial does low-level work to dial peer
// XXX p.* reading without lock - ok?
// XXX app.MyInfo without lock - ok?
func (p *Node) dial(ctx context.Context) (_ *neonet.NodeLink, err error) {
func (p *PeerNode) dial(ctx context.Context) (_ *neonet.NodeLink, err error) {
defer task.Runningf(&ctx, "connect %s", p.NID)(&err) // XXX "connect" good word here?
app := p.nodeTab.nodeApp
@@ -343,7 +342,7 @@ func (p *Node) dial(ctx context.Context) (_ *neonet.NodeLink, err error) {
case accept.YourNID != app.MyInfo.NID:
err = fmt.Errorf("connected, but peer gives us nid %v (our is %v)", accept.YourNID, app.MyInfo.NID)
// XXX Node.Dial is currently used by Client only.
// XXX PeerNode.Dial is currently used by Client only.
// XXX For Client it would be not correct to check #partition only at
// XXX connection time, but it has to be also checked after always as every
// XXX operation could coincide with cluster reconfiguration.
@@ -386,7 +385,7 @@ type dialed struct {
//
// In case Dial returns an error - future Dial will attempt to reconnect with
// "don't reconnect too fast" throttling.
func (p *Node) Dial(ctx context.Context) (*neonet.NodeLink, error) {
func (p *PeerNode) Dial(ctx context.Context) (*neonet.NodeLink, error) {
p.linkMu.Lock()
// ok if already connected
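One way to picture the "don't reconnect too fast" behaviour the comment promises: remember the last failed attempt and refuse to redial within a cooldown. This is an assumed illustration only; the real PeerNode.Dial keeps richer per-peer state (the dialed struct above) and coordinates concurrent callers.

package main

import (
    "errors"
    "fmt"
    "time"
)

// throttledDialer refuses to redial within minRedial of a failed attempt.
type throttledDialer struct {
    lastErr   error
    lastTried time.Time
    minRedial time.Duration
    dial      func() error // the actual low-level dial
}

func (d *throttledDialer) Dial() error {
    if d.lastErr != nil && time.Since(d.lastTried) < d.minRedial {
        return fmt.Errorf("throttled after: %w", d.lastErr)
    }
    d.lastTried = time.Now()
    d.lastErr = d.dial()
    return d.lastErr
}

func main() {
    d := &throttledDialer{
        minRedial: time.Second,
        dial:      func() error { return errors.New("connection refused") },
    }
    fmt.Println(d.Dial()) // connection refused
    fmt.Println(d.Dial()) // throttled after: connection refused
}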
@@ -159,7 +159,8 @@ func (c *Cell) Readable() bool {
// MakePartTab creates a new partition table with uniformly distributed nodes.
// The partition table created will be of len=np.
// FIXME R=1 hardcoded
func MakePartTab(np int, nodev []*Node) *PartitionTable {
// XXX nodev -> []NodeInfo ?
func MakePartTab(np int, nodev []*PeerNode) *PartitionTable {
// XXX stub, not tested
tab := make([][]Cell, np)
for i, j := 0, 0; i < np; i, j = i+1, (j+1)%len(nodev) {
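A runnable reduction of MakePartTab's round-robin assignment, with stand-in types and R=1 as in the FIXME. Note the parentheses in (j+1)%len(nodev): % binds tighter than + in Go, so without them j would never wrap around the node list.

package main

import "fmt"

type PeerNode struct{ Name string }

type Cell struct{ Node *PeerNode }

// makePartTab assigns storages to partitions round-robin, one replica per
// partition (R=1), mirroring the stub above with stand-in types.
func makePartTab(np int, nodev []*PeerNode) [][]Cell {
    tab := make([][]Cell, np)
    for i, j := 0, 0; i < np; i, j = i+1, (j+1)%len(nodev) {
        tab[i] = []Cell{{Node: nodev[j]}}
    }
    return tab
}

func main() {
    nodev := []*PeerNode{{Name: "S1"}, {Name: "S2"}}
    for i, row := range makePartTab(5, nodev) {
        fmt.Printf("pt%d: %s\n", i, row[0].Node.Name) // S1 S2 S1 S2 S1
    }
}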
@@ -8,32 +8,32 @@ import (
"unsafe"
)
// traceevent: traceNodeChanged(nt *NodeTable, n *Node)
// traceevent: traceNodeChanged(nt *NodeTable, n *PeerNode)
type _t_traceNodeChanged struct {
tracing.Probe
probefunc func(nt *NodeTable, n *Node)
probefunc func(nt *NodeTable, n *PeerNode)
}
var _traceNodeChanged *_t_traceNodeChanged
func traceNodeChanged(nt *NodeTable, n *Node) {
func traceNodeChanged(nt *NodeTable, n *PeerNode) {
if _traceNodeChanged != nil {
_traceNodeChanged_run(nt, n)
}
}
func _traceNodeChanged_run(nt *NodeTable, n *Node) {
func _traceNodeChanged_run(nt *NodeTable, n *PeerNode) {
for p := _traceNodeChanged; p != nil; p = (*_t_traceNodeChanged)(unsafe.Pointer(p.Next())) {
p.probefunc(nt, n)
}
}
func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n *Node)) *tracing.Probe {
func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n *PeerNode)) *tracing.Probe {
p := _t_traceNodeChanged{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceNodeChanged)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_a393ecf34683256731eab893a4d035f1326c103e() {}
func _trace_exporthash_703f7fc2f10119b6979d924e79f1bb7862c52ced() {}
@@ -40,10 +40,10 @@ func proto_traceClusterStateChanged_Attach(*tracing.ProbeGroup, func(cs *proto.C
// traceimport: "lab.nexedi.com/kirr/neo/go/neo/xneo"
// rerun "gotrace gen" if you see link failure ↓↓↓
//go:linkname xneo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo/xneo._trace_exporthash_a393ecf34683256731eab893a4d035f1326c103e
//go:linkname xneo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo/xneo._trace_exporthash_703f7fc2f10119b6979d924e79f1bb7862c52ced
func xneo_trace_exporthash()
func init() { xneo_trace_exporthash() }
//go:linkname xneo_traceNodeChanged_Attach lab.nexedi.com/kirr/neo/go/neo/xneo.traceNodeChanged_Attach
func xneo_traceNodeChanged_Attach(*tracing.ProbeGroup, func(nt *xneo.NodeTable, n *xneo.Node)) *tracing.Probe
func xneo_traceNodeChanged_Attach(*tracing.ProbeGroup, func(nt *xneo.NodeTable, n *xneo.PeerNode)) *tracing.Probe
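Usage-wise the regenerated hooks keep the same pattern: traced code calls traceNodeChanged, which costs one nil check when no probe is attached and fans out to every probe otherwise. A miniature of that behaviour, with a plain slice standing in for the tracing.Probe linked list and the go:linkname plumbing:

package main

import "fmt"

type NodeTable struct{}
type PeerNode struct{ NID int }

// nil by default, so the traced fast path is a single comparison.
var _traceNodeChanged []func(nt *NodeTable, n *PeerNode)

func traceNodeChanged(nt *NodeTable, n *PeerNode) {
    if _traceNodeChanged != nil { // tracing disabled -> almost free
        for _, probe := range _traceNodeChanged {
            probe(nt, n)
        }
    }
}

func traceNodeChanged_Attach(probe func(nt *NodeTable, n *PeerNode)) {
    _traceNodeChanged = append(_traceNodeChanged, probe)
}

func main() {
    nt := &NodeTable{}
    traceNodeChanged(nt, &PeerNode{NID: 1}) // no probe attached - no output

    traceNodeChanged_Attach(func(nt *NodeTable, n *PeerNode) {
        fmt.Println("node changed:", n.NID)
    })
    traceNodeChanged(nt, &PeerNode{NID: 2}) // -> node changed: 2
}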