Commit a83a83b0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 33abf5f4
...@@ -85,6 +85,7 @@ import ( ...@@ -85,6 +85,7 @@ import (
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
"sync"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/go123/xsync"
...@@ -142,6 +143,16 @@ type ΔBtail struct { ...@@ -142,6 +143,16 @@ type ΔBtail struct {
// includes all changed objects, not only tracked ones. // includes all changed objects, not only tracked ones.
δZtail *zodb.ΔTail δZtail *zodb.ΔTail
// handle to make connections to access database.
// TODO allow client to optionally provide zconnOld/zconnNew on e.g. Update()
db *zodb.DB // to open connections to load new/old tree|buckets
// mu protects ΔBtail data _and_ all _ΔTtail data for all roots.
//
// NOTE: even though this lock is global, since _ΔTtail.vδT is updated
// via RCU, working with retrieved vδT snapshot does not need to hold the lock.
mu sync.Mutex
vδBroots []_ΔBroots // [] (rev↑, roots changed in this rev) vδBroots []_ΔBroots // [] (rev↑, roots changed in this rev)
byRoot map[zodb.Oid]*_ΔTtail // {} root -> [] k/v change history; only for keys ∈ tracked subset byRoot map[zodb.Oid]*_ΔTtail // {} root -> [] k/v change history; only for keys ∈ tracked subset
...@@ -153,10 +164,6 @@ type ΔBtail struct { ...@@ -153,10 +164,6 @@ type ΔBtail struct {
// set of trees for which _ΔTtail.ktrackNew is non-empty // set of trees for which _ΔTtail.ktrackNew is non-empty
trackNewRoots setOid trackNewRoots setOid
// handle to make connections to access database.
// TODO allow client to optionally provide zconnOld/zconnNew on e.g. Update()
db *zodb.DB // to open connections to load new/old tree|buckets
} }
// _ΔTtail represent tail of revisional changes to one BTree. // _ΔTtail represent tail of revisional changes to one BTree.
...@@ -430,9 +437,10 @@ func (δBtail *ΔBtail) rebuildAll() (err error) { ...@@ -430,9 +437,10 @@ func (δBtail *ΔBtail) rebuildAll() (err error) {
// //
// vδT is rebuilt if there are such not-yet-handled Track requests. // vδT is rebuilt if there are such not-yet-handled Track requests.
func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []ΔTree, err error) { func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []ΔTree, err error) {
// XXX δBtail.lock δBtail.mu.Lock()
δTtail := δBtail.byRoot[root] // must be there δTtail := δBtail.byRoot[root] // must be there
if δTtail == nil { if δTtail == nil {
δBtail.mu.Unlock()
panicf("δBtail: root<%s> not tracked", root) panicf("δBtail: root<%s> not tracked", root)
} }
...@@ -444,31 +452,30 @@ func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []Δ ...@@ -444,31 +452,30 @@ func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []Δ
if !inJobs { if !inJobs {
// key ∉ krebuildJobs -> it should be already in trackSet // key ∉ krebuildJobs -> it should be already in trackSet
vδT = δTtail.vδT vδT = δTtail.vδT
// XXX δBtail.unlock δBtail.mu.Unlock()
return vδT, nil return vδT, nil
} }
// rebuild for root[key] is in progress -> wait for corresponding job to complete // rebuild for root[key] is in progress -> wait for corresponding job to complete
// XXX δBtail.unlock δBtail.mu.Unlock()
<-job.ready <-job.ready
if job.err == nil { if job.err == nil {
// XXX δBtail.lock δBtail.mu.Lock()
vδT = δTtail.vδT vδT = δTtail.vδT
// XXX δBtail.unlock δBtail.mu.Unlock()
} }
return vδT, job.err return vδT, job.err
} }
// key ∈ ktrackNew -> this goroutine becomes responsible to start rebuilding vδT for it // key ∈ ktrackNew -> this goroutine becomes responsible to start rebuilding vδT for it
// launch rebuild job for all keys queued in ktrackNew so far // run rebuild job for all keys queued in ktrackNew so far
err = δTtail._runRebuildJob(root, δBtail) err = δTtail._runRebuildJob(root, δBtail)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// XXX δBtail.lock
vδT = δTtail.vδT vδT = δTtail.vδT
// XXX δBtail.unlock δBtail.mu.Unlock()
return vδT, nil return vδT, nil
} }
...@@ -478,9 +485,10 @@ func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []Δ ...@@ -478,9 +485,10 @@ func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []Δ
// //
// vδT is rebuilt if there are such not-yet-handled Track requests. // vδT is rebuilt if there are such not-yet-handled Track requests.
func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err error) { func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err error) {
// XXX δBtail.lock δBtail.mu.Lock()
δTtail := δBtail.byRoot[root] // must be there δTtail := δBtail.byRoot[root] // must be there
if δTtail == nil { if δTtail == nil {
δBtail.mu.Unlock()
panicf("δBtail: root<%s> not tracked", root) panicf("δBtail: root<%s> not tracked", root)
} }
...@@ -504,7 +512,8 @@ func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err er ...@@ -504,7 +512,8 @@ func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err er
errJob = δTtail._runRebuildJob(root, δBtail) errJob = δTtail._runRebuildJob(root, δBtail)
} }
// XXX δBtail.unlock // wait for previous jobs to complete as well
δBtail.mu.Unlock()
errWait := wg.Wait() errWait := wg.Wait()
err = xerr.First(errJob, errWait) err = xerr.First(errJob, errWait)
...@@ -512,15 +521,18 @@ func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err er ...@@ -512,15 +521,18 @@ func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err er
return nil, err return nil, err
} }
// XXX δBtail.lock // now it is ok to take the snapshot
δBtail.mu.Lock()
vδT = δTtail.vδT vδT = δTtail.vδT
// XXX δBtail.unlock δBtail.mu.Unlock()
return vδT, nil return vδT, nil
} }
// _runRebuildJob runs rebuild job for current .ktrackNew/.trackNew // _runRebuildJob runs rebuild job for current .ktrackNew/.trackNew
// must be called with δBtail locked. //
// must be called with δBtail.mu locked.
// returns with δBtail.mu locked.
func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err error) { func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err error) {
// XXX errctx // XXX errctx
job := &_RebuildJob{ready: make(chan struct{})} job := &_RebuildJob{ready: make(chan struct{})}
...@@ -536,11 +548,11 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er ...@@ -536,11 +548,11 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er
δTtail.krebuildJobs.SetRange(r, job) δTtail.krebuildJobs.SetRange(r, job)
} }
delete(δBtail.trackNewRoots, root) delete(δBtail.trackNewRoots, root)
// XXX δBtail.unlock
// build δ(vδT) without the lock
δBtail.mu.Unlock()
vδTnew, δtrackSet, err := vδTBuild(root, trackNew, δBtail.δZtail, δBtail.db) vδTnew, δtrackSet, err := vδTBuild(root, trackNew, δBtail.δZtail, δBtail.db)
δBtail.mu.Lock()
// XXX δBtail.lock
// krebuildJobs -= rebuildKeys // krebuildJobs -= rebuildKeys
for _, r := range rebuildKeys.AllRanges() { for _, r := range rebuildKeys.AllRanges() {
...@@ -560,10 +572,7 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er ...@@ -560,10 +572,7 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er
// we are done // we are done
job.err = err job.err = err
close(job.ready) close(job.ready)
// XXX δBtail.unlock
return job.err return job.err
} }
// rebuild1 rebuilds ΔBtail for single root. // rebuild1 rebuilds ΔBtail for single root.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment