Commit c4532d1a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 81b5056a
......@@ -806,8 +806,8 @@ retry:
// XXX no indexMu lock needed because head is Locked
δF := bfdir.δFtail.Update(δZ, zhead)
fmt.Printf("\n\nzδhandle: δF (#%d):\n", len(δF.Change))
for file, δfile := range δF.Change {
fmt.Printf("\n\nzδhandle: δF (#%d):\n", len(δF.ByFile))
for file, δfile := range δF.ByFile {
blkv := δfile.Blocks.Elements()
sort.Slice(blkv, func(i, j int) bool {
return blkv[i] < blkv[j]
......@@ -822,7 +822,7 @@ retry:
fmt.Printf("\n\n")
wg, ctx := errgroup.WithContext(context.TODO()) // XXX ctx = ?
for file, δfile := range δF.Change {
for file, δfile := range δF.ByFile {
// XXX needed?
// XXX even though δBtail is complete, not all ZBlk are present here
file.δtail.Append(δF.Rev, δfile.Blocks.Elements())
......@@ -845,7 +845,7 @@ retry:
//
// do it after completing data invalidations.
wg, ctx = errgroup.WithContext(context.TODO()) // XXX ctx = ?
for file, δfile := range δF.Change {
for file, δfile := range δF.ByFile {
if !δfile.Size {
continue
}
......@@ -874,7 +874,7 @@ retry:
// 2. restat invalidated ZBigFile
// XXX -> parallel
// XXX locking
for file := range δF.Change {
for file := range δF.ByFile {
size, sizePath, err := file.zfile.Size(ctx)
if err != nil {
panic(err) // XXX
......@@ -1391,9 +1391,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX locking
// pin all tracked file blocks that were changed (at, head] range
fhead := tidmin(f.δtail.Head(), headAt)
for _, δfile := range f.δtail.SliceByRev(at, fhead) {
for _, blk := range δfile.Changev {
// fhead := tidmin(f.δtail.Head(), headAt)
// for _, δfile := range f.δtail.SliceByRev(at, fhead) {
for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {
for blk := range δfile.Blocks {
_, already := toPin[blk]
if already {
continue
......
......@@ -470,6 +470,9 @@ class tWatch:
#
# expectv is [] of (zf, blk, at)
# returns [] of received pin requests.
#
# XXX cancel waiting upon receiving "ok" from wcfs (-> error that missed pins were not received)
# XXX abort on timeout?
def expectPin(t, expectv):
expected = set() # of expected pin messages
for zf, blk, at in expectv:
......
......@@ -104,6 +104,7 @@ type ΔBtail struct {
// ΔB represents a change in BTrees space.
type ΔB struct {
Rev zodb.Tid
// XXX -> ByRoot?
Change map[*Tree]map[Key]Value // {} root -> {}(key, value)
}
......
......@@ -66,12 +66,16 @@ type ΔFtail struct {
// ΔFtail merge btree.ΔTail with history of ZBlk
δBtail *ΔBtail
fileIdx map[*btree.LOBTree]SetBigFile // root -> {} BigFile XXX root -> oid?
// data with δF changes. Actual for part of tracked set that was taken
// into account.
vδF []ΔF
}
// ΔF represents a change in files space.
type ΔF struct {
Rev zodb.Tid
Change map[*BigFile]*ΔFile // file -> δfile
ByFile map[*BigFile]*ΔFile // file -> δfile
}
// ΔFile represents a change to one file.
......@@ -169,7 +173,7 @@ func (δFtail *ΔFtail) Track(file *BigFile, blk int64, path []btree.LONode, zbl
// During call to Update zhead must not be otherwise used - even for reading.
func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *ZConn) ΔF {
δB := δFtail.δBtail.Update(δZ)
δF := ΔF{Rev: δB.Rev, Change: make(map[*BigFile]*ΔFile)}
δF := ΔF{Rev: δB.Rev, ByFile: make(map[*BigFile]*ΔFile)}
// take btree changes into account
for root, δt := range δB.Change {
......@@ -178,10 +182,10 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *ZConn) ΔF {
panicf("ΔFtail: root<%s> -> ø file", root.POid())
}
for file := range files {
δfile, ok := δF.Change[file]
δfile, ok := δF.ByFile[file]
if !ok {
δfile = &ΔFile{Rev: δF.Rev, Blocks: make(SetI64)}
δF.Change[file] = δfile
δF.ByFile[file] = δfile
}
for blk /*, zblk*/ := range δt {
// FIXME stub - need to take both keys and zblk changes into account
......@@ -215,10 +219,10 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *ZConn) ΔF {
// is mutating z.infile. XXX recheck
z := obj.inΔFtail()
for file, blocks := range z.infile {
δfile, ok := δF.Change[file]
δfile, ok := δF.ByFile[file]
if !ok {
δfile = &ΔFile{Rev: δF.Rev, Blocks: make(SetI64)}
δF.Change[file] = δfile
δF.ByFile[file] = δfile
}
δfile.Blocks.Update(blocks)
......@@ -238,6 +242,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *ZConn) ΔF {
runtime.KeepAlive(obj)
}
δFtail.vδF = append(δFtail.vδF, δF)
return δF
}
......@@ -262,9 +267,44 @@ func (δFtail *ΔFtail) SliceByRev(lo, hi zodb.Tid) /*readonly*/ []ΔF {
// the caller must not modify returned slice.
//
// Note: contrary to regular go slicing, low is exclusive while high is inclusive.
func (δFtail *ΔFtail) SliceByFileRev(file *BigFile, lo, hi zodb.Tid) /*readonly*/[]ΔFile {
func (δFtail *ΔFtail) SliceByFileRev(file *BigFile, lo, hi zodb.Tid) /*readonly*/[]*ΔFile {
δassertSlice(δFtail, lo, hi)
// find vδF range corresponding to (lo, hi]
// XXX linear scan
vδF := δFtail.vδF
if len(vδF) == 0 {
return nil
}
// find max j : [j].rev ≤ hi XXX linear scan -> binary search
j := len(vδF)-1
for ; j >= 0 && vδF[j].Rev > hi; j-- {}
if j < 0 {
return nil // ø
}
// find max i : [i].rev > low XXX linear scan -> binary search
i := j
for ; i >= 0 && vδF[i].Rev > lo; i-- {}
i++
vδF = vδF[i:j+1]
// filter found changed to have only file-related bits
var vδfile []*ΔFile
for _, δF := range vδF {
δfile, ok := δF.ByFile[file]
if ok {
vδfile = append(vδfile, δfile)
}
}
// XXX merge into vδF zblk from not yet handled tracked part
return vδfile
// merging tree (δT) and Zblk (δZblk) histories into file history (δFile):
// δT ────────·──────────────·─────────────────·────────────
......@@ -278,10 +318,15 @@ func (δFtail *ΔFtail) SliceByFileRev(file *BigFile, lo, hi zodb.Tid) /*readonl
//
// δFile ────────o───────o──────x─────x────────────────────────
/*
vδZ := δFtail.δBtail.δZtail.SliceByRev(lo, hi)
_ = vδZ
panic("TODO")
// XXX stub that takes only ZBlk changes into account
// XXX dumb
for _, δZ := range vδZ {
}
*/
/*
// XXX activate zfile?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment