Source code for oasislmf.preparation.gul_inputs

__all__ = [
    'get_gul_input_items',
    'write_gul_input_files',
]
import os
import warnings
from collections import OrderedDict

import pandas as pd
import numpy as np

from oasislmf.pytools.common.data import (correlations_headers, correlations_dtype, amplifications_dtype, items_dtype,
                                          coverages_dtype, item_adjustment_dtype,
                                          complex_items_meta_dtype,
                                          item_id, coverage_id, group_id, section_id,
                                          DTYPE_IDX)
from oasislmf.pytools.converters.csvtobin.utils import complex_items_write_bin, amplifications_write_bin
from oasislmf.pytools.converters.csvtobin.utils.common import df_to_ndarray
from oasislmf.utils.data import merge_dataframes
from oasislmf.utils.defaults import (CORRELATION_GROUP_ID,
                                     DAMAGE_GROUP_ID_COLS,
                                     HAZARD_GROUP_ID_COLS,
                                     OASIS_FILES_PREFIXES, SOURCE_IDX,
                                     get_default_exposure_profile)
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.fm import SUPPORTED_FM_LEVELS
from oasislmf.utils.log import oasis_log
from oasislmf.utils.path import as_path
from oasislmf.utils.profiles import get_grouped_fm_profile_by_level_and_term_group, get_oed_hierarchy

pd.options.mode.chained_assignment = None
warnings.simplefilter(action='ignore', category=FutureWarning)

VALID_OASIS_GROUP_COLS = [
    'item_id',
    'peril_id',
    'coverage_id',
    'coverage_type_id',
    'peril_correlation_group',
    'building_id',
    'risk_id'
]

PERIL_CORRELATION_GROUP_COL = 'peril_correlation_group'


def prepare_sections_df(gul_inputs_df):
    sections = gul_inputs_df.loc[:, ['section_id']].drop_duplicates()
    sections['section_id'] = sections['section_id'].astype(str).str.split(';')
    sections = sections.explode('section_id').drop_duplicates()
    sections['section_id'] = sections['section_id'].astype(section_id[DTYPE_IDX])
    return sections


def coverages_write_gul_bin(data, file_path, dtype):
    df_to_ndarray(data, dtype)["tiv"].tofile(file_path)


def complex_items_write_gul_bin(data, file_path, dtype):
    with open(file_path, 'wb') as f:
        complex_items_write_bin(data, f, dtype)


def amplifications_write_gul_bin(data, file_path, dtype):
    with open(file_path, 'wb') as f:
        amplifications_write_bin(df_to_ndarray(data, dtype), f)


def default_write_gul_bin(data, file_path, dtype):
    df_to_ndarray(data, dtype).tofile(file_path)


files_write_info = {
    'complex_items': {"csv_dtype": {'item_id': item_id[DTYPE_IDX], 'coverage_id': coverage_id[DTYPE_IDX], 'model_data': str, 'group_id': group_id[DTYPE_IDX]},
                      "bin_dtype": complex_items_meta_dtype,
                      "write_bin": complex_items_write_gul_bin,
                      "required_col": {'model_data'}},

    'items': {"csv_dtype": {col: dtype for col, (dtype, _) in items_dtype.fields.items()},
              "bin_dtype": items_dtype},
    'coverages': {"csv_dtype": {col: dtype for col, (dtype, _) in coverages_dtype.fields.items()},
                  "bin_dtype": coverages_dtype,
                  "write_bin": coverages_write_gul_bin},
    'amplifications': {"csv_dtype": {col: dtype for col, (dtype, _) in amplifications_dtype.fields.items()},
                       "bin_dtype": amplifications_dtype,
                       "write_bin": amplifications_write_gul_bin,
                       "required_col": {'amplification_id'}},
    'sections': {"csv_dtype": {'section_id': section_id[DTYPE_IDX]},
                 "prepare_data": prepare_sections_df,
                 "required_col": {'section_id'}},
    'item_adjustments': {"csv_dtype": {col: dtype for col, (dtype, _) in item_adjustment_dtype.fields.items()},
                         "required_col": {'intensity_adjustment'}}
}


def process_group_id_cols(group_id_cols, exposure_df_columns, has_correlation_groups):
    """
    cleans out columns that are not valid oasis group columns.

    Valid group id columns can be either
    1. exist in the location file
    2. be listed as a useful internal col

    Args:
        group_id_cols: (List[str]) the ID columns that are going to be filtered
        exposure_df_columns: (List[str]) the columns in the exposure dataframe
        has_correlation_groups: (bool) if set to True means that we are hashing with correlations in mind therefore the
                             "peril_correlation_group" column is added

    Returns: (List[str]) the filtered columns
    """
    for col in group_id_cols:
        if col not in list(exposure_df_columns) + VALID_OASIS_GROUP_COLS:
            warnings.warn('Column {} not found in loc file, or a valid internal oasis column'.format(col))
            group_id_cols.remove(col)

    if PERIL_CORRELATION_GROUP_COL not in group_id_cols and has_correlation_groups is True:
        group_id_cols.append(PERIL_CORRELATION_GROUP_COL)

    return group_id_cols


@oasis_log
[docs] def get_gul_input_items( location_df, keys_df, correlations=False, peril_correlation_group_df=None, exposure_profile=get_default_exposure_profile(), damage_group_id_cols=None, hazard_group_id_cols=None, do_disaggregation=True ): """ Generates GUL (Ground-Up Loss) input items by combining location and keys data. This function creates the foundational data structure for loss calculations by merging exposure (location) data with model keys data. Each resulting row represents a unique combination of location, peril, coverage type, and building that will flow through the loss calculation pipeline. Overview of Processing Flow: =========================== 1. SETUP: Load profiles, extract TIV columns, prepare location data 2. MERGE: Join location_df with keys_df on loc_id to create base GUL items 3. COVERAGE UNPACKING: Set TIV values based on coverage_type_id 4. DISAGGREGATION: If enabled, expand rows by NumberOfBuildings 5. ID ASSIGNMENT: Compute item_id, coverage_id, group_id, hazard_group_id 6. FINALIZE: Select required columns and return Key Data Structures: =================== - location_df: Source exposure data with TIV columns (BuildingTIV, ContentsTIV, etc.) - keys_df: Model lookup results mapping locations to model-specific IDs (areaperil_id, vulnerability_id, peril_id, coverage_type_id) - gul_inputs_df: Output DataFrame with one row per (location, peril, coverage, building) Key Output Columns: ================== - item_id: Unique identifier for each GUL item (loc_id, peril_id, coverage_type_id, building_id) - coverage_id: Groups items by (loc_id, building_id, coverage_type_id) - group_id: Damage correlation group (hashed from damage_group_id_cols) - hazard_group_id: Hazard correlation group (hashed from hazard_group_id_cols) - tiv: Total Insured Value for this item's coverage type - areaperil_id, vulnerability_id: Model-specific identifiers from keys Disaggregation: ============== When do_disaggregation=True and NumberOfBuildings > 1: - TIV is divided by NumberOfBuildings - Rows are repeated NumberOfBuildings times - Each repeated row gets a unique building_id (1 to NumberOfBuildings) This allows modeling individual buildings within an aggregate location. Args: location_df (pandas.DataFrame): Exposure data with columns including loc_id, PortNumber, AccNumber, LocNumber, TIV columns, NumberOfBuildings, IsAggregate. keys_df (pandas.DataFrame): Model keys with columns including locid/loc_id, perilid, coveragetypeid, areaperilid, vulnerabilityid. correlations (bool, optional): If True, merge with peril_correlation_group_df for correlation modeling. Default False. peril_correlation_group_df (pandas.DataFrame, optional): Correlation group definitions when correlations=True. exposure_profile (dict, optional): Maps OED fields to FM term types. damage_group_id_cols (list[str], optional): Columns used to compute group_id via hashing. Default: ['loc_id', 'peril_correlation_group']. hazard_group_id_cols (list[str], optional): Columns used to compute hazard_group_id via hashing. Default: ['loc_id']. do_disaggregation (bool, optional): If True, split aggregate locations by NumberOfBuildings. Default True. Returns: pandas.DataFrame: GUL inputs with columns including item_id, coverage_id, group_id, hazard_group_id, tiv, areaperil_id, vulnerability_id, peril_id, coverage_type_id, building_id, and location identifiers. Raises: OasisException: If exposure profile is missing FM term information. OasisException: If merge of location and keys data produces empty result. OasisException: If all rows have zero TIV after filtering. """ # ========================================================================= # SETUP PHASE: Load profiles and extract configuration # ========================================================================= # Get the grouped exposure profile - describes FM terms (TIV, limit, deductible) # for site coverage level and OED hierarchy terms (portfolio, account, location) profile = get_grouped_fm_profile_by_level_and_term_group(exposure_profile=exposure_profile) if not profile: raise OasisException( 'Source exposure profile is possibly missing FM term information: ' 'FM term definitions for TIV, limit, deductible, attachment and/or share.' ) # Get the OED hierarchy terms profile - this defines the column names for loc. # ID, acc. ID, policy no. and portfolio no., as used in the source exposure # and accounts files. This is to ensure that the method never makes hard # coded references to the corresponding columns in the source files, as # that would mean that changes to these column names in the source files # may break the method oed_hierarchy = get_oed_hierarchy(exposure_profile=exposure_profile) loc_num = oed_hierarchy['locnum']['ProfileElementName'] acc_num = oed_hierarchy['accnum']['ProfileElementName'] portfolio_num = oed_hierarchy['portnum']['ProfileElementName'] # The (site) coverage FM level ID (# 1 in the OED FM levels hierarchy) cov_level_id = SUPPORTED_FM_LEVELS['site coverage']['id'] # Get the TIV column names and corresponding coverage types tiv_terms = OrderedDict({v['tiv']['CoverageTypeID']: v['tiv']['ProfileElementName'] for k, v in profile[cov_level_id].items()}) tiv_cols = list(set(tiv_col for tiv_col in tiv_terms.values() if tiv_col in location_df.columns)) # Create the basic GUL inputs dataframe from merging the exposure and # keys dataframes on loc. number/loc. ID; filter out any rows with # zeros for TIVs for all coverage types, and replace any nulls in the # cond.num. and TIV columns with zeros # add default values if missing if 'IsAggregate' not in location_df.columns: location_df['IsAggregate'] = 0 else: location_df['IsAggregate'] = location_df['IsAggregate'].fillna(0) # Make sure NumberOfBuildings is there and filled (not mandatory), otherwise assume NumberOfBuildings = 1 if 'NumberOfBuildings' not in location_df.columns: location_df['NumberOfBuildings'] = 1 else: location_df['NumberOfBuildings'] = location_df['NumberOfBuildings'].fillna(1) # Select only the columns required. This reduces memory use significantly for portfolios # that include many OED columns. exposure_df_gul_inputs_cols = ['loc_id', portfolio_num, acc_num, loc_num, 'NumberOfBuildings', 'IsAggregate', 'LocPeril'] + tiv_cols if SOURCE_IDX['loc'] in location_df: exposure_df_gul_inputs_cols += [SOURCE_IDX['loc']] # it is assumed that correlations are False for now, correlations for group ID hashing are assessed later on in # the process to re-hash the group ID with the correlation "peril_correlation_group" column name. This is because # the correlations is achieved later in the process leading to a chicken and egg problem # set damage_group_id_cols if not damage_group_id_cols: # damage_group_id_cols is None or an empty list damage_group_id_cols = DAMAGE_GROUP_ID_COLS else: # remove any duplicate column names used to assign group_id damage_group_id_cols = list(set(damage_group_id_cols)) # only add damage group col if not an internal oasis col or if not present already in exposure_df_gul_inputs_cols for col in damage_group_id_cols: if col in VALID_OASIS_GROUP_COLS: pass elif col not in exposure_df_gul_inputs_cols: exposure_df_gul_inputs_cols.append(col) # set hazard_group_id_cols if not hazard_group_id_cols: # hazard_group_id_cols is None or an empty list hazard_group_id_cols = HAZARD_GROUP_ID_COLS else: # remove any duplicate column names used to assign group_id hazard_group_id_cols = list(set(hazard_group_id_cols)) # only add hazard group col if not an internal oasis col or if not present already in exposure_df_gul_inputs_cols for col in hazard_group_id_cols: if col in VALID_OASIS_GROUP_COLS: pass elif col not in exposure_df_gul_inputs_cols: exposure_df_gul_inputs_cols.append(col) # Check if correlation group field is used to drive damage group id # and test that it's present and populated with integers correlation_group_id = CORRELATION_GROUP_ID correlation_field = correlation_group_id[0] correlation_check = False if damage_group_id_cols == correlation_group_id: if correlation_field in location_df.columns: if location_df[correlation_field].astype('uint32').isnull().sum() == 0: correlation_check = True actual_tiv_cols = [tiv_col for tiv_col in tiv_cols if tiv_col in location_df.columns] location_df[actual_tiv_cols] = location_df[actual_tiv_cols].fillna(0.0) location_df = location_df[(location_df[actual_tiv_cols] != 0).any(axis=1)] gul_inputs_df = (location_df[list(set(exposure_df_gul_inputs_cols).intersection(location_df.columns))] .drop_duplicates('loc_id', ignore_index=True)) # ========================================================================= # MERGE PHASE: Join location data with model keys # ========================================================================= # Keys file uses camelCase headers; rename to snake_case for consistency keys_df.rename( columns={ 'locid': 'loc_id' if 'loc_id' not in keys_df else 'locid', 'perilid': 'peril_id', 'coveragetypeid': 'coverage_type_id', 'areaperilid': 'areaperil_id', 'vulnerabilityid': 'vulnerability_id', 'amplificationid': 'amplification_id', 'modeldata': 'model_data', 'intensityadjustment': 'intensity_adjustment', 'returnperiod': 'return_period' }, inplace=True, copy=False # Pandas copies column data by default on rename ) # If the keys file relates to a complex/custom model then look for a # ``modeldata`` column in the keys file, and ignore the area peril # and vulnerability ID columns, unless it's the dynamic model generator which # uses them if 'model_data' in keys_df and 'areaperil_id' not in keys_df and 'vulnerbaility_id' not in keys_df: keys_df['areaperil_id'] = keys_df['vulnerability_id'] = -1 gul_inputs_df = merge_dataframes( keys_df, gul_inputs_df, join_on='loc_id', how='inner', ) if gul_inputs_df.empty: raise OasisException( 'Inner merge of the exposure file dataframe ' 'and the keys file dataframe on loc. number/loc. ID ' 'is empty - ' 'please check that the loc. number and loc. ID columns ' 'in the exposure and keys files respectively have a non-empty ' 'intersection' ) # Free memory after merge, before memory-intensive restructuring of data del keys_df # ========================================================================= # COVERAGE UNPACKING: Set TIV based on coverage type # ========================================================================= # Map coverage_type_id to the appropriate TIV column (BuildingTIV, ContentsTIV, etc.) cols_by_cov_type = {} for cov_type in gul_inputs_df.coverage_type_id.unique(): tiv_col = tiv_terms[cov_type] cols_by_cov_type[cov_type] = { 'tiv_col': tiv_col } # If disaggregating, divide TIV by NumberOfBuildings before assigning if do_disaggregation: # split TIV gul_inputs_df[tiv_cols] = gul_inputs_df[tiv_cols].div(np.maximum(1, gul_inputs_df['NumberOfBuildings']), axis=0) # Set tiv column based on coverage_type_id (vectorized) gul_inputs_df['tiv'] = 0.0 for cov_type, tiv_col in cols_by_cov_type.items(): mask = gul_inputs_df['coverage_type_id'] == cov_type gul_inputs_df.loc[mask, 'tiv'] = gul_inputs_df.loc[mask, tiv_col['tiv_col']] # Filter out rows with zero TIV gul_inputs_df = gul_inputs_df[gul_inputs_df['tiv'] > 0] if gul_inputs_df.empty: raise OasisException('Empty gul_inputs_df dataframe after dropping rows with zero tiv: please check the exposure input files') # ========================================================================= # DISAGGREGATION: Expand rows by NumberOfBuildings # ========================================================================= # For aggregate locations (NumberOfBuildings > 1), create one row per building # Each building gets a unique building_id and its share of the TIV if do_disaggregation: repeat_counts = np.maximum(1, gul_inputs_df['NumberOfBuildings'].values).astype(int) # Repeat rows using np.repeat + iloc (faster than iterative expansion) gul_inputs_df = gul_inputs_df.iloc[np.repeat(np.arange(len(gul_inputs_df)), repeat_counts)].reset_index(drop=True) # Assign building_id: 1, 2, ..., n for each location gul_inputs_df['building_id'] = np.concatenate([np.arange(1, n + 1) for n in repeat_counts]) else: gul_inputs_df = gul_inputs_df.copy() gul_inputs_df['building_id'] = 1 # ========================================================================= # ID ASSIGNMENT: Compute item_id, coverage_id, group_id, hazard_group_id # ========================================================================= # risk_id/NumberOfRisks: Used for aggregate exposure handling in FM gul_inputs_df[['risk_id', 'NumberOfRisks']] = gul_inputs_df[['building_id', 'NumberOfBuildings']] gul_inputs_df.loc[gul_inputs_df['IsAggregate'] == 0, ['risk_id', 'NumberOfRisks']] = 1, 1 gul_inputs_df.loc[gul_inputs_df['NumberOfRisks'] == 0, 'NumberOfRisks'] = 1 # item_id: Unique per (location, peril, coverage_type, building) - the fundamental GUL unit gul_inputs_df['item_id'] = gul_inputs_df.groupby( ['loc_id', 'peril_id', 'coverage_type_id', 'building_id'], sort=False, observed=True).ngroup().astype('int32') + 1 # coverage_id: Groups items by (location, building, coverage_type) - ignores peril gul_inputs_df['coverage_id'] = gul_inputs_df.groupby( ['loc_id', 'building_id', 'coverage_type_id'], sort=False, observed=True).ngroup().astype('int32') + 1 # group_id and hazard_group_id: Correlation groups for damage/hazard sampling # If the group id is set according to the correlation group field then map this field # directly, otherwise create an index of the group id fields # keep group_id consistance by lower casing column names and sorting damage_group_id_cols_map = {c: c.lower() for c in sorted(damage_group_id_cols)} # mapping from PascalCase -> 'lower_case' hazard_group_id_cols_map = {c: c.lower() for c in sorted(hazard_group_id_cols)} # mapping from PascalCase -> 'lower_case' if correlation_check is True: gul_inputs_df['group_id'] = gul_inputs_df[correlation_group_id] if correlations: # do merge with peril correlation df gul_inputs_df = gul_inputs_df.merge(peril_correlation_group_df, left_on='peril_id', right_on='id').reset_index() else: gul_inputs_df[["peril_correlation_group", "damage_correlation_value", "hazard_correlation_value"]] = 0 gul_inputs_df["group_id"] = ( pd.util.hash_pandas_object( gul_inputs_df.rename(columns=damage_group_id_cols_map)[sorted(list(damage_group_id_cols_map.values()))], index=False).to_numpy() >> 33 ).astype('uint32') gul_inputs_df["hazard_group_id"] = ( pd.util.hash_pandas_object( gul_inputs_df.rename(columns=hazard_group_id_cols_map)[sorted(list(hazard_group_id_cols_map.values()))], index=False).to_numpy() >> 33 ).astype('uint32') # ========================================================================= # FINALIZE: Select required columns and return # ========================================================================= keyscols = ['peril_id', 'coverage_type_id', 'tiv', 'areaperil_id', 'vulnerability_id'] additionalcols = ['amplification_id', 'section_id', 'intensity_adjustment', 'return_period'] for col in additionalcols: if col in gul_inputs_df.columns: keyscols += [col] usecols = ( ['loc_id', portfolio_num, acc_num, loc_num] + ([SOURCE_IDX['loc']] if SOURCE_IDX['loc'] in gul_inputs_df else []) + keyscols + (['model_data'] if 'model_data' in gul_inputs_df else []) + # disagg_id is needed for fm_summary_map ['group_id', 'coverage_id', 'item_id', 'status', 'building_id', 'NumberOfBuildings', 'IsAggregate', 'LocPeril'] + tiv_cols + ["peril_correlation_group", "damage_correlation_value", 'hazard_group_id', "hazard_correlation_value"] ) usecols = [col for col in usecols if col in gul_inputs_df] gul_inputs_df = ( gul_inputs_df [usecols] .drop_duplicates(subset='item_id') .sort_values("item_id", kind='stable') .reset_index() ) return gul_inputs_df
@oasis_log def write_file(gul_inputs_df, file_path, file_dtype, chunksize=100000): try: gul_inputs_df[file_dtype.keys()].astype(file_dtype).drop_duplicates().to_csv( path_or_buf=file_path, encoding='utf-8', mode=('w' if os.path.exists(file_path) else 'a'), chunksize=chunksize, index=False ) except (IOError, OSError) as e: raise OasisException(f"Exception raised while writing {file_path} {file_dtype}", e) return file_path @oasis_log
[docs] def write_gul_input_files( gul_inputs_df, target_dir, correlations_df, output_dir, oasis_files_prefixes=OASIS_FILES_PREFIXES['gul'], chunksize=(2 * 10 ** 5), intermediary_csv=False, ): """Write standard Oasis GUL input files to a target directory. Writes binary files (items.bin, coverages.bin) directly from a pre-generated dataframe of GUL input items. Optional files (complex_items.bin, amplifications.bin) are written when the corresponding columns are present. Files that have no binary consumer (sections.csv, item_adjustments.csv) are always written as CSV. Args: gul_inputs_df (pd.DataFrame): GUL inputs dataframe. target_dir (str): Target directory in which to write the files. correlations_df (pd.DataFrame): Correlations dataframe. If None, an empty dataframe with correlations_headers columns is used. output_dir (str): Output directory for correlations files. oasis_files_prefixes (dict): Oasis GUL input file name prefixes. Defaults to OASIS_FILES_PREFIXES['gul']. chunksize (int): Chunk size for writing CSV files. Defaults to 200000. intermediary_csv (bool): If True, also write CSV files alongside binary for debugging. Defaults to False. Returns: dict: Mapping of file names to their written file paths. """ # Clean the target directory path target_dir = as_path(target_dir, 'Target IL input files directory', is_dir=True, preexists=False) # write the correlations to a binary file if correlations_df is None: correlations_df = pd.DataFrame(columns=correlations_headers) correlations_df.to_csv(f"{output_dir}/correlations.csv", index=False) correlations_df_np_data = np.array([r for r in correlations_df.itertuples(index=False)], dtype=correlations_dtype) correlations_df_np_data.tofile(f"{output_dir}/correlations.bin") # Set chunk size for writing the CSV files - default is the minimum of 100K # or the GUL inputs frame size chunksize = chunksize or min(chunksize, len(gul_inputs_df)) # A dict of GUL input file names and file paths gul_input_files = {} # Write the gul_inputs_df files serially for fm_name, file_name in oasis_files_prefixes.items(): file_write_info = files_write_info[fm_name] if not file_write_info.get("required_col", set()).issubset(gul_inputs_df.columns): continue data = file_write_info.get('prepare_data', lambda x: x[list(file_write_info['csv_dtype'].keys())] # default )(gul_inputs_df).drop_duplicates() if "bin_dtype" in file_write_info: file_path = os.path.join(target_dir, f"{file_name}.bin") gul_input_files[fm_name] = file_path oasis_log(file_write_info.get("write_bin", default_write_gul_bin))(data, file_path, file_write_info['bin_dtype']) if intermediary_csv: file_path = os.path.join(target_dir, f"{file_name}.csv") write_file(data, file_path, file_write_info['csv_dtype'], chunksize=chunksize) else: file_path = os.path.join(target_dir, f"{file_name}.csv") gul_input_files[fm_name] = file_path write_file(data, file_path, file_write_info['csv_dtype'], chunksize=chunksize) return gul_input_files