demo_zbigarray.py 4.52 KB
Newer Older
1 2
#!/usr/bin/env python
# -*- coding: utf-8 -*-
3
# Copyright (C) 2014-2019  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
21 22
"""Demo program that generates and computes on ZBigArray bigger than RAM

This program demonstrates how it is possible to work with arrays bigger than
RAM. There are 2 execution modes:

    gen:    generate signal
    read:   read generated signal and compute its mean/var/sum

Generation mode writes signal parts in limited-by-size blocks, because the
amount of changes in one transaction should be less than available RAM.

Read mode creates an ndarray view of the whole array and processes it in full.
"""

from __future__ import print_function

from wendelin.bigarray.array_zodb import ZBigArray
38
from wendelin.lib.zodb import dbopen, dbclose
39
import transaction
40
from golang import defer, func
41 42 43 44

from numpy import float64, dtype, cumsum, sin
import psutil
import sys
45
import getopt
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

# Binary size units, in bytes.
KB = 1 << 10    # 1024
MB = 1 << 20    # 1024 * KB
GB = 1 << 30    # 1024 * MB


# read signal and report its statistics
# signalv - BigArray with signal
def read(signalv):
    """Print a summary of `signalv`, then its mean and its sum.

    The summary line shows the shape (dimensions joined with 'x'), the dtype
    and the size in GB. The array is then viewed through `signalv[:]` and
    the mean and sum of that view are printed.
    """
    shape_text = 'x'.join(str(dim) for dim in signalv.shape)
    sig_dtype = signalv.dtype
    size_gb = float(signalv.nbytes) / GB
    print('sig: %s %s (= %.2fGB)' % (shape_text, sig_dtype, size_gb))

    sig = signalv[:]   # BigArray -> ndarray

    mean = sig.mean()
    print('<sig>:\t%s' % mean)

    # NOTE variance is intentionally not computed here: a.var() wants to
    # produce temporaries (var = S (a - <a>)^2).

    total = sig.sum()
    print('S(sig):\t%s' % total)


# generate signal S(t) = M⋅sin(f⋅t)
f = 0.2     # factor applied to t inside sin()
M = 15      # amplitude
def gen(signalv):
    """Fill `signalv` with S(t) = M⋅sin(f⋅t) for t = 0 ... len(signalv)-1.

    The signal is written block by block, and the current transaction is
    committed after every block, so that the amount of changes in one
    transaction stays limited (32MB worth of elements per block).

    signalv - BigArray to fill; it is modified in place via its ndarray view.
    """
    print('gen signal t=0...%.2e  %s  (= %.2fGB) ' %    \
                (len(signalv), signalv.dtype, float(signalv.nbytes) / GB))
    a = signalv[:]  # BigArray -> ndarray

    # will write out signal in such blocks; never let the step become 0
    # (would happen only for items bigger than 32MB).
    blocksize = max(1, 32*MB//a.itemsize)

    # `range`, not `xrange`: the latter does not exist on Python 3. On
    # Python 2 this builds a list, but it holds only one int per block.
    for t0 in range(0, len(a), blocksize):
        ablk = a[t0:t0+blocksize]

        # everything below is done in place to avoid temporaries
        ablk[:] = 1             # at = 1
        cumsum(ablk, out=ablk)  # at = t-t0+1
        ablk += (t0-1)          # at = t
        ablk *= f               # at = f⋅t
        sin(ablk, out=ablk)     # at = sin(f⋅t)
        ablk *= M               # at = M⋅sin(f⋅t)

        t1 = t0 + len(ablk)     # end of the block just written (exclusive)
        note = 'gen signal blk [%s:%s]  (%.1f%%)' % (t0, t1, 100. * t1 / len(a))
        txn = transaction.get()
        txn.note(note)
        txn.commit()
        print(note)



def usage():
    """Print command-line help to stderr and exit with status 1.

    The program name shown in the help text is taken from sys.argv[0].
    This function does not return: it always raises SystemExit(1).
    """
    print(
"""Usage: %s [options] (gen|read) <dburi>

options:

    --worksize=<n>      generate array of size n*MB (default 2*RAM)
""" % sys.argv[0], file=sys.stderr)
    sys.exit(1)

100
@func
def main():
    """Command-line entry point.

    Parses `[--worksize=<n>] (gen|read) <dburi>` from sys.argv, opens the
    database at <dburi> and then either

      - gen:  creates a float64 ZBigArray of `worksize` bytes (default
              2*RAM), stores it as root['signalv'] and fills it via gen(), or
      - read: runs read() on the existing root['signalv'].

    At the end the VIRT/RSS memory usage of this process is printed.
    Any command-line error results in usage() being called, which exits.
    """
    worksize = None

    # unknown options must end in usage help, not in a traceback
    try:
        optv, argv = getopt.getopt(sys.argv[1:], '', ['worksize='])
    except getopt.GetoptError as e:
        print('E: %s' % e, file=sys.stderr)
        usage()

    for opt, v in optv:
        if opt == '--worksize':
            # <n> is given in MB; a non-integer value is a usage error
            try:
                worksize = int(v) * MB
            except ValueError:
                print('E: invalid --worksize value: %r' % v, file=sys.stderr)
                usage()

        else:
            usage()

    try:
        act = argv[0]
        dburi = argv[1]
    except IndexError:
        usage()

    if act not in ('gen', 'read'):
        usage()

    ram_nbytes = psutil.virtual_memory().total
    print('I: RAM:  %.2fGB' % (float(ram_nbytes) / GB))

    root = dbopen(dburi)
    # registered via golang.defer (see @func above) so the database is
    # closed when main finishes
    defer(lambda: dbclose(root))

    if act == 'gen':
        if worksize is None:
            worksize = 2*ram_nbytes
        print('I: WORK: %.2fGB' % (float(worksize) / GB))
        sig_dtype = dtype(float64)
        sig_len   = worksize // sig_dtype.itemsize
        sig = ZBigArray((sig_len,), sig_dtype)
        root['signalv'] = sig

        # ZBigArray requirement: before we can compute it (with subobject
        # .zfile) have to be made explicitly known to connection or current
        # transaction committed
        transaction.commit()

        gen(sig)

    elif act == 'read':
        read(root['signalv'])

    # report how much memory this process ended up using
    import os
    p = psutil.Process(os.getpid())
    m = p.memory_info()
    print('VIRT: %i MB\tRSS: %iMB' % (m.vms//MB, m.rss//MB))


# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()