Commit 210e9b07 authored by Kirill Smelkov's avatar Kirill Smelkov

X Fix ΔBtail.SliceByRootRev (lo,hi] handling

parent 8f715257
......@@ -209,17 +209,7 @@ func (orig *ΔBtail) Clone() *ΔBtail {
// Clone returns copy of ΔTtail.
func (orig *ΔTtail) Clone() *ΔTtail {
klon := &ΔTtail{}
klon.vδT = make([]ΔTree, 0, len(orig.vδT))
for _, origδT := range orig.vδT {
klonδT := ΔTree{
Rev: origδT.Rev,
ΔKV: make(map[Key]ΔValue, len(origδT.ΔKV)),
}
for k, δv := range origδT.ΔKV {
klonδT.ΔKV[k] = δv
}
klon.vδT = append(klon.vδT, klonδT)
}
klon.vδT = vδTClone(orig.vδT)
klon.trackNew = orig.trackNew.Clone()
klon.KVAtTail = make(map[Key]Value, len(orig.KVAtTail))
for k, v := range orig.KVAtTail {
......@@ -232,6 +222,22 @@ func (orig *ΔTtail) Clone() *ΔTtail {
return klon
}
// vδTClone returns deep copy of []ΔTree.
func vδTClone(orig []ΔTree) []ΔTree {
klon := make([]ΔTree, 0, len(orig))
for _, origδT := range orig {
klonδT := ΔTree{
Rev: origδT.Rev,
ΔKV: make(map[Key]ΔValue, len(origδT.ΔKV)),
}
for k, δv := range origδT.ΔKV {
klonδT.ΔKV[k] = δv
}
klon = append(klon, klonδT)
}
return klon
}
// (tail, head] coverage
func (δBtail *ΔBtail) Head() zodb.Tid { return δBtail.δZtail.Head() }
func (δBtail *ΔBtail) Tail() zodb.Tid { return δBtail.δZtail.Tail() }
......@@ -350,6 +356,11 @@ func (δTtail *ΔTtail) rebuild(root zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB
δrevSet = setTid{}
// clone vδT before modifying it
// queries such as SliceByRootRev return slices of vδT and we do not
// want to change data that is already returned to user.
δTtail.vδT = vδTClone(δTtail.vδT)
// go backwards and merge vδT <- treediff(lo..hi/trackNew)
vδZ := δZtail.Data()
for {
......@@ -665,6 +676,7 @@ func (δBtail *ΔBtail) _Update1(δZ *zodb.EventCommit) (δB1 _ΔBUpdate1, err e
tracefΔBtail("\n-> root<%s> δkv: %v δtrack: %v δtkeycov: %v\n", root, δT, δtrack, δtkeycov)
// XXX also needs vδT clone here?
δTtail := δBtail.vδTbyRoot[root] // must be there
if len(δT) > 0 { // an object might be resaved without change
δTtail.vδT = append(δTtail.vδT, ΔTree{Rev: δZ.Tid, ΔKV: δT})
......@@ -739,6 +751,7 @@ func (δTtail *ΔTtail) forgetPast(revCut zodb.Tid) {
}
// vδT[:icut] should be forgotten
// NOTE clones vδT because queries return vδT aliases
δTtail.vδT = append([]ΔTree(nil), δTtail.vδT[icut:]...)
}
......@@ -839,7 +852,27 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) /*readonl
return []ΔTree{}
}
// XXX dup data - because they can be further rebuilt in parallel to caller using them
return δTtail.vδT // FIXME process lo, hi
// XXX no -> dup data in rebuild, not here
vδT := δTtail.vδT
l := len(vδT)
if l == 0 {
return nil
}
// find max j : [j].rev ≤ hi XXX linear scan -> binary search
j := l - 1
for ; j >= 0 && vδT[j].Rev > hi; j-- {}
if j < 0 {
return nil // ø
}
// find max i : [i].rev > lo XXX linear scan -> binary search
i := j
for ; i >= 0 && vδT[i].Rev > lo; i-- {}
i++
return vδT[i:j+1]
}
......
......@@ -1192,6 +1192,179 @@ func TestΔBtailForget(t_ *testing.T) {
assertΔTtail(t.T, "forget ≤ at1", δbtail, t3, t.Root(), xat, t2.Δxkv, t3.Δxkv)
δbtail.ForgetPast(t3.At)
assertΔTtail(t.T, "forget ≤ at3", δbtail, t3, t.Root(), xat, )
// XXX verify no aliasing
}
func TestΔBtailSliceByRootRev(t_ *testing.T) {
// SliceByRootRev is thin wrapper to return ΔTtail.vδT slice.
// Recomputing ΔTtail.vδT itself is exercised in depth by xverifyΔBTail_rebuild.
// Here we verify only properties of the wrapper.
t := xbtreetest.NewT(t_)
X := exc.Raiseif
// ΔT is similar to ΔTree but uses Δstring instead of ΔValue for ΔKV
type ΔT struct {
Rev zodb.Tid
ΔKV map[Key]Δstring
}
// δ is shorthand for ΔKV
type δ = map[Key]Δstring
t0 := t.CommitTree("T2/B1:a-B2:f")
t1 := t.CommitTree("T2/B1:b-B2:g")
t2 := t.CommitTree("T2/B1:c-B2:h")
const a, b, c = "a", "b", "c"
const f, g, h = "f", "g", "h"
xat := map[zodb.Tid]string{
t0.At: "at0",
t1.At: "at1",
t2.At: "at2",
}
at2t := map[zodb.Tid]*xbtreetest.Commit{ // XXX -> move to treeenv ?
t0.At: t0,
t1.At: t1,
t2.At: t2,
}
δbtail := NewΔBtail(t0.At, t.DB)
_, err := δbtail.Update(t1.ΔZ); X(err)
_, err = δbtail.Update(t2.ΔZ); X(err)
// track 2 + rebuild.
_2 := setKey{}; _2.Add(2)
xtrackKeys(δbtail, t2, _2)
err = δbtail.rebuildAll(); X(err)
δttail := δbtail.vδTbyRoot[t.Root()]
// assertvδT asserts that vδT matches vδTok
assertvδT := func(subj string, vδT []ΔTree, vδTok ...ΔT) {
t.Helper()
// convert vδT from ΔTree to ΔT
var vδT_ []ΔT
for _, δT := range vδT {
tj := at2t[δT.Rev]
δt := ΔT{δT.Rev, xgetδKV(tj.Prev, tj, δT.ΔKV)}
vδT_ = append(vδT_, δt)
}
if reflect.DeepEqual(vδT_, vδTok) {
return
}
have := []string{}
for _, δT := range vδT_ {
have = append(have, fmt.Sprintf("@%s·%v", xat[δT.Rev], δT.ΔKV))
}
want := []string{}
for _, δT := range vδTok {
want = append(want, fmt.Sprintf("@%s·%v", xat[δT.Rev], δT.ΔKV))
}
t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want)
}
// zblkByName returns oid of ZBlk that has .Name == name
zblkByName := func(name string) zodb.Oid {
for oid, zblki := range t0.ZBlkTab {
if zblki.Name == name {
return oid
}
}
panicf("ZBlk<%q> not found", name)
return zodb.InvalidOid // XXX should be not needed
}
s00 := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01 := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02 := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
s12 := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At)
s22 := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At)
vδT := δttail.vδT
assertvδT("t2.vδT", vδT, ΔT{t1.At, δ{2:{f,g}}}, ΔT{t2.At, δ{2:{g,h}}})
assertvδT("t2.s00", s00)
assertvδT("t2.s01", s01, ΔT{t1.At, δ{2:{f,g}}})
assertvδT("t2.s02", s02, ΔT{t1.At, δ{2:{f,g}}}, ΔT{t2.At, δ{2:{g,h}}})
assertvδT("t2.s12", s12, ΔT{t2.At, δ{2:{g,h}}})
assertvδT("t2.s22", s22)
// sXX should be all aliased to vδT
gg := zblkByName("g")
hh := zblkByName("h")
vδT[0].Rev = t0.At; δkv0 := vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{11:{gg,gg}}
vδT[1].Rev = t0.At; δkv1 := vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{12:{hh,hh}}
assertvδT("t2.vδT*", vδT, ΔT{t0.At, δ{11:{g,g}}}, ΔT{t0.At, δ{12:{h,h}}})
assertvδT("t2.s00*", s00)
assertvδT("t2.s01*", s01, ΔT{t0.At, δ{11:{g,g}}})
assertvδT("t2.s02*", s02, ΔT{t0.At, δ{11:{g,g}}}, ΔT{t0.At, δ{12:{h,h}}})
assertvδT("t2.s12*", s12, ΔT{t0.At, δ{12:{h,h}}})
assertvδT("2.s22*", s22)
vδT[0].Rev = t1.At; vδT[0].ΔKV = δkv0
vδT[1].Rev = t2.At; vδT[1].ΔKV = δkv1
assertvδT("t2.vδT+", vδT, ΔT{t1.At, δ{2:{f,g}}}, ΔT{t2.At, δ{2:{g,h}}})
assertvδT("t2.s00+", s00)
assertvδT("t2.s01+", s01, ΔT{t1.At, δ{2:{f,g}}})
assertvδT("t2.s02+", s02, ΔT{t1.At, δ{2:{f,g}}}, ΔT{t2.At, δ{2:{g,h}}})
assertvδT("t2.s12+", s12, ΔT{t2.At, δ{2:{g,h}}})
assertvδT("t2.s22+", s22)
// after track 1 + rebuild old slices remain unchanged, but new queries return updated data
_1 := setKey{}; _1.Add(1)
xtrackKeys(δbtail, t2, _1)
err = δbtail.rebuildAll(); X(err)
s00_ := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01_ := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02_ := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
s12_ := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At)
s22_ := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At)
vδT = δttail.vδT
assertvδT("t12.vδT", vδT, ΔT{t1.At, δ{1:{a,b},2:{f,g}}}, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
assertvδT("t12.s00", s00)
assertvδT("t12.s00_", s00_)
assertvδT("t12.s01", s01, ΔT{t1.At, δ{ 2:{f,g}}})
assertvδT("t12.s01_", s01_, ΔT{t1.At, δ{1:{a,b},2:{f,g}}})
assertvδT("t12.s02", s02, ΔT{t1.At, δ{ 2:{f,g}}}, ΔT{t2.At, δ{ 2:{g,h}}})
assertvδT("t12.s02_", s02_, ΔT{t1.At, δ{1:{a,b},2:{f,g}}}, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
assertvδT("t12.s12", s12, ΔT{t2.At, δ{ 2:{g,h}}})
assertvδT("t12.s12_", s12_, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
assertvδT("t12.s22", s22)
assertvδT("t12.s22_", s22_)
// sXX_ should be all aliased to vδT, but not sXX
bb := zblkByName("b")
cc := zblkByName("c")
vδT[0].Rev = t0.At; δkv0 = vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{111:{bb,bb}}
vδT[1].Rev = t0.At; δkv1 = vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{112:{cc,cc}}
assertvδT("t12.vδT*", vδT, ΔT{t0.At, δ{111:{b,b}}}, ΔT{t0.At, δ{112:{c,c}}})
assertvδT("t12.s00*", s00)
assertvδT("t12.s00_*", s00_)
assertvδT("t12.s01*", s01, ΔT{t1.At, δ{ 2:{f,g}}})
assertvδT("t12.s01_*", s01_, ΔT{t0.At, δ{111:{b,b} }})
assertvδT("t12.s02*", s02, ΔT{t1.At, δ{ 2:{f,g}}}, ΔT{t2.At, δ{ 2:{g,h}}})
assertvδT("t12.s02_*", s02_, ΔT{t0.At, δ{111:{b,b} }}, ΔT{t0.At, δ{112:{c,c} }})
assertvδT("t12.s12*", s12, ΔT{t2.At, δ{ 2:{g,h}}})
assertvδT("t12.s12_*", s12_, ΔT{t0.At, δ{112:{c,c} }})
assertvδT("t12.s22*", s22)
assertvδT("t12.s22_*", s22_)
vδT[0].Rev = t1.At; vδT[0].ΔKV = δkv0
vδT[1].Rev = t2.At; vδT[1].ΔKV = δkv1
assertvδT("t12.vδT+", vδT, ΔT{t1.At, δ{1:{a,b},2:{f,g}}}, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
assertvδT("t12.s00+", s00)
assertvδT("t12.s00_+", s00_)
assertvδT("t12.s01+", s01, ΔT{t1.At, δ{ 2:{f,g}}})
assertvδT("t12.s01_+", s01_, ΔT{t1.At, δ{1:{a,b},2:{f,g}}})
assertvδT("t12.s02+", s02, ΔT{t1.At, δ{ 2:{f,g}}}, ΔT{t2.At, δ{ 2:{g,h}}})
assertvδT("t12.s02_+", s02_, ΔT{t1.At, δ{1:{a,b},2:{f,g}}}, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
assertvδT("t12.s12+", s12, ΔT{t2.At, δ{ 2:{g,h}}})
assertvδT("t12.s12_+", s12_, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
assertvδT("t12.s22+", s22)
assertvδT("t12.s22_+", s22_)
}
......
......@@ -383,10 +383,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
} else {
ZinblkAt = lo
}
// XXX hack - should be in ΔBtail.SliceByRevRoot
if ZinblkAt <= lo {
it = -1 // don't go down beyond that
}
iz := len(vδZ) - 1
for (iz >= 0 || it >= 0) {
......@@ -442,11 +438,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
} else {
ZinblkAt = lo
}
// XXX hack - should be in ΔBtail.SliceByRevRoot
if ZinblkAt <= lo {
it = -1 // don't go down beyond that
}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment