Commit e050e32d authored by Kirill Smelkov

.

parent 50098921
@@ -47,10 +47,27 @@ type Client struct {
talkMasterCancel func()
// link to master - established and maintained by talkMaster
// link to master - established and maintained by talkMaster.
// users retrieve it via masterLink.
mlinkMu sync.Mutex
mlink *neo.NodeLink
mlinkReady chan struct{} // reinitialized at each new talk cycle
// operational state - maintained by recvMaster.
// users retrieve it via withOperational.
//
// NOTE being operational means:
// - link to master established and is ok
// - .PartTab is operational wrt .NodeTab
// - .ClusterState = RUNNING <- XXX needed?
//
// however master link is accessed separately (see ^^^ and masterLink)
opMu sync.RWMutex
// node.NodeTab
// node.PartTab
// XXX + node.ClusterState
operational bool
opReady chan struct{} // reinitialized each time state becomes non-operational
}
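
The two ready channels above follow the same pattern: waiters block on the channel, and the maintainer closes it on the state transition. A minimal sketch of the producer side for opReady, assuming a hypothetical setOperational helper that recvMaster would call (not part of this commit):

func (c *Client) setOperational(op bool) {
	c.opMu.Lock()
	defer c.opMu.Unlock()

	if op && !c.operational {
		close(c.opReady) // wake everyone blocked in withOperational
	}
	if !op && c.operational {
		c.opReady = make(chan struct{}) // fresh channel - future waiters block again
	}
	c.operational = op
}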
var _ zodb.IStorage = (*Client)(nil)
@@ -94,14 +111,6 @@ func (c *Client) Close() error {
// --- connection with master ---
/*
// mconnected is result of connecting to master
type mconnected struct {
mlink *neo.NodeLink
ready chan struct{}
}
*/
// masterLink returns link to primary master.
//
// NOTE that even if masterLink returns != nil, the master link can become
@@ -128,6 +137,31 @@ func (c *Client) masterLink(ctx context.Context) (*neo.NodeLink, error) {
}
}
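
Most of the masterLink body is outside this hunk. Assuming it mirrors the ready-channel wait that withOperational below performs for opReady, it likely looks roughly like this sketch (masterLinkSketch is a hypothetical name, not the actual body):

func (c *Client) masterLinkSketch(ctx context.Context) (*neo.NodeLink, error) {
	for {
		c.mlinkMu.Lock()
		mlink, ready := c.mlink, c.mlinkReady
		c.mlinkMu.Unlock()

		if mlink != nil {
			return mlink, nil
		}

		select {
		case <-ctx.Done():
			return nil, ctx.Err()

		case <-ready:
			// talkMaster made progress - recheck under the lock
		}
	}
}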
// withOperational waits for cluster state to be operational.
//
// If successful, it returns with the operational state RLocked (c.opMu).
//
// The only possible error is cancellation of the provided ctx.
func (c *Client) withOperational(ctx context.Context) error {
for {
c.opMu.RLock()
if c.operational {
return nil
}
ready := c.opReady
c.opMu.RUnlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-ready:
// ok - try to relock and read again.
}
}
}
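
On success the RLock is deliberately left held, so a caller must pair the call with RUnlock; a usage sketch:

	if err := c.withOperational(ctx); err != nil {
		return err
	}
	defer c.opMu.RUnlock() // paired with the RLock withOperational left held

	// ... c.node.PartTab / c.node.NodeTab can be read safely here ...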
// talkMaster connects to master, announces self and receives notifications.
// It tries to keep the master link persistent, reconnecting as needed.
//
@@ -204,36 +238,48 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
})
return wg.Wait()
}
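
Per its doc comment, talkMaster wraps talkMaster1 in a reconnect loop; a sketch of such a driver, with the pause duration and stdlib log/time usage being assumptions rather than what this commit does:

func (c *Client) talkMasterSketch(ctx context.Context) error {
	for {
		if err := c.talkMaster1(ctx); err != nil {
			log.Printf("client: talk master: %v", err) // real code would use its own logging
		}

		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-time.After(1 * time.Second): // crude pause before redial
		}
	}
}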
// recvMaster receives and handles notifications from master
func (c *Client) recvMaster(ctx context.Context, Mlink *neo.NodeLink) error {
func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error {
// XXX .nodeTab.Reset()
for {
req, err := Mlink.Recv1()
req, err := mlink.Recv1()
if err != nil {
return err
}
err = req.Close()
if err != nil {
return err
}
msg := req.Msg
switch msg.(type) {
switch msg := req.Msg.(type) {
default:
return fmt.Errorf("unexpected message: %T", msg)
//case *neo.NotifyPartitionTable:
// // TODO M sends whole PT
// M sends whole PT
case *neo.NotifyPartitionTable:
// XXX lock
// XXX update operational
// M sends δPT
//case *neo.NotifyPartitionChanges:
// // TODO M sends δPT
// TODO
case *neo.NotifyNodeInformation:
// TODO
// XXX lock
// XXX update operational
// XXX msg.IdTimestamp ?
for _, nodeInfo := range msg.NodeList {
c.node.NodeTab.Update(nodeInfo, /*XXX conn should not be here*/nil)
}
case *neo.NotifyClusterState:
// TODO
// XXX lock
// XXX update operational
c.node.ClusterState.Set(msg.State)
}
}
}
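
The repeated "XXX lock / XXX update operational" markers all point at the same missing step: mutate shared state under opMu, then recompute operational and signal waiters. One hypothetical shape, reusing the setOperational sketch from above (OperationalWith and ClusterRunning are assumed names, following the NOTE in Client):

func (c *Client) recomputeOperational() {
	c.opMu.RLock()
	op := c.node.PartTab.OperationalWith(&c.node.NodeTab) &&
		c.node.ClusterState == neo.ClusterRunning
	c.opMu.RUnlock()

	c.setOperational(op) // flip flag + ready channel, as sketched near the Client struct
}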
@@ -281,6 +327,8 @@ func (c *Client) LastTid(ctx context.Context) (_ zodb.Tid, err error) {
return 0, err
}
// XXX mlink can become down while we are making the call.
// XXX do we want to return error or retry?
reply := neo.AnswerLastTransaction{}
err = mlink.Ask1(&neo.LastTransaction{}, &reply) // XXX Ask += ctx
if err != nil {
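
For the error-or-retry question above, one option is a retry wrapper around master calls; a sketch assuming masterLink blocks until the link is re-established and neo.Msg is the message interface (this commit itself just propagates the error):

func (c *Client) askMaster(ctx context.Context, req neo.Msg, resp neo.Msg) error {
	for {
		mlink, err := c.masterLink(ctx)
		if err != nil {
			return err // only ctx cancellation can get us here
		}

		err = mlink.Ask1(req, resp)
		if err == nil {
			return nil
		}

		// the link went down mid-call - recheck ctx and retry over a new link
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}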
@@ -296,9 +344,14 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) {
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
// XXX err context (but keep zodb errors intact ?)
defer xerr.Context(&err, "client: load")
err = c.withOperational(ctx)
if err != nil {
return nil, 0, err
}
// XXX check pt is operational first? -> no: if there is no data we
// just won't find a ready cell
// XXX wait pt is operational first
//
// XXX or better still check first M told us ok to go? (ClusterState=RUNNING)
//if c.node.ClusterState != ClusterRunning {
@@ -317,12 +370,11 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo
}
// XXX check stor.State == RUNNING -> in link
Sconn := stor.Conn // XXX temp stub
//Sconn, err := stor.Conn()
slink := stor.Link // XXX temp stub
//slink, err := stor.Link()
if err != nil {
return nil, 0, err // XXX err ctx
}
defer lclose(ctx, Sconn)
req := neo.GetObject{Oid: xid.Oid}
if xid.TidBefore {
@@ -334,21 +386,21 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo
}
resp := neo.AnswerGetObject{}
err = Sconn.Ask(&req, &resp)
err = slink.Ask1(&req, &resp)
if err != nil {
return nil, 0, err // XXX err context
}
checksum := sha1.Sum(resp.Data) // data itself is only assigned below
if checksum != resp.Checksum {
// XXX data corrupt
panic("TODO") // XXX data corrupt
}
data = resp.Data
if resp.Compression {
data, err = decompress(resp.Data, make([]byte, 0, len(resp.Data)))
if err != nil {
// XXX data corrupt
panic("TODO") // XXX data corrupt
}
}
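
decompress itself is outside this diff; assuming NEO's usual zlib wire compression, a sketch of such a helper (imports bytes, compress/zlib and io):

func decompress(in []byte, out []byte) ([]byte, error) {
	zr, err := zlib.NewReader(bytes.NewReader(in))
	if err != nil {
		return nil, err
	}
	defer zr.Close()

	buf := bytes.NewBuffer(out) // appends inflated data to out
	if _, err := io.Copy(buf, zr); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}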
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// XXX goes away - we don't need it.
package neo
// cluster state XXX
// ClusterInfo represents information about state and participants of a NEO cluster
//
// Usually ClusterInfo is the Master's idea about the cluster, which the Master
// shares with other nodes. XXX text ok?
//
// XXX naming -> ClusterState ? (but conflict with proto.ClusterState)
type ClusterInfo struct {
State ClusterState // what is cluster currently doing: recovering/verification/service/...
NodeTab NodeTable // nodes participating in the cluster
PartTab PartitionTable // data space partitioning
// XXX do we want to put data movement scheduling plans here ?
}
@@ -51,7 +51,7 @@ const (
// NodeCommon is common data in all NEO nodes: Master, Storage & Client XXX text
// XXX naming -> Node ?
type NodeCommon struct {
MyInfo NodeInfo // XXX -> only NodeUUID
MyInfo NodeInfo
ClusterName string
Net xnet.Networker // network AP we are sending/receiving on
@@ -59,6 +59,7 @@ type NodeCommon struct {
NodeTab NodeTable // information about nodes in the cluster
PartTab PartitionTable // information about data distribution in the cluster
ClusterState ClusterState // master's idea about cluster state
}
// Dial connects to another node in the cluster
......
@@ -122,7 +122,7 @@ type PartitionTable struct {
// Cell describes one storage in a pid entry in the partition table
type Cell struct {
CellInfo
CellInfo // .uuid + .state
// XXX ? + .haveUpToTid associated node has data up to such tid
// = uptodate if haveUpToTid == lastTid
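
If the proposed haveUpToTid field were added, the up-to-date rule from the comment would be a one-line predicate (hypothetical, as is the field itself):

func (c *Cell) UpToDate(lastTid zodb.Tid) bool {
	return c.haveUpToTid == lastTid // cell has all data up to the last committed transaction
}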
......
@@ -57,7 +57,6 @@ type Master struct {
partTab *neo.PartitionTable // XXX ^ is also in node
clusterState neo.ClusterState
//*/
clusterInfo neo.ClusterInfo
// channels controlling main driver
ctlStart chan chan error // request to start cluster
......