Commit e6e0ecb9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2fe63ef2
......@@ -33,9 +33,8 @@ import (
"sync"
"time"
// "golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/neo/internal/xtime"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
......@@ -52,11 +51,17 @@ import (
type Master struct {
node *xneo.Node
// main Runs under runCtx
runCtx context.Context
// master manages node and partition tables and broadcast their updates
// to all connected nodes. δnodeTab/δpartTab updates are proxied to
// a peer by per-peer goroutine reading from .notifyTab[peer.nid] channel.
notifyWG sync.WaitGroup
notifyTab map[proto.NodeID]chan _ΔClusterState // XXX naming -> δStateNotifyTab?
notifyWG sync.WaitGroup // XXX -> runWG ?
// notifyTab map[proto.NodeID]chan _ΔClusterState // XXX -> struct peerWG{.wg, .notifyq} ?
// XXX ^^^ -> peerTab ? XXX make it part of .nodeTab through PeerNode.private?
// XXX ^^^ -> peerWorkTab ?
peerWorkTab map[proto.NodeID]*peerWork
// last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ?
......@@ -70,8 +75,8 @@ type Master struct {
ctlShutdown chan chan error // request to shutdown cluster XXX with ctx ?
// channels from workers directly serving peers to main driver
nodeCome chan nodeCome // node connected XXX -> acceptq?
// nodeLeave chan nodeLeave // node disconnected XXX -> don't need ?
nodeComeq chan nodeCome // node connected XXX -> acceptq?
// nodeLeaveq chan nodeLeave // node disconnected XXX -> don't need ?
// so tests could override
monotime func() float64
......@@ -90,6 +95,16 @@ type nodeLeave struct {
}
*/
// peerWork represents context for all tasks related to one peer.
type peerWork struct {
// all tasks are spawned under wg. If any task fails - whole wg is canceled.
wg *xsync.WorkGroup
// snapshot of nodeTab/partTab/stateCode when peer was accepted by main.
state0 *xneo.ClusterStateSnapshot
// main sends δnodeTab/δpartTab/δstateCode to notifyq.
notifyq chan _ΔClusterState
}
// _ΔClusterState represents δnodeTab/δpartTab/δClusterState.
type _ΔClusterState interface { δClusterState() }
type _ΔNodeTab struct {
......@@ -118,7 +133,7 @@ func NewMaster(clusterName string, net xnet.Networker) *Master {
ctlStop: make(chan chan struct{}),
ctlShutdown: make(chan chan error),
nodeCome: make(chan nodeCome),
nodeComeq: make(chan nodeCome),
// nodeLeave: make(chan nodeLeave),
monotime: xtime.Mono,
......@@ -169,8 +184,13 @@ func (m *Master) setClusterState(state proto.ClusterState) {
//
// The master will be serving incoming connections on l.
func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // so that .runCtx is canceled if we return due to an error
addr := l.Addr()
defer task.Runningf(&ctx, "master(%v)", addr)(&err)
m.runCtx = ctx
// update our master & serving address in node
naddr, err := proto.Addr(addr)
......@@ -230,7 +250,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
// handover to main driver
select {
case m.nodeCome <- nodeCome{req, idReq}:
case m.nodeComeq <- nodeCome{req, idReq}:
// ok
case <-ctx.Done():
......@@ -354,7 +374,7 @@ loop:
for {
select {
// new connection comes in
case n := <-m.nodeCome:
case n := <-m.nodeComeq:
// FIXME if identify=ok -> subscribe to δ(nodeTab) and send initial nodeTab right after accept (accept should do it?)
node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
......@@ -369,6 +389,15 @@ loop:
go func() {
defer wg.Done()
/*
// start recovery
var pt *xneo.PartitionTable
err := acceptAndRun(func(node *_PeerNode) error {
pt, err = storCtlRecovery(ctx, node)
})
recevery <- storRecovery{stor: node, partTab: pt, err: err}
*/
err := m.accept(node, state0, n.req, resp)
if err != nil {
recovery <- storRecovery{stor: node, err: err}
......@@ -608,7 +637,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
loop:
for inprogress > 0 {
select {
case n := <-m.nodeCome:
case n := <-m.nodeComeq:
node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
if node == nil {
......@@ -795,6 +824,7 @@ type serviceDone struct {
// TODO document error meanings on return
//
// prerequisite for start: .partTab is operational wrt .nodeTab and verification passed
// XXX naming -> serve?
func (m *Master) service(ctx context.Context) (err error) {
defer task.Running(&ctx, "service")(&err)
......@@ -821,7 +851,7 @@ loop:
for {
select {
// new connection comes in
case n := <-m.nodeCome:
case n := <-m.nodeComeq:
node, state0, resp := m.identify(ctx, n, /* XXX accept everyone */)
if node == nil {
......@@ -993,11 +1023,12 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn
event := &_ΔNodeTab{nodeInfo}
// XXX locking
for nid, ch := range m.notifyTab {
// for nid, ch := range m.notifyTab {
for nid, w := range m.peerWorkTab {
// TODO change limiting by buffer size to limiting by time -
// - i.e. detach peer if event queue grows more than 30s of time.
select {
case ch <- event:
case w.notifyq <- event:
continue // ok
default:
}
......@@ -1113,7 +1144,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
// XXX + state0. XXX +notifyTab.
// Response message is constructed but not send back not to block the caller - it is
// the caller responsibility to send the response to node which requested identification. XXX via .accept()
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, resp proto.Msg) {
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, resp proto.Msg) {
// XXX also verify ? :
// - NodeType valid
// - IdTime ?
......@@ -1162,7 +1193,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
subj := fmt.Sprintf("identify: %s (%s)", n.req.Link().RemoteAddr(), n.idReq.NID)
if err != nil {
log.Infof(ctx, "%s: rejecting: %s", subj, err)
return nil, nil, err
return nil, err
}
log.Infof(ctx, "%s: accepting as %s", subj, nid)
......@@ -1196,13 +1227,15 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
node.SetLink(n.req.Link())
// make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates
state0 = m.node.State.Snapshot()
// TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details)
notifyq := make(chan _ΔClusterState, 1024)
m.notifyTab[node.NID] = notifyq
m.peerWorkTab[node.NID] = &peerWork{
wg: xsync.NewWorkGroup(m.runCtx),
state0: m.node.State.Snapshot(),
// TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details)
notifyq: make(chan _ΔClusterState, 1024),
}
return node, state0, accept
return node, accept
}
// accept sends acceptance to just identified peer, sends nodeTab and partTab
......
......@@ -1710,7 +1710,7 @@ func (req *Request) Reply(resp proto.Msg) error {
// Close must be called to free request resources.
//
// Close must be called exactly once.
// The request object cannot be used any more after call to Close.
// The request object cannot be used any more after call to Close. XXX -> rename to Release? Free?
//
// See "Lightweight mode" in top-level package doc for overview.
func (req *Request) Close() { // XXX +error?
......@@ -1788,6 +1788,7 @@ func (link *NodeLink) Ask1(req proto.Msg, resp proto.Msg) (err error) {
return err
}
// Link returns NodeLink over which the request was received.
func (req *Request) Link() *NodeLink {
return req.conn.Link()
}
......@@ -41,7 +41,7 @@ type Listener interface {
Close() error
Addr() net.Addr
// Accept accepts incoming client connection.
// Accept accepts incoming connection from a peer.
//
// On success the link was handshaked and peer sent us RequestIdentification
// packet which we did not yet answer.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment