Source code for oasislmf.preparation.il_inputs

__all__ = [
    'get_calc_rule_ids',
    'get_grouped_fm_profile_by_level_and_term_group',
    'get_grouped_fm_terms_by_level_and_term_group',
    'get_oed_hierarchy',
    'get_il_input_items',
]

import contextlib
import copy
import itertools
import os
import time
import warnings

import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
from ods_tools.oed import fill_empty, BLANK_VALUES

from oasislmf.preparation.summaries import get_useful_summary_cols
from oasislmf.pytools.common.data import (fm_policytc_headers, fm_policytc_dtype,
                                          fm_profile_headers, fm_profile_dtype,
                                          fm_profile_step_headers, fm_profile_step_dtype,
                                          fm_programme_headers, fm_programme_dtype,
                                          fm_xref_headers, fm_xref_dtype
                                          )
from oasislmf.utils.calc_rules import get_calc_rules
from oasislmf.utils.coverages import SUPPORTED_COVERAGE_TYPES
from oasislmf.utils.data import factorize_ndarray
from oasislmf.utils.defaults import (OASIS_FILES_PREFIXES,
                                     get_default_accounts_profile, get_default_exposure_profile,
                                     get_default_fm_aggregation_profile, SOURCE_IDX)
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.fm import (CALCRULE_ASSIGNMENT_METHODS, COVERAGE_AGGREGATION_METHODS,
                               DEDUCTIBLE_AND_LIMIT_TYPES, FM_LEVELS, FML_ACCALL, STEP_TRIGGER_TYPES,
                               SUPPORTED_FM_LEVELS, FM_TERMS, GROUPED_SUPPORTED_FM_LEVELS)
from oasislmf.utils.log import oasis_log
from oasislmf.utils.path import as_path
from oasislmf.utils.profiles import (get_default_step_policies_profile,
                                     get_grouped_fm_profile_by_level_and_term_group,
                                     get_grouped_fm_terms_by_level_and_term_group,
                                     get_oed_hierarchy)

pd.options.mode.chained_assignment = None
warnings.simplefilter(action='ignore', category=FutureWarning)

# conversion from numpy structured dtypes to pandas-compatible {column: dtype} mappings
fm_policytc_dtype = {col: dtype for col, (dtype, _) in fm_policytc_dtype.fields.items()}
fm_profile_dtype = {col: dtype for col, (dtype, _) in fm_profile_dtype.fields.items()}
fm_profile_step_dtype = {col: dtype for col, (dtype, _) in fm_profile_step_dtype.fields.items()}
fm_programme_dtype = {col: dtype for col, (dtype, _) in fm_programme_dtype.fields.items()}
fm_xref_dtype = {col: dtype for col, (dtype, _) in fm_xref_dtype.fields.items()}
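
# Illustrative sketch (not part of the module): the comprehensions above turn a numpy
# structured dtype into a plain {column: dtype} mapping usable with DataFrame.astype.
# With a hypothetical dtype:
#   toy_dtype = np.dtype([('layer_id', 'i4'), ('agg_id', 'i4')])
#   {col: dtype for col, (dtype, _) in toy_dtype.fields.items()}
#   # -> {'layer_id': dtype('int32'), 'agg_id': dtype('int32')}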


# Define a list of all supported OED coverage types in the exposure
supp_cov_type_ids = [v['id'] for v in SUPPORTED_COVERAGE_TYPES.values()]

policytc_cols = ['profile_id', 'calcrule_id', 'deductible', 'deductible_min', 'deductible_max', 'attachment', 'limit', 'share']

profile_cols_map = {
    'deductible': 'deductible1',
    'deductible_min': 'deductible2',
    'deductible_max': 'deductible3',
    'attachment': 'attachment1',
    'limit': 'limit1',
    'share': 'share1'
}
cross_layer_level = {FML_ACCALL, }

risk_disaggregation_term = {'deductible', 'deductible_min', 'deductible_max', 'attachment', 'limit'}

fm_term_ids = [fm_term['id'] for fm_term in FM_TERMS.values()]


def prepare_ded_and_limit(level_df):
    simplify_no_terms = {
        'deductible': ['ded_type', 'ded_code'],
        'limit': ['lim_type', 'lim_code']
    }
    level_df['need_tiv'] = False
    for term, type_and_code in simplify_no_terms.items():
        if term in level_df.columns:
            for elm in type_and_code:
                if elm in level_df.columns:
                    level_df.loc[level_df[term] == 0, elm] = 0
                    if elm[-5:] == '_type':
                        level_df['need_tiv'] |= level_df[elm].isin({2, 3})  # a type of 2 or 3 means the term depends on TIV
        else:
            level_df = level_df.drop(columns=type_and_code, errors='ignore')

    return level_df
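
# Illustrative usage sketch (hypothetical data, not part of the module): when a term is 0
# its type/code columns are zeroed, and 'need_tiv' flags rows whose deductible or limit
# type is 2 or 3 (TIV-dependent).
#   toy_df = pd.DataFrame({'deductible': [0.0, 100.0], 'ded_type': [2, 2], 'ded_code': [1, 0]})
#   toy_df = prepare_ded_and_limit(toy_df)
#   # row 0: ded_type/ded_code reset to 0, need_tiv False; row 1: need_tiv True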


def get_calc_rule_ids(il_inputs_calc_rules_df, calc_rule_type):
    """
    Merge the selected IL inputs with the correct calc. rule table and return a pandas Series of calc. rule IDs

    Args:
        il_inputs_calc_rules_df (DataFrame): IL input items dataframe
        calc_rule_type (str): type of calc_rule to look for

    Returns:
        pandas Series of calc. rule IDs
    """
    il_inputs_calc_rules_df = il_inputs_calc_rules_df.copy()
    calc_rules_df, calc_rule_term_info = get_calc_rules(calc_rule_type)
    calc_rules_df = calc_rules_df.drop(columns=['desc', 'id_key'], axis=1, errors='ignore')

    terms = []
    terms_indicators = []
    no_terms = {
        'deductible': ['ded_type', 'ded_code'],
        'limit': ['lim_type', 'lim_code']
    }
    for term in calc_rule_term_info['terms']:
        if term in il_inputs_calc_rules_df.columns:
            terms.append(term)
            terms_indicators.append('{}_gt_0'.format(term))
        else:
            calc_rules_df = calc_rules_df[calc_rules_df['{}_gt_0'.format(term)] == 0].drop(columns=['{}_gt_0'.format(term)])

    for term in calc_rule_term_info['types_and_codes']:
        if term in il_inputs_calc_rules_df.columns:
            il_inputs_calc_rules_df[term] = il_inputs_calc_rules_df[term].fillna(0).astype('uint8')
        else:
            calc_rules_df = calc_rules_df[calc_rules_df[term] == 0].drop(columns=[term])

    for term, type_and_code in no_terms.items():
        if f'{term}_gt_0' in calc_rules_df.columns:
            for elm in type_and_code:
                if elm in il_inputs_calc_rules_df.columns:
                    il_inputs_calc_rules_df.loc[il_inputs_calc_rules_df[term] == 0, elm] = 0
        else:
            for elm in type_and_code:
                if elm in calc_rules_df.columns:
                    calc_rules_df = calc_rules_df[calc_rules_df[elm] == 0].drop(columns=[elm])

    il_inputs_calc_rules_df.loc[:, terms_indicators] = np.where(il_inputs_calc_rules_df[terms] > 0, 1, 0)
    merge_col = list(set(il_inputs_calc_rules_df.columns).intersection(calc_rules_df.columns).difference({'calcrule_id'}))
    if len(merge_col):
        calcrule_ids = (
            il_inputs_calc_rules_df[merge_col].reset_index()
            .merge(calc_rules_df[merge_col + ['calcrule_id']].drop_duplicates(), how='left', on=merge_col)
        ).set_index('index')['calcrule_id'].fillna(0)
    else:
        return 100  # no terms, we return the pass-through calc rule

    if 0 in calcrule_ids.unique():
        _cols = list(set(['PortNumber', 'AccNumber', 'LocNumber'] + merge_col).intersection(il_inputs_calc_rules_df.columns))
        no_match_keys = il_inputs_calc_rules_df.loc[calcrule_ids == 0, _cols].drop_duplicates()
        err_msg = 'Calculation Rule mapping error, non-matching keys:\n{}'.format(no_match_keys)
        raise OasisException(err_msg)
    return calcrule_ids
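
# Illustrative sketch of the matching technique above (toy, hypothetical rule table — the
# real mapping comes from get_calc_rules): terms are turned into *_gt_0 indicator columns
# and merged against the rule table to pick a calcrule_id.
#   toy_rules = pd.DataFrame({'deductible_gt_0': [0, 1], 'limit_gt_0': [0, 1], 'calcrule_id': [100, 1]})
#   toy_inputs = pd.DataFrame({'deductible': [0.0, 50.0], 'limit': [0.0, 1000.0]})
#   toy_inputs[['deductible_gt_0', 'limit_gt_0']] = np.where(toy_inputs[['deductible', 'limit']] > 0, 1, 0)
#   toy_inputs.merge(toy_rules, how='left')['calcrule_id']  # -> [100, 1]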

def get_profile_ids(il_inputs_df):
    """
    Returns a Numpy array of policy TC IDs from a table of IL input items

    :param il_inputs_df: IL input items dataframe
    :type il_inputs_df: pandas.DataFrame

    :return: Numpy array of policy TC IDs
    :rtype: numpy.ndarray
    """
    factor_col = list(set(il_inputs_df.columns).intersection(policytc_cols).difference({'profile_id', }))
    return factorize_ndarray(il_inputs_df.loc[:, factor_col].values, col_idxs=range(len(factor_col)))[0]


def __split_fm_terms_by_risk(df):
    """
    Adjust the financial terms to the number of risks;
    for example, a deductible is split across each individual building risk.

    Args:
        df (DataFrame): the DataFrame of an FM level
    """
    for term in risk_disaggregation_term.intersection(set(df.columns)):
        if f'{term[:3]}_code' in df.columns:
            code_filter = df[f'{term[:3]}_code'] == 0
            df.loc[code_filter, term] /= df.loc[code_filter, 'NumberOfRisks']
        else:
            df[term] /= df['NumberOfRisks']


def get_cond_info(locations_df, accounts_df):
    pol_info = {}
    level_conds = {}
    extra_accounts = []
    default_cond_tag = '0'
    if 'CondTag' in locations_df.columns:
        fill_empty(locations_df, 'CondTag', default_cond_tag)
        loc_condkey_df = locations_df.loc[locations_df['CondTag'] != default_cond_tag, ['PortNumber', 'AccNumber', 'CondTag']].drop_duplicates()
    else:
        loc_condkey_df = pd.DataFrame([], columns=['PortNumber', 'AccNumber', 'CondTag'])

    if 'CondTag' in accounts_df.columns:
        fill_empty(accounts_df, 'CondTag', default_cond_tag)
        acc_condkey_df = accounts_df.loc[accounts_df['CondTag'] != '', ['PortNumber', 'AccNumber', 'CondTag']].drop_duplicates()
        condkey_match_df = acc_condkey_df.merge(loc_condkey_df, how='outer', indicator=True)
        missing_condkey_df = condkey_match_df.loc[condkey_match_df['_merge'] == 'right_only', ['PortNumber', 'AccNumber', 'CondTag']]
    else:
        acc_condkey_df = pd.DataFrame([], columns=['PortNumber', 'AccNumber', 'CondTag'])
        missing_condkey_df = loc_condkey_df

    if missing_condkey_df.shape[0]:
        raise OasisException(f'These CondTags are present in the locations but missing in the accounts file:\n{missing_condkey_df}')

    if acc_condkey_df.shape[0]:
        if 'CondTag' not in locations_df.columns:
            locations_df['CondTag'] = default_cond_tag

        # we get information about cond from accounts_df
        cond_tags = {}  # information about each cond tag
        account_layer_exclusion = {}  # for each account and layer, store info about cond class exclusion
        if 'CondPriority' in accounts_df.columns:
            fill_empty(accounts_df, 'CondPriority', 1)
        else:
            accounts_df['CondPriority'] = 1
        if 'CondPeril' in accounts_df.columns:
            fill_empty(accounts_df, 'CondPeril', '')
        else:
            accounts_df['CondPeril'] = ''

        for acc_rec in accounts_df.to_dict(orient="records"):
            cond_tag_key = (acc_rec['PortNumber'], acc_rec['AccNumber'], acc_rec['CondTag'])
            cond_number_key = (acc_rec['PortNumber'], acc_rec['AccNumber'], acc_rec['CondTag'], acc_rec['CondNumber'])
            cond_tag = cond_tags.setdefault(cond_tag_key, {'CondPriority': acc_rec['CondPriority'] or 1, 'CondPeril': acc_rec['CondPeril']})
            cond_tag.setdefault('layers', {})[acc_rec['layer_id']] = {'CondNumber': cond_number_key}
            exclusion_cond_tags = account_layer_exclusion.setdefault(
                (acc_rec['PortNumber'], acc_rec['AccNumber']), {}).setdefault(acc_rec['layer_id'], set())
            pol_info[(acc_rec['PortNumber'], acc_rec['AccNumber'], acc_rec['layer_id'])] = [acc_rec['PolNumber'], acc_rec['LayerNumber']]
            if acc_rec.get('CondClass') == 1:
                exclusion_cond_tags.add(acc_rec['CondTag'])

        # we get the list of locations for each cond_tag
        loc_conds = {}
        KEY_INDEX = 0
        PRIORITY_INDEX = 1
        for loc_rec in locations_df.to_dict(orient="records"):
            loc_key = (loc_rec['PortNumber'], loc_rec['AccNumber'], loc_rec['LocNumber'])
            cond_key = (loc_rec['PortNumber'], loc_rec['AccNumber'], loc_rec.get('CondTag', default_cond_tag))
            if cond_key in cond_tags:
                cond_tag = cond_tags[cond_key]
            else:
                cond_tag = {'CondPriority': 1, 'layers': {}}
                cond_tags[cond_key] = cond_tag
            cond_location = cond_tag.setdefault('locations', set())
            cond_location.add(loc_key)
            cond_tag_priority = cond_tag['CondPriority']
            conds = loc_conds.setdefault(loc_key, [])
            for i, cond in enumerate(conds):
                if cond_tag_priority < cond[PRIORITY_INDEX]:
                    conds.insert(i, (cond_key, cond_tag_priority))
                    break
                elif cond_tag_priority == cond[PRIORITY_INDEX] and cond_key != cond[KEY_INDEX]:
                    raise OasisException(f"{cond_key} and {cond[KEY_INDEX]} have same priority in {loc_key}")
            else:
                conds.append((cond_key, cond_tag_priority))

        # at first we just want the cond tags for each level
        for cond_key, cond_info in cond_tags.items():
            port_num, acc_num, cond_tag = cond_key
            cond_level_start = 1
            for loc_key in cond_info.get('locations', set()):
                for i, (cond_key_i, _) in enumerate(loc_conds[loc_key]):
                    if cond_key_i == cond_key:
                        cond_level_start = max(cond_level_start, i + 1)
                        break
            cond_info['cond_level_start'] = cond_level_start

            for layer_id, exclusion_conds in account_layer_exclusion[(cond_key[0], cond_key[1])].items():
                if layer_id not in cond_info['layers']:
                    PolNumber, LayerNumber = pol_info[(port_num, acc_num, layer_id)]
                    if exclusion_conds:
                        extra_accounts.append({
                            'PortNumber': port_num,
                            'AccNumber': acc_num,
                            'PolNumber': PolNumber,
                            'LayerNumber': LayerNumber,
                            'CondTag': cond_tag,
                            'layer_id': layer_id,
                            'CondNumber': 'FullFilter',
                            'CondDed6All': 1,
                            'CondDedType6All': 1,
                            'CondPeril': 'AA1',
                        })
                    else:
                        extra_accounts.append({
                            'PortNumber': port_num,
                            'AccNumber': acc_num,
                            'PolNumber': PolNumber,
                            'LayerNumber': LayerNumber,
                            'CondTag': cond_tag,
                            'layer_id': layer_id,
                            'CondNumber': '',
                            'CondPeril': 'AA1',
                        })
            level_conds.setdefault(cond_level_start, set()).add(cond_key)

    return level_conds, extra_accounts


def get_levels(locations_df, accounts_df):
    if locations_df is not None and not {"CondTag", "CondNumber"}.difference(accounts_df.columns):
        level_conds, extra_accounts = get_cond_info(locations_df, accounts_df)
    else:
        level_conds = []

    for group_name, group_info in copy.deepcopy(GROUPED_SUPPORTED_FM_LEVELS).items():
        if group_info['oed_source'] == 'location':
            if locations_df is not None:
                locations_df['layer_id'] = 1
                yield locations_df, list(group_info['levels'].items()), group_info.get('fm_peril_field'), None
        elif group_info['oed_source'] == 'account':
            if group_name == 'cond' and level_conds:
                loc_conds_df = locations_df[['loc_id', 'PortNumber', 'AccNumber', 'CondTag']].drop_duplicates()
                for stage, cond_keys in level_conds.items():
                    cond_filter_df = (pd.DataFrame(cond_keys, columns=['PortNumber', 'AccNumber', 'CondTag'])
                                      .sort_values(by=['PortNumber', 'AccNumber', 'CondTag']))
                    loc_conds_df_filter = cond_filter_df[['PortNumber', 'AccNumber', 'CondTag']].drop_duplicates().merge(loc_conds_df, how='left')
                    yield (cond_filter_df.merge(pd.concat([accounts_df, pd.DataFrame(extra_accounts)]), how='left'),
                           group_info['levels'].items(),
                           group_info['fm_peril_field'],
                           loc_conds_df_filter[['loc_id', 'CondTag']])
            else:
                yield accounts_df, group_info['levels'].items(), group_info.get('fm_peril_field'), None


def get_level_term_info(term_df_source, level_column_mapper, level_id, step_level, fm_peril_field, oed_schema):
    level_terms = set()
    terms_maps = {}
    coverage_group_map = {}
    fm_group_tiv = {}
    non_zero_default = {}
    for ProfileElementName, term_info in level_column_mapper[level_id].items():
        if term_info.get("FMTermType") == "tiv":
            continue
        default_value = oed_schema.get_default(ProfileElementName)
        if ProfileElementName not in term_df_source.columns:
            if default_value == 0 or default_value in BLANK_VALUES:
                continue
            else:
                non_zero_default[ProfileElementName] = [term_info['FMTermType'].lower(), default_value]
                continue
        else:
            fill_empty(term_df_source, ProfileElementName, default_value)

        if pd.isna(default_value):
            non_default_val = ~term_df_source[ProfileElementName].isna()
        else:
            non_default_val = (term_df_source[ProfileElementName] != default_value)

        if 'FMProfileStep' in term_info:
            profile_steps = term_info["FMProfileStep"]
            if isinstance(profile_steps, int):
                profile_steps = [profile_steps]
            valid_step_trigger_types = term_df_source.loc[(term_df_source['StepTriggerType'].isin(profile_steps))
                                                          & non_default_val, 'StepTriggerType'].unique()
            if len(valid_step_trigger_types):
                level_terms.add(term_info['FMTermType'].lower())
                for step_trigger_type in valid_step_trigger_types:
                    coverage_aggregation_method = STEP_TRIGGER_TYPES[step_trigger_type]['coverage_aggregation_method']
                    calcrule_assignment_method = STEP_TRIGGER_TYPES[step_trigger_type]['calcrule_assignment_method']
                    for coverage_type_id in supp_cov_type_ids:
                        FMTermGroupID = COVERAGE_AGGREGATION_METHODS[coverage_aggregation_method].get(coverage_type_id)
                        if FMTermGroupID is None:  # step policy not supported for this coverage
                            continue
                        if (step_trigger_type, coverage_type_id) in coverage_group_map:
                            if coverage_group_map[(step_trigger_type, coverage_type_id)] != FMTermGroupID:
                                raise OasisException(
                                    f"multiple coverage_type_id {(step_trigger_type, coverage_type_id)} "
                                    f"assigned to the different FMTermGroupID "
                                    f"{(FMTermGroupID, coverage_group_map[(step_trigger_type, coverage_type_id)])}")
                        else:
                            coverage_group_map[(step_trigger_type, coverage_type_id)] = FMTermGroupID
                        terms_map = terms_maps.setdefault((FMTermGroupID, step_trigger_type),
                                                          {fm_peril_field: 'fm_peril'} if fm_peril_field else {})
                        if CALCRULE_ASSIGNMENT_METHODS[calcrule_assignment_method][FMTermGroupID]:
                            terms_map[ProfileElementName] = term_info['FMTermType'].lower()
        else:
            if not (non_default_val).any():
                continue
            level_terms.add(term_info['FMTermType'].lower())
            coverage_type_ids = term_info.get("CoverageTypeID", supp_cov_type_ids)
            FMTermGroupID = term_info.get('FMTermGroupID', 1)
            if step_level:
                term_key = (FMTermGroupID, 0)
            else:
                term_key = FMTermGroupID
            if isinstance(coverage_type_ids, int):
                coverage_type_ids = [coverage_type_ids]
            fm_group_tiv[FMTermGroupID] = coverage_type_ids
            for coverage_type_id in coverage_type_ids:
                if step_level:
                    coverage_group_key = (0, coverage_type_id)
                else:
                    coverage_group_key = coverage_type_id
                if coverage_type_id in coverage_group_map:
                    if coverage_group_map[coverage_group_key] != FMTermGroupID:
                        raise OasisException(
                            f"multiple coverage_type_id {coverage_group_key} "
                            f"assigned to the different FMTermGroupID {(FMTermGroupID, coverage_group_map[coverage_group_key])}")
                else:
                    coverage_group_map[coverage_group_key] = FMTermGroupID
            terms_maps.setdefault(term_key, {fm_peril_field: 'fm_peril'} if fm_peril_field else {})[
                ProfileElementName] = term_info['FMTermType'].lower()

    if level_terms:
        for ProfileElementName, (term, default_value) in non_zero_default.items():
            term_df_source[ProfileElementName] = default_value
            level_terms.add(term)
            for terms_map in terms_maps.values():
                terms_map[ProfileElementName] = term

    return level_terms, terms_maps, coverage_group_map, fm_group_tiv


def associate_items_peril_to_policy_peril(item_perils, policy_df, fm_peril_col, oed_schema):
    """
    For each peril_id in item_perils we map it to each string representing the policy perils.
    We then merge this mapping into the initial policies so that each line has a peril_id
    that can be used directly as a key when merging with gul_input_df.

    Args:
        item_perils: all peril ids from gul_input_df
        policy_df: the df with the policies
        fm_peril_col: the name of the column that defines the policy peril filter
        oed_schema: the schema object we use to map perils and subperils

    Returns:
        the policies merged with the peril mapping
    """
    fm_perils = policy_df[[fm_peril_col]].drop_duplicates()
    peril_map_df = pd.merge(fm_perils, item_perils, how='cross')
    peril_map_df = peril_map_df[oed_schema.peril_filtering(peril_map_df['peril_id'], peril_map_df[fm_peril_col])]
    return policy_df.merge(peril_map_df)


@oasis_log
def get_il_input_items(
    gul_inputs_df,
    exposure_data,
    target_dir,
    logger,
    exposure_profile=get_default_exposure_profile(),
    accounts_profile=get_default_accounts_profile(),
    fm_aggregation_profile=get_default_fm_aggregation_profile(),
    do_disaggregation=True,
    oasis_files_prefixes=OASIS_FILES_PREFIXES['il'],
    chunksize=(2 * 10 ** 5),
):
    """
    Generates and returns a Pandas dataframe of IL input items.

    :param gul_inputs_df: GUL input items
    :type gul_inputs_df: pandas.DataFrame

    :param exposure_data: object containing all information about the insurance policies

    :param target_dir: path to the directory used to write the fm files

    :param logger: logger object to trace progress

    :param exposure_profile: Source exposure profile (optional)
    :type exposure_profile: dict

    :param accounts_profile: Source accounts profile (optional)
    :type accounts_profile: dict

    :param fm_aggregation_profile: FM aggregation profile (optional)
    :type fm_aggregation_profile: dict

    :param do_disaggregation: whether to split terms and conditions for aggregate exposure (optional)
    :type do_disaggregation: bool

    :return: IL inputs dataframe
    :rtype: pandas.DataFrame

    :return: Accounts dataframe
    :rtype: pandas.DataFrame
    """
    target_dir = as_path(target_dir, 'Target IL input files directory', is_dir=True, preexists=False)

    with contextlib.ExitStack() as stack:
        fm_policytc_file = stack.enter_context(open(os.path.join(target_dir, f"{oasis_files_prefixes['fm_policytc']}.csv"), 'w'))
        fm_policytc_file.write(f"{','.join(fm_policytc_headers)}{os.linesep}")
        fm_profile_file = stack.enter_context(open(os.path.join(target_dir, f"{oasis_files_prefixes['fm_profile']}.csv"), 'w'))
        fm_programme_file = stack.enter_context(open(os.path.join(target_dir, f"{oasis_files_prefixes['fm_programme']}.csv"), 'w'))
        fm_programme_file.write(f"{','.join(fm_programme_headers)}{os.linesep}")
        fm_xref_file = stack.enter_context(open(os.path.join(target_dir, f"{oasis_files_prefixes['fm_xref']}.csv"), 'w'))

        locations_df = exposure_data.location.dataframe if exposure_data.location is not None else None
        accounts_df = exposure_data.account.dataframe
        oed_schema = exposure_data.oed_schema

        profile = get_grouped_fm_profile_by_level_and_term_group(exposure_profile, accounts_profile)

        # Get the FM aggregation profile - this describes how the IL input
        # items are to be aggregated in the various FM levels
        fm_aggregation_profile = copy.deepcopy(fm_aggregation_profile)

        if not fm_aggregation_profile:
            raise OasisException(
                'FM aggregation profile is empty - this is required to perform aggregation'
            )

        # Get the OED hierarchy terms profile - this defines the column names for loc.
        # ID, acc. ID, policy no. and portfolio no., as used in the source exposure
        # and accounts files. This is to ensure that the method never makes hard-coded
        # references to the corresponding columns in the source files, as that would
        # mean that changes to these column names in the source files may break the method
        oed_hierarchy = get_oed_hierarchy(exposure_profile, accounts_profile)

        tiv_terms = {v['tiv']['ProfileElementName']: str(v['tiv']['CoverageTypeID'])
                     for k, v in profile[FM_LEVELS['site coverage']['id']].items()}

        useful_cols = sorted(set(['layer_id', 'orig_level_id', 'level_id', 'agg_id', 'gul_input_id', 'tiv', 'NumberOfRisks']
                                 + get_useful_summary_cols(oed_hierarchy)).union(tiv_terms)
                             - {'profile_id', 'item_id', 'output_id'}, key=str.lower)

        gul_inputs_df = gul_inputs_df.rename(columns={'item_id': 'gul_input_id'})

        # adjust tiv columns and name them as their coverage id
        gul_inputs_df[['risk_id', 'NumberOfRisks']] = gul_inputs_df[['building_id', 'NumberOfBuildings']]
        gul_inputs_df.loc[gul_inputs_df['IsAggregate'] == 0, 'risk_id'] = 1
        gul_inputs_df.loc[gul_inputs_df['IsAggregate'] == 0, 'NumberOfRisks'] = 1
        gul_inputs_df.loc[gul_inputs_df['NumberOfRisks'] == 0, 'NumberOfRisks'] = 1

        # initialization
        agg_keys = set()
        for level_id in fm_aggregation_profile:
            agg_keys = agg_keys.union(set([v['field'] for v in fm_aggregation_profile[level_id]['FMAggKey'].values()]))
        present_cols = [col for col in gul_inputs_df.columns if col in set(useful_cols).union(agg_keys).union(fm_term_ids)]
        gul_inputs_df = gul_inputs_df[present_cols].rename(columns=tiv_terms)

        # Profile dicts are keyed on the fm term name. This prevents multiple file columns
        # from pointing to the same fm term, which is necessary to have generic logic that
        # works with step policies, so we change the key to be the column name and use
        # FMTermType to store the term name
        level_column_mapper = {}
        for level_id, level_profile in profile.items():
            column_map = {}
            level_column_mapper[level_id] = column_map

            # for fm we only use term_id 1
            for term_name, term_info in itertools.chain.from_iterable(profile.items() for profile in level_profile.values()):
                new_term_info = copy.deepcopy(term_info)
                new_term_info['FMTermType'] = term_name
                column_map[term_info['ProfileElementName']] = new_term_info

        gul_inputs_df = gul_inputs_df.drop(columns=fm_term_ids, errors='ignore').reset_index(drop=True)
        gul_inputs_df['agg_id_prev'] = gul_inputs_df['gul_input_id']
        gul_inputs_df['layer_id'] = 1
        gul_inputs_df['PolNumber'] = pd.NA
        gul_inputs_df = gul_inputs_df.rename(columns={})
        item_perils = gul_inputs_df[['peril_id']].drop_duplicates()

        # Determine whether step policies are listed, are not full of nans and step
        # numbers are greater than zero
        step_policies_present = False
        extra_fm_col = ['layer_id', 'PolNumber', 'NumberOfBuildings', 'IsAggregate']
        if 'StepTriggerType' in accounts_df and 'StepNumber' in accounts_df:
            fill_empty(accounts_df, ['StepTriggerType', 'StepNumber'], 0)
            step_account = (accounts_df[accounts_df[accounts_df['StepTriggerType'].notnull()]['StepNumber'].gt(0)][['PortNumber', 'AccNumber']]
                            .drop_duplicates())
            step_policies_present = bool(step_account.shape[0])
            # this is done only for fmcalc to make sure its programme nodes stay as a tree; remove 'is_step' logic when fmcalc is dropped
            if step_policies_present:
                step_account['is_step'] = 1
                gul_inputs_df = gul_inputs_df.merge(step_account, how='left')
                gul_inputs_df['is_step'] = gul_inputs_df['is_step'].fillna(0).astype('int8')
                extra_fm_col.append('is_step')

        # If step policies are listed, keep the step trigger type and the columns associated
        # with those step trigger types that are present
        if step_policies_present:
            # we append the fm step policy terms to the policy layer level
            step_policy_level_map = level_column_mapper[SUPPORTED_FM_LEVELS['policy all']['id']]
            for col in ['StepTriggerType', 'cov_agg_id', 'assign_step_calcrule']:
                step_policy_level_map[col] = {
                    'ProfileElementName': col,
                    'FMTermType': col,
                }
            for key, step_term in get_default_step_policies_profile().items():
                step_policy_level_map[step_term['Key']] = {
                    'ProfileElementName': step_term['Key'],
                    'FMTermType': step_term['FMProfileField'],
                    'FMProfileStep': step_term.get('FMProfileStep')
                }
            fm_profile_cols = fm_profile_step_headers
        else:
            fm_profile_cols = fm_profile_headers

        fm_profile_file.write(f"{','.join(fm_profile_cols)}{os.linesep}")
        pass_through_profile = pd.DataFrame({col: [0] for col in fm_profile_cols})
        pass_through_profile['profile_id'] = 1
        pass_through_profile['calcrule_id'] = 100
        write_fm_profile_level(pass_through_profile, fm_profile_file, step_policies_present, chunksize)

        profile_id_offset = 1  # profile_id 1 is the pass-through policy (calcrule 100)
        cur_level_id = 0
        for term_df_source, levels, fm_peril_field, CondTag_merger in get_levels(locations_df, accounts_df):
            t0 = time.time()
            if CondTag_merger is not None:
                gul_inputs_df = gul_inputs_df.drop(columns='CondTag', errors='ignore').merge(CondTag_merger, how='left')
            for level, level_info in levels:
                level_id = level_info['id']
                is_policy_layer_level = level_id == SUPPORTED_FM_LEVELS['policy layer']['id']
                step_level = 'StepTriggerType' in level_column_mapper[level_id]  # only true if step policies are present
                level_terms, terms_maps, coverage_group_map, fm_group_tiv = get_level_term_info(
                    term_df_source, level_column_mapper, level_id, step_level, fm_peril_field, oed_schema)

                agg_key = [v['field'] for v in fm_aggregation_profile[level_id]['FMAggKey'].values()]

                if not terms_maps:  # no terms, we skip this level
                    if is_policy_layer_level:
                        # for policy layer we group all to make sure we can have a 0 ALLOCATION_RULE
                        cur_level_id += 1
                        write_empty_policy_layer(gul_inputs_df, cur_level_id, agg_key, fm_policytc_file, fm_programme_file, chunksize)
                        logger.info(f"level {cur_level_id} {level_info} took {time.time() - t0}")
                        t0 = time.time()
                    continue

                # reset non layered agg_id
                layered_agg_id = gul_inputs_df[gul_inputs_df['layer_id'] > 1]["agg_id_prev"].unique()
                gul_inputs_df['layer_id'] = gul_inputs_df['layer_id'].where(gul_inputs_df['agg_id_prev'].isin(layered_agg_id), 0)

                # get all rows with terms in term_df_source and determine the correct FMTermGroupID
                level_df_list = []
                valid_term_default = {}
                for group_key, terms in terms_maps.items():
                    if step_level:
                        FMTermGroupID, step_trigger_type = group_key
                        group_df = term_df_source[term_df_source['StepTriggerType'] == step_trigger_type]
                        # take all common terms in (1, 0) plus terms with the step_trigger_type filter
                        terms = {**terms_maps.get((1, 0), {}), **terms}
                    else:
                        FMTermGroupID = group_key
                        group_df = term_df_source
                    group_df = (group_df[list(set(agg_key + list(terms.keys()) + extra_fm_col)
                                              .union(set(useful_cols).difference(set(gul_inputs_df.columns)))
                                              .intersection(group_df.columns))]
                                .assign(FMTermGroupID=FMTermGroupID))

                    # only keep policies with non-default values, remove duplicates
                    numeric_terms = [term for term in terms.keys() if is_numeric_dtype(group_df[term])]
                    term_filter = False
                    for term in numeric_terms:
                        if pd.isna(oed_schema.get_default(term)):
                            term_filter |= ~group_df[term].isna()
                            valid_term_default[terms[term]] = 0
                        else:
                            term_filter |= (group_df[term] != oed_schema.get_default(term))
                            valid_term_default[terms[term]] = oed_schema.get_default(term)
                        term_filter |= (group_df[term] != oed_schema.get_default(term))
                    keep_df = group_df[term_filter][list(
                        set(agg_key).intersection(group_df.columns))].drop_duplicates()
                    group_df = group_df.merge(keep_df, how='inner')

                    # multiple ProfileElementName can have the same fm terms (ex: StepTriggerType 5), we take the max to have a unique one
                    for ProfileElementName, term in terms.items():
                        if term in group_df:
                            group_df[term] = group_df[[term, ProfileElementName]].max(axis=1)
                            group_df.drop(columns=ProfileElementName, inplace=True)
                        else:
                            group_df.rename(columns={ProfileElementName: term}, inplace=True)
                    level_df_list.append(group_df)

                level_df = pd.concat(level_df_list, copy=True, ignore_index=True)
                base_acc = level_df['AccNumber']
                for term, default in valid_term_default.items():
                    level_df[term] = level_df[term].fillna(default)

                if do_disaggregation and 'risk_id' in agg_key:
                    level_df['NumberOfRisks'] = level_df['NumberOfBuildings'].mask(level_df['IsAggregate'] == 0, 1)
                    __split_fm_terms_by_risk(level_df)
                    level_df = level_df.drop(columns=['NumberOfBuildings', 'IsAggregate', 'NumberOfRisks'])
                level_df = level_df.drop_duplicates(subset=set(level_df.columns) - set(SOURCE_IDX.values()))

                agg_id_merge_col = agg_key + ['FMTermGroupID']
                agg_id_merge_col_extra = ['need_tiv']
                if step_level:
                    # merge with gul_inputs_df needs to be based on 'steptriggertype' and 'coverage_type_id'
                    coverage_type_id_df = pd.DataFrame(
                        [[StepTriggerType, coverage_type_id, FMTermGroupID]
                         for (StepTriggerType, coverage_type_id), FMTermGroupID in coverage_group_map.items()],
                        columns=['steptriggertype', 'coverage_type_id', 'FMTermGroupID'])
                    level_df = level_df.merge(coverage_type_id_df)
                    level_df["is_step"] = level_df['steptriggertype'] > 0
                    agg_id_merge_col_extra.append('coverage_type_id')
                else:
                    # map the coverage_type_id to the correct FMTermGroupID for this level;
                    # coverage_type_id without terms (and therefore without FMTermGroupID) are mapped to 0
                    gul_inputs_df['FMTermGroupID'] = (gul_inputs_df['coverage_type_id']
                                                      .map(coverage_group_map, na_action='ignore')
                                                      .astype('Int32'))

                # check that peril_id is part of the fm peril policy, if not we remove the terms
                if 'fm_peril' in level_df:
                    level_df = associate_items_peril_to_policy_peril(item_perils, level_df, 'fm_peril', oed_schema)
                    factorize_key = agg_key + ['FMTermGroupID', 'fm_peril']
                    agg_id_merge_col += ['fm_peril']
                    agg_id_merge_col_extra.append('peril_id')
                else:
                    factorize_key = agg_key + ['FMTermGroupID']

                if level_df.empty:  # No actual terms for this level
                    if is_policy_layer_level:
                        # for policy layer we group all to make sure we can have a 0 ALLOCATION_RULE
                        cur_level_id += 1
                        write_empty_policy_layer(gul_inputs_df, cur_level_id, agg_key, fm_policytc_file, fm_programme_file, chunksize)
                        logger.info(f"level {cur_level_id} {level_info} took {time.time() - t0}")
                        t0 = time.time()
                    continue

                level_df = prepare_ded_and_limit(level_df)

                agg_id_merge_col = list(set(agg_id_merge_col).intersection(level_df.columns))
                gul_inputs_df = gul_inputs_df.merge(
                    level_df[agg_id_merge_col + agg_id_merge_col_extra].drop_duplicates(), how='left', validate='many_to_one')
                if is_policy_layer_level:  # we merge all on account at this level even if there is no policy
                    gul_inputs_df["FMTermGroupID"] = gul_inputs_df["FMTermGroupID"].fillna(-1).astype('i4')
                else:
                    gul_inputs_df["FMTermGroupID"] = gul_inputs_df["FMTermGroupID"].mask(
                        gul_inputs_df["need_tiv"].isna(), -gul_inputs_df["agg_id_prev"]).astype('i4')
                gul_inputs_df["agg_id"] = factorize_ndarray(gul_inputs_df.loc[:, factorize_key].values, col_idxs=range(len(factorize_key)))[0]

                # merge TIV into level_df when needed
                # make sure the correct tiv sum exists
                tiv_df_list = []
                for FMTermGroupID, coverage_type_ids in fm_group_tiv.items():
                    tiv_key = '_'.join(map(str, sorted(coverage_type_ids)))
                    if tiv_key not in gul_inputs_df:
                        gul_inputs_df[tiv_key] = gul_inputs_df[list(
                            set(gul_inputs_df.columns).intersection(map(str, sorted(coverage_type_ids))))].sum(axis=1)
                    tiv_df_list.append(gul_inputs_df[(gul_inputs_df["need_tiv"] == True) & (gul_inputs_df["FMTermGroupID"] == FMTermGroupID)]
                                       .drop_duplicates(subset=['agg_id', 'loc_id', 'building_id'])
                                       .rename(columns={tiv_key: 'agg_tiv'})
                                       .groupby("agg_id", observed=True)
                                       .agg({**{col: 'first' for col in agg_id_merge_col}, **{'agg_tiv': 'sum'}})
                                       )
                tiv_df = pd.concat(tiv_df_list)
                gul_inputs_df = gul_inputs_df.merge(tiv_df['agg_tiv'], left_on='agg_id', right_index=True, how='left')
                gul_inputs_df['agg_tiv'] = gul_inputs_df['agg_tiv'].fillna(0)
                level_df = level_df.merge(tiv_df.drop_duplicates(), how='left').drop(columns=['need_tiv'])
                level_df['agg_tiv'] = level_df['agg_tiv'].fillna(0)

                # Apply rule to convert type 2 deductibles and limits to TIV shares
                if 'deductible' in level_df.columns and 'ded_type' in level_df.columns:
                    level_df['deductible'] = level_df['deductible'].fillna(0.)
                    level_df['ded_type'] = level_df['ded_type'].infer_objects(copy=False).fillna(0.).astype('i4')
                    level_df['deductible'] = np.where(
                        level_df['ded_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'],
                        level_df['deductible'] * level_df['agg_tiv'],
                        level_df['deductible']
                    )
                    level_df.loc[level_df['ded_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'],
                                 'ded_type'] = DEDUCTIBLE_AND_LIMIT_TYPES['flat']['id']
                    type_filter = level_df['ded_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['bi']['id']
                    level_df.loc[type_filter, 'deductible'] = level_df.loc[type_filter, 'deductible'] * level_df.loc[type_filter, 'agg_tiv'] / 365.

                if 'limit' in level_df.columns and 'lim_type' in level_df.columns:
                    level_df['limit'] = level_df['limit'].fillna(0.)
                    level_df['lim_type'] = level_df['lim_type'].infer_objects(copy=False).fillna(0.).astype('i4')
                    level_df['limit'] = np.where(
                        level_df['lim_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'],
                        level_df['limit'] * level_df['agg_tiv'],
                        level_df['limit']
                    )
                    level_df.loc[level_df['lim_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['pctiv']['id'],
                                 'lim_type'] = DEDUCTIBLE_AND_LIMIT_TYPES['flat']['id']
                    type_filter = level_df['lim_type'] == DEDUCTIBLE_AND_LIMIT_TYPES['bi']['id']
                    level_df.loc[type_filter, 'limit'] = level_df.loc[type_filter, 'limit'] * level_df.loc[type_filter, 'agg_tiv'] / 365.

                if level_id == SUPPORTED_FM_LEVELS['policy layer']['id']:
                    level_df['calcrule_id'] = get_calc_rule_ids(level_df, calc_rule_type='policy_layer')
                    level_df['profile_id'] = get_profile_ids(level_df) + profile_id_offset
                    profile_id_offset = level_df['profile_id'].max() if not level_df.empty else profile_id_offset
                elif "is_step" in level_df.columns:
                    step_filter = level_df["is_step"]
                    # Before assigning calc. rule IDs and policy TC IDs, the StepTriggerType
                    # should be split into its sub-types in cases where the associated
                    # coverages are covered separately.
                    # For example, StepTriggerType = 5 covers buildings and contents separately

                    def assign_sub_step_trigger_type(row):
                        try:
                            step_trigger_type = STEP_TRIGGER_TYPES[row['steptriggertype']]['sub_step_trigger_types'][row['coverage_type_id']]
                            return step_trigger_type
                        except KeyError:
                            return row['steptriggertype']

                    level_df.loc[step_filter, 'steptriggertype'] = level_df[step_filter].apply(
                        lambda row: assign_sub_step_trigger_type(row), axis=1
                    )
                    final_step_filter = level_df['steptriggertype'] > 0

                    # step part
                    level_df.loc[final_step_filter, 'calcrule_id'] = get_calc_rule_ids(level_df[final_step_filter], calc_rule_type='step')
                    has_step = final_step_filter & (~level_df["step_id"].isna())
                    level_df.loc[has_step, 'profile_id'] = factorize_ndarray(
                        level_df.loc[has_step, ['layer_id'] + factorize_key].values,
                        col_idxs=range(len(['layer_id'] + factorize_key)))[0] + profile_id_offset
                    profile_id_offset = level_df.loc[has_step, 'profile_id'].max() if has_step.any() else profile_id_offset

                    # non step part
                    level_df.loc[~final_step_filter, 'calcrule_id'] = get_calc_rule_ids(level_df[~final_step_filter], calc_rule_type='base')
                    level_df.loc[~has_step, 'profile_id'] = (get_profile_ids(level_df[~has_step]) + profile_id_offset)
                    profile_id_offset = level_df.loc[~has_step, 'profile_id'].max() if (~has_step).any() else profile_id_offset
                else:
                    level_df['calcrule_id'] = get_calc_rule_ids(level_df, calc_rule_type='base')
                    level_df['profile_id'] = get_profile_ids(level_df) + profile_id_offset
                    profile_id_offset = level_df['profile_id'].max() if not level_df.empty else profile_id_offset

                write_fm_profile_level(level_df, fm_profile_file, step_policies_present, chunksize=chunksize)

                # =============================================================
                # we have prepared FMTermGroupID on the gul or level df (depending on step_level),
                # now we can merge the terms for this level into gul
                merge_col = list(set(level_df.columns).intersection(set(gul_inputs_df)))
                level_df = level_df[merge_col + ['profile_id']].drop_duplicates()

                # gul_inputs that don't have a layer yet must not be merged using layer_id
                non_layered_inputs_df = (
                    gul_inputs_df[gul_inputs_df['layer_id'] == 0]
                    .drop(columns=['layer_id'])
                    .rename(columns={'PolNumber': 'PolNumber_temp'})
                    .merge(level_df, how='left')
                )
                if 'PolNumber' in level_df.columns:
                    # gul_inputs['PolNumber'] is set to level_df['PolNumber'] if empty or if this is the first time a layer appears
                    new_layered_gul_input_id = non_layered_inputs_df[non_layered_inputs_df['layer_id'] > 1]['gul_input_id'].unique()
                    non_layered_inputs_df['PolNumber'] = non_layered_inputs_df['PolNumber'].where(
                        (non_layered_inputs_df['gul_input_id'].isin(new_layered_gul_input_id))
                        | (non_layered_inputs_df['PolNumber_temp'].isna()),
                        non_layered_inputs_df['PolNumber_temp']
                    )
                    non_layered_inputs_df = non_layered_inputs_df.drop(columns=['PolNumber_temp'])
                else:
                    non_layered_inputs_df = non_layered_inputs_df.rename(columns={'PolNumber_temp': 'PolNumber'})

                layered_inputs_df = gul_inputs_df[gul_inputs_df['layer_id'] > 0].merge(level_df, how='left')

                # drop premature layering (no difference of policy between layers)
                if not is_policy_layer_level and level_id not in cross_layer_level:
                    non_layered_inputs_df['layered_id'] = (non_layered_inputs_df['layer_id']
                                                           .where(non_layered_inputs_df['agg_id'].isin(layered_inputs_df['agg_id']), 0))
                    non_layered_inputs_df = (non_layered_inputs_df
                                             .drop_duplicates(subset=['gul_input_id', 'agg_id', 'profile_id', 'layered_id'])
                                             .drop(columns=['layered_id']))

                gul_inputs_df = pd.concat(df for df in [layered_inputs_df, non_layered_inputs_df] if not df.empty)
                gul_inputs_df['layer_id'] = gul_inputs_df['layer_id'].fillna(1).astype('i4')
                gul_inputs_df.sort_values(by=['gul_input_id', 'layer_id'])
                gul_inputs_df["profile_id"] = gul_inputs_df["profile_id"].fillna(1).astype('i4')

                # check rows in the prev df that are at this level's granularity (i.e. an agg_id_prev has multiple corresponding agg_id)
                need_root_start_df = gul_inputs_df.groupby("agg_id_prev", observed=True)["agg_id"].nunique()
                need_root_start_df = need_root_start_df[need_root_start_df > 1].index
                gul_inputs_df = gul_inputs_df.drop(columns=["root_start"], errors='ignore')  # clean up
                if need_root_start_df.shape[0]:
                    gul_inputs_df["root_start"] = gul_inputs_df["agg_id"].isin(
                        set(gul_inputs_df.loc[gul_inputs_df["agg_id_prev"].isin(need_root_start_df), 'agg_id'])
                    )

                cur_level_id += 1
                gul_inputs_df['level_id'] = cur_level_id

                # write fm_policytc
                if level_id in cross_layer_level:
                    fm_policytc_df = gul_inputs_df.loc[gul_inputs_df['layer_id'] == 1, fm_policytc_headers]
                else:
                    fm_policytc_df = gul_inputs_df.loc[:, fm_policytc_headers]
                fm_policytc_df.drop_duplicates().astype(fm_policytc_dtype).to_csv(fm_policytc_file, index=False, header=False, chunksize=chunksize)

                # write programme
                fm_programe_df = gul_inputs_df[['level_id', 'agg_id']].rename(columns={'agg_id': 'to_agg_id'})
                if "root_start" in gul_inputs_df:
                    fm_programe_df['from_agg_id'] = gul_inputs_df['agg_id_prev'].where(~gul_inputs_df["root_start"], -gul_inputs_df['gul_input_id'])
                else:
                    fm_programe_df['from_agg_id'] = gul_inputs_df['agg_id_prev']
                (fm_programe_df[fm_programme_headers].drop_duplicates().astype(fm_programme_dtype)
                 .to_csv(fm_programme_file, index=False, header=False, chunksize=chunksize))

                # reset gul_inputs_df level columns
                gul_inputs_df = (gul_inputs_df
                                 .drop(columns=["root_start", "agg_id_prev", "is_step", "FMTermGroupID", "profile_id", "level_id",
                                                'fm_peril', 'need_tiv', 'agg_tiv'],
                                       errors='ignore')
                                 .rename(columns={"agg_id": "agg_id_prev"}))
                logger.info(f"level {cur_level_id} {level_info} took {time.time() - t0}")
                t0 = time.time()

        gul_inputs_df = gul_inputs_df.sort_values(['gul_input_id', 'layer_id'], kind='stable')
        gul_inputs_df['output_id'] = gul_inputs_df.reset_index().index + 1

        default_pol_agg_key = [v['field'] for v in fm_aggregation_profile[SUPPORTED_FM_LEVELS['policy layer']['id']]['FMAggKey'].values()]
        no_polnumber_df = gul_inputs_df.loc[gul_inputs_df['PolNumber'].isna(), default_pol_agg_key + ['output_id']]
        if not no_polnumber_df.empty:
            # empty PolNumber, we use accounts_df to set PolNumber based on the policy layer agg_key
            gul_inputs_df.loc[gul_inputs_df['PolNumber'].isna(), 'PolNumber'] = no_polnumber_df.merge(
                accounts_df[default_pol_agg_key + ['PolNumber']].drop_duplicates(subset=default_pol_agg_key)
            ).set_index(no_polnumber_df['output_id'] - 1)['PolNumber']

        # write fm_xref
        (gul_inputs_df
         .rename(columns={'gul_input_id': 'agg_id', 'output_id': 'output'})[fm_xref_headers]
         .astype(fm_xref_dtype)
         .to_csv(fm_xref_file, index=False, header=True, chunksize=chunksize))

        # merge acc_idx
        acc_idx_col = list(set(gul_inputs_df.columns).intersection(accounts_df.columns))
        gul_inputs_df_col = list(gul_inputs_df.columns)
        gul_inputs_df['acc_idx'] = -1
        if 'CondTag' in acc_idx_col:
            gul_inputs_df = (
                gul_inputs_df
                .merge(accounts_df[acc_idx_col + ['acc_idx']].drop_duplicates(subset=acc_idx_col), how='left', validate='many_to_one'))
            acc_idx_col.remove('CondTag')
            gul_inputs_df.loc[gul_inputs_df['acc_idx'].isna(), 'acc_idx'] = (
                gul_inputs_df.loc[gul_inputs_df['acc_idx'].isna(), gul_inputs_df_col].reset_index()
                .merge(accounts_df[acc_idx_col + ['acc_idx']].drop_duplicates(subset=acc_idx_col), how='left', validate='many_to_one')
                .set_index('index')['acc_idx'])
        else:
            gul_inputs_df = (
                gul_inputs_df
                .merge(accounts_df[acc_idx_col + ['acc_idx']].drop_duplicates(subset=acc_idx_col), how='left', validate='many_to_one'))

        return gul_inputs_df
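
# Illustrative call sketch (hypothetical objects, not part of the module): exposure_data
# would normally be an ods_tools OED exposure object built by the Oasis file-generation
# pipeline, and logger any logging.Logger instance.
#   il_inputs_df = get_il_input_items(
#       gul_inputs_df, exposure_data, target_dir='fm_files', logger=some_logger)
# The fm_policytc, fm_profile, fm_programme and fm_xref csv files are written to target_dir
# as a side effect; the returned dataframe carries the per-item, per-layer IL inputs.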

def write_empty_policy_layer(gul_inputs_df, cur_level_id, agg_key, fm_policytc_file, fm_programme_file, chunksize):
    base_fm_df = gul_inputs_df[agg_key + ['layer_id', 'agg_id_prev']]
    base_fm_df["agg_id"] = factorize_ndarray(gul_inputs_df.loc[:, agg_key].values, col_idxs=range(len(agg_key)))[0]
    base_fm_df["profile_id"] = 1
    base_fm_df["level_id"] = cur_level_id

    fm_policytc_df = base_fm_df.loc[:, fm_policytc_headers]
    fm_policytc_df.drop_duplicates().astype(fm_policytc_dtype).to_csv(fm_policytc_file, index=False, header=False, chunksize=chunksize)

    fm_programe_df = base_fm_df[['level_id', 'agg_id']].rename(columns={'agg_id': 'to_agg_id'})
    fm_programe_df['from_agg_id'] = base_fm_df['agg_id_prev']
    fm_programe_df[fm_programme_headers].drop_duplicates().astype(fm_programme_dtype).to_csv(
        fm_programme_file, index=False, header=False, chunksize=chunksize)


def write_fm_profile_level(level_df, fm_profile_file, step_policies_present, chunksize=100000):
    """
    Writes the FM profile rows for one level to an already open FM profile file.

    :param level_df: level_df (fm terms) dataframe
    :type level_df: pandas.DataFrame

    :param fm_profile_file: open file to write to
    :type fm_profile_file: fileObj

    :param step_policies_present: flag to know which type of file to write
    :type step_policies_present: bool

    :param chunksize: chunksize
    :type chunksize: int
    """
    level_df = level_df.astype({'calcrule_id': 'i4', 'profile_id': 'i4'})
    # Step policies exist
    if step_policies_present:
        fm_profile_df = level_df[list(set(level_df.columns).intersection(set(fm_profile_step_headers + ['steptriggertype'])))].copy()
        for col in fm_profile_step_headers + ['steptriggertype']:
            if col not in fm_profile_df.columns:
                fm_profile_df[col] = 0
        for non_step_name, step_name in profile_cols_map.items():
            if step_name not in fm_profile_df.columns:
                fm_profile_df[step_name] = 0
            fm_profile_df[step_name] = fm_profile_df[step_name].astype(object)
            if non_step_name in level_df.columns:
                fm_profile_df.loc[
                    ~(fm_profile_df['steptriggertype'] > 0), step_name
                ] = level_df.loc[
                    ~(fm_profile_df['steptriggertype'] > 0), non_step_name
                ]
        fm_profile_df.fillna(0, inplace=True)
        fm_profile_df = fm_profile_df.drop_duplicates()

        # Ensure step_id is of int data type and set default value to 1
        fm_profile_df = fm_profile_df.astype(fm_profile_step_dtype)
        fm_profile_df.loc[fm_profile_df['step_id'] == 0, 'step_id'] = 1

        fm_profile_df = fm_profile_df[fm_profile_step_headers].sort_values(by=["profile_id", 'step_id']).drop_duplicates()
    # No step policies
    else:
        # make sure there is no step file in the folder
        fm_profile_df = level_df[list(set(level_df.columns).intersection(set(policytc_cols)))].copy()
        for col in policytc_cols[2:]:
            if col not in fm_profile_df.columns:
                fm_profile_df[col] = 0.
        fm_profile_df = (
            fm_profile_df
            .rename(columns=profile_cols_map)
            .drop_duplicates()
            .assign(share2=0.0, share3=0.0)
            .astype(fm_profile_dtype)[fm_profile_headers]
        )
    try:
        fm_profile_df.to_csv(
            fm_profile_file,
            index=False,
            header=False,
            chunksize=chunksize,
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_fm_profile_level'", e)
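
# Illustrative sketch (hypothetical values, not part of the module): writing a single
# non-step profile row after the header has been written by the caller.
#   with open('fm_profile.csv', 'w') as f:
#       f.write(','.join(fm_profile_headers) + os.linesep)
#       write_fm_profile_level(
#           pd.DataFrame({'profile_id': [2], 'calcrule_id': [12], 'deductible': [100.0]}),
#           f, step_policies_present=False)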