Commit a8945cbf authored by Kirill Smelkov's avatar Kirill Smelkov

X Start reworking rebuild routines not to modify data inplace

This will be needed to organize concurrent rebuild/queries, so that
several rebuild jobs could be run in parallel, and only when they are
done, one particular place of code locks ΔBtail/ΔTtail and merges
rebuild results into vδT & friends.
parent 5909d6a4
......@@ -164,6 +164,12 @@ type ΔBtail struct {
//
// See ΔBtail documentation for details.
type _ΔTtail struct {
// XXX explain about snapshot mode + RCU
// ...
// XXX text from rebuild:
// clone vδT before modifying it
// queries such as SliceByRootRev return slices of vδT and we do not
// want to change data that is already returned to user.
vδT []ΔTree // changes to tree keys; rev↑. covers keys ∈ tracked subset
// // set of keys(nodes) that were requested to be tracked in this tree,
......@@ -516,6 +522,185 @@ func (δBtail *ΔBtail) rebuild1(root zodb.Oid) error {
}
// vδTBuild builds vδT from vδZ for root/tracked=trackNew.
//
// It returns:
//
// - vδT,
// - trackNew* - a superset of trackNew accounting that potentially more keys
// become tracked during the build process.
//
// NOTE ΔBtail calls vδTBuild(root, trackNew) to compute update for ΔTtail.vδT.
func vδTBuild(root zodb.Oid, trackNew blib.PPTreeSubSet, δZtail *zodb.ΔTail, db *zodb.DB) (vδT []ΔTree, trackNew_ blib.PPTreeSubSet, err error) {
defer xerr.Contextf(&err, "root<%s>: build vδT", root)
tracefΔBtail("\nvδTBuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head())
tracefΔBtail("trackNew: %v\n", trackNew)
if len(trackNew) == 0 {
return nil, nil, nil
}
trackNew = trackNew.Clone() // it will become trackNew*
// go backwards and compute vδT <- treediff(lo..hi/trackNew)
vδZ := δZtail.Data()
for {
δtkeycov := &blib.RangedKeySet{} // all keys coming into tracking set during this lo<-hi scan
trackNewCur := trackNew.Clone() // trackNew adjusted as of when going to i<- entry
for i := len(vδZ)-1; i>=0; i-- {
δZ := vδZ[i]
var atPrev zodb.Tid
if i > 0 {
atPrev = vδZ[i-1].Rev
} else {
atPrev = δZtail.Tail()
}
// XXX δkv instead of δT ?
δkv, δtrackNew, δtkeycov_, err := vδTBuild1(atPrev, δZ, trackNewCur, db)
if err != nil {
return nil, nil, err
}
if len(δkv) > 0 {
δT := ΔTree{Rev: δZ.Rev, KV: δkv}
vδTMerge1Inplace(&vδT, δT)
}
trackNewCur.ApplyΔ(δtrackNew)
δtkeycov.UnionInplace(δtkeycov_)
}
// an iteration closer to tail may turn out to add a key to the tracking set.
// We have to recheck all entries newer that revision for changes to that key,
// for example:
//
// 8 5*
// / \ <- / \
// 2 8 2* 7
//
// here initial tracked set is 5*-2*. Going to earlier revision
// 2'th keycov range is widen from [-∞,5) to [-∞,7), so 5*-7 in
// later revision have to be rechecked because 7 was added into
// tracking set.
//
// Implement this via restarting from head and cycling until
// set of tracked keys does not grow anymore.
if δtkeycov.Empty() {
break
}
err := widenTrackNew(trackNew, δtkeycov, root, δZtail.Head(), db)
if err != nil {
return nil, nil, err
}
}
return vδT, trackNew, nil
}
// vδTBuild1 builds δT for single δZ.
//
// δtrackNew/δtkeycov represents how trackNew changes when going through `atPrev <- δZ.Rev` .
func vδTBuild1(atPrev zodb.Tid, δZ zodb.ΔRevEntry, trackNew blib.PPTreeSubSet, db *zodb.DB) (δT map[Key]ΔValue, δtrackNew *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) {
defer xerr.Contextf(&err, "build1 %s<-%s", atPrev, δZ.Rev)
debugfΔBtail("\n build1 @%s <- @%s\n", atPrev, δZ.Rev)
debugfΔBtail(" δZ:\t%v\n", δZ.Changev)
debugfΔBtail(" trackNew: %v\n", trackNew)
defer func() {
debugfΔBtail("-> δT: %v\n", δT)
debugfΔBtail("-> δtrackNew: %v\n", δtrackNew)
debugfΔBtail("-> δtkeycov: %v\n", δtkeycov)
debugfΔBtail("\n\n")
}()
// NOTE: keep vvv in sync with ΔBtail._Update1
δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, trackNew)
// skip opening DB connections if there is no change to this tree
if len(δtopsByRoot) == 0 {
return nil, blib.NewΔPPTreeSubSet(), &blib.RangedKeySet{}, nil
}
if len(δtopsByRoot) != 1 {
panicf("BUG: δtopsByRoot has > 1 entries: %v\ntrackNew: %v\nδZ: %v", δtopsByRoot, trackNew, δZ)
}
var root zodb.Oid
var δtops setOid
for root_, δtops_ := range δtopsByRoot {
root = root_
δtops = δtops_
}
// open ZODB connection corresponding to "current" and "prev" states
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
zconnPrev, err := db.Open(ctx, &zodb.ConnOptions{At: atPrev})
if err != nil {
return nil, nil, nil, err
}
zconnCurr, err := db.Open(ctx, &zodb.ConnOptions{At: δZ.Rev})
if err != nil {
return nil, nil, nil, err
}
// diff backwards curr -> prev
δT, δtrack, δtkeycov, err := treediff(ctx, root, δtops, δZTC, trackNew, zconnCurr, zconnPrev)
if err != nil {
return nil, nil, nil, err
}
debugfΔBtail(" -> root<%s> δkv*: %v δtrack*: %v δtkeycov*: %v\n", root, δT, δtrack, δtkeycov)
for _, δv := range δT {
// the diff was backward; vδT entries are with diff forward
δv.New, δv.Old = δv.Old, δv.New
}
return δT, δtrack, δtkeycov, nil
}
// vδTMerge1Inplace merges one δT entry into vδT.
// vδT is modified inplace.
// XXX test for vδTMerge
func vδTMerge1Inplace(pvδT *[]ΔTree, δT ΔTree) {
vδT := *pvδT
l := len(vδT)
j := sort.Search(l, func(k int) bool {
return δT.Rev <= vδT[k].Rev
})
if j == l || vδT[j].Rev != δT.Rev {
// newRevEntry = true
δTcurr := ΔTree{Rev: δT.Rev, KV: map[Key]ΔValue{}}
// insert(@j, δTcurr)
vδT = append(vδT[:j],
append([]ΔTree{δTcurr},
vδT[j:]...)...)
}
δTcurr := vδT[j]
for k, δv := range δT.KV {
δv_, already := δTcurr.KV[k]
if already {
if δv != δv_ {
panicf("[%v] inconsistent δv:\nδTcurr: %v\nδT: %v", k, δTcurr, δT)
}
} else {
δTcurr.KV[k] = δv
}
}
*pvδT = vδT
}
// rebuild rebuilds _ΔTtail taking trackNew requests into account.
//
// It returns:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment