Commit 8b51a7b8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 96da2b58
......@@ -385,19 +385,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented")
}
z := &zeo{watchq: opt.Watchq, url: url}
//zl, err := dialZLink(ctx, net, addr) // XXX + methodTable {invalidateTransaction tid, oidv} -> ...
zl, err := dialZLink(ctx, net, addr,
/*
// notifyTab
map[string]func(interface{})error {
"invalidateTransaction": z.invalidateTransaction,
},
// serveTab
nil,
*/
)
zl, err := dialZLink(ctx, net, addr)
if err != nil {
return nil, zodb.InvalidTid, err
}
......@@ -409,7 +397,15 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
}()
z.link = zl
z := &zeo{link: zl, watchq: opt.Watchq, url: url}
zl.StartServe(
// notifyTab
map[string]func(interface{})error {
"invalidateTransaction": z.invalidateTransaction,
},
// serveTab
nil,
)
rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
......
......@@ -50,6 +50,7 @@ var protoVersions = []string{
// zLink provides service to make and receive RPC requests.
//
// create zLink via dialZLink or handshake.
// once link is created .StartServe must be called on it.
type zLink struct {
link net.Conn // underlying network
rxbuf rbuf.RingBuf // buffer for reading from link
......@@ -60,11 +61,12 @@ type zLink struct {
callID int64 // ID for next call; incremented at every call
// methods peer can invoke
serveReady chan struct{} // ready after serveTab and notifyTab are initialized by user
// methods are served in parallel
serveTab map[string]func(context.Context, interface{})interface{}
// notifications peer can send
// notifications are invoked in order
notifyTab map[string]func(interface{})
notifyTab map[string]func(interface{}) error
serveWg sync.WaitGroup // for serveRecv and serveTab spawned from it
serveCtx context.Context // serveTab handlers are called with this ctx
......@@ -85,6 +87,15 @@ func (zl *zLink) start() {
go zl.serveRecv()
}
func (zl *zLink) StartServe(
notifyTab map[string]func(interface{}) error,
serveTab map[string]func(context.Context, interface{}) interface{},
) {
zl.serveTab = serveTab
zl.notifyTab = notifyTab
close(zl.serveReady)
}
var errLinkClosed = errors.New("zlink is closed")
// shutdown shuts zlink down and sets error (XXX) which
......@@ -166,6 +177,10 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
return nil
}
// message is notification or call
// wait until user called StartServe on us
<-zl.serveReady
// message is notification
if m.flags & msgAsync != 0 {
// notifications go in-order
......@@ -455,7 +470,7 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
// create raw zlink since we need to do the handshake as ZEO message exchange,
// but don't start serve goroutines yet.
zl := &zLink{link: conn}
zl := &zLink{link: conn, serveReady: make(chan struct{})}
// ready when/if handshake tx/rx exchange succeeds
hok := make(chan struct{})
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment