Commit e956f33c authored by Kirill Smelkov

.

parent a158ddd1
@@ -195,6 +195,7 @@ func (c *Client) updateOperational() (sendReady func()) {
 //
 // The only error possible is if provided ctx cancels.
 // XXX and client stopped/closed? (ctx passed to Run cancelled)
+//
 // XXX change signature to call f from under withOperational ?
 func (c *Client) withOperational(ctx context.Context) error {
 	for {
@@ -222,7 +223,7 @@ func (c *Client) withOperational(ctx context.Context) error {
 // talkMaster connects to master, announces self and receives notifications.
 // it tries to persist master link reconnecting as needed.
 //
-// XXX C -> M for commit (-> another channel)
+// TODO C -> M for commit (-> another channel)
 //
 // XXX always error (dup Storage.talkMaster) ?
 func (c *Client) talkMaster(ctx context.Context) (err error) {
@@ -318,7 +319,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
 	})
 	// init partition table and lastTid from master
-	// TODO better change protocol for master to send us pt/head via notify
+	// TODO better change protocol for master to send us head via notify
 	// channel right after identification.
 	wg.Go(func() error {
 		return c.initFromMaster(ctx, mlink)
@@ -327,7 +328,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
 	return wg.Wait()
 }
-// initFromMaster asks M for partTab and DB head right after identification.
+// initFromMaster asks M for DB head right after identification.
 func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
 	defer task.Running(&ctx, "init")(&err)
@@ -362,15 +363,11 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er
 		close(c.at0Ready)
 	}
-	// XXX what next?
 	return nil
-	// TODO transaction control? -> better in original goroutines doing the txn (just share mlink)
 }
-// recvMaster receives and handles notifications from master
+// recvMaster receives and handles notifications from master.
 func (c *Client) recvMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
 	defer task.Running(&ctx, "rx")(&err)
@@ -388,7 +385,7 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neonet.NodeLink) (err er
 	}
 }
-// recvMaster1 handles 1 message from master
+// recvMaster1 handles 1 message from master.
 func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
 	switch msg := req.Msg.(type) {
 	// <- committed txn
@@ -398,6 +395,7 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
 	// messages for state changes
+	// XXX -> NodeApp into common code to handle NodeTab + PartTab updates from M?
 	c.node.StateMu.Lock()
 	switch msg := req.Msg.(type) {
@@ -549,8 +547,9 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
 	if err != nil {
 		return nil, 0, err // XXX err ctx
 	}
-	// FIXME ^^^ slink.CloseAccept after really dialed (not to deadlock if
-	// S decides to send us something)
+	// close accept after really dialed (not to deadlock if S decides to
+	// send us something).
+	slink.CloseAccept() // XXX need to close only after really dialed
 	// on the wire it comes as "before", not "at"
 	req := proto.GetObject{
......
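The first client hunk above keeps the question of whether withOperational should be changed to call f itself, from under the operational check. A rough sketch of what that variant could look like (the operational flag and the opReady channel are made-up assumptions for illustration; only node.StateMu appears in this diff):

func (c *Client) withOperational2(ctx context.Context, f func() error) error {
	for {
		c.node.StateMu.Lock()
		if c.operational { // hypothetical flag guarded by StateMu
			// run f while still holding the lock, so the cluster cannot
			// stop being operational from under f
			err := f()
			c.node.StateMu.Unlock()
			return err
		}
		ready := c.opReady // hypothetical channel closed on next state change
		c.node.StateMu.Unlock()

		select {
		case <-ctx.Done():
			return ctx.Err() // ctx cancelled before the state became operational
		case <-ready:
			// state changed; recheck
		}
	}
}

Whether holding StateMu for the whole duration of f is acceptable is exactly the open part of that XXX; the sketch only shows the shape of the proposed signature change.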
@@ -211,7 +211,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
 }
 // runMain is the process that implements main master cluster management logic: node tracking, cluster
-// state updates, scheduling data movement between storage nodes etc.
+// state updates, scheduling data movement between storage nodes, etc.
 func (m *Master) runMain(ctx context.Context) (err error) {
 	defer task.Running(&ctx, "main")(&err)
@@ -220,6 +220,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
 	// XXX however since clients request state reading we should use node.StateMu?
 	// XXX -> better rework protocol so that master pushes itself (not
 	// being pulled) to clients everything they need.
+	// -> it was reworked (see bf240897)
 	for ctx.Err() == nil {
 		// recover partition table from storages and wait till enough
@@ -488,6 +489,7 @@ func storCtlRecovery(ctx context.Context, stor *Node, res chan storRecovery) {
 	defer task.Runningf(&ctx, "%s: stor recovery", slink.RemoteAddr())(&err)
 	// XXX cancel on ctx
+	// XXX close slink on err? (if yes -> xcontext.WithCloseOnErrCancel)
 	recovery := proto.AnswerRecovery{}
 	err = slink.Ask1(&proto.Recovery{}, &recovery)
@@ -678,8 +680,8 @@ type storVerify struct {
 // storCtlVerify drives a storage node during cluster verifying (= starting) state
 func storCtlVerify(ctx context.Context, stor *Node, pt *PartitionTable, res chan storVerify) {
-	// XXX link.Close on err
-	// XXX cancel on ctx
+	// XXX link.Close on err -> = xcontext.WithCloseOnErrCancel
+	// XXX cancel on ctx -> = ^^^
 	var err error
 	defer func() {
......
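Several of the XXX comments above and below reduce to the same missing helper, referred to as xcontext.WithCloseOnErrCancel / WithCloseOnRetCancel. That helper is not part of this diff, so the following is only a hypothetical sketch of what it could look like, using nothing beyond context and io from the standard library: run f, and close c if f returns an error or if ctx is cancelled while f is still running (a second Close is assumed to be harmless here).

func withCloseOnErrCancel(ctx context.Context, c io.Closer, f func() error) (err error) {
	fdone := make(chan struct{})
	defer close(fdone)

	go func() {
		select {
		case <-ctx.Done():
			c.Close() // unblocks sends/receives inside f
		case <-fdone:
			// f returned on its own; nothing to do
		}
	}()

	err = f()
	if err != nil {
		c.Close() // don't keep a half-established link around
	}
	return err
}

With such a helper, storCtlRecovery and storCtlVerify could wrap their Ask1 exchanges in one withCloseOnErrCancel(ctx, slink, func() error { ... }) call instead of carrying the separate "close on err" and "cancel on ctx" notes.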
-// Copyright (C) 2016-2020 Nexedi SA and Contributors.
+// Copyright (C) 2016-2021 Nexedi SA and Contributors.
 // Kirill Smelkov <kirr@nexedi.com>
 //
 // This program is free software: you can Use, Study, Modify and Redistribute
......
@@ -103,6 +103,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
 	//stor.node.OnShutdown = serveCancel
 	// XXX hack: until ctx cancel is not handled properly by Recv/Send
+	// XXX -> xcontext.WithCloseOnRetCancel
 	stor.node.OnShutdown = func() {
 		serveCancel()
 		lclose(ctx, lli)
@@ -156,6 +157,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
 	serveCancel()
 	wg.Wait()
+	// XXX should Storage do it, or should it leave back non-closed?
 	err2 := stor.back.Close()
 	if err == nil {
 		err = err2
@@ -221,6 +223,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
 		stor.node.MyInfo.UUID = accept.YourUUID
 	}
+	// XXX the first packet M sends always is NotifyNodeInformation (with us)
+	// -> receive it first via Expect1
 	// handle notifications and commands from master
 	// let master initialize us. If successful this ends with StartOperation command.
@@ -230,7 +235,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
 		return err
 	}
-	// we got StartOperation command. Let master drive us during servicing phase.
+	// we got StartOperation command. Let master drive us during service phase.
 	err = stor.m1serve(ctx, reqStart)
 	//log.Error(ctx, err)
 	return err
@@ -310,6 +315,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
 		err = req.Reply(&proto.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
+	// XXX -> somehow to common part in NodeApp ?
 	case *proto.SendPartitionTable:
 		// TODO M sends us whole PT -> save locally
 		stor.node.UpdatePartTab(ctx, msg) // XXX lock? XXX handle msg.NumReplicas
@@ -381,6 +387,8 @@ func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error {
 	case *proto.StopOperation:
 		return fmt.Errorf("stop requested")
+	// should be served by NodeApp.commonRecv1
+	// ---- 8< ----
 	// XXX SendPartitionTable?
 	// XXX NotifyPartitionChanges?
@@ -389,6 +397,7 @@ func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error {
 	case *proto.NotifyClusterState:
 		stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
+	// ---- 8< ----
 	// TODO commit related messages
 	}
......
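The "---- 8< ----" markers and the "-> NodeApp" notes in both the client and the storage hunks point at one shared handler for the state-update messages M pushes to every node. A hypothetical sketch of such a NodeApp.commonRecv1 is below; the signature and the handled result are assumptions, UpdatePartTab and UpdateClusterState mirror the calls already visible in this diff, and UpdateNodeTab is assumed by analogy for NotifyNodeInformation.

// commonRecv1 handles one message from M that is common to all node types;
// it reports whether the message was recognized as such.
func (app *NodeApp) commonRecv1(ctx context.Context, req neonet.Request) (handled bool, err error) {
	app.StateMu.Lock()
	defer app.StateMu.Unlock()

	switch msg := req.Msg.(type) {
	case *proto.NotifyNodeInformation:
		app.UpdateNodeTab(ctx, msg) // assumed counterpart of the calls below

	case *proto.SendPartitionTable:
		app.UpdatePartTab(ctx, msg)

	case *proto.NotifyClusterState:
		app.UpdateClusterState(ctx, msg)

	default:
		return false, nil // not a common message; the caller keeps handling it
	}

	return true, nil
}

Client.recvMaster1 and Storage.m1serve1 would then try commonRecv1 first and fall through to their own switch only when handled is false.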