Commit a16e85ce authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e6fbafe8
......@@ -279,8 +279,8 @@ func TestMasterStorage(t *testing.T) {
// TODO test ID rejects (uuid already registered, ...)
// M starts recovery on S
tc.Expect(conntx("m:2", "s:2", 1, &neo.Recovery{}))
tc.Expect(conntx("s:2", "m:2", 1, &neo.AnswerRecovery{
tc.Expect(conntx("m:2", "s:2", 0, &neo.Recovery{}))
tc.Expect(conntx("s:2", "m:2", 0, &neo.AnswerRecovery{
// empty new node
PTid: 0,
BackupTid: neo.INVALID_TID,
......
......@@ -184,16 +184,8 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
}
log.Info(ctx, "identification accepted")
Mlink := Mconn.Link()
// close Mlink on return / cancel
// XXX -> defer xio.CloseWhenDone(ctx, Mlink)()
retch := make(chan struct{})
defer func() {
err2 := Mlink.Close()
err = xerr.First(err, err2)
close(retch)
}()
mlink := Mconn.Link()
defer xio.CloseWhenDone(ctx, mlink)()
// XXX add master UUID -> nodeTab ? or master will notify us with it himself ?
......@@ -207,7 +199,21 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
stor.node.MyInfo.UUID = accept.YourUUID
}
// handle notifications and commands from master
// let master initialize us. If successful this ends with StartOperation command.
err = stor.m1initialize(ctx, mlink)
if err != nil {
log.Error(ctx, err)
return err
}
// we got StartOperation command. Let master drive us during servicing phase.
err = stor.m1serve(ctx, mlink)
log.Error(ctx, err)
return err
/*
// accept next connection from master. only 1 connection is served at any given time.
// every new connection from master means talk over previous connection is cancelled.
// XXX recheck compatibility with py
......@@ -221,7 +227,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
return
for {
conn, err := Mlink.Accept(/*ctx*/)
conn, err := Mlink.Accept()
select {
case acceptq <- accepted{conn, err}:
......@@ -292,6 +298,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
return ctx.Err()
}
}
*/
}
// m1initialize drives storage by master messages during initialization phase
......@@ -305,18 +312,18 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// return error indicates:
// - nil: initialization was ok and a command came from master to start operation
// - !nil: initialization was cancelled or failed somehow
func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err error) {
defer task.Runningf(&ctx, "init %v", Mconn)(&err)
func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (err error) {
defer task.Runningf(&ctx, "init %v", mlink)(&err)
for {
msg, err := Mconn.Recv()
req, err := mlink.Recv1()
if err != nil {
return err
}
// XXX vvv move Send out of reply preparing logic
switch msg.(type) {
switch msg := req.Msg.(type) {
default:
return fmt.Errorf("unexpected message: %T", msg)
......@@ -325,20 +332,20 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
return nil
case *neo.Recovery:
err = Mconn.Send(&neo.AnswerRecovery{
err = req.Reply(&neo.AnswerRecovery{
PTid: stor.node.PartTab.PTid,
BackupTid: neo.INVALID_TID,
TruncateTid: neo.INVALID_TID})
case *neo.AskPartitionTable:
// TODO initially read PT from disk
err = Mconn.Send(&neo.AnswerPartitionTable{
err = req.Reply(&neo.AnswerPartitionTable{
PTid: stor.node.PartTab.PTid,
RowList: stor.node.PartTab.Dump()})
case *neo.LockedTransactions:
// XXX r/o stub
err = Mconn.Send(&neo.AnswerLockedTransactions{})
err = req.Reply(&neo.AnswerLockedTransactions{})
// TODO AskUnfinishedTransactions
......@@ -349,7 +356,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
return zerr // XXX send the error to M
}
err = Mconn.Send(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
err = req.Reply(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
case *neo.NotifyPartitionTable:
// TODO M sends us whole PT -> save locally
......@@ -366,10 +373,12 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
}
// XXX move Mconn.Send here and ^^^ only prepare reply?
// XXX move req.Reply here and ^^^ only prepare reply
if err != nil {
return err
}
req.Close() // XXX err?
}
}
......@@ -382,8 +391,8 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
// it always returns with an error describing why serve has to be stopped -
// either due to master commanding us to stop, or context cancel or some other
// error.
func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
defer task.Runningf(&ctx, "serve %v", Mconn)(&err)
func (stor *Storage) m1serve(ctx context.Context, mlink *neo.NodeLink) (err error) {
defer task.Runningf(&ctx, "serve %v", mlink)(&err)
// refresh stor.opCtx and cancel it when we finish so that client
// handlers know they need to stop operating as master told us to do so.
......@@ -394,19 +403,22 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
defer opCancel()
// reply M we are ready
err = Mconn.Send(&neo.NotifyReady{})
// XXX according to current neo/py this is separate send - not reply - and so we do here
err = mlink.Send1(&neo.NotifyReady{})
if err != nil {
return err
}
for {
// XXX abort on ctx (XXX or upper?)
msg, err := Mconn.Recv()
req, err := mlink.Recv1()
if err != nil {
return err
}
switch msg.(type) {
req.Close() // XXX stub, err
switch msg := req.Msg.(type) {
default:
return fmt.Errorf("unexpected message: %T", msg)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment