Commit 80b82f0a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1cfa529e
...@@ -190,13 +190,13 @@ func (opt *ConnOptions) String() string { ...@@ -190,13 +190,13 @@ func (opt *ConnOptions) String() string {
s += ", " s += ", "
if opt.NoSync { if opt.NoSync {
s += "no" s += "!"
} }
s += "sync" s += "sync"
s += ", " s += ", "
if opt.NoPool { if opt.NoPool {
s += "no" s += "!"
} }
s += "pool" s += "pool"
...@@ -286,11 +286,16 @@ type hwaiter struct { ...@@ -286,11 +286,16 @@ type hwaiter struct {
// //
// Must be called db.mu released. // Must be called db.mu released.
// //
// XXX -> waitHead? // XXX -> waitHead? needHead? waitHeadAfter? ensureAt?
func (db *DB) headWait(ctx context.Context, at Tid) (err error) { func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
db.mu.Lock() db.mu.Lock()
// XXX check if db is already down -> error even if at is under coverage? // XXX check if db is already down -> error even if at is under coverage?
// XXX under mu - ok?
// XXX err ctx
if ready(db.down) {
return db.downErr
}
if at <= db.δtail.Head() { if at <= db.δtail.Head() {
db.mu.Unlock() db.mu.Unlock()
...@@ -343,12 +348,6 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -343,12 +348,6 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
}() }()
// check db is already down/closed
// XXX -> headWait?
if ready(db.down) {
return nil, db.downErr
}
// find out db state we should open at // find out db state we should open at
at := opt.At at := opt.At
if at == 0 { if at == 0 {
...@@ -357,6 +356,11 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -357,6 +356,11 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at = db.δtail.Head() at = db.δtail.Head()
db.mu.Unlock() db.mu.Unlock()
} else { } else {
// don't bother to sync to storage if db is down
if ready(db.down) {
return nil, db.downErr
}
// sync storage for lastTid // sync storage for lastTid
var err error var err error
...@@ -370,14 +374,13 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -370,14 +374,13 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
} }
// proceed to open(at)
// wait for db.Head ≥ at // wait for db.Head ≥ at
err = db.headWait(ctx, at) err = db.headWait(ctx, at)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// open(at)
conn := db.open(at, opt.NoPool) conn := db.open(at, opt.NoPool)
conn.resync(ctx, at) conn.resync(ctx, at)
return conn, nil return conn, nil
...@@ -462,13 +465,8 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error { ...@@ -462,13 +465,8 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error {
// XXX err ctx? ("resync @at -> @at") // XXX err ctx? ("resync @at -> @at")
db := conn.db
// XXX check db is already down/closed
// XXX or -> headWait?
// wait for db.Head ≥ at // wait for db.Head ≥ at
err := db.headWait(ctx, at) err := conn.db.headWait(ctx, at)
if err != nil { if err != nil {
return err return err
} }
...@@ -483,7 +481,6 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error { ...@@ -483,7 +481,6 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error {
// Must be called with conn.db released. // Must be called with conn.db released.
func (conn *Connection) resync(ctx context.Context, at Tid) { func (conn *Connection) resync(ctx context.Context, at Tid) {
txn := transaction.Current(ctx) txn := transaction.Current(ctx)
conn.resync1(at) conn.resync1(at)
// upon exit, with all locks released, register conn to txn. // upon exit, with all locks released, register conn to txn.
...@@ -493,6 +490,8 @@ func (conn *Connection) resync(ctx context.Context, at Tid) { ...@@ -493,6 +490,8 @@ func (conn *Connection) resync(ctx context.Context, at Tid) {
} }
// resync1 serves resync. // resync1 serves resync.
//
// it computes δ(conn.at, at) and invalidates objects ∈ δ in conn cache.
func (conn *Connection) resync1(at Tid) { func (conn *Connection) resync1(at Tid) {
if conn.txn != nil { if conn.txn != nil {
panic("Conn.resync: previous transaction is not yet complete") panic("Conn.resync: previous transaction is not yet complete")
...@@ -551,7 +550,6 @@ func (conn *Connection) resync1(at Tid) { ...@@ -551,7 +550,6 @@ func (conn *Connection) resync1(at Tid) {
if δall { if δall {
// XXX keep synced with LiveCache details // XXX keep synced with LiveCache details
// XXX -> conn.cache.forEach? // XXX -> conn.cache.forEach?
// or leave this wait till .Load() time?
for _, wobj := range conn.cache.objtab { for _, wobj := range conn.cache.objtab {
obj, _ := wobj.Get().(IPersistent) obj, _ := wobj.Get().(IPersistent)
if obj != nil { if obj != nil {
...@@ -575,8 +573,8 @@ func (conn *Connection) resync1(at Tid) { ...@@ -575,8 +573,8 @@ func (conn *Connection) resync1(at Tid) {
// //
// XXX recheck [atMin or (atMin -- see "= δtail.Tail" in resync. // XXX recheck [atMin or (atMin -- see "= δtail.Tail" in resync.
// //
// if there is no such connection in the pool - nil is returned. // If there is no such connection in the pool - nil is returned.
// must be called with db.mu locked. // Must be called with db.mu locked.
func (db *DB) get(atMin, at Tid) *Connection { func (db *DB) get(atMin, at Tid) *Connection {
l := len(db.pool) l := len(db.pool)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment