Commit 235cf425 authored by Douglas

first commit

from wendelin.bigarray.array_zodb import ZBigArray
import transaction
import numpy as np
import pandas as pd
from numpy import float64, dtype, cumsum, sin, uint8
from DateTime import DateTime
from time import time
import sys
import psutil
import os
import cProfile
import pstats
# Bytes per megabyte, used for memory and array-size reporting below.
MB = 1024 * 1024
def bigger_than_memory(self):
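# Entry point: write the stock data into a ZBigArray, then read it back,
# returning the collected timing and memory-usage messages.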
message_list = []
message_list.extend(bigger_than_memory_write(self))
message_list.extend(bigger_than_memory_read(self))
return message_list
def bigger_than_memory_write(self, out_of_core_index=False):
message_list = []
message_list.append('Write start: ' + get_process_memory_usage())
#ram_nbytes = psutil.virtual_memory().total
#big_array_len = (1*ram_nbytes) // big_array_dtype.itemsize
_, array_schema = get_field_names_and_schema()
big_array = create_zbigarray((360000,), dtype=array_schema)
big_index = create_zbigarray((1, 360000), dtype(float64)) if out_of_core_index else np.arange(360000)
root = self.getPhysicalRoot()
if out_of_core_index:
big_array, big_index = store_arrays(root, [big_array, 'big_array'], [big_index, 'big_index'])
else:
big_array = store_arrays(root, [big_array, 'big_array'])[0]
response = populate_array(self, 30000, 12, big_array, big_index, array_schema)
message_list.extend(response['messages'])
message_list.append('Write end: ' + get_process_memory_usage())
return message_list
def bigger_than_memory_read(self, out_of_core_index=False):
message_list = []
message_list.append('Read start: ' + get_process_memory_usage())
root = self.getPhysicalRoot()
# We don't have the column names here, so we manually set the 14th column's
# name to 'quantity' to keep things simple.
columns = list(range(23))
columns[13] = 'quantity'
big_index = root.big_index[:] if out_of_core_index else np.arange(360000)
response = process_data(root.big_array[:], big_index[:], columns)
message_list.extend(response['messages'])
message_list.append(response['result'])
message_list.append('Read end: ' + get_process_memory_usage())
return message_list
def bigger_than_memory_profile(self):
profile_path_template = '/tmp/profile_%s'
profiles = [bigger_than_memory_read_profile, bigger_than_memory_write_profile]
return [profile(self, profile_path_template) for profile in profiles]
def bigger_than_memory_read_profile(self, profile_path_template):
cProfile.runctx('bigger_than_memory_read(self)', globals(), locals(), profile_path_template % 'read')
return "To see the profile start a console on server and: import pstats; pstats.Stats('/tmp/profile/read').print_stats()"
def bigger_than_memory_write_profile(self, profile_path_template):
cProfile.runctx('bigger_than_memory_write(self)', globals(), locals(), profile_path_template % 'write')
return "To see the profile start a console on server and: import pstats; pstats.Stats('/tmp/profile/write').print_stats()"
def get_process_memory_usage():
process = psutil.Process(os.getpid())
m = process.memory_info()
return 'VIRT: %i MB\tRSS: %i MB' % (m.vms//MB, m.rss//MB)
def get_field_names_and_schema():
# 23-column schema for the stock rows; column index 13 (the 14th column) is
# the 'quantity' column used by the read benchmark.
field_names = ['f%s' % number if number != 13 else 'quantity' for number in range(23)]
array_schema = np.dtype({
'names' : field_names,
'formats': map(
np.dtype,
['i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8',
'i8', 'i8', 'i8', 'f8', 'i8', 'i8', 'f8', 'f8', 'f8', 'a50',
'a50', 'a50', 'a50']
)
})
return field_names, array_schema
def create_zbigarray(dimension, dtype):
return ZBigArray(dimension, dtype)
def store_arrays(root, *arrays_filenames):
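# Attach each array to the ZODB root under the given attribute name (clearing
# any previous value first) and commit, returning the stored arrays.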
for array, filename in arrays_filenames:
setattr(root, filename, None)
setattr(root, filename, array)
transaction.commit()
return [getattr(root, filename) for _, filename in arrays_filenames]
def populate_array(context, chunk_size, number_of_chunks, big_array, big_index, schema):
offset = 0
# ZSQL Method to fetch data from the stocks table, chunk by chunk, and put it in
# the ZBigArray. Implemented using DTML's sqlvar instruction with LIMIT and OFFSET.
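# The stock_offset ZSQL Method itself is not part of this commit; a minimal
# sketch of what its body could look like (table name is an assumption):
#
#   SELECT * FROM stocks
#   LIMIT <dtml-sqlvar my_limit type="int">
#   OFFSET <dtml-sqlvar my_offset type="int">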
start_populate_wendelin = mysql_time = time()
data = context.stock_offset(my_offset=0, my_limit=chunk_size)
mysql_time = time() - mysql_time
columns = data._names
tuples = data.tuples()
counter = 0
message_list = []
message_list.append('Running with %s chunks of %s rows.' % (number_of_chunks, chunk_size))
chunk_byte_size = sys.getsizeof(tuples)
message_list.append('Approximate memory (in bytes) for each chunk (shallow list size): %s' % chunk_byte_size)
for i in range(number_of_chunks):
for index, row in enumerate(tuples):
tuples[index] = tuple([filter_item(item) for item in row])
if len(tuples) < chunk_size:
chunk_end = chunk_size*(i) + len(tuples)
else:
chunk_end = chunk_size*(i+1)
big_array[offset:chunk_end] = np.array(tuples, schema)
offset += chunk_size
temp_mysql_time = time()
data = context.stock_offset(my_offset=offset, my_limit=chunk_size)
mysql_time += time()-temp_mysql_time
tuples = data.tuples()
total_time = time() - start_populate_wendelin
message_list.append('%s seconds to load & write %s records into the wendelin array. %s seconds on MySQL loads and %s writing to disk.' % (total_time, chunk_end, mysql_time, total_time-mysql_time))
transaction.commit()
message_list.append('Total size of out-of-core array: %s megabytes.' % (big_array.nbytes // MB))
return {'messages': message_list, 'columns': columns}
def filter_item(item):
# Normalize values coming from MySQL so they fit the structured array:
# NULL/empty values become 0 and DateTime objects become floats.
if not item:
return 0
elif isinstance(item, DateTime):
return float(item)
else:
return item
def process_data(big_array, big_index, columns):
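# Load the array into a pandas DataFrame and sum the 'quantity' column,
# timing how long the read takes.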
message_list = []
start = time()
df = pd.DataFrame.from_records(big_array, index=big_index, columns=columns)
result = df.quantity.sum()
finish = time()
message_list.append('%s seconds to read the wendelin array with %s rows.' % (finish-start, len(big_array)))
message_list.append('Total size of out-of-core array: %s megabytes.' % (big_array.nbytes // MB))
return { 'messages': message_list, 'result': result }
\ No newline at end of file
from statistics import median, stdev
results = {}
# "Read" is the process of reading all the data back to calculate the
# sum of a column.
hot_cache_read = {'name': 'Hot cache - read'}
hot_cache_read['data'] = [0.411816120148, 0.310564041138, 0.347157001495, 0.311173915863, 0.334348917007]
cold_cache_read = {'name': 'Cold cache - read'}
#cold_cache_read['data'] = [0.843955993652, 0.802474975586, 0.990427970886, 0.795747995377, 0.807102918625]
cold_cache_read['data'] = [1.63526105881, 2.28860187531, 1.74772405624, 1.64203691483, 1.85263895988]
mysql_hot_cache_read = {'name': 'MySQL hot cache - read'}
mysql_hot_cache_read['data'] = [0.15, 0.14, 0.15, 0.15, 0.15]
mysql_cold_cache_read = {'name': 'MySQL cold cache - read'}
mysql_cold_cache_read['data'] = [0.53, 0.57, 0.39, 0.42, 0.46]
# "Write" is how much time wendelin.core spends writing all the data into the
# storage. It's not affected by cache.
write = {'name': 'Write'}
write['data'] = [25.7491948605, 25.300052166, 25.1220777035, 27.6381318569, 25.0087022781]
samples = [hot_cache_read, cold_cache_read, mysql_hot_cache_read, mysql_cold_cache_read, write]
for key, statistic_function in [['median', median], ['stdev', stdev], ['max', max], ['min', min]]:
for sample in samples:
sample[key] = statistic_function(sample['data'])
for sample in samples:
print(sample)
\ No newline at end of file