Commit 44f5a726 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ffef2a72
......@@ -494,7 +494,7 @@ func (c *Client) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
// ---- ZODB open/url support ----
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.IStorageDriver, error) {
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, error) {
// neo://name@master1,master2,...,masterN?options
if u.User == nil {
......@@ -507,6 +507,8 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zo
return nil, fmt.Errorf("neo: %s: TODO write mode not implemented", u)
}
// XXX handle opt.WatchQ
// XXX check/use other url fields
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
......
......@@ -35,8 +35,21 @@ type OpenOptions struct {
NoCache bool // don't use cache for read/write operations; prefetch will be noop
}
// DriverOptions describes options for DriverOpener
type DriverOptions struct {
ReadOnly bool // whether to open storage as read-only
// Channel where watched storage events have to be delivered.
// WatchQ can be nil to ignore such events. However if WatchQ != nil, the events
// have to be consumed or else the storage driver will misbehave - e.g.
// it can get out of sync with the on-disk database file.
//
// XXX the channel will be closed after ... ?
WatchQ chan WatchEvent
}
// DriverOpener is a function to open a storage driver.
type DriverOpener func (ctx context.Context, u *url.URL, opt *OpenOptions) (IStorageDriver, error)
type DriverOpener func (ctx context.Context, u *url.URL, opt *DriverOptions) (IStorageDriver, error)
// {} scheme -> DriverOpener
var driverRegistry = map[string]DriverOpener{}
......@@ -77,7 +90,12 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme)
}
storDriver, err := opener(ctx, u, opt)
drvOpt := &DriverOptions{
ReadOnly: opt.ReadOnly,
// TODO watchq
}
storDriver, err := opener(ctx, u, drvOpt)
if err != nil {
return nil, err
}
......
......@@ -97,7 +97,7 @@ type FileStorage struct {
txnhMax TxnHeader // (both with .Len=0 & .Tid=0 if database is empty)
// driver client <- watcher: data file updates.
watchq chan watchEvent
watchq chan zodb.WatchEvent
down chan struct{} // ready when FileStorage is no longer operational
downOnce sync.Once
......@@ -513,7 +513,7 @@ mainloop:
return err
}
fsize := fi.Size()
tracef("toppos: %d\tfsize: %d\n", idx.TopPos, fsize)
//tracef("toppos: %d\tfsize: %d\n", idx.TopPos, fsize)
switch {
case fsize == idx.TopPos:
continue // same as before
......@@ -562,7 +562,7 @@ mainloop:
// read ok - reset t₀(partial)
t0partial = time.Time{}
tracef("@%d tid=%s st=%q", it.Txnh.Pos, it.Txnh.Tid, it.Txnh.Status)
//tracef("@%d tid=%s st=%q", it.Txnh.Pos, it.Txnh.Tid, it.Txnh.Status)
// XXX dup wrt Index.Update
......@@ -604,25 +604,20 @@ mainloop:
}
fs.mu.Unlock()
tracef("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv)
//tracef("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv)
select {
case <-fs.down:
return nil
case fs.watchq <- watchEvent{it.Txnh.Tid, oidv}:
case fs.watchq <- zodb.WatchEvent{it.Txnh.Tid, oidv}:
// ok
}
}
}
}
// watchEvent is one event from watch to Watch
type watchEvent struct {
tid zodb.Tid
oidv []zodb.Oid
}
/*
// XXX doc
func (fs *FileStorage) Watch(ctx context.Context) (_ zodb.Tid, _ []zodb.Oid, err error) {
defer xerr.Contextf(&err, "%s: watch", fs.file.Name())
......@@ -638,6 +633,7 @@ func (fs *FileStorage) Watch(ctx context.Context) (_ zodb.Tid, _ []zodb.Oid, err
return w.tid, w.oidv, nil
}
}
*/
// --- open + rebuild index ---
......@@ -662,9 +658,13 @@ func (fs *FileStorage) Close() error {
// Open opens FileStorage @path.
//
// TODO read-write support
func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileStorage, err error) {
if !opt.ReadOnly {
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
}
fs := &FileStorage{
watchq: make(chan watchEvent),
watchq: opt.WatchQ,
down: make(chan struct{}),
}
......
......@@ -22,24 +22,17 @@ package fs1
import (
"context"
"fmt"
"net/url"
"lab.nexedi.com/kirr/neo/go/zodb"
)
func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.IStorageDriver, error) {
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, error) {
// TODO handle query
// XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path
// XXX readonly stub
// XXX place = ?
if !opt.ReadOnly {
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
}
fs, err := Open(ctx, path)
fs, err := Open(ctx, path, opt)
return fs, err
}
......
......@@ -283,7 +283,7 @@ func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply {
// ---- open ----
func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (_ zodb.IStorageDriver, err error) {
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb.IStorageDriver, err error) {
url := u.String()
defer xerr.Contextf(&err, "open %s:", url)
......@@ -310,6 +310,8 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (_ zodb.I
return nil, fmt.Errorf("TODO write mode not implemented")
}
// XXX handle opt.WatchQ
zl, err := dialZLink(ctx, net, addr) // XXX + methodTable
if err != nil {
return nil, err
......
......@@ -352,6 +352,8 @@ type Prefetcher interface {
}
// IStorageDriver is the raw interface provided by ZODB storage drivers.
//
// A storage driver is created by DriverOpener
type IStorageDriver interface {
// URL returns URL of how the storage was opened
URL() string
......@@ -366,21 +368,21 @@ type IStorageDriver interface {
Loader
Iterator
/*
Watcher
/*
// Notifier returns storage driver notifier.
//
// The notifier represents invalidation channel (notify about changes
// made to DB not by us from outside). XXX
// made to DB). XXX
//
// To simplify drivers, there must be only 1 logical user of
// storage-driver level notifier interface. Contrary IStorage allows
// for several users of notification channel. XXX ok?
//
// XXX -> nil, if driver does not support notifications?
// XXX or always support them, even with FileStorage (inotify)?
//Notifier() Notifier
// XXX Watch() -> Watcher
// XXX SetWatcher(watchq) SetWatchSink() ? XXX -> ctor ?
*/
}
......@@ -457,6 +459,12 @@ type Notifier interface {
}
*/
// WatchEvent is one event describing observed database change.
type WatchEvent struct {
Tid Tid
Oidv []Oid
}
// Watcher allows to be notified of changes to database.
type Watcher interface {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment