"""This file contains specific functionality needed for aggregate vulnerabilities."""
import logging

import numba as nb
import numpy as np
import pandas as pd
from numba import njit
from numba.typed import Dict, List
from numba.types import int32 as nb_int32

from oasis_data_manager.filestore.backends.base import BaseStorage
from oasislmf.pytools.common.data import (
    areaperil_int,
    nb_areaperil_int,
    nb_oasis_float,
    aggregatevulnerability_dtype,
    vulnerability_weight_dtype,
)

# Module-level logger: the read_* functions below call `logger.debug(...)`,
# which was previously undefined in this module.
logger = logging.getLogger(__name__)

# NOTE(review): AGG_VULN_WEIGHTS_KEY_TYPE / AGG_VULN_WEIGHTS_VAL_TYPE are
# referenced by gen_empty_areaperil_vuln_idx_to_weights() but are not visible
# in this chunk — presumably defined elsewhere in the module; confirm.
def gen_empty_agg_vuln_to_vuln_ids():
    """Generate empty map to store the definitions of aggregate vulnerability functions.

    Returns:
        dict[int, list[int]]: map of aggregate vulnerability id to list of vulnerability ids.
    """
    # Value type exemplar: an empty typed list of int32 vulnerability ids.
    empty_vuln_id_list = List.empty_list(nb_int32)
    return Dict.empty(nb_int32, empty_vuln_id_list)
@njit(cache=True)
def gen_empty_areaperil_vuln_idx_to_weights():
    """Generate empty map to store the weights of individual vulnerability functions
    in each aggregate vulnerability.

    Returns:
        dict[AGG_VULN_WEIGHTS_KEY_TYPE, AGG_VULN_WEIGHTS_VAL_TYPE]: map of
            (areaperil_id, vulnerability id) to weight.
    """
    # NOTE(review): AGG_VULN_WEIGHTS_KEY_TYPE / AGG_VULN_WEIGHTS_VAL_TYPE are not
    # defined in the imports visible here — presumably declared elsewhere in this
    # module; confirm they exist before the first call.
    return Dict.empty(AGG_VULN_WEIGHTS_KEY_TYPE, AGG_VULN_WEIGHTS_VAL_TYPE)
def read_aggregate_vulnerability(storage: "BaseStorage", ignore_file_type=frozenset()):
    """Load the aggregate vulnerability definitions from file.

    Prefers the binary file over the csv file when both are present.

    Args:
        storage (BaseStorage): the storage manager for fetching model data.
        ignore_file_type (Set[str]): file extensions to ignore when loading.

    Returns:
        np.array[AggregateVulnerability] or None: aggregate vulnerability table,
            or None when no aggregate vulnerability file is found.
    """
    # frozenset() default: an immutable default avoids the shared-mutable-default
    # pitfall of the previous `set()` default.
    input_files = set(storage.listdir())

    if "aggregate_vulnerability.bin" in input_files and "bin" not in ignore_file_type:
        logger.debug(f"loading {storage.get_storage_url('aggregate_vulnerability.bin', encode_params=False)}")
        with storage.open('aggregate_vulnerability.bin') as f:
            # memmap avoids loading the whole table into memory.
            aggregate_vulnerability = np.memmap(f, dtype=aggregatevulnerability_dtype, mode='r')

    elif "aggregate_vulnerability.csv" in input_files and "csv" not in ignore_file_type:
        logger.debug(f"loading {storage.get_storage_url('aggregate_vulnerability.csv', encode_params=False)}")
        with storage.open('aggregate_vulnerability.csv') as f:
            # ndmin=1 guarantees a 1-d array even when the csv has a single data row.
            aggregate_vulnerability = np.loadtxt(f, dtype=aggregatevulnerability_dtype, delimiter=",", skiprows=1, ndmin=1)

    else:
        aggregate_vulnerability = None
        logging.warning(
            f"Aggregate vulnerability table not found at {storage.get_storage_url('', encode_params=False)[0]}. "
            "Continuing without aggregate vulnerability definitions."
        )

    return aggregate_vulnerability
def read_vulnerability_weights(storage: "BaseStorage", ignore_file_type=frozenset()):
    """Load the vulnerability weights definitions from file.

    Prefers the binary file over the csv file when both are present.

    Args:
        storage (BaseStorage): the storage manager for fetching model data.
        ignore_file_type (Set[str]): file extensions to ignore when loading.

    Returns:
        np.array[VulnerabilityWeight] or None: vulnerability weights table,
            or None when no weights file is found.
    """
    # frozenset() default: an immutable default avoids the shared-mutable-default
    # pitfall of the previous `set()` default.
    input_files = set(storage.listdir())

    if "weights.bin" in input_files and "bin" not in ignore_file_type:
        logger.debug(f"loading {storage.get_storage_url('weights.bin', encode_params=False)}")
        with storage.open("weights.bin") as f:
            # memmap avoids loading the whole table into memory.
            aggregate_weights = np.memmap(f, dtype=vulnerability_weight_dtype, mode='r')

    elif "weights.csv" in input_files and "csv" not in ignore_file_type:
        logger.debug(f"loading {storage.get_storage_url('weights.csv', encode_params=False)}")
        with storage.open("weights.csv") as f:
            # ndmin=1 guarantees a 1-d array even when the csv has a single data row.
            aggregate_weights = np.loadtxt(f, dtype=vulnerability_weight_dtype, delimiter=",", skiprows=1, ndmin=1)

    else:
        aggregate_weights = None
        logging.warning(
            f"Vulnerability weights not found at {storage.get_storage_url('', encode_params=False)[0]}. "
            "Continuing without vulnerability weights definitions."
        )

    return aggregate_weights
def process_aggregate_vulnerability(aggregate_vulnerability):
    """Rearrange aggregate vulnerability definitions from tabular format to a map
    between aggregate vulnerability id and the list of vulnerability ids that it
    is made of.

    Args:
        aggregate_vulnerability (np.array[AggregateVulnerability]): aggregate vulnerability table.

    Returns:
        dict[int, list[int]]: map of aggregate vulnerability id to list of vulnerability ids.
    """
    # Always start from the empty typed dict so that numba can compile the
    # functions consuming it even when there are no aggregate definitions.
    agg_vuln_to_vuln_ids = gen_empty_agg_vuln_to_vuln_ids()

    if aggregate_vulnerability is None:
        return agg_vuln_to_vuln_ids

    # vulnerability_id and aggregate_vulnerability_id are remapped to the internal
    # ids using the vuln_dict map that contains only the vulnerability_id used in
    # this portfolio. Here we read all aggregate vulnerability ids; after processing
    # the items file, the aggregate vulnerabilities not used in this portfolio are
    # filtered out.
    agg_vuln_df = pd.DataFrame(aggregate_vulnerability)
    for agg_id, group in agg_vuln_df.groupby('aggregate_vulnerability_id'):
        key = nb_int32(agg_id)
        if key not in agg_vuln_to_vuln_ids:
            agg_vuln_to_vuln_ids[key] = List.empty_list(nb_int32)
        vuln_id_list = agg_vuln_to_vuln_ids[key]
        for vuln_id in group['vulnerability_id'].to_list():
            vuln_id_list.append(nb_int32(vuln_id))

    return agg_vuln_to_vuln_ids
@nb.njit(cache=True)
def process_vulnerability_weights(areaperil_vuln_i_to_weight, vuln_dict, aggregate_weights):
    """Populate the useful (areaperil_id, vulnerability_i) entries in
    areaperil_vuln_i_to_weight with the weight from aggregate_weights.

    Args:
        areaperil_vuln_i_to_weight: dict of useful (areaperil_id, vulnerability_i)
            to 0 (weight placeholder, updated in place by this function).
        vuln_dict (Dict[int, int]): vulnerability dictionary, vuln_id => vuln_i.
        aggregate_weights (np.array[VulnerabilityWeight]): vulnerability weights table.
    """
    # Index-based loop: row access by integer index into the structured array.
    for i in range(len(aggregate_weights)):
        rec = aggregate_weights[i]
        # Only vulnerability ids present in vuln_dict (i.e. used in this
        # portfolio) are considered.
        if rec['vulnerability_id'] in vuln_dict:
            key = (nb_areaperil_int(rec['areaperil_id']), vuln_dict[rec['vulnerability_id']])
            # Update only pre-populated "useful" keys; other (areaperil, vuln)
            # pairs in the weights table are ignored.
            if key in areaperil_vuln_i_to_weight:
                areaperil_vuln_i_to_weight[key] = nb_oasis_float(rec['weight'])