#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2014-2022  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Demo program that generates and computes on a ZBigArray bigger than RAM

This program demonstrates how it is possible to work with arrays bigger than
RAM. There are 2 execution modes:

    gen:    generate signal
    read:   read generated signal and compute its mean and sum

Generation mode writes the signal in size-limited blocks, one transaction per
block, because the amount of changes in one transaction should be less than
available RAM.

Read mode creates an ndarray view of the whole array and processes it in full.
"""

from __future__ import print_function

import getopt
import os
import sys

import psutil
import transaction
from golang import defer, func
from numpy import float64, dtype, cumsum, sin
from six.moves import range as xrange

from wendelin.bigarray.array_zodb import ZBigArray
from wendelin.lib.zodb import dbopen, dbclose

# Binary size units, in bytes.
KB = 1 << 10
MB = KB << 10
GB = MB << 10


# read signal and compute its average and sum
# signalv - BigArray with signal
def read(signalv):
    """Print the size of signalv, then the mean and the sum of its elements.

    signalv is the BigArray holding the signal. It is viewed as one ndarray
    covering the whole array (signalv[:]) and reduced in full.

    The variance δ(sig) is deliberately not printed: a.var() computes
    S (a - <a>)^2 through full-size temporaries (a - <a>), which would defeat
    the purpose of working with an array bigger than RAM.
    """
    shape = 'x'.join('%s' % n for n in signalv.shape)
    print('sig: %s %s (= %.2fGB)' %
          (shape, signalv.dtype, float(signalv.nbytes) / GB))
    a = signalv[:]   # BigArray -> ndarray
    print('<sig>:\t%s' % a.mean())
    print('S(sig):\t%s' % a.sum())


# generate signal S(t) = M⋅sin(f⋅t), t being the sample index 0, 1, 2, ...
f = 0.2     # phase advance per sample, in radians
M = 15      # amplitude
def gen(signalv):
    """Fill signalv in place with S(t) = M⋅sin(f⋅t), t = element index.

    The array is processed in blocks of 32MB each. Every block is committed
    in its own transaction, whose note records the block range and the
    overall progress, so that the amount of changes per transaction stays
    bounded.
    """
    nsamples = len(signalv)
    print('gen signal t=0...%.2e  %s  (= %.2fGB) ' %
          (nsamples, signalv.dtype, float(signalv.nbytes) / GB))

    sig = signalv[:]                        # BigArray -> ndarray
    total = len(sig)
    step = 32*MB // sig.itemsize            # number of elements in a 32MB block
    for start in xrange(0, total, step):
        chunk = sig[start:start+step]
        stop = start + len(chunk)

        # compute S(t) in place, block by block
        chunk[:] = 1                        # 1, 1, 1, ...
        cumsum(chunk, out=chunk)            # 1, 2, 3, ...        = t-start+1
        chunk += (start - 1)                # start, start+1, ... = t
        chunk *= f                          # f⋅t
        sin(chunk, out=chunk)               # sin(f⋅t)
        chunk *= M                          # M⋅sin(f⋅t)

        msg = 'gen signal blk [%s:%s]  (%.1f%%)' % (start, stop, 100. * stop / total)
        txn = transaction.get()
        txn.note(msg)
        txn.commit()
        print(msg)



def usage():
    """Print command-line help to stderr and exit with status 1.

    Used for every command-line error, so it never returns.
    """
    print(
"""Usage: %s [options] (gen|read) <dburi>

options:

    --worksize=<n>      generate array of size n*MB (default 2*RAM)
""" % sys.argv[0], file=sys.stderr)
    sys.exit(1)

def _parse_args(args):
    """Parse command-line arguments (sys.argv[1:]) into (act, dburi, worksize).

    act is 'gen' or 'read'; worksize is in bytes, or None when --worksize was
    not given. Any command-line error ends in usage(), which exits.
    """
    try:
        optv, argv = getopt.getopt(args, '', ['worksize='])
    except getopt.GetoptError as e:
        # unknown option, or --worksize given without a value
        print('E: %s' % e, file=sys.stderr)
        usage()

    worksize = None
    for opt, v in optv:
        # getopt only returns options listed above, so no other opt can occur
        if opt == '--worksize':
            try:
                n = int(v)
            except ValueError:
                usage()
            if n <= 0:
                usage()
            worksize = n * MB

    if len(argv) < 2:
        usage()
    act, dburi = argv[0], argv[1]
    if act not in ('gen', 'read'):
        usage()
    return act, dburi, worksize


def _print_memusage():
    """Print the virtual and resident memory size of the current process."""
    m = psutil.Process(os.getpid()).memory_info()
    print('VIRT: %i MB\tRSS: %iMB' % (m.vms//MB, m.rss//MB))


@func
def main():
    """Run the 'gen' or 'read' demo, as selected on the command line.

    gen:  store in root['signalv'] a new 1-D float64 ZBigArray of (about)
          worksize bytes - by default twice the total RAM reported by
          psutil - and fill it with gen().
    read: print statistics of root['signalv'] with read().

    In both modes the memory usage of this process is printed at the end.
    """
    act, dburi, worksize = _parse_args(sys.argv[1:])

    ram_nbytes = psutil.virtual_memory().total
    print('I: RAM:  %.2fGB' % (float(ram_nbytes) / GB))

    root = dbopen(dburi)
    defer(lambda: dbclose(root))    # close the database when main returns or raises

    if act == 'gen':
        if worksize is None:
            worksize = 2*ram_nbytes
        print('I: WORK: %.2fGB' % (float(worksize) / GB))
        sig_dtype = dtype(float64)
        sig_len   = worksize // sig_dtype.itemsize
        sig = ZBigArray((sig_len,), sig_dtype)
        root['signalv'] = sig

        # ZBigArray requirement: before we can compute on it, the array (with
        # its .zfile subobject) has to be either explicitly made known to the
        # connection, or the current transaction has to be committed.
        transaction.commit()

        gen(sig)

    elif act == 'read':
        try:
            sig = root['signalv']
        except KeyError:
            print("E: %s: no 'signalv' in database; run 'gen' first" % dburi,
                  file=sys.stderr)
            sys.exit(1)
        read(sig)

    _print_memusage()


if __name__ == '__main__':
    main()