Commit 51e5b8b2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 96080842
......@@ -39,12 +39,12 @@ import (
type zeo struct {
srv *zLink // XXX rename -> link?
// state we get from server by way of server notifications.
mu sync.Mutex
lastTid zodb.Tid
// driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event // FIXME stub
watchq chan<- zodb.Event
head zodb.Tid // last invalidation received from server
at0Mu sync.Mutex
at0 zodb.Tid // at0 obtained when initially connecting to server
eventq0 []*zodb.EventCommit // buffer for initial messeages, until .at0 is initialized
url string // we were opened via this
}
......@@ -112,6 +112,56 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte
}
// invalidateTransaction receives invalidations from server
func (z *zeo) invalidateTransaction(arg interface{}) error {
t, ok := z.srv.asTuple(arg)
if !ok || len(t) != 2 {
return XXX("got %#v; expect 2-tuple", arg)
}
// (tid, oidv)
tid, ok1 := z.srv.tidUnpack(t[0])
xoidt, ok2 := z.srv.asTuple(t[1])
if !(ok1 && ok2) {
return XXX("got (%T, %T); expect (tid, []oid)")
}
oidv := []zodb.Oid{}
for _, xoid := range xoidt {
oid, ok := z.srv.oidUnpack(xoid)
if !ok {
return XXX("non-oid %#v in oidv", xoid)
}
}
if tid <= z.head {
return XXX("bad invalidation from server: tid not ↑: %s -> %s", z.head, tid)
}
z.head = tid
if z.watchq == nil {
return nil
}
event := &zodb.EventCommit{Tid: tid, Changev: oidv}
z.at0Mu.Lock()
defer z.at0Mu.Unlock()
// queue initial events until .at0 is initalized after register
if z.at0 == 0 {
z.eventq0 = append(z.eventq0, event)
return nil
}
// at0 is initialized - ok to send current event if it goes > at0
if tid > z.at0 {
z.watchq <- event
}
return nil
}
// ----------------------------------------
// errorUnexpectedReply is returned by zLink.Call callers when reply was
// received successfully, but is not what the caller expected.
type errorUnexpectedReply struct {
......@@ -331,7 +381,6 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
// TODO watcher(when implementing): filter-out first < at0 messages.
if opt.Watchq != nil {
log.Print("zeo: FIXME: watchq support not implemented - there " +
"won't be notifications about database changes")
......@@ -372,14 +421,25 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid)
}
z.lastTid = lastTid
// XXX since we read lastTid, at least with ZEO < 5, in separate RPC
// call, there is a chance, that by the time when lastTid was read some
// since we read lastTid, at least with ZEO < 5, in separate RPC
// call, there is a chance, that by the time when lastTid was read, some
// new transactions were committed. This way lastTid will be > than
// some first transactions received by watcher via
// "invalidateTransaction" server notification.
at0 = lastTid
//
// filter-out first < at0 messages for this reason.
z.at0Mu.Lock()
z.at0 = lastTid
if z.watchq != nil {
for _, e := range z.eventq0 {
if e.Tid > lastTid {
z.watchq <- e
}
}
}
z.eventq0 = nil
z.at0Mu.Unlock()
//call('get_info') -> {}str->str, ex // XXX can be omitted
......@@ -398,7 +458,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
'supports_record_iternext': True})
*/
return z, at0, nil
return z, z.at0, nil
}
func (z *zeo) Close() error {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment