Commit 7602c1f4 authored by Kirill Smelkov's avatar Kirill Smelkov

ΔBtail concurrency

See changes in δbtail.go for overview.

* t2+RebuildJob: (52 commits)
  .
  .
  .
  .
  .
  X Track should be nop if keycov/path is already in krebuildJobs
  .
  .
  .
  X xbtree/blib: RangedMap, RangedSet += IntersectsRange, Intersection
  X xbtree: tests: Also verify state of ΔTtail.ktrackNew
  .
  .
  .
  .
  .
  .
  .
  .
  .
  ...
parents 57be0126 c4e16704
......@@ -11,7 +11,7 @@ require (
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.7.0
lab.nexedi.com/kirr/go123 v0.0.0-20210906140734-c9eb28d9e408
lab.nexedi.com/kirr/neo/go v0.0.0-20210908100526-87199da2b163
lab.nexedi.com/kirr/neo/go v0.0.0-20211004111643-c74a5a3cd0d0
)
// we use kirr/go-fuse@y/nodefs-cancel
......
......@@ -214,3 +214,5 @@ lab.nexedi.com/kirr/neo/go v0.0.0-20210720105030-d99bf118d61a h1:ex8P5oGhvDDp4y3
lab.nexedi.com/kirr/neo/go v0.0.0-20210720105030-d99bf118d61a/go.mod h1:llI3hcJJMACe+rYuXUfS5dljjwIrlBMfJ1ZeRcey96A=
lab.nexedi.com/kirr/neo/go v0.0.0-20210908100526-87199da2b163 h1:0HTNfLHL2ZNmfETtlF0iFPpWfuAAjzfIkxL5r6x2ALE=
lab.nexedi.com/kirr/neo/go v0.0.0-20210908100526-87199da2b163/go.mod h1:llI3hcJJMACe+rYuXUfS5dljjwIrlBMfJ1ZeRcey96A=
lab.nexedi.com/kirr/neo/go v0.0.0-20211004111643-c74a5a3cd0d0 h1:rmfVDj/IaTiMUFAXTKyW993f1G5IxKcZ1vtcrrqscpk=
lab.nexedi.com/kirr/neo/go v0.0.0-20211004111643-c74a5a3cd0d0/go.mod h1:llI3hcJJMACe+rYuXUfS5dljjwIrlBMfJ1ZeRcey96A=
......@@ -36,7 +36,8 @@ type Node = btree.LONode
type TreeEntry = btree.LOEntry
type BucketEntry = btree.LOBucketEntry
type Key = int64
type Key = int64
type KeyRange = btree.LKeyRange
const KeyMax Key = math.MaxInt64
const KeyMin Key = math.MinInt64
......
......@@ -26,10 +26,27 @@ VALUE=$2
out=$3
input=$(dirname $0)/rangemap.go.in
blib=$(cd $(dirname $0) && go list) # fullpath for blib package
curr=$(go list) # ----//---- current package
pkgname=$(go list -f {{.Name}}) # name of current package
echo "// Code generated by gen-rangemap $TYPE $VALUE; DO NOT EDIT." >$out
echo >>$out
# fiximports adjusts rangemap.go code to work outside of blib packages.
fiximports() {
if [ "$curr" == "$blib" ]; then
cat
return
fi
sed \
-e "/package blib/a \\\\nimport \"$blib\"\\n" \
-e "s/package blib/package $pkgname/g" \
-e 's/\([^\w.]\)KeyRange\b/\1blib.KeyRange/g' \
-e 's/\bKStr\b/blib.KStr/g'
}
sed \
-e "s/VALUE/$VALUE/g" \
-e "s/\bRangedMap\b/${TYPE}/g" \
......@@ -40,4 +57,4 @@ sed \
-e "s/\btraceRangeMap\b/trace${TYPE}/g" \
-e "s/\bdebugRangeMap\b/debug${TYPE}/g" \
-e "s/\bdebugfRMap\b/debugf${TYPE}/g" \
$input >>$out
$input |fiximports >>$out
// Copyright (C) 2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package blib
// range of keys.
import (
"fmt"
)
// KeyRange represents [lo,hi) Key range.
type KeyRange struct {
Lo Key
Hi_ Key // NOTE _not_ hi) to avoid overflow at ∞; hi = hi_ + 1
}
// Has returns whether key k belongs to the range.
func (r *KeyRange) Has(k Key) bool {
return (r.Lo <= k && k <= r.Hi_)
}
// Empty returns whether key range is empty.
func (r *KeyRange) Empty() bool {
hi := r.Hi_
if hi == KeyMax {
// [x,∞] cannot be empty because max x is ∞ and [∞,∞] has one element: ∞
return false
}
hi++ // no overflow
return r.Lo >= hi
}
func (r KeyRange) String() string {
var shi string
if r.Hi_ == KeyMax {
shi = KStr(r.Hi_) // ∞
} else {
shi = fmt.Sprintf("%d", r.Hi_+1)
}
return fmt.Sprintf("[%s,%s)", KStr(r.Lo), shi)
}
......@@ -46,9 +46,11 @@ type RangedMapEntry struct {
// Get returns value associated with key k.
func (M *RangedMap) Get(k Key) VALUE {
v, _ := M.Get_(k)
return v
//
// KeyRange indicates all keys adjacent to k, that are too mapped to the same value.
func (M *RangedMap) Get(k Key) (VALUE, KeyRange) {
v, r, _ := M.Get_(k)
return v, r
}
// Set changes M to map key k to value v.
......@@ -63,20 +65,21 @@ func (M *RangedMap) Del(k Key) {
// Has returns whether key k is present in the map.
func (M *RangedMap) Has(k Key) bool {
_, ok := M.Get_(k)
_, _, ok := M.Get_(k)
return ok
}
// Get_ is comma-ok version of Get.
func (M *RangedMap) Get_(k Key) (v VALUE, ok bool) {
func (M *RangedMap) Get_(k Key) (v VALUE, r KeyRange, ok bool) {
r = KeyRange{0,-1} // zero value represents non-empty [0,1)
if traceRangeMap {
fmt.Printf("\n\nGet_:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" k: %s\n", KStr(k))
defer func() {
fmt.Printf("->·: %v, %t\n", v, ok)
fmt.Printf("->·: %v%s, %t\n", v, r, ok)
}()
}
......@@ -99,7 +102,7 @@ func (M *RangedMap) Get_(k Key) (v VALUE, ok bool) {
}
// found
return e.Value, true
return e.Value, e.KeyRange, true
}
// SetRange changes M to map key range r to value v.
......@@ -359,6 +362,40 @@ func (M *RangedMap) HasRange(r KeyRange) (yes bool) {
}
}
// IntersectsRange returns whether some keys from range r belong to the map.
func (M *RangedMap) IntersectsRange(r KeyRange) (yes bool) {
if traceRangeMap {
fmt.Printf("\n\nIntersectsRange:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" r: %s\n", r)
defer func() {
fmt.Printf("->·: %v\n", yes)
}()
}
M.verify()
if r.Empty() {
return false
}
// find first ilo: r.lo < [ilo].hi
l := len(M.entryv)
ilo := sort.Search(l, func(i int) bool {
return r.Lo <= M.entryv[i].Hi_
})
debugfRMap("\tilo: %d\n", ilo)
if ilo == l { // not found
return false
}
// [ilo].hi may be either inside r (≤ r.hi), or > r.hi
// - if it is inside -> overlap is there,
// - if it is > r.hi -> overlap is there if [ilo].lo < r.hi
// => in any case overlap is there if [ilo].lo < r.hi
return M.entryv[ilo].Lo <= r.Hi_
}
// --------
......
......@@ -38,14 +38,15 @@ const (
func TestRangedMap(t *testing.T) {
type testEntry struct {
M *RangedMap
X RangedMapEntry
Set *RangedMap // M.SetRange(X.keycov, X.value)
Del *RangedMap // M.DelRange(X.keycov)
Has bool // M.HasRange(X.keycov)
M *RangedMap
X RangedMapEntry
Set *RangedMap // M.SetRange(X.keycov, X.value)
Del *RangedMap // M.DelRange(X.keycov)
Has bool // M.HasRange(X.keycov)
Intersects bool // M.IntersectsRange(X.Keycov)
}
E := func(M *RangedMap, X RangedMapEntry, S, D *RangedMap, H bool) testEntry {
return testEntry{M, X, S, D, H}
E := func(M *RangedMap, X RangedMapEntry, S, D *RangedMap, H, I bool) testEntry {
return testEntry{M, X, S, D, H, I}
}
// M is shorthand to create RangedMap, e.g. M(1,2,a, 3,4,b) will return {[1,2):a [3,4):b}.
......@@ -101,7 +102,8 @@ func TestRangedMap(t *testing.T) {
X(0,0,x), // X
M(), // Set
M(), // Del
y), // Has
y, // Has
n), // Intersects
// empty vs !empty
E(
......@@ -109,7 +111,8 @@ func TestRangedMap(t *testing.T) {
X(1,2,x), // X
M(1,2,x), // Set
M(), // Del
n), // Has
n, // Has
n), // Intersects
// !empty vs empty
E(
......@@ -117,7 +120,8 @@ func TestRangedMap(t *testing.T) {
X(0,0,x), // X
M(1,2,a), // Set
M(1,2,a), // Del
y), // Has
y, // Has
n), // Intersects
// basic change
E(
......@@ -125,7 +129,8 @@ func TestRangedMap(t *testing.T) {
X(1,2,x), // X
M(1,2,x), // Set
M(), // Del
y), // Has
y, // Has
y), // Intersects
// adjacent [1,3) [3,5)
E(
......@@ -133,7 +138,8 @@ func TestRangedMap(t *testing.T) {
X(3,5,x), // X
M(1,3,a, 3,5,x), // Set
M(1,3,a), // Del
n), // Has
n, // Has
n), // Intersects
// overlapping [1,3) [2,4)
E(
......@@ -141,7 +147,8 @@ func TestRangedMap(t *testing.T) {
X(2,4,x), // X
M(1,2,a, 2,4,x), // Set
M(1,2,a), // Del
n), // Has
n, // Has
y), // Intersects
// [1,7) vs [3,5) -> split
E(
......@@ -149,7 +156,8 @@ func TestRangedMap(t *testing.T) {
X(3,5,x), // X
M(1,3,a, 3,5,x, 5,7,a), // Set
M(1,3,a, 5,7,a), // Del
y), // Has
y, // Has
y), // Intersects
// several ranges vs [-∞,∞)
E(
......@@ -157,14 +165,16 @@ func TestRangedMap(t *testing.T) {
X(noo,oo,x), // X
M(noo,oo,x), // Set
M(), // Del
n), // Has
n, // Has
y), // Intersects
E(
M(1,2,a, 2,3,b), // M
X(1,3,x), // X
M(1,3,x), // Set
M(), // Del
y), // Has
y, // Has
y), // Intersects
// coalesce (same value, no overlap)
E(
......@@ -172,7 +182,8 @@ func TestRangedMap(t *testing.T) {
X(2,4,a), // X
M(1,5,a), // Set
M(1,2,a, 4,5,a), // Del
n), // Has
n, // Has
n), // Intersects
// coalesce (same value, overlap)
E(
......@@ -180,7 +191,8 @@ func TestRangedMap(t *testing.T) {
X(2,6,a), // X
M(1,8,a), // Set
M(1,2,a, 6,8,a), // Del
n), // Has
n, // Has
y), // Intersects
// - shrink left/right (value !same) + new entry
E(
......@@ -188,19 +200,22 @@ func TestRangedMap(t *testing.T) {
X(2,6,x), // X
M(1,2,a, 2,6,x), // Set
M(1,2,a), // Del
n), // Has
n, // Has
y), // Intersects
E(
M(5,8,b), // M
X(2,6,x), // X
M(2,6,x, 6,8,b), // Set
M( 6,8,b), // Del
n), // Has
n, // Has
y), // Intersects
E(
M(1,4,a, 5,8,b), // M
X(2,6,x), // X
M(1,2,a, 2,6,x, 6,8,b), // Set
M(1,2,a, 6,8,b), // Del
n), // Has
M(1,2,a, 6,8,b), // Del
n, // Has
y), // Intersects
}
for _, tt := range testv {
......@@ -210,6 +225,7 @@ func TestRangedMap(t *testing.T) {
v := X.Value
assertMapHasRange(t, M, r, tt.Has)
assertMapIntersectsRange(t, M, r, tt.Intersects)
Mset := M.Clone()
Mdel := M.Clone()
Mset.SetRange(r, v)
......@@ -226,11 +242,9 @@ func TestRangedMap(t *testing.T) {
}
assertMapHasRange(t, Mset, r, true)
rInMdel := false
if r.Empty() {
rInMdel = true
}
assertMapHasRange(t, Mdel, r, rInMdel)
assertMapHasRange(t, Mdel, r, r.Empty())
assertMapIntersectsRange(t, Mset, r, !r.Empty())
assertMapIntersectsRange(t, Mdel, r, false)
verifyGet(t, M)
verifyGet(t, Mset)
......@@ -238,7 +252,7 @@ func TestRangedMap(t *testing.T) {
}
}
// assertMapHasRange asserts that RangeMap M.HasRange(r) == hasOK.
// assertMapHasRange asserts that RangedMap M.HasRange(r) == hasOK.
func assertMapHasRange(t *testing.T, M *RangedMap, r KeyRange, hasOK bool) {
t.Helper()
has := M.HasRange(r)
......@@ -247,6 +261,15 @@ func assertMapHasRange(t *testing.T, M *RangedMap, r KeyRange, hasOK bool) {
}
}
// assertMapIntersectsRange asserts that RangedMap M.IntersectsRange(r) == intersectsOK.
func assertMapIntersectsRange(t *testing.T, M *RangedMap, r KeyRange, intersectsOK bool) {
t.Helper()
intersects := M.IntersectsRange(r)
if !(intersects == intersectsOK) {
t.Errorf("IntersectsRange:\n M: %s\n r: %s\n ->·: %t\n ok·: %t\n", M, r, intersects, intersectsOK)
}
}
// verifyGet verifies RangedMap.Get .
func verifyGet(t *testing.T, M *RangedMap) {
t.Helper()
......@@ -260,10 +283,10 @@ func verifyGet(t *testing.T, M *RangedMap) {
lo := kmax(e.Lo, Z.Lo)
hi_ := kmin(e.Hi_, Z.Hi_)
for k := lo; k <= hi_; k++ {
v, ok := M.Get_(k)
if !(v == e.Value && ok) {
t.Errorf("%s\tGet(%s):\nhave: %q, %t\nwant: %q, true",
M, KStr(k), v, ok, e.Value)
v, r, ok := M.Get_(k)
if !(v == e.Value && r == e.KeyRange && ok) {
t.Errorf("%s\tGet(%s):\nhave: %q%s, %t\nwant: %q%s, true",
M, KStr(k), v, r, ok, e.Value, e.KeyRange)
}
}
}
......@@ -280,10 +303,10 @@ func verifyGet(t *testing.T, M *RangedMap) {
lo := kmax(r.Lo, Z.Lo)
hi_ := kmin(r.Hi_, Z.Hi_)
for k := lo; k <= hi_; k++ {
v, ok := M.Get_(k)
if !(v == "" && !ok) {
t.Errorf("%s\tGet(%s):\nhave: %q, %t\nwant: %q, false",
M, KStr(k), v, ok, "")
v, r_, ok := M.Get_(k)
if !(v == "" && r_.Empty() && !ok) {
t.Errorf("%s\tGet(%s):\nhave: %q%s, %t\nwant: %q[), false",
M, KStr(k), v, r_, ok, "")
}
}
}
......
......@@ -63,10 +63,15 @@ func (S *RangedKeySet) DelRange(r KeyRange) {
}
// HasRange returns whether all keys from range r belong to the set.
func (S *RangedKeySet) HasRange(r KeyRange) (yes bool) {
func (S *RangedKeySet) HasRange(r KeyRange) bool {
return S.m.HasRange(r)
}
// IntersectsRange returns whether some keys from range r belong to the set.
func (S *RangedKeySet) IntersectsRange(r KeyRange) bool {
return S.m.IntersectsRange(r)
}
// Union returns RangedKeySet(A.keys | B.keys).
func (A *RangedKeySet) Union(B *RangedKeySet) *RangedKeySet {
......@@ -82,7 +87,12 @@ func (A *RangedKeySet) Difference(B *RangedKeySet) *RangedKeySet {
return D
}
// TODO Intersection
// Intersection returns RangedKeySet(A.keys ^ B.keys).
func (A *RangedKeySet) Intersection(B *RangedKeySet) *RangedKeySet {
I := A.Clone()
I.IntersectionInplace(B)
return I
}
func (A *RangedKeySet) UnionInplace(B *RangedKeySet) {
A.verify()
......@@ -109,6 +119,21 @@ func (A *RangedKeySet) DifferenceInplace(B *RangedKeySet) {
}
}
func (A *RangedKeySet) IntersectionInplace(B *RangedKeySet) {
A.verify()
B.verify()
defer A.verify()
// XXX very dumb
// A^B = (A∪B) \ (A\B ∪ B\A)
AdB := A.Difference(B)
BdA := B.Difference(A)
ddd := AdB
ddd.UnionInplace(BdA)
A.UnionInplace(B)
A.DifferenceInplace(ddd)
}
// --------
......@@ -152,6 +177,6 @@ func (S *RangedKeySet) AllRanges() /*readonly*/[]KeyRange {
}
func (S RangedKeySet) String() string {
// RangeMap<void> supports formatting for set out of the box
// RangedMap<void> supports formatting for set out of the box
return S.m.String()
}
......@@ -44,12 +44,13 @@ func TestRangedKeySetTypes(t *testing.T) {
func TestRangedKeySet(t *testing.T) {
type testEntry struct {
A, B *RangedKeySet
Union *RangedKeySet
Difference *RangedKeySet
A, B *RangedKeySet
Union *RangedKeySet
Difference *RangedKeySet
Intersection *RangedKeySet
}
E := func(A, B, U, D *RangedKeySet) testEntry {
return testEntry{A, B, U, D}
E := func(A, B, U, D, I *RangedKeySet) testEntry {
return testEntry{A, B, U, D, I}
}
// S is shorthand to create RangedKeySet, e.g. S(1,2, 4,5) will return {[1,2) [4,5)}
......@@ -81,68 +82,78 @@ func TestRangedKeySet(t *testing.T) {
S(), // A
S(), // B
S(), // U
S()), // D
S(), // D
S()), // I
E(
S(), // A
S(1,2), // B
S(1,2), // U
S()), // D
S(), // D
S()), // I
E(
S(1,2), // A
S(), // B
S(1,2), // U
S(1,2)),// D
S(1,2), // D
S()), // I
E(
S(1,2), // A
S(1,2), // B
S(1,2), // U
S()), // D
S(), // D
S(1,2)),// I
// adjacent [1,3) [3,5)
E(
S(1,3), // A
S(3,5), // B
S(1,5), // U
S(1,3)), // D
S(1,5), // U
S(1,3), // D
S()), // I
// overlapping [1,3) [2,4)
E(
S(1,3), // A
S(2,4), // B
S(1,4), // U
S(1,2)), // D
S(1,4), // U
S(1,2), // D
S(2,3)),// I
// [1,7) \ [3,5) -> [1,3) [5,7)
E(
S(1,7), // A
S(3,5), // B
S(1,7),
S(1,3, 5,7)),
S(1,7), // U
S(1,3, 5,7), // D
S(3,5)), // I
// several ranges \ [-∞,∞) -> ø
E(
S(1,3, 5,7, 11,100), // A
S(noo, oo), // B
S(noo, oo), // U
S()), // D
S(noo, oo), // U
S(), // D
S(1,3, 5,7, 11,100)), // I
// [1,3) [5,7) + insert [3,5) -> [1,7)
E(
S(1,3, 5,7), // A
S(3,5), // B
S(1,7), // U
S(1,3, 5,7)), // D
S(1,3, 5,7), // D
S()), // I
// delete covering several ranges
// [-1,0) [1,3) [5,7) [9,11) [15,20) [100,200) \ [2,17)
E(
S(-1,0, 1,3, 5,7, 9,11, 15,20, 100,200), // A
S(2,17), // B
S(-1,0, 1,3, 5,7, 9,11, 15,20, 100,200),// A
S(2,17), // B
S(-1,0, 1,20, 100,200), // U
S(-1,0, 1,2, 17,20, 100,200)), // D
S(-1,0, 1,2, 17,20, 100,200), // D
S(2,3, 5,7, 9,11, 15,17)), // I
}
for _, tt := range testv {
......@@ -150,6 +161,7 @@ func TestRangedKeySet(t *testing.T) {
B := tt.B
U := A.Union(B)
D := A.Difference(B)
I := A.Intersection(B)
if !U.Equal(tt.Union) {
t.Errorf("Union:\n A: %s\n B: %s\n ->u: %s\n okU: %s\n", A, B, U, tt.Union)
......@@ -157,12 +169,18 @@ func TestRangedKeySet(t *testing.T) {
if !D.Equal(tt.Difference) {
t.Errorf("Difference:\n A: %s\n B: %s\n ->d: %s\n okD: %s\n", A, B, D, tt.Difference)
}
if !I.Equal(tt.Intersection) {
t.Errorf("Intersection:\n A: %s\n B: %s\n ->i: %s\n okI: %s\n", A, B, I, tt.Intersection)
}
// HasRange
assertSetHasRanges(t, A, A.AllRanges(), true)
assertSetHasRanges(t, B, B.AllRanges(), true)
assertSetHasRanges(t, U, A.AllRanges(), true)
assertSetHasRanges(t, U, B.AllRanges(), true)
assertSetHasRanges(t, A, I.AllRanges(), true)
assertSetHasRanges(t, B, I.AllRanges(), true)
assertSetHasRanges(t, U, I.AllRanges(), true)
Dab := D
Dba := B.Difference(A)
......@@ -170,6 +188,20 @@ func TestRangedKeySet(t *testing.T) {
assertSetHasRanges(t, B, Dab.AllRanges(), false)
assertSetHasRanges(t, B, Dba.AllRanges(), true)
assertSetHasRanges(t, A, Dba.AllRanges(), false)
assertSetHasRanges(t, Dab, I.AllRanges(), false)
assertSetHasRanges(t, Dba, I.AllRanges(), false)
assertSetHasRanges(t, I, Dab.AllRanges(), false)
assertSetHasRanges(t, I, Dba.AllRanges(), false)
// IntersectsRange (= (A^B)!=ø)
assertSetIntersectsRanges(t, A, I.AllRanges(), !I.Empty())
assertSetIntersectsRanges(t, B, I.AllRanges(), !I.Empty())
assertSetIntersectsRanges(t, Dab, B.AllRanges(), false)
assertSetIntersectsRanges(t, Dba, A.AllRanges(), false)
assertSetIntersectsRanges(t, Dab, I.AllRanges(), false)
assertSetIntersectsRanges(t, Dba, I.AllRanges(), false)
assertSetIntersectsRanges(t, I, Dab.AllRanges(), false)
assertSetIntersectsRanges(t, I, Dba.AllRanges(), false)
}
}
......@@ -183,3 +215,14 @@ func assertSetHasRanges(t *testing.T, S *RangedKeySet, rangev []KeyRange, hasOK
}
}
}
// assertSetIntersectsRanges asserts for all ranges from rangev that RangedSet S.IntersectsRange(r) == intersectsOK.
func assertSetIntersectsRanges(t *testing.T, S *RangedKeySet, rangev []KeyRange, intersectsOK bool) {
t.Helper()
for _, r := range rangev {
intersects := S.IntersectsRange(r)
if intersects != intersectsOK {
t.Errorf("IntersectsRange:\n S: %s\n r: %s\n ->: %v\n ok: %v\n", S, r, intersects, intersectsOK)
}
}
}
......@@ -48,9 +48,11 @@ type _RangedMap_strEntry struct {
// Get returns value associated with key k.
func (M *_RangedMap_str) Get(k Key) string {
v, _ := M.Get_(k)
return v
//
// KeyRange indicates all keys adjacent to k, that are too mapped to the same value.
func (M *_RangedMap_str) Get(k Key) (string, KeyRange) {
v, r, _ := M.Get_(k)
return v, r
}
// Set changes M to map key k to value v.
......@@ -65,20 +67,21 @@ func (M *_RangedMap_str) Del(k Key) {
// Has returns whether key k is present in the map.
func (M *_RangedMap_str) Has(k Key) bool {
_, ok := M.Get_(k)
_, _, ok := M.Get_(k)
return ok
}
// Get_ is comma-ok version of Get.
func (M *_RangedMap_str) Get_(k Key) (v string, ok bool) {
func (M *_RangedMap_str) Get_(k Key) (v string, r KeyRange, ok bool) {
r = KeyRange{0,-1} // zero value represents non-empty [0,1)
if trace_RangedMap_str {
fmt.Printf("\n\nGet_:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" k: %s\n", KStr(k))
defer func() {
fmt.Printf("->·: %v, %t\n", v, ok)
fmt.Printf("->·: %v%s, %t\n", v, r, ok)
}()
}
......@@ -101,7 +104,7 @@ func (M *_RangedMap_str) Get_(k Key) (v string, ok bool) {
}
// found
return e.Value, true
return e.Value, e.KeyRange, true
}
// SetRange changes M to map key range r to value v.
......@@ -361,6 +364,40 @@ func (M *_RangedMap_str) HasRange(r KeyRange) (yes bool) {
}
}
// IntersectsRange returns whether some keys from range r belong to the map.
func (M *_RangedMap_str) IntersectsRange(r KeyRange) (yes bool) {
if trace_RangedMap_str {
fmt.Printf("\n\nIntersectsRange:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" r: %s\n", r)
defer func() {
fmt.Printf("->·: %v\n", yes)
}()
}
M.verify()
if r.Empty() {
return false
}
// find first ilo: r.lo < [ilo].hi
l := len(M.entryv)
ilo := sort.Search(l, func(i int) bool {
return r.Lo <= M.entryv[i].Hi_
})
debugf_RangedMap_str("\tilo: %d\n", ilo)
if ilo == l { // not found
return false
}
// [ilo].hi may be either inside r (≤ r.hi), or > r.hi
// - if it is inside -> overlap is there,
// - if it is > r.hi -> overlap is there if [ilo].lo < r.hi
// => in any case overlap is there if [ilo].lo < r.hi
return M.entryv[ilo].Lo <= r.Hi_
}
// --------
......
......@@ -48,9 +48,11 @@ type _RangedMap_voidEntry struct {
// Get returns value associated with key k.
func (M *_RangedMap_void) Get(k Key) void {
v, _ := M.Get_(k)
return v
//
// KeyRange indicates all keys adjacent to k, that are too mapped to the same value.
func (M *_RangedMap_void) Get(k Key) (void, KeyRange) {
v, r, _ := M.Get_(k)
return v, r
}
// Set changes M to map key k to value v.
......@@ -65,20 +67,21 @@ func (M *_RangedMap_void) Del(k Key) {
// Has returns whether key k is present in the map.
func (M *_RangedMap_void) Has(k Key) bool {
_, ok := M.Get_(k)
_, _, ok := M.Get_(k)
return ok
}
// Get_ is comma-ok version of Get.
func (M *_RangedMap_void) Get_(k Key) (v void, ok bool) {
func (M *_RangedMap_void) Get_(k Key) (v void, r KeyRange, ok bool) {
r = KeyRange{0,-1} // zero value represents non-empty [0,1)
if trace_RangedMap_void {
fmt.Printf("\n\nGet_:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" k: %s\n", KStr(k))
defer func() {
fmt.Printf("->·: %v, %t\n", v, ok)
fmt.Printf("->·: %v%s, %t\n", v, r, ok)
}()
}
......@@ -101,7 +104,7 @@ func (M *_RangedMap_void) Get_(k Key) (v void, ok bool) {
}
// found
return e.Value, true
return e.Value, e.KeyRange, true
}
// SetRange changes M to map key range r to value v.
......@@ -361,6 +364,40 @@ func (M *_RangedMap_void) HasRange(r KeyRange) (yes bool) {
}
}
// IntersectsRange returns whether some keys from range r belong to the map.
func (M *_RangedMap_void) IntersectsRange(r KeyRange) (yes bool) {
if trace_RangedMap_void {
fmt.Printf("\n\nIntersectsRange:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" r: %s\n", r)
defer func() {
fmt.Printf("->·: %v\n", yes)
}()
}
M.verify()
if r.Empty() {
return false
}
// find first ilo: r.lo < [ilo].hi
l := len(M.entryv)
ilo := sort.Search(l, func(i int) bool {
return r.Lo <= M.entryv[i].Hi_
})
debugf_RangedMap_void("\tilo: %d\n", ilo)
if ilo == l { // not found
return false
}
// [ilo].hi may be either inside r (≤ r.hi), or > r.hi
// - if it is inside -> overlap is there,
// - if it is > r.hi -> overlap is there if [ilo].lo < r.hi
// => in any case overlap is there if [ilo].lo < r.hi
return M.entryv[ilo].Lo <= r.Hi_
}
// --------
......
......@@ -46,9 +46,10 @@ type Node = blib.Node
type TreeEntry = blib.TreeEntry
type BucketEntry = blib.BucketEntry
type Key = blib.Key
const KeyMax = blib.KeyMax
const KeyMin = blib.KeyMin
type Key = blib.Key
type KeyRange = blib.KeyRange
const KeyMax = blib.KeyMax
const KeyMin = blib.KeyMin
// value is assumed to be persistent reference.
// deletion is represented as VDEL.
......
......@@ -36,55 +36,18 @@ import (
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/zdata"
)
type Tree = xbtreetest.Tree
type Node = xbtreetest.Node
type Key = xbtreetest.Key
type Tree = xbtreetest.Tree
type Node = xbtreetest.Node
type Key = xbtreetest.Key
type KeyRange = xbtreetest.KeyRange
type ZBlk = zdata.ZBlk
// ztreeGetBlk returns ztree[k] and tree path that lead to this block.
// XXX +return blkRevMax and use it ?
func ztreeGetBlk(ctx context.Context, ztree *Tree, k Key) (zblk ZBlk, ok bool, path []Node, err error) {
path = []Node{}
xzblk, ok, err := ztree.VGet(ctx, k, func(node Node) {
path = append(path, node)
})
if err != nil {
return nil, false, nil, err
}
if ok {
zblk, ok = xzblk.(ZBlk)
if !ok {
return nil, false, nil, fmt.Errorf("expect ZBlk*; got %s", xzodb.TypeOf(xzblk)) // XXX errctx
}
}
return zblk, ok, path, nil
}
func init() {
xbtreetest.ZTreeGetBlkData = _ZTreeGetBlkData
xbtreetest.ZGetBlkData = _ZGetBlkData
xbtreetest.ZGetBlkData = _ZGetBlkData
}
// _ZTreeGetBlkData returns block data from block pointed to by ztree[k].
func _ZTreeGetBlkData(ctx context.Context, ztree *Tree, k Key) (data string, ok bool, path []Node, err error) {
defer xerr.Contextf(&err, "@%s: tree<%s>: get blkdata from [%d]", ztree.PJar().At(), ztree.POid(), k)
zblk, ok, path, err := ztreeGetBlk(ctx, ztree, k)
if err != nil || !ok {
return "", ok, path, err
}
bdata, _, err := zblk.LoadBlkData(ctx)
if err != nil {
return "", false, nil, err
}
return string(bdata), true, path, nil
}
// _ZGetBlkData loads block data from ZBlk object specified by its oid.
func _ZGetBlkData(ctx context.Context, zconn *zodb.Connection, zblkOid zodb.Oid) (data string, err error) {
......
......@@ -36,9 +36,10 @@ type Node = blib.Node
type TreeEntry = blib.TreeEntry
type BucketEntry = blib.BucketEntry
type Key = blib.Key
const KeyMax = blib.KeyMax
const KeyMin = blib.KeyMin
type Key = blib.Key
type KeyRange = blib.KeyRange
const KeyMax = blib.KeyMax
const KeyMin = blib.KeyMin
type setKey = set.I64
......
......@@ -25,19 +25,17 @@ import (
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
_ "lab.nexedi.com/kirr/neo/go/zodb/wks"
)
// ZBlk-related functions are imported at runtime by package xbtreetest/init
var (
ZTreeGetBlkData func(context.Context, *Tree, Key) (string, bool, []Node, error)
ZGetBlkData func(context.Context, *zodb.Connection, zodb.Oid) (string, error)
ZGetBlkData func(context.Context, *zodb.Connection, zodb.Oid) (string, error)
)
func zassertInitDone() {
if ZTreeGetBlkData == nil {
if ZGetBlkData == nil {
panic("xbtreetest/zdata not initialized -> import xbtreetest/init to fix")
}
}
......@@ -54,15 +52,3 @@ func xzgetBlkData(ctx context.Context, zconn *zodb.Connection, zblkOid zodb.Oid)
data, err := ZGetBlkData(ctx, zconn, zblkOid); X(err)
return string(data)
}
// xzgetBlkDataAt loads block data from ZBlk object specified by oid@at.
func xzgetBlkDataAt(db *zodb.DB, zblkOid zodb.Oid, at zodb.Tid) string {
zassertInitDone()
X := exc.Raiseif
txn, ctx := transaction.New(context.Background())
defer txn.Abort()
zconn, err := db.Open(ctx, &zodb.ConnOptions{At: at}); X(err)
return xzgetBlkData(ctx, zconn, zblkOid)
}
// Code generated by gen-rangemap _RangedMap_RebuildJob *_RebuildJob; DO NOT EDIT.
// Copyright (C) 2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xbtree
import "lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xbtree/blib"
// map [lo,hi) Key ranges to values.
import (
"fmt"
"sort"
)
const trace_RangedMap_RebuildJob = false
const debug_RangedMap_RebuildJob = false
// _RangedMap_RebuildJob is Key->*_RebuildJob map with adjacent keys mapped to the same value coalesced into Ranges.
//
// Zero value represents empty map.
type _RangedMap_RebuildJob struct {
// TODO rework to use BTree lo->hi_ instead if in practice in treediff,
// and other usage places, N(ranges) turns out to be not small
// (i.e. performance turns out to be not acceptable)
entryv []_RangedMap_RebuildJobEntry // lo↑
}
// _RangedMap_RebuildJobEntry represents one entry in _RangedMap_RebuildJob.
type _RangedMap_RebuildJobEntry struct {
Value *_RebuildJob
blib.KeyRange
}
// Get returns value associated with key k.
//
// blib.KeyRange indicates all keys adjacent to k, that are too mapped to the same value.
func (M *_RangedMap_RebuildJob) Get(k Key) (*_RebuildJob, blib.KeyRange) {
v, r, _ := M.Get_(k)
return v, r
}
// Set changes M to map key k to value v.
func (M *_RangedMap_RebuildJob) Set(k Key, v *_RebuildJob) {
M.SetRange(blib.KeyRange{Lo: k, Hi_: k}, v)
}
// Del removes key k.
func (M *_RangedMap_RebuildJob) Del(k Key) {
M.DelRange(blib.KeyRange{Lo: k, Hi_: k})
}
// Has returns whether key k is present in the map.
func (M *_RangedMap_RebuildJob) Has(k Key) bool {
_, _, ok := M.Get_(k)
return ok
}
// Get_ is comma-ok version of Get.
func (M *_RangedMap_RebuildJob) Get_(k Key) (v *_RebuildJob, r blib.KeyRange, ok bool) {
r = blib.KeyRange{0,-1} // zero value represents non-empty [0,1)
if trace_RangedMap_RebuildJob {
fmt.Printf("\n\nGet_:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" k: %s\n", blib.KStr(k))
defer func() {
fmt.Printf("->·: %v%s, %t\n", v, r, ok)
}()
}
M.verify()
// find first ilo: k < [ilo].hi
l := len(M.entryv)
ilo := sort.Search(l, func(i int) bool {
return k <= M.entryv[i].Hi_
})
debugf_RangedMap_RebuildJob("\tilo: %d\n", ilo)
if ilo == l { // not found
return
}
e := M.entryv[ilo]
if !(e.Lo <= k) { // not found
return
}
// found
return e.Value, e.KeyRange, true
}
// SetRange changes M to map key range r to value v.
func (M *_RangedMap_RebuildJob) SetRange(r blib.KeyRange, v *_RebuildJob) {
e := _RangedMap_RebuildJobEntry{v,r}
if trace_RangedMap_RebuildJob {
fmt.Printf("\n\nSetRange:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" e: %s\n", e)
defer fmt.Printf("->·: %s\n", M)
}
M.verify()
defer M.verify()
if r.Empty() {
return
}
// clear range for r and insert new entry
// TODO optimize for same-value/set case (just merge all covered
// entries into one - see commented AddRange from set vvv)
i := M.delRange(r)
vInsert__RangedMap_RebuildJob(&M.entryv, i, e)
debugf_RangedMap_RebuildJob("\tinsert %s\t-> %s\n", e, M)
// check if we should merge inserted entry with right/left neighbours
if i+1 < len(M.entryv) { // right
x := M.entryv[i]
right := M.entryv[i+1]
if (x.Hi_+1 == right.Lo) && (v == right.Value) {
vReplaceSlice__RangedMap_RebuildJob(&M.entryv, i,i+2,
_RangedMap_RebuildJobEntry{v, blib.KeyRange{x.Lo, right.Hi_}})
debugf_RangedMap_RebuildJob("\tmerge right\t-> %s\n", M)
}
}
if i > 0 { // left
left := M.entryv[i-1]
x := M.entryv[i]
if (left.Hi_+1 == x.Lo) && (left.Value == v) {
vReplaceSlice__RangedMap_RebuildJob(&M.entryv, i-1,i+1,
_RangedMap_RebuildJobEntry{v, blib.KeyRange{left.Lo, x.Hi_}})
debugf_RangedMap_RebuildJob("\tmerge left\t-> %s\n", M)
}
}
// done
/* how it was for just set:
// find first ilo: r.Lo < [ilo].hi
l := len(S.rangev)
ilo := sort.Search(l, func(i int) bool {
return r.Lo <= S.rangev[i].Hi_
})
debugfRSet("\tilo: %d\n", ilo)
if ilo == l { // not found
S.rangev = append(S.rangev, r)
l++
debugfRSet("\tappend %s\t-> %s\n", r, S)
}
// find last jhi: [jhi].Lo < r.hi
jhi := ilo
for ;; jhi++ {
if jhi == l {
break
}
if S.rangev[jhi].Lo <= r.Hi_ {
continue
}
break
}
debugfRSet("\tjhi: %d\n", jhi)
// entries in [ilo:jhi) ∈ [r.Lo,r.hi) and should be merged into one
if (jhi - ilo) > 1 {
lo := S.rangev[ilo].Lo
hi_ := S.rangev[jhi-1].Hi_
vReplaceSlice__RangedMap_RebuildJob(&S.rangev, ilo,jhi, blib.KeyRange{lo,hi_})
debugfRSet("\tmerge S[%d:%d]\t-> %s\n", ilo, jhi, S)
}
jhi = -1 // no longer valid
// if [r.lo,r.hi) was outside of any entry - create new entry
if r.Hi_ < S.rangev[ilo].Lo {
vInsert__RangedMap_RebuildJob(&S.rangev, ilo, r)
debugfRSet("\tinsert %s\t-> %s\n", r, S)
}
// now we have covered entries merged as needed into [ilo]
// extend this entry if r coverage is wider
if r.Lo < S.rangev[ilo].Lo {
S.rangev[ilo].Lo = r.Lo
debugfRSet("\textend left\t-> %s\n", S)
}
if r.Hi_ > S.rangev[ilo].Hi_ {
S.rangev[ilo].Hi_ = r.Hi_
debugfRSet("\textend right\t-> %s\n", S)
}
// and check if we should merge it with right/left neighbours
if ilo+1 < len(S.rangev) { // right
if S.rangev[ilo].Hi_+1 == S.rangev[ilo+1].Lo {
vReplaceSlice__RangedMap_RebuildJob(&S.rangev, ilo,ilo+2,
blib.KeyRange{S.rangev[ilo].Lo, S.rangev[ilo+1].Hi_})
debugfRSet("\tmerge right\t-> %s\n", S)
}
}
if ilo > 0 { // left
if S.rangev[ilo-1].Hi_+1 == S.rangev[ilo].Lo {
vReplaceSlice__RangedMap_RebuildJob(&S.rangev, ilo-1,ilo+1,
blib.KeyRange{S.rangev[ilo-1].Lo, S.rangev[ilo].Hi_})
debugfRSet("\tmerge left\t-> %s\n", S)
}
}
// done
*/
}
// DelRange removes range r from the map.
func (M *_RangedMap_RebuildJob) DelRange(r blib.KeyRange) {
if trace_RangedMap_RebuildJob {
fmt.Printf("\n\nDelRange:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" r: %s\n", r)
defer fmt.Printf("->·: %s\n", M)
}
M.verify()
defer M.verify()
if r.Empty() {
return
}
M.delRange(r)
}
// delRange deletes range r from the map and returns .entryv index where r
// should be inserted/appended if needed.
//
// r must be !empty.
func (M *_RangedMap_RebuildJob) delRange(r blib.KeyRange) (i int) {
// find first ilo: r.Lo < [ilo].hi
l := len(M.entryv)
ilo := sort.Search(l, func(i int) bool {
return r.Lo <= M.entryv[i].Hi_
})
debugf_RangedMap_RebuildJob("\tilo: %d\n", ilo)
if ilo == l { // not found
debugf_RangedMap_RebuildJob("\tnon-overlap right\n")
return l
}
// find last jhi: [jhi].Lo < r.hi
jhi := ilo
for ;; jhi++ {
if jhi == l {
break
}
if M.entryv[jhi].Lo <= r.Hi_ {
continue
}
break
}
debugf_RangedMap_RebuildJob("\tjhi: %d\n", jhi)
if jhi == 0 {
debugf_RangedMap_RebuildJob("\tnon-overlap left\n")
return 0
}
// [ilo+1:jhi-1] should be deleted
// [ilo] and [jhi-1] overlap with [r.lo,r.hi) - they should be deleted, or shrinked,
// or split+shrinked if ilo==jhi-1 and r is inside [ilo]
if jhi-ilo == 1 && M.entryv[ilo].Lo < r.Lo && r.Hi_ < M.entryv[ilo].Hi_ {
x := M.entryv[ilo]
vInsert__RangedMap_RebuildJob(&M.entryv, ilo, x)
jhi++
debugf_RangedMap_RebuildJob("\tpresplit copy %s\t-> %s\n", x, M)
}
if M.entryv[ilo].Lo < r.Lo { // shrink left
M.entryv[ilo].Hi_ = r.Lo-1
debugf_RangedMap_RebuildJob("\tshrink [%d] left \t-> %s\n", ilo, M)
ilo++
}
if r.Hi_ < M.entryv[jhi-1].Hi_ { // shrink right
M.entryv[jhi-1].Lo = r.Hi_+1
debugf_RangedMap_RebuildJob("\tshrink [%d] right\t-> %s\n", jhi-1, M)
jhi--
}
if (jhi - ilo) > 0 {
vDeleteSlice__RangedMap_RebuildJob(&M.entryv, ilo,jhi)
debugf_RangedMap_RebuildJob("\tdelete M[%d:%d]\t-> %s\n", ilo, jhi, M)
}
// done
return ilo
}
// HasRange returns whether all keys from range r belong to the map.
func (M *_RangedMap_RebuildJob) HasRange(r blib.KeyRange) (yes bool) {
if trace_RangedMap_RebuildJob {
fmt.Printf("\n\nHasRange:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" r: %s\n", r)
defer func() {
fmt.Printf("->·: %v\n", yes)
}()
}
M.verify()
if r.Empty() {
return true
}
// find first ilo: r.lo < [ilo].hi
l := len(M.entryv)
ilo := sort.Search(l, func(i int) bool {
return r.Lo <= M.entryv[i].Hi_
})
debugf_RangedMap_RebuildJob("\tilo: %d\n", ilo)
if ilo == l { // not found
return false
}
// scan right and verify that whole r is covered
lo := r.Lo
for {
e := M.entryv[ilo]
debugf_RangedMap_RebuildJob("\te: %s\ttocheck: %s\n", e, blib.KeyRange{lo, r.Hi_})
if lo < e.Lo {
return false // hole in coverage
}
if r.Hi_ <= e.Hi_ {
return true // reached full coverage
}
lo = e.Hi_
if lo < KeyMax {
lo++
}
ilo++
if ilo == l {
return false // r's right not fully covered
}
}
}
// IntersectsRange returns whether some keys from range r belong to the map.
func (M *_RangedMap_RebuildJob) IntersectsRange(r blib.KeyRange) (yes bool) {
if trace_RangedMap_RebuildJob {
fmt.Printf("\n\nIntersectsRange:\n")
fmt.Printf(" M: %s\n", M)
fmt.Printf(" r: %s\n", r)
defer func() {
fmt.Printf("->·: %v\n", yes)
}()
}
M.verify()
if r.Empty() {
return false
}
// find first ilo: r.lo < [ilo].hi
l := len(M.entryv)
ilo := sort.Search(l, func(i int) bool {
return r.Lo <= M.entryv[i].Hi_
})
debugf_RangedMap_RebuildJob("\tilo: %d\n", ilo)
if ilo == l { // not found
return false
}
// [ilo].hi may be either inside r (≤ r.hi), or > r.hi
// - if it is inside -> overlap is there,
// - if it is > r.hi -> overlap is there if [ilo].lo < r.hi
// => in any case overlap is there if [ilo].lo < r.hi
return M.entryv[ilo].Lo <= r.Hi_
}
// --------
// verify checks _RangedMap_RebuildJob for internal consistency:
// - ranges must be not overlapping and ↑
// - adjacent ranges must map to different values
func (M *_RangedMap_RebuildJob) verify() {
// TODO !debug -> return
var badv []string
badf := func(format string, argv ...interface{}) {
badv = append(badv, fmt.Sprintf(format, argv...))
}
defer func() {
if badv != nil {
emsg := "M.verify: fail:\n\n"
for _, bad := range badv {
emsg += fmt.Sprintf("- %s\n", bad)
}
emsg += fmt.Sprintf("\nM: %s\n", M)
panic(emsg)
}
}()
hi_Prev := KeyMin
var v_Prev *_RebuildJob
for i, e := range M.entryv {
hiPrev := hi_Prev + 1
if i > 0 {
if (e.Value == v_Prev) {
if !(hiPrev < e.Lo) { // NOTE not ≤ - adjacent ranges must be merged
badf("[%d]: same value: !(hiPrev < e.lo)", i)
}
} else {
if !(hi_Prev <= e.Lo) {
badf("[%d]: different value: !(hiPrev ≤ e.lo)", i)
}
}
}
if !(e.Lo <= e.Hi_) {
badf("[%d]: !(e.lo ≤ e.hi_)", i)
}
hi_Prev = e.Hi_
v_Prev = e.Value
}
}
// Clone returns copy of the map.
//
// NOTE values are _not_ cloned.
func (orig *_RangedMap_RebuildJob) Clone() *_RangedMap_RebuildJob {
klon := &_RangedMap_RebuildJob{}
klon.entryv = append(klon.entryv, orig.entryv...)
return klon
}
// Empty returns whether the map is empty.
func (M *_RangedMap_RebuildJob) Empty() bool {
return len(M.entryv) == 0
}
// Equal returns whether A == B.
func (A *_RangedMap_RebuildJob) Equal(B *_RangedMap_RebuildJob) bool {
if len(A.entryv) != len(B.entryv) {
return false
}
for i, ea := range A.entryv {
eb := B.entryv[i]
if ea != eb {
return false
}
}
return true
}
// Clear removes all elements from the map.
func (M *_RangedMap_RebuildJob) Clear() {
M.entryv = nil
}
// AllRanges returns slice of all key ranges in the set.
//
// TODO -> iter?
func (M *_RangedMap_RebuildJob) AllRanges() /*readonly*/[]_RangedMap_RebuildJobEntry {
return M.entryv
}
func (M _RangedMap_RebuildJob) String() string {
s := "{"
for i, e := range M.entryv {
if i > 0 {
s += " "
}
s += e.String()
}
s += "}"
return s
}
func (e _RangedMap_RebuildJobEntry) String() string {
s := e.KeyRange.String()
v := fmt.Sprintf("%v", e.Value)
if v != "" { // omit ":<v>" in the case of set
s += ":" + v
}
return s
}
func debugf_RangedMap_RebuildJob(format string, argv ...interface{}) {
if !debug_RangedMap_RebuildJob {
return
}
fmt.Printf(format, argv...)
}
// ---- slice ops ----
// vInsert__RangedMap_RebuildJob inserts e into *pv[i].
func vInsert__RangedMap_RebuildJob(pv *[]_RangedMap_RebuildJobEntry, i int, e _RangedMap_RebuildJobEntry) {
v := *pv
v = append(v, _RangedMap_RebuildJobEntry{})
copy(v[i+1:], v[i:])
v[i] = e
*pv = v
}
// vDeleteSlice__RangedMap_RebuildJob deletes *pv[lo:hi].
func vDeleteSlice__RangedMap_RebuildJob(pv *[]_RangedMap_RebuildJobEntry, lo,hi int) {
v := *pv
n := copy(v[lo:], v[hi:])
v = v[:lo+n]
*pv = v
}
// vReplaceSlice__RangedMap_RebuildJob replaces *pv[lo:hi] with e.
func vReplaceSlice__RangedMap_RebuildJob(pv *[]_RangedMap_RebuildJobEntry, lo,hi int, e _RangedMap_RebuildJobEntry) {
v := *pv
n := copy(v[lo+1:], v[hi:])
v[lo] = e
v = v[:lo+1+n]
*pv = v
}
......@@ -34,7 +34,7 @@ package xbtree
//
// Because it is very computationally expensive(+) to find out for an object to
// which BTree it belongs, ΔBtail cannot provide full BTree-level history given
// just ΔZtail with δZ changes. Because of this ΔBtail requires help from
// just ΔZtail with δZ changes. Due to this ΔBtail requires help from
// users, which are expected to call ΔBtail.Track(treepath) to let ΔBtail know
// that such and such ZODB objects constitute a path from root of a tree to some
// of its leaf. After Track call the objects from the path and tree keys, that
......@@ -53,33 +53,103 @@ package xbtree
// traversal also belongs to the set.
//
// A new Track request potentially grows tracked keys coverage. Due to this,
// ΔBtail needs to recompute potentially whole vδT of the affected tree. This
// recomputation is managed by "rebuild..." family of functions and uses the
// same treediff algorithm, that Update is using, but modulo PPTreeSubSet
// corresponding to δ key coverage. Update also potentially needs to rebuild
// whole vδT history, not only append new δT, because a change to tracked tree
// nodes can result in growth of tracked key coverage.
// on a query, ΔBtail needs to recompute potentially whole vδT of the affected
// tree. This recomputation is managed by "vδTSnapForTracked*" and "_rebuild"
// functions and uses the same treediff algorithm, that Update is using, but
// modulo PPTreeSubSet corresponding to δ key coverage. Update also potentially
// needs to rebuild whole vδT history, not only append new δT, because a
// change to tracked tree nodes can result in growth of tracked key coverage.
//
// Queries are relatively straightforward code that work on vδT snapshot. The
// main complexity, besides BTree-diff algorithm, lies in recomputing vδT when
// set of tracked keys changes. XXX and in concurrency
// set of tracked keys changes, and in handling that recomputation in such a way
// that multiple Track and queries requests could be all served in parallel.
//
//
// XXX concurrency
// Concurrency
//
// In order to allow multiple Track and queries requests to be served in
// parallel ΔBtail employs special organization of vδT rebuild process:
//
// 1. vδT is managed under read-copy-update (RCU) discipline: before making
// any vδT change the mutator atomically clones whole vδT and applies its
// change to the clone. This way a query, once it retrieves vδT snapshot,
// does not need to further synchronize with vδT mutators, and can rely on
// that retrieved vδT snapshot will remain immutable.
//
// 2. a Track request goes through 3 states: "new", "handle-in-progress" and
// "handled". At each state keys/nodes of the Track are maintained in:
//
// - ΔTtail.ktrackNew and .trackNew for "new",
// - ΔTtail.krebuildJobs for "handle-in-progress", and
// - ΔBtail.trackSet for "handled".
//
// trackSet keeps nodes, and implicitly keys, from all handled Track
// requests. For all keys, covered by trackSet, vδT is fully computed.
//
// a new Track(keycov, path) is remembered in ktrackNew and trackNew to be
// further processed when a query should need keys from keycov. vδT is not
// yet providing data for keycov keys.
//
// when a Track request starts to be processed, its keys and nodes are moved
// from ktrackNew/trackNew into krebuildJobs. vδT is not yet providing data
// for requested-to-be-tracked keys.
//
// all trackSet, trackNew/ktrackNew and krebuildJobs are completely disjoint:
//
// trackSet ^ trackNew = ø
// trackSet ^ krebuildJobs = ø
// trackNew ^ krebuildJobs = ø
//
// 3. when a query is served, it needs to retrieve vδT snapshot that takes
// related previous Track requests into account. Retrieving such snapshots
// is implemented in vδTSnapForTracked*() family of functions: there it
// checks ktrackNew/trackNew, and if those sets overlap with query's keys
// of interest, run vδT rebuild for keys queued in ktrackNew.
//
// the main part of that rebuild can be run without any locks, because it
// does not use nor modify any ΔBtail data, and for δ(vδT) it just computes
// a fresh full vδT build modulo retrieved ktrackNew. Only after that
// computation is complete, ΔBtail is locked again to quickly merge in
// δ(vδT) update back into vδT.
//
// This organization is based on the fact that
//
// vδT/(T₁∪T₂) = vδT/T₁ | vδT/T₂
//
// i.e. vδT computed for tracked set being union of T₁ and T₂ is the same
// as merge of vδT computed for tracked set T₁ and vδT computed for tracked
// set T₂.
//
// this merge property allows to run computation for δ(vδT) independently
// and with ΔBtail unlocked, which in turn enables running several
// Track/queries in parallel.
//
// 4. while vδT rebuild is being run, krebuildJobs keeps corresponding keycov
// entry to indicate in-progress rebuild. Should a query need vδT for keys
// from that job, it first waits for corresponding job(s) to complete.
//
// Explained rebuild organization allows non-overlapping queries/track-requests
// to run simultaneously. This property is essential to WCFS because otherwise
// WCFS would not be able to serve several non-overlapping READ requests to one
// file in parallel.
//
// --------
//
// (*) implemented in treediff.go
// (+) full database scan
//go:generate ./blib/gen-rangemap _RangedMap_RebuildJob *_RebuildJob zrangemap_rebuildjob.go
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -126,43 +196,71 @@ const debugΔBtail = false
// An example for tracked set is a set of visited BTree paths.
// There is no requirement that tracked set belongs to only one single BTree.
//
// XXX concurrent use
//
// See also zodb.ΔTail and zdata.ΔFtail
//
//
// Concurrency
//
// ΔBtail is safe to use in single-writer / multiple-readers mode. That is at
// any time there should be either only sole writer, or, potentially several
// simultaneous readers. The table below classifies operations:
//
// Writers: Update, ForgetPast
// Readers: Track + all queries (SliceByRev, SliceByRootRev, GetAt)
//
// Note that, in particular, it is correct to run multiple Track and queries
// requests simultaneously.
type ΔBtail struct {
// raw ZODB changes; Kept to rebuild .byRoot after new Track.
// includes all changed objects, not only tracked ones.
δZtail *zodb.ΔTail
// handle to make connections to access database.
// TODO allow client to optionally provide zconnOld/zconnNew on e.g. Update()
db *zodb.DB // to open connections to load new/old tree|buckets
// mu protects ΔBtail data _and_ all _ΔTtail data for all roots.
//
// NOTE: even though this lock is global, since _ΔTtail.vδT is updated
// via RCU, working with retrieved vδT snapshot does not need to hold the lock.
mu sync.Mutex
vδBroots []_ΔBroots // [] (rev↑, roots changed in this rev)
byRoot map[zodb.Oid]*_ΔTtail // {} root -> [] k/v change history; only for keys ∈ tracked subset
// set of tracked nodes as of @head state.
// For this set all vδT are fully computed.
// The set of nodes that were requested to be tracked, but were not yet
// taken into account, is kept in _ΔTtail.trackNew & co.
// The set of keys(nodes) that were requested to be tracked, but were
// not yet taken into account, is kept in _ΔTtail.ktrackNew & co.
trackSet blib.PPTreeSubSet
// set of trees for which _ΔTtail.trackNew is non-empty
// set of trees for which _ΔTtail.ktrackNew is non-empty
trackNewRoots setOid
// handle to make connections to access database.
// TODO allow client to optionally provide zconnOld/zconnNew on e.g. Update()
db *zodb.DB // to open connections to load new/old tree|buckets
}
// _ΔTtail represent tail of revisional changes to one BTree.
//
// See ΔBtail documentation for details.
type _ΔTtail struct {
vδT []ΔTree // changes to tree keys; rev↑. covers keys ∈ tracked subset
// set of nodes that were requested to be tracked in this tree, but for
// which vδT was not yet rebuilt
trackNew blib.PPTreeSubSet
// changes to tree keys; rev↑. covers keys ∈ tracked subset
// Note: changes to vδT go through RCU - see "Concurrency" in overview.
vδT []ΔTree
// set of keys that were requested to be tracked in this tree,
// but for which vδT rebuild was not yet started
ktrackNew blib.RangedKeySet // {keycov}
// set of nodes corresponding to ktrackNew
trackNew blib.PPTreeSubSet // PP{nodes}
// set of keys(nodes) for which rebuild is in progress
krebuildJobs _RangedMap_RebuildJob // {} keycov -> job
}
// XXX + trackNewKeys RangedKeySet (concurrency)
// XXX + trackSetKeys RangedKeySet
// _RebuildJob represents currently in-progress vδT rebuilding job.
type _RebuildJob struct {
trackNew blib.PPTreeSubSet // rebuilding for this trackNew
ready chan struct{} // closed when job completes
err error
}
// _ΔBroots represents roots-only part of ΔB.
......@@ -254,6 +352,9 @@ func (orig *_ΔTtail) Clone() *_ΔTtail {
// vδTClone returns deep copy of []ΔTree.
func vδTClone(orig []ΔTree) []ΔTree {
if orig == nil {
return nil
}
klon := make([]ΔTree, 0, len(orig))
for _, origδT := range orig {
klonδT := ΔTree{
......@@ -273,30 +374,20 @@ func (δBtail *ΔBtail) Head() zodb.Tid { return δBtail.δZtail.Head() }
func (δBtail *ΔBtail) Tail() zodb.Tid { return δBtail.δZtail.Tail() }
// ---- Track/rebuild/Update/Forget ----
// ---- Track/snapshot+rebuild/Update/Forget ----
// Track adds tree path to tracked set.
//
// path[0] signifies tree root.
// All path elements must be Tree except last one which, for non-empty tree, must be Bucket.
// path[-1] signifies leaf node.
// keycov should be key range covered by the leaf node.
//
// Besides key (which might point to value or hole), δBtail will also track all
// keys covered by leaf node. In particular after request for KeyMax or KeyMin
// to be tracked, δBtail will keep on tracking changes to maximum or minimum
// keys correspondingly.
// ΔBtail will start tracking provided tree nodes and keys ∈ keycov.
//
// Objects in path must be with .PJar().At() == .Head()
func (δBtail *ΔBtail) Track(key Key, nodePath []Node) {
// NOTE key not needed for anything besides tracing
// (tracking set will be added with all keys, covered by leaf keyrange)
if traceΔBtail {
pathv := []string{}
for _, node := range nodePath { pathv = append(pathv, vnode(node)) }
tracefΔBtail("\nTrack [%v] %s\n", key, strings.Join(pathv, " -> "))
tracefΔBtail("trackSet: %s\n", δBtail.trackSet) // XXX locking
}
// All path elements must be Tree except last one which, for non-empty tree, must be Bucket.
//
// Objects in the path must be with .PJar().At() == .Head()
func (δBtail *ΔBtail) Track(nodePath []Node, keycov KeyRange) {
head := δBtail.Head()
for _, node := range nodePath {
nodeAt := node.PJar().At()
......@@ -306,7 +397,7 @@ func (δBtail *ΔBtail) Track(key Key, nodePath []Node) {
}
path := nodePathToPath(nodePath)
δBtail.track(key, path)
δBtail.track(path, keycov)
}
// nodePathToPath converts path from []Node to []Oid.
......@@ -334,8 +425,16 @@ func nodePathToPath(nodePath []Node) (path []zodb.Oid) {
return path
}
func (δBtail *ΔBtail) track(key Key, path []zodb.Oid) {
// XXX locking
func (δBtail *ΔBtail) track(path []zodb.Oid, keycov KeyRange) {
δBtail.mu.Lock() // TODO verify that there is no in-progress writers
defer δBtail.mu.Unlock()
if traceΔBtail {
pathv := []string{}
for _, node := range path { pathv = append(pathv, node.String()) }
tracefΔBtail("\nTrack %s %s\n", keycov, strings.Join(pathv, " -> "))
tracefΔBtail("trackSet: %s\n", δBtail.trackSet)
}
// first normalize path: remove embedded bucket and check if it was an
// empty artificial tree. We need to do the normalization because we
......@@ -347,284 +446,222 @@ func (δBtail *ΔBtail) track(key Key, path []zodb.Oid) {
}
root := path[0]
// nothing to do if key is already tracked
leaf := path[len(path)-1]
if δBtail.trackSet.Has(leaf) {
tracefΔBtail("->T: nop\n")
path_ := δBtail.trackSet.Path(leaf)
// assertSamePathToLeaf asserts that T.Path(leaf) == path.
assertSamePathToLeaf := func(T blib.PPTreeSubSet, Tname string) {
path_ := T.Path(leaf)
if !pathEqual(path, path_) {
panicf("BUG: key %s is already tracked via path=%v\ntrack requests path=%v", kstr(key), path_, path)
panicf("BUG: keycov %s is already in %s via path=%v\ntrack requests path=%v", keycov, Tname, path_, path)
}
}
// nothing to do if keycov is already tracked
if δBtail.trackSet.Has(leaf) {
tracefΔBtail("->T: nop (already in trackSet)\n")
assertSamePathToLeaf(δBtail.trackSet, "trackSet")
return
}
// queue path into trackNew
δTtail, ok := δBtail.byRoot[root]
if !ok {
δTtail = newΔTtail()
δBtail.byRoot[root] = δTtail
}
δBtail.trackNewRoots.Add(root)
δTtail.trackNew.AddPath(path)
tracefΔBtail("->T: [%s].trackNew -> %s\n", root, δTtail.trackNew)
}
// rebuildAll rebuilds ΔBtail taking all trackNew requests into account.
func (δBtail *ΔBtail) rebuildAll() (err error) {
defer xerr.Context(&err, "ΔBtail rebuildAll")
// XXX locking
tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), δBtail.trackNewRoots)
for root := range δBtail.trackNewRoots {
delete(δBtail.trackNewRoots, root)
δBtail.rebuild1(root)
// nothing to do if keycov is already queued to be tracked in trackNew or krebuildJobs
if δTtail.krebuildJobs.IntersectsRange(keycov) {
tracefΔBtail("->T: nop (already in krebuildJobs)\n")
job, r, ok := δTtail.krebuildJobs.Get_(keycov.Lo)
if !(ok && r == keycov) {
panicf("BUG: keycov is already present in krebuildJobs, but only partly\nkeycov: %s\nkrebuildJobs: %v",
keycov, δTtail.krebuildJobs)
}
assertSamePathToLeaf(job.trackNew, "job.trackNew")
return
}
return nil
}
// rebuild1IfNeeded rebuilds ΔBtail for single root if that root needs rebuilding.
func (δBtail *ΔBtail) rebuild1IfNeeded(root zodb.Oid) error {
// XXX locking
_, ok := δBtail.trackNewRoots[root]
if !ok {
return nil
if δTtail.trackNew.Has(leaf) {
tracefΔBtail("->T: nop (already in trackNew)\n")
assertSamePathToLeaf(δTtail.trackNew, "trackNew")
return
}
delete(δBtail.trackNewRoots, root)
return δBtail.rebuild1(root)
}
// rebuild1 rebuilds ΔBtail for single root.
func (δBtail *ΔBtail) rebuild1(root zodb.Oid) error {
// XXX locking
δTtail := δBtail.byRoot[root] // must be there
δtrackSet, δrevSet, err := δTtail.rebuild(root, δBtail.δZtail, δBtail.db)
if err != nil {
return err
}
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail.vδBroots_Update(root, δrevSet)
return nil
// keycov not in trackSet/trackNew/krebuildJobs -> queue it into trackNew
δBtail.trackNewRoots.Add(root)
δTtail.trackNew.AddPath(path)
δTtail.ktrackNew.AddRange(keycov)
tracefΔBtail("->T: [%s].trackNew -> %s\n", root, δTtail.trackNew)
tracefΔBtail("->T: [%s].ktrackNew -> %s\n", root, δTtail.ktrackNew)
}
// rebuild rebuilds _ΔTtail taking trackNew requests into account.
// vδTSnapForTrackedKey returns vδT snapshot for root that takes into account
// at least all previous Track requests related to key.
//
// It returns:
//
// - set of nodes that must be added to ΔBtail.trackSet to account for
// keys that becomes tracked. Note: this set is potentially wider compared to what was in .trackNew.
// - set of revisions for which new entries in .vδT have been created.
func (δTtail *_ΔTtail) rebuild(root zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (δtrackSet blib.PPTreeSubSet, δrevSet setTid, err error) {
defer xerr.Contextf(&err, "ΔTtail<%s> rebuild", root)
// XXX locking
tracefΔBtail("\nRebuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head())
tracefΔBtail("trackNew: %v\n", δTtail.trackNew)
trackNew := δTtail.trackNew
δTtail.trackNew = blib.PPTreeSubSet{}
if len(trackNew) == 0 {
return nil, nil, nil
// vδT is rebuilt if there are such not-yet-handled Track requests.
func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []ΔTree, err error) {
δBtail.mu.Lock() // TODO verify that there is no in-progress writers
δTtail := δBtail.byRoot[root] // must be there
if δTtail == nil {
δBtail.mu.Unlock()
panicf("δBtail: root<%s> not tracked", root)
}
δrevSet = setTid{}
// clone vδT before modifying it
// queries such as SliceByRootRev return slices of vδT and we do not
// want to change data that is already returned to user.
δTtail.vδT = vδTClone(δTtail.vδT)
// TODO key not tracked -> panic (check key ∈ lastRevOf)
// go backwards and merge vδT <- treediff(lo..hi/trackNew)
vδZ := δZtail.Data()
for {
δtkeycov := &blib.RangedKeySet{} // all keys coming into tracking set during this lo<-hi scan
trackNewCur := trackNew.Clone() // trackNew adjusted as of when going to i<- entry
for i := len(vδZ)-1; i>=0; i-- {
δZ := vδZ[i]
var atPrev zodb.Tid
if i > 0 {
atPrev = vδZ[i-1].Rev
} else {
atPrev = δZtail.Tail()
}
δtrackNew, δtkeycov_, newRevEntry, err := δTtail.rebuild1(atPrev, δZ, trackNewCur, db)
if err != nil {
return nil, nil, err
}
trackNewCur.ApplyΔ(δtrackNew)
δtkeycov.UnionInplace(δtkeycov_)
if newRevEntry {
δrevSet.Add(δZ.Rev)
}
if !δTtail.ktrackNew.Has(key) {
// key ∉ ktrackNew
job, _, inJobs := δTtail.krebuildJobs.Get_(key)
if !inJobs {
// key ∉ krebuildJobs -> it should be already in trackSet
vδT = δTtail.vδT
δBtail.mu.Unlock()
return vδT, nil
}
// an iteration closer to tail may turn out to add a key to the tracking set.
// We have to recheck all entries newer that revision for changes to that key,
// for example:
//
// 8 5*
// / \ <- / \
// 2 8 2* 7
//
// here initial tracked set is 5*-2*. Going to earlier revision
// 2'th keycov range is widen from [-∞,5) to [-∞,7), so 5*-7 in
// later revision have to be rechecked because 7 was added into
// tracking set.
//
// Implement this via restarting from head and cycling until
// set of tracked keys does not grow anymore.
if δtkeycov.Empty() {
break
// rebuild for root[key] is in progress -> wait for corresponding job to complete
δBtail.mu.Unlock()
<-job.ready
if job.err == nil {
δBtail.mu.Lock()
vδT = δTtail.vδT
δBtail.mu.Unlock()
}
return vδT, job.err
}
err := widenTrackNew(trackNew, δtkeycov, root, δZtail.Head(), db)
if err != nil {
return nil, nil, err
}
// key ∈ ktrackNew -> this goroutine becomes responsible to rebuild vδT for it
// run rebuild job for all keys queued in ktrackNew so far
err = δTtail._rebuild(root, δBtail)
if err == nil {
vδT = δTtail.vδT
}
δBtail.mu.Unlock()
return trackNew, δrevSet, nil
return vδT, err
}
// rebuild1 rebuilds δT for single δZ.
// vδTSnapForTracked returns vδT snapshot for root that takes into account all
// previous Track requests.
//
// δtrackNew/δtkeycov represents how trackNew changes when going through `atPrev <- δZ.Rev` .
// newRevEntry indicates whether δZ.Rev was not there before in .vδT and new corresponding δT entry was created.
func (δTtail *_ΔTtail) rebuild1(atPrev zodb.Tid, δZ zodb.ΔRevEntry, trackNew blib.PPTreeSubSet, db *zodb.DB) (δtrackNew *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, newRevEntry bool, err error) {
defer xerr.Contextf(&err, "rebuild1 %s<-%s", atPrev, δZ.Rev)
debugfΔBtail("\n rebuild1 @%s <- @%s\n", atPrev, δZ.Rev)
debugfΔBtail(" δZ:\t%v\n", δZ.Changev)
debugfΔBtail(" trackNew: %v\n", trackNew)
defer func() {
debugfΔBtail("-> δtrackNew: %v\n", δtrackNew)
debugfΔBtail("-> δtkeycov: %v\n", δtkeycov)
debugfΔBtail("-> newRevEntry: %v\n", newRevEntry)
debugfΔBtail("\n\n")
}()
// NOTE: keep vvv in sync with ΔBtail._Update1
δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, trackNew)
// skip opening DB connections if there is no change to this tree
if len(δtopsByRoot) == 0 {
return blib.NewΔPPTreeSubSet(), &blib.RangedKeySet{}, false, nil
// vδT is rebuilt if there are such not-yet-handled Track requests.
func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err error) {
δBtail.mu.Lock() // TODO verify that there is no in-progress writers
δTtail := δBtail.byRoot[root] // must be there
if δTtail == nil {
δBtail.mu.Unlock()
panicf("δBtail: root<%s> not tracked", root)
}
if len(δtopsByRoot) != 1 {
panicf("BUG: δtopsByRoot has > 1 entries: %v\ntrackNew: %v\nδZ: %v", δtopsByRoot, trackNew, δZ)
}
var root zodb.Oid
var δtops setOid
for root_, δtops_ := range δtopsByRoot {
root = root_
δtops = δtops_
// prepare to wait for all already running jobs, if any
wg := xsync.NewWorkGroup(context.Background())
for _, e := range δTtail.krebuildJobs.AllRanges() {
job := e.Value
wg.Go(func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-job.ready:
return job.err
}
})
}
// run new rebuild job if there are not-yet-handled Track requests
var errJob error
if !δTtail.ktrackNew.Empty() {
errJob = δTtail._rebuild(root, δBtail)
}
// open ZODB connection corresponding to "current" and "prev" states
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
// wait for previous jobs to complete as well
δBtail.mu.Unlock()
errWait := wg.Wait()
zconnPrev, err := db.Open(ctx, &zodb.ConnOptions{At: atPrev})
err = xerr.First(errJob, errWait)
if err != nil {
return nil, nil, false, err
}
zconnCurr, err := db.Open(ctx, &zodb.ConnOptions{At: δZ.Rev})
if err != nil {
return nil, nil, false, err
return nil, err
}
// diff backwards curr -> prev
δT, δtrack, δtkeycov, err := treediff(ctx, root, δtops, δZTC, trackNew, zconnCurr, zconnPrev)
if err != nil {
return nil, nil, false, err
}
// now it is ok to take the snapshot
δBtail.mu.Lock()
vδT = δTtail.vδT
δBtail.mu.Unlock()
debugfΔBtail(" -> root<%s> δkv*: %v δtrack*: %v δtkeycov*: %v\n", root, δT, δtrack, δtkeycov)
return vδT, nil
}
if len(δT) == 0 { // an object might be resaved without change
return δtrack, δtkeycov, false, nil
// _rebuild runs rebuild job for current .ktrackNew/.trackNew
//
// must be called with δBtail.mu locked.
// returns with δBtail.mu locked.
func (δTtail *_ΔTtail) _rebuild(root zodb.Oid, δBtail *ΔBtail) (err error) {
return δTtail.__rebuild(root, δBtail, /*releaseLock=*/true)
}
func (δTtail *_ΔTtail) __rebuild(root zodb.Oid, δBtail *ΔBtail, releaseLock bool) (err error) {
defer xerr.Contextf(&err, "ΔBtail._rebuild root<%s>", root)
trackNew := δTtail.trackNew
ktrackNew := δTtail.ktrackNew
δTtail.trackNew = blib.PPTreeSubSet{}
δTtail.ktrackNew = blib.RangedKeySet{}
job := &_RebuildJob{trackNew: trackNew, ready: make(chan struct{})}
// krebuildJobs += ktrackNew
for _, r := range ktrackNew.AllRanges() {
// assert krebuildJobs ^ r = ø
if δTtail.krebuildJobs.IntersectsRange(r) {
panicf("BUG: rebuild: prologue: " +
"krebuildJobs ^ ktrackNew != ø:\nkrebuildJobs: %s\nktrackNew: %s",
δTtail.krebuildJobs, ktrackNew)
}
δTtail.krebuildJobs.SetRange(r, job)
}
delete(δBtail.trackNewRoots, root)
// δTtail.vδT <- merge δT*
l := len(δTtail.vδT)
j := sort.Search(l, func(k int) bool {
return δZ.Rev <= δTtail.vδT[k].Rev
})
if j == l || δTtail.vδT[j].Rev != δZ.Rev {
newRevEntry = true
δTcurr := ΔTree{Rev: δZ.Rev, KV: map[Key]ΔValue{}}
// insert(@j, δTcurr)
δTtail.vδT = append(δTtail.vδT[:j],
append([]ΔTree{δTcurr},
δTtail.vδT[j:]...)...)
// build δ(vδT) without the lock
if releaseLock {
δBtail.mu.Unlock()
}
vδTnew, δtrackSet, err := vδTBuild(root, trackNew, δBtail.δZtail, δBtail.db)
if releaseLock {
δBtail.mu.Lock()
}
δTcurr := δTtail.vδT[j]
for k, δv := range δT {
// the diff was backward; δTtail entries are with diff forward
δv.New, δv.Old = δv.Old, δv.New
δv_, already := δTcurr.KV[k]
if already {
if δv != δv_ {
panicf("[%v] inconsistent δv:\nδTcurr: %v\nδT: %v", k, δTcurr, δT)
}
} else {
δTcurr.KV[k] = δv
// krebuildJobs -= ktrackNew
for _, r := range ktrackNew.AllRanges() {
// assert krebuildJobs[r] = job
job_, r_ := δTtail.krebuildJobs.Get(r.Lo)
if !(job_ == job && r_ == r) {
panicf("BUG: rebuild: epilogue: " +
"krebuildJobs entry mutated:\nset in prologue [%s]=%p\ngot in epilogue: [%s]=%p",
r, job, r_, job_)
}
δTtail.krebuildJobs.DelRange(r)
}
return δtrack, δtkeycov, newRevEntry, nil
}
// widenTrackNew widens trackNew to cover δtkeycov.
func widenTrackNew(trackNew blib.PPTreeSubSet, δtkeycov *blib.RangedKeySet, root zodb.Oid, at zodb.Tid, db *zodb.DB) (err error) {
defer xerr.Contextf(&err, "widenTrackNew tree<%s> @%s +%s", root, at, δtkeycov)
debugfΔBtail("\n widenTrackNew %s @%s +%s", root, at, δtkeycov)
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
zhead, err := db.Open(ctx, &zodb.ConnOptions{At: at}); /*X*/ if err != nil { return err }
xtree, err := zgetNodeOrNil(ctx, zhead, root); /*X*/ if err != nil { return err }
if xtree == nil {
// root deleted -> root node covers [-∞,∞)
trackNew.AddPath([]zodb.Oid{root})
return nil
// merge rebuild result
if err == nil {
// vδT <- vδTnew RCU; trackSet += δtrackSet
δTtail.vδT = vδTClone(δTtail.vδT)
δrevSet := vδTMergeInplace(&δTtail.vδT, vδTnew)
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail._vδBroots_Update(root, δrevSet)
} else {
// reinstate trackNew and ktrackNew back, so that data for those
// keys are tried to be rebuilt next time, not silently remain
// missing in vδT, i.e. corrupted.
δTtail.trackNew.UnionInplace(trackNew)
δTtail.ktrackNew.UnionInplace(&ktrackNew)
δBtail.trackNewRoots.Add(root)
}
tree := xtree.(*Tree) // must succeed
top := &nodeInRange{prefix: nil, keycov: blib.KeyRange{KeyMin, KeyMax}, node: tree}
V := rangeSplit{top}
for _, r := range δtkeycov.AllRanges() {
lo := r.Lo
for {
b, err := V.GetToLeaf(ctx, lo); /*X*/ if err != nil { return err }
trackNew.AddPath(b.Path())
// continue with next right bucket until r coverage is complete
if r.Hi_ <= b.keycov.Hi_ {
break
}
lo = b.keycov.Hi_ + 1
}
}
return nil
// we are done
job.err = err
close(job.ready)
return err
}
// Update updates δB with object-level ZODB changes.
//
// Only those objects from δZ that belong to tracked set are guaranteed to be
......@@ -635,6 +672,10 @@ func widenTrackNew(trackNew blib.PPTreeSubSet, δtkeycov *blib.RangedKeySet, roo
//
// TODO optionally accept zconnOld/zconnNew from client
func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
δBtail.mu.Lock()
defer δBtail.mu.Unlock()
// TODO verify that there is no in-progress readers/writers
headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail.Update %s -> %s", headOld, δZ.Tid)
......@@ -651,19 +692,18 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
if err != nil {
return ΔB{}, err
}
if !δTtail.trackNew.Empty() {
panicf("BUG: ΔBtail.Update: ΔTtail<%s>.trackNew != ø after _Update1", root)
}
δTtail.trackNew = trackNew
// NOTE we cannot skip computing diff for HEAD~..HEAD
// even after _Update1 because _Update1 was working with different trackNew.
δtrackSet, δrevSet, err := δTtail.rebuild(root, δBtail.δZtail, δBtail.db)
vδTnew, δtrackSet, err := vδTBuild(root, trackNew, δBtail.δZtail, δBtail.db)
if err != nil {
return ΔB{}, err
}
// vδT <- vδTnew RCU; trackSet += δtrackSet
δTtail.vδT = vδTClone(δTtail.vδT)
δrevSet := vδTMergeInplace(&δTtail.vδT, vδTnew)
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail.vδBroots_Update(root, δrevSet)
δBtail._vδBroots_Update(root, δrevSet)
}
// build δB. Even if δT=ø after _Update1, but δtkeycov1 != ø, above
......@@ -680,7 +720,7 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
// vδBroots += δB (δB.Rev could be already there added by ^^^ rebuild)
for root := range δB.ByRoot {
δBtail.vδBroots_Update1(root, δB.Rev)
δBtail._vδBroots_Update1(root, δB.Rev)
}
return δB, err
......@@ -705,19 +745,20 @@ func (δBtail *ΔBtail) _Update1(δZ *zodb.EventCommit) (δB1 _ΔBUpdate1, err e
for _, root := range δBtail.trackNewRoots.SortedElements() {
δTtail := δBtail.byRoot[root]
tracefΔBtail("[%s].trackNew: %v\n", root, δTtail.trackNew)
tracefΔBtail("[%s].ktrackNew: %v\n", root, δTtail.ktrackNew)
}
δB1 = _ΔBUpdate1{ByRoot: make(map[zodb.Oid]*_ΔTUpdate1)}
// update .trackSet and vδB from .trackNew
err = δBtail.rebuildAll()
err = δBtail._rebuildAll()
if err != nil {
return δB1, err
}
δBtail.δZtail.Append(δZ.Tid, δZ.Changev)
// NOTE: keep vvv in sync with ΔTtail.rebuild1
// NOTE: keep vvv in sync with vδTBuild1
δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, δBtail.trackSet)
......@@ -762,17 +803,34 @@ func (δBtail *ΔBtail) _Update1(δZ *zodb.EventCommit) (δB1 _ΔBUpdate1, err e
return δB1, nil
}
// vδBroots_Update updates .vδBroots to remember that _ΔTtail for root has
// _rebuildAll rebuilds ΔBtail taking all trackNew requests into account.
func (δBtail *ΔBtail) _rebuildAll() (err error) {
defer xerr.Context(&err, "ΔBtail._rebuildAll")
tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), δBtail.trackNewRoots)
for root := range δBtail.trackNewRoots {
δTtail := δBtail.byRoot[root] // must be there
err = δTtail.__rebuild(root, δBtail, /*releaseLock=*/false)
if err != nil {
return err
}
}
return nil
}
// _vδBroots_Update updates .vδBroots to remember that _ΔTtail for root has
// changed entries with δrevSet revisions.
func (δBtail *ΔBtail) vδBroots_Update(root zodb.Oid, δrevSet setTid) {
// XXX locking
//
// must be called with δBtail.mu locked.
func (δBtail *ΔBtail) _vδBroots_Update(root zodb.Oid, δrevSet setTid) {
// TODO δrevSet -> []rev↑ and merge them in one go
for rev := range δrevSet {
δBtail.vδBroots_Update1(root, rev)
δBtail._vδBroots_Update1(root, rev)
}
}
func (δBtail *ΔBtail) vδBroots_Update1(root zodb.Oid, rev zodb.Tid) {
func (δBtail *ΔBtail) _vδBroots_Update1(root zodb.Oid, rev zodb.Tid) {
l := len(δBtail.vδBroots)
j := sort.Search(l, func(k int) bool {
return rev <= δBtail.vδBroots[k].Rev
......@@ -790,7 +848,9 @@ func (δBtail *ΔBtail) vδBroots_Update1(root zodb.Oid, rev zodb.Tid) {
// ForgetPast forgets history entries with revision ≤ revCut.
func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) {
// XXX locking
δBtail.mu.Lock()
defer δBtail.mu.Unlock()
// TODO verify that there is no in-progress readers/writers
δBtail.δZtail.ForgetPast(revCut)
......@@ -812,13 +872,11 @@ func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) {
// trim roots
for root := range totrim {
δTtail := δBtail.byRoot[root] // must be present
δTtail.forgetPast(revCut)
δTtail._forgetPast(revCut)
}
}
func (δTtail *_ΔTtail) forgetPast(revCut zodb.Tid) {
// XXX locking
func (δTtail *_ΔTtail) _forgetPast(revCut zodb.Tid) {
icut := 0
for ; icut < len(δTtail.vδT); icut++ {
if δTtail.vδT[icut].Rev > revCut {
......@@ -869,7 +927,22 @@ func (δTtail *_ΔTtail) forgetPast(revCut zodb.Tid) {
// key must be tracked
// at must ∈ (tail, head]
func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value, rev zodb.Tid, valueExact, revExact bool, err error) {
defer xerr.Contextf(&err, "δBtail: root<%s>: get %d @%s", root, key, at)
defer xerr.Contextf(&err, "ΔBtail: root<%s>: get %d @%s", root, key, at)
if traceΔBtail {
tracefΔBtail("\nGet root<%s>[%s] @%s\n", root, kstr(key), at)
defer func() {
vexact := ""
rexact := ""
if !valueExact {
vexact = "~"
}
if !revExact {
rexact = "~"
}
tracefΔBtail("-> value: %s%s rev: @%s%s\n", value, vexact, rev, rexact)
}()
}
tail := δBtail.Tail()
head := δBtail.Head()
......@@ -877,29 +950,22 @@ func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value,
panicf("at out of bounds: at: @%s, (tail, head] = (@%s, @%s]", at, tail, head)
}
// XXX locking
value = VDEL
valueExact = false
rev = tail
revExact = false
// XXX need to rebuild only if key was not rebuilt yet
// XXX need to rebuild only for key, not for whole trackNew
err = δBtail.rebuild1IfNeeded(root)
// retrieve vδT snapshot that is rebuilt to take Track(key) requests into account
vδT, err := δBtail.vδTSnapForTrackedKey(root, key)
if err != nil {
return value, rev, valueExact, revExact, err
}
debugfΔBtail(" vδT: %v\n", vδT)
δTtail := δBtail.byRoot[root]
if δTtail == nil {
panicf("δBtail: root<%s> not tracked", root)
}
// TODO key not tracked -> panic (check key ∈ lastRevOf -- see vvv)
// TODO -> index lastRevOf(key) | linear scan ↓ looking for change ≤ at
for i := len(δTtail.vδT)-1; i >= 0; i-- {
δT := δTtail.vδT[i]
for i := len(vδT)-1; i >= 0; i-- {
δT := vδT[i]
δvalue, ok_ := δT.KV[key]
if ok_ {
valueExact = true
......@@ -931,21 +997,23 @@ func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value,
// Only tracked keys are guaranteed to be present.
//
// Note: contrary to regular go slicing, low is exclusive while high is inclusive.
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) /*readonly*/[]ΔTree {
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readonly*/vδT []ΔTree) {
xtail.AssertSlice(δBtail, lo, hi)
// XXX locking
err := δBtail.rebuild1IfNeeded(root)
if err != nil {
panic(err) // XXX
if traceΔBtail {
tracefΔBtail("\nSlice root<%s> (@%s,@%s]\n", root, lo, hi)
defer func() {
tracefΔBtail("-> vδT(lo,hi]: %v\n", vδT)
}()
}
δTtail, ok := δBtail.byRoot[root]
if !ok {
return []ΔTree{}
// retrieve vδT snapshot that is rebuilt to take all previous Track requests into account
vδT, err := δBtail.vδTSnapForTracked(root)
if err != nil {
panic(err) // XXX
}
debugfΔBtail(" vδT: %v\n", vδT)
vδT := δTtail.vδT
l := len(vδT)
if l == 0 {
return nil
......@@ -963,16 +1031,255 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) /*readonl
for ; i >= 0 && vδT[i].Rev > lo; i-- {}
i++
// NOTE: no need to duplicate returned vδT slice because
// _ΔTtail.rebuild clones vδT before modifying it. This way the data we
// return to caller will stay unchanged even if rebuild is running
// simultaneously.
// NOTE: no need to duplicate returned vδT slice because vδT is
// modified via RCU: i.e. _ΔTtail.rebuild clones vδT before modifying it.
// This way the data we return to caller will stay unchanged even if
// rebuild is running simultaneously.
return vδT[i:j+1]
}
// ----------------------------------------
// ---- vδTBuild/vδTMerge (rebuild core) ----
// vδTBuild builds vδT from vδZ for root/tracked=trackNew.
//
// It returns:
//
// - vδT,
// - trackNew* - a superset of trackNew accounting that potentially more keys
// become tracked during the build process.
//
// NOTE ΔBtail calls vδTBuild(root, trackNew) to compute update for ΔTtail.vδT.
func vδTBuild(root zodb.Oid, trackNew blib.PPTreeSubSet, δZtail *zodb.ΔTail, db *zodb.DB) (vδT []ΔTree, trackNew_ blib.PPTreeSubSet, err error) {
defer xerr.Contextf(&err, "root<%s>: build vδT", root)
tracefΔBtail("\nvδTBuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head())
tracefΔBtail("trackNew: %v\n", trackNew)
if len(trackNew) == 0 {
return nil, nil, nil
}
trackNew = trackNew.Clone() // it will become trackNew*
// go backwards and compute vδT <- treediff(lo..hi/trackNew)
vδZ := δZtail.Data()
for {
δtkeycov := &blib.RangedKeySet{} // all keys coming into tracking set during this lo<-hi scan
trackNewCur := trackNew.Clone() // trackNew adjusted as of when going to i<- entry
for i := len(vδZ)-1; i>=0; i-- {
δZ := vδZ[i]
var atPrev zodb.Tid
if i > 0 {
atPrev = vδZ[i-1].Rev
} else {
atPrev = δZtail.Tail()
}
δkv, δtrackNew, δtkeycov_, err := vδTBuild1(atPrev, δZ, trackNewCur, db)
if err != nil {
return nil, nil, err
}
if len(δkv) > 0 {
δT := ΔTree{Rev: δZ.Rev, KV: δkv}
vδTMerge1Inplace(&vδT, δT)
}
trackNewCur.ApplyΔ(δtrackNew)
δtkeycov.UnionInplace(δtkeycov_)
}
// an iteration closer to tail may turn out to add a key to the tracking set.
// We have to recheck all entries newer that revision for changes to that key,
// for example:
//
// 8 5*
// / \ <- / \
// 2 8 2* 7
//
// here initial tracked set is 5*-2*. Going to earlier revision
// 2'th keycov range is widen from [-∞,5) to [-∞,7), so 5*-7 in
// later revision have to be rechecked because 7 was added into
// tracking set.
//
// Implement this via restarting from head and cycling until
// set of tracked keys does not grow anymore.
if δtkeycov.Empty() {
break
}
err := widenTrackNew(trackNew, δtkeycov, root, δZtail.Head(), db)
if err != nil {
return nil, nil, err
}
}
tracefΔBtail("-> vδT: %v\n", vδT)
tracefΔBtail("-> trackNew*: %v\n", trackNew)
return vδT, trackNew, nil
}
// vδTBuild1 builds δT for single δZ.
//
// δtrackNew/δtkeycov represents how trackNew changes when going through `atPrev <- δZ.Rev` .
func vδTBuild1(atPrev zodb.Tid, δZ zodb.ΔRevEntry, trackNew blib.PPTreeSubSet, db *zodb.DB) (δT map[Key]ΔValue, δtrackNew *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) {
defer xerr.Contextf(&err, "build1 %s<-%s", atPrev, δZ.Rev)
debugfΔBtail("\n build1 @%s <- @%s\n", atPrev, δZ.Rev)
debugfΔBtail(" δZ:\t%v\n", δZ.Changev)
debugfΔBtail(" trackNew: %v\n", trackNew)
defer func() {
debugfΔBtail("-> δT: %v\n", δT)
debugfΔBtail("-> δtrackNew: %v\n", δtrackNew)
debugfΔBtail("-> δtkeycov: %v\n", δtkeycov)
debugfΔBtail("\n\n")
}()
// NOTE: keep vvv in sync with ΔBtail._Update1
δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, trackNew)
// skip opening DB connections if there is no change to this tree
if len(δtopsByRoot) == 0 {
return nil, blib.NewΔPPTreeSubSet(), &blib.RangedKeySet{}, nil
}
if len(δtopsByRoot) != 1 {
panicf("BUG: δtopsByRoot has > 1 entries: %v\ntrackNew: %v\nδZ: %v", δtopsByRoot, trackNew, δZ)
}
var root zodb.Oid
var δtops setOid
for root_, δtops_ := range δtopsByRoot {
root = root_
δtops = δtops_
}
// open ZODB connection corresponding to "current" and "prev" states
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
zconnPrev, err := db.Open(ctx, &zodb.ConnOptions{At: atPrev})
if err != nil {
return nil, nil, nil, err
}
zconnCurr, err := db.Open(ctx, &zodb.ConnOptions{At: δZ.Rev})
if err != nil {
return nil, nil, nil, err
}
// diff backwards curr -> prev
δT, δtrack, δtkeycov, err := treediff(ctx, root, δtops, δZTC, trackNew, zconnCurr, zconnPrev)
if err != nil {
return nil, nil, nil, err
}
debugfΔBtail(" -> root<%s> δkv*: %v δtrack*: %v δtkeycov*: %v\n", root, δT, δtrack, δtkeycov)
for k, δv := range δT {
// the diff was backward; vδT entries are with diff forward
δv.New, δv.Old = δv.Old, δv.New
δT[k] = δv
}
return δT, δtrack, δtkeycov, nil
}
// vδTMergeInplace merges vδTnew into vδT.
//
// δrevSet indicates set of new revisions created in vδT.
// vδT is modified inplace.
func vδTMergeInplace(pvδT *[]ΔTree, vδTnew []ΔTree) (δrevSet setTid) {
// TODO if needed: optimize to go through vδT and vδTnew sequentially
δrevSet = setTid{}
for _, δT := range vδTnew {
newRevEntry := vδTMerge1Inplace(pvδT, δT)
if newRevEntry {
δrevSet.Add(δT.Rev)
}
}
return δrevSet
}
// vδTMerge1Inplace merges one δT entry into vδT.
//
// newRevEntry indicates whether δT.Rev was not there before in vδT.
// vδT is modified inplace.
func vδTMerge1Inplace(pvδT *[]ΔTree, δT ΔTree) (newRevEntry bool) {
if len(δT.KV) == 0 {
return false // δT has no change
}
vδT := *pvδT
l := len(vδT)
j := sort.Search(l, func(k int) bool {
return δT.Rev <= vδT[k].Rev
})
if j == l || vδT[j].Rev != δT.Rev {
newRevEntry = true
δTcurr := ΔTree{Rev: δT.Rev, KV: map[Key]ΔValue{}}
// insert(@j, δTcurr)
vδT = append(vδT[:j],
append([]ΔTree{δTcurr},
vδT[j:]...)...)
}
δTcurr := vδT[j]
for k, δv := range δT.KV {
δv_, already := δTcurr.KV[k]
if already {
if δv != δv_ {
// TODO: return "conflict"
panicf("[%v] inconsistent δv:\nδTcurr: %v\nδT: %v", k, δTcurr, δT)
}
} else {
δTcurr.KV[k] = δv
}
}
*pvδT = vδT
return newRevEntry
}
// widenTrackNew widens trackNew to cover δtkeycov.
func widenTrackNew(trackNew blib.PPTreeSubSet, δtkeycov *blib.RangedKeySet, root zodb.Oid, at zodb.Tid, db *zodb.DB) (err error) {
defer xerr.Contextf(&err, "widenTrackNew root<%s> @%s +%s", root, at, δtkeycov)
debugfΔBtail("\n widenTrackNew %s @%s +%s", root, at, δtkeycov)
txn, ctx := transaction.New(context.TODO()) // TODO - merge in cancel via ctx arg
defer txn.Abort()
zhead, err := db.Open(ctx, &zodb.ConnOptions{At: at}); /*X*/ if err != nil { return err }
xtree, err := zgetNodeOrNil(ctx, zhead, root); /*X*/ if err != nil { return err }
if xtree == nil {
// root deleted -> root node covers [-∞,∞)
trackNew.AddPath([]zodb.Oid{root})
return nil
}
tree := xtree.(*Tree) // must succeed
top := &nodeInRange{prefix: nil, keycov: blib.KeyRange{KeyMin, KeyMax}, node: tree}
V := rangeSplit{top}
for _, r := range δtkeycov.AllRanges() {
lo := r.Lo
for {
b, err := V.GetToLeaf(ctx, lo); /*X*/ if err != nil { return err }
trackNew.AddPath(b.Path())
// continue with next right bucket until r coverage is complete
if r.Hi_ <= b.keycov.Hi_ {
break
}
lo = b.keycov.Hi_ + 1
}
}
return nil
}
// ----------------------------------------
// ΔZtail returns raw ZODB changes accumulated in δBtail so far.
//
......
......@@ -703,7 +703,8 @@ func xverifyΔBTail_Update1(t *testing.T, subj string, db *zodb.DB, treeRoot zod
}
}
ø := blib.PPTreeSubSet{}
ø := blib.PPTreeSubSet{}
:= &blib.RangedKeySet{}
// trackSet1 = xkv1[tracked1]
// trackSet2 = xkv2[tracked2] ( = xkv2[kadj[tracked1]]
......@@ -711,7 +712,7 @@ func xverifyΔBTail_Update1(t *testing.T, subj string, db *zodb.DB, treeRoot zod
trackSet2, tkeyCov2 := trackSetWithCov(t2.Xkv, initialTrackedKeys.Union(kadjTrackedδZ))
// verify δbtail.trackSet against @at1
δbtail.assertTrack(t, "1", ø, trackSet1)
δbtail.assertTrack(t, "1", ø, trackSet1, tkeyCov1)
// δB <- δZ
//
......@@ -756,7 +757,7 @@ func xverifyΔBTail_Update1(t *testing.T, subj string, db *zodb.DB, treeRoot zod
}
// verify δbtail.trackSet against @at2
δbtail.assertTrack(t, "2", trackSet2, ø)
δbtail.assertTrack(t, "2", trackSet2, ø, )
// assert δB.ByRoot == {treeRoot -> ...} if δTok != ø
......@@ -838,7 +839,8 @@ func xverifyΔBTail_rebuild(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, t0, t1
// kadj210 = kadj10·kadj21
kadj210 := kadj10.Mul(kadj21)
ø := blib.PPTreeSubSet{}
ø := blib.PPTreeSubSet{}
:= &blib.RangedKeySet{}
// verify t0 -> t1 Track(keys1) Rebuild -> t2 Track(keys2) Rebuild
// for all combinations of keys1 and keys2
......@@ -858,14 +860,14 @@ func xverifyΔBTail_rebuild(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, t0, t1
}
}
Tkeys1 := trackSet(t1.Xkv, keys1)
Tkeys1_0 := trackSet(t1.Xkv, keys1_0)
Tkeys1, kTkeys1 := trackSetWithCov(t1.Xkv, keys1)
Tkeys1_0 := trackSet(t1.Xkv, keys1_0)
t.Run(fmt.Sprintf(" T%s;R", keys1), func(t *testing.T) {
δbtail := NewΔBtail(t0.At, db)
// assert trackSet=ø, trackNew=ø, vδB=[]
δbtail.assertTrack(t, "@at0", ø, ø)
δbtail.assertTrack(t, "@at0", ø, ø, )
assertΔTtail(t, "@at0", δbtail, t0, treeRoot,
/*vδT=ø*/)
......@@ -875,8 +877,9 @@ func xverifyΔBTail_rebuild(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, t0, t1
xverifyΔBTail_rebuild_TR(t, δbtail, t1, treeRoot,
// after Track(keys1)
keys1,
/*trackSet=*/ ø,
/*trackNew=*/ Tkeys1,
/*trackSet=*/ ø,
/*trackNew=*/ Tkeys1,
/*ktrackNew=*/ kTkeys1,
// after rebuild
/*trackSet=*/ Tkeys1_0,
......@@ -908,7 +911,7 @@ func xverifyΔBTail_rebuild(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, t0, t1
}
}
Tkeys1R2 := trackSet(t2.Xkv, keys1R2)
Tkeys1R2, kTkeys1R2 := trackSetWithCov(t2.Xkv, keys1R2)
xverifyΔBTail_rebuild_U(t, δbtail, treeRoot, t1, t2,
/*trackSet=*/ Tkeys1R2,
......@@ -943,8 +946,8 @@ func xverifyΔBTail_rebuild(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, t0, t1
keys12R2 = keys12R2_
}
Tkeys2 := trackSet(t2.Xkv, keys2)
Tkeys12R2 := trackSet(t2.Xkv, keys12R2)
Tkeys2, kTkeys2 := trackSetWithCov(t2.Xkv, keys2)
Tkeys12R2 := trackSet(t2.Xkv, keys12R2)
/*
fmt.Printf("\n\n\nKKK\nkeys1=%s keys2=%s\n", keys1, keys2)
fmt.Printf("keys1R2: %s\n", keys1R2)
......@@ -987,6 +990,9 @@ func xverifyΔBTail_rebuild(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, t0, t1
// trackNew should not cover ranges that are
// already in trackSet
Tkeys1R2),
/*ktrackNew*/ kTkeys2.Difference(
// see ^^^ about trackNew
kTkeys1R2),
// after rebuild
/* trackSet=*/ Tkeys12R2,
......@@ -1004,12 +1010,13 @@ func xverifyΔBTail_rebuild_U(t *testing.T, δbtail *ΔBtail, treeRoot zodb.Oid,
t.Helper()
X := exc.Raiseif
ø := blib.PPTreeSubSet{}
:= &blib.RangedKeySet{}
subj := fmt.Sprintf("after Update(@%s→@%s)", ti.AtSymb(), tj.AtSymb())
// Update ati -> atj
δB, err := δbtail.Update(tj.ΔZ); X(err)
δbtail.assertTrack(t, subj, trackSet, ø)
δbtail.assertTrack(t, subj, trackSet, ø, )
assertΔTtail(t, subj, δbtail, tj, treeRoot, vδTok...)
// assert δB = vδTok[-1]
......@@ -1045,20 +1052,21 @@ func xverifyΔBTail_rebuild_U(t *testing.T, δbtail *ΔBtail, treeRoot zodb.Oid,
}
// xverifyΔBTail_rebuild_TR verifies ΔBtail state after Track(keys) + rebuild.
func xverifyΔBTail_rebuild_TR(t *testing.T, δbtail *ΔBtail, tj *xbtreetest.Commit, treeRoot zodb.Oid, keys setKey, trackSet blib.PPTreeSubSet, trackNew, trackSetAfterRebuild blib.PPTreeSubSet, vδTok ...map[Key]Δstring) {
func xverifyΔBTail_rebuild_TR(t *testing.T, δbtail *ΔBtail, tj *xbtreetest.Commit, treeRoot zodb.Oid, keys setKey, trackSet, trackNew blib.PPTreeSubSet, ktrackNew *blib.RangedKeySet, trackSetAfterRebuild blib.PPTreeSubSet, vδTok ...map[Key]Δstring) {
t.Helper()
ø := blib.PPTreeSubSet{}
:= &blib.RangedKeySet{}
// Track(keys)
trackKeys(δbtail, tj, keys)
subj := fmt.Sprintf("@%s: after Track%v", tj.AtSymb(), keys)
δbtail.assertTrack(t, subj, trackSet, trackNew)
δbtail.assertTrack(t, subj, trackSet, trackNew, ktrackNew)
δbtail.rebuildAll()
δbtail._rebuildAll()
subj += " + rebuild"
δbtail.assertTrack(t, subj, trackSetAfterRebuild, ø)
δbtail.assertTrack(t, subj, trackSetAfterRebuild, ø, )
// verify δbtail.byRoot[treeRoot]
assertΔTtail(t, subj, δbtail, tj, treeRoot, vδTok...)
......@@ -1209,7 +1217,7 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
// track 2 + rebuild.
_2 := setKey{}; _2.Add(2)
trackKeys(δbtail, t2, _2)
err = δbtail.rebuildAll(); X(err)
err = δbtail._rebuildAll(); X(err)
δttail := δbtail.byRoot[t.Root()]
......@@ -1276,7 +1284,7 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
// after track 1 + rebuild old slices remain unchanged, but new queries return updated data
_1 := setKey{}; _1.Add(1)
trackKeys(δbtail, t2, _1)
err = δbtail.rebuildAll(); X(err)
err = δbtail._rebuildAll(); X(err)
s00_ := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01_ := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
......@@ -1341,7 +1349,7 @@ func TestΔBtailClone(t_ *testing.T) {
_, err := δbtail.Update(t1.ΔZ); X(err)
_2 := setKey{}; _2.Add(2)
trackKeys(δbtail, t1, _2)
err = δbtail.rebuildAll(); X(err)
err = δbtail._rebuildAll(); X(err)
δkv1_1 := map[Key]Δstring{2:{"b","d"}}
assertΔTtail(t.T, "orig @at1", δbtail, t1, t.Root(), δkv1_1)
......@@ -1357,6 +1365,96 @@ func TestΔBtailClone(t_ *testing.T) {
assertΔTtail(t.T, "klon @at1 after orig @at->@at2", δbklon, t1, t.Root(), δkv1_1)
}
// -------- vδTMerge --------
func TestVδTMerge(t *testing.T) {
vδTMerge1 := func(vδT []ΔTree, δT ΔTree) (_ []ΔTree, newRevEntry bool) {
vδT = vδTClone(vδT)
newRevEntry = vδTMerge1Inplace(&vδT, δT)
return vδT, newRevEntry
}
vδTMerge := func(vδT, vδTnew []ΔTree) (_ []ΔTree, δrevSet setTid) {
vδT = vδTClone(vδT)
δrevSet = vδTMergeInplace(&vδT, vδTnew)
return vδT, δrevSet
}
assertMerge1 := func(vδT []ΔTree, δT ΔTree, newRevEntryOK bool, mergeOK []ΔTree) {
t.Helper()
merge, newRevEntry := vδTMerge1(vδT, δT)
if !(reflect.DeepEqual(merge, mergeOK) && newRevEntry == newRevEntryOK) {
t.Errorf("merge1 %v + %v:\nhave: %v %t\nwant: %v %t",
vδT, δT, merge, newRevEntry, mergeOK, newRevEntryOK)
}
}
assertMerge := func(vδT, vδTnew []ΔTree, δrevSetOK setTid, mergeOK []ΔTree) {
t.Helper()
merge, δrevSet := vδTMerge(vδT, vδTnew)
if !(reflect.DeepEqual(merge, mergeOK) && δrevSet.Equal(δrevSetOK)) {
t.Errorf("merge %v + %v:\nhave: %v %v\nwant: %v %v",
vδT, vδTnew, merge, δrevSet, mergeOK, δrevSetOK)
}
}
// syntax sugar
type Δ = ΔTree
type δ = map[Key]ΔValue
v := func(vδT ...Δ) []Δ {
return vδT
}
r := func(tidv ...zodb.Tid) setTid {
s := setTid{}
for _, tid := range tidv {
s.Add(tid)
}
return s
}
δ1 := δ{1:{1,2}}
δ2 := δ{2:{2,3}}; δ22 := δ{22:{0,1}}; δ2_22 := δ{2:{2,3}, 22:{0,1}}
δ3 := δ{3:{3,4}}
δ4 := δ{4:{4,5}}; δ44 := δ{44:{0,1}}; δ4_44 := δ{4:{4,5}, 44:{0,1}}
δ5 := δ{5:{5,6}}
δ12 := δ{1:{1,2}, 2:{2,3}}
Δ101 := Δ{0x101, δ1}
Δ102 := Δ{0x102, δ2}
Δ103 := Δ{0x103, δ3}
Δ104 := Δ{0x104, δ4}
Δ105 := Δ{0x105, δ5}
// merge1
assertMerge1(nil, Δ{0x100, nil}, false,
nil)
assertMerge1(v(Δ{0x100, δ1}), Δ{0x100, δ2}, false,
v(Δ{0x100, δ12}))
assertMerge1(v(Δ101, Δ103), Δ102, true,
v(Δ101, Δ102, Δ103))
assertMerge1(v(Δ101, Δ102), Δ103, true,
v(Δ101, Δ102, Δ103))
assertMerge1(v(Δ102, Δ103), Δ101, true,
v(Δ101, Δ102, Δ103))
// merge
assertMerge(nil, nil, r(),
nil)
assertMerge(nil, v(Δ101, Δ103), r(0x101, 0x103),
v(Δ101, Δ103))
assertMerge(v(Δ101, Δ103), nil, r(),
v(Δ101, Δ103))
assertMerge(v(Δ102, Δ104), v(Δ101, Δ{0x102, δ22}, Δ103, Δ{0x104, δ44}, Δ105),
r(0x101, 0x103, 0x105),
v(Δ101, Δ{0x102, δ2_22}, Δ103, Δ{0x104, δ4_44}, Δ105))
}
// -------- KAdj --------
......@@ -1589,9 +1687,9 @@ func assertΔTtail(t *testing.T, subj string, δbtail *ΔBtail, tj *xbtreetest.C
}
}
// assertTrack verifies state of .trackSet and ΔTtail.trackNew.
// assertTrack verifies state of .trackSet and ΔTtail.(k)trackNew.
// it assumes that only one tree root is being tracked.
func (δBtail *ΔBtail) assertTrack(t *testing.T, subj string, trackSetOK blib.PPTreeSubSet, trackNewOK blib.PPTreeSubSet) {
func (δBtail *ΔBtail) assertTrack(t *testing.T, subj string, trackSetOK blib.PPTreeSubSet, trackNewOK blib.PPTreeSubSet, ktrackNewOK *blib.RangedKeySet) {
t.Helper()
if !δBtail.trackSet.Equal(trackSetOK) {
t.Errorf("%s: trackSet:\n\thave: %v\n\twant: %v", subj, δBtail.trackSet, trackSetOK)
......@@ -1602,8 +1700,14 @@ func (δBtail *ΔBtail) assertTrack(t *testing.T, subj string, trackSetOK blib.P
roots.Add(root)
}
tEmpty := trackNewOK.Empty()
kEmpty := ktrackNewOK.Empty()
if tEmpty != kEmpty {
t.Errorf("BUG: %s: empty(trackNewOK) != empty(ktrackNewOK)", subj)
return
}
nrootsOK := 1
if trackSetOK.Empty() && trackNewOK.Empty() {
if trackSetOK.Empty() && tEmpty {
nrootsOK = 0
}
if len(roots) != nrootsOK {
......@@ -1630,6 +1734,9 @@ func (δBtail *ΔBtail) assertTrack(t *testing.T, subj string, trackSetOK blib.P
if !δTtail.trackNew.Equal(trackNewOK) {
t.Errorf("%s: vδT.trackNew:\n\thave: %v\n\twant: %v", subj, δTtail.trackNew, trackNewOK)
}
if !δTtail.ktrackNew.Equal(ktrackNewOK) {
t.Errorf("%s: vδT.ktrackNew:\n\thave: %v\n\twant: %v", subj, δTtail.ktrackNew, ktrackNewOK)
}
}
// trackSet returns what should be ΔBtail.trackSet coverage for specified tracked key set.
......@@ -1678,7 +1785,7 @@ func trackKeys(δbtail *ΔBtail, t *xbtreetest.Commit, keys setKey) {
// tracking set. By aligning initial state to the same as after
// T1->ø, we test what will happen on ø->T2.
b := t.Xkv.Get(k)
δbtail.track(k, b.Path())
δbtail.track(b.Path(), b.Keycov)
}
}
......
......@@ -412,45 +412,48 @@ func (bf *ZBigFile) BlkSize() int64 {
// it also returns:
//
// - BTree path in .blktab to loaded block,
// - blocks covered by leaf node in the BTree path,
// - max(_.serial for _ in ZBlk(#blk), all BTree/Bucket that lead to ZBlk)
// which provides a rough upper-bound estimate for file[blk] revision.
//
// TODO load into user-provided buf.
func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath []btree.LONode, zblk ZBlk, blkRevMax zodb.Tid, err error) {
func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath []btree.LONode, blkCov btree.LKeyRange, zblk ZBlk, blkRevMax zodb.Tid, err error) {
defer xerr.Contextf(&err, "bigfile %s: loadblk %d", bf.POid(), blk)
:= btree.LKeyRange{Lo: 0, Hi_: -1} // empty KeyRange
err = bf.PActivate(ctx)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, , nil, 0, err
}
defer bf.PDeactivate()
blkRevMax = 0
xzblk, ok, err := bf.blktab.VGet(ctx, blk, func(node btree.LONode) {
xzblk, ok, err := bf.blktab.VGet(ctx, blk, func(node btree.LONode, keycov btree.LKeyRange) {
treePath = append(treePath, node)
blkCov = keycov // will be set last for leaf
blkRevMax = tidmax(blkRevMax, node.PSerial())
})
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, , nil, 0, err
}
if !ok {
return make([]byte, bf.blksize), treePath, nil, blkRevMax, nil
return make([]byte, bf.blksize), treePath, blkCov, nil, blkRevMax, nil
}
zblk, err = vZBlk(xzblk)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, , nil, 0, err
}
blkdata, zblkrev, err := zblk.LoadBlkData(ctx)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, , nil, 0, err
}
blkRevMax = tidmax(blkRevMax, zblkrev)
l := int64(len(blkdata))
if l > bf.blksize {
return nil, nil, nil, 0, fmt.Errorf("zblk %s: invalid blk: size = %d (> blksize = %d)", zblk.POid(), l, bf.blksize)
return nil, nil, , nil, 0, fmt.Errorf("zblk %s: invalid blk: size = %d (> blksize = %d)", zblk.POid(), l, bf.blksize)
}
// append trailing \0 to data to reach .blksize
......@@ -460,37 +463,39 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
blkdata = d
}
return blkdata, treePath, zblk, blkRevMax, nil
return blkdata, treePath, blkCov, zblk, blkRevMax, nil
}
// Size returns whole file size.
//
// it also returns BTree path scanned to obtain the size.
func (bf *ZBigFile) Size(ctx context.Context) (_ int64, treePath []btree.LONode, err error) {
func (bf *ZBigFile) Size(ctx context.Context) (_ int64, treePath []btree.LONode, blkCov btree.LKeyRange, err error) {
defer xerr.Contextf(&err, "bigfile %s: size", bf.POid())
:= btree.LKeyRange{Lo: 0, Hi_: -1} // empty KeyRange
err = bf.PActivate(ctx)
if err != nil {
return 0, nil, err
return 0, nil, , err
}
defer bf.PDeactivate()
tailblk, ok, err := bf.blktab.VMaxKey(ctx, func(node btree.LONode) {
tailblk, ok, err := bf.blktab.VMaxKey(ctx, func(node btree.LONode, keycov btree.LKeyRange) {
treePath = append(treePath, node)
blkCov = keycov // will be set last for leaf
})
if err != nil {
return 0, nil, err
return 0, nil, , err
}
if !ok {
return 0, treePath, nil
return 0, treePath, blkCov, nil
}
size := (tailblk + 1) * bf.blksize
if size / bf.blksize != tailblk + 1 {
return 0, nil, syscall.EFBIG // overflow
return 0, nil, , syscall.EFBIG // overflow
}
return size, treePath, nil
return size, treePath, blkCov, nil
}
// vZBlk checks and converts xzblk to a ZBlk object.
......
......@@ -110,17 +110,17 @@ func TestZBlk(t *testing.T) {
t.Fatalf("zf: [1] -> %#v; want z1", z1_)
}
size, _, err := zf.Size(ctx); X(err)
size, _, _, err := zf.Size(ctx); X(err)
assert.Equal(size, int64(zf_size), "ZBigFile size wrong")
// LoadBlk
z0Data, _, _, _, err = zf.LoadBlk(ctx, 1); X(err)
z0Data, _, _, _, _, err = zf.LoadBlk(ctx, 1); X(err)
assert.Equal(len(z0Data), int(zf.blksize))
z0Data = bytes.TrimRight(z0Data, "\x00")
assert.Equal(z0Data, z0DataOK)
z1Data, _, _, _, err = zf.LoadBlk(ctx, 3); X(err)
z1Data, _, _, _, _, err = zf.LoadBlk(ctx, 3); X(err)
assert.Equal(len(z1Data), int(zf.blksize))
z1Data = bytes.TrimRight(z1Data, "\x00")
assert.Equal(z1Data, z1DataOK)
......
......@@ -195,16 +195,16 @@ func (δFtail *ΔFtail) Tail() zodb.Tid { return δFtail.δBtail.Tail() }
// ---- Track/rebuild/Update/Forget ----
// Track associates file[blk]@head with tree path and zblk object.
// Track associates file[blk]@head with zblk object and file[blkcov]@head with tree path.
//
// Path root becomes associated with the file, and the path and zblk object become tracked.
// One root can be associated with several files (each provided on different Track calls).
//
// zblk can be nil, which represents a hole.
// blk=-1 should be used for tracking after ZBigFile.Size() query (no zblk is accessed at all).
// if zblk is nil -> blk is ignored and can be arbitrary.
//
// Objects in path and zblk must be with .PJar().At() == .head
func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zblk ZBlk) {
func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, blkcov btree.LKeyRange, zblk ZBlk) {
// XXX locking
head := δFtail.Head()
......@@ -222,10 +222,7 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zb
// path.at == head is verified by ΔBtail.Track
foid := file.POid()
if blk == -1 {
blk = xbtree.KeyMax
}
δFtail.δBtail.Track(blk, path)
δFtail.δBtail.Track(path, blkcov)
rootObj := path[0].(*btree.LOBTree)
root := rootObj.POid()
......@@ -684,7 +681,10 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
//fmt.Printf("Zinblk: %v\n", Zinblk)
// vδT for current epoch
vδT := δFtail.δBtail.SliceByRootRev(root, epoch, head) // NOTE @head, not hi
var vδT []xbtree.ΔTree
if root != xbtree.VDEL {
vδT = δFtail.δBtail.SliceByRootRev(root, epoch, head) // NOTE @head, not hi
}
it := len(vδT) - 1
if it >= 0 {
ZinblkAt = vδT[it].Rev
......
......@@ -266,8 +266,8 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
// ( later retrackAll should be called after new epoch to track zfile[-∞,∞) again )
retrackAll := func() {
for blk := range blkTab {
_, path, zblk, _, err := zfile.LoadBlk(ctx, blk); X(err)
δFtail.Track(zfile, blk, path, zblk)
_, path, blkcov, zblk, _, err := zfile.LoadBlk(ctx, blk); X(err)
δFtail.Track(zfile, blk, path, blkcov, zblk)
}
}
retrackAll()
......@@ -614,8 +614,8 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
zfile, _ := t.XLoadZFile(ctx, zconn)
xtrackBlk := func(blk int64) {
_, path, zblk, _, err := zfile.LoadBlk(ctx, blk); X(err)
δFtail.Track(zfile, blk, path, zblk)
_, path, blkcov, zblk, _, err := zfile.LoadBlk(ctx, blk); X(err)
δFtail.Track(zfile, blk, path, blkcov, zblk)
}
// track 0, but do not track 1 and 2.
......
......@@ -963,14 +963,14 @@ retry:
zfile := file.zfile
// XXX need to do only if δfile.Size changed
size, sizePath, err := zfile.Size(ctx)
size, sizePath, blkCov, err := zfile.Size(ctx)
if err != nil {
return err
}
file.size = size
// see "3) for */head/data the following invariant is maintained..."
bfdir.δFtail.Track(zfile, -1, sizePath, nil)
bfdir.δFtail.Track(zfile, -1, sizePath, blkCov, nil)
// XXX we can miss a change to file if δblk is not yet tracked
// -> need to update file.rev at read time -> locking=XXX
......@@ -1283,7 +1283,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
}
// noone was loading - we became responsible to load this block
blkdata, treepath, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
blkdata, treepath, blkcov, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
loading.blkdata = blkdata
loading.err = err
......@@ -1298,7 +1298,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see readPinWatchers comments)
f.readPinWatchers(ctx, blk, treepath, zblk, blkrevMax)
f.readPinWatchers(ctx, blk, treepath, blkcov, zblk, blkrevMax)
// data can be used now
close(loading.ready)
......@@ -1516,7 +1516,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure? -> probably no
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblk ZBlk, blkrevMax zodb.Tid) {
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btree.LONode, blkcov btree.LKeyRange, zblk ZBlk, blkrevMax zodb.Tid) {
// only head/ is being watched for
if f.head.rev != 0 {
return
......@@ -1531,7 +1531,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
bfdir := f.head.bfdir
δFtail := bfdir.δFtail
bfdir.δFmu.Lock() // XXX locking correct? XXX -> better push down?
δFtail.Track(f.zfile, blk, treepath, zblk) // XXX pass in zblk.rev here?
δFtail.Track(f.zfile, blk, treepath, blkcov, zblk) // XXX pass in zblk.rev here?
f.accessed.Add(blk)
bfdir.δFmu.Unlock()
......@@ -2229,7 +2229,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
rev := zfile.PSerial()
zfile.PDeactivate()
size, sizePath, err := zfile.Size(ctx)
size, sizePath, blkCov, err := zfile.Size(ctx)
if err != nil {
return nil, err
}
......@@ -2248,7 +2248,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
if head.rev == 0 {
// see "3) for */head/data the following invariant is maintained..."
head.bfdir.δFmu.Lock() // XXX locking ok?
head.bfdir.δFtail.Track(f.zfile, -1, sizePath, nil)
head.bfdir.δFtail.Track(f.zfile, -1, sizePath, blkCov, nil)
head.bfdir.δFmu.Unlock()
// FIXME: scan zfile.blktab - so that we can detect all btree changes
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment