Commit 36ade85d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c97ea8f3
...@@ -301,8 +301,6 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -301,8 +301,6 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// XXX check db is aready down/closed // XXX check db is aready down/closed
txn := transaction.Current(ctx)
// find out db state we should open at // find out db state we should open at
at := opt.At at := opt.At
if at == 0 { if at == 0 {
...@@ -338,7 +336,11 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -338,7 +336,11 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn.resyncAndDBUnlock(txn, at) err = conn.resyncAndDBUnlock(ctx, at)
if err != nil {
// XXX release conn?
return nil, err
}
return conn, nil return conn, nil
} }
...@@ -424,7 +426,11 @@ retry: ...@@ -424,7 +426,11 @@ retry:
// - contrary to DB.Open, at cannot be 0. // - contrary to DB.Open, at cannot be 0.
// //
// Note: new at can be both higher and lower than previous connection at. // Note: new at can be both higher and lower than previous connection at.
func (conn *Connection) Resync(txn transaction.Transaction, at Tid) { //
// Note: if new at is already covered by DB.Head Resync will be non-blocking
// operation. However if at is > current DB.Head Resync, similarly to DB.Open,
// will block waiting for DB.Head to become ≥ at.
func (conn *Connection) Resync(ctx context.Context, at Tid) error {
if !conn.noPool { if !conn.noPool {
panic("Conn.Resync: connection was opened without NoPool flag") panic("Conn.Resync: connection was opened without NoPool flag")
} }
...@@ -433,14 +439,15 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) { ...@@ -433,14 +439,15 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) {
} }
conn.db.mu.Lock() conn.db.mu.Lock()
conn.resyncAndDBUnlock(txn, at) return conn.resyncAndDBUnlock(ctx, at)
} }
// resyncAndDBUnlock serves Connection.Resync and DB.Open . // resyncAndDBUnlock serves Connection.Resync and DB.Open .
// //
// must be called with conn.db locked and unlocks it at the end. // must be called with conn.db locked and unlocks it at the end.
func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) { func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) error {
db := conn.db db := conn.db
txn := transaction.Current(ctx)
if conn.txn != nil { if conn.txn != nil {
db.mu.Unlock() db.mu.Unlock()
...@@ -457,10 +464,13 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) { ...@@ -457,10 +464,13 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) {
// conn.at == at - nothing to do (even if out of δtail coverage) // conn.at == at - nothing to do (even if out of δtail coverage)
if conn.at == at { if conn.at == at {
db.mu.Unlock() db.mu.Unlock()
return return nil
} }
// XXX -> DB.deltaObj(at1, at2) // XXX -> DB.δobj(at1, at2)
// XXX first wait for db.stor.head to cover at (else if at is slightly
// not covered yet -> we'll hit δall case()
// conn.at != at - have to invalidate objects in live cache. // conn.at != at - have to invalidate objects in live cache.
δtail := db.δtail δtail := db.δtail
...@@ -500,7 +510,6 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) { ...@@ -500,7 +510,6 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) {
if δall { if δall {
// XXX keep synced with LiveCache details // XXX keep synced with LiveCache details
// XXX -> conn.cache.forEach? // XXX -> conn.cache.forEach?
// XXX should we wait for db.stor.head to cover at? FIXME openOrDBUnlock does this
// or leave this wait till .Load() time? // or leave this wait till .Load() time?
for _, wobj := range conn.cache.objtab { for _, wobj := range conn.cache.objtab {
obj, _ := wobj.Get().(IPersistent) obj, _ := wobj.Get().(IPersistent)
...@@ -518,7 +527,7 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) { ...@@ -518,7 +527,7 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) {
} }
// all done // all done
return return nil
} }
// get returns connection from db pool most close to at with conn.at ∈ [atMin, at]. // get returns connection from db pool most close to at with conn.at ∈ [atMin, at].
......
...@@ -293,7 +293,10 @@ func (t *tPersistentDB) Resync(at Tid) { ...@@ -293,7 +293,10 @@ func (t *tPersistentDB) Resync(at Tid) {
db := t.conn.db // XXX -> t.db ? db := t.conn.db // XXX -> t.db ?
txn, ctx := transaction.New(context.Background()) txn, ctx := transaction.New(context.Background())
t.conn.Resync(txn, at) err := t.conn.Resync(ctx, at)
if err != nil {
t.Fatalf("resync %s -> %s", at, err)
}
t.txn = txn t.txn = txn
t.ctx = ctx t.ctx = ctx
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment