Commit 4cec19a6 authored by Kirill Smelkov

.

parent e993689f
@@ -53,7 +53,7 @@ type Client struct {
 	mlink *neo.NodeLink
 	mlinkReady chan struct{} // reinitialized at each new talk cycle
-	// operational state - maintained by recvMaster.
+	// operational state in node is maintained by recvMaster.
 	// users retrieve it via withOperational.
 	//
 	// NOTE being operational means:
@@ -62,11 +62,7 @@ type Client struct {
 	//	- .ClusterState = RUNNING	<- XXX needed?
 	//
 	// however master link is accessed separately (see ^^^ and masterLink)
-	opMu sync.RWMutex
-	// node.NodeTab
-	// node.PartTab
-	// XXX + node.ClusterState
-	operational bool
+	operational bool // XXX <- somehow move to NodeCommon?
 	opReady chan struct{} // reinitialized each time state becomes non-operational
 }
@@ -139,19 +135,19 @@ func (c *Client) masterLink(ctx context.Context) (*neo.NodeLink, error) {
 // withOperational waits for cluster state to be operational.
 //
-// If successful it returns with operational state RLocked (c.opMu) and
+// If successful it returns with operational state RLocked (c.node.StateMu) and
 // unlocked otherwise.
 //
 // The only error possible is if provided ctx cancel.
 func (c *Client) withOperational(ctx context.Context) error {
 	for {
-		c.opMu.RLock()
+		c.node.StateMu.RLock()
 		if c.operational {
 			return nil
 		}
 		ready := c.opReady
-		c.opMu.RUnlock()
+		c.node.StateMu.RUnlock()
 		select {
 		case <-ctx.Done():
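The loop above is the standard Go wait-for-condition idiom: check the flag under a read lock; if it is not yet set, capture the current broadcast channel, release the lock, and block on either that channel or ctx. A minimal sketch of just the waiter side, assuming a hypothetical clusterState type; the names below are illustrative, not the NEO API:

package statedemo

import (
	"context"
	"sync"
)

// clusterState mimics the Client fields that c.node.StateMu guards above.
type clusterState struct {
	mu          sync.RWMutex
	operational bool
	opReady     chan struct{} // reinitialized each time state becomes non-operational
}

// withOperational returns with s.mu read-locked once operational is true,
// or unlocked with ctx.Err() if ctx is cancelled first.
func (s *clusterState) withOperational(ctx context.Context) error {
	for {
		s.mu.RLock()
		if s.operational {
			return nil // caller is responsible for s.mu.RUnlock()
		}
		ready := s.opReady // must be captured before dropping the lock
		s.mu.RUnlock()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ready:
			// state changed; loop around and re-check under the lock
		}
	}
}

The broadcasting side, which replaces or closes opReady, is sketched after the recvMaster hunk below.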
@@ -255,11 +251,11 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error {
 			return err
 		}
-		c.opMu.Lock()
+		c.node.StateMu.Lock()
 		switch msg := req.Msg.(type) {
 		default:
-			c.opMu.Unlock()
+			c.node.StateMu.Unlock()
 			return fmt.Errorf("unexpected message: %T", msg)
 		// M sends whole PT
@@ -289,13 +285,13 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error {
 		if operational != c.operational {
 			c.operational = operational
 			if operational {
-				opready = c.opReady // don't close from under opMu
+				opready = c.opReady // don't close from under StateMu
 			} else {
 				c.opReady = make(chan struct{})
 			}
 		}
-		c.opMu.Unlock()
+		c.node.StateMu.Unlock()
 		if opready != nil {
 			close(opready)
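The update above takes the write lock, flips operational, swaps in a fresh opReady channel when the cluster goes non-operational, and closes the old channel only after the lock is released, so waiters are woken without the mutex held. A runnable sketch of that broadcast step paired with a simplified waiter; the names are again illustrative, not the NEO API:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type clusterState struct {
	mu          sync.RWMutex
	operational bool
	opReady     chan struct{}
}

// setOperational mirrors the recvMaster logic: update under the write lock,
// but close the broadcast channel only after the lock is released.
func (s *clusterState) setOperational(operational bool) {
	var opready chan struct{}

	s.mu.Lock()
	if operational != s.operational {
		s.operational = operational
		if operational {
			opready = s.opReady // don't close from under the mutex
		} else {
			s.opReady = make(chan struct{})
		}
	}
	s.mu.Unlock()

	if opready != nil {
		close(opready) // wake up everyone blocked in waitOperational
	}
}

func (s *clusterState) waitOperational(ctx context.Context) error {
	for {
		s.mu.RLock()
		if s.operational {
			s.mu.RUnlock() // simplified: do not hand the read lock back to the caller
			return nil
		}
		ready := s.opReady
		s.mu.RUnlock()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ready:
		}
	}
}

func main() {
	s := &clusterState{opReady: make(chan struct{})}

	go func() {
		time.Sleep(50 * time.Millisecond)
		s.setOperational(true)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("wait:", s.waitOperational(ctx)) // prints "wait: <nil>" once operational
}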
@@ -312,9 +308,9 @@ func (c *Client) initFromMaster(ctx context.Context, Mlink *neo.NodeLink) error
 	}
 	pt := neo.PartTabFromDump(rpt.PTid, rpt.RowList)
-	c.opMu.Lock()
+	c.node.StateMu.Lock()
 	c.node.PartTab = pt
-	c.opMu.Unlock()
+	c.node.StateMu.Unlock()
 	/*
 	XXX don't need this in init?
@@ -383,7 +379,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo
 			}
 		}
 	}
-	c.opMu.RUnlock()
+	c.node.StateMu.RUnlock()
 	if len(storv) == 0 {
 		// XXX recheck it adds traceback to log
......
@@ -31,6 +31,7 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"sync"
 	"lab.nexedi.com/kirr/go123/xerr"
@@ -50,6 +51,7 @@ const (
 // NodeCommon is common data in all NEO nodes: Master, Storage & Client XXX text
 // XXX naming -> Node ?
+// XXX -> internal?
 type NodeCommon struct {
 	MyInfo NodeInfo
 	ClusterName string
@@ -57,6 +59,7 @@ type NodeCommon struct {
 	Net xnet.Networker // network AP we are sending/receiving on
 	MasterAddr string // address of master XXX -> Address ?
+	StateMu sync.RWMutex // <- XXX just embed?
 	NodeTab *NodeTable // information about nodes in the cluster
 	PartTab *PartitionTable // information about data distribution in the cluster
 	ClusterState ClusterState // master idea about cluster state
......
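With StateMu living next to the data it guards, every node role shares one locking convention for NodeTab, PartTab and ClusterState. A cut-down sketch of that arrangement, with simplified stand-in types (the real neo.PartitionTable and neo.ClusterState carry more):

package nodedemo

import "sync"

// Simplified stand-ins for the table types referenced above.
type PartitionTable struct {
	PTid    uint64
	RowList []string
}

type ClusterState int

// nodeState plays the role of NodeCommon: the shared cluster view, guarded by StateMu.
type nodeState struct {
	StateMu sync.RWMutex

	PartTab      *PartitionTable
	ClusterState ClusterState
}

// Writers (e.g. the goroutine receiving notifications from the master)
// update the view under the write lock.
func (n *nodeState) setPartTab(pt *PartitionTable) {
	n.StateMu.Lock()
	n.PartTab = pt
	n.StateMu.Unlock()
}

// Readers either hold the read lock while they use the view, or copy what
// they need and release it, as serveClient1 does in the next file.
func (n *nodeState) partTab() (ptid uint64, rows []string) {
	n.StateMu.RLock()
	defer n.StateMu.RUnlock()
	return n.PartTab.PTid, append([]string(nil), n.PartTab.RowList...)
}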
@@ -53,14 +53,6 @@ type Master struct {
 	// master manages node and partition tables and broadcast their updates
 	// to all nodes in cluster
-	// XXX dup from .node - kill here
-	/*
-	stateMu sync.RWMutex // XXX recheck: needed ?
-	nodeTab *neo.NodeTable
-	partTab *neo.PartitionTable
-	clusterState neo.ClusterState
-	*/
 	// channels controlling main driver
 	ctlStart chan chan error // request to start cluster
 	ctlStop chan chan struct{} // request to stop cluster
@@ -242,6 +234,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
 	defer task.Running(&ctx, "main")(&err)
 	// NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state
+	// XXX however since clients request state reading we should use node.StateMu
 	for ctx.Err() == nil {
 		// recover partition table from storages and wait till enough
@@ -943,11 +936,12 @@ func (m *Master) serveClient(ctx context.Context, cli *neo.Node) (err error) {
 func (m *Master) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
 	switch req := req.(type) {
 	case *neo.AskPartitionTable:
-		// XXX lock
+		m.node.StateMu.RLock()
 		rpt := &neo.AnswerPartitionTable{
 			PTid: m.node.PartTab.PTid,
 			RowList: m.node.PartTab.Dump(),
 		}
+		m.node.StateMu.RUnlock()
 		return rpt
......
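In serveClient1 above the answer is assembled while the read lock is held, and the lock is dropped before the reply goes out. A self-contained sketch of that handler shape, with hypothetical message types standing in for neo.AskPartitionTable and neo.AnswerPartitionTable:

package main

import (
	"fmt"
	"sync"
)

// Hypothetical message types; not the real neo protocol definitions.
type Msg interface{}

type AskPartitionTable struct{}

type AnswerPartitionTable struct {
	PTid    uint64
	RowList []string
}

type Error struct{ Text string }

// master stands in for Master with its node.StateMu-guarded state.
type master struct {
	stateMu sync.RWMutex
	ptid    uint64
	rows    []string
}

// serveClient1 mirrors the handler above: the answer is built under the read
// lock, and the lock is released before the reply is sent.
func (m *master) serveClient1(req Msg) Msg {
	switch req.(type) {
	case *AskPartitionTable:
		m.stateMu.RLock()
		rpt := &AnswerPartitionTable{
			PTid:    m.ptid,
			RowList: append([]string(nil), m.rows...), // snapshot under the lock
		}
		m.stateMu.RUnlock()
		return rpt

	default:
		return &Error{Text: fmt.Sprintf("unexpected message: %T", req)}
	}
}

func main() {
	m := &master{ptid: 1, rows: []string{"S1", "S1|S2"}}
	fmt.Printf("%+v\n", m.serveClient1(&AskPartitionTable{}))
}

Copying the rows while the lock is held keeps the reply independent of later partition-table updates, which is presumably what Dump() provides in the real code.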