"""Pre-compute and persist getmodel (modelpy) read-only data structures.
Follows the same pattern as ``oasislmf.pytools.gulmc.structure``:
- ``create_getmodel_structure`` builds all read-only numpy arrays once and
saves them as ``.npy`` files.
- ``load_getmodel_structure`` memory-maps them via ``np.load(mmap_mode='r')``,
allowing multiple modelpy processes to share physical memory pages through
the OS page cache.
"""
import logging
import os
import numpy as np
import pandas as pd
from oasis_data_manager.filestore.config import get_storage_from_config_path
from oasislmf.pytools.getmodel.common import Keys
from oasislmf.pytools.getmodel.footprint import Footprint
from oasislmf.pytools.getmodel.manager import (
get_items, get_vulns, get_mean_damage_bins, convert_vuln_id_to_index,
)
[docs]
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
[docs]
# Name of the sub-directory of ``<run_dir>/input`` in which the pre-computed
# structures are stored (see ``_structure_path``).
STRUCTURE_DIR = 'getmodel_structure'
[docs]
# Names of the arrays that are persisted as ``<name>.npy`` by
# ``create_getmodel_structure`` and memory-mapped back by
# ``load_getmodel_structure``. Every entry must be a key of the dict returned
# by ``build_structures``. The two scalar values (``num_damage_bins`` and
# ``num_intensity_bins``) are not listed here; they are stored separately in
# ``metadata.npy``.
ARRAY_FILES = [
'vuln_array',
'vulns_id',
'areaperil_id_ind',
'areaperil_to_vulns_idx_array',
'areaperil_to_vulns',
'unique_areaperil_ids',
'mean_damage_bins',
]
def _structure_path(run_dir):
    """Return the directory holding the persisted getmodel structures.

    Args:
        run_dir (str): path to the run directory.

    Returns:
        str: ``<run_dir>/input/<STRUCTURE_DIR>``.
    """
    input_dir = os.path.join(run_dir, 'input')
    return os.path.join(input_dir, STRUCTURE_DIR)
[docs]
def getmodel_structure_exists(run_dir):
    """Tell whether pre-computed getmodel structures are present for a run.

    Only the presence of ``metadata.npy`` inside the structure directory is
    tested; the individual array files are not checked.

    Args:
        run_dir (str): path to the run directory.

    Returns:
        bool: True when ``metadata.npy`` exists as a regular file.
    """
    metadata_file = os.path.join(_structure_path(run_dir), 'metadata.npy')
    return os.path.isfile(metadata_file)
[docs]
def build_structures(run_dir, ignore_file_type, peril_filter,
                     model_df_engine="oasis_data_manager.df_reader.reader.OasisPandasReader"):
    """Build all read-only getmodel data structures from input files.

    This extracts the preparation logic from ``manager.run()`` into a
    standalone callable so that it can be invoked once (by
    ``create_getmodel_structure``) rather than repeated in every parallel
    modelpy process.

    Args:
        run_dir (str): path to the run directory. ``model_storage.json``,
            ``static`` and ``input`` are resolved relative to it.
        ignore_file_type (set[str]): file extensions to ignore when loading.
            Any iterable is accepted; it is converted to a ``set``.
        peril_filter (list): list of perils to include (empty = all).
        model_df_engine (str): engine for loading model dataframes.

    Returns:
        dict: mapping variable names to numpy arrays / scalars. The keys are
        ``vuln_array``, ``vulns_id``, ``areaperil_id_ind``,
        ``areaperil_to_vulns_idx_array``, ``areaperil_to_vulns``,
        ``unique_areaperil_ids``, ``mean_damage_bins`` and the two scalars
        ``num_damage_bins`` and ``num_intensity_bins``.
    """
    model_storage = get_storage_from_config_path(
        os.path.join(run_dir, 'model_storage.json'),
        os.path.join(run_dir, 'static'),
    )
    input_path = os.path.join(run_dir, 'input')
    ignore_file_type = set(ignore_file_type)

    # --- keys / peril filter ---------------------------------------------------
    # The keys file is only needed to translate a peril filter into a set of
    # AreaPerilIDs, so it is not read at all when no filter is requested.
    valid_area_peril_id = None
    if peril_filter:
        keys_path = os.path.join(input_path, 'keys.csv')
        if os.path.exists(keys_path):
            keys_df = pd.read_csv(keys_path, dtype=Keys)
            valid_area_peril_id = np.unique(
                keys_df.loc[keys_df['PerilID'].isin(peril_filter), 'AreaPerilID'])
            logger.debug(
                f'Peril specific run: ({peril_filter}), '
                f'{len(valid_area_peril_id)} AreaPerilID included out of {len(keys_df)}')
        else:
            # Without keys.csv the filter cannot be turned into AreaPerilIDs;
            # make that visible instead of dropping the filter silently.
            logger.warning(
                f'keys.csv not found in {input_path}: peril filter ({peril_filter}) '
                f'cannot be resolved to AreaPerilIDs')

    # --- items -----------------------------------------------------------------
    logger.debug('import items')
    vuln_map, vuln_map_keys, areaperil_id_ind, areaperil_to_vulns_idx_array, \
        areaperil_to_vulns, unique_areaperil_ids = get_items(
            input_path, ignore_file_type, valid_area_peril_id)

    # --- footprint (temporary open to get num_intensity_bins) ------------------
    # The footprint is opened only to read ``num_intensity_bins``; the context
    # manager is exited as soon as that value has been captured.
    logger.debug('import footprint (header only)')
    with Footprint.load(model_storage, ignore_file_type,
                        df_engine=model_df_engine,
                        areaperil_ids=list(unique_areaperil_ids)) as footprint_obj:
        num_intensity_bins = footprint_obj.num_intensity_bins

    # --- vulnerabilities -------------------------------------------------------
    logger.debug('import vulnerabilities')
    vuln_array, vulns_id, num_damage_bins = get_vulns(
        model_storage, run_dir, vuln_map, vuln_map_keys,
        num_intensity_bins, ignore_file_type, df_engine=model_df_engine)

    # convert vulnerability IDs in areaperil_to_vulns to dense indices
    # (the return value is discarded, so this presumably updates
    # ``areaperil_to_vulns`` in place -- confirm against manager)
    convert_vuln_id_to_index(vuln_map, vuln_map_keys, areaperil_to_vulns)

    # --- mean damage bins ------------------------------------------------------
    logger.debug('import mean damage bins')
    mean_damage_bins = get_mean_damage_bins(model_storage, ignore_file_type)

    # --- pack everything into a dict -------------------------------------------
    return {
        'vuln_array': vuln_array,
        'vulns_id': vulns_id,
        'areaperil_id_ind': areaperil_id_ind,
        'areaperil_to_vulns_idx_array': areaperil_to_vulns_idx_array,
        'areaperil_to_vulns': areaperil_to_vulns,
        'unique_areaperil_ids': unique_areaperil_ids,
        'mean_damage_bins': mean_damage_bins,
        # scalars
        'num_damage_bins': num_damage_bins,
        'num_intensity_bins': num_intensity_bins,
    }
