Commit 646a94b5 authored by Kirill Smelkov

go/neo/t/neotest: Switch to zwrk to simulate parallel load from multiple clients

zwrk is for ZODB what wrk is for HTTP.

Rationale: simulating multiple clients is:

1. noisy - the timings vary from run to run, sometimes by up to 50%;
2. burdened with significant additional overhead - there are constant
   OS-level process switches between the client processes, which prevents
   actually creating the load;
3. the overhead from "2" actually takes resources away from the server in
   the localhost case.

So let's switch to simulating many requests in a lightweight way, similarly
to how it is done in wrk - in one process and not so many threads (it can
be just 1), with many connections opened to the server and an epoll-like
way to load it, with Go providing the epoll-goroutine matching.
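
For illustration, here is a minimal, hypothetical Go sketch of that pattern
(not the actual zwrk code; loadIt, addr and the raw request bytes are
placeholders for this example) - one goroutine per connection, all in a
single process, multiplexed by the Go runtime onto epoll:

	package zwrksketch

	import (
		"context"
		"net"

		"golang.org/x/sync/errgroup"
	)

	// loadIt opens nclient connections and drives each one from its own
	// goroutine - no OS-level process switches in between simulated clients.
	func loadIt(ctx context.Context, addr string, nclient int, req []byte) error {
		g, ctx := errgroup.WithContext(ctx)
		for i := 0; i < nclient; i++ {
			g.Go(func() error {
				conn, err := net.Dial("tcp", addr)
				if err != nil {
					return err
				}
				defer conn.Close()
				resp := make([]byte, 4096)
				for ctx.Err() == nil {
					if _, err := conn.Write(req); err != nil {
						return err
					}
					if _, err := conn.Read(resp); err != nil {
						return err
					}
				}
				return nil
			})
		}
		return g.Wait()
	}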

Example summarized zbench-local output:

	x/src/lab.nexedi.com/kirr/neo/go/neo/t$ benchstat -split node,cluster,dataset x.txt
	name                             time/object
	cluster:rio dataset:wczblk1-8
	fs1-zhash.py                             23.7µs ± 5%
	fs1-zhash.go                             5.68µs ± 8%
	fs1-zhash.go+prefetch128                 6.44µs ±16%
	zeo/py/fs1-zhash.py                       376µs ± 4%
	zeo/py/fs1-zhash.go                       130µs ± 3%
	zeo/py/fs1-zhash.go+prefetch128          72.3µs ± 4%
	neo/py(!log)/sqlite·P1-zhash.py           565µs ± 4%
	neo/py(!log)/sql·P1-zhash.py              491µs ± 8%
	cluster:rio dataset:prod1-1024
	fs1-zhash.py                             19.5µs ± 2%
	fs1-zhash.go                             3.92µs ±12%
	fs1-zhash.go+prefetch128                 4.42µs ± 6%
	zeo/py/fs1-zhash.py                       365µs ± 9%
	zeo/py/fs1-zhash.go                       120µs ± 1%
	zeo/py/fs1-zhash.go+prefetch128          68.4µs ± 3%
	neo/py(!log)/sqlite·P1-zhash.py           560µs ± 5%
	neo/py(!log)/sql·P1-zhash.py              482µs ± 8%

	name                             req/s
	cluster:rio dataset:wczblk1-8
	fs1-zwrk.go·1                              380k ± 2%
	fs1-zwrk.go·2                              666k ± 3%
	fs1-zwrk.go·3                              948k ± 1%
	fs1-zwrk.go·4                             1.24M ± 1%
	fs1-zwrk.go·8                             1.62M ± 0%
	fs1-zwrk.go·12                            1.70M ± 0%
	fs1-zwrk.go·16                            1.71M ± 0%
	zeo/py/fs1-zwrk.go·1                      8.29k ± 1%
	zeo/py/fs1-zwrk.go·2                      10.4k ± 2%
	zeo/py/fs1-zwrk.go·3                      11.2k ± 1%
	zeo/py/fs1-zwrk.go·4                      11.7k ± 1%
	zeo/py/fs1-zwrk.go·8                      12.1k ± 2%
	zeo/py/fs1-zwrk.go·12                     12.3k ± 1%
	zeo/py/fs1-zwrk.go·16                     12.3k ± 2%
	cluster:rio dataset:prod1-1024
	fs1-zwrk.go·1                              594k ± 7%
	fs1-zwrk.go·2                             1.14M ± 4%
	fs1-zwrk.go·3                             1.60M ± 2%
	fs1-zwrk.go·4                             2.09M ± 1%
	fs1-zwrk.go·8                             2.74M ± 1%
	fs1-zwrk.go·12                            2.76M ± 0%
	fs1-zwrk.go·16                            2.76M ± 1%
	zeo/py/fs1-zwrk.go·1                      9.42k ± 9%
	zeo/py/fs1-zwrk.go·2                      10.4k ± 1%
	zeo/py/fs1-zwrk.go·3                      11.4k ± 1%
	zeo/py/fs1-zwrk.go·4                      11.7k ± 2%
	zeo/py/fs1-zwrk.go·8                      12.4k ± 1%
	zeo/py/fs1-zwrk.go·12                     12.5k ± 1%
	zeo/py/fs1-zwrk.go·16                     13.4k ±11%

	name                             latency-time/object
	cluster:rio dataset:wczblk1-8
	fs1-zwrk.go·1                            2.63µs ± 2%
	fs1-zwrk.go·2                            3.00µs ± 3%
	fs1-zwrk.go·3                            3.16µs ± 1%
	fs1-zwrk.go·4                            3.23µs ± 1%
	fs1-zwrk.go·8                            4.94µs ± 0%
	fs1-zwrk.go·12                           7.06µs ± 0%
	fs1-zwrk.go·16                           9.36µs ± 0%
	zeo/py/fs1-zwrk.go·1                      121µs ± 1%
	zeo/py/fs1-zwrk.go·2                      192µs ± 2%
	zeo/py/fs1-zwrk.go·3                      267µs ± 1%
	zeo/py/fs1-zwrk.go·4                      343µs ± 1%
	zeo/py/fs1-zwrk.go·8                      660µs ± 2%
	zeo/py/fs1-zwrk.go·12                     977µs ± 1%
	zeo/py/fs1-zwrk.go·16                    1.30ms ± 2%
	cluster:rio dataset:prod1-1024
	fs1-zwrk.go·1                            1.69µs ± 7%
	fs1-zwrk.go·2                            1.76µs ± 4%
	fs1-zwrk.go·3                            1.88µs ± 2%
	fs1-zwrk.go·4                            1.91µs ± 1%
	fs1-zwrk.go·8                            2.92µs ± 1%
	fs1-zwrk.go·12                           4.34µs ± 0%
	fs1-zwrk.go·16                           5.80µs ± 1%
	zeo/py/fs1-zwrk.go·1                      107µs ± 9%
	zeo/py/fs1-zwrk.go·2                      192µs ± 1%
	zeo/py/fs1-zwrk.go·3                      263µs ± 1%
	zeo/py/fs1-zwrk.go·4                      342µs ± 2%
	zeo/py/fs1-zwrk.go·8                      648µs ± 1%
	zeo/py/fs1-zwrk.go·12                     957µs ± 1%
	zeo/py/fs1-zwrk.go·16                    1.20ms ±10%

The scalability graphs at http://navytux.spb.ru/~kirr/neo.html were made
with the client load simulated by zwrk, not by many client OS processes.
http://navytux.spb.ru/~kirr/neo.html#performance-tests has some
additional notes on zwrk.

Some draft history related to this patch:

	lab.nexedi.com/kirr/neo/commit/ca0d828b	X neotest: Tzwrk1 - place to control running time of 1 zwrk iteration
	lab.nexedi.com/kirr/neo/commit/bbfb5006	X zwrk: Make sure we warm up connections to all NEO storages when cluster is partitioned
	lab.nexedi.com/kirr/neo/commit/7f22bba6	X zwrk: New tool to simulate parallel load from multiple clients
parent 1f92a4e2
@@ -931,7 +931,8 @@ cpustat() {
}
Nrun=5 # repeat benchmarks N time
Npar=16 # run so many parallel clients in parallel phase
Nparv="1 2 3 4 8 12 16" # run parallel zwrk benchmarks with so many clients
Tzwrk1=1s # time to run 1 zwrk iteration
#profile=
profile=cpustat
@@ -943,20 +944,6 @@ nrun() {
done
}
# nrunpar ... - run $Npar ... instances in parallel and wait for completion
nrunpar() {
$profile _nrunpar "$@"
}
_nrunpar() {
local jobv
for i in `seq $Npar`; do
"$@" &
jobv="$jobv $!"
done
wait $jobv
}
# bench_cpu - microbenchmark CPU
bench_cpu() {
echo -ne "node:\t"; xhostname
@@ -1118,8 +1105,6 @@ zbench() {
#nrun time demo-zbigarray read $url
nrun tzodb.py zhash --check=$zhashok --bench=$topic-%s --$zhashfunc $url
echo -e "\n# ${Npar} clients in parallel"
nrunpar tzodb.py zhash --check=$zhashok --bench=$topic-%s·P$Npar --$zhashfunc $url
echo
zbench_go $url $topic $zhashok
}
@@ -1140,8 +1125,11 @@ zbench_go() {
nrun tzodb_go zhash -check=$zhashok -bench=$topic -$zhashfunc $url
nrun tzodb_go zhash -check=$zhashok -bench=$topic -$zhashfunc -useprefetch $url
echo -e "\n# ${Npar} clients in parallel"
nrunpar tzodb_go zhash -check=$zhashok -bench=$topic·P$Npar -$zhashfunc $url
for i in ${Nparv}; do
echo -e "\n# $i clients in parallel"
nrun tzodb_go -test.benchtime $Tzwrk1 \
zwrk -nclient $i -check=$zhashok -bench=$topic -$zhashfunc $url
done
}
......
@@ -35,8 +35,12 @@ import (
"io"
"log"
"os"
"sync/atomic"
"testing"
"time"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xerr"
@@ -238,10 +242,265 @@ loop:
}
// ----------------------------------------
const zwrkSummary = "benchmark database under parallel load from multiple clients."
func zwrkUsage(w io.Writer) {
fmt.Fprintf(w,
`Usage: tzodb zwrk [options] <url>
`)
}
func zwrkMain(argv []string) {
ctx := context.Background()
flags := flag.NewFlagSet("", flag.ExitOnError)
flags.Usage = func() { zwrkUsage(os.Stderr); flags.PrintDefaults() }
fhash := hashFlags(flags)
fcheck := flags.String("check", "", "verify whole-database hash to be = expected")
fbench := flags.String("bench", "", "benchmarking format for output")
fnclient := flags.Int("nclient", 1, "simulate so many clients")
flags.Parse(argv[1:])
if flags.NArg() != 1 {
flags.Usage()
os.Exit(1)
}
// XXX kill -bench and use labels in neotest
if *fbench == "" {
log.Fatal("-bench must be provided")
}
url := flags.Arg(0)
h := fhash()
if (*fcheck != "") != (h.Hash != nil) {
log.Fatal("-check and -<hash> must be used together")
}
err := zwrk(ctx, url, *fnclient, h, *fbench, *fcheck)
if err != nil {
log.Fatal(err)
}
}
// zwrk simulates database load from multiple clients.
//
// It first serially reads all objects and remembers their per-object crc32
// checksums. If h/check were provided, this phase, similarly to zhash, also
// checks that the whole database content is as expected.
//
// Then the parallel phase starts, where nwrk separate connections to the
// database are opened and nwrk workers are run to perform database access
// over them in parallel to each other.
//
// Every time an object is loaded, its crc32 checksum is verified to be the
// same as the one obtained during the serial phase.
func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check string) (err error) {
at, objcheckv, err := zwrkPrepare(ctx, url, h, check)
if err != nil {
return err
}
// establish nwrk connections and warm them up
storv, err := zwrkPreconnect(ctx, url, at, nwrk)
if err != nil {
return err
}
defer func() {
for _, stor := range storv {
if stor != nil {
err2 := stor.Close()
err = xerr.First(err, err2)
}
}
}()
// benchmark parallel loads
defer xerr.Contextf(&err, "zwrk-%d/bench", nwrk)
r := testing.Benchmark(func (b *testing.B) {
wg, ctx := errgroup.WithContext(ctx)
var n int64
for i := 0; i < nwrk; i++ {
stor := storv[i]
oid := zodb.Oid(0)
wg.Go(func() error {
for {
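// all workers share one atomic counter, so across all of
// them ~b.N loads are performed in total regardless of nwrk.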
n := atomic.AddInt64(&n, +1)
if n >= int64(b.N) {
return nil
}
xid := zodb.Xid{Oid: oid, At: at}
buf, _, err := stor.Load(ctx, xid)
if err != nil {
return err
}
csum := crc32.ChecksumIEEE(buf.Data)
if csum != objcheckv[oid] {
return fmt.Errorf("%s: %s: crc32 mismatch: got %08x ; expect %08x",
url, oid, csum, objcheckv[oid])
}
buf.Release()
// XXX various scenarios are possible to select next object to read
oid = (oid + 1) % zodb.Oid(len(objcheckv))
}
return nil
})
}
err = wg.Wait()
if err != nil {
// XXX log. (not b.) - workaround for testing.Benchmark
// not allowing to detect failures.
log.Fatal(ctx, err)
}
})
// TODO latency distribution
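// about r.N requests were served in wall time r.T by nwrk workers, each
// with at most one request in flight: throughput ≈ N/T and the average
// per-request latency seen by a client ≈ nwrk·T/N.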
tavg := float64(r.T) / float64(r.N) / float64(time.Microsecond)
latavg := float64(nwrk) * tavg
rps := float64(r.N) / r.T.Seconds()
topic := fmt.Sprintf(bench, "zwrk.go")
fmt.Printf("Benchmark%s·%d %d\t%.1f req/s %.3f latency-µs/object\n",
topic, nwrk, r.N, rps, latavg)
return nil
}
// zwrkPreconnect establishes nwrk connections and warms them up.
func zwrkPreconnect(ctx context.Context, url string, at zodb.Tid, nwrk int) (_ []zodb.IStorage, err error) {
defer xerr.Contextf(&err, "zwrk-%d/preconnect", nwrk)
storv := make([]zodb.IStorage, nwrk)
wg, ctx := errgroup.WithContext(ctx)
for i := 0; i < nwrk; i++ {
i := i
wg.Go(func() error {
// open the storage without caching - we need to take
// the latency of every request into account, and a
// cache could distort it (e.g. make it significantly
// lower) for some requests.
var opts = zodb.OpenOptions{
ReadOnly: true,
NoCache: true,
}
stor, err := zodb.OpenStorage(ctx, url, &opts)
if err != nil {
return err
}
storv[i] = stor
// ping the storage to warm up the connection
// ( in case of NEO, LastTid connects to the master and
//   Load - to a storage )
_, err = stor.LastTid(ctx)
if err != nil {
return err
}
// load the first several objects to warm up the storage connection.
// we need to load several objects so that in case of a
// NEO cluster with several storage nodes, we warm up
// connections to them all.
//
// FIXME 16 hardcoded
for oid := zodb.Oid(0); oid < 16; oid++ {
buf, _, err := stor.Load(ctx, zodb.Xid{Oid: oid, At: at})
buf.XRelease()
if err != nil {
return err
}
}
return nil
})
}
err = wg.Wait()
if err != nil {
for _, stor := range storv {
if stor != nil {
stor.Close() // XXX lclose
}
}
return nil, err
}
return storv, nil
}
// zwrkPrepare serially reads all objects and computes per-object crc32.
func zwrkPrepare(ctx context.Context, url string, h hasher, check string) (at zodb.Tid, objcheckv []uint32, err error) {
defer xerr.Context(&err, "zwrk/prepare")
stor, err := zodb.OpenStorage(ctx, url, &zodb.OpenOptions{ReadOnly: true})
if err != nil {
return 0, nil, err
}
defer func() {
err2 := stor.Close()
err = xerr.First(err, err2)
}()
lastTid, err := stor.LastTid(ctx)
if err != nil {
return 0, nil, err
}
oid := zodb.Oid(0)
loop:
for {
xid := zodb.Xid{Oid: oid, At: lastTid}
buf, _, err := stor.Load(ctx, xid)
if err != nil {
switch errors.Cause(err).(type) {
case *zodb.NoObjectError:
break loop
default:
return 0, nil, err
}
}
// XXX Castagnoli is more strong and faster to compute
objcheckv = append(objcheckv, crc32.ChecksumIEEE(buf.Data))
if check != "" {
h.Write(buf.Data)
}
oid += 1
buf.Release()
}
// check that the data read serially is indeed what was expected.
if check != "" {
hresult := fmt.Sprintf("%s:%x", h.name, h.Sum(nil))
if hresult != check {
return 0, nil, fmt.Errorf("%s: hash mismatch: expected %s ; got %s", url, check, hresult)
}
}
return lastTid, objcheckv, nil
}
// ----------------------------------------
var commands = prog.CommandRegistry{
{"zhash", zhashSummary, zhashUsage, zhashMain},
{"zwrk", zwrkSummary, zwrkUsage, zwrkMain},
}
func main() {
......