Commit e87de948 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0e6a7bb5
...@@ -255,33 +255,51 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error { ...@@ -255,33 +255,51 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error {
return err return err
} }
c.opMu.Lock()
switch msg := req.Msg.(type) { switch msg := req.Msg.(type) {
default: default:
c.opMu.Unlock()
return fmt.Errorf("unexpected message: %T", msg) return fmt.Errorf("unexpected message: %T", msg)
// M sends whole PT // M sends whole PT
case *neo.NotifyPartitionTable: case *neo.NotifyPartitionTable:
// XXX lock pt := neo.PartTabFromDump(msg.PTid, msg.RowList)
// XXX update operational c.node.PartTab = pt
// M sends δPT // M sends δPT
//case *neo.NotifyPartitionChanges: //case *neo.NotifyPartitionChanges:
// TODO // TODO
case *neo.NotifyNodeInformation: case *neo.NotifyNodeInformation:
// XXX lock
// XXX update operational
// XXX msg.IdTimestamp ? // XXX msg.IdTimestamp ?
for _, nodeInfo := range msg.NodeList { for _, nodeInfo := range msg.NodeList {
c.node.NodeTab.Update(nodeInfo, /*XXX conn should not be here*/nil) c.node.NodeTab.Update(nodeInfo, /*XXX conn should not be here*/nil)
} }
case *neo.NotifyClusterState: case *neo.NotifyClusterState:
// XXX lock
// XXX update operational
c.node.ClusterState.Set(msg.State) c.node.ClusterState.Set(msg.State)
} }
// update .operational + notify those who was waiting for it
operational := c.node.ClusterState == neo.ClusterRunning &&
c.node.PartTab.OperationalWith(c.node.NodeTab)
var opready chan struct{}
if operational != c.operational {
c.operational = operational
if operational {
opready = c.opReady // don't close from under opMu
} else {
c.opReady = make(chan struct{})
}
}
c.opMu.Unlock()
if opready != nil {
close(opready)
}
} }
} }
...@@ -293,13 +311,13 @@ func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error ...@@ -293,13 +311,13 @@ func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error
return err return err
} }
// XXX lock
pt := neo.PartTabFromDump(rpt.PTid, rpt.RowList) pt := neo.PartTabFromDump(rpt.PTid, rpt.RowList)
// XXX pt -> c.node.PartTab ? c.opMu.Lock()
_ = pt c.node.PartTab = pt
c.opMu.Unlock()
/* /*
XXX don't need in init? XXX don't need this in init?
// ask M about last_tid // ask M about last_tid
rlastTxn := neo.AnswerLastTransaction{} rlastTxn := neo.AnswerLastTransaction{}
...@@ -323,6 +341,7 @@ func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error ...@@ -323,6 +341,7 @@ func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error
func (c *Client) LastTid(ctx context.Context) (_ zodb.Tid, err error) { func (c *Client) LastTid(ctx context.Context) (_ zodb.Tid, err error) {
defer xerr.Context(&err, "client: lastTid") defer xerr.Context(&err, "client: lastTid")
// XXX or require full withOperational ?
mlink, err := c.masterLink(ctx) mlink, err := c.masterLink(ctx)
if err != nil { if err != nil {
return 0, err return 0, err
......
...@@ -57,8 +57,8 @@ type NodeCommon struct { ...@@ -57,8 +57,8 @@ type NodeCommon struct {
Net xnet.Networker // network AP we are sending/receiving on Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of master XXX -> Address ? MasterAddr string // address of master XXX -> Address ?
NodeTab NodeTable // information about nodes in the cluster NodeTab *NodeTable // information about nodes in the cluster
PartTab PartitionTable // information about data distribution in the cluster PartTab *PartitionTable // information about data distribution in the cluster
ClusterState ClusterState // master idea about cluster state ClusterState ClusterState // master idea about cluster state
} }
......
...@@ -332,7 +332,6 @@ type AnswerPartitionTable struct { ...@@ -332,7 +332,6 @@ type AnswerPartitionTable struct {
RowList []RowInfo RowList []RowInfo
} }
// Send whole partition table to update other nodes. PM -> S, C. // Send whole partition table to update other nodes. PM -> S, C.
// XXX py: named also as SendPartitionTable // XXX py: named also as SendPartitionTable
type NotifyPartitionTable struct { type NotifyPartitionTable struct {
......
...@@ -73,6 +73,9 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor ...@@ -73,6 +73,9 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor
ClusterName: cluster, ClusterName: cluster,
Net: net, Net: net,
MasterAddr: masterAddr, MasterAddr: masterAddr,
PartTab: &neo.PartitionTable{}, // empty - TODO read from disk
NodeTab: &neo.NodeTable{},
}, },
zstor: zstor, zstor: zstor,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment