Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
9a386826
Commit
9a386826
authored
Jan 18, 2010
by
Antoine Pitrou
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add ccbench to the Tools directory
parent
c9d6238b
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
462 additions
and
0 deletions
+462
-0
Tools/ccbench/ccbench.py
Tools/ccbench/ccbench.py
+462
-0
No files found.
Tools/ccbench/ccbench.py
0 → 100644
View file @
9a386826
# -*- coding: utf-8 -*-
# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
from
__future__
import
division
from
__future__
import
print_function
"""
ccbench, a Python concurrency benchmark.
"""
import
time
import
os
import
sys
import
functools
import
itertools
import
threading
import
subprocess
import
socket
from
optparse
import
OptionParser
,
SUPPRESS_HELP
import
platform
# Compatibility
try
:
xrange
except
NameError
:
xrange
=
range
try
:
map
=
itertools
.
imap
except
AttributeError
:
pass
THROUGHPUT_DURATION
=
2.0
LATENCY_PING_INTERVAL
=
0.1
LATENCY_DURATION
=
2.0
def
task_pidigits
():
"""Pi calculation (Python)"""
_map
=
map
_count
=
itertools
.
count
_islice
=
itertools
.
islice
def
calc_ndigits
(
n
):
# From http://shootout.alioth.debian.org/
def
gen_x
():
return
_map
(
lambda
k
:
(
k
,
4
*
k
+
2
,
0
,
2
*
k
+
1
),
_count
(
1
))
def
compose
(
a
,
b
):
aq
,
ar
,
as_
,
at
=
a
bq
,
br
,
bs
,
bt
=
b
return
(
aq
*
bq
,
aq
*
br
+
ar
*
bt
,
as_
*
bq
+
at
*
bs
,
as_
*
br
+
at
*
bt
)
def
extract
(
z
,
j
):
q
,
r
,
s
,
t
=
z
return
(
q
*
j
+
r
)
//
(
s
*
j
+
t
)
def
pi_digits
():
z
=
(
1
,
0
,
0
,
1
)
x
=
gen_x
()
while
1
:
y
=
extract
(
z
,
3
)
while
y
!=
extract
(
z
,
4
):
z
=
compose
(
z
,
next
(
x
))
y
=
extract
(
z
,
3
)
z
=
compose
((
10
,
-
10
*
y
,
0
,
1
),
z
)
yield
y
return
list
(
_islice
(
pi_digits
(),
n
))
return
calc_ndigits
,
(
50
,
)
def
task_regex
():
"""regular expression (C)"""
# XXX this task gives horrendous latency results.
import
re
# Taken from the `inspect` module
pat
=
re
.
compile
(
r'^(\
s*de
f\
s)|(.*(?<!
\w)lambda(:|\
s))|^(
\s*@)'
,
re
.
MULTILINE
)
with
open
(
__file__
,
"r"
)
as
f
:
arg
=
f
.
read
(
2000
)
def
findall
(
s
):
t
=
time
.
time
()
try
:
return
pat
.
findall
(
s
)
finally
:
print
(
time
.
time
()
-
t
)
return
pat
.
findall
,
(
arg
,
)
def
task_sort
():
"""list sorting (C)"""
def
list_sort
(
l
):
l
=
l
[::
-
1
]
l
.
sort
()
return
list_sort
,
(
list
(
range
(
1000
)),
)
def
task_compress_zlib
():
"""zlib compression (C)"""
import
zlib
with
open
(
__file__
,
"rb"
)
as
f
:
arg
=
f
.
read
(
5000
)
*
3
def
compress
(
s
):
zlib
.
decompress
(
zlib
.
compress
(
s
,
5
))
return
compress
,
(
arg
,
)
def
task_compress_bz2
():
"""bz2 compression (C)"""
import
bz2
with
open
(
__file__
,
"rb"
)
as
f
:
arg
=
f
.
read
(
3000
)
*
2
def
compress
(
s
):
bz2
.
compress
(
s
)
return
compress
,
(
arg
,
)
def
task_hashing
():
"""SHA1 hashing (C)"""
import
hashlib
with
open
(
__file__
,
"rb"
)
as
f
:
arg
=
f
.
read
(
5000
)
*
30
def
compute
(
s
):
hashlib
.
sha1
(
s
).
digest
()
return
compute
,
(
arg
,
)
throughput_tasks
=
[
task_pidigits
,
task_regex
]
for
mod
in
'bz2'
,
'hashlib'
:
try
:
globals
()[
mod
]
=
__import__
(
mod
)
except
ImportError
:
globals
()[
mod
]
=
None
# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if
bz2
is
not
None
:
throughput_tasks
.
append
(
task_compress_bz2
)
elif
hashlib
is
not
None
:
throughput_tasks
.
append
(
task_hashing
)
else
:
throughput_tasks
.
append
(
task_compress_zlib
)
latency_tasks
=
throughput_tasks
class
TimedLoop
:
def
__init__
(
self
,
func
,
args
):
self
.
func
=
func
self
.
args
=
args
def
__call__
(
self
,
start_time
,
min_duration
,
end_event
,
do_yield
=
False
):
step
=
20
niters
=
0
duration
=
0.0
_time
=
time
.
time
_sleep
=
time
.
sleep
_func
=
self
.
func
_args
=
self
.
args
t1
=
start_time
while
True
:
for
i
in
range
(
step
):
_func
(
*
_args
)
t2
=
_time
()
# If another thread terminated, the current measurement is invalid
# => return the previous one.
if
end_event
:
return
niters
,
duration
niters
+=
step
duration
=
t2
-
start_time
if
duration
>=
min_duration
:
end_event
.
append
(
None
)
return
niters
,
duration
if
t2
-
t1
<
0.01
:
# Minimize interference of measurement on overall runtime
step
=
step
*
3
//
2
elif
do_yield
:
# OS scheduling of Python threads is sometimes so bad that we
# have to force thread switching ourselves, otherwise we get
# completely useless results.
_sleep
(
0.0001
)
t1
=
t2
def
run_throughput_test
(
func
,
args
,
nthreads
):
assert
nthreads
>=
1
# Warm up
func
(
*
args
)
results
=
[]
loop
=
TimedLoop
(
func
,
args
)
end_event
=
[]
if
nthreads
==
1
:
# Pure single-threaded performance, without any switching or
# synchronization overhead.
start_time
=
time
.
time
()
results
.
append
(
loop
(
start_time
,
THROUGHPUT_DURATION
,
end_event
,
do_yield
=
False
))
return
results
started
=
False
ready_cond
=
threading
.
Condition
()
start_cond
=
threading
.
Condition
()
ready
=
[]
def
run
():
with
ready_cond
:
ready
.
append
(
None
)
ready_cond
.
notify
()
with
start_cond
:
while
not
started
:
start_cond
.
wait
()
results
.
append
(
loop
(
start_time
,
THROUGHPUT_DURATION
,
end_event
,
do_yield
=
True
))
threads
=
[]
for
i
in
range
(
nthreads
):
threads
.
append
(
threading
.
Thread
(
target
=
run
))
for
t
in
threads
:
t
.
setDaemon
(
True
)
t
.
start
()
# We don't want measurements to include thread startup overhead,
# so we arrange for timing to start after all threads are ready.
with
ready_cond
:
while
len
(
ready
)
<
nthreads
:
ready_cond
.
wait
()
with
start_cond
:
start_time
=
time
.
time
()
started
=
True
start_cond
.
notify
(
nthreads
)
for
t
in
threads
:
t
.
join
()
return
results
def
run_throughput_tests
(
max_threads
):
for
task
in
throughput_tasks
:
print
(
task
.
__doc__
)
print
()
func
,
args
=
task
()
nthreads
=
1
baseline_speed
=
None
while
nthreads
<=
max_threads
:
results
=
run_throughput_test
(
func
,
args
,
nthreads
)
# Taking the max duration rather than average gives pessimistic
# results rather than optimistic.
speed
=
sum
(
r
[
0
]
for
r
in
results
)
/
max
(
r
[
1
]
for
r
in
results
)
print
(
"threads=%d: %d"
%
(
nthreads
,
speed
),
end
=
""
)
if
baseline_speed
is
None
:
print
(
" iterations/s."
)
baseline_speed
=
speed
else
:
print
(
" ( %d %%)"
%
(
speed
/
baseline_speed
*
100
))
nthreads
+=
1
print
()
LAT_END
=
"END"
def
_sendto
(
sock
,
s
,
addr
):
sock
.
sendto
(
s
.
encode
(
'ascii'
),
addr
)
def
_recv
(
sock
,
n
):
return
sock
.
recv
(
n
).
decode
(
'ascii'
)
def
latency_client
(
addr
,
nb_pings
,
interval
):
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_DGRAM
)
_time
=
time
.
time
_sleep
=
time
.
sleep
def
_ping
():
_sendto
(
sock
,
"%r
\
n
"
%
_time
(),
addr
)
# The first ping signals the parent process that we are ready.
_ping
()
# We give the parent a bit of time to notice.
_sleep
(
1.0
)
for
i
in
range
(
nb_pings
):
_sleep
(
interval
)
_ping
()
_sendto
(
sock
,
LAT_END
+
"
\
n
"
,
addr
)
def
run_latency_client
(
**
kwargs
):
cmd_line
=
[
sys
.
executable
,
'-E'
,
os
.
path
.
abspath
(
__file__
)]
cmd_line
.
extend
([
'--latclient'
,
repr
(
kwargs
)])
return
subprocess
.
Popen
(
cmd_line
)
#, stdin=subprocess.PIPE,
#stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def
run_latency_test
(
func
,
args
,
nthreads
):
# Create a listening socket to receive the pings. We use UDP which should
# be painlessly cross-platform.
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_DGRAM
)
sock
.
bind
((
"127.0.0.1"
,
0
))
addr
=
sock
.
getsockname
()
interval
=
LATENCY_PING_INTERVAL
duration
=
LATENCY_DURATION
nb_pings
=
int
(
duration
/
interval
)
results
=
[]
threads
=
[]
end_event
=
[]
start_cond
=
threading
.
Condition
()
started
=
False
if
nthreads
>
0
:
# Warm up
func
(
*
args
)
results
=
[]
loop
=
TimedLoop
(
func
,
args
)
ready
=
[]
ready_cond
=
threading
.
Condition
()
def
run
():
with
ready_cond
:
ready
.
append
(
None
)
ready_cond
.
notify
()
with
start_cond
:
while
not
started
:
start_cond
.
wait
()
loop
(
start_time
,
duration
*
1.5
,
end_event
,
do_yield
=
False
)
for
i
in
range
(
nthreads
):
threads
.
append
(
threading
.
Thread
(
target
=
run
))
for
t
in
threads
:
t
.
setDaemon
(
True
)
t
.
start
()
# Wait for threads to be ready
with
ready_cond
:
while
len
(
ready
)
<
nthreads
:
ready_cond
.
wait
()
# Run the client and wait for the first ping(s) to arrive before
# unblocking the background threads.
chunks
=
[]
process
=
run_latency_client
(
addr
=
sock
.
getsockname
(),
nb_pings
=
nb_pings
,
interval
=
interval
)
s
=
_recv
(
sock
,
4096
)
_time
=
time
.
time
with
start_cond
:
start_time
=
_time
()
started
=
True
start_cond
.
notify
(
nthreads
)
while
LAT_END
not
in
s
:
s
=
_recv
(
sock
,
4096
)
t
=
_time
()
chunks
.
append
((
t
,
s
))
# Tell the background threads to stop.
end_event
.
append
(
None
)
for
t
in
threads
:
t
.
join
()
process
.
wait
()
for
recv_time
,
chunk
in
chunks
:
# NOTE: it is assumed that a line sent by a client wasn't received
# in two chunks because the lines are very small.
for
line
in
chunk
.
splitlines
():
line
=
line
.
strip
()
if
line
and
line
!=
LAT_END
:
send_time
=
eval
(
line
)
assert
isinstance
(
send_time
,
float
)
results
.
append
((
send_time
,
recv_time
))
return
results
def
run_latency_tests
(
max_threads
):
for
task
in
latency_tasks
:
print
(
"Background CPU task:"
,
task
.
__doc__
)
print
()
func
,
args
=
task
()
nthreads
=
0
while
nthreads
<=
max_threads
:
results
=
run_latency_test
(
func
,
args
,
nthreads
)
n
=
len
(
results
)
# We print out milliseconds
lats
=
[
1000
*
(
t2
-
t1
)
for
(
t1
,
t2
)
in
results
]
#print(list(map(int, lats)))
avg
=
sum
(
lats
)
/
n
dev
=
(
sum
((
x
-
avg
)
**
2
for
x
in
lats
)
/
n
)
**
0.5
print
(
"CPU threads=%d: %d ms. (std dev: %d ms.)"
%
(
nthreads
,
avg
,
dev
),
end
=
""
)
print
()
#print(" [... from %d samples]" % n)
nthreads
+=
1
print
()
def
main
():
usage
=
"usage: %prog [-h|--help] [options]"
parser
=
OptionParser
(
usage
=
usage
)
parser
.
add_option
(
"-t"
,
"--throughput"
,
action
=
"store_true"
,
dest
=
"throughput"
,
default
=
False
,
help
=
"run throughput tests"
)
parser
.
add_option
(
"-l"
,
"--latency"
,
action
=
"store_true"
,
dest
=
"latency"
,
default
=
False
,
help
=
"run latency tests"
)
parser
.
add_option
(
"-i"
,
"--interval"
,
action
=
"store"
,
type
=
"int"
,
dest
=
"check_interval"
,
default
=
None
,
help
=
"sys.setcheckinterval() value"
)
parser
.
add_option
(
"-I"
,
"--switch-interval"
,
action
=
"store"
,
type
=
"float"
,
dest
=
"switch_interval"
,
default
=
None
,
help
=
"sys.setswitchinterval() value"
)
parser
.
add_option
(
"-n"
,
"--num-threads"
,
action
=
"store"
,
type
=
"int"
,
dest
=
"nthreads"
,
default
=
4
,
help
=
"max number of threads in tests"
)
# Hidden option to run the pinging client
parser
.
add_option
(
""
,
"--latclient"
,
action
=
"store"
,
dest
=
"latclient"
,
default
=
None
,
help
=
SUPPRESS_HELP
)
options
,
args
=
parser
.
parse_args
()
if
args
:
parser
.
error
(
"unexpected arguments"
)
if
options
.
latclient
:
kwargs
=
eval
(
options
.
latclient
)
latency_client
(
**
kwargs
)
return
if
not
options
.
throughput
and
not
options
.
latency
:
options
.
throughput
=
options
.
latency
=
True
if
options
.
check_interval
:
sys
.
setcheckinterval
(
options
.
check_interval
)
if
options
.
switch_interval
:
sys
.
setswitchinterval
(
options
.
switch_interval
)
print
(
"== %s %s (%s) =="
%
(
platform
.
python_implementation
(),
platform
.
python_version
(),
platform
.
python_build
()[
0
],
))
# Processor identification often has repeated spaces
cpu
=
' '
.
join
(
platform
.
processor
().
split
())
print
(
"== %s %s on '%s' =="
%
(
platform
.
machine
(),
platform
.
system
(),
cpu
,
))
print
()
if
options
.
throughput
:
print
(
"--- Throughput ---"
)
print
()
run_throughput_tests
(
options
.
nthreads
)
if
options
.
latency
:
print
(
"--- Latency ---"
)
print
()
run_latency_tests
(
options
.
nthreads
)
if
__name__
==
"__main__"
:
main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment