Commit 8c352458 authored by Kirill Smelkov

.

parent 9c0279da
@@ -493,12 +493,12 @@ import (
 	// "time"
 	log "github.com/golang/glog"
-	"golang.org/x/sync/errgroup"
 	"lab.nexedi.com/kirr/go123/xcontext"
 	"lab.nexedi.com/kirr/go123/xerr"
 	"lab.nexedi.com/kirr/go123/xio"
 	"lab.nexedi.com/kirr/go123/xruntime/race"
+	"lab.nexedi.com/kirr/go123/xsync"
 	"lab.nexedi.com/kirr/neo/go/transaction"
 	"lab.nexedi.com/kirr/neo/go/zodb"
@@ -793,7 +793,7 @@ func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err er
 			return zevent.Err
 		case *zodb.EventCommit:
-			err = root.handleδZ(zevent)
+			err = root.handleδZ(ctx, zevent)
 			if err != nil {
 				return err
 			}
@@ -802,7 +802,7 @@ func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err er
 }
 // handleδZ handles 1 change event from ZODB notification.
-func (root *Root) handleδZ(δZ *zodb.EventCommit) (err error) {
+func (root *Root) handleδZ(ctx context.Context, δZ *zodb.EventCommit) (err error) {
 	defer xerr.Contextf(&err, "handleδZ @%s", δZ.Tid)
 	head := root.head
@@ -817,6 +817,7 @@ func (root *Root) handleδZ(δZ *zodb.EventCommit) (err error) {
 	continueOSCacheUpload := make(chan struct{})
retry:
 	for {
+		// XXX ctx cancel
 		head.zheadMu.Lock()
 		head.pauseOSCacheUpload = true
 		head.continueOSCacheUpload = continueOSCacheUpload
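Note on the "// XXX ctx cancel" line added above: handleδZ now receives ctx, but the retry loop itself does not yet check for cancellation. A hedged sketch of how such a check could look, written as a standalone helper (hypothetical illustration only, not part of this commit and not wcfs code):

package sketch

import "context"

// retryUnderCtx illustrates the "XXX ctx cancel" idea: give up retrying
// as soon as the caller's ctx is canceled, instead of looping forever.
func retryUnderCtx(ctx context.Context, tryOnce func() (done bool)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // caller canceled -> stop retrying
		default:
		}
		if tryOnce() {
			return nil
		}
	}
}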
@@ -863,7 +864,7 @@ retry:
 		fmt.Printf("\n\n")
 	}
-	wg, ctx := errgroup.WithContext(context.TODO()) // XXX ctx = ?
+	wg := xsync.NewWorkGroup(ctx)
 	for file, δfile := range δF.ByFile {
 		// // XXX needed?
 		// // XXX even though δBtail is complete, not all ZBlk are present here
@@ -872,8 +873,8 @@ retry:
 		file := file
 		for blk := range δfile.Blocks {
 			blk := blk
-			wg.Go(func() error {
-				return file.invalidateBlk(ctx, blk)
+			wg.Go(func(ctx context.Context) error {
+				return file.invalidateBlk(ctx, blk) // XXX +ctx ?
 			})
 		}
 	}
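Note on the pattern applied throughout this commit: errgroup.Group runs each task as func() error, so the spawned closures have to capture an outer ctx, whereas xsync.WorkGroup passes the group's context into every task as an explicit func(ctx context.Context) error parameter and cancels it on the first error. A minimal, self-contained sketch of the new pattern (illustrative only; invalidateBlkStub and the block numbers are made up, not wcfs code):

package main

import (
	"context"
	"fmt"

	"lab.nexedi.com/kirr/go123/xsync"
)

// invalidateBlkStub stands in for per-block work; it honours ctx cancellation.
func invalidateBlkStub(ctx context.Context, blk int64) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // another task failed -> our ctx got canceled
	default:
		fmt.Println("invalidate blk", blk)
		return nil
	}
}

func main() {
	// every task spawned via wg.Go receives a ctx derived from this one;
	// the first error cancels it for all remaining tasks.
	wg := xsync.NewWorkGroup(context.Background())
	for blk := int64(0); blk < 4; blk++ {
		blk := blk
		wg.Go(func(ctx context.Context) error {
			return invalidateBlkStub(ctx, blk)
		})
	}
	if err := wg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}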
@@ -886,13 +887,13 @@ retry:
 	// we need to do it only if we see topology (i.e. btree) change
 	//
 	// do it after completing data invalidations.
-	wg, ctx = errgroup.WithContext(context.TODO()) // XXX ctx = ?
+	wg = xsync.NewWorkGroup(ctx)
 	for file, δfile := range δF.ByFile {
 		if !δfile.Size {
 			continue
 		}
 		file := file
-		wg.Go(func() error {
+		wg.Go(func(ctx context.Context) error {
 			return file.invalidateAttr() // NOTE does not accept ctx
 		})
 	}
@@ -1179,11 +1180,11 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
 	defer cancel()
 	// read/load all block(s) in parallel
-	wg, ctx := errgroup.WithContext(ctx)
+	wg := xsync.NewWorkGroup(ctx)
 	for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
 		blkoff := blkoff
 		blk := blkoff / f.blksize
-		wg.Go(func() error {
+		wg.Go(func(ctx context.Context) error {
 			δ := blkoff-aoff // blk position in dest
 			//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
 			return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
@@ -1481,7 +1482,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
 	blkrev := blkrevMax
 	blkrevRough := true
-	wg, ctx := errgroup.WithContext(ctx)
+	wg := xsync.NewWorkGroup(ctx)
 	f.watchMu.RLock()
 	for w := range f.watchTab {
@@ -1522,7 +1523,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
 		pinrev, _ := w.file.LastBlkRev(ctx, blk, w.at) // XXX move into go?
 		//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
-		wg.Go(func() error {
+		wg.Go(func(ctx context.Context) error {
 			defer w.atMu.RUnlock()
 			// XXX close watcher on any error
 			return w.pin(ctx, blk, pinrev)
@@ -1700,11 +1701,11 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
 	w.atMu.RLock()
 	defer w.atMu.RUnlock()
-	wg, ctx := errgroup.WithContext(ctx)
+	wg := xsync.NewWorkGroup(ctx)
 	for blk, rev := range toPin {
 		blk := blk
 		rev := rev
-		wg.Go(func() error {
+		wg.Go(func(ctx context.Context) error {
 			return w._pin(ctx, blk, rev)
 		})
 	}
@@ -1759,7 +1760,7 @@ func (wlink *WatchLink) _serve() (err error) {
 	ctx0 := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)
 	ctx, cancel := context.WithCancel(ctx0)
-	wg, ctx := errgroup.WithContext(ctx)
+	wg := xsync.NewWorkGroup(ctx)
 	r := bufio.NewReader(xio.BindCtxR(wlink.sk, ctx))
@@ -1800,7 +1801,7 @@ func (wlink *WatchLink) _serve() (err error) {
 	// close .sk.rx on error/wcfs stopping or return: this wakes up read(sk).
 	retq := make(chan struct{})
 	defer close(retq)
-	wg.Go(func() error {
+	wg.Go(func(ctx context.Context) error {
 		// monitor is always canceled - either at parent ctx cancel, or
 		// upon return from serve (see "cancel all handlers ..." ^^^).
 		// If it was return - report returned error to wg.Wait, not "canceled".
@@ -1861,7 +1862,7 @@ func (wlink *WatchLink) _serve() (err error) {
 		}
 		// watch ...
-		wg.Go(func() error {
+		wg.Go(func(ctx context.Context) error {
 			return wlink.handleWatch(ctx, stream, msg)
 		})
 	}