Commit acec3cf1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f0afdaf7
...@@ -570,8 +570,7 @@ type BigFile struct { ...@@ -570,8 +570,7 @@ type BigFile struct {
// progress of being established. XXX text // progress of being established. XXX text
// //
// XXX locking -> watchMu? // XXX locking -> watchMu?
// XXX -> watchTab ? watchTab map[*Watch]struct{}
watches map[*Watch]struct{}
} }
// blkLoadState represents a ZBlk load state/result. // blkLoadState represents a ZBlk load state/result.
...@@ -599,7 +598,7 @@ type WatchLink struct { ...@@ -599,7 +598,7 @@ type WatchLink struct {
id int32 // ID of this /head/watch handle (for debug log) id int32 // ID of this /head/watch handle (for debug log)
head *Head head *Head
// established watches. // watches established over this watch link.
// XXX in-progress - where? -> (XXX no - see vvv) nowhere; here only established watches are added // XXX in-progress - where? -> (XXX no - see vvv) nowhere; here only established watches are added
// XXX -> in-progress here - so that access to new blocks after δFtail // XXX -> in-progress here - so that access to new blocks after δFtail
// was queried also send pins. // was queried also send pins.
...@@ -951,7 +950,7 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) { ...@@ -951,7 +950,7 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
delete(f.loading, blk) delete(f.loading, blk)
} }
// TODO skip retrieve/store if len(f.watches) == 0 // TODO skip retrieve/store if len(f.watchTab) == 0
// try to retrieve cache of current head/data[blk], if we got nothing from f.loading // try to retrieve cache of current head/data[blk], if we got nothing from f.loading
if blkdata == nil { if blkdata == nil {
blkdata = make([]byte, blksize) blkdata = make([]byte, blksize)
...@@ -1196,7 +1195,7 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre ...@@ -1196,7 +1195,7 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre
return return
} }
fmt.Printf("S: read #%d -> update watchers (#%d)\n", blk, len(f.watches)) fmt.Printf("S: read #%d -> update watchers (#%d)\n", blk, len(f.watchTab))
// update δFtail index // update δFtail index
bfdir := f.head.bfdir bfdir := f.head.bfdir
...@@ -1204,13 +1203,6 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre ...@@ -1204,13 +1203,6 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre
bfdir.δFtail.Track(f, blk, treepath, zblk) // XXX pass in zblk.rev here? bfdir.δFtail.Track(f, blk, treepath, zblk) // XXX pass in zblk.rev here?
bfdir.δFmu.Unlock() bfdir.δFmu.Unlock()
/* XXX kill
// associate zblk with file, if data was not hole
if zblk != nil {
zblk.bindFile(f, blk)
}
*/
// makes sure that file[blk] on clients side stays as of @w.at state. // makes sure that file[blk] on clients side stays as of @w.at state.
// try to use blkrevMax only as the first cheap criteria to skip updating watchers. // try to use blkrevMax only as the first cheap criteria to skip updating watchers.
...@@ -1220,10 +1212,10 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre ...@@ -1220,10 +1212,10 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre
blkrev := blkrevMax blkrev := blkrevMax
blkrevRough := true blkrevRough := true
// XXX locking (f.watches) // XXX locking (f.watchTab)
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
for w := range f.watches { for w := range f.watchTab {
w := w w := w
fmt.Printf("S: read -> update watchers: w @%s\n", w.at) fmt.Printf("S: read -> update watchers: w @%s\n", w.at)
...@@ -1429,7 +1421,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1429,7 +1421,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX locking // XXX locking
if at == zodb.InvalidTid { if at == zodb.InvalidTid {
delete(wlink.byfile, foid) delete(wlink.byfile, foid)
delete(w.file.watches, w) delete(w.file.watchTab, w)
return nil return nil
} }
...@@ -1471,7 +1463,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1471,7 +1463,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// δFtail query and pin-sent to w. Block #3 was not yet accessed but // δFtail query and pin-sent to w. Block #3 was not yet accessed but
// was also changed after w.at . As head/file[#3] might be accessed // was also changed after w.at . As head/file[#3] might be accessed
// simultaneously to watch setup, and f.readBlk will be checking // simultaneously to watch setup, and f.readBlk will be checking
// f.watches; if w ∉ f.watches at that moment, w will miss to receive // f.watchTab; if w ∉ f.watchTab at that moment, w will miss to receive
// pin for #3. // pin for #3.
// //
// NOTE for `unpin blk` to -> @head we can be sure there won't be // NOTE for `unpin blk` to -> @head we can be sure there won't be
...@@ -1492,7 +1484,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1492,7 +1484,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// //
// XXX locking // XXX locking
// XXX register only if watch was created anew, not updated? // XXX register only if watch was created anew, not updated?
f.watches[w] = struct{}{} f.watchTab[w] = struct{}{}
wlink.byfile[foid] = w wlink.byfile[foid] = w
// XXX defer -> unregister watch if error? // XXX defer -> unregister watch if error?
...@@ -1545,7 +1537,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1545,7 +1537,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
} }
// XXX or register w to f & wlink here? // XXX or register w to f & wlink here?
// NOTE registering f.watches[w] and wlink.byfile[foid] = w must come together. // NOTE registering f.watchTab[w] and wlink.byfile[foid] = w must come together.
return nil return nil
} }
...@@ -1613,7 +1605,7 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1613,7 +1605,7 @@ func (wlink *WatchLink) _serve() (err error) {
// unregister all watches created on this wlink // unregister all watches created on this wlink
// XXX locking // XXX locking
for _, w := range wlink.byfile { for _, w := range wlink.byfile {
delete(w.file.watches, w) delete(w.file.watchTab, w)
} }
wlink.byfile = nil wlink.byfile = nil
...@@ -1996,7 +1988,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er ...@@ -1996,7 +1988,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
// FIXME: scan zfile.blktab - so that we can detect all btree changes // FIXME: scan zfile.blktab - so that we can detect all btree changes
// see "XXX building δFtail lazily ..." in notes.txt // see "XXX building δFtail lazily ..." in notes.txt
f.watches = make(map[*Watch]struct{}) f.watchTab = make(map[*Watch]struct{})
} }
return f, nil return f, nil
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment