Commit 474c67e4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 51d8015d
...@@ -637,7 +637,7 @@ const dumpio = true ...@@ -637,7 +637,7 @@ const dumpio = true
func (nl *NodeLink) sendPkt(pkt *PktBuf) error { func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
if dumpio { if dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt.Dump()) fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
//defer fmt.Printf("\t-> sendPkt err: %v\n", err) //defer fmt.Printf("\t-> sendPkt err: %v\n", err)
} }
...@@ -690,7 +690,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -690,7 +690,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
if dumpio { if dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt.Dump()) fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
} }
return pkt, nil return pkt, nil
......
...@@ -88,7 +88,10 @@ type Node struct { ...@@ -88,7 +88,10 @@ type Node struct {
// link to this node; =nil if not connected // link to this node; =nil if not connected
Link *NodeLink Link *NodeLink
// XXX something indicating in-flight connecti/identification // XXX not yet sure it is good idea
Conn *Conn // main connection
// XXX something indicating in-flight connecting/identification
// (wish Link != nil means connected and identified) // (wish Link != nil means connected and identified)
} }
...@@ -108,7 +111,7 @@ func (nt *NodeTable) Get(uuid NodeUUID) *Node { ...@@ -108,7 +111,7 @@ func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// Update updates information about a node // Update updates information about a node
// it returns corresponding node entry for convenience // it returns corresponding node entry for convenience
func (nt *NodeTable) Update(nodeInfo NodeInfo, link *NodeLink) *Node { func (nt *NodeTable) Update(nodeInfo NodeInfo, conn *Conn /*XXX better link *NodeLink*/) *Node {
node := nt.Get(nodeInfo.NodeUUID) node := nt.Get(nodeInfo.NodeUUID)
if node == nil { if node == nil {
node = &Node{} node = &Node{}
...@@ -116,7 +119,8 @@ func (nt *NodeTable) Update(nodeInfo NodeInfo, link *NodeLink) *Node { ...@@ -116,7 +119,8 @@ func (nt *NodeTable) Update(nodeInfo NodeInfo, link *NodeLink) *Node {
} }
node.NodeInfo = nodeInfo node.NodeInfo = nodeInfo
node.Link = link node.Conn = conn
node.Link = conn.Link()
nt.notify(node.NodeInfo) nt.notify(node.NodeInfo)
return node return node
......
...@@ -144,7 +144,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -144,7 +144,7 @@ func TestMasterStorage(t *testing.T) {
defer pg.Done() defer pg.Done()
tracing.Lock() tracing.Lock()
neo_traceConnRecv_Attach(pg, tracer.traceNeoConnRecv) //neo_traceConnRecv_Attach(pg, tracer.traceNeoConnRecv)
neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre) neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre)
tracing.Unlock() tracing.Unlock()
...@@ -236,16 +236,16 @@ func TestMasterStorage(t *testing.T) { ...@@ -236,16 +236,16 @@ func TestMasterStorage(t *testing.T) {
// TODO test ID rejects // TODO test ID rejects
// M starts recovery on S // M starts recovery on S
tc.Expect(conntx("m:2", "s:2", 0, &neo.Recovery{})) tc.Expect(conntx("m:2", "s:2", 1, &neo.Recovery{}))
tc.Expect(conntx("s:2", "m:2", 0, &neo.AnswerRecovery{ tc.Expect(conntx("s:2", "m:2", 1, &neo.AnswerRecovery{
// empty new node // empty new node
PTid: 0, PTid: 0,
BackupTid: 0, BackupTid: neo.INVALID_TID,
TruncateTid: 0, TruncateTid: neo.INVALID_TID,
})) }))
tc.Expect(conntx("m:2", "s:2", 0, &neo.AskPartitionTable{})) tc.Expect(conntx("m:2", "s:2", 1, &neo.AskPartitionTable{}))
tc.Expect(conntx("s:2", "m:2", 0, &neo.AnswerPartitionTable{ tc.Expect(conntx("s:2", "m:2", 1, &neo.AnswerPartitionTable{
PTid: 0, PTid: 0,
RowList: nil, // XXX -> [] RowList: nil, // XXX -> []
})) }))
......
...@@ -413,14 +413,15 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery) ...@@ -413,14 +413,15 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
}() }()
defer runningf(&ctx, "%s: stor recovery", stor.Link.RemoteAddr())(&err) defer runningf(&ctx, "%s: stor recovery", stor.Link.RemoteAddr())(&err)
conn, err := stor.Link.NewConn() conn := stor.Conn
if err != nil { // conn, err := stor.Link.NewConn()
return // if err != nil {
} // return
defer func() { // }
err2 := conn.Close() // defer func() {
err = xerr.First(err, err2) // err2 := conn.Close()
}() // err = xerr.First(err, err2)
// }()
// XXX cancel on ctx // XXX cancel on ctx
...@@ -905,7 +906,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp ...@@ -905,7 +906,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
IdTimestamp: monotime(), IdTimestamp: monotime(),
} }
node = m.nodeTab.Update(nodeInfo, n.conn.Link()) // NOTE this notifies all nodeTab subscribers node = m.nodeTab.Update(nodeInfo, n.conn) // NOTE this notifies all nodeTab subscribers
return node, accept return node, accept
} }
...@@ -928,8 +929,9 @@ func (m *Master) reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) { ...@@ -928,8 +929,9 @@ func (m *Master) reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) {
func (m *Master) accept(ctx context.Context, conn *neo.Conn, resp neo.Msg) error { func (m *Master) accept(ctx context.Context, conn *neo.Conn, resp neo.Msg) error {
// XXX cancel on ctx // XXX cancel on ctx
err1 := conn.Send(resp) err1 := conn.Send(resp)
err2 := conn.Close() return err1 // XXX while trying to work on single conn
return xerr.First(err1, err2) //err2 := conn.Close()
//return xerr.First(err1, err2)
} }
// allocUUID allocates new node uuid for a node of kind nodeType // allocUUID allocates new node uuid for a node of kind nodeType
......
...@@ -171,11 +171,11 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -171,11 +171,11 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
log.Info(ctx, "connecting ...") log.Info(ctx, "connecting ...")
Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr) Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr)
if err != nil { if err != nil {
log.Info(ctx, "rejected") // XXX ok here? (err is logged above) log.Info(ctx, "identification rejected") // XXX ok here? (err is logged above)
return err return err
} }
log.Info(ctx, "accepted") log.Info(ctx, "identification accepted")
Mlink := Mconn.Link() Mlink := Mconn.Link()
// close Mlink on return / cancel // close Mlink on return / cancel
...@@ -194,7 +194,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -194,7 +194,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX -> node.Dial ? // XXX -> node.Dial ?
if accept.YourNodeUUID != stor.node.MyInfo.NodeUUID { if accept.YourNodeUUID != stor.node.MyInfo.NodeUUID {
log.Infof(ctx, "master told us to have UUID=%v", accept.YourNodeUUID) log.Infof(ctx, "master told us to have uuid=%v", accept.YourNodeUUID)
stor.node.MyInfo.NodeUUID = accept.YourNodeUUID stor.node.MyInfo.NodeUUID = accept.YourNodeUUID
} }
...@@ -206,6 +206,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -206,6 +206,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
acceptq := make(chan accepted) acceptq := make(chan accepted)
go func () { go func () {
// XXX (temp ?) disabled not to let S accept new connections // XXX (temp ?) disabled not to let S accept new connections
// reason: not (yet ?) clear how to allow listen on dialed link without
// missing immediate sends or deadlocks if peer does not follow
// expected protocol exchange (2 receive paths: Recv & Accept)
return return
for { for {
......
...@@ -57,7 +57,7 @@ func _running(ctxp *context.Context, name string) func(*error) { ...@@ -57,7 +57,7 @@ func _running(ctxp *context.Context, name string) func(*error) {
// XXX is it good idea to log to error here? (not in above layer) // XXX is it good idea to log to error here? (not in above layer)
// XXX what is error here could be not so error above // XXX what is error here could be not so error above
// XXX or we still want to log all errors - right? // XXX or we still want to log all errors - right?
log.Depth(1).Error(ctx, *errp) log.Depth(1).Error(ctx, "## ", *errp) // XXX "::" temp
} else { } else {
log.Depth(1).Info(ctx, "ok") log.Depth(1).Info(ctx, "ok")
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment