Commit b92f383c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9c047ee1
...@@ -77,8 +77,8 @@ var _ zodb.IStorageDriver = (*Client)(nil) ...@@ -77,8 +77,8 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
c := &Client{ c := &Client{
node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr), node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
at0Ready: make(chan struct{}), at0Ready: make(chan struct{}),
} }
var runCtx context.Context var runCtx context.Context
...@@ -108,12 +108,11 @@ func (c *Client) Close() (err error) { ...@@ -108,12 +108,11 @@ func (c *Client) Close() (err error) {
// Run starts client node and runs it until either ctx is canceled or master // Run starts client node and runs it until either ctx is canceled or master
// commands it to shutdown. // commands it to shutdown.
func (c *Client) Run(ctx context.Context) (err error) { func (c *Client) Run(ctx context.Context) (err error) {
// run process which performs master talk
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
c.runCancel = cancel c.runCancel = cancel
c.runWG.Go(func(runCtx context.Context) error { c.runWG.Go(func(runCtx context.Context) error {
ctx, cancel := xcontext.Merge(ctx, runCtx) ctx, cancel := xcontext.Merge/*Cancel*/(ctx, runCtx)
defer cancel() defer cancel()
return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error { return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error {
...@@ -313,14 +312,13 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z ...@@ -313,14 +312,13 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
return nil, 0, err return nil, 0, err
} }
if len(storv) == 0 { if len(storv) == 0 {
// XXX recheck it adds traceback to log -> XXX it does not -> add our Bugf which always forces +v on such error print // XXX recheck it adds traceback to log -> XXX it does not -> add our Bugf which always forces +v on such error print
return nil, 0, errors.Errorf("internal inconsistency: cluster is operational, but no storages alive for oid %v", xid.Oid) return nil, 0, errors.Errorf("internal inconsistency: cluster is operational, but no storages alive for oid %v", xid.Oid)
} }
// XXX vvv temp stub -> TODO pick up 3 random storages and send load // XXX vvv temp stub -> TODO pick up 3 random storages and send load
// requests to them all getting the first who is the fastest to reply; // requests to them all, getting the first who is the fastest to reply;
// retry from the beginning if all are found to fail? // retry from the beginning if all are found to fail?
stor := storv[rand.Intn(len(storv))] stor := storv[rand.Intn(len(storv))]
......
...@@ -350,7 +350,7 @@ loop: ...@@ -350,7 +350,7 @@ loop:
log.Error(ctx, r.err) log.Error(ctx, r.err)
if !xcontext.Canceled(r.err) { if !xcontext.Canceled(r.err) {
r.stor.CloseLink(ctx) r.stor.ResetLink(ctx)
r.stor.SetState(proto.DOWN) r.stor.SetState(proto.DOWN)
} }
...@@ -434,7 +434,7 @@ loop2: ...@@ -434,7 +434,7 @@ loop2:
log.Error(ctx, r.err) log.Error(ctx, r.err)
if !xcontext.Canceled(r.err) { if !xcontext.Canceled(r.err) {
r.stor.CloseLink(ctx) r.stor.ResetLink(ctx)
r.stor.SetState(proto.DOWN) r.stor.SetState(proto.DOWN)
} }
...@@ -610,7 +610,7 @@ loop: ...@@ -610,7 +610,7 @@ loop:
log.Error(ctx, v.err) log.Error(ctx, v.err)
if !xcontext.Canceled(v.err) { if !xcontext.Canceled(v.err) {
v.stor.CloseLink(ctx) v.stor.ResetLink(ctx)
v.stor.SetState(proto.DOWN) v.stor.SetState(proto.DOWN)
} }
...@@ -659,7 +659,7 @@ loop2: ...@@ -659,7 +659,7 @@ loop2:
log.Error(ctx, v.err) log.Error(ctx, v.err)
if !xcontext.Canceled(v.err) { if !xcontext.Canceled(v.err) {
v.stor.CloseLink(ctx) v.stor.ResetLink(ctx)
v.stor.SetState(proto.DOWN) v.stor.SetState(proto.DOWN)
} }
...@@ -897,7 +897,7 @@ func (m *Master) serveClient(ctx context.Context, cli *xneo.Node) (err error) { ...@@ -897,7 +897,7 @@ func (m *Master) serveClient(ctx context.Context, cli *xneo.Node) (err error) {
defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err) defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err)
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.CloseLink? defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.ResetLink?
// FIXME send initial nodeTab and partTab before starting serveClient1 // FIXME send initial nodeTab and partTab before starting serveClient1
// (move those initial sends from keepPeerUpdated to here) // (move those initial sends from keepPeerUpdated to here)
......
...@@ -438,6 +438,9 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN ...@@ -438,6 +438,9 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
// we have to provide IdTime when requesting identification to other peers // we have to provide IdTime when requesting identification to other peers
// (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master") // (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master")
// TODO .State = DOWN -> ResetLink
if nodeInfo.NID == node.myInfo.NID { if nodeInfo.NID == node.myInfo.NID {
// XXX recheck locking // XXX recheck locking
// XXX do .myInfo = nodeInfo ? // XXX do .myInfo = nodeInfo ?
......
...@@ -61,16 +61,14 @@ import ( ...@@ -61,16 +61,14 @@ import (
// a connection to node is lost associated entry is marked as having DOWN (XXX // a connection to node is lost associated entry is marked as having DOWN (XXX
// or UNKNOWN ?) node state. // or UNKNOWN ?) node state.
// //
// NodeTable zero value is valid empty node table. // NodeTable zero value is valid empty node table. XXX recheck
//
// XXX users have to care locking explicitly
type NodeTable struct { type NodeTable struct {
// XXX for Node.Dial to work. see also comments vvv near "peer link" // XXX for Node.Dial to work. see also comments vvv near "peer link"
nodeApp *NodeApp nodeApp *NodeApp
// users have to care locking explicitly nodev []*Node // all nodes
//sync.RWMutex XXX needed ?
//storv []*Node // storages
nodev []*Node // all other nodes
notifyv []chan proto.NodeInfo // subscribers notifyv []chan proto.NodeInfo // subscribers
} }
...@@ -82,7 +80,7 @@ type NodeTable struct { ...@@ -82,7 +80,7 @@ type NodeTable struct {
type Node struct { type Node struct {
nodeTab *NodeTable // this node is part of nodeTab *NodeTable // this node is part of
proto.NodeInfo // .type, .addr, .nid, ... XXX also protect by mu? proto.NodeInfo // (.type, .laddr, .nid, .state, .idtime) XXX also protect by mu?
linkMu sync.Mutex linkMu sync.Mutex
link *neonet.NodeLink // link to this peer; nil if not connected link *neonet.NodeLink // link to this peer; nil if not connected
...@@ -182,7 +180,6 @@ func (n *Node) SetState(state proto.NodeState) { ...@@ -182,7 +180,6 @@ func (n *Node) SetState(state proto.NodeState) {
func (nt *NodeTable) String() string { func (nt *NodeTable) String() string {
buf := bytes.Buffer{} buf := bytes.Buffer{}
// XXX also for .storv
for _, n := range nt.nodev { for _, n := range nt.nodev {
// XXX recheck output // XXX recheck output
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\t@ %s\n", n.NID, n.Type, n.State, n.Addr, n.IdTime) fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\t@ %s\n", n.NID, n.Type, n.State, n.Addr, n.IdTime)
...@@ -192,6 +189,7 @@ func (nt *NodeTable) String() string { ...@@ -192,6 +189,7 @@ func (nt *NodeTable) String() string {
} }
// ---- subscription to nodetab updates ---- // ---- subscription to nodetab updates ----
// XXX used only by M -> move into M?
// notify notifies NodeTable subscribers that nodeInfo was updated // notify notifies NodeTable subscribers that nodeInfo was updated
func (nt *NodeTable) notify(nodeInfo proto.NodeInfo) { func (nt *NodeTable) notify(nodeInfo proto.NodeInfo) {
...@@ -277,7 +275,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe ...@@ -277,7 +275,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe
// TODO review peer link dialing / setting / accepting. // TODO review peer link dialing / setting / accepting.
// //
// Keep in mind that in NEO in general case it is not client/server but peer-to-peer // Keep in mind that in NEO in general case it is not client/server but peer-to-peer
// e.g. when two S establish a link in between then to exchange/sync data. // e.g. when two S establish a link in between them to exchange/sync data.
// //
// Also the distinction between S and M should go away as every S should // Also the distinction between S and M should go away as every S should
// be taught to also become M (and thus separate M nodes go away // be taught to also become M (and thus separate M nodes go away
...@@ -287,7 +285,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe ...@@ -287,7 +285,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe
// SetLink sets link to peer node. // SetLink sets link to peer node.
// XXX // XXX
// //
// See also: Link, CloseLink, Dial. // See also: Link, ResetLink, Dial.
func (p *Node) SetLink(link *neonet.NodeLink) { func (p *Node) SetLink(link *neonet.NodeLink) {
// XXX see Link about locking - whether it is needed here or not // XXX see Link about locking - whether it is needed here or not
p.linkMu.Lock() p.linkMu.Lock()
...@@ -309,8 +307,8 @@ func (p *Node) Link() *neonet.NodeLink { ...@@ -309,8 +307,8 @@ func (p *Node) Link() *neonet.NodeLink {
return link return link
} }
// CloseLink closes link to peer and sets it to nil. // ResetLink closes link to peer and sets it to nil.
func (p *Node) CloseLink(ctx context.Context) { func (p *Node) ResetLink(ctx context.Context) {
p.linkMu.Lock() p.linkMu.Lock()
link := p.link link := p.link
p.link = nil p.link = nil
...@@ -324,7 +322,6 @@ func (p *Node) CloseLink(ctx context.Context) { ...@@ -324,7 +322,6 @@ func (p *Node) CloseLink(ctx context.Context) {
log.Error(ctx, err) log.Error(ctx, err)
} }
} }
} }
// dial does low-level work to dial peer // dial does low-level work to dial peer
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment