##############################################################################
#
# Copyright (c) 2002-2015 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################

from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from wendelin.bigarray.array_zodb import ZBigArray
from DateTime import DateTime
from cStringIO import StringIO
import msgpack
import numpy as np
import string
import random
import urllib

def getRandomString():
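  """Return a 'test_' prefixed reference made of 32 random characters."""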
  return 'test_%s' %''.join([random.choice(string.ascii_letters + string.digits) \
    for _ in xrange(32)])
    
def chunks(l, n):
  """Yield successive n-sized chunks from l."""
  for i in xrange(0, len(l), n):
    yield l[i:i+n]

class Test(ERP5TypeTestCase):
  """
  Wendelin Test
  """

  def getTitle(self):
    return "Wendelin Test"

  def test_01_IngestionFromFluentd(self, old_fluentd=False):
    """
    Test ingestion using a POST Request containing a msgpack encoded message
    simulating input from fluentd.
    """
    portal = self.portal
    request = portal.REQUEST

    ingestion_policy = portal.restrictedTraverse("portal_ingestion_policies/wendelin_1")
    data_supply = portal.restrictedTraverse("data_supply_module/wendelin_1")
    reference = 'wendelin-default-ingestion'
    number_string_list = []
    for my_list in list(chunks(range(0, 100001), 10)):
      number_string_list.append(','.join([str(x) for x in my_list]))
    real_data = '\n'.join(number_string_list)
    # make sure real_data tail is also a full line
    real_data += '\n'

    # simulate fluentd
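    # the packed [0, real_data] pair mimics fluentd's msgpack-encoded
    # (time, record) entry format, with 0 used as a dummy timestamp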
    body = msgpack.packb([0, real_data], use_bin_type=True)
    if old_fluentd:
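      # older fluentd versions posted the packed message as a form-urlencoded
      # 'data_chunk' field instead of a raw octet stream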
      env = {'CONTENT_TYPE': 'application/x-www-form-urlencoded'}
      body = urllib.urlencode({'data_chunk': body})
    else:
      env = {'CONTENT_TYPE': 'application/octet-stream'}
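
    # POST the payload directly to the Ingestion Policy's ingest entry point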
    path = ingestion_policy.getPath() + '/ingest?reference=' + reference
    publish_kw = dict(basic='ERP5TypeTestCase:', env=env,
      request_method='POST', stdin=StringIO(body))
    response = self.publish(path, **publish_kw)
    # Due to inconsistencies in the Zope framework,
    # a normal instance returns 204. As explained at
    # http://blog.ploeh.dk/2013/04/30/rest-lesson-learned-avoid-204-responses/
    # turning 200 into 204 automatically when the body is empty is questionable.
    self.assertEqual(200, response.getStatus())

    # at every ingestion, a specialised Data Ingestion is created if none
    # exists yet, so we need to wait for server-side activities to be processed
    self.tic()

    # get the related Data Ingestion
    data_ingestion = data_supply.Base_getRelatedObjectList(portal_type='Data Ingestion')[0]
    self.assertNotEqual(None, data_ingestion)
    data_ingestion_line = [x for x in data_ingestion.objectValues() if x.getReference() == 'out_data_stream'][0]

    data_stream = data_ingestion_line.getAggregateValue()
    self.assertEqual('Data Stream', data_stream.getPortalType())

    data_stream_data = data_stream.getData()
    self.assertEqual(real_data, data_stream_data)

    # try a sample transformation
    data_array = portal.data_array_module.newContent(
                          portal_type = 'Data Array', 
                          reference = reference)
    data_array.validate()
    self.tic()

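    # copy the CSV content of the Data Stream into the Data Array, reading the
    # stream in chunks of chunk_length (presumably bytes); the assertions below
    # check that the whole CSV ends up in the array even though the chunk size
    # does not match the CSV row length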
    data_stream.DataStream_transform(\
        chunk_length = 10450, \
        transform_script_id = 'DataStream_copyCSVToDataArray',
        data_array_reference = reference)
    self.tic()

    # test that extracted array contains same values as input CSV
    zarray = data_array.getArray()
    self.assertEqual(np.average(zarray), np.average(np.arange(100001)))
    self.assertTrue(np.array_equal(zarray, np.arange(100001)))

    # clean up
    data_array.invalidate()
    data_stream.setData('')
    self.tic()


  def test_01_1_IngestionFromOldFluentd(self):
    self.test_01_IngestionFromFluentd(True)

  def test_01_02_ParallelTransformation(self):
    """
    test parallel execution.
    Note: determining row length is important in this case
    """
    portal = self.portal
    reference = getRandomString()
    
    row = ','.join(['%s' %x for x in range(1000)])
    number_string_list = [row]*20
    real_data = '\n'.join(number_string_list)

    data_stream = portal.data_stream_module.newContent(
                    portal_type = 'Data Stream',
                    reference = reference)
    data_stream.appendData(real_data)
    data_stream.validate()
    data_array = portal.data_array_module.newContent(
                          portal_type = 'Data Array', 
                          reference = reference)
    data_array.validate()
    self.tic()
    
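    # with parallelize enabled the stream is processed by parallel activities,
    # so chunk_length is set to the row length to keep full CSV rows per chunk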
    data_stream.DataStream_transform(\
        chunk_length = len(row), \
        transform_script_id = 'DataStream_copyCSVToDataArray',
        data_array_reference = reference,
        parallelize = 1)

    self.tic()


  def test_02_Examples(self):
    """
      Test we can use python scientific libraries by using directly created
      Wendelin examples.
    """
    portal = self.portal
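    # the example scripts run a Game of Life simulation in-core, out-of-core,
    # and out-of-core using activities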
    portal.game_of_life()
    portal.game_of_life_out_of_core()
    portal.game_of_life_out_of_core_activities()
    
  def test_03_DataArray(self):
    """
      Test persistently saving a ZBig Array to a Data Array.
    """
    data_array = self.portal.data_array_module.newContent( \
                   portal_type = 'Data Array')
    self.assertEqual(None, data_array.getArray())
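    # initArray allocates a persistent ZBigArray with the given shape and dtype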
    data_array.initArray((3, 3), np.uint8)
    self.tic()
    
    # test that the array is stored and that we get back a ZBigArray instance
    persistent_zbig_array = data_array.getArray()
    self.assertEqual(ZBigArray, persistent_zbig_array.__class__)
    
    # try to resize its numpy "view" and check that the persistent one is not
    # affected, as these are different objects
    pure_numpy_array = persistent_zbig_array[:,:] # ZBigArray -> ndarray view of it
    pure_numpy_array = np.resize(pure_numpy_array, (4, 4))
    self.assertNotEqual(pure_numpy_array.shape, persistent_zbig_array.shape)
    
    # test copying numpy -> wendelin, but first resize the persistent array (create a new one)
    data_array.initArray((4, 4), np.uint8)
    persistent_zbig_array = data_array.getArray()
    new_array = np.arange(1,17).reshape((4,4))
    persistent_zbig_array[:,:] = new_array
    self.assertEqual(new_array.shape, persistent_zbig_array.shape)
    self.assertTrue(np.array_equal(new_array, persistent_zbig_array))
    
    # test setting elements in the ZBigArray
    persistent_zbig_array[:2, 2] = 0
    self.assertFalse(np.array_equal(new_array, persistent_zbig_array))

    # "resize" the ZBigArray (np.resize returns a new in-RAM copy with the new shape)
    persistent_zbig_array = np.resize(persistent_zbig_array, (100,100))
    self.assertNotEqual(pure_numpy_array.shape, persistent_zbig_array.shape)
    
    # get array slice (fails)
    data_array = self.portal.data_array_module.newContent( \
                   portal_type = 'Data Array')
    shape = (1000,)
    data_array.initArray(shape, np.uint8)
    self.tic()
    
    persistent_zbig_array = data_array.getArray()
    new_array = np.arange(1000)
    new_array.resize(shape)

    self.assertEqual(new_array.shape, persistent_zbig_array.shape)
        
    persistent_zbig_array[:,] = new_array
    self.tic()

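    # getArraySlice returns the requested part of the array (presumably as an
    # in-RAM copy), which should match the corresponding slice of the source data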
    self.assertTrue(
           np.array_equal(data_array.getArraySlice(0,100), \
                          new_array[:100]))

  def test_04_DataBucket(self):
    """
      Test data bucket
    """
    bucket_stream = self.portal.data_stream_module.newContent( \
                                portal_type = 'Data Bucket Stream')
    self.tic()
    
    self.assertEqual(0, len(bucket_stream))
    
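    # a Data Bucket Stream behaves as an ordered key -> value store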
    # test set and get
    bin_string = "1"*100000
    key = len(bucket_stream) + 1
    bucket_stream.insertBucket(key, bin_string )
    self.assertEqual(bin_string, bucket_stream.getBucket(key))
    
    # test sequence
    self.assertEqual(1, len(bucket_stream))
    
    # test pop
    bucket_stream.popBucket(key)
    self.assertEqual(0, len(bucket_stream))
    
    # set many buckets
    for i in range(100):
      bucket_stream.insertBucket(i, i*10000)

    self.assertEqual(100, len(bucket_stream))
    self.assertEqual(range(100), bucket_stream.getKeyList())
    
    # test as sequence
    bucket = bucket_stream.getBucketItemSequence(start_key=10, count=1)[0]
    self.assertEqual(100000, bucket[1].value)