Commit 6dfc69dd authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c4a0aa66
...@@ -211,9 +211,12 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -211,9 +211,12 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
db.mu.Lock() db.mu.Lock()
// check if we already have the exact match // check if we already have the exact match
conn = db.get(at, at) conn := db.get(at, at)
if conn == nil { if conn == nil {
δtail := db.δtail // XXX
δhead := db.δtail.Head() // XXX
switch { switch {
// too far in the past -> historic connection // too far in the past -> historic connection
case at < db.δtail.Tail(): case at < db.δtail.Tail():
...@@ -234,12 +237,12 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -234,12 +237,12 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
// at ∈ [δtail, δhead] // at ∈ [δtail, δhead]
conn = get(δtail.Tail(), at) conn = db.get(δtail.Tail(), at)
if conn == nil { if conn == nil {
conn = newConnection(db, at) conn = newConnection(db, at)
} else { } else {
// invalidate changed live objects // invalidate changed live objects
for _, δ := range δtail.Slice(conn.at, at) { for _, δ := range δtail.SliceByRev(conn.at, at) {
for _, oid := range δ.changev { for _, oid := range δ.changev {
obj := conn.cache.Get(oid) obj := conn.cache.Get(oid)
if obj != nil { if obj != nil {
...@@ -257,7 +260,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -257,7 +260,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// wait till .δtail.head is up to date covering ≥ at // wait till .δtail.head is up to date covering ≥ at
var δready chan struct{} var δready chan struct{}
db.mu.Lock() db.mu.Lock()
δhead := db.δtail.Head() δhead = db.δtail.Head()
// XXX prevent head from going away? // XXX prevent head from going away?
if δhead < at { if δhead < at {
δready = make(chan struct{}) δready = make(chan struct{})
...@@ -278,7 +281,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -278,7 +281,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// now we have both at and invalidation data covering it -> proceed to // now we have both at and invalidation data covering it -> proceed to
// get connection from the pool. // get connection from the pool.
conn := db.get(at) conn = db.get(at)
conn.txn = txn conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn)) txn.RegisterSync((*connTxnSync)(conn))
...@@ -289,7 +292,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -289,7 +292,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// //
// it creates new one if there is no close-enough connection in the pool. XXX -> no // it creates new one if there is no close-enough connection in the pool. XXX -> no
// XXX -> must be run with db.mu locked. // XXX -> must be run with db.mu locked.
func (db *DB) get(at Tid) *Connection { func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to make it temp. compile
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
......
...@@ -30,7 +30,7 @@ import ( ...@@ -30,7 +30,7 @@ import (
// //
// It semantically consists of // It semantically consists of
// //
// [](rev↑, []id) XXX + head? // [](rev↑, []id) ; rev ∈ [tail, head]
// //
// and index // and index
// //
...@@ -40,13 +40,15 @@ import ( ...@@ -40,13 +40,15 @@ import (
// //
// rev - is ZODB revision, and // rev - is ZODB revision, and
// id - is an identifier of what has been changed(*) // id - is an identifier of what has been changed(*)
// [tail, head] - is covered revision range
// //
// It provides operations to // It provides operations to
// //
// - append information to the tail about next revision, // - append information to the tail about next revision,
// - forget information in the tail past specified revision, and // - forget information in the tail past specified revision, and
// - query the tail for len, head and tail.
// - query the tail for slice with rev ∈ (lo, hi].
// - query the tail about what is last revision that changed an id. // - query the tail about what is last revision that changed an id.
// - query the tail about what head/tail XXX?
// //
// ΔTail is safe to access for multiple-readers / single writer. // ΔTail is safe to access for multiple-readers / single writer.
// //
...@@ -56,11 +58,19 @@ import ( ...@@ -56,11 +58,19 @@ import (
// #blk - file block number, when ΔTail represents changes to a file. // #blk - file block number, when ΔTail represents changes to a file.
type ΔTail struct { type ΔTail struct {
head Tid head Tid
tailv []CommitEvent tailv []δRevEntry
lastRevOf map[Oid]Tid // index for LastRevOf queries lastRevOf map[Oid]Tid // index for LastRevOf queries
// TODO also add either tailv idx <-> rev index, or lastRevOf -> tailv idx // TODO also add either tailv idx <-> rev index, or lastRevOf -> tailv idx
// (if linear back-scan of CommitEvent starts to eat cpu). // (if linear back-scan of δRevEntry starts to eat cpu).
}
// δRevEntry represents information of what have been changed in one revision.
//
// XXX -> CommitEvent?
type δRevEntry struct {
rev Tid
changev []Oid
} }
// NewΔTail creates new ΔTail object. // NewΔTail creates new ΔTail object.
...@@ -68,7 +78,12 @@ func NewΔTail() *ΔTail { ...@@ -68,7 +78,12 @@ func NewΔTail() *ΔTail {
return &ΔTail{lastRevOf: make(map[Oid]Tid)} return &ΔTail{lastRevOf: make(map[Oid]Tid)}
} }
// Head returns database state starting from which δtail has history coverage. XXX // Len returns number of elements.
func (δtail *ΔTail) Len() int {
return len(δtail.tailv)
}
// Head returns newest database state for which δtail has history coverage.
// //
// For newly created ΔTail Head returns 0. // For newly created ΔTail Head returns 0.
// Head is ↑, in particular it does not go back to 0 when δtail becomes empty. // Head is ↑, in particular it does not go back to 0 when δtail becomes empty.
...@@ -76,7 +91,10 @@ func (δtail *ΔTail) Head() Tid { ...@@ -76,7 +91,10 @@ func (δtail *ΔTail) Head() Tid {
return δtail.head return δtail.head
} }
// XXX doc XXX tests XXX Tail -> End? Back? // Tail returns oldest database state for which δtail has history coverage.
//
// For newly created ΔTail Tail returns 0.
// Tail is ↑, in particular it does not go back to 0 when δtail becomes empty.
func (δtail *ΔTail) Tail() Tid { func (δtail *ΔTail) Tail() Tid {
if len(δtail.tailv) > 0 { if len(δtail.tailv) > 0 {
return δtail.tailv[0].rev return δtail.tailv[0].rev
...@@ -84,25 +102,34 @@ func (δtail *ΔTail) Tail() Tid { ...@@ -84,25 +102,34 @@ func (δtail *ΔTail) Tail() Tid {
return δtail.head return δtail.head
} }
// XXX -> git tailv subslice in (low, high] // SliceByRev returns δtail slice with .rev ∈ (low, high].
// XXX tail <= low <= high <= head, else panic //
func (δtail *ΔTail) Slice(low, high Tid) []CommitEvent { // it must be called with the following condition:
//
// tail ≤ low ≤ high ≤ head
//
// the caller must not modify returned slice.
//
// Note: contrary to regular go slicing, low is exclisive while high is inclusive.
func (δtail *ΔTail) SliceByRev(low, high Tid) /*readonly*/ []δRevEntry {
tail := δtail.Tail() tail := δtail.Tail()
head := δtail.head head := δtail.head
if !(tail <= low && low <= high && high <= head) { if !(tail <= low && low <= high && high <= head) {
panic(fmt.Sprintf("δtail.Slice: (%s, %s] invalid; tail..head = %s..%s", low, high, tail, head)) panic(fmt.Sprintf("δtail.Slice: (%s, %s] invalid; tail..head = %s..%s", low, high, tail, head))
} }
tailv := δtail.tailv
// ex (0,0] tail..head = 0..0 // ex (0,0] tail..head = 0..0
if len(tailv) == 0 { if len(tailv) == 0 {
return tailv return tailv
} }
// find max j : [j].rev <= high // find max j : [j].rev <= high XXX linear scan
j := len(tailv)-1 j := len(tailv)-1
for ; tailv[j].rev > high; j-- {} for ; tailv[j].rev > high; j-- {}
// find max i : [i].rev > low // find max i : [i].rev > low XXX linear scan
i := j i := j
for ; i >= 0 && tailv[i].rev > low; i-- {} for ; i >= 0 && tailv[i].rev > low; i-- {}
i++ i++
...@@ -123,7 +150,7 @@ func (δtail *ΔTail) Append(rev Tid, changev []Oid) { ...@@ -123,7 +150,7 @@ func (δtail *ΔTail) Append(rev Tid, changev []Oid) {
} }
δtail.head = rev δtail.head = rev
δtail.tailv = append(δtail.tailv, CommitEvent{rev, changev}) δtail.tailv = append(δtail.tailv, δRevEntry{rev, changev})
for _, id := range changev { for _, id := range changev {
δtail.lastRevOf[id] = rev δtail.lastRevOf[id] = rev
} }
...@@ -151,7 +178,7 @@ func (δtail *ΔTail) ForgetBefore(revCut Tid) { ...@@ -151,7 +178,7 @@ func (δtail *ΔTail) ForgetBefore(revCut Tid) {
// 1) growing underlying storage array indefinitely // 1) growing underlying storage array indefinitely
// 2) keeping underlying storage after forget // 2) keeping underlying storage after forget
l := len(δtail.tailv)-icut l := len(δtail.tailv)-icut
tailv := make([]CommitEvent, l) tailv := make([]δRevEntry, l)
copy(tailv, δtail.tailv[icut:]) copy(tailv, δtail.tailv[icut:])
δtail.tailv = tailv δtail.tailv = tailv
} }
......
...@@ -48,14 +48,20 @@ func TestΔTail(t *testing.T) { ...@@ -48,14 +48,20 @@ func TestΔTail(t *testing.T) {
} }
} }
// XXX Len
if h := δtail.Head(); h != head { if h := δtail.Head(); h != head {
t.Fatalf("Head() -> %s ; want %s", h, head) t.Fatalf("Head() -> %s ; want %s", h, head)
} }
// XXX Tail
if !tailvEqual(δtail.tailv, tailv) { if !tailvEqual(δtail.tailv, tailv) {
t.Fatalf("tailv:\nhave: %v\nwant: %v", δtail.tailv, tailv) t.Fatalf("tailv:\nhave: %v\nwant: %v", δtail.tailv, tailv)
} }
// XXX verify SliceRevBy
// verify lastRevOf query / index // verify lastRevOf query / index
lastRevOf := make(map[Oid]Tid) lastRevOf := make(map[Oid]Tid)
for _, δ := range tailv { for _, δ := range tailv {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment