Commit cfc26e2b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 69fd5651
......@@ -93,6 +93,7 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
mlinkReady: make(chan struct{}),
operational: false,
opReady: make(chan struct{}),
at0Ready: make(chan struct{}),
}
}
......@@ -319,26 +320,27 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er
if c.at0Initialized {
// XXX c.head locking?
if lastTxn.Tid != c.head {
return fmt.Errorf("New transactions were committed while we were disconnected from master (%s -> %s)", c.head, lastTxn.Tid)
return fmt.Errorf("new transactions were committed while we were disconnected from master (%s -> %s)", c.head, lastTxn.Tid)
}
} // XXX -> else?
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
c.at0Mu.Lock()
c.at0 = lastTxn.Tid
c.at0Initialized = true
// c.flushEventq0() XXX reenable
c.at0Mu.Unlock()
} else {
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
c.at0Mu.Lock()
c.at0 = lastTxn.Tid
c.at0Initialized = true
c.flushEventq0()
c.at0Mu.Unlock()
close(c.at0Ready)
}
// XXX what next?
......@@ -402,6 +404,23 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
return nil
}
// flushEventq0 flushes events queued in c.eventq0.
// must be called under .at0Mu
func (c *Client) flushEventq0() {
if !c.at0Initialized {
panic("flush, but .at0 not yet initialized")
}
if c.watchq != nil {
for _, e := range c.eventq0 {
if e.Tid > c.at0 {
c.watchq <- e
}
}
}
c.eventq0 = nil
}
// --- user API calls ---
func (c *Client) Sync(ctx context.Context) (_ zodb.Tid, err error) {
......@@ -564,7 +583,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
c.at0Mu.Lock()
defer c.at0Mu.Unlock()
if c.at0Initialized {
// c.flushEventq0()
c.flushEventq0()
}
if c.watchq != nil {
if err != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment