Commit 0fa96338 authored by Kirill Smelkov

X Clarified Request.Close semantics - tests working again

parent 443e0199
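The hunks below all converge on one discipline for neo.Request handling: receive a request with Recv1, handle it (replying when the message asks for an answer), then Close it exactly once, after handling. A minimal sketch of that loop shape, assuming only the Recv1/Reply/Close calls visible in this diff; request and link here are hypothetical stand-ins so the shape is self-contained:

package sketch

import "context"

// Illustrative stand-ins for neo.Request / neo.NodeLink, not the real types.
type request interface {
	Reply(resp interface{}) error // send an answer back on the same request
	Close()                       // release the request; must come after handling
}

type link interface {
	Recv1() (request, error)
}

// recvLoop: receive -> handle (may Reply) -> Close, in that order.
func recvLoop(ctx context.Context, l link, handle func(context.Context, request) error) error {
	for {
		req, err := l.Recv1()
		if err != nil {
			return err // Recv1 failed before handing out a request; nothing to Close
		}

		err = handle(ctx, req) // handler may call req.Reply(...)
		req.Close()            // always Close, and only after the handler is done
		if err != nil {
			return err
		}
	}
}

Judging from the serveClient hunks on master and storage, Reply is issued before Close, so the request stays usable until handling finishes; the old m1serve stub closed the request right after receiving it, before handling, which is what this commit corrects.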
......@@ -281,35 +281,45 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) (err error
		if err != nil {
			return err
		}

-		c.node.StateMu.Lock()
-
-		switch msg := req.Msg.(type) {
-		default:
-			c.node.StateMu.Unlock()
-			return fmt.Errorf("unexpected message: %T", msg)
-
-		// M sends whole PT
-		case *neo.SendPartitionTable:
-			c.node.UpdatePartTab(ctx, msg)
-
-		// M sends δPT
-		//case *neo.NotifyPartitionChanges:
-			// TODO
-
-		case *neo.NotifyNodeInformation:
-			c.node.UpdateNodeTab(ctx, msg)
-
-		case *neo.NotifyClusterState:
-			c.node.UpdateClusterState(ctx, msg)
-		}
-
-		// update .operational + notify those who was waiting for it
-		opready := c.updateOperational()
-		c.node.StateMu.Unlock()
-		opready()
+		err = c.recvMaster1(ctx, req)
+		req.Close()
+		if err != nil {
+			return err
+		}
	}
}

+// recvMaster1 handles 1 message from master
+func (c *Client) recvMaster1(ctx context.Context, req neo.Request) error {
+	c.node.StateMu.Lock()
+
+	switch msg := req.Msg.(type) {
+	default:
+		c.node.StateMu.Unlock()
+		return fmt.Errorf("unexpected message: %T", msg)
+
+	// M sends whole PT
+	case *neo.SendPartitionTable:
+		c.node.UpdatePartTab(ctx, msg)
+
+	// M sends δPT
+	//case *neo.NotifyPartitionChanges:
+		// TODO
+
+	case *neo.NotifyNodeInformation:
+		c.node.UpdateNodeTab(ctx, msg)
+
+	case *neo.NotifyClusterState:
+		c.node.UpdateClusterState(ctx, msg)
+	}
+
+	// update .operational + notify those who was waiting for it
+	opready := c.updateOperational()
+	c.node.StateMu.Unlock()
+	opready()
+
+	return nil
+}
+
func (c *Client) initFromMaster(ctx context.Context, mlink *neo.NodeLink) (err error) {
......
......@@ -1427,6 +1427,8 @@ func (c *Conn) Ask(req Msg, resp Msg) error {
//
// No Send or Recv must be in flight.
// The caller must not use c after call to close - the connection is returned to freelist.
+//
+// XXX must be called only once.
func (c *Conn) lightClose() {
nl := c.link
nl.connMu.Lock()
......@@ -1460,7 +1462,7 @@ func (link *NodeLink) Recv1() (Request, error) {
// NOTE serveRecv guaranty that when a conn is accepted, there is 1 message in conn.rxq
msg, err := conn.Recv() // XXX directly from <-rxq
if err != nil {
-conn.Close() // XXX -> conn.release
+conn.Close() // XXX -> conn.lightClose()
return Request{}, err
}
......
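The new doc line on lightClose above insists it be called only once, and the same comment says the connection is returned to a freelist. A repo-independent sketch of why a double release is dangerous; sync.Pool is used purely as an illustration here, it is not the repository's actual freelist:

package main

import (
	"fmt"
	"sync"
)

// conn stands in for any pooled object with state.
type conn struct{ id int }

var freelist = sync.Pool{New: func() interface{} { return new(conn) }}

func release(c *conn) { freelist.Put(c) } // must be called exactly once per conn

func main() {
	c := freelist.Get().(*conn)
	release(c)
	release(c) // double release: the same object is now in the pool twice

	a := freelist.Get().(*conn)
	b := freelist.Get().(*conn)
	fmt.Println(a == b) // may print true: two "independent" users share one conn
}

With a real freelist the symptom is the same: two callers can end up holding the same connection, which is why the XXX note pins lightClose to exactly one call.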
......@@ -860,7 +860,7 @@ func storCtlService(ctx context.Context, stor *neo.Node) (err error) {
//if err != nil {
// return err
//}
-//req.Close()
+//req.Close() XXX must be after req handling
//switch msg := req.Msg.(type) {
//case *neo.NotifyReady:
// // ok
......@@ -910,6 +910,7 @@ func (m *Master) serveClient(ctx context.Context, cli *neo.Node) (err error) {
resp := m.serveClient1(ctx, req.Msg)
err = req.Reply(resp)
+req.Close()
if err != nil {
return err
}
......
......@@ -238,66 +238,77 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (req
		if err != nil {
			return nil, err
		}

-		// XXX vvv move Send out of reply preparing logic
-		switch msg := req.Msg.(type) {
-		default:
-			return nil, fmt.Errorf("unexpected message: %T", msg)
-
-		case *neo.StartOperation:
-			// ok, transition to serve
-			return &req, nil
-
-		case *neo.Recovery:
-			err = req.Reply(&neo.AnswerRecovery{
-				PTid:        stor.node.PartTab.PTid,
-				BackupTid:   neo.INVALID_TID,
-				TruncateTid: neo.INVALID_TID})
-
-		case *neo.AskPartitionTable:
-			// TODO initially read PT from disk
-			err = req.Reply(&neo.AnswerPartitionTable{
-				PTid:    stor.node.PartTab.PTid,
-				RowList: stor.node.PartTab.Dump()})
-
-		case *neo.LockedTransactions:
-			// XXX r/o stub
-			err = req.Reply(&neo.AnswerLockedTransactions{})
-
-		// TODO AskUnfinishedTransactions
-
-		case *neo.LastIDs:
-			lastTid, zerr1 := stor.zstor.LastTid(ctx)
-			lastOid, zerr2 := stor.zstor.LastOid(ctx)
-			if zerr := xerr.First(zerr1, zerr2); zerr != nil {
-				return nil, zerr // XXX send the error to M
-			}
-
-			err = req.Reply(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
-
-		case *neo.SendPartitionTable:
-			// TODO M sends us whole PT -> save locally
-			stor.node.UpdatePartTab(ctx, msg) // XXX lock?
-
-		case *neo.NotifyPartitionChanges:
-			// TODO M sends us δPT -> save locally?
-
-		case *neo.NotifyNodeInformation:
-			// XXX check for myUUID and consider it a command (like neo/py) does?
-			stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
-
-		case *neo.NotifyClusterState:
-			stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
-		}
-
-		// XXX move req.Reply here and ^^^ only prepare reply
-		if err != nil {
-			return nil, err
-		}
-
-		req.Close() // XXX err?
+		err = stor.m1initialize1(ctx, req)
+		if err == cmdStart {
+			// start - transition to serve
+			return &req, nil
+		}
+		req.Close()
+		if err != nil {
+			return nil, err
+		}
	}
}

+var cmdStart = errors.New("start requested")
+
+// m1initialize1 handles one message from master from under m1initialize
+func (stor *Storage) m1initialize1(ctx context.Context, req neo.Request) error {
+	// XXX vvv move Send out of reply preparing logic
+	var err error
+
+	switch msg := req.Msg.(type) {
+	default:
+		return fmt.Errorf("unexpected message: %T", msg)
+
+	case *neo.StartOperation:
+		// ok, transition to serve
+		return cmdStart
+
+	case *neo.Recovery:
+		err = req.Reply(&neo.AnswerRecovery{
+			PTid:        stor.node.PartTab.PTid,
+			BackupTid:   neo.INVALID_TID,
+			TruncateTid: neo.INVALID_TID})
+
+	case *neo.AskPartitionTable:
+		// TODO initially read PT from disk
+		err = req.Reply(&neo.AnswerPartitionTable{
+			PTid:    stor.node.PartTab.PTid,
+			RowList: stor.node.PartTab.Dump()})
+
+	case *neo.LockedTransactions:
+		// XXX r/o stub
+		err = req.Reply(&neo.AnswerLockedTransactions{})
+
+	// TODO AskUnfinishedTransactions
+
+	case *neo.LastIDs:
+		lastTid, zerr1 := stor.zstor.LastTid(ctx)
+		lastOid, zerr2 := stor.zstor.LastOid(ctx)
+		if zerr := xerr.First(zerr1, zerr2); zerr != nil {
+			return zerr // XXX send the error to M
+		}
+
+		err = req.Reply(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
+
+	case *neo.SendPartitionTable:
+		// TODO M sends us whole PT -> save locally
+		stor.node.UpdatePartTab(ctx, msg) // XXX lock?
+
+	case *neo.NotifyPartitionChanges:
+		// TODO M sends us δPT -> save locally?
+
+	case *neo.NotifyNodeInformation:
+		// XXX check for myUUID and consider it a command (like neo/py) does?
+		stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
+
+	case *neo.NotifyClusterState:
+		stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
+	}
+
+	// XXX move req.Reply here and ^^^ only prepare reply
+	return err
+}
// m1serve drives storage by master messages during service phase.
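In the m1initialize split above, the per-message handler reports "start operation received" back to its caller through the package-level sentinel error cmdStart, and the loop compares the returned error against that sentinel before treating it as a failure. A self-contained illustration of that control flow; every name besides the cmdStart idea is made up here:

package main

import (
	"errors"
	"fmt"
)

// errStart mirrors cmdStart above: not a failure, but a command for the caller.
var errStart = errors.New("start requested")

func handleOne(msg string) error {
	switch msg {
	case "start":
		return errStart // tell the caller to leave the init loop
	case "bad":
		return fmt.Errorf("unexpected message: %q", msg)
	default:
		return nil // handled, keep looping
	}
}

func initLoop(msgs []string) error {
	for _, m := range msgs {
		err := handleOne(m)
		if err == errStart {
			return nil // transition to the service phase
		}
		if err != nil {
			return err // real error
		}
	}
	return errors.New("no start request received")
}

func main() {
	fmt.Println(initLoop([]string{"recovery", "pt", "start"})) // <nil>
}

The diff compares with ==, which is fine for a package-private sentinel; on current Go, errors.Is(err, errStart) would behave the same way.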
......@@ -324,6 +335,7 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er
// reply M we are ready
// XXX according to current neo/py this is separate send - not reply - and so we do here
err = reqStart.Reply(&neo.NotifyReady{})
+reqStart.Close()
if err != nil {
return err
}
......@@ -334,28 +346,36 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er
		if err != nil {
			return err
		}

-		req.Close() // XXX stub, err
-
-		switch msg := req.Msg.(type) {
-		default:
-			return fmt.Errorf("unexpected message: %T", msg)
-
-		case *neo.StopOperation:
-			return fmt.Errorf("stop requested")
-
-		// XXX SendPartitionTable?
-		// XXX NotifyPartitionChanges?
-
-		case *neo.NotifyNodeInformation:
-			stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
-
-		case *neo.NotifyClusterState:
-			stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
-
-		// TODO commit related messages
-		}
+		err = stor.m1serve1(ctx, req)
+		req.Close()
+		if err != nil {
+			return err
+		}
	}
}

+// m1serve1 handles one message from master under m1serve
+func (stor *Storage) m1serve1(ctx context.Context, req neo.Request) error {
+	switch msg := req.Msg.(type) {
+	default:
+		return fmt.Errorf("unexpected message: %T", msg)
+
+	case *neo.StopOperation:
+		return fmt.Errorf("stop requested")
+
+	// XXX SendPartitionTable?
+	// XXX NotifyPartitionChanges?
+
+	case *neo.NotifyNodeInformation:
+		stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
+
+	case *neo.NotifyClusterState:
+		stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
+
+	// TODO commit related messages
+	}
+
+	return nil
+}
+
// --- serve incoming connections from other nodes ---
......@@ -466,6 +486,7 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) {
for {
resp := stor.serveClient1(ctx, req.Msg)
err := req.Reply(resp)
+req.Close()
if err != nil {
log.Error(ctx, err)
return
......