Commit dea433be authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9cd5302f
// Copyright (C) 2017-2019 Nexedi SA and Contributors. // Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -70,7 +70,13 @@ type Client struct { ...@@ -70,7 +70,13 @@ type Client struct {
opReady chan struct{} // reinitialized each time state becomes non-operational opReady chan struct{} // reinitialized each time state becomes non-operational
// driver client <- watcher: database commits | errors. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event // FIXME stub watchq chan<- zodb.Event
head zodb.Tid // last invalidation received from server
at0Mu sync.Mutex
at0 zodb.Tid // at0 obtained when initially connecting to server
eventq0 []*zodb.EventCommit // buffer for initial messages, until .at0 is initialized
at0Initialized bool // true after .at0 is initialized
} }
var _ zodb.IStorageDriver = (*Client)(nil) var _ zodb.IStorageDriver = (*Client)(nil)
...@@ -277,8 +283,8 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) { ...@@ -277,8 +283,8 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
return c.recvMaster(ctx, mlink) return c.recvMaster(ctx, mlink)
}) })
// init partition table from master // init partition table and lastTid from master
// XXX is this needed at all or we can expect master sending us pt via notify channel? // XXX is this needed at all or we can expect master sending us pt/head via notify channel?
wg.Go(func() error { wg.Go(func() error {
return c.initFromMaster(ctx, mlink) return c.initFromMaster(ctx, mlink)
}) })
...@@ -360,19 +366,36 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er ...@@ -360,19 +366,36 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er
c.node.StateMu.Unlock() c.node.StateMu.Unlock()
opready() opready()
/*
XXX don't need this in init?
// ask M about last_tid // ask M about last_tid
rlastTxn := AnswerLastTransaction{} lastTxn := AnswerLastTransaction{}
err = mlink.Ask1(&LastTransaction{}, &rlastTxn) err = mlink.Ask1(&LastTransaction{}, &lastTxn)
if err != nil { if err != nil {
return err return err
} }
// XXX lock if c.at0Initialized {
// XXX rlastTxn.Tid -> c.lastTid // XXX c.head locking?
*/ if lastTxn.Tid != c.head {
return fmt.Errorf("New transactions were committed while we were disconnected from master (%s -> %s)", c.head, lastTxn.Tid)
}
} // XXX -> else?
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
c.at0Mu.Lock()
c.at0 = lastTxn.Tid
c.at0Initialized = true
c.flushEventq0()
c.at0Mu.Unlock()
// XXX what next? // XXX what next?
return nil return nil
...@@ -540,22 +563,37 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) ( ...@@ -540,22 +563,37 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
c := NewClient(u.User.Username(), u.Host, net) c := NewClient(u.User.Username(), u.Host, net)
c.watchq = opt.Watchq c.watchq = opt.Watchq
// XXX since we read lastTid, in separate protocol exchange there is a // XXX go c.Run()
// chance, that by the time when lastTid was read some new transactions // XXX wait c.ready
/*
lastTid, err := c.Sync(ctx)
if err != nil {
c.Close() // XXX lclose
return nil, zodb.InvalidTid, fmt.Errorf("neo: open %q: %s", u, err)
}
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first // were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server // transactions received by watcher via "invalidateObjects" server
// notification. // notification.
// //
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it // TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation // current head and guarantees to send only followup invalidation
// updates. // updates.
at0, err := c.Sync(ctx) //
if err != nil { // XXX -> move talkMaster1 / Client.Run
c.Close() // XXX lclose c.at0Mu.Lock()
return nil, zodb.InvalidTid, fmt.Errorf("neo: open %q: %s", u, err) c.at0 = lastTid
} c.at0Initialized = true
c.flushEventq0()
c.at0Mu.Unlock()
*/
return c, at0, nil return c, c.at0, nil
} }
func (c *Client) URL() string { func (c *Client) URL() string {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment