Commit 7f22bba6 authored by Kirill Smelkov's avatar Kirill Smelkov

X zwrk: New tool to simulate parallel load from multiple clients

Similarly to wrk on HTTP.

Rationale: simulating multiple clients is:

1. noisy - the timings from run to run are changing sometimes up to 50%
2. with significant additional overhead - there are constant OS-level
   process switches in between client processes and this prevents
   actually creating the load.
3. the above load from "2" actually takes resources from the server in
   localhost case.

So let's switch to simulating many requests in a lightweight way similarly
to how it is done in wrk - in one process and not so many threads (it
can be just 1) with many connections opened to server and epolly way to
load it with Go providing epoll-goroutine matching.
parent c86ba1b0
/log
/var
/zhash
/zhash_go
/tcpu
/tcpu_go
/tzodb
/tzodb_go
/ioping.tmp
......@@ -432,7 +432,7 @@ GENfs() {
# remember correct hash to later check in benchmarks
# crc32:1552c530 ; oid=0..2127 nread=8534126 t=0.033s (15.7μs / object) x=zhash.py
zhash.py --$zhashfunc $fs1/data.fs |awk '{print $1}' >$ds/zhash.ok
tzodb.py zhash --$zhashfunc $fs1/data.fs |awk '{print $1}' >$ds/zhash.ok
}
# generate data in sqlite
......@@ -861,7 +861,8 @@ cpustat() {
}
Nrun=5 # repeat benchmarks N time
Npar=16 # run so many parallel clients in parallel phase
Nparv="1 2 4 8 16" # run parallel zwrk benchmarks with so many clients (XXX +6, +12 ?)
#Npar=16 # run so many parallel clients in parallel phase
#profile=
profile=cpustat
......@@ -874,6 +875,7 @@ nrun() {
}
# nrunpar ... - run $Npar ... instances in parallel and wait for completion
# XXX running processes in parallel is deprecated in favour of zwrk.
nrunpar() {
$profile _nrunpar "$@"
}
......@@ -1047,9 +1049,10 @@ zbench() {
zhashok=$3
# nrun time demo-zbigarray read $url
nrun zhash.py --check=$zhashok --bench=$topic/%s --$zhashfunc $url
echo -e "\n# ${Npar} clients in parallel"
nrunpar zhash.py --check=$zhashok --bench=$topic/%s-P$Npar --$zhashfunc $url
nrun tzodb.py zhash --check=$zhashok --bench=$topic/%s --$zhashfunc $url
# XXX running processes in parallel is deprecated in favour of zwrk.
# echo -e "\n# ${Npar} clients in parallel"
# nrunpar tzodb.py zhash --check=$zhashok --bench=$topic/%s-P$Npar --$zhashfunc $url
echo
zbench_go $url $topic $zhashok
}
......@@ -1059,11 +1062,17 @@ zbench_go() {
url=$1
topic=$2
zhashok=$3
nrun zhash_go -check=$zhashok --bench=$topic/%s --log_dir=$log -$zhashfunc $url
nrun zhash_go -check=$zhashok --bench=$topic/%s --log_dir=$log -$zhashfunc -useprefetch $url
nrun tzodb_go -log_dir=$log zhash -check=$zhashok -bench=$topic/%s -$zhashfunc $url
nrun tzodb_go -log_dir=$log zhash -check=$zhashok -bench=$topic/%s -$zhashfunc -useprefetch $url
echo -e "\n# ${Npar} clients in parallel"
nrunpar zhash_go -check=$zhashok --bench=$topic/%s-P$Npar --log_dir=$log -$zhashfunc $url
# XXX running processes in parallel is deprecated in favour of zwrk.
# echo -e "\n# ${Npar} clients in parallel"
# nrunpar tzodb_go -log_dir=$log zhash -check=$zhashok -bench=$topic/%s-P$Npar -$zhashfunc $url
for i in ${Nparv}; do
echo -e "\n# $i clients in parallel"
nrun tzodb_go -log_dir=$log zwrk -nclient $i -check=$zhashok -bench=$topic/%s -$zhashfunc $url
done
}
......@@ -1402,15 +1411,15 @@ cpustat)
;;
esac
# make sure zhash*, tcpu* and zgenprod are on PATH (because we could be invoked from another dir)
# make sure tzodb*, tcpu* and zgenprod are on PATH (because we could be invoked from another dir)
X=$(cd `dirname $0` && pwd)
export PATH=$X:$PATH
# rebuild go bits
# neo/py, wendelin.core, ... - must be pip install'ed - `neotest deploy` cares about that
go install -v lab.nexedi.com/kirr/neo/go/...
go build -o $X/zhash_go $X/zhash.go
#go build -race -o $X/zhash_go $X/zhash.go
go build -o $X/tzodb_go $X/tzodb.go
#go build -race -o $X/tzodb_go $X/tzodb.go
go build -o $X/tcpu_go $X/tcpu.go
# setup network & fs environment
......
......@@ -69,6 +69,8 @@ func prettyarg(arg string) string {
// benchit runs the benchmark for benchf
func benchit(benchname string, bencharg string, benchf func(*testing.B, string)) {
// FIXME testing.Benchmark does not allow to detect whether benchmark failed.
// (use log.Fatal, not {t,b}.Fatal as workaround)
r := testing.Benchmark(func (b *testing.B) {
benchf(b, bencharg)
})
......@@ -86,7 +88,7 @@ func benchit(benchname string, bencharg string, benchf func(*testing.B, string))
func benchHash(b *testing.B, h hash.Hash, arg string) {
blksize, err := strconv.Atoi(arg)
if err != nil {
b.Fatal(err)
log.Fatal(err)
}
data := make([]byte, blksize)
......@@ -101,23 +103,23 @@ func BenchmarkAdler32(b *testing.B, arg string) { benchHash(b, adler32.New(), ar
func BenchmarkCrc32(b *testing.B, arg string) { benchHash(b, crc32.NewIEEE(), arg) }
func BenchmarkSha1(b *testing.B, arg string) { benchHash(b, sha1.New(), arg) }
func xreadfile(t testing.TB, path string) []byte {
func xreadfile(path string) []byte {
data, err := ioutil.ReadFile(path)
if err != nil {
t.Fatal(err)
log.Fatal(err)
}
return data
}
func BenchmarkUnzlib(b *testing.B, zfile string) {
zdata := xreadfile(b, fmt.Sprintf("testdata/zlib/%s", zfile))
zdata := xreadfile(fmt.Sprintf("testdata/zlib/%s", zfile))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := xzlib.Decompress(zdata)
if err != nil {
b.Fatal(err)
log.Fatal(err)
}
}
}
......
......@@ -24,7 +24,7 @@ from __future__ import print_function
import sys
import hashlib
import zhash
import tzodb
import zlib
from time import time
from math import ceil, log10
......@@ -125,8 +125,8 @@ def _bench_hasher(b, h, blksize):
i += 1
def bench_adler32(b, blksize): _bench_hasher(b, zhash.Adler32Hasher(), blksize)
def bench_crc32(b, blksize): _bench_hasher(b, zhash.CRC32Hasher(), blksize)
def bench_adler32(b, blksize): _bench_hasher(b, tzodb.Adler32Hasher(), blksize)
def bench_crc32(b, blksize): _bench_hasher(b, tzodb.CRC32Hasher(), blksize)
def bench_sha1(b, blksize): _bench_hasher(b, hashlib.sha1(), blksize)
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -19,7 +19,7 @@
// +build ignore
// zhash - compute hash of whole latest objects stream in a ZODB database
// tzodb - ZODB-related benchmarks
package main
import (
......@@ -29,12 +29,18 @@ import (
"crypto/sha512"
"flag"
"fmt"
"io"
"hash"
"hash/crc32"
"hash/adler32"
"os"
"sync/atomic"
"testing"
"time"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
......@@ -43,7 +49,6 @@ import (
_ "lab.nexedi.com/kirr/neo/go/zodb/wks"
"github.com/pkg/errors"
"github.com/pkg/profile"
)
// hasher is hash.Hash which also knows its name
......@@ -61,28 +66,17 @@ func (h nullHasher) Reset() {}
func (h nullHasher) Size() int { return 1 }
func (h nullHasher) BlockSize() int { return 1 }
func main() {
defer log.Flush()
fnull := flag.Bool("null", false, "don't compute hash - just read data")
fadler32 := flag.Bool("adler32",false, "compute Adler32 checksum")
fcrc32 := flag.Bool("crc32", false, "compute CRC32 checksum")
fsha1 := flag.Bool("sha1", false, "compute SHA1 cryptographic hash")
fsha256 := flag.Bool("sha256", false, "compute SHA256 cryptographic hash")
fsha512 := flag.Bool("sha512", false, "compute SHA512 cryptographic hash")
fcheck := flag.String("check", "", "verify resulting hash to be = expected")
fbench := flag.String("bench", "", "use benchmarking format for output")
useprefetch := flag.Bool("useprefetch", false, "prefetch loaded objects")
flag.Parse()
if flag.NArg() != 1 {
flag.Usage()
os.Exit(1)
}
url := flag.Arg(0)
ctx := context.Background()
// hashFlags installs common hash flags and returns function to retrieve selected hasher.
func hashFlags(ctx context.Context, flags *flag.FlagSet) func() hasher {
fnull := flags.Bool("null", false, "don't compute hash - just read data")
fadler32 := flags.Bool("adler32",false, "compute Adler32 checksum")
fcrc32 := flags.Bool("crc32", false, "compute CRC32 checksum")
fsha1 := flags.Bool("sha1", false, "compute SHA1 cryptographic hash")
fsha256 := flags.Bool("sha256", false, "compute SHA256 cryptographic hash")
fsha512 := flags.Bool("sha512", false, "compute SHA512 cryptographic hash")
return func() hasher {
var h hasher
inith := func(name string, ctor func() hash.Hash) {
if h.name != "" {
......@@ -102,6 +96,41 @@ func main() {
case *fsha512: inith("sha512", sha512.New)
}
return h
}
}
// ----------------------------------------
const zhashSummary = "compute hash of whole latest objects stream in a ZODB database."
func zhashUsage(w io.Writer) {
fmt.Fprintf(w,
`Usage: tzodb zhash [options] <url>
`)
}
func zhashMain(argv []string) {
ctx := context.Background()
flags := flag.NewFlagSet("", flag.ExitOnError)
flags.Usage = func() { zhashUsage(os.Stderr); flags.PrintDefaults() }
fhash := hashFlags(ctx, flags)
fcheck := flags.String("check", "", "verify resulting hash to be = expected")
fbench := flags.String("bench", "", "use benchmarking format for output")
useprefetch := flags.Bool("useprefetch", false, "prefetch loaded objects")
flags.Parse(argv[1:])
if flags.NArg() != 1 {
flags.Usage()
os.Exit(1)
}
url := flags.Arg(0)
h := fhash()
if h.Hash == nil {
log.Fatal(ctx, "no hash function specified")
}
......@@ -157,12 +186,6 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c
return err
}
if false {
defer profile.Start(profile.TraceProfile).Stop()
//defer profile.Start(profile.MemProfile).Stop()
//defer profile.Start(profile.CPUProfile).Stop()
}
tstart := time.Now()
oid := zodb.Oid(0)
......@@ -218,3 +241,253 @@ loop:
return nil
}
// ----------------------------------------
const zwrkSummary = "benchmark database under parallel load from multiple clients."
// zwrkUsage prints the usage line for the "tzodb zwrk" subcommand to w.
// Flag defaults are appended separately by the FlagSet's PrintDefaults.
func zwrkUsage(w io.Writer) {
	fmt.Fprintf(w,
`Usage: tzodb zwrk [options] <url>
`)
}
// zwrkMain is the entry point for the "tzodb zwrk" subcommand.
//
// argv is the full subcommand argument vector (argv[0] is the command name);
// exactly one positional argument — the database URL — must remain after
// flag parsing. Fatal errors are reported via log.Fatal.
func zwrkMain(argv []string) {
	ctx := context.Background()

	flags := flag.NewFlagSet("", flag.ExitOnError)
	flags.Usage = func() { zwrkUsage(os.Stderr); flags.PrintDefaults() }
	// hash-selection flags (-null, -adler32, -crc32, -sha*) are shared with zhash
	fhash := hashFlags(ctx, flags)
	fcheck := flags.String("check", "", "verify whole-database hash to be = expected")
	fbench := flags.String("bench", "", "benchmarking format for output")
	fnclient := flags.Int("nclient", 1, "simulate so many clients")
	flags.Parse(argv[1:])

	if flags.NArg() != 1 {
		flags.Usage()
		os.Exit(1)
	}

	// XXX kill -bench and use labels in neotest
	if *fbench == "" {
		log.Fatal(ctx, "-bench must be provided")
	}

	url := flags.Arg(0)

	// -check and a hash function must be given together: the check phase
	// needs a hasher, and a selected hasher is only consumed by the check.
	h := fhash()
	if (*fcheck != "") != (h.Hash != nil) {
		log.Fatal(ctx, "-check and -<hash> must be used together")
	}

	err := zwrk(ctx, url, *fnclient, h, *fbench, *fcheck)
	if err != nil {
		log.Fatal(ctx, err)
	}
}
// zwrk simulates database load from multiple clients.
//
// It first serially reads all objects and remembers their per-object crc32
// checksums. If h/check were provided, this phase, similarly to zhash, also
// checks that the whole database content is as expected.
//
// Then parallel phase starts where nwrk separate connections to database are
// opened and nwrk workers are run to perform database access over them in
// parallel to each other.
//
// At every time when an object is loaded its crc32 checksum is verified to be
// the same as was obtained during serial phase.
func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check string) (err error) {
	// serial phase: read everything once, collecting per-object crc32
	// reference checksums (and optionally verifying the whole-DB hash).
	at, objcheckv, err := zwrkPrepare(ctx, url, h, check)
	if err != nil {
		return err
	}

	// parallel phase
	defer task.Runningf(&ctx, "zwrk-%d", nwrk)(&err)

	// keep the pre-cancellation context: each testing.Benchmark round
	// below derives a fresh errgroup context from it.
	ctx0 := ctx

	// establish nwrk connections and warm them up
	storv := make([]zodb.IStorage, nwrk)
	defer func() {
		// close whatever connections were successfully opened;
		// the first close error, if any, is merged into err.
		for _, stor := range storv {
			if stor != nil {
				err2 := stor.Close()
				err = xerr.First(err, err2)
			}
		}
	}()

	wg, ctx := errgroup.WithContext(ctx0)
	for i := 0; i < nwrk; i++ {
		i := i // capture per-iteration copy for the goroutine (pre-Go1.22 semantics)
		wg.Go(func() error {
			// open storage without caching - we need to take
			// latency of every request into account, and a cache
			// could be inhibiting (e.g. making significantly
			// lower) it for some requests.
			var opts = zodb.OpenOptions{
				ReadOnly: true,
				NoCache:  true,
			}
			stor, err := zodb.OpenStorage(ctx, url, &opts)
			if err != nil {
				return err
			}
			storv[i] = stor

			// ping storage to warm-up the connection
			// (in case of NEO LastTid connects to master and Load - to storage)
			_, err = stor.LastTid(ctx)
			if err != nil {
				return err
			}

			// NOTE(review): XRelease is invoked before the error check —
			// presumably it is nil-safe; confirm against mem.Buf.
			buf, _, err := stor.Load(ctx, zodb.Xid{Oid: 0, At: at})
			buf.XRelease()
			if err != nil {
				return err
			}

			return nil
		})
	}

	err = wg.Wait()
	if err != nil {
		return err
	}

	// benchmark parallel loads
	r := testing.Benchmark(func (b *testing.B) {
		wg, ctx = errgroup.WithContext(ctx0)
		var n int64 // shared request counter, bumped atomically by all workers
		for i := 0; i < nwrk; i++ {
			stor := storv[i]
			oid := zodb.Oid(0)
			wg.Go(func() error {
				for {
					// NOTE(review): local n shadows the shared counter;
					// ">= b.N" also means at most b.N-1 loads are timed —
					// verify this off-by-one is intended.
					n := atomic.AddInt64(&n, +1)
					if n >= int64(b.N) {
						return nil
					}

					xid := zodb.Xid{Oid: oid, At: at}
					buf, _, err := stor.Load(ctx, xid)
					if err != nil {
						return err
					}

					// every load is cross-checked against the checksum
					// recorded during the serial phase
					csum := crc32.ChecksumIEEE(buf.Data)
					if csum != objcheckv[oid] {
						return fmt.Errorf("%s: %s: crc32 mismatch: got %08x ; expect %08x",
							url, oid, csum, objcheckv[oid])
					}

					buf.Release()

					// XXX various scenarios are possible to select next object to read
					oid = (oid + 1) % zodb.Oid(len(objcheckv))
				}

				return nil // unreachable: the for loop above only exits via return
			})
		}

		err = wg.Wait()
		if err != nil {
			// XXX log. (not b.) - workaround for testing.Benchmark
			// not allowing to detect failures.
			log.Fatal(ctx, err)
		}
	})

	// TODO latency distribution

	// r.T/r.N is the mean wall-clock time per load; with nwrk clients in
	// flight concurrently, per-request latency is nwrk times that.
	tavg := float64(r.T) / float64(r.N) / float64(time.Microsecond)
	latavg := float64(nwrk) * tavg
	rps := float64(r.N) / r.T.Seconds()

	topic := fmt.Sprintf(bench, "zwrk.go")
	fmt.Printf("Benchmark%s-%d %d\t%.1f req/s %.3f latency-µs/object\n",
		topic, nwrk, r.N, rps, latavg)
	return nil
}
// zwrkPrepare is the serial phase of zwrk.
//
// It opens url read-only, reads objects oid=0,1,2,... at the database head
// until the first missing oid, and returns the head tid together with the
// per-object crc32 checksums collected along the way. If check != "" the
// whole read stream is additionally hashed with h and verified against check.
func zwrkPrepare(ctx context.Context, url string, h hasher, check string) (at zodb.Tid, objcheckv []uint32, err error) {
	defer task.Running(&ctx, "zwrk-prepare")(&err)

	stor, err := zodb.OpenStorage(ctx, url, &zodb.OpenOptions{ReadOnly: true})
	if err != nil {
		return 0, nil, err
	}
	defer func() {
		// merge close error, if any, into err
		err2 := stor.Close()
		err = xerr.First(err, err2)
	}()

	lastTid, err := stor.LastTid(ctx)
	if err != nil {
		return 0, nil, err
	}

	oid := zodb.Oid(0)
loop:
	for {
		xid := zodb.Xid{Oid: oid, At: lastTid}
		buf, _, err := stor.Load(ctx, xid)
		if err != nil {
			switch errors.Cause(err).(type) {
			// a missing object marks the end of the contiguous oid range
			case *zodb.NoObjectError:
				break loop
			default:
				return 0, nil, err
			}
		}

		// XXX Castagnoli is more strong and faster to compute
		objcheckv = append(objcheckv, crc32.ChecksumIEEE(buf.Data))
		if check != "" {
			h.Write(buf.Data)
		}
		oid += 1
		buf.Release()
	}

	// check the data read serially is indeed what was expected.
	if check != "" {
		hresult := fmt.Sprintf("%s:%x", h.name, h.Sum(nil))
		if hresult != check {
			return 0, nil, fmt.Errorf("%s: hash mismatch: expected %s ; got %s", url, check, hresult)
		}
	}
	return lastTid, objcheckv, nil
}
// ----------------------------------------
// commands registers the tzodb subcommands dispatched by main.
var commands = prog.CommandRegistry{
	{"zhash", zhashSummary, zhashUsage, zhashMain},
	{"zwrk", zwrkSummary, zwrkUsage, zwrkMain},
}
// main dispatches to the registered tzodb subcommands,
// flushing pending log output on exit.
func main() {
	// NOTE(review): local prog shadows the imported prog package below;
	// harmless here since the package is not referenced afterwards.
	prog := prog.MainProg{
		Name:       "tzodb",
		Summary:    "tzodb is a tool to run ZODB-related benchmarks",
		Commands:   commands,
		HelpTopics: nil,
	}

	defer log.Flush()
	prog.Main()
}
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2017 Nexedi SA and Contributors.
# Copyright (C) 2017-2018 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -18,7 +18,7 @@
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""zhash - compute hash of whole latest objects stream in a ZODB database"""
"""tzodb - ZODB-related benchmarks"""
from __future__ import print_function
......@@ -81,7 +81,7 @@ hashRegistry = {
def usage(w):
print(\
"""Usage: zhash [options] url
"""Usage: tzodb zhash [options] url
options:
......@@ -97,9 +97,14 @@ options:
--bench=<topic> use benchmarking format for output
""", file=w)
def main():
def zhash():
"""zhash - compute hash of whole latest objects stream in a ZODB database"""
if len(sys.argv) < 2 or sys.argv[1] != "zhash":
usage(sys.stderr)
exit(1)
try:
optv, argv = getopt(sys.argv[1:], "h", ["help", "check=", "bench="] + hashRegistry.keys())
optv, argv = getopt(sys.argv[2:], "h", ["help", "check=", "bench="] + hashRegistry.keys())
except GetoptError as e:
print("E: %s" % e, file=sys.stderr)
usage(sys.stderr)
......@@ -164,7 +169,7 @@ def main():
x = "zhash.py"
hresult = "%s:%s" % (h.name, h.hexdigest())
if bench is None:
print('%s ; oid=0..%d nread=%d t=%.3fs (%.1fμs / object) x=%s' % \
print('%s ; oid=0..%d nread=%d t=%.3fs (%.1fµs / object) x=%s' % \
(hresult, oid-1, nread, dt, dt * 1E6 / oid, x))
else:
topic = bench % x
......@@ -175,5 +180,9 @@ def main():
print("%s: hash mismatch: expected %s ; got %s\t# x=%s" % (url, check, hresult, x), file=sys.stderr)
sys.exit(1)
def main():
    """Entry point: currently dispatches only to the zhash subcommand."""
    zhash() # XXX stub
if __name__ == '__main__':
main()
......@@ -32,6 +32,7 @@ import (
// OpenOptions describes options for OpenStorage
type OpenOptions struct {
ReadOnly bool // whether to open storage as read-only
NoCache bool // don't use cache for read/write operations
}
// DriverOpener is a function to open a storage driver
......@@ -69,6 +70,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
// XXX commonly handle some options from url -> opt?
// (e.g. ?readonly=1 -> opt.ReadOnly=true + remove ?readonly=1 from URL)
// ----//---- nocache
opener, ok := driverRegistry[u.Scheme]
if !ok {
......@@ -80,12 +82,16 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
return nil, err
}
return &storage{
IStorageDriver: storDriver,
var cache *Cache
if !opt.NoCache {
// small cache so that prefetch can work for loading
// XXX 512K hardcoded (= ~ 128 · 4K-entries)
l1cache: NewCache(storDriver, 128 * 4*1024),
cache = NewCache(storDriver, 128 * 4*1024)
}
return &storage{
IStorageDriver: storDriver,
l1cache: cache,
}, nil
}
......@@ -97,7 +103,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
// and other storage-independed higher-level functionality.
type storage struct {
IStorageDriver
l1cache *Cache
l1cache *Cache // can be =nil, if opened with NoCache
}
......@@ -106,9 +112,15 @@ type storage struct {
// Load fetches the object state addressed by xid, going through the l1
// cache when one was set up and straight to the driver when the storage
// was opened with NoCache (l1cache == nil).
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
	// XXX here: offload xid validation from cache and driver ?
	// XXX here: offload wrapping err -> OpError{"load", err} ?
	if s.l1cache != nil {
		return s.l1cache.Load(ctx, xid)
	} else {
		return s.IStorageDriver.Load(ctx, xid)
	}
}
// Prefetch asks the l1 cache to start loading xid in the background.
// It is a no-op when the storage was opened with NoCache (l1cache == nil).
func (s *storage) Prefetch(ctx context.Context, xid Xid) {
	if s.l1cache != nil {
		s.l1cache.Prefetch(ctx, xid)
	}
}
......@@ -179,6 +179,7 @@ type IStorage interface {
// started, to complete.
//
// Prefetch does not return any error.
// Prefetch is noop if storage was opened with NoCache option.
Prefetch(ctx context.Context, xid Xid)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment