Commit c4a0aa66 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b03da68b
......@@ -48,16 +48,26 @@ type DB struct {
stor IStorage
mu sync.Mutex
// connections nearby current db
// XXX covered by δtail
// live cache is reused through finding conn with nearby at and
// invalidating live objects based on δtail info.
connv []*Connection // order by ↑= .at
// XXX -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case)
// // connections that are too far away from current db
// // not covered by δtail
// historicv []*Connections // XXX needed? (think again)
// δtail of database changes for invalidations
// min(rev) = min(conn.at) for all conn ∈ db (opened and in the pool)
// XXX + min(conn.at) for all conn ∈ waiting/opening.
δtail *ΔTail // [](rev↑, []oid)
// openers waiting for δtail.Head to become covering their at.
δwait map[δwaiter]struct{} // set{(at, ready)} XXX -> set_δwaiter?
// XXX δtail/δwait -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case)
}
// δwaiter represents someone waiting for δtail.Head to become ≥ at.
......@@ -175,12 +185,13 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
head := Tid(0)
if opt.NoSync {
// XXX locking
// XXX prevent retrieved head to be removed from δtail
head = db.δtail.Head() // = 0 if empty
db.mu.Lock()
// XXX prevent retrieved head to be removed from δtail ?
head = db.δtail.Head() // = 0 if δtail was not yet initialized with first event
db.mu.Unlock()
}
// !NoSync or δtail empty
// !NoSync or δtail !initialized
// sync storage for lastTid
if head == 0 {
var err error
......@@ -197,24 +208,71 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at = head
}
// wait till .δtail.head is up to date covering ≥ at
var δready chan struct{}
db.mu.Lock()
δhead := db.δtail.Head()
// XXX prevent head from going away?
if δhead < at {
δready = make(chan struct{})
db.δwait[δwaiter{at, δready}] = struct{}{}
}
db.mu.Unlock()
if δready != nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
// check if we already have the exact match
conn = db.get(at, at)
if conn == nil {
switch {
// too far in the past -> historic connection
case at < db.δtail.Tail():
//conn = db.get(at, at)
conn = newConnection(db, at)
// δtail !initialized yet
case db.δtail.Head() == 0:
// XXX δtail could be not yet initialized, but e.g. last_tid changed
// -> we have to wait for δtail not to loose just-released live cache
conn = newConnection(db, at)
// we already have some history coverage
default:
if at > δhead {
// XXX wait
// XXX -> retry loop (δtail.tail might go over at)
}
// at ∈ [δtail, δhead]
conn = get(δtail.Tail(), at)
if conn == nil {
conn = newConnection(db, at)
} else {
// invalidate changed live objects
for _, δ := range δtail.Slice(conn.at, at) {
for _, oid := range δ.changev {
obj := conn.cache.Get(oid)
if obj != nil {
obj.PInvalidate()
}
}
}
conn.at = at
}
}
db.mu.Unlock()
// wait till .δtail.head is up to date covering ≥ at
var δready chan struct{}
db.mu.Lock()
δhead := db.δtail.Head()
// XXX prevent head from going away?
if δhead < at {
δready = make(chan struct{})
db.δwait[δwaiter{at, δready}] = struct{}{}
}
db.mu.Unlock()
if δready != nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-δready:
// ok
case <-δready:
// ok
}
}
}
......@@ -229,11 +287,14 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// get returns connection from db pool most close to at.
//
// it creates new one if there is no close-enough connection in the pool.
// it creates new one if there is no close-enough connection in the pool. XXX -> no
// XXX -> must be run with db.mu locked.
func (db *DB) get(at Tid) *Connection {
db.mu.Lock()
defer db.mu.Unlock()
// XXX at < δtail.Tail -> getHistoric; else -> here
l := len(db.connv)
// find connv index corresponding to at:
......
......@@ -56,17 +56,11 @@ import (
// #blk - file block number, when ΔTail represents changes to a file.
type ΔTail struct {
head Tid
tailv []δRevEntry
tailv []CommitEvent
lastRevOf map[Oid]Tid // index for LastRevOf queries
// TODO also add either tailv idx <-> rev index, or lastRevOf -> tailv idx
// (if linear back-scan of δRevEntry starts to eat cpu).
}
// δRevEntry represents information of what have been changed in one revision.
type δRevEntry struct {
rev Tid
changev []Oid
// (if linear back-scan of CommitEvent starts to eat cpu).
}
// NewΔTail creates new ΔTail object.
......@@ -82,7 +76,39 @@ func (δtail *ΔTail) Head() Tid {
return δtail.head
}
// XXX + Tail?
// XXX doc XXX tests XXX Tail -> End? Back?
func (δtail *ΔTail) Tail() Tid {
if len(δtail.tailv) > 0 {
return δtail.tailv[0].rev
}
return δtail.head
}
// XXX -> git tailv subslice in (low, high]
// XXX tail <= low <= high <= head, else panic
func (δtail *ΔTail) Slice(low, high Tid) []CommitEvent {
tail := δtail.Tail()
head := δtail.head
if !(tail <= low && low <= high && high <= head) {
panic(fmt.Sprintf("δtail.Slice: (%s, %s] invalid; tail..head = %s..%s", low, high, tail, head))
}
// ex (0,0] tail..head = 0..0
if len(tailv) == 0 {
return tailv
}
// find max j : [j].rev <= high
j := len(tailv)-1
for ; tailv[j].rev > high; j-- {}
// find max i : [i].rev > low
i := j
for ; i >= 0 && tailv[i].rev > low; i-- {}
i++
return tailv[i:j]
}
// XXX add way to extend coverage without appending changed data? (i.e. if a
// txn did not change file at all) -> but then it is simply .Append(rev, nil)?
......@@ -97,7 +123,7 @@ func (δtail *ΔTail) Append(rev Tid, changev []Oid) {
}
δtail.head = rev
δtail.tailv = append(δtail.tailv, δRevEntry{rev, changev})
δtail.tailv = append(δtail.tailv, CommitEvent{rev, changev})
for _, id := range changev {
δtail.lastRevOf[id] = rev
}
......@@ -125,7 +151,7 @@ func (δtail *ΔTail) ForgetBefore(revCut Tid) {
// 1) growing underlying storage array indefinitely
// 2) keeping underlying storage after forget
l := len(δtail.tailv)-icut
tailv := make([]δRevEntry, l)
tailv := make([]CommitEvent, l)
copy(tailv, δtail.tailv[icut:])
δtail.tailv = tailv
}
......
......@@ -29,7 +29,7 @@
# The types that are emitted are
#
# ΔTail
# δRevEntry
# ΔRevEntry
input=$(dirname $0)/δtail.go
......@@ -37,4 +37,14 @@ cat "$input" |sed \
-e 's/package zodb/package PACKAGE/g' \
-e '/package PACKAGE/a \\nimport "lab.nexedi.com/kirr/neo/go/zodb"' \
-e 's/Tid/zodb.Tid/g' \
-e 's/Oid/ID/g'
-e 's/Oid/ID/g' \
-e 's/CommitEvent/ΔRevEntry/g'
cat <<EOF
// ΔRevEntry represents information of what have been changed in one revision.
type ΔRevEntry struct {
Rev Tid
Changev []ID
}
EOF
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment