Commit 4eaaf186 authored by Kirill Smelkov's avatar Kirill Smelkov

X do M -> S (StartOperation); M <- S (NotifyReady) on the same conn

Else M has to listen from incoming connections from S but currently to
simplify workings it does CloseAccept after accepting S's identification.

It also looks logical from protocol point of view that there is a reply
to StartOperation command.
parent 7890d4cb
......@@ -344,7 +344,7 @@ func TestMasterStorage(t *testing.T) {
// TODO ^^^ should be sent to S
tc.Expect(conntx("m:2", "s:2", 10, &neo.StartOperation{Backup: false}))
tc.Expect(conntx("s:2", "m:2", 3, &neo.NotifyReady{}))
tc.Expect(conntx("s:2", "m:2", 10, &neo.NotifyReady{}))
// TODO S leave while service
......
......@@ -865,26 +865,26 @@ func storCtlService(ctx context.Context, stor *neo.Node) (err error) {
// XXX send clusterInformation ?
// XXX current neo/py does StartOperation / NotifyReady as separate
// sends, not exchange on the same conn.
//ready := neo.NotifyReady{}
//err = slink.Ask1(&neo.StartOperation{Backup: false}, &ready)
err = slink.Send1(&neo.StartOperation{Backup: false})
if err != nil {
return err
}
req, err := slink.Recv1()
if err != nil {
return err
}
req.Close()
switch msg := req.Msg.(type) {
case *neo.NotifyReady:
// ok
case *neo.Error:
return msg
default:
return fmt.Errorf("unexpected message %T", msg)
}
// sends, not exchange on the same conn. - fixed
ready := neo.NotifyReady{}
err = slink.Ask1(&neo.StartOperation{Backup: false}, &ready)
//err = slink.Send1(&neo.StartOperation{Backup: false})
//if err != nil {
// return err
//}
//req, err := slink.Recv1()
//if err != nil {
// return err
//}
//req.Close()
//switch msg := req.Msg.(type) {
//case *neo.NotifyReady:
// // ok
//case *neo.Error:
// return msg
//default:
// return fmt.Errorf("unexpected message %T", msg)
//}
// now wait in a loop
......
......@@ -202,14 +202,14 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// handle notifications and commands from master
// let master initialize us. If successful this ends with StartOperation command.
err = stor.m1initialize(ctx, mlink)
reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil {
log.Error(ctx, err)
return err
}
// we got StartOperation command. Let master drive us during servicing phase.
err = stor.m1serve(ctx, mlink)
err = stor.m1serve(ctx, reqStart)
log.Error(ctx, err)
return err
......@@ -312,24 +312,24 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// return error indicates:
// - nil: initialization was ok and a command came from master to start operation
// - !nil: initialization was cancelled or failed somehow
func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (err error) {
func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (reqStart *neo.Request, err error) {
defer task.Runningf(&ctx, "init %v", mlink)(&err)
for {
req, err := mlink.Recv1()
if err != nil {
return err
return nil, err
}
// XXX vvv move Send out of reply preparing logic
switch msg := req.Msg.(type) {
default:
return fmt.Errorf("unexpected message: %T", msg)
return nil, fmt.Errorf("unexpected message: %T", msg)
case *neo.StartOperation:
// ok, transition to serve
return nil
return &req, nil
case *neo.Recovery:
err = req.Reply(&neo.AnswerRecovery{
......@@ -353,7 +353,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (err
lastTid, zerr1 := stor.zstor.LastTid(ctx)
lastOid, zerr2 := stor.zstor.LastOid(ctx)
if zerr := xerr.First(zerr1, zerr2); zerr != nil {
return zerr // XXX send the error to M
return nil, zerr // XXX send the error to M
}
err = req.Reply(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
......@@ -375,7 +375,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (err
// XXX move req.Reply here and ^^^ only prepare reply
if err != nil {
return err
return nil, err
}
req.Close() // XXX err?
......@@ -391,7 +391,8 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (err
// it always returns with an error describing why serve has to be stopped -
// either due to master commanding us to stop, or context cancel or some other
// error.
func (stor *Storage) m1serve(ctx context.Context, mlink *neo.NodeLink) (err error) {
func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err error) {
mlink := reqStart.Link()
defer task.Runningf(&ctx, "serve %v", mlink)(&err)
// refresh stor.opCtx and cancel it when we finish so that client
......@@ -404,7 +405,7 @@ func (stor *Storage) m1serve(ctx context.Context, mlink *neo.NodeLink) (err erro
// reply M we are ready
// XXX according to current neo/py this is separate send - not reply - and so we do here
err = mlink.Send1(&neo.NotifyReady{})
err = reqStart.Reply(&neo.NotifyReady{})
if err != nil {
return err
}
......
......@@ -231,7 +231,10 @@ class Application(BaseApplication):
self.master_conn.setHandler(initialization.InitializationHandler(self))
while not self.operational:
_poll()
self.master_conn.send(Packets.NotifyReady())
#self.master_conn.send(Packets.NotifyReady())
#self.master_conn.answer(Packets.NotifyReady()) # NOTE to keep same conn as was used when M send StartOperation
# XXX ^^^ answer check pkt.isResponse()
self.master_conn.send(Packets.NotifyReady(), msg_id=self.master_conn.peer_id)
def doOperation(self):
"""Handle everything, including replications and transactions."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment