Commit 329fc53c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cbe904d8
...@@ -49,7 +49,7 @@ import ( ...@@ -49,7 +49,7 @@ import (
// Client is NEO node that talks to NEO cluster and exposes access to it via ZODB interfaces. // Client is NEO node that talks to NEO cluster and exposes access to it via ZODB interfaces.
type Client struct { type Client struct {
nodem *_MasteredNode // XXX naming node *_MasteredNode
// Run is run under: // Run is run under:
runWG *xsync.WorkGroup runWG *xsync.WorkGroup
...@@ -77,7 +77,7 @@ var _ zodb.IStorageDriver = (*Client)(nil) ...@@ -77,7 +77,7 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
c := &Client{ c := &Client{
nodem: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr), node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
at0Ready: make(chan struct{}), at0Ready: make(chan struct{}),
} }
...@@ -97,7 +97,7 @@ func (c *Client) Close() (err error) { ...@@ -97,7 +97,7 @@ func (c *Client) Close() (err error) {
// close networker if configured to do so // close networker if configured to do so
if c.ownNet { if c.ownNet {
err2 := c.nodem.Node.Net.Close() err2 := c.node.Net.Close()
if err == nil { if err == nil {
err = err2 err = err2
} }
...@@ -115,7 +115,7 @@ func (c *Client) Run(ctx context.Context) (err error) { ...@@ -115,7 +115,7 @@ func (c *Client) Run(ctx context.Context) (err error) {
ctx, cancel := xcontext.Merge/*Cancel*/(ctx, runCtx) ctx, cancel := xcontext.Merge/*Cancel*/(ctx, runCtx)
defer cancel() defer cancel()
return c.nodem.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error { return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// XXX errctx ("on redial"? "connected"?) // XXX errctx ("on redial"? "connected"?)
c.head0 = c.head c.head0 = c.head
...@@ -271,7 +271,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) { ...@@ -271,7 +271,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
} }
}() }()
err = c.nodem.WithOperational(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error { err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
// XXX mlink can become down while we are making the call. // XXX mlink can become down while we are making the call.
// XXX do we want to return error or retry? // XXX do we want to return error or retry?
reply := proto.AnswerLastTransaction{} reply := proto.AnswerLastTransaction{}
...@@ -296,7 +296,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z ...@@ -296,7 +296,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
// Retrieve storages we might need to access. // Retrieve storages we might need to access.
storv := make([]*xneo.PeerNode, 0, 1) storv := make([]*xneo.PeerNode, 0, 1)
err = c.nodem.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error { err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
for _, cell := range cs.PartTab.Get(xid.Oid) { for _, cell := range cs.PartTab.Get(xid.Oid) {
if cell.Readable() { if cell.Readable() {
stor := cs.NodeTab.Get(cell.NID) stor := cs.NodeTab.Get(cell.NID)
...@@ -510,10 +510,10 @@ func (c *Client) URL() string { ...@@ -510,10 +510,10 @@ func (c *Client) URL() string {
// (but we need to be able to construct URL if Client was created via NewClient directly) // (but we need to be able to construct URL if Client was created via NewClient directly)
zurl := "neo" zurl := "neo"
if strings.Contains(c.nodem.Node.Net.Network(), "+tls") { if strings.Contains(c.node.Net.Network(), "+tls") {
zurl += "s" zurl += "s"
} }
zurl += fmt.Sprintf("://%s/%s", c.nodem.Node.MasterAddr, c.nodem.Node.ClusterName) zurl += fmt.Sprintf("://%s/%s", c.node.MasterAddr, c.node.ClusterName)
return zurl return zurl
} }
......
...@@ -567,7 +567,7 @@ func withNEO(t *testing.T, f func(t *testing.T, nsrv NEOSrv, ndrv *Client), optv ...@@ -567,7 +567,7 @@ func withNEO(t *testing.T, f func(t *testing.T, nsrv NEOSrv, ndrv *Client), optv
if noautodetect { if noautodetect {
encOK = srvEnc encOK = srvEnc
} }
err = ndrv.nodem.WithOperational(context.Background(), func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error { err = ndrv.node.WithOperational(context.Background(), func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
enc := mlink.Encoding() enc := mlink.Encoding()
if enc != encOK { if enc != encOK {
t.Fatalf("connected with encoding %c ; want %c", enc, encOK) t.Fatalf("connected with encoding %c ; want %c", enc, encOK)
......
...@@ -62,7 +62,8 @@ import ( ...@@ -62,7 +62,8 @@ import (
// XXX update after introduction of _MasterLink // XXX update after introduction of _MasterLink
// XXX use `nodem *_MasteredNode` XXX naming=? // XXX use `nodem *_MasteredNode` XXX naming=?
type _MasteredNode struct { type _MasteredNode struct {
Node *xneo.Node *xneo.Node
// myInfo proto.NodeInfo // type, laddr, nid, state, idtime // myInfo proto.NodeInfo // type, laddr, nid, state, idtime
// ClusterName string // ClusterName string
// Net xnet.Networker // network AP we are sending/receiving on // Net xnet.Networker // network AP we are sending/receiving on
...@@ -156,9 +157,9 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, ...@@ -156,9 +157,9 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) { func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) {
// me0 describes local node when it starts connecting to master, e.g. 'client C?'. // me0 describes local node when it starts connecting to master, e.g. 'client C?'.
// we don't use just NID because it is initially 0 and because master can tell us to change it. // we don't use just NID because it is initially 0 and because master can tell us to change it.
me0 := strings.ToLower(node.Node.MyInfo.Type.String()) me0 := strings.ToLower(node.MyInfo.Type.String())
me0 += " " me0 += " "
mynid0 := node.Node.MyInfo.NID mynid0 := node.MyInfo.NID
if mynid0 == 0 { if mynid0 == 0 {
me0 += "?" me0 += "?"
} else { } else {
...@@ -166,7 +167,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex ...@@ -166,7 +167,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
} }
ctx0 := ctx ctx0 := ctx
defer task.Runningf(&ctx, "%s: talk master(%s)", me0, node.Node.MasterAddr)(&err) defer task.Runningf(&ctx, "%s: talk master(%s)", me0, node.MasterAddr)(&err)
for { for {
node.updateOperational(func() { node.updateOperational(func() {
...@@ -194,23 +195,23 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex ...@@ -194,23 +195,23 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error) error { func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error) error {
reqID := &proto.RequestIdentification{ reqID := &proto.RequestIdentification{
NodeType: node.Node.MyInfo.Type, NodeType: node.MyInfo.Type,
NID: node.Node.MyInfo.NID, NID: node.MyInfo.NID,
Address: node.Node.MyInfo.Addr, Address: node.MyInfo.Addr,
ClusterName: node.Node.ClusterName, ClusterName: node.ClusterName,
IdTime: node.Node.MyInfo.IdTime, // XXX ok? IdTime: node.MyInfo.IdTime, // XXX ok?
DevPath: nil, // XXX stub DevPath: nil, // XXX stub
NewNID: nil, // XXX stub NewNID: nil, // XXX stub
} }
mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Node.Net, node.Node.MasterAddr, reqID) mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Net, node.MasterAddr, reqID)
if err != nil { if err != nil {
return err return err
} }
return xcontext.WithCloseOnErrCancel(ctx, mlink, func() (err error) { return xcontext.WithCloseOnErrCancel(ctx, mlink, func() (err error) {
if accept.YourNID != node.Node.MyInfo.NID { if accept.YourNID != node.MyInfo.NID {
log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID) log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID)
node.Node.MyInfo.NID = accept.YourNID // XXX locking ? node.MyInfo.NID = accept.YourNID // XXX locking ?
} }
// XXX verify Mnid = M*; our nid corresponds to our type // XXX verify Mnid = M*; our nid corresponds to our type
...@@ -243,7 +244,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -243,7 +244,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
node.updateOperational(func() { node.updateOperational(func() {
err = node.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown err = node.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown
node.Node.State.PartTab = pt node.State.PartTab = pt
if err == nil { if err == nil {
// keep mlink=nil on shutdown so that // keep mlink=nil on shutdown so that
// .operational does not change to y. // .operational does not change to y.
...@@ -353,7 +354,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt ...@@ -353,7 +354,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock ok? // XXX logging under lock ok?
log.Infof(ctx, "parttab update: %s", pt) log.Infof(ctx, "parttab update: %s", pt)
node.Node.State.PartTab = pt node.State.PartTab = pt
// <- δ(partTab) // <- δ(partTab)
case *proto.NotifyPartitionChanges: case *proto.NotifyPartitionChanges:
...@@ -366,8 +367,8 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt ...@@ -366,8 +367,8 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
case *proto.NotifyClusterState: case *proto.NotifyClusterState:
log.Infof(ctx, "state update: %s", msg.State) log.Infof(ctx, "state update: %s", msg.State)
node.Node.State.Code = msg.State node.State.Code = msg.State
traceClusterStateChanged(&node.Node.State.Code) traceClusterStateChanged(&node.State.Code)
} }
}) })
...@@ -385,7 +386,7 @@ func (node *_MasteredNode) updateOperational(δf func()) { ...@@ -385,7 +386,7 @@ func (node *_MasteredNode) updateOperational(δf func()) {
defer node.opMu.Unlock() defer node.opMu.Unlock()
δf() δf()
operational := (node.mlink != nil) && node.Node.State.IsOperational() operational := (node.mlink != nil) && node.State.IsOperational()
//fmt.Printf("\nupdateOperatinal: %v\n", operational) //fmt.Printf("\nupdateOperatinal: %v\n", operational)
//fmt.Printf(" mlink: %s\n", node.mlink) //fmt.Printf(" mlink: %s\n", node.mlink)
...@@ -436,7 +437,7 @@ func (node *_MasteredNode) WithOperational(ctx context.Context, f func(mlink *ne ...@@ -436,7 +437,7 @@ func (node *_MasteredNode) WithOperational(ctx context.Context, f func(mlink *ne
// node.operational=y and node.opMu is rlocked // node.operational=y and node.opMu is rlocked
defer node.opMu.RUnlock() defer node.opMu.RUnlock()
return f(node.mlink, &node.Node.State) return f(node.mlink, &node.State)
} }
...@@ -449,17 +450,17 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN ...@@ -449,17 +450,17 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
// XXX msg.IdTime ? // XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList { for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "<- node: %v", nodeInfo) log.Infof(ctx, "<- node: %v", nodeInfo)
node.Node.State.NodeTab.Update(nodeInfo) node.State.NodeTab.Update(nodeInfo)
// we have to provide IdTime when requesting identification to other peers // we have to provide IdTime when requesting identification to other peers
// (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master") // (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master")
// TODO .State = DOWN -> ResetLink // TODO .State = DOWN -> ResetLink
if nodeInfo.NID == node.Node.MyInfo.NID { if nodeInfo.NID == node.MyInfo.NID {
// XXX recheck locking // XXX recheck locking
// XXX do .myInfo = nodeInfo ? // XXX do .myInfo = nodeInfo ?
node.Node.MyInfo.IdTime = nodeInfo.IdTime node.MyInfo.IdTime = nodeInfo.IdTime
// NEO/py currently employs this hack // NEO/py currently employs this hack
// FIXME -> better it be separate command and handled cleanly // FIXME -> better it be separate command and handled cleanly
...@@ -472,6 +473,6 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN ...@@ -472,6 +473,6 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
} }
// XXX logging under lock ok? // XXX logging under lock ok?
log.Infof(ctx, "full nodetab:\n%s", node.Node.State.NodeTab) log.Infof(ctx, "full nodetab:\n%s", node.State.NodeTab)
return nil return nil
} }
...@@ -91,7 +91,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -91,7 +91,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
if err != nil { if err != nil {
return err return err
} }
stor.node.Node.MyInfo.Addr = naddr stor.node.MyInfo.Addr = naddr
// wrap listener with link / identificaton hello checker // wrap listener with link / identificaton hello checker
lli := xneo.NewListener(neonet.NewLinkListener(l)) lli := xneo.NewListener(neonet.NewLinkListener(l))
...@@ -208,16 +208,16 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro ...@@ -208,16 +208,16 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
case *proto.Recovery: case *proto.Recovery:
err = req.Reply(&proto.AnswerRecovery{ err = req.Reply(&proto.AnswerRecovery{
PTid: stor.node.Node.State.PartTab.PTid, PTid: stor.node.State.PartTab.PTid,
BackupTid: proto.INVALID_TID, BackupTid: proto.INVALID_TID,
TruncateTid: proto.INVALID_TID}) TruncateTid: proto.INVALID_TID})
case *proto.AskPartitionTable: case *proto.AskPartitionTable:
// TODO initially read PT from disk // TODO initially read PT from disk
err = req.Reply(&proto.AnswerPartitionTable{ err = req.Reply(&proto.AnswerPartitionTable{
PTid: stor.node.Node.State.PartTab.PTid, PTid: stor.node.State.PartTab.PTid,
NumReplicas: 0, // FIXME hardcoded; NEO/py uses this as n(replica)-1 NumReplicas: 0, // FIXME hardcoded; NEO/py uses this as n(replica)-1
RowList: stor.node.Node.State.PartTab.Dump()}) RowList: stor.node.State.PartTab.Dump()})
case *proto.LockedTransactions: case *proto.LockedTransactions:
// XXX r/o stub // XXX r/o stub
...@@ -309,7 +309,7 @@ func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bo ...@@ -309,7 +309,7 @@ func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bo
if idReq.NodeType != proto.CLIENT { if idReq.NodeType != proto.CLIENT {
return &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}, false return &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}, false
} }
if idReq.ClusterName != stor.node.Node.ClusterName { if idReq.ClusterName != stor.node.ClusterName {
return &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}, false return &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}, false
} }
...@@ -323,8 +323,8 @@ func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bo ...@@ -323,8 +323,8 @@ func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bo
} }
return &proto.AcceptIdentification{ return &proto.AcceptIdentification{
NodeType: stor.node.Node.MyInfo.Type, NodeType: stor.node.MyInfo.Type,
MyNID: stor.node.Node.MyInfo.NID, // XXX lock wrt update MyNID: stor.node.MyInfo.NID, // XXX lock wrt update
YourNID: idReq.NID, YourNID: idReq.NID,
}, true }, true
} }
......
...@@ -311,7 +311,7 @@ func (t *tCluster) Storage(name string) ITestStorage { ...@@ -311,7 +311,7 @@ func (t *tCluster) Storage(name string) ITestStorage {
func (t *tCluster) NewClient(name, masterAddr string) ITestClient { func (t *tCluster) NewClient(name, masterAddr string) ITestClient {
tnode := t.registerNewNode(name) tnode := t.registerNewNode(name)
c := NewClient(t.name, masterAddr, tnode.net) c := NewClient(t.name, masterAddr, tnode.net)
t.gotracer.RegisterNode(c.nodem.Node, name) t.gotracer.RegisterNode(c.node.Node, name)
t.runWG.Go(func(ctx context.Context) error { t.runWG.Go(func(ctx context.Context) error {
return c.Run(ctx) return c.Run(ctx)
}) })
...@@ -343,7 +343,7 @@ func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, ...@@ -343,7 +343,7 @@ func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker,
} }
func (s *tStorage) Run(ctx context.Context) error { func (s *tStorage) Run(ctx context.Context) error {
l, err := s.node.Node.Net.Listen(ctx, s.serveAddr) l, err := s.node.Net.Listen(ctx, s.serveAddr)
if err != nil { if err != nil {
return err return err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment