Commit 529c4666 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6e05fb59
...@@ -173,35 +173,8 @@ func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) { ...@@ -173,35 +173,8 @@ func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) {
} }
} }
var me *NodeCommon // XXX temp stub link, err := p.dial(ctx)
conn0, accept, err := me.Dial(ctx, p.Type, p.Addr.String())
dialT = time.Now() dialT = time.Now()
if err != nil {
return nil, err
}
link := conn0.Link()
// verify peer identifies as what we expect
// XXX move to Dial?
switch {
case accept.NodeType != p.Type:
err = fmt.Errorf("connected, but peer is not %v (identifies as %v)", p.Type, accept.NodeType)
case accept.MyUUID != p.UUID:
err = fmt.Errorf("connected, but peer's uuid is not %v (identifies as %v)", p.UUID, accept.MyUUID)
case accept.YourUUID != me.MyInfo.UUID:
err = fmt.Errorf("connected, but peer gives us uuid %v (our is %v)", accept.YourUUID, me.MyInfo.UUID)
case !(accept.NumPartitions == 1 && accept.NumReplicas == 1):
err = fmt.Errorf("connected but TODO peer works with ! 1x1 partition table.")
}
if err != nil {
//log.Iferr(ctx, link.Close())
lclose(ctx, link)
link = nil
}
return link, err return link, err
}() }()
...@@ -221,6 +194,39 @@ func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) { ...@@ -221,6 +194,39 @@ func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) {
} }
// dial does the low-level work to dial peer and verify its identity.
//
// On success it returns the established link; on any failure the link (if
// already created) is closed and a nil link is returned together with the
// error.
//
// XXX p.* is read without lock - ok?
func (p *Peer) dial(ctx context.Context) (*NodeLink, error) {
	var me *NodeCommon // XXX temp stub; nil until wired to the owning node
	conn0, accept, err := me.Dial(ctx, p.Type, p.Addr.String())
	if err != nil {
		return nil, err
	}
	link := conn0.Link()

	// verify peer identifies as what we expect
	// XXX move to Dial?
	switch {
	case accept.NodeType != p.Type:
		err = fmt.Errorf("connected, but peer is not %v (identifies as %v)", p.Type, accept.NodeType)

	case accept.MyUUID != p.UUID:
		err = fmt.Errorf("connected, but peer's uuid is not %v (identifies as %v)", p.UUID, accept.MyUUID)

	case accept.YourUUID != me.MyInfo.UUID:
		err = fmt.Errorf("connected, but peer gives us uuid %v (our is %v)", accept.YourUUID, me.MyInfo.UUID)

	case !(accept.NumPartitions == 1 && accept.NumReplicas == 1):
		// NOTE error strings should not end with punctuation (go vet / ST1005)
		err = fmt.Errorf("connected but TODO peer works with ! 1x1 partition table")
	}

	if err != nil {
		// close the link and log the close error, if any; the identity
		// mismatch error takes precedence for the caller
		lclose(ctx, link)
		link = nil
	}

	return link, err
}
......
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext" "lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet" "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
...@@ -159,7 +160,7 @@ func (m *Master) Run(ctx context.Context) (err error) { ...@@ -159,7 +160,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
return err // XXX err ctx return err // XXX err ctx
} }
defer runningf(&ctx, "master(%v)", l.Addr())(&err) defer task.Runningf(&ctx, "master(%v)", l.Addr())(&err)
m.node.MasterAddr = l.Addr().String() m.node.MasterAddr = l.Addr().String()
naddr, err := neo.Addr(l.Addr()) naddr, err := neo.Addr(l.Addr())
...@@ -221,7 +222,7 @@ func (m *Master) Run(ctx context.Context) (err error) { ...@@ -221,7 +222,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
// runMain is the process which implements main master cluster management logic: node tracking, cluster // runMain is the process which implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc // state updates, scheduling data movement between storage nodes etc
func (m *Master) runMain(ctx context.Context) (err error) { func (m *Master) runMain(ctx context.Context) (err error) {
defer running(&ctx, "main")(&err) defer task.Running(&ctx, "main")(&err)
// NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state // NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state
...@@ -286,7 +287,7 @@ type storRecovery struct { ...@@ -286,7 +287,7 @@ type storRecovery struct {
// - nil: recovery was ok and a command came for cluster to start // - nil: recovery was ok and a command came for cluster to start
// - !nil: recovery was cancelled // - !nil: recovery was cancelled
func (m *Master) recovery(ctx context.Context) (err error) { func (m *Master) recovery(ctx context.Context) (err error) {
defer running(&ctx, "recovery")(&err) defer task.Running(&ctx, "recovery")(&err)
m.setClusterState(neo.ClusterRecovering) m.setClusterState(neo.ClusterRecovering)
ctx, rcancel := context.WithCancel(ctx) ctx, rcancel := context.WithCancel(ctx)
...@@ -497,7 +498,7 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery) ...@@ -497,7 +498,7 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
// on error provide feedback to storRecovery chan // on error provide feedback to storRecovery chan
res <- storRecovery{stor: stor, err: err} res <- storRecovery{stor: stor, err: err}
}() }()
defer runningf(&ctx, "%s: stor recovery", stor.Link.RemoteAddr())(&err) defer task.Runningf(&ctx, "%s: stor recovery", stor.Link.RemoteAddr())(&err)
conn := stor.Conn conn := stor.Conn
// conn, err := stor.Link.NewConn() // conn, err := stor.Link.NewConn()
...@@ -553,7 +554,7 @@ var errClusterDegraded = stderrors.New("cluster became non-operatonal") ...@@ -553,7 +554,7 @@ var errClusterDegraded = stderrors.New("cluster became non-operatonal")
// //
// prerequisite for start: .partTab is operational wrt .nodeTab // prerequisite for start: .partTab is operational wrt .nodeTab
func (m *Master) verify(ctx context.Context) (err error) { func (m *Master) verify(ctx context.Context) (err error) {
defer running(&ctx, "verify")(&err) defer task.Running(&ctx, "verify")(&err)
m.setClusterState(neo.ClusterVerifying) m.setClusterState(neo.ClusterVerifying)
ctx, vcancel := context.WithCancel(ctx) ctx, vcancel := context.WithCancel(ctx)
...@@ -714,7 +715,7 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable, ...@@ -714,7 +715,7 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
res <- storVerify{stor: stor, err: err} res <- storVerify{stor: stor, err: err}
} }
}() }()
defer runningf(&ctx, "%s: stor verify", stor.Link)(&err) defer task.Runningf(&ctx, "%s: stor verify", stor.Link)(&err)
conn := stor.Conn conn := stor.Conn
...@@ -772,7 +773,7 @@ type serviceDone struct { ...@@ -772,7 +773,7 @@ type serviceDone struct {
// //
// prerequisite for start: .partTab is operational wrt .nodeTab and verification passed // prerequisite for start: .partTab is operational wrt .nodeTab and verification passed
func (m *Master) service(ctx context.Context) (err error) { func (m *Master) service(ctx context.Context) (err error) {
defer running(&ctx, "service")(&err) defer task.Running(&ctx, "service")(&err)
m.setClusterState(neo.ClusterRunning) m.setClusterState(neo.ClusterRunning)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
...@@ -827,6 +828,7 @@ loop: ...@@ -827,6 +828,7 @@ loop:
case d := <-serviced: case d := <-serviced:
// TODO if S goes away -> check partTab still operational -> if not - recovery // TODO if S goes away -> check partTab still operational -> if not - recovery
_ = d
// XXX who sends here? // XXX who sends here?
case n := <-m.nodeLeave: case n := <-m.nodeLeave:
...@@ -866,7 +868,7 @@ func storCtlService(ctx context.Context, stor *neo.Node, done chan serviceDone) ...@@ -866,7 +868,7 @@ func storCtlService(ctx context.Context, stor *neo.Node, done chan serviceDone)
} }
func storCtlService1(ctx context.Context, stor *neo.Node) (err error) { func storCtlService1(ctx context.Context, stor *neo.Node) (err error) {
defer runningf(&ctx, "%s: stor service", stor.Link.RemoteAddr())(&err) defer task.Runningf(&ctx, "%s: stor service", stor.Link.RemoteAddr())(&err)
conn := stor.Conn conn := stor.Conn
...@@ -906,7 +908,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp ...@@ -906,7 +908,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
// - NodeType valid // - NodeType valid
// - IdTimestamp ? // - IdTimestamp ?
uuid := n.idReq.NodeUUID uuid := n.idReq.UUID
nodeType := n.idReq.NodeType nodeType := n.idReq.NodeType
err := func() *neo.Error { err := func() *neo.Error {
...@@ -939,7 +941,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp ...@@ -939,7 +941,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
return nil return nil
}() }()
subj := fmt.Sprintf("identify: %s (%s)", n.conn.Link().RemoteAddr(), n.idReq.NodeUUID) subj := fmt.Sprintf("identify: %s (%s)", n.conn.Link().RemoteAddr(), n.idReq.UUID)
if err != nil { if err != nil {
log.Infof(ctx, "%s: rejecting: %s", subj, err) log.Infof(ctx, "%s: rejecting: %s", subj, err)
return nil, err return nil, err
...@@ -949,10 +951,10 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp ...@@ -949,10 +951,10 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
accept := &neo.AcceptIdentification{ accept := &neo.AcceptIdentification{
NodeType: neo.MASTER, NodeType: neo.MASTER,
MyNodeUUID: m.node.MyInfo.UUID, MyUUID: m.node.MyInfo.UUID,
NumPartitions: 1, // FIXME hardcoded NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid, YourUUID: uuid,
} }
// update nodeTab // update nodeTab
......
...@@ -118,10 +118,10 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req ...@@ -118,10 +118,10 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req
err = conn.Send(&neo.AcceptIdentification{ err = conn.Send(&neo.AcceptIdentification{
NodeType: myNodeType, NodeType: myNodeType,
MyNodeUUID: 0, // XXX MyUUID: 0, // XXX
NumPartitions: 1, // XXX NumPartitions: 1, // XXX
NumReplicas: 1, // XXX NumReplicas: 1, // XXX
YourNodeUUID: req.NodeUUID, YourUUID: req.UUID,
}) })
if err != nil { if err != nil {
......
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet" "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext" "lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
...@@ -94,7 +95,7 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -94,7 +95,7 @@ func (stor *Storage) Run(ctx context.Context) error {
return err // XXX err ctx return err // XXX err ctx
} }
defer runningf(&ctx, "storage(%v)", l.Addr())(&err) defer task.Runningf(&ctx, "storage(%v)", l.Addr())(&err)
// start serving incoming connections // start serving incoming connections
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
...@@ -143,7 +144,7 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -143,7 +144,7 @@ func (stor *Storage) Run(ctx context.Context) error {
// //
// it always returns an error - either due to cancel or command from master to shutdown // it always returns an error - either due to cancel or command from master to shutdown
func (stor *Storage) talkMaster(ctx context.Context) (err error) { func (stor *Storage) talkMaster(ctx context.Context) (err error) {
defer runningf(&ctx, "talk master(%v)", stor.node.MasterAddr)(&err) defer task.Runningf(&ctx, "talk master(%v)", stor.node.MasterAddr)(&err)
for { for {
err := stor.talkMaster1(ctx) err := stor.talkMaster1(ctx)
...@@ -194,9 +195,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -194,9 +195,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
} }
// XXX -> node.Dial ? // XXX -> node.Dial ?
if accept.YourNodeUUID != stor.node.MyInfo.UUID { if accept.YourUUID != stor.node.MyInfo.UUID {
log.Infof(ctx, "master told us to have uuid=%v", accept.YourNodeUUID) log.Infof(ctx, "master told us to have uuid=%v", accept.YourUUID)
stor.node.MyInfo.UUID = accept.YourNodeUUID stor.node.MyInfo.UUID = accept.YourUUID
} }
...@@ -298,7 +299,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -298,7 +299,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// - nil: initialization was ok and a command came from master to start operation // - nil: initialization was ok and a command came from master to start operation
// - !nil: initialization was cancelled or failed somehow // - !nil: initialization was cancelled or failed somehow
func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err error) { func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err error) {
defer runningf(&ctx, "init %v", Mconn)(&err) defer task.Runningf(&ctx, "init %v", Mconn)(&err)
for { for {
msg, err := Mconn.Recv() msg, err := Mconn.Recv()
...@@ -371,7 +372,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err ...@@ -371,7 +372,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
// either due to master commanding us to stop, or context cancel or some other // either due to master commanding us to stop, or context cancel or some other
// error. // error.
func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) { func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
defer runningf(&ctx, "serve %v", Mconn)(&err) defer task.Runningf(&ctx, "serve %v", Mconn)(&err)
// refresh stor.opCtx and cancel it when we finish so that client // refresh stor.opCtx and cancel it when we finish so that client
// handlers know they need to stop operating as master told us to do so. // handlers know they need to stop operating as master told us to do so.
......
...@@ -23,11 +23,14 @@ package server ...@@ -23,11 +23,14 @@ package server
import ( import (
"context" "context"
"io" "io"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
) )
// lclose closes c and logs closing error if there was any. // lclose closes c and logs closing error if there was any.
// the error is otherwise ignored // the error is otherwise ignored
// XXX dup in neo
func lclose(ctx context.Context, c io.Closer) { func lclose(ctx context.Context, c io.Closer) {
err := c.Close() err := c.Close()
if err != nil { if err != nil {
......
package neo
import (
"context"
"io"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
)
// lclose closes c and logs closing error if there was any.
// the error is otherwise ignored
// XXX dup in server
// lclose closes c, logging the close error (if any) via the ctx logger.
// The error is otherwise discarded.
// XXX dup in server
func lclose(ctx context.Context, c io.Closer) {
	if err := c.Close(); err != nil {
		log.Error(ctx, err)
	}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment