Commit 7c1578e7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bdecaf11
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -68,6 +68,9 @@ type Client struct { ...@@ -68,6 +68,9 @@ type Client struct {
// protected by .node.StateMu // protected by .node.StateMu
operational bool // XXX <- somehow move to NodeApp? operational bool // XXX <- somehow move to NodeApp?
opReady chan struct{} // reinitialized each time state becomes non-operational opReady chan struct{} // reinitialized each time state becomes non-operational
// driver client <- watcher: database commits.
watchq chan<- zodb.CommitEvent // FIXME stub
} }
var _ zodb.IStorageDriver = (*Client)(nil) var _ zodb.IStorageDriver = (*Client)(nil)
...@@ -103,6 +106,9 @@ func (c *Client) Close() error { ...@@ -103,6 +106,9 @@ func (c *Client) Close() error {
c.talkMasterCancel() c.talkMasterCancel()
// XXX wait talkMaster finishes -> XXX return err from that? // XXX wait talkMaster finishes -> XXX return err from that?
// XXX what else? // XXX what else?
if c.watchq != nil {
close(c.watchq)
}
return nil return nil
} }
...@@ -507,9 +513,11 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) ( ...@@ -507,9 +513,11 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
return nil, fmt.Errorf("neo: %s: TODO write mode not implemented", u) return nil, fmt.Errorf("neo: %s: TODO write mode not implemented", u)
} }
// XXX handle opt.Watchq // FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
if opt.Watchq != nil { if opt.Watchq != nil {
panic("TODO watchq") log.Error(ctx, "neo: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
} }
// XXX check/use other url fields // XXX check/use other url fields
...@@ -519,6 +527,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) ( ...@@ -519,6 +527,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
// as ctx for open can be done after open finishes - not covering // as ctx for open can be done after open finishes - not covering
// whole storage working lifetime. // whole storage working lifetime.
c := NewClient(u.User.Username(), u.Host, net) c := NewClient(u.User.Username(), u.Host, net)
c.watchq = opt.Watchq
return c, nil return c, nil
} }
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"log"
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
...@@ -43,6 +44,8 @@ type zeo struct { ...@@ -43,6 +44,8 @@ type zeo struct {
mu sync.Mutex mu sync.Mutex
lastTid zodb.Tid lastTid zodb.Tid
// driver client <- watcher: database commits.
watchq chan<- zodb.CommitEvent // FIXME stub
url string // we were opened via this url string // we were opened via this
} }
...@@ -307,9 +310,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -307,9 +310,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, fmt.Errorf("TODO write mode not implemented") return nil, fmt.Errorf("TODO write mode not implemented")
} }
// XXX handle opt.Watchq // FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
if opt.Watchq != nil { if opt.Watchq != nil {
panic("TODO watchq") log.Print("zeo: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
} }
zl, err := dialZLink(ctx, net, addr) // XXX + methodTable {invalidateTransaction tid, oidv} -> ... zl, err := dialZLink(ctx, net, addr) // XXX + methodTable {invalidateTransaction tid, oidv} -> ...
...@@ -324,7 +329,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -324,7 +329,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
}() }()
z := &zeo{srv: zl, url: url} z := &zeo{srv: zl, watchq: opt.Watchq, url: url}
rpc := z.rpc("register") rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly) xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
...@@ -370,7 +375,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -370,7 +375,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
} }
func (z *zeo) Close() error { func (z *zeo) Close() error {
return z.srv.Close() err := z.srv.Close()
if z.watchq != nil {
close(z.watchq)
}
return err
} }
func (z *zeo) URL() string { func (z *zeo) URL() string {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment