Commit d3a0bccb authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/zeo: Add support for invalidations

Receive invalidation from server and send corresponding events to watchq.
Care to send only events with tid > at0 that we initially returned when opening.

Tests pass, but they need https://github.com/zopefoundation/ZEO/pull/160
parent 2aa27ef9
...@@ -23,7 +23,6 @@ package zeo ...@@ -23,7 +23,6 @@ package zeo
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
...@@ -39,12 +38,12 @@ import ( ...@@ -39,12 +38,12 @@ import (
type zeo struct { type zeo struct {
link *zLink link *zLink
// state we get from server by way of server notifications.
mu sync.Mutex
lastTid zodb.Tid
// driver client <- watcher: database commits | errors. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event // FIXME stub watchq chan<- zodb.Event
head zodb.Tid // last invalidation received from server
at0Mu sync.Mutex
at0 zodb.Tid // at0 obtained when initially connecting to server
eventq0 []*zodb.EventCommit // buffer for initial messages, until .at0 is initialized
// becomes ready when serve loop finishes // becomes ready when serve loop finishes
serveWG sync.WaitGroup serveWG sync.WaitGroup
...@@ -71,6 +70,8 @@ func (z *zeo) Sync(ctx context.Context) (head zodb.Tid, err error) { ...@@ -71,6 +70,8 @@ func (z *zeo) Sync(ctx context.Context) (head zodb.Tid, err error) {
return zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xhead) return zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xhead)
} }
// no need to verify that head↑ - IStorage.Sync does it.
// no need to wait till .watchq is notified till head] - IStorage.Sync does it.
return head, nil return head, nil
} }
...@@ -110,6 +111,79 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte ...@@ -110,6 +111,79 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte
} }
// invalidateTransaction receives invalidations from server.
func (z *zeo) invalidateTransaction(arg interface{}) (err error) {
// (tid, oidv)
enc := z.link.enc
t, ok := enc.asTuple(arg)
if !ok || len(t) != 2 {
return fmt.Errorf("got %#v; expect 2-tuple", arg)
}
tid, ok1 := enc.asTid(t[0])
xoidv, ok2 := enc.asTuple(t[1])
if !(ok1 && ok2) {
return fmt.Errorf("got (%T, %T); expect (tid, []oid)", t...)
}
oidv := []zodb.Oid{}
for _, xoid := range xoidv {
oid, ok := enc.asOid(xoid)
if !ok {
return fmt.Errorf("non-oid %#v in oidv", xoid)
}
oidv = append(oidv, oid)
}
// likely no need to verify for tid↑ because IStorage watcher does it.
// However until .at0 is initialized we do not send events to IStorage,
// so double check for monotonicity here as well.
if tid <= z.head {
return fmt.Errorf("bad invalidation from server: tid not ↑: %s -> %s", z.head, tid)
}
z.head = tid
if z.watchq == nil {
return nil
}
// invalidation event received and we have to send it to .watchq
event := &zodb.EventCommit{Tid: tid, Changev: oidv}
z.at0Mu.Lock()
defer z.at0Mu.Unlock()
// queue initial events until .at0 is initialized after register
// queued events will be sent to watchq by zeo ctor after initializing .at0
if z.at0 == 0 {
z.eventq0 = append(z.eventq0, event)
return nil
}
// at0 is initialized - ok to send current event if it goes > at0
if tid > z.at0 {
z.watchq <- event
}
return nil
}
// flushEventq0 flushes events queued in z.eventq0.
// must be called under .at0Mu
func (z *zeo) flushEventq0() {
if z.at0 == 0 {
panic("flush, but .at0 not yet initialized")
}
if z.watchq != nil {
for _, e := range z.eventq0 {
if e.Tid > z.at0 {
z.watchq <- e
}
}
}
z.eventq0 = nil
}
// ----------------------------------------
// errorUnexpectedReply is returned by zLink.Call callers when reply was // errorUnexpectedReply is returned by zLink.Call callers when reply was
// received successfully, but is not what the caller expected. // received successfully, but is not what the caller expected.
type errorUnexpectedReply struct { type errorUnexpectedReply struct {
...@@ -332,14 +406,6 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -332,14 +406,6 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented") return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented")
} }
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
// TODO watcher(when implementing): filter-out first < at0 messages.
if opt.Watchq != nil {
log.Print("zeo: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
}
zlink, err := dialZLink(ctx, net, addr) zlink, err := dialZLink(ctx, net, addr)
if err != nil { if err != nil {
return nil, zodb.InvalidTid, err return nil, zodb.InvalidTid, err
...@@ -360,12 +426,25 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -360,12 +426,25 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
defer z.serveWG.Done() defer z.serveWG.Done()
err := zlink.Serve( err := zlink.Serve(
// notifyTab // notifyTab
nil, map[string]func(interface{})error {
"invalidateTransaction": z.invalidateTransaction,
},
// serveTab // serveTab
nil, nil,
) )
_ = err // close .watchq after serve is over
z.at0Mu.Lock()
defer z.at0Mu.Unlock()
if z.at0 != 0 {
z.flushEventq0()
}
if z.watchq != nil {
if err != nil {
z.watchq <- &zodb.EventError{Err: err}
}
close(z.watchq)
}
}() }()
// call register // call register
...@@ -390,14 +469,18 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -390,14 +469,18 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid) return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid)
} }
z.lastTid = lastTid // since we read lastTid, at least with ZEO < 5, in separate RPC
// call, there is a chance, that by the time when lastTid was read, some
// XXX since we read lastTid, at least with ZEO < 5, in separate RPC
// call, there is a chance, that by the time when lastTid was read some
// new transactions were committed. This way lastTid will be > than // new transactions were committed. This way lastTid will be > than
// some first transactions received by watcher via // some first transactions received by watcher via
// "invalidateTransaction" server notification. // "invalidateTransaction" server notification.
at0 = lastTid //
// filter-out first < at0 messages for this reason.
z.at0Mu.Lock()
z.at0 = lastTid
z.flushEventq0()
z.at0Mu.Unlock()
//call('get_info') -> {}str->str, ex // XXX can be omitted //call('get_info') -> {}str->str, ex // XXX can be omitted
...@@ -416,14 +499,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -416,14 +499,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
'supports_record_iternext': True}) 'supports_record_iternext': True})
*/ */
return z, at0, nil return z, z.at0, nil
} }
func (z *zeo) Close() error { func (z *zeo) Close() error {
err := z.link.Close() err := z.link.Close()
if z.watchq != nil {
close(z.watchq)
}
z.serveWG.Wait() z.serveWG.Wait()
return err return err
} }
......
...@@ -244,6 +244,12 @@ func TestLoad(t *testing.T) { ...@@ -244,6 +244,12 @@ func TestLoad(t *testing.T) {
}) })
} }
func TestWatch(t *testing.T) {
withZEOSrv(t, func(t *testing.T, zsrv ZEOSrv) {
xtesting.DrvTestWatch(t, "zeo://" + zsrv.Addr(), openByURL)
})
}
func zeoOpen(zurl string, opt *zodb.DriverOptions) (_ *zeo, at0 zodb.Tid, err error) { func zeoOpen(zurl string, opt *zodb.DriverOptions) (_ *zeo, at0 zodb.Tid, err error) {
defer xerr.Contextf(&err, "openzeo %s", zurl) defer xerr.Contextf(&err, "openzeo %s", zurl)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment