Commit 0419e890 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1160f6b3
...@@ -22,13 +22,13 @@ package zodb ...@@ -22,13 +22,13 @@ package zodb
import ( import (
"context" "context"
"fmt"
"sort" "sort"
"sync" "sync"
"time" "time"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
"fmt"
) )
// DB represents a handle to database at application level and contains pool // DB represents a handle to database at application level and contains pool
...@@ -48,6 +48,10 @@ type DB struct { ...@@ -48,6 +48,10 @@ type DB struct {
stor IStorage stor IStorage
watchq chan Event // we are watching .stor via here watchq chan Event // we are watching .stor via here
down chan struct{} // ready when DB is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
downErr error // reason for shutdown
mu sync.Mutex mu sync.Mutex
// pool of unused connections. // pool of unused connections.
...@@ -122,6 +126,7 @@ func NewDB(stor IStorage) *DB { ...@@ -122,6 +126,7 @@ func NewDB(stor IStorage) *DB {
db := &DB{ db := &DB{
stor: stor, stor: stor,
watchq: make(chan Event), watchq: make(chan Event),
down: make(chan struct{}),
δwait: make(map[δwaiter]struct{}), δwait: make(map[δwaiter]struct{}),
tδkeep: 10*time.Minute, // see δtail discussion tδkeep: 10*time.Minute, // see δtail discussion
...@@ -134,19 +139,25 @@ func NewDB(stor IStorage) *DB { ...@@ -134,19 +139,25 @@ func NewDB(stor IStorage) *DB {
return db return db
} }
// XXX DB.shutdown(reason error) ? // shutdown mark db no longer operational due to reason.
//
// It serves both explicit Close, or shutdown triggered due to error received
// by watcher.
func (db *DB) shutdown(reason error) {
db.downOnce.Do(func() {
db.downErr = reason // XXX err ctx
close(db.down)
db.stor.DelWatch(db.watchq)
})
}
// Close closes database handle. // Close closes database handle.
// //
// After Close DB.Open calls will return error. However it is ok to continue // After Close DB.Open calls will return error. However it is ok to continue
// working with connections opened prior Close. // working with connections opened prior Close.
func (db *DB) Close() error { func (db *DB) Close() error {
db.mu.Lock() db.shutdown(fmt.Errorf("db is closed"))
defer db.mu.Unlock()
stor.DelWatch(db.watchq)
// XXX stub
return nil return nil
} }
...@@ -195,14 +206,23 @@ type δwaiter struct { ...@@ -195,14 +206,23 @@ type δwaiter struct {
// watcher receives events about new committed transactions and updates δtail. // watcher receives events about new committed transactions and updates δtail.
// //
// it also notifies δtail waiters. // it also notifies δtail waiters.
func (db *DB) watcher() { // XXX err ? func (db *DB) watcher() (err error) {
defer db.shutdown(err)
defer xerr.Contextf(&err, "db: watcher")
var event Event
var ok bool
for { for {
// XXX check for db.down select {
event, ok := <-db.watchq case <-db.down:
if !ok { // should be already shut down with concrete reason
fmt.Printf("db: watcher: close\n") return fmt.Errorf("db is down")
// XXX wake up all waiters?
return // closed case event, ok = <-db.watchq:
if !ok {
return fmt.Errorf("storage closed")
}
} }
//fmt.Printf("db: watcher <- %v\n", event) //fmt.Printf("db: watcher <- %v\n", event)
...@@ -213,8 +233,7 @@ func (db *DB) watcher() { // XXX err ? ...@@ -213,8 +233,7 @@ func (db *DB) watcher() { // XXX err ?
panic(fmt.Sprintf("unexepected event: %T", event)) panic(fmt.Sprintf("unexepected event: %T", event))
case *EventError: case *EventError:
fmt.Printf("db: watcher: error: %s\n", event.Err) return fmt.Errorf("error: %s", event.Err)
continue // XXX shutdown instead
case *EventCommit: case *EventCommit:
δ = event δ = event
...@@ -360,14 +379,17 @@ retry: ...@@ -360,14 +379,17 @@ retry:
db.mu.Unlock() db.mu.Unlock()
select { select {
case <-ctx.Done():
// leave db.mu unlocked
return nil, ctx.Err()
case <-δready: case <-δready:
// ok - δtail.head went over at; relock db and retry // ok - δtail.head went over at; relock db and retry
db.mu.Lock() db.mu.Lock()
continue retry continue retry
// on error leave db.mu unlocked
case <-ctx.Done():
return nil, ctx.Err()
case <-db.down:
return nil, db.downErr
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment