Commit d85bb82c authored by Kirill Smelkov

ΔFtail concurrency

See changes in δftail.go for overview.

* t2+ΔFtail-concurrency: (39 commits)
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  X zdata: Switch SliceByFileRev not to clone Zinblk
  .
  .
  .
  .
  .
  ...
parents 54b623ba 3207c0ad
......@@ -137,31 +137,55 @@ func (t *T) Head() *Commit {
// XGetCommit finds and returns Commit created with revision at.
func (t *T) XGetCommit(at zodb.Tid) *Commit {
commit, _, _ := t.getCommit(at)
if commit == nil {
panicf("no commit corresponding to @%s", at)
}
return commit
}
func (t *T) getCommit(at zodb.Tid) (commit, cprev, cnext *Commit) {
l := len(t.commitv)
i := sort.Search(l, func(i int) bool {
return at <= t.commitv[i].At
})
var commit *Commit
if i < l {
commit = t.commitv[i]
if commit.At != at {
cnext = commit
commit = nil
} else if i+1 < l {
cnext = t.commitv[i+1]
}
}
if commit == nil {
panicf("no commit corresponding to @%s", at)
if i > 0 {
cprev = t.commitv[i-1]
}
if commit.idx != i {
if commit != nil && commit.idx != i {
panicf("BUG: commit.idx (%d) != i (%d)", commit.idx, i)
}
return commit
return commit, cprev, cnext
}
// AtSymb returns symbolic representation of at, for example "at3".
//
// at must correspond to a Commit.
// at should correspond to a Commit.
func (t *T) AtSymb(at zodb.Tid) string {
return t.XGetCommit(at).AtSymb()
commit, cprev, cnext := t.getCommit(at)
if commit != nil {
return commit.AtSymb()
}
// at does not correspond to commit - return something like ~at2<xxxx>at3
s := "~"
if cprev != nil {
s += cprev.AtSymb() + "<"
}
s += at.String()
if cnext != nil {
s += ">" + cnext.AtSymb()
}
return s
}
// AtSymb returns symbolic representation of c.At, for example "at3".
......
......@@ -69,7 +69,9 @@ package xbtree
// Concurrency
//
// In order to allow multiple Track and queries requests to be served in
// parallel ΔBtail employs special organization of vδT rebuild process:
// parallel ΔBtail employs special organization of vδT rebuild process where
// complexity of concurrency is reduced to math on merging updates to vδT and
// trackSet, and on key range lookup:
//
// 1. vδT is managed under read-copy-update (RCU) discipline: before making
// any vδT change the mutator atomically clones whole vδT and applies its
......@@ -117,13 +119,21 @@ package xbtree
//
// vδT/(T₁∪T₂) = vδT/T₁ | vδT/T₂
//
// i.e. vδT computed for tracked set being union of T₁ and T₂ is the same
// as merge of vδT computed for tracked set T₁ and vδT computed for tracked
// set T₂.
// ( i.e. vδT computed for tracked set being union of T₁ and T₂ is the
// same as merge of vδT computed for tracked set T₁ and vδT computed
// for tracked set T₂ )
//
// this merge property allows to run computation for δ(vδT) independently
// and with ΔBtail unlocked, which in turn enables running several
// Track/queries in parallel.
// and that
//
// trackSet | (δPP₁|δPP₂) = (trackSet|δPP₁) | (trackSet|δPP₂)
//
// ( i.e. tracking set updated for union of δPP₁ and δPP₂ is the same
// as union of tracking set updated with δPP₁ and tracking set updated
// with δPP₂ )
//
// these merge properties allow to run computation for δ(vδT) and δ(trackSet)
// independently and with ΔBtail unlocked, which in turn enables running
// several Track/queries in parallel.
//
// 4. while vδT rebuild is being run, krebuildJobs keeps corresponding keycov
// entry to indicate in-progress rebuild. Should a query need vδT for keys
......@@ -247,9 +257,9 @@ type _ΔTtail struct {
vδT []ΔTree
// set of keys that were requested to be tracked in this tree,
// but for which vδT rebuild was not yet started
// but for which vδT rebuild was not yet started as of @head
ktrackNew blib.RangedKeySet // {keycov}
// set of nodes corresponding to ktrackNew
// set of nodes corresponding to ktrackNew as of @head
trackNew blib.PPTreeSubSet // PP{nodes}
// set of keys(nodes) for which rebuild is in progress
......@@ -672,13 +682,13 @@ func (δTtail *_ΔTtail) __rebuild(root zodb.Oid, δBtail *ΔBtail, releaseLock
//
// TODO optionally accept zconnOld/zconnNew from client
func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail.Update %s -> %s", headOld, δZ.Tid)
δBtail.mu.Lock()
defer δBtail.mu.Unlock()
// TODO verify that there is no in-progress readers/writers
headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail.Update %s -> %s", headOld, δZ.Tid)
δB1, err := δBtail._Update1(δZ)
δB := ΔB{Rev: δZ.Tid, ByRoot: make(map[zodb.Oid]map[Key]ΔValue)}
......@@ -997,7 +1007,7 @@ func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value,
// Only tracked keys are guaranteed to be present.
//
// Note: contrary to regular go slicing, low is exclusive while high is inclusive.
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readonly*/vδT []ΔTree) {
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readonly*/vδT []ΔTree, err error) {
xtail.AssertSlice(δBtail, lo, hi)
if traceΔBtail {
......@@ -1008,22 +1018,22 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon
}
// retrieve vδT snapshot that is rebuilt to take all previous Track requests into account
vδT, err := δBtail.vδTSnapForTracked(root)
vδT, err = δBtail.vδTSnapForTracked(root)
if err != nil {
panic(err) // XXX
return nil, err
}
debugfΔBtail(" vδT: %v\n", vδT)
l := len(vδT)
if l == 0 {
return nil
return nil, nil
}
// find max j : [j].rev ≤ hi linear scan -> TODO binary search
j := l - 1
for ; j >= 0 && vδT[j].Rev > hi; j-- {}
if j < 0 {
return nil // ø
return nil, nil // ø
}
// find max i : [i].rev > lo linear scan -> TODO binary search
......@@ -1035,7 +1045,7 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon
// modified via RCU: i.e. _ΔTtail.rebuild clones vδT before modifying it.
// This way the data we return to caller will stay unchanged even if
// rebuild is running simultaneously.
return vδT[i:j+1]
return vδT[i:j+1], nil
}
......
......@@ -1246,11 +1246,11 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want)
}
s00 := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01 := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02 := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
s12 := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At)
s22 := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At)
s00, err := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At); X(err)
s01, err := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At); X(err)
s02, err := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At); X(err)
s12, err := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At); X(err)
s22, err := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At); X(err)
vδT := δttail.vδT
assertvδT("t2.vδT", vδT, ΔT{t1.At, δ{2:{f,g}}}, ΔT{t2.At, δ{2:{g,h}}})
......@@ -1286,11 +1286,11 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
trackKeys(δbtail, t2, _1)
err = δbtail._rebuildAll(); X(err)
s00_ := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01_ := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02_ := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
s12_ := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At)
s22_ := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At)
s00_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At); X(err)
s01_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At); X(err)
s02_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At); X(err)
s12_, err := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At); X(err)
s22_, err := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At); X(err)
vδT = δttail.vδT
assertvδT("t12.vδT", vδT, ΔT{t1.At, δ{1:{a,b},2:{f,g}}}, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
......
This diff is collapsed.
......@@ -242,12 +242,12 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
// update vδf + co for t1
vδf = append(vδf, &ΔFile{Rev: t1.At, Epoch: true})
vδE = append(vδE, _ΔFileEpoch{
Rev: t1.At,
oldRoot: zodb.InvalidOid,
newRoot: t.Root(),
oldBlkSize: -1,
newBlkSize: blksize,
oldTrackSetZBlk: nil,
Rev: t1.At,
oldRoot: zodb.InvalidOid,
newRoot: t.Root(),
oldBlkSize: -1,
newBlkSize: blksize,
oldZinblk: nil,
})
epochv = append(epochv, t1.At)
for blk, zblk := range δt1 {
......@@ -305,7 +305,11 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
for blk, zblk := range test.δblkTab {
zprev, ok := blkTab[blk]
if ok {
delete(Zinblk[zprev], blk)
inblk := Zinblk[zprev]
inblk.Del(blk)
if len(inblk) == 0 {
delete(Zinblk, zprev)
}
} else {
zprev = ø
}
......@@ -423,12 +427,12 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
δE.oldBlkSize = -1
δE.newBlkSize = blksize
}
oldTrackSetZBlk := map[zodb.Oid]setI64{}
oldZinblk := map[zodb.Oid]setI64{}
for zblk, inblk := range ZinblkPrev {
oid, _ := commit.XGetBlkByName(zblk)
oldTrackSetZBlk[oid] = inblk
oldZinblk[oid] = inblk
}
δE.oldTrackSetZBlk = oldTrackSetZBlk
δE.oldZinblk = oldZinblk
vδE = append(vδE, δE)
}
......@@ -445,26 +449,60 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
retrackAll()
}
// verify δFtail.trackSetZBlk
trackZinblk := map[string]setI64{}
for oid, zt := range δFtail.trackSetZBlk {
zblki := commit.ZBlkTab[oid]
for root, blocks := range zt.inroot {
if root != t.Root() {
t.Errorf(".trackSetZBlk: zblk %s points to unexpected blktab %s", zblki.Name, t.Root())
continue
}
// verify byRoot
trackRfiles := map[zodb.Oid]setOid{}
for root, rt := range δFtail.byRoot {
trackRfiles[root] = rt.ftrackSet
}
filesOK := setOid{}
if !delfile {
filesOK.Add(foid)
}
RfilesOK := map[zodb.Oid]setOid{}
if len(filesOK) != 0 {
RfilesOK[t.Root()] = filesOK
}
if !reflect.DeepEqual(trackRfiles, RfilesOK) {
t.Errorf("Rfiles:\nhave: %v\nwant: %v", trackRfiles, RfilesOK)
}
inblk, ok := trackZinblk[zblki.Name]
if !ok {
inblk = setI64{}
trackZinblk[zblki.Name] = inblk
// verify Zinroot
trackZinroot := map[string]setOid{}
for zoid, inroot := range δFtail.ztrackInRoot {
zblki := commit.ZBlkTab[zoid]
trackZinroot[zblki.Name] = inroot.Clone() // XXX clone needed?
}
Zinroot := map[string]setOid{}
for zblk := range Zinblk {
inroot := setOid{}; inroot.Add(t.Root())
Zinroot[zblk] = inroot
}
if !reflect.DeepEqual(trackZinroot, Zinroot) {
t.Errorf("Zinroot:\nhave: %v\nwant: %v", trackZinroot, Zinroot)
}
// verify Zinblk
trackZinblk := map[string]setI64{}
switch {
case len(δFtail.byRoot) == 0:
// ok
case len(δFtail.byRoot) == 1:
rt, ok := δFtail.byRoot[t.Root()]
if !ok {
t.Errorf(".byRoot points to unexpected blktab")
} else {
for zoid, inblk := range rt.ztrackInBlk {
zblki := commit.ZBlkTab[zoid]
trackZinblk[zblki.Name] = inblk.Clone() // XXX clone needed?
}
inblk.Update(blocks)
}
default:
t.Errorf("len(.byRoot) != (0,1) ; byRoot: %v", δFtail.byRoot)
}
if !reflect.DeepEqual(trackZinblk, Zinblk) {
t.Errorf(".trackSetZBlk:\n~have: %v\n want: %v", trackZinblk, Zinblk)
t.Errorf("Zinblk:\nhave: %v\nwant: %v", trackZinblk, Zinblk)
}
// ForgetPast configured threshold
......@@ -485,16 +523,6 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
vδE = vδE[icut:]
}
// verify δFtail.filesByRoot
filesByRootOK := map[zodb.Oid]setOid{}
if !delfile {
__ := setOid{}; __.Add(foid)
filesByRootOK[t.Root()] = __
}
if !reflect.DeepEqual(δFtail.filesByRoot, filesByRootOK) {
t.Errorf("filesByRoot:\nhave: %v\nwant: %v", δFtail.filesByRoot, filesByRootOK)
}
// verify δftail.root
δftail := δFtail.byFile[foid]
rootOK := t.Root()
......@@ -523,7 +551,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
hi := vδf[k].Rev
vδf_ok := vδf[j:k+1] // [j,k]
vδf_ := δFtail.SliceByFileRev(zfile, lo, hi)
vδf_, err := δFtail.SliceByFileRev(zfile, lo, hi); X(err)
if !reflect.DeepEqual(vδf_, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf_), t.vδfstr(vδf_ok))
}
......@@ -547,7 +575,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
at := vδf[j].Rev
blkRev := blkRevAt[at]
for _, blk := range blkv {
rev, exact := δFtail.BlkRevAt(ctx, zfile, blk, at)
rev, exact, err := δFtail.BlkRevAt(ctx, zfile, blk, at); X(err)
revOK, ok := blkRev[blk]
if !ok {
k := len(epochv) - 1
......@@ -626,7 +654,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at1, at4] -> changes to both 0 and 1, because they both are changed in the same bucket @at2
lo := t1.At
hi := t4.At
vδf := δFtail.SliceByFileRev(zfile, lo, hi)
vδf, err := δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok := []*ΔFile{
&ΔFile{Rev: t2.At, Blocks: b(0,1), Size: true},
&ΔFile{Rev: t3.At, Blocks: b(0,1), Size: false},
......@@ -639,7 +667,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at2, at4] -> changes to only 0, because there is no change to 2 via blktab
lo = t2.At
vδf = δFtail.SliceByFileRev(zfile, lo, hi)
vδf, err = δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok = []*ΔFile{
&ΔFile{Rev: t3.At, Blocks: b(0), Size: false},
}
......@@ -649,7 +677,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at3, at4] -> changes to only 0, ----/----
lo = t3.At
vδf = δFtail.SliceByFileRev(zfile, lo, hi)
vδf, err = δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok = []*ΔFile(nil)
if !reflect.DeepEqual(vδf, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf), t.vδfstr(vδf_ok))
......
......@@ -1085,22 +1085,29 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
//
// if we have the data - preserve it under @revX/bigfile/file[blk].
if int64(len(blkdata)) == blksize {
func() {
err := func() error {
// store retrieved data back to OS cache for file @<rev>/file[blk]
δFtail := f.head.bfdir.δFtail
blkrev, _ := δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
blkrev, _, err := δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
if err != nil {
return err
}
frev, funlock, err := groot.lockRevFile(blkrev, f.zfile.POid())
if err != nil {
log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
return
return fmt.Errorf("BUG: %s", err)
}
defer funlock()
st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
if st != fuse.OK {
log.Errorf("BUG: %s: invalidate blk #%d: %s: store cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, frev.path(), st)
return fmt.Errorf("BUG: %s: store cache: %s", frev.path(), st)
}
return nil
}()
if err != nil {
log.Errorf("%s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
}
}
// invalidate file/head/data[blk] in OS file cache.
......@@ -1566,7 +1573,11 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// we'll relock atMu again and recheck blkrev vs w.at after.
w.atMu.RUnlock()
blkrev, _ = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
var err error
blkrev, _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
if err != nil {
panic(err) // XXX
}
blkrevRough = false
w.atMu.RLock()
......@@ -1582,8 +1593,11 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// and most of them would be on different w.at - cache of the file will
// be lost. Via pinning to particular block revision, we make sure the
// revision to pin is the same on all clients, and so file cache is shared.
pinrev, _ := δFtail.BlkRevAt(ctx, w.file.zfile, blk, w.at) // XXX move into go?
pinrev, _, err := δFtail.BlkRevAt(ctx, w.file.zfile, blk, w.at) // XXX move into go?
// XXX ^^^ w.file vs f ?
if err != nil {
panic(err) // XXX
}
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
wg.Go(func(ctx context.Context) error {
......@@ -1728,7 +1742,11 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
toPin := map[int64]zodb.Tid{} // blk -> @rev
δFtail := bfdir.δFtail
for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail
vδf, err := δFtail.SliceByFileRev(f.zfile, at, headAt) // XXX locking δFtail
if err != nil {
panic(err) // XXX
}
for _, δfile := range vδf {
if δfile.Epoch {
// file epochs are currently forbidden (see watcher), so the only
// case when we could see an epoch here is creation of
......@@ -1764,7 +1782,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
continue
}
toPin[blk], _ = δFtail.BlkRevAt(ctx, f.zfile, blk, at) // XXX err
toPin[blk], _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, at)
if err != nil {
panic(err) // XXX
}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment