Commit 6e05fb59 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 87b612aa
......@@ -87,11 +87,11 @@ func (n *NodeCommon) Dial(ctx context.Context, peerType NodeType, addr string) (
}
req := &RequestIdentification{
NodeType: n.MyInfo.Type,
NodeUUID: n.MyInfo.UUID,
Address: n.MyInfo.Addr,
ClusterName: n.ClusterName,
IdTimestamp: n.MyInfo.IdTimestamp, // XXX ok?
NodeType: n.MyInfo.Type,
UUID: n.MyInfo.UUID,
Address: n.MyInfo.Addr,
ClusterName: n.ClusterName,
IdTimestamp: n.MyInfo.IdTimestamp, // XXX ok?
}
accept := &AcceptIdentification{}
err = conn.Ask(req, accept)
......
......@@ -22,8 +22,10 @@ package neo
import (
"bytes"
"context"
"fmt"
//"sync"
"sync"
"time"
)
// NodeTable represents known nodes in a cluster
......@@ -93,30 +95,39 @@ type Peer struct {
NodeInfo // .uuid, .addr, ...
// link to this peer
linkMu sync.Mutex
link *NodeLink // link to peer or nil if not connected
// linkErr error // dialing gave this error
dialT time.Time // dialing finished at this time
// linkReady chan struct{} // becomes ready after dial finishes; reinitialized at each redial
dialing *dialReady // dialer notifies waiters via this; reinitialized at each redial; nil while not dialing
linkMu sync.Mutex
link *NodeLink // link to peer or nil if not connected
dialT time.Time // dialing finished at this time
// dialer notifies waiters via this; reinitialized at each redial; nil while not dialing
//
// NOTE duplicates .link to have the following properties:
//
// 1. all waiters of current in-progress dial wakup immediately after
// dial completes and get link/error from dial result.
//
// 2. any .Connect() that sees .link=nil starts new redial with throttle
// to make sure peer is dialed not faster than δtRedial.
//
// (if we do not have dialing.link waiter will need to relock
// peer.linkMu and for some waiters chances are another .Connect()
// already started redialing and they will have to wait again)
dialing *dialed
}
type dialReady struct {
// dialed is result of dialing a peer.
type dialed struct {
link *NodeLink
err error
ready chan struct{}
}
// Connect returns link to this peer.
// Connect returns link to this peer. XXX -> DialLink ?
//
// If the link was not yet established Connect dials the peer appropriately,
// handshakes, requests identification and checks that identification reply is
// as expected.
func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) {
// XXX p.State != RUNNING
// XXX p.Addr != ""
p.linkMu.Lock()
// ok if already connected
......@@ -134,43 +145,71 @@ func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) {
return nil, ctx.Err()
case <-dialing.ready:
return dialed.link, dialed.err
return dialing.link, dialing.err
}
}
// otherwise this goroutine becomes responsible for (re)dialing the peer
dialing = &dialReady{ready: make(chan struct{})}
p.dialing = dialing
// start dialing - in singleflight
// XXX p.State != RUNNING
// XXX p.Addr != ""
dialT := p.dialT
dialing := &dialed{ready: make(chan struct{})}
p.dialing = dialing
p.linkMu.Unlock()
go func() {
// throttle redialing if too fast
δt := time.Now().Sub(dialT)
if δt < δtRedial && !dialT.IsZero() {
select {
case <-ctx.Done():
// XXX -> return nil, ctx.Err()
link, err := func() (*NodeLink, error) {
// throttle redialing if too fast
δt := time.Now().Sub(dialT)
if δt < δtRedial && !dialT.IsZero() {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(δtRedial - δt):
// ok
}
}
case <-time.After(δtRedial - δt):
// ok
var me *NodeCommon // XXX temp stub
conn0, accept, err := me.Dial(ctx, p.Type, p.Addr.String())
dialT = time.Now()
if err != nil {
return nil, err
}
}
conn0, accept, err := Dial(ctx, p.Type, p.Addr)
if err != nil {
// XXX -> return nil, err
}
link := conn0.Link()
// XXX accept.NodeType == p.Type
// XXX accept.MyUUID == p.UUID
// XXX accept.YourUUID == (what has been given us by master)
// XXX accept.Num{Partitions,Replicas} == (what is expected - (1,1) currently)
// verify peer identifies as what we expect
// XXX move to Dial?
switch {
case accept.NodeType != p.Type:
err = fmt.Errorf("connected, but peer is not %v (identifies as %v)", p.Type, accept.NodeType)
case accept.MyUUID != p.UUID:
err = fmt.Errorf("connected, but peer's uuid is not %v (identifies as %v)", p.UUID, accept.MyUUID)
case accept.YourUUID != me.MyInfo.UUID:
err = fmt.Errorf("connected, but peer gives us uuid %v (our is %v)", accept.YourUUID, me.MyInfo.UUID)
case !(accept.NumPartitions == 1 && accept.NumReplicas == 1):
err = fmt.Errorf("connected but TODO peer works with ! 1x1 partition table.")
}
if err != nil {
//log.Iferr(ctx, link.Close())
lclose(ctx, link)
link = nil
}
return link, err
}()
p.linkMu.Lock()
p.link = link
p.linkErr = err
p.dialT = time.Now()
p.dialT = dialT
p.dialing = nil
p.linkMu.Unlock()
dialing.link = link
dialing.err = err
......
......@@ -22,54 +22,10 @@ package server
import (
"context"
"fmt"
"io"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
)
// XXX -> task (and current task -> taskctx) ?
// running is syntactic sugar to push new task to operational stack, log it and
// adjust error return with task prefix.
//
// use like this:
//
// defer running(&ctx, "my task")(&err)
func running(ctxp *context.Context, name string) func(*error) {
return _running(ctxp, name)
}
// runningf is running cousin with formatting support
func runningf(ctxp *context.Context, format string, argv ...interface{}) func(*error) {
return _running(ctxp, fmt.Sprintf(format, argv...))
}
func _running(ctxp *context.Context, name string) func(*error) {
ctx := task.Running(*ctxp, name)
*ctxp = ctx
log.Depth(2).Info(ctx, "start")
return func(errp *error) {
if *errp != nil {
// XXX is it good idea to log to error here? (not in above layer)
// XXX what is error here could be not so error above
// XXX or we still want to log all errors - right?
log.Depth(1).Error(ctx, "## ", *errp) // XXX "::" temp
} else {
log.Depth(1).Info(ctx, "done")
}
// XXX do we need vvv if we log it anyway ^^^ ?
// NOTE not *ctxp here - as context pointed by ctxp could be
// changed when this deferred function is run
task.ErrContext(errp, ctx)
}
}
// lclose closes c and logs closing error if there was any.
// the error is otherwise ignored
func lclose(ctx context.Context, c io.Closer) {
......
......@@ -276,7 +276,7 @@ func (p *RequestIdentification) neoMsgEncodedLen() int {
func (p *RequestIdentification) neoMsgEncode(data []byte) {
binary.BigEndian.PutUint32(data[0:], uint32(int32(p.NodeType)))
binary.BigEndian.PutUint32(data[4:], uint32(int32(p.NodeUUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32(p.UUID)))
{
l := uint32(len(p.Address.Host))
binary.BigEndian.PutUint32(data[8:], l)
......@@ -301,7 +301,7 @@ func (p *RequestIdentification) neoMsgDecode(data []byte) (int, error) {
goto overflow
}
p.NodeType = NodeType(int32(binary.BigEndian.Uint32(data[0:])))
p.NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[4:])))
p.UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[4:])))
{
l := binary.BigEndian.Uint32(data[8:])
data = data[12:]
......@@ -342,10 +342,10 @@ func (p *AcceptIdentification) neoMsgEncodedLen() int {
func (p *AcceptIdentification) neoMsgEncode(data []byte) {
binary.BigEndian.PutUint32(data[0:], uint32(int32(p.NodeType)))
binary.BigEndian.PutUint32(data[4:], uint32(int32(p.MyNodeUUID)))
binary.BigEndian.PutUint32(data[4:], uint32(int32(p.MyUUID)))
binary.BigEndian.PutUint32(data[8:], p.NumPartitions)
binary.BigEndian.PutUint32(data[12:], p.NumReplicas)
binary.BigEndian.PutUint32(data[16:], uint32(int32(p.YourNodeUUID)))
binary.BigEndian.PutUint32(data[16:], uint32(int32(p.YourUUID)))
}
func (p *AcceptIdentification) neoMsgDecode(data []byte) (int, error) {
......@@ -353,10 +353,10 @@ func (p *AcceptIdentification) neoMsgDecode(data []byte) (int, error) {
goto overflow
}
p.NodeType = NodeType(int32(binary.BigEndian.Uint32(data[0:])))
p.MyNodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[4:])))
p.MyUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[4:])))
p.NumPartitions = binary.BigEndian.Uint32(data[8:])
p.NumReplicas = binary.BigEndian.Uint32(data[12:])
p.YourNodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[16:])))
p.YourUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[16:])))
return 20, nil
overflow:
......
......@@ -29,7 +29,7 @@ import (
"github.com/golang/glog"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext/task"
)
// withTask prepends string describing current operational task stack to argv and returns it
......
......@@ -17,73 +17,50 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package task provides primitives to track tasks via contexts.
// Package task provides handy utilities to define & log tasks.
package task
import (
"context"
"fmt"
"context"
"fmt"
"lab.nexedi.com/kirr/go123/xerr"
taskctx "lab.nexedi.com/kirr/neo/go/xcommon/xcontext/task"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
)
// Task represents currently running operation
type Task struct {
Parent *Task
Name string
}
type taskKey struct{}
// Running creates new task and returns new context with that task set to current
func Running(ctx context.Context, name string) context.Context {
return context.WithValue(ctx, taskKey{}, &Task{Parent: Current(ctx), Name: name})
// Running is syntactic sugar to push new task to operational stack, log it and
// adjust error return with task prefix.
//
// use like this:
//
// defer task.Running(&ctx, "my task")(&err)
func Running(ctxp *context.Context, name string) func(*error) {
return running(ctxp, name)
}
// Runningf is Running cousin with formatting support
func Runningf(ctx context.Context, format string, argv ...interface{}) context.Context {
return Running(ctx, fmt.Sprintf(format, argv...))
}
// Current returns current task represented by context.
// if there is no current task - it returns nil.
func Current(ctx context.Context) *Task {
task, _ := ctx.Value(taskKey{}).(*Task)
return task
func Runningf(ctxp *context.Context, format string, argv ...interface{}) func(*error) {
return running(ctxp, fmt.Sprintf(format, argv...))
}
// ErrContext adds current task name to error on error return.
// To work as intended it should be called under defer like this:
//
// func myfunc(ctx, ...) (..., err error) {
// ctx = task.Running("doing something")
// defer task.ErrContext(&err, ctx)
// ...
//
// Please see lab.nexedi.com/kirr/go123/xerr.Context for semantic details.
func ErrContext(errp *error, ctx context.Context) {
task := Current(ctx)
if task == nil {
return
}
xerr.Context(errp, task.Name)
}
// String returns string representing whole operational stack.
//
// For example if task "c" is running under task "b" which in turn is running
// under task "a" - the operational stack will be "a: b: c".
//
// nil Task is represented as "".
func (t *Task) String() string {
if t == nil {
return ""
}
func running(ctxp *context.Context, name string) func(*error) {
ctx := taskctx.Running(*ctxp, name)
*ctxp = ctx
log.Depth(2).Info(ctx, "start")
prefix := t.Parent.String()
if prefix != "" {
prefix += ": "
}
return func(errp *error) {
if *errp != nil {
// XXX is it good idea to log to error here? (not in above layer)
// XXX what is error here could be not so error above
// XXX or we still want to log all errors - right?
log.Depth(1).Error(ctx, "## ", *errp) // XXX "::" temp
} else {
log.Depth(1).Info(ctx, "done")
}
return prefix + t.Name
// XXX do we need vvv if we log it anyway ^^^ ?
// NOTE not *ctxp here - as context pointed by ctxp could be
// changed when this deferred function is run
taskctx.ErrContext(errp, ctx)
}
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package task provides primitives to track tasks via contexts.
package task
import (
"context"
"fmt"
"lab.nexedi.com/kirr/go123/xerr"
)
// Task represents currently running operation
type Task struct {
Parent *Task
Name string
}
type taskKey struct{}
// Running creates new task and returns new context with that task set to current
func Running(ctx context.Context, name string) context.Context {
return context.WithValue(ctx, taskKey{}, &Task{Parent: Current(ctx), Name: name})
}
// Runningf is Running cousin with formatting support
func Runningf(ctx context.Context, format string, argv ...interface{}) context.Context {
return Running(ctx, fmt.Sprintf(format, argv...))
}
// Current returns current task represented by context.
// if there is no current task - it returns nil.
func Current(ctx context.Context) *Task {
task, _ := ctx.Value(taskKey{}).(*Task)
return task
}
// ErrContext adds current task name to error on error return.
// To work as intended it should be called under defer like this:
//
// func myfunc(ctx, ...) (..., err error) {
// ctx = task.Running("doing something")
// defer task.ErrContext(&err, ctx)
// ...
//
// Please see lab.nexedi.com/kirr/go123/xerr.Context for semantic details.
func ErrContext(errp *error, ctx context.Context) {
task := Current(ctx)
if task == nil {
return
}
xerr.Context(errp, task.Name)
}
// String returns string representing whole operational stack.
//
// For example if task "c" is running under task "b" which in turn is running
// under task "a" - the operational stack will be "a: b: c".
//
// nil Task is represented as "".
func (t *Task) String() string {
if t == nil {
return ""
}
prefix := t.Parent.String()
if prefix != "" {
prefix += ": "
}
return prefix + t.Name
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment