Commit 25f5692b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4512fec8
...@@ -47,6 +47,9 @@ type zeo struct { ...@@ -47,6 +47,9 @@ type zeo struct {
at0 zodb.Tid // at0 obtained when initially connecting to server at0 zodb.Tid // at0 obtained when initially connecting to server
eventq0 []*zodb.EventCommit // buffer for initial messeages, until .at0 is initialized eventq0 []*zodb.EventCommit // buffer for initial messeages, until .at0 is initialized
// becomes ready when serve loop finishes
serveWG sync.WaitGroup
url string // we were opened via this url string // we were opened via this
} }
...@@ -163,6 +166,22 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) { ...@@ -163,6 +166,22 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) {
return nil return nil
} }
// flushEventq0 flushes events queued in z.eventq0.
// must be called under .at0Mu
func (z *zeo) flushEventq0() {
if z.at0 == 0 {
panic("flush, but .at0 not yet initialized")
}
if z.watchq != nil {
for _, e := range z.eventq0 {
if e.Tid > z.at0 {
z.watchq <- e
}
}
}
z.eventq0 = nil
}
// ---------------------------------------- // ----------------------------------------
// errorUnexpectedReply is returned by zLink.Call callers when reply was // errorUnexpectedReply is returned by zLink.Call callers when reply was
...@@ -396,7 +415,12 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -396,7 +415,12 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
z := &zeo{link: zl, watchq: opt.Watchq, url: url} z := &zeo{link: zl, watchq: opt.Watchq, url: url}
zl.StartServe(
// start serve loop on the link
z.serveWG.Add(1)
go func() {
defer z.serveWG.Done()
err := zl.Serve(
// notifyTab // notifyTab
map[string]func(interface{})error { map[string]func(interface{})error {
"invalidateTransaction": z.invalidateTransaction, "invalidateTransaction": z.invalidateTransaction,
...@@ -405,6 +429,20 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -405,6 +429,20 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
nil, nil,
) )
// close .watchq after serve is over
z.at0Mu.Lock()
defer z.at0Mu.Unlock()
if z.at0 != 0 {
z.flushEventq0()
}
if z.watchq != nil {
if err != nil {
z.watchq <- &zodb.EventError{Err: err}
}
close(z.watchq)
}
}()
rpc := z.rpc("register") rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly) xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
if err != nil { if err != nil {
...@@ -435,14 +473,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -435,14 +473,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// filter-out first < at0 messages for this reason. // filter-out first < at0 messages for this reason.
z.at0Mu.Lock() z.at0Mu.Lock()
z.at0 = lastTid z.at0 = lastTid
if z.watchq != nil { z.flushEventq0()
for _, e := range z.eventq0 {
if e.Tid > lastTid {
z.watchq <- e
}
}
}
z.eventq0 = nil
z.at0Mu.Unlock() z.at0Mu.Unlock()
...@@ -468,10 +499,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -468,10 +499,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
func (z *zeo) Close() error { func (z *zeo) Close() error {
err := z.link.Close() err := z.link.Close()
// XXX move -> after wait for go serve z.serveWG.Wait()
if z.watchq != nil { // XXX -> into zlink.shutdown instead? (so it is closed after link is closed)?
close(z.watchq)
}
return err return err
} }
......
...@@ -50,7 +50,7 @@ var protoVersions = []string{ ...@@ -50,7 +50,7 @@ var protoVersions = []string{
// zLink provides service to make and receive RPC requests. // zLink provides service to make and receive RPC requests.
// //
// create zLink via dialZLink or handshake. // create zLink via dialZLink or handshake.
// once link is created .StartServe must be called on it. // once link is created .Serve must be called on it.
type zLink struct { type zLink struct {
link net.Conn // underlying network link net.Conn // underlying network
rxbuf rbuf.RingBuf // buffer for reading from link rxbuf rbuf.RingBuf // buffer for reading from link
...@@ -73,7 +73,7 @@ type zLink struct { ...@@ -73,7 +73,7 @@ type zLink struct {
serveCancel func() // to cancel serveCtx serveCancel func() // to cancel serveCtx
down1 sync.Once down1 sync.Once
errClose error // error got from .link.Close() errDown error // error with which the link was shut down
ver string // protocol version in use (without "Z" or "M" prefix) ver string // protocol version in use (without "Z" or "M" prefix)
enc encoding // protocol encoding in use ('Z' or 'M') enc encoding // protocol encoding in use ('Z' or 'M')
...@@ -87,36 +87,43 @@ func (zl *zLink) start() { ...@@ -87,36 +87,43 @@ func (zl *zLink) start() {
go zl.serveRecv() go zl.serveRecv()
} }
// StartServe starts serving calls from remote peer according to notifyTab and serveTab. // Serve serves calls from remote peer according to notifyTab and serveTab.
//
// Serve returns when zlink becomes down - either on normal Close or on error.
// On normal Close returned error == nil, otherwise it describes the reason for
// why zlink was shut down.
// //
// XXX it would be better for zLink to instead provide .Recv() to receive // XXX it would be better for zLink to instead provide .Recv() to receive
// peer's requests and then serve is just loop over Recv and decide what to do // peer's requests and then serve is just loop over Recv and decide what to do
// with messages. // with messages.
// func (zl *zLink) Serve(
// XXX -> Serve(notifyTab, serveTab) error
// XXX err = nil on normal zlink.Close
// XXX err != nil on zlink.shutdown(err)
func (zl *zLink) StartServe(
notifyTab map[string]func(interface{}) error, notifyTab map[string]func(interface{}) error,
serveTab map[string]func(context.Context, interface{}) interface{}, serveTab map[string]func(context.Context, interface{}) interface{},
) { ) error {
zl.serveTab = serveTab zl.serveTab = serveTab
zl.notifyTab = notifyTab zl.notifyTab = notifyTab
close(zl.serveReady) close(zl.serveReady)
// wait for zlink to become down and return shutdown error
zl.serveWg.Wait()
return zl.errDown
} }
var errLinkClosed = errors.New("zlink is closed") var errLinkClosed = errors.New("zlink is closed")
// shutdown shuts zlink down and sets error (XXX) which // shutdown shuts zlink down and sets reason why the link was shut down.
func (zl *zLink) shutdown(err error) { func (zl *zLink) shutdown(err error) {
zl.down1.Do(func() { zl.down1.Do(func() {
err2 := zl.link.Close()
if err == nil {
err = err2
}
if err != nil { if err != nil {
log.Printf("%s: %s", zl.link.RemoteAddr(), err) log.Printf("%s: %s", zl.link.RemoteAddr(), err)
// XXX what else to do with err?
} }
zl.errDown = err
zl.serveCancel() zl.serveCancel()
zl.errClose = zl.link.Close()
// notify call waiters // notify call waiters
zl.callMu.Lock() zl.callMu.Lock()
...@@ -127,18 +134,13 @@ func (zl *zLink) shutdown(err error) { ...@@ -127,18 +134,13 @@ func (zl *zLink) shutdown(err error) {
for _, rxc := range callTab { for _, rxc := range callTab {
rxc <- msg{arg: nil} // notify link was closed XXX ok? or err explicitly? rxc <- msg{arg: nil} // notify link was closed XXX ok? or err explicitly?
} }
// XXX if err != nil -> watchq <- zodb.EventError{err}
// XXX close watcher
// XXX -> notifyTab.shutdown?
// XXX -> go Serve() -> err -> watchq? <- yes
}) })
} }
func (zl *zLink) Close() error { func (zl *zLink) Close() error {
zl.shutdown(nil) zl.shutdown(nil)
zl.serveWg.Wait() // wait in case shutdown was called from serveRecv zl.serveWg.Wait() // wait in case shutdown was called from serveRecv
return zl.errClose return zl.errDown
} }
...@@ -190,7 +192,7 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error { ...@@ -190,7 +192,7 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
} }
// message is notification or call // message is notification or call
// wait until user called StartServe on us // wait until user called Serve on us
<-zl.serveReady <-zl.serveReady
// message is notification // message is notification
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment