Commit bf3ace66 authored by Kirill Smelkov's avatar Kirill Smelkov

X ΔFtail: Rebuild vδE after first track

This is needed to detect where the file was created and indicate with
δF.Epoch=true correspondingly.
parent a508aed8
...@@ -83,7 +83,10 @@ type ΔFtail struct { ...@@ -83,7 +83,10 @@ type ΔFtail struct {
// ΔFtail merges ΔBtail with history of ZBlk // ΔFtail merges ΔBtail with history of ZBlk
δBtail *xbtree.ΔBtail δBtail *xbtree.ΔBtail
fileIdx map[zodb.Oid]setOid // tree-root -> {} ZBigFile<oid> as of @head XXX -> root2file ? fileIdx map[zodb.Oid]setOid // tree-root -> {} ZBigFile<oid> as of @head XXX -> root2file ?
byFile map[zodb.Oid]*_ΔFileTail // file -> δf tail XXX byFile map[zodb.Oid]*_ΔFileTail // file -> vδf tail XXX
// set of files, which are newly tracked and for which vδE was not yet rebuilt
trackNew setOid // {}foid
trackSetZBlk map[zodb.Oid]*zblkTrack // zblk -> {} root -> {}blk as of @head trackSetZBlk map[zodb.Oid]*zblkTrack // zblk -> {} root -> {}blk as of @head
} }
...@@ -137,6 +140,7 @@ func NewΔFtail(at0 zodb.Tid, db *zodb.DB) *ΔFtail { ...@@ -137,6 +140,7 @@ func NewΔFtail(at0 zodb.Tid, db *zodb.DB) *ΔFtail {
δBtail: xbtree.NewΔBtail(at0, db), δBtail: xbtree.NewΔBtail(at0, db),
fileIdx: map[zodb.Oid]setOid{}, fileIdx: map[zodb.Oid]setOid{},
byFile: map[zodb.Oid]*_ΔFileTail{}, byFile: map[zodb.Oid]*_ΔFileTail{},
trackNew: setOid{},
trackSetZBlk: map[zodb.Oid]*zblkTrack{}, trackSetZBlk: map[zodb.Oid]*zblkTrack{},
} }
} }
...@@ -184,6 +188,7 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zb ...@@ -184,6 +188,7 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zb
if !ok { if !ok {
δftail = &_ΔFileTail{root: roid, vδE: nil /*will need to be rebuilt till past*/} δftail = &_ΔFileTail{root: roid, vδE: nil /*will need to be rebuilt till past*/}
δFtail.byFile[foid] = δftail δFtail.byFile[foid] = δftail
δFtail.trackNew.Add(foid)
} }
if δftail.root != roid { if δftail.root != roid {
panicf("zfile<%s> changed root from %s -> %s", foid, δftail.root, roid) panicf("zfile<%s> changed root from %s -> %s", foid, δftail.root, roid)
...@@ -211,6 +216,90 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zb ...@@ -211,6 +216,90 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zb
} }
} }
// rebuildAll rebuilds vδE for all files from trackNew requests.
func (δFtail *ΔFtail) rebuildAll() (err error) {
defer xerr.Contextf(&err, "ΔFtail rebuildAll")
// XXX locking
for foid := range δFtail.trackNew {
delete(δFtail.trackNew, foid)
δftail := δFtail.byFile[foid]
δBtail := δFtail.δBtail
err := δftail.rebuild1(foid, δBtail.ΔZtail(), δBtail.DB())
if err != nil {
return err
}
}
return nil
}
// rebuildIfNeeded rebuilds vδE if there is such need.
//
// it returns corresponding δftail for convenience.
// the only case when vδE actually needs to be rebuilt is when the file just started to be tracked.
func (δFtail *ΔFtail) rebuildIfNeeded(foid zodb.Oid) (_ *_ΔFileTail, err error) {
// XXX locking
δftail := δFtail.byFile[foid]
if δftail.vδE != nil {
err = nil
} else {
δBtail := δFtail.δBtail
err = δftail.rebuild1(foid, δBtail.ΔZtail(), δBtail.DB())
}
return δftail, err
}
// rebuild rebuilds vδE.
func (δftail *_ΔFileTail) rebuild1(foid zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (err error) {
defer xerr.Contextf(&err, "file<%s>: rebuild", foid)
// XXX locking
if δftail.vδE != nil {
panic("rebuild: vδE != nil")
}
vδE := []_ΔFileEpoch{}
vδZ := δZtail.Data()
atPrev := δZtail.Tail()
for i := 0; i < len(vδZ); i++ {
δZ := vδZ[i]
fchanged := false
for _, oid := range δZ.Changev {
if oid == foid {
fchanged = true
break
}
}
if !fchanged {
continue
}
δ, err := zfilediff(db, foid, atPrev, δZ.Rev)
if err != nil {
return err
}
if δ != nil {
δE := _ΔFileEpoch{
Rev: δZ.Rev,
oldRoot: δ.blktabOld,
newRoot: δ.blktabNew,
newBlkSize: δ.blksizeNew,
oldTrackSetZBlk: nil, // nothing was tracked
}
vδE = append(vδE, δE)
}
atPrev = δZ.Rev
}
δftail.vδE = vδE
return nil
}
// Update updates δFtail given raw ZODB changes. // Update updates δFtail given raw ZODB changes.
// //
// It returns change in files space that corresponds to δZ. // It returns change in files space that corresponds to δZ.
...@@ -228,6 +317,12 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) { ...@@ -228,6 +317,12 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
// XXX verify zhead.At() == δFtail.Head() // XXX verify zhead.At() == δFtail.Head()
// XXX locking // XXX locking
// rebuild vδE for newly tracked files
err = δFtail.rebuildAll()
if err != nil {
return ΔF{}, err
}
headOld := δFtail.Head() headOld := δFtail.Head()
δB, err := δFtail.δBtail.Update(δZ) δB, err := δFtail.δBtail.Update(δZ)
...@@ -466,7 +561,10 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -466,7 +561,10 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
return δfTail return δfTail
} }
δftail := δFtail.byFile[zfile.POid()] δftail, err := δFtail.rebuildIfNeeded(zfile.POid())
if err != nil {
panic(err) // XXX
}
vδZ := δFtail.δBtail.ΔZtail().SliceByRev(lo, hi) vδZ := δFtail.δBtail.ΔZtail().SliceByRev(lo, hi)
iz := len(vδZ) - 1 iz := len(vδZ) - 1
...@@ -631,9 +729,11 @@ func (δFtail *ΔFtail) _LastBlkRev(ctx context.Context, zf *ZBigFile, blk int64 ...@@ -631,9 +729,11 @@ func (δFtail *ΔFtail) _LastBlkRev(ctx context.Context, zf *ZBigFile, blk int64
//fmt.Printf("\nblkrev #%d @%s\n", blk, at) //fmt.Printf("\nblkrev #%d @%s\n", blk, at)
// XXX locking // XXX locking
// XXX rebuild
δftail := δFtail.byFile[zf.POid()] δftail, err := δFtail.rebuildIfNeeded(zf.POid())
if err != nil {
return zodb.InvalidTid, false, err
}
// find epoch that covers at and associated blktab root/object // find epoch that covers at and associated blktab root/object
vδE := δftail.vδE vδE := δftail.vδE
......
...@@ -100,8 +100,6 @@ func TestΔFtail(t *testing.T) { ...@@ -100,8 +100,6 @@ func TestΔFtail(t *testing.T) {
// XXX text // XXX text
{nil, nil}, {nil, nil},
// XXX more
// ---- found by TestΔFtailRandom ---- // ---- found by TestΔFtailRandom ----
{δT{1:a,6:i,7:d,8:e}, δD(a,c,e,f,g,h,i,j)}, {δT{1:a,6:i,7:d,8:e}, δD(a,c,e,f,g,h,i,j)},
...@@ -177,11 +175,30 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -177,11 +175,30 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
t := xbtreetest.NewT(t_) t := xbtreetest.NewT(t_)
X := exc.Raiseif X := exc.Raiseif
// XXX start δFtail when zfile is not yet exists xat := map[zodb.Tid]string{} // tid > "at<i>"
// this way we'll verify how ΔFtail rebuilds vδE for started-to-be-tracked file
// start δFtail when zfile does not yet exists
// this way we'll verify how ΔFtail rebuilds vδE for started-to-be-tracked file
t0 := t.CommitTree("øf")
xat[t0.At] = "at0"
t.Logf("# @at0 (%s)", t0.At)
δFtail := NewΔFtail(t.Head().At, t.DB) δFtail := NewΔFtail(t.Head().At, t.DB)
// load dataTab
dataTab := map[string]string{} // ZBlk<name> -> data
for /*oid*/_, zblki := range t.Head().ZBlkTab {
dataTab[zblki.Name] = zblki.Data
}
// create zfile, but do not track it yet
t1 := t.CommitTree(fmt.Sprintf("t0:a D%s", dataTabTxt(dataTab)))
xat[t1.At] = "at1"
t.Logf("# → @at1 (%s) %s\t; not-yet-tracked", t1.At, t1.Tree)
δF, err := δFtail.Update(t1.ΔZ); X(err)
if !(δF.Rev == t1.At && len(δF.ByFile) == 0) {
t.Errorf("wrong δF:\nhave {%s, %v}\nwant: {%s, ø}", δF.Rev, δF.ByFile, t1.At)
}
// load zfile via root['treegen/file'] // load zfile via root['treegen/file']
txn, ctx := transaction.New(context.Background()) txn, ctx := transaction.New(context.Background())
zconn, err := t.DB.Open(ctx, &zodb.ConnOptions{At: t.Head().At, NoPool: true}); X(err) zconn, err := t.DB.Open(ctx, &zodb.ConnOptions{At: t.Head().At, NoPool: true}); X(err)
...@@ -199,28 +216,31 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -199,28 +216,31 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
} }
zfile.PDeactivate() zfile.PDeactivate()
// start track zfile[0,∞) from the beginning
// this should make ΔFtail to see all zfile changes
size, path, err := zfile.Size(ctx); X(err)
δFtail.Track(zfile, /*blk*/-1, path, /*zblk*/nil)
if sizeOK := 1*blksize; size != sizeOK { // NOTE maches t1 commit
t.Fatalf("BUG: zfile size: have %d ; want %d", size, sizeOK)
}
// data built via applying changes from testv // data built via applying changes from testv
vδf := []*ΔFile{} // (rev↑, {}blk) vδf := []*ΔFile{ // (rev↑, {}blk)
//vδE := []_ΔFileEpoch{} // (rev↑, EPOCH) {Rev: t1.At, Epoch: true},
var vδE []_ΔFileEpoch // FIXME change to !nil (^^^) after ΔFtail.rebuild is added }
blkTab := map[int64]string{} // #blk -> ZBlk<name> vδE := []_ΔFileEpoch{ // (rev↑, EPOCH)
dataTab := map[string]string{} // ZBlk<name> -> data {
Rev: t1.At,
oldRoot: zodb.InvalidOid,
newRoot: blktabOid,
newBlkSize: blksize,
oldTrackSetZBlk: nil,
},
}
blkTab := map[int64]string{0:"a"} // #blk -> ZBlk<name>
Zinblk := map[string]setI64{} // ZBlk<name> -> which #blk refer to it Zinblk := map[string]setI64{} // ZBlk<name> -> which #blk refer to it
blkRevAt := map[zodb.Tid]map[int64]zodb.Tid{} // {} at -> {} #blk -> rev blkRevAt := map[zodb.Tid]map[int64]zodb.Tid{} // {} at -> {} #blk -> rev
// initialize dataTab from root['treegen/values']
for /*oid*/_, zblki := range t.Head().ZBlkTab {
dataTab[zblki.Name] = zblki.Data
}
// track zfile[-∞,∞) from the beginning
// this should make ΔFtail to see all zfile changes
size, path, err := zfile.Size(ctx); X(err)
δFtail.Track(zfile, /*blk*/-1, path, /*zblk*/nil)
if size != 0 {
t.Fatalf("BUG: zfile is not initially empty: size=%d", size)
}
// retrack should be called after new epoch to track zfile[-∞,∞) again // retrack should be called after new epoch to track zfile[-∞,∞) again
retrack := func() { retrack := func() {
for blk := range blkTab { for blk := range blkTab {
...@@ -229,11 +249,8 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -229,11 +249,8 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
} }
} }
xat := map[zodb.Tid]string{} // tid > "at<i>"
xat[δFtail.Head()] = "at0"
t.Logf("# @at0 (%s)", δFtail.Head())
epochv := []zodb.Tid{0} epochv := []zodb.Tid{t0.At, t1.At}
// δfstr/vδfstr converts δf/vδf to string taking xat into account // δfstr/vδfstr converts δf/vδf to string taking xat into account
δfstr := func(δf *ΔFile) string { δfstr := func(δf *ΔFile) string {
...@@ -255,7 +272,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -255,7 +272,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
} }
i := 0 i := 1 // matches t1
delfilePrev := false delfilePrev := false
for test := range testq { for test := range testq {
i++ i++
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment