__all__ = [
'get_summary_mapping',
'generate_summaryxref_files',
'merge_oed_to_mapping',
'write_exposure_summary',
'get_exposure_summary',
'get_useful_summary_cols',
'get_xref_df',
'write_summary_levels',
'write_mapping_file',
]
import pathlib
import io
import json
import os
import numpy as np
import pandas as pd
from ..utils.coverages import SUPPORTED_COVERAGE_TYPES
from ..utils.data import (
factorize_dataframe,
factorize_ndarray,
get_dataframe,
get_json,
merge_dataframes,
set_dataframe_column_dtypes,
fill_na_with_categoricals
)
from ..utils.defaults import (
SOURCE_IDX,
SUMMARY_MAPPING,
SUMMARY_OUTPUT,
SUMMARY_TOP_LEVEL_COLS,
get_default_exposure_profile,
)
from ..utils.exceptions import OasisException
from ..utils.log import oasis_log
from ..utils.path import as_path
from ..utils.status import OASIS_KEYS_STATUS, OASIS_KEYS_STATUS_MODELLED
MAP_SUMMARY_DTYPES = {
'loc_id': 'int',
SOURCE_IDX['loc']: 'int',
SOURCE_IDX['acc']: 'int',
'item_id': 'int',
'layer_id': 'int',
'coverage_id': 'int',
'peril_id': 'category',
'agg_id': 'int',
'output_id': 'int',
'coverage_type_id': 'int',
'tiv': 'float',
'building_id': 'int',
'risk_id': 'int',
}
def get_useful_summary_cols(oed_hierarchy):
return [
oed_hierarchy['accnum']['ProfileElementName'],
oed_hierarchy['locnum']['ProfileElementName'],
'loc_id',
oed_hierarchy['polnum']['ProfileElementName'],
oed_hierarchy['portnum']['ProfileElementName'],
SOURCE_IDX['loc'],
SOURCE_IDX['acc'],
'item_id',
'layer_id',
'coverage_id',
'peril_id',
'agg_id',
'output_id',
'coverage_type_id',
'tiv',
'building_id',
'risk_id',
'intensity_adjustment',
'return_period'
]
def get_xref_df(il_inputs_df):
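    """
    Build the cross-reference dataframe used for FM summary mapping: rows from the lowest
    FM level (level_id == 1) are joined, via 'top_agg_id', to the layer columns taken from
    the highest FM level, then de-duplicated on (gul_input_id, layer_id).
    :param il_inputs_df: dataframe from il_inputs.get_il_input_items(..)
    :type il_inputs_df: pandas.DataFrame
    :return: cross-reference dataframe
    :rtype: pandas.DataFrame
    """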
top_level_layers_df = il_inputs_df.loc[il_inputs_df['level_id'] == il_inputs_df['level_id'].max(), ['top_agg_id'] + SUMMARY_TOP_LEVEL_COLS]
    bottom_level_layers_df = il_inputs_df[il_inputs_df['level_id'] == 1].drop(columns=SUMMARY_TOP_LEVEL_COLS)
return (merge_dataframes(bottom_level_layers_df, top_level_layers_df, join_on=['top_agg_id'])
.drop_duplicates(subset=['gul_input_id', 'layer_id'], keep='first')
.sort_values(['gul_input_id', 'layer_id'])
)
@oasis_log
def get_summary_mapping(inputs_df, oed_hierarchy, is_fm_summary=False):
"""
Create a DataFrame with linking information between Ktools `OasisFiles`
And the Exposure data
:param inputs_df: datafame from gul_inputs.get_gul_input_items(..) / il_inputs.get_il_input_items(..)
:type inputs_df: pandas.DataFrame
:param is_fm_summary: Indicates whether an FM summary mapping is required
:type is_fm_summary: bool
:return: Subset of columns from gul_inputs_df / il_inputs_df
:rtype: pandas.DataFrame
"""
# Case GUL+FM (based on il_inputs_df)
if is_fm_summary:
summary_mapping = get_xref_df(inputs_df)
summary_mapping['agg_id'] = summary_mapping['gul_input_id']
summary_mapping = summary_mapping.reindex(sorted(summary_mapping.columns, key=str.lower), axis=1)
summary_mapping['output_id'] = factorize_ndarray(
summary_mapping.loc[:, ['gul_input_id', 'layer_id']].values,
col_idxs=range(2)
)[0]
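        # Note: factorize_ndarray returns (codes, uniques); the codes are 1-based, so each
        # distinct (gul_input_id, layer_id) pair receives a sequential output_id of 1..n.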
# GUL Only
else:
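        # GUL-only runs use a single notional layer; each item is its own aggregation unit,
        # so layer_id is fixed at 1 and agg_id mirrors item_id.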
summary_mapping = inputs_df.copy(deep=True)
summary_mapping['layer_id'] = 1
summary_mapping['agg_id'] = summary_mapping['item_id']
summary_mapping.drop(
[c for c in summary_mapping.columns if c not in get_useful_summary_cols(oed_hierarchy)],
axis=1,
inplace=True
)
acc_num = oed_hierarchy['accnum']['ProfileElementName']
loc_num = oed_hierarchy['locnum']['ProfileElementName']
policy_num = oed_hierarchy['polnum']['ProfileElementName']
portfolio_num = oed_hierarchy['portnum']['ProfileElementName']
dtypes = {
**{t: 'str' for t in [portfolio_num, policy_num, acc_num, loc_num, 'peril_id']},
**{t: 'uint8' for t in ['coverage_type_id']},
**{t: 'uint32' for t in [SOURCE_IDX['loc'], SOURCE_IDX['acc'], 'loc_id', 'item_id', 'layer_id', 'coverage_id', 'agg_id', 'output_id',
'building_id', 'risk_id']},
**{t: 'float64' for t in ['tiv']}
}
summary_mapping = set_dataframe_column_dtypes(summary_mapping, dtypes)
return summary_mapping
def merge_oed_to_mapping(summary_map_df, exposure_df, oed_column_join, oed_column_info):
"""
Create a factorized col (summary ids) based on a list of oed column names
    :param summary_map_df: dataframe returned from get_summary_mapping
    :type summary_map_df: pandas.DataFrame
    :param exposure_df: DataFrame of source exposure (OED location) data
    :type exposure_df: pandas.DataFrame
    :param oed_column_join: columns to join on
    :type oed_column_join: list
    :param oed_column_info: dictionary of columns to pick from exposure_df and their default values
    :type oed_column_info: dict
{'Col_A': 0, 'Col_B': 1, 'Col_C': 2}
:return: New DataFrame of summary_map_df + exposure_df merged on exposure index
:rtype: pandas.DataFrame
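    Example (illustrative only; the OED column names below are hypothetical):
        merge_oed_to_mapping(summary_map_df, exposure_df,
                             oed_column_join=[SOURCE_IDX['loc']],
                             oed_column_info={'OccupancyCode': 0, 'CountryCode': ''})
        attaches 'OccupancyCode' and 'CountryCode' from exposure_df where present, and fills
        any column missing from exposure_df with its given default value.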
"""
column_set = set(oed_column_info)
columns_found = [c for c in column_set if c in exposure_df.columns and c not in summary_map_df.columns]
columns_missing = list(set(column_set) - set(columns_found))
new_summary_map_df = merge_dataframes(summary_map_df, exposure_df.loc[:, columns_found + oed_column_join], join_on=oed_column_join, how='inner')
for col, default in oed_column_info.items():
if col in columns_missing:
new_summary_map_df[col] = default
fill_na_with_categoricals(new_summary_map_df, oed_column_info)
return new_summary_map_df
def group_by_oed(oed_col_group, summary_map_df, exposure_df, sort_by, accounts_df=None):
"""
Adds list of OED fields from `column_set` to summary map file
:param :summary_map_df dataframe return from get_summary_mapping
:type summary_map_df: pandas.DataFrame
:param exposure_df: DataFrame loaded from location.csv
:type exposure_df: pandas.DataFrame
:param accounts_df: DataFrame loaded from accounts.csv
:type accounts_df: pandas.DataFrame
:return: subset of columns from exposure_df to merge
:rtype: list
summary_ids[0] is an int list 1..n array([1, 2, 1, 2, 1, 2, 1, 2, 1, 2, ... ])
summary_ids[1] is an array of values used to factorize `array(['Layer1', 'Layer2'], dtype=object)`
"""
    oed_cols = oed_col_group  # All required columns
    unmapped_cols = [c for c in oed_cols if c not in summary_map_df.columns]  # Columns only found in the location / accounts files
    mapped_cols = [c for c in oed_cols + [SOURCE_IDX['loc'], SOURCE_IDX['acc'], sort_by]
                   if c in summary_map_df.columns]  # Columns already in summary_map_df
tiv_cols = ['tiv', 'loc_id', 'building_id', 'coverage_type_id']
# Extract mapped_cols from summary_map_df
summary_group_df = summary_map_df.loc[:, list(set(tiv_cols).union(mapped_cols))]
    # Search the location / accounts files and merge in the remaining columns
    if unmapped_cols:
# Location file columns
exposure_cols = [c for c in unmapped_cols if c in exposure_df.columns]
exposure_col_df = exposure_df.loc[:, exposure_cols + [SOURCE_IDX['loc']]]
summary_group_df = merge_dataframes(summary_group_df, exposure_col_df, join_on=SOURCE_IDX['loc'], how='left')
# Account file columns
if isinstance(accounts_df, pd.DataFrame):
accounts_cols = [c for c in unmapped_cols if c in set(accounts_df.columns) - set(exposure_df.columns)]
if accounts_cols:
accounts_col_df = accounts_df.loc[:, accounts_cols + [SOURCE_IDX['acc']]]
summary_group_df = merge_dataframes(summary_group_df, accounts_col_df, join_on=SOURCE_IDX['acc'], how='left')
fill_na_with_categoricals(summary_group_df, 0)
summary_group_df.sort_values(by=[sort_by], inplace=True)
summary_ids = factorize_dataframe(summary_group_df, by_col_labels=oed_cols)
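    # summary_ids[0] holds the 1-based group code for each row of summary_group_df (ordered by
    # `sort_by` above); summary_ids[1] holds the distinct value tuples those codes map back to.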
summary_tiv = summary_group_df.drop_duplicates(['loc_id', 'building_id', 'coverage_type_id'] + oed_col_group,
keep='first').groupby(oed_col_group, observed=True).agg({'tiv': np.sum})
return summary_ids[0], summary_ids[1], summary_tiv
@oasis_log
def write_summary_levels(exposure_df, accounts_df, exposure_data, target_dir):
'''
    Writes a JSON file listing the Available / Recommended columns for use in summary reporting
    Available: columns which exist in the input files and have at least one non-zero / non-null value
    Recommended: columns which are available and also appear in the list of `useful` groupings SUMMARY_LEVEL_LOC
{
'GUL': {
'available': ['AccNumber',
'LocNumber',
'istenant',
'buildingid',
'countrycode',
'latitude',
'longitude',
'streetaddress',
'postalcode',
'occupancycode',
'constructioncode',
'locperilscovered',
'BuildingTIV',
'ContentsTIV',
'BITIV',
                      'PortNumber'],
        },
        'IL': {
            ... etc ...
        }
    }
'''
    # Manage internal columns (non-OED exposure inputs)
int_excluded_cols = ['loc_id', SOURCE_IDX['loc']]
desc_non_oed = 'Not an OED field'
int_oasis_cols = {
'coverage_type_id': 'Oasis coverage type',
'peril_id': 'OED peril code',
'coverage_id': 'Oasis coverage identifier',
}
# GUL perspective (loc columns only)
l_col_list = exposure_df.replace(0, np.nan).dropna(how='any', axis=1).columns.to_list()
l_col_info = exposure_data.get_input_fields('Loc')
gul_avail = {k: l_col_info[k.lower()]["Type & Description"] if k.lower() in l_col_info else desc_non_oed
for k in set([c for c in l_col_list]).difference(int_excluded_cols)}
# IL perspective (join of acc + loc col with no dups)
il_avail = {}
if accounts_df is not None:
a_col_list = accounts_df.loc[:, ~accounts_df.isnull().all()].columns.to_list()
a_col_info = exposure_data.get_input_fields('Acc')
a_avail = set([c for c in a_col_list])
il_avail = {k: a_col_info[k.lower()]["Type & Description"] if k.lower() in a_col_info else desc_non_oed
for k in a_avail.difference(gul_avail.keys())}
# Write JSON
gul_summary_lvl = {'GUL': {'available': {**gul_avail, **il_avail, **int_oasis_cols}}}
il_summary_lvl = {'IL': {'available': {**gul_avail, **il_avail, **int_oasis_cols}}} if il_avail else {}
with io.open(os.path.join(target_dir, 'exposure_summary_levels.json'), 'w', encoding='utf-8') as f:
f.write(json.dumps({**gul_summary_lvl, **il_summary_lvl}, sort_keys=True, ensure_ascii=False, indent=4))
@oasis_log
def write_mapping_file(sum_inputs_df, target_dir, is_fm_summary=False):
"""
Writes a summary map file, used to build summarycalc xref files.
    :param sum_inputs_df: dataframe returned from get_summary_mapping
    :type sum_inputs_df: pandas.DataFrame
    :param target_dir: directory in which to write the summary map file
    :type target_dir: str
    :param is_fm_summary: Indicates whether an FM summary mapping is required
    :type is_fm_summary: bool
    :return: Summary map file path
    :rtype: str
"""
target_dir = as_path(
target_dir,
'Target IL input files directory',
is_dir=True,
preexists=False
)
    # Set chunk size for writing the CSV files - max 200K rows, min 1K
chunksize = min(2 * 10 ** 5, max(len(sum_inputs_df), 1000))
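    # e.g. a 500-row dataframe is written with a chunk size of 1,000 rows, while a
    # 1,000,000-row dataframe is written in chunks of 200,000 rows.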
if is_fm_summary:
sum_mapping_fp = os.path.join(target_dir, SUMMARY_MAPPING['fm_map_fn'])
else:
sum_mapping_fp = os.path.join(target_dir, SUMMARY_MAPPING['gul_map_fn'])
try:
sum_inputs_df.to_csv(
path_or_buf=sum_mapping_fp,
encoding='utf-8',
mode=('w' if os.path.exists(sum_mapping_fp) else 'a'),
chunksize=chunksize,
index=False
)
except (IOError, OSError) as e:
raise OasisException("Exception raised in 'write_mapping_file'", e)
return sum_mapping_fp
def get_column_selection(summary_set):
"""
    Given an analysis_settings summary definition, return either
    1. the list of OED columns requested to group by, or
    2. an empty list if the 'oed_fields' key is missing or empty, in which case all outputs
       are grouped into a single summary set
    :param summary_set: summary group dictionary from the `analysis_settings.json`
    :type summary_set: dict
    :return: List of selected OED columns to create summary groups from
    :rtype: list
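    Example (illustrative):
        {'id': 1, 'oed_fields': ['PortNumber', 'AccNumber']}  ->  ['PortNumber', 'AccNumber']
        {'id': 2, 'oed_fields': 'LocNumber'}                  ->  ['LocNumber']
        {'id': 3}                                             ->  []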
"""
if "oed_fields" not in summary_set:
return []
if not summary_set["oed_fields"]:
return []
# Use OED column list set in analysis_settings file
elif isinstance(summary_set['oed_fields'], list) and len(summary_set['oed_fields']) > 0:
return [c for c in summary_set['oed_fields']]
elif isinstance(summary_set['oed_fields'], str) and len(summary_set['oed_fields']) > 0:
return [summary_set['oed_fields']]
else:
raise OasisException(
'Error processing settings file: "oed_fields" '
'is expected to be a list of strings, not {}'.format(type(summary_set['oed_fields']))
)
def get_ri_settings(run_dir):
"""
Return the contents of ri_layers.json
Example:
{
"1": {
"inuring_priority": 1,
"risk_level": "LOC",
"directory": " ... /runs/ProgOasis-20190501145127/RI_1"
}
}
:param run_dir: The file path of the model run directory
:type run_dir: str
:return: metadata for the Reinsurance layers
:rtype: dict
"""
return get_json(src_fp=os.path.join(run_dir, 'ri_layers.json'))
def write_df_to_csv_file(df, target_dir, filename):
"""
Write a generated summary xref dataframe to disk in csv format.
    :param df: The dataframe to write (e.g. the summary xref output of get_summary_xref_df(..))
    :type df: pandas.DataFrame
    :param target_dir: Absolute path of the directory to write the summary_xref file to
    :type target_dir: str
    :param filename: Name of the output file
    :type filename: str
    :return: file path of the written csv file
    :rtype: str
"""
target_dir = as_path(target_dir, 'Input files directory', is_dir=True, preexists=False)
pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True)
chunksize = min(2 * 10 ** 5, max(len(df), 1000))
csv_fp = os.path.join(target_dir, filename)
try:
df.to_csv(
path_or_buf=csv_fp,
encoding='utf-8',
mode=('w'),
chunksize=chunksize,
index=False
)
except (IOError, OSError) as e:
raise OasisException("Exception raised in 'write_df_to_csv_file'", e)
return csv_fp
def write_df_to_parquet_file(df, target_dir, filename):
"""
Write a generated summary xref dataframe to disk in parquet format.
    :param df: The dataframe to write (e.g. the summary xref output of get_summary_xref_df(..))
    :type df: pandas.DataFrame
    :param target_dir: Absolute path of the directory to write the summary_xref file to
    :type target_dir: str
    :param filename: Name of the output file
    :type filename: str
    :return: file path of the written parquet file
    :rtype: str
"""
target_dir = as_path(
target_dir, 'Output files directory', is_dir=True, preexists=False
)
parquet_fp = os.path.join(target_dir, filename)
try:
df.to_parquet(path=parquet_fp, engine='pyarrow')
except (IOError, OSError) as e:
raise OasisException(
"Exception raised in 'write_df_to_parquet_file'", e
)
return parquet_fp
def get_summary_xref_df(
map_df, exposure_df, accounts_df, summaries_info_dict, summaries_type,
id_set_index='output_id'
):
"""
Create a Dataframe for either gul / il / ri based on a section
from the analysis settings
:param map_df: Summary Map dataframe (GUL / IL)
:type map_df: pandas.DataFrame
:param exposure_df: Location OED data
:type exposure_df: pandas.DataFrame
:param accounts_df: Accounts OED data
:type accounts_df: pandas.DataFrame
    :param summaries_info_dict: list of dictionary definitions for summary groups from the analysis_settings file
:type summaries_info_dict: list
[{
"summarycalc": true,
"eltcalc": true,
"aalcalc": true,
"pltcalc": true,
"id": 1,
"oed_fields": [],
"lec_output": true,
"leccalc": {
"return_period_file": true,
"outputs": {
"full_uncertainty_aep": true,
"full_uncertainty_oep": true,
"wheatsheaf_aep": true,
"wheatsheaf_oep": true
}
}
},
...
]
    :param summaries_type: Text label to use as the key in the summary description, one of ['gul', 'il', 'ri']
    :type summaries_type: str
    :param id_set_index: column used as the summary xref id set ('output_id', 'coverage_id' or 'item_id')
    :type id_set_index: str
    :return summaryxref_df: DataFrame containing abstracted summary data for ktools
    :rtype: pandas.DataFrame
    :return summary_desc: dictionary of DataFrames listing what the summary_ids map to
    :rtype: dict
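    Example summaryxref_df rows (illustrative; the first column is named after `id_set_index`,
    or 'output' when id_set_index is 'output_id'):
        coverage_id, summary_id, summaryset_id
        1, 1, 1
        2, 1, 1
        3, 2, 1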
"""
summaryxref_df = pd.DataFrame()
summary_desc = {}
all_cols = set(map_df.columns.to_list() + exposure_df.columns.to_list())
if isinstance(accounts_df, pd.DataFrame):
all_cols.update(accounts_df.columns.to_list())
# Extract the summary id index column depending on id_set_index
map_df.sort_values(id_set_index, inplace=True)
ids_set_df = map_df.loc[:, [id_set_index]].rename(columns={'output_id': "output"})
# For each granularity build a set grouping
for summary_set in summaries_info_dict:
summary_set_df = ids_set_df
cols_group_by = get_column_selection(summary_set)
file_extension = 'csv'
if summary_set.get('ord_output', {}).get('parquet_format'):
file_extension = 'parquet'
desc_key = '{}_S{}_summary-info.{}'.format(
summaries_type, summary_set['id'], file_extension
)
# an empty intersection means no selected columns from the input data
if not set(cols_group_by).intersection(all_cols):
# is the intersection empty because the columns don't exist?
if set(cols_group_by).difference(all_cols):
err_msg = 'Input error: Summary set columns missing from the input files: {}'.format(
set(cols_group_by).difference(all_cols))
raise OasisException(err_msg)
# Fall back to setting all in single group
summary_set_df['summary_id'] = 1
summary_desc[desc_key] = pd.DataFrame(data=['All-Risks'], columns=['_not_set_'])
summary_desc[desc_key].insert(loc=0, column='summary_id', value=1)
summary_desc[desc_key].insert(loc=len(summary_desc[desc_key].columns), column='tiv',
value=map_df.drop_duplicates(['building_id', 'loc_id', 'coverage_type_id'], keep='first').tiv.sum())
else:
(
summary_set_df['summary_id'],
set_values,
tiv_values
) = group_by_oed(cols_group_by, map_df, exposure_df, id_set_index, accounts_df)
# Build description file
summary_desc_df = pd.DataFrame(data=list(set_values), columns=cols_group_by)
summary_desc_df.insert(loc=0, column='summary_id', value=range(1, len(set_values) + 1))
summary_desc[desc_key] = pd.merge(summary_desc_df, tiv_values, left_on=cols_group_by, right_on=cols_group_by)
            # Append this summary set to the combined summaryxref dataframe
summary_set_df['summaryset_id'] = summary_set['id']
summaryxref_df = pd.concat([summaryxref_df, summary_set_df.drop_duplicates()], sort=True, ignore_index=True)
dtypes = {
t: 'uint32' for t in ['coverage_id', 'summary_id', 'summaryset_id']
}
summaryxref_df = set_dataframe_column_dtypes(summaryxref_df, dtypes)
return summaryxref_df, summary_desc
@oasis_log
def generate_summaryxref_files(
location_df, account_df, model_run_fp, analysis_settings, il=False,
ri=False, rl=False, gul_item_stream=False, fmpy=False
):
"""
Top level function for creating the summaryxref files from the manager.py
    :param location_df: Location OED data
    :type location_df: pandas.DataFrame
    :param account_df: Account OED data
    :type account_df: pandas.DataFrame
    :param model_run_fp: Model run directory file path
    :type model_run_fp: str
    :param analysis_settings: Model analysis settings file
    :type analysis_settings: dict
    :param il: Boolean to indicate the insured loss level mode - false if the
               source accounts file path was not provided to the Oasis files generation step.
    :type il: bool
    :param ri: Boolean to indicate the RI loss level mode - false if the
               source accounts file path was not provided to the Oasis files generation step.
    :type ri: bool
    :param rl: Boolean to indicate the RL loss level mode - false if the
               source accounts file path was not provided to the Oasis files generation step.
    :type rl: bool
    :param gul_item_stream: Boolean indicating that GUL summaries should use item_id instead of coverage_id
    :type gul_item_stream: bool
    :param fmpy: Boolean to indicate whether the Python fmcalc implementation (fmpy) will be used
    :type fmpy: bool
"""
# Boolean checks for summary generation types (gul / il / ri)
gul_summaries = all([
analysis_settings.get('gul_output'),
analysis_settings.get('gul_summaries'),
])
il_summaries = all([
analysis_settings.get('il_output'),
analysis_settings.get('il_summaries'),
il,
])
ri_summaries = all([
analysis_settings.get('ri_output'),
analysis_settings.get('ri_summaries'),
ri,
])
rl_summaries = all([
analysis_settings.get('rl_output'),
analysis_settings.get('rl_summaries'),
rl,
])
# Load il_map if present
if il_summaries or ri_summaries or rl_summaries:
if account_df is None:
raise OasisException('No account file found.')
il_map_fp = os.path.join(model_run_fp, 'input', SUMMARY_MAPPING['fm_map_fn'])
il_map_df = get_dataframe(
src_fp=il_map_fp,
lowercase_cols=False,
col_dtypes=MAP_SUMMARY_DTYPES,
empty_data_error_msg='No summary map file found.',
)
il_map_df = il_map_df[list(set(il_map_df).intersection(MAP_SUMMARY_DTYPES))]
if gul_summaries:
gul_map_df = il_map_df
gul_map_df['item_id'] = gul_map_df['agg_id']
elif gul_summaries:
gul_map_fp = os.path.join(model_run_fp, 'input', SUMMARY_MAPPING['gul_map_fn'])
gul_map_df = get_dataframe(
src_fp=gul_map_fp,
lowercase_cols=False,
col_dtypes=MAP_SUMMARY_DTYPES,
empty_data_error_msg='No summary map file found.',
)
gul_map_df = gul_map_df[list(set(gul_map_df).intersection(MAP_SUMMARY_DTYPES))]
if gul_summaries:
# Load GUL summary map
id_set_index = 'item_id' if gul_item_stream else 'coverage_id'
gul_summaryxref_df, gul_summary_desc = get_summary_xref_df(
gul_map_df,
location_df,
account_df,
analysis_settings['gul_summaries'],
'gul',
id_set_index
)
# Write Xref file
write_df_to_csv_file(gul_summaryxref_df, os.path.join(model_run_fp, 'input'), SUMMARY_OUTPUT['gul'])
# Write summary_id description files
for desc_key in gul_summary_desc:
if desc_key.split('.')[-1] == 'parquet':
write_df_to_parquet_file(gul_summary_desc[desc_key], os.path.join(model_run_fp, 'output'), desc_key)
else:
write_df_to_csv_file(gul_summary_desc[desc_key], os.path.join(model_run_fp, 'output'), desc_key)
if il_summaries:
# Load FM summary map
il_map_fp = os.path.join(model_run_fp, 'input', SUMMARY_MAPPING['fm_map_fn'])
il_map_df = get_dataframe(
src_fp=il_map_fp,
lowercase_cols=False,
col_dtypes=MAP_SUMMARY_DTYPES,
empty_data_error_msg='No summary map file found.',
)
il_map_df = il_map_df[list(set(il_map_df).intersection(MAP_SUMMARY_DTYPES))]
il_summaryxref_df, il_summary_desc = get_summary_xref_df(
il_map_df,
location_df,
account_df,
analysis_settings['il_summaries'],
'il'
)
# Write Xref file
write_df_to_csv_file(il_summaryxref_df, os.path.join(model_run_fp, 'input'), SUMMARY_OUTPUT['il'])
# Write summary_id description files
for desc_key in il_summary_desc:
if desc_key.split('.')[-1] == 'parquet':
write_df_to_parquet_file(il_summary_desc[desc_key], os.path.join(model_run_fp, 'output'), desc_key)
else:
write_df_to_csv_file(il_summary_desc[desc_key], os.path.join(model_run_fp, 'output'), desc_key)
if ri_summaries or rl_summaries:
if ('il_summaries' not in analysis_settings) or (not il_summaries):
il_map_fp = os.path.join(model_run_fp, 'input', SUMMARY_MAPPING['fm_map_fn'])
il_map_df = get_dataframe(
src_fp=il_map_fp,
lowercase_cols=False,
col_dtypes=MAP_SUMMARY_DTYPES,
empty_data_error_msg='No summary map file found.',
)
il_map_df = il_map_df[list(set(il_map_df).intersection(MAP_SUMMARY_DTYPES))]
ri_summaryxref_df, ri_summary_desc = get_summary_xref_df(
il_map_df,
location_df,
account_df,
analysis_settings['ri_summaries'],
'ri'
)
# Write Xref file for each inuring priority where output has been requested
ri_settings = get_ri_settings(os.path.join(model_run_fp, 'input'))
ri_layers = {int(x) for x in ri_settings}
max_layer = max(ri_layers)
ri_inuring_priorities = set(analysis_settings.get('ri_inuring_priorities', []))
ri_inuring_priorities.add(max_layer)
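        # The final inuring priority (max_layer) is always included so that the net RI output
        # is produced even when no intermediate inuring priorities are requested.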
if not fmpy:
if len(ri_inuring_priorities) > 1:
raise OasisException('Outputs at intermediate inuring priorities not compatible with fmcalc c++ option.')
if not ri_inuring_priorities.issubset(ri_layers):
ri_missing_layers = ri_inuring_priorities.difference(ri_layers)
ri_missing_layers = [str(layer) for layer in ri_missing_layers]
missing_layers = ', '.join(ri_missing_layers[:-1])
missing_layers += ' and ' * (len(ri_missing_layers) > 1) + ri_missing_layers[-1]
missing_layers = ('priority ' if len(ri_missing_layers) == 1 else 'priorities ') + missing_layers
raise OasisException(f'Requested outputs for inuring priorities {missing_layers} lie outside of scope.')
# If requested, gross RL output at every inuring priority
if rl_summaries:
ri_inuring_priorities = ri_layers
for inuring_priority in ri_inuring_priorities:
summary_ri_fp = os.path.join(
model_run_fp, 'input', os.path.basename(ri_settings[str(inuring_priority)]['directory']))
write_df_to_csv_file(ri_summaryxref_df, summary_ri_fp, SUMMARY_OUTPUT['il'])
# Write summary_id description files
for desc_key in ri_summary_desc:
if desc_key.split('.')[-1] == 'parquet':
write_df_to_parquet_file(ri_summary_desc[desc_key], os.path.join(model_run_fp, 'output'), desc_key)
else:
write_df_to_csv_file(ri_summary_desc[desc_key], os.path.join(model_run_fp, 'output'), desc_key)
def get_exposure_summary_by_status(df, exposure_summary, peril_id, status):
"""
Populate dictionary of TIV and number of locations, grouped by peril and
validity respectively
    :param df: dataframe from gul_inputs.get_gul_input_items(..)
    :type df: pandas.DataFrame
    :param exposure_summary: dictionary to populate, created in write_exposure_summary(..)
    :type exposure_summary: dict
    :param peril_id: Descriptive OED peril key, e.g. "WTC"
    :type peril_id: str
    :param status: status returned by the lookup ('success', 'fail' or 'nomatch')
    :type status: str
    :return: populated exposure_summary dictionary
    :rtype: dict
"""
# Separate TIVs and number of distinct locations by coverage type and acquire sum
for coverage_type in SUPPORTED_COVERAGE_TYPES:
tiv_sum = df.loc[
(df['peril_id'] == peril_id) & (df['coverage_type_id'] == SUPPORTED_COVERAGE_TYPES[coverage_type]['id']),
'tiv'
].sum()
tiv_sum = float(tiv_sum)
exposure_summary[peril_id][status]['tiv_by_coverage'][coverage_type] = tiv_sum
exposure_summary[peril_id][status]['tiv'] += tiv_sum
loc_count = df.loc[
(df['peril_id'] == peril_id) & (df['coverage_type_id'] == SUPPORTED_COVERAGE_TYPES[coverage_type]['id']),
'loc_id'
].drop_duplicates().count()
loc_count = int(loc_count)
exposure_summary[peril_id][status]['number_of_locations_by_coverage'][coverage_type] = loc_count
# Find number of locations
loc_count = df.loc[df['peril_id'] == peril_id, 'loc_id'].drop_duplicates().count()
loc_count = int(loc_count)
exposure_summary[peril_id][status]['number_of_locations'] = loc_count
return exposure_summary
def get_exposure_summary_all(df, exposure_summary, peril_id):
"""
Populate dictionary of TIV and number of locations, grouped by peril
:param df: dataframe from gul_inputs.get_gul_input_items(..)
:type df: pandas.DataFrame
:param exposure_summary: dictionary to populate created in write_exposure_summary(..)
:type exposure_summary: dict
:param peril_id: Descriptive OED peril key, e.g. "WTC"
:type peril_id: str
:return: populated exposure_summary dictionary
:rtype: dict
"""
# Separate TIVs and number of distinct locations by coverage type and acquire sum
for coverage_type in SUPPORTED_COVERAGE_TYPES:
tiv_sum = df.loc[
(df['peril_id'] == peril_id) &
(df['coverage_type_id'] == SUPPORTED_COVERAGE_TYPES[coverage_type]['id']),
'tiv'
].sum()
tiv_sum = float(tiv_sum)
exposure_summary[peril_id]['all']['tiv_by_coverage'][coverage_type] = tiv_sum
exposure_summary[peril_id]['all']['tiv'] += tiv_sum
loc_count = df.loc[
(df['peril_id'] == peril_id) &
(df['coverage_type_id'] == SUPPORTED_COVERAGE_TYPES[coverage_type]['id']),
'loc_id'
].drop_duplicates().count()
loc_count = int(loc_count)
exposure_summary[peril_id]['all']['number_of_locations_by_coverage'][coverage_type] = loc_count
# Find number of locations total
loc_count = df.loc[df['peril_id'] == peril_id, 'loc_id'].drop_duplicates().count()
loc_count = int(loc_count)
exposure_summary[peril_id]['all']['number_of_locations'] = loc_count
# Find number of locations by coverage type
return exposure_summary
@oasis_log
def get_exposure_totals(df):
"""
Return dictionary with total TIVs and number of locations
:param df: dataframe `df_summary_peril` from `get_exposure_summary`
:type df: pandas.DataFrame
:return: totals section for exposure_summary dictionary
:rtype: dict
"""
dedupe_cols = ['loc_id', 'coverage_type_id']
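    # Each (location, coverage) row appears once per peril in df, so TIVs are de-duplicated on
    # (loc_id, coverage_type_id) before summing to avoid double counting across perils.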
within_scope_tiv = df[df.status.isin(OASIS_KEYS_STATUS_MODELLED)].drop_duplicates(subset=dedupe_cols)['tiv'].sum()
within_scope_num = len(df[df.status.isin(OASIS_KEYS_STATUS_MODELLED)]['loc_id'].unique())
outside_scope_tiv = df[~df.status.isin(OASIS_KEYS_STATUS_MODELLED)].drop_duplicates(subset=dedupe_cols)['tiv'].sum()
outside_scope_num = len(df[~df.status.isin(OASIS_KEYS_STATUS_MODELLED)]['loc_id'].unique())
portfolio_tiv = df.drop_duplicates(subset=dedupe_cols)['tiv'].sum()
portfolio_num = len(df['loc_id'].unique())
return {
"modelled": {
"tiv": within_scope_tiv,
"number_of_locations": within_scope_num
},
"not-modelled": {
"tiv": outside_scope_tiv,
"number_of_locations": outside_scope_num
},
"portfolio": {
"tiv": portfolio_tiv,
"number_of_locations": portfolio_num
}
}
@oasis_log
def get_exposure_summary(
exposure_df,
keys_df,
exposure_profile=get_default_exposure_profile(),
):
"""
    Create exposure summary as a dictionary of TIVs and number of locations,
    grouped by peril and validity respectively. Returns a Python dict.
    :param exposure_df: source exposure dataframe
    :type exposure_df: pandas.DataFrame
    :param keys_df: DataFrame holding keys data (successes and errors)
    :type keys_df: pandas.DataFrame
    :param exposure_profile: profile defining the exposure file
    :type exposure_profile: dict
:return: Exposure summary dictionary
:rtype: dict
"""
# get location tivs by coveragetype
df_summary = []
for field in exposure_profile:
if 'FMTermType' in exposure_profile[field].keys():
if exposure_profile[field]['FMTermType'] == 'TIV':
cov_name = exposure_profile[field]['ProfileElementName']
coverage_type_id = exposure_profile[field]['CoverageTypeID']
tmp_df = exposure_df[['loc_id', cov_name]]
tmp_df.columns = ['loc_id', 'tiv']
tmp_df['coverage_type_id'] = coverage_type_id
df_summary.append(tmp_df)
df_summary = pd.concat(df_summary)
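    # df_summary now holds one row per (location, coverage type) with its TIV; with the default
    # OED exposure profile the coverage_type_id values are 1=Buildings, 2=Other, 3=Contents, 4=BI.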
# get all perils
peril_list = keys_df['peril_id'].drop_duplicates().to_list()
df_summary_peril = []
for peril_id in peril_list:
tmp_df = df_summary.copy()
tmp_df['peril_id'] = peril_id
df_summary_peril.append(tmp_df)
df_summary_peril = pd.concat(df_summary_peril)
df_summary_peril = df_summary_peril.merge(keys_df, how='left', on=['loc_id', 'coverage_type_id', 'peril_id'])
no_return = OASIS_KEYS_STATUS['noreturn']['id']
df_summary_peril['status'] = df_summary_peril['status'].fillna(no_return)
# Compile summary of exposure data
exposure_summary = {}
# Create totals section
exposure_summary['total'] = get_exposure_totals(df_summary_peril)
for peril_id in peril_list:
exposure_summary[peril_id] = {}
# Create dictionary structure for all and each validity status
for status in ['all'] + list(OASIS_KEYS_STATUS.keys()):
exposure_summary[peril_id][status] = {}
exposure_summary[peril_id][status]['tiv'] = 0.0
exposure_summary[peril_id][status]['tiv_by_coverage'] = {}
exposure_summary[peril_id][status]['number_of_locations'] = 0
exposure_summary[peril_id][status]['number_of_locations_by_coverage'] = {}
# Fill exposure summary dictionary
if status == 'all':
exposure_summary = get_exposure_summary_all(
df_summary_peril,
exposure_summary,
peril_id
)
else:
exposure_summary = get_exposure_summary_by_status(
df_summary_peril[df_summary_peril['status'] == status],
exposure_summary,
peril_id,
status
)
return exposure_summary
@oasis_log
def write_gul_errors_map(
target_dir,
exposure_df,
keys_errors_df
):
"""
Create csv file to help map keys errors back to original exposures.
:param target_dir: directory on disk to write csv file
:type target_dir: str
:param exposure_df: source exposure dataframe
    :type exposure_df: pandas.DataFrame
:param keys_errors_df: keys errors dataframe
:type keys_errors_df: pandas.DataFrame
"""
cols = ['loc_id', 'PortNumber', 'AccNumber', 'LocNumber', 'peril_id', 'coverage_type_id', 'tiv', 'status', 'message']
gul_error_map_fp = os.path.join(target_dir, 'gul_errors_map.csv')
exposure_id_cols = ['loc_id', 'PortNumber', 'AccNumber', 'LocNumber']
keys_error_cols = ['loc_id', 'peril_id', 'coverage_type_id', 'status', 'message']
tiv_maps = {1: 'BuildingTIV', 2: 'OtherTIV', 3: 'ContentsTIV', 4: 'BITIV'}
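    # Map of Oasis coverage_type_id to its OED TIV field (1=Buildings, 2=Other, 3=Contents, 4=BI)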
exposure_cols = exposure_id_cols + list(tiv_maps.values())
keys_errors_df.columns = keys_error_cols
gul_inputs_errors_df = exposure_df[exposure_cols].merge(keys_errors_df[keys_error_cols], on=['loc_id'])
gul_inputs_errors_df['tiv'] = 0.0
for cov_type in tiv_maps:
tiv_field = tiv_maps[cov_type]
gul_inputs_errors_df['tiv'] = np.where(
gul_inputs_errors_df['coverage_type_id'] == cov_type,
gul_inputs_errors_df[tiv_field],
gul_inputs_errors_df['tiv']
)
gul_inputs_errors_df['tiv'] = gul_inputs_errors_df['tiv'].fillna(0.0)
gul_inputs_errors_df[cols].to_csv(gul_error_map_fp, index=False)
@oasis_log
def write_exposure_summary(
target_dir,
exposure_df,
keys_fp,
keys_errors_fp,
exposure_profile
):
"""
Create exposure summary as dictionary of TIVs and number of locations
grouped by peril and validity respectively. Writes dictionary as json file
to disk.
:param target_dir: directory on disk to write exposure summary file
:type target_dir: str
:param exposure_df: source exposure dataframe
    :type exposure_df: pandas.DataFrame
:param keys_fp: file path to keys file
:type keys_fp: str
:param keys_errors_fp: file path to keys errors file
:type keys_errors_fp: str
:param exposure_profile: profile defining exposure file
:type exposure_profile: dict
:return: Exposure summary file path
:rtype: str
"""
keys_success_df = keys_errors_df = None
# get keys success
if keys_fp:
keys_success_df = pd.read_csv(keys_fp)[['LocID', 'PerilID', 'CoverageTypeID']]
keys_success_df['status'] = OASIS_KEYS_STATUS['success']['id']
keys_success_df.columns = ['loc_id', 'peril_id', 'coverage_type_id', 'status']
# get keys errors
if keys_errors_fp:
keys_errors_df = pd.read_csv(keys_errors_fp)[['LocID', 'PerilID', 'CoverageTypeID', 'Status', 'Message']]
keys_errors_df.columns = ['loc_id', 'peril_id', 'coverage_type_id', 'status', 'message']
if not keys_errors_df.empty:
write_gul_errors_map(target_dir, exposure_df, keys_errors_df)
    # concatenate keys responses & run
df_keys = pd.concat([keys_success_df, keys_errors_df])
exposure_summary = get_exposure_summary(exposure_df, df_keys, exposure_profile)
    # write exposure summary as json file
fp = os.path.join(target_dir, 'exposure_summary_report.json')
with io.open(fp, 'w', encoding='utf-8') as f:
f.write(json.dumps(exposure_summary, ensure_ascii=False, indent=4))
return fp