Commit 599cd283 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9d963b88
......@@ -58,7 +58,7 @@ type Client struct {
// driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event
head zodb.Tid // last invalidation received from server
head0 zodb.Tid // .head state on start of every talkMaster1
head0 zodb.Tid // .head state on start of every (re)connection to master
at0Mu sync.Mutex
at0 zodb.Tid // at0 obtained when initially connecting to server
......
......@@ -24,7 +24,7 @@ import (
"context"
"fmt"
"sync"
"time"
// "time"
"github.com/pkg/errors"
......@@ -48,7 +48,7 @@ import (
//
// Storage implements only NEO protocol logic with data being persisted via provided storage.Backend.
type Storage struct {
node *xneo.Node
node *_MasteredNode
// context for providing operational service
// it is renewed every time master tells us StartOpertion, so users
......@@ -57,8 +57,6 @@ type Storage struct {
opCtx context.Context
back storage.Backend
//nodeCome chan nodeCome // node connected
}
// NewStorage creates new storage node that will talk to master on masterAddr.
......@@ -67,7 +65,7 @@ type Storage struct {
// Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage.Backend) *Storage {
stor := &Storage{
node: xneo.NewNode(net, proto.STORAGE, clusterName, masterAddr),
node: newMasteredNode(proto.STORAGE, clusterName, net, masterAddr),
back: back,
}
......@@ -93,15 +91,16 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
if err != nil {
return err
}
stor.node.MyInfo.Addr = naddr
stor.node.myInfo.Addr = naddr
// wrap listener with link / identificaton hello checker
lli := xneo.NewListener(neonet.NewLinkListener(l))
// start serving incoming connections
wg := sync.WaitGroup{}
serveCtx, serveCancel := context.WithCancel(ctx)
/*
serveCtx, serveCancel := context.WithCancel(ctx)
//stor.node.OnShutdown = serveCancel
// XXX hack: until ctx cancel is not handled properly by Recv/Send
// XXX -> xcontext.WithCloseOnRetCancel
......@@ -109,13 +108,16 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
serveCancel()
xio.LClose(ctx, lli)
}
*/
wg.Add(1)
go func(ctx context.Context) (err error) {
defer wg.Done()
defer task.Running(&ctx, "accept")(&err)
// XXX dup from master
// XXX dup from master -> Node.Listen() -> Accept() ?
// XXX ? -> Node.Accept(lli) (it will verify IdTime against Node.nodeTab[nid])
// XXX ? -> Node.Serve(lli -> func(idReq))
for {
if ctx.Err() != nil {
return ctx.Err()
......@@ -147,28 +149,39 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// continue
// }
}
}(serveCtx)
// }(serveCtx)
}(ctx)
// connect to master and get commands and updates from it
//err = stor.talkMaster(ctx)
err = stor.talkMaster(serveCtx) // XXX hack for shutdown
err = stor.talkMaster(ctx)
// err = stor.talkMaster(serveCtx) // XXX hack for shutdown
// XXX log err?
// we are done - shutdown
serveCancel()
// serveCancel()
wg.Wait()
// XXX should Storage do it, or should it leave back non-closed?
// TODO -> Storage should not close backend.
err2 := stor.back.Close()
if err == nil {
err = err2
}
return err // XXX err ctx
return err
}
// --- connect to master and let it drive us ---
// XXX move -> SetNumReplicas handler
/*
// NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
}
*/
/* XXX kill
// talkMaster connects to master, announces self and receives commands and notifications.
// it tries to persist master link reconnecting as needed.
//
......@@ -220,13 +233,6 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX add master NID -> nodeTab ? or master will notify us with it himself ?
// XXX move -> SetNumReplicas handler
/*
// NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
}
*/
// XXX -> node.Dial ?
if accept.YourNID != stor.node.MyInfo.NID {
......@@ -251,6 +257,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
//log.Error(ctx, err)
return err
}
*/
// m1initialize drives storage by master messages during initialization phase
//
......@@ -267,7 +274,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neonet.NodeLink) (
defer task.Runningf(&ctx, "init %v", mlink)(&err)
for {
req, err := mlink.Recv1()
req, err := mlink.Recv1() // XXX -> RecvM1
if err != nil {
return nil, err
}
......@@ -377,7 +384,7 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neonet.Request) (err
for {
// XXX abort on ctx (XXX or upper?)
req, err := mlink.Recv1()
req, err := mlink.Recv1() // XXX -> RecvM1
if err != nil {
return err
}
......
......@@ -66,7 +66,6 @@ import (
// XXX users have to care locking explicitly
type NodeTable struct {
// XXX for PeerNode.Dial to work. see also comments vvv near "peer link"
// XXX move pointer to local node to PeerNode instead?
localNode *Node
nodev []*PeerNode // all nodes
......@@ -77,13 +76,13 @@ type NodeTable struct {
// PeerNode represents a peer node in the cluster.
type PeerNode struct {
nodeTab *NodeTable // this node is part of // XXX -> change to `local *Node` ?
nodeTab *NodeTable // this node is part of
proto.NodeInfo // (.type, .laddr, .nid, .state, .idtime) XXX also protect by mu?
linkMu sync.Mutex
link *neonet.NodeLink // link to this peer; nil if not connected
dialT time.Time // last dial finished at this time
dialT time.Time // last dial finished at this time
// dialer notifies waiters via this; reinitialized at each redial; nil while not dialing
//
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment