Commit 0f310080 authored by Kirill Smelkov

X fix Client.withOperational - was not working properly because initial .opReady was nil

parent ae1b31de
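
Why the nil .opReady hangs clients: in Go a receive from a nil channel blocks forever, so a withOperational caller that snapshotted .opReady before it was ever allocated can never be woken, no matter how the state changes later. A minimal standalone sketch of that failure mode (names illustrative, not from this patch):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var opReady chan struct{} // nil: the bug - .opReady was never initialized

	// a waiter in the style of withOperational: snapshot the channel, then wait on it
	go func(ready chan struct{}) {
		<-ready                     // receiving from a nil channel blocks forever
		fmt.Println("operational!") // never reached
	}(opReady)

	// later the state machine allocates and closes a fresh channel,
	// but the waiter still holds the nil value it snapshotted
	opReady = make(chan struct{})
	close(opReady)

	time.Sleep(100 * time.Millisecond)
	fmt.Println("waiter is still blocked; hence: initialize .opReady in NewClient")
}
```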
@@ -62,6 +62,8 @@ type Client struct {
 	// - .ClusterState = RUNNING	<- XXX needed?
 	//
 	// however master link is accessed separately (see ^^^ and masterLink)
+	//
+	// protected by .node.StateMu
 	operational bool          // XXX <- somehow move to NodeApp?
 	opReady     chan struct{} // reinitialized each time state becomes non-operational
 }
@@ -77,8 +79,10 @@ func (c *Client) StorageName() string {
 // It will connect to master @masterAddr and identify with specified cluster name.
 func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
 	cli := &Client{
 		node:        neo.NewNodeApp(net, neo.CLIENT, clusterName, masterAddr, ""),
 		mlinkReady:  make(chan struct{}),
+		operational: false,
+		opReady:     make(chan struct{}),
 	}
 
 	// spawn background process which performs master talk
@@ -125,6 +129,42 @@ func (c *Client) masterLink(ctx context.Context) (*neo.NodeLink, error) {
 	}
 }
 
+// updateOperational updates .operational from current state.
+//
+// Must be called with .node.StateMu lock held.
+//
+// Returned sendReady func must be called by updateOperational caller after
+// .node.StateMu lock is released - it will close current .opReady this way
+// notifying .operational waiters.
+//
+// XXX move somehow -> NodeApp?
+func (c *Client) updateOperational() (sendReady func()) {
+	// XXX py client does not wait for cluster state = running
+	operational := // c.node.ClusterState == neo.ClusterRunning &&
+		c.node.PartTab.OperationalWith(c.node.NodeTab)
+
+	//fmt.Printf("\nupdateOperational: %v\n", operational)
+	//fmt.Println(c.node.PartTab)
+	//fmt.Println(c.node.NodeTab)
+
+	var opready chan struct{}
+	if operational != c.operational {
+		c.operational = operational
+		if operational {
+			opready = c.opReady // don't close from under StateMu
+		} else {
+			c.opReady = make(chan struct{}) // remake for next operational waiters
+		}
+	}
+
+	return func() {
+		if opready != nil {
+			//fmt.Printf("updateOperational - notifying %v\n", opready)
+			close(opready)
		}
+	}
+}
+
 // withOperational waits for cluster state to be operational.
 //
 // If successful it returns with operational state RLocked (c.node.StateMu) and
@@ -141,6 +181,8 @@ func (c *Client) withOperational(ctx context.Context) error {
 		ready := c.opReady
 		c.node.StateMu.RUnlock()
 
+		//fmt.Printf("withOperational - waiting on %v\n", ready)
+
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
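
updateOperational and withOperational together form a small condition-variable-like protocol built on channel close: waiters block on a receive from the current .opReady; becoming operational closes that channel and wakes all of them at once; leaving the operational state installs a fresh channel for the next round of waiters. A self-contained sketch of both sides, with type and field names invented for illustration:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cluster mimics the Client state relevant here: .operational plus a
// channel that is closed to broadcast "became operational".
type cluster struct {
	mu          sync.RWMutex
	operational bool
	opReady     chan struct{}
}

// setOperational is the updateOperational pattern: mutate under lock,
// return a func the caller runs after unlocking to notify waiters.
func (c *cluster) setOperational(op bool) (sendReady func()) {
	var opready chan struct{}
	if op != c.operational {
		c.operational = op
		if op {
			opready = c.opReady // don't close from under the mutex
		} else {
			c.opReady = make(chan struct{}) // remake for the next waiters
		}
	}
	return func() {
		if opready != nil {
			close(opready) // wakes every goroutine blocked on <-opready
		}
	}
}

// withOperational is the waiter side: loop until operational, each time
// snapshotting the current opReady channel before unlocking.
func (c *cluster) withOperational(ctx context.Context) error {
	for {
		c.mu.RLock()
		if c.operational {
			return nil // NOTE: returns with c.mu read-locked, like the patch
		}
		ready := c.opReady
		c.mu.RUnlock()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ready: // woken by close(ready); re-check under lock
		}
	}
}

func main() {
	c := &cluster{opReady: make(chan struct{})} // the fix: never leave it nil

	go func() {
		time.Sleep(50 * time.Millisecond)
		c.mu.Lock()
		sendReady := c.setOperational(true)
		c.mu.Unlock()
		sendReady()
	}()

	if err := c.withOperational(context.Background()); err == nil {
		defer c.mu.RUnlock()
		fmt.Println("cluster is operational")
	}
}
```

Deferring close(opready) until after the mutex is released keeps the critical section minimal; woken waiters immediately re-take the lock to re-check the condition.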
@@ -266,25 +308,9 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) (err error
 		}
 
 		// update .operational + notify those who was waiting for it
-		// XXX py client does not wait for cluster state = running
-		operational := // c.node.ClusterState == neo.ClusterRunning &&
-			c.node.PartTab.OperationalWith(c.node.NodeTab)
-
-		var opready chan struct{}
-		if operational != c.operational {
-			c.operational = operational
-			if operational {
-				opready = c.opReady // don't close from under StateMu
-			} else {
-				c.opReady = make(chan struct{})
-			}
-		}
-
+		opready := c.updateOperational()
 		c.node.StateMu.Unlock()
-
-		if opready != nil {
-			close(opready)
-		}
+		opready()
 	}
 }
@@ -302,7 +328,9 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neo.NodeLink) (err e
 	log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
 	c.node.StateMu.Lock()
 	c.node.PartTab = pt
+	opready := c.updateOperational()
 	c.node.StateMu.Unlock()
+	opready()
 
 	/*
 	XXX don't need this in init?
@@ -164,7 +164,7 @@ func MakePartTab(np int, nodev []*Node) *PartitionTable {
 	for i, j := 0, 0; i < np; i, j = i+1, j+1 % len(nodev) {
 		node := nodev[j]
 		// XXX assert node.State > DOWN
-		fmt.Printf("tab[%d] <- %v\n", i, node.UUID)
+		//fmt.Printf("tab[%d] <- %v\n", i, node.UUID)
 		tab[i] = []Cell{{CellInfo: CellInfo{node.UUID, UP_TO_DATE /*XXX ok?*/}}}
 	}
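
One side note on the loop header kept as context above: % binds tighter than + in Go, so j+1 % len(nodev) parses as j + (1 % len(nodev)) and only stays in range when len(nodev) == 1. A two-line check (illustrative, not part of the commit):

```go
package main

import "fmt"

func main() {
	n := 3
	j := 2
	fmt.Println(j + 1%n)     // 3: j + (1 % n), no wrap-around
	fmt.Println((j + 1) % n) // 0: the round-robin wrap presumably intended
}
```

If round-robin assignment over several nodes is intended, (j+1) % len(nodev) would be needed.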
@@ -261,6 +261,7 @@ func (pt *PartitionTable) Dump() []RowInfo { // XXX also include .ptid? -> struc
 func PartTabFromDump(ptid PTid, rowv []RowInfo) *PartitionTable {
 	// reconstruct partition table from response
 	pt := &PartitionTable{}
+	pt.PTid = ptid
 	for _, row := range rowv {
 		i := row.Offset
@@ -457,7 +457,6 @@ loop2:
 	// if we are starting for new cluster - create partition table
 	if m.node.PartTab.PTid == 0 {
-		log.Infof(ctx, "creating new partition table")
 		// XXX -> m.nodeTab.StorageList(State > DOWN)
 		storv := []*neo.Node{}
 		for _, stor := range m.node.NodeTab.StorageList() {
@@ -467,6 +466,7 @@ loop2:
 		}
 		m.node.PartTab = neo.MakePartTab(1 /* XXX hardcoded */, storv)
 		m.node.PartTab.PTid = 1
+		log.Infof(ctx, "creating new partition table: %s", m.node.PartTab)
 	}
 	return nil
@@ -59,7 +59,7 @@ func NewSyncTracer() *SyncTracer {
 // Trace1 sends message with 1 tracing event to a consumer and waits for ack
 func (st *SyncTracer) Trace1(event interface{}) {
 	ack := make(chan struct{})
-	fmt.Printf("trace: send: %T %v\n", event, event)
+	//fmt.Printf("trace: send: %T %v\n", event, event)
 	st.tracech <- &SyncTraceMsg{event, ack}
 	<-ack
 }
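
For context on the Trace1 change: the ack round-trip means Trace1 returns only after a consumer has fully processed the event, which is what makes the tracer synchronous. A hedged sketch of the consuming side; the SyncTraceMsg field names here (Event, Ack) are assumed, not taken from the patch:

```go
package main

import "fmt"

// assumed shape of the tracer's message type; the real field
// names in SyncTraceMsg may differ
type SyncTraceMsg struct {
	Event interface{}
	Ack   chan struct{}
}

type SyncTracer struct {
	tracech chan *SyncTraceMsg
}

// Trace1 as in the hunk: send one event, then block until the consumer acks.
func (st *SyncTracer) Trace1(event interface{}) {
	ack := make(chan struct{})
	st.tracech <- &SyncTraceMsg{event, ack}
	<-ack
}

func main() {
	st := &SyncTracer{tracech: make(chan *SyncTraceMsg)}

	// consumer: receive the traced event, inspect it, then release the sender
	go func() {
		msg := <-st.tracech
		fmt.Printf("trace: recv: %T %v\n", msg.Event, msg.Event)
		close(msg.Ack) // unblocks Trace1
	}()

	st.Trace1("node state changed") // returns only after the ack
}
```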