Commit 93aeff70 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Factorize test runner code with BenchmarkRunner class.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2444 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 239fc596
import sys
import email
import smtplib
import optparse
import platform
import datetime
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
from neo.tests.functional import NEOCluster
MAIL_SERVER = '127.0.0.1:25'
class AttributeDict(dict):
def __getattr__(self, item):
return self.__getitem__(item)
class BenchmarkRunner(object):
"""
Base class for a command-line benchmark test runner.
"""
def __init__(self):
self._successful = True
self._status = []
parser = optparse.OptionParser()
# register common options
parser.add_option('', '--title')
parser.add_option('-v', '--verbose', action='store_true')
parser.add_option('', '--mail-to', action='append')
parser.add_option('', '--mail-from')
parser.add_option('', '--mail-server')
self.add_options(parser)
# check common arguments
options, self._args = parser.parse_args()
if bool(options.mail_to) ^ bool(options.mail_from):
sys.exit('Need a sender and recipients to mail report')
mail_server = options.mail_server or MAIL_SERVER
# check specifics arguments
self._config = AttributeDict()
self._config.update(self.load_options(options, self._args))
self._config.update(dict(
title = options.title or self.__class__.__name__,
verbose = options.verbose,
mail_from = options.mail_from,
mail_to = options.mail_to,
mail_server = mail_server.split(':'),
))
def add_status(self, key, value):
self._status.append((key, value))
def build_report(self, content):
fmt = "%-20s : %s"
status = "\n".join([fmt % item for item in [
('Title', self._config.title),
('Date', datetime.date.today().isoformat()),
('Node', platform.node()),
('Machine', platform.machine()),
('System', platform.system()),
('Python', platform.python_version()),
]])
status += '\n\n'
status += "\n".join([fmt % item for item in self._status])
return "%s\n\n%s" % (status, content)
def send_report(self, subject, report):
# build report
# build email
msg = MIMEMultipart()
msg['Subject'] = '%s: %s' % (self._config.title, subject)
msg['From'] = self._config.mail_from
msg['To'] = ', '.join(self._config.mail_to)
msg['X-ERP5-Tests'] = 'NEO'
if self._successful:
msg['X-ERP5-Tests-Status'] = 'OK'
msg.epilogue = ''
msg.attach(MIMEText(report))
# send it
s = smtplib.SMTP()
s.connect(*self._config.mail_server)
mail = msg.as_string()
for recipient in self._config.mail_to:
try:
s.sendmail(self._config.mail_from, recipient, mail)
except smtplib.SMTPRecipientsRefused:
print "Mail for %s fails" % recipient
s.close()
def run(self):
subject, report = self.start()
report = self.build_report(report)
if self._config.mail_to:
self.send_report(subject, report)
print subject
print
print report
def was_successful(self):
return self._successful
def add_options(self, parser):
""" Append options to command line parser """
raise NotImplementedError
def load_options(self, options, args):
""" Check options and return a configuration dict """
raise NotImplementedError
def start(self):
""" Run the test """
raise NotImplementedError
...@@ -3,158 +3,128 @@ ...@@ -3,158 +3,128 @@
import sys import sys
import os import os
import math import math
import optparse
import traceback import traceback
from time import time from time import time
from neo.tests.benchmark import BenchmarkRunner
from neo.tests.functional import NEOCluster from neo.tests.functional import NEOCluster
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
def run(masters, storages, replicas, partitions, datafs, verbose): MIN_STORAGES = 1
print "Import of %s with m=%s, s=%s, r=%s, p=%s" % ( MAX_STORAGES = 2
datafs, masters, storages, replicas, partitions) MIN_REPLICAS = 0
# cluster MAX_REPLICAS = 1
neo = NEOCluster(
db_list=['test_import_%d' % i for i in xrange(storages)], class MatrixImportBenchmark(BenchmarkRunner):
clear_databases=True,
partitions=partitions, def add_options(self, parser):
replicas=replicas, parser.add_option('-d', '--datafs')
master_node_count=masters, parser.add_option('', '--min-storages')
verbose=verbose, parser.add_option('', '--max-storages')
) parser.add_option('', '--min-replicas')
# import parser.add_option('', '--max-replicas')
neo_storage = neo.getZODBStorage()
dfs_storage = FileStorage(file_name=datafs) def load_options(self, options, args):
neo.start() if not options.datafs or not os.path.exists(options.datafs):
start = time() sys.exit('Missing or wrong data.fs argument')
try: return dict(
datafs = options.datafs,
min_s = int(options.min_storages or MIN_STORAGES),
max_s = int(options.max_storages or MAX_STORAGES),
min_r = int(options.min_replicas or MIN_REPLICAS),
max_r = int(options.max_replicas or MAX_REPLICAS),
)
def start(self):
# build storage (logarithm) & replicas (linear) lists
min_s, max_s = self._config.min_s, self._config.max_s
min_r, max_r = self._config.min_r, self._config.max_r
min_s2 = int(math.log(min_s, 2))
max_s2 = int(math.log(max_s, 2))
storages = [2 ** x for x in range(min_s2, max_s2 + 1)]
if storages[0] < min_s:
storages[0] = min_s
if storages[-1] < max_s:
storages.append(max_s)
replicas = range(min_r, max_r + 1)
results = self.runMatrix(storages, replicas)
return self.buildReport(storages, replicas, results)
def runMatrix(self, storages, replicas):
stats = {}
size = float(os.path.getsize(self._config.datafs))
for s in storages:
for r in [r for r in replicas if r < s]:
stats.setdefault(s, {})
result = self.runImport(1, s, r, 100)
if result is not None:
result = size / result / 1024
stats[s][r] = result
return stats
def runImport(self, masters, storages, replicas, partitions):
print "Import of %s with m=%s, s=%s, r=%s, p=%s" % (
self._config.datafs, masters, storages, replicas, partitions)
# cluster
neo = NEOCluster(
db_list=['neot_matrix_%d' % i for i in xrange(storages)],
clear_databases=True,
partitions=partitions,
replicas=replicas,
master_node_count=masters,
verbose=self._config.verbose,
)
# import
neo_storage = neo.getZODBStorage()
dfs_storage = FileStorage(file_name=self._config.datafs)
neo.start()
start = time()
try: try:
neo_storage.copyTransactionsFrom(dfs_storage) try:
return time() - start neo_storage.copyTransactionsFrom(dfs_storage)
except: return time() - start
traceback.print_exc() except:
return None traceback.print_exc()
finally: return None
neo.stop() finally:
neo.stop()
def runMatrix(datafs, storages, replicas, verbose):
stats = {} def buildReport(self, storages, replicas, results):
size = float(os.path.getsize(datafs)) config = self._config
for s in storages: self.add_status('Min storages', config.min_s)
for r in [r for r in replicas if r < s]: self.add_status('Max storages', config.max_s)
stats.setdefault(s, {}) self.add_status('Min replicas', config.min_r)
result = run(1, s, r, 100, datafs, verbose) self.add_status('Max replicas', config.max_r)
if result is not None: # draw an array with results
result = size / result / 1024 fmt = '|' + '|'.join([' %8s '] * (len(replicas) + 1)) + '|\n'
stats[s][r] = result sep = '+' + '+'.join(['-' * 12] * (len(replicas) + 1)) + '+\n'
return stats report = sep
report += fmt % tuple(['S\R'] + range(0, len(replicas)))
def buildReport(storages, replicas, results):
# draw an array with results
fmt = '|' + '|'.join([' %8s '] * (len(replicas) + 1)) + '|\n'
sep = '+' + '+'.join(['-' * 12] * (len(replicas) + 1)) + '+\n'
report = sep
report += fmt % tuple(['S\R'] + range(0, len(replicas)))
report += sep
failures = 0
speedlist = []
for s in storages:
values = []
assert s in results
for r in replicas:
if r in results[s]:
if results[s][r] is None:
values.append('FAIL')
failures += 1
else:
values.append('%8.1f' % results[s][r])
speedlist.append(results[s][r])
else:
values.append('N/A')
report += fmt % (tuple([s] + values))
report += sep report += sep
if failures: failures = 0
info = '%d failures' % (failures, ) speedlist = []
else: for s in storages:
info = '%.1f KB/s' % (sum(speedlist) / len(speedlist)) values = []
summary = 'Matrix : %s ' % (info, ) assert s in results
return (summary, report) for r in replicas:
if r in results[s]:
def sendReport(sender, recipients, server, summary, report): if results[s][r] is None:
""" Send a mail with the report summary """ values.append('FAIL')
# XXX: C/C from perfs bench failures += 1
import smtplib else:
from email.MIMEMultipart import MIMEMultipart values.append('%8.1f' % results[s][r])
from email.MIMEText import MIMEText speedlist.append(results[s][r])
else:
# build the email values.append('N/A')
msg = MIMEMultipart() report += fmt % (tuple([s] + values))
msg['Subject'] = summary report += sep
msg['From'] = sender if failures:
msg['To'] = ', '.join(recipients) info = '%d failures' % (failures, )
msg.epilogue = '' else:
msg.attach(MIMEText(report)) info = '%.1f KB/s' % (sum(speedlist) / len(speedlist))
summary = 'Matrix : %s ' % (info, )
# Send via smtp server return (summary, report)
s = smtplib.SMTP()
s.connect(*server)
mail = msg.as_string()
for recipient in recipients:
try:
s.sendmail(sender, recipient, mail)
except smtplib.SMTPRecipientsRefused:
print "Mail for %s fails" % recipient
s.close()
if __name__ == "__main__": if __name__ == "__main__":
MatrixImportBenchmark().run()
# options
parser = optparse.OptionParser()
parser.add_option('-d', '--datafs')
parser.add_option('', '--min-storages')
parser.add_option('', '--max-storages')
parser.add_option('', '--min-replicas')
parser.add_option('', '--max-replicas')
parser.add_option('', '--recipient', action='append')
parser.add_option('', '--sender')
parser.add_option('', '--server')
parser.add_option('-v', '--verbose', action='store_true')
(options, args) = parser.parse_args()
# check arguments
if not options.datafs or not os.path.exists(options.datafs):
sys.exit('Missing or wrong data.fs argument')
if bool(options.sender) ^ bool(options.recipient):
sys.exit('Need a sender and recipients to mail report')
# parse args
min_s = int(options.min_storages or 1)
max_s = int(options.max_storages or 2)
min_r = int(options.min_replicas or 0)
max_r = int(options.max_replicas or 1)
datafs = options.datafs
mail_server = options.server or '127.0.0.1:25'
mail_server = mail_server.split(':')
sender = options.sender
recipient = options.recipient
verbose = options.verbose or False
# build storage (logarithm) & replicas (linear) lists
min_s2 = int(math.log(min_s, 2))
max_s2 = int(math.log(max_s, 2))
storages = [2 ** x for x in range(min_s2, max_s2 + 1)]
if storages[0] < min_s:
storages[0] = min_s
if storages[-1] < max_s:
storages.append(max_s)
replicas = range(min_r, max_r + 1)
results = runMatrix(datafs, storages, replicas, verbose)
summary, report = buildReport(storages, replicas, results)
print summary
print
print report
if options.sender:
sendReport(sender, recipient, mail_server, summary, report)
...@@ -2,179 +2,127 @@ ...@@ -2,179 +2,127 @@
import os import os
import sys import sys
import optparse
import platform import platform
import datetime import datetime
from time import time from time import time
from ZODB.FileStorage import FileStorage
from neo.tests.benchmark import BenchmarkRunner
from neo.tests.functional import NEOCluster from neo.tests.functional import NEOCluster
from neo.client.Storage import Storage
from ZODB.FileStorage import FileStorage
from neo.profiling import PROFILING_ENABLED, profiler_decorator, \ from neo.profiling import PROFILING_ENABLED, profiler_decorator, \
profiler_report profiler_report
def runImport(neo, datafs): class ImportBenchmark(BenchmarkRunner):
""" Test import of a datafs """
def counter(wrapped, d):
@profiler_decorator def add_options(self, parser):
def wrapper(*args, **kw): parser.add_option('-d', '--datafs')
# count number of tick per second parser.add_option('-m', '--masters')
t = int(time()) parser.add_option('-s', '--storages')
d.setdefault(t, 0) parser.add_option('-p', '--partitions')
d[t] += 1 parser.add_option('-r', '--replicas')
# call original method
wrapped(*args, **kw) def load_options(self, options, args):
return wrapper if not options.datafs or not os.path.exists(options.datafs):
sys.exit('Missing or wrong data.fs argument')
# open storages clients return dict(
neo_storage = neo.getZODBStorage() datafs = options.datafs,
dfs_storage = FileStorage(file_name=datafs) masters = int(options.masters or 1),
dfs_size = os.path.getsize(datafs) storages = int(options.storages or 1),
partitions = int(options.partitions or 10),
# monkey patch storage replicas = int(options.replicas or 0),
txn_dict, obj_dict = {}, {} )
neo_storage.app.tpc_begin = counter(neo_storage.app.tpc_begin, txn_dict)
neo_storage.app.store = counter(neo_storage.app.store, obj_dict) def start(self):
config = self._config
# run import # start neo
start = time() neo = NEOCluster(
stats = neo_storage.copyTransactionsFrom(dfs_storage) db_list=['neot_perfs_%d' % i for i in xrange(config.storages)],
elapsed = time() - start clear_databases=True,
partitions=config.partitions,
# return stats replicas=config.replicas,
stats = { master_node_count=config.masters,
'Transactions': txn_dict.values(), verbose=False,
'Objects': obj_dict.values(), )
}
return (dfs_size, elapsed, stats) # import datafs
neo.start()
def buildReport(config, dfs_size, elapsed, stats):
""" build a report for the given import data """
pat = '%19s | %8s | %5s | %5s | %5s \n'
sep = '%19s+%8s+%5s+%5s+%5s\n'
sep %= ('-' * 20, '-' * 10) + ('-' * 7, ) * 3
dfs_size /= 1024
size = dfs_size / 1024
speed = dfs_size / elapsed
# system
report = ' ' * 20 + ' NEO PERF REPORT\n\n'
report += "\tDate : %s\n" % datetime.date.today().isoformat()
report += "\tNode : %s\n" % platform.node()
report += "\tProcessor : %s (%s)\n" % (platform.processor(),
platform.architecture()[0])
report += "\tSystem : %s (%s)\n" % (platform.system(),
platform.release())
report += '\n'
# configuration
report += "\tMasters : %s\n" % (config['masters'], )
report += "\tStorages : %s\n" % (config['storages'], )
report += "\tReplicas : %s\n" % (config['replicas'], )
report += "\tPartitions : %s\n" % (config['partitions'], )
report += '\n'
# results
report += '\n%19s: %6.1f MB' % ('Input size', size)
report += '\n%19s: %6d sec' % ('Import duration', elapsed)
report += '\n%19s: %6.1f KB/s\n' % ('Average speed', speed)
report += '\n\n'
# stats on objects and transactions
report += pat % ('', ' num ', 'min/s', 'avg/s', 'max/s')
for k, v in stats.items():
report += sep
s = sum(v)
report += pat % (k, s, min(v), s / len(v), max(v))
report += sep
# build summary
summary = 'Neo : %6.1f KB/s (%6.1f MB)' % (speed, size)
return (summary, report)
def sendReport(sender, recipients, server, summary, report):
""" Send a mail with the report summary """
import smtplib
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
# build the email
msg = MIMEMultipart()
msg['Subject'] = summary
msg['From'] = sender
msg['To'] = ', '.join(recipients)
msg.epilogue = ''
msg.attach(MIMEText(report))
# Send via smtp server
s = smtplib.SMTP()
s.connect(*server)
mail = msg.as_string()
for recipient in recipients:
try: try:
s.sendmail(sender, recipient, mail) return self.buildReport(*self.runImport(neo))
except smtplib.SMTPRecipientsRefused: finally:
print "Mail for %s fails" % recipient neo.stop()
s.close()
def runImport(self, neo):
if __name__ == "__main__":
def counter(wrapped, d):
@profiler_decorator
def wrapper(*args, **kw):
# count number of tick per second
t = int(time())
d.setdefault(t, 0)
d[t] += 1
# call original method
wrapped(*args, **kw)
return wrapper
# open storages clients
datafs = self._config.datafs
neo_storage = neo.getZODBStorage()
dfs_storage = FileStorage(file_name=datafs)
dfs_size = os.path.getsize(datafs)
# monkey patch storage
txn_dict, obj_dict = {}, {}
neo_storage.app.tpc_begin = counter(neo_storage.app.tpc_begin, txn_dict)
neo_storage.app.store = counter(neo_storage.app.store, obj_dict)
# run import
start = time()
stats = neo_storage.copyTransactionsFrom(dfs_storage)
elapsed = time() - start
# return stats
stats = {
'Transactions': txn_dict.values(),
'Objects': obj_dict.values(),
}
return (dfs_size, elapsed, stats)
def buildReport(self, dfs_size, elapsed, stats):
""" build a report for the given import data """
config = self._config
dfs_size /= 1024
size = dfs_size / 1024
speed = dfs_size / elapsed
# configuration
self.add_status('Masters', config.masters)
self.add_status('Storages', config.storages)
self.add_status('Replicas', config.replicas)
self.add_status('Partitions', config.partitions)
# results
self.add_status('Input size', '%-.1f MB' % size)
self.add_status('Import duration', '%-d secs' % elapsed)
self.add_status('Average speed', '%-.1f KB/s' % speed)
# stats on objects and transactions
pat = '%19s | %8s | %5s | %5s | %5s \n'
sep = '%19s+%8s+%5s+%5s+%5s\n'
sep %= ('-' * 20, '-' * 10) + ('-' * 7, ) * 3
report = pat % ('', ' num ', 'min/s', 'avg/s', 'max/s')
for k, v in stats.items():
report += sep
s = sum(v)
report += pat % (k, s, min(v), s / len(v), max(v))
report += sep
# handle command line options # build summary
parser = optparse.OptionParser() summary = 'Perf : %.1f KB/s (%.1f MB)' % (speed, size)
parser.add_option('-d', '--datafs') return (summary, report)
parser.add_option('-m', '--master-count')
parser.add_option('-s', '--storage-count')
parser.add_option('-p', '--partition-count')
parser.add_option('-r', '--replica-count')
parser.add_option('', '--recipient', action='append')
parser.add_option('', '--sender')
parser.add_option('', '--server')
(options, args) = parser.parse_args()
# check arguments
if not options.datafs or not os.path.exists(options.datafs):
sys.exit('Missing or wrong data.fs argument')
if bool(options.sender) ^ bool(options.recipient):
sys.exit('Need a sender and recipients to mail report')
# load options or defaults
config = dict(
masters = int(options.master_count or 1),
storages = int(options.storage_count or 1),
partitions = int(options.partition_count or 10),
replicas = int(options.replica_count or 0),
)
datafs = options.datafs
mail_server = options.server or '127.0.0.1:25'
mail_server = mail_server.split(':')
sender = options.sender
recipient = options.recipient
# start neo
neo = NEOCluster(
db_list=['test_import_%d' % i for i in xrange(config['storages'])],
clear_databases=True,
partitions=config['partitions'],
replicas=config['replicas'],
master_node_count=config['masters'],
verbose=False,
)
# import datafs
neo.start()
summary, report = buildReport(config, *runImport(neo, datafs))
neo.stop()
if __name__ == "__main__":
ImportBenchmark().run()
if PROFILING_ENABLED: if PROFILING_ENABLED:
print profiler_report() print profiler_report()
# display and/or send the report
print summary
print report
if options.sender:
sendReport(sender, recipient, mail_server, summary, report)
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import traceback import traceback
import optparse
import unittest import unittest
import tempfile import tempfile
import logging import logging
...@@ -26,6 +25,8 @@ import sys ...@@ -26,6 +25,8 @@ import sys
import neo import neo
import os import os
from neo.tests.benchmark import BenchmarkRunner
# list of test modules # list of test modules
# each of them have to import its TestCase classes # each of them have to import its TestCase classes
UNIT_TEST_MODULES = [ UNIT_TEST_MODULES = [
...@@ -153,7 +154,7 @@ class NeoTestRunner(unittest.TestResult): ...@@ -153,7 +154,7 @@ class NeoTestRunner(unittest.TestResult):
def startTest(self, test): def startTest(self, test):
unittest.TestResult.startTest(self, test) unittest.TestResult.startTest(self, test)
logging.info(" * TEST %s" % test) logging.info(" * TEST %s", test)
stats = self._getModuleStats(test) stats = self._getModuleStats(test)
stats.run += 1 stats.run += 1
self.lastStart = time.time() self.lastStart = time.time()
...@@ -179,42 +180,18 @@ class NeoTestRunner(unittest.TestResult): ...@@ -179,42 +180,18 @@ class NeoTestRunner(unittest.TestResult):
stats.failures += 1 stats.failures += 1
self._updateTimer(stats) self._updateTimer(stats)
def _buildSystemInfo(self): def _buildSummary(self, add_status):
import platform
import datetime
success = self.testsRun - len(self.errors) - len(self.failures) success = self.testsRun - len(self.errors) - len(self.failures)
s = """ add_status('Directory', self.temp_directory)
Title : %s add_status('Status', '%.3f%%' % (success * 100.0 / self.testsRun))
Date : %s # visual
Node : %s
Machine : %s
System : %s (%s)
Python : %s
Directory : %s
Status : %7.3f%%
""" % (
self._title,
datetime.date.today().isoformat(),
platform.node(),
platform.machine(),
platform.system(),
platform.release(),
platform.python_version(),
self.temp_directory,
success * 100.0 / self.testsRun,
)
return s
def _buildSummary(self):
# visual
header = "%25s | run | success | errors | fails | time \n" % 'Test Module' header = "%25s | run | success | errors | fails | time \n" % 'Test Module'
separator = "%25s-+---------+---------+---------+---------+----------\n" % ('-' * 25) separator = "%25s-+---------+---------+---------+---------+----------\n" % ('-' * 25)
format = "%25s | %3s | %3s | %3s | %3s | %6.2fs \n" format = "%25s | %3s | %3s | %3s | %3s | %6.2fs \n"
group_f = "%25s | | | | | \n" group_f = "%25s | | | | | \n"
# header # header
s = ' ' * 30 + ' NEO TESTS REPORT' s = ' ' * 30 + ' NEO TESTS REPORT'
s += '\n\n' s += '\n'
s += self._buildSystemInfo()
s += '\n' + header + separator s += '\n' + header + separator
group = None group = None
t_success = 0 t_success = 0
...@@ -243,7 +220,7 @@ class NeoTestRunner(unittest.TestResult): ...@@ -243,7 +220,7 @@ class NeoTestRunner(unittest.TestResult):
return s return s
def _buildErrors(self): def _buildErrors(self):
s = '\n' s = ''
test_formatter = lambda t: t.id() test_formatter = lambda t: t.id()
if len(self.errors): if len(self.errors):
s += '\nERRORS:\n' s += '\nERRORS:\n'
...@@ -272,98 +249,53 @@ class NeoTestRunner(unittest.TestResult): ...@@ -272,98 +249,53 @@ class NeoTestRunner(unittest.TestResult):
s += '\n' s += '\n'
return s return s
def build(self): def buildReport(self, add_status):
self.time = sum([s.time for s in self.modulesStats.values()]) self.time = sum([s.time for s in self.modulesStats.values()])
self.subject = "%s: %s Tests, %s Errors, %s Failures" % (self._title, self.subject = "%s: %s Tests, %s Errors, %s Failures" % (self._title,
self.testsRun, len(self.errors), len(self.failures)) self.testsRun, len(self.errors), len(self.failures))
self._summary = self._buildSummary() summary = self._buildSummary(add_status)
self._errors = self._buildErrors() errors = self._buildErrors()
self._warnings = self._buildWarnings() warnings = self._buildWarnings()
report = '\n'.join([summary, errors, warnings])
def sendReport(self, smtp_server, sender, recipients): return (self.subject, report)
""" Send a mail with the report summary """
class TestRunner(BenchmarkRunner):
import smtplib
from email.MIMEMultipart import MIMEMultipart def add_options(self, parser):
from email.MIMEText import MIMEText parser.add_option('-f', '--functional', action='store_true')
parser.add_option('-u', '--unit', action='store_true')
# build the email parser.add_option('-z', '--zodb', action='store_true')
msg = MIMEMultipart()
msg['Subject'] = self.subject def load_options(self, options, args):
msg['From'] = sender if not (options.unit or options.functional or options.zodb or args):
msg['To'] = ', '.join(recipients) sys.exit('Nothing to run, please give one of -f, -u, -z')
#msg.preamble = self.subject return dict(
msg.epilogue = '' unit = options.unit,
functional = options.functional,
# Add custom headers for client side filtering zodb = options.zodb,
msg['X-ERP5-Tests'] = 'NEO' )
if self.wasSuccessful():
msg['X-ERP5-Tests-Status'] = 'OK'
# write the body
body = MIMEText(self._summary + self._warnings + self._errors)
msg.attach(body)
# attach the log file
if ATTACH_LOG:
log = MIMEText(file(LOG_FILE, 'r').read())
log.add_header('Content-Disposition', 'attachment', filename=LOG_FILE)
msg.attach(log)
# Send the email via a smtp server def start(self):
s = smtplib.SMTP() config = self._config
s.connect(*mail_server) # run requested tests
mail = msg.as_string() runner = NeoTestRunner(title=config.title or 'Neo')
for recipient in recipients: try:
try: if config.unit:
s.sendmail(sender, recipient, mail) runner.run('Unit tests', UNIT_TEST_MODULES)
except smtplib.SMTPRecipientsRefused, e: if config.functional:
print "Mail for %s fails : %s" % (recipient, e) runner.run('Functional tests', FUNC_TEST_MODULES)
s.close() if config.zodb:
runner.run('ZODB tests', ZODB_TEST_MODULES)
except KeyboardInterrupt:
config['mail_to'] = None
traceback.print_exc()
# build report
self._successful = runner.wasSuccessful()
return runner.buildReport(self.add_status)
if __name__ == "__main__": if __name__ == "__main__":
runner = TestRunner()
# handle command line options runner.run()
parser = optparse.OptionParser() if not runner.was_successful():
parser.add_option('-f', '--functional', action='store_true')
parser.add_option('-u', '--unit', action='store_true')
parser.add_option('-z', '--zodb', action='store_true')
parser.add_option('', '--recipient', action='append')
parser.add_option('', '--sender')
parser.add_option('', '--server')
parser.add_option('', '--title')
(options, args) = parser.parse_args()
# check arguments
if bool(options.sender) ^ bool(options.recipient):
sys.exit('Need a sender and recipients to mail report')
if not (options.unit or options.functional or options.zodb or args):
sys.exit('Nothing to run, please give one of -f, -u, -z')
mail_server = options.server or '127.0.0.1:25'
mail_server = mail_server.split(':')
# run requested tests
runner = NeoTestRunner(title=options.title or 'Neo')
try:
if options.unit:
runner.run('Unit tests', UNIT_TEST_MODULES)
if options.functional:
runner.run('Functional tests', FUNC_TEST_MODULES)
if options.zodb:
runner.run('ZODB tests', ZODB_TEST_MODULES)
except KeyboardInterrupt:
traceback.print_exc()
options.sender = False
# build report
runner.build()
print runner._errors
print runner._warnings
print runner._summary
# send a mail
if options.sender:
runner.sendReport(mail_server, options.sender, options.recipient)
if not runner.wasSuccessful():
sys.exit(1) sys.exit(1)
sys.exit(0) sys.exit(0)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment