Commit fd1704ae authored by Elvis Pranskevichus's avatar Elvis Pranskevichus

Benchmark automation; initial commit

parent 342baab2
import asyncio
import httptools
class HttpRequest:
__slots__ = ('_protocol', '_url', '_headers', '_version')
def __init__(self, protocol, url, headers, version):
self._protocol = protocol
self._url = url
self._headers = headers
self._version = version
class HttpResponse:
__slots__ = ('_protocol', '_request', '_headers_sent')
def __init__(self, protocol, request):
self._protocol = protocol
self._request = request
self._headers_sent = False
def write(self, data):
if isinstance(data, str):
data = data.encode()
self._protocol._transport.writelines([
'HTTP/{} 200 OK\r\n'.format(
self._request._version).encode('latin-1'),
b'Content-Type: text/plain\r\n',
'Content-Length: {}\r\n'.format(len(data)).encode('latin-1'),
b'\r\n',
data
])
RESP = b'Hello World' * 512
class HttpProtocol(asyncio.Protocol):
__slots__ = ('_loop',
'_transport', '_current_request', '_current_parser',
'_current_url', '_current_headers')
def __init__(self, *, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._transport = None
self._current_request = None
self._current_parser = None
self._current_url = None
self._current_headers = None
def on_url(self, url):
self._current_url = url
def on_header(self, name, value):
self._current_headers.append((name, value))
def on_headers_complete(self):
self._current_request = HttpRequest(
self, self._current_url, self._current_headers,
self._current_parser.get_http_version())
self._loop.call_soon(
self.handle, self._current_request,
HttpResponse(self, self._current_request))
####
def connection_made(self, transport):
self._transport = transport
def connection_lost(self, exc):
self._current_request = self._current_parser = None
def data_received(self, data):
if self._current_parser is None:
assert self._current_request is None
self._current_headers = []
self._current_parser = httptools.HttpRequestParser(self)
self._current_parser.feed_data(data)
def handle(self, request, response):
response.write(RESP)
......@@ -56,7 +56,7 @@ if __name__ == '__main__':
if loop_type not in {'asyncio', 'uvloop'}:
abort('unrecognized loop type: {}'.format(loop_type))
if server_type not in {'aiohttp'}:
if server_type not in {'aiohttp', 'httptools'}:
abort('unrecognized server type: {}'.format(server_type))
if loop_type:
......
#!/bin/bash
_cache="$(realpath $(dirname $0)/.cache)"
docker run --rm -t -i -p 25000:25000 \
-v "${_cache}":/var/lib/cache magic/benchmark \
uvloop/examples/bench/http_server.py --addr='0.0.0.0:25000' "$@"
#!/usr/bin/env python3
import os.path
import socket
import subprocess
import sys
import time
_dir = os.path.dirname(__file__)
_cache = os.path.abspath(os.path.join(_dir, '.cache'))
http_client = "wrk --latency -d 30 -c 200 -t 4 http://127.0.0.1:25000/{msize}"
tcp_client = "./tcp_client --addr=127.0.0.1:25000 --workers=4 --msize={msize}"
http_server = "uvloop/examples/bench/http_server.py --addr='0.0.0.0:25000'"
server_base = ['docker', 'run', '--rm', '-t', '-p', '25000:25000',
'-v', '{_cache}:/var/lib/cache'.format(_cache=_cache),
'--name', 'magicbench', 'magic/benchmark']
benchmarks = [{
'title': 'TCP echo server (asyncio)',
'server': ['uvloop/examples/bench/server.py', '--addr=0.0.0.0:25000',
'--streams'],
'client': ['./tcp_client', '--addr=127.0.0.1:25000', '--workers=4'],
'warmup': ['--msize=1024', '--workers=4', '--duration=5'],
'variations': [{
'title': '1kb messages, concurrency 4',
'args': ['--msize=1024', '--workers=4', '--duration=30']
}, {
'title': '10kb messages, concurrency 4',
'args': ['--msize=10240', '--workers=4', '--duration=30']
}, {
'title': '100kb messages, concurrency 4',
'args': ['--msize=102400', '--workers=4', '--duration=30']
}]
}, {
'title': 'TCP echo server (uvloop)',
'server': ['uvloop/examples/bench/server.py', '--addr=0.0.0.0:25000',
'--streams', '--uvloop'],
'client': ['./tcp_client', '--addr=127.0.0.1:25000', '--workers=4'],
'warmup': ['--msize=1024', '--workers=4', '--duration=5'],
'variations': [{
'title': '1kb messages, concurrency 4',
'args': ['--msize=1024', '--workers=4', '--duration=30']
}, {
'title': '10kb messages, concurrency 4',
'args': ['--msize=10240', '--workers=4', '--duration=30']
}, {
'title': '100kb messages, concurrency 4',
'args': ['--msize=102400', '--workers=4', '--duration=30']
}]
}]
def abort(msg):
print(msg, file=sys.stdout)
sys.exit(1)
def start_and_wait_for_server(server_cmd, timeout=10):
server = subprocess.Popen(server_cmd, universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
start = time.monotonic()
while time.monotonic() - start < timeout:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(time.monotonic() - start)
try:
sock.connect(('127.0.0.1', 25000))
sock.sendall(b'ping')
if sock.recv(4):
print('Server is up and running.')
else:
raise IOError('socket read')
except IOError:
if server.returncode is not None:
abort('Could not start server\n' +
'----------------------\n' +
server.communicate()[1])
else:
sock.close()
return server
kill_server(server)
abort('Could not start server\n' +
'----------------------\n' +
server.communicate()[1])
def kill_server(server):
if server.returncode is None:
print('Shutting down server...')
subprocess.check_output(['docker', 'stop', 'magicbench'])
server.wait()
def main():
for benchmark in benchmarks:
print(benchmark['title'])
print('=' * len(benchmark['title']))
print()
print('Starting server...')
server_cmd = server_base + benchmark['server']
print(' ' + ' '.join(server_cmd))
server = start_and_wait_for_server(server_cmd)
print()
print('Warming up server...')
warmup_cmd = benchmark['client'] + benchmark['warmup']
print(' '.join(warmup_cmd))
subprocess.check_output(warmup_cmd)
try:
for variation in benchmark['variations']:
print(variation['title'])
print('-' * len(variation['title']))
client_cmd = benchmark['client'] + variation['args']
print(' '.join(client_cmd))
subprocess.check_call(client_cmd)
print()
finally:
kill_server(server)
print()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
#
# Copied with minimal modifications from curio
# https://github.com/dabeaz/curio
from concurrent import futures
import argparse
import socket
import time
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--msize', default=1000, type=int,
help='message size in bytes')
parser.add_argument('--duration', '-T', default=30, type=int,
help='duration of test in seconds')
parser.add_argument('--times', default=1, type=int,
help='number of times to run the test')
parser.add_argument('--workers', default=3, type=int,
help='number of workers')
parser.add_argument('--addr', default='127.0.0.1:25000', type=str,
help='number of workers')
args = parser.parse_args()
unix = False
if args.addr.startswith('file:'):
unix = True
addr = args.addr[5:]
else:
addr = args.addr.split(':')
addr[1] = int(addr[1])
addr = tuple(addr)
MSGSIZE = args.msize
msg = b'x' * MSGSIZE
def run_test(start, duration):
if unix:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(addr)
n = 0
while time.monotonic() - start < duration:
sock.sendall(msg)
nrecv = 0
while nrecv < MSGSIZE:
resp = sock.recv(MSGSIZE)
if not resp:
raise SystemExit()
nrecv += len(resp)
n += 1
return n
TIMES = args.times
N = args.workers
DURATION = args.duration
messages = 0
start = time.monotonic()
for _ in range(TIMES):
with futures.ProcessPoolExecutor(max_workers=N) as e:
fs = []
for _ in range(N):
fs.append(e.submit(run_test, start, DURATION))
res = futures.wait(fs)
for fut in res.done:
messages += fut.result()
end = time.monotonic()
duration = end - start
print(messages, 'in', round(duration, 2))
print(round(messages / duration, 2), 'requests/sec')
throughput = (messages * MSGSIZE / (1024 * 1024)) / duration
print(round(throughput, 2), 'MiB/sec')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment