Commit ace9b05b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c9c57dcf
...@@ -850,7 +850,10 @@ retry: ...@@ -850,7 +850,10 @@ retry:
// invalidate kernel cache for data in changed files // invalidate kernel cache for data in changed files
// NOTE no δFmu lock needed because zhead is WLocked // NOTE no δFmu lock needed because zhead is WLocked
δF := bfdir.δFtail.Update(δZ, zhead) // δF <- δZ |tracked δF, err := bfdir.δFtail.Update(δZ, zhead) // δF <- δZ |tracked
if err != nil {
return err
}
if false { // XXX -> V(2) ? if false { // XXX -> V(2) ?
// debug dump δF // debug dump δF
...@@ -2364,7 +2367,7 @@ func _main() (err error) { ...@@ -2364,7 +2367,7 @@ func _main() (err error) {
fsNode: newFSNode(fSticky), fsNode: newFSNode(fSticky),
head: head, head: head,
fileTab: make(map[zodb.Oid]*BigFile), fileTab: make(map[zodb.Oid]*BigFile),
δFtail: NewΔFtail(zhead.At()), δFtail: NewΔFtail(zhead.At(), zdb),
} }
head.bfdir = bfdir head.bfdir = bfdir
......
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"fmt" "fmt"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/btree" "lab.nexedi.com/kirr/neo/go/zodb/btree"
) )
...@@ -148,11 +149,15 @@ type ΔTree struct { ...@@ -148,11 +149,15 @@ type ΔTree struct {
// //
// Initial tracked set is empty. // Initial tracked set is empty.
// Initial coverage is (at₀, at₀]. // Initial coverage is (at₀, at₀].
func NewΔBtail(at0 zodb.Tid) *ΔBtail { //
// db will be used by ΔBtail to open database connections ... XXX
// XXX or caller provides zhead/zprev explicitly?
func NewΔBtail(at0 zodb.Tid, db *zodb.DB) *ΔBtail {
return &ΔBtail{ return &ΔBtail{
δZtail: zodb.NewΔTail(at0), δZtail: zodb.NewΔTail(at0),
byRoot: make(map[zodb.Oid]*ΔTtail), byRoot: make(map[zodb.Oid]*ΔTtail),
trackIdx: make(map[zodb.Oid]SetOid), trackIdx: make(map[zodb.Oid]SetOid),
db: db,
} }
} }
...@@ -227,7 +232,10 @@ func (δBtail *ΔBtail) Track(path []Node, flags TrackFlags) { // XXX Tree|Bucke ...@@ -227,7 +232,10 @@ func (δBtail *ΔBtail) Track(path []Node, flags TrackFlags) { // XXX Tree|Bucke
// only those keys, that correspond to tracked subset of δZ. // only those keys, that correspond to tracked subset of δZ.
// //
// δZ should include all objects changed by ZODB transaction. // δZ should include all objects changed by ZODB transaction.
func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) ΔB { func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail update %s -> %s", headOld, δZ.Tid)
δBtail.δZtail.Append(δZ.Tid, δZ.Changev) δBtail.δZtail.Append(δZ.Tid, δZ.Changev)
// {} root -> []oid changed under that root in tracked set // {} root -> []oid changed under that root in tracked set
...@@ -244,6 +252,27 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) ΔB { ...@@ -244,6 +252,27 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) ΔB {
δB := ΔB{Rev: δZ.Tid, ByRoot: make(map[zodb.Oid]map[Key]Value)} δB := ΔB{Rev: δZ.Tid, ByRoot: make(map[zodb.Oid]map[Key]Value)}
// skip opening DB connections if there is no change to any tree node
if len(δZByRoot) == 0 {
return δB, nil
}
// open ZODB connections correponding to "old" and "new" states
// TODO caller should provide one of those (probably new)
txn, ctx := transaction.New(context.TODO()) // XXX
defer txn.Abort()
zconnOld, err := δBtail.db.Open(ctx, &zodb.ConnOptions{At: headOld})
if err != nil {
return ΔB{}, err
}
zconnNew, err := δBtail.db.Open(ctx, &zodb.ConnOptions{At: δZ.Tid})
if err != nil {
return ΔB{}, err
}
_ = zconnOld
_ = zconnNew
for root := range δZByRoot { for root := range δZByRoot {
δT, ok := δB.ByRoot[root] δT, ok := δB.ByRoot[root]
if !ok { if !ok {
...@@ -253,7 +282,7 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) ΔB { ...@@ -253,7 +282,7 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) ΔB {
// XXX stub to get file.size invalidation working // XXX stub to get file.size invalidation working
// TODO update δT // TODO update δT
} }
return δB return δB, nil
/* TODO btree-diff: /* TODO btree-diff:
...@@ -275,6 +304,8 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) ΔB { ...@@ -275,6 +304,8 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) ΔB {
*/ */
} }
func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) { func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) {
δBtail.δZtail.ForgetPast(revCut) // XXX stub δBtail.δZtail.ForgetPast(revCut) // XXX stub
} }
......
...@@ -521,7 +521,7 @@ func xverifyΔBTail1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid, ...@@ -521,7 +521,7 @@ func xverifyΔBTail1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid,
// zconn, δbtail @at1 with initial tracked set // zconn, δbtail @at1 with initial tracked set
zconn, err := db.Open(ctx, &zodb.ConnOptions{At: at1}); X(err) zconn, err := db.Open(ctx, &zodb.ConnOptions{At: at1}); X(err)
δbtail := NewΔBtail(zconn.At()) δbtail := NewΔBtail(zconn.At(), db)
xtree, err := zconn.Get(ctx, treeRoot); X(err) xtree, err := zconn.Get(ctx, treeRoot); X(err)
ztree := xtree.(*Tree) ztree := xtree.(*Tree)
...@@ -568,7 +568,7 @@ func xverifyΔBTail1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid, ...@@ -568,7 +568,7 @@ func xverifyΔBTail1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid,
} }
// δB <- δZ // δB <- δZ
δB := δbtail.Update(δZ) δB, err := δbtail.Update(δZ); X(err)
if δB.Rev != δZ.Tid { if δB.Rev != δZ.Tid {
badf("δB: rev != δZ.Tid ; rev=%s δZ.Tid=%s", δB.Rev, δZ.Tid) badf("δB: rev != δZ.Tid ; rev=%s δZ.Tid=%s", δB.Rev, δZ.Tid)
return return
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"runtime" "runtime"
"sync" "sync"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/btree" "lab.nexedi.com/kirr/neo/go/zodb/btree"
) )
...@@ -110,9 +111,11 @@ func (z *zblkInΔFtail) inΔFtail() *zblkInΔFtail { return z } ...@@ -110,9 +111,11 @@ func (z *zblkInΔFtail) inΔFtail() *zblkInΔFtail { return z }
// //
// Initial tracked set is empty. // Initial tracked set is empty.
// Initial coverage of created ΔFtail is (at₀, at₀]. // Initial coverage of created ΔFtail is (at₀, at₀].
func NewΔFtail(at0 zodb.Tid) *ΔFtail { //
// XXX db
func NewΔFtail(at0 zodb.Tid, db *zodb.DB) *ΔFtail {
return &ΔFtail{ return &ΔFtail{
δBtail: NewΔBtail(at0), δBtail: NewΔBtail(at0, db),
fileIdx: make(map[zodb.Oid]SetBigFile), fileIdx: make(map[zodb.Oid]SetBigFile),
trackNew: make(map[*BigFile]map[zodb.Oid]*zblkInΔFtail), trackNew: make(map[*BigFile]map[zodb.Oid]*zblkInΔFtail),
} }
...@@ -194,10 +197,17 @@ func (δFtail *ΔFtail) Track(file *BigFile, blk int64, path []btree.LONode, zbl ...@@ -194,10 +197,17 @@ func (δFtail *ΔFtail) Track(file *BigFile, blk int64, path []btree.LONode, zbl
// Zhead must be active connection at δFtail.Head() database state. // Zhead must be active connection at δFtail.Head() database state.
// Objects in Zhead must not be modified. // Objects in Zhead must not be modified.
// During call to Update zhead must not be otherwise used - even for reading. // During call to Update zhead must not be otherwise used - even for reading.
func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *ZConn) ΔF { func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *ZConn) (_ ΔF, err error) {
defer xerr.Contextf(&err, "ΔFtail update %s -> %s", δFtail.Head(), δZ.Tid)
// XXX δFtail.update() first? // XXX δFtail.update() first?
δB := δFtail.δBtail.Update(δZ) // XXX verify zhead.At() == δFtail.Head()
δB, err := δFtail.δBtail.Update(δZ)
if err != nil {
return ΔF{}, err
}
δF := ΔF{Rev: δB.Rev, ByFile: make(map[*BigFile]*ΔFile)} δF := ΔF{Rev: δB.Rev, ByFile: make(map[*BigFile]*ΔFile)}
// take btree changes into account // take btree changes into account
...@@ -268,7 +278,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *ZConn) ΔF { ...@@ -268,7 +278,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit, zhead *ZConn) ΔF {
} }
δFtail.vδF = append(δFtail.vδF, δF) δFtail.vδF = append(δFtail.vδF, δF)
return δF return δF, nil
} }
// update processes new track requests and updates vδF. // update processes new track requests and updates vδF.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment