Commit 7e2e54e5 authored by Kirill Smelkov

.

parent 4cec19a6
@@ -437,8 +437,9 @@ func (nt *NodeTable) notify(nodeInfo NodeInfo) {
}
}
// Subscribe subscribes to NodeTable updates
// it returns a channel via which updates will be delivered and unsubscribe function
// Subscribe subscribes to NodeTable updates.
//
// It returns a channel via which updates will be delivered and function to unsubscribe.
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) {
@@ -459,12 +460,13 @@ func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) {
return ch, unsubscribe
}
// SubscribeBuffered subscribes to NodeTable updates without blocking updater
// it returns a channel via which updates are delivered and unsubscribe function
// the updates will be sent to destination in non-blocking way - if destination
// SubscribeBuffered subscribes to NodeTable updates without blocking updater.
//
// It returns a channel via which updates are delivered and unsubscribe function.
// The updates will be sent to destination in non-blocking way - if destination
// channel is not ready they will be buffered.
// it is the caller responsibility to make sure such buffering does not grow up
// to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown
// It is the caller responsibility to make sure such buffering does not grow up
// to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown.
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func()) {
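A minimal usage sketch of the Subscribe API documented above; watchNodes, the printing body and the assumption that ch is closed on unsubscribe are illustrative, not taken from the commit:

// Illustrative sketch only: consume NodeTable updates delivered via Subscribe.
// Assumes imports "fmt" and the neo package, that NodeTable is used as
// *neo.NodeTable, and that ch is closed on unsubscribe (if it is not, the
// goroutine below has to be stopped another way).
func watchNodes(nt *neo.NodeTable) (stop func()) {
	ch, unsubscribe := nt.Subscribe()
	go func() {
		for nodeInfo := range ch {
			fmt.Println("node update:", nodeInfo)
		}
	}()
	return unsubscribe
}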
@@ -36,7 +36,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/client"
//"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
"lab.nexedi.com/kirr/neo/go/xcommon/tracing"
@@ -59,11 +59,12 @@ func xwait(w interface { Wait() error }) {
}
func xfs1stor(path string) *fs1.FileStorage {
zstor, err := fs1.Open(context.Background(), path)
zstor, err := fs1.Open(bg, path)
exc.Raiseif(err)
return zstor
}
var bg = context.Background()
// XXX tracer which can collect tracing events from net + TODO master/storage/etc...
// XXX naming
@@ -212,6 +213,17 @@ func TestMasterStorage(t *testing.T) {
}
}
// shortcut for NodeInfo
nodei := func(typ neo.NodeType, addr string, uuid neo.NodeUUID, state neo.NodeState, idtstamp float64) neo.NodeInfo {
return neo.NodeInfo{
Type: typ,
Addr: xnaddr(addr),
UUID: uuid,
State: state,
IdTimestamp: idtstamp,
}
}
Mhost := xnet.NetTrace(net.Host("m"), tracer)
Shost := xnet.NetTrace(net.Host("s"), tracer)
@@ -223,7 +235,7 @@ func TestMasterStorage(t *testing.T) {
Mclock := &vclock{}
M := NewMaster("abc1", ":1", Mhost)
M.monotime = Mclock.monotime
Mctx, Mcancel := context.WithCancel(context.Background())
Mctx, Mcancel := context.WithCancel(bg)
gwg.Gox(func() {
err := M.Run(Mctx)
fmt.Println("M err: ", err)
@@ -240,7 +252,7 @@ func TestMasterStorage(t *testing.T) {
// start storage
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
Sctx, Scancel := context.WithCancel(context.Background())
Sctx, Scancel := context.WithCancel(bg)
gwg.Gox(func() {
err := S.Run(Sctx)
fmt.Println("S err: ", err)
@@ -317,8 +329,8 @@ func TestMasterStorage(t *testing.T) {
TidDict: nil, // map[zodb.Tid]zodb.Tid{},
}))
lastOid, err1 := zstor.LastOid(context.TODO())
lastTid, err2 := zstor.LastTid(context.TODO())
lastOid, err1 := zstor.LastOid(bg)
lastTid, err2 := zstor.LastTid(bg)
exc.Raiseif(xerr.Merge(err1, err2))
tc.Expect(conntx("m:2", "s:2", 1, &neo.LastIDs{}))
tc.Expect(conntx("s:2", "m:2", 1, &neo.AnswerLastIDs{
@@ -368,7 +380,15 @@ func TestMasterStorage(t *testing.T) {
YourUUID: neo.UUID(neo.CLIENT, 1),
}))
// XXX C <- M NotifyNodeInformation C1,M1,S1
// C <- M NotifyNodeInformation C1,M1,S1
tc.Expect(conntx("m:3", "c:1", 2, &neo.NotifyNodeInformation{
IdTimestamp: 0, // XXX ?
NodeList: []neo.NodeInfo{
nodei(neo.MASTER, "m:1", neo.UUID(neo.MASTER, 1), neo.RUNNING, 0.00),
nodei(neo.STORAGE, "s:1", neo.UUID(neo.STORAGE, 1), neo.RUNNING, 0.01),
nodei(neo.STORAGE, "", neo.UUID(neo.CLIENT, 1), neo.RUNNING, 0.02),
},
}))
// C asks M about PT
tc.Expect(conntx("c:1", "m:3", 3, &neo.AskPartitionTable{}))
@@ -380,9 +400,27 @@ func TestMasterStorage(t *testing.T) {
}))
// C asks M about last tid XXX better master sends it itself on new client connected
wg = &xsync.WorkGroup{}
wg.Gox(func() {
cLastTid, err := C.LastTid(bg)
exc.Raiseif(err)
if cLastTid != lastTid {
exc.Raisef("C.LastTid -> %v ; want %v", cLastTid, lastTid)
}
})
tc.Expect(conntx("c:1", "m:3", 5, &neo.LastTransaction{}))
tc.Expect(conntx("m:3", "c:1", 5, &neo.AnswerLastTransaction{
Tid: lastTid,
}))
// C starts loading first object
data, serial, err := C.Load(bg, zodb.Xid{Oid: 1, XTid: zodb.XTid{Tid: zodb.TidMax, TidBefore: true}})
_, _, _ = data, serial, err
_ = C
@@ -414,7 +452,7 @@ func _TestClientStorage(t *testing.T) {
Cnl, Snl := NodeLinkPipe()
wg := &xsync.WorkGroup{}
Sctx, Scancel := context.WithCancel(context.Background())
Sctx, Scancel := context.WithCancel(bg)
net := pipenet.New("") // XXX here? (or a bit above?)
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs") // XXX +readonly
@@ -28,6 +28,8 @@ import (
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/neo"
@@ -45,14 +47,15 @@ import (
type Master struct {
node neo.NodeCommon
// master manages node and partition tables and broadcast their updates
// to all nodes in cluster
// last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ?
// TODO mu
lastOid zodb.Oid
lastTid zodb.Tid
// master manages node and partition tables and broadcast their updates
// to all nodes in cluster
// channels controlling main driver
ctlStart chan chan error // request to start cluster
ctlStop chan chan struct{} // request to stop cluster
@@ -911,25 +914,34 @@ func storCtlService(ctx context.Context, stor *neo.Node) (err error) {
func (m *Master) serveClient(ctx context.Context, cli *neo.Node) (err error) {
defer task.Runningf(&ctx, "%s: client service", cli.Link.RemoteAddr())(&err)
wg, ctx := errgroup.WithContext(ctx)
clink := cli.Link
defer xio.CloseWhenDone(ctx, clink)()
// XXX spawn M -> S notifications about cluster state
// M -> C notifications about cluster state
wg.Go(func() error {
//return m.notifyPeer(ctx, clink) // XXX -> keepPeerUpdated?
return nil
})
for {
req, err := clink.Recv1()
if err != nil {
return err
}
// M <- C requests handler
wg.Go(func() error {
for {
req, err := clink.Recv1()
if err != nil {
return err
}
resp := m.serveClient1(ctx, req.Msg)
err = req.Reply(resp)
if err != nil {
return err
resp := m.serveClient1(ctx, req.Msg)
err = req.Reply(resp)
if err != nil {
return err
}
}
}
})
return nil
return wg.Wait()
}
// serveClient1 prepares response for 1 request from client
@@ -944,6 +956,9 @@ func (m *Master) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
m.node.StateMu.RUnlock()
return rpt
case *neo.LastTransaction:
// XXX lock
return &neo.AnswerLastTransaction{m.lastTid}
default:
return &neo.Error{neo.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req)}
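The serveClient rework above is the usual errgroup shape: every per-connection activity runs under wg.Go and the function returns wg.Wait(), so the first failing goroutine cancels the shared context and supplies the returned error. A self-contained sketch of that pattern with illustrative names only (serve and the two goroutine bodies are not from this codebase):

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// serve runs two workers under one errgroup: the first error cancels ctx for
// the rest and becomes the return value of Wait, mirroring the shape of the
// reworked serveClient above.
func serve(ctx context.Context) error {
	wg, ctx := errgroup.WithContext(ctx)

	// e.g. notification sender: runs until the group context is cancelled.
	wg.Go(func() error {
		<-ctx.Done()
		return ctx.Err()
	})

	// e.g. request loop: returning an error cancels ctx for the goroutine above.
	wg.Go(func() error {
		return errors.New("request loop failed")
	})

	return wg.Wait() // first non-nil error reported by either goroutine
}

func main() {
	fmt.Println(serve(context.Background()))
}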
@@ -52,7 +52,7 @@ type Storage struct {
// data/
// 1 inbox/ (commit queues)
// 2 ? (data.fs)
// 3. packed/
// 3 packed/
zstor zodb.IStorage // underlying ZODB storage XXX -> directly work with fs1 & friends
}
@@ -44,6 +44,8 @@ type Xid struct {
Oid
}
// XXX add XidBefore() and XidSerial() as syntax convenience?
const (
//Tid0 Tid = 0 // XXX -> simply Tid(0) ?
TidMax Tid = 1<<63 - 1 // 0x7fffffffffffffff
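On the Xid hunk just above: the test's new C.Load call expresses "current revision of an object" as Tid = TidMax with TidBefore = true. A small helper in that spirit; xidLatest is illustrative only and not part of the commit:

// xidLatest builds an Xid selecting the last revision of oid committed before
// TidMax, i.e. the current data - the same form C.Load uses in the test hunk
// above. Assumes the zodb package import.
func xidLatest(oid zodb.Oid) zodb.Xid {
	return zodb.Xid{
		Oid:  oid,
		XTid: zodb.XTid{Tid: zodb.TidMax, TidBefore: true},
	}
}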