Commit 2fbaf03d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d30cd186
......@@ -21,11 +21,14 @@ package neo
import (
"context"
"errors"
"fmt"
"io/ioutil"
stdnet "net"
"net/url"
"os"
"os/exec"
"strings"
"testing"
"time"
......@@ -34,6 +37,10 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
bsqlite "lab.nexedi.com/kirr/neo/go/neo/storage/sqlite"
)
// NEOSrv represents running NEO server.
......@@ -207,8 +214,15 @@ func (n *NEOPySrv) Close() (err error) {
//
// Create it with StartNEOGoSrv.
type NEOGoSrv struct {
// XXX
opt NEOSrvOptions // server options
opt NEOSrvOptions // server options
cancel func() // to stop spawned nodes
serveWG *xsync.WorkGroup // services are spawned under serveWG
Ml stdnet.Listener // M listens here
Sl stdnet.Listener // S listens here
M *Master // M service
S *Storage // S service
Sback *bsqlite.Backend // S backend
}
func (_ *NEOGoSrv) Bugs() []string {
......@@ -218,12 +232,88 @@ func (_ *NEOGoSrv) Bugs() []string {
// StartNEOGoSrv starts NEO/go server specified by options.
func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) {
defer xerr.Contextf(&err, "start neo/go %s/%s", opt.workdir, opt.name)
return nil, fmt.Errorf("TODO")
// XXX release resources on error
ctx, cancel := context.WithCancel(context.Background())
serveWG := xsync.NewWorkGroup(ctx)
n := &NEOGoSrv{opt: opt, cancel: cancel, serveWG: serveWG}
defer func() {
if err != nil {
n.Close() // ignore err
}
}()
// FIXME tune glog to write logs into workdir
net := xnet.NetPlain("tcp") // FIXME
n.Ml, err = net.Listen(""); if err != nil { return nil, err }
n.Sl, err = net.Listen(""); if err != nil { return nil, err }
n.M = NewMaster(opt.name, net)
serveWG.Go(func(ctx context.Context) error {
return n.M.Run(ctx, n.Ml)
})
n.Sback, err = bsqlite.Open(fmt.Sprintf("%s/%s.sqlite", opt.workdir, opt.name))
if err != nil {
return nil, err
}
n.S = NewStorage(opt.name, n.Ml.Addr().String(), net, n.Sback)
serveWG.Go(func(ctx context.Context) error {
return n.S.Run(ctx, n.Sl)
})
// wait till spawned NEO cluster can become operational
// XXX better use tracing and wait for traceMasterStartReady event
for {
if err := ctx.Err(); err != nil {
return nil, err
}
err = n.M.Start()
if err != nil {
if !strings.HasSuffix(err.Error(), "start: cluster is non-operational") { // XXX
return nil, err
}
}
time.Sleep(10*time.Millisecond)
}
return n, nil
}
func (n *NEOGoSrv) Close() (err error) {
defer xerr.Contextf(&err, "stop neo/go %s", n.opt.workdir)
panic("TODO")
n.cancel()
err = n.serveWG.Wait()
if errors.Is(err, context.Canceled) {
err = nil // cancel is expected
}
// ownership of Ml is transferred to M when M starts; M closes its l on stop.
if n.M == nil && n.Ml != nil {
__ := n.Ml.Close()
err = xerr.First(err, __)
}
// ----//---- for Sl and S
if n.S == nil && n.Sl != nil {
__ := n.Ml.Close()
err = xerr.First(err, __)
}
if n.Sback != nil {
__ := n.Sback.Close() // XXX should be in Storage?
err = xerr.First(err, __)
}
return err
}
func (n *NEOGoSrv) ZUrl() string {
......@@ -257,7 +347,7 @@ func withNEOSrv(t *testing.T, f func(t *testing.T, nsrv NEOSrv), optv ...tOption
t.Helper()
X := xtesting.FatalIf(t)
work, err := ioutil.TempDir("", "neo"); X(err)
defer os.RemoveAll(work)
//defer os.RemoveAll(work)
f(work)
}
......@@ -332,11 +422,11 @@ func withNEOSrv(t *testing.T, f func(t *testing.T, nsrv NEOSrv), optv ...tOption
// start NEO/py first. We need it to create the
// database and to preload it, because NEO/go
// does not support commit.
// does not currently support commit.
npy := startNEOpy(t, workdir)
err := npy.Close(); X(err)
// now, as the database is created and preloaded, start NEO/go
// start NEO/go, as the database is created and preloaded
ngo, err := StartNEOGoSrv(neoOpt); X(err)
defer func() {
err := ngo.Close(); X(err)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment