Commit 6af054b0 authored by Kirill Smelkov's avatar Kirill Smelkov

restore: Rework extraction pipeline to use xsync.WorkGroup

The pattern where multiple workers are spawned to work on a common task
and where whole work needs to be canceled on first error is now well
understood, with the functionality to broadcast cancel and propagate
errors being wrapped into libraries such as

	https://godoc.org/golang.org/x/sync/errgroup			and
	https://godoc.org/lab.nexedi.com/kirr/go123/xsync#WorkGroup
	(go123@515a6d14)

Let's streamline the code by using xsync.WorkGroup (it is in our hands,
a bit more well designed (imho), has analog in Pygolang, and can be
changed/enhanced as needed).

The other reason to rework the code is that the workgroup is created
under context (currently always background) and can be canceled by that
context cancel. In the next patch we'll teach all git-backup
subcommands, including restore, to work under context, and by using
xsync.WorkGroup we will automatically handle cancellation from outside,
while without reworking extraction pipeline we would need to
additionally glue ctx cancel to signal to workers to stop.

Compared to previous state both xsync.WorkGroup and errogroup return
only the first error, however it should likely not cause problems in
practice as the first error is usually the most informative one.
parent 00f58d0b
...@@ -67,6 +67,7 @@ NOTE the idea of pulling all refs together is similar to git-namespaces ...@@ -67,6 +67,7 @@ NOTE the idea of pulling all refs together is similar to git-namespaces
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
...@@ -77,7 +78,6 @@ import ( ...@@ -77,7 +78,6 @@ import (
"runtime/debug" "runtime/debug"
"sort" "sort"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
...@@ -87,6 +87,7 @@ import ( ...@@ -87,6 +87,7 @@ import (
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xflag" "lab.nexedi.com/kirr/go123/xflag"
"lab.nexedi.com/kirr/go123/xstrings" "lab.nexedi.com/kirr/go123/xstrings"
"lab.nexedi.com/kirr/go123/xsync"
git "github.com/libgit2/git2go" git "github.com/libgit2/git2go"
) )
...@@ -961,23 +962,18 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec) ...@@ -961,23 +962,18 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
repotab = nil repotab = nil
packxq := make(chan PackExtractReq, 2*njobs) // requests to extract packs packxq := make(chan PackExtractReq, 2*njobs) // requests to extract packs
errch := make(chan error) // errors from workers wg := xsync.NewWorkGroup(context.Background())
stopch := make(chan struct{}) // broadcasts restore has to be cancelled
wg := sync.WaitGroup{}
// main worker: walk over specified prefixes restoring files and // main worker: walk over specified prefixes restoring files and
// scheduling pack extraction requests from *.git -> packxq // scheduling pack extraction requests from *.git -> packxq
wg.Add(1) wg.Go(func(ctx context.Context) (err error) {
go func() {
defer wg.Done()
defer close(packxq) defer close(packxq)
// raised err -> errch // raised err -> return
here := my.FuncName() here := my.FuncName()
defer exc.Catch(func(e *exc.Error) { defer exc.Catch(func(e *exc.Error) {
errch <- exc.Addcallingcontext(here, e) err = exc.Addcallingcontext(here, e)
}) })
runloop:
for _, __ := range restorespecv { for _, __ := range restorespecv {
prefix, dir := __.prefix, __.dir prefix, dir := __.prefix, __.dir
...@@ -1041,33 +1037,32 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec) ...@@ -1041,33 +1037,32 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
repopath: reprefix(prefix, dir, repo.repopath), repopath: reprefix(prefix, dir, repo.repopath),
prefix: prefix}: prefix: prefix}:
case <-stopch: case <-ctx.Done():
break runloop return ctx.Err()
} }
} }
} }
}()
return nil
})
// pack workers: packxq -> extract packs // pack workers: packxq -> extract packs
for i := 0; i < njobs; i++ { for i := 0; i < njobs; i++ {
wg.Add(1) wg.Go(func(ctx context.Context) (err error) {
go func() { // raised err -> return
defer wg.Done()
// raised err -> errch
here := my.FuncName() here := my.FuncName()
defer exc.Catch(func(e *exc.Error) { defer exc.Catch(func(e *exc.Error) {
errch <- exc.Addcallingcontext(here, e) err = exc.Addcallingcontext(here, e)
}) })
runloop:
for { for {
select { select {
case <-stopch: case <-ctx.Done():
break runloop return ctx.Err()
case p, ok := <-packxq: case p, ok := <-packxq:
if !ok { if !ok {
break runloop return nil
} }
infof("# git %s\t-> %s", p.prefix, p.repopath) infof("# git %s\t-> %s", p.prefix, p.repopath)
...@@ -1129,27 +1124,12 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec) ...@@ -1129,27 +1124,12 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
// RunWith{stdout: gitprogress(), stderr: gitprogress()}) // RunWith{stdout: gitprogress(), stderr: gitprogress()})
} }
} }
}() })
}
// wait for workers to finish & collect/reraise their errors
go func() {
wg.Wait()
close(errch)
}()
ev := xerr.Errorv{}
for e := range errch {
// tell everything to stop on first error
if len(ev) == 0 {
close(stopch)
}
ev = append(ev, e)
} }
if len(ev) != 0 { // wait for workers to finish & collect/reraise first error, if any
exc.Raise(ev) err = wg.Wait()
} exc.Raiseif(err)
} }
// loadBackupRefs loads 'backup.ref' content from a git object. // loadBackupRefs loads 'backup.ref' content from a git object.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment