Source code for oasislmf.pytools.gul.structure

"""Pre-compute and persist gulpy read-only data structures.

Follows the same pattern as ``oasislmf.pytools.gulmc.structure``:
  - ``create_gulpy_structure`` builds all read-only numpy arrays once and
    saves them as ``.npy`` files.
  - ``load_gulpy_structure`` memory-maps them via ``np.load(mmap_mode='r')``,
    allowing multiple gulpy processes to share physical memory pages through
    the OS page cache.
"""
import logging
import os

import numpy as np
from oasis_data_manager.filestore.config import get_storage_from_config_path
from oasislmf.pytools.common.data import areaperil_int, correlations_dtype, load_as_ndarray
from oasislmf.pytools.common.input_files import read_coverages, read_correlations
from oasislmf.pytools.getmodel.manager import get_damage_bins
from oasislmf.pytools.gul.common import coverage_type
from oasislmf.pytools.gul.manager import gul_get_items, generate_item_map
from oasislmf.pytools.gul.random import (
    compute_norm_cdf_lookup, compute_norm_inv_cdf_lookup,
    x_min, x_max, norm_inv_N, cdf_min, cdf_max,
)

logger = logging.getLogger(__name__)

# Name of the sub-directory (under ``<run_dir>/input``) holding the saved arrays.
STRUCTURE_DIR = 'gulpy_structure'

# Names of the arrays persisted as ``<name>.npy``; each one is also a key of
# the dict returned by ``build_structures``.
ARRAY_FILES = [
    'damage_bins',
    'coverages',
    'items',
    'item_map_hm',
    'item_map_hm_keys',
    'item_map_ja_offsets',
    'corr_data_by_item_id',
    'unique_peril_correlation_groups',
    'norm_inv_cdf',
    'norm_cdf',
]
def _structure_path(run_dir):
    """Return the directory that holds the persisted gulpy structures.

    Args:
        run_dir (str): path to the run directory.

    Returns:
        str: ``<run_dir>/input/<STRUCTURE_DIR>``.
    """
    input_dir = os.path.join(run_dir, 'input')
    return os.path.join(input_dir, STRUCTURE_DIR)
def gulpy_structure_exists(run_dir):
    """Tell whether pre-computed gulpy structures are present for a run.

    The check looks only for the ``metadata.npy`` file inside the structure
    directory.

    Args:
        run_dir (str): path to the run directory.

    Returns:
        bool: True if ``metadata.npy`` exists as a regular file.
    """
    metadata_file = os.path.join(_structure_path(run_dir), 'metadata.npy')
    return os.path.isfile(metadata_file)
def _select_valid_areaperil_ids(input_path, peril_filter):
    """Return the AreaPerilIDs selected by ``peril_filter`` (``None`` = no filtering)."""
    if not peril_filter:
        return None

    keys_dtype = np.dtype([
        ('LocID', np.int32),
        ('PerilID', 'U3'),
        ('CoverageTypeID', np.int32),
        ('AreaPerilID', areaperil_int),
        ('VulnerabilityID', np.int32),
    ])
    keys_tb = load_as_ndarray(input_path, 'keys', keys_dtype)

    # np.isin always yields a boolean mask. A mask built from a Python list
    # degrades to an empty float64 array when the keys table is empty, which
    # cannot be used as an index.
    mask = np.isin(keys_tb['PerilID'], list(set(peril_filter)))
    valid_area_peril_id = np.unique(keys_tb['AreaPerilID'][mask])
    logger.debug(
        f'Peril specific run: ({peril_filter}), '
        f'{len(valid_area_peril_id)} AreaPerilID included out of {len(keys_tb)}')
    return valid_area_peril_id


def _build_correlation_structures(input_path, ignore_file_type):
    """Read ``correlations.bin`` and build the correlation arrays and Gaussian lookup tables."""
    data = read_correlations(input_path, ignore_file_type, filename='correlations.bin')
    Nperil_correlation_groups = len(data)

    # correlation is only active when at least one damage correlation value is positive
    do_correlation = bool(
        Nperil_correlation_groups > 0
        and np.any(data['damage_correlation_value'] > 0)
    )

    if do_correlation:
        # Zero-initialised on purpose: these arrays get written to disk, so no
        # element may be left holding arbitrary memory. Row 0 stays all-zero
        # and rows 1.. receive the values read from the correlations file.
        corr_data_by_item_id = np.zeros(Nperil_correlation_groups + 1, dtype=correlations_dtype)
        corr_data_by_item_id[1:]['peril_correlation_group'] = data['peril_correlation_group']
        corr_data_by_item_id[1:]['damage_correlation_value'] = data['damage_correlation_value']

        unique_peril_correlation_groups = np.unique(
            corr_data_by_item_id[1:]['peril_correlation_group'])

        # pre-compute Gaussian lookup tables
        norm_inv_cdf = compute_norm_inv_cdf_lookup(x_min, x_max, norm_inv_N)
        norm_cdf = compute_norm_cdf_lookup(cdf_min, cdf_max, norm_inv_N)
    else:
        # placeholders so that every name in ARRAY_FILES has an array to save
        corr_data_by_item_id = np.zeros(1, dtype=correlations_dtype)
        unique_peril_correlation_groups = np.empty(0, dtype='int64')
        norm_inv_cdf = np.zeros(1, dtype='float64')
        norm_cdf = np.zeros(1, dtype='float64')

    return (do_correlation, corr_data_by_item_id, unique_peril_correlation_groups,
            norm_inv_cdf, norm_cdf)


def build_structures(run_dir, ignore_file_type, peril_filter):
    """Build all read-only gulpy data structures from input files.

    This extracts the preparation logic from ``manager.run()`` into a
    standalone callable so that it can be invoked once (by
    ``create_gulpy_structure``) rather than repeated in every parallel
    gulpy process.

    Args:
        run_dir (str): path to the run directory.
        ignore_file_type (set[str]): file extensions to ignore when loading.
        peril_filter (list): list of perils to include (empty = all).

    Returns:
        dict: mapping variable names to numpy arrays / scalars. It holds one
        entry per name in ``ARRAY_FILES`` plus the integer flag
        ``do_correlation`` (1 when correlation data is active, else 0).
    """
    model_storage = get_storage_from_config_path(
        os.path.join(run_dir, 'model_storage.json'),
        os.path.join(run_dir, 'static'),
    )
    input_path = os.path.join(run_dir, 'input')
    ignore_file_type = set(ignore_file_type)

    # --- damage bins -----------------------------------------------------------
    logger.debug('import damage bins')
    damage_bins = get_damage_bins(model_storage, ignore_file_type)

    # --- coverages -------------------------------------------------------------
    logger.debug('import coverages')
    coverages_tiv = read_coverages(input_path, ignore_file_type)
    # one extra leading element: the TIVs read from file are stored from index 1
    coverages = np.zeros(coverages_tiv.shape[0] + 1, coverage_type)
    coverages[1:]['tiv'] = coverages_tiv

    # --- items + peril filter --------------------------------------------------
    logger.debug('import items')
    valid_area_peril_id = _select_valid_areaperil_ids(input_path, peril_filter)

    items = gul_get_items(input_path, ignore_file_type)
    items = np.sort(items, order=['areaperil_id', 'vulnerability_id'])
    if valid_area_peril_id is not None:
        items = items[np.isin(items['areaperil_id'], valid_area_peril_id)]

    # --- item map (hashmap + jagged array) -------------------------------------
    logger.debug('generate item map')
    item_map_hm, item_map_hm_keys, item_map_ja_offsets = generate_item_map(items, coverages)

    # --- correlations ----------------------------------------------------------
    logger.debug('import correlations')
    (do_correlation, corr_data_by_item_id, unique_peril_correlation_groups,
     norm_inv_cdf, norm_cdf) = _build_correlation_structures(input_path, ignore_file_type)

    # --- pack everything into a dict -------------------------------------------
    return {
        'damage_bins': damage_bins,
        'coverages': coverages,
        'items': items,
        'item_map_hm': item_map_hm,
        'item_map_hm_keys': item_map_hm_keys,
        'item_map_ja_offsets': item_map_ja_offsets,
        'corr_data_by_item_id': corr_data_by_item_id,
        'unique_peril_correlation_groups': unique_peril_correlation_groups,
        'norm_inv_cdf': norm_inv_cdf,
        'norm_cdf': norm_cdf,
        # scalars
        'do_correlation': int(do_correlation),
    }
def create_gulpy_structure(run_dir, ignore_file_type, peril_filter):
    """Build and save all read-only gulpy data structures as ``.npy`` files.

    Every array named in ``ARRAY_FILES`` is written to the structure
    directory, followed by ``metadata.npy`` holding the scalar
    ``do_correlation`` flag. ``metadata.npy`` is the file that
    ``gulpy_structure_exists`` looks for, so it is removed before the arrays
    are rewritten and saved again last: an interrupted rebuild then never
    leaves a sentinel next to partially written arrays.

    Args:
        run_dir (str): path to the run directory.
        ignore_file_type (set[str]): file extensions to ignore when loading.
        peril_filter (list): list of perils to include (empty = all).
    """
    # build first: if this raises, any previously saved structure is left untouched
    structures = build_structures(run_dir, ignore_file_type, peril_filter)

    structure_path = _structure_path(run_dir)
    os.makedirs(structure_path, exist_ok=True)

    metadata_file = os.path.join(structure_path, 'metadata.npy')
    try:
        # drop a sentinel left by an earlier run before touching the arrays
        os.remove(metadata_file)
    except FileNotFoundError:
        # first build for this run directory: nothing to invalidate
        pass

    total_bytes = 0
    for name in ARRAY_FILES:
        array_file = os.path.join(structure_path, f'{name}.npy')
        np.save(array_file, structures[name])
        total_bytes += os.path.getsize(array_file)

    # save scalar metadata (written last, it marks the structure as complete)
    metadata = np.array([structures['do_correlation']], dtype=np.int64)
    np.save(metadata_file, metadata)

    logger.info(f"gulpy structures saved to {structure_path} ({total_bytes / 1024 / 1024:.1f} MB)")
def load_gulpy_structure(run_dir):
    """Load pre-computed gulpy structures via memory-mapped numpy files.

    Each array named in ``ARRAY_FILES`` is opened with ``mmap_mode='r'`` so
    that multiple gulpy processes share physical memory pages through the OS
    page cache. The scalar metadata file is loaded normally.

    Args:
        run_dir (str): path to the run directory.

    Returns:
        dict: mapping variable names to numpy arrays / scalars: one
        memory-mapped array per name in ``ARRAY_FILES`` plus the integer
        ``do_correlation`` flag.
    """
    base_dir = _structure_path(run_dir)

    loaded = {
        array_name: np.load(os.path.join(base_dir, f'{array_name}.npy'), mmap_mode='r')
        for array_name in ARRAY_FILES
    }

    # metadata holds the scalar flags; element 0 is do_correlation
    scalars = np.load(os.path.join(base_dir, 'metadata.npy'))
    loaded['do_correlation'] = int(scalars[0])
    return loaded