Commit b77b5f81 authored by Elvis Pranskevichus's avatar Elvis Pranskevichus

Add support for Unix socket testing, JSON output of benchmarks, selective running

parent e7fe9dc8
......@@ -13,18 +13,20 @@ RUN mkdir -p /usr/local/python-venvs
RUN DEBIAN_FRONTEND=noninteractive \
apt-get update && apt-get install -y \
autoconf automake libtool build-essential \
python3 python3-pip git nodejs
python3 python3-pip git nodejs gosu
RUN pip3 install vex
RUN vex --python=python3.5 -m bench pip install -U pip
RUN mkdir -p /var/lib/cache/pip
ADD http_server.py /tmp/http_server.py
ADD torecho.py /tmp/torecho.py
ADD requirements.txt /tmp/requirements.txt
EXPOSE 25000
VOLUME /var/lib/cache
VOLUME /tmp/sockets
ENTRYPOINT ["/entrypoint"]
......
......@@ -6,6 +6,8 @@
import argparse
from concurrent import futures
import json
import math
import socket
import time
......@@ -26,6 +28,8 @@ if __name__ == '__main__':
help='socket timeout in seconds')
parser.add_argument('--addr', default='127.0.0.1:25000', type=str,
help='server address')
parser.add_argument('--output-format', default='text', type=str,
help='output format', choices=['text', 'json'])
args = parser.parse_args()
unix = False
......@@ -49,11 +53,13 @@ if __name__ == '__main__':
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.settimeout(15)
sock.connect(addr)
n = 0
latency_stats = np.zeros((timeout * 10,))
latency_stats = np.zeros((timeout * 100,))
min_latency = float('inf')
max_latency = 0.0
while time.monotonic() - start < duration:
req_start = time.monotonic()
......@@ -64,49 +70,42 @@ if __name__ == '__main__':
if not resp:
raise SystemExit()
nrecv += len(resp)
req_time = round((time.monotonic() - req_start) * 10000)
req_time = round((time.monotonic() - req_start) * 100000)
if req_time > max_latency:
max_latency = req_time
if req_time < min_latency:
min_latency = req_time
latency_stats[req_time] += 1
n += 1
return n, latency_stats
return n, latency_stats, min_latency, max_latency
def weighted_quantile(values, quantiles, sample_weight=None,
values_sorted=False, old_style=False):
def weighted_quantile(values, quantiles, weights):
""" Very close to np.percentile, but supports weights.
NOTE: quantiles should be in [0, 1]!
:param values: np.array with data
:param quantiles: array-like with many quantiles needed
:param sample_weight: array-like of the same length as `array`
:param values_sorted: bool, if True, then will avoid sorting of initial array
:param old_style: if True, will correct output to be consistent with np.percentile.
:param quantiles: array-like with many quantiles needed,
quantiles should be in [0, 1]!
:param weights: array-like of the same length as `array`
:return: np.array with computed quantiles.
"""
values = np.array(values)
quantiles = np.array(quantiles)
if sample_weight is None:
sample_weight = np.ones(len(values))
sample_weight = np.array(sample_weight)
weights = np.array(weights)
assert np.all(quantiles >= 0) and np.all(quantiles <= 1), \
'quantiles should be in [0, 1]'
if not values_sorted:
sorter = np.argsort(values)
values = values[sorter]
sample_weight = sample_weight[sorter]
weighted_quantiles = np.cumsum(weights) - 0.5 * weights
weighted_quantiles /= np.sum(weights)
weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
if old_style:
# To be convenient with np.percentile
weighted_quantiles -= weighted_quantiles[0]
weighted_quantiles /= weighted_quantiles[-1]
else:
weighted_quantiles /= np.sum(sample_weight)
return np.interp(quantiles, weighted_quantiles, values)
TIMES = args.times
N = args.concurrency
DURATION = args.duration
min_latency = float('inf')
max_latency = 0.0
messages = 0
latency_stats = None
start = time.monotonic()
......@@ -118,35 +117,78 @@ if __name__ == '__main__':
res = futures.wait(fs)
for fut in res.done:
t_messages, t_latency_stats = fut.result()
t_messages, t_latency_stats, t_min_latency, t_max_latency = \
fut.result()
messages += t_messages
if latency_stats is None:
latency_stats = t_latency_stats
else:
latency_stats = np.add(latency_stats, t_latency_stats)
if t_max_latency > max_latency:
max_latency = t_max_latency
if t_min_latency < min_latency:
min_latency = t_min_latency
end = time.monotonic()
duration = end - start
arange = np.arange(len(latency_stats))
stddev = np.std(arange)
weighted_latency = np.multiply(latency_stats, arange)
mean_latency = np.sum(weighted_latency) / messages
mean_latency = np.average(arange, weights=latency_stats)
variance = np.average((arange - mean_latency) ** 2, weights=latency_stats)
latency_std = math.sqrt(variance)
latency_cv = latency_std / mean_latency
percentiles = [50, 75, 90, 99]
percentile_data = []
quantiles = weighted_quantile(arange, [p / 100 for p in percentiles],
sample_weight=latency_stats,
values_sorted=True)
weights=latency_stats)
for i, percentile in enumerate(percentiles):
percentile_data.append('{}%: {}ms'.format(
percentile, round(quantiles[i], 2)))
print(messages, 'in', round(duration, 2))
print('Latency avg: {}ms'.format(round(mean_latency, 2)))
print('Latency distribution: {}'.format('; '.join(percentile_data)))
print('Requests/sec: {}'.format(round(messages / duration, 2)))
transfer = (messages * MSGSIZE / (1024 * 1024)) / duration
print('Transfer/sec: {}MiB'.format(round(transfer, 2)))
percentile_data.append((percentile, round(quantiles[i] / 100, 3)))
data = dict(
messages=messages,
transfer=round((messages * MSGSIZE / (1024 * 1024)) / DURATION, 2),
rps=round(messages / DURATION, 2),
latency_min=round(min_latency / 100, 3),
latency_mean=round(mean_latency / 100, 3),
latency_max=round(max_latency / 100, 3),
latency_std=round(latency_std / 100, 3),
latency_cv=round(latency_cv * 100, 2),
latency_percentiles=percentile_data
)
if args.output_format == 'json':
data['latency_percentiles'] = json.dumps(percentile_data)
output = '''\
{{
"messages": {messages},
"transfer": {transfer},
"rps": {rps},
"latency_min": {latency_min},
"latency_mean": {latency_mean},
"latency_max": {latency_max},
"latency_std": {latency_std},
"latency_cv": {latency_cv},
"latency_percentiles": {latency_percentiles}
}}'''.format(**data)
else:
data['latency_percentiles'] = '; '.join(
'{}% under {}ms'.format(*v) for v in percentile_data)
output = '''\
{messages} {size}KiB messages in {duration} seconds
Latency: min {latency_min}ms; max {latency_max}ms; mean {latency_mean}ms; \
std: {latency_std}ms ({latency_cv}%)
Latency distribtion: {latency_percentiles}
Requests/sec: {rps}
Transfer/sec: {transfer}MiB
'''.format(duration=DURATION, size=round(MSGSIZE / 1024, 2), **data)
print(output)
......@@ -10,5 +10,7 @@ git clone https://github.com/dabeaz/curio.git
git clone https://github.com/MagicStack/uvloop.git
cp /tmp/http_server.py uvloop/examples/bench/
echo "Running server on port 25000..."
"$@"
UID=${UID:-0}
GID=${GID:-0}
gosu ${UID}:${GID} "$@"
......@@ -2,97 +2,170 @@
import argparse
import multiprocessing
import json
import os
import os.path
import socket
import subprocess
import sys
import textwrap
import time
_dir = os.path.dirname(__file__)
_cache = os.path.abspath(os.path.join(_dir, '.cache'))
_socket = os.path.abspath(os.path.join(_dir, 'sockets'))
http_client = "wrk --latency -d 30 -c 200 -t 4 http://127.0.0.1:25000/{msize}"
http_server = "uvloop/examples/bench/http_server.py --addr='0.0.0.0:25000'"
server_base = ['docker', 'run', '--rm', '-t', '-p', '25000:25000',
'-e', 'UID={}'.format(os.geteuid()),
'-e', 'GID={}'.format(os.getegid()),
'-v', '{_cache}:/var/lib/cache'.format(_cache=_cache),
'-v', '{_socket}:/tmp/sockets'.format(_socket=_socket),
'--name', 'magicbench', 'magic/benchmark']
python = ['vex', 'bench', 'python']
nodejs = ['nodejs']
benchmarks = [{
'title': 'TCP echo server (asyncio)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=0.0.0.0:25000',
'--streams'],
'client': ['./tcp_client', '--addr=127.0.0.1:25000', '--concurrency=4'],
'warmup': ['--msize=1024', '--concurrency=4', '--duration=5'],
'variations': [{
'title': '1kb messages, concurrency 4',
'args': ['--msize=1024', '--concurrency=4']
}, {
'title': '10kb messages, concurrency 4',
'args': ['--msize=10240', '--concurrency=4']
}, {
'title': '100kb messages, concurrency 4',
'args': ['--msize=102400', '--concurrency=4']
}]
echo_variations = [{
'name': '1kb',
'title': '1kb messages, concurrency 4',
'args': ['--msize=1024', '--concurrency=4']
}, {
'name': '10kb',
'title': '10kb messages, concurrency 4',
'args': ['--msize=10240', '--concurrency=4']
}, {
'name': '100kb',
'title': '100kb messages, concurrency 4',
'args': ['--msize=102400', '--concurrency=4']
}]
echo_client = ['./echo_client', '--concurrency=4', '--output-format=json']
echo_warmup = ['--msize=1024', '--concurrency=4', '--duration=5']
tcp_address = '127.0.0.1:25000'
unix_address = 'file:{_socket}/server.sock'.format(_socket=_socket)
tcp_client = echo_client + ['--addr={}'.format(tcp_address)]
unix_client = echo_client + ['--addr={}'.format(unix_address)]
benchmarks = [{
'name': 'tcpecho-gevent',
'title': 'TCP echo server (gevent)',
'server': python + ['curio/examples/bench/gevecho.py'],
'client': ['./tcp_client', '--addr=127.0.0.1:25000', '--concurrency=4'],
'warmup': ['--msize=1024', '--concurrency=4', '--duration=5'],
'variations': [{
'title': '1kb messages, concurrency 4',
'args': ['--msize=1024', '--concurrency=4']
}, {
'title': '10kb messages, concurrency 4',
'args': ['--msize=10240', '--concurrency=4']
}, {
'title': '100kb messages, concurrency 4',
'args': ['--msize=102400', '--concurrency=4']
}]
'server_address': tcp_address,
'client': tcp_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'tcpecho-tornado',
'title': 'TCP echo server (tornado)',
'server': python + ['/tmp/torecho.py'],
'server_address': tcp_address,
'client': tcp_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'tcpecho-curio',
'title': 'TCP echo server (curio)',
'server': python + ['curio/examples/bench/curioecho.py'],
'server_address': tcp_address,
'client': tcp_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'tcpecho-nodejs',
'title': 'TCP echo server (nodejs)',
'server': nodejs + ['curio/examples/bench/nodeecho.js'],
'client': ['./tcp_client', '--addr=127.0.0.1:25000', '--concurrency=4'],
'warmup': ['--msize=1024', '--concurrency=4', '--duration=5'],
'variations': [{
'title': '1kb messages, concurrency 4',
'args': ['--msize=1024', '--concurrency=4']
}, {
'title': '10kb messages, concurrency 4',
'args': ['--msize=10240', '--concurrency=4']
}, {
'title': '100kb messages, concurrency 4',
'args': ['--msize=102400', '--concurrency=4']
}]
'server_address': tcp_address,
'client': tcp_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'tcpecho-asyncio-stdstreams',
'title': 'TCP echo server (asyncio/stdstreams)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=0.0.0.0:25000',
'--streams'],
'server_address': tcp_address,
'client': tcp_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'tcpecho-asyncio-minproto',
'title': 'TCP echo server (asyncio/minproto)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=0.0.0.0:25000',
'--proto'],
'server_address': tcp_address,
'client': tcp_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'unixecho-asyncio-stdstreams',
'title': 'Unix socket echo server (asyncio/stdstreams)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=file:/tmp/sockets/server.sock',
'--streams'],
'server_address': unix_address,
'client': unix_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'unixecho-asyncio-minproto',
'title': 'Unix socket echo server (asyncio/minproto)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=file:/tmp/sockets/server.sock',
'--proto'],
'server_address': unix_address,
'client': unix_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'tcpecho-uvloop-stdstreams',
'title': 'TCP echo server (uvloop/stdstreams)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=0.0.0.0:25000',
'--streams', '--uvloop'],
'server_address': tcp_address,
'client': tcp_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'title': 'TCP echo server (uvloop)',
'name': 'tcpecho-uvloop-minproto',
'title': 'TCP echo server (uvloop/minproto)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=0.0.0.0:25000',
'--proto', '--uvloop'],
'server_address': tcp_address,
'client': tcp_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'unixecho-uvloop-stdstreams',
'title': 'Unix socket echo server (uvloop/stdstreams)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=file:/tmp/sockets/server.sock',
'--streams', '--uvloop'],
'client': ['./tcp_client', '--addr=127.0.0.1:25000', '--concurrency=4'],
'warmup': ['--msize=1024', '--concurrency=4', '--duration=5'],
'variations': [{
'title': '1kb messages, concurrency 4',
'args': ['--msize=1024', '--concurrency=4']
}, {
'title': '10kb messages, concurrency 4',
'args': ['--msize=10240', '--concurrency=4']
}, {
'title': '100kb messages, concurrency 4',
'args': ['--msize=102400', '--concurrency=4']
}]
'server_address': unix_address,
'client': unix_client,
'warmup': echo_warmup,
'variations': echo_variations,
}, {
'name': 'unixecho-uvloop-minproto',
'title': 'Unix socket echo server (uvloop/minproto)',
'server': python + ['uvloop/examples/bench/server.py',
'--addr=file:/tmp/sockets/server.sock',
'--proto', '--uvloop'],
'server_address': unix_address,
'client': unix_client,
'warmup': echo_warmup,
'variations': echo_variations,
}]
......@@ -101,17 +174,30 @@ def abort(msg):
sys.exit(1)
def start_and_wait_for_server(server_cmd, timeout=60):
def start_and_wait_for_server(server_cmd, address, timeout=60):
kill_server()
server = subprocess.Popen(server_cmd, universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
start = time.monotonic()
if address.startswith('file:'):
family = socket.AF_UNIX
addr = address[5:]
else:
family = socket.AF_INET
addr = address.split(':')
addr[1] = int(addr[1])
addr = tuple(addr)
print('Trying to connect to server at address {}'.format(address))
while time.monotonic() - start < timeout:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock = socket.socket(family, socket.SOCK_STREAM)
sock.settimeout(time.monotonic() - start)
try:
sock.connect(('127.0.0.1', 25000))
sock.connect(addr)
sock.sendall(b'ping')
if sock.recv(4):
print('Server is up and running.')
......@@ -126,33 +212,70 @@ def start_and_wait_for_server(server_cmd, timeout=60):
sock.close()
return server
kill_server(server)
kill_server()
abort('Could not start server\n' +
'----------------------\n' +
'\n\n'.join(server.communicate()))
def kill_server(server):
if server.returncode is None:
def server_is_running():
try:
ret = subprocess.check_output(
['docker', 'inspect', '--type=container',
'--format="{{ .State.Running }}"', 'magicbench'],
stderr=subprocess.DEVNULL,
universal_newlines=True)
except subprocess.CalledProcessError:
return False
else:
return ret == 'true\n'
def server_container_exists():
ret = subprocess.call(['docker', 'inspect', '--type=container',
'magicbench'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
return ret == 0
def kill_server():
if server_is_running():
print('Shutting down server...')
subprocess.check_output(['docker', 'stop', 'magicbench'])
server.wait()
ret = subprocess.call(['docker', 'inspect', '--type=container',
'magicbench'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
if ret == 0:
subprocess.check_output(['docker', 'rm', 'magicbench'])
if server_container_exists():
print('Removing server container...')
subprocess.check_output(['docker', 'rm', 'magicbench'])
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--duration', '-T', default=30, type=int,
parser.add_argument('--duration', '-D', default=30, type=int,
help='duration of each benchmark in seconds')
parser.add_argument('--benchmarks', type=str,
help='comma-separated list of benchmarks to run')
parser.add_argument('--save-json', '-J', default='text', type=str,
help='path to save benchmark results in JSON format')
args = parser.parse_args()
if not os.path.exists(_socket):
os.mkdir(_socket)
if args.benchmarks:
benchmarks_to_run = args.benchmarks.split(',')
else:
benchmarks_to_run = {b['name'] for b in benchmarks}
benchmarks_data = []
for benchmark in benchmarks:
if benchmark['name'] not in benchmarks_to_run:
continue
print(benchmark['title'])
print('=' * len(benchmark['title']))
print()
......@@ -160,7 +283,7 @@ def main():
print('Starting server...')
server_cmd = server_base + benchmark['server']
print(' ' + ' '.join(server_cmd))
server = start_and_wait_for_server(server_cmd)
start_and_wait_for_server(server_cmd, benchmark['server_address'])
print()
print('Warming up server...')
......@@ -171,6 +294,13 @@ def main():
duration = args.duration
benchmark_data = {
'name': benchmark['name'],
'variations': []
}
benchmarks_data.append(benchmark_data)
try:
for variation in benchmark['variations']:
title = 'BENCHMARK: {}'.format(variation['title'])
......@@ -179,13 +309,38 @@ def main():
client_cmd = benchmark['client'] + variation['args']
client_cmd += ['--duration={}'.format(duration)]
print(' '.join(client_cmd))
subprocess.check_call(client_cmd)
print()
output = subprocess.check_output(
client_cmd, universal_newlines=True)
data = json.loads(output)
format_data = data.copy()
format_data['latency_percentiles'] = '; '.join(
'{}% under {}ms'.format(*v)
for v in data['latency_percentiles'])
output = textwrap.dedent('''\
{messages} messages in {duration} seconds
Latency: min {latency_min}ms; max {latency_max}ms; mean {latency_mean}ms; std: {latency_std}ms ({latency_cv}%)
Latency distribtion: {latency_percentiles}
Requests/sec: {rps}
Transfer/sec: {transfer}MiB
''').format(duration=duration, **format_data)
print(output)
data['name'] = variation['name']
benchmark_data['variations'].append(data)
finally:
kill_server(server)
kill_server()
print()
if args.save_json:
with open(args.save_json, 'w') as f:
json.dump(benchmarks_data, f)
if __name__ == '__main__':
main()
from tornado.ioloop import IOLoop
from tornado.tcpserver import TCPServer
class StreamHandler:
def __init__(self, stream):
self._stream = stream
self._stream.read_until_close(None, self._handle_read)
def _handle_read(self, data):
self._stream.write(data)
class EchoServer(TCPServer):
def handle_stream(self, stream, address):
StreamHandler(stream)
if __name__ == '__main__':
server = EchoServer()
server.bind(25000)
server.start(1)
IOLoop.instance().start()
IOLoop.instance().close()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment