matrix 5.69 KB
Newer Older
1
#!/usr/bin/env python
2 3 4 5

import sys
import os
import math
6
import traceback
7 8
from time import time

9
from neo.tests import DB_PREFIX
10
from neo.tests.benchmark import BenchmarkRunner
11 12
from ZODB.FileStorage import FileStorage

13 14
class MatrixImportBenchmark(BenchmarkRunner):

15
    error_log = ''
16
    _size = None
17

18 19
    def add_options(self, parser):
        parser.add_option('-d', '--datafs')
Julien Muchembled's avatar
Julien Muchembled committed
20 21 22 23
        parser.add_option('', '--min-storages', type='int', default=1)
        parser.add_option('', '--max-storages', type='int', default=2)
        parser.add_option('', '--min-replicas', type='int', default=0)
        parser.add_option('', '--max-replicas', type='int', default=1)
24
        parser.add_option('', '--threaded', action="store_true")
25 26

    def load_options(self, options, args):
27
        if options.datafs and not os.path.exists(options.datafs):
28 29 30
            sys.exit('Missing or wrong data.fs argument')
        return dict(
            datafs = options.datafs,
Julien Muchembled's avatar
Julien Muchembled committed
31 32 33 34
            min_s = options.min_storages,
            max_s = options.max_storages,
            min_r = options.min_replicas,
            max_r = options.max_replicas,
35
            threaded = options.threaded,
36 37 38 39 40 41 42 43 44 45 46 47 48 49
        )

    def start(self):
        # build storage (logarithm) & replicas (linear) lists
        min_s, max_s = self._config.min_s, self._config.max_s
        min_r, max_r = self._config.min_r, self._config.max_r
        min_s2 = int(math.log(min_s, 2))
        max_s2 = int(math.log(max_s, 2))
        storages = [2 ** x for x in range(min_s2, max_s2 + 1)]
        if storages[0] < min_s:
            storages[0] = min_s
        if storages[-1] < max_s:
            storages.append(max_s)
        replicas = range(min_r, max_r + 1)
50 51
        result_list = [self.runMatrix(storages, replicas)
                       for x in xrange(self._config.repeat)]
52 53 54 55 56 57 58
        results = {}
        for s in storages:
            results[s] = z = {}
            for r in replicas:
                if r < s:
                    x = [x[s][r] for x in result_list if x[s][r] is not None]
                    if x:
59
                        z[r] = min(x)
60 61
                    else:
                        z[r] = None
62 63 64 65 66
        return self.buildReport(storages, replicas, results)

    def runMatrix(self, storages, replicas):
        stats = {}
        for s in storages:
67 68 69 70
            stats[s] = z = {}
            for r in replicas:
                if r < s:
                    z[r] = self.runImport(1, s, r, 100)
71 72 73
        return stats

    def runImport(self, masters, storages, replicas, partitions):
74 75 76 77 78 79 80
        datafs = self._config.datafs
        if datafs:
            dfs_storage = FileStorage(file_name=self._config.datafs)
        else:
            datafs = 'PROD1'
            import random, neo.tests.stat_zodb
            dfs_storage = getattr(neo.tests.stat_zodb, datafs)(
81
                random.Random(0)).as_storage(5000)
82
        print "Import of %s with m=%s, s=%s, r=%s, p=%s" % (
83
                datafs, masters, storages, replicas, partitions)
84 85 86 87 88
        if self._config.threaded:
            from neo.tests.threaded import NEOCluster
        else:
            from neo.tests.functional import NEOCluster
        neo = NEOCluster(
89
            db_list=['%s_matrix_%u' % (DB_PREFIX, i) for i in xrange(storages)],
90
            clear_databases=True,
91
            master_count=masters,
92 93 94 95
            partitions=partitions,
            replicas=replicas,
            verbose=self._config.verbose,
        )
96
        try:
Julien Muchembled's avatar
Julien Muchembled committed
97
            neo.start()
98
            try:
Julien Muchembled's avatar
Julien Muchembled committed
99 100 101 102 103
                neo_storage = neo.getZODBStorage()
                if not self._config.threaded:
                    assert len(neo.getStorageList()) == storages
                    neo.expectOudatedCells(number=0)
                start = time()
104
                neo_storage.copyTransactionsFrom(dfs_storage)
105
                end = time()
106 107 108 109 110 111
                size = dfs_storage.getSize()
                if self._size is None:
                    self._size = size
                else:
                    assert self._size == size
                return end - start
Julien Muchembled's avatar
Julien Muchembled committed
112 113 114 115 116 117 118
            finally:
                neo.stop()
        except:
            traceback.print_exc()
            self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % (
                masters, storages, replicas, partitions)
            self.error_log += "\n%s\n" % ''.join(traceback.format_exc())
119 120 121

    def buildReport(self, storages, replicas, results):
        # draw an array with results
122 123 124
        dfs_size = self._size
        self.add_status('Input size',
            dfs_size and '%-.1f MB' % (dfs_size / 1e6) or 'N/A')
125 126 127
        fmt = '|' + '|'.join(['  %8s  '] * (len(replicas) + 1)) + '|\n'
        sep = '+' + '+'.join(['-' * 12] * (len(replicas) + 1)) + '+\n'
        report = sep
Julien Muchembled's avatar
Julien Muchembled committed
128
        report += fmt % tuple(['S\R'] + replicas)
129
        report += sep
130 131 132 133 134 135 136
        failures = 0
        speedlist = []
        for s in storages:
            values = []
            assert s in results
            for r in replicas:
                if r in results[s]:
137 138
                    result = results[s][r]
                    if result is None:
139 140 141
                        values.append('FAIL')
                        failures += 1
                    else:
142 143 144
                        result = dfs_size / (result * 1e3)
                        values.append('%8.1f' % result)
                        speedlist.append(result)
145 146 147 148
                else:
                    values.append('N/A')
            report += fmt % (tuple([s] + values))
            report += sep
149
        report += self.error_log
150
        if failures:
151 152 153 154
            info = '%d failures' % (failures, )
        else:
            info = '%.1f KB/s' % (sum(speedlist) / len(speedlist))
        return info, report
155

156
def main(args=None):
157
    MatrixImportBenchmark().run()
158

159 160 161
if __name__ == "__main__":
    main()