"""
This file contains specific functionality to read and process items files.
"""
import logging
import os
import numpy as np
import numba as nb
from numba.types import int32 as nb_int32
from oasislmf.pytools.common.data import areaperil_int, nb_oasis_int, oasis_int, items_dtype
from oasislmf.pytools.common.id_index import get_idx, NOT_FOUND
from oasislmf.pytools.common.hashmap import (
init_dict, unpack, rehash, _try_add_key,
i_add_key_fail, new_slot_bit, slot_mask,
HM_INFO_N_VALID,
)
from oasislmf.pytools.getmodel.manager import encode_peril_id
from oasislmf.pytools.gulmc.common import ItemAdjustment, agg_vuln_idx_weight_dtype
# Module-level logger, named after this module so handlers/levels can be configured per module.
logger = logging.getLogger(__name__)
def read_items(input_path, ignore_file_type=frozenset()):
    """Load the items table from ``items.bin`` or ``items.csv``.

    ``items.bin`` takes precedence over ``items.csv`` when both are present and neither
    extension is listed in ``ignore_file_type``.

    Args:
        input_path (str): directory expected to contain the items file.
        ignore_file_type (Set[str]): file extensions (``"bin"``, ``"csv"``) to skip when loading.
            The argument is only read (membership tests), never modified; the default is an
            immutable empty set so it cannot be shared-and-mutated across calls.

    Returns:
        np.ndarray[items_dtype]: items table; a read-only ``np.memmap`` when loaded from
            items.bin, or an array parsed by ``np.loadtxt`` when loaded from items.csv.

    Raises:
        FileNotFoundError: if neither a usable items.bin nor items.csv is listed in
            ``input_path``.
    """
    input_files = set(os.listdir(input_path))

    if "items.bin" in input_files and "bin" not in ignore_file_type:
        items_fname = os.path.join(input_path, 'items.bin')
        logger.debug("loading %s", items_fname)
        # mode='r': the file is mapped read-only, the returned array cannot be written to.
        items = np.memmap(items_fname, dtype=items_dtype, mode='r')

    elif "items.csv" in input_files and "csv" not in ignore_file_type:
        items_fname = os.path.join(input_path, 'items.csv')
        logger.debug("loading %s", items_fname)
        # skiprows=1 skips the header line; ndmin=1 keeps a 1-d array even for a single row.
        items = np.loadtxt(items_fname, dtype=items_dtype, delimiter=",", skiprows=1, ndmin=1)

    else:
        raise FileNotFoundError(f'items file not found at {input_path}')

    return items
@nb.njit(cache=True, fastmath=True)
def generate_item_map(items, coverages,
                      agg_vuln_id_ja_id_ind, agg_vuln_id_ja_offsets, agg_vuln_id_ja_vuln_ids):
    """Generate item_map as a two-level jagged array; requires items to be sorted.

    Items must be sorted by (areaperil_id, vulnerability_id). The function builds a
    two-level CSR structure that replaces the former Numba Dict item_map and areaperil_ids_map:

        Level 0: areaperil_id → areaperil_ind (via id_index, built by the caller from the
                 returned unique areaperil ids)
        Level 1: areaperil_ind → pair index range (areaperil_to_vuln_ja_offsets)
        Level 2: pair position → item indices (areaperil_to_vuln_ja_vuln_ja_offsets /
                 areaperil_to_vuln_ja_vuln_ja_item_idxs)

    Side effects (in-place writes on the arguments):
        - ``coverages[coverage_id]['max_items']`` is incremented once per item.
        - ``items[j]['vulnerability_idx']`` is set for items whose vulnerability_id is not an
          aggregate id.
        - ``items[j]['areaperil_agg_vuln_idx']`` is set for items whose vulnerability_id is an
          aggregate id.

    NOTE: group changes are detected against the sentinels ``areaperil_id == 0`` and
    ``vulnerability_id == -1``; a leading item with areaperil_id 0 is therefore not counted as
    starting a new areaperil.

    Args:
        items (numpy.ndarray): 1-d structured array storing item data, sorted by
          (areaperil_id, vulnerability_id). Must have 'areaperil_agg_vuln_idx' and
          'vulnerability_idx' fields and be writable.
        coverages (numpy.ndarray): coverage id to information on items.
        agg_vuln_id_ja_id_ind (np.array): id_index for aggregate vulnerability ids.
        agg_vuln_id_ja_offsets (np.array[oasis_int]): jagged array offsets for agg_vuln_id_ja_vuln_ids.
        agg_vuln_id_ja_vuln_ids (np.array[oasis_int]): flat jagged array of constituent vulnerability ids.

    Returns:
        areaperil_to_vuln_ja_areaperil_ids (np.array[areaperil_int]): unique areaperil_ids in items order.
        areaperil_to_vuln_ja_offsets (np.array[oasis_int]): L1 CSR offsets (N_areaperil + 1).
        areaperil_to_vuln_ja_vuln_ja_offsets (np.array[oasis_int]): L2 CSR offsets (N_pairs + 1).
        areaperil_to_vuln_ja_vuln_ja_item_idxs (np.array[oasis_int]): flat item indices into items array.
        vuln_map (np.ndarray[uint8]): packed hashmap table mapping vuln_id to dense index.
        vuln_map_keys (np.ndarray[int32]): array of unique vulnerability ids.
        areaperil_agg_vuln_idx_ja_offsets (np.array[oasis_int]): CSR offsets for aggregate vulns.
        areaperil_agg_vuln_idx_ja_data (np.array[agg_vuln_idx_weight_dtype]): (vuln_idx, weight) per
            entry; only 'vuln_idx' is filled here, the remaining fields are left zeroed.
        areaperil_agg_vuln_idx_ja_areaperil_ids (np.array[areaperil_int]): areaperil_id per aggregate entry.
    """
    N = len(items)

    # --- Pass 1: count unique areaperil_ids, unique (areaperil_id, vuln_id) pairs,
    #     and exact size of the aggregate-vuln output arrays ---
    n_unique_areaperils = nb_int32(0)
    n_unique_pairs = nb_int32(0)
    n_agg_pair_groups = nb_int32(0)  # number of unique (ap, agg_vuln_id) blocks
    n_agg_entries_total = nb_int32(0)  # total sub-vuln entries written across all blocks
    prev_ap = areaperil_int.type(0)
    prev_vuln = nb_int32(-1)
    for j in range(N):
        ap = items[j]['areaperil_id']
        vuln = items[j]['vulnerability_id']
        if ap != prev_ap or vuln != prev_vuln:
            n_unique_pairs += 1
            if ap != prev_ap:
                n_unique_areaperils += 1
            # If this pair's vuln_id is an aggregate, account for one block of size n_sub.
            # Pass 2 repeats this lookup; the duplication is bounded by n_unique_pairs and
            # avoids a separate pre-pass over items.
            agg_idx = get_idx(agg_vuln_id_ja_id_ind, vuln)
            if agg_idx != NOT_FOUND:
                n_agg_pair_groups += 1
                n_agg_entries_total += nb_int32(
                    agg_vuln_id_ja_offsets[agg_idx + 1] - agg_vuln_id_ja_offsets[agg_idx]
                )
            prev_ap = ap
            prev_vuln = vuln

    # --- Allocate jagged array structures ---
    # L0: unique areaperil_ids (for id_index build, done after this function)
    unique_areaperil_ids = np.empty(n_unique_areaperils, dtype=areaperil_int)
    # L1: areaperil → vuln pair count (CSR offsets only — pair vuln_ids are
    #     not needed at runtime; consumers go through items[item_idx] for vuln_id)
    areaperil_to_vuln_ja_offsets = np.empty(n_unique_areaperils + 1, dtype=oasis_int)
    # np.empty leaves memory uninitialised: set the leading offset explicitly so that an empty
    # items table yields a valid [0] offsets array (pass 2 never writes it in that case).
    areaperil_to_vuln_ja_offsets[0] = 0
    # L2: pair → item indices
    areaperil_to_vuln_ja_vuln_ja_offsets = np.empty(n_unique_pairs + 1, dtype=oasis_int)
    areaperil_to_vuln_ja_vuln_ja_item_idxs = np.empty(N, dtype=oasis_int)

    # --- Hashmap for iterative vuln_id -> dense_index construction ---
    # Upper bound on distinct keys: every item plus every aggregate constituent.
    max_unique_vulns = N + len(agg_vuln_id_ja_vuln_ids)
    vuln_key_table = np.empty(max(max_unique_vulns, 1), dtype=np.int32)
    vuln_table = init_dict(max_unique_vulns)
    hm_info, hm_lookup, hm_index = unpack(vuln_table)

    # --- Aggregate vulnerability jagged arrays (sized exactly from pass 1) ---
    agg_alloc = max(n_agg_entries_total, 1)  # 0 → 1 to keep array shapes valid
    areaperil_agg_vuln_idx_ja_data = np.zeros(agg_alloc, dtype=agg_vuln_idx_weight_dtype)
    areaperil_agg_vuln_idx_ja_areaperil_ids = np.empty(agg_alloc, dtype=areaperil_int)
    areaperil_agg_vuln_idx_ja_offsets = np.empty(n_agg_pair_groups + 1, dtype=oasis_int)
    ja_ptr = nb_int32(0)  # write position in the flat aggregate arrays
    n_agg_vuln_groups = nb_int32(0)  # number of aggregate blocks written so far
    areaperil_agg_vuln_idx_ja_offsets[0] = 0

    # --- Pass 2: build all structures ---
    ap_idx = nb_int32(-1)  # current areaperil index
    pair_idx = nb_int32(-1)  # current pair index
    item_ptr = nb_int32(0)  # pointer into item_idxs
    prev_ap = areaperil_int.type(0)
    prev_vuln = nb_int32(-1)
    last_block_idx = nb_int32(-1)  # aggregate block index of the current pair

    areaperil_to_vuln_ja_vuln_ja_offsets[0] = 0

    for j in range(N):
        ap = items[j]['areaperil_id']
        vuln = items[j]['vulnerability_id']

        is_new_pair = (ap != prev_ap or vuln != prev_vuln)
        if is_new_pair:
            # Close previous pair's item range
            if pair_idx >= 0:
                areaperil_to_vuln_ja_vuln_ja_offsets[pair_idx + 1] = item_ptr
            pair_idx += 1

            if ap != prev_ap:
                # Close previous areaperil's vuln range
                if ap_idx >= 0:
                    areaperil_to_vuln_ja_offsets[ap_idx + 1] = pair_idx
                ap_idx += 1
                unique_areaperil_ids[ap_idx] = ap
                areaperil_to_vuln_ja_offsets[ap_idx] = pair_idx

            prev_ap = ap
            prev_vuln = vuln

        # Store item index
        areaperil_to_vuln_ja_vuln_ja_item_idxs[item_ptr] = oasis_int.type(j)
        item_ptr += 1

        coverages[items[j]['coverage_id']]['max_items'] += 1

        # --- Vulnerability hashmap + aggregate processing ---
        agg_idx = get_idx(agg_vuln_id_ja_id_ind, vuln)
        is_aggregate = agg_idx != NOT_FOUND

        if is_aggregate:
            if is_new_pair:
                # First item of an (areaperil, aggregate vuln) pair: emit one block holding the
                # dense index of every constituent vulnerability id.
                sub_start = agg_vuln_id_ja_offsets[agg_idx]
                sub_end = agg_vuln_id_ja_offsets[agg_idx + 1]
                n_sub = sub_end - sub_start
                last_block_idx = n_agg_vuln_groups

                for si in range(sub_start, sub_end):
                    k = si - sub_start
                    sub_vuln_id = np.int32(agg_vuln_id_ja_vuln_ids[si])
                    result = _try_add_key(hm_info, hm_lookup, hm_index, vuln_key_table, sub_vuln_id)
                    # Insertion failed: rehash the table, refresh the unpacked views and retry.
                    while result == i_add_key_fail:
                        vuln_table = rehash(vuln_table, vuln_key_table)
                        hm_info, hm_lookup, hm_index = unpack(vuln_table)
                        result = _try_add_key(hm_info, hm_lookup, hm_index, vuln_key_table, sub_vuln_id)
                    dense_idx = nb_oasis_int(hm_index[result & slot_mask])
                    areaperil_agg_vuln_idx_ja_data[ja_ptr + k]['vuln_idx'] = dense_idx
                    areaperil_agg_vuln_idx_ja_areaperil_ids[ja_ptr + k] = ap

                ja_ptr += n_sub
                n_agg_vuln_groups += 1
                areaperil_agg_vuln_idx_ja_offsets[n_agg_vuln_groups] = ja_ptr

            # Every item of the pair points at the block created for the pair's first item.
            items[j]['areaperil_agg_vuln_idx'] = last_block_idx
        else:
            vuln_id_int32 = np.int32(vuln)
            result = _try_add_key(hm_info, hm_lookup, hm_index, vuln_key_table, vuln_id_int32)
            # Insertion failed: rehash the table, refresh the unpacked views and retry.
            while result == i_add_key_fail:
                vuln_table = rehash(vuln_table, vuln_key_table)
                hm_info, hm_lookup, hm_index = unpack(vuln_table)
                result = _try_add_key(hm_info, hm_lookup, hm_index, vuln_key_table, vuln_id_int32)
            items[j]['vulnerability_idx'] = nb_oasis_int(hm_index[result & slot_mask])

    # Close final pair and areaperil
    if pair_idx >= 0:
        areaperil_to_vuln_ja_vuln_ja_offsets[pair_idx + 1] = item_ptr
    if ap_idx >= 0:
        areaperil_to_vuln_ja_offsets[ap_idx + 1] = pair_idx + 1

    # Trim the key table and aggregate arrays to the portions actually written.
    return (unique_areaperil_ids, areaperil_to_vuln_ja_offsets,
            areaperil_to_vuln_ja_vuln_ja_offsets,
            areaperil_to_vuln_ja_vuln_ja_item_idxs,
            vuln_table, vuln_key_table[:hm_info[HM_INFO_N_VALID]],
            areaperil_agg_vuln_idx_ja_offsets[:n_agg_vuln_groups + 1],
            areaperil_agg_vuln_idx_ja_data[:ja_ptr],
            areaperil_agg_vuln_idx_ja_areaperil_ids[:ja_ptr])
@nb.njit(cache=True)
def build_cdf_group_indices(vuln_ja_offsets, vuln_ja_item_idxs, items, dynamic_footprint):
    """Number the CDF groups and record, for every item, the group it belongs to.

    Items sharing the same vulnerability CDF form one group. With a static footprint
    (``dynamic_footprint is None``) each (areaperil, vuln_id) pair, i.e. each slot of the
    item_map L2 jagged array, is one group. With a dynamic footprint, items of the same pair
    are further split by their 'intensity_adjustment' value: each distinct value within a pair
    receives its own group index.

    Group indices are handed out sequentially, starting at 0, in the order groups are first met.
    Numba compiles two specializations based on whether dynamic_footprint is None or not.

    Args:
        vuln_ja_offsets (np.array[oasis_int]): L2 CSR offsets (N_pairs + 1).
        vuln_ja_item_idxs (np.array[oasis_int]): flat item indices.
        items (np.ndarray): items table (must have 'intensity_adjustment' for dynamic).
        dynamic_footprint: None for static footprints, anything else for dynamic.

    Returns:
        item_cdf_group_idx (np.array[int64]): maps item_idx → CDF group index.
        n_cdf_groups (int): total number of unique CDF groups.
    """
    n_items = len(items)
    group_of_item = np.empty(n_items, dtype=np.int64)
    pair_count = len(vuln_ja_offsets) - 1
    next_group = 0

    if dynamic_footprint is None:
        # Static footprint: every item of a pair shares that pair's group index.
        for pair in range(pair_count):
            for cursor in range(vuln_ja_offsets[pair], vuln_ja_offsets[pair + 1]):
                group_of_item[vuln_ja_item_idxs[cursor]] = next_group
            next_group += 1
    else:
        # Scratch buffers reused by every pair: key storage for the per-pair hashmap and the
        # group index assigned to each dense key index.
        scratch_len = max(n_items, 1)
        adj_keys = np.empty(scratch_len, dtype=oasis_int)
        group_of_adj = np.empty(scratch_len, dtype=np.int64)

        for pair in range(pair_count):
            first = vuln_ja_offsets[pair]
            last = vuln_ja_offsets[pair + 1]

            # New hashmap for each pair: intensity_adjustment → dense index within the pair.
            table = init_dict(max(last - first, 1))
            info, lookup, index = unpack(table)

            for cursor in range(first, last):
                item_idx = vuln_ja_item_idxs[cursor]
                adjustment = items[item_idx]['intensity_adjustment']

                slot = _try_add_key(info, lookup, index, adj_keys, adjustment)
                while slot == i_add_key_fail:
                    # Insertion failed: rehash, refresh the unpacked views and try again.
                    table = rehash(table, adj_keys)
                    info, lookup, index = unpack(table)
                    slot = _try_add_key(info, lookup, index, adj_keys, adjustment)

                dense = index[slot & slot_mask]
                if slot & new_slot_bit:
                    # First occurrence of this adjustment in the pair: open a new group.
                    group_of_adj[dense] = next_group
                    next_group += 1
                group_of_item[item_idx] = group_of_adj[dense]

    return group_of_item, next_group
def get_peril_id(input_path):
    """
    Get peril_id associated with item_id

    Args:
        input_path (str): The directory path where the 'gul_summary_map' file is located
            (the file is resolved and read by ``load_as_ndarray``).

    Returns:
        np.ndarray: A structured NumPy array with the following fields:
            - 'item_id' (oasis_int): The item ID as an integer.
            - 'peril_id' (oasis_int): The encoded peril ID as an integer.
            The array is empty when the summary map has no rows.
    """
    from oasislmf.pytools.common.data import load_as_ndarray

    # peril_id is read as a unicode string of up to 3 characters, then encoded to an integer.
    read_dtype = np.dtype([('item_id', oasis_int), ('peril_id', 'U3')])
    raw = load_as_ndarray(input_path, 'gul_summary_map', read_dtype,
                          col_map={'item_id': 'item_id', 'peril_id': 'peril_id'})

    result = np.empty(len(raw), dtype=np.dtype([('item_id', oasis_int), ('peril_id', oasis_int)]))
    result['item_id'] = raw['item_id']
    # np.vectorize without `otypes` infers its output type from the first element and raises
    # ValueError on zero-size input, so only encode when there is at least one row.
    if len(raw) > 0:
        result['peril_id'] = np.vectorize(encode_peril_id)(raw['peril_id'])
    return result