Commit 42bc63f7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d46afb3e
......@@ -195,7 +195,7 @@ func (c *Client) withOperational(ctx context.Context) error {
// talkMaster connects to master, announces self and receives notifications.
// it tries to persist master link reconnecting as needed.
//
// XXX C -> M for commit
// XXX C -> M for commit (-> another channel)
//
// XXX always error (dup Storage.talkMaster) ?
func (c *Client) talkMaster(ctx context.Context) (err error) {
......
......@@ -27,7 +27,6 @@ import (
"context"
"crypto/sha1"
"io"
"math"
"net"
"reflect"
"testing"
......@@ -42,7 +41,6 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
"lab.nexedi.com/kirr/neo/go/xcommon/xtesting"
......@@ -56,106 +54,83 @@ import (
"time"
)
// XXX dup from connection_test
func xwait(w interface { Wait() error }) {
err := w.Wait()
exc.Raiseif(err)
}
func gox(wg interface { Go(func() error) }, xf func()) {
wg.Go(exc.Funcx(xf))
}
func xfs1stor(path string) *fs1.FileStorage {
zstor, err := fs1.Open(bg, path)
exc.Raiseif(err)
return zstor
}
var bg = context.Background()
// XXX tracer which can collect tracing events from net + TODO master/storage/etc...
// XXX naming
type MyTracer struct {
*xtesting.SyncTracer
}
// ---- events used in tests ----
func (t *MyTracer) TraceNetConnect(ev *xnet.TraceConnect) { t.Trace1(ev) }
func (t *MyTracer) TraceNetListen(ev *xnet.TraceListen) { t.Trace1(ev) }
func (t *MyTracer) TraceNetTx(ev *xnet.TraceTx) {} // { t.Trace1(ev) }
//type traceNeoRecv struct {conn *neo.Conn; msg neo.Msg}
//func (t *MyTracer) traceNeoConnRecv(c *neo.Conn, msg neo.Msg) { t.Trace1(&traceNeoRecv{c, msg}) }
// xnet.TraceConnect
// xnet.TraceListen
// tx via neo.Conn
type traceNeoSend struct {
// event: tx via neo.Conn
type eventNeoSend struct {
Src, Dst net.Addr
ConnID uint32
Msg neo.Msg
}
func (t *MyTracer) traceNeoMsgSendPre(l *neo.NodeLink, connID uint32, msg neo.Msg) {
t.Trace1(&traceNeoSend{l.LocalAddr(), l.RemoteAddr(), connID, msg})
}
// cluster state changed
type traceClusterState struct {
// event: cluster state changed
type eventClusterState struct {
Ptr *neo.ClusterState // pointer to variable which holds the state
State neo.ClusterState
}
func (t *MyTracer) traceClusterState(cs *neo.ClusterState) {
t.Trace1(&traceClusterState{cs, *cs})
}
func clusterState(cs *neo.ClusterState, v neo.ClusterState) *traceClusterState {
return &traceClusterState{cs, v}
func clusterState(cs *neo.ClusterState, v neo.ClusterState) *eventClusterState {
return &eventClusterState{cs, v}
}
// nodetab entry changed
type traceNode struct {
// event: nodetab entry changed
type eventNodeTab struct {
NodeTab unsafe.Pointer // *neo.NodeTable XXX not to noise test diff
NodeInfo neo.NodeInfo
}
func (t *MyTracer) traceNode(nt *neo.NodeTable, n *neo.Node) {
t.Trace1(&traceNode{unsafe.Pointer(nt), n.NodeInfo})
}
// master ready to start changed
type traceMStartReady struct {
// event: master ready to start changed
type eventMStartReady struct {
Master unsafe.Pointer // *Master XXX not to noise test diff
Ready bool
}
func (t *MyTracer) traceMasterStartReady(m *Master, ready bool) {
t.Trace1(masterStartReady(m, ready))
func masterStartReady(m *Master, ready bool) *eventMStartReady {
return &eventMStartReady{unsafe.Pointer(m), ready}
}
func masterStartReady(m *Master, ready bool) *traceMStartReady {
return &traceMStartReady{unsafe.Pointer(m), ready}
// ----------------------------------------
// XXX tracer which can collect tracing events from net + TODO master/storage/etc...
// XXX naming
type MyTracer struct {
*xtesting.SyncChan
}
func (t *MyTracer) TraceNetConnect(ev *xnet.TraceConnect) { t.Send(ev) }
func (t *MyTracer) TraceNetListen(ev *xnet.TraceListen) { t.Send(ev) }
func (t *MyTracer) TraceNetTx(ev *xnet.TraceTx) {} // { t.Send(ev) }
// vclock is a virtual clock
// XXX place -> util?
type vclock struct {
t float64
//type traceNeoRecv struct {conn *neo.Conn; msg neo.Msg}
//func (t *MyTracer) traceNeoConnRecv(c *neo.Conn, msg neo.Msg) { t.Send(&traceNeoRecv{c, msg}) }
func (t *MyTracer) traceNeoMsgSendPre(l *neo.NodeLink, connID uint32, msg neo.Msg) {
t.Send(&eventNeoSend{l.LocalAddr(), l.RemoteAddr(), connID, msg})
}
func (c *vclock) monotime() float64 {
c.t += 1E-2
return c.t
func (t *MyTracer) traceClusterState(cs *neo.ClusterState) {
t.Send(&eventClusterState{cs, *cs})
}
func (c *vclock) tick() { // XXX do we need tick?
t := math.Ceil(c.t)
if !(t > c.t) {
t += 1
}
c.t = t
func (t *MyTracer) traceNode(nt *neo.NodeTable, n *neo.Node) {
t.Send(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo})
}
func (t *MyTracer) traceMasterStartReady(m *Master, ready bool) {
t.Send(masterStartReady(m, ready))
}
//trace:import "lab.nexedi.com/kirr/neo/go/neo"
// M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown
func TestMasterStorage(t *testing.T) {
tracer := &MyTracer{xtesting.NewSyncTracer()}
tc := xtesting.NewTraceChecker(t, tracer.SyncTracer)
tracer := &MyTracer{xtesting.NewSyncChan()}
tc := xtesting.NewEventChecker(t, tracer.SyncChan)
net := pipenet.New("testnet") // test network
......@@ -203,8 +178,8 @@ func TestMasterStorage(t *testing.T) {
}
// shortcut for net tx event over nodelink connection
conntx := func(src, dst string, connid uint32, msg neo.Msg) *traceNeoSend {
return &traceNeoSend{Src: xaddr(src), Dst: xaddr(dst), ConnID: connid, Msg: msg}
conntx := func(src, dst string, connid uint32, msg neo.Msg) *eventNeoSend {
return &eventNeoSend{Src: xaddr(src), Dst: xaddr(dst), ConnID: connid, Msg: msg}
}
// shortcut for NodeInfo
......@@ -219,8 +194,8 @@ func TestMasterStorage(t *testing.T) {
}
// shortcut for nodetab change
node := func(x *neo.NodeApp, laddr string, typ neo.NodeType, num int32, state neo.NodeState, idtime neo.IdTime) *traceNode {
return &traceNode{
node := func(x *neo.NodeApp, laddr string, typ neo.NodeType, num int32, state neo.NodeState, idtime neo.IdTime) *eventNodeTab {
return &eventNodeTab{
NodeTab: unsafe.Pointer(x.NodeTab),
NodeInfo: nodei(laddr, typ, num, state, idtime),
}
......@@ -603,8 +578,8 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
M := NewMaster("abc1", "", Mnet)
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
tracer := &MyTracer{xtesting.NewSyncTracer()}
tc := xtesting.NewTraceChecker(b, tracer.SyncTracer)
tracer := &MyTracer{xtesting.NewSyncChan()}
tc := xtesting.NewEventChecker(b, tracer.SyncChan)
pg := &tracing.ProbeGroup{}
tracing.Lock()
pnode := neo_traceNodeChanged_Attach(nil, tracer.traceNode)
......@@ -616,10 +591,10 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
})
// determing M serving address XXX better with M api
ev := tracer.Get1()
mnode, ok := ev.Event.(*traceNode)
ev := tracer.Recv()
mnode, ok := ev.Event.(*eventNodeTab)
if !ok {
b.Fatal("after M start: got %T ; want traceNode", ev.Event)
b.Fatal("after M start: got %T ; want eventNodeTab", ev.Event)
}
Maddr := mnode.NodeInfo.Addr.String()
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package server
import (
"context"
"math"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
"lab.nexedi.com/kirr/go123/exc"
)
// XXX dup from connection_test
func xwait(w interface { Wait() error }) {
err := w.Wait()
exc.Raiseif(err)
}
func gox(wg interface { Go(func() error) }, xf func()) {
wg.Go(exc.Funcx(xf))
}
func xfs1stor(path string) *fs1.FileStorage {
zstor, err := fs1.Open(bg, path)
exc.Raiseif(err)
return zstor
}
var bg = context.Background()
// vclock is a virtual clock
type vclock struct {
t float64
}
func (c *vclock) monotime() float64 {
c.t += 1E-2
return c.t
}
func (c *vclock) tick() { // XXX do we need tick?
t := math.Ceil(c.t)
if !(t > c.t) {
t += 1
}
c.t = t
}
......@@ -21,89 +21,82 @@
package xtesting
import (
"fmt"
"reflect"
"strings"
"testing"
"github.com/kylelemons/godebug/pretty"
)
// XXX move -> tracing
// TODO tests for this
// SyncChan provides synchronous channel with additional property that send
// blocks until receiving side explicitly acknowledges message was received and
// processed.
//
// New channels must be created via NewSyncChan.
//
// It is safe to use SyncChan from multiple goroutines simultaneously.
type SyncChan struct {
msgq chan *SyncMsg
}
// XXX Tracer interface {Trace1} ?
// Send sends event to a consumer and waits for ack.
func (ch *SyncChan) Send(event interface{}) {
ack := make(chan struct{})
ch.msgq <- &SyncMsg{event, ack}
<-ack
}
// SyncTracer provides base infrastructure for synchronous tracing
// Recv receives message from a producer.
//
// Tracing events from several sources could be collected and sent for consumption via 1 channel.
// For each event the goroutine which produced it will wait for ack before continue.
type SyncTracer struct {
tracech chan *SyncTraceMsg
// The consumer, after dealing with the message, must send back an ack.
func (ch *SyncChan) Recv() *SyncMsg {
msg := <-ch.msgq
return msg
}
// SyncTraceMsg represents message with 1 synchronous tracing communication.
// SyncMsg represents message with 1 event sent over SyncChan.
//
// the goroutine which produced the message will wait for Ack before continue.
type SyncTraceMsg struct {
// The goroutine which sent the message will wait for Ack before continue.
type SyncMsg struct {
Event interface {}
ack chan<- struct{}
}
// Ack acknowledges the event was processed and unpauses producer goroutine.
func (m *SyncTraceMsg) Ack() {
// Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *SyncMsg) Ack() {
close(m.ack)
}
// XXX doc
func NewSyncTracer() *SyncTracer {
return &SyncTracer{tracech: make(chan *SyncTraceMsg)}
}
// Trace1 sends message with 1 tracing event to a consumer and waits for ack
func (st *SyncTracer) Trace1(event interface{}) {
ack := make(chan struct{})
//fmt.Printf("trace: send: %T %v\n", event, event)
st.tracech <- &SyncTraceMsg{event, ack}
<-ack
}
// Get1 receives message with 1 tracing event from a producer.
//
// The consumer, after dealing with the message, must send back an ack.
func (st *SyncTracer) Get1() *SyncTraceMsg {
msg := <-st.tracech
//fmt.Printf("trace: get1: %T %v\n", msg.Event, msg.Event)
return msg
// NewSyncChan creates new SyncChan channel.
func NewSyncChan() *SyncChan {
return &SyncChan{msgq: make(chan *SyncMsg)}
}
// ----------------------------------------
// XXX naming -> SyncTraceChecker
// TraceChecker synchronously collects and checks tracing events from a SyncTracer
type TraceChecker struct {
// EventChecker is testing utility to verify events coming from a SyncChan are as expected.
type EventChecker struct {
t testing.TB
st *SyncTracer
in *SyncChan
}
// XXX doc
func NewTraceChecker(t testing.TB, st *SyncTracer) *TraceChecker {
return &TraceChecker{t: t, st: st}
// NewEventChecker constructs new EventChecker that will retrieve events from
// `in` and use `t` for tests reporting.
func NewEventChecker(t testing.TB, in *SyncChan) *EventChecker {
return &EventChecker{t: t, in: in}
}
// get1 gets 1 event in place and checks it has expected type
//
// if checks do not pass - fatal testing error is raised
// XXX merge back to expect1 ?
func (tc *TraceChecker) xget1(eventp interface{}) *SyncTraceMsg {
tc.t.Helper()
msg := tc.st.Get1()
func (evc *EventChecker) xget1(eventp interface{}) *SyncMsg {
evc.t.Helper()
msg := evc.in.Recv()
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
tc.t.Fatalf("expect: %s: got %#v", reventp.Elem().Type(), msg.Event)
evc.t.Fatalf("expect: %s: got %#v", reventp.Elem().Type(), msg.Event)
}
// *eventp = msg.Event
......@@ -113,35 +106,56 @@ func (tc *TraceChecker) xget1(eventp interface{}) *SyncTraceMsg {
}
// expect1 asks checker to expect next event to be eventExpect (both type and value)
// if checks do not pass - fatal testing error is raised
// XXX merge back to expect?
func (tc *TraceChecker) expect1(eventExpect interface{}) {
tc.t.Helper()
//
// if checks do not pass - fatal testing error is raised.
func (evc *EventChecker) expect1(eventExpect interface{}) *SyncMsg {
evc.t.Helper()
reventExpect := reflect.ValueOf(eventExpect)
reventp := reflect.New(reventExpect.Type())
msg := tc.xget1(reventp.Interface())
msg := evc.xget1(reventp.Interface())
revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
tc.t.Fatalf("expect: %s:\nwant: %v\nhave: %v\ndiff: %s",
evc.t.Fatalf("expect: %s:\nwant: %v\nhave: %v\ndiff: %s",
reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface()))
}
return msg
}
// Expect asks checker to receive next event and verify it to be equal to expected.
//
// If check is successful ACK is sent back to event producer.
// If check does not pass - fatal testing error is raised.
func (evc *EventChecker) Expect(expected interface{}) {
evc.t.Helper()
msg := evc.expect1(expected)
msg.Ack()
}
// Expect asks checker to expect next series of events to be from eventExpectV in specified order
func (tc *TraceChecker) Expect(eventExpectV ...interface{}) {
tc.t.Helper()
// ExpectNoACK asks checker to receive next event and verify it to be equal to
// expected without sending back ACK.
//
// No ACK is sent back to event producer - the caller becomes responsible to
// send ACK back by itself.
//
// If check does not pass - fatal testing error is raised.
func (evc *EventChecker) ExpectNoACK(expected interface{}) *SyncMsg {
evc.t.Helper()
for _, eventExpect := range eventExpectV {
tc.expect1(eventExpect)
}
msg := evc.expect1(expected)
return msg
}
// XXX goes away? (if there is no happens-before for events - just check them one by one in dedicated goroutines ?)
/*
// ExpectPar asks checker to expect next series of events to be from eventExpectV in no particular order
// XXX naming
func (tc *TraceChecker) ExpectPar(eventExpectV ...interface{}) {
......@@ -170,3 +184,4 @@ loop:
tc.t.Fatalf("expect:\nhave: %T %v\nwant: [%v]", msg.Event, msg.Event, strings.Join(strv, " | "))
}
}
*/
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment