# Source code for oasislmf.preparation.il_inputs
__all__ = [
'get_calc_rule_ids',
'get_grouped_fm_profile_by_level_and_term_group',
'get_grouped_fm_terms_by_level_and_term_group',
'get_oed_hierarchy',
'get_il_input_items',
]
import contextlib
import copy
import itertools
import os
import time
import warnings
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
from ods_tools.oed import fill_empty, BLANK_VALUES
from oasislmf.preparation.summaries import get_useful_summary_cols
from oasislmf.pytools.common.data import (fm_policytc_headers, fm_policytc_dtype,
fm_profile_headers, fm_profile_dtype,
fm_profile_step_headers, fm_profile_step_dtype,
fm_programme_headers, fm_programme_dtype,
fm_xref_headers, fm_xref_dtype,
DTYPE_IDX, calcrule_id, profile_id, layer_id)
from oasislmf.pytools.converters.csvtobin.utils.common import df_to_ndarray
from oasislmf.utils.calc_rules import get_calc_rules
from oasislmf.utils.coverages import SUPPORTED_COVERAGE_TYPES
from oasislmf.utils.data import get_ids, DEFAULT_LOC_FIELD_TYPES
from oasislmf.utils.defaults import (OASIS_FILES_PREFIXES,
get_default_accounts_profile, get_default_exposure_profile,
get_default_fm_aggregation_profile, SOURCE_IDX)
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.fm import (CALCRULE_ASSIGNMENT_METHODS, COVERAGE_AGGREGATION_METHODS,
DEDUCTIBLE_AND_LIMIT_TYPES, FM_LEVELS, FML_ACCALL, STEP_TRIGGER_TYPES,
SUPPORTED_FM_LEVELS, FM_TERMS, GROUPED_SUPPORTED_FM_LEVELS)
from oasislmf.utils.log import oasis_log
from oasislmf.utils.path import as_path
from oasislmf.utils.profiles import (get_default_step_policies_profile,
get_grouped_fm_profile_by_level_and_term_group,
get_grouped_fm_terms_by_level_and_term_group,
get_oed_hierarchy)
pd.options.mode.chained_assignment = None
warnings.simplefilter(action='ignore', category=FutureWarning)
# convertion from np dtype to pandas dtype
fm_policytc_pd_dtype = {col: dtype for col, (dtype, _) in fm_policytc_dtype.fields.items()}
fm_profile_pd_dtype = {col: dtype for col, (dtype, _) in fm_profile_dtype.fields.items()}
fm_profile_step_pd_dtype = {col: dtype for col, (dtype, _) in fm_profile_step_dtype.fields.items()}
fm_programme_pd_dtype = {col: dtype for col, (dtype, _) in fm_programme_dtype.fields.items()}
fm_xref_pd_dtype = {col: dtype for col, (dtype, _) in fm_xref_dtype.fields.items()}
# Define a list of all supported OED coverage types in the exposure
supp_cov_type_ids = [v['id'] for v in SUPPORTED_COVERAGE_TYPES.values()]
policytc_cols = ['profile_id', 'calcrule_id', 'deductible', 'deductible_min', 'deductible_max', 'attachment', 'limit', 'share']
profile_cols_map = {
'deductible': 'deductible1',
'deductible_min': 'deductible2',
'deductible_max': 'deductible3',
'attachment': 'attachment1',
'limit': 'limit1',
'share': 'share1'
}
cross_layer_level = {FML_ACCALL, }
risk_disaggregation_term = {'deductible', 'deductible_min', 'deductible_max', 'attachment', 'limit'}
fm_term_ids = [fm_term['id'] for fm_term in FM_TERMS.values()]
BITYPE_columns = {"BIWaitingPeriodType", "BIPOIType"}
BIPOI_default_exeption = "BIPOI"
# Calcrule ID for passthrough (no financial terms applied)
PASSTHROUGH_CALCRULE_ID = 100
def prepare_ded_and_limit(level_df):
simplify_no_terms = {
'deductible': ['ded_type', 'ded_code'],
'limit': ['lim_type', 'lim_code']
}
level_df['need_tiv'] = False
for term, type_and_code in simplify_no_terms.items():
if term in level_df.columns:
for elm in type_and_code:
if elm in level_df.columns:
level_df.loc[level_df[term] == 0, elm] = 0
if elm[-5:] == '_type':
level_df['need_tiv'] |= level_df[elm].isin({2, 3}) # 2 or 3 type means term depend on tiv
else:
level_df = level_df.drop(columns=type_and_code, errors='ignore')
return level_df
# [docs]
def get_calc_rule_ids(il_inputs_calc_rules_df, calc_rule_type):
"""
Merges IL inputs with the correct calc_rule table and returns calc rule IDs.
Args:
il_inputs_calc_rules_df (pandas.DataFrame): IL input items dataframe.
calc_rule_type (str): Type of calc_rule to look for.
Returns:
pandas.Series: Series of calculation rule IDs.
"""
il_inputs_calc_rules_df = il_inputs_calc_rules_df.copy()
calc_rules_df, calc_rule_term_info = get_calc_rules(calc_rule_type)
calc_rules_df = calc_rules_df.drop(columns=['desc', 'id_key'], errors='ignore')
terms = []
terms_indicators = []
no_terms = {
'deductible': ['ded_type', 'ded_code'],
'limit': ['lim_type', 'lim_code']
}
for term in calc_rule_term_info['terms']:
if term in il_inputs_calc_rules_df.columns:
terms.append(term)
terms_indicators.append('{}_gt_0'.format(term))
else:
calc_rules_df = calc_rules_df[calc_rules_df['{}_gt_0'.format(term)] == 0].drop(columns=['{}_gt_0'.format(term)])
for term in calc_rule_term_info['types_and_codes']:
if term in il_inputs_calc_rules_df.columns:
il_inputs_calc_rules_df[term] = il_inputs_calc_rules_df[term].fillna(0).astype('uint8')
else:
calc_rules_df = calc_rules_df[calc_rules_df[term] == 0].drop(columns=[term])
for term, type_and_code in no_terms.items():
if f'{term}_gt_0' in calc_rules_df.columns:
for elm in type_and_code:
if elm in il_inputs_calc_rules_df.columns:
il_inputs_calc_rules_df.loc[il_inputs_calc_rules_df[term] == 0, elm] = 0
else:
for elm in type_and_code:
if elm in calc_rules_df.columns:
calc_rules_df = calc_rules_df[calc_rules_df[elm] == 0].drop(columns=[elm])
il_inputs_calc_rules_df.loc[:, terms_indicators] = np.where(il_inputs_calc_rules_df[terms] > 0, 1, 0)
merge_col = list(set(il_inputs_calc_rules_df.columns).intersection(calc_rules_df.columns).difference({'calcrule_id'}))
if len(merge_col):
calcrule_ids = (
il_inputs_calc_rules_df[merge_col].reset_index()
.merge(calc_rules_df[merge_col + ['calcrule_id']].drop_duplicates(), how='left', on=merge_col)
).set_index('index')['calcrule_id'].fillna(0)
else:
return PASSTHROUGH_CALCRULE_ID # no term we return pass through
if 0 in calcrule_ids.unique():
_cols = list(set(['PortNumber', 'AccNumber', 'LocNumber'] + merge_col).intersection(il_inputs_calc_rules_df.columns))
no_match_keys = il_inputs_calc_rules_df.loc[calcrule_ids == 0, _cols].drop_duplicates()
err_msg = 'Calculation Rule mapping error, non-matching keys:\n{}'.format(no_match_keys)
raise OasisException(err_msg)
return calcrule_ids
def get_profile_ids(il_inputs_df):
"""
Returns a Numpy array of policy TC IDs from a table of IL input items.
Args:
il_inputs_df (pandas.DataFrame): IL input items dataframe.
Returns:
numpy.ndarray: Numpy array of policy TC IDs.
"""
factor_col = list(set(il_inputs_df.columns).intersection(policytc_cols).difference({'profile_id', }))
return il_inputs_df.groupby(factor_col, sort=False, observed=True, dropna=False).ngroup().astype('int32') + 1
def __split_fm_terms_by_risk(df):
"""
Adjusts financial terms by the number of risks.
For example, deductible is split into each individual building risk.
Args:
df (pandas.DataFrame): The dataframe for an FM level.
"""
for term in risk_disaggregation_term.intersection(set(df.columns)):
if f'{term[:3]}_type' in df.columns:
code_filter = df[f'{term[:3]}_type'] == 0
df.loc[code_filter, term] /= df.loc[code_filter, 'NumberOfRisks']
else:
df[term] /= df['NumberOfRisks']
def get_cond_info(locations_df, accounts_df):
pol_info = {}
level_conds = {}
extra_accounts = []
default_cond_tag = '0'
if 'CondTag' in locations_df.columns:
fill_empty(locations_df, 'CondTag', default_cond_tag)
loc_condkey_df = locations_df.loc[locations_df['CondTag'] != default_cond_tag, ['acc_id', 'CondTag']].drop_duplicates()
else:
loc_condkey_df = pd.DataFrame([], columns=['acc_id', 'CondTag'])
if 'CondTag' in accounts_df.columns:
fill_empty(accounts_df, 'CondTag', default_cond_tag)
acc_condkey_df = accounts_df.loc[accounts_df['CondTag'] != '', ['acc_id', 'CondTag']].drop_duplicates()
condkey_match_df = acc_condkey_df.merge(loc_condkey_df, how='outer', indicator=True)
missing_condkey_df = condkey_match_df.loc[condkey_match_df['_merge'] == 'right_only', ['acc_id', 'CondTag']]
else:
acc_condkey_df = pd.DataFrame([], columns=['acc_id', 'CondTag'])
missing_condkey_df = loc_condkey_df
if missing_condkey_df.shape[0]:
raise OasisException(f'Those condtag are present in locations but missing in the account file:\n{missing_condkey_df}')
if acc_condkey_df.shape[0]:
if 'CondTag' not in locations_df.columns:
locations_df['CondTag'] = default_cond_tag
# we get information about cond from accounts_df
cond_tags = {} # information about each cond tag
account_layer_exclusion = {} # for each account and layer, store info about cond class exclusion
if 'CondPriority' in accounts_df.columns:
fill_empty(accounts_df, 'CondPriority', 1)
else:
accounts_df['CondPriority'] = 1
if 'CondPeril' in accounts_df.columns:
fill_empty(accounts_df, 'CondPeril', '')
else:
accounts_df['CondPeril'] = ''
for acc_rec in accounts_df.to_dict(orient="records"):
cond_tag_key = (acc_rec['acc_id'], acc_rec['CondTag'])
cond_number_key = (acc_rec['acc_id'], acc_rec['CondTag'], acc_rec['CondNumber'])
cond_tag = cond_tags.setdefault(cond_tag_key, {'CondPriority': acc_rec['CondPriority'] or 1, 'CondPeril': acc_rec['CondPeril']})
cond_tag.setdefault('layers', {})[acc_rec['layer_id']] = {'CondNumber': cond_number_key}
exclusion_cond_tags = account_layer_exclusion.setdefault(acc_rec['acc_id'], {}).setdefault(acc_rec['layer_id'],
set())
pol_info[(acc_rec['acc_id'], acc_rec['layer_id'])] = [acc_rec['PolNumber'], acc_rec['LayerNumber'], acc_rec['acc_idx']]
if acc_rec.get('CondClass') == 1:
exclusion_cond_tags.add(acc_rec['CondTag'])
# we get the list of loc for each cond_tag
loc_conds = {}
KEY_INDEX = 0
PRIORITY_INDEX = 1
for loc_rec in locations_df.to_dict(orient="records"):
loc_key = loc_rec['loc_id']
cond_key = (loc_rec['acc_id'], loc_rec.get('CondTag', default_cond_tag))
if cond_key in cond_tags:
cond_tag = cond_tags[cond_key]
else:
cond_tag = {'CondPriority': 1, 'layers': {}}
cond_tags[cond_key] = cond_tag
cond_location = cond_tag.setdefault('locations', set())
cond_location.add(loc_key)
cond_tag_priority = cond_tag['CondPriority']
conds = loc_conds.setdefault(loc_key, [])
for i, cond in enumerate(conds):
if cond_tag_priority < cond[PRIORITY_INDEX]:
conds.insert(i, (cond_key, cond_tag_priority))
break
elif cond_tag_priority == cond[PRIORITY_INDEX] and cond_key != cond[KEY_INDEX]:
raise OasisException(f"{cond_key} and {cond[KEY_INDEX]} have same priority in {loc_key}")
else:
conds.append((cond_key, cond_tag_priority))
# at first we just want condtag for each level
for cond_key, cond_info in cond_tags.items():
acc_id, cond_tag = cond_key
cond_level_start = 1
for loc_key in cond_info.get('locations', set()):
for i, (cond_key_i, _) in enumerate(loc_conds[loc_key]):
if cond_key_i == cond_key:
cond_level_start = max(cond_level_start, i + 1)
break
cond_info['cond_level_start'] = cond_level_start
for layer_id, exclusion_conds in account_layer_exclusion[acc_id].items():
if layer_id not in cond_info['layers']:
PolNumber, LayerNumber, acc_idx = pol_info[(acc_id, layer_id)]
if exclusion_conds:
extra_accounts.append({
'acc_idx': acc_idx,
'acc_id': acc_id,
'PolNumber': PolNumber,
'LayerNumber': LayerNumber,
'CondTag': cond_tag,
'layer_id': layer_id,
'CondNumber': 'FullFilter',
'CondDed6All': 1,
'CondDedType6All': 1,
'CondPeril': 'AA1',
})
else:
extra_accounts.append({
'acc_idx': acc_idx,
'acc_id': acc_id,
'PolNumber': PolNumber,
'LayerNumber': LayerNumber,
'CondTag': cond_tag,
'layer_id': layer_id,
'CondNumber': '',
'CondPeril': 'AA1',
})
level_conds.setdefault(cond_level_start, set()).add(cond_key)
return level_conds, extra_accounts
def get_levels(locations_df, accounts_df):
if locations_df is not None and not {"CondTag", "CondNumber"}.difference(accounts_df.columns):
level_conds, extra_accounts = get_cond_info(locations_df, accounts_df)
else:
level_conds = []
for group_name, group_info in copy.deepcopy(GROUPED_SUPPORTED_FM_LEVELS).items():
if group_info['oed_source'] == 'location':
if locations_df is not None:
locations_df['layer_id'] = 1
yield locations_df, list(group_info['levels'].items()), group_info.get('fm_peril_field'), None
elif group_info['oed_source'] == 'account':
if group_name == 'cond' and level_conds:
loc_conds_df = locations_df[['loc_id', 'acc_id', 'CondTag']].drop_duplicates()
for stage, cond_keys in level_conds.items():
cond_filter_df = (pd.DataFrame(cond_keys, columns=['acc_id', 'CondTag'])
.sort_values(by=['acc_id', 'CondTag']))
loc_conds_df_filter = cond_filter_df[['acc_id', 'CondTag']].drop_duplicates().merge(loc_conds_df, how='left')
yield (cond_filter_df.merge(pd.concat([accounts_df, pd.DataFrame(extra_accounts)]), how='left'),
group_info['levels'].items(),
group_info['fm_peril_field'],
loc_conds_df_filter[['loc_id', 'CondTag']])
else:
yield accounts_df, group_info['levels'].items(), group_info.get('fm_peril_field'), None
def get_level_term_info(term_df_source, level_column_mapper, level_id, step_level, fm_peril_field, oed_schema):
level_terms = set()
terms_maps = {}
coverage_group_map = {}
fm_group_tiv = {}
non_zero_default = {}
for ProfileElementName, term_info in level_column_mapper[level_id].items():
if term_info.get("FMTermType") == "tiv":
continue
if ProfileElementName == BIPOI_default_exeption:
default_value = 0.
else:
default_value = oed_schema.get_default(ProfileElementName)
if ProfileElementName not in term_df_source.columns:
if default_value == 0 or default_value in BLANK_VALUES:
continue
else:
non_zero_default[ProfileElementName] = [term_info['FMTermType'].lower(), default_value]
continue
elif ProfileElementName not in BITYPE_columns:
fill_empty(term_df_source, ProfileElementName, default_value)
if pd.isna(default_value):
non_default_val = ~term_df_source[ProfileElementName].isna()
else:
non_default_val = (term_df_source[ProfileElementName] != default_value)
if 'FMProfileStep' in term_info:
profile_steps = term_info["FMProfileStep"]
if isinstance(profile_steps, int):
profile_steps = [profile_steps]
valid_step_trigger_types = term_df_source.loc[(term_df_source['StepTriggerType'].isin(profile_steps))
& non_default_val, 'StepTriggerType'].unique()
if len(valid_step_trigger_types):
level_terms.add(term_info['FMTermType'].lower())
for step_trigger_type in valid_step_trigger_types:
coverage_aggregation_method = STEP_TRIGGER_TYPES[step_trigger_type]['coverage_aggregation_method']
calcrule_assignment_method = STEP_TRIGGER_TYPES[step_trigger_type]['calcrule_assignment_method']
for coverage_type_id in supp_cov_type_ids:
FMTermGroupID = COVERAGE_AGGREGATION_METHODS[coverage_aggregation_method].get(coverage_type_id)
if FMTermGroupID is None: # step policy not supported for this coverage
continue
if (step_trigger_type, coverage_type_id) in coverage_group_map:
if coverage_group_map[(step_trigger_type, coverage_type_id)] != FMTermGroupID:
raise OasisException(
f"multiple coverage_type_id {(step_trigger_type, coverage_type_id)} "
f"assigned to the different FMTermGroupID "
f"{(FMTermGroupID, coverage_group_map[(step_trigger_type, coverage_type_id)])}")
else:
coverage_group_map[(step_trigger_type, coverage_type_id)] = FMTermGroupID
terms_map = terms_maps.setdefault((FMTermGroupID, step_trigger_type), {fm_peril_field: 'fm_peril'} if fm_peril_field else {})
if CALCRULE_ASSIGNMENT_METHODS[calcrule_assignment_method][FMTermGroupID]:
terms_map[ProfileElementName] = term_info['FMTermType'].lower()
else:
if not (non_default_val).any():
continue
level_terms.add(term_info['FMTermType'].lower())
coverage_type_ids = term_info.get("CoverageTypeID", supp_cov_type_ids)
FMTermGroupID = term_info.get('FMTermGroupID', 1)
if step_level:
term_key = (FMTermGroupID, 0)
else:
term_key = FMTermGroupID
if isinstance(coverage_type_ids, int):
coverage_type_ids = [coverage_type_ids]
fm_group_tiv[FMTermGroupID] = coverage_type_ids
for coverage_type_id in coverage_type_ids:
if step_level:
coverage_group_key = (0, coverage_type_id)
else:
coverage_group_key = coverage_type_id
if coverage_type_id in coverage_group_map:
if coverage_group_map[coverage_group_key] != FMTermGroupID:
raise OasisException(
f"multiple coverage_type_id {coverage_group_key}"
f"assigned to the different FMTermGroupID {(FMTermGroupID, coverage_group_map[coverage_group_key])}")
else:
coverage_group_map[coverage_group_key] = FMTermGroupID
terms_maps.setdefault(term_key, {fm_peril_field: 'fm_peril'} if fm_peril_field else {})[
ProfileElementName] = term_info['FMTermType'].lower()
if level_terms:
for ProfileElementName, (term, default_value) in non_zero_default.items():
term_df_source[ProfileElementName] = default_value
level_terms.add(term)
for terms_map in terms_maps.values():
terms_map[ProfileElementName] = term
return level_terms, terms_maps, coverage_group_map, fm_group_tiv
def associate_items_peril_to_policy_peril(item_perils, policy_df, fm_peril_col, oed_schema):
"""
Maps item perils to policy perils and merges the mapping with policies.
For each peril_id in item_perils, maps it to policy perils so that each line
will have a peril_id that can be used directly as a key when merging with gul_input_df.
Args:
item_perils (pandas.DataFrame): All peril IDs from gul_input_df.
policy_df (pandas.DataFrame): The dataframe with the policies.
fm_peril_col (str): The name of the column that defines the policy peril filter.
oed_schema: The schema object used to map perils and subperils.
Returns:
pandas.DataFrame: Policy dataframe merged with peril mapping.
"""
fm_perils = policy_df[[fm_peril_col]].drop_duplicates()
peril_map_df = pd.merge(fm_perils, item_perils, how='cross')
peril_map_df = peril_map_df[oed_schema.peril_filtering(peril_map_df['peril_id'], peril_map_df[fm_peril_col])]
return policy_df.merge(peril_map_df)
@oasis_log
# [docs]
def get_il_input_items(
gul_inputs_df,
exposure_data,
target_dir,
logger,
exposure_profile=get_default_exposure_profile(),
accounts_profile=get_default_accounts_profile(),
fm_aggregation_profile=get_default_fm_aggregation_profile(),
do_disaggregation=True,
oasis_files_prefixes=OASIS_FILES_PREFIXES['il'],
chunksize=(2 * 10 ** 5),
intermediary_csv=False,
):
"""
Generates IL (Insured Loss) input items by applying financial terms to GUL items.
This function builds the Financial Module (FM) structure by processing insurance
policy terms across multiple hierarchical levels. It takes GUL (Ground-Up Loss)
items and enriches them with policy terms (deductibles, limits, shares) from
location and account data.
Overview of Processing Flow:
===========================
1. SETUP: Prepare data structures, merge account IDs, handle step policies
2. LEVEL PROCESSING: For each FM level (site coverage -> policy layer):
a. Extract terms from location/account data for this level
b. Compute aggregation IDs (agg_id) based on aggregation keys
c. Calculate TIV (Total Insured Value) for percentage-based terms
d. Assign calculation rules and profile IDs
e. Merge terms back to gul_inputs_df (handling layered/non-layered separately)
f. Write level data to FM output files
3. FINALIZE: Assign output IDs, fill missing PolNumbers, write fm_xref
Key Data Structures:
===================
- gul_inputs_df: Main working DataFrame, starts with GUL items and accumulates
FM columns (agg_id, layer_id, profile_id, level_id) as levels are processed.
Each row represents an item that will flow through the FM calculation.
- level_df: Terms extracted from location/account data for the current level.
Contains financial terms (deductible, limit, share) and is merged into
gul_inputs_df after profile IDs are assigned.
- agg_id: Aggregation ID - groups items that share the same financial terms
at each level. Computed using groupby().ngroup() on aggregation keys.
- layer_id: Distinguishes policy layers (1 = base, 2+ = excess layers).
Rows start with layer_id=0 (unassigned) and get layer_id from level_df merge.
Layered vs Non-Layered Processing:
==================================
When merging level_df terms into gul_inputs_df, rows are handled differently:
- Non-layered rows (layer_id == 0): Have not yet been assigned to a layer.
Must NOT merge on layer_id column, as they need to pick up layer info from
level_df. After merge, they may expand into multiple rows (one per layer).
- Layered rows (layer_id > 0): Already assigned to a specific layer from a
previous level. Must merge on layer_id to preserve their layer assignment.
After merging, "premature layering" is removed: if rows differ only by layer_id
but have identical financial terms (same profile_id), duplicates are dropped.
This prevents unnecessary row multiplication when layers don't differ.
Output Files Written:
====================
- fm_policytc.csv: Maps (level_id, agg_id, layer_id) -> profile_id
- fm_profile.csv: Profile definitions with calculation rules and term values
- fm_programme.csv: Hierarchical structure linking levels (from_agg_id -> to_agg_id)
- fm_xref.csv: Maps GUL item IDs to FM output IDs
Args:
gul_inputs_df (pandas.DataFrame): GUL input items with columns including
item_id, loc_id, coverage_type_id, tiv, peril_id, building_id, etc.
exposure_data: OedExposure object containing location and account DataFrames
with policy terms (deductibles, limits, shares, layers).
target_dir (str): Directory path where FM output files will be written.
logger: Logger object for progress messages.
exposure_profile (dict, optional): Maps OED fields to FM term types for locations.
accounts_profile (dict, optional): Maps OED fields to FM term types for accounts.
fm_aggregation_profile (dict, optional): Defines aggregation keys for each FM level.
do_disaggregation (bool, optional): If True, split aggregate exposure terms
by NumberOfRisks. Default True.
oasis_files_prefixes (dict, optional): File name prefixes for output files.
chunksize (int, optional): Rows per chunk when writing CSVs. Default 200,000.
intermediary_csv (bool, optional): If True, also write CSV files alongside
binary for debugging. Defaults to False.
Returns:
pandas.DataFrame: IL inputs with columns including output_id, layer_id,
gul_input_id, agg_id, PolNumber, acc_idx, and summary columns.
"""
# =========================================================================
# SETUP PHASE: Open output files and prepare input data
# =========================================================================
target_dir = as_path(target_dir, 'Target IL input files directory', is_dir=True, preexists=False)
il_input_files = {}
with contextlib.ExitStack() as stack:
if exposure_data.location is not None:
locations_df = exposure_data.location.dataframe
accounts_df = exposure_data.account.dataframe
if 'acc_id' not in accounts_df:
accounts_df['acc_id'] = get_ids(exposure_data.account.dataframe, ['PortNumber', 'AccNumber'])
acc_id_map = accounts_df[['PortNumber', 'AccNumber', 'acc_id']].drop_duplicates()
gul_inputs_df = gul_inputs_df.merge(acc_id_map, how='left')
locations_df = locations_df.merge(acc_id_map, how='left')
locations_df = locations_df.drop(columns=['PortNumber', 'AccNumber', 'LocNumber'])
accounts_df = accounts_df[accounts_df['acc_id'].isin(locations_df['acc_id'].unique())].drop(columns=['PortNumber', 'AccNumber'])
# fill default types
for field_type in DEFAULT_LOC_FIELD_TYPES:
if field_type['field_col'] not in locations_df.columns:
continue
else:
locations_df[field_type['field_col']] = locations_df[field_type['field_col']].fillna(
0.) # set default to 0 to ignore term if empty
if field_type['type_col'] not in locations_df.columns:
locations_df[field_type['type_col']] = field_type['type_value']
else:
locations_df[field_type['type_col']] = locations_df[field_type['type_col']].fillna(
field_type['type_value'])
else: # no location, case for cyber, marine ...
locations_df = None
gul_inputs_df['acc_id'] = gul_inputs_df['loc_id']
accounts_df = exposure_data.account.dataframe
accounts_df['acc_id'] = accounts_df['loc_id']
accounts_df = accounts_df.drop(columns=['PortNumber', 'AccNumber'])
oed_schema = exposure_data.oed_schema
profile = get_grouped_fm_profile_by_level_and_term_group(exposure_profile, accounts_profile)
# Get the FM aggregation profile - this describes how the IL input
# items are to be aggregated in the various FM levels
fm_aggregation_profile = copy.deepcopy(fm_aggregation_profile)
if not fm_aggregation_profile:
raise OasisException(
'FM aggregation profile is empty - this is required to perform aggregation'
)
# Get the OED hierarchy terms profile - this defines the column names for loc.
# ID, acc. ID, policy no. and portfolio no., as used in the source exposure
# and accounts files. This is to ensure that the method never makes hard
# coded references to the corresponding columns in the source files, as
# that would mean that changes to these column names in the source files
# may break the method
oed_hierarchy = get_oed_hierarchy(exposure_profile, accounts_profile)
tiv_terms = {v['tiv']['ProfileElementName']: str(v['tiv']['CoverageTypeID']) for k, v in
profile[FM_LEVELS['site coverage']['id']].items()}
useful_cols = sorted(set(['layer_id', 'orig_level_id', 'level_id', 'agg_id', 'gul_input_id', 'tiv', 'NumberOfRisks']
+ get_useful_summary_cols(oed_hierarchy)).union(tiv_terms)
- {'profile_id', 'item_id', 'output_id'}, key=str.lower)
gul_inputs_df = gul_inputs_df.rename(columns={'item_id': 'gul_input_id'})
# adjust tiv columns and name them as their coverage id
gul_inputs_df[['risk_id', 'NumberOfRisks']] = gul_inputs_df[['building_id', 'NumberOfBuildings']]
gul_inputs_df.loc[gul_inputs_df['IsAggregate'] == 0, 'risk_id'] = 1
gul_inputs_df.loc[gul_inputs_df['IsAggregate'] == 0, 'NumberOfRisks'] = 1
gul_inputs_df.loc[gul_inputs_df['NumberOfRisks'] == 0, 'NumberOfRisks'] = 1
# initialization
agg_keys = set()
for level_id in fm_aggregation_profile:
agg_keys = agg_keys.union(set([v['field'] for v in fm_aggregation_profile[level_id]['FMAggKey'].values()]))
present_cols = [col for col in gul_inputs_df.columns if col in set(useful_cols).union(agg_keys).union(fm_term_ids)]
gul_inputs_df = gul_inputs_df[present_cols].rename(columns=tiv_terms)
# Profile dict are base on key that correspond to the fm term name.
# this prevents multiple file columns to point to the same fm term
# which is necessary to have a generic logic that works with step policy
# so we change the key to be the column and use FMTermType to store the term name
level_column_mapper = {}
for level_id, level_profile in profile.items():
column_map = {}
level_column_mapper[level_id] = column_map
# for fm we only use term_id 1
for term_name, term_info in itertools.chain.from_iterable(profile.items() for profile in level_profile.values()):
new_term_info = copy.deepcopy(term_info)
new_term_info['FMTermType'] = term_name
column_map[term_info['ProfileElementName']] = new_term_info
gul_inputs_df = gul_inputs_df.drop(columns=fm_term_ids, errors='ignore').reset_index(drop=True)
gul_inputs_df['agg_id_prev'] = gul_inputs_df['gul_input_id']
gul_inputs_df['layer_id'] = 1
gul_inputs_df['PolNumber'] = pd.NA
item_perils = gul_inputs_df[['peril_id']].drop_duplicates()
# Determine whether step policies are listed, are not full of nans and step
# numbers are greater than zero
step_policies_present = False
extra_fm_col = ['layer_id', 'PolNumber', 'NumberOfBuildings', 'IsAggregate']
if 'StepTriggerType' in accounts_df and 'StepNumber' in accounts_df:
fill_empty(accounts_df, ['StepTriggerType', 'StepNumber'], 0)
step_account = (accounts_df[accounts_df[accounts_df['StepTriggerType'].notnull()]['StepNumber'].gt(0)][['acc_id']]
.drop_duplicates())
step_policies_present = bool(step_account.shape[0])
# this is done only for fmcalc to make sure it program nodes stay as a tree, remove 'is_step' logic when fmcalc is dropped
if step_policies_present:
step_account['is_step'] = 1
gul_inputs_df = gul_inputs_df.merge(step_account, how='left')
gul_inputs_df['is_step'] = gul_inputs_df['is_step'].fillna(0).astype('int8')
extra_fm_col.append('is_step')
# If step policies listed, keep step trigger type and columns associated
# with those step trigger types that are present
if step_policies_present:
# we happend the fm step policy term to policy layer
step_policy_level_map = level_column_mapper[SUPPORTED_FM_LEVELS['policy all']['id']]
for col in ['StepTriggerType', 'cov_agg_id', 'assign_step_calcrule']:
step_policy_level_map[col] = {
'ProfileElementName': col,
'FMTermType': col,
}
for key, step_term in get_default_step_policies_profile().items():
step_policy_level_map[step_term['Key']] = {
'ProfileElementName': step_term['Key'],
'FMTermType': step_term['FMProfileField'],
'FMProfileStep': step_term.get('FMProfileStep')
}
fm_profile_cols = fm_profile_step_headers
profile_bin_name = f"{oasis_files_prefixes['fm_profile']}_step"
else:
fm_profile_cols = fm_profile_headers
profile_bin_name = f"{oasis_files_prefixes['fm_profile']}"
# Binary file handles (always written)
il_input_files['fm_policytc'] = os.path.join(target_dir, f"{oasis_files_prefixes['fm_policytc']}.bin")
fm_policytc_bin = stack.enter_context(open(il_input_files['fm_policytc'], 'wb'))
il_input_files['fm_programme'] = os.path.join(target_dir, f"{oasis_files_prefixes['fm_programme']}.bin")
fm_programme_bin = stack.enter_context(open(il_input_files['fm_programme'], 'wb'))
il_input_files['fm_xref'] = os.path.join(target_dir, f"{oasis_files_prefixes['fm_xref']}.bin")
fm_xref_bin = stack.enter_context(open(il_input_files['fm_xref'], 'wb'))
il_input_files['fm_profile'] = os.path.join(target_dir, f"{profile_bin_name}.bin")
fm_profile_bin = stack.enter_context(open(il_input_files['fm_profile'], 'wb'))
# CSV file handles (only when intermediary_csv=True)
if intermediary_csv:
fm_policytc_csv = stack.enter_context(open(os.path.join(target_dir, f"{oasis_files_prefixes['fm_policytc']}.csv"), 'w'))
fm_policytc_csv.write(f"{','.join(fm_policytc_headers)}{os.linesep}")
fm_profile_csv = stack.enter_context(open(os.path.join(target_dir, f"{oasis_files_prefixes['fm_profile']}.csv"), 'w'))
fm_profile_csv.write(f"{','.join(fm_profile_cols)}{os.linesep}")
fm_programme_csv = stack.enter_context(open(os.path.join(target_dir, f"{oasis_files_prefixes['fm_programme']}.csv"), 'w'))
fm_programme_csv.write(f"{','.join(fm_programme_headers)}{os.linesep}")
fm_xref_csv = stack.enter_context(open(os.path.join(target_dir, f"{oasis_files_prefixes['fm_xref']}.csv"), 'w'))
else:
fm_policytc_csv = None
fm_profile_csv = None
fm_programme_csv = None
fm_xref_csv = None
pass_through_profile = pd.DataFrame({col: [0] for col in fm_profile_cols})
pass_through_profile['profile_id'] = 1
pass_through_profile['calcrule_id'] = PASSTHROUGH_CALCRULE_ID
write_fm_profile_level(pass_through_profile, fm_profile_csv, fm_profile_bin, step_policies_present, chunksize)
profile_id_offset = 1 # profile_id 1 is the passthrough policy (calcrule 100)
cur_level_id = 0
# =========================================================================
# LEVEL PROCESSING LOOP: Process each FM level from bottom to top
# =========================================================================
# get_levels() yields (term_df_source, levels, fm_peril_field, CondTag_merger):
# - term_df_source: DataFrame containing terms (locations_df or accounts_df)
# - levels: Iterator of (level_name, level_info) for levels using this source
# - fm_peril_field: Field name for FM peril filtering (if applicable)
# - CondTag_merger: DataFrame for conditional tag merging (accounts only)
for term_df_source, levels, fm_peril_field, CondTag_merger in get_levels(locations_df, accounts_df):
t0 = time.time()
if CondTag_merger is not None:
gul_inputs_df = gul_inputs_df.drop(columns='CondTag', errors='ignore').merge(CondTag_merger, how='left')
for level, level_info in levels:
level_id = level_info['id']
is_policy_layer_level = level_id == SUPPORTED_FM_LEVELS['policy layer']['id']
step_level = 'StepTriggerType' in level_column_mapper[level_id] # only true is step policy are present
fm_peril_field = fm_peril_field if fm_peril_field in term_df_source.columns else None
level_terms, terms_maps, coverage_group_map, fm_group_tiv = get_level_term_info(
term_df_source, level_column_mapper, level_id, step_level, fm_peril_field, oed_schema)
agg_key = [v['field'] for v in fm_aggregation_profile[level_id]['FMAggKey'].values()]
if not terms_maps: # no terms we skip this level
if is_policy_layer_level: # for policy layer we group all to make sure we can have a0 ALLOCATION_RULE
cur_level_id += 1
write_empty_policy_layer(gul_inputs_df, cur_level_id, agg_key, fm_policytc_csv, fm_policytc_bin,
fm_programme_csv, fm_programme_bin, chunksize)
gul_inputs_df = reset_gul_inputs(gul_inputs_df)
logger.info(f"level {cur_level_id} {level_info} took {time.time() - t0}")
t0 = time.time()
continue
# reset non layered agg_id
layered_agg_id = gul_inputs_df[gul_inputs_df['layer_id'] > 1]["agg_id_prev"].unique()
gul_inputs_df['layer_id'] = gul_inputs_df['layer_id'].where(gul_inputs_df['agg_id_prev'].isin(layered_agg_id), 0)
# get all rows with terms in term_df_source and determine the correct FMTermGroupID
level_df_list = []
valid_term_default = {}
for group_key, terms in terms_maps.items():
if step_level:
FMTermGroupID, step_trigger_type = group_key
group_df = term_df_source[term_df_source['StepTriggerType'] == step_trigger_type]
terms = {**terms_maps.get((1, 0), {}), **terms} # take all common terms in (1,0) plus term with step_trigger_type filter
else:
FMTermGroupID = group_key
group_df = term_df_source
group_df = (group_df[list(set(agg_key + list(terms.keys()) + extra_fm_col)
.union(set(useful_cols).difference(set(gul_inputs_df.columns)))
.intersection(group_df.columns))]
.assign(FMTermGroupID=FMTermGroupID))
# only keep policy with non default values, remove duplicate
numeric_terms = [term for term in terms.keys() if is_numeric_dtype(group_df[term])]
term_filter = pd.Series(data=False, index=group_df.index)
for term in numeric_terms:
if pd.isna(oed_schema.get_default(term)):
term_filter |= ~group_df[term].isna()
valid_term_default[terms[term]] = 0
elif term == BIPOI_default_exeption:
term_filter |= (group_df[term] != 0.)
valid_term_default[terms[term]] = 0
else:
term_filter |= (group_df[term] != oed_schema.get_default(term))
valid_term_default[terms[term]] = oed_schema.get_default(term)
term_filter |= (group_df[term] != oed_schema.get_default(term))
keep_df = group_df[term_filter][list(
set(agg_key).intersection(group_df.columns))].drop_duplicates()
group_df = group_df.merge(keep_df, how='inner')
# multiple ProfileElementName can have the same fm terms (ex: StepTriggerType 5), we take the max to have a unique one
for ProfileElementName, term in terms.items():
if term in group_df:
group_df[term] = group_df[[term, ProfileElementName]].max(axis=1)
group_df.drop(columns=ProfileElementName, inplace=True)
else:
group_df.rename(columns={ProfileElementName: term}, inplace=True)
level_df_list.append(group_df)
level_df = pd.concat(level_df_list, copy=True, ignore_index=True)
for term, default in valid_term_default.items():
level_df[term] = level_df[term].fillna(default)
if do_disaggregation and 'risk_id' in agg_key:
level_df['NumberOfRisks'] = level_df['NumberOfBuildings'].mask(level_df['IsAggregate'] == 0, 1)
__split_fm_terms_by_risk(level_df)
level_df = level_df.drop(columns=['NumberOfBuildings', 'IsAggregate', 'NumberOfRisks'])
level_df = level_df.drop_duplicates(subset=set(level_df.columns) - set(SOURCE_IDX.values()))
agg_id_merge_col = agg_key + ['FMTermGroupID']
agg_id_merge_col_extra = ['need_tiv']
if step_level:
# merge with gul_inputs_df needs to be based on 'steptriggertype' and 'coverage_type_id'
coverage_type_id_df = pd.DataFrame(
[[StepTriggerType, coverage_type_id, FMTermGroupID]
for (StepTriggerType, coverage_type_id), FMTermGroupID in coverage_group_map.items()],
columns=['steptriggertype', 'coverage_type_id', 'FMTermGroupID'])
level_df = level_df.merge(coverage_type_id_df)
level_df["is_step"] = level_df['steptriggertype'] > 0
agg_id_merge_col_extra.append('coverage_type_id')
else:
# map the coverage_type_id to the correct FMTermGroupID for this level. coverage_type_id without term and therefor FMTermGroupID are map to 0
gul_inputs_df['FMTermGroupID'] = (gul_inputs_df['coverage_type_id']
.map(coverage_group_map, na_action='ignore')
.astype('Int32'))
# check that peril_id is part of the fm peril policy, if not we remove the terms
if 'fm_peril' in level_df:
level_df = associate_items_peril_to_policy_peril(item_perils, level_df, 'fm_peril', oed_schema)
factorize_key = agg_key + ['FMTermGroupID', 'fm_peril']
agg_id_merge_col += ['fm_peril']
agg_id_merge_col_extra.append('peril_id')
else:
factorize_key = agg_key + ['FMTermGroupID']
if level_df.empty: # No actual terms for this level
if is_policy_layer_level: # for policy layer we group all to make sure we can have a0 ALLOCATION_RULE
cur_level_id += 1
write_empty_policy_layer(gul_inputs_df, cur_level_id, agg_key, fm_policytc_csv, fm_policytc_bin,
fm_programme_csv, fm_programme_bin, chunksize)
gul_inputs_df = reset_gul_inputs(gul_inputs_df)
logger.info(f"level {cur_level_id} {level_info} took {time.time() - t0}")
t0 = time.time()
continue
level_df = prepare_ded_and_limit(level_df)
agg_id_merge_col = list(set(agg_id_merge_col).intersection(level_df.columns))
gul_inputs_df = gul_inputs_df.merge(
level_df[agg_id_merge_col + agg_id_merge_col_extra].drop_duplicates(), how='left', validate='many_to_one')
if is_policy_layer_level: # we merge all on account at this level even if there is no policy
gul_inputs_df["FMTermGroupID"] = gul_inputs_df["FMTermGroupID"].fillna(-1).astype('i4')
else:
gul_inputs_df["FMTermGroupID"] = gul_inputs_df["FMTermGroupID"].mask(
gul_inputs_df["need_tiv"].isna(), -gul_inputs_df["agg_id_prev"]).astype('i4')
# Assign agg_id: items with identical factorize_key values get the same agg_id
# This groups items that will share the same financial terms at this level
gul_inputs_df["agg_id"] = gul_inputs_df.groupby(factorize_key, sort=False, observed=True, dropna=False).ngroup().astype('int32') + 1
# =====================================================================
# TIV PROCESSING: Calculate aggregate TIV for percentage-based terms
# =====================================================================
# Some terms (ded_type=2, lim_type=2) are specified as % of TIV
# We need the aggregate TIV for each agg_id to convert to absolute values
# make sure correct tiv sum exist
tiv_df_list = []
for FMTermGroupID, coverage_type_ids in fm_group_tiv.items():
tiv_key = '_'.join(map(str, sorted(coverage_type_ids)))
if tiv_key not in gul_inputs_df:
gul_inputs_df[tiv_key] = gul_inputs_df[list(
set(gul_inputs_df.columns).intersection(map(str, sorted(coverage_type_ids))))].sum(axis=1)
tiv_df_list.append(gul_inputs_df[(gul_inputs_df["need_tiv"] == True) &
(gul_inputs_df["FMTermGroupID"] == FMTermGroupID)]
.drop_duplicates(subset=['agg_id', 'loc_id', 'building_id'])
.rename(columns={tiv_key: 'agg_tiv'})
.groupby("agg_id", observed=True)
.agg({**{col: 'first' for col in agg_id_merge_col}, **{'agg_tiv': 'sum'}})
)
tiv_df = pd.concat(tiv_df_list)
gul_inputs_df = gul_inputs_df.merge(tiv_df['agg_tiv'], left_on='agg_id', right_index=True, how='left')
gul_inputs_df['agg_tiv'] = gul_inputs_df['agg_tiv'].fillna(0)
level_df = level_df.merge(tiv_df.drop_duplicates(), how='left').drop(columns=['need_tiv'])
level_df['agg_tiv'] = level_df['agg_tiv'].fillna(0)
# Apply rule to convert type 2 deductibles and limits to TIV shares
if 'deductible' in level_df.columns and 'ded_type' in level_df.columns:
level_df['deductible'] = level_df['deductible'].fillna(0.)
level_df['ded_type'] = level_df['ded_type'].infer_objects(copy=False).fillna(0.).astype('i4')
level_df['deductible'] = np.where(
level_df['ded_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'],
level_df['deductible'] * level_df['agg_tiv'],
level_df['deductible']
)
level_df.loc[level_df['ded_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']
['id'], 'ded_type'] = DEDUCTIBLE_AND_LIMIT_TYPES['flat']['id']
type_filter = level_df['ded_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['bi']['id']
level_df.loc[type_filter, 'deductible'] = level_df.loc[type_filter, 'deductible'] * level_df.loc[type_filter, 'agg_tiv'] / 365.
if 'limit' in level_df.columns and 'lim_type' in level_df.columns:
level_df['limit'] = level_df['limit'].fillna(0.)
level_df['lim_type'] = level_df['lim_type'].infer_objects(copy=False).fillna(0.).astype('i4')
level_df['limit'] = np.where(
level_df['lim_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'],
level_df['limit'] * level_df['agg_tiv'],
level_df['limit']
)
level_df.loc[level_df['lim_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']
['id'], 'lim_type'] = DEDUCTIBLE_AND_LIMIT_TYPES['flat']['id']
type_filter = level_df['lim_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['bi']['id']
level_df.loc[type_filter, 'limit'] = level_df.loc[type_filter, 'limit'] * level_df.loc[type_filter, 'agg_tiv'] / 365.
if level_id == SUPPORTED_FM_LEVELS['policy layer']['id']:
level_df['calcrule_id'] = get_calc_rule_ids(level_df, calc_rule_type='policy_layer')
level_df['profile_id'] = get_profile_ids(level_df) + profile_id_offset
profile_id_offset = level_df['profile_id'].max() if not level_df.empty else profile_id_offset
elif "is_step" in level_df.columns:
step_filter = level_df["is_step"]
# Before assigning calc. rule IDs and policy TC IDs, the StepTriggerType
# should be split into its sub-types in cases where the associated
# coverages are covered separately
# For example, StepTriggerType = 5 covers buildings and contents separately
def assign_sub_step_trigger_type(row):
try:
step_trigger_type = STEP_TRIGGER_TYPES[row['steptriggertype']]['sub_step_trigger_types'][
row['coverage_type_id']]
return step_trigger_type
except KeyError:
return row['steptriggertype']
level_df.loc[step_filter, 'steptriggertype'] = level_df[step_filter].apply(
lambda row: assign_sub_step_trigger_type(row), axis=1
)
final_step_filter = level_df['steptriggertype'] > 0
# step part
level_df.loc[final_step_filter, 'calcrule_id'] = get_calc_rule_ids(level_df[final_step_filter], calc_rule_type='step')
has_step = final_step_filter & (~level_df["step_id"].isna())
level_df.loc[has_step, 'profile_id'] = level_df.loc[has_step, ['layer_id'] + factorize_key].groupby(['layer_id'] + factorize_key, sort=False, observed=True, dropna=False).ngroup().astype(
'int32') + profile_id_offset + 1
profile_id_offset = level_df.loc[has_step, 'profile_id'].max() if has_step.any() else profile_id_offset
# non step part
level_df.loc[~final_step_filter, 'calcrule_id'] = get_calc_rule_ids(level_df[~final_step_filter], calc_rule_type='base')
level_df.loc[~has_step, 'profile_id'] = (get_profile_ids(level_df[~has_step]) + profile_id_offset)
profile_id_offset = level_df.loc[~has_step, 'profile_id'].max() if (~has_step).any() else profile_id_offset
else:
level_df['calcrule_id'] = get_calc_rule_ids(level_df, calc_rule_type='base')
level_df['profile_id'] = get_profile_ids(level_df) + profile_id_offset
profile_id_offset = level_df['profile_id'].max() if not level_df.empty else profile_id_offset
write_fm_profile_level(level_df, fm_profile_csv, fm_profile_bin, step_policies_present, chunksize=chunksize)
# =====================================================================
# MERGE LEVEL TERMS INTO GUL_INPUTS_DF
# =====================================================================
# At this point:
# - level_df contains financial terms with profile_id assigned
# - gul_inputs_df has agg_id assigned but needs profile_id from level_df
#
# The merge must handle layered vs non-layered rows differently:
# - Non-layered (layer_id=0): Don't merge on layer_id, let merge bring in layer info
# - Layered (layer_id>0): Must merge on layer_id to preserve layer assignment
merge_col = list(set(level_df.columns).intersection(set(gul_inputs_df)))
level_df = level_df[merge_col + ['profile_id']].drop_duplicates()
# Check if there are any layered inputs (rows that already have a layer assigned)
layered_mask = gul_inputs_df['layer_id'] > 0
has_layered = layered_mask.any()
if has_layered:
# Original path: separate merges for layered and non-layered
# gul_inputs that don't have layer yet must not be merge using layer_id
non_layered_inputs_df = (
gul_inputs_df[~layered_mask]
.drop(columns=['layer_id'])
.rename(columns={'PolNumber': 'PolNumber_temp'})
.merge(level_df, how='left')
)
if 'PolNumber' in level_df.columns:
# gul_inputs['PolNumber'] is set to level_df['PolNumber'] if empty or if this is the first time layer appear
new_layered_gul_input_id = non_layered_inputs_df[non_layered_inputs_df['layer_id'] > 1]['gul_input_id'].unique()
non_layered_inputs_df['PolNumber'] = non_layered_inputs_df['PolNumber'].where(
(non_layered_inputs_df['gul_input_id'].isin(new_layered_gul_input_id)) | (non_layered_inputs_df['PolNumber_temp'].isna()),
non_layered_inputs_df['PolNumber_temp']
)
non_layered_inputs_df = non_layered_inputs_df.drop(columns=['PolNumber_temp'])
else:
non_layered_inputs_df = non_layered_inputs_df.rename(columns={'PolNumber_temp': 'PolNumber'})
layered_inputs_df = gul_inputs_df[layered_mask].merge(level_df, how='left')
# PREMATURE LAYERING REMOVAL:
# After merge, non-layered rows may have expanded into multiple rows
# (one per layer from level_df). If these layers have identical terms
# (same profile_id), we should keep only one to avoid unnecessary duplication.
# The 'layered_id' trick: set to layer_id only if agg_id exists in layered
# inputs (meaning there's a reason to keep layers separate), else set to 0.
# Then dedup on (gul_input_id, agg_id, profile_id, layered_id).
if not is_policy_layer_level and level_id not in cross_layer_level:
non_layered_inputs_df['layered_id'] = (non_layered_inputs_df['layer_id']
.where(non_layered_inputs_df['agg_id'].isin(layered_inputs_df['agg_id']), 0))
non_layered_inputs_df = (non_layered_inputs_df
.drop_duplicates(subset=['gul_input_id', 'agg_id', 'profile_id', 'layered_id'])
.drop(columns=['layered_id']))
gul_inputs_df = pd.concat([layered_inputs_df, non_layered_inputs_df], ignore_index=True)
else:
# Optimized path: all rows have layer_id == 0
# No need for separate layered/non-layered split and concat
gul_inputs_df = (gul_inputs_df
.drop(columns=['layer_id'])
.rename(columns={'PolNumber': 'PolNumber_temp'})
.merge(level_df, how='left'))
# Handle PolNumber (same as original)
if 'PolNumber' in level_df.columns:
new_layered_gul_input_id = gul_inputs_df[gul_inputs_df['layer_id'] > 1]['gul_input_id'].unique()
gul_inputs_df['PolNumber'] = gul_inputs_df['PolNumber'].where(
(gul_inputs_df['gul_input_id'].isin(new_layered_gul_input_id)) | (gul_inputs_df['PolNumber_temp'].isna()),
gul_inputs_df['PolNumber_temp']
)
gul_inputs_df = gul_inputs_df.drop(columns=['PolNumber_temp'])
else:
gul_inputs_df = gul_inputs_df.rename(columns={'PolNumber_temp': 'PolNumber'})
# Drop premature layering (no difference of policy between layers)
# Since no initial layered rows, layered_id would be 0 for all, so dedup on base columns
if not is_policy_layer_level and level_id not in cross_layer_level:
gul_inputs_df = gul_inputs_df.drop_duplicates(subset=['gul_input_id', 'agg_id', 'profile_id'])
gul_inputs_df['layer_id'] = gul_inputs_df['layer_id'].fillna(1).astype(layer_id[DTYPE_IDX])
gul_inputs_df["profile_id"] = gul_inputs_df["profile_id"].fillna(1).astype(profile_id[DTYPE_IDX])
# check rows in prev df that are this level granularity (if prev_agg_id has multiple corresponding agg_id)
need_root_start_df = gul_inputs_df.groupby("agg_id_prev", observed=True)["agg_id"].nunique()
need_root_start_df = need_root_start_df[need_root_start_df > 1].index
gul_inputs_df = gul_inputs_df.drop(columns=["root_start"], errors='ignore') # clean up
if need_root_start_df.shape[0]:
gul_inputs_df["root_start"] = gul_inputs_df["agg_id"].isin(
set(gul_inputs_df.loc[gul_inputs_df["agg_id_prev"].isin(need_root_start_df), 'agg_id'])
)
cur_level_id += 1
gul_inputs_df['level_id'] = cur_level_id
# =====================================================================
# WRITE FM OUTPUT FILES FOR THIS LEVEL
# =====================================================================
# fm_policytc: Maps (level_id, agg_id, layer_id) -> profile_id
# For cross_layer levels, only write layer_id=1 (terms apply across all layers)
if level_id in cross_layer_level:
fm_policytc_df = gul_inputs_df.loc[gul_inputs_df['layer_id'] == 1, fm_policytc_headers]
else:
fm_policytc_df = gul_inputs_df.loc[:, fm_policytc_headers]
fm_policytc_dedup = fm_policytc_df.drop_duplicates()
df_to_ndarray(fm_policytc_dedup, fm_policytc_dtype).tofile(fm_policytc_bin)
if fm_policytc_csv is not None:
fm_policytc_dedup.astype(fm_policytc_pd_dtype).to_csv(fm_policytc_csv, index=False,
header=False, chunksize=chunksize)
# fm_programme: Defines hierarchical links between levels
# from_agg_id (previous level) -> to_agg_id (current level)
# When root_start is True, use negative gul_input_id to indicate starting point
fm_programe_df = gul_inputs_df[['level_id', 'agg_id']].rename(columns={'agg_id': 'to_agg_id'})
if "root_start" in gul_inputs_df:
fm_programe_df['from_agg_id'] = gul_inputs_df['agg_id_prev'].where(~gul_inputs_df["root_start"], -gul_inputs_df['gul_input_id'])
else:
fm_programe_df['from_agg_id'] = gul_inputs_df['agg_id_prev']
fm_programe_dedup = fm_programe_df[fm_programme_headers].drop_duplicates()
df_to_ndarray(fm_programe_dedup, fm_programme_dtype).tofile(fm_programme_bin)
if fm_programme_csv is not None:
fm_programe_dedup.astype(fm_programme_pd_dtype).to_csv(fm_programme_csv, index=False,
header=False, chunksize=chunksize)
# reset gul_inputs_df level columns
gul_inputs_df = reset_gul_inputs(gul_inputs_df)
logger.info(f"level {cur_level_id} {level_info} took {time.time() - t0}")
t0 = time.time()
# =========================================================================
# FINALIZATION: Assign output IDs and write fm_xref
# =========================================================================
# Sort by gul_input_id and layer_id for consistent output ordering
gul_inputs_df = gul_inputs_df.sort_values(['gul_input_id', 'layer_id'], kind='stable')
gul_inputs_df['output_id'] = gul_inputs_df.reset_index().index + 1
# Fill missing PolNumber values from accounts_df using policy layer agg keys
default_pol_agg_key = [v['field'] for v in fm_aggregation_profile[SUPPORTED_FM_LEVELS['policy layer']['id']]['FMAggKey'].values()]
no_polnumber_df = gul_inputs_df.loc[gul_inputs_df['PolNumber'].isna(), default_pol_agg_key + ['output_id']]
if not no_polnumber_df.empty: # empty polnumber, we use accounts_df to set PolNumber based on policy layer agg_key
gul_inputs_df.loc[gul_inputs_df['PolNumber'].isna(), 'PolNumber'] = no_polnumber_df.merge(
accounts_df[default_pol_agg_key + ['PolNumber']].drop_duplicates(subset=default_pol_agg_key)
).set_index(no_polnumber_df['output_id'] - 1)['PolNumber']
# fm_xref: Maps GUL item IDs (agg_id) to FM output IDs
# This is the final cross-reference between GUL and IL outputs
fm_xref_df = gul_inputs_df.rename(columns={'gul_input_id': 'agg_id', 'output_id': 'output'})[fm_xref_headers]
df_to_ndarray(fm_xref_df, fm_xref_dtype).tofile(fm_xref_bin)
if fm_xref_csv is not None:
fm_xref_df.astype(fm_xref_pd_dtype).to_csv(fm_xref_csv, index=False, header=True, chunksize=chunksize)
# merge acc_idx
acc_idx_col = list(set(gul_inputs_df.columns).intersection(accounts_df.columns))
gul_inputs_df_col = list(gul_inputs_df.columns)
if 'CondTag' in acc_idx_col:
gul_inputs_df = (
gul_inputs_df
.merge(accounts_df[acc_idx_col + ['acc_idx']].drop_duplicates(subset=acc_idx_col),
how='left', validate='many_to_one'))
acc_idx_col.remove('CondTag')
gul_inputs_df.loc[gul_inputs_df['acc_idx'].isna(), 'acc_idx'] = (gul_inputs_df.loc[gul_inputs_df['acc_idx'].isna(), gul_inputs_df_col].reset_index()
.merge(accounts_df[acc_idx_col + ['acc_idx']].drop_duplicates(subset=acc_idx_col),
how='left', validate='many_to_one').set_index('index')['acc_idx'])
else:
gul_inputs_df = (
gul_inputs_df
.merge(accounts_df[acc_idx_col + ['acc_idx']].drop_duplicates(subset=acc_idx_col),
how='left', validate='many_to_one'))
return gul_inputs_df, il_input_files
def reset_gul_inputs(gul_inputs_df):
    """Strip the per-level working columns and promote 'agg_id' to 'agg_id_prev'.

    Called after each FM level has been written so that the next level starts
    from a clean frame: all transient columns produced while processing the
    level are removed (missing ones are ignored), and the level's 'agg_id'
    becomes the next level's 'agg_id_prev'.

    Args:
        gul_inputs_df (pandas.DataFrame): GUL inputs frame for the level just written.

    Returns:
        pandas.DataFrame: Cleaned frame ready for the next level.
    """
    # Columns that only have meaning within a single level's processing pass.
    transient_cols = [
        "root_start", "agg_id_prev", "is_step", "FMTermGroupID",
        "profile_id", "level_id", "fm_peril", "need_tiv", "agg_tiv",
    ]
    cleaned = gul_inputs_df.drop(columns=transient_cols, errors='ignore')
    return cleaned.rename(columns={"agg_id": "agg_id_prev"})
def write_empty_policy_layer(gul_inputs_df, cur_level_id, agg_key, fm_policytc_csv, fm_policytc_bin,
                             fm_programme_csv, fm_programme_bin, chunksize):
    """Write pass-through fm_policytc/fm_programme records for a level with no terms.

    Rows are grouped on *agg_key* and all point at profile_id 1 (the
    pass-through profile), so the level still appears in the FM hierarchy
    even though it carries no actual financial terms.

    Note: mutates *gul_inputs_df* in place by adding/overwriting the
    'agg_id', 'profile_id' and 'level_id' columns.

    Args:
        gul_inputs_df (pandas.DataFrame): GUL inputs for the current level.
        cur_level_id (int): FM level id being written.
        agg_key (list): Column names used to aggregate rows into agg_ids.
        fm_policytc_csv (file): Open CSV file for fm_policytc, or None to skip CSV.
        fm_policytc_bin (file): Open binary file for fm_policytc.
        fm_programme_csv (file): Open CSV file for fm_programme, or None to skip CSV.
        fm_programme_bin (file): Open binary file for fm_programme.
        chunksize (int): Rows per chunk when writing CSV.
    """
    # One agg_id per distinct agg_key combination; everything maps to the
    # pass-through profile written at start-up.
    gul_inputs_df["agg_id"] = gul_inputs_df.groupby(agg_key, sort=False, observed=True).ngroup().astype('int32') + 1
    gul_inputs_df["profile_id"] = 1
    gul_inputs_df["level_id"] = cur_level_id
    # fm_policytc: (level_id, agg_id, layer_id) -> profile_id, binary then optional CSV.
    policytc_dedup = gul_inputs_df[fm_policytc_headers].drop_duplicates()
    df_to_ndarray(policytc_dedup, fm_policytc_dtype).tofile(fm_policytc_bin)
    if fm_policytc_csv is not None:
        policytc_dedup.astype(fm_policytc_pd_dtype).to_csv(
            fm_policytc_csv, index=False, header=False, chunksize=chunksize)
    # fm_programme: link the previous level's agg_ids to this level's agg_ids.
    # .assign aligns the Series on the shared index, same as column assignment.
    programme_df = (gul_inputs_df[['level_id', 'agg_id']]
                    .rename(columns={'agg_id': 'to_agg_id'})
                    .assign(from_agg_id=gul_inputs_df['agg_id_prev']))
    programme_dedup = programme_df[fm_programme_headers].drop_duplicates()
    df_to_ndarray(programme_dedup, fm_programme_dtype).tofile(fm_programme_bin)
    if fm_programme_csv is not None:
        programme_dedup.astype(fm_programme_pd_dtype).to_csv(
            fm_programme_csv, index=False, header=False, chunksize=chunksize)
def write_fm_profile_level(level_df, fm_profile_csv, fm_profile_bin_file, step_policies_present, chunksize=100000):
    """
    Writes an FM profile file.

    Appends this level's profile records to the already-open binary file
    (step layout when step policies are present anywhere in the run,
    otherwise the flat layout), and optionally mirrors them to CSV.

    Args:
        level_df (pandas.DataFrame): FM terms dataframe.
        fm_profile_csv (file): Open CSV file object to write to, or None to skip CSV.
        fm_profile_bin_file (file): Open binary file object to write to.
        step_policies_present (bool): Flag to determine which type of file to write.
        chunksize (int, optional): Number of rows to write per chunk.
    Raises:
        OasisException: If writing to the file fails.
    """
    # Cast the id columns to the binary-schema dtypes up front.
    level_df = level_df.astype({'calcrule_id': calcrule_id[DTYPE_IDX], 'profile_id': profile_id[DTYPE_IDX]})
    # Step policies exist
    if step_policies_present:
        # Keep only the columns that belong to the step layout (plus the raw
        # 'steptriggertype' used below to tell step rows from non-step rows);
        # any step column this level doesn't have defaults to 0.
        fm_profile_df = level_df[list(set(level_df.columns).intersection(set(fm_profile_step_headers + ['steptriggertype'])))].copy()
        for col in fm_profile_step_headers + ['steptriggertype']:
            if col not in fm_profile_df.columns:
                fm_profile_df[col] = 0
        # profile_cols_map maps non-step term names to their step-layout
        # equivalents; for non-step rows (steptriggertype <= 0) copy the plain
        # term values into the step columns. NOTE: the .loc assignment relies
        # on fm_profile_df sharing level_df's index (it was sliced from it).
        for non_step_name, step_name in profile_cols_map.items():
            if step_name not in fm_profile_df.columns:
                fm_profile_df[step_name] = 0
            # Widen to object so the cross-column copy below keeps the source
            # values as-is — presumably to avoid dtype coercion; confirm.
            fm_profile_df[step_name] = fm_profile_df[step_name].astype(object)
            if non_step_name in level_df.columns:
                fm_profile_df.loc[
                    ~(fm_profile_df['steptriggertype'] > 0), step_name
                ] = level_df.loc[
                    ~(fm_profile_df['steptriggertype'] > 0),
                    non_step_name
                ]
        fm_profile_df.fillna(0, inplace=True)
        fm_profile_df = fm_profile_df.drop_duplicates()
        # Ensure step_id is of int data type and set default value to 1
        fm_profile_df = fm_profile_df.astype(fm_profile_step_pd_dtype)
        fm_profile_df.loc[fm_profile_df['step_id'] == 0, 'step_id'] = 1
        # Final column order/sort per the step schema; dedup again since the
        # step_id default may have created duplicates.
        fm_profile_df = fm_profile_df[fm_profile_step_headers].sort_values(by=["profile_id", 'step_id']).drop_duplicates()
    # No step policies
    else:
        # Flat layout: keep the policytc term columns present on this level,
        # default the missing term columns (policytc_cols[2:]) to 0.
        fm_profile_df = level_df[list(set(level_df.columns).intersection(set(policytc_cols)))].copy()
        for col in policytc_cols[2:]:
            if col not in fm_profile_df.columns:
                fm_profile_df[col] = 0.
        # Rename to the fm_profile schema names, add the constant share2/share3
        # columns and order per the flat header list.
        fm_profile_df = (
            fm_profile_df
            .rename(columns=profile_cols_map)
            .drop_duplicates()
            .assign(share2=0.0, share3=0.0)
            .astype(fm_profile_pd_dtype)[fm_profile_headers]
        )
    try:
        # Binary output always; CSV only when a CSV file handle was supplied.
        bin_dtype = fm_profile_step_dtype if step_policies_present else fm_profile_dtype
        df_to_ndarray(fm_profile_df, bin_dtype).tofile(fm_profile_bin_file)
        if fm_profile_csv is not None:
            fm_profile_df.to_csv(
                fm_profile_csv,
                index=False,
                header=False,
                chunksize=chunksize,
            )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_fm_profile_level'") from e