Source code for oasislmf.pytools.gulmc.aggregate

"""
This file contains specific functionality needed for aggregate vulnerabilities.

"""
import logging
import os

import numba as nb
import numpy as np
import pandas as pd
from oasis_data_manager.filestore.backends.base import BaseStorage
from oasislmf.utils.data import analysis_settings_loader
from oasislmf.pytools.common.data import (
    areaperil_int, nb_areaperil_int, nb_oasis_int, nb_oasis_float, oasis_float, oasis_int,
    aggregatevulnerability_dtype, vulnerability_weight_dtype,
)
from oasislmf.pytools.common.id_index import build as id_index_build
from oasislmf.pytools.common.hashmap import (
    init_dict as hm_init_dict, unpack as hm_unpack, rehash as hm_rehash,
    _try_add_key as hm_try_add_key, _find_key as hm_find_key,
    i_add_key_fail as hm_i_add_key_fail,
    new_slot_bit as hm_new_slot_bit, slot_mask as hm_slot_mask,
    NOT_FOUND as HM_NOT_FOUND,
)

[docs] logger = logging.getLogger(__name__)
# Key for the (areaperil_id, vuln_idx) -> weight hashmap built in # process_vulnerability_weights. Defined at module level so the structured-record # fnv1a/key_eq overloads compile once.
[docs] agg_weight_key_dtype = np.dtype([('areaperil_id', areaperil_int), ('vuln_idx', oasis_int)])
[docs] def read_aggregate_vulnerability(storage: BaseStorage, ignore_file_type=set()): """Load the aggregate vulnerability definitions from file. Args: storage: (BaseStorage) the storage manager for fetching model data ignore_file_type (Set[str]): file extension to ignore when loading. Returns: np.array[AggregateVulnerability]: aggregate vulnerability table. """ input_files = set(storage.listdir()) if "aggregate_vulnerability.bin" in input_files and "bin" not in ignore_file_type: logger.debug(f"loading {storage.get_storage_url('aggregate_vulnerability.bin', encode_params=False)}") with storage.open('aggregate_vulnerability.bin') as f: aggregate_vulnerability = np.memmap(f, dtype=aggregatevulnerability_dtype, mode='r') elif "aggregate_vulnerability.csv" in input_files and "csv" not in ignore_file_type: logger.debug(f"loading {storage.get_storage_url('aggregate_vulnerability.csv', encode_params=False)}") with storage.open('aggregate_vulnerability.csv') as f: aggregate_vulnerability = np.loadtxt(f, dtype=aggregatevulnerability_dtype, delimiter=",", skiprows=1, ndmin=1) else: aggregate_vulnerability = None logger.warning( f"Aggregate vulnerability table not found at {storage.get_storage_url('', encode_params=False)[0]}. Continuing without aggregate vulnerability definitions.") return aggregate_vulnerability
[docs] def read_vulnerability_weights(storage: BaseStorage, ignore_file_type=set()): """Load the vulnerability weights definitions from file. Args: storage: (BaseStorage) the storage manager for fetching model data ignore_file_type (Set[str]): file extension to ignore when loading. Returns: np.array[VulnerabilityWeight]: vulnerability weights table. """ input_files = set(storage.listdir()) if "weights.bin" in input_files and "bin" not in ignore_file_type: logger.debug(f"loading {storage.get_storage_url('weights.bin', encode_params=False)}") with storage.open("weights.bin") as f: aggregate_weights = np.memmap(f, dtype=vulnerability_weight_dtype, mode='r') elif "weights.csv" in input_files and "csv" not in ignore_file_type: logger.debug(f"loading {storage.get_storage_url('weights.csv', encode_params=False)}") with storage.open("weights.csv") as f: aggregate_weights = np.loadtxt(f, dtype=vulnerability_weight_dtype, delimiter=",", skiprows=1, ndmin=1) else: aggregate_weights = None logger.warning( f"Vulnerability weights not found at {storage.get_storage_url('', encode_params=False)[0]}. Continuing without vulnerability weights definitions.") return aggregate_weights
[docs] def process_aggregate_vulnerability(aggregate_vulnerability): """Rearrange aggregate vulnerability definitions from tabular format into CRS arrays mapping aggregate vulnerability id to the list of vulnerability ids that compose it. Args: aggregate_vulnerability (np.array[AggregateVulnerability]): aggregate vulnerability table. Returns: agg_vuln_ids (np.array[oasis_int]): sorted array of aggregate vulnerability ids. agg_vuln_id_ja_offsets (np.array[oasis_int]): jagged array offsets. Row i spans agg_vuln_id_ja_vuln_ids[agg_vuln_id_ja_offsets[i]:agg_vuln_id_ja_offsets[i+1]]. agg_vuln_id_ja_vuln_ids (np.array[oasis_int]): flat jagged array of constituent vulnerability ids. """ if aggregate_vulnerability is not None and len(aggregate_vulnerability) > 0: agg_vuln_df = pd.DataFrame(aggregate_vulnerability) # Group by aggregate_vulnerability_id, preserving insertion order of sub-vulns. # Sort groups so agg_vuln_ids is sorted for binary search in generate_item_map. grouped = agg_vuln_df.groupby('aggregate_vulnerability_id', sort=True) n_groups = len(grouped) n_entries = len(agg_vuln_df) agg_vuln_ids = np.empty(n_groups, dtype=oasis_int) agg_vuln_id_ja_vuln_ids = np.empty(n_entries, dtype=oasis_int) agg_vuln_id_ja_offsets = np.empty(n_groups + 1, dtype=oasis_int) agg_vuln_id_ja_offsets[0] = 0 ptr = 0 for i, (agg_id, grp) in enumerate(grouped): agg_vuln_ids[i] = agg_id sub_vulns = grp['vulnerability_id'].values n_sub = len(sub_vulns) agg_vuln_id_ja_vuln_ids[ptr:ptr + n_sub] = sub_vulns ptr += n_sub agg_vuln_id_ja_offsets[i + 1] = ptr else: agg_vuln_ids = np.empty(0, dtype=oasis_int) agg_vuln_id_ja_vuln_ids = np.empty(0, dtype=oasis_int) agg_vuln_id_ja_offsets = np.zeros(1, dtype=oasis_int) agg_vuln_id_ja_id_ind = id_index_build(agg_vuln_ids) return agg_vuln_ids, agg_vuln_id_ja_id_ind, agg_vuln_id_ja_offsets, agg_vuln_id_ja_vuln_ids
@nb.njit(cache=True)
[docs] def process_vulnerability_weights(areaperil_agg_vuln_idx_ja_areaperil_ids, areaperil_agg_vuln_idx_ja_data, vuln_map, vuln_map_keys, aggregate_weights): """ Populate the weight field in the merged data array by matching aggregate_weights records. Builds a (areaperil_id, vuln_idx) -> weight hashmap from aggregate_weights once, then iterates entries with one O(1) lookup each. Total cost: O(W + E). Iterating from the weight side handles the case where the same (ap, vuln_idx) pair appears in multiple entries (two distinct aggregate definitions sharing a sub-vuln on the same areaperil): every such entry gets the same weight, since each is looked up independently. Args: areaperil_agg_vuln_idx_ja_areaperil_ids (np.array[areaperil_int]): areaperil_id for each entry. areaperil_agg_vuln_idx_ja_data (np.array[agg_vuln_idx_weight_dtype]): merged (vuln_idx, weight) per entry. vuln_map (np.ndarray[uint8]): packed hashmap table mapping vuln_id to dense index. vuln_map_keys (np.ndarray[int32]): array of unique vulnerability ids (hashmap keys). aggregate_weights (np.array[VulnerabilityWeight]): vulnerability weights table. """ n_weights = len(aggregate_weights) n_entries = len(areaperil_agg_vuln_idx_ja_data) if n_weights == 0 or n_entries == 0: return # Build (areaperil_id, vuln_idx) -> weight hashmap from aggregate_weights. # Pre-sized to fit all weights so the load-factor guard never fires; only the # rehash path on Robin-Hood collision is kept. weight_key_table = np.empty(n_weights, dtype=agg_weight_key_dtype) weight_values = np.empty(n_weights, dtype=oasis_float) weight_table = hm_init_dict(n_weights) w_info, w_lookup, w_index = hm_unpack(weight_table) vm_info, vm_lookup, vm_index = hm_unpack(vuln_map) n_valid = 0 for i in range(n_weights): rec = aggregate_weights[i] vslot = hm_find_key(vm_info, vm_lookup, vm_index, vuln_map_keys, rec['vulnerability_id']) if vslot == HM_NOT_FOUND: continue weight_key_table[n_valid]['areaperil_id'] = nb_areaperil_int(rec['areaperil_id']) weight_key_table[n_valid]['vuln_idx'] = nb_oasis_int(vm_index[vslot]) weight_values[n_valid] = nb_oasis_float(rec['weight']) result = hm_try_add_key(w_info, w_lookup, w_index, weight_key_table, weight_key_table[n_valid], n_valid) while result == hm_i_add_key_fail: weight_table = hm_rehash(weight_table, weight_key_table) w_info, w_lookup, w_index = hm_unpack(weight_table) result = hm_try_add_key(w_info, w_lookup, w_index, weight_key_table, weight_key_table[n_valid], n_valid) if result & hm_new_slot_bit: n_valid += 1 else: # Duplicate (ap, vuln_idx) in aggregate_weights: preserve the old "last wins" # behavior by overwriting the previously-stored weight at its original position. weight_values[w_index[result & hm_slot_mask]] = nb_oasis_float(rec['weight']) # Apply weights to entries with O(1) lookup each. lookup_key = np.empty(1, dtype=agg_weight_key_dtype)[0] for j in range(n_entries): lookup_key['areaperil_id'] = areaperil_agg_vuln_idx_ja_areaperil_ids[j] lookup_key['vuln_idx'] = areaperil_agg_vuln_idx_ja_data[j]['vuln_idx'] eslot = hm_find_key(w_info, w_lookup, w_index, weight_key_table, lookup_key) if eslot != HM_NOT_FOUND: areaperil_agg_vuln_idx_ja_data[j]['weight'] = weight_values[w_index[eslot]]
[docs] def get_vuln_rngadj(run_dir, vuln_map, vuln_map_keys): """ Loads vulnerability adjustments from the analysis settings file. Args: run_dir (str): path to the run directory (used to load the analysis settings) vuln_map (np.ndarray[uint8]): packed hashmap table mapping vuln_id to dense index. vuln_map_keys (np.ndarray[int32]): array of unique vulnerability ids (hashmap keys). Returns: (np.ndarray[oasis_float]) vulnerability adjustments array, indexed by dense vuln index. """ settings_path = os.path.join(run_dir, "analysis_settings.json") vuln_adj = np.ones(len(vuln_map_keys), dtype=oasis_float) if not os.path.exists(settings_path): logger.debug(f"analysis_settings.json not found in {run_dir}.") return vuln_adj vulnerability_adjustments_field = analysis_settings_loader(settings_path).get('vulnerability_adjustments', None) if vulnerability_adjustments_field is not None: adjustments = vulnerability_adjustments_field.get('adjustments', None) else: adjustments = None if adjustments is None: logger.debug(f"vulnerability_adjustments not found in {settings_path}.") return vuln_adj hm_info, hm_lookup, hm_index = hm_unpack(vuln_map) for key, value in adjustments.items(): slot = hm_find_key(hm_info, hm_lookup, hm_index, vuln_map_keys, np.int32(int(key))) if slot != HM_NOT_FOUND: idx = hm_index[slot] vuln_adj[idx] = nb_oasis_float(value) return vuln_adj