Commit 48c864b0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 14ca848c
...@@ -32,6 +32,7 @@ import ( ...@@ -32,6 +32,7 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task" "lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet" "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
) )
...@@ -113,13 +114,15 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) { ...@@ -113,13 +114,15 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
// XXX dup from Server.talkMaster1 // XXX dup from Server.talkMaster1
// XXX put logging into Dial? // XXX put logging into Dial?
log.Info(ctx, "connecting ...") log.Info(ctx, "connecting ...")
Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr) Mconn, accept, err := c.node.Dial(ctx, neo.MASTER, c.node.MasterAddr)
if err != nil { if err != nil {
// FIXME it is not only identification - e.g. ECONNREFUSED // FIXME it is not only identification - e.g. ECONNREFUSED
log.Info(ctx, "identification rejected") // XXX ok here? (err is logged above) log.Info(ctx, "identification rejected") // XXX ok here? (err is logged above)
return err return err
} }
_ = accept // XXX
log.Info(ctx, "identification accepted") log.Info(ctx, "identification accepted")
Mlink := Mconn.Link() Mlink := Mconn.Link()
...@@ -127,21 +130,38 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) { ...@@ -127,21 +130,38 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
// XXX .nodeTab.Reset() // XXX .nodeTab.Reset()
Ask(partiotionTable) rpt := neo.AnswerPartitionTable{}
Ask(lastTransaction) err = Mlink.Ask1(&neo.AskPartitionTable{}, &rpt)
if err != nil {
// XXX
}
pt := neo.PartTabFromDump(rpt.PTid, rpt.RowList)
// XXX pt -> c.node.PartTab ?
_ = pt
rlastTxn := neo.AnswerLastTransaction{}
err = Mlink.Ask1(&neo.LastTransaction{}, &rlastTxn)
if err != nil {
// XXX
}
// XXX rlastTxn.Tid -> c.lastTid
for { for {
msg, err := Mconn.Recv() req, err := Mlink.Recv1()
if err != nil { if err != nil {
return err return err
} }
msg := req.Msg
switch msg.(type) { switch msg.(type) {
default: default:
return fmt.Errorf("unexpected message: %T", msg) return fmt.Errorf("unexpected message: %T", msg)
case *neo.NotifyPartitionTable: //case *neo.NotifyPartitionTable:
// TODO M sends whole PT // // TODO M sends whole PT
//case *neo.NotifyPartitionChanges: //case *neo.NotifyPartitionChanges:
// // TODO M sends δPT // // TODO M sends δPT
...@@ -151,7 +171,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) { ...@@ -151,7 +171,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
case *neo.NotifyClusterState: case *neo.NotifyClusterState:
// TODO // TODO
}
} }
} }
......
...@@ -324,6 +324,8 @@ func (c *Conn) CloseRecv() { ...@@ -324,6 +324,8 @@ func (c *Conn) CloseRecv() {
atomic.StoreInt32(&c.rxclosed, 1) atomic.StoreInt32(&c.rxclosed, 1)
c.shutdownRX(errConnClosed) c.shutdownRX(errConnClosed)
// FIXME vvv should be active on Close path too and under shutdown() called from link shutdown
// dequeue all packets already queued in c.rxq // dequeue all packets already queued in c.rxq
// (once serveRecv sees c.rxdown it won't try to put new packets into // (once serveRecv sees c.rxdown it won't try to put new packets into
// c.rxq, but something finite could be already there) // c.rxq, but something finite could be already there)
...@@ -1189,7 +1191,9 @@ func (c *Conn) Ask(req Msg, resp Msg) error { ...@@ -1189,7 +1191,9 @@ func (c *Conn) Ask(req Msg, resp Msg) error {
} }
// ---- exchange of 1-1 request-reply ---- // ---- exchange of 1-1 request-reply ----
// (impedance matche for current neo/py imlementation) // (impedance matcher for current neo/py imlementation)
// TODO Recv1/Reply/Send1/Ask1 tests
// Request is a message received from the link + connection handle to make a reply. // Request is a message received from the link + connection handle to make a reply.
// //
...@@ -1226,17 +1230,17 @@ func (link *NodeLink) Recv1() (Request, error) { ...@@ -1226,17 +1230,17 @@ func (link *NodeLink) Recv1() (Request, error) {
// Reply sends response to request. // Reply sends response to request.
// //
// XXX doc // XXX doc
func (req Request) Reply(resp Msg) error { func (req *Request) Reply(resp Msg) error {
err1 := req.conn.Send(resp) err1 := req.conn.Send(resp)
err2 := req.conn.Close() err2 := req.conn.Close()
return xerr.First(err1, err2) return xerr.First(err1, err2)
} }
// Close should be called to free request resources for requests without a reply // Close should be called to free request resources for requests without a reply.
// //
// XXX doc // XXX doc
// It is safe to call Close several times. // It is safe to call Close several times.
func (req Request) Close() error { func (req *Request) Close() error {
return req.conn.Close() return req.conn.Close()
} }
...@@ -1291,3 +1295,7 @@ func (link *NodeLink) Ask1(req Msg, resp Msg) (err error) { ...@@ -1291,3 +1295,7 @@ func (link *NodeLink) Ask1(req Msg, resp Msg) (err error) {
return err return err
} }
func (req *Request) Link() *NodeLink {
return req.conn.Link()
}
...@@ -66,7 +66,6 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error { ...@@ -66,7 +66,6 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
*/ */
// FIXME kill vvv // FIXME kill vvv
///*
// ---------------------------------------- // ----------------------------------------
// XXX goes away? (we need a func to make sure to recv RequestIdentification // XXX goes away? (we need a func to make sure to recv RequestIdentification
...@@ -80,7 +79,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy ...@@ -80,7 +79,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy
defer xerr.Contextf(&err, "%s: identify", link) defer xerr.Contextf(&err, "%s: identify", link)
// the first conn must come with RequestIdentification packet // the first conn must come with RequestIdentification packet
conn, err := link.Accept(ctx) conn, err := link.Accept(/*ctx*/)
if err != nil { if err != nil {
return nodeInfo, err return nodeInfo, err
} }
...@@ -116,4 +115,3 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy ...@@ -116,4 +115,3 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy
return req, nil return req, nil
} }
//*/
...@@ -218,7 +218,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -218,7 +218,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
return return
for { for {
conn, err := Mlink.Accept(ctx) conn, err := Mlink.Accept(/*ctx*/)
select { select {
case acceptq <- accepted{conn, err}: case acceptq <- accepted{conn, err}:
...@@ -428,10 +428,10 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) (err err ...@@ -428,10 +428,10 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) (err err
return return
} }
var serveConn func(context.Context, *neo.Conn) var serveReq func(context.Context, neo.Request)
switch nodeInfo.NodeType { switch nodeInfo.NodeType {
case neo.CLIENT: case neo.CLIENT:
serveConn = stor.serveClient serveReq = stor.serveClient
default: default:
// XXX vvv should be reply to peer // XXX vvv should be reply to peer
...@@ -441,14 +441,13 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) (err err ...@@ -441,14 +441,13 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) (err err
// identification passed, now serve other requests // identification passed, now serve other requests
for { for {
conn, err := link.Accept(ctx) req, err := link.Recv1()
if err != nil { if err != nil {
log.Error(ctx, err) log.Error(ctx, err)
break break
} }
// XXX wrap conn close to happen here, not in serveClient ? go serveReq(ctx, req)
go serveConn(ctx, conn)
} }
// TODO wait all spawned serveConn // TODO wait all spawned serveConn
...@@ -467,36 +466,38 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context, ...@@ -467,36 +466,38 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
return xcontext.Merge(ctx, opCtx) return xcontext.Merge(ctx, opCtx)
} }
// serveClient serves incoming connection on which peer identified itself as client // serveClient serves incoming client request.
// the connection is closed when serveClient returns //
// XXX +error return? // XXX +error return?
// //
// XXX version that reuses goroutine to serve next client requests // XXX version that reuses goroutine to serve next client requests
// XXX for py compatibility (py has no way to tell us Conn is closed) // XXX for py compatibility (py has no way to tell us Conn is closed)
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) { func (stor *Storage) serveClient(ctx context.Context, req neo.Request) {
log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running? // XXX vvv move level-up
//log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
// rederive ctx to be also cancelled if M tells us StopOperation // rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx) ctx, cancel := stor.withWhileOperational(ctx)
defer cancel() defer cancel()
link := conn.Link() link := req.Link()
for { for {
err := stor.serveClient1(ctx, conn) resp := stor.serveClient1(ctx, req.Msg)
err := req.Reply(resp)
if err != nil { if err != nil {
log.Infof(ctx, "%v: %v", conn, err) log.Info(ctx, err)
return return
} }
lclose(ctx, conn) //lclose(ctx, conn)
// keep on going in the same goroutine to avoid goroutine creation overhead // keep on going in the same goroutine to avoid goroutine creation overhead
// TODO Accept += timeout, go away if inactive // TODO += timeout -> go away if inactive
conn, err = link.Accept(ctx) req, err = link.Recv1()
if err != nil { if err != nil {
// lclose(link) XXX ? // lclose(link) XXX ?
log.Error(ctx, "%v: %v", conn, err) log.Error(ctx, err)
return return
} }
} }
...@@ -548,12 +549,12 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) { ...@@ -548,12 +549,12 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
} }
*/ */
// serveClient1 serves 1 request from a client // serveClient1 prepares response for 1 request from client
func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error { func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
req, err := conn.Recv() // req, err := conn.Recv()
if err != nil { // if err != nil {
return err // XXX log / err / send error before closing // return err // XXX log / err / send error before closing
} // }
switch req := req.(type) { switch req := req.(type) {
case *neo.GetObject: case *neo.GetObject:
...@@ -566,13 +567,13 @@ func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error { ...@@ -566,13 +567,13 @@ func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error {
xid.TidBefore = true xid.TidBefore = true
} }
var reply neo.Msg
data, tid, err := stor.zstor.Load(ctx, xid) data, tid, err := stor.zstor.Load(ctx, xid)
if err != nil { if err != nil {
// TODO translate err to NEO protocol error codes // TODO translate err to NEO protocol error codes
reply = neo.ErrEncode(err) return neo.ErrEncode(err)
} else { }
reply = &neo.AnswerGetObject{
return &neo.AnswerGetObject{
Oid: xid.Oid, Oid: xid.Oid,
Serial: tid, Serial: tid,
...@@ -583,27 +584,24 @@ func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error { ...@@ -583,27 +584,24 @@ func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error {
// XXX .NextSerial // XXX .NextSerial
// XXX .DataSerial // XXX .DataSerial
} }
}
conn.Send(reply) // XXX err // req.Reply(reply) // XXX err
case *neo.LastTransaction: case *neo.LastTransaction:
var reply neo.Msg
lastTid, err := stor.zstor.LastTid(ctx) lastTid, err := stor.zstor.LastTid(ctx)
if err != nil { if err != nil {
reply = neo.ErrEncode(err) return neo.ErrEncode(err)
} else {
reply = &neo.AnswerLastTransaction{lastTid}
} }
conn.Send(reply) // XXX err return &neo.AnswerLastTransaction{lastTid}
// conn.Send(reply) // XXX err
//case *ObjectHistory: //case *ObjectHistory:
//case *StoreObject: //case *StoreObject:
default: default:
panic("unexpected packet") // XXX return &neo.Error{neo.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req)}
} }
//req.Put(...) //req.Put(...)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment