Commit 4ef8dd3c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 700f6028
......@@ -88,6 +88,7 @@ import (
"strings"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -95,7 +96,7 @@ import (
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xtail"
)
//type zodbOid = zodb.Oid XXX kill probably
//type zodbOid = zodb.Oid XXX kill
const traceΔBtail = false
const debugΔBtail = false
......@@ -387,6 +388,7 @@ func (δBtail *ΔBtail) track(path []zodb.Oid, keycov KeyRange) {
}
return
}
// XXX also check if keycov ∈ krebuildJobs
// queue path into trackNew
δTtail, ok := δBtail.byRoot[root]
......@@ -397,10 +399,13 @@ func (δBtail *ΔBtail) track(path []zodb.Oid, keycov KeyRange) {
δBtail.trackNewRoots.Add(root)
δTtail.trackNew.AddPath(path)
tracefΔBtail("->T: [%s].trackNew -> %s\n", root, δTtail.trackNew)
δTtail.ktrackNew.AddRange(keycov)
tracefΔBtail("->T: [%s].trackNew -> %s\n", root, δTtail.trackNew)
tracefΔBtail("->T: [%s].ktrackNew -> %s\n", root, δTtail.ktrackNew)
}
// rebuildAll rebuilds ΔBtail taking all trackNew requests into account.
// XXX inline into _Update1 ?
func (δBtail *ΔBtail) rebuildAll() (err error) {
defer xerr.Context(&err, "ΔBtail rebuildAll")
// XXX locking
......@@ -427,6 +432,215 @@ func (δBtail *ΔBtail) rebuild1IfNeeded(root zodb.Oid) error {
return δBtail.rebuild1(root)
}
// vδTSnapWithTrackedKey returns vδT snapshot for root that takes into account all previous Track requests related to key.
func (δBtail *ΔBtail) vδTSnapWithTrackedKey(root zodb.Oid, key Key) (vδT []ΔTree, err error) {
// XXX δBtail.lock
δTtail := δBtail.byRoot[root] // must be there
if !δTtail.ktrackNew.Has(key) {
// key ∉ ktrackNew
job, inJobs := δTtail.krebuildJobs.Get_(key)
if !inJobs {
// key ∉ krebuildJobs -> it should be already in trackSet
vδT = δTtail.vδT
// XXX δBtail.unlock
return vδT, nil
}
// rebuild for root[key] is in progress -> wait for corresponding job to complete
// XXX δBtail.unlock
<-job.ready
if job.err == nil {
// XXX δBtail.lock
vδT = δTtail.vδT
// XXX δBtail.unlock
}
return vδT, job.err
}
// key ∈ ktrackNew -> this goroutine becomes responsible to start rebuilding vδT for it
// lauch rebuild job for all keys queued in ktrackNew so far
err = δTtail._runRebuildJob(root, δBtail)
if err != nil {
return nil, err
}
// XXX δBtail.lock
vδT = δTtail.vδT
// XXX δBtail.unlock
return vδT, nil
}
// vδTSnapWithTracked returns vδT snapshot for root that takes into account all previous Track requests.
func (δBtail *ΔBtail) vδTSnapWithTracked(root zodb.Oid) (vδT []ΔTree, err error) {
// XXX δBtail.lock
δTtail := δBtail.byRoot[root] // must be there
// prepare to wait for all already running jobs, if any
wg := xsync.NewWorkGroup(context.Background())
for _, e := range δTtail.krebuildJobs.AllRanges() {
job := e.Value
wg.Go(func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-job.ready:
return job.err
}
})
}
// run new rebuild job if there are not-yet-handled Track requests
var errJob error
if !δTtail.ktrackNew.Empty() {
errJob = δTtail._runRebuildJob(root, δBtail)
}
// XXX δBtail.unlock
errWait := wg.Wait()
err = xerr.First(errJob, errWait)
if err != nil {
return nil, err
}
// XXX δBtail.lock
vδT = δTtail.vδT
// XXX δBtail.unlock
return vδT, nil
}
// _runRebuildJob runs rebuild job for current .ktrackNew/.trackNew
// must be called with δBtail locked.
func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err error) {
// XXX errctx
job := &_RebuildJob{ready: make(chan struct{})}
trackNew := δTtail.trackNew
rebuildKeys := δTtail.ktrackNew
δTtail.trackNew = blib.PPTreeSubSet{}
δTtail.ktrackNew = blib.RangedKeySet{}
// krebuildJobs += rebuildKeys
for _, r := range rebuildKeys.AllRanges() {
// XXX assert !krebuildJobs.IntersectsRange(e.KeyRange)
δTtail.krebuildJobs.SetRange(r, job)
}
delete(δBtail.trackNewRoots, root)
// XXX δBtail.unlock
vδTnew, δtrackSet, err := vδTBuild(root, trackNew, δBtail.δZtail, δBtail.db)
// XXX δBtail.lock
// krebuildJobs -= rebuildKeys
for _, r := range rebuildKeys.AllRanges() {
// XXX assert krebuildJobs[r] == job
δTtail.krebuildJobs.DelRange(r)
}
// merge rebuild result
if err != nil {
// XXX comment about RCU snapshot
δTtail.vδT = vδTClone(δTtail.vδT)
δrevSet := vδTMergeInplace(&δTtail.vδT, vδTnew)
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail.vδBroots_Update(root, δrevSet)
}
// we are done
job.err = err
close(job.ready)
// XXX δBtail.unlock
return job.err
}
/*
// vδTSnapWithTracked returns vδT snapshot for root that takes into account all previous Track requests.
func (δBtail *ΔBtail) vδTSnapWithTracked(root zodb.Oid) (vδT []ΔTree, err error) {
// XXX δBtail.lock
δTtail := δBtail.byRoot[root] // must be there
/ * XXX check .trackNewRoots ?
_, ok := δBtail.trackNewRoots[root]
if !ok {
// XXX δBtail.unlock
return nil
}
* /
if !δTtail.ktrackNew.Has(key) {
// key ∉ ktrackNew
job, inJobs := δTtail.krebuildJobs.Get_(key)
if !inJobs {
// key ∉ krebuildJobs -> it should be already in trackSet
vδT = δTtail.vδT
// XXX δBtail.unlock
return vδT, nil
}
// rebuild for root[key] is in progress -> wait for corresponding job to complete
// XXX δBtail.unlock
<-job.ready
if job.err == nil {
// XXX δBtail.lock
vδT = δTtail.vδT
// XXX δBtail.unlock
}
return vδT, job.err
}
// key ∈ ktrackNew -> this goroutine becomes responsible to start rebuilding vδT for it
// lauch rebuild job for all keys queued in ktrackNew so far
job := &_RebuildJob{ready: make(chan struct{})}
trackNew := δTtail.trackNew
rebuildKeys := δTtail.ktrackNew
δTtail.trackNew = blib.PPTreeSubSet{}
δTtail.ktrackNew = blib.RangedKeySet{}
// krebuildJobs += rebuildKeys
for _, r := range rebuildKeys.AllRanges() {
// XXX assert !krebuildJobs.IntersectsRange(e.KeyRange)
δTtail.krebuildJobs.SetRange(r, job)
}
delete(δBtail.trackNewRoots, root)
// XXX δBtail.unlock
vδTnew, δtrackSet, err := vδTBuild(root, trackNew, δBtail.δZtail, δBtail.db)
// XXX δBtail.lock
// krebuildJobs -= rebuildKeys
for _, r := range rebuildKeys.AllRanges() {
// XXX assert krebuildJobs[r] == job
δTtail.krebuildJobs.DelRange(r)
}
// merge rebuild result
if err != nil {
// XXX comment about RCU snapshot
δTtail.vδT = vδTClone(δTtail.vδT)
δrevSet := vδTMergeInplace(&δTtail.vδT, vδTnew)
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail.vδBroots_Update(root, δrevSet)
}
// we are done
job.err = err
close(job.ready)
// XXX δBtail.unlock
return job.err
}
*/
// rebuild1KeyIfNeeded rebuilds ΔBtail for single root if that root[key] needs rebuilding.
func (δBtail *ΔBtail) rebuild1KeyIfNeeded(root zodb.Oid, key Key) error {
// XXX δBtail.lock
......@@ -672,7 +886,6 @@ func vδTBuild1(atPrev zodb.Tid, δZ zodb.ΔRevEntry, trackNew blib.PPTreeSubSet
return δT, δtrack, δtkeycov, nil
}
// vδTMergeInplace merges vδTnew into vδT.
//
// δrevSet indicates set of new revisions created in vδT.
......@@ -775,6 +988,8 @@ func widenTrackNew(trackNew blib.PPTreeSubSet, δtkeycov *blib.RangedKeySet, roo
//
// TODO optionally accept zconnOld/zconnNew from client
func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
// XXX locking
headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail.Update %s -> %s", headOld, δZ.Tid)
......@@ -848,6 +1063,8 @@ func (δBtail *ΔBtail) _Update1(δZ *zodb.EventCommit) (δB1 _ΔBUpdate1, err e
for _, root := range δBtail.trackNewRoots.SortedElements() {
δTtail := δBtail.byRoot[root]
tracefΔBtail("[%s].trackNew: %v\n", root, δTtail.trackNew)
// XXX ktrackNew
// XXX krebuildJobs
}
δB1 = _ΔBUpdate1{ByRoot: make(map[zodb.Oid]*_ΔTUpdate1)}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment