Commit 4d2c8b1d authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb: Require drivers to provide notifications for database change events

A ZODB database can be changed by local client as well as another
process. A notification channel is thus needed for local cache and
database view to know they have to update to current database state.

This patch builds the interface of how such notifications should be
provided by drivers. Contrary to ZODB/py it is required that every
driver provide it.

However we will be providing driver support incrementally and for now
all drivers behave as if the database is not changing.

A note on why Watchq is passed to driver as options: low-level ZODB
users, who might want to work with drivers directly, might not need it,
and this way with Watchq not present in driver options they will continue to
observe the same driver behaviour as before watchq was introduced. In
practice many low-level utilities don't need notification support, and
it would be not good to required them all to update open calls and to
provide watchq drainer not to get stuck.

TODO Watchq should be extended to also receive errors from watcher, so
that clients could be notified when there is something wrong with the
database.
parent 559a1be7
...@@ -38,6 +38,17 @@ type OpenOptions struct { ...@@ -38,6 +38,17 @@ type OpenOptions struct {
// DriverOptions describes options for DriverOpener. // DriverOptions describes options for DriverOpener.
type DriverOptions struct { type DriverOptions struct {
ReadOnly bool // whether to open storage as read-only ReadOnly bool // whether to open storage as read-only
// Channel where storage events have to be delivered.
//
// Watchq can be nil to ignore such events. However if Watchq != nil, the events
// have to be consumed or else the storage driver will misbehave - e.g.
// it can get out of sync with the on-disk database file.
//
// The storage driver closes !nil Watchq when the driver is closed.
//
// TODO extend watchq to also receive errors from watcher.
Watchq chan<- CommitEvent
} }
// DriverOpener is a function to open a storage driver. // DriverOpener is a function to open a storage driver.
...@@ -84,6 +95,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -84,6 +95,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
drvOpt := &DriverOptions{ drvOpt := &DriverOptions{
ReadOnly: opt.ReadOnly, ReadOnly: opt.ReadOnly,
Watchq: nil, // TODO use watchq to implement high-level watching
} }
storDriver, err := opener(ctx, u, drvOpt) storDriver, err := opener(ctx, u, drvOpt)
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -90,6 +90,9 @@ type FileStorage struct { ...@@ -90,6 +90,9 @@ type FileStorage struct {
// (both with .Len=0 & .Tid=0 if database is empty) // (both with .Len=0 & .Tid=0 if database is empty)
txnhMin TxnHeader txnhMin TxnHeader
txnhMax TxnHeader txnhMax TxnHeader
// driver client <- watcher: database commits.
watchq chan<- zodb.CommitEvent // FIXME stub
} }
// IStorageDriver // IStorageDriver
...@@ -508,6 +511,9 @@ func Open(ctx context.Context, path string) (_ *FileStorage, err error) { ...@@ -508,6 +511,9 @@ func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
func (fs *FileStorage) Close() error { func (fs *FileStorage) Close() error {
err := fs.file.Close() err := fs.file.Close()
if fs.watchq != nil {
close(fs.watchq)
}
if err != nil { if err != nil {
return &zodb.OpError{URL: fs.URL(), Op: "close", Args: nil, Err: err} return &zodb.OpError{URL: fs.URL(), Op: "close", Args: nil, Err: err}
} }
......
...@@ -23,6 +23,7 @@ package fs1 ...@@ -23,6 +23,7 @@ package fs1
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"net/url" "net/url"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -39,7 +40,15 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.I ...@@ -39,7 +40,15 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.I
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path) return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
} }
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
if opt.Watchq != nil {
log.Print("fs1: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
}
fs, err := Open(ctx, path) fs, err := Open(ctx, path)
fs.watchq = opt.Watchq
return fs, err return fs, err
} }
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"log"
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
...@@ -43,6 +44,8 @@ type zeo struct { ...@@ -43,6 +44,8 @@ type zeo struct {
mu sync.Mutex mu sync.Mutex
lastTid zodb.Tid lastTid zodb.Tid
// driver client <- watcher: database commits.
watchq chan<- zodb.CommitEvent // FIXME stub
url string // we were opened via this url string // we were opened via this
} }
...@@ -307,6 +310,13 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -307,6 +310,13 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, fmt.Errorf("TODO write mode not implemented") return nil, fmt.Errorf("TODO write mode not implemented")
} }
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
if opt.Watchq != nil {
log.Print("zeo: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
}
zl, err := dialZLink(ctx, net, addr) zl, err := dialZLink(ctx, net, addr)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -319,7 +329,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -319,7 +329,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
}() }()
z := &zeo{srv: zl, url: url} z := &zeo{srv: zl, watchq: opt.Watchq, url: url}
rpc := z.rpc("register") rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly) xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
...@@ -365,7 +375,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -365,7 +375,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
} }
func (z *zeo) Close() error { func (z *zeo) Close() error {
return z.srv.Close() err := z.srv.Close()
if z.watchq != nil {
close(z.watchq)
}
return err
} }
func (z *zeo) URL() string { func (z *zeo) URL() string {
......
...@@ -361,6 +361,9 @@ type IStorageDriver interface { ...@@ -361,6 +361,9 @@ type IStorageDriver interface {
Loader Loader
Iterator Iterator
// A storage driver also delivers database change events to watchq
// channel, which is passed to it when the driver is created.
} }
// Loader provides functionality to load objects. // Loader provides functionality to load objects.
...@@ -424,6 +427,12 @@ type Committer interface { ...@@ -424,6 +427,12 @@ type Committer interface {
} }
// CommitEvent is event describing one observed database commit.
type CommitEvent struct {
Tid Tid // ID of committed transaction
Changev []Oid // ID of objects changed by committed transaction
}
// Notifier allows to be notified of database changes made by other clients. // Notifier allows to be notified of database changes made by other clients.
type Notifier interface { type Notifier interface {
// TODO: invalidation channel (notify about changes made to DB not by us from outside) // TODO: invalidation channel (notify about changes made to DB not by us from outside)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment