Commit f5fb51db authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a35bc53b
...@@ -224,6 +224,8 @@ func (conn *Connection) load(ctx context.Context, oid Oid) (_ *mem.Buf, serial T ...@@ -224,6 +224,8 @@ func (conn *Connection) load(ctx context.Context, oid Oid) (_ *mem.Buf, serial T
// ---------------------------------------- // ----------------------------------------
// checkTxnCtx asserts that current transaction is the same as conn.txn . // checkTxnCtx asserts that current transaction is the same as conn.txn .
//
// XXX swap naming?
func (conn *Connection) checkTxnCtx(ctx context.Context, who string) { func (conn *Connection) checkTxnCtx(ctx context.Context, who string) {
conn.checkTxn(transaction.Current(ctx), who) conn.checkTxn(transaction.Current(ctx), who)
} }
......
...@@ -16,6 +16,7 @@ package zodb ...@@ -16,6 +16,7 @@ package zodb
import ( import (
"context" "context"
"sort"
"sync" "sync"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
...@@ -38,7 +39,16 @@ type DB struct { ...@@ -38,7 +39,16 @@ type DB struct {
stor IStorage stor IStorage
mu sync.Mutex mu sync.Mutex
connv []*Connection connv []*Connection // order by ↑= .at
// information about invalidations
invTab []invEntry // order by ↑= .tid
}
// invEntry describes invalidations caused by a database transaction.
type invEntry struct {
tid Tid
oidv []Oid
} }
...@@ -49,17 +59,17 @@ func NewDB(stor IStorage) *DB { ...@@ -49,17 +59,17 @@ func NewDB(stor IStorage) *DB {
return &DB{stor: stor} return &DB{stor: stor}
} }
// Open opens new connection to the database. // Open opens new connection to the database. XXX @lastTid
// //
// XXX must be called under transaction. // XXX must be called under transaction.
// XXX connectin must be used under the same transaction only.
// //
// XXX text // XXX text
func (db *DB) Open(ctx context.Context) *Connection { func (db *DB) Open(ctx context.Context) *Connection {
txn := transaction.Current(ctx) txn := transaction.Current(ctx)
conn := db.get() conn := db.get()
// XXX sync storage for lastTid -> conn.at // XXX sync storage for lastTid; process invalidations [conn.at, lastTid]
conn.txn = txn conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn)) txn.RegisterSync((*connTxnSync)(conn))
...@@ -95,14 +105,20 @@ func (db *DB) get() *Connection { ...@@ -95,14 +105,20 @@ func (db *DB) get() *Connection {
func (db *DB) put(conn *Connection) { func (db *DB) put(conn *Connection) {
// XXX assert conn.db == db // XXX assert conn.db == db
conn.txn = nil conn.txn = nil
//XXX want to do this; but also need to preserve info at..last_tid to process invalidations
//conn.at = 0
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
// XXX check if len(connv) > X, and drop conn if yes // XXX check if len(connv) > X, and drop conn if yes
db.connv = append(db.connv, conn) // [i-1].at ≤ at < [i].at
i := sort.Search(len(db.connv), func(i int) bool {
return conn.at < db.connv[i].at
})
//db.connv = append(db.connv[:i], conn, db.connv[i:]...)
db.connv = append(db.connv, nil)
copy(db.connv[i+1:], db.connv[i:])
db.connv[i] = conn
} }
// ---- txn sync ---- // ---- txn sync ----
...@@ -115,10 +131,10 @@ func (csync *connTxnSync) BeforeCompletion(txn transaction.Transaction) { ...@@ -115,10 +131,10 @@ func (csync *connTxnSync) BeforeCompletion(txn transaction.Transaction) {
// nothing // nothing
} }
// AfterCompletion puts conn back into db pool after transaction is complete.
func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) { func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) {
conn := (*Connection)(csync) conn := (*Connection)(csync)
conn.checkTxn(txn, "AfterCompletion") conn.checkTxn(txn, "AfterCompletion")
// put conn back into db pool.
conn.db.put(conn) conn.db.put(conn)
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment