# Source code for oasislmf.preparation.il_inputs

__all__ = [
    'get_calc_rule_ids',
    'get_grouped_fm_profile_by_level_and_term_group',
    'get_grouped_fm_terms_by_level_and_term_group',
    'get_oed_hierarchy',
    'get_il_input_items',
    'get_profile_ids',
    'get_step_profile_ids',
    'write_il_input_files',
    'write_fm_policytc_file',
    'write_fm_profile_file',
    'write_fm_programme_file',
    'write_fm_xref_file'
]

import copy
import gc
import itertools
import os
import sys
import warnings

import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
from ods_tools.oed import fill_empty, BLANK_VALUES

from oasislmf.preparation.summaries import get_useful_summary_cols, get_xref_df
from oasislmf.utils.calc_rules import get_calc_rules
from oasislmf.utils.coverages import SUPPORTED_COVERAGE_TYPES
from oasislmf.utils.data import (factorize_array, factorize_ndarray,
                                 set_dataframe_column_dtypes)
from oasislmf.utils.defaults import (OASIS_FILES_PREFIXES, SUMMARY_TOP_LEVEL_COLS, assign_defaults_to_il_inputs,
                                     get_default_accounts_profile, get_default_exposure_profile,
                                     get_default_fm_aggregation_profile, SOURCE_IDX)
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.fm import (CALCRULE_ASSIGNMENT_METHODS, COVERAGE_AGGREGATION_METHODS,
                               DEDUCTIBLE_AND_LIMIT_TYPES, FML_ACCALL, STEP_TRIGGER_TYPES,
                               SUPPORTED_FM_LEVELS, FM_TERMS, GROUPED_SUPPORTED_FM_LEVELS)
from oasislmf.utils.log import oasis_log
from oasislmf.utils.path import as_path
from oasislmf.utils.profiles import (get_default_step_policies_profile,
                                     get_grouped_fm_profile_by_level_and_term_group,
                                     get_grouped_fm_terms_by_level_and_term_group,
                                     get_oed_hierarchy)

pd.options.mode.chained_assignment = None
warnings.simplefilter(action='ignore', category=FutureWarning)

# Define a list of all supported OED coverage types in the exposure
supp_cov_type_ids = [v['id'] for v in SUPPORTED_COVERAGE_TYPES.values()]

policytc_cols = ['calcrule_id', 'limit', 'deductible', 'deductible_min', 'deductible_max', 'attachment', 'share']
step_profile_cols = [
    'profile_id', 'calcrule_id',
    'deductible1', 'deductible2', 'deductible3', 'attachment1',
    'limit1', 'share1', 'share2', 'share3', 'step_id',
    'trigger_start', 'trigger_end', 'payout_start', 'payout_end',
    'limit2', 'scale1', 'scale2'
]

profile_cols = ['profile_id', 'calcrule_id', 'deductible1', 'deductible2', 'deductible3', 'attachment1', 'limit1',
                'share1', 'share2', 'share3']
profile_cols_map = {
    'deductible': 'deductible1',
    'deductible_min': 'deductible2',
    'deductible_max': 'deductible3',
    'attachment': 'attachment1',
    'limit': 'limit1',
    'share': 'share1'
}
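
# Illustrative sketch (not part of the original module): the same non-step to step column
# mapping held in profile_cols_map is used when the non-step fm_profile output is built,
# i.e. a plain pandas rename of the IL term columns, e.g.
#
#   >>> pd.DataFrame({'deductible': [100.0], 'limit': [5000.0]}).rename(columns=profile_cols_map)
#      deductible1  limit1
#   0        100.0  5000.0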
cross_layer_level = {FML_ACCALL, }

risk_disaggregation_term = {'deductible', 'deductible_min', 'deductible_max', 'attachment', 'limit'}

fm_term_ids = [fm_term['id'] for fm_term in FM_TERMS.values()]


def get_calc_rule_ids(il_inputs_calc_rules_df, calc_rule_type):
    """
    merge selected il_inputs with the correct calc_rule table and return a pandas Series of calc. rule IDs

    Args:
        il_inputs_calc_rules_df (DataFrame): IL input items dataframe
        calc_rule_type (str): type of calc_rule to look for

    Returns:
        pandas Series of calc. rule IDs
    """
    calc_rules_df, calc_rule_term_info = get_calc_rules(calc_rule_type)
    calc_rules_df = calc_rules_df.drop(columns=['desc', 'id_key'], axis=1, errors='ignore')
    terms = []
    terms_indicators = []
    for term in calc_rule_term_info['terms']:
        if term in il_inputs_calc_rules_df.columns:
            terms.append(term)
            terms_indicators.append('{}_gt_0'.format(term))
        else:
            calc_rules_df = calc_rules_df[calc_rules_df['{}_gt_0'.format(term)] == 0].drop(columns=['{}_gt_0'.format(term)])
    for term in calc_rule_term_info['types_and_codes']:
        if term in il_inputs_calc_rules_df.columns:
            il_inputs_calc_rules_df[term] = il_inputs_calc_rules_df[term].fillna(0).astype('uint8')
        else:
            calc_rules_df = calc_rules_df[calc_rules_df[term] == 0].drop(columns=[term])

    il_inputs_calc_rules_df.loc[:, terms_indicators] = np.where(il_inputs_calc_rules_df[terms] > 0, 1, 0)
    merge_col = list(set(il_inputs_calc_rules_df.columns).intersection(calc_rules_df.columns).difference({'calcrule_id'}))
    calcrule_ids = (
        il_inputs_calc_rules_df[merge_col].reset_index()
        .merge(calc_rules_df[merge_col + ['calcrule_id']].drop_duplicates(), how='left', on=merge_col)
    ).set_index('index')['calcrule_id'].fillna(0)
    if 0 in calcrule_ids.unique():
        no_match_keys = il_inputs_calc_rules_df.loc[calcrule_ids == 0,
                                                    ['PortNumber', 'AccNumber', 'LocNumber'] + merge_col].drop_duplicates()
        err_msg = 'Calculation Rule mapping error, non-matching keys:\n{}'.format(no_match_keys)
        raise OasisException(err_msg)
    return calcrule_ids

def get_profile_ids(il_inputs_df):
    """
    Returns a Numpy array of policy TC IDs from a table of IL input items

    :param il_inputs_df: IL input items dataframe
    :type il_inputs_df: pandas.DataFrame

    :return: Numpy array of policy TC IDs
    :rtype: numpy.ndarray
    """
    factor_col = list(set(il_inputs_df.columns).intersection(policytc_cols + step_profile_cols))
    return factorize_ndarray(il_inputs_df.loc[:, factor_col].values, col_idxs=range(len(factor_col)))[0]
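
# Illustrative sketch (not part of the original module): rows with identical term values are
# assigned the same profile ID, e.g. for a minimal frame holding only the policytc term columns
#
#   >>> df = pd.DataFrame({'calcrule_id': [12, 12, 1], 'limit': [0.0, 0.0, 1e6],
#   ...                    'deductible': [100.0, 100.0, 0.0], 'deductible_min': [0.0, 0.0, 0.0],
#   ...                    'deductible_max': [0.0, 0.0, 0.0], 'attachment': [0.0, 0.0, 0.0],
#   ...                    'share': [0.0, 0.0, 0.0]})
#   >>> get_profile_ids(df)   # rows 0 and 1 are identical, so they share an ID
#   array([1, 1, 2])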

def get_step_profile_ids(
    il_inputs_df,
    offset=0,
    idx_cols=[]
):
    """
    Returns a Numpy array of policy TC IDs from a table of IL input items that
    include step policies

    :param il_inputs_df: IL input items dataframe
    :type il_inputs_df: pandas.DataFrame

    :param offset: offset added to the generated policy TC IDs
    :type offset: int

    :param idx_cols: column names used, together with the coverage ID, to group step profiles per policy
    :type idx_cols: list

    :return: Numpy array of policy TC IDs
    :rtype: numpy.ndarray
    """
    fm_policytc_cols = [
        col for col in idx_cols + ['layer_id', 'level_id', 'agg_id', 'coverage_id', 'assign_step_calcrule'] + step_profile_cols
        if col in il_inputs_df.columns
    ]
    fm_policytc_df = il_inputs_df[fm_policytc_cols]
    fm_policytc_df['profile_id'] = factorize_ndarray(fm_policytc_df.loc[:, ['layer_id', 'level_id', 'agg_id']].values, col_idxs=range(3))[0]
    fm_policytc_df['pol_id'] = factorize_ndarray(fm_policytc_df.loc[:, idx_cols + ['coverage_id']].values, col_idxs=range(len(idx_cols) + 1))[0]

    step_calcrule_policytc_agg = pd.DataFrame(
        fm_policytc_df[fm_policytc_df['step_id'] > 0]['profile_id'].to_list(),
        index=fm_policytc_df[fm_policytc_df['step_id'] > 0]['pol_id']
    ).groupby('pol_id').aggregate(list).to_dict()[0]

    fm_policytc_df.loc[
        fm_policytc_df['step_id'] > 0, 'profile_id'
    ] = fm_policytc_df.loc[fm_policytc_df['step_id'] > 0]['pol_id'].map(step_calcrule_policytc_agg)
    fm_policytc_df['profile_id'] = fm_policytc_df['profile_id'].apply(
        lambda x: tuple(x) if isinstance(x, list) else x
    )
    return factorize_array(fm_policytc_df['profile_id'])[0] + offset

def __split_fm_terms_by_risk(df):
    """
    adjust the financial terms to the number of risks,
    for example a deductible is split between each individual building risk

    Args:
        df (DataFrame): the DataFrame of an FM level
    """
    for term in risk_disaggregation_term.intersection(set(df.columns)):
        if f'{term[:3]}_code' in df.columns:
            code_filter = df[f'{term[:3]}_code'] == 0
            df.loc[code_filter, term] /= df.loc[code_filter, 'NumberOfRisks']
        else:
            df[term] /= df['NumberOfRisks']


def __drop_duplicated_row(prev_level_df, level_df, sub_agg_key):
    """drop duplicated rows based on the agg_key, sub_agg_key and layer_id"""
    if prev_level_df is not None:
        sub_level_layer_needed = level_df['agg_id'].isin(prev_level_df.loc[prev_level_df["layer_id"] == 2]['to_agg_id'])
    else:
        sub_level_layer_needed = False
    level_value_count = level_df[['agg_id'] + list(set(level_df.columns).intersection(fm_term_ids))].drop_duplicates()['agg_id'].value_counts()
    this_level_layer_needed = level_df['agg_id'].isin(level_value_count[level_value_count > 1].index.values)
    if 'share' in level_df.columns:
        this_level_layer_needed |= ~level_df['share'].isin({0, 1})
    level_df['layer_id'] = np.where(sub_level_layer_needed | this_level_layer_needed, level_df['layer_id'], 1)
    level_df.drop_duplicates(subset=['agg_id'] + sub_agg_key + ['layer_id'], inplace=True)


def get_cond_info(locations_df, accounts_df):
    level_conds = {}
    extra_accounts = []
    default_cond_tag = '0'
    if 'CondTag' in locations_df.columns:
        fill_empty(locations_df, 'CondTag', default_cond_tag)
        loc_condkey_df = locations_df.loc[locations_df['CondTag'] != default_cond_tag, ['PortNumber', 'AccNumber', 'CondTag']].drop_duplicates()
    else:
        loc_condkey_df = pd.DataFrame([], columns=['PortNumber', 'AccNumber', 'CondTag'])

    if 'CondTag' in accounts_df.columns:
        fill_empty(accounts_df, 'CondTag', default_cond_tag)
        acc_condkey_df = accounts_df.loc[accounts_df['CondTag'] != '', ['PortNumber', 'AccNumber', 'CondTag']].drop_duplicates()
        condkey_match_df = acc_condkey_df.merge(loc_condkey_df, how='outer', indicator=True)
        missing_condkey_df = condkey_match_df.loc[condkey_match_df['_merge'] == 'right_only', ['PortNumber', 'AccNumber', 'CondTag']]
    else:
        acc_condkey_df = pd.DataFrame([], columns=['PortNumber', 'AccNumber', 'CondTag'])
        missing_condkey_df = loc_condkey_df

    if missing_condkey_df.shape[0]:
        raise OasisException(f'Those condtag are present in locations but missing in the account file:\n{missing_condkey_df}')

    if acc_condkey_df.shape[0]:
        if 'CondTag' not in locations_df.columns:
            locations_df['CondTag'] = default_cond_tag

        # we get information about cond from accounts_df
        cond_tags = {}  # information about each cond tag
        account_layer_exclusion = {}  # for each account and layer, store info about cond class exclusion
        fill_empty(accounts_df, 'CondPriority', 1)
        fill_empty(accounts_df, 'CondPeril', '')
        for acc_rec in accounts_df.to_dict(orient="records"):
            cond_tag_key = (acc_rec['PortNumber'], acc_rec['AccNumber'], acc_rec['CondTag'])
            cond_number_key = (acc_rec['PortNumber'], acc_rec['AccNumber'], acc_rec['CondTag'], acc_rec['CondNumber'])
            cond_tag = cond_tags.setdefault(cond_tag_key, {'CondPriority': acc_rec['CondPriority'] or 1, 'CondPeril': acc_rec['CondPeril']})
            cond_tag.setdefault('layers', {})[acc_rec['layer_id']] = {'CondNumber': cond_number_key}
            exclusion_cond_tags = account_layer_exclusion.setdefault(
                (acc_rec['PortNumber'], acc_rec['AccNumber']), {}).setdefault(acc_rec['layer_id'], set())
            if acc_rec.get('CondClass') == 1:
                exclusion_cond_tags.add(acc_rec['CondTag'])

        # we get the list of loc for each cond_tag
        loc_conds = {}
        KEY_INDEX = 0
        PRIORITY_INDEX = 1
        for loc_rec in locations_df.to_dict(orient="records"):
            loc_key = (loc_rec['PortNumber'], loc_rec['AccNumber'], loc_rec['LocNumber'])
            cond_key = (loc_rec['PortNumber'], loc_rec['AccNumber'], loc_rec.get('CondTag', default_cond_tag))
            if cond_key in cond_tags:
                cond_tag = cond_tags[cond_key]
            else:
                cond_tag = {'CondPriority': 1, 'layers': {}}
                cond_tags[cond_key] = cond_tag
            # if 'locations' not in cond_tag:  # first time we see the cond tag in this loop, update the layers
            #     for layer_id in account_layers[loc_rec['PortNumber'], loc_rec['AccNumber']]:
            #         if layer_id not in cond_tag['CondNumber']:
            #             cond_tag['CondNumber'][layer_id] = {'CondNumber': tuple(list(cond_key)+[''])}
            cond_location = cond_tag.setdefault('locations', set())
            cond_location.add(loc_key)
            cond_tag_priority = cond_tag['CondPriority']
            conds = loc_conds.setdefault(loc_key, [])
            for i, cond in enumerate(conds):
                if cond_tag_priority < cond[PRIORITY_INDEX]:
                    conds.insert(i, (cond_key, cond_tag_priority))
                    break
                elif cond_tag_priority == cond[PRIORITY_INDEX] and cond_key != cond[KEY_INDEX]:
                    raise OasisException(f"{cond_key} and {cond[KEY_INDEX]} have same priority in {loc_key}")
            else:
                conds.append((cond_key, cond_tag_priority))

        # at first we just want condtag for each level
        for cond_key, cond_info in cond_tags.items():
            port_num, acc_num, cond_tag = cond_key
            cond_level_start = 1
            for loc_key in cond_info.get('locations', set()):
                for i, (cond_key_i, _) in enumerate(loc_conds[loc_key]):
                    if cond_key_i == cond_key:
                        cond_level_start = max(cond_level_start, i + 1)
                        break
            cond_info['cond_level_start'] = cond_level_start
            for layer_id, exclusion_conds in account_layer_exclusion[(cond_key[0], cond_key[1])].items():
                if layer_id not in cond_info['layers']:
                    if exclusion_conds:
                        extra_accounts.append({
                            'PortNumber': port_num,
                            'AccNumber': acc_num,
                            'CondTag': cond_tag,
                            'layer_id': layer_id,
                            'CondNumber': 'FullFilter',
                            'CondDed6All': 1,
                            'CondDedType6All': 1,
                            'CondPeril': 'AA1',
                        })
                    else:
                        extra_accounts.append({
                            'PortNumber': port_num,
                            'AccNumber': acc_num,
                            'CondTag': cond_tag,
                            'layer_id': layer_id,
                            'CondNumber': '',
                            'CondPeril': 'AA1',
                        })
            level_conds.setdefault(cond_level_start, set()).add(cond_key)

    return level_conds, extra_accounts


def get_levels(gul_inputs_df, locations_df, accounts_df):
    level_conds, extra_accounts = get_cond_info(locations_df, accounts_df)
    for group_name, group_info in copy.deepcopy(GROUPED_SUPPORTED_FM_LEVELS).items():
        if group_info['oed_source'] == 'location':
            locations_df['layer_id'] = 1
            yield locations_df, list(group_info['levels'].items())[1:], group_info['fm_peril_field']  # [1:] => 'site coverage' is already done
        elif group_info['oed_source'] == 'account':
            if group_name == 'cond' and level_conds:
                loc_conds_df = locations_df[['loc_id', 'PortNumber', 'AccNumber', 'CondTag']].drop_duplicates()
                for stage, cond_keys in level_conds.items():
                    cond_filter_df = pd.DataFrame(cond_keys, columns=['PortNumber', 'AccNumber', 'CondTag'])
                    loc_conds_df_filter = cond_filter_df[['PortNumber', 'AccNumber', 'CondTag']].drop_duplicates().merge(loc_conds_df, how='left')
                    gul_inputs_df.drop(columns=['CondTag'], inplace=True, errors='ignore')
                    gul_inputs_df['CondTag'] = gul_inputs_df[['loc_id', 'PortNumber', 'AccNumber']].merge(loc_conds_df_filter, how='left')['CondTag']
                    yield (cond_filter_df.merge(pd.concat([accounts_df, pd.DataFrame(extra_accounts)]), how='left'),
                           group_info['levels'].items(),
                           group_info['fm_peril_field'])
            else:
                yield accounts_df, group_info['levels'].items(), group_info.get('fm_peril_field')


def get_level_term_info(term_df_source, level_column_mapper, level_id, step_level, fm_peril_field, oed_schema):
    level_terms = set()
    terms_maps = {}
    coverage_group_map = {}
    fm_group_tiv = {}
    non_zero_default = {}
    for ProfileElementName, term_info in level_column_mapper[level_id].items():
        default_value = oed_schema.get_default(ProfileElementName)
        if ProfileElementName not in term_df_source.columns:
            if default_value == 0 or default_value in BLANK_VALUES:
                continue
            else:
                non_zero_default[ProfileElementName] = [term_info['FMTermType'].lower(), default_value]
                continue
        else:
            fill_empty(term_df_source, ProfileElementName, default_value)
        if 'FMProfileStep' in term_info:
            profile_steps = term_info["FMProfileStep"]
            if isinstance(profile_steps, int):
                profile_steps = [profile_steps]
            valid_step_trigger_types = term_df_source.loc[(term_df_source['StepTriggerType'].isin(profile_steps))
                                                          & (term_df_source[ProfileElementName] > 0), 'StepTriggerType'].unique()
            if len(valid_step_trigger_types):
                level_terms.add(term_info['FMTermType'].lower())
            for step_trigger_type in valid_step_trigger_types:
                coverage_aggregation_method = STEP_TRIGGER_TYPES[step_trigger_type]['coverage_aggregation_method']
                calcrule_assignment_method = STEP_TRIGGER_TYPES[step_trigger_type]['calcrule_assignment_method']
                for coverage_type_id in supp_cov_type_ids:
                    FMTermGroupID = COVERAGE_AGGREGATION_METHODS[coverage_aggregation_method][coverage_type_id]
                    if (step_trigger_type, coverage_type_id) in coverage_group_map:
                        if coverage_group_map[(step_trigger_type, coverage_type_id)] != FMTermGroupID:
                            raise OasisException(
                                f"multiple coverage_type_id {(step_trigger_type, coverage_type_id)} "
                                f"assigned to the different FMTermGroupID "
                                f"{(FMTermGroupID, coverage_group_map[(step_trigger_type, coverage_type_id)])}")
                    else:
                        coverage_group_map[(step_trigger_type, coverage_type_id)] = FMTermGroupID
                    terms_map = terms_maps.setdefault((FMTermGroupID, step_trigger_type),
                                                      {fm_peril_field: 'fm_peril'} if fm_peril_field else {})
                    if CALCRULE_ASSIGNMENT_METHODS[calcrule_assignment_method][FMTermGroupID]:
                        terms_map[ProfileElementName] = term_info['FMTermType'].lower()
        else:
            if not (term_df_source[ProfileElementName] > 0).any():
                continue
            level_terms.add(term_info['FMTermType'].lower())
            coverage_type_ids = term_info.get("CoverageTypeID", supp_cov_type_ids)
            FMTermGroupID = term_info.get('FMTermGroupID', 1)
            if step_level:
                term_key = (FMTermGroupID, 0)
            else:
                term_key = FMTermGroupID
            if isinstance(coverage_type_ids, int):
                coverage_type_ids = [coverage_type_ids]
            fm_group_tiv[FMTermGroupID] = coverage_type_ids
            for coverage_type_id in coverage_type_ids:
                if step_level:
                    coverage_group_key = (0, coverage_type_id)
                else:
                    coverage_group_key = coverage_type_id
                if coverage_type_id in coverage_group_map:
                    if coverage_group_map[coverage_group_key] != FMTermGroupID:
                        raise OasisException(
                            f"multiple coverage_type_id {coverage_group_key}"
                            f"assigned to the different FMTermGroupID {(FMTermGroupID, coverage_group_map[coverage_group_key])}")
                else:
                    coverage_group_map[coverage_group_key] = FMTermGroupID
            terms_maps.setdefault(term_key, {fm_peril_field: 'fm_peril'} if fm_peril_field else {})[
                ProfileElementName] = term_info['FMTermType'].lower()

    if level_terms:
        for ProfileElementName, (term, default_value) in non_zero_default.items():
            term_df_source[ProfileElementName] = default_value
            level_terms.add(term)
            for terms_map in terms_maps.values():
                terms_map[ProfileElementName] = term

    return level_terms, terms_maps, coverage_group_map, fm_group_tiv


@oasis_log
def get_il_input_items(
    gul_inputs_df,
    locations_df,
    accounts_df,
    oed_schema,
    exposure_profile=get_default_exposure_profile(),
    accounts_profile=get_default_accounts_profile(),
    fm_aggregation_profile=get_default_fm_aggregation_profile(),
    do_disaggregation=True,
):
    """
    Generates and returns a Pandas dataframe of IL input items.

    :param gul_inputs_df: GUL input items
    :type gul_inputs_df: pandas.DataFrame

    :param locations_df: Source exposure
    :type locations_df: pandas.DataFrame

    :param accounts_df: Source accounts dataframe
    :type accounts_df: pandas.DataFrame

    :param oed_schema: OED schema, used to fill default values and filter perils

    :param exposure_profile: Source exposure profile (optional)
    :type exposure_profile: dict

    :param accounts_profile: Source accounts profile (optional)
    :type accounts_profile: dict

    :param fm_aggregation_profile: FM aggregation profile (optional)
    :type fm_aggregation_profile: dict

    :param do_disaggregation: whether to split terms and conditions for aggregate exposure (optional)
    :type do_disaggregation: bool

    :return: IL inputs dataframe
    :rtype: pandas.DataFrame
    """
    profile = get_grouped_fm_profile_by_level_and_term_group(exposure_profile, accounts_profile)
    tiv_terms = {v['tiv']['ProfileElementName']: str(v['tiv']['CoverageTypeID']) for k, v in profile[1].items()}

    # Get the FM aggregation profile - this describes how the IL input
    # items are to be aggregated in the various FM levels
    fm_aggregation_profile = copy.deepcopy(fm_aggregation_profile)

    if not fm_aggregation_profile:
        raise OasisException(
            'FM aggregation profile is empty - this is required to perform aggregation'
        )

    # Get the OED hierarchy terms profile - this defines the column names for loc.
    # ID, acc. ID, policy no. and portfolio no., as used in the source exposure
    # and accounts files. This is to ensure that the method never makes hard
    # coded references to the corresponding columns in the source files, as
    # that would mean that changes to these column names in the source files
    # may break the method
    oed_hierarchy = get_oed_hierarchy(exposure_profile, accounts_profile)

    # =====
    useful_cols = sorted(
        set(['layer_id', 'orig_level_id', 'level_id', 'agg_id', 'gul_input_id', 'agg_tiv', 'NumberOfRisks']
            + get_useful_summary_cols(oed_hierarchy) + list(tiv_terms.values()))
        - {'profile_id', 'item_id', 'output_id'},
        key=str.lower)
    gul_inputs_df.rename(columns={'item_id': 'gul_input_id'}, inplace=True)

    # adjust tiv columns and name them as their coverage id
    gul_inputs_df.rename(columns=tiv_terms, inplace=True)
    gul_inputs_df[['risk_id', 'NumberOfRisks']] = gul_inputs_df[['building_id', 'NumberOfBuildings']]
    gul_inputs_df.loc[gul_inputs_df['IsAggregate'] == 0, ['risk_id', 'NumberOfRisks']] = 1, 1
    gul_inputs_df.loc[gul_inputs_df['NumberOfRisks'] == 0, 'NumberOfRisks'] = 1

    # initialization
    agg_keys = set()
    for level_id in fm_aggregation_profile:
        agg_keys = agg_keys.union(set([v['field'] for v in fm_aggregation_profile[level_id]['FMAggKey'].values()]))

    present_cols = [col for col in gul_inputs_df.columns if col in set(useful_cols).union(agg_keys).union(fm_term_ids + ['LocPeril'])]
    gul_inputs_df = gul_inputs_df[present_cols]

    # Remove TIV cols from location as this information is present in gul_inputs_df
    locations_df = locations_df.drop(columns=tiv_terms.keys())

    # Profile dicts are keyed on the fm term name. This prevents multiple file columns
    # from pointing to the same fm term, which is necessary to have a generic logic that
    # works with step policies, so we change the key to be the column name and use
    # FMTermType to store the term name
    level_column_mapper = {}
    for level_id, level_profile in profile.items():
        column_map = {}
        level_column_mapper[level_id] = column_map

        # for fm we only use term_id 1
        for term_name, term_info in itertools.chain.from_iterable(profile.items() for profile in level_profile.values()):
            new_term_info = copy.deepcopy(term_info)
            new_term_info['FMTermType'] = term_name
            column_map[term_info['ProfileElementName']] = new_term_info

    level_id = SUPPORTED_FM_LEVELS['site coverage']['id']
    prev_level_df = gul_inputs_df[present_cols]
    prev_agg_key = [v['field'] for v in fm_aggregation_profile[level_id]['FMAggKey'].values()]
    # no duplicate, if we have, error will appear later for agg_id_1
    prev_level_df.drop_duplicates(subset=prev_agg_key, inplace=True, ignore_index=True)
    if 'LocPeril' in prev_level_df:
        peril_filter = oed_schema.peril_filtering(prev_level_df['peril_id'], prev_level_df['LocPeril'])
        prev_level_df.loc[~peril_filter, list(set(prev_level_df.columns).intersection(fm_term_ids))] = 0
    if do_disaggregation:
        __split_fm_terms_by_risk(prev_level_df)
    prev_level_df['agg_id'] = factorize_ndarray(prev_level_df.loc[:, ['loc_id', 'risk_id', 'coverage_type_id']].values, col_idxs=range(3))[0]
    prev_level_df['level_id'] = 1
    prev_level_df['orig_level_id'] = level_id
    prev_level_df['layer_id'] = 1
    prev_level_df['agg_tiv'] = prev_level_df['tiv']
    prev_level_df['attachment'] = 0
    prev_level_df['share'] = 0
    il_inputs_df_list = []
    gul_inputs_df = gul_inputs_df.drop(columns=fm_term_ids + ['tiv'], errors='ignore').reset_index(drop=True)
    prev_df_subset = prev_agg_key

    # Determine whether step policies are listed, are not full of nans and step
    # numbers are greater than zero
    step_policies_present = False
    extra_fm_col = ['layer_id']
    if 'StepTriggerType' in accounts_df and 'StepNumber' in accounts_df:
        fill_empty(accounts_df, ['StepTriggerType', 'StepNumber'], 0)
        step_account = (accounts_df[accounts_df[accounts_df['StepTriggerType'].notnull()]['StepNumber'].gt(0)][['PortNumber', 'AccNumber']]
                        .drop_duplicates())
        step_policies_present = bool(step_account.shape[0])
        # this is done only for fmcalc to make sure its programme nodes stay as a tree, remove 'is_step' logic when fmcalc is dropped
        if step_policies_present:
            step_account['is_step'] = 1
            gul_inputs_df = gul_inputs_df.merge(step_account, how='left')
            gul_inputs_df['is_step'] = gul_inputs_df['is_step'].fillna(0).astype('int8')
            extra_fm_col = ['layer_id', 'is_step']

    # If step policies are listed, keep the step trigger type and the columns associated
    # with those step trigger types that are present
    if step_policies_present:
        # we append the fm step policy terms to the policy layer level
        step_policy_level_map = level_column_mapper[SUPPORTED_FM_LEVELS['policy layer']['id']]
        for col in ['StepTriggerType', 'cov_agg_id', 'assign_step_calcrule']:
            step_policy_level_map[col] = {
                'ProfileElementName': col,
                'FMTermType': col,
            }
        for key, step_term in get_default_step_policies_profile().items():
            step_policy_level_map[step_term['Key']] = {
                'ProfileElementName': step_term['Key'],
                'FMTermType': step_term['FMProfileField'],
                'FMProfileStep': step_term.get('FMProfileStep')
            }

    for term_df_source, levels, fm_peril_field in get_levels(gul_inputs_df, locations_df, accounts_df):
        for level, level_info in levels:
            level_id = level_info['id']
            step_level = 'StepTriggerType' in level_column_mapper[level_id]  # only true if step policies are present
            level_terms, terms_maps, coverage_group_map, fm_group_tiv = get_level_term_info(
                term_df_source, level_column_mapper, level_id, step_level, fm_peril_field, oed_schema)
            if not terms_maps:  # no terms, we skip this level
                continue
            agg_key = [v['field'] for v in fm_aggregation_profile[level_id]['FMAggKey'].values()]

            # get all rows with terms in term_df_source and determine the correct FMTermGroupID
            level_df_list = []
            for group_key, terms in terms_maps.items():
                if step_level:
                    FMTermGroupID, step_trigger_type = group_key
                    group_df = term_df_source[term_df_source['StepTriggerType'] == step_trigger_type]
                    terms = {**terms_maps.get((1, 0), {}), **terms}  # take all common terms in (1, 0) plus terms with step_trigger_type filter
                else:
                    FMTermGroupID = group_key
                    group_df = term_df_source
                group_df = (group_df[list(set(agg_key + list(terms.keys()) + extra_fm_col)
                                          .union(set(useful_cols).difference(set(gul_inputs_df.columns)))
                                          .intersection(group_df.columns))]
                            .assign(FMTermGroupID=FMTermGroupID))
                numeric_terms = [term for term in terms.keys() if is_numeric_dtype(group_df[term])]
                keep_df = group_df[(group_df[numeric_terms] > 0).any(axis='columns')][list(
                    set(agg_key).intersection(group_df.columns))].drop_duplicates()
                group_df = keep_df.merge(group_df, how='left')

                # multiple ProfileElementName can have the same fm terms (ex: StepTriggerType 5), we take the max to have a unique one
                for ProfileElementName, term in terms.items():
                    if term in group_df:
                        group_df[term] = group_df[[term, ProfileElementName]].max(axis=1)
                        group_df.drop(columns=ProfileElementName, inplace=True)
                    else:
                        group_df.rename(columns={ProfileElementName: term}, inplace=True)
                level_df_list.append(group_df)

            level_df = pd.concat(level_df_list, copy=True)
            level_df = level_df.drop_duplicates(subset=set(level_df.columns) - set(SOURCE_IDX.values()))

            if step_level:
                # merge with gul_inputs_df needs to be based on 'steptriggertype' and 'coverage_type_id'
                gul_inputs_df.drop(columns=['FMTermGroupID'], errors='ignore', inplace=True)
                coverage_type_id_df = pd.DataFrame(
                    [[StepTriggerType, coverage_type_id, FMTermGroupID]
                     for (StepTriggerType, coverage_type_id), FMTermGroupID in coverage_group_map.items()],
                    columns=['steptriggertype', 'coverage_type_id', 'FMTermGroupID'])
                level_df = level_df.merge(coverage_type_id_df)
            else:
                # map the coverage_type_id to the correct FMTermGroupID for this level.
                # coverage_type_ids without terms, and therefore without FMTermGroupID, are mapped to 0
                gul_inputs_df['FMTermGroupID'] = gul_inputs_df['coverage_type_id'].map(coverage_group_map, na_action='ignore')

            # make sure the correct tiv sum exists
            for FMTermGroupID, coverage_type_ids in fm_group_tiv.items():
                tiv_key = '_'.join(map(str, sorted(coverage_type_ids)))
                if tiv_key not in gul_inputs_df:
                    gul_inputs_df[tiv_key] = gul_inputs_df[map(str, sorted(coverage_type_ids))].sum(axis=1)

            # we have prepared FMTermGroupID on gul or level df (depending on step_level), now we can merge the terms for this level to gul
            level_df = (gul_inputs_df.merge(level_df, how='left'))

            # assign correct tiv
            for FMTermGroupID, coverage_type_ids in fm_group_tiv.items():
                tiv_key = '_'.join(map(str, sorted(coverage_type_ids)))
                level_df['tiv'] = level_df.loc[level_df['FMTermGroupID'] == FMTermGroupID, tiv_key]

            # check that peril_id is part of the fm peril policy, if not we remove the terms
            if 'fm_peril' in level_df:
                level_df['fm_peril'] = level_df['fm_peril'].fillna('')
                peril_filter = oed_schema.peril_filtering(level_df['peril_id'], level_df['fm_peril'])
                level_df.loc[~peril_filter, list(level_terms) + ['FMTermGroupID']] = 0
                factorize_key = agg_key + ['FMTermGroupID', 'fm_peril']
            else:
                factorize_key = agg_key + ['FMTermGroupID']

            level_df['FMTermGroupID'] = level_df['FMTermGroupID'].astype('Int64')

            # make sure agg_ids without terms still have the same number of layers
            no_term_filter = level_df['layer_id'].isna()
            level_df_no_term = level_df[no_term_filter]
            level_df_no_term = level_df_no_term.drop(columns=['layer_id']).merge(
                prev_level_df[prev_level_df['gul_input_id'].isin(set(level_df_no_term['gul_input_id']))][['gul_input_id', 'layer_id']],
                how='left'
            )
            level_df = pd.concat([level_df[~no_term_filter], level_df_no_term]).reset_index()
            level_df['layer_id'] = level_df['layer_id'].astype('int32')

            # map agg_id_prev using gul_input_id
            prev_level_df_mapper = prev_level_df[['gul_input_id', 'agg_id']].drop_duplicates().set_index('gul_input_id')
            level_df['agg_id_prev'] = level_df['gul_input_id'].map(prev_level_df_mapper['agg_id']).fillna(0).astype('int64')

            if do_disaggregation:
                if 'risk_id' in agg_key:
                    __split_fm_terms_by_risk(level_df)

            # for lines that had no term (FMTermGroupID == 0), we store in FMTermGroupID the previous agg_id to keep the same granularity
            if 'is_step' in level_df:
                level_df.loc[(level_df['FMTermGroupID'] == 0) & (level_df['is_step'] == 1), 'FMTermGroupID'] = -level_df['agg_id_prev']
            level_df['agg_id'] = factorize_ndarray(level_df.loc[:, factorize_key].values, col_idxs=range(len(factorize_key)))[0]

            # check rows in prev df that are at this level's granularity (if prev_agg_id has multiple corresponding agg_id)
            need_root_start_df = level_df.groupby("agg_id_prev")["agg_id"].nunique()
            need_root_start_df = need_root_start_df[need_root_start_df > 1].index

            # create new prev df for elements that need to restart from items
            root_df = level_df[((level_df['agg_id_prev'].isin(need_root_start_df)) & (level_df['layer_id'] == 1))]
            root_df['to_agg_id'] = root_df['agg_id']
            root_df['agg_id'] = -root_df['gul_input_id']
            root_df.drop_duplicates(subset='agg_id', inplace=True)
            root_df['level_id'] = len(il_inputs_df_list) + 1

            max_agg_id = np.max(level_df['agg_id'])
            prev_level_df['to_agg_id'] = (prev_level_df[['gul_input_id']]
                                          .merge(level_df.drop_duplicates(subset=['gul_input_id'])[['gul_input_id', 'agg_id']])['agg_id'])
            prev_level_df.loc[prev_level_df['to_agg_id'].isna(), 'to_agg_id'] = (
                max_agg_id + factorize_ndarray(prev_level_df.loc[prev_level_df['to_agg_id'].isna(), ['agg_id']].values,
                                               col_idxs=range(len(['agg_id'])))[0])
            prev_level_df.loc[prev_level_df['agg_id'].isin(need_root_start_df), 'to_agg_id'] = 0
            prev_level_df['to_agg_id'] = prev_level_df['to_agg_id'].astype('int32')
            level_df.drop(columns=['agg_id_prev'], inplace=True)

            # we don't need coverage_type_id as an agg key as tiv already represents the sum of the correct tiv values
            level_df = level_df.merge(
                level_df[['loc_id', 'building_id', 'agg_id', 'tiv']]
                .drop_duplicates()[['agg_id', 'tiv']]
                .groupby('agg_id', observed=True)['tiv']
                .sum()
                .reset_index(name='agg_tiv'),
                how='left'
            )
            __drop_duplicated_row(il_inputs_df_list[-1] if il_inputs_df_list else None, prev_level_df, prev_df_subset)
            il_inputs_df_list.append(pd.concat([df for df in [prev_level_df, root_df] if not df.empty]))
            level_df['level_id'] = len(il_inputs_df_list) + 1
            level_df['orig_level_id'] = level_id
            sub_agg_key = [v['field'] for v in fm_aggregation_profile[level_id].get('FMSubAggKey', {}).values()
                           if v['field'] in level_df.columns]
            prev_level_df = level_df
            prev_df_subset = ['agg_id'] + sub_agg_key + ['layer_id']

    # create account aggregation if necessary
    if prev_level_df['orig_level_id'].max() < SUPPORTED_FM_LEVELS['policy layer']['id']:
        level = 'policy layer'
        level_id = SUPPORTED_FM_LEVELS[level]['id']
        agg_key = [v['field'] for v in fm_aggregation_profile[level_id]['FMAggKey'].values()]
        sub_agg_key = [v['field'] for v in fm_aggregation_profile[level_id].get('FMSubAggKey', {}).values()
                       if v['field'] in prev_level_df.columns]
        need_account_aggregation = (prev_level_df[agg_key + sub_agg_key + ['layer_id', 'agg_id']]
                                    .drop_duplicates()
                                    .groupby(agg_key + sub_agg_key + ['layer_id'], observed=True)
                                    .size()
                                    .max() > 1) or set(SUMMARY_TOP_LEVEL_COLS).difference(set(prev_level_df.columns))

        if need_account_aggregation:
            level_df = gul_inputs_df.merge(accounts_df[list(set(agg_key + sub_agg_key + ['layer_id'])
                                                            .union(set(useful_cols).difference(set(gul_inputs_df.columns)))
                                                            .intersection(accounts_df.columns))])
            level_df['orig_level_id'] = level_id
            level_df['level_id'] = len(il_inputs_df_list) + 2
            level_df['agg_id'] = factorize_ndarray(level_df.loc[:, agg_key].values, col_idxs=range(len(agg_key)))[0]
            prev_level_df['to_agg_id'] = (prev_level_df[['gul_input_id']]
                                          .merge(level_df.drop_duplicates(subset=['gul_input_id'])[['gul_input_id', 'agg_id']])['agg_id'])
            il_inputs_df_list.append(prev_level_df)
            prev_level_df = level_df

    prev_level_df['to_agg_id'] = 0
    il_inputs_df_list.append(prev_level_df.drop_duplicates(subset=sub_agg_key + ['agg_id', 'layer_id']))
    il_inputs_df = pd.concat(il_inputs_df_list)
    for col in set(list(il_inputs_df.columns)):
        try:
            il_inputs_df[col] = il_inputs_df[col].fillna(0)
        except (TypeError, ValueError):
            pass
    for col in fm_term_ids:
        if col not in il_inputs_df.columns:
            il_inputs_df[col] = 0

    del prev_level_df
    del il_inputs_df_list
    gc.collect()

    # set top agg_id for later xref computation
    il_inputs_df['top_agg_id'] = factorize_ndarray(il_inputs_df.loc[:, agg_key].values, col_idxs=range(len(agg_key)))[0]

    # Final setting of data types before returning the IL input items
    dtypes = {
        **{t: 'float64' for t in ['tiv', 'agg_tiv', 'deductible', 'deductible_min', 'deductible_max', 'limit', 'attachment', 'share',
                                  'deductible1', 'limit1', 'limit2', 'trigger_start', 'trigger_end', 'payout_start', 'payout_end',
                                  'scale1', 'scale2']},
        **{t: 'int32' for t in ['agg_id', 'to_agg_id', 'item_id', 'layer_id', 'level_id', 'orig_level_id', 'calcrule_id', 'profile_id',
                                'steptriggertype', 'step_id']},
        # **{t: 'uint16' for t in [cond_num]},
        **{t: 'uint8' for t in ['ded_code', 'ded_type', 'lim_code', 'lim_type', 'trigger_type', 'payout_type']}
    }
    il_inputs_df = set_dataframe_column_dtypes(il_inputs_df, dtypes)

    # Assign default values to IL inputs
    il_inputs_df = assign_defaults_to_il_inputs(il_inputs_df)

    # Apply rule to convert type 2 deductibles and limits to TIV shares
    il_inputs_df['deductible'] = np.where(
        il_inputs_df['ded_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'],
        il_inputs_df['deductible'] * il_inputs_df['agg_tiv'],
        il_inputs_df['deductible']
    )
    il_inputs_df.loc[il_inputs_df['ded_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'], 'ded_type'] = DEDUCTIBLE_AND_LIMIT_TYPES['flat']['id']

    il_inputs_df['limit'] = np.where(
        il_inputs_df['lim_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'],
        il_inputs_df['limit'] * il_inputs_df['agg_tiv'],
        il_inputs_df['limit']
    )
    il_inputs_df.loc[il_inputs_df['lim_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'], 'lim_type'] = DEDUCTIBLE_AND_LIMIT_TYPES['flat']['id']

    if step_policies_present:
        # Before assigning calc. rule IDs and policy TC IDs, the StepTriggerType
        # should be split into its sub-types in cases where the associated
        # coverages are covered separately
        # For example, StepTriggerType = 5 covers buildings and contents separately
        def assign_sub_step_trigger_type(row):
            try:
                step_trigger_type = STEP_TRIGGER_TYPES[row['steptriggertype']]['sub_step_trigger_types'][
                    row['coverage_type_id']]
                return step_trigger_type
            except KeyError:
                return row['steptriggertype']
        il_inputs_df['StepTriggerType'] = il_inputs_df.apply(
            lambda row: assign_sub_step_trigger_type(row), axis=1
        )

    # Set the calc. rule IDs
    il_inputs_df.reset_index(drop=True, inplace=True)
    policy_layer_filter = il_inputs_df['orig_level_id'] == SUPPORTED_FM_LEVELS['policy layer']['id']
    if step_policies_present:
        step_filter = (il_inputs_df['StepTriggerType'] > 0)
        base_filter = (~step_filter) & (~policy_layer_filter)
        policy_layer_filter = (~step_filter) & policy_layer_filter
        il_inputs_df.loc[step_filter, 'calcrule_id'] = get_calc_rule_ids(il_inputs_df[step_filter], calc_rule_type='step')
    else:
        base_filter = ~policy_layer_filter

    il_inputs_df.loc[base_filter, 'calcrule_id'] = get_calc_rule_ids(il_inputs_df[base_filter], calc_rule_type='base')
    il_inputs_df.loc[policy_layer_filter, 'calcrule_id'] = get_calc_rule_ids(il_inputs_df[policy_layer_filter], calc_rule_type='policy_layer')
    il_inputs_df['calcrule_id'] = il_inputs_df['calcrule_id'].astype('uint32')

    # Set the policy TC IDs
    if 'StepTriggerType' in il_inputs_df:
        il_inputs_df.loc[
            ~(il_inputs_df['StepTriggerType'] > 0), 'profile_id'
        ] = get_profile_ids(
            il_inputs_df[~(il_inputs_df['StepTriggerType'] > 0)]
        )
        il_inputs_df.loc[
            il_inputs_df['StepTriggerType'] > 0, 'profile_id'
        ] = get_step_profile_ids(
            il_inputs_df[il_inputs_df['StepTriggerType'] > 0],
            offset=il_inputs_df['profile_id'].max(),
            idx_cols=['AccNumber', 'PolNumber', 'PortNumber']
        )
    else:
        il_inputs_df['profile_id'] = get_profile_ids(il_inputs_df)
    il_inputs_df['profile_id'] = il_inputs_df['profile_id'].astype('uint32')

    il_inputs_df = set_dataframe_column_dtypes(il_inputs_df, dtypes)

    return il_inputs_df
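
# Illustrative usage sketch (not part of the original module; the input frames are assumed to
# come from the usual oasislmf pipeline, i.e. gul_inputs_df from the GUL input generation step
# and locations_df / accounts_df / oed_schema from the loaded OED exposure data):
#
#   il_inputs_df = get_il_input_items(
#       gul_inputs_df, locations_df, accounts_df, oed_schema,
#       exposure_profile=exposure_profile, accounts_profile=accounts_profile,
#       fm_aggregation_profile=fm_aggregation_profile,
#   )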
@oasis_log
def write_fm_policytc_file(il_inputs_df, fm_policytc_fp, chunksize=100000):
    """
    Writes an FM policy T & C file.

    :param il_inputs_df: IL inputs dataframe
    :type il_inputs_df: pandas.DataFrame

    :param fm_policytc_fp: FM policy TC file path
    :type fm_policytc_fp: str

    :return: FM policy TC file path
    :rtype: str
    """
    try:
        fm_policytc_df = il_inputs_df.loc[il_inputs_df['agg_id'] > 0, ['layer_id', 'level_id', 'agg_id', 'profile_id', 'orig_level_id']]
        fm_policytc_df.loc[fm_policytc_df['orig_level_id'].isin(cross_layer_level), 'layer_id'] = 1  # remove layer for cross layer level
        fm_policytc_df.drop(columns=['orig_level_id']).drop_duplicates().to_csv(
            path_or_buf=fm_policytc_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(fm_policytc_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_fm_policytc_file'", e)

    return fm_policytc_fp
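
# The resulting fm_policytc.csv holds one row per (layer, level, agg) node with its profile
# assignment, e.g. (illustrative values only):
#
#   layer_id,level_id,agg_id,profile_id
#   1,1,1,1
#   1,2,1,2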
@oasis_log
def write_fm_profile_file(il_inputs_df, fm_profile_fp, chunksize=100000):
    """
    Writes an FM profile file.

    :param il_inputs_df: IL inputs dataframe
    :type il_inputs_df: pandas.DataFrame

    :param fm_profile_fp: FM profile file path
    :type fm_profile_fp: str

    :return: FM profile file path
    :rtype: str
    """
    il_inputs_df = il_inputs_df[il_inputs_df['agg_id'] > 0]
    try:
        # Step policies exist
        if 'StepTriggerType' in il_inputs_df:
            fm_profile_df = il_inputs_df[list(set(il_inputs_df.columns).intersection(set(step_profile_cols)))]
            for col in step_profile_cols:
                if col not in fm_profile_df.columns:
                    fm_profile_df[col] = 0
            for non_step_name, step_name in profile_cols_map.items():
                fm_profile_df.loc[
                    ~(il_inputs_df['StepTriggerType'] > 0), step_name
                ] = il_inputs_df.loc[
                    ~(il_inputs_df['StepTriggerType'] > 0), non_step_name
                ]
            fm_profile_df.fillna(0, inplace=True)
            fm_profile_df = fm_profile_df.drop_duplicates()

            # Ensure step_id is of int data type and set default value to 1
            dtypes = {t: 'int64' if t == 'step_id' else 'float64' for t in profile_cols_map.values()}
            fm_profile_df = set_dataframe_column_dtypes(fm_profile_df, dtypes)
            fm_profile_df.loc[fm_profile_df['step_id'] == 0, 'step_id'] = 1

            fm_profile_df.loc[:, step_profile_cols].to_csv(
                path_or_buf=fm_profile_fp,
                encoding='utf-8',
                mode=('w' if os.path.exists(fm_profile_fp) else 'a'),
                chunksize=chunksize,
                index=False
            )
        # No step policies
        else:
            # make sure there is no step file in the folder
            cols = ['profile_id', 'calcrule_id', 'deductible', 'deductible_min', 'deductible_max', 'attachment', 'limit', 'share']
            fm_profile_df = il_inputs_df.loc[:, cols]
            fm_profile_df.loc[:, cols[2:]] = fm_profile_df.loc[:, cols[2:]].round(7).values
            fm_profile_df.rename(
                columns={
                    'deductible': 'deductible1',
                    'deductible_min': 'deductible2',
                    'deductible_max': 'deductible3',
                    'attachment': 'attachment1',
                    'limit': 'limit1',
                    'share': 'share1'
                },
                inplace=True
            )
            fm_profile_df = fm_profile_df.drop_duplicates()
            fm_profile_df = fm_profile_df.assign(share2=0.0, share3=0.0)
            cols = ['profile_id', 'calcrule_id', 'deductible1', 'deductible2', 'deductible3', 'attachment1', 'limit1',
                    'share1', 'share2', 'share3']
            fm_profile_df.loc[:, cols].to_csv(
                path_or_buf=fm_profile_fp,
                encoding='utf-8',
                mode=('w' if os.path.exists(fm_profile_fp) else 'a'),
                chunksize=chunksize,
                index=False
            )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_fm_profile_file'", e)

    return fm_profile_fp
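
# The resulting fm_profile.csv has one row per distinct profile. Without step policies the
# columns are profile_id, calcrule_id, deductible1..deductible3, attachment1, limit1 and
# share1..share3; with step policies the step_profile_cols layout is written instead.
# Illustrative non-step row:
#
#   profile_id,calcrule_id,deductible1,deductible2,deductible3,attachment1,limit1,share1,share2,share3
#   1,12,100.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0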
@oasis_log
def write_fm_programme_file(il_inputs_df, fm_programme_fp, chunksize=100000):
    """
    Writes an FM programme file.

    :param il_inputs_df: IL inputs dataframe
    :type il_inputs_df: pandas.DataFrame

    :param fm_programme_fp: FM programme file path
    :type fm_programme_fp: str

    :return: FM programme file path
    :rtype: str
    """
    try:
        item_level = il_inputs_df.loc[(il_inputs_df['level_id'] == 1) & (il_inputs_df['agg_id'] > 0),
                                      ['gul_input_id', 'level_id', 'agg_id']]
        item_level.rename(columns={'gul_input_id': 'from_agg_id', 'agg_id': 'to_agg_id', }, inplace=True)
        item_level.drop_duplicates(keep='first', inplace=True)

        il_inputs_df = il_inputs_df[il_inputs_df['to_agg_id'] != 0]
        fm_programme_df = il_inputs_df[['agg_id', 'level_id', 'to_agg_id']]
        fm_programme_df['level_id'] += 1
        fm_programme_df.rename(columns={'agg_id': 'from_agg_id'}, inplace=True)
        fm_programme_df.drop_duplicates(keep='first', inplace=True)
        fm_programme_df = pd.concat([item_level, fm_programme_df])

        fm_programme_df.to_csv(
            path_or_buf=fm_programme_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(fm_programme_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_fm_programme_file'", e)

    return fm_programme_fp
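
# The resulting fm_programme.csv describes the FM aggregation tree: GUL items feed the level 1
# aggregations, and each agg_id then points to its to_agg_id at the next level, e.g.
# (illustrative rows only):
#
#   from_agg_id,level_id,to_agg_id
#   1,1,1
#   1,2,1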
@oasis_log
def write_fm_xref_file(il_inputs_df, fm_xref_fp, chunksize=100000):
    """
    Writes an FM xref file.

    :param il_inputs_df: IL inputs dataframe
    :type il_inputs_df: pandas.DataFrame

    :param fm_xref_fp: FM xref file path
    :type fm_xref_fp: str

    :return: FM xref file path
    :rtype: str
    """
    try:
        xref_df = get_xref_df(il_inputs_df)[['gul_input_id', 'layer_id']].rename(columns={'gul_input_id': 'agg_id'})
        xref_df['output'] = xref_df.reset_index().index + 1
        xref_df[['output', 'agg_id', 'layer_id']].to_csv(
            path_or_buf=fm_xref_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(fm_xref_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_fm_xref_file'", e)

    return fm_xref_fp
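
# The resulting fm_xref.csv enumerates the FM output items, one row per (item, layer) pair
# returned by get_xref_df, e.g. (illustrative rows only):
#
#   output,agg_id,layer_id
#   1,1,1
#   2,2,1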
@oasis_log
def write_il_input_files(
    il_inputs_df,
    target_dir,
    oasis_files_prefixes=OASIS_FILES_PREFIXES['il'],
    chunksize=(2 * 10 ** 5)
):
    """
    Writes standard Oasis IL input files to a target directory using a
    pre-generated dataframe of IL inputs. The files written are
    ::

        fm_policytc.csv
        fm_profile.csv
        fm_programme.csv
        fm_xref.csv

    :param il_inputs_df: IL inputs dataframe
    :type il_inputs_df: pandas.DataFrame

    :param target_dir: Target directory in which to write the files
    :type target_dir: str

    :param oasis_files_prefixes: Oasis IL input file name prefixes
    :type oasis_files_prefixes: dict

    :param chunksize: The chunk size to use when writing out the input files
    :type chunksize: int

    :return: IL input files dict
    :rtype: dict
    """
    # Clean the target directory path
    target_dir = as_path(target_dir, 'Target IL input files directory', is_dir=True, preexists=False)

    oasis_files_prefixes = copy.deepcopy(oasis_files_prefixes)

    # Set chunk size for writing the CSV files - default is the minimum of 200K
    # or the IL inputs frame size
    chunksize = chunksize or min(2 * 10 ** 5, len(il_inputs_df))

    # A dict of IL input file names and file paths
    il_input_files = {
        fn: os.path.join(target_dir, '{}.csv'.format(oasis_files_prefixes[fn]))
        for fn in oasis_files_prefixes
    }

    this_module = sys.modules[__name__]

    # Write the files serially
    for fn in il_input_files:
        getattr(this_module, 'write_{}_file'.format(fn))(il_inputs_df.copy(deep=True), il_input_files[fn], chunksize)

    return il_input_files
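
# Illustrative usage sketch (not part of the original module; 'il_inputs_df' is assumed to be
# the frame returned by get_il_input_items):
#
#   il_input_files = write_il_input_files(il_inputs_df, target_dir='input')
#   # -> {'fm_policytc': 'input/fm_policytc.csv', 'fm_profile': 'input/fm_profile.csv',
#   #     'fm_programme': 'input/fm_programme.csv', 'fm_xref': 'input/fm_xref.csv'}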