Commit f68398c9 authored by Kirill Smelkov's avatar Kirill Smelkov

X wcfs: Move treediff into its own file

parent b0d9653c
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package main
// diff for BTree
// FIXME the algorythm is different: recursion is implemented by expanding rangeSplit step by step.
//
// δ(BTree) notes
// ==============
//
// input: BTree, (@new, []oid) -> find out δ(BTree) i.e. {-k(v), +k'(v'), ...}
//
// - oid ∈ Bucket
// - oid ∈ BTree
//
// Bucket:
//
// old = {k -> v}
// new = {k' -> v'}
//
// Δ = -k(v), +k(v), ...
//
// => for all buckets
//
// Δ accumulates to []δk(v)[n+,n-] n+ ∈ {0,1}, n- ∈ {0,1}, if n+=n- - cancel
//
//
// BTree:
//
// old = {k -> B} or {k -> T}
// new = {k' -> B'} or {k' -> T'}
//
// Δ = -k(B), +k(B), -k(T), +K(T), ...
//
// we translate (in top-down order):
//
// k(B) -> {} of k(v)
// k(T) -> {} of k(B) -> {} of k(v)
//
// which gives
//
// Δ = k(v), +k(v), ...
//
// i.e. exactly as for buckets and it accumulates to global Δ.
//
// The globally-accumulated Δ is the answer for δ(BTree, (@new, []oid))
//
// top-down order is obtained via toposort({oid}) wrt visited PathSet.
//
// δ(BTree) in wcfs context:
//
// . -k(blk) -> invalidate #blk
// . +k(blk) -> invalidate #blk (e.g. if blk was previously read as hole)
import (
"context"
"fmt"
"reflect"
"sort"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/zodb"
)
// treeSetKey represents ordered set of keys.
// it can be point-queried and range-accessed.
// TODO -> btree
type treeSetKey struct {
SetKey
}
// InRange returns
func (hi treeSetKey) GetInRange(lo, hi_ Key) SetKey {
// FIXME dumb O(n) -> TODO use cznic/b
ret := SetKey{}
for k := range hi.SetKey {
if lo <= k && k <= hi_ {
ret.Add(k)
}
}
return ret
}
// δZConnectTracked computes connected closure of δZ/T.
//
// δZ - all changes in a ZODB transaction.
// δZ/T - subset of those changes intersecting with tracking set.
// δZ/TC - connected closure for δZ/T
//
// for example for e.g. t₀->t₁->b₂ if δZ/T={t₀ b₂} -> δZ/TC=δZ/T+{t₁}
//
// δtopsByRoot = {} root -> {top changed nodes in that tree}
func δZConnectTracked(δZv []zodb.Oid, T PPTreeSubSet) (δZTC SetOid, δtopsByRoot map[zodb.Oid]SetOid) {
δZ := SetOid{}; for _, δ := range δZv { δZ.Add(δ) }
δZTC = SetOid{}
δtopsByRoot = map[zodb.Oid]SetOid{}
for δ := range δZ {
track, ok := T[δ]
if !ok {
continue // not tracked at all
}
δZTC.Add(δ)
// go up by .parent till root or till another tracked node in the tree
// if root -> δtopsByRoot[root] += δ
// if !root -> δZTC += path through which we reached another node (forming connection)
path := []zodb.Oid{}
node := δ
parent := track.parent
for {
// reached root
if parent == zodb.InvalidOid {
root := node
δtops, ok := δtopsByRoot[root]
if !ok {
δtops = SetOid{}
δtopsByRoot[root] = δtops
}
δtops.Add(δ)
break
}
// reached another tracked node
if δZ.Has(parent) {
for _, δp := range path {
δZTC.Add(δp)
}
break
}
path = append(path, parent)
trackUp, ok := T[parent]
if !ok {
panicf("BUG: .p%s -> %s, but %s is not tracked", node, parent, parent)
}
node = parent
parent = trackUp.parent
}
}
return δZTC, δtopsByRoot
}
// XXX place
// nodeInRange represents a Node coming under [lo, hi_] key range in its tree.
type nodeInRange struct {
prefix []zodb.Oid // path to this node goes via this objects
lo, hi_ Key // [lo, hi_] NOTE _not_ hi) not to overflow at ∞
node Node
done bool // whether this node was already taken into account while computing diff
}
// XXX place, doc
/*
func (n *nodeInRange) NodePath() []Node {
path := []Node{}
for n != nil {
path = append([]Node{n.node}, path...)
n = n.parent
}
return path
}
*/
func (n *nodeInRange) Path() []zodb.Oid {
return append(n.prefix, n.node.POid())
}
// rangeSplit represents set of nodes covering a range.
// nodes come with key↑ and no intersection in between their [lo,hi)
type rangeSplit []*nodeInRange // key↑
// Get returns node covering key k.
// Get panics if k is not covered.
func (rs rangeSplit) Get(k Key) *nodeInRange {
rnode, ok := rs.Get_(k)
if !ok {
panicf("key %v not covered; coverage: %s", k, rs)
}
return rnode
}
// Get_ returns node covering key k.
func (rs rangeSplit) Get_(k Key) (rnode *nodeInRange, ok bool) {
i := sort.Search(len(rs), func(i int) bool {
return k <= rs[i].hi_
})
if i == len(rs) {
return nil, false // key not covered
}
rn := rs[i]
if !(rn.lo <= k && k <= rn.hi_) {
panicf("BUG: get(%v) -> %s; coverage: %s", k, rn, rs)
}
return rn, true
}
// Expand replaces rnode with its children.
//
// rnode must be initially in *prs.
// rnode.node must be tree.
// rnode.node must be already activated.
//
// inserted children are returned for convenience.
func (prs *rangeSplit) Expand(rnode *nodeInRange) (children rangeSplit) {
rs := *prs
i := sort.Search(len(rs), func(i int) bool {
return rnode.hi_ <= rs[i].hi_
})
if i == len(rs) || rs[i] != rnode {
panicf("%s not in rangeSplit; coverage: %s", rnode, rs)
}
// [i].Key ≤ [i].Child.*.Key < [i+1].Key i ∈ [0, len([]))
//
// [0].Key = -∞ ; always returned so
// [len(ev)].Key = +∞ ; should be assumed so
tree := rnode.node.(*Tree)
treev := tree.Entryv()
children = make(rangeSplit, 0, len(treev)+1)
for i := range treev {
lo := rnode.lo
if i > 0 {
lo = treev[i].Key()
}
hi_ := rnode.hi_
if i < len(treev)-1 {
hi_ = treev[i+1].Key()-1 // NOTE -1 because it is hi_] not hi)
}
children = append(children, &nodeInRange{
prefix: rnode.Path(),
lo: lo,
hi_: hi_,
node: treev[i].Child(),
})
}
// del[i]; insert(@i, children)
*prs = append(rs[:i], append(children, rs[i+1:]...)...)
return children
}
// GetToLeaf returns leaf node corresponding to key k.
//
// Leaf is usually bucket node, but, in the sole single case of empty tree, can be that root tree node.
// GetToLeaf expands step-by-step every tree through which it has to traverse to next depth level.
//
// GetToLeaf panics if k is not covered.
// XXX also return path?
func (prs *rangeSplit) GetToLeaf(ctx context.Context, k Key) (*nodeInRange, error) {
rnode, ok, err := prs.GetToLeaf_(ctx, k)
if err == nil && !ok {
panicf("key %v not covered; coverage: %s", k, *prs)
}
return rnode, err
}
// GetToLeaf_ is comma-ok version of GetToLeaf.
func (prs *rangeSplit) GetToLeaf_(ctx context.Context, k Key) (rnode *nodeInRange, ok bool, err error) {
rnode, ok = prs.Get_(k)
if !ok {
return nil, false, nil // key not covered
}
for {
switch rnode.node.(type) {
// bucket = leaf
case *Bucket:
return rnode, true, nil
}
// its tree -> activate to expand; check for ø case
tree := rnode.node.(*Tree)
err = tree.PActivate(ctx)
if err != nil {
return nil, false, err
}
defer tree.PDeactivate()
// empty tree -> don't expand - it is already leaf
if len(tree.Entryv()) == 0 {
return rnode, true, nil
}
// expand tree children
children := prs.Expand(rnode)
rnode = children.Get(k) // k must be there
}
}
func (rs rangeSplit) String() string {
if len(rs) == 0 {
return "ø"
}
s := ""
for _, rn := range rs {
if s != "" {
s += " "
}
s += fmt.Sprintf("%s", rn)
}
return s
}
// treediff computes δT/δtrack for tree/trackSet specified by root in between old..new.
//
// δtops is set of top nodes for changed subtrees.
// δZTC is connected(δZ/T) - connected closure for subset of δZ(old..new) that
// touches tracked nodes of T.
//
// XXX holeIdx is updated XXX -> return similarly to δtrack
// XXX ^^^ -> but better kill holeIdx and do everything only via trackSet
func treediff(ctx context.Context, root zodb.Oid, δtops SetOid, δZTC SetOid, trackSet PPTreeSubSet, holeIdx treeSetKey, zconnOld, zconnNew *zodb.Connection) (δT map[Key]ΔValue, δtrack *ΔPPTreeSubSet, err error) {
defer xerr.Contextf(&err, "treediff %s..%s %s", zconnOld.At(), zconnNew.At(), root)
tracef("\ntreediff %s δtops: %v δZTC: %v\n", root, δtops, δZTC)
defer tracef("\n")
δT = map[Key]ΔValue{}
δtrackv := []*ΔPPTreeSubSet{}
for top := range δtops { // XXX -> sorted?
a, err1 := zgetNode(ctx, zconnOld, top)
b, err2 := zgetNode(ctx, zconnNew, top)
err := xerr.Merge(err1, err2)
if err != nil {
return nil, nil, err
}
δtop, δtrackTop, err := diffX(ctx, a, b, δZTC, trackSet, holeIdx)
if err != nil {
return nil, nil, err
}
// FIXME -> merge (VDEL vs add)
// XXX no - not needed here - keys cannot migrate in between two disconnected subtrees
// -> assert that keys from different δtop do not overlap
// DEL k -> Tkextra += k
// +k -> Tkextra -= k
tracef("-> δtop: %v\n", δtop)
tracef("-> δtrackTop: %v\n", δtrackTop)
for k,δv := range δtop {
δT[k] = δv
}
δtrackv = append(δtrackv, δtrackTop)
}
// adjust holeIdx
for k, δv := range δT {
if δv.Old == VDEL {
holeIdx.Del(k)
}
if δv.New == VDEL {
holeIdx.Add(k)
}
}
// adjust trackSet by merge(δtrackTops)
δtrack = &ΔPPTreeSubSet{Del: PPTreeSubSet{}, Add: PPTreeSubSet{}, δnchildNonLeafs: map[zodb.Oid]int{}}
for _, δ := range δtrackv {
δtrack.Update(δ)
}
return δT, δtrack, nil
}
// diffX computes difference in between two revisions of a tree's subtree.
//
// a, b point to top of the subtree @old and @new revisions and must be of the
// same type - tree or bucket.
//
// δZTC is connected set of objects covering δZT (objects changed in this tree in old..new).
//
// a/b can be nil; a=nil means addition, b=nil means deletion.
//
// δtrack is trackSet δ that needs to be applied to trackSet to keep it
// consistent with b (= a + δ).
func diffX(ctx context.Context, a, b Node, δZTC SetOid, trackSet PPTreeSubSet, holeIdx treeSetKey) (δ map[Key]ΔValue, δtrack *ΔPPTreeSubSet, err error) {
if a==nil && b==nil {
panic("BUG: both a & b == nil")
}
var aT, bT *Tree
var aB, bB *Bucket
isT := false
if a != nil {
aT, isT = a.(*Tree)
aB, _ = a.(*Bucket)
if aT == nil && aB == nil {
panicf("a: bad type %T", a)
}
}
if b != nil {
bT, isT = b.(*Tree)
bB, _ = b.(*Bucket)
if bT == nil && bB == nil {
panicf("b: bad type %T", b)
}
}
if a != nil && b != nil {
if a.POid() != b.POid() {
panicf("BUG: a.oid != b.oid ; a: %s b: %s", a.POid(), b.POid())
}
if !((aT != nil && bT != nil) || (aB != nil && bB != nil)) {
return nil, nil, fmt.Errorf("object %s: type mutated %s -> %s", a.POid(),
zodb.ClassOf(a), zodb.ClassOf(b))
}
}
if isT {
return diffT(ctx, aT, bT, δZTC, trackSet, holeIdx)
} else {
var δtrack *ΔPPTreeSubSet
δ, err := diffB(ctx, aB, bB)
if δ != nil {
δtrack = &ΔPPTreeSubSet{}
}
return δ, δtrack, err
}
}
// diffT computes difference in between two subtrees.
//
// a, b point to top of subtrees @old and @new revisions.
// δZTC is connected set of objects covering δZT (objects changed in this tree in old..new).
func diffT(ctx context.Context, A, B *Tree, δZTC SetOid, trackSet PPTreeSubSet, holeIdx treeSetKey) (δ map[Key]ΔValue, δtrack *ΔPPTreeSubSet, err error) {
tracef(" diffT %s %s\n", xidOf(A), xidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xidOf(A), xidOf(B))
if A == nil { panic("A is nil") }
if B == nil { panic("B is nil") }
δ = map[Key]ΔValue{}
δtrack = &ΔPPTreeSubSet{Del: PPTreeSubSet{}, Add: PPTreeSubSet{}, δnchildNonLeafs: map[zodb.Oid]int{}}
defer tracef(" -> δ: %v\n", δ)
// path prefix to A and B
prefix := []zodb.Oid{}
t := trackSet[A.POid()]
for t.parent != zodb.InvalidOid {
prefix = append([]zodb.Oid{t.parent}, prefix...)
t = trackSet[t.parent]
}
// initial split ranges for A and B
// XXX maybe walk till a from root to get more precise initial range?
atop := &nodeInRange{prefix: prefix, lo: KeyMin, hi_: KeyMax, node: A} // [-∞, ∞)
btop := &nodeInRange{prefix: prefix, lo: KeyMin, hi_: KeyMax, node: B} // [-∞, ∞)
Av := rangeSplit{atop} // nodes expanded from A
Bv := rangeSplit{btop} // nodes expanded from B
// for phase 2:
Akqueue := SetKey{} // queue for keys in A to be processed for δ-
Bkqueue := SetKey{} // ----//---- in B for δ+
Akdone := SetKey{} // already processed keys in A
Bkdone := SetKey{} // ----//---- in B
Aktodo := func(k Key) {
if !Akdone.Has(k) {
tracef(" Akq <- %d\n", k)
Akqueue.Add(k)
}
}
Bktodo := func(k Key) {
if !Bkdone.Has(k) {
tracef(" Bkq <- %d\n", k)
Bkqueue.Add(k)
}
}
// {} oid -> parent for all nodes in Bv: current and previously expanded - up till top B
// XXX requires A.oid == B.oid
BtrackSet := PPTreeSubSet{}
BtrackSet.AddPath(trackSet.Path(B.POid()))
// phase 1: expand A top->down driven by δZTC.
// by default a node contributes to δ-
// a node ac does not contribute to δ- and can be skipped, if:
// - ac is not tracked, or
// - ac ∉ δZTC && ∃ bc from B: ac.oid == bc.oid (ac+ac.children were not changed, and ac stays in the tree)
Aq := []*nodeInRange{atop} // queue for A nodes that contribute to δ-
for len(Aq) > 0 {
tracef("\n")
tracef(" aq: %v\n", Aq)
tracef(" av: %s\n", Av)
tracef(" bv: %s\n", Bv)
ra := pop(&Aq)
err = ra.node.PActivate(ctx); /*X*/if err != nil { return nil,nil, err }
defer ra.node.PDeactivate()
tracef(" a: %s\n", ra)
switch a := ra.node.(type) {
case *Bucket:
// a is bucket -> δ-
δA, err := diffB(ctx, a, nil); /*X*/if err != nil { return nil,nil, err }
err = δMerge(δ, δA); /*X*/if err != nil { return nil,nil, err }
δtrack.Del.AddPath(ra.Path())
// Bkqueue <- δA
for k := range δA {
Akdone.Add(k)
Bktodo(k)
}
// Bkqueue <- holes(ra.range)
for k := range holeIdx.GetInRange(ra.lo, ra.hi_) {
Bktodo(k)
}
ra.done = true
case *Tree:
// empty tree - only queue holes covered by it
if len(a.Entryv()) == 0 {
for k := range holeIdx.GetInRange(ra.lo, ra.hi_) {
Bktodo(k)
}
continue
}
// a is !empty tree - expand it and queue children
// check for each children whether it can be skipped
achildren := Av.Expand(ra)
for _, ac := range achildren {
acOid := ac.node.POid()
at, tracked := trackSet[acOid]
if !tracked && /*cannot skip embedded bucket:*/acOid != zodb.InvalidOid {
continue
}
if !δZTC.Has(acOid) && /*cannot skip embedded bucket:*/acOid != zodb.InvalidOid {
// check B children for node with ac.oid
// while checking expand Bv till ac.lo and ac.hi_ point to the same node
// ( this does not give exact answer but should be a reasonable heuristic;
// the diff is the same if heuristic does not work and we
// look into and load more nodes to compute δ )
_, found := BtrackSet[acOid]
if !found {
for {
blo := Bv.Get(ac.lo)
bhi_ := Bv.Get(ac.hi_)
if blo != bhi_ {
break
}
_, ok := blo.node.(*Tree)
if !ok {
break // bucket
}
err = blo.node.PActivate(ctx); /*X*/if err != nil { return nil,nil, err }
defer blo.node.PDeactivate()
// XXX check for empty tree?
bchildren := Bv.Expand(blo)
for _, bc := range bchildren {
bcOid := bc.node.POid()
BtrackSet.AddPath(bc.Path())
if acOid == bcOid {
found = true
}
}
if found {
break
}
}
}
if found {
// ac can be skipped
// XXX Bkqueue <- holes(ac.range \ bc.range) XXX test for this
// adjust trackSet since path to the node could have changed
apath := trackSet.Path(acOid)
bpath := BtrackSet.Path(acOid)
if !pathEqual(apath, bpath) {
δtrack.Del.AddPath(apath)
δtrack.Add.AddPath(bpath)
if nc := at.nchild; nc != 0 {
δtrack.δnchildNonLeafs[acOid] = nc
}
}
continue
}
}
// ac cannot be skipped
push(&Aq, ac)
}
}
}
// phase 2: reach consistency in between A and B.
// Every key removed in A has to be checked for whether it is present
// in B and contribute to δ+. In B, in turn, adding that key can add
// other keys to δ+. Those keys, in turn, have to be checked for
// whether they were present in A and contribute to δ-. For example:
//
// [ 2 4 ] [ 3 5 ]
// ↓ ↓ ↓ ↓ ↓ ↓
// |1| |23| |45| |12| |34| |56|
//
// if values for all keys change, tracked={1}, change to 1 adds
// * -B1, which queues B.1 and leads to
// * +B12, which queues A.2 and leads to
// * -B23, which queues B.3 and leads to
// * +B23, ...
//
// XXX inefficient: we process each key separately, while they can be
// processed in sorted batches.
tracef("\nphase 2:\n")
for {
tracef("\n")
tracef(" av: %s\n", Av)
tracef(" bv: %s\n", Bv)
tracef("\n")
tracef(" Bkq: %s\n", Bkqueue)
if len(Bkqueue) == 0 {
break
}
for k := range Bkqueue {
b, err := Bv.GetToLeaf(ctx, k); /*X*/if err != nil { return nil,nil, err }
tracef(" B k%d -> %s\n", k, b)
// +bucket if that bucket is reached for the first time
if !b.done {
var δB map[Key]ΔValue
bbucket, ok := b.node.(*Bucket)
if ok { // !ok means ø tree
δB, err = diffB(ctx, nil, bbucket); /*X*/if err != nil { return nil,nil, err }
}
// δ <- δB
err = δMerge(δ, δB); /*X*/if err != nil { return nil,nil, err }
δtrack.Add.AddPath(b.Path())
// Akqueue <- δB
for k_ := range δB {
Bkdone.Add(k_)
Aktodo(k_)
}
b.done = true
}
// XXX k is not there -> hole XXX test
}
Bkqueue = SetKey{}
tracef("\n")
tracef(" Akq: %s\n", Akqueue)
for k := range Akqueue {
a, err := Av.GetToLeaf(ctx, k); /*X*/if err != nil { return nil,nil, err }
tracef(" A k%d -> %s\n", k, a)
// -bucket if that bucket is reached for the first time
if !a.done {
var δA map[Key]ΔValue
abucket, ok := a.node.(*Bucket)
if ok { // !ok means ø tree
δA, err = diffB(ctx, abucket, nil); /*X*/if err != nil { return nil,nil, err }
}
// δ <- δA
err = δMerge(δ, δA); /*X*/if err != nil { return nil,nil, err }
δtrack.Del.AddPath(a.Path())
// Bkqueue <- δA
for k_ := range δA {
Akdone.Add(k_)
Bktodo(k_)
}
// Bkqueue <- holes(a.range)
for k_ := range holeIdx.GetInRange(a.lo, a.hi_) {
Bktodo(k_)
}
a.done = true
}
}
Akqueue = SetKey{}
}
return δ, δtrack, nil
}
// δMerge merges changes from δ2 into δ.
// δ is total-building diff, while δ2 is diff from comparing some subnodes.
func δMerge(δ, δ2 map[Key]ΔValue) error {
tracef(" δmerge %v <- %v\n", δ, δ2)
defer tracef(" -> %v\n", δ)
// merge δ <- δ2
for k, δv2 := range δ2 {
δv1, already := δ[k]
if !already {
δ[k] = δv2
continue
}
// both δ and δ2 has [k] - it can be that key
// entry migrated from one bucket into another.
// XXX also handle ø->ø + ø->c i.e. [k] was hole and become value
if !( (δv1.New == VDEL && δv2.Old == VDEL) ||
(δv1.Old == VDEL && δv2.New == VDEL) ) {
return fmt.Errorf("BUG or btree corrupt: [%v] has " +
"duplicate entries: %v, %v", k, δv1, δv2)
}
δv := ΔValue{}
switch {
// x -> ø | ø -> x
// ø -> ø
case δv2.Old == VDEL && δv2.New == VDEL: // δv2 == hole
δv = δv1
// ø -> ø
// y -> ø | ø -> y
case δv1.Old == VDEL && δv1.New == VDEL: // δv1 == hole
δv = δv2
// ø -> x -> y->x
// y -> ø
case δv2.New == VDEL:
δv.Old = δv2.Old
δv.New = δv1.New
// x -> ø -> x->y
// ø -> y
default:
δv.Old = δv1.Old
δv.New = δv2.New
}
tracef(" [%v] merge %s %s -> %s\n", k, δv1, δv2, δv)
if δv.Old != δv.New {
δ[k] = δv
} else {
delete(δ, k) // NOTE also annihilates hole migration
}
}
return nil
}
// diffB computes difference in between two buckets.
// see diffX for details.
func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
tracef(" diffB %s %s\n", xidOf(a), xidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xidOf(a), xidOf(b))
// XXX oid can be InvalidOid for T/B... (i.e. B is part of T and is not yet committed separately)
var av []BucketEntry
var bv []BucketEntry
if a != nil {
err = a.PActivate(ctx); if err != nil { return nil, err }
defer a.PDeactivate()
av = a.Entryv() // key↑
}
if b != nil {
err = b.PActivate(ctx); if err != nil { return nil, err }
defer b.PDeactivate()
bv = b.Entryv() // key↑
}
δ = map[Key]ΔValue{}
defer tracef(" -> δb: %v\n", δ)
//tracef(" av: %v", av)
//tracef(" bv: %v", bv)
for len(av) > 0 || len(bv) > 0 {
ka, va := KeyMax, VDEL
kb, vb := KeyMax, VDEL
if len(av) > 0 {
ka = av[0].Key()
va, err = vOid(av[0].Value())
if err != nil {
return nil, fmt.Errorf("a[%v]: %s", ka, err)
}
}
if len(bv) > 0 {
kb = bv[0].Key()
vb, err = vOid(bv[0].Value())
if err != nil {
return nil, fmt.Errorf("b[%v]: %s", kb, err)
}
}
switch {
case ka < kb: // -a[0]
δ[ka] = ΔValue{va, VDEL}
av = av[1:]
case ka > kb: // +b[0]
δ[kb] = ΔValue{VDEL, vb}
bv = bv[1:]
// ka == kb // va->vb
default:
if va != vb {
δ[ka] = ΔValue{va, vb}
}
av = av[1:]
bv = bv[1:]
}
}
return δ, nil
}
// zgetNode returns btree node corresponding to zconn.Get(oid) .
func zgetNode(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ Node, err error) {
defer xerr.Contextf(&err, "getnode %s@%s", oid, zconn.At())
xnode, err := zconn.Get(ctx, oid)
if err != nil {
return nil, err
}
node, ok := xnode.(Node)
if !ok {
return nil, fmt.Errorf("unexpected type: %s", zodb.ClassOf(xnode))
}
return node, nil
}
// vOid returns OID of a value object.
// it is an error if value is not persistent object.
func vOid(xvalue interface{}) (zodb.Oid, error) {
value, ok := xvalue.(zodb.IPersistent)
if !ok {
return zodb.InvalidOid, fmt.Errorf("%T is not a persitent object", xvalue)
}
return value.POid(), nil
}
// xidOf return string representation of object xid.
func xidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
func (rn nodeInRange) String() string {
slo := "-∞"; if rn.lo > KeyMin { slo = fmt.Sprintf("%v", rn.lo) }
shi := "∞"; if rn.hi_ < KeyMax { shi = fmt.Sprintf("%v", rn.hi_+1) }
done := " "; if rn.done { done = "*" }
return fmt.Sprintf("%s[%s,%s)%s", done, slo, shi, vnode(rn.node))
}
// push pushes element to node stack.
func push(nodeStk *[]*nodeInRange, top *nodeInRange) {
*nodeStk = append(*nodeStk, top)
}
// pop pops top element from node stack.
func pop(nodeStk *[]*nodeInRange) *nodeInRange {
stk := *nodeStk
l := len(stk)
top := stk[l-1]
*nodeStk = stk[:l-1]
return top
}
// pathEqual returns whether two paths are the same.
func pathEqual(patha, pathb []zodb.Oid) bool {
if len(patha) != len(pathb) {
return false
}
for i, a := range patha {
if pathb[i] != a {
return false
}
}
return true
}
const debug = false
func tracef(format string, argv ...interface{}) {
if debug {
fmt.Printf(format, argv...)
}
}
......@@ -22,63 +22,11 @@ package main
// TODO move -> btree when ΔTail matures.
// XXX doc
// FIXME the algorythm is different: recursion is implemented by expanding rangeSplit step by step.
//
// δ(BTree) notes
// ==============
//
// input: BTree, (@new, []oid) -> find out δ(BTree) i.e. {-k(v), +k'(v'), ...}
//
// - oid ∈ Bucket
// - oid ∈ BTree
//
// Bucket:
//
// old = {k -> v}
// new = {k' -> v'}
//
// Δ = -k(v), +k(v), ...
//
// => for all buckets
//
// Δ accumulates to []δk(v)[n+,n-] n+ ∈ {0,1}, n- ∈ {0,1}, if n+=n- - cancel
//
//
// BTree:
//
// old = {k -> B} or {k -> T}
// new = {k' -> B'} or {k' -> T'}
//
// Δ = -k(B), +k(B), -k(T), +K(T), ...
//
// we translate (in top-down order):
//
// k(B) -> {} of k(v)
// k(T) -> {} of k(B) -> {} of k(v)
//
// which gives
//
// Δ = k(v), +k(v), ...
//
// i.e. exactly as for buckets and it accumulates to global Δ.
//
// The globally-accumulated Δ is the answer for δ(BTree, (@new, []oid))
//
// top-down order is obtained via toposort({oid}) wrt visited PathSet.
//
// δ(BTree) in wcfs context:
//
// . -k(blk) -> invalidate #blk
// . +k(blk) -> invalidate #blk (e.g. if blk was previously read as hole)
//go:generate ./gen-set main Oid Oid zset_oid.go
import (
"context"
"fmt"
"math"
"reflect"
"sort"
"strings"
......@@ -215,25 +163,6 @@ type ΔTree struct {
}
// treeSetKey represents ordered set of keys.
// it can be point-queried and range-accessed.
// TODO -> btree
type treeSetKey struct {
SetKey
}
// InRange returns
func (hi treeSetKey) GetInRange(lo, hi_ Key) SetKey {
// FIXME dumb O(n) -> TODO use cznic/b
ret := SetKey{}
for k := range hi.SetKey {
if lo <= k && k <= hi_ {
ret.Add(k)
}
}
return ret
}
// NewΔBtail creates new empty ΔBtail object.
//
// Initial tracked set is empty.
......@@ -562,779 +491,6 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
return δB, nil
}
// δZConnectTracked computes connected closure of δZ/T.
//
// δZ - all changes in a ZODB transaction.
// δZ/T - subset of those changes intersecting with tracking set.
// δZ/TC - connected closure for δZ/T
//
// for example for e.g. t₀->t₁->b₂ if δZ/T={t₀ b₂} -> δZ/TC=δZ/T+{t₁}
//
// δtopsByRoot = {} root -> {top changed nodes in that tree}
func δZConnectTracked(δZv []zodb.Oid, T PPTreeSubSet) (δZTC SetOid, δtopsByRoot map[zodb.Oid]SetOid) {
δZ := SetOid{}; for _, δ := range δZv { δZ.Add(δ) }
δZTC = SetOid{}
δtopsByRoot = map[zodb.Oid]SetOid{}
for δ := range δZ {
track, ok := T[δ]
if !ok {
continue // not tracked at all
}
δZTC.Add(δ)
// go up by .parent till root or till another tracked node in the tree
// if root -> δtopsByRoot[root] += δ
// if !root -> δZTC += path through which we reached another node (forming connection)
path := []zodb.Oid{}
node := δ
parent := track.parent
for {
// reached root
if parent == zodb.InvalidOid {
root := node
δtops, ok := δtopsByRoot[root]
if !ok {
δtops = SetOid{}
δtopsByRoot[root] = δtops
}
δtops.Add(δ)
break
}
// reached another tracked node
if δZ.Has(parent) {
for _, δp := range path {
δZTC.Add(δp)
}
break
}
path = append(path, parent)
trackUp, ok := T[parent]
if !ok {
panicf("BUG: .p%s -> %s, but %s is not tracked", node, parent, parent)
}
node = parent
parent = trackUp.parent
}
}
return δZTC, δtopsByRoot
}
// XXX place
// nodeInRange represents a Node coming under [lo, hi_] key range in its tree.
type nodeInRange struct {
prefix []zodb.Oid // path to this node goes via this objects
lo, hi_ Key // [lo, hi_] NOTE _not_ hi) not to overflow at ∞
node Node
done bool // whether this node was already taken into account while computing diff
}
// XXX place, doc
/*
func (n *nodeInRange) NodePath() []Node {
path := []Node{}
for n != nil {
path = append([]Node{n.node}, path...)
n = n.parent
}
return path
}
*/
func (n *nodeInRange) Path() []zodb.Oid {
return append(n.prefix, n.node.POid())
}
// rangeSplit represents set of nodes covering a range.
// nodes come with key↑ and no intersection in between their [lo,hi)
type rangeSplit []*nodeInRange // key↑
// Get returns node covering key k.
// Get panics if k is not covered.
func (rs rangeSplit) Get(k Key) *nodeInRange {
rnode, ok := rs.Get_(k)
if !ok {
panicf("key %v not covered; coverage: %s", k, rs)
}
return rnode
}
// Get_ returns node covering key k.
func (rs rangeSplit) Get_(k Key) (rnode *nodeInRange, ok bool) {
i := sort.Search(len(rs), func(i int) bool {
return k <= rs[i].hi_
})
if i == len(rs) {
return nil, false // key not covered
}
rn := rs[i]
if !(rn.lo <= k && k <= rn.hi_) {
panicf("BUG: get(%v) -> %s; coverage: %s", k, rn, rs)
}
return rn, true
}
// Expand replaces rnode with its children.
//
// rnode must be initially in *prs.
// rnode.node must be tree.
// rnode.node must be already activated.
//
// inserted children are returned for convenience.
func (prs *rangeSplit) Expand(rnode *nodeInRange) (children rangeSplit) {
rs := *prs
i := sort.Search(len(rs), func(i int) bool {
return rnode.hi_ <= rs[i].hi_
})
if i == len(rs) || rs[i] != rnode {
panicf("%s not in rangeSplit; coverage: %s", rnode, rs)
}
// [i].Key ≤ [i].Child.*.Key < [i+1].Key i ∈ [0, len([]))
//
// [0].Key = -∞ ; always returned so
// [len(ev)].Key = +∞ ; should be assumed so
tree := rnode.node.(*Tree)
treev := tree.Entryv()
children = make(rangeSplit, 0, len(treev)+1)
for i := range treev {
lo := rnode.lo
if i > 0 {
lo = treev[i].Key()
}
hi_ := rnode.hi_
if i < len(treev)-1 {
hi_ = treev[i+1].Key()-1 // NOTE -1 because it is hi_] not hi)
}
children = append(children, &nodeInRange{
prefix: rnode.Path(),
lo: lo,
hi_: hi_,
node: treev[i].Child(),
})
}
// del[i]; insert(@i, children)
*prs = append(rs[:i], append(children, rs[i+1:]...)...)
return children
}
// GetToLeaf returns leaf node corresponding to key k.
//
// Leaf is usually bucket node, but, in the sole single case of empty tree, can be that root tree node.
// GetToLeaf expands step-by-step every tree through which it has to traverse to next depth level.
//
// GetToLeaf panics if k is not covered.
// XXX also return path?
func (prs *rangeSplit) GetToLeaf(ctx context.Context, k Key) (*nodeInRange, error) {
rnode, ok, err := prs.GetToLeaf_(ctx, k)
if err == nil && !ok {
panicf("key %v not covered; coverage: %s", k, *prs)
}
return rnode, err
}
// GetToLeaf_ is comma-ok version of GetToLeaf.
func (prs *rangeSplit) GetToLeaf_(ctx context.Context, k Key) (rnode *nodeInRange, ok bool, err error) {
rnode, ok = prs.Get_(k)
if !ok {
return nil, false, nil // key not covered
}
for {
switch rnode.node.(type) {
// bucket = leaf
case *Bucket:
return rnode, true, nil
}
// its tree -> activate to expand; check for ø case
tree := rnode.node.(*Tree)
err = tree.PActivate(ctx)
if err != nil {
return nil, false, err
}
defer tree.PDeactivate()
// empty tree -> don't expand - it is already leaf
if len(tree.Entryv()) == 0 {
return rnode, true, nil
}
// expand tree children
children := prs.Expand(rnode)
rnode = children.Get(k) // k must be there
}
}
func (rs rangeSplit) String() string {
if len(rs) == 0 {
return "ø"
}
s := ""
for _, rn := range rs {
if s != "" {
s += " "
}
s += fmt.Sprintf("%s", rn)
}
return s
}
// treediff computes δT/δtrack for tree/trackSet specified by root in between old..new.
//
// δtops is set of top nodes for changed subtrees.
// δZTC is connected(δZ/T) - connected closure for subset of δZ(old..new) that
// touches tracked nodes of T.
//
// XXX holeIdx is updated XXX -> return similarly to δtrack
// XXX ^^^ -> but better kill holeIdx and do everything only via trackSet
func treediff(ctx context.Context, root zodb.Oid, δtops SetOid, δZTC SetOid, trackSet PPTreeSubSet, holeIdx treeSetKey, zconnOld, zconnNew *zodb.Connection) (δT map[Key]ΔValue, δtrack *ΔPPTreeSubSet, err error) {
defer xerr.Contextf(&err, "treediff %s..%s %s", zconnOld.At(), zconnNew.At(), root)
tracef("\ntreediff %s δtops: %v δZTC: %v\n", root, δtops, δZTC)
defer tracef("\n")
δT = map[Key]ΔValue{}
δtrackv := []*ΔPPTreeSubSet{}
for top := range δtops { // XXX -> sorted?
a, err1 := zgetNode(ctx, zconnOld, top)
b, err2 := zgetNode(ctx, zconnNew, top)
err := xerr.Merge(err1, err2)
if err != nil {
return nil, nil, err
}
δtop, δtrackTop, err := diffX(ctx, a, b, δZTC, trackSet, holeIdx)
if err != nil {
return nil, nil, err
}
// FIXME -> merge (VDEL vs add)
// XXX no - not needed here - keys cannot migrate in between two disconnected subtrees
// -> assert that keys from different δtop do not overlap
// DEL k -> Tkextra += k
// +k -> Tkextra -= k
tracef("-> δtop: %v\n", δtop)
tracef("-> δtrackTop: %v\n", δtrackTop)
for k,δv := range δtop {
δT[k] = δv
}
δtrackv = append(δtrackv, δtrackTop)
}
// adjust holeIdx
for k, δv := range δT {
if δv.Old == VDEL {
holeIdx.Del(k)
}
if δv.New == VDEL {
holeIdx.Add(k)
}
}
// adjust trackSet by merge(δtrackTops)
δtrack = &ΔPPTreeSubSet{Del: PPTreeSubSet{}, Add: PPTreeSubSet{}, δnchildNonLeafs: map[zodb.Oid]int{}}
for _, δ := range δtrackv {
δtrack.Update(δ)
}
return δT, δtrack, nil
}
// diffX computes difference in between two revisions of a tree's subtree.
//
// a, b point to top of the subtree @old and @new revisions and must be of the
// same type - tree or bucket.
//
// δZTC is connected set of objects covering δZT (objects changed in this tree in old..new).
//
// a/b can be nil; a=nil means addition, b=nil means deletion.
//
// δtrack is trackSet δ that needs to be applied to trackSet to keep it
// consistent with b (= a + δ).
func diffX(ctx context.Context, a, b Node, δZTC SetOid, trackSet PPTreeSubSet, holeIdx treeSetKey) (δ map[Key]ΔValue, δtrack *ΔPPTreeSubSet, err error) {
if a==nil && b==nil {
panic("BUG: both a & b == nil")
}
var aT, bT *Tree
var aB, bB *Bucket
isT := false
if a != nil {
aT, isT = a.(*Tree)
aB, _ = a.(*Bucket)
if aT == nil && aB == nil {
panicf("a: bad type %T", a)
}
}
if b != nil {
bT, isT = b.(*Tree)
bB, _ = b.(*Bucket)
if bT == nil && bB == nil {
panicf("b: bad type %T", b)
}
}
if a != nil && b != nil {
if a.POid() != b.POid() {
panicf("BUG: a.oid != b.oid ; a: %s b: %s", a.POid(), b.POid())
}
if !((aT != nil && bT != nil) || (aB != nil && bB != nil)) {
return nil, nil, fmt.Errorf("object %s: type mutated %s -> %s", a.POid(),
zodb.ClassOf(a), zodb.ClassOf(b))
}
}
if isT {
return diffT(ctx, aT, bT, δZTC, trackSet, holeIdx)
} else {
var δtrack *ΔPPTreeSubSet
δ, err := diffB(ctx, aB, bB)
if δ != nil {
δtrack = &ΔPPTreeSubSet{}
}
return δ, δtrack, err
}
}
// diffT computes difference in between two subtrees.
//
// a, b point to top of subtrees @old and @new revisions.
// δZTC is connected set of objects covering δZT (objects changed in this tree in old..new).
func diffT(ctx context.Context, A, B *Tree, δZTC SetOid, trackSet PPTreeSubSet, holeIdx treeSetKey) (δ map[Key]ΔValue, δtrack *ΔPPTreeSubSet, err error) {
tracef(" diffT %s %s\n", xidOf(A), xidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xidOf(A), xidOf(B))
if A == nil { panic("A is nil") }
if B == nil { panic("B is nil") }
δ = map[Key]ΔValue{}
δtrack = &ΔPPTreeSubSet{Del: PPTreeSubSet{}, Add: PPTreeSubSet{}, δnchildNonLeafs: map[zodb.Oid]int{}}
defer tracef(" -> δ: %v\n", δ)
// path prefix to A and B
prefix := []zodb.Oid{}
t := trackSet[A.POid()]
for t.parent != zodb.InvalidOid {
prefix = append([]zodb.Oid{t.parent}, prefix...)
t = trackSet[t.parent]
}
// initial split ranges for A and B
// XXX maybe walk till a from root to get more precise initial range?
atop := &nodeInRange{prefix: prefix, lo: KeyMin, hi_: KeyMax, node: A} // [-∞, ∞)
btop := &nodeInRange{prefix: prefix, lo: KeyMin, hi_: KeyMax, node: B} // [-∞, ∞)
Av := rangeSplit{atop} // nodes expanded from A
Bv := rangeSplit{btop} // nodes expanded from B
// for phase 2:
Akqueue := SetKey{} // queue for keys in A to be processed for δ-
Bkqueue := SetKey{} // ----//---- in B for δ+
Akdone := SetKey{} // already processed keys in A
Bkdone := SetKey{} // ----//---- in B
Aktodo := func(k Key) {
if !Akdone.Has(k) {
tracef(" Akq <- %d\n", k)
Akqueue.Add(k)
}
}
Bktodo := func(k Key) {
if !Bkdone.Has(k) {
tracef(" Bkq <- %d\n", k)
Bkqueue.Add(k)
}
}
// {} oid -> parent for all nodes in Bv: current and previously expanded - up till top B
// XXX requires A.oid == B.oid
BtrackSet := PPTreeSubSet{}
BtrackSet.AddPath(trackSet.Path(B.POid()))
// phase 1: expand A top->down driven by δZTC.
// by default a node contributes to δ-
// a node ac does not contribute to δ- and can be skipped, if:
// - ac is not tracked, or
// - ac ∉ δZTC && ∃ bc from B: ac.oid == bc.oid (ac+ac.children were not changed, and ac stays in the tree)
Aq := []*nodeInRange{atop} // queue for A nodes that contribute to δ-
for len(Aq) > 0 {
tracef("\n")
tracef(" aq: %v\n", Aq)
tracef(" av: %s\n", Av)
tracef(" bv: %s\n", Bv)
ra := pop(&Aq)
err = ra.node.PActivate(ctx); /*X*/if err != nil { return nil,nil, err }
defer ra.node.PDeactivate()
tracef(" a: %s\n", ra)
switch a := ra.node.(type) {
case *Bucket:
// a is bucket -> δ-
δA, err := diffB(ctx, a, nil); /*X*/if err != nil { return nil,nil, err }
err = δMerge(δ, δA); /*X*/if err != nil { return nil,nil, err }
δtrack.Del.AddPath(ra.Path())
// Bkqueue <- δA
for k := range δA {
Akdone.Add(k)
Bktodo(k)
}
// Bkqueue <- holes(ra.range)
for k := range holeIdx.GetInRange(ra.lo, ra.hi_) {
Bktodo(k)
}
ra.done = true
case *Tree:
// empty tree - only queue holes covered by it
if len(a.Entryv()) == 0 {
for k := range holeIdx.GetInRange(ra.lo, ra.hi_) {
Bktodo(k)
}
continue
}
// a is !empty tree - expand it and queue children
// check for each children whether it can be skipped
achildren := Av.Expand(ra)
for _, ac := range achildren {
acOid := ac.node.POid()
at, tracked := trackSet[acOid]
if !tracked && /*cannot skip embedded bucket:*/acOid != zodb.InvalidOid {
continue
}
if !δZTC.Has(acOid) && /*cannot skip embedded bucket:*/acOid != zodb.InvalidOid {
// check B children for node with ac.oid
// while checking expand Bv till ac.lo and ac.hi_ point to the same node
// ( this does not give exact answer but should be a reasonable heuristic;
// the diff is the same if heuristic does not work and we
// look into and load more nodes to compute δ )
_, found := BtrackSet[acOid]
if !found {
for {
blo := Bv.Get(ac.lo)
bhi_ := Bv.Get(ac.hi_)
if blo != bhi_ {
break
}
_, ok := blo.node.(*Tree)
if !ok {
break // bucket
}
err = blo.node.PActivate(ctx); /*X*/if err != nil { return nil,nil, err }
defer blo.node.PDeactivate()
// XXX check for empty tree?
bchildren := Bv.Expand(blo)
for _, bc := range bchildren {
bcOid := bc.node.POid()
BtrackSet.AddPath(bc.Path())
if acOid == bcOid {
found = true
}
}
if found {
break
}
}
}
if found {
// ac can be skipped
// XXX Bkqueue <- holes(ac.range \ bc.range) XXX test for this
// adjust trackSet since path to the node could have changed
apath := trackSet.Path(acOid)
bpath := BtrackSet.Path(acOid)
if !pathEqual(apath, bpath) {
δtrack.Del.AddPath(apath)
δtrack.Add.AddPath(bpath)
if nc := at.nchild; nc != 0 {
δtrack.δnchildNonLeafs[acOid] = nc
}
}
continue
}
}
// ac cannot be skipped
push(&Aq, ac)
}
}
}
// phase 2: reach consistency in between A and B.
// Every key removed in A has to be checked for whether it is present
// in B and contribute to δ+. In B, in turn, adding that key can add
// other keys to δ+. Those keys, in turn, have to be checked for
// whether they were present in A and contribute to δ-. For example:
//
// [ 2 4 ] [ 3 5 ]
// ↓ ↓ ↓ ↓ ↓ ↓
// |1| |23| |45| |12| |34| |56|
//
// if values for all keys change, tracked={1}, change to 1 adds
// * -B1, which queues B.1 and leads to
// * +B12, which queues A.2 and leads to
// * -B23, which queues B.3 and leads to
// * +B23, ...
//
// XXX inefficient: we process each key separately, while they can be
// processed in sorted batches.
tracef("\nphase 2:\n")
for {
tracef("\n")
tracef(" av: %s\n", Av)
tracef(" bv: %s\n", Bv)
tracef("\n")
tracef(" Bkq: %s\n", Bkqueue)
if len(Bkqueue) == 0 {
break
}
for k := range Bkqueue {
b, err := Bv.GetToLeaf(ctx, k); /*X*/if err != nil { return nil,nil, err }
tracef(" B k%d -> %s\n", k, b)
// +bucket if that bucket is reached for the first time
if !b.done {
var δB map[Key]ΔValue
bbucket, ok := b.node.(*Bucket)
if ok { // !ok means ø tree
δB, err = diffB(ctx, nil, bbucket); /*X*/if err != nil { return nil,nil, err }
}
// δ <- δB
err = δMerge(δ, δB); /*X*/if err != nil { return nil,nil, err }
δtrack.Add.AddPath(b.Path())
// Akqueue <- δB
for k_ := range δB {
Bkdone.Add(k_)
Aktodo(k_)
}
b.done = true
}
// XXX k is not there -> hole XXX test
}
Bkqueue = SetKey{}
tracef("\n")
tracef(" Akq: %s\n", Akqueue)
for k := range Akqueue {
a, err := Av.GetToLeaf(ctx, k); /*X*/if err != nil { return nil,nil, err }
tracef(" A k%d -> %s\n", k, a)
// -bucket if that bucket is reached for the first time
if !a.done {
var δA map[Key]ΔValue
abucket, ok := a.node.(*Bucket)
if ok { // !ok means ø tree
δA, err = diffB(ctx, abucket, nil); /*X*/if err != nil { return nil,nil, err }
}
// δ <- δA
err = δMerge(δ, δA); /*X*/if err != nil { return nil,nil, err }
δtrack.Del.AddPath(a.Path())
// Bkqueue <- δA
for k_ := range δA {
Akdone.Add(k_)
Bktodo(k_)
}
// Bkqueue <- holes(a.range)
for k_ := range holeIdx.GetInRange(a.lo, a.hi_) {
Bktodo(k_)
}
a.done = true
}
}
Akqueue = SetKey{}
}
return δ, δtrack, nil
}
// δMerge merges changes from δ2 into δ.
// δ is total-building diff, while δ2 is diff from comparing some subnodes.
func δMerge(δ, δ2 map[Key]ΔValue) error {
tracef(" δmerge %v <- %v\n", δ, δ2)
defer tracef(" -> %v\n", δ)
// merge δ <- δ2
for k, δv2 := range δ2 {
δv1, already := δ[k]
if !already {
δ[k] = δv2
continue
}
// both δ and δ2 has [k] - it can be that key
// entry migrated from one bucket into another.
// XXX also handle ø->ø + ø->c i.e. [k] was hole and become value
if !( (δv1.New == VDEL && δv2.Old == VDEL) ||
(δv1.Old == VDEL && δv2.New == VDEL) ) {
return fmt.Errorf("BUG or btree corrupt: [%v] has " +
"duplicate entries: %v, %v", k, δv1, δv2)
}
δv := ΔValue{}
switch {
// x -> ø | ø -> x
// ø -> ø
case δv2.Old == VDEL && δv2.New == VDEL: // δv2 == hole
δv = δv1
// ø -> ø
// y -> ø | ø -> y
case δv1.Old == VDEL && δv1.New == VDEL: // δv1 == hole
δv = δv2
// ø -> x -> y->x
// y -> ø
case δv2.New == VDEL:
δv.Old = δv2.Old
δv.New = δv1.New
// x -> ø -> x->y
// ø -> y
default:
δv.Old = δv1.Old
δv.New = δv2.New
}
tracef(" [%v] merge %s %s -> %s\n", k, δv1, δv2, δv)
if δv.Old != δv.New {
δ[k] = δv
} else {
delete(δ, k) // NOTE also annihilates hole migration
}
}
return nil
}
// diffB computes difference in between two buckets.
// see diffX for details.
func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
tracef(" diffB %s %s\n", xidOf(a), xidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xidOf(a), xidOf(b))
// XXX oid can be InvalidOid for T/B... (i.e. B is part of T and is not yet committed separately)
var av []BucketEntry
var bv []BucketEntry
if a != nil {
err = a.PActivate(ctx); if err != nil { return nil, err }
defer a.PDeactivate()
av = a.Entryv() // key↑
}
if b != nil {
err = b.PActivate(ctx); if err != nil { return nil, err }
defer b.PDeactivate()
bv = b.Entryv() // key↑
}
δ = map[Key]ΔValue{}
defer tracef(" -> δb: %v\n", δ)
//tracef(" av: %v", av)
//tracef(" bv: %v", bv)
for len(av) > 0 || len(bv) > 0 {
ka, va := KeyMax, VDEL
kb, vb := KeyMax, VDEL
if len(av) > 0 {
ka = av[0].Key()
va, err = vOid(av[0].Value())
if err != nil {
return nil, fmt.Errorf("a[%v]: %s", ka, err)
}
}
if len(bv) > 0 {
kb = bv[0].Key()
vb, err = vOid(bv[0].Value())
if err != nil {
return nil, fmt.Errorf("b[%v]: %s", kb, err)
}
}
switch {
case ka < kb: // -a[0]
δ[ka] = ΔValue{va, VDEL}
av = av[1:]
case ka > kb: // +b[0]
δ[kb] = ΔValue{VDEL, vb}
bv = bv[1:]
// ka == kb // va->vb
default:
if va != vb {
δ[ka] = ΔValue{va, vb}
}
av = av[1:]
bv = bv[1:]
}
}
return δ, nil
}
// zgetNode returns btree node corresponding to zconn.Get(oid) .
func zgetNode(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ Node, err error) {
defer xerr.Contextf(&err, "getnode %s@%s", oid, zconn.At())
xnode, err := zconn.Get(ctx, oid)
if err != nil {
return nil, err
}
node, ok := xnode.(Node)
if !ok {
return nil, fmt.Errorf("unexpected type: %s", zodb.ClassOf(xnode))
}
return node, nil
}
// vOid returns OID of a value object.
// it is an error if value is not persistent object.
func vOid(xvalue interface{}) (zodb.Oid, error) {
value, ok := xvalue.(zodb.IPersistent)
if !ok {
return zodb.InvalidOid, fmt.Errorf("%T is not a persitent object", xvalue)
}
return value.POid(), nil
}
// xidOf return string representation of object xid.
func xidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) {
δBtail.δZtail.ForgetPast(revCut) // XXX stub
......@@ -1481,46 +637,3 @@ func vnode(node Node) string {
}
return kind + node.POid().String()
}
func (rn nodeInRange) String() string {
slo := "-∞"; if rn.lo > KeyMin { slo = fmt.Sprintf("%v", rn.lo) }
shi := "∞"; if rn.hi_ < KeyMax { shi = fmt.Sprintf("%v", rn.hi_+1) }
done := " "; if rn.done { done = "*" }
return fmt.Sprintf("%s[%s,%s)%s", done, slo, shi, vnode(rn.node))
}
// push pushes element to node stack.
func push(nodeStk *[]*nodeInRange, top *nodeInRange) {
*nodeStk = append(*nodeStk, top)
}
// pop pops top element from node stack.
func pop(nodeStk *[]*nodeInRange) *nodeInRange {
stk := *nodeStk
l := len(stk)
top := stk[l-1]
*nodeStk = stk[:l-1]
return top
}
// pathEqual returns whether two paths are the same.
func pathEqual(patha, pathb []zodb.Oid) bool {
if len(patha) != len(pathb) {
return false
}
for i, a := range patha {
if pathb[i] != a {
return false
}
}
return true
}
const debug = false
func tracef(format string, argv ...interface{}) {
if debug {
fmt.Printf(format, argv...)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment