Commit 77c4efc7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4a0471de
......@@ -589,246 +589,6 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er
}
// XXX move vδTBuild/vδTBuild1/Merge,Widen to after Update?
// vδTBuild builds vδT from vδZ for root/tracked=trackNew.
//
// It returns:
//
// - vδT,
// - trackNew* - a superset of trackNew accounting that potentially more keys
// become tracked during the build process.
//
// NOTE ΔBtail calls vδTBuild(root, trackNew) to compute update for ΔTtail.vδT.
func vδTBuild(root zodb.Oid, trackNew blib.PPTreeSubSet, δZtail *zodb.ΔTail, db *zodb.DB) (vδT []ΔTree, trackNew_ blib.PPTreeSubSet, err error) {
defer xerr.Contextf(&err, "root<%s>: build vδT", root)
tracefΔBtail("\nvδTBuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head())
tracefΔBtail("trackNew: %v\n", trackNew)
if len(trackNew) == 0 {
return nil, nil, nil
}
trackNew = trackNew.Clone() // it will become trackNew*
// go backwards and compute vδT <- treediff(lo..hi/trackNew)
vδZ := δZtail.Data()
for {
δtkeycov := &blib.RangedKeySet{} // all keys coming into tracking set during this lo<-hi scan
trackNewCur := trackNew.Clone() // trackNew adjusted as of when going to i<- entry
for i := len(vδZ)-1; i>=0; i-- {
δZ := vδZ[i]
var atPrev zodb.Tid
if i > 0 {
atPrev = vδZ[i-1].Rev
} else {
atPrev = δZtail.Tail()
}
δkv, δtrackNew, δtkeycov_, err := vδTBuild1(atPrev, δZ, trackNewCur, db)
if err != nil {
return nil, nil, err
}
if len(δkv) > 0 {
δT := ΔTree{Rev: δZ.Rev, KV: δkv}
vδTMerge1Inplace(&vδT, δT)
}
trackNewCur.ApplyΔ(δtrackNew)
δtkeycov.UnionInplace(δtkeycov_)
}
// an iteration closer to tail may turn out to add a key to the tracking set.
// We have to recheck all entries newer that revision for changes to that key,
// for example:
//
// 8 5*
// / \ <- / \
// 2 8 2* 7
//
// here initial tracked set is 5*-2*. Going to earlier revision
// 2'th keycov range is widen from [-∞,5) to [-∞,7), so 5*-7 in
// later revision have to be rechecked because 7 was added into
// tracking set.
//
// Implement this via restarting from head and cycling until
// set of tracked keys does not grow anymore.
if δtkeycov.Empty() {
break
}
err := widenTrackNew(trackNew, δtkeycov, root, δZtail.Head(), db)
if err != nil {
return nil, nil, err
}
}
tracefΔBtail("-> vδT: %v\n", vδT)
tracefΔBtail("-> trackNew*: %v\n", trackNew)
return vδT, trackNew, nil
}
// vδTBuild1 builds δT for single δZ.
//
// δtrackNew/δtkeycov represents how trackNew changes when going through `atPrev <- δZ.Rev` .
func vδTBuild1(atPrev zodb.Tid, δZ zodb.ΔRevEntry, trackNew blib.PPTreeSubSet, db *zodb.DB) (δT map[Key]ΔValue, δtrackNew *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) {
defer xerr.Contextf(&err, "build1 %s<-%s", atPrev, δZ.Rev)
debugfΔBtail("\n build1 @%s <- @%s\n", atPrev, δZ.Rev)
debugfΔBtail(" δZ:\t%v\n", δZ.Changev)
debugfΔBtail(" trackNew: %v\n", trackNew)
defer func() {
debugfΔBtail("-> δT: %v\n", δT)
debugfΔBtail("-> δtrackNew: %v\n", δtrackNew)
debugfΔBtail("-> δtkeycov: %v\n", δtkeycov)
debugfΔBtail("\n\n")
}()
// NOTE: keep vvv in sync with ΔBtail._Update1
δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, trackNew)
// skip opening DB connections if there is no change to this tree
if len(δtopsByRoot) == 0 {
return nil, blib.NewΔPPTreeSubSet(), &blib.RangedKeySet{}, nil
}
if len(δtopsByRoot) != 1 {
panicf("BUG: δtopsByRoot has > 1 entries: %v\ntrackNew: %v\nδZ: %v", δtopsByRoot, trackNew, δZ)
}
var root zodb.Oid
var δtops setOid
for root_, δtops_ := range δtopsByRoot {
root = root_
δtops = δtops_
}
// open ZODB connection corresponding to "current" and "prev" states
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
zconnPrev, err := db.Open(ctx, &zodb.ConnOptions{At: atPrev})
if err != nil {
return nil, nil, nil, err
}
zconnCurr, err := db.Open(ctx, &zodb.ConnOptions{At: δZ.Rev})
if err != nil {
return nil, nil, nil, err
}
// diff backwards curr -> prev
δT, δtrack, δtkeycov, err := treediff(ctx, root, δtops, δZTC, trackNew, zconnCurr, zconnPrev)
if err != nil {
return nil, nil, nil, err
}
debugfΔBtail(" -> root<%s> δkv*: %v δtrack*: %v δtkeycov*: %v\n", root, δT, δtrack, δtkeycov)
for k, δv := range δT {
// the diff was backward; vδT entries are with diff forward
δv.New, δv.Old = δv.Old, δv.New
δT[k] = δv
}
return δT, δtrack, δtkeycov, nil
}
// vδTMergeInplace merges vδTnew into vδT.
//
// δrevSet indicates set of new revisions created in vδT.
// vδT is modified inplace.
func vδTMergeInplace(pvδT *[]ΔTree, vδTnew []ΔTree) (δrevSet setTid) {
// TODO if needed: optimize to go through vδT and vδTnew sequentially
δrevSet = setTid{}
for _, δT := range vδTnew {
newRevEntry := vδTMerge1Inplace(pvδT, δT)
if newRevEntry {
δrevSet.Add(δT.Rev)
}
}
return δrevSet
}
// vδTMerge1Inplace merges one δT entry into vδT.
//
// newRevEntry indicates whether δT.Rev was not there before in vδT.
// vδT is modified inplace.
func vδTMerge1Inplace(pvδT *[]ΔTree, δT ΔTree) (newRevEntry bool) {
if len(δT.KV) == 0 {
return false // δT has no change
}
vδT := *pvδT
l := len(vδT)
j := sort.Search(l, func(k int) bool {
return δT.Rev <= vδT[k].Rev
})
if j == l || vδT[j].Rev != δT.Rev {
newRevEntry = true
δTcurr := ΔTree{Rev: δT.Rev, KV: map[Key]ΔValue{}}
// insert(@j, δTcurr)
vδT = append(vδT[:j],
append([]ΔTree{δTcurr},
vδT[j:]...)...)
}
δTcurr := vδT[j]
for k, δv := range δT.KV {
δv_, already := δTcurr.KV[k]
if already {
if δv != δv_ {
// TODO: return "conflict"
panicf("[%v] inconsistent δv:\nδTcurr: %v\nδT: %v", k, δTcurr, δT)
}
} else {
δTcurr.KV[k] = δv
}
}
*pvδT = vδT
return newRevEntry
}
// widenTrackNew widens trackNew to cover δtkeycov.
func widenTrackNew(trackNew blib.PPTreeSubSet, δtkeycov *blib.RangedKeySet, root zodb.Oid, at zodb.Tid, db *zodb.DB) (err error) {
defer xerr.Contextf(&err, "widenTrackNew tree<%s> @%s +%s", root, at, δtkeycov)
debugfΔBtail("\n widenTrackNew %s @%s +%s", root, at, δtkeycov)
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
zhead, err := db.Open(ctx, &zodb.ConnOptions{At: at}); /*X*/ if err != nil { return err }
xtree, err := zgetNodeOrNil(ctx, zhead, root); /*X*/ if err != nil { return err }
if xtree == nil {
// root deleted -> root node covers [-∞,∞)
trackNew.AddPath([]zodb.Oid{root})
return nil
}
tree := xtree.(*Tree) // must succeed
top := &nodeInRange{prefix: nil, keycov: blib.KeyRange{KeyMin, KeyMax}, node: tree}
V := rangeSplit{top}
for _, r := range δtkeycov.AllRanges() {
lo := r.Lo
for {
b, err := V.GetToLeaf(ctx, lo); /*X*/ if err != nil { return err }
trackNew.AddPath(b.Path())
// continue with next right bucket until r coverage is complete
if r.Hi_ <= b.keycov.Hi_ {
break
}
lo = b.keycov.Hi_ + 1
}
}
return nil
}
// Update updates δB with object-level ZODB changes.
//
// Only those objects from δZ that belong to tracked set are guaranteed to be
......@@ -1193,6 +953,247 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon
}
// ---- vδTBuild/vδTMerge ----
// vδTBuild builds vδT from vδZ for root/tracked=trackNew.
//
// It returns:
//
// - vδT,
// - trackNew* - a superset of trackNew accounting that potentially more keys
// become tracked during the build process.
//
// NOTE ΔBtail calls vδTBuild(root, trackNew) to compute update for ΔTtail.vδT.
func vδTBuild(root zodb.Oid, trackNew blib.PPTreeSubSet, δZtail *zodb.ΔTail, db *zodb.DB) (vδT []ΔTree, trackNew_ blib.PPTreeSubSet, err error) {
defer xerr.Contextf(&err, "root<%s>: build vδT", root)
tracefΔBtail("\nvδTBuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head())
tracefΔBtail("trackNew: %v\n", trackNew)
if len(trackNew) == 0 {
return nil, nil, nil
}
trackNew = trackNew.Clone() // it will become trackNew*
// go backwards and compute vδT <- treediff(lo..hi/trackNew)
vδZ := δZtail.Data()
for {
δtkeycov := &blib.RangedKeySet{} // all keys coming into tracking set during this lo<-hi scan
trackNewCur := trackNew.Clone() // trackNew adjusted as of when going to i<- entry
for i := len(vδZ)-1; i>=0; i-- {
δZ := vδZ[i]
var atPrev zodb.Tid
if i > 0 {
atPrev = vδZ[i-1].Rev
} else {
atPrev = δZtail.Tail()
}
δkv, δtrackNew, δtkeycov_, err := vδTBuild1(atPrev, δZ, trackNewCur, db)
if err != nil {
return nil, nil, err
}
if len(δkv) > 0 {
δT := ΔTree{Rev: δZ.Rev, KV: δkv}
vδTMerge1Inplace(&vδT, δT)
}
trackNewCur.ApplyΔ(δtrackNew)
δtkeycov.UnionInplace(δtkeycov_)
}
// an iteration closer to tail may turn out to add a key to the tracking set.
// We have to recheck all entries newer that revision for changes to that key,
// for example:
//
// 8 5*
// / \ <- / \
// 2 8 2* 7
//
// here initial tracked set is 5*-2*. Going to earlier revision
// 2'th keycov range is widen from [-∞,5) to [-∞,7), so 5*-7 in
// later revision have to be rechecked because 7 was added into
// tracking set.
//
// Implement this via restarting from head and cycling until
// set of tracked keys does not grow anymore.
if δtkeycov.Empty() {
break
}
err := widenTrackNew(trackNew, δtkeycov, root, δZtail.Head(), db)
if err != nil {
return nil, nil, err
}
}
tracefΔBtail("-> vδT: %v\n", vδT)
tracefΔBtail("-> trackNew*: %v\n", trackNew)
return vδT, trackNew, nil
}
// vδTBuild1 builds δT for single δZ.
//
// δtrackNew/δtkeycov represents how trackNew changes when going through `atPrev <- δZ.Rev` .
func vδTBuild1(atPrev zodb.Tid, δZ zodb.ΔRevEntry, trackNew blib.PPTreeSubSet, db *zodb.DB) (δT map[Key]ΔValue, δtrackNew *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) {
defer xerr.Contextf(&err, "build1 %s<-%s", atPrev, δZ.Rev)
debugfΔBtail("\n build1 @%s <- @%s\n", atPrev, δZ.Rev)
debugfΔBtail(" δZ:\t%v\n", δZ.Changev)
debugfΔBtail(" trackNew: %v\n", trackNew)
defer func() {
debugfΔBtail("-> δT: %v\n", δT)
debugfΔBtail("-> δtrackNew: %v\n", δtrackNew)
debugfΔBtail("-> δtkeycov: %v\n", δtkeycov)
debugfΔBtail("\n\n")
}()
// NOTE: keep vvv in sync with ΔBtail._Update1
δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, trackNew)
// skip opening DB connections if there is no change to this tree
if len(δtopsByRoot) == 0 {
return nil, blib.NewΔPPTreeSubSet(), &blib.RangedKeySet{}, nil
}
if len(δtopsByRoot) != 1 {
panicf("BUG: δtopsByRoot has > 1 entries: %v\ntrackNew: %v\nδZ: %v", δtopsByRoot, trackNew, δZ)
}
var root zodb.Oid
var δtops setOid
for root_, δtops_ := range δtopsByRoot {
root = root_
δtops = δtops_
}
// open ZODB connection corresponding to "current" and "prev" states
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
zconnPrev, err := db.Open(ctx, &zodb.ConnOptions{At: atPrev})
if err != nil {
return nil, nil, nil, err
}
zconnCurr, err := db.Open(ctx, &zodb.ConnOptions{At: δZ.Rev})
if err != nil {
return nil, nil, nil, err
}
// diff backwards curr -> prev
δT, δtrack, δtkeycov, err := treediff(ctx, root, δtops, δZTC, trackNew, zconnCurr, zconnPrev)
if err != nil {
return nil, nil, nil, err
}
debugfΔBtail(" -> root<%s> δkv*: %v δtrack*: %v δtkeycov*: %v\n", root, δT, δtrack, δtkeycov)
for k, δv := range δT {
// the diff was backward; vδT entries are with diff forward
δv.New, δv.Old = δv.Old, δv.New
δT[k] = δv
}
return δT, δtrack, δtkeycov, nil
}
// vδTMergeInplace merges vδTnew into vδT.
//
// δrevSet indicates set of new revisions created in vδT.
// vδT is modified inplace.
func vδTMergeInplace(pvδT *[]ΔTree, vδTnew []ΔTree) (δrevSet setTid) {
// TODO if needed: optimize to go through vδT and vδTnew sequentially
δrevSet = setTid{}
for _, δT := range vδTnew {
newRevEntry := vδTMerge1Inplace(pvδT, δT)
if newRevEntry {
δrevSet.Add(δT.Rev)
}
}
return δrevSet
}
// vδTMerge1Inplace merges one δT entry into vδT.
//
// newRevEntry indicates whether δT.Rev was not there before in vδT.
// vδT is modified inplace.
func vδTMerge1Inplace(pvδT *[]ΔTree, δT ΔTree) (newRevEntry bool) {
if len(δT.KV) == 0 {
return false // δT has no change
}
vδT := *pvδT
l := len(vδT)
j := sort.Search(l, func(k int) bool {
return δT.Rev <= vδT[k].Rev
})
if j == l || vδT[j].Rev != δT.Rev {
newRevEntry = true
δTcurr := ΔTree{Rev: δT.Rev, KV: map[Key]ΔValue{}}
// insert(@j, δTcurr)
vδT = append(vδT[:j],
append([]ΔTree{δTcurr},
vδT[j:]...)...)
}
δTcurr := vδT[j]
for k, δv := range δT.KV {
δv_, already := δTcurr.KV[k]
if already {
if δv != δv_ {
// TODO: return "conflict"
panicf("[%v] inconsistent δv:\nδTcurr: %v\nδT: %v", k, δTcurr, δT)
}
} else {
δTcurr.KV[k] = δv
}
}
*pvδT = vδT
return newRevEntry
}
// widenTrackNew widens trackNew to cover δtkeycov.
func widenTrackNew(trackNew blib.PPTreeSubSet, δtkeycov *blib.RangedKeySet, root zodb.Oid, at zodb.Tid, db *zodb.DB) (err error) {
defer xerr.Contextf(&err, "widenTrackNew tree<%s> @%s +%s", root, at, δtkeycov)
debugfΔBtail("\n widenTrackNew %s @%s +%s", root, at, δtkeycov)
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
zhead, err := db.Open(ctx, &zodb.ConnOptions{At: at}); /*X*/ if err != nil { return err }
xtree, err := zgetNodeOrNil(ctx, zhead, root); /*X*/ if err != nil { return err }
if xtree == nil {
// root deleted -> root node covers [-∞,∞)
trackNew.AddPath([]zodb.Oid{root})
return nil
}
tree := xtree.(*Tree) // must succeed
top := &nodeInRange{prefix: nil, keycov: blib.KeyRange{KeyMin, KeyMax}, node: tree}
V := rangeSplit{top}
for _, r := range δtkeycov.AllRanges() {
lo := r.Lo
for {
b, err := V.GetToLeaf(ctx, lo); /*X*/ if err != nil { return err }
trackNew.AddPath(b.Path())
// continue with next right bucket until r coverage is complete
if r.Hi_ <= b.keycov.Hi_ {
break
}
lo = b.keycov.Hi_ + 1
}
}
return nil
}
// ----------------------------------------
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment