Commit 918455e7 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb: DB: handle invalidations

Implement invalidation handling to teach DB to reuse Connections from
connection pool: after connection is returned to the pool on transaction
completion, we can use this connection for next DB.Open(at) request, by
seeing which objects we changed in conn.at..at range and invalidating
those objects in connection live cache.

To know which objects were changed, DB adds watch on its storage and
maintains some history tail (using ΔTail - see previous commit).

Finally add test for both DB and Connection, and also for Persistent,
LiveCache, ... - as all those application-level components are tightly
inter-related.
parent d8e9d7a9
...@@ -20,14 +20,14 @@ ...@@ -20,14 +20,14 @@
package zodb package zodb
// application-level database handle. // application-level database handle.
// TODO: handle invalidations
import ( import (
"context" "context"
"fmt"
"sort" "sort"
"sync" "sync"
"time" "time"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
) )
...@@ -46,21 +46,75 @@ import ( ...@@ -46,21 +46,75 @@ import (
// DB is safe to access from multiple goroutines simultaneously. // DB is safe to access from multiple goroutines simultaneously.
type DB struct { type DB struct {
stor IStorage stor IStorage
watchq chan Event // we are watching .stor via here
down chan struct{} // ready when DB is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
downErr error // reason for shutdown
mu sync.Mutex mu sync.Mutex
// pool of unused connections. // pool of unused connections.
//
// On open(at) live cache is reused through finding conn with nearby
// .at and invalidating live objects based on δtail info.
//
// not all connections here have δtail coverage.
pool []*Connection // order by ↑= .at pool []*Connection // order by ↑= .at
// information about invalidations // δtail of database changes.
// XXX -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case) //
invTab []invEntry // order by ↑= .tid // Used for live cache invalidations on open with at close to current
} // storage head. δtail coverage is maintained based on the following:
//
// 1) if open(at) is _far_away_ from head - it is _unlikely_ for
// opened connection to be later propagated towards head.
//
// 2) if open(at) is _close_ to head - it is _possible_ for
// opened connection to be later propagated towards head.
//
// For "1" we don't need δtail coverage; for "2" probability that
// it would make sense for connection to be advanced decreases the
// longer the connection stays opened. Thus the following 2 factors
// affect whether it makes sense to keep δtail coverage for a
// connection:
//
// |at - δhead(when_open)| ΔTnext - avg. time between transactions
// heady = ────────────────────── at - connection opened for this state
// ΔTnext δhead(when_open) - δtail.Head when connection was opened
// Twork(conn) - time the connection is used
// Twork(conn)
// lwork = ───────────
// ΔTnext
//
// if heady >> 1 - it is case "1" and δtail coverage is not needed.
// if heady ~ 1 - it is case "2" and δtail coverage might be needed depending on lwork.
// if lwork >> 1 - the number of objects that will need to be invalidated
// when updating conn to current head grows to ~ 100% of
// connection's live cache. It thus does not make
// sense to keep δtail past some reasonable time.
//
// A good system would monitor both ΔTnext, and lwork for connections
// with small heady, and adjust δtail cut time as e.g.
//
// timelen(δtail) = 3·lwork·ΔTnext
//
//
// FIXME for now we just fix
//
// Tδkeep = 10min
//
// and keep δtail coverage for Tδkeep time
//
// timelen(δtail) = Tδkeep
δtail *ΔTail // [](rev↑, []oid)
tδkeep time.Duration
// invEntry describes invalidations caused by a database transaction. // waiters for δtail.Head to become ≥ their at.
type invEntry struct { hwait map[hwaiter]struct{} // set{(at, ready)}
tid Tid
oidv []Oid // XXX δtail/hwait -> Storage or -> Cache?
// (so it is not duplicated many times for many DB case)
} }
...@@ -69,11 +123,41 @@ type invEntry struct { ...@@ -69,11 +123,41 @@ type invEntry struct {
// Created database handle must be closed when no longer needed. // Created database handle must be closed when no longer needed.
func NewDB(stor IStorage) *DB { func NewDB(stor IStorage) *DB {
// XXX db options? // XXX db options?
return &DB{stor: stor} db := &DB{
stor: stor,
watchq: make(chan Event),
down: make(chan struct{}),
hwait: make(map[hwaiter]struct{}),
tδkeep: 10*time.Minute, // see δtail discussion
}
at0 := stor.AddWatch(db.watchq)
db.δtail = NewΔTail(at0) // init to (at0, at0]
go db.watcher()
return db
}
// shutdown marks db no longer operational due to reason.
//
// It serves both either explicit Close, or shutdown triggered due to error
// received by watcher. Only the first shutdown call has the effect.
func (db *DB) shutdown(reason error) {
db.downOnce.Do(func() {
db.downErr = reason
close(db.down)
db.stor.DelWatch(db.watchq)
})
} }
// Close closes database handle. // Close closes database handle.
//
// After Close DB.Open calls will return error. However it is ok to continue
// to use connections opened prior to Close.
func (db *DB) Close() error { func (db *DB) Close() error {
db.shutdown(fmt.Errorf("db is closed"))
return nil return nil
} }
...@@ -107,6 +191,125 @@ func (opt *ConnOptions) String() string { ...@@ -107,6 +191,125 @@ func (opt *ConnOptions) String() string {
return s return s
} }
// watcher receives events about new committed transactions and updates δtail.
//
// It also notifies δtail waiters.
//
// The watcher stops when it sees either the storage being closed or an error.
// The DB is shutdown on exit.
func (db *DB) watcher() (err error) {
defer func() {
//fmt.Printf("db: watcher: exit: %s\n", err)
xerr.Contextf(&err, "db: watcher")
db.shutdown(err)
}()
var event Event
var ok bool
for {
select {
case <-db.down:
// db is already shut down with concrete reason
return fmt.Errorf("db is down")
case event, ok = <-db.watchq:
if !ok {
return fmt.Errorf("storage closed")
}
}
//fmt.Printf("db: watcher <- %v\n", event)
var δ *EventCommit
switch event := event.(type) {
default:
panic(fmt.Sprintf("unexepected event: %T", event))
case *EventError:
return fmt.Errorf("error: %s", event.Err)
case *EventCommit:
δ = event
}
var readyv []chan struct{} // waiters that become ready
db.mu.Lock()
db.δtail.Append(δ.Tid, δ.Changev)
for w := range db.hwait {
if w.at <= δ.Tid {
readyv = append(readyv, w.ready)
delete(db.hwait, w)
}
}
// forget older δtail entries
tcut := db.δtail.Head().Time().Add(-db.tδkeep)
δcut := TidFromTime(tcut)
//fmt.Printf("db: watcher: δtail: = (%s, %s]\n", db.δtail.Tail(), db.δtail.Head())
//fmt.Printf("db: watcher: forget <= %s\n", δcut)
db.δtail.ForgetPast(δcut)
//fmt.Printf("db: watcher: δtail: -> (%s, %s]\n", db.δtail.Tail(), db.δtail.Head())
db.mu.Unlock()
// wakeup waiters outside of db.mu
for _, ready := range readyv {
//fmt.Printf("db: watcher: wakeup %v\n", ready)
close(ready)
}
}
}
// hwaiter represents someone waiting for δtail.Head to become ≥ at.
type hwaiter struct {
at Tid
ready chan struct{}
}
// headWait waits till db.Head becomes ≥ at.
//
// It returns error either if db is down or ctx is canceled.
//
// Must be called db.mu released.
func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
defer xerr.Contextf(&err, "wait head ≥ %s", at)
// precheck if db is already down -> error even if at is under coverage
if ready(db.down) {
return db.downErr
}
db.mu.Lock()
// we already have the coverage
if at <= db.δtail.Head() {
db.mu.Unlock()
return nil
}
// we have some δtail coverage, but at is ahead of that.
// wait till δtail.head is up to date covering ≥ at.
δready := make(chan struct{})
db.hwait[hwaiter{at, δready}] = struct{}{}
db.mu.Unlock()
select {
case <-δready:
// ok - δtail.head went over at
return nil
case <-ctx.Done():
return ctx.Err()
case <-db.down:
return db.downErr
}
}
// Open opens new connection to the database. // Open opens new connection to the database.
// //
// By default the connection is opened to current latest database state; opt.At // By default the connection is opened to current latest database state; opt.At
...@@ -129,50 +332,188 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -129,50 +332,188 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
}() }()
txn := transaction.Current(ctx) // don't bother to sync to storage if db is down
if ready(db.down) {
return nil, db.downErr
}
// find out db state we should open at
at := opt.At at := opt.At
if at == 0 { if at == 0 {
// XXX init head from current DB.head (head of .invTab) if opt.NoSync {
var head Tid db.mu.Lock()
var err error at = db.δtail.Head()
db.mu.Unlock()
} else {
// sync storage for lastTid // sync storage for lastTid
if !opt.NoSync { var err error
head, err = db.stor.LastTid(ctx) at, err = db.stor.LastTid(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
at = head
} }
// wait till .invTab is up to date covering ≥ lastTid // wait for db.Head ≥ at
// XXX reenable err = db.headWait(ctx, at)
/*
err = db.invTab.Wait(ctx, at)
if err != nil { if err != nil {
return nil, err return nil, err
} }
*/
// now we have both at and invalidation data covering it -> proceed to
// get connection from the pool.
conn := db.get(at)
conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn))
// open(at)
conn := db.open(at)
conn.resync(ctx, at)
return conn, nil return conn, nil
} }
// get returns connection from db pool most close to at. // open is internal worker for Open.
// //
// it creates new one if there is no close-enough connection in the pool. // It returns either new connection, or connection from the pool.
func (db *DB) get(at Tid) *Connection { // Returned connection does not generally have .at=at, and have to go through .resync().
//
// Must be called with at ≤ db.Head .
// Must be called with db.mu released.
func (db *DB) open(at Tid) *Connection {
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
δtail := db.δtail
//fmt.Printf("db.open @%s\t; δtail (%s, %s]\n", at, δtail.Tail(), δtail.Head())
// at should be ≤ head (caller waited for it before invoking us)
if head := δtail.Head(); at > head {
panic(fmt.Sprintf("open: at (%s) > head (%s)", at, head))
}
// check if we already have an exact match
conn := db.get(at, at)
if conn != nil {
return conn
}
// no exact match - let's try to find nearest
// too far in the past, and we know there is no exact match
// -> new historic connection.
if at <= δtail.Tail() {
return newConnection(db, at)
}
// at ∈ (δtail, δhead] ; try to get nearby idle connection or make a new one
//
// note: we are ok to get conn with .at = δtail.Tail inclusive, because
// we need only later transactions to invalidate conn cache, and data
// about later transactions is present in δtail.
conn = db.get(δtail.Tail(), at)
if conn == nil {
conn = newConnection(db, at)
}
return conn
}
// resync serves DB.Open .
//
// Must be called with at ≤ conn.db.Head .
// Must be called with conn.db released.
func (conn *Connection) resync(ctx context.Context, at Tid) {
txn := transaction.Current(ctx)
conn.resync1(at)
// upon exit, with all locks released, register conn to txn.
conn.at = at
conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn))
}
// resync1 serves resync.
//
// it computes δ(conn.at, at) and invalidates objects ∈ δ in conn cache.
func (conn *Connection) resync1(at Tid) {
if conn.txn != nil {
panic("Conn.resync: previous transaction is not yet complete")
}
db := conn.db
db.mu.Lock()
// at should be ≤ head (caller waited for it before invoking us)
if head := db.δtail.Head(); at > head {
db.mu.Unlock()
panic(fmt.Sprintf("resync: at (%s) > head (%s)", at, head))
}
// conn.at == at - nothing to do (even if out of δtail coverage)
if conn.at == at {
db.mu.Unlock()
return
}
// conn.at != at - have to invalidate objects in live cache.
δtail := db.δtail
δobj := make(map[Oid]struct{}) // set(oid) - what to invalidate
δall := false // if we have to invalidate all objects
// both conn.at and at are covered by δtail - we can invalidate selectively
if (δtail.Tail() < conn.at && conn.at <= δtail.Head()) &&
(δtail.Tail() < at && at <= δtail.Head()) {
var δv []ΔRevEntry
if conn.at <= at {
δv = δtail.SliceByRev(conn.at, at)
} else {
// at < conn.at
δv = δtail.SliceByRev(at-1, conn.at-1)
}
for _, δ := range δv {
for _, oid := range δ.Changev {
δobj[oid] = struct{}{}
}
}
// some of conn.at or at is outside δtail coverage - invalidate all
// objects, but keep the objects present in live cache.
} else {
δall = true
}
// unlock db before locking cache and txn
db.mu.Unlock()
conn.cache.Lock()
defer conn.cache.Unlock()
if δall {
// XXX keep synced with LiveCache details
// XXX -> conn.cache.forEach?
for _, wobj := range conn.cache.objtab {
obj, _ := wobj.Get().(IPersistent)
if obj != nil {
obj.PInvalidate()
}
}
} else {
for oid := range δobj {
obj := conn.cache.Get(oid)
if obj != nil {
obj.PInvalidate()
}
}
}
// all done
return
}
// get returns connection from db pool most close to at with conn.at ∈ [atMin, at].
//
// If there is no such connection in the pool - nil is returned.
// Must be called with db.mu locked.
//
// Note: atMin is inclusive, because even if we get conn with .at = δtail.Tail,
// we still can use δtail data to invalidate conn cache with followup transactions.
func (db *DB) get(atMin, at Tid) *Connection {
l := len(db.pool) l := len(db.pool)
// find pool index corresponding to at: // find pool index corresponding to at:
...@@ -181,30 +522,38 @@ func (db *DB) get(at Tid) *Connection { ...@@ -181,30 +522,38 @@ func (db *DB) get(at Tid) *Connection {
return at < db.pool[i].at return at < db.pool[i].at
}) })
//fmt.Printf("pool:\n")
//for i := 0; i < l; i++ {
// fmt.Printf("\t[%d]: .at = %s\n", i, db.pool[i].at)
//}
//fmt.Printf("get [%s, %s] -> %d\n", atMin, at, i)
// search through window of X previous connections and find out the one // search through window of X previous connections and find out the one
// with minimal distance to get to state @at. If all connections are to // with minimal distance to get to state @at that fits into requested range.
// distant - create connection anew.
// //
// XXX search not only previous, but future too? (we can get back to // XXX search not only previous, but future too? (we can get back to
// past by invalidating what was later changed) // past by invalidating what was later changed) (but likely it will
const X = 10 // XXX hardcoded // hurt by destroying cache of more recent connection).
const X = 10 // XXX search window size: hardcoded
jδmin := -1 jδmin := -1
for j := i - X; j < i; j++ { for j := i - X; j < i; j++ {
if j < 0 { if j < 0 {
continue continue
} }
if db.pool[j].at < atMin {
continue
}
// TODO search for max N(live) - N(live, that will need to be invalidated) // TODO search for max N(live) - N(live, that will need to be invalidated)
jδmin = j // XXX stub (using rightmost j) jδmin = j // XXX stub (using rightmost j)
} }
// nothing found or too distant // nothing found
const Tnear = 10*time.Minute // XXX hardcoded if jδmin < 0 {
if jδmin < 0 || tabs(δtid(at, db.pool[jδmin].at)) > Tnear { return nil
return newConnection(db, at)
} }
// reuse the connection // found - reuse the connection
conn := db.pool[jδmin] conn := db.pool[jδmin]
copy(db.pool[jδmin:], db.pool[jδmin+1:]) copy(db.pool[jδmin:], db.pool[jδmin+1:])
db.pool[l-1] = nil db.pool[l-1] = nil
...@@ -217,10 +566,6 @@ func (db *DB) get(at Tid) *Connection { ...@@ -217,10 +566,6 @@ func (db *DB) get(at Tid) *Connection {
panic("DB.get: live connection in the pool") panic("DB.get: live connection in the pool")
} }
if conn.at != at {
panic("DB.get: TODO: invalidations")
}
return conn return conn
} }
...@@ -230,12 +575,11 @@ func (db *DB) put(conn *Connection) { ...@@ -230,12 +575,11 @@ func (db *DB) put(conn *Connection) {
panic("DB.put: conn.db != db") panic("DB.put: conn.db != db")
} }
conn.txn = nil
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
// XXX check if len(pool) > X, and drop conn if yes // XXX check if len(pool) > X, and drop conn if yes
// [i-1].at ≤ at < [i].at // [i-1].at ≤ at < [i].at
i := sort.Search(len(db.pool), func(i int) bool { i := sort.Search(len(db.pool), func(i int) bool {
return conn.at < db.pool[i].at return conn.at < db.pool[i].at
...@@ -246,7 +590,7 @@ func (db *DB) put(conn *Connection) { ...@@ -246,7 +590,7 @@ func (db *DB) put(conn *Connection) {
copy(db.pool[i+1:], db.pool[i:]) copy(db.pool[i+1:], db.pool[i:])
db.pool[i] = conn db.pool[i] = conn
// XXX GC too idle connections here? // TODO GC too idle connections here
} }
// ---- txn sync ---- // ---- txn sync ----
...@@ -264,7 +608,8 @@ func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) { ...@@ -264,7 +608,8 @@ func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) {
conn := (*Connection)(csync) conn := (*Connection)(csync)
conn.checkTxn(txn, "AfterCompletion") conn.checkTxn(txn, "AfterCompletion")
// XXX check that conn was explicitly closed by user? // mark the connection as no longer being live
conn.txn = nil
conn.db.put(conn) conn.db.put(conn)
} }
...@@ -20,12 +20,18 @@ ...@@ -20,12 +20,18 @@
package zodb package zodb
import ( import (
"context"
"fmt" "fmt"
"io/ioutil"
"os"
"reflect" "reflect"
"testing" "testing"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"github.com/stretchr/testify/require" assert "github.com/stretchr/testify/require"
) )
// test Persistent type. // test Persistent type.
...@@ -35,6 +41,10 @@ type MyObject struct { ...@@ -35,6 +41,10 @@ type MyObject struct {
value string value string
} }
func NewMyObject(jar *Connection) *MyObject {
return NewPersistent(reflect.TypeOf(MyObject{}), jar).(*MyObject)
}
type myObjectState MyObject type myObjectState MyObject
func (o *myObjectState) DropState() { func (o *myObjectState) DropState() {
...@@ -122,9 +132,9 @@ func tCheckObj(t testing.TB) func(IPersistent, *Connection, Oid, Tid, ObjectStat ...@@ -122,9 +132,9 @@ func tCheckObj(t testing.TB) func(IPersistent, *Connection, Oid, Tid, ObjectStat
} }
} }
// basic Persistent tests without storage.
func TestPersistent(t *testing.T) { func TestPersistentBasic(t *testing.T) {
assert := require.New(t) assert := assert.New(t)
checkObj := tCheckObj(t) checkObj := tCheckObj(t)
// unknown type -> Broken // unknown type -> Broken
...@@ -179,11 +189,292 @@ func TestPersistent(t *testing.T) { ...@@ -179,11 +189,292 @@ func TestPersistent(t *testing.T) {
obj.PDeactivate() obj.PDeactivate()
}() }()
}
// ---- TestPersistentDB ----
// zcacheControl is simple live cache control that prevents specified objects
// to be evicted from live cache.
type zcacheControl struct {
keep []Oid // objects that must not be evicted
}
func (cc *zcacheControl) WantEvict(obj IPersistent) bool {
for _, oid := range cc.keep {
if obj.POid() == oid {
return false
}
}
return true
}
// tPersistentDB represents one testing environment inside TestPersistentDB.
type tPersistentDB struct {
*testing.T
// a transaction and DB connection opened under it
txn transaction.Transaction
ctx context.Context
conn *Connection
}
// Get gets oid from t.conn and asserts its type.
func (t *tPersistentDB) Get(oid Oid) *MyObject {
t.Helper()
xobj, err := t.conn.Get(t.ctx, oid)
if err != nil {
t.Fatal(err)
}
zclass := ClassOf(xobj)
zmy := "t.zodb.MyObject"
if zclass != zmy {
t.Fatalf("get %d: got %s; want %s", oid, zclass, zmy)
}
return xobj.(*MyObject)
}
// PActivate activates obj in t environment.
func (t *tPersistentDB) PActivate(obj IPersistent) {
t.Helper()
err := obj.PActivate(t.ctx)
if err != nil {
t.Fatal(err)
}
}
// checkObj checks state of obj and that obj ∈ t.conn.
//
// if object is !GHOST - it also verifies its value.
func (t *tPersistentDB) checkObj(obj *MyObject, oid Oid, serial Tid, state ObjectState, refcnt int32, valueOk ...string) {
t.Helper()
// any object with live pointer to it must be also in conn's cache.
cache := t.conn.Cache()
cache.Lock()
connObj := cache.Get(oid)
cache.Unlock()
if obj != connObj {
t.Fatalf("cache.get %s -> not same object:\nhave: %#v\nwant: %#v", oid, connObj, oid)
}
// and conn.Get must return exactly obj.
connObj, err := t.conn.Get(t.ctx, oid)
if err != nil {
t.Fatal(err)
}
if obj != connObj {
t.Fatalf("conn.get %s -> not same object:\nhave: %#v\nwant: %#v", oid, connObj, oid)
}
checkObj(t.T, obj, t.conn, oid, serial, state, refcnt)
if state == GHOST {
if len(valueOk) != 0 {
panic("t.checkObj(GHOST) must come without value")
}
return
}
if len(valueOk) != 1 {
panic("t.checkObj(!GHOST) must come with one value")
}
value := valueOk[0]
if obj.value != value {
t.Fatalf("obj.value mismatch: have %q; want %q", obj.value, value)
}
}
// Abort aborts t's connection and verifies it becomes !live.
func (t *tPersistentDB) Abort() {
t.Helper()
assert.Same(t, t.conn.txn, t.txn)
t.txn.Abort()
assert.Equal(t, t.conn.txn, nil)
}
// Persistent tests with storage.
//
// this test covers everything at application-level: Persistent, DB, Connection, LiveCache.
func TestPersistentDB(t *testing.T) {
// perform tests without and with raw data cache.
// (rawcache=y verifies how raw cache handles invalidations)
t.Run("rawcache=n", func(t *testing.T) { testPersistentDB(t, false) })
t.Run("rawcache=y", func(t *testing.T) { testPersistentDB(t, true) })
}
func testPersistentDB(t0 *testing.T, rawcache bool) {
X := exc.Raiseif
assert := assert.New(t0)
work, err := ioutil.TempDir("", "t-persistent"); X(err)
defer func() {
err := os.RemoveAll(work); X(err)
}()
zurl := work + "/1.fs"
// create test db via py with 2 objects
// XXX hack as _objX go without jar.
_obj1 := NewMyObject(nil); _obj1.oid = 101; _obj1.value = "init"
_obj2 := NewMyObject(nil); _obj2.oid = 102; _obj2.value = "db"
at0, err := ZPyCommit(zurl, 0, _obj1, _obj2); X(err)
_obj1.value = "hello"
_obj2.value = "world"
at1, err := ZPyCommit(zurl, at0, _obj1, _obj2); X(err)
// open connection to it via zodb/go
ctx := context.Background()
stor, err := OpenStorage(ctx, zurl, &OpenOptions{ReadOnly: true, NoCache: !rawcache}); X(err)
db := NewDB(stor)
defer func() {
err := db.Close(); X(err)
}()
// testopen opens new db transaction/connection and wraps it with tPersistentDB.
testopen := func(opt *ConnOptions) *tPersistentDB {
t0.Helper()
txn, ctx := transaction.New(context.Background())
conn, err := db.Open(ctx, opt); X(err)
assert.Same(conn.db, db)
assert.Same(conn.txn, txn)
return &tPersistentDB{
T: t0,
txn: txn,
ctx: ctx,
conn: conn,
}
}
// TODO activate - jar has to load, state changes t1 := testopen(&ConnOptions{})
// TODO activate again - refcnt++ t := t1
// TODO deactivate - refcnt-- assert.Equal(t.conn.At(), at1)
// TODO deactivate - state dropped assert.Equal(db.pool, []*Connection(nil))
// δtail coverage is (at1, at1] (at0 not included)
assert.Equal(db.δtail.Tail(), at1)
assert.Equal(db.δtail.Head(), at1)
// do not evict obj2 from live cache. obj1 is ok to be evicted.
zcache1 := t.conn.Cache()
zcache1.Lock()
zcache1.SetControl(&zcacheControl{[]Oid{_obj2.oid}})
zcache1.Unlock()
// get objects
obj1 := t.Get(101)
obj2 := t.Get(102)
t.checkObj(obj1, 101, InvalidTid, GHOST, 0)
t.checkObj(obj2, 102, InvalidTid, GHOST, 0)
// activate: jar has to load, state changes -> uptodate
t.PActivate(obj1)
t.PActivate(obj2)
t.checkObj(obj1, 101, at1, UPTODATE, 1, "hello")
t.checkObj(obj2, 102, at1, UPTODATE, 1, "world")
// activate again: refcnt++
t.PActivate(obj1)
t.PActivate(obj2)
t.checkObj(obj1, 101, at1, UPTODATE, 2, "hello")
t.checkObj(obj2, 102, at1, UPTODATE, 2, "world")
// deactivate: refcnt--
obj1.PDeactivate()
obj2.PDeactivate()
t.checkObj(obj1, 101, at1, UPTODATE, 1, "hello")
t.checkObj(obj2, 102, at1, UPTODATE, 1, "world")
// deactivate: state dropped for obj1, obj2 stays in live cache
obj1.PDeactivate()
obj2.PDeactivate()
t.checkObj(obj1, 101, InvalidTid, GHOST, 0)
t.checkObj(obj2, 102, at1, UPTODATE, 0, "world")
// invalidate: obj2 state dropped
obj1.PInvalidate()
obj2.PInvalidate()
t.checkObj(obj1, 101, InvalidTid, GHOST, 0)
t.checkObj(obj2, 102, InvalidTid, GHOST, 0)
// commit change to obj2 from external process
_obj2.value = "kitty"
at2, err := ZPyCommit(zurl, at1, _obj2); X(err)
// new db connection should see the change
t2 := testopen(&ConnOptions{})
assert.Equal(t2.conn.At(), at2)
assert.Equal(db.pool, []*Connection(nil))
// δtail coverage is (at1, at2]
assert.Equal(db.δtail.Tail(), at1)
assert.Equal(db.δtail.Head(), at2)
c2obj1 := t2.Get(101)
c2obj2 := t2.Get(102)
t2.checkObj(c2obj1, 101, InvalidTid, GHOST, 0)
t2.checkObj(c2obj2, 102, InvalidTid, GHOST, 0)
t2.PActivate(c2obj1)
t2.PActivate(c2obj2)
t2.checkObj(c2obj1, 101, at1, UPTODATE, 1, "hello")
t2.checkObj(c2obj2, 102, at2, UPTODATE, 1, "kitty")
c2obj1.PDeactivate()
c2obj2.PDeactivate()
// conn1 stays at older view for now
t1.checkObj(obj1, 101, InvalidTid, GHOST, 0)
t1.checkObj(obj2, 102, InvalidTid, GHOST, 0)
t1.PActivate(obj1)
t1.PActivate(obj2)
t1.checkObj(obj1, 101, at1, UPTODATE, 1, "hello")
t1.checkObj(obj2, 102, at1, UPTODATE, 1, "world")
// conn1 deactivate: obj2 stays in conn1 live cache with old state
obj1.PDeactivate()
obj2.PDeactivate()
t1.checkObj(obj1, 101, InvalidTid, GHOST, 0)
t1.checkObj(obj2, 102, at1, UPTODATE, 0, "world")
// txn1 completes - conn1 goes back to db pool
t1.Abort()
assert.Equal(db.pool, []*Connection{t1.conn})
// open new connection - it should be conn1 but at updated database view
t3 := testopen(&ConnOptions{})
assert.Same(t3.conn, t1.conn)
t = t3
assert.Equal(t.conn.At(), at2)
assert.Equal(db.pool, []*Connection{})
// obj2 should be invalidated
t.checkObj(obj1, 101, InvalidTid, GHOST, 0)
t.checkObj(obj2, 102, InvalidTid, GHOST, 0)
// obj2 data should be new
t.PActivate(obj1);
t.PActivate(obj2);
t.checkObj(obj1, 101, at1, UPTODATE, 1, "hello")
t.checkObj(obj2, 102, at2, UPTODATE, 1, "kitty")
obj1.PDeactivate()
obj2.PDeactivate()
t.checkObj(obj1, 101, InvalidTid, GHOST, 0)
t.checkObj(obj2, 102, at2, UPTODATE, 0, "kitty")
// finish tnx3 and txn2 - conn1 and conn2 go back to db pool
t.Abort()
t2.Abort()
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
} }
// TODO Map & List tests. // TODO Map & List tests.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment