...
 
Commits (2924)
This diff is collapsed.
? random access -> mmap
- scheduler won't use free P if a G is taking time and other Gs are queued after it on the same P:
https://groups.google.com/forum/#!topic/golang-nuts/8KYER1ALelg
( not quite so exactly. The scheduler is stealing one, but if GC wants to
stop the world, and one goroutine persists looping, others won't be run until
the looping one parks and GC performs.
From this point of view the above issues is actually about
https://github.com/golang/go/issues/10958 )
e870f06c (runtime: yield time slice to most recently readied G)
ea0386f8 (runtime: improve randomized stealing logic)
4bb491b1 (runtime: improve scheduler fairness)
bc31bccc (runtime: preempt long-running goroutines)
179d41fe (runtime: tune P retake logic)
fb6f8a96 (runtime: remove unnecessary wakeups of worker threads)
/s steal
findrunnable
runqsteal
? runqgrab
schedule <- top entry
868c8b37 (runtime: only sleep before stealing work from a running P)
b75b4d0e (runtime: skip netpoll check if there are no waiters)
-> stopm() // stop and restart m after waiting for work
notesleep(m.park)
notewakeup
startm() (calls notewakeup(m.park)
? wakep() (calls startm(nil, ...)
? startlockedm
https://golang.org/s/go11sched -> syscall/M partking and Unparking/Spinning
runtime/HACKING.md
----------------------------------------
- TODO stats for events (packets received, errors, etc)
- x/net/trace to trace requests and connection logging (and packets ?)
- packet log; general log -> glog ?
-> http://opentracing.io/
- gRPC eventually instead of hand-made protocol ?
go.leveldb - study
go.groupcache - study
groupcache -> consistenthash (-> fork-splitted)
module lab.nexedi.com/kirr/neo/go
go 1.14
require (
github.com/DataDog/czlib v0.0.0-20210322182103-8087f4e14ae7
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
github.com/cznic/strutil v0.0.0-20181122101858-275e90344537
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.1
github.com/golang/glog v1.0.0
github.com/golang/protobuf v1.4.3 // indirect
github.com/google/go-cmp v0.5.4 // indirect
github.com/gwenn/gosqlite v0.0.0-20211101095637-b18efb2e44c8
github.com/gwenn/yacr v0.0.0-20211101095056-492fb0c571bc // indirect
github.com/kisielk/og-rek v1.2.0
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0
github.com/philhofer/fwd v1.1.1
github.com/pkg/errors v0.9.1
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/shamaton/msgpack v1.2.1
github.com/soheilhy/cmux v0.1.5
github.com/someonegg/gocontainer v1.0.0
github.com/stretchr/testify v1.7.0
github.com/tinylib/msgp v1.1.6
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 // indirect
golang.org/x/net v0.0.0-20211111160137-58aab5ef257a // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect
google.golang.org/grpc v1.36.0 // indirect
google.golang.org/grpc/examples v0.0.0-20210301210255-fc8f38cccf75 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
lab.nexedi.com/kirr/go123 v0.0.0-20211124154638-01e8697d1901
)
This diff is collapsed.
......@@ -38,7 +38,7 @@ func Running(ctxp *context.Context, name string) func(*error) {
return running(ctxp, name)
}
// Runningf is Running cousin with formatting support
// Runningf is Running with formatting support.
func Runningf(ctxp *context.Context, format string, argv ...interface{}) func(*error) {
return running(ctxp, fmt.Sprintf(format, argv...))
}
......@@ -46,17 +46,40 @@ func Runningf(ctxp *context.Context, format string, argv ...interface{}) func(*e
func running(ctxp *context.Context, name string) func(*error) {
ctx := taskctx.Running(*ctxp, name)
*ctxp = ctx
log.Depth(2).Info(ctx, "[") // TODO log -> trace, don't put ":" before "["
traceBegin(3, ctx)
return func(errp *error) {
err := ""
if e := *errp; e != nil {
err = fmt.Sprintf(" (%s)", e)
}
log.Depth(1).Info(ctx, "]"+err) // TODO log -> trace, don't put ":" before "]"
traceEnd(2, ctx, *errp) // XXX recheck 2
// NOTE not *ctxp here - as context pointed by ctxp could be
// changed when this deferred function is run
taskctx.ErrContext(errp, ctx)
}
}
// XXX merge TraceBegin+TraceEnd into just -> `defer Trace(ctx)(&err)` ?
// (can log return lineno more precisely)
// TraceBegin traces beginning of a task.
func TraceBegin(ctx context.Context) {
traceBegin(2, ctx)
}
// TraceEnd traces end of a task.
func TraceEnd(ctx context.Context, err error) {
traceEnd(2, ctx, err)
}
func traceBegin(depth int, ctx context.Context) {
log.Depth(depth).Info(ctx, "[") // TODO log -> trace, don't put ":" before "["
}
func traceEnd(depth int, ctx context.Context, err error) {
e := ""
if err != nil {
e = fmt.Sprintf(" (%s)", err)
}
log.Depth(depth).Info(ctx, "]"+e) // TODO log -> trace, don't put ":" before "]"
}
......@@ -39,6 +39,7 @@ func (t *Task) Name() string { return t.name }
type taskKey struct{}
// Running creates new task and returns new context with that task set to current.
// XXX -> New?
func Running(ctx context.Context, name string) context.Context {
return context.WithValue(ctx, taskKey{}, &Task{parent: Current(ctx), name: name})
}
......
......@@ -17,46 +17,60 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xcontext provides addons to std package context.
// Package xcontext is staging place for go123/xcontext.
package xcontext
import (
"context"
"io"
"errors"
)
// Cancelled reports whether an error is due to a canceled context.
//
// Since both cancellation ways - explicit and due to exceeding context
// deadline - result in the same signal to work being done under context,
// Canceled treats both context.Canceled and context.DeadlineExceeded as errors
// indicating context cancellation.
//
// XXX naming -> IsCanceled?
func Canceled(err error) bool {
switch {
case err == nil:
return false
case errors.Is(err, context.Canceled):
return true
case errors.Is(err, context.DeadlineExceeded):
return true
}
return false
}
// WithCloseOnErrCancel closes c on ctx cancel while f is run, or if f returns with an error.
//
// It is usually handy to propagate cancellation to interrupt IO.
func WithCloseOnErrCancel(ctx context.Context, c io.Closer, f func() error) (err error) {
closed := false
fdone := make(chan error)
defer func() {
<-fdone // wait for f to complete
if err != nil {
if !closed {
c.Close()
}
/*
// WhenDone arranges for f to be called either when ctx is cancelled or
// surrounding function returns.
//
// To work as intended it should be called under defer like this:
//
// func myfunc(ctx, ...) {
// defer xcontext.WhenDone(ctx, func() { ... })()
//
// XXX -> use WithCloseOn{Err,Ret}Cancel instead?
func WhenDone(ctx context.Context, f func()) func() {
done := make(chan struct{})
go func() {
select {
case <-ctx.Done():
// ok
case <-done:
// ok
}
}()
go func() (err error) {
defer func() {
fdone <- err
close(fdone)
}()
return f()
f()
}()
select {
case <-ctx.Done():
c.Close() // interrupt IO
closed = true
return ctx.Err()
case err := <-fdone:
return err
return func() {
close(done)
}
}
*/
......@@ -18,6 +18,7 @@
// See https://www.nexedi.com/licensing for rationale and options.
// Package xexec complements stdlib package os/exec.
// TODO move -> go123
package xexec
import (
......@@ -43,6 +44,10 @@ func Command(name string, argv ...string) *Cmd {
}
// XXX Cmd.CombinedOutput
// XXX Cmd.Output
// XXX Cmd.Run
// Start is similar to exec.Command.Start - it starts the specified command.
// Started command will be signalled with SIGTERM upon ctx cancel.
func (cmd *Cmd) Start(ctx context.Context) error {
......@@ -55,7 +60,7 @@ func (cmd *Cmd) Start(ctx context.Context) error {
go func() {
select {
case <-ctx.Done():
cmd.Process.Signal(syscall.SIGTERM)
cmd.Process.Signal(syscall.SIGTERM) // XXX err
case <-cmd.done:
// ok
......
// Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -17,11 +17,16 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xio provides addons to standard package io.
// Package xio is staging place for go123/xio.
package xio
import (
"context"
"errors"
"io"
"lab.nexedi.com/kirr/neo/go/internal/log"
// "lab.nexedi.com/kirr/neo/go/internal/xcontext"
)
......@@ -40,3 +45,101 @@ func EOFok(err error) error {
}
return err
}
/*
// CloseWhenDone arranges for c to be closed either when ctx is cancelled or
// surrounding function returns.
//
// To work as intended it should be called under defer like this:
//
// func myfunc(ctx, ...) {
// defer xio.CloseWhenDone(ctx, c)()
//
// The error - if c.Close() returns with any - is logged.
//
// XXX -> use xcontext.WithCloseOnErrCancel instead?
func CloseWhenDone(ctx context.Context, c io.Closer) func() {
return xcontext.WhenDone(ctx, func() {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
})
}
*/
// LClose closes c and logs closing error if there was any.
// the error is otherwise ignored
//
// XXX naming? -> CloseOrLog? CloseErrLog? CloseLogErr? CloseAndLogErr?
func LClose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
}
// WithCloseOnErrCancel closes c on ctx cancel while f is run, or if f returns with an error.
//
// It is usually handy to propagate cancellation to interrupt IO.
//
// Returned error is what f returned, or ctx.Err() if f retrned nil despite that
// ctx was canceled and c closed.
func WithCloseOnErrCancel(ctx context.Context, c io.Closer, f func() error) (err error) {
closed := false
fdone := make(chan error)
defer func() {
errf, ok := <-fdone // wait for f to complete
if ok {
// it was ctx cancel and `return ctx.Err()` vvv
// -> change return to be what f returned if !nil
if errf != nil {
err = errf
}
}
if err != nil {
if !closed {
LClose(ctx, c)
}
}
}()
go func() (err error) {
defer func() {
fdone <- err
close(fdone)
}()
return f()
}()
select {
case <-ctx.Done():
LClose(ctx, c) // interrupt IO
closed = true
return ctx.Err()
case err := <-fdone:
return err
}
}
// WithCloseOnRetCancel closes c on ctx cancel while f is run, or when f returns.
//
// It is usually handy to propagate cancellation to interrupt IO.
func WithCloseOnRetCancel(ctx context.Context, c io.Closer, f func() error) error {
err := WithCloseOnErrCancel(ctx, c, func() error {
e := f()
if e == nil {
e = retOK // force c close
}
return e
})
if err == retOK {
err = nil
}
return err
}
var retOK = errors.New("ok")
......@@ -62,7 +62,7 @@ func NeedPy(t testing.TB, modules ...string) {
// verify if python is present
havePy, know := pyHave[".python"]
if !know {
cmd := exec.Command("python2", "-c", "0")
cmd := exec.Command("python", "-c", "0")
err := cmd.Run()
havePy = (err == nil)
pyHave[".python"] = havePy
......@@ -76,7 +76,7 @@ func NeedPy(t testing.TB, modules ...string) {
for _, pymod := range modules {
have, know := pyHave[pymod]
if !know {
cmd := exec.Command("python2", "-c", "import "+pymod)
cmd := exec.Command("python", "-c", "import "+pymod)
err := cmd.Run()
have = (err == nil)
pyHave[pymod] = have
......@@ -125,7 +125,7 @@ func ZPyCommitRaw(zurl string, at zodb.Tid, objv ...ZRawObject) (_ zodb.Tid, err
zin.WriteString("\n")
// run py `zodb commit`
cmd:= exec.Command("python2", "-m", "zodbtools.zodb", "commit", zurl, at.String())
cmd:= exec.Command("python", "-m", "zodbtools.zodb", "commit", zurl, at.String())
cmd.Stdin = zin
cmd.Stderr = os.Stderr
out, err := cmd.Output()
......@@ -142,6 +142,8 @@ func ZPyCommitRaw(zurl string, at zodb.Tid, objv ...ZRawObject) (_ zodb.Tid, err
return tid, nil
}
// XXX + ZPyCommitSrv ?
// ZPyRestore restores transactions specified by zin in zodbdump format.
//
// The restore is performed via zodbtools/py.
......@@ -149,7 +151,7 @@ func ZPyRestore(zurl string, zin string) (tidv []zodb.Tid, err error) {
defer xerr.Contextf(&err, "%s: zpyrestore", zurl)
// run py `zodb restore`
cmd:= exec.Command("python2", "-m", "zodbtools.zodb", "restore", zurl)
cmd:= exec.Command("python", "-m", "zodbtools.zodb", "restore", zurl)
cmd.Stdin = strings.NewReader(zin)
cmd.Stderr = os.Stderr
out, err := cmd.Output()
......@@ -309,10 +311,7 @@ func checkLoad(t *testing.T, zdrv zodb.IStorageDriver, xid zodb.Xid, expect objS
//
// txnvOk is what data to expect to be in the database.
func DrvTestLoad(t *testing.T, zdrv zodb.IStorageDriver, txnvOk []Txn, bugv ...string) {
bugs := map[string]bool{}
for _, bug := range bugv {
bugs[bug] = true
}
bugs := argv2map(bugv)
// current knowledge of what was "before" for an oid as we scan over
// data base entries
......@@ -322,6 +321,8 @@ func DrvTestLoad(t *testing.T, zdrv zodb.IStorageDriver, txnvOk []Txn, bugv ...s
for _, obj := range txn.Data {
txh := txn.Header
// XXX assert obj.Tid == txn.Tid
// XXX check Load finds data at correct .Pos / etc ?
// ~ loadSerial
......@@ -364,9 +365,13 @@ func DrvTestLoad(t *testing.T, zdrv zodb.IStorageDriver, txnvOk []Txn, bugv ...s
}
// DrvTestWatch verifies that storage driver watcher can observe commits done from outside.
func DrvTestWatch(t *testing.T, zurl string, zdrvOpen zodb.DriverOpener) {
func DrvTestWatch(t *testing.T, zurl string, zdrvOpen zodb.DriverOpener, bugv ...string) {
X := FatalIf(t)
bugs := argv2map(bugv)
if bugs["nocommit"] {
t.Skip("skipping: server does not implement commit")
}
NeedPy(t, "zodbtools")
u, err := url.Parse(zurl); X(err)
......@@ -493,6 +498,16 @@ func FatalIf(t *testing.T) func(error) {
}
}
// argv2map converts ...string to map[string]bool.
// it is handy to use e.g. to convert ...bugv to bugs {}.
func argv2map(argv []string) map[string]bool {
dict := map[string]bool{}
for _, arg := range argv {
dict[arg] = true
}
return dict
}
// b is syntactic sugar for byte literals.
//
......
This diff is collapsed.
This diff is collapsed.
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Neo is a driver program for running and managing NEO databases.
package main
import (
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/neo/go/internal/log"
)
var commands = prog.CommandRegistry{
{"master", masterSummary, masterUsage, masterMain},
{"storage", storageSummary, storageUsage, storageMain},
}
var helpTopics = prog.HelpRegistry{
// empty for now
}
func main() {
prog := prog.MainProg{
Name: "neo",
Summary: "Neo is a tool to run NEO services and commands",
Commands: commands,
HelpTopics: helpTopics,
}
defer log.Flush()
prog.Main()
}
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package main
// cli to run master node
import (
"context"
"flag"
"fmt"
"io"
"os"
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo"
)
const masterSummary = "run master node"
func masterUsage(w io.Writer) {
fmt.Fprintf(w,
`Usage: neo master [options]
Run NEO master node.
`)
}
func masterMain(argv []string) {
flags := flag.NewFlagSet("", flag.ExitOnError)
flags.Usage = func() { masterUsage(os.Stderr); flags.PrintDefaults() }
netSetup := netFlags(flags)
cluster := flags.String("cluster", "", "cluster name")
// TODO masters here too (multiple masters) ?
bind := flags.String("bind", "", "address to serve on")
flags.Parse(argv[1:])
if *cluster == "" {
prog.Fatal("cluster name must be provided")
}
argv = flags.Args()
if len(argv) > 0 {
flags.Usage()
prog.Exit(2)
}
ctx := context.Background()
err := func() (err error) {
net, err := netSetup(ctx)
if err != nil {
return err
}
defer func() {
__ := net.Close()
err = xerr.First(err, __)
}()
return listenAndServe(ctx, net, *bind, func(ctx context.Context, l xnet.Listener) error {
master := neo.NewMaster(*cluster, net)
return master.Run(ctx, l)
})
}()
if err != nil {
prog.Fatal(err)
}
}
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package main
// routines common to several subcommands
import (
"bytes"
"context"
"flag"
"net/http"
"io"
"fmt"
"github.com/soheilhy/cmux"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
// for http://.../debug/pprof
_ "net/http/pprof"
)
// netFlags installs common network flags and returns function to setup network
// and create selected networker.
func netFlags(flags *flag.FlagSet) (netSetup func(context.Context) (xnet.Networker, error)) {
// XXX also support $NEO_<K> envvars?
fca := flags.String("ca", "", "path to CA certificate")
fcert := flags.String("cert", "", "path to node certificate")
fkey := flags.String("key", "", "path to node private key")
flonode := flags.String("lonode", "", "<net>/<host> for this node on lonet network")
return func(ctx context.Context) (xnet.Networker, error) {
return neonet.Join(ctx, neonet.Config{
CA: *fca,
Cert: *fcert,
Key: *fkey,
LoNode: *flonode,
})
}
}
// neoMatch tells whether incoming stream starts like a NEO protocol handshake word.
func neoMatch(r io.Reader) bool {
var b [4]byte
n, _ := io.ReadFull(r, b[:])
if n < 4 {
return false
}
switch {
// 00 00 00 v - original handshake for NEO <= 1.12
case bytes.Equal(b[:3], []byte{0,0,0}):
return true
// NEO handshake after switch to MessagePack (= start of msgpack encoding for [2](b"NEO", version) )
case bytes.Equal(b[:], []byte{0x92, 0xc4, 3, 'N'}):
return true
default:
return false
}
}
// listenAndServe runs service on laddr.
//
// It starts listening, multiplexes incoming connection to NEO and HTTP
// protocols, passes NEO connections to serve and passes HTTP connection to
// default HTTP mux.
//
// Default HTTP mux can be assumed to contain /debug/pprof and the like.
func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve func(ctx context.Context, l xnet.Listener) error) (err error) {
l, err := net.Listen(ctx, laddr)
if err != nil {
return err
}
defer func() { // just in case if mux.Close does not close l
__ := l.Close()
err = xerr.First(err, __)
}()
log.Infof(ctx, "listening at %s ...", l.Addr())
log.Flush()
mux := cmux.New(xnet.BindCtxL(l, ctx))
neoL := mux.Match(neoMatch)
httpL := mux.Match(cmux.HTTP1(), cmux.HTTP2())
miscL := mux.Match(cmux.Any())
wg := xsync.NewWorkGroup(ctx)
wg.Go(func(ctx context.Context) error {
return xio.WithCloseOnRetCancel(ctx, &noErrCloser{mux},
mux.Serve,
)
})
wg.Go(func(ctx context.Context) error {
return serve(ctx, xnet.WithCtxL(neoL))
})
wg.Go(func(ctx context.Context) error {
srv := &http.Server{}
return xio.WithCloseOnRetCancel(ctx, srv, func() error {
// TODO ^^^ better Shutdown instead of Close
return srv.Serve(httpL)
})
})
wg.Go(func(ctx context.Context) error {
return xio.WithCloseOnRetCancel(ctx, miscL, func() error {
for {
conn, err := miscL.Accept()
if err != nil {
return err
}
// got something unexpected - grab the header (which we
// already have read), log it and reject the connection.
b := make([]byte, 1024)
// must not block as some data is already there in cmux buffer
n, _ := conn.Read(b)
subj := fmt.Sprintf("strange connection from %s:", conn.RemoteAddr())
serr := "peer sent nothing"
if n > 0 {
serr = fmt.Sprintf("peer sent %q", b[:n])
}
log.Warningf(ctx, "%s: %s", subj, serr)
xio.LClose(ctx, conn)
}
})
})
return wg.Wait()
}
// noErrCloser turns `Close()` -> `Close() err` that always returns nil.
type noErrCloser struct {
c interface { Close() }
}
func (c *noErrCloser) Close() error {
c.c.Close()
return nil
}
// Copyright (C) 2016-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package main
// cli to run storage node
import (
"context"
"flag"
"fmt"
"io"
"os"
"runtime"
"strings"
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/storage"
_ "lab.nexedi.com/kirr/neo/go/neo/storage/fs1"
_ "lab.nexedi.com/kirr/neo/go/neo/storage/sqlite"
)
const storageSummary = "run storage node"
func storageUsage(w io.Writer) {
fmt.Fprintf(w,
`Usage: neo storage [options] <data>
Run NEO storage node.
<data> is backend URL for data persistence.
Available backends are:
`)
for _, back := range storage.AvailableBackends() {
fmt.Fprintf(w, "- %s://\n", back)
}
fmt.Fprintf(w,
`
XXX currently storage is read-only.
`)
}
func storageMain(argv []string) {
flags := flag.NewFlagSet("", flag.ExitOnError)
flags.Usage = func() { storageUsage(os.Stderr); flags.PrintDefaults() }
netSetup := netFlags(flags)
cluster := flags.String("cluster", "", "the cluster name")
masters := flags.String("masters", "", "list of masters")
bind := flags.String("bind", "", "address to serve on")
flags.Parse(argv[1:])
if *cluster == "" {
prog.Fatal("cluster name must be provided")
}
masterv := strings.Split(*masters, ",")
if len(masterv) == 0 {
prog.Fatal("master list must be provided")
}
if len(masterv) > 1 {
prog.Fatal("TODO neo/go POC currently supports only 1 master")
}
master := masterv[0]
argv = flags.Args()
if len(argv) < 1 {
flags.Usage()
prog.Exit(2)
}
// adjust GOMAXPROCS *= N (a lot of file IO) because file IO really consumes OS threads; details:
// https://groups.google.com/forum/#!msg/golang-nuts/jPb_h3TvlKE/rQwbg-etCAAJ
// https://github.com/golang/go/issues/6817
//
// TODO check how varying this affects performance
//
// NOTE should be not needed once, hopefully, Go runtime uses io_uring for IO
// https://github.com/golang/go/issues/31908
maxprocs := runtime.GOMAXPROCS(0)
runtime.GOMAXPROCS(maxprocs*8) // XXX *8 is enough?
ctx := context.Background()
err := func() (err error) {
back, err := storage.OpenBackend(ctx, argv[0])
if err != nil {
return err
}
defer func() {
if back != nil {
__ := back.Close()
err = xerr.First(err, __)
}
}()
net, err := netSetup(ctx)
if err != nil {
return err
}
defer func() {
__ := net.Close()
err = xerr.First(err, __)
}()
return listenAndServe(ctx, net, *bind, func(ctx context.Context, l xnet.Listener) error {
stor := neo.NewStorage(*cluster, master, net, back)
back = nil
return stor.Run(ctx, l)
})
}()
if err != nil {
prog.Fatal(err)
}
}
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xtime complements standard package time.
package xtime
import (
"time"
)
// Mono returns time passed since program start.
//
// It uses monotonic time for measurements and is robust to OS clock adjustments.
//
// XXX better return time.Duration?
func Mono() float64 {
// time.Sub uses monotonic clock readings for the difference
// FIXME py returns it since epoch, not since start
return time.Now().Sub(tstart).Seconds()
}
var tstart time.Time = time.Now()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -132,6 +132,7 @@ func xconnError(err error) error {
// Prepare pktBuf with content.
func _mkpkt(enc proto.Encoding, connid uint32, msgcode uint16, payload []byte) *pktBuf {
// XXX -> enc.PktHeadEncode + enc.MsgEncode
switch enc {
case 'N':
pkt := &pktBuf{make([]byte, proto.PktHeaderLenN+len(payload))}
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -446,7 +446,8 @@ func TestNID(t *testing.T) {
func TestNIDDecode(t *testing.T) {
var testv = []struct{nid uint32; str string}{
{0, "?(0)0"},
// {0, "?(0)0"},
{0, "S?"}, // XXX X0 used as X? until neo.NewNode uses temporary bit
{0x00000001, "S1"},
{0xf0000002, "M2"},
{0xe0000003, "C3"},
......
......@@ -954,7 +954,7 @@ func (s *sizerM) genBasic(path string, typ *types.Basic, userType types.Type) {
upath = fmt.Sprintf("%s(%s)", typ.Name(), upath)
}
// zodb.Tid and zodb.Oid are encoded as [8]bin
// zodb.Tid and zodb.Oid are encoded as [8]bin XXX or nil for INVALID_{TID_OID}
if userType == zodbTid || userType == zodbOid {
s.size.Add(1+1+8) // mbin8 + 8 + [8]data
return
......@@ -990,7 +990,7 @@ func (e *encoderM) genBasic(path string, typ *types.Basic, userType types.Type)
upath = fmt.Sprintf("%s(%s)", typ.Name(), upath)
}
// zodb.Tid and zodb.Oid are encoded as [8]bin
// zodb.Tid and zodb.Oid are encoded as [8]bin XXX or nil
if userType == zodbTid || userType == zodbOid {
e.emit("data[%v] = byte(msgpack.Bin8)", e.n); e.n++
e.emit("data[%v] = 8", e.n); e.n++
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.