[docs]
def create_getmodel_structure(run_dir, ignore_file_type, peril_filter,
                              model_df_engine="oasis_data_manager.df_reader.reader.OasisPandasReader"):
    """Build the read-only getmodel structures and persist them as ``.npy`` files.

    Every array named in ``ARRAY_FILES`` is written to its own file inside the
    structure directory (created if missing). The two scalars
    ``num_damage_bins`` and ``num_intensity_bins`` are then stored together,
    as an ``int64`` array, in ``metadata.npy``.

    Args:
        run_dir (str): path to the run directory.
        ignore_file_type (set[str]): file extensions to ignore when loading.
        peril_filter (list): list of perils to include (empty = all).
        model_df_engine (str): engine for loading model dataframes.
    """
    built = build_structures(run_dir, ignore_file_type, peril_filter, model_df_engine)

    structure_path = _structure_path(run_dir)
    os.makedirs(structure_path, exist_ok=True)

    # one file per array; np.save appends the ".npy" suffix itself
    for array_name in ARRAY_FILES:
        target = os.path.join(structure_path, array_name)
        np.save(target, built[array_name])

    # the scalar values are written last, packed into a single small array
    scalars = (built['num_damage_bins'], built['num_intensity_bins'])
    np.save(os.path.join(structure_path, 'metadata'),
            np.array(list(scalars), dtype=np.int64))

    # report the on-disk footprint of the array files (metadata not counted)
    total_bytes = 0
    for array_name in ARRAY_FILES:
        saved_file = os.path.join(structure_path, f'{array_name}.npy')
        total_bytes += os.path.getsize(saved_file)

    logger.info(f"getmodel structures saved to {structure_path} ({total_bytes / 1024 / 1024:.1f} MB)")
[docs]
def load_getmodel_structure(run_dir):
    """Load the pre-computed getmodel structures from their ``.npy`` files.

    Each array listed in ``ARRAY_FILES`` is opened with ``mmap_mode='r'`` so
    that multiple modelpy processes share physical memory pages through the
    OS page cache. ``metadata.npy`` is loaded normally and its first two
    entries are returned as plain Python ints.

    Args:
        run_dir (str): path to the run directory.

    Returns:
        dict: mapping variable names to numpy arrays / scalars: one entry per
        name in ``ARRAY_FILES`` plus ``num_damage_bins`` and
        ``num_intensity_bins``.
    """
    base_dir = _structure_path(run_dir)

    # read-only memory maps, one per persisted array
    loaded = {
        array_name: np.load(os.path.join(base_dir, f'{array_name}.npy'), mmap_mode='r')
        for array_name in ARRAY_FILES
    }

    # scalar metadata: [num_damage_bins, num_intensity_bins]
    scalars = np.load(os.path.join(base_dir, 'metadata.npy'))
    loaded['num_damage_bins'] = int(scalars[0])
    loaded['num_intensity_bins'] = int(scalars[1])
    return loaded