Commit 533f0c73 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb: DB - application-level handle to database (very draft)

DB represents a handle to database at application level and contains pool
of connections. DB.Open opens database connection. The connection will be
automatically put back into DB pool for future reuse after corresponding
transaction is complete. DB thus provides service to maintain live objects
cache and reuse live objects from transaction to transaction.

Note that it is possible to have several DB handles to the same database.
This might be useful if application accesses distinctly different sets of
objects in different transactions and knows beforehand which set it will be
next time. Then, to avoid huge cache misses, it makes sense to keep DB
handles opened for every possible case of application access.

TODO handle invalidations.
parent c67ff9ea
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb/internal/weak" "lab.nexedi.com/kirr/neo/go/zodb/internal/weak"
) )
...@@ -38,8 +39,15 @@ import ( ...@@ -38,8 +39,15 @@ import (
// isolated from further database transactions. // isolated from further database transactions.
// //
// Connection is safe to access from multiple goroutines simultaneously. // Connection is safe to access from multiple goroutines simultaneously.
//
// Connection and objects obtained from it must be used by application only
// inside transaction where Connection was opened.
//
// Use DB.Open to open a connection.
type Connection struct { type Connection struct {
stor IStorage // underlying storage stor IStorage // underlying storage
db *DB // Connection is part of this DB
txn transaction.Transaction // opened under this txn; nil if idle in DB pool.
at Tid // current view of database; stable inside a transaction. at Tid // current view of database; stable inside a transaction.
// {} oid -> obj // {} oid -> obj
...@@ -112,6 +120,15 @@ type LiveCacheControl interface { ...@@ -112,6 +120,15 @@ type LiveCacheControl interface {
// ---------------------------------------- // ----------------------------------------
// newConnection builds a fresh Connection that belongs to db and views the
// database as of at.
//
// The connection shares db's storage and starts with an empty object table.
// Its txn is left unset.
func newConnection(db *DB, at Tid) *Connection {
	conn := new(Connection)
	conn.stor = db.stor
	conn.db = db
	conn.at = at
	conn.objtab = make(map[Oid]*weak.Ref)
	return conn
}
// wrongClassError is the error cause returned when ZODB object's class is not what was expected. // wrongClassError is the error cause returned when ZODB object's class is not what was expected.
type wrongClassError struct { type wrongClassError struct {
...@@ -165,6 +182,7 @@ func (conn *Connection) get(class string, oid Oid) (IPersistent, error) { ...@@ -165,6 +182,7 @@ func (conn *Connection) get(class string, oid Oid) (IPersistent, error) {
// The object's data is not necessarily loaded after Get returns. Use // The object's data is not necessarily loaded after Get returns. Use
// PActivate to make sure the object is fully loaded. // PActivate to make sure the object is fully loaded.
func (conn *Connection) Get(ctx context.Context, oid Oid) (_ IPersistent, err error) { func (conn *Connection) Get(ctx context.Context, oid Oid) (_ IPersistent, err error) {
conn.checkTxnCtx(ctx, "Get")
defer xerr.Contextf(&err, "Get %s", oid) defer xerr.Contextf(&err, "Get %s", oid)
conn.objmu.Lock() // XXX -> rlock? conn.objmu.Lock() // XXX -> rlock?
...@@ -203,5 +221,20 @@ func (conn *Connection) Get(ctx context.Context, oid Oid) (_ IPersistent, err er ...@@ -203,5 +221,20 @@ func (conn *Connection) Get(ctx context.Context, oid Oid) (_ IPersistent, err er
// load loads object specified by oid. // load loads object specified by oid.
func (conn *Connection) load(ctx context.Context, oid Oid) (_ *mem.Buf, serial Tid, _ error) { func (conn *Connection) load(ctx context.Context, oid Oid) (_ *mem.Buf, serial Tid, _ error) {
conn.checkTxnCtx(ctx, "load")
return conn.stor.Load(ctx, Xid{Oid: oid, At: conn.at}) return conn.stor.Load(ctx, Xid{Oid: oid, At: conn.at})
} }
// ----------------------------------------
// checkTxnCtx asserts that the transaction carried by ctx is the one conn was
// opened under.
//
// who names the caller and is passed through to checkTxn for its panic message.
func (conn *Connection) checkTxnCtx(ctx context.Context, who string) {
	current := transaction.Current(ctx)
	conn.checkTxn(current, who)
}
// checkTxn asserts that specified "current" transaction is the same as conn.txn .
//
// who names the calling operation and is included in the panic message.
// A mismatch is treated as a programming error: checkTxn panics.
func (conn *Connection) checkTxn(txn transaction.Transaction, who string) {
	if txn != conn.txn {
		// ": " separates the operation name from the explanation; without it
		// the message reads e.g. "connection: Getcurrent transaction ...".
		panic("connection: " + who + ": current transaction is different from connection transaction")
	}
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zodb
// application-level database handle.
// TODO: handle invalidations
import (
"context"
"fmt"
"sort"
"sync"
"time"
"lab.nexedi.com/kirr/neo/go/transaction"
)
// DB represents a handle to database at application level and contains pool
// of connections. DB.Open opens database connection. The connection will be
// automatically put back into DB pool for future reuse after corresponding
// transaction is complete. DB thus provides service to maintain live objects
// cache and reuse live objects from transaction to transaction.
//
// Note that it is possible to have several DB handles to the same database.
// This might be useful if application accesses distinctly different sets of
// objects in different transactions and knows beforehand which set it will be
// next time. Then, to avoid huge cache misses, it makes sense to keep DB
// handles opened for every possible case of application access.
//
// DB is safe to access from multiple goroutines simultaneously.
type DB struct {
// storage handed to every Connection created by this DB.
stor IStorage
// mu is held by get and put while they read or modify connv.
mu sync.Mutex
// pool of idle connections.
connv []*Connection // order by ↑= .at
// information about invalidations
// XXX -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case)
// NOTE(review): invTab is declared but not yet read or written by the code in this file.
invTab []invEntry // order by ↑= .tid
}
// invEntry describes invalidations caused by a database transaction.
type invEntry struct {
tid Tid // the database transaction this entry is about
oidv []Oid // presumably the objects invalidated by tid - TODO confirm once invalidations are implemented
}
// NewDB returns a new database handle working on top of stor.
//
// The handle starts with an empty connection pool.
func NewDB(stor IStorage) *DB {
	// XXX db options?
	db := new(DB)
	db.stor = stor
	return db
}
// ConnOptions describes options to DB.Open .
//
// With At left zero DB.Open asks the storage for its last tid and opens the
// connection at that state, unless NoSync is set.
type ConnOptions struct {
At Tid // if non-zero, open Connection bound to the `at` view of the database instead of the latest one.
NoSync bool // don't sync with storage to get its last tid; consulted only when At is zero.
}
// Open opens new connection to the database.
//
// By default the connection is opened to current latest database state; opt.At
// can be specified to open connection bound to particular view of the database.
// A nil opt is treated the same as a zero ConnOptions.
//
// Open must be called under transaction.
// Opened connection must be used only under the same transaction and only
// until that transaction is complete.
//
// On failure the returned error is an *OpError carrying the storage URL, the
// "open db" operation name and the non-default options that were requested.
func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err error) {
	// Both the code below and the deferred error decorator dereference opt,
	// so normalize nil to default options up front instead of panicking.
	if opt == nil {
		opt = &ConnOptions{}
	}

	defer func() {
		if err == nil {
			return
		}

		// mention only the options that differ from their defaults
		var argv []interface{}
		if opt.At != 0 {
			argv = append(argv, fmt.Sprintf("at=%s", opt.At))
		}
		if opt.NoSync {
			argv = append(argv, "nosync")
		}

		err = &OpError{
			URL:  db.stor.URL(),
			Op:   "open db",
			Args: argv,
			Err:  err,
		}
	}()

	txn := transaction.Current(ctx)

	at := opt.At
	if at == 0 {
		// XXX init head from current DB.head (head of .invTab)
		var head Tid

		// sync storage for lastTid
		if !opt.NoSync {
			// assign to the named result directly; no shadowing err is needed
			head, err = db.stor.LastTid(ctx)
			if err != nil {
				return nil, err
			}
		}

		at = head
	}

	// wait till .invTab is up to date covering ≥ lastTid
	// XXX reenable
	/*
		err = db.invTab.Wait(ctx, at)
		if err != nil {
			return nil, err
		}
	*/

	// now we have both at and invalidation data covering it -> proceed to
	// get connection from the pool.
	conn := db.get(at)
	conn.txn = txn
	// the sync hands conn back to the pool once txn completes
	txn.RegisterSync((*connTxnSync)(conn))

	return conn, nil
}
// get hands out the pooled connection that is closest to at.
//
// If the pool holds no close-enough connection, a brand new one is created.
// A reused connection is removed from the pool before it is returned.
func (db *DB) get(at Tid) *Connection {
	db.mu.Lock()
	defer db.mu.Unlock()

	npool := len(db.connv)

	// locate the position of at inside connv:
	//
	//	[hi-1].at ≤ at < [hi].at
	hi := sort.Search(npool, func(k int) bool {
		return at < db.connv[k].at
	})

	// look through the window of X connections preceding hi and pick the one
	// with minimal distance to get to state @at. If all of them are too
	// distant - create connection anew.
	//
	// XXX search not only previous, but future too? (we can get back to
	// past by invalidating what was later changed)
	const X = 10 // XXX hardcoded
	lo := hi - X
	if lo < 0 {
		lo = 0
	}
	best := -1
	for k := lo; k < hi; k++ {
		// TODO search for max N(live) - N(live, that will need to be invalidated)
		best = k // XXX stub (using rightmost k)
	}

	// nothing found or too distant
	const Tnear = 10 * time.Minute // XXX hardcoded
	if best < 0 {
		return newConnection(db, at)
	}
	if tabs(δtid(at, db.connv[best].at)) > Tnear {
		return newConnection(db, at)
	}

	// reuse the connection: cut it out of the pool, clearing the vacated
	// last slot so the backing array does not keep a stale pointer.
	conn := db.connv[best]
	last := npool - 1
	copy(db.connv[best:], db.connv[best+1:])
	db.connv[last] = nil
	db.connv = db.connv[:last]

	// sanity checks on what came out of the pool
	switch {
	case conn.db != db:
		panic("DB.get: foreign connection in the pool")
	case conn.txn != nil:
		panic("DB.get: live connection in the pool")
	case conn.at != at:
		panic("DB.get: TODO: invalidations")
	}

	return conn
}
// put returns conn to the db pool.
//
// conn must belong to db, otherwise put panics. The connection is detached
// from its transaction and inserted so that the pool stays ordered by .at .
func (db *DB) put(conn *Connection) {
	if db != conn.db {
		panic("DB.put: conn.db != db")
	}

	conn.txn = nil

	db.mu.Lock()
	defer db.mu.Unlock()

	// XXX check if len(connv) > X, and drop conn if yes

	// find the slot for conn:
	//
	//	[pos-1].at ≤ conn.at < [pos].at
	isLater := func(k int) bool {
		return conn.at < db.connv[k].at
	}
	pos := sort.Search(len(db.connv), isLater)

	// grow by one element, shift the tail right and drop conn into the gap
	db.connv = append(db.connv, nil)
	copy(db.connv[pos+1:], db.connv[pos:])
	db.connv[pos] = conn

	// XXX GC too idle connections here?
}
// ---- txn sync ----
// connTxnSync is Connection seen as a transaction synchronizer.
//
// DB.Open registers a connection with its transaction via txn.RegisterSync
// under this type; the methods convert the pointer back to *Connection.
type connTxnSync Connection // hide from public API
// BeforeCompletion only verifies that txn is the transaction the connection
// was opened under; there is nothing else to do before completion.
func (csync *connTxnSync) BeforeCompletion(txn transaction.Transaction) {
	(*Connection)(csync).checkTxn(txn, "BeforeCompletion")
}
// AfterCompletion returns the connection to its DB pool once the transaction
// is complete.
//
// It first verifies that txn is the transaction the connection was opened under.
func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) {
	c := (*Connection)(csync)
	c.checkTxn(txn, "AfterCompletion")

	// XXX check that conn was explicitly closed by user?

	owner := c.db
	owner.put(c)
}
...@@ -77,3 +77,20 @@ func (tid Tid) Time() TimeStamp { ...@@ -77,3 +77,20 @@ func (tid Tid) Time() TimeStamp {
// TODO TidFromTime() // TODO TidFromTime()
// TODO TidFromTimeStamp() // TODO TidFromTimeStamp()
// TODO TidForNow() ? // TODO TidForNow() ?
// δtid returns the distance from tid1 to tid2 in terms of time.
//
// It can be thought of as (tid2 - tid1): the result is positive when tid2 is
// later than tid1.
func δtid(tid1, tid2 Tid) time.Duration {
	to := tid2.Time()
	from := tid1.Time().Time
	return to.Sub(from)
}
// tabs returns abs value of time.Duration .
//
// The most negative Duration has no positive counterpart: negating it
// overflows and yields the same negative value again. time.Time.Sub saturates
// to exactly that value for very distant times, so it is mapped here to the
// largest Duration instead, the same way time.Duration.Abs does.
func tabs(δt time.Duration) time.Duration {
	const (
		minDuration time.Duration = -1 << 63
		maxDuration time.Duration = 1<<63 - 1
	)

	switch {
	case δt >= 0:
		return δt
	case δt == minDuration:
		return maxDuration
	default:
		return -δt
	}
}
...@@ -95,8 +95,8 @@ ...@@ -95,8 +95,8 @@
// For MyObject to implement IPersistent it must embed Persistent type. // For MyObject to implement IPersistent it must embed Persistent type.
// MyObject also has to register itself to persistency machinery with RegisterClass. // MyObject also has to register itself to persistency machinery with RegisterClass.
// //
// In-RAM application objects are handled in groups. // In-RAM application objects are handled in groups. During the scope of
// A group corresponds to particular // corresponding in-progress transaction(*), a group corresponds to particular
// view of the database (at) and has isolation guarantee from further database // view of the database (at) and has isolation guarantee from further database
// transactions, and from in-progress changes to in-RAM objects in other // transactions, and from in-progress changes to in-RAM objects in other
// groups. // groups.
...@@ -114,13 +114,26 @@ ...@@ -114,13 +114,26 @@
// application objects isolated for modifications is represented by Connection. // application objects isolated for modifications is represented by Connection.
// Connection is also sometimes called a "jar" in ZODB terminology. // Connection is also sometimes called a "jar" in ZODB terminology.
// //
// DB represents a handle to database at application level and contains pool
// of connections. DB.Open opens database connection. The connection will be
// automatically put back into DB pool for future reuse after corresponding
// transaction is complete. DB thus provides service to maintain live objects
// cache and reuse live objects from transaction to transaction.
// //
// Both Connection and object activation protocol is safe to access from // Note that it is possible to have several DB handles to the same database.
// This might be useful if application accesses distinctly different sets of
// objects in different transactions and knows beforehand which set it will be
// next time. Then, to avoid huge live cache misses, it makes sense to keep DB
// handles opened for every possible case of application access.
//
//
// DB, Connection and the object activation protocol are all safe to access from
// multiple goroutines simultaneously. // multiple goroutines simultaneously.
// //
// //
// -------- // --------
// //
// (*) see package lab.nexedi.com/kirr/neo/go/transaction.
// (+) if both objects are from the same database. // (+) if both objects are from the same database.
// //
// Python data // Python data
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment