Commit 050a3de8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e050e32d
......@@ -29,8 +29,8 @@ import (
"sync"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/neo"
......@@ -139,9 +139,10 @@ func (c *Client) masterLink(ctx context.Context) (*neo.NodeLink, error) {
// withOperational waits for cluster state to be operational.
//
// If successful it returns with operational state RLocked (c.opMu).
// If successful it returns with operational state RLocked (c.opMu) and
// unlocked otherwise.
//
// the only error possible is if provided ctx cancel.
// The only error possible is if provided ctx cancel.
func (c *Client) withOperational(ctx context.Context) error {
for {
c.opMu.RLock()
......@@ -351,24 +352,28 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo
return nil, 0, err
}
// XXX wait pt is operational first
//
// XXX or better still check first M told us ok to go? (ClusterState=RUNNING)
//if c.node.ClusterState != ClusterRunning {
// return nil, 0, &Error{NOT_READY, "cluster not operational"}
//}
cellv := c.node.PartTab.Get(xid.Oid)
// XXX cellv = filter(cellv, UP_TO_DATE)
if len(cellv) == 0 {
return nil, 0, fmt.Errorf("no storages alive for oid %v", xid.Oid) // XXX err ctx
// Here we have cluster state operational and rlocked. Retrieve
// storages we might need to access and release the lock.
storv := make([]*neo.Node, 0)
for _, cell := range c.node.PartTab.Get(xid.Oid) {
if cell.Readable() {
stor := c.node.NodeTab.Get(cell.UUID)
// this storage might not yet come up
if stor != nil && stor.State == neo.RUNNING {
storv = append(storv, stor)
}
}
}
cell := cellv[rand.Intn(len(cellv))]
stor := c.node.NodeTab.Get(cell.NodeUUID)
if stor == nil {
return nil, 0, fmt.Errorf("storage %v not yet known", cell.NodeUUID) // XXX err ctx
c.opMu.RUnlock()
if len(storv) == 0 {
// XXX recheck it adds traceback to log
return nil, 0, errors.Errorf("internal inconsistency: cluster is operational, but no storages alive for oid %v", xid.Oid)
}
// XXX check stor.State == RUNNING -> in link
// XXX vvv temp stub -> TODO pick up 3 random storages and send load
// requests to them all getting the first who is the fastest to reply.
stor := storv[rand.Intn(len(storv))]
slink := stor.Link // XXX temp stub
//slink, err := stor.Link()
......
......@@ -145,6 +145,15 @@ func (pt *PartitionTable) Get(oid zodb.Oid) []Cell {
return pt.tab[pid]
}
// Readable reports whether it is ok to read data from a cell
func (c *Cell) Readable() bool {
switch c.State {
case UP_TO_DATE, FEEDING:
return true
}
return false
}
// MakePartTab creates new partition with uniformly distributed nodes
// The partition table created will be of len=np
// FIXME R=1 hardcoded
......@@ -180,15 +189,14 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
ok := false
cellLoop:
for _, cell := range ptEntry {
switch cell.CellState {
case UP_TO_DATE, FEEDING: // XXX cell.isReadable in py
if cell.Readable() {
// cell says it is readable. let's check whether corresponding node is up
// FIXME checking whether it is up is not really enough -
// - what is needed to check is that data on that node is up
// to last_tid.
//
// We leave it as is for now.
node := nt.Get(cell.NodeUUID)
node := nt.Get(cell.UUID)
if node == nil || node.State != RUNNING { // XXX PENDING is also ok ?
continue
}
......
......@@ -239,8 +239,8 @@ type NodeInfo struct {
}
type CellInfo struct {
NodeUUID
CellState
UUID NodeUUID
State CellState
}
type RowInfo struct {
......
......@@ -119,16 +119,16 @@ func (p *CellInfo) neoMsgEncodedLen() int {
}
func (p *CellInfo) neoMsgEncode(data []byte) {
binary.BigEndian.PutUint32(data[0:], uint32(int32(p.NodeUUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32(p.CellState)))
binary.BigEndian.PutUint32(data[0:], uint32(int32(p.UUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32(p.State)))
}
func (p *CellInfo) neoMsgDecode(data []byte) (int, error) {
if uint32(len(data)) < 8 {
goto overflow
}
p.NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
p.CellState = CellState(int32(binary.BigEndian.Uint32(data[4:])))
p.UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
p.State = CellState(int32(binary.BigEndian.Uint32(data[4:])))
return 8, nil
overflow:
......@@ -153,8 +153,8 @@ func (p *RowInfo) neoMsgEncode(data []byte) {
data = data[8:]
for i := 0; uint32(i) < l; i++ {
a := &p.CellList[i]
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).NodeUUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).CellState)))
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).UUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).State)))
data = data[8:]
}
}
......@@ -176,8 +176,8 @@ func (p *RowInfo) neoMsgDecode(data []byte) (int, error) {
p.CellList = make([]CellInfo, l)
for i := 0; uint32(i) < l; i++ {
a := &p.CellList[i]
(*a).NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
(*a).CellState = CellState(int32(binary.BigEndian.Uint32(data[4:])))
(*a).UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
(*a).State = CellState(int32(binary.BigEndian.Uint32(data[4:])))
data = data[8:]
}
}
......@@ -614,8 +614,8 @@ func (p *AnswerPartitionTable) neoMsgEncode(data []byte) {
data = data[8:]
for i := 0; uint32(i) < l; i++ {
a := &(*a).CellList[i]
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).NodeUUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).CellState)))
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).UUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).State)))
data = data[8:]
}
}
......@@ -650,8 +650,8 @@ func (p *AnswerPartitionTable) neoMsgDecode(data []byte) (int, error) {
(*a).CellList = make([]CellInfo, l)
for i := 0; uint32(i) < l; i++ {
a := &(*a).CellList[i]
(*a).NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
(*a).CellState = CellState(int32(binary.BigEndian.Uint32(data[4:])))
(*a).UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
(*a).State = CellState(int32(binary.BigEndian.Uint32(data[4:])))
data = data[8:]
}
}
......@@ -694,8 +694,8 @@ func (p *NotifyPartitionTable) neoMsgEncode(data []byte) {
data = data[8:]
for i := 0; uint32(i) < l; i++ {
a := &(*a).CellList[i]
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).NodeUUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).CellState)))
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).UUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).State)))
data = data[8:]
}
}
......@@ -730,8 +730,8 @@ func (p *NotifyPartitionTable) neoMsgDecode(data []byte) (int, error) {
(*a).CellList = make([]CellInfo, l)
for i := 0; uint32(i) < l; i++ {
a := &(*a).CellList[i]
(*a).NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
(*a).CellState = CellState(int32(binary.BigEndian.Uint32(data[4:])))
(*a).UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
(*a).State = CellState(int32(binary.BigEndian.Uint32(data[4:])))
data = data[8:]
}
}
......@@ -763,8 +763,8 @@ func (p *NotifyPartitionChanges) neoMsgEncode(data []byte) {
for i := 0; uint32(i) < l; i++ {
a := &p.CellList[i]
binary.BigEndian.PutUint32(data[0:], (*a).Offset)
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).CellInfo.NodeUUID)))
binary.BigEndian.PutUint32(data[8:], uint32(int32((*a).CellInfo.CellState)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).CellInfo.UUID)))
binary.BigEndian.PutUint32(data[8:], uint32(int32((*a).CellInfo.State)))
data = data[12:]
}
}
......@@ -790,8 +790,8 @@ func (p *NotifyPartitionChanges) neoMsgDecode(data []byte) (int, error) {
for i := 0; uint32(i) < l; i++ {
a := &p.CellList[i]
(*a).Offset = binary.BigEndian.Uint32(data[0:])
(*a).CellInfo.NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[4:])))
(*a).CellInfo.CellState = CellState(int32(binary.BigEndian.Uint32(data[8:])))
(*a).CellInfo.UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[4:])))
(*a).CellInfo.State = CellState(int32(binary.BigEndian.Uint32(data[8:])))
data = data[12:]
}
}
......@@ -2461,8 +2461,8 @@ func (p *AnswerPartitionList) neoMsgEncode(data []byte) {
data = data[8:]
for i := 0; uint32(i) < l; i++ {
a := &(*a).CellList[i]
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).NodeUUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).CellState)))
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).UUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32((*a).State)))
data = data[8:]
}
}
......@@ -2497,8 +2497,8 @@ func (p *AnswerPartitionList) neoMsgDecode(data []byte) (int, error) {
(*a).CellList = make([]CellInfo, l)
for i := 0; uint32(i) < l; i++ {
a := &(*a).CellList[i]
(*a).NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
(*a).CellState = CellState(int32(binary.BigEndian.Uint32(data[4:])))
(*a).UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[0:])))
(*a).State = CellState(int32(binary.BigEndian.Uint32(data[4:])))
data = data[8:]
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment