# Source code for oasislmf.computation.run.exposure

# Public names exported by this module.
__all__ = ['RunExposure', 'RunFmTest']

import csv
import os
import shutil
import tempfile
from itertools import chain

import pandas as pd

from oasislmf.computation.base import ComputationStep
from oasislmf.computation.generate.files import GenerateFiles
from oasislmf.computation.generate.keys import GenerateKeysDeterministic
from oasislmf.computation.generate.losses import GenerateLossesDeterministic
from oasislmf.preparation.il_inputs import get_oed_hierarchy
from oasislmf.utils.coverages import SUPPORTED_COVERAGE_TYPES
from oasislmf.utils.data import (get_dataframe, get_exposure_data,
                                 print_dataframe)
from oasislmf.utils.defaults import (KTOOLS_ALLOC_FM_MAX,
                                     KTOOLS_ALLOC_IL_DEFAULT,
                                     KTOOLS_ALLOC_RI_DEFAULT,
                                     OASIS_FILES_PREFIXES)
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.inputs import str2bool


class RunExposure(ComputationStep):
    """
    Generates insured losses from preexisting Oasis files with specified
    loss factors (loss % of TIV).

    The step generates a deterministic keys file, the Oasis input files and
    the deterministic losses in the run directory, then summarises GUL / IL /
    RI losses at the requested ``output_level``.
    """
    step_params = [
        {'name': 'src_dir', 'flag': '-s', 'is_path': True, 'pre_exist': True, 'help': ''},
        {'name': 'run_dir', 'flag': '-r', 'is_path': True, 'pre_exist': False, 'help': ''},
        {'name': 'check_oed', 'type': str2bool, 'const': True, 'nargs': '?', 'default': True,
         'help': 'if True check input oed files'},
        {'name': 'output_file', 'flag': '-f', 'is_path': True, 'pre_exist': False, 'help': '', 'type': str},
        {'name': 'loss_factor', 'flag': '-l', 'type': float, 'nargs': '+', 'help': '', 'default': [1.0]},
        {'name': 'oed_schema_info', 'is_path': True, 'pre_exist': True, 'help': 'path to custom oed_schema'},
        {'name': 'currency_conversion_json', 'is_path': True, 'pre_exist': True,
         'help': 'settings to perform currency conversion of oed files'},
        {'name': 'reporting_currency', 'help': 'currency to use in the results reported'},
        {'name': 'ktools_alloc_rule_il', 'flag': '-a', 'default': KTOOLS_ALLOC_IL_DEFAULT, 'type': int,
         'help': 'Set the fmcalc allocation rule used in direct insured loss'},
        {'name': 'ktools_alloc_rule_ri', 'flag': '-A', 'default': KTOOLS_ALLOC_RI_DEFAULT, 'type': int,
         'help': 'Set the fmcalc allocation rule used in reinsurance'},
        {'name': 'output_level', 'flag': '-o', 'help': 'Keys files output format',
         'choices': ['item', 'loc', 'pol', 'acc', 'port'], 'default': 'item'},
        {'name': 'extra_summary_cols', 'nargs': '+', 'help': 'extra column to include in the summary',
         'default': []},
        {'name': 'coverage_types', 'type': int, 'nargs': '+',
         'default': [v['id'] for v in SUPPORTED_COVERAGE_TYPES.values()],
         'help': 'Select List of supported coverage_types [1, .. ,4]'},
        {'name': 'model_perils_covered', 'nargs': '+', 'default': ['AA1'],
         'help': 'List of peril covered by the model'},
        {'name': 'fmpy', 'default': True, 'type': str2bool, 'const': True, 'nargs': '?',
         'help': 'use fmcalc python version instead of c++ version'},
        {'name': 'fmpy_low_memory', 'default': False, 'type': str2bool, 'const': True, 'nargs': '?',
         'help': 'use memory map instead of RAM to store loss array (may decrease performance but reduce RAM usage drastically)'},
        {'name': 'fmpy_sort_output', 'default': True, 'type': str2bool, 'const': True, 'nargs': '?',
         'help': 'order fmpy output by item_id'},
        {'name': 'stream_type', 'flag': '-t', 'default': 2, 'type': int,
         'help': 'Set the IL input stream type, 2 = default loss stream, 1 = deprecated cov/item stream'},
        {'name': 'net_ri', 'default': True},
        {'name': 'include_loss_factor', 'default': True},
        {'name': 'print_summary', 'default': True},
        {'name': 'do_disaggregation', 'type': str2bool, 'const': True, 'nargs': '?', 'default': True,
         'help': 'if True run the oasis disaggregation.'},
    ]

    def _check_alloc_rules(self):
        """
        Validate the IL and RI allocation rules.

        :raises OasisException: if either rule is below 0 or above
            ``KTOOLS_ALLOC_FM_MAX``.
        """
        alloc_ranges = {
            'ktools_alloc_rule_il': KTOOLS_ALLOC_FM_MAX,
            'ktools_alloc_rule_ri': KTOOLS_ALLOC_FM_MAX}
        for rule, max_val in alloc_ranges.items():
            alloc_val = getattr(self, rule)
            if (alloc_val < 0) or (alloc_val > max_val):
                raise OasisException(f'Error: {rule}={alloc_val} - Not withing valid range [0..{max_val}]')

    def _get_summary_cols(self):
        """
        Return the columns the loss summary is grouped by for
        ``self.output_level``, followed by ``self.extra_summary_cols``.

        :raises OasisException: if ``output_level`` is not a known level
            (previously this surfaced as an ``UnboundLocalError``).
        """
        oed_hierarchy = get_oed_hierarchy()
        portfolio_num = oed_hierarchy['portnum']['ProfileElementName']
        acc_num = oed_hierarchy['accnum']['ProfileElementName']
        loc_num = oed_hierarchy['locnum']['ProfileElementName']
        policy_num = oed_hierarchy['polnum']['ProfileElementName']

        cols_by_level = {
            'port': [portfolio_num],
            'acc': [portfolio_num, acc_num],
            'pol': [portfolio_num, acc_num, policy_num],
            'loc': [portfolio_num, acc_num, loc_num],
            'item': ['output_id', portfolio_num, acc_num, loc_num, policy_num, 'coverage_type_id'],
            # Not offered in the CLI ``choices`` but accepted when set programmatically.
            'peril_item': ['output_id', portfolio_num, acc_num, loc_num, policy_num,
                           'coverage_type_id', 'peril_id'],
        }
        if self.output_level not in cols_by_level:
            raise OasisException(
                f'Unsupported output_level "{self.output_level}" - expected one of {list(cols_by_level)}'
            )
        return cols_by_level[self.output_level] + list(self.extra_summary_cols or [])

    @staticmethod
    def _sum_losses(df, group_by_cols, loss_col):
        """Return ``loss_col`` of ``df`` summed per ``group_by_cols`` as a flat dataframe."""
        return pd.DataFrame({loss_col: df.groupby(group_by_cols)[loss_col].sum()}).reset_index()

    @staticmethod
    def _total_loss(df, loss_col, loss_factor_idx=None):
        """
        Return the total of ``loss_col`` in ``df``; when ``loss_factor_idx`` is
        given only the rows of that loss factor are summed.
        """
        if loss_factor_idx is not None:
            df = df[df['loss_factor_idx'] == loss_factor_idx]
        return df[loss_col].sum()

    def run(self):
        """
        Run the deterministic exposure calculation.

        Uses ``run_dir`` when set, otherwise a temporary directory that is
        removed when the run ends - including when it ends with an exception.

        :return: tuple ``(il, ril)`` flagging whether insured losses and
            reinsurance losses were computed.
        :raises OasisException: on invalid allocation rules, missing
            location/account exposure or an unsupported ``output_level``.
        """
        src_dir = self.src_dir if self.src_dir else os.getcwd()

        tmp_dir = None
        if self.run_dir:
            run_dir = self.run_dir
        else:
            tmp_dir = tempfile.TemporaryDirectory()
            run_dir = tmp_dir.name

        try:
            return self._run_in_dir(src_dir, run_dir)
        finally:
            if tmp_dir:
                tmp_dir.cleanup()

    def _run_in_dir(self, src_dir, run_dir):
        """Generate keys, Oasis files and losses in ``run_dir`` and summarise them."""
        # NOTE: the loss factor column is driven by the number of loss factors
        # supplied, not by the ``include_loss_factor`` step parameter.
        include_loss_factor = not (len(self.loss_factor) == 1)

        self._check_alloc_rules()

        # The step itself is handed to get_exposure_data; oasis_files_dir is set
        # first - presumably it is read from there to locate the OED files
        # (verify against get_exposure_data).
        self.oasis_files_dir = src_dir
        exposure_data = get_exposure_data(self, add_internal_col=True)

        if not exposure_data.location or not exposure_data.account:
            raise OasisException(
                f'Location and/or account missing in source directory "{src_dir}" - '
                'files named `location.*` and `account.*` are expected'
            )

        # Given the guard above ``il`` is always True here; the GUL-only paths
        # below are kept so the summary logic stays valid if the guard is relaxed.
        il = bool(exposure_data.account)
        ril = all([exposure_data.ri_info, exposure_data.ri_scope, il])

        self.logger.debug('\nRunning deterministic losses (GUL=True, IL={}, RIL={})\n'.format(il, ril))

        os.makedirs(run_dir, exist_ok=True)

        # 1. Create Deterministic keys file
        keys_fp = os.path.join(run_dir, 'keys.csv')
        GenerateKeysDeterministic(
            keys_data_csv=keys_fp,
            supported_oed_coverage_types=self.coverage_types,
            exposure_data=exposure_data,
            model_perils_covered=self.model_perils_covered,
        ).run()

        # 2. Start Oasis files generation
        GenerateFiles(
            oasis_files_dir=run_dir,
            exposure_data=exposure_data,
            keys_data_csv=keys_fp,
            do_disaggregation=self.do_disaggregation,
        ).run()

        # 3. Run Deterministic Losses
        losses = GenerateLossesDeterministic(
            exposure_data=exposure_data,
            oasis_files_dir=run_dir,
            output_dir=os.path.join(run_dir, 'output'),
            include_loss_factor=include_loss_factor,
            loss_factor=self.loss_factor,
            net_ri=self.net_ri,
            ktools_alloc_rule_il=self.ktools_alloc_rule_il,
            ktools_alloc_rule_ri=self.ktools_alloc_rule_ri,
            fmpy=self.fmpy,
            fmpy_low_memory=self.fmpy_low_memory,
            fmpy_sort_output=self.fmpy_sort_output,
            il_stream_type=self.stream_type,
        ).run()

        guls_df = losses['gul']
        ils_df = losses['il']
        rils_df = losses['ri']

        # Read in the summary map
        summaries_df = get_dataframe(src_fp=os.path.join(run_dir, 'fm_summary_map.csv'), lowercase_cols=False)

        # Raw losses are written out before the loss column is renamed.
        guls_df.to_csv(path_or_buf=os.path.join(run_dir, 'guls.csv'), index=False, encoding='utf-8')
        guls_df.rename(columns={'loss': 'loss_gul'}, inplace=True)
        guls_df = guls_df.merge(
            right=summaries_df,
            left_on=["item_id"],
            right_on=["agg_id"]
        )

        if include_loss_factor:
            join_cols = ["event_id", "output_id", "loss_factor_idx"]
        else:
            join_cols = ["event_id", "output_id"]

        if il:
            ils_df.to_csv(path_or_buf=os.path.join(run_dir, 'ils.csv'), index=False, encoding='utf-8')
            ils_df.rename(columns={'loss': 'loss_il'}, inplace=True)
            all_losses_df = guls_df.merge(
                how='left',
                right=ils_df,
                on=join_cols,
                suffixes=["_gul", "_il"]
            )
        if ril:
            rils_df.to_csv(path_or_buf=os.path.join(run_dir, 'rils.csv'), index=False, encoding='utf-8')
            rils_df.rename(columns={'loss': 'loss_ri'}, inplace=True)
            all_losses_df = all_losses_df.merge(
                how='left',
                right=rils_df,
                on=join_cols
            )

        summary_cols = self._get_summary_cols()
        if include_loss_factor:
            group_by_cols = summary_cols + ['loss_factor_idx']
        else:
            group_by_cols = summary_cols

        guls_df = guls_df.loc[:, group_by_cols + ['loss_gul']]

        # ``ril`` implies ``il``, so the loss columns are gul [, il [, ri]].
        loss_cols = ['loss_gul']
        if il:
            loss_cols.append('loss_il')
        if ril:
            loss_cols.append('loss_ri')
        all_loss_cols = group_by_cols + loss_cols

        if not il:
            all_losses_df = guls_df.loc[:, all_loss_cols].drop_duplicates(keep=False)
        else:
            detail_df = all_losses_df.loc[:, all_loss_cols]
            # GUL is summed from the GUL frame, IL / RI from the merged frame.
            all_losses_df = self._sum_losses(guls_df, group_by_cols, 'loss_gul')
            for loss_col in loss_cols[1:]:
                all_losses_df = all_losses_df.merge(
                    how='left',
                    right=self._sum_losses(detail_df, group_by_cols, loss_col),
                    on=group_by_cols,
                )

        # Convert output cols to strings for formatting
        for c in group_by_cols:
            all_losses_df[c] = all_losses_df[c].apply(str)

        for i, factor in enumerate(self.loss_factor):
            factor_idx = i if include_loss_factor else None
            total_gul = self._total_loss(guls_df, 'loss_gul', factor_idx)

            if not il:
                header = \
                    'Losses (loss factor={:.2%}; total gul={:,.00f})'.format(
                        factor,
                        total_gul)
            elif not ril:
                total_il = self._total_loss(ils_df, 'loss_il', factor_idx)
                header = \
                    'Losses (loss factor={:.2%}; total gul={:,.00f}; total il={:,.00f})'.format(
                        factor,
                        total_gul, total_il)
            else:
                total_il = self._total_loss(ils_df, 'loss_il', factor_idx)
                total_ri_net = self._total_loss(rils_df, 'loss_ri', factor_idx)
                total_ri_ceded = total_il - total_ri_net
                header = \
                    'Losses (loss factor={:.2%}; total gul={:,.00f}; total il={:,.00f}; total ri ceded={:,.00f})'.format(
                        factor,
                        total_gul, total_il, total_ri_ceded)

            if self.print_summary:
                if include_loss_factor:
                    # The column was converted to strings above, hence str(i).
                    to_print = all_losses_df[all_losses_df['loss_factor_idx'] == str(i)]
                else:
                    to_print = all_losses_df
                print_dataframe(to_print, frame_header=header, cols=all_loss_cols.copy())

        if self.output_file:
            all_losses_df.to_csv(self.output_file, index=False, encoding='utf-8')

        return (il, ril)
class RunFmTest(ComputationStep):
    """
    Runs an FM test case and validates generated losses against expected losses.

    only use 'update_expected' for debugging
    it replaces the expected file with generated
    """
    step_params = [
        {'name': 'test_case_name', 'flag': '-c', 'type': str,
         'help': 'Runs a specific test sub-directory from "test_case_dir". If not set then run all tests found.'},
        {'name': 'list_tests', 'flag': '-l', 'action': 'store_true',
         'help': 'List the valid test cases in the test directory rather than running'},
        # NOTE: the default is the working directory at class-definition (import) time.
        {'name': 'test_case_dir', 'flag': '-t', 'default': os.getcwd(), 'is_path': True, 'pre_exist': True,
         'help': 'Test directory - should contain test directories containing OED files and expected results'},
        {'name': 'run_dir', 'flag': '-r',
         'help': 'Run directory - where files should be generated. If not set temporary files will not be saved.'},
        {'name': 'test_tolerance', 'type': float,
         'help': 'Relative tolerance between expected values and results, default is "1e-4" or 0.0001',
         'default': 1e-4},
        {'name': 'model_perils_covered', 'nargs': '+', 'default': ['AA1'],
         'help': 'List of peril covered by the model'},
        {'name': 'fmpy', 'default': True, 'type': str2bool, 'const': True, 'nargs': '?',
         'help': 'use fmcalc python version instead of c++ version'},
        {'name': 'fmpy_low_memory', 'default': False, 'type': str2bool, 'const': True, 'nargs': '?',
         'help': 'use memory map instead of RAM to store loss array (may decrease performance but reduce RAM usage drastically)'},
        {'name': 'fmpy_sort_output', 'default': True, 'type': str2bool, 'const': True, 'nargs': '?',
         'help': 'order fmpy output by item_id'},
        {'name': 'update_expected', 'default': False},
        {'name': 'expected_output_dir', 'default': "expected"},
    ]

    def search_test_cases(self):
        """
        Find the test cases below ``test_case_dir``.

        An entry counts as a test case when it contains ``expected_output_dir``.

        :return: tuple ``(sorted case names, number of cases)``.
        """
        case_names = sorted(
            test_case for test_case in os.listdir(path=self.test_case_dir)
            if os.path.exists(os.path.join(self.test_case_dir, test_case, self.expected_output_dir))
        )
        return case_names, len(case_names)

    def _case_dir_is_valid_test(self):
        """
        Tell whether ``test_case_dir`` is itself a test case.

        It must contain the expected-results directory plus location and
        account files. Names are compared case-insensitively and any file
        extension is accepted for the two exposure files.

        The previous ``'a' and 'b' and 'c' in names`` expression only ever
        checked for the expected directory.
        """
        src_contents = [fn.lower() for fn in os.listdir(self.test_case_dir)]
        has_expected = self.expected_output_dir.lower() in src_contents
        has_location = any(fn.startswith('location.') for fn in src_contents)
        has_account = any(fn.startswith('account.') for fn in src_contents)
        return has_expected and has_location and has_account

    def run(self):
        """
        Run one test case or every test case found.

        * ``test_case_name`` set: run that case and return its result.
        * ``test_case_dir`` is itself a test case: run it and return its result.
        * otherwise: list (``list_tests``) or run all cases found in the
          sub-directories, then terminate through ``SystemExit``.

        :raises OasisException: if no test case is found, or from a failing
            test case (see ``execute_test_case``).
        :raises SystemExit: after listing or after running all cases; the
            code is 1 if any case returned a falsy result, else 0.
        """
        # Run test case given on CLI
        if self.test_case_name:
            return self.execute_test_case(self.test_case_name)

        # If 'test_case_dir' is a valid test run that dir directly
        if self._case_dir_is_valid_test():
            return self.execute_test_case('')

        # Search for valid cases in sub-dirs and run all found
        case_names, case_num = self.search_test_cases()

        # If '--list-tests' is selected print found cases and exit
        if self.list_tests:
            for name in case_names:
                self.logger.info(name)
            # SystemExit is raised directly: the ``exit`` helper is injected by
            # the ``site`` module and is not guaranteed to exist.
            raise SystemExit(0)

        if case_num < 1:
            raise OasisException(f'No vaild FM test cases found in "{self.test_case_dir}"')

        # If test_case not selected run all cases
        self.logger.info(f"Running: {case_num} Tests from '{self.test_case_dir}'")
        self.logger.info(f'Test names: {case_names}')
        failed_tests = []
        for case in case_names:
            # NOTE: execute_test_case raises on a mismatch, so a failing case
            # currently aborts the whole run instead of being collected here.
            if not self.execute_test_case(case):
                failed_tests.append(case)

        if not failed_tests:
            self.logger.info("All tests passed")
        else:
            self.logger.info("{} test failed: ".format(len(failed_tests)))
            for name in failed_tests:
                self.logger.info(name)
        raise SystemExit(1 if failed_tests else 0)

    @staticmethod
    def _read_loss_factors(test_dir):
        """
        Read the loss factors of a test case.

        :return: tuple ``(loss_factor list, include_loss_factor)``. When
            ``loss_factors.csv`` is absent this is ``([1.0], False)``.
        :raises OasisException: if the file exists but cannot be read.
        """
        loss_factor_fp = os.path.join(test_dir, 'loss_factors.csv')
        if not os.path.exists(loss_factor_fp):
            return [1.0], False
        try:
            with open(loss_factor_fp, 'r', newline='') as csvfile:
                return [float(row['loss_factor']) for row in csv.DictReader(csvfile)], True
        except Exception as e:
            raise OasisException(f"Failed to read {loss_factor_fp}", e) from e

    def execute_test_case(self, test_case):
        """
        Run ``RunExposure`` on one test case and compare the generated files
        with the expected ones.

        Files are generated in ``run_dir`` when set, otherwise in a temporary
        directory that is removed afterwards (also when the test fails).

        :param test_case: sub-directory of ``test_case_dir`` ('' for the
            directory itself).
        :return: True when all expected files match (or were updated).
        :raises OasisException: if the expected directory is missing, the loss
            factors cannot be read or a generated file differs from the
            expected one.
        """
        tmp_dir = None
        if self.run_dir:
            run_dir = self.run_dir
        else:
            tmp_dir = tempfile.TemporaryDirectory()
            run_dir = tmp_dir.name

        try:
            return self._execute_in_dir(test_case, run_dir)
        finally:
            if tmp_dir:
                tmp_dir.cleanup()

    def _execute_in_dir(self, test_case, run_dir):
        """Generate the losses of ``test_case`` in ``run_dir`` and validate them."""
        test_dir = os.path.join(self.test_case_dir, test_case)
        output_level = 'loc'
        loss_factor, include_loss_factor = self._read_loss_factors(test_dir)

        output_file = os.path.join(run_dir, 'loc_summary.csv')
        (il, ril) = RunExposure(
            src_dir=test_dir,
            run_dir=run_dir,
            loss_factor=loss_factor,
            output_level=output_level,
            model_perils_covered=self.model_perils_covered,
            output_file=output_file,
            include_loss_factor=include_loss_factor,
            fmpy=self.fmpy,
            fmpy_low_memory=self.fmpy_low_memory,
            fmpy_sort_output=self.fmpy_sort_output,
        ).run()

        expected_data_dir = os.path.join(test_dir, self.expected_output_dir)
        if not os.path.exists(expected_data_dir):
            if self.update_expected:
                os.makedirs(expected_data_dir)
            else:
                raise OasisException(
                    'No subfolder named `expected` found in the input directory - '
                    'this subfolder should contain the expected set of GUL + IL '
                    'input files, optionally the RI input files, and the expected '
                    'set of GUL, IL and optionally the RI loss files'
                )

        files = ['keys.csv', 'loc_summary.csv']
        files += [
            '{}.csv'.format(fn)
            for fn in chain(OASIS_FILES_PREFIXES['gul'].values(), OASIS_FILES_PREFIXES['il'].values())
        ]
        files += ['gul_summary_map.csv', 'guls.csv']
        if il:
            files += ['fm_summary_map.csv', 'ils.csv']
        if ril:
            files += ['rils.csv']

        for fname in files:
            generated = os.path.join(run_dir, fname)
            expected = os.path.join(expected_data_dir, fname)

            # Files without an expected counterpart are not compared.
            if not os.path.exists(expected):
                if self.update_expected and os.path.exists(generated):
                    shutil.copyfile(generated, expected)
                continue

            try:
                pd.testing.assert_frame_equal(
                    pd.read_csv(expected),
                    pd.read_csv(generated),
                    check_exact=False,
                    rtol=self.test_tolerance
                )
            except AssertionError:
                if self.update_expected:
                    shutil.copyfile(generated, expected)
                else:
                    self.logger.info("Expected:")
                    with open(expected) as expected_file:
                        self.logger.info(expected_file.read())
                    self.logger.info("Generated:")
                    with open(generated) as generated_file:
                        self.logger.info(generated_file.read())
                    raise OasisException(
                        f'\n FAIL: generated {generated} vs expected {expected}'
                    )

        return True