Commit d432ae19 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb: Rework watchq to also receive errors

This continues 4d2c8b1d (go/zodb: Require drivers to provide
notifications for database change events) and a6580062 (go/zodb: Require
drivers to provide at₀ on open) and reworks watchq to receive not only
commit events, but also errors from watcher in storage driver.
parent 2964364f
...@@ -49,9 +49,7 @@ type DriverOptions struct { ...@@ -49,9 +49,7 @@ type DriverOptions struct {
// //
// The storage driver will send only and all events in (at₀, +∞] range, // The storage driver will send only and all events in (at₀, +∞] range,
// where at₀ is at returned by driver open. // where at₀ is at returned by driver open.
// Watchq chan<- Event
// TODO extend watchq to also receive errors from watcher.
Watchq chan<- CommitEvent
} }
// DriverOpener is a function to open a storage driver. // DriverOpener is a function to open a storage driver.
......
...@@ -98,8 +98,8 @@ type FileStorage struct { ...@@ -98,8 +98,8 @@ type FileStorage struct {
txnhMax TxnHeader // (both with .Len=0 & .Tid=0 if database is empty) txnhMax TxnHeader // (both with .Len=0 & .Tid=0 if database is empty)
downErr error // !nil when the storage is no longer operational downErr error // !nil when the storage is no longer operational
// driver client <- watcher: database commits. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.CommitEvent watchq chan<- zodb.Event
down chan struct{} // ready when storage is no longer operational down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
...@@ -483,6 +483,9 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) { ...@@ -483,6 +483,9 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
fs.shutdown(err) fs.shutdown(err)
if fs.watchq != nil { if fs.watchq != nil {
if err != nil {
fs.watchq <- &zodb.EventError{err}
}
close(fs.watchq) close(fs.watchq)
} }
} }
...@@ -679,7 +682,7 @@ mainloop: ...@@ -679,7 +682,7 @@ mainloop:
case <-fs.down: case <-fs.down:
return nil return nil
case fs.watchq <- zodb.CommitEvent{it.Txnh.Tid, δoid}: case fs.watchq <- &zodb.EventCommit{it.Txnh.Tid, δoid}:
// ok // ok
} }
} }
......
...@@ -377,7 +377,7 @@ func TestWatch(t *testing.T) { ...@@ -377,7 +377,7 @@ func TestWatch(t *testing.T) {
// force tfs creation & open tfs at go side // force tfs creation & open tfs at go side
at := xcommit(0, xtesting.ZRawObject{0, b("data0")}) at := xcommit(0, xtesting.ZRawObject{0, b("data0")})
watchq := make(chan zodb.CommitEvent) watchq := make(chan zodb.Event)
fs, at0 := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq}) fs, at0 := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq})
if at0 != at { if at0 != at {
t.Fatalf("opened @ %s ; want %s", at0, at) t.Fatalf("opened @ %s ; want %s", at0, at)
...@@ -423,10 +423,22 @@ func TestWatch(t *testing.T) { ...@@ -423,10 +423,22 @@ func TestWatch(t *testing.T) {
xtesting.ZRawObject{i, b(datai)}) xtesting.ZRawObject{i, b(datai)})
// TODO also test for watcher errors // TODO also test for watcher errors
e := <-watchq event := <-watchq
if objvWant := []zodb.Oid{0, i}; !(e.Tid == at && reflect.DeepEqual(e.Changev, objvWant)) { var δ *zodb.EventCommit
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", e.Tid, e.Changev, at, objvWant) switch event := event.(type) {
default:
panic(fmt.Sprintf("unexpected event: %T", event))
case *zodb.EventError:
t.Fatal(event.Err)
case *zodb.EventCommit:
δ = event
}
if objvWant := []zodb.Oid{0, i}; !(δ.Tid == at && reflect.DeepEqual(δ.Changev, objvWant)) {
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", δ.Tid, δ.Changev, at, objvWant)
} }
checkLastTid(at) checkLastTid(at)
......
...@@ -45,8 +45,8 @@ type zeo struct { ...@@ -45,8 +45,8 @@ type zeo struct {
mu sync.Mutex mu sync.Mutex
lastTid zodb.Tid lastTid zodb.Tid
// driver client <- watcher: database commits. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.CommitEvent // FIXME stub watchq chan<- zodb.Event // FIXME stub
url string // we were opened via this url string // we were opened via this
} }
......
...@@ -426,9 +426,26 @@ type Committer interface { ...@@ -426,9 +426,26 @@ type Committer interface {
// TpcAbort(txn) // TpcAbort(txn)
} }
// Event represents one database event.
//
// Possible events are:
//
// - EventError an error happened
// - EventCommit a transaction was committed
type Event interface {
event()
}
func (_ *EventError) event() {}
func (_ *EventCommit) event() {}
// EventError is event descrbing an error observed by watcher.
type EventError struct {
Err error
}
// CommitEvent is event describing one observed database commit. // EventCommit is event describing one observed database commit.
type CommitEvent struct { type EventCommit struct {
Tid Tid // ID of committed transaction Tid Tid // ID of committed transaction
Changev []Oid // ID of objects changed by committed transaction Changev []Oid // ID of objects changed by committed transaction
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment