Commit ca095828 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2ba7c9bf
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
package zodb package zodb
// application-level database handle. // application-level database handle.
// TODO: handle invalidations
import ( import (
"context" "context"
"sort" "sort"
...@@ -50,23 +48,33 @@ type DB struct { ...@@ -50,23 +48,33 @@ type DB struct {
mu sync.Mutex mu sync.Mutex
connv []*Connection // order by ↑= .at connv []*Connection // order by ↑= .at
// information about invalidations
// XXX -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case) // XXX -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case)
// XXX -> ΔTail<tid, oid>
invTab []invEntry // order by ↑= .tid // δtail of database changes for invalidations
// min(rev) = min(conn.at) for all conn ∈ db (opened and in the pool)
δtail ΔTail // [](rev↑, []oid)
// openers waiting for δtail.Head to become covering their at.
δwait map[δwaiter]struct{} // set(at, ready)
} }
// invEntry describes invalidations caused by a database transaction. // δwaiter represents someone waiting for δtail.Head to become ≥ at.
type invEntry struct { // XXX place
tid Tid type δwaiter struct {
oidv []Oid at Tid
ready chan struct{}
} }
// NewDB creates new database handle. // NewDB creates new database handle.
func NewDB(stor IStorage) *DB { func NewDB(stor IStorage) *DB {
// XXX db options? // XXX db options?
return &DB{stor: stor} db := &DB{stor: stor}
watchq := make(chan CommitEvent)
stor.AddWatch(watchq)
// XXX DelWatch? in db.Close() ?
go db.watcher(watchq)
return db
} }
// ConnOptions describes options to DB.Open . // ConnOptions describes options to DB.Open .
...@@ -96,6 +104,36 @@ func (opt *ConnOptions) String() string { ...@@ -96,6 +104,36 @@ func (opt *ConnOptions) String() string {
return s return s
} }
// watcher receives events about new committed transactions and updates δtail.
//
// it also wakes up δtail waiters.
func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
for {
event, ok := <-watchq
if !ok {
// XXX wake up all waiters?
return // closed
}
var readyv []chan struct{} // waiters that become ready
db.mu.Lock()
db.δtail.Append(event.Tid, event.Changev)
for w := range db.δwait {
if w.at <= event.Tid {
readyv = append(readyv, w.ready)
delete(db.δwait, w)
}
}
db.mu.Unlock()
// wakeup waiters outside of db.mu
for _, ready := range readyv {
close(ready)
}
}
}
// Open opens new connection to the database. // Open opens new connection to the database.
// //
// By default the connection is opened to current latest database state; opt.At // By default the connection is opened to current latest database state; opt.At
...@@ -122,12 +160,19 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -122,12 +160,19 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at := opt.At at := opt.At
if at == 0 { if at == 0 {
// XXX init head from current DB.head (head of .invTab) head := zodb.Tid(0)
var head Tid
var err error
if opt.NoSync {
// XXX locking
// XXX prevent retrieved head to be removed from δtail
head = db.δtail.Head() // = 0 if empty
}
// !NoSync or δtail empty
// sync storage for lastTid // sync storage for lastTid
if !opt.NoSync { if head == 0 {
var err error
// XXX stor.LastTid returns last_tid storage itself // XXX stor.LastTid returns last_tid storage itself
// received on server, not last_tid on server. // received on server, not last_tid on server.
// -> add stor.Sync() ? // -> add stor.Sync() ?
...@@ -140,14 +185,26 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -140,14 +185,26 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at = head at = head
} }
// wait till .invTab is up to date covering ≥ lastTid // wait till .δtail.head is up to date covering ≥ at
// XXX reenable var δready chan struct{}
/* db.mu.Lock()
err = db.invTab.Wait(ctx, at) δhead := δtail.Head()
if err != nil { // XXX prevent head from going away?
return nil, err if δhead < at {
δready = make(chan struct{})
db.δwait[δwaiter{at, δready}] = struct{}
}
db.mu.Unlock()
if δready != nil {
select {
case <-ctx.Done():
return ctx.Err()
case <-δready:
// ok
}
} }
*/
// now we have both at and invalidation data covering it -> proceed to // now we have both at and invalidation data covering it -> proceed to
// get connection from the pool. // get connection from the pool.
......
...@@ -447,14 +447,16 @@ type Watcher interface { ...@@ -447,14 +447,16 @@ type Watcher interface {
// Once registered, watchq must be read. Not doing so will stuck whole storage. // Once registered, watchq must be read. Not doing so will stuck whole storage.
// //
// Multiple AddWatch calls with the same watchq register watchq only once. // Multiple AddWatch calls with the same watchq register watchq only once.
AddWatch(watchq chan CommitEvent) //
// XXX watchq closed when stor.watchq closed?
AddWatch(watchq chan<- CommitEvent)
// DelWatch unregisters watchq to be notified of database changes. // DelWatch unregisters watchq from being notified of database changes.
// //
// After DelWatch call completes, no new events will be sent to watchq. // After DelWatch call completes, no new events will be sent to watchq.
// //
// DelWatch is noop if watchq was not registered. // DelWatch is noop if watchq was not registered.
DelWatch(watchq chan CommitEvent) DelWatch(watchq chan<- CommitEvent)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment