Commit f456cfaa authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8acb9b73
...@@ -199,7 +199,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -199,7 +199,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// accept next connection from master. only 1 connection is served at any given time. // accept next connection from master. only 1 connection is served at any given time.
// every new connection from master means talk over previous connection is cancelled. // every new connection from master means talk over previous connection is cancelled.
// XXX check compatibility with py // XXX recheck compatibility with py
acceptq := make(chan *neo.Conn, 1) acceptq := make(chan *neo.Conn, 1)
go func () { go func () {
for { for {
...@@ -212,11 +212,19 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -212,11 +212,19 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
} }
}() }()
// now handle notifications and commands from master // handle notifications and commands from master
talkq := make(chan error, 1) talkq := make(chan error, 1)
loop:
for { for {
// one talk cycle for master to drive us over Mconn // wait for next connection from master if talk over previous one finished.
if Mconn == nil {
select {
case Mconn = <-acceptq:
case <-ctx.Done():
return ctx.Err()
}
}
// one talk cycle for master to drive us
// puts error after talk finishes -> talkq // puts error after talk finishes -> talkq
talk := func() error { talk := func() error {
// let master initialize us. If successful this ends with StartOperation command. // let master initialize us. If successful this ends with StartOperation command.
...@@ -239,7 +247,8 @@ loop: ...@@ -239,7 +247,8 @@ loop:
select { select {
case err = <-talkq: case err = <-talkq:
// XXX check for shutdown command // XXX check for shutdown command
continue loop // retry from initializing lclose(ctx, Mconn)
Mconn = nil // now wait for accept to get next Mconn
case conn := <-acceptq: case conn := <-acceptq:
lclose(ctx, Mconn) // wakeup/cancel current talk lclose(ctx, Mconn) // wakeup/cancel current talk
...@@ -250,8 +259,6 @@ loop: ...@@ -250,8 +259,6 @@ loop:
return ctx.Err() return ctx.Err()
} }
} }
return nil // XXX err
} }
// m1initialize drives storage by master messages during initialization phase // m1initialize drives storage by master messages during initialization phase
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment