Commit 073b3f12 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f5fb51db
......@@ -18,6 +18,7 @@ import (
"context"
"sort"
"sync"
"time"
"lab.nexedi.com/kirr/neo/go/transaction"
)
......@@ -65,11 +66,15 @@ func NewDB(stor IStorage) *DB {
// XXX connectin must be used under the same transaction only.
//
// XXX text
//
// XXX +OpenAt ?
func (db *DB) Open(ctx context.Context) *Connection {
txn := transaction.Current(ctx)
conn := db.get()
// XXX sync storage for lastTid; process invalidations [conn.at, lastTid]
// XXX sync storage for lastTid
var lastTid Tid
// XXX wait till invTab.Head() >= lastTid
conn := db.get(lastTid)
conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn))
......@@ -77,12 +82,43 @@ func (db *DB) Open(ctx context.Context) *Connection {
return conn
}
// get returns connection from db pool, or creates new one if pool was empty.
func (db *DB) get() *Connection {
// get returns connection from db pool most close to at.
//
// it creates new one if there is no close-enough connection in the pool.
func (db *DB) get(at Tid) *Connection {
db.mu.Lock()
defer db.mu.Unlock()
// find connv index corresponding to at:
// [i-1].at ≤ at < [i].at
i := sort.Search(len(db.connv), func(i int) bool {
return at < db.connv[i].at
})
// search through window of X previous connections and find out the one
// with minimal distance to get to state @at. If all connections are to
// distant - create connection anew.
// XXX search not only previous, but future too? (we can get back to
// past by invalidating what was later changed)
const X = 10 // XXX hardcoded
jδmin := -1
for j := i - X; j < i; j++ {
if j < 0 {
continue
}
// TODO search for max N(live) - N(live, that will need to be invalidated)
jδmin = j // XXX stub
}
// nothing found or too distant
const Tnear = 10*time.Minute // XXX hardcoded
if jδmin < 0 || tabs(δtid(at, db.connv[jδmin].at)) > Tnear {
return &Connection{stor: db.stor, db: db}
}
var conn *Connection
if l := len(db.connv); l > 0 {
// pool is !empty - use latest closed conn.
......@@ -119,6 +155,8 @@ func (db *DB) put(conn *Connection) {
db.connv = append(db.connv, nil)
copy(db.connv[i+1:], db.connv[i:])
db.connv[i] = conn
// XXX GC too idle connections here?
}
// ---- txn sync ----
......
......@@ -77,3 +77,20 @@ func (tid Tid) Time() TimeStamp {
// TODO TidFromTime()
// TODO TidFromTimeStamp()
// TODO TidForNow() ?
// δtid returns distance from tid1 to tid2 in term of time.
//
// it can be thought as (tid2 - tid1).
func δtid(tid1, tid2 Tid) time.Duration {
d := tid2.Time().Sub(tid1.Time().Time)
return d
}
// tabs returns abs value of time.Duration .
func tabs(δt time.Duration) time.Duration {
if δt < 0 {
δt = -δt
}
return δt
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment