Commit db7980a9 authored by Kirill Smelkov

.

parent 07798a21
......@@ -32,7 +32,6 @@ import (
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
......@@ -270,6 +269,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
err = &zodb.OpError{URL: c.URL(), Op: "sync", Args: nil, Err: err}
}
}()
// defer task.Runningf(&ctx, "%s: zsync", c.nid)(&err) ... // XXX enable
err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
// XXX mlink can become down while we are making the call.
......@@ -294,6 +294,8 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
}
}()
// defer task.Runningf(&ctx, "%s: zload %s", c.nid, xid)(&err) // XXX enable
// Retrieve storages we might need to access.
storv := make([]*xneo.PeerNode, 0, 1)
err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
......@@ -390,7 +392,7 @@ func (c *Client) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxn
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb.IStorageDriver, _ zodb.Tid, err error) {
// neo(s)://[credentials@]master1,master2,...,masterN/name?options
defer xerr.Contextf(&err, "neo: open %s", u)
defer task.Runningf(&ctx, "neo: open %s", u)(&err)
var ssl bool
switch u.Scheme {
......
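The client hunks above switch error context from xerr.Contextf to task.Runningf, so open/sync/load show up as named tasks in logs as well as in error messages. As a reminder of how the `defer task.Runningf(&ctx, ...)(&err)` shape works, here is a minimal standalone sketch of a Runningf-style helper; it is an illustration only, not the real internal/task code, and taskNameKey is a made-up annotation key:

package example

import (
	"context"
	"fmt"
	"log"
)

type taskNameKey struct{} // hypothetical context key, not part of NEO

// runningf runs at defer-registration time: it rebinds *ctxp to a context
// annotated with the task name, logs the start, and returns the closure that
// the defer statement will call on function return to wrap a non-nil *errp.
func runningf(ctxp *context.Context, format string, argv ...interface{}) func(*error) {
	name := fmt.Sprintf(format, argv...)
	*ctxp = context.WithValue(*ctxp, taskNameKey{}, name)
	log.Printf("%s: start", name)
	return func(errp *error) {
		if *errp != nil {
			*errp = fmt.Errorf("%s: %w", name, *errp)
		}
		log.Printf("%s: done", name)
	}
}

func open(ctx context.Context, url string) (err error) {
	defer runningf(&ctx, "neo: open %s", url)(&err)
	// ... dial masters etc. under the annotated ctx ...
	return nil
}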
......@@ -66,7 +66,7 @@ import (
type Master struct {
node *xneo.Node
// whole Runs runs under runCtx
// whole Run runs under runCtx
runCtx context.Context
// "global" workgroup under which main, accept and tasks, that should
......@@ -219,8 +219,11 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
defer cancel() // so that .runCtx is canceled if we return due to an error
// XXX ^^^ not needed - we first must wait for all spawned subtasks
mynid := m.allocNID(proto.MASTER)
addr := l.Addr()
defer task.Runningf(&ctx, "master(%v)", addr)(&err)
defer task.Runningf(&ctx, "%s", mynid)(&err)
log.Infof(ctx, "listening on %s ...", addr)
m.runCtx = ctx
m.mainWG = xsync.NewWorkGroup(m.runCtx)
......@@ -234,7 +237,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
m.node.MyInfo = proto.NodeInfo{
Type: proto.MASTER,
Addr: naddr,
NID: m.allocNID(proto.MASTER),
NID: mynid,
State: proto.RUNNING,
IdTime: proto.IdTimeNone, // XXX ok?
}
......@@ -593,7 +596,7 @@ loop2:
// it retrieves various ids and partition table from as stored on the storage
func storCtlRecovery(ctx context.Context, stor *_MasteredPeer) (_ *xneo.PartitionTable, err error) {
slink := stor.node.Link()
defer task.Runningf(&ctx, "%s: stor recovery", stor.node.NID)(&err)
defer task.Runningf(&ctx, "%s recovery", stor.node.NID)(&err)
// XXX cancel on ctx
// XXX close slink on err? (if yes -> xcontext.WithCloseOnErrCancel)
......@@ -787,7 +790,7 @@ func storCtlVerify(ctx context.Context, stor *_MasteredPeer, pt *xneo.PartitionT
// XXX cancel on ctx -> = ^^^
slink := stor.node.Link()
defer task.Runningf(&ctx, "%s: stor verify", stor.node.NID)(&err)
defer task.Runningf(&ctx, "%s verify", stor.node.NID)(&err)
lastOid = zodb.InvalidOid
lastTid = zodb.InvalidTid
......@@ -959,7 +962,7 @@ func (m *Master) serve(ctx context.Context) (err error) {
// storCtlServe drives a storage node during cluster serve state
func storCtlServe(ctx context.Context, stor *_MasteredPeer) (err error) {
defer task.Runningf(&ctx, "%s: stor serve", stor.node.NID)(&err)
defer task.Runningf(&ctx, "%s serve", stor.node.NID)(&err)
slink := stor.node.Link()
// XXX current neo/py does StartOperation / NotifyReady as separate
......@@ -1120,7 +1123,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
return &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}
}
if nid == 0 {
if nid == 0 || nid == proto.NID(n.idReq.NodeType, 0) /* XXX <- stub for "temp" check */ {
nid = m.allocNID(nodeType)
}
// XXX nid < 0 (temporary) -> reallocate if conflict ?
......@@ -1297,7 +1300,7 @@ func (p *_MasteredPeer) run(ctx context.Context, f func() error) error {
// notify proxies δnodeTab/δpeerTab/δClusterState update to the peer.
// XXX merge into m.acceptPeer ?
func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "notify")(&err)
defer task.Runningf(&ctx, "notify %s", p.node.NID)(&err)
stateCode := p.state0.Code
......
......@@ -24,7 +24,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
......@@ -132,19 +131,9 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
//
// XXX -> FollowMaster? AdhereMaster?
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) {
// me0 describes local node when it starts connecting to master, e.g. 'client C?'.
// we don't use just NID because it is initially 0 and because master can tell us to change it.
me0 := strings.ToLower(node.MyInfo.Type.String())
me0 += " "
mynid0 := node.MyInfo.NID
if mynid0 == 0 {
me0 += "?"
} else {
me0 += mynid0.String()
}
// start logging with initial NID (that might be temporary, and which master can tell us to change)
ctx0 := ctx
defer task.Runningf(&ctx, "%s: talk master(%s)", me0, node.MasterAddr)(&err)
defer task.Runningf(&ctx, "%s: talk master(%s)", node.MyInfo.NID, node.MasterAddr)(&err)
for {
node.updateOperational(func() {
......@@ -188,13 +177,13 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
return xcontext.WithCloseOnErrCancel(ctx, mlink, func() (err error) {
if accept.YourNID != node.MyInfo.NID {
log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID)
node.MyInfo.NID = accept.YourNID // XXX locking ?
node.MyInfo.NID = accept.YourNID // XXX locking ? -> opMu ?
}
// XXX verify Mnid = M*; our nid corresponds to our type
// rebuild nicer task now - when we know both our and master NIDs
// e.g. "client ?: talk master(127.0.0.1:21484)" -> "C1: talk M1".
// e.g. "C?: talk master(127.0.0.1:21484)" -> "C1: talk M1".
ctx := ctxPreTalkM
defer task.Runningf(&ctx, "%s: talk %s", accept.YourNID, accept.MyNID)(&err)
......@@ -330,14 +319,12 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
node.updateOperational(func() {
switch msg := msg.(type) {
default:
node.opMu.Unlock()
panic(fmt.Sprintf("unexpected message: %T", msg))
// <- whole partTab
case *proto.SendPartitionTable:
δpt = true
pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock ok?
log.Infof(ctx, "parttab update: %s", pt)
node.State.PartTab = pt
......@@ -413,7 +400,7 @@ func (node *_MasteredNode) WithOperational(ctx context.Context, f func(mlink *ne
select {
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("wait operational: %w", ctx.Err())
case <-ready:
// ok - try to relock and read again.
......
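The WithOperational hunk above now wraps the cancellation error as "wait operational: %w" instead of returning ctx.Err() bare. A simplified standalone sketch (not the real _MasteredNode; locking and ready-channel renewal are reduced to a single channel) shows where that wrapped error is produced and why %w keeps the cause inspectable:

package example

import (
	"context"
	"fmt"
)

type node struct {
	operational <-chan struct{} // closed once the cluster becomes operational
}

func (n *node) withOperational(ctx context.Context, f func() error) error {
	select {
	case <-ctx.Done():
		// %w keeps the cause inspectable: errors.Is(err, context.Canceled)
		// still reports true after the wrap.
		return fmt.Errorf("wait operational: %w", ctx.Err())
	case <-n.operational:
		return f()
	}
}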
......@@ -146,7 +146,7 @@ func _TestMasterStorage(t0 *tEnv) {
tCM.Expect(netconnect("c:1", "m:3", "m:1"))
tCM.Expect(conntx("c:1", "m:3", 1, &proto.RequestIdentification{
NodeType: proto.CLIENT,
NID: 0,
NID: proto.NID(proto.CLIENT, 0),
Address: xnaddr(""),
ClusterName: "abc1",
IdTime: proto.IdTimeNone,
......
......@@ -74,9 +74,11 @@ const nodeTypeChar = "SMCA" // NOTE neo/py does this out of sync with NodeType c
//
// It returns ex 'S1', 'M2', ...
func (nid NodeID) String() string {
/*
if nid == 0 {
return "?(0)0"
}
*/
num := nid & (1<<24 - 1)
......@@ -88,7 +90,12 @@ func (nid NodeID) String() string {
typ := uint8(-int8(nid>>24)) >> 4
if typ < 4 {
return fmt.Sprintf("%c%d", nodeTypeChar[typ], num)
// XXX temp hack until neo.NewNode does not use "temporary" bit in NodeID
nums := "?"
if num != 0 {
nums = fmt.Sprintf("%d", num)
}
return fmt.Sprintf("%c%s", nodeTypeChar[typ], nums)
}
return fmt.Sprintf("?(%d)%d", typ, num)
......
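With the patched String(), a node number of 0 renders as "?", so the typed temporary NIDs introduced elsewhere in this commit (e.g. proto.NID(proto.CLIENT, 0) in the test hunk above) print as "C?" rather than "C0" or "?(0)0". A standalone toy version of just that formatting decision, with the real NodeID bit decoding omitted and the (typ, num) pair taken as already decoded:

package example

import "fmt"

const nodeTypeChar = "SMCA" // Storage, Master, Client, Admin

func formatNID(typ, num int) string {
	if typ < 0 || typ >= len(nodeTypeChar) {
		return fmt.Sprintf("?(%d)%d", typ, num)
	}
	nums := "?" // num == 0 means "temporary, not yet assigned by master"
	if num != 0 {
		nums = fmt.Sprintf("%d", num)
	}
	return fmt.Sprintf("%c%s", nodeTypeChar[typ], nums)
}

// formatNID(2, 0) == "C?"  (temporary client NID, before identification)
// formatNID(2, 1) == "C1"
// formatNID(1, 1) == "M1"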
......@@ -35,13 +35,14 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
xxcontext "lab.nexedi.com/kirr/neo/go/internal/xcontext"
taskctx "lab.nexedi.com/kirr/neo/go/internal/xcontext/task"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/internal/xzodb"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
// "lab.nexedi.com/kirr/go123/xsync"
)
// Storage is NEO node that keeps data and provides read/write access to it via network.
......@@ -50,13 +51,19 @@ import (
type Storage struct {
node *_MasteredNode
/*
// context for providing operational service
// it is renewed every time master tells us StartOpertion, so users
// must read it initially only once under opMu via withWhileOperational.
opMu sync.Mutex
opCtx context.Context
*/
lli xneo.Listener
back storage.Backend
// whole Run runs under runCtx
runCtx context.Context
}
// NewStorage creates new storage node that will talk to master on masterAddr.
......@@ -69,10 +76,12 @@ func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage
back: back,
}
/*
// operational context is initially done (no service should be provided)
noOpCtx, cancel := context.WithCancel(context.Background())
cancel()
stor.opCtx = noOpCtx
*/
return stor
}
......@@ -83,24 +92,27 @@ func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage
//
// The storage will be serving incoming connections on l.
func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
stor.runCtx = ctx
addr := l.Addr()
defer task.Runningf(&ctx, "storage(%s)", addr)(&err)
// defer task.Runningf(&ctx, "storage(%s)", addr)(&err) // XXX kill
log.Infof(ctx, "%s: listening on %s ...", stor.node.MyInfo.NID, addr)
// update our serving address in node
naddr, err := proto.Addr(addr)
if err != nil {
return err
return err // XXX + errctx ?
}
stor.node.MyInfo.Addr = naddr
// wrap listener with link / identificaton hello checker
lli := xneo.NewListener(neonet.NewLinkListener(l))
stor.lli = xneo.NewListener(neonet.NewLinkListener(l))
wg := xsync.NewWorkGroup(ctx)
// wg := xsync.NewWorkGroup(ctx) // XXX derive from orig ctx
// connect to master and get commands and updates from it
wg.Go(func(ctx context.Context) error {
return stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// connect to master and let it drive us via commands and updates
// wg.Go(func(ctx context.Context) error {
err = stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
......@@ -116,9 +128,10 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// we got StartOperation command. Let master drive us during service phase.
return stor.m1serve(ctx, mlink, reqStart)
})
})
// })
// serve incoming connections
/*
// serve incoming connections while connected to M
wg.Go(func(ctx context.Context) (err error) {
defer task.Running(&ctx, "accept")(&err)
......@@ -150,6 +163,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
})
err = wg.Wait()
*/
// XXX should Storage do it, or should it leave back non-closed?
// TODO -> Storage should not close backend.
......@@ -173,7 +187,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// - nil: initialization was ok and a command came from master to start operation.
// - !nil: initialization was cancelled or failed somehow.
func (stor *Storage) m1initialize(ctx context.Context, mlink *_MasterLink) (reqStart *neonet.Request, err error) {
defer task.Runningf(&ctx, "init %s", mlink)(&err)
defer task.Runningf(&ctx, "mserve init")(&err)
for {
req, err := mlink.Recv1()
......@@ -254,8 +268,9 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
// either due to master commanding us to stop, or context cancel or some other
// error.
func (stor *Storage) m1serve(ctx context.Context, mlink *_MasterLink, reqStart *neonet.Request) (err error) {
defer task.Runningf(&ctx, "serve %s", mlink)(&err)
defer task.Runningf(&ctx, "mserve")(&err)
/*
// refresh stor.opCtx and cancel it when we finish so that client
// handlers know they need to stop operating as master told us to do so.
opCtx, opCancel := context.WithCancel(ctx)
......@@ -263,6 +278,18 @@ func (stor *Storage) m1serve(ctx context.Context, mlink *_MasterLink, reqStart *
stor.opCtx = opCtx
stor.opMu.Unlock()
defer opCancel()
*/
// serve clients while operational
serveCtx := taskctx.Runningf(stor.runCtx, "%s", stor.node.MyInfo.NID)
serveCtx, serveCancel := xcontext.Merge/*Cancel*/(serveCtx, ctx)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
stor.serve(serveCtx)
}()
defer wg.Wait()
defer serveCancel()
// reply M we are ready
// XXX NEO/py sends NotifyReady on another conn; we patched py: see
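In the hunk above serveCtx is derived from stor.runCtx, so client service is tied to the whole Run rather than to the current talk-master attempt, and is then merged with the per-talk ctx so that cancellation of either side stops serving. A minimal standalone sketch of that merge idea (not the go123/xcontext implementation; unlike it, this version propagates values and deadline of the first parent only):

package example

import "context"

func mergeCancel(parentA, parentB context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parentA)
	go func() {
		select {
		case <-parentB.Done():
			cancel()
		case <-ctx.Done():
			// parentA canceled, or cancel() called by the user.
		}
	}()
	return ctx, cancel
}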
......@@ -304,6 +331,36 @@ func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error {
// --- serve incoming connections from other nodes ---
func (stor *Storage) serve(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "serve")(&err)
wg := sync.WaitGroup{}
defer wg.Wait()
// XXX dup from master -> Node.Listen() -> Accept() ?
// XXX ? -> Node.Accept(lli) (it will verify IdTime against Node.nodeTab[nid])
// XXX ? -> Node.Serve(lli -> func(idReq))
for {
if ctx.Err() != nil {
return ctx.Err()
}
req, idReq, err := stor.lli.Accept(ctx)
if err != nil {
if !xxcontext.Canceled(err) {
log.Error(ctx, err) // XXX throttle?
}
continue
}
wg.Add(1)
go func() {
defer wg.Done()
stor.serveLink(ctx, req, idReq) // XXX ignore err? -> logged
}()
}
}
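The "XXX throttle?" above is about rate-limiting accept-error logging. One hedged possibility, purely as an assumption and not something the project uses, is a token-bucket limiter from golang.org/x/time/rate:

package example

import (
	"context"
	"log"
	"time"

	"golang.org/x/time/rate"
)

var acceptErrLimit = rate.NewLimiter(rate.Every(time.Second), 1) // at most ~1 log/s

// logAcceptError logs the accept error only if the limiter has a token left;
// ctx is kept only to mirror the log.Error(ctx, err) call in the loop above.
func logAcceptError(ctx context.Context, err error) {
	if acceptErrLimit.Allow() {
		log.Printf("accept: %s", err)
	}
}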
// identify processes identification request from connected peer.
func (stor *Storage) identify(ctx context.Context, idReq *proto.RequestIdentification) (idResp proto.Msg, err error) {
accept, reject := stor.identify_(idReq)
......@@ -329,6 +386,7 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *
return nil, &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}
}
/*
// check operational
stor.opMu.Lock()
operational := (stor.opCtx.Err() == nil)
......@@ -337,6 +395,7 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *
if !operational {
return nil, &proto.Error{proto.NOT_READY, "cluster not operational"}
}
*/
return &proto.AcceptIdentification{
NodeType: stor.node.MyInfo.Type,
......@@ -346,6 +405,7 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *
}
/*
// withWhileOperational derives new context from ctx which will be cancelled, when either
// - ctx is cancelled, or
// - master tells us to stop operational service
......@@ -354,8 +414,9 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
opCtx := stor.opCtx
stor.opMu.Unlock()
return xcontext.Merge/*Cancel*/(ctx, opCtx)
return xcontext.MergeCancel(ctx, opCtx)
}
*/
// serveLink serves incoming node-node link connection.
......@@ -378,9 +439,11 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *
// client passed identification, now serve other requests
/*
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
*/
wg := sync.WaitGroup{} // XXX -> errgroup?
for {
......
......@@ -96,7 +96,7 @@ func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterA
MyInfo: proto.NodeInfo{
Type: typ,
Addr: proto.Address{},
NID: 0,
NID: proto.NID(typ, 0), // temp, e.g. S? TODO use "temp" bit in NodeID
IdTime: proto.IdTimeNone,
},
ClusterName: clusterName,
......
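Together with the String() change, xneo.NewNode now starts with proto.NID(typ, 0) instead of 0, and the master's identify (see the hunk near the top) accepts that typed zero as "please allocate me a real NID". A standalone toy sketch of that decision; NodeID and allocate below are simplified stand-ins for proto.NodeID and Master.allocNID:

package example

type NodeType int

type NodeID struct {
	Type NodeType
	Num  int
}

// nid(typ, 0) plays the role of proto.NID(typ, 0): a typed "temporary" id.
func nid(typ NodeType, num int) NodeID { return NodeID{typ, num} }

func identifyNID(reqType NodeType, requested NodeID, allocate func(NodeType) NodeID) NodeID {
	// both "no id at all" and "typed temporary id" mean: master must assign one.
	if requested == (NodeID{}) || requested == nid(reqType, 0) {
		return allocate(reqType)
	}
	return requested
}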