Commit 3c0e3c38 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f62fba7e
......@@ -305,22 +305,28 @@ func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error {
// --- serve incoming connections from other nodes ---
// identify processes identification request from connected peer.
func (stor *Storage) identify(ctx context.Context, idReq *proto.RequestIdentification) (proto.Msg, bool) {
idResp, ok := stor.identify_(idReq)
if ok {
func (stor *Storage) identify(ctx context.Context, idReq *proto.RequestIdentification) (idResp proto.Msg, err error) {
accept, reject := stor.identify_(idReq)
if accept != nil {
log.Info(ctx, "accepting identification")
idResp = accept
} else {
log.Info(ctx, "rejecting identification (%s)", idResp.(*proto.Error).Message)
log.Info(ctx, "rejecting identification (%s)", reject.Message)
idResp = reject
}
return idResp, ok
var ereject error
if reject != nil {
ereject = reject
}
return idResp, ereject
}
func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, bool) {
func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *proto.Error) {
// XXX stub: we accept clients and don't care about their NID
if idReq.NodeType != proto.CLIENT {
return &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}, false
return nil, &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}
}
if idReq.ClusterName != stor.node.ClusterName {
return &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}, false
return nil, &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}
}
// check operational
......@@ -329,14 +335,14 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, b
stor.opMu.Unlock()
if !operational {
return &proto.Error{proto.NOT_READY, "cluster not operational"}, false
return nil, &proto.Error{proto.NOT_READY, "cluster not operational"}
}
return &proto.AcceptIdentification{
NodeType: stor.node.MyInfo.Type,
MyNID: stor.node.MyInfo.NID, // XXX lock wrt update
YourNID: idReq.NID,
}, true
}, nil
}
......@@ -355,18 +361,17 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
// serveLink serves incoming node-node link connection.
func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *proto.RequestIdentification) (err error) {
link := req.Link()
defer task.Runningf(&ctx, "serve %s", link)(&err)
defer task.Runningf(&ctx, "serve %s", idReq.NID)(&err)
defer xio.CloseWhenDone(ctx, link)()
// first process identification
// XXX just req.Reply(idResp) + return if !ok
idResp, ok := stor.identify(ctx, idReq)
if !ok {
reject(ctx, req, idResp)
return nil
// TODO -> .Accept() that would listen, handshake and identify a node
// -> and only then return it to serve loop if identified ok.
idResp, err := stor.identify(ctx, idReq)
err2 := req.Reply(idResp)
if err == nil {
err = err2
}
err = accept(ctx, req, idResp)
if err != nil {
return err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment