Commit cbe904d8 authored by Kirill Smelkov

.

parent 0af56694
@@ -49,7 +49,7 @@ import (
 // Client is NEO node that talks to NEO cluster and exposes access to it via ZODB interfaces.
 type Client struct {
-	node  *_MasteredNode
+	nodem *_MasteredNode // XXX naming
 	// Run is run under:
 	runWG *xsync.WorkGroup
@@ -77,7 +77,7 @@ var _ zodb.IStorageDriver = (*Client)(nil)
 // Use Run to actually start running the node.
 func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
 	c := &Client{
-		node:     newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
+		nodem:    newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
 		at0Ready: make(chan struct{}),
 	}
@@ -97,7 +97,7 @@ func (c *Client) Close() (err error) {
 	// close networker if configured to do so
 	if c.ownNet {
-		err2 := c.node.Net.Close()
+		err2 := c.nodem.Node.Net.Close()
 		if err == nil {
 			err = err2
 		}
@@ -115,7 +115,7 @@ func (c *Client) Run(ctx context.Context) (err error) {
 	ctx, cancel := xcontext.Merge/*Cancel*/(ctx, runCtx)
 	defer cancel()
-	return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
+	return c.nodem.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
 		// XXX errctx ("on redial"? "connected"?)
 		c.head0 = c.head
@@ -271,7 +271,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
 		}
 	}()
-	err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
+	err = c.nodem.WithOperational(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
 		// XXX mlink can become down while we are making the call.
 		// XXX do we want to return error or retry?
 		reply := proto.AnswerLastTransaction{}
@@ -296,7 +296,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
 	// Retrieve storages we might need to access.
 	storv := make([]*xneo.PeerNode, 0, 1)
-	err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
+	err = c.nodem.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
 		for _, cell := range cs.PartTab.Get(xid.Oid) {
 			if cell.Readable() {
 				stor := cs.NodeTab.Get(cell.NID)
@@ -510,10 +510,10 @@ func (c *Client) URL() string {
 	// (but we need to be able to construct URL if Client was created via NewClient directly)
 	zurl := "neo"
-	if strings.Contains(c.node.Net.Network(), "+tls") {
+	if strings.Contains(c.nodem.Node.Net.Network(), "+tls") {
 		zurl += "s"
 	}
-	zurl += fmt.Sprintf("://%s/%s", c.node.MasterAddr, c.node.ClusterName)
+	zurl += fmt.Sprintf("://%s/%s", c.nodem.Node.MasterAddr, c.nodem.Node.ClusterName)
 	return zurl
 }
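Reviewer note, not part of the commit: after the rename, master-link operations stay on the `_MasteredNode` wrapper while plain node state is reached through its exported `Node` field. A minimal sketch of the resulting access pattern, assuming only the names visible in this diff; `describeClient` is a hypothetical helper:

```go
// describeClient is a hypothetical illustration of the new access chain.
func describeClient(c *Client) string {
	// wrapper level: mastered-link machinery
	//   c.nodem.TalkMaster(...)        (re)dial + talk to master
	//   c.nodem.WithOperational(...)   run f while cluster is operational
	// inner *xneo.Node level: static node state
	net := c.nodem.Node.Net
	return fmt.Sprintf("%s cluster %q via %s",
		c.nodem.Node.MasterAddr, c.nodem.Node.ClusterName, net.Network())
}
```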
...
@@ -567,7 +567,7 @@ func withNEO(t *testing.T, f func(t *testing.T, nsrv NEOSrv, ndrv *Client), optv
 	if noautodetect {
 		encOK = srvEnc
 	}
-	err = ndrv.node.WithOperational(context.Background(), func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
+	err = ndrv.nodem.WithOperational(context.Background(), func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
 		enc := mlink.Encoding()
 		if enc != encOK {
 			t.Fatalf("connected with encoding %c ; want %c", enc, encOK)
...
@@ -74,7 +74,7 @@ type Master struct {
 // Use Run to actually start running the node.
 func NewMaster(clusterName string, net xnet.Networker) *Master {
 	m := &Master{
-		node:     xneo.NewNode(net, proto.MASTER, clusterName, ""),
+		node:     xneo.NewNode(proto.MASTER, clusterName, net, ""),
 		ctlStart: make(chan chan error),
 		ctlStop:  make(chan chan struct{}),
@@ -120,7 +120,7 @@ func (m *Master) Shutdown() error {
 // setClusterState sets .clusterState and notifies subscribers.
 func (m *Master) setClusterState(state proto.ClusterState) {
-	m.node.ClusterState.Set(state)
+	m.node.State.Code.Set(state)
 	// TODO notify subscribers
 }
@@ -148,7 +148,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
 	}
 	// update nodeTab with self
-	m.node.NodeTab.Update(m.node.MyInfo)
+	m.node.State.NodeTab.Update(m.node.MyInfo)
 	// wrap listener with link / identificaton hello checker
 	lli := xneo.NewListener(neonet.NewLinkListener(l))
@@ -299,7 +299,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
 	// start recovery on all storages we are currently in touch with
 	// XXX close links to clients
-	for _, stor := range m.node.NodeTab.StorageList() {
+	for _, stor := range m.node.State.NodeTab.StorageList() {
 		if stor.State > proto.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
 			inprogress++
 			wg.Add(1)
@@ -358,18 +358,18 @@ loop:
 				// we are interested in latest partTab
 				// NOTE during recovery no one must be subscribed to
 				// partTab so it is ok to simply change whole m.partTab
-				if r.partTab.PTid > m.node.PartTab.PTid {
-					m.node.PartTab = r.partTab
+				if r.partTab.PTid > m.node.State.PartTab.PTid {
+					m.node.State.PartTab = r.partTab
 				}
 			}
 			// update indicator whether cluster currently can be operational or not
 			var ready bool
-			if m.node.PartTab.PTid == 0 {
+			if m.node.State.PartTab.PTid == 0 {
 				// new cluster - allow startup if we have some storages passed
 				// recovery and there is no in-progress recovery running
 				nup := 0
-				for _, stor := range m.node.NodeTab.StorageList() {
+				for _, stor := range m.node.State.NodeTab.StorageList() {
 					if stor.State > proto.DOWN {
 						nup++
 					}
@@ -377,7 +377,7 @@ loop:
 				ready = (nup > 0 && inprogress == 0)
 			} else {
-				ready = m.node.PartTab.OperationalWith(m.node.NodeTab) // XXX + node state
+				ready = m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) // XXX + node state
 			}
 			if readyToStart != ready {
@@ -451,24 +451,24 @@ loop2:
 	// S PENDING -> RUNNING
 	// XXX recheck logic is ok for when starting existing cluster
-	for _, stor := range m.node.NodeTab.StorageList() {
+	for _, stor := range m.node.State.NodeTab.StorageList() {
 		if stor.State == proto.PENDING {
 			stor.SetState(proto.RUNNING)
 		}
 	}
 	// if we are starting for new cluster - create partition table
-	if m.node.PartTab.PTid == 0 {
+	if m.node.State.PartTab.PTid == 0 {
 		// XXX -> m.nodeTab.StorageList(State > DOWN)
 		storv := []*xneo.PeerNode{}
-		for _, stor := range m.node.NodeTab.StorageList() {
+		for _, stor := range m.node.State.NodeTab.StorageList() {
 			if stor.State > proto.DOWN {
 				storv = append(storv, stor)
 			}
 		}
-		m.node.PartTab = xneo.MakePartTab(1 /* XXX hardcoded */, storv)
-		m.node.PartTab.PTid = 1
-		log.Infof(ctx, "creating new partition table: %s", m.node.PartTab)
+		m.node.State.PartTab = xneo.MakePartTab(1 /* XXX hardcoded */, storv)
+		m.node.State.PartTab.PTid = 1
+		log.Infof(ctx, "creating new partition table: %s", m.node.State.PartTab)
 	}
 	return nil
@@ -548,13 +548,13 @@ func (m *Master) verify(ctx context.Context) (err error) {
 	// XXX (= py), rationale=?
 	// start verification on all storages we are currently in touch with
-	for _, stor := range m.node.NodeTab.StorageList() {
+	for _, stor := range m.node.State.NodeTab.StorageList() {
 		if stor.State > proto.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
 			inprogress++
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				storCtlVerify(ctx, stor, m.node.PartTab, verify)
+				storCtlVerify(ctx, stor, m.node.State.PartTab, verify)
 			}()
 		}
 	}
@@ -582,7 +582,7 @@ loop:
 					return
 				}
-				storCtlVerify(ctx, node, m.node.PartTab, verify)
+				storCtlVerify(ctx, node, m.node.State.PartTab, verify)
 			}()
 /*
@@ -590,7 +590,7 @@ loop:
 			n.node.SetState(proto.DOWN)
 			// if cluster became non-operational - we cancel verification
-			if !m.node.PartTab.OperationalWith(m.node.NodeTab) {
+			if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
 				// XXX ok to instantly cancel? or better
 				// graceful shutdown in-flight verifications?
 				vcancel()
@@ -616,7 +616,7 @@ loop:
 			// check partTab is still operational
 			// if not -> cancel to go back to recovery
-			if !m.node.PartTab.OperationalWith(m.node.NodeTab) {
+			if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
 				vcancel()
 				err = errClusterDegraded
 				break loop
@@ -758,7 +758,7 @@ func (m *Master) service(ctx context.Context) (err error) {
 	wg := &sync.WaitGroup{}
 	// spawn per-storage service driver
-	for _, stor := range m.node.NodeTab.StorageList() {
+	for _, stor := range m.node.State.NodeTab.StorageList() {
 		if stor.State == proto.RUNNING { // XXX note PENDING - not adding to service; ok?
 			wg.Add(1)
 			go func() {
@@ -814,7 +814,7 @@ loop:
 			n.node.SetState(proto.DOWN)
 			// if cluster became non-operational - cancel service
-			if !m.node.PartTab.OperationalWith(m.node.NodeTab) {
+			if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
 				err = errClusterDegraded
 				break loop
 			}
@@ -953,19 +953,19 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
 	//clusterState := m.node.ClusterState
 	// XXX ^^^ + subscribe
-	nodev := m.node.NodeTab.All()
+	nodev := m.node.State.NodeTab.All()
 	nodeiv := make([]proto.NodeInfo, len(nodev))
 	for i, node := range nodev {
 		// NOTE .NodeInfo is data not pointers - so won't change after we copy it to nodeiv
 		nodeiv[i] = node.NodeInfo
 	}
-	ptid := m.node.PartTab.PTid
+	ptid := m.node.State.PartTab.PTid
 	ptnr := uint32(0) // FIXME hardcoded NumReplicas; NEO/py keeps this as n(replica)-1
-	ptv := m.node.PartTab.Dump()
+	ptv := m.node.State.PartTab.Dump()
 	// XXX RLock is not enough for subscribe - right?
-	nodech, nodeUnsubscribe := m.node.NodeTab.SubscribeBuffered()
+	nodech, nodeUnsubscribe := m.node.State.NodeTab.SubscribeBuffered()
 	m.node.StateMu.RUnlock()
@@ -1053,7 +1053,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
 	// XXX check nid matches NodeType
-	node = m.node.NodeTab.Get(nid)
+	node = m.node.State.NodeTab.Get(nid)
 	if node != nil {
 		// reject - nid is already occupied by someone else
 		// XXX check also for down state - it could be the same node reconnecting
@@ -1064,7 +1064,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
 	// XXX ok to have this logic inside identify? (better provide from outside ?)
 	switch nodeType {
 	case proto.CLIENT:
-		if m.node.ClusterState != proto.ClusterRunning {
+		if m.node.State.Code != proto.ClusterRunning {
 			return &proto.Error{proto.NOT_READY, "cluster not operational"}
 		}
@@ -1112,7 +1112,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
 		IdTime: proto.IdTime(m.monotime()),
 	}
-	node = m.node.NodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
+	node = m.node.State.NodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
 	node.SetLink(n.req.Link())
 	return node, accept
 }
@@ -1123,7 +1123,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
 func (m *Master) allocNID(nodeType proto.NodeType) proto.NodeID {
 	for num := int32(1); num < 1<<24; num++ {
 		nid := proto.NID(nodeType, num)
-		if m.node.NodeTab.Get(nid) == nil {
+		if m.node.State.NodeTab.Get(nid) == nil {
 			return nid
 		}
 	}
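Reviewer note: on the master every `m.node.NodeTab`, `m.node.PartTab` and `m.node.ClusterState` access is rewritten mechanically to `m.node.State.NodeTab`, `m.node.State.PartTab` and `m.node.State.Code`; `NewNode` also changes its argument order to `(typ, clusterName, net, masterAddr)`. A hedged sketch of the resulting read idiom; `clusterOperational` is a hypothetical helper, not commit code:

```go
// clusterOperational shows how the grouped State field is read as one
// unit under StateMu (hypothetical helper).
func (m *Master) clusterOperational() bool {
	m.node.StateMu.RLock()
	defer m.node.StateMu.RUnlock()
	s := &m.node.State
	return s.PartTab.OperationalWith(s.NodeTab)
}
```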
...
@@ -60,8 +60,9 @@ import (
 // The connection to master is persisted by redial as needed.
 //
 // XXX update after introduction of _MasterLink
+// XXX use `nodem *_MasteredNode` XXX naming=?
 type _MasteredNode struct {
-	node *xneo.Node
+	Node *xneo.Node
 //	myInfo      proto.NodeInfo // type, laddr, nid, state, idtime
 //	ClusterName string
 //	Net         xnet.Networker // network AP we are sending/receiving on
@@ -114,7 +115,7 @@ const (
 // XXX doc
 func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *_MasteredNode {
 	node := &_MasteredNode{
-		node: xneo.NewNode(typ, clusterName, net, masterAddr),
+		Node: xneo.NewNode(typ, clusterName, net, masterAddr),
 /*
 		myInfo: proto.NodeInfo{
 			Type:   typ,
@@ -155,9 +156,9 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
 func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) {
 	// me0 describes local node when it starts connecting to master, e.g. 'client C?'.
 	// we don't use just NID because it is initially 0 and because master can tell us to change it.
-	me0 := strings.ToLower(node.myInfo.Type.String())
+	me0 := strings.ToLower(node.Node.MyInfo.Type.String())
 	me0 += " "
-	mynid0 := node.myInfo.NID
+	mynid0 := node.Node.MyInfo.NID
 	if mynid0 == 0 {
 		me0 += "?"
 	} else {
@@ -165,7 +166,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
 	}
 	ctx0 := ctx
-	defer task.Runningf(&ctx, "%s: talk master(%s)", me0, node.MasterAddr)(&err)
+	defer task.Runningf(&ctx, "%s: talk master(%s)", me0, node.Node.MasterAddr)(&err)
 	for {
 		node.updateOperational(func() {
@@ -193,23 +194,23 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
 func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error) error {
 	reqID := &proto.RequestIdentification{
-		NodeType:    node.myInfo.Type,
-		NID:         node.myInfo.NID,
-		Address:     node.myInfo.Addr,
-		ClusterName: node.ClusterName,
-		IdTime:      node.myInfo.IdTime, // XXX ok?
+		NodeType:    node.Node.MyInfo.Type,
+		NID:         node.Node.MyInfo.NID,
+		Address:     node.Node.MyInfo.Addr,
+		ClusterName: node.Node.ClusterName,
+		IdTime:      node.Node.MyInfo.IdTime, // XXX ok?
 		DevPath:     nil, // XXX stub
 		NewNID:      nil, // XXX stub
 	}
-	mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Net, node.MasterAddr, reqID)
+	mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Node.Net, node.Node.MasterAddr, reqID)
 	if err != nil {
 		return err
 	}
 	return xcontext.WithCloseOnErrCancel(ctx, mlink, func() (err error) {
-		if accept.YourNID != node.myInfo.NID {
+		if accept.YourNID != node.Node.MyInfo.NID {
 			log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID)
-			node.myInfo.NID = accept.YourNID // XXX locking ?
+			node.Node.MyInfo.NID = accept.YourNID // XXX locking ?
 		}
 		// XXX verify Mnid = M*; our nid corresponds to our type
@@ -242,7 +243,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
 		node.updateOperational(func() {
 			err = node.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown
-			node.state.PartTab = pt
+			node.Node.State.PartTab = pt
 			if err == nil {
 				// keep mlink=nil on shutdown so that
 				// .operational does not change to y.
@@ -352,7 +353,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
 			pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
 			// XXX logging under lock ok?
 			log.Infof(ctx, "parttab update: %s", pt)
-			node.state.PartTab = pt
+			node.Node.State.PartTab = pt
 		// <- δ(partTab)
 		case *proto.NotifyPartitionChanges:
@@ -365,8 +366,8 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
 		case *proto.NotifyClusterState:
 			log.Infof(ctx, "state update: %s", msg.State)
-			node.state.Code = msg.State
-			traceClusterStateChanged(&node.state.Code)
+			node.Node.State.Code = msg.State
+			traceClusterStateChanged(&node.Node.State.Code)
 		}
 	})
@@ -384,7 +385,7 @@ func (node *_MasteredNode) updateOperational(δf func()) {
 	defer node.opMu.Unlock()
 	δf()
-	operational := (node.mlink != nil) && node.state.IsOperational()
+	operational := (node.mlink != nil) && node.Node.State.IsOperational()
 	//fmt.Printf("\nupdateOperatinal: %v\n", operational)
 	//fmt.Printf("    mlink: %s\n", node.mlink)
@@ -435,7 +436,7 @@ func (node *_MasteredNode) WithOperational(ctx context.Context, f func(mlink *ne
 	// node.operational=y and node.opMu is rlocked
 	defer node.opMu.RUnlock()
-	return f(node.mlink, &node.state)
+	return f(node.mlink, &node.Node.State)
 }
@@ -448,17 +449,17 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
 	// XXX msg.IdTime ?
 	for _, nodeInfo := range msg.NodeList {
 		log.Infof(ctx, "<- node: %v", nodeInfo)
-		node.state.NodeTab.Update(nodeInfo)
+		node.Node.State.NodeTab.Update(nodeInfo)
 		// we have to provide IdTime when requesting identification to other peers
 		// (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master")
 		// TODO .State = DOWN -> ResetLink
-		if nodeInfo.NID == node.myInfo.NID {
+		if nodeInfo.NID == node.Node.MyInfo.NID {
 			// XXX recheck locking
 			// XXX do .myInfo = nodeInfo ?
-			node.myInfo.IdTime = nodeInfo.IdTime
+			node.Node.MyInfo.IdTime = nodeInfo.IdTime
 			// NEO/py currently employs this hack
 			// FIXME -> better it be separate command and handled cleanly
@@ -471,6 +472,6 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
 	}
 	// XXX logging under lock ok?
-	log.Infof(ctx, "full nodetab:\n%s", node.state.NodeTab)
+	log.Infof(ctx, "full nodetab:\n%s", node.Node.State.NodeTab)
 	return nil
 }
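Reviewer note: with `Node` exported, code outside `_MasteredNode` reaches the wrapped `*xneo.Node` directly, while `WithOperational` still hands callers `&node.Node.State` by pointer under `opMu`. A usage sketch under those assumptions; `nid` and `useMasterLink` are hypothetical placeholders:

```go
// Sketch of a WithOperational caller after this change
// (node is a *_MasteredNode).
err := node.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
	// cs aliases node.Node.State and is valid only while f runs
	// with node.opMu read-locked.
	peer := cs.NodeTab.Get(nid) // nid: hypothetical placeholder
	_ = peer
	return useMasterLink(mlink) // useMasterLink: hypothetical
})
```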
...
@@ -48,7 +48,7 @@ import (
 //
 // Storage implements only NEO protocol logic with data being persisted via provided storage.Backend.
 type Storage struct {
-	node *_MasteredNode
+	node *_MasteredNode // XXX -> nodem ?
 	// context for providing operational service
 	// it is renewed every time master tells us StartOpertion, so users
@@ -91,7 +91,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
 	if err != nil {
 		return err
 	}
-	stor.node.myInfo.Addr = naddr
+	stor.node.Node.MyInfo.Addr = naddr
 	// wrap listener with link / identificaton hello checker
 	lli := xneo.NewListener(neonet.NewLinkListener(l))
@@ -208,16 +208,16 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
 	case *proto.Recovery:
 		err = req.Reply(&proto.AnswerRecovery{
-			PTid:        stor.node.state.PartTab.PTid,
+			PTid:        stor.node.Node.State.PartTab.PTid,
 			BackupTid:   proto.INVALID_TID,
 			TruncateTid: proto.INVALID_TID})
 	case *proto.AskPartitionTable:
 		// TODO initially read PT from disk
 		err = req.Reply(&proto.AnswerPartitionTable{
-			PTid:        stor.node.state.PartTab.PTid,
+			PTid:        stor.node.Node.State.PartTab.PTid,
 			NumReplicas: 0, // FIXME hardcoded; NEO/py uses this as n(replica)-1
-			RowList:     stor.node.state.PartTab.Dump()})
+			RowList:     stor.node.Node.State.PartTab.Dump()})
 	case *proto.LockedTransactions:
 		// XXX r/o stub
@@ -309,7 +309,7 @@ func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bo
 	if idReq.NodeType != proto.CLIENT {
 		return &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}, false
 	}
-	if idReq.ClusterName != stor.node.ClusterName {
+	if idReq.ClusterName != stor.node.Node.ClusterName {
 		return &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}, false
 	}
@@ -323,8 +323,8 @@ func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bo
 	}
 	return &proto.AcceptIdentification{
-		NodeType: stor.node.myInfo.Type,
-		MyNID:    stor.node.myInfo.NID, // XXX lock wrt update
+		NodeType: stor.node.Node.MyInfo.Type,
+		MyNID:    stor.node.Node.MyInfo.NID, // XXX lock wrt update
 		YourNID:  idReq.NID,
 	}, true
 }
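Reviewer note: `Storage` keeps its unexported `node` field (with an `XXX -> nodem ?` reminder), so until that rename lands, storage code reads through the double hop `stor.node.Node.…`. A sketch mirroring the `AskPartitionTable` reply from this diff; the local alias `pt` is not commit code:

```go
// Sketch only: pt shortens the stor.node.Node chain.
pt := stor.node.Node.State.PartTab
err = req.Reply(&proto.AnswerPartitionTable{
	PTid:        pt.PTid,
	NumReplicas: 0, // FIXME hardcoded, as in the diff
	RowList:     pt.Dump(),
})
```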
...
@@ -292,7 +292,7 @@ func (t *tCluster) Master(name string) ITestMaster {
 func (t *tCluster) NewStorage(name, masterAddr string, back storage.Backend) ITestStorage {
 	tnode := t.registerNewNode(name)
 	s := tNewStorage(t.name, masterAddr, ":1", tnode.net, back)
-	t.gotracer.RegisterNode(s.node, name)
+	t.gotracer.RegisterNode(s.node.Node, name)
 	t.runWG.Go(func(ctx context.Context) error {
 		return s.Run(ctx)
 	})
@@ -311,7 +311,7 @@ func (t *tCluster) Storage(name string) ITestStorage {
 func (t *tCluster) NewClient(name, masterAddr string) ITestClient {
 	tnode := t.registerNewNode(name)
 	c := NewClient(t.name, masterAddr, tnode.net)
-	t.gotracer.RegisterNode(c.node, name)
+	t.gotracer.RegisterNode(c.nodem.Node, name)
 	t.runWG.Go(func(ctx context.Context) error {
 		return c.Run(ctx)
 	})
@@ -343,7 +343,7 @@ func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker,
 }
 func (s *tStorage) Run(ctx context.Context) error {
-	l, err := s.node.Net.Listen(ctx, s.serveAddr)
+	l, err := s.node.Node.Net.Listen(ctx, s.serveAddr)
 	if err != nil {
 		return err
 	}
...
@@ -38,8 +38,7 @@ type TraceCollector struct {
 	pg *tracing.ProbeGroup
 	rx interface { RxEvent(interface{}) }
-	// node2Name       map[*xneo.Node]string
-	node2Name          map[interface{}]string // XXX -> *_MasteredNode
+	node2Name          map[*xneo.Node]string
 	nodeTab2Owner      map[*xneo.NodeTable]string
 	clusterState2Owner map[*proto.ClusterState]string
 }
@@ -49,7 +48,7 @@ func NewTraceCollector(rx interface { RxEvent(interface{}) }) *TraceCollector {
 		pg: &tracing.ProbeGroup{},
 		rx: rx,
-		node2Name:          make(map[interface{}]string),
+		node2Name:          make(map[*xneo.Node]string),
 		nodeTab2Owner:      make(map[*xneo.NodeTable]string),
 		clusterState2Owner: make(map[*proto.ClusterState]string),
 	}
@@ -78,26 +77,15 @@ func (t *TraceCollector) Detach() {
 //
 // This way it can translate e.g. *NodeTable -> owner node name when creating
 // corresponding event.
-func (t *TraceCollector) RegisterNode(node /*XXX -> *_MasteredNode*/interface{}, name string) {
+func (t *TraceCollector) RegisterNode(node *xneo.Node, name string) {
 	tracing.Lock()
 	defer tracing.Unlock()
 	// XXX verify there is no duplicate names
 	// XXX verify the same pointer is not registerd twice
-	switch node := node.(type) {
-	default:
-		panic(/*bad type*/ node)
-	case *xneo.Node:
-		t.node2Name[node] = name
-		t.nodeTab2Owner[node.NodeTab] = name
-		t.clusterState2Owner[&node.ClusterState] = name
-	case *_MasteredNode:
-		t.node2Name[node] = name
-		t.nodeTab2Owner[node.state.NodeTab] = name
-		t.clusterState2Owner[&node.state.Code] = name
-	}
+	t.node2Name[node] = name
+	t.nodeTab2Owner[node.State.NodeTab] = name
+	t.clusterState2Owner[&node.State.Code] = name
 }
 func (t *TraceCollector) TraceNetDial(ev *xnet.TraceDial) {
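Reviewer note: `RegisterNode` drops the `interface{}` parameter and type switch and becomes statically typed on `*xneo.Node`; every caller now passes the inner node explicitly. The call-site pattern, taken from this diff except the master line, which is hypothetical (Master already holds a bare `*xneo.Node`):

```go
t.gotracer.RegisterNode(c.nodem.Node, name) // client
t.gotracer.RegisterNode(s.node.Node, name)  // storage
t.gotracer.RegisterNode(m.node, name)       // master (hypothetical call site)
```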
...
@@ -54,8 +54,9 @@ type Node struct {
 	Net        xnet.Networker // network AP we are sending/receiving on
 	MasterAddr string         // address of current master	TODO -> masterRegistry
+	// XXX reconsider not using State and have just .NodeTab, .PartTab, .ClusterState
 	StateMu sync.RWMutex // <- XXX unexport
-	state   ClusterState // nodeTab/partTab/stateCode
+	State   ClusterState // nodeTab/partTab/stateCode	XXX unexport?
 //	NodeTab      *NodeTable         // information about nodes in the cluster
 //	PartTab      *PartitionTable    // information about data distribution in the cluster
 //	ClusterState proto.ClusterState // master idea about cluster state
@@ -78,13 +79,13 @@ func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterA
 		Net:        net,
 		MasterAddr: masterAddr,
-		state: ClusterState{
+		State: ClusterState{
 			NodeTab: &NodeTable{},
 			PartTab: &PartitionTable{},
 			Code:    -1, // invalid
 		},
 	}
-	node.state.NodeTab.localNode = node
+	node.State.NodeTab.localNode = node
 	return node
 }
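Reviewer note: the heart of the commit is this grouping: what used to be three sibling fields of `Node` now lives in a single `ClusterState` value guarded by `StateMu`. A self-contained, runnable sketch of the shape, using stand-in types rather than the real xneo ones:

```go
package main

import "fmt"

// Stand-in types mirroring the grouping this commit introduces.
type NodeTable struct{}
type PartitionTable struct{ PTid uint64 }

type ClusterState struct {
	NodeTab *NodeTable
	PartTab *PartitionTable
	Code    int // proto.ClusterState in the real code
}

type Node struct {
	State ClusterState // was: separate NodeTab / PartTab / ClusterState fields
}

func main() {
	n := &Node{State: ClusterState{
		NodeTab: &NodeTable{},
		PartTab: &PartitionTable{},
		Code:    -1, // invalid, as in NewNode
	}}
	// every cluster-view access now goes through .State
	fmt.Println(n.State.PartTab.PTid, n.State.Code)
}
```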