Commit 8acb9b73 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cf76530c
......@@ -198,20 +198,17 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// accept next connection from master. only 1 connection is served at any given time.
// every new connection from master means talk over previous connection is done/cancelled.
// every new connection from master means talk over previous connection is cancelled.
// XXX check compatibility with py
type accepted struct {
conn *neo.Conn
err error
}
acceptq := make(chan accepted, 1)
acceptq := make(chan *neo.Conn, 1)
go func () {
for {
conn, err := Mlink.Accept()
acceptq <- accepted{conn, err}
if err != nil {
break
log.Error(ctx, err)
return
}
acceptq <- conn
}
}()
......@@ -219,10 +216,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
talkq := make(chan error, 1)
loop:
for {
// main worker which talks with master over Mconn
// one talk cycle for master to drive us over Mconn
// puts error after talk finishes -> talkq
go func() {
err := func() error {
talk := func() error {
// let master initialize us. If successful this ends with StartOperation command.
err := stor.m1initialize(ctx, Mconn)
if err != nil {
......@@ -234,8 +230,9 @@ loop:
err = stor.m1serve(ctx, Mconn)
log.Error(ctx, err)
return err
}()
talkq <- err
}
go func() {
talkq <- talk()
}()
// talk finished / next connection / cancel
......@@ -244,13 +241,10 @@ loop:
// XXX check for shutdown command
continue loop // retry from initializing
case a := <-acceptq:
case conn := <-acceptq:
lclose(ctx, Mconn) // wakeup/cancel current talk
if a.err != nil {
return a.err
}
<-talkq
Mconn = a.conn
Mconn = conn
case <-ctx.Done():
return ctx.Err()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment