Commit e05f89b1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6559cefe
......@@ -122,6 +122,7 @@
// clients, that had requested it (separately to each client), about the
// changes:
//
// XXX rev_max -> rev?
// S: 2 pin <bigfileX> #<blk> @<rev_max> XXX 2-> 2*k (multiple pins in parallel)
//
// and waits until all clients confirm that changed file block can be updated
......@@ -503,16 +504,15 @@ type Watcher struct {
}
// FileWatch represents watching for 1 BigFile.
// XXX merge into watcher?
type FileWatch struct {
link *Watcher // link to client
file *BigFile // XXX needed?
// XXX locking
// requested to be watched @at
at zodb.Tid
// XXX pinned
at zodb.Tid // requested to be watched @at
pinned SetI64 // blocks that are already pinned to be ≤ at
}
// /(head|<rev>)/bigfile/ - served by BigFileDir.
......@@ -803,7 +803,7 @@ retry:
// fmt.Printf("\t- %s\n", file.zfile.POid())
//}
wg, ctx := errgroup.WithContext(context.TODO())
wg, ctx := errgroup.WithContext(context.TODO()) // XXX ctx = ?
for file, finv := range toinvalidate {
file := file
for blk := range finv.blkmap {
......@@ -822,7 +822,7 @@ retry:
// we need to do it only if we see topology (i.e. btree) change
//
// do it after completing data invalidations.
wg, ctx = errgroup.WithContext(context.TODO())
wg, ctx = errgroup.WithContext(context.TODO()) // XXX ctx = ?
for file, finv := range toinvalidate {
if !finv.size {
continue
......@@ -1096,7 +1096,8 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
}
// we have the data - it can be used after watchers are updated
f.updateWatchers(blk, treepath, pathRevMax)
// XXX should we use ctx here? (see updateWatcher comments)
f.updateWatchers(ctx, blk, treepath, pathRevMax)
// data can be used now
close(loading.ready)
......@@ -1129,7 +1130,11 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zconnMu rlocked.
func (f *BigFile) updateWatchers(blk int64, treepath []zodb.IPersistent, pathRevMax zodb.Tid) {
//
// XXX do we really need to use/propagate caller contex here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure?
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []zodb.IPersistent, pathRevMax zodb.Tid) {
// only head/ is being watched for
if f.head.rev != 0 {
return
......@@ -1144,29 +1149,19 @@ func (f *BigFile) updateWatchers(blk int64, treepath []zodb.IPersistent, pathRev
blkrevmax, _ := f.δFtail.LastRevOf(blk, f.zfile.PJar().At()) // XXX = f.head.zconn.At()
blkrevmax = tidmin(blkrevmax, pathRevMax)
wg, ctx := errgroup.WithContext(ctx)
for w := range f.watchers {
w := w
wg.Go(func() error {
// XXX close watcher on any error
return w.pin(ctx, blk, blkrevmax)
})
_ = w
}
/*
// XXX locking
for _, mapping := range f.mappings {
if revmax <= mapping.at || !mapping.blkrange.in(blk) {
continue // do nothing
}
if mapping.pinned.Contains(blk) {
continue // do nothing
}
rev = max(δFtail.by(blk) : _ <= mapping.at)
// XXX vvv -> go
client.remmap(mapping.addr[blk], file/@<rev>/data)
mapping.pinned.Add(blk)
err := wg.Wait()
if err != nil {
panic(err) // XXX
}
*/
}
// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
......@@ -1240,7 +1235,45 @@ retry:
// -------- notifications to Watcher --------
// XXX WatchFile.Pin(blk, at)
// pin makes sure that file[blk] on client side is the same as of @rev state.
//
// XXX describe more.
// XXX explain that if rev ≤ .at there is no rev_next: rev < rev_next ≤ at.
// XXX error - when?
//
// XXX -> WatchFile.Pin(blk, at)
// XXX place=ok?
func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid()
defer xerr.Contextf(&err, "f<%s>: watch%d: pin #blk @%s", foid, w.link.id, blk, rev)
// XXX locking?
// XXX simultaneous calls?
if rev <= w.at {
return // client's view already coveris rev
}
if w.pinned.Has(blk) {
// XXX pinned has to be invalidated when w.at^
return // already pinned
}
// XXX comment
rev, _ = w.file.δFtail.LastRevOf(blk, w.at)
ack, err := w.link.send(ctx, fmt.Sprintf("pin %s #%s @%s", foid, blk, rev))
if err != nil {
return err
}
if ack != "ack" {
return fmt.Errorf("expect %q; got %q", "ack", ack)
}
w.pinned.Add(blk)
return nil
}
// ---- Watch server ----
......
......@@ -479,7 +479,7 @@ class tWatch:
reqv = [] # of received requests
while len(expected) > 0:
req = t.recvReq()
assert req is not None
assert req is not None # channel not closed
assert req.msg in expected
expected.delete(req.msg)
reqv.append(req)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment