# Source code for oasislmf.pytools.fm.financial_structure
# Public API of this module.
__all__ = [
    'extract_financial_structure',
    'load_financial_structure',
    'create_financial_structure',
    'load_static',
]
import logging
import os
import numpy as np
from numba import from_dtype, njit
from oasislmf.pytools.common.data import (load_as_ndarray, load_as_array, almost_equal,
fm_policytc_dtype,
fm_profile_dtype, fm_profile_step_dtype,
fm_programme_dtype,
fm_xref_dtype,
items_dtype,
oasis_int, nb_oasis_int, oasis_float, null_index)
from .common import (allowed_allocation_rule, need_extras, need_tiv_policy)
logger = logging.getLogger(__name__)
# financial structure processed array
# One record per node of the financial structure.  The fields named after
# companion arrays ('profiles', 'loss', 'extra', 'parent', 'children',
# 'output_ids') hold start offsets into those arrays, as assigned in
# extract_financial_structure.
nodes_array_dtype = from_dtype(np.dtype([('node_id', np.uint64),
                                         ('level_id', oasis_int),
                                         ('agg_id', oasis_int),
                                         ('layer_len', oasis_int),            # number of layers of this node
                                         ('cross_layer_profile', oasis_int),  # 1 if profiles span layers (cross-layer node)
                                         ('profile_len', oasis_int),          # number of profiles attached to the node
                                         ('profiles', oasis_int),             # start index into node_profiles_array
                                         ('loss', oasis_int),                 # start index of this node's losses
                                         ('net_loss', oasis_int),             # net loss index (set at start_level only)
                                         ('extra', oasis_int),                # start index of extras, null_index if none
                                         ('is_reallocating', np.uint8),
                                         ('parent_len', oasis_int),
                                         ('parent', oasis_int),               # start index into node_parents_array
                                         ('children', oasis_int),             # start of children block (0 = leaf)
                                         ('output_ids', oasis_int),           # start index into output_array
                                         ]))
# Global sizes and options of the computation, filled at the end of
# extract_financial_structure.
compute_info_dtype = from_dtype(np.dtype([('allocation_rule', oasis_int),
                                          ('max_level', oasis_int),
                                          ('max_layer', oasis_int),
                                          ('node_len', oasis_int),
                                          ('children_len', oasis_int),
                                          ('parents_len', oasis_int),
                                          ('profile_len', oasis_int),
                                          ('loss_len', oasis_int),
                                          ('extra_len', oasis_int),
                                          ('compute_len', oasis_int),
                                          ('start_level', oasis_int),
                                          ('items_len', oasis_int),
                                          ('output_len', oasis_int),
                                          ('stepped', np.bool_),
                                          ]))
# Half-open slice [i_start, i_end) of fm_profile describing one policy
# (step policies may span several fm_profile rows).
profile_index_dtype = from_dtype(np.dtype([('i_start', oasis_int),
                                           ('i_end', oasis_int),
                                           ]))
# Profile entry dtype for CSR storage of programme_node_to_profiles
profile_entry_dtype = np.dtype([('layer_id', oasis_int),
                                ('i_start', oasis_int),
                                ('i_end', oasis_int)])
def load_static(static_path):
    """
    Load the raw financial data from static_path as numpy ndarray
    first check if .bin file is present then try .csv
    try loading profile_step before falling back to normal profile,

    :param static_path: str
        static_path
    :return:
        programme : link between nodes
        policytc : info on layer
        profile : policy profile can be profile_step or profile
        stepped : True if step profiles are used, None otherwise
        xref : node to output_id
        items : items (item_id and coverage_id mapping)
        coverages : Tiv value for each coverage id
    :raise:
        FileNotFoundError if one of the static is missing
    """
    programme = load_as_ndarray(static_path, 'fm_programme', fm_programme_dtype)
    policytc = load_as_ndarray(static_path, 'fm_policytc', fm_policytc_dtype)
    # step profiles take precedence; fall back to the plain profile file when absent
    profile = load_as_ndarray(static_path, 'fm_profile_step', fm_profile_step_dtype, False)
    if len(profile) == 0:
        profile = load_as_ndarray(static_path, 'fm_profile', fm_profile_dtype)
        stepped = None
    else:
        stepped = True
        profile = profile.copy()  # profile is a memmap but we may update some value so we copy it into its own array.
    xref = load_as_ndarray(static_path, 'fm_xref', fm_xref_dtype)
    items = load_as_ndarray(static_path, 'items', items_dtype, must_exist=False)[['item_id', 'coverage_id']]
    coverages = load_as_array(static_path, 'coverages', oasis_float, must_exist=False)
    # consistency check between items and coverages: if the counts do not
    # match, at least one of the two optional files is missing or truncated
    if np.unique(items['coverage_id']).shape[0] != coverages.shape[0]:
        # one of the file is missing we default to empty array
        items = np.empty(0, dtype=items_dtype)
        coverages = np.empty(0, dtype=oasis_float)
    return programme, policytc, profile, stepped, xref, items, coverages
@njit(cache=True)
def does_nothing(profile):
    """
    Check whether applying this profile would leave the loss unchanged.

    Detecting such no-op policies lets the calculation skip them, saving
    memory and computation time.

    :param profile: np.array of fm_profile_dtype or fm_profile_step_dtype
        profile
    :return:
        boolean : True if the profile is actually doing nothing
    """
    calcrule = profile['calcrule_id']
    if calcrule == 100:
        return True
    if calcrule == 12 or calcrule == 16:
        # pure deductible rules: no-op when the deductible is zero
        return almost_equal(profile['deductible1'], 0)
    if calcrule == 15:
        # pure limit rule: no-op when the limit factor is one
        return almost_equal(profile['limit1'], 1)
    if calcrule == 34:
        # no-op only when deductible, attachment and share are all neutral
        return (almost_equal(profile['deductible1'], 0)
                and almost_equal(profile['attachment1'], 0)
                and almost_equal(profile['share1'], 1))
    return False
@njit(cache=True)
def idx_to_node(node_idx, node_level_start, start_level, max_level):
    """Convert a flat node index back to a (level, agg_id) pair.

    Scans each level's (start, end] index range until the one containing
    node_idx is found; falls back to (0, 0) for an out-of-range index
    (which should not happen for valid inputs).
    """
    level = start_level
    while level <= max_level:
        lo = node_level_start[level]
        hi = node_level_start[level + 1]
        if lo < node_idx <= hi:
            return (nb_oasis_int(level), nb_oasis_int(node_idx - lo))
        level += 1
    # Fallback (shouldn't happen)
    return (nb_oasis_int(0), nb_oasis_int(0))
@njit(cache=True)
def get_all_children_csr(node_idx, children_indptr, children_data, items_only, max_nodes):
    """CSR version of get_all_children using NumPy arrays.

    Iterative depth-first traversal of the parent->children CSR structure
    starting at node_idx.

    Args:
        node_idx: Starting node index
        children_indptr, children_data: CSR arrays for parent->children relationship
        items_only: If True, only return leaf nodes (items)
        max_nodes: Maximum possible nodes (for pre-allocation)
    Returns:
        Tuple of (result_array, result_len) - node indices of children
    """
    collected = np.empty(max_nodes, dtype=oasis_int)
    pending = np.empty(max_nodes, dtype=oasis_int)
    collected_len = 0
    pending[0] = node_idx
    pending_len = 1
    while pending_len > 0:
        pending_len -= 1
        node = pending[pending_len]
        first = children_indptr[node]
        last = children_indptr[node + 1]
        if first == last:
            # leaf node: always collected
            collected[collected_len] = node
            collected_len += 1
        else:
            # internal node: collected only when items_only is False
            if not items_only:
                collected[collected_len] = node
                collected_len += 1
            for j in range(first, last):
                pending[pending_len] = children_data[j]
                pending_len += 1
    return collected, collected_len
@njit(cache=True)
def get_all_parent_csr(start_nodes, start_len, parents_indptr, parents_data, target_level, node_level_start, max_nodes):
    """CSR version of get_all_parent using NumPy arrays.

    Walks the child->parents CSR structure upward from the start nodes and
    collects each node at target_level once (root nodes at or below
    target_level are collected too).

    Args:
        start_nodes: Array of starting node indices
        start_len: Number of valid entries in start_nodes
        parents_indptr, parents_data: CSR arrays for child->parents relationship
        target_level: Stop at nodes at this level
        node_level_start: Array to convert index to level
        max_nodes: Maximum possible nodes (for pre-allocation)
    Returns:
        Tuple of (result_array, result_len) - unique node indices at target_level
    """
    collected = np.empty(max_nodes, dtype=oasis_int)
    pending = np.empty(max_nodes, dtype=oasis_int)
    seen = np.zeros(max_nodes, dtype=np.uint8)  # dedup flag per node
    collected_len = 0
    pending_len = 0
    # seed the stack with the start nodes
    for i in range(start_len):
        pending[pending_len] = start_nodes[i]
        pending_len += 1
    while pending_len > 0:
        pending_len -= 1
        node = pending[pending_len]
        first = parents_indptr[node]
        last = parents_indptr[node + 1]
        if first < last:  # node has parents: keep climbing
            for j in range(first, last):
                pending[pending_len] = parents_data[j]
                pending_len += 1
            # collect node only if it sits exactly at target_level
            for lvl in range(node_level_start.shape[0] - 1):
                if node_level_start[lvl] < node <= node_level_start[lvl + 1]:
                    if lvl == target_level and seen[node] == 0:
                        seen[node] = 1
                        collected[collected_len] = node
                        collected_len += 1
                    break
        else:
            # root node: collect if at or below target_level
            for lvl in range(node_level_start.shape[0] - 1):
                if node_level_start[lvl] < node <= node_level_start[lvl + 1]:
                    if lvl <= target_level and seen[node] == 0:
                        seen[node] = 1
                        collected[collected_len] = node
                        collected_len += 1
                    break
    return collected, collected_len
@njit(cache=True)
def is_multi_peril(fm_programme):
    """Return True if any level-1 programme row maps an item onto a
    different aggregation id (from_agg_id != to_agg_id), meaning several
    perils aggregate into one node."""
    found = False
    for i in range(fm_programme.shape[0]):
        row = fm_programme[i]
        if row['level_id'] == 1 and row['from_agg_id'] != row['to_agg_id']:
            found = True
            break
    return found
@njit(cache=True)
def get_tiv_csr(children_indices, children_len, items, coverages, node_level_start, start_level):
    """CSR-compatible version of get_tiv using node indices.

    Sums the coverage values referenced by the item-level children, counting
    each coverage only once even when several items share it.

    Args:
        children_indices: Array of child node indices (item level nodes)
        children_len: Number of valid entries in children_indices
        items: Items array mapping item_id to coverage_id
        coverages: Coverage values
        node_level_start: Array for converting index to level/agg_id
        start_level: The start level (item level)
    Returns:
        Total insured value for the children
    """
    counted = np.zeros_like(coverages, dtype=np.uint8)
    total = 0
    item_base = node_level_start[start_level]
    for i in range(children_len):
        # item-level node index -> agg_id (equal to item_id at the item level)
        item_id = children_indices[i] - item_base
        cov_i = items[item_id - 1]['coverage_id'] - 1
        if counted[cov_i] == 0:
            counted[cov_i] = 1
            total += coverages[cov_i]
    return total
@njit(cache=True)
def prepare_profile_simple(profile, tiv):
    """Rewrite TIV-relative calcrules into their absolute equivalents,
    scaling the stored factors in place."""
    # if use TIV convert calcrule to fix deductible
    calcrule = profile['calcrule_id']
    if calcrule == 4:
        profile['calcrule_id'] = 1
        profile['deductible1'] *= tiv
    elif calcrule == 6:
        profile['calcrule_id'] = 12
        profile['deductible1'] *= tiv
    elif calcrule == 18:
        profile['calcrule_id'] = 2
        profile['deductible1'] *= tiv
    elif calcrule == 21:
        profile['calcrule_id'] = 13
        profile['deductible1'] *= tiv
    elif calcrule == 9:
        # deductible expressed relative to the limit rather than TIV
        profile['calcrule_id'] = 1
        profile['deductible1'] *= profile['limit1']
    elif calcrule == 15 and profile['limit1'] >= 1:
        profile['calcrule_id'] = 12
@njit(cache=True)
def _scale_trigger_bounds(profile, tiv):
    """Convert fractional step-trigger bounds to absolute values (in place).

    trigger_start is scaled by tiv; a trigger_end of exactly 1 means
    'no upper bound' and becomes +inf, otherwise it is scaled by tiv too.
    """
    profile['trigger_start'] *= tiv
    if profile['trigger_end'] == 1:
        profile['trigger_end'] = np.inf
    else:
        profile['trigger_end'] *= tiv


@njit(cache=True)
def prepare_profile_stepped(profile, tiv):
    """Rewrite step-policy calcrules into their absolute equivalents,
    scaling the stored factors by tiv (in place).

    Rules 29/30/31 are pre-reduced to rule 27 with a precomputed payout;
    anything that is not a step rule is delegated to prepare_profile_simple.
    """
    # if use TIV convert calcrule to fix deductible
    if profile['calcrule_id'] == 27:
        _scale_trigger_bounds(profile, tiv)
        # payout as % of TIV, capped by limit1, with conditional extra payout
        loss = min(max(profile['payout_start'] * tiv - profile['deductible1'], 0), profile['limit1'])
        cond_loss = min(loss * profile['scale2'], profile['limit2'])
        profile['payout_start'] = (loss + cond_loss) * (1 + profile['scale1'])
    elif profile['calcrule_id'] == 28:
        _scale_trigger_bounds(profile, tiv)
        profile['scale1'] += 1
        if profile['payout_start'] == 0:  # backward compatibility v1.22.x
            profile['calcrule_id'] = 281
    elif profile['calcrule_id'] == 29:
        # same as 27 but payout not capped by limit1
        profile['calcrule_id'] = 27
        _scale_trigger_bounds(profile, tiv)
        loss = max(profile['payout_start'] * tiv - profile['deductible1'], 0)
        cond_loss = min(loss * profile['scale2'], profile['limit2'])
        profile['payout_start'] = (loss + cond_loss) * (1 + profile['scale1'])
    elif profile['calcrule_id'] == 30:
        # same as 27 but payout expressed relative to limit1
        profile['calcrule_id'] = 27
        _scale_trigger_bounds(profile, tiv)
        loss = max(profile['payout_start'] * profile['limit1'] - profile['deductible1'], 0)
        cond_loss = min(loss * profile['scale2'], profile['limit2'])
        profile['payout_start'] = (loss + cond_loss) * (1 + profile['scale1'])
    elif profile['calcrule_id'] == 31:
        # same as 27 but payout is an absolute amount
        profile['calcrule_id'] = 27
        _scale_trigger_bounds(profile, tiv)
        loss = max(profile['payout_start'] - profile['deductible1'], 0)
        cond_loss = min(loss * profile['scale2'], profile['limit2'])
        profile['payout_start'] = (loss + cond_loss) * (1 + profile['scale1'])
    elif profile['calcrule_id'] == 32:
        profile['scale1'] += 1
    elif profile['calcrule_id'] == 37:
        _scale_trigger_bounds(profile, tiv)
        profile['scale1'] += 1
    elif profile['calcrule_id'] == 38:
        _scale_trigger_bounds(profile, tiv)
        profile['scale1'] += 1
    else:
        # non step policy: fall back to the simple profile preparation
        prepare_profile_simple(profile, tiv)
@njit(cache=True)
def extract_financial_structure(allocation_rule, fm_programme, fm_policytc, fm_profile, stepped, fm_xref, items, coverages):
    """
    Build the processed financial structure arrays from the raw static inputs.

    :param allocation_rule:
        option to indicate how the loss are allocated to the output
    :param fm_programme:
        structure of the levels
    :param fm_policytc:
        structure of the layers and policy_id to apply
    :param fm_profile:
        definition of the policy_id
    :param stepped:
        None for normal profiles, non-None when step profiles are used
    :param fm_xref:
        mapping between the output of the allocation and output item_id
    :param items:
        item_id to coverage_id mapping (may be empty)
    :param coverages:
        TIV value for each coverage_id (may be empty)
    :return:
        compute_infos: global sizes and options of the computation
        nodes_array: one record per node of the structure
        node_parents_array: flat array of parent node indices
        node_profiles_array: (i_start, i_end) fm_profile slice per node profile
        output_array: output_id per output node layer
        fm_profile: profile array (possibly expanded with TIV duplicates, rescaled in place)
    """
    ##### profile_id_to_profile_index ####
    # policies may have multiple step, create a mapping between profile_id and the start and end index in fm_profile file
    max_profile_id = np.max(fm_profile['profile_id'])
    profile_id_to_profile_index = np.empty(max_profile_id + 1, dtype=profile_index_dtype)
    # is_tiv_profile[profile_id] = 1 if profile requires TIV calculation
    is_tiv_profile = np.zeros(max_profile_id + 1, dtype=np.uint8)
    last_profile_id = 0  # real profile_id start at 1
    for i in range(fm_profile.shape[0]):
        if fm_profile[i]['calcrule_id'] in need_tiv_policy:
            is_tiv_profile[fm_profile[i]['profile_id']] = 1
        # rows of one profile_id are assumed contiguous: keep extending i_end,
        # set i_start the first time the profile_id is encountered
        profile_id_to_profile_index[fm_profile[i]['profile_id']]['i_end'] = i + 1
        if last_profile_id != fm_profile[i]['profile_id']:
            profile_id_to_profile_index[fm_profile[i]['profile_id']]['i_start'] = i
            last_profile_id = fm_profile[i]['profile_id']
    # in fm_programme check if multi-peril and get size of each levels
    max_level = np.max(fm_programme['level_id'])
    level_node_len = np.zeros(max_level + 1, dtype=oasis_int)
    multi_peril = False
    for i in range(fm_programme.shape[0]):
        programme = fm_programme[i]
        if programme['level_id'] == 1 and programme['from_agg_id'] != programme['to_agg_id']:
            multi_peril = True
        if level_node_len[programme['level_id'] - 1] < programme['from_agg_id']:
            level_node_len[programme['level_id'] - 1] = programme['from_agg_id']
        if level_node_len[programme['level_id']] < programme['to_agg_id']:
            level_node_len[programme['level_id']] = programme['to_agg_id']
    ##### fm_policytc (level_id agg_id layer_id => profile_id) #####
    # Pre-pass: Count TIV profile duplicates needed (first occurrence uses original, subsequent need copies)
    # tiv_first_seen[profile_id] = 1 after first occurrence
    tiv_first_seen = np.zeros(max_profile_id + 1, dtype=np.uint8)
    num_tiv_duplicates = 0
    for i in range(fm_policytc.shape[0]):
        policytc = fm_policytc[i]
        profile_id = nb_oasis_int(policytc['profile_id'])
        if is_tiv_profile[profile_id]:
            if tiv_first_seen[profile_id]:
                # Subsequent occurrence - needs duplicate profile entries
                i_start = profile_id_to_profile_index[profile_id]['i_start']
                i_end = profile_id_to_profile_index[profile_id]['i_end']
                num_tiv_duplicates += i_end - i_start
            else:
                tiv_first_seen[profile_id] = 1
    # Create expanded fm_profile with TIV duplicates pre-allocated
    original_fm_profile_len = fm_profile.shape[0]
    if num_tiv_duplicates > 0:
        new_fm_profile = np.empty(original_fm_profile_len + num_tiv_duplicates, dtype=fm_profile.dtype)
        new_fm_profile[:original_fm_profile_len] = fm_profile[:]
        fm_profile = new_fm_profile
    # fm_xref
    if multi_peril:  # if single peril we can skip item level computation (level 0)
        start_level = nb_oasis_int(0)
    else:
        start_level = nb_oasis_int(1)
    if start_level == max_level:  # there is only one level we can switch the computation as if it were a0
        allocation_rule = 0
    if allocation_rule == 0:
        out_level = nb_oasis_int(max_level)
    else:
        out_level = start_level
    # Compute node_level_start for array-based indexing
    # node_level_start[level] gives the starting index for nodes at that level
    node_level_start = np.zeros(level_node_len.shape[0] + 1, oasis_int)
    for i in range(start_level, level_node_len.shape[0]):
        node_level_start[i + 1] = node_level_start[i] + level_node_len[i]
    total_nodes = node_level_start[-1] + 1
    # Build profiles CSR directly from fm_policytc (no intermediate dict)
    # Pass 1: Count profiles per node
    profiles_count = np.zeros(total_nodes, dtype=oasis_int)
    for i in range(fm_policytc.shape[0]):
        policytc = fm_policytc[i]
        node_idx = node_level_start[policytc['level_id']] + policytc['agg_id']
        profiles_count[node_idx] += 1
    # Build profiles_indptr
    profiles_indptr = np.zeros(total_nodes + 1, dtype=oasis_int)
    for i in range(total_nodes):
        profiles_indptr[i + 1] = profiles_indptr[i] + profiles_count[i]
    # Allocate profiles_data with structured dtype (layer_id, i_start, i_end)
    profiles_data = np.empty(profiles_indptr[-1], dtype=profile_entry_dtype)
    # Pass 2: Fill profiles_data directly (handling TIV duplicates)
    profiles_cursor = np.zeros(total_nodes, dtype=oasis_int)
    tiv_first_seen[:] = 0  # Reset for second pass
    i_new_fm_profile = original_fm_profile_len
    for i in range(fm_policytc.shape[0]):
        policytc = fm_policytc[i]
        profile_id = nb_oasis_int(policytc['profile_id'])
        node_idx = node_level_start[policytc['level_id']] + policytc['agg_id']
        i_start = profile_id_to_profile_index[profile_id]['i_start']
        i_end = profile_id_to_profile_index[profile_id]['i_end']
        # Handle TIV profile duplication
        if is_tiv_profile[profile_id]:
            if tiv_first_seen[profile_id]:
                # Subsequent occurrence - create duplicate in expanded fm_profile
                old_i_start, old_i_end = i_start, i_end
                i_start = i_new_fm_profile
                for j in range(old_i_start, old_i_end):
                    fm_profile[i_new_fm_profile] = fm_profile[j]
                    i_new_fm_profile += 1
                i_end = i_new_fm_profile
            else:
                tiv_first_seen[profile_id] = 1
        # Fill CSR data
        pos = profiles_indptr[node_idx] + profiles_cursor[node_idx]
        profiles_data[pos]['layer_id'] = policytc['layer_id']
        profiles_data[pos]['i_start'] = i_start
        profiles_data[pos]['i_end'] = i_end
        profiles_cursor[node_idx] += 1
    # Sort profiles within each node by layer_id (in-place bubble sort)
    temp_profile = np.empty(1, dtype=profiles_data.dtype)
    for node_idx in range(total_nodes):
        start = profiles_indptr[node_idx]
        end = profiles_indptr[node_idx + 1]
        n = end - start
        if n > 1:
            # Bubble sort - simple and works well for small n
            for j in range(n):
                for k in range(start, end - 1 - j):
                    if profiles_data[k]['layer_id'] > profiles_data[k + 1]['layer_id']:
                        # Swap entries
                        temp_profile[0] = profiles_data[k]
                        profiles_data[k] = profiles_data[k + 1]
                        profiles_data[k + 1] = temp_profile[0]
    ##### xref #####
    # Create 2D array mapping (agg_id, layer_id) => output_id for out_level nodes
    # output_id_arr[agg_id, layer_id] = output_id (0 = no output)
    max_agg_id_out = level_node_len[out_level]
    max_layer_id = np.max(fm_xref['layer_id']) if fm_xref.shape[0] > 0 else 1
    output_id_arr = np.zeros((max_agg_id_out + 1, max_layer_id + 1), dtype=oasis_int)
    output_len = 0
    for i in range(fm_xref.shape[0]):
        xref = fm_xref[i]
        if output_len < xref['output']:
            output_len = nb_oasis_int(xref['output'])
        output_id_arr[xref['agg_id'], xref['layer_id']] = xref['output']
    ##### programme ####
    # node_layers will contain the number of layers for each node
    # Using array indexed by node_level_start[level] + agg_id (0 = not set)
    node_layers_arr = np.zeros(total_nodes, dtype=oasis_int)
    # node_cross_layers tracks cross-layer nodes (0 = false, 1 = true)
    node_cross_layers_arr = np.zeros(total_nodes, dtype=np.uint8)
    # layer_source tracks where each node gets its layer list from (index to source node)
    # If layer_source[idx] == idx, node uses its own profiles from programme_node_to_profiles
    # Otherwise, it inherits from layer_source[idx]
    layer_source = np.arange(total_nodes, dtype=oasis_int)  # default: each node is its own source
    # fill up node_layers with the number of policies for each node
    for programme in fm_programme:
        parent_level = nb_oasis_int(programme['level_id'])
        parent_agg = nb_oasis_int(programme['to_agg_id'])
        parent_idx = node_level_start[parent_level] + parent_agg
        if node_layers_arr[parent_idx] == 0:
            # Use CSR format: profiles_indptr[idx+1] - profiles_indptr[idx] = count
            node_layers_arr[parent_idx] = profiles_indptr[parent_idx + 1] - profiles_indptr[parent_idx]
            # layer_source[parent_idx] = parent_idx already (uses own profiles)
    # Build parent/child CSR arrays directly from fm_programme (no intermediate dicts)
    # This uses a two-pass approach: count first, then fill
    # Pass 1: Count children per parent and parents per child
    children_count = np.zeros(total_nodes, dtype=oasis_int)
    parents_count = np.zeros(total_nodes, dtype=oasis_int)
    parents_len = 0
    for i in range(fm_programme.shape[0]):
        programme = fm_programme[i]
        if programme['level_id'] > start_level:
            parent_idx = node_level_start[programme['level_id']] + programme['to_agg_id']
            # negative from_agg_id references a node at start_level directly
            if programme['from_agg_id'] > 0:
                child_idx = node_level_start[programme['level_id'] - 1] + programme['from_agg_id']
            else:
                child_idx = node_level_start[start_level] + (-programme['from_agg_id'])
            children_count[parent_idx] += 1
            parents_count[child_idx] += 1
            parents_len += 1
    # Build CSR indptr arrays
    children_indptr = np.zeros(total_nodes + 1, dtype=oasis_int)
    for i in range(total_nodes):
        children_indptr[i + 1] = children_indptr[i] + children_count[i]
    parents_indptr = np.zeros(total_nodes + 1, dtype=oasis_int)
    for i in range(total_nodes):
        parents_indptr[i + 1] = parents_indptr[i] + parents_count[i]
    # Allocate CSR data arrays
    children_data = np.empty(children_indptr[-1], dtype=oasis_int)
    parents_data = np.empty(parents_indptr[-1], dtype=oasis_int)
    # Pass 2: Fill CSR data arrays (iterate level by level like original to preserve order)
    children_cursor = np.zeros(total_nodes, dtype=oasis_int)
    parents_cursor = np.zeros(total_nodes, dtype=oasis_int)
    # Iterate from max_level down to start_level to match original parent ordering
    for level in range(max_level, start_level, -1):
        for i in range(fm_programme.shape[0]):
            programme = fm_programme[i]
            if programme['level_id'] == level:
                parent_idx = node_level_start[programme['level_id']] + programme['to_agg_id']
                if programme['from_agg_id'] > 0:
                    child_idx = node_level_start[programme['level_id'] - 1] + programme['from_agg_id']
                else:
                    child_idx = node_level_start[start_level] + (-programme['from_agg_id'])
                # Add child to parent's children list
                c_pos = children_indptr[parent_idx] + children_cursor[parent_idx]
                children_data[c_pos] = child_idx
                children_cursor[parent_idx] += 1
                # Add parent to child's parents list (insert at front for correct order)
                # We fill from the end to maintain insertion order (like insert(0, parent))
                p_start = parents_indptr[child_idx]
                p_end = parents_indptr[child_idx + 1]
                p_pos = p_end - 1 - parents_cursor[child_idx]
                parents_data[p_pos] = parent_idx
                parents_cursor[child_idx] += 1
    # Now process layer propagation and cross-layer detection using CSR
    # Go through each level from top to bottom
    for level in range(max_level, start_level, -1):
        for i in range(fm_programme.shape[0]):
            programme = fm_programme[i]
            if programme['level_id'] == level:
                parent_idx = node_level_start[programme['level_id']] + programme['to_agg_id']
                if programme['from_agg_id'] > 0:
                    child_idx = node_level_start[programme['level_id'] - 1] + programme['from_agg_id']
                else:
                    child_idx = node_level_start[start_level] + (-programme['from_agg_id'])
                if node_layers_arr[child_idx] == 0 or node_layers_arr[child_idx] <= node_layers_arr[parent_idx]:
                    node_layers_arr[child_idx] = node_layers_arr[parent_idx]
                    # Child inherits layer source from parent (follow chain if parent also inherits)
                    layer_source[child_idx] = layer_source[parent_idx]
                elif node_layers_arr[child_idx] > node_layers_arr[parent_idx]:  # cross layer node
                    # Use CSR-based get_all_parent_csr instead of dict-based get_all_parent
                    start_nodes = np.array([parent_idx], dtype=oasis_int)
                    grand_parents_arr, grand_parents_len = get_all_parent_csr(
                        start_nodes, 1, parents_indptr, parents_data,
                        max_level, node_level_start, total_nodes)
                    # every ancestor with fewer layers than the child becomes cross-layer
                    for gp_i in range(grand_parents_len):
                        gp_idx = grand_parents_arr[gp_i]
                        if node_layers_arr[gp_idx] < node_layers_arr[child_idx]:
                            node_cross_layers_arr[gp_idx] = nb_oasis_int(1)
                    node_layers_arr[parent_idx] = node_layers_arr[child_idx]
    # compute number of steps (steps), max size of node to compute (compute_len)
    # Note: node_level_start was computed earlier for array-based indexing
    steps = max_level + (1 - start_level)
    compute_len = node_level_start[-1] + steps + level_node_len[-1] + 1
    # Compute output_array_size using array-based iteration
    output_array_size = 0
    for agg_id in range(1, level_node_len[out_level] + 1):
        node_idx = node_level_start[out_level] + agg_id
        output_array_size += node_layers_arr[node_idx]
    nodes_array = np.empty(node_level_start[-1] + 1, dtype=nodes_array_dtype)
    node_parents_array = np.empty(parents_len, dtype=oasis_int)
    node_profiles_array = np.zeros(fm_policytc.shape[0] + 1, dtype=profile_index_dtype)
    output_array = np.zeros(output_array_size, dtype=oasis_int)
    # running cursors into the companion arrays (index 0 of nodes/profiles is reserved)
    node_i = 1
    children_i = 1
    parents_i = 0
    profile_i = 1
    loss_i = 0
    extra_i = 0
    output_i = 0
    for level in range(start_level, max_level + 1):
        for agg_id in range(1, level_node_len[level] + 1):
            node = nodes_array[node_i]
            node['node_id'] = node_i
            node_i += 1
            node['level_id'] = level
            node['agg_id'] = agg_id
            # layers
            node_idx = node_level_start[level] + agg_id
            node['layer_len'] = node_layers_arr[node_idx]
            node['cross_layer_profile'] = 0  # set default to 0 change if it is a cross_layer_profile after
            node['loss'], loss_i = loss_i, loss_i + node['layer_len']
            if level == start_level:
                node['net_loss'], loss_i = loss_i, loss_i + 1
            node['extra'] = null_index
            node['is_reallocating'] = 0
            # children - use CSR format
            num_children = children_indptr[node_idx + 1] - children_indptr[node_idx]
            if num_children > 0:
                # reserve 1 + num_children slots (presumably a count slot plus the ids,
                # filled by the consumer of this structure — confirm against computation code)
                node['children'], children_i = children_i, children_i + 1 + num_children
            else:
                node['children'] = 0
            # parent - use CSR format
            p_start = parents_indptr[node_idx]
            p_end = parents_indptr[node_idx + 1]
            num_parents = p_end - p_start
            if num_parents > 0:
                node['parent_len'] = num_parents
                node['parent'] = parents_i
                for pi in range(p_start, p_end):
                    node_parents_array[parents_i], parents_i = parents_data[pi], nb_oasis_int(parents_i + 1)
            else:
                node['parent_len'] = 0
            # profiles - use CSR format
            prof_start = profiles_indptr[node_idx]
            prof_end = profiles_indptr[node_idx + 1]
            num_profiles = prof_end - prof_start
            if num_profiles > 0:
                if node_cross_layers_arr[node_idx]:
                    node['profile_len'] = node['layer_len']
                    node['cross_layer_profile'] = 1
                else:
                    node['profile_len'] = num_profiles
                node['profiles'] = profile_i
                # Iterate through profiles from CSR (already sorted by layer_id)
                for prof_idx in range(prof_start, prof_end):
                    layer_id = profiles_data[prof_idx]['layer_id']
                    i_start = profiles_data[prof_idx]['i_start']
                    i_end = profiles_data[prof_idx]['i_end']
                    node_profile, profile_i = node_profiles_array[profile_i], profile_i + 1
                    node_profile['i_start'] = i_start
                    node_profile['i_end'] = i_end
                    # if use TIV we compute it and precompute % TIV values
                    for profile_index in range(i_start, i_end):
                        if fm_profile[profile_index]['calcrule_id'] in need_tiv_policy:
                            all_children_arr, all_children_len = get_all_children_csr(
                                node_idx, children_indptr, children_data, True, total_nodes)
                            tiv = get_tiv_csr(all_children_arr, all_children_len, items, coverages,
                                              node_level_start, start_level)
                            break
                    else:
                        # no TIV-dependent calcrule in this profile slice
                        tiv = 0
                    for profile_index in range(i_start, i_end):
                        profile = fm_profile[profile_index]
                        if stepped is None:
                            prepare_profile_simple(profile, tiv)
                        else:
                            prepare_profile_stepped(profile, tiv)
                        if does_nothing(profile):
                            # only non step policy can "does_nothing" so this is safe
                            node_profile['i_end'] = node_profile['i_start']
                    # check if we need to compute extras (min and max ded policies)
                    for profile_index in range(i_start, i_end):
                        if fm_profile[profile_index]['calcrule_id'] in need_extras:
                            node['is_reallocating'] = allocation_rule == 2 and fm_profile[profile_index]['calcrule_id'] != 27
                            # allocate extras for all nodes under every same-level
                            # ancestor sharing items with this node
                            items_child_arr, items_child_len = get_all_children_csr(
                                node_idx, children_indptr, children_data, True, total_nodes)
                            all_parent_arr, all_parent_len = get_all_parent_csr(
                                items_child_arr, items_child_len, parents_indptr, parents_data,
                                level, node_level_start, total_nodes)
                            for pi in range(all_parent_len):
                                parent_node_idx = all_parent_arr[pi]
                                all_children_arr, all_children_len = get_all_children_csr(
                                    parent_node_idx, children_indptr, children_data, False, total_nodes)
                                for ci in range(all_children_len):
                                    child_node_idx = all_children_arr[ci]
                                    child = nodes_array[child_node_idx]
                                    if child['extra'] == null_index:
                                        child['extra'], extra_i = extra_i, extra_i + node['layer_len']
                            break
            else:  # item level has no profile
                node['profile_len'] = 1
                node['profiles'] = 0
            if level == out_level:
                # Check if any layer for this agg_id has an output_id
                has_output = False
                for layer_idx in range(output_id_arr.shape[1]):
                    if output_id_arr[agg_id, layer_idx] != 0:
                        has_output = True
                        break
                if has_output:
                    node['output_ids'], output_i = output_i, output_i + node['layer_len']
                    # Get the source node for layers (could be this node or an ancestor)
                    source_idx = layer_source[node_idx]
                    # Use CSR format directly with source_idx
                    src_prof_start = profiles_indptr[source_idx]
                    src_prof_end = profiles_indptr[source_idx + 1]
                    for i in range(src_prof_end - src_prof_start):
                        layer_id = profiles_data[src_prof_start + i]['layer_id']
                        if output_id_arr[agg_id, layer_id] != 0:
                            output_array[node['output_ids'] + i] = output_id_arr[agg_id, layer_id]
                else:
                    raise KeyError("Some output nodes are missing output_ids")
    # pack the global sizes and options into a single-record array
    compute_infos = np.empty(1, dtype=compute_info_dtype)
    compute_info = compute_infos[0]
    compute_info['allocation_rule'] = allocation_rule
    compute_info['max_level'] = max_level
    compute_info['node_len'] = node_i
    compute_info['children_len'] = children_i
    compute_info['parents_len'] = parents_i
    compute_info['profile_len'] = profile_i
    compute_info['loss_len'] = loss_i
    compute_info['extra_len'] = extra_i
    compute_info['compute_len'] = compute_len
    compute_info['start_level'] = start_level
    compute_info['items_len'] = level_node_len[0]
    compute_info['output_len'] = output_len
    compute_info['stepped'] = stepped is not None
    compute_info['max_layer'] = max(nodes_array['layer_len'][1:])
    return compute_infos, nodes_array, node_parents_array, node_profiles_array, output_array, fm_profile
def create_financial_structure(allocation_rule, static_path):
    """
    Extract the financial structure from the static files and persist the
    processed arrays as .npy files inside static_path.

    :param allocation_rule: int
        back-allocation rule
    :param static_path: string
        path to the static files
    :raise ValueError:
        if allocation_rule is not one of the allowed values
    """
    if allocation_rule not in allowed_allocation_rule:
        raise ValueError(f"allocation_rule must be in {allowed_allocation_rule}, found {allocation_rule}")
    if allocation_rule == 3:
        # rule 3 is computed the same way as rule 2
        allocation_rule = 2
    fm_programme, fm_policytc, fm_profile, stepped, fm_xref, items, coverages = load_static(static_path)
    (compute_info, nodes_array, node_parents_array,
     node_profiles_array, output_array, fm_profile) = extract_financial_structure(
        allocation_rule, fm_programme, fm_policytc, fm_profile, stepped, fm_xref, items, coverages)
    logger.info(f'nodes_array has {len(nodes_array)} elements')
    logger.info(f'compute_info : {dict(zip(compute_info.dtype.names, compute_info[0]))}')
    # persist every processed array; the rule-dependent ones are suffixed
    # with the allocation rule so different rules can coexist
    for name, array in ((f'compute_info_{allocation_rule}', compute_info),
                        (f'nodes_array_{allocation_rule}', nodes_array),
                        (f'node_parents_array_{allocation_rule}', node_parents_array),
                        (f'node_profiles_array_{allocation_rule}', node_profiles_array),
                        (f'output_array_{allocation_rule}', output_array),
                        ('fm_profile', fm_profile)):
        np.save(os.path.join(static_path, name), array)
def load_financial_structure(allocation_rule, static_path):
    """Load the processed financial structure arrays from static_path,
    memory-mapped read-only.

    :return: compute_info, nodes_array, node_parents_array,
        node_profiles_array, output_array, fm_profile
    """
    def _load(filename):
        # read-only memmap: arrays can be large and are never modified here
        return np.load(os.path.join(static_path, filename), mmap_mode='r')

    return (_load(f'compute_info_{allocation_rule}.npy'),
            _load(f'nodes_array_{allocation_rule}.npy'),
            _load(f'node_parents_array_{allocation_rule}.npy'),
            _load(f'node_profiles_array_{allocation_rule}.npy'),
            _load(f'output_array_{allocation_rule}.npy'),
            _load('fm_profile.npy'))