Commit cc8de1a8 authored by Kirill Smelkov

.

parent e665df91
@@ -621,29 +621,30 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
}
cred := u.User.String()
x, err := parseQuery(cred)
if err != nil {
return nil, zodb.InvalidTid, fmt.Errorf("credentials: %s", err)
}
// xpop pops k from credentials, defaulting to $NEO_<K> if envok.
xpop := func(k string, envok bool) string {
v, ok := x[k]
if !ok && envok {
v = os.Getenv("NEO_"+strings.ToUpper(k))
}
delete(x, k)
return v
}
lonode := xpop("lonode", false)
if !ssl {
if cred != "" {
if len(x) != 0 {
return nil, zodb.InvalidTid, fmt.Errorf("credentials can be specified only with neos:// scheme")
}
} else {
x, err := parseQuery(cred)
if err != nil {
return nil, zodb.InvalidTid, fmt.Errorf("credentials: %s", err)
}
// xpop pops k from credentials, defaultin to NEO_<K>
xpop := func(k string) string {
v, ok := x[k]
if !ok {
v = os.Getenv("NEO_"+strings.ToUpper(k))
}
delete(x, k)
return v
}
ca = xpop("ca")
cert = xpop("cert")
key = xpop("key")
ca = xpop("ca", true)
cert = xpop("cert", true)
key = xpop("key", true)
if len(x) != 0 {
return nil, zodb.InvalidTid, fmt.Errorf("invalid credentials: %v", x)
@@ -686,6 +687,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
}
c := NewClient(name, u.Host, net)
c.ownNet = true
c.watchq = opt.Watchq
defer func() {
if err != nil {
......
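The one-line addition c.watchq = opt.Watchq forwards the caller's watch queue into the client, so database change notifications reach whoever opened the driver. A hedged sketch of the caller side; the zodb.Event element type is an assumption, only DriverOptions.Watchq itself appears in this commit:

package main

import (
	"fmt"

	"lab.nexedi.com/kirr/neo/go/zodb"
)

// watchLoop drains change notifications delivered via DriverOptions.Watchq.
func watchLoop(watchq chan zodb.Event) {
	for ev := range watchq {
		fmt.Printf("zodb event: %v\n", ev) // e.g. a committed transaction
	}
}

func main() {
	watchq := make(chan zodb.Event)
	opt := &zodb.DriverOptions{Watchq: watchq} // wired through by this commit
	go watchLoop(watchq)
	_ = opt // opt would be passed to the NEO driver open (openClientByURL)
}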
@@ -63,12 +63,13 @@ func masterMain(argv []string) {
prog.Exit(2)
}
net, err := netSetup()
ctx := context.Background()
net, err := netSetup(ctx)
if err != nil {
prog.Fatal(err)
}
ctx := context.Background()
defer net.Close() // XXX err
err = listenAndServe(ctx, net, *bind, func(ctx context.Context, l xnet.Listener) error {
master := neo.NewMaster(*cluster, net)
......
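Both daemons now share the same lifecycle: the context is created first because netSetup needs it (to join a lonet network, below), and the networker is closed on exit. Condensed, with a stub standing in for the closure that netFlags returns:

package main

import (
	"context"
	"log"

	"lab.nexedi.com/kirr/go123/xnet"
)

// netSetup is a stand-in for the closure returned by netFlags.
func netSetup(ctx context.Context) (xnet.Networker, error) {
	return xnet.NetPlain("tcp"), nil // plain TCP when -lonode is not given
}

func main() {
	ctx := context.Background() // created before netSetup, as in the diff
	net, err := netSetup(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer net.Close() // error handling left as XXX in the diff too
	// listenAndServe(ctx, net, ...) would follow here
}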
@@ -32,6 +32,8 @@ import (
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/lonet"
"lab.nexedi.com/kirr/go123/xstrings"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/neo/internal/xtls"
@@ -43,13 +45,15 @@ import (
// netFlags installs common network flags and returns function to setup network
// and create selected networker.
func netFlags(flags *flag.FlagSet) /*netSetup*/ func() (xnet.Networker, error) {
func netFlags(flags *flag.FlagSet) (netSetup func(context.Context) (xnet.Networker, error)) {
// XXX also support $NEO_<K> envvars?
fca := flags.String("ca", "", "path to CA certificate")
fcert := flags.String("cert", "", "path to node certificate")
fkey := flags.String("key", "", "path to node private key")
return func() (_ xnet.Networker, err error) {
flonode := flags.String("lonode", "", "<net>/<host> for this node on lonet network")
return func(ctx context.Context) (_ xnet.Networker, err error) {
defer xerr.Contextf(&err, "network setup")
ca := *fca
@@ -60,7 +64,34 @@ func netFlags(flags *flag.FlagSet) /*netSetup*/ func() (xnet.Networker, error) {
return nil, fmt.Errorf("incomplete ca/cert/key provided")
}
net := xnet.NetPlain("tcp") // TODO not only "tcp" ?
var net xnet.Networker
defer func() {
if err != nil && net != nil {
net.Close() // ignore err
}
}()
lonode := *flonode
if lonode == "" {
net = xnet.NetPlain("tcp") // TODO not only "tcp" ?
} else {
defer xerr.Contextf(&err, "lonode %s", lonode)
netname, hostname, err := xstrings.HeadTail(lonode, "/")
if err != nil {
return nil, fmt.Errorf("invalid lonode")
}
network, err := lonet.Join(ctx, netname)
if err != nil {
return nil, err
}
host, err := network.NewHost(ctx, hostname)
if err != nil {
network.Close() // ignore err
return nil, err
}
net = host // XXX also need to close network on host close
}
if ssl {
tlsCfg, err := xtls.ConfigForP2P(ca, cert, key)
if err != nil {
......
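For illustration, the new lonode path in isolation: split "<net>/<host>", join the lonet subnetwork, then register this process's host on it. The signatures follow the hunk above; the network and host names are made up:

package main

import (
	"context"
	"log"

	"lab.nexedi.com/kirr/go123/xnet/lonet"
	"lab.nexedi.com/kirr/go123/xstrings"
)

func main() {
	ctx := context.Background()

	// e.g. obtained from "-lonode net1/m"
	netname, hostname, err := xstrings.HeadTail("net1/m", "/")
	if err != nil {
		log.Fatal("invalid lonode") // no "/" separator
	}

	network, err := lonet.Join(ctx, netname) // attach to subnetwork "net1"
	if err != nil {
		log.Fatal(err)
	}
	defer network.Close()

	host, err := network.NewHost(ctx, hostname) // this node's virtual host
	if err != nil {
		log.Fatal(err)
	}
	_ = host // host is the xnet.Networker the node listens/dials on
}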
@@ -105,10 +105,11 @@ func storageMain(argv []string) {
prog.Fatal(err)
}
net, err := netSetup()
net, err := netSetup(ctx)
if err != nil {
prog.Fatal(err)
}
defer net.Close() // XXX err
err = listenAndServe(ctx, net, *bind, func(ctx context.Context, l xnet.Listener) error {
stor := neo.NewStorage(*cluster, master, net, back)
......
@@ -31,13 +31,11 @@ import (
"github.com/kylelemons/godebug/pretty"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
"lab.nexedi.com/kirr/go123/exc"
// "lab.nexedi.com/kirr/go123/tracing"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/pipenet"
"lab.nexedi.com/kirr/go123/xsync"
@@ -344,10 +342,10 @@ func tracetestMasterStorage(t0 *tracetest.T) {
// C loads every other {<,=}serial:oid - established link is reused
ziter := zstor.Iterate(bg, 0, zodb.TidMax)
// XXX hack: disable tracing early so that C.Load() calls do not deadlock
// XXX hack: disable tracing early so that C.Load() calls do not deadlock NOTE
// TODO refactor cluster creation into func
// TODO move client all loading tests into separate test where tracing will be off
t.gotracer.Detach()
t.TraceOff()
for {
_, dataIter, err := ziter.NextTxn(bg)
@@ -421,98 +419,102 @@ func (d tdispatch1) Dispatch(event interface{}) {
}
func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) {
return
// XXX restore
// // create test cluster <- XXX factor to utility func
// zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
//
// ctx, cancel := context.WithCancel(context.Background())
// wg := xsync.NewWorkGroup(ctx)
// defer wg.Wait()
// defer cancel()
//
// // spawn M
// M := tNewMaster("abc1", "", Mnet)
//
// // XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
// cG := tracetest.NewChan("main")
// tG := tracetest.NewEventChecker(b, nil /* XXX */, cG)
//
// tracer := NewTraceCollector(tdispatch1{cG})
// tracer.RegisterNode(M.node, "m")
//
// tracing.Lock()
// pnode := traceNodeChanged_Attach(nil, tracer.traceNode)
// traceMasterStartReady_Attach(tracer.pg, tracer.traceMasterStartReady)
// tracing.Unlock()
//
// wg.Go(func(ctx context.Context) error {
// return M.Run(ctx)
// })
//
// // determining M serving address XXX better with M api
// ev := cG.Recv()
// mnode, ok := ev.Event.(*eventNodeTab)
// if !ok {
// b.Fatalf("after M start: got %T ; want eventNodeTab", ev.Event)
// }
// Maddr := mnode.NodeInfo.Addr.String()
//
// tracing.Lock()
// pnode.Detach()
// tracing.Unlock()
// ev.Ack()
//
// // now after we know Maddr create S & C and start S serving
// S := tNewStorage("abc1", Maddr, "", Snet, zback)
// wg.Go(func(ctx context.Context) error {
// return S.Run(ctx)
// })
//
// create test cluster <- XXX factor to utility func
zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
t := tStartSimpleNEOGoSrv("abc1", zback)
defer t.Stop()
/*
ctx, cancel := context.WithCancel(context.Background())
wg := xsync.NewWorkGroup(ctx)
defer wg.Wait()
defer cancel()
// spawn M
M := tNewMaster("abc1", "", Mnet)
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
cG := tracetest.NewChan("main")
tG := tracetest.NewEventChecker(b, nil , cG)
tracer := NewTraceCollector(tdispatch1{cG})
tracer.RegisterNode(M.node, "m")
tracing.Lock()
pnode := traceNodeChanged_Attach(nil, tracer.traceNode)
traceMasterStartReady_Attach(tracer.pg, tracer.traceMasterStartReady)
tracing.Unlock()
wg.Go(func(ctx context.Context) error {
return M.Run(ctx)
})
// determining M serving address XXX better with M api
ev := cG.Recv()
mnode, ok := ev.Event.(*eventNodeTab)
if !ok {
b.Fatalf("after M start: got %T ; want eventNodeTab", ev.Event)
}
Maddr := mnode.NodeInfo.Addr.String()
tracing.Lock()
pnode.Detach()
tracing.Unlock()
ev.Ack()
// now after we know Maddr create S & C and start S serving
S := tNewStorage("abc1", Maddr, "", Snet, zback)
wg.Go(func(ctx context.Context) error {
return S.Run(ctx)
})
*/
// C := NewClient("abc1", Maddr, Cnet)
// wg.Go(func(ctx context.Context) error {
// return C.Run(ctx)
// })
//
// // command M to start
// tG.Expect(masterStartReady("m", true)) // <- XXX better with M api
// tracer.Detach()
//
// err := M.Start()
// if err != nil {
// b.Fatal(err)
// }
//
// xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
//
// obj1, err := zback.Load(ctx, xid1)
t.TraceOff()
err := M.Start(); t.FatalIf(err)
// if err != nil {
// b.Fatal(err)
// }
// buf1, serial1 := obj1.Data, obj1.Serial
//
// // C.Load(xid1)
// xcload1 := func() {
// cbuf1, cserial1, err := C.Load(ctx, xid1)
// if err != nil {
// b.Fatal(err)
// }
//
// if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) {
// b.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1)
// }
//
// cbuf1.Release()
// }
//
// // do first C.Load - this also implicitly waits for M & S to come up
// // and C to connect to M and S.
// xcload1()
//
// // now start the benchmark
// b.ResetTimer()
//
// benchit(xcload1)
xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
obj1, err := zback.Load(ctx, xid1)
if err != nil {
b.Fatal(err)
}
buf1, serial1 := obj1.Data, obj1.Serial
// C.Load(xid1)
xcload1 := func() {
cbuf1, cserial1, err := C.Load(ctx, xid1)
if err != nil {
b.Fatal(err)
}
if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) {
b.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1)
}
cbuf1.Release()
}
// do first C.Load - this also implicitly waits for M & S to come up
// and C to connect to M and S.
xcload1()
// now start the benchmark
b.ResetTimer()
benchit(xcload1)
}
func benchmarkGetObjectSerial(b *testing.B, Mnet, Snet, Cnet xnet.Networker) {
......
@@ -31,16 +31,19 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/zodb"
)
// tCluster is a test NEO cluster NEO.
// tCluster is a test NEO cluster.
//
// Create it with tNewCluster.
// Create nodes with .NewMaster, .NewStorage and .NewClient.
//
// XXX text about events tracing
type tCluster struct {
*tracetest.T // original testing env this cluster was created at
@@ -65,16 +68,16 @@ type tNode struct {
// XXX stub
type ITestMaster interface {
Run(ctx context.Context) error
// Run(ctx context.Context) error
Start() error
}
type ITestStorage interface {
Run(ctx context.Context) error
// Run(ctx context.Context) error
}
type ITestClient interface {
Run(ctx context.Context) error
// Run(ctx context.Context) error
zodb.IStorageDriver
}
@@ -82,7 +85,8 @@ type ITestClient interface {
// tNewCluster creates new NEO test cluster.
//
// At the end of the test the cluster has to be stopped via t.Stop().
// TODO add options describeing whether node type X should be created via Go or via Py.
//
// TODO add options describing whether node type X should be created via Go or via Py.
func tNewCluster(ttest *tracetest.T, name string) *tCluster {
t := &tCluster{
T: ttest,
@@ -111,8 +115,7 @@ func tNewCluster(ttest *tracetest.T, name string) *tCluster {
// After Stop completes all processes of the cluster are stopped.
func (t *tCluster) Stop() {
t.Helper()
t.gotracer.Detach()
//XXX t.pytracer.Detach()
t.TraceOff()
t.runCancel()
err := t.runWG.Wait()
@@ -124,6 +127,13 @@ func (t *tCluster) Stop() {
}
}
// TraceOff disables tracing.
// Trace events will no longer be created or delivered.
func (t *tCluster) TraceOff() {
t.gotracer.Detach()
//XXX t.pytracer.Detach()
}
// Checker returns tracechecker corresponding to name.
//
// name might be of "node" or "node1-node2" kind. XXX more text
@@ -283,3 +293,72 @@ func (m *tMaster) Run(ctx context.Context) error {
return m.Master.Run(ctx, l)
}
// --------
// tStartSimpleNEOGoSrv starts a simple NEO/go server with 1 master and 1 storage.
// The cluster is returned in a ready-to-start state.
func tStartSimpleNEOGoSrv(name string, Sback storage.Backend) *tCluster {
t := tNewCluster(name)
M := t.NewMaster("m")
// XXX if we always used lonet or pipenet - all network addresses would be predictable
// M starts listening - determine its listen address
t.Expect("m", netlisten("xxx")) // XXX xxx will be the address
t.Expect("m", δnode("m", Maddr, proto.MASTER, 1, proto.RUNNING, proto.IdTimeNone))
t.Expect("m", clusterState("m", proto.ClusterRecovering))
S := t.NewStorage("s", Maddr, Sback)
// S starts listening
t.Expect("s", netlisten("yyy")) // XXX yyy will be the address
// S connects to M
t.Expect("s-m", netdial("s", Maddr))
t.Expect("s-m", netconnect("sXXX", "mYYY", Maddr))
t.Expect("s-m", conntx("s:2", "m:2", 1, &proto.RequestIdentification{
NodeType: proto.STORAGE,
UUID: 0,
Address: xnaddr("s:1"),
ClusterName: "abc1",
IdTime: proto.IdTimeNone,
DevPath: nil,
NewNID: nil,
}))
t.Expect("m", δnode("m", "s:1", proto.STORAGE, 1, proto.PENDING, 0.01))
t.Expect("s-m", conntx("m:2", "s:2", 1, &proto.AcceptIdentification{
NodeType: proto.MASTER,
MyUUID: proto.UUID(proto.MASTER, 1),
YourUUID: proto.UUID(proto.STORAGE, 1),
}))
// M starts recovery on S
t.Expect("m-s", conntx("m:2", "s:2", 0, &proto.Recovery{}))
t.Expect("m-s", conntx("s:2", "m:2", 0, &proto.AnswerRecovery{
// empty new node
PTid: 0,
BackupTid: proto.INVALID_TID,
TruncateTid: proto.INVALID_TID,
}))
t.Expect("m-s", conntx("m:2", "s:2", 2, &proto.AskPartitionTable{}))
t.Expect("m-s", conntx("s:2", "m:2", 2, &proto.AnswerPartitionTable{
PTid: 0,
NumReplicas: 0,
RowList: []proto.RowInfo{},
}))
// M ready to start: new cluster, no in-progress S recovery
t.Expect("m", masterStartReady("m", true))
err := M.Start()
if err != nil {
t.Fatal(err)
}
return t
}
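Put together, the helper is meant to shrink benchmark bodies to roughly the following sketch, in the same package as the code above. Like the WIP benchmark, it leaves obtaining the master handle M unresolved:

// hypothetical reduced form of benchmarkGetObject
func benchmarkSimpleSrvSketch(b *testing.B) {
	zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
	t := tStartSimpleNEOGoSrv("abc1", zback) // trace-checked M+S startup
	defer t.Stop()

	t.TraceOff() // benchmark loads are not trace-checked; avoid deadlock
	// err := M.Start(); t.FatalIf(err)  // M handle: same loose end as above
	// b.ResetTimer()
	// ... run C.Load in a loop via benchit(xcload1) ...
}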