Source code for oasislmf.pytools.fm.financial_structure
__all__ = [
'extract_financial_structure',
'load_financial_structure',
'create_financial_structure',
'load_static',
]
import logging
import os
import numpy as np
from numba import from_dtype, njit, types
from numba.typed import Dict, List
from oasislmf.pytools.common.data import (load_as_ndarray, load_as_array, almost_equal,
fm_policytc_dtype,
fm_profile_csv_col_map, fm_profile_dtype, fm_profile_step_dtype,
fm_programme_dtype,
fm_xref_csv_col_map, fm_xref_dtype,
items_dtype,
oasis_int, nb_oasis_int, oasis_float, null_index)
from .common import (allowed_allocation_rule, need_extras, need_tiv_policy)
logger = logging.getLogger(__name__)
# temp dictionary types
node_type = types.UniTuple(nb_oasis_int, 2)
output_type = types.UniTuple(nb_oasis_int, 2)
layer_type = types.UniTuple(nb_oasis_int, 3)
# financial structure processed array
nodes_array_dtype = from_dtype(np.dtype([('node_id', np.uint64),
('level_id', oasis_int),
('agg_id', oasis_int),
('layer_len', oasis_int),
('cross_layer_profile', oasis_int),
('profile_len', oasis_int),
('profiles', oasis_int),
('loss', oasis_int),
('net_loss', oasis_int),
('extra', oasis_int),
('is_reallocating', np.uint8),
('parent_len', oasis_int),
('parent', oasis_int),
('children', oasis_int),
('output_ids', oasis_int),
]))
compute_info_dtype = from_dtype(np.dtype([('allocation_rule', oasis_int),
('max_level', oasis_int),
('max_layer', oasis_int),
('node_len', oasis_int),
('children_len', oasis_int),
('parents_len', oasis_int),
('profile_len', oasis_int),
('loss_len', oasis_int),
('extra_len', oasis_int),
('compute_len', oasis_int),
('start_level', oasis_int),
('items_len', oasis_int),
('output_len', oasis_int),
('stepped', np.bool_),
]))
profile_index_dtype = from_dtype(np.dtype([('i_start', oasis_int),
('i_end', oasis_int),
]))
[docs]
def load_static(static_path):
"""
Load the raw financial data from static_path as numpy ndarray
first check if .bin file is present then try .cvs
try loading profile_step before falling back to normal profile,
:param static_path: str
static_path
:return:
programme : link between nodes
policytc : info on layer
profile : policy profile can be profile_step or profile
xref : node to output_id
items : items (item_id and coverage_id mapping)
coverages : Tiv value for each coverage id
:raise:
FileNotFoundError if one of the static is missing
"""
programme = load_as_ndarray(static_path, 'fm_programme', fm_programme_dtype)
policytc = load_as_ndarray(static_path, 'fm_policytc', fm_policytc_dtype)
profile = load_as_ndarray(static_path, 'fm_profile_step', fm_profile_step_dtype, False, col_map=fm_profile_csv_col_map)
if len(profile) == 0:
profile = load_as_ndarray(static_path, 'fm_profile', fm_profile_dtype, col_map=fm_profile_csv_col_map)
stepped = None
else:
stepped = True
xref = load_as_ndarray(static_path, 'fm_xref', fm_xref_dtype, col_map=fm_xref_csv_col_map)
items = load_as_ndarray(static_path, 'items', items_dtype, must_exist=False)[['item_id', 'coverage_id']]
coverages = load_as_array(static_path, 'coverages', oasis_float, must_exist=False)
if np.unique(items['coverage_id']).shape[0] != coverages.shape[0]:
# one of the file is missing we default to empty array
items = np.empty(0, dtype=items_dtype)
coverages = np.empty(0, dtype=oasis_float)
return programme, policytc, profile, stepped, xref, items, coverages
@njit(cache=True)
def does_nothing(profile):
"""
evaluate if the profile is just doing nothing to the loss.
this allows to save some memory and compulation time and memory during the calculation
:param profile: np.array of fm_profile_dtype or fm_profile_step_dtype
profile
:return:
boolean : True is profile is actually doing nothing
"""
return ((profile['calcrule_id'] == 100) or
(profile['calcrule_id'] == 12 and almost_equal(profile['deductible_1'], 0)) or
(profile['calcrule_id'] == 15 and almost_equal(profile['limit_1'], 1)) or
(profile['calcrule_id'] == 16 and almost_equal(profile['deductible_1'], 0)) or
(profile['calcrule_id'] == 34 and almost_equal(profile['deductible_1'], 0)
and almost_equal(profile['attachment_1'], 0)
and almost_equal(profile['share_1'], 1))
)
@njit(cache=True)
def get_all_children(node_to_dependencies, node, items_only):
children = List()
temp = List()
temp.append(node)
# children = []
# temp = [node]
while temp:
parent = temp.pop()
if parent in node_to_dependencies:
if not items_only:
children.append(parent)
temp.extend(node_to_dependencies[parent])
else:
children.append(parent)
return children
@njit(cache=True)
def get_all_parent(node_to_dependencies, node, max_level):
res = set()
temp = List()
temp.extend(node)
while temp:
cur_node = temp.pop()
if cur_node in node_to_dependencies:
temp.extend(node_to_dependencies[cur_node])
if cur_node[0] == max_level:
res.add(cur_node)
elif cur_node[0] <= max_level:
res.add(cur_node)
return List(res)
@njit(cache=True)
def is_multi_peril(fm_programme):
for i in range(fm_programme.shape[0]):
if fm_programme[i]['level_id'] == 1 and fm_programme[i]['from_agg_id'] != fm_programme[i]['to_agg_id']:
return True
else:
return False
@njit(cache=True)
def get_tiv(children, items, coverages):
used_cov = np.zeros_like(coverages, dtype=np.uint8)
tiv = 0
for child_programme in children:
coverage_i = items[child_programme[1] - 1]['coverage_id'] - 1
if not used_cov[coverage_i]:
used_cov[coverage_i] = 1
tiv += coverages[coverage_i]
return tiv
@njit(cache=True)
def prepare_profile_simple(profile, tiv):
# if use TIV convert calcrule to fix deductible
if profile['calcrule_id'] == 4:
profile['calcrule_id'] = 1
profile['deductible_1'] *= tiv
elif profile['calcrule_id'] == 6:
profile['calcrule_id'] = 12
profile['deductible_1'] *= tiv
elif profile['calcrule_id'] == 18:
profile['calcrule_id'] = 2
profile['deductible_1'] *= tiv
elif profile['calcrule_id'] == 21:
profile['calcrule_id'] = 13
profile['deductible_1'] *= tiv
elif profile['calcrule_id'] == 9:
profile['calcrule_id'] = 1
profile['deductible_1'] *= profile['limit_1']
elif profile['calcrule_id'] == 15:
if profile['limit_1'] >= 1:
profile['calcrule_id'] = 12
@njit(cache=True)
def prepare_profile_stepped(profile, tiv):
# if use TIV convert calcrule to fix deductible
if profile['calcrule_id'] == 27:
profile['trigger_start'] *= tiv
if profile['trigger_end'] == 1:
profile['trigger_end'] = np.inf
else:
profile['trigger_end'] *= tiv
loss = min(max(profile['payout_start'] * tiv - profile['deductible_1'], 0), profile['limit_1'])
cond_loss = min(loss * profile['scale_2'], profile['limit_2'])
profile['payout_start'] = (loss + cond_loss) * (1 + profile['scale_1'])
elif profile['calcrule_id'] == 28:
profile['trigger_start'] *= tiv
if profile['trigger_end'] == 1:
profile['trigger_end'] = np.inf
else:
profile['trigger_end'] *= tiv
profile['scale_1'] += 1
if profile['payout_start'] == 0: # backward compatibility v1.22.x
profile['calcrule_id'] = 281
elif profile['calcrule_id'] == 29:
profile['calcrule_id'] = 27
profile['trigger_start'] *= tiv
if profile['trigger_end'] == 1:
profile['trigger_end'] = np.inf
else:
profile['trigger_end'] *= tiv
loss = max(profile['payout_start'] * tiv - profile['deductible_1'], 0)
cond_loss = min(loss * profile['scale_2'], profile['limit_2'])
profile['payout_start'] = (loss + cond_loss) * (1 + profile['scale_1'])
elif profile['calcrule_id'] == 30:
profile['calcrule_id'] = 27
profile['trigger_start'] *= tiv
if profile['trigger_end'] == 1:
profile['trigger_end'] = np.inf
else:
profile['trigger_end'] *= tiv
loss = max(profile['payout_start'] * profile['limit_1'] - profile['deductible_1'], 0)
cond_loss = min(loss * profile['scale_2'], profile['limit_2'])
profile['payout_start'] = (loss + cond_loss) * (1 + profile['scale_1'])
elif profile['calcrule_id'] == 31:
profile['calcrule_id'] = 27
profile['trigger_start'] *= tiv
if profile['trigger_end'] == 1:
profile['trigger_end'] = np.inf
else:
profile['trigger_end'] *= tiv
loss = max(profile['payout_start'] - profile['deductible_1'], 0)
cond_loss = min(loss * profile['scale_2'], profile['limit_2'])
profile['payout_start'] = (loss + cond_loss) * (1 + profile['scale_1'])
elif profile['calcrule_id'] == 32:
profile['scale_1'] += 1
elif profile['calcrule_id'] == 37:
profile['trigger_start'] *= tiv
if profile['trigger_end'] == 1:
profile['trigger_end'] = np.inf
else:
profile['trigger_end'] *= tiv
profile['scale_1'] += 1
elif profile['calcrule_id'] == 38:
profile['trigger_start'] *= tiv
if profile['trigger_end'] == 1:
profile['trigger_end'] = np.inf
else:
profile['trigger_end'] *= tiv
profile['scale_1'] += 1
else:
prepare_profile_simple(profile, tiv)
@njit(cache=True)
[docs]
def extract_financial_structure(allocation_rule, fm_programme, fm_policytc, fm_profile, stepped, fm_xref, items, coverages):
"""
:param allocation_rule:
option to indicate out the loss are allocated to the output
:param fm_programme:
structure of the levels
:param fm_policytc:
structure of the layers and policy_id to apply
:param fm_profile:
definition of the policy_id
:param fm_xref:
mapping between the output of the allocation and output item_id
:return:
compute_infos:
nodes_array:
node_parents_array:
node_profiles_array:
output_array:
"""
##### profile_id_to_profile_index ####
# policies may have multiple step, crate a mapping between profile_id and the start and end index in fm_profile file
max_profile_id = np.max(fm_profile['profile_id'])
profile_id_to_profile_index = np.empty(max_profile_id + 1, dtype=profile_index_dtype)
has_tiv_policy = Dict.empty(nb_oasis_int, nb_oasis_int)
last_profile_id = 0 # real profile_id start at 1
for i in range(fm_profile.shape[0]):
if fm_profile[i]['calcrule_id'] in need_tiv_policy:
has_tiv_policy[fm_profile[i]['profile_id']] = nb_oasis_int(0)
profile_id_to_profile_index[fm_profile[i]['profile_id']]['i_end'] = i + 1
if last_profile_id != fm_profile[i]['profile_id']:
profile_id_to_profile_index[fm_profile[i]['profile_id']]['i_start'] = i
last_profile_id = fm_profile[i]['profile_id']
# in fm_programme check if multi-peril and get size of each levels
max_level = np.max(fm_programme['level_id'])
level_node_len = np.zeros(max_level + 1, dtype=oasis_int)
multi_peril = False
for i in range(fm_programme.shape[0]):
programme = fm_programme[i]
if programme['level_id'] == 1 and programme['from_agg_id'] != programme['to_agg_id']:
multi_peril = True
if level_node_len[programme['level_id'] - 1] < programme['from_agg_id']:
level_node_len[programme['level_id'] - 1] = programme['from_agg_id']
if level_node_len[programme['level_id']] < programme['to_agg_id']:
level_node_len[programme['level_id']] = programme['to_agg_id']
##### fm_policytc (level_id agg_id layer_id => profile_id) #####
# programme_node_to_layers dict of (level_id, agg_id) => list of (layer_id, policy_index_start, policy_index_end)
# for each policy needing tiv, we duplicate the policy for each node to then later on calculate the % tiv parameters
programme_node_to_layers = Dict.empty(node_type, List.empty_list(layer_type))
i_new_fm_profile = fm_profile.shape[0]
new_fm_profile_list = List.empty_list(np.int64)
for i in range(fm_policytc.shape[0]):
policytc = fm_policytc[i]
programme_node = (nb_oasis_int(policytc['level_id']), nb_oasis_int(policytc['agg_id']))
i_start = profile_id_to_profile_index[nb_oasis_int(policytc['profile_id'])]['i_start']
i_end = profile_id_to_profile_index[nb_oasis_int(policytc['profile_id'])]['i_end']
if policytc['profile_id'] in has_tiv_policy:
if has_tiv_policy[policytc['profile_id']]:
for j in range(i_start, i_end):
new_fm_profile_list.append(j)
i_start, i_end = i_new_fm_profile, i_new_fm_profile + i_end - i_start
i_new_fm_profile = i_end
else:
has_tiv_policy[policytc['profile_id']] = nb_oasis_int(1)
layer = (nb_oasis_int(policytc['layer_id']), nb_oasis_int(i_start), nb_oasis_int(i_end))
if programme_node not in programme_node_to_layers:
_list = List.empty_list(layer_type)
_list.append(layer)
programme_node_to_layers[programme_node] = _list
else:
programme_node_to_layers[programme_node].append(layer)
# create a new fm_profile with all the needed duplicated %tiv profiles
if i_new_fm_profile - fm_profile.shape[0]:
new_fm_profile = np.empty(i_new_fm_profile, dtype=fm_profile.dtype)
new_fm_profile[:fm_profile.shape[0]] = fm_profile[:]
for i in range(i_new_fm_profile - fm_profile.shape[0]):
new_fm_profile[fm_profile.shape[0] + i] = fm_profile[new_fm_profile_list[i]]
fm_profile = new_fm_profile
# fm_xref
if multi_peril: # if single peril we can skip item level computation (level 0)
start_level = nb_oasis_int(0)
else:
start_level = nb_oasis_int(1)
if start_level == max_level: # there is only one level we can switch the computation as if it were a0
allocation_rule = 0
if allocation_rule == 0:
out_level = nb_oasis_int(max_level)
else:
out_level = start_level
##### xref #####
# create mapping node (level_id, agg_id) => list of (layer_id, output_id)
node_to_output_id = Dict.empty(node_type, List.empty_list(output_type))
output_len = 0
for i in range(fm_xref.shape[0]):
xref = fm_xref[i]
programme_node = (out_level, xref['agg_id'])
if output_len < xref['output_id']:
output_len = nb_oasis_int(xref['output_id'])
if programme_node in node_to_output_id:
node_to_output_id[programme_node].append((nb_oasis_int(xref['layer_id']), nb_oasis_int(xref['output_id'])))
else:
_list = List.empty_list(output_type)
_list.append((nb_oasis_int(xref['layer_id']), nb_oasis_int(xref['output_id'])))
node_to_output_id[programme_node] = _list
##### programme ####
# node_layers will contain the number of layer for each nodes
node_layers = Dict.empty(node_type, nb_oasis_int)
node_cross_layers = Dict.empty(node_type, nb_oasis_int)
# fill up node_layers with the number of policies for each node
for programme in fm_programme:
parent = (nb_oasis_int(programme['level_id']), nb_oasis_int(programme['to_agg_id']))
if parent not in node_layers:
node_layers[parent] = nb_oasis_int(len(programme_node_to_layers[parent]))
# create 2 mapping to get the parents and the childs of each nodes
# update the number of layer for nodes based on the number of layer of their parents
# go through each level from top to botom
parent_to_children = Dict.empty(node_type, List.empty_list(node_type))
child_to_parents = Dict.empty(node_type, List.empty_list(node_type))
children_len = 1
parents_len = 0
# go through each level from top to botom
for level in range(max_level, start_level, -1):
for programme in fm_programme:
if programme['level_id'] == level:
parent = (nb_oasis_int(programme['level_id']), nb_oasis_int(programme['to_agg_id']))
if programme['from_agg_id'] > 0: # level of node is programme['level_id'] - 1
child_programme = (nb_oasis_int(programme['level_id'] - 1), nb_oasis_int(programme['from_agg_id']))
else: # negative agg_id level is item level
child_programme = (start_level, nb_oasis_int(-programme['from_agg_id']))
if parent not in parent_to_children:
children_len += 2
_list = List.empty_list(node_type)
_list.append(child_programme)
parent_to_children[parent] = _list
# parent_to_children[parent] = [child_programme]
else:
children_len += 1
parent_to_children[parent].append(child_programme)
parents_len += 1
if child_programme not in child_to_parents:
_list = List.empty_list(node_type)
_list.append(parent)
child_to_parents[child_programme] = _list
else:
child_to_parents[child_programme].insert(0, parent)
# child_to_parents[child_programme] = [parent]
if child_programme not in node_layers or node_layers[child_programme] < node_layers[parent]:
node_layers[child_programme] = node_layers[parent]
elif node_layers[child_programme] > node_layers[parent]: # cross layer node
grand_parents = get_all_parent(child_to_parents, [parent], max_level)
for grand_parent in grand_parents:
if node_layers[grand_parent] < node_layers[child_programme]:
node_cross_layers[grand_parent] = nb_oasis_int(1)
node_layers[parent] = node_layers[child_programme]
# compute number of steps (steps), max size of each level node_level_start, max size of node to compute (compute_len)
node_level_start = np.zeros(level_node_len.shape[0] + 1, oasis_int)
for i in range(start_level, level_node_len.shape[0]):
node_level_start[i + 1] = node_level_start[i] + level_node_len[i]
steps = max_level + (1 - start_level)
compute_len = node_level_start[-1] + steps + level_node_len[-1] + 1
output_array_size = 0
for node, layer_size in node_layers.items():
if node[0] == out_level:
output_array_size += layer_size
nodes_array = np.empty(node_level_start[-1] + 1, dtype=nodes_array_dtype)
node_parents_array = np.empty(parents_len, dtype=oasis_int)
node_profiles_array = np.zeros(fm_policytc.shape[0] + 1, dtype=profile_index_dtype)
output_array = np.zeros(output_array_size, dtype=oasis_int)
node_i = 1
children_i = 1
parents_i = 0
profile_i = 1
loss_i = 0
extra_i = 0
output_i = 0
for level in range(start_level, max_level + 1):
for agg_id in range(1, level_node_len[level] + 1):
node_programme = (nb_oasis_int(level), nb_oasis_int(agg_id))
node = nodes_array[node_i]
node['node_id'] = node_i
node_i += 1
node['level_id'] = level
node['agg_id'] = agg_id
# layers
node['layer_len'] = node_layers[node_programme]
node['cross_layer_profile'] = 0 # set default to 0 change if it is a cross_layer_profile after
node['loss'], loss_i = loss_i, loss_i + node['layer_len']
if level == start_level:
node['net_loss'], loss_i = loss_i, loss_i + 1
node['extra'] = null_index
node['is_reallocating'] = 0
# children
if node_programme in parent_to_children:
children = parent_to_children[node_programme]
node['children'], children_i = children_i, children_i + 1 + len(children)
else:
node['children'] = 0
# parent
if node_programme in child_to_parents:
parents = child_to_parents[node_programme]
node['parent_len'] = len(parents)
node['parent'] = parents_i
for parent in parents:
node_parents_array[parents_i], parents_i = node_level_start[parent[0]] + parent[1], nb_oasis_int(parents_i + 1)
else:
node['parent_len'] = 0
# profiles
if node_programme in programme_node_to_layers:
profiles = programme_node_to_layers[node_programme]
if node_programme in node_cross_layers:
node['profile_len'] = node['layer_len']
node['cross_layer_profile'] = 1
else:
node['profile_len'] = len(profiles)
node['profiles'] = profile_i
for layer_id, i_start, i_end in sorted(profiles):
node_profile, profile_i = node_profiles_array[profile_i], profile_i + 1
node_profile['i_start'] = i_start
node_profile['i_end'] = i_end
# if use TIV we compute it and precompute % TIV values
for profile_index in range(i_start, i_end):
if fm_profile[profile_index]['calcrule_id'] in need_tiv_policy:
all_children = get_all_children(parent_to_children, node_programme, True)
tiv = get_tiv(all_children, items, coverages)
break
else:
tiv = 0
for profile_index in range(i_start, i_end):
profile = fm_profile[profile_index]
if stepped is None:
prepare_profile_simple(profile, tiv)
else:
prepare_profile_stepped(profile, tiv)
if does_nothing(profile):
# only non step policy can "does_nothing" so this is safe
node_profile['i_end'] = node_profile['i_start']
# check if we need to compute extras (min and max ded policies)
for profile_index in range(i_start, i_end):
if fm_profile[profile_index]['calcrule_id'] in need_extras:
node['is_reallocating'] = allocation_rule == 2 and fm_profile[profile_index]['calcrule_id'] != 27
items_child = get_all_children(parent_to_children, node_programme, True)
all_parent = get_all_parent(child_to_parents, items_child, node_programme[0])
for parent in all_parent:
all_children = get_all_children(parent_to_children, parent, False)
for child_programme in all_children: # include current node
child = nodes_array[node_level_start[child_programme[0]] + child_programme[1]]
if child['extra'] == null_index:
child['extra'], extra_i = extra_i, extra_i + node['layer_len']
break
else: # item level has no profile
node['profile_len'] = 1
node['profiles'] = 0
if level == out_level:
if node_programme in node_to_output_id:
node['output_ids'], output_i = output_i, output_i + node['layer_len']
for layer, output_id in node_to_output_id[node_programme]:
output_array[node['output_ids'] + layer - 1] = output_id
else:
raise KeyError("Some output nodes are missing output_ids")
compute_infos = np.empty(1, dtype=compute_info_dtype)
compute_info = compute_infos[0]
compute_info['allocation_rule'] = allocation_rule
compute_info['max_level'] = max_level
compute_info['node_len'] = node_i
compute_info['children_len'] = children_i
compute_info['parents_len'] = parents_i
compute_info['profile_len'] = profile_i
compute_info['loss_len'] = loss_i
compute_info['extra_len'] = extra_i
compute_info['compute_len'] = compute_len
compute_info['start_level'] = start_level
compute_info['items_len'] = level_node_len[0]
compute_info['output_len'] = output_len
compute_info['stepped'] = stepped is not None
compute_info['max_layer'] = max(nodes_array['layer_len'][1:])
return compute_infos, nodes_array, node_parents_array, node_profiles_array, output_array, fm_profile
[docs]
def create_financial_structure(allocation_rule, static_path):
"""
:param allocation_rule: int
back-allocation rule
:param static_path: string
path to the static files
:return:
compute_queue : the step of the computation to perform on each event
node_indexes : map node to index of item in result array
index_dependencies : map node to its dependent indexes
node_profile : map node to profile
output_item_index : list of item_id, index to put in the output
"""
if allocation_rule not in allowed_allocation_rule:
raise ValueError(f"allocation_rule must be in {allowed_allocation_rule}, found {allocation_rule}")
if allocation_rule == 3:
allocation_rule = 2
fm_programme, fm_policytc, fm_profile, stepped, fm_xref, items, coverages = load_static(static_path)
financial_structure = extract_financial_structure(allocation_rule, fm_programme, fm_policytc, fm_profile,
stepped, fm_xref, items, coverages)
compute_info, nodes_array, node_parents_array, node_profiles_array, output_array, fm_profile = financial_structure
logger.info(f'nodes_array has {len(nodes_array)} elements')
logger.info(f'compute_info : {dict(zip(compute_info.dtype.names, compute_info[0]))}')
np.save(os.path.join(static_path, f'compute_info_{allocation_rule}'), compute_info)
np.save(os.path.join(static_path, f'nodes_array_{allocation_rule}'), nodes_array)
np.save(os.path.join(static_path, f'node_parents_array_{allocation_rule}'), node_parents_array)
np.save(os.path.join(static_path, f'node_profiles_array_{allocation_rule}'), node_profiles_array)
np.save(os.path.join(static_path, f'output_array_{allocation_rule}'), output_array)
np.save(os.path.join(static_path, 'fm_profile'), fm_profile)
[docs]
def load_financial_structure(allocation_rule, static_path):
compute_info = np.load(os.path.join(static_path, f'compute_info_{allocation_rule}.npy'), mmap_mode='r')
nodes_array = np.load(os.path.join(static_path, f'nodes_array_{allocation_rule}.npy'), mmap_mode='r')
node_parents_array = np.load(os.path.join(static_path, f'node_parents_array_{allocation_rule}.npy'), mmap_mode='r')
node_profiles_array = np.load(os.path.join(static_path, f'node_profiles_array_{allocation_rule}.npy'), mmap_mode='r')
output_array = np.load(os.path.join(static_path, f'output_array_{allocation_rule}.npy'), mmap_mode='r')
fm_profile = np.load(os.path.join(static_path, 'fm_profile.npy'), mmap_mode='r')
return compute_info, nodes_array, node_parents_array, node_profiles_array, output_array, fm_profile