"""
This file is the entry point for the Python getmodel command for the package.
TODO: use selector and select for output
"""
import atexit
import json
import logging
import os
import sys
from contextlib import ExitStack
import numba as nb
import numpy as np
import pandas as pd
from numba import int32 as nb_int32, float64 as nb_float64
from numba.typed import Dict
from oasis_data_manager.df_reader.config import get_df_reader, clean_config, InputReaderConfig
from oasis_data_manager.filestore.backends.base import BaseStorage
from oasis_data_manager.filestore.config import get_storage_from_config_path
from oasislmf.pytools.common.event_stream import PIPE_CAPACITY
from oasislmf.utils.data import validate_vulnerability_replacements
from oasislmf.pytools.data_layer.footprint_layer import FootprintLayerClient
from oasislmf.pytools.getmodel.common import (Index_type, Keys, areaperil_int,
oasis_float)
from oasislmf.pytools.getmodel.footprint import Footprint
from oasislmf.pytools.utils import redirect_logging
from ods_tools.oed import AnalysisSettingSchema
from .vulnerability import vulnerability_dataset, parquetvulnerability_meta_filename
from ..common.data import null_index
logger = logging.getLogger(__name__)
buff_size = PIPE_CAPACITY
oasis_int_dtype = np.dtype('i4')
oasis_int_size = np.int32().itemsize
buff_int_size = buff_size // oasis_int_size
areaperil_int_relative_size = areaperil_int.itemsize // oasis_int_size
oasis_float_relative_size = oasis_float.itemsize // oasis_int_size
results_relative_size = 2 * oasis_float_relative_size
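# NOTE: assumed constant (not shown in this listing): byte size of the vulnerability.bin
# header that is skipped when memory-mapping VulnerabilityRow records; load_vulns_bin_idx*
# subtracts it to convert absolute idx-file offsets into row indices
vuln_offset = 4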
damagebindictionary = nb.from_dtype(np.dtype([('bin_index', np.int32),
('bin_from', oasis_float),
('bin_to', oasis_float),
('interpolation', oasis_float),
('interval_type', np.int32),
]))
EventCSV = nb.from_dtype(np.dtype([('event_id', np.int32),
('areaperil_id', areaperil_int),
('intensity_bin_id', np.int32),
('probability', oasis_float)
]))
Item = nb.from_dtype(np.dtype([('id', np.int32),
('coverage_id', np.int32),
('areaperil_id', areaperil_int),
('vulnerability_id', np.int32),
('group_id', np.int32)
]))
Vulnerability = nb.from_dtype(np.dtype([('vulnerability_id', np.int32),
('intensity_bin_id', np.int32),
('damage_bin_id', np.int32),
('probability', oasis_float)
]))
VulnerabilityIndex = nb.from_dtype(np.dtype([('vulnerability_id', np.int32),
('offset', np.int64),
('size', np.int64),
('original_size', np.int64)
]))
VulnerabilityRow = nb.from_dtype(np.dtype([('intensity_bin_id', np.int32),
('damage_bin_id', np.int32),
('probability', oasis_float)
]))
@nb.njit(cache=True)
def load_areaperil_id_u4(int32_mv, cursor, areaperil_id):
int32_mv[cursor] = areaperil_id.view('i4')
return cursor + 1
@nb.njit(cache=True)
def load_areaperil_id_u8(int32_mv, cursor, areaperil_id):
    int32_mv[cursor: cursor + 2] = areaperil_id.view('i4')
return cursor + 2
if areaperil_int == 'u4':
load_areaperil_id = load_areaperil_id_u4
elif areaperil_int == 'u8':
load_areaperil_id = load_areaperil_id_u8
else:
raise Exception(f"AREAPERIL_TYPE {areaperil_int} is not implemented chose u4 or u8")
@nb.njit(cache=True)
def load_items(items, valid_area_peril_id):
"""
    Processes the items loaded from the items file, extracting metadata about the vulnerability data.
    Args:
        items: (List[Item]) data loaded from the items file
        valid_area_peril_id: array of areaperil_id to be included (if None, all are included)
    Returns: (Tuple[Dict[int, int], Dict[int, int], List[Tuple[int, int]], List[int]])
             vulnerability dictionary, areaperil ID to vulnerability index dictionary,
             areaperil ID to vulnerability index array, areaperil ID to vulnerability array
"""
areaperil_to_vulns_size = 0
areaperil_dict = Dict()
vuln_dict = Dict()
vuln_idx = 0
for i in range(items.shape[0]):
item = items[i]
# filter areaperil_id
if valid_area_peril_id is not None and item['areaperil_id'] not in valid_area_peril_id:
continue
# insert the vulnerability index if not in there
if item['vulnerability_id'] not in vuln_dict:
vuln_dict[item['vulnerability_id']] = np.int32(vuln_idx)
vuln_idx += 1
# insert an area dictionary into areaperil_dict under the key of areaperil ID
if item['areaperil_id'] not in areaperil_dict:
area_vuln = Dict()
area_vuln[item['vulnerability_id']] = 0
areaperil_dict[item['areaperil_id']] = area_vuln
areaperil_to_vulns_size += 1
else:
if item['vulnerability_id'] not in areaperil_dict[item['areaperil_id']]:
areaperil_to_vulns_size += 1
areaperil_dict[item['areaperil_id']][item['vulnerability_id']] = 0
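    # second pass: flatten areaperil_dict into arrays so that, for each areaperil_id, the
    # contiguous slice of areaperil_to_vulns holding its vulnerability ids can be looked up
    # via areaperil_to_vulns_idx_dict and areaperil_to_vulns_idx_array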
areaperil_to_vulns_idx_dict = Dict()
areaperil_to_vulns_idx_array = np.empty(len(areaperil_dict), dtype=Index_type)
areaperil_to_vulns = np.empty(areaperil_to_vulns_size, dtype=np.int32)
areaperil_i = 0
vulnerability_i = 0
for areaperil_id, vulns in areaperil_dict.items():
areaperil_to_vulns_idx_dict[areaperil_id] = areaperil_i
areaperil_to_vulns_idx_array[areaperil_i]['start'] = vulnerability_i
        for vuln_id in sorted(vulns):  # sorting is not necessary, but it does not hurt performance and aligns with the cpp getmodel
areaperil_to_vulns[vulnerability_i] = vuln_id
vulnerability_i += 1
areaperil_to_vulns_idx_array[areaperil_i]['end'] = vulnerability_i
areaperil_i += 1
return vuln_dict, areaperil_to_vulns_idx_dict, areaperil_to_vulns_idx_array, areaperil_to_vulns
def get_items(input_path, ignore_file_type=set(), valid_area_peril_id=None):
"""
Loads the items from the items file.
Args:
        input_path: (str) the path pointing to the file
        ignore_file_type: set(str) file extension to ignore when loading
        valid_area_peril_id: array of areaperil_id to be included (if None, all are included)
    Returns: (Tuple[Dict[int, int], Dict[int, int], List[Tuple[int, int]], List[int]])
             vulnerability dictionary, areaperil ID to vulnerability index dictionary,
             areaperil ID to vulnerability index array, areaperil ID to vulnerability array
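    Example (illustrative usage; assumes ``input/`` contains ``items.bin`` or ``items.csv``)::

        vuln_dict, ap_vulns_idx_dict, ap_vulns_idx_array, ap_to_vulns = get_items("input")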
"""
input_files = set(os.listdir(input_path))
if "items.bin" in input_files and "bin" not in ignore_file_type:
logger.debug(f"loading {os.path.join(input_path, 'items.csv')}")
items = np.memmap(os.path.join(input_path, "items.bin"), dtype=Item, mode='r')
elif "items.csv" in input_files and "csv" not in ignore_file_type:
logger.debug(f"loading {os.path.join(input_path, 'items.csv')}")
items = np.loadtxt(os.path.join(input_path, "items.csv"), dtype=Item, delimiter=",", skiprows=1, ndmin=1)
else:
raise FileNotFoundError(f'items file not found at {input_path}')
return load_items(items, valid_area_peril_id)
def get_intensity_bin_dict(input_path):
"""
    Loads the intensity bin dictionary file and creates a dictionary mapping intensity values to bins.
    Used in dynamic footprint generation, as intensities can be adjusted for defences at runtime.
Args:
input_path: (str) the path pointing to the file
Returns: (Dict[int, int])
intensity bin dict, with intensity value and bin index
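    Example ``intensity_bin_dict.csv`` layout (illustrative; the header row is skipped, the
    first column is the intensity value and the second the intensity bin index)::

        intensity,intensity_bin_index
        1,1
        2,1
        3,2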
"""
input_files = set(os.listdir(input_path))
intensity_bin_dict = Dict.empty(nb_int32, nb_int32)
if "intensity_bin_dict.csv" in input_files:
logger.debug(f"loading {os.path.join(input_path, 'intensity_bin_dict.csv')}")
data = np.loadtxt(os.path.join(input_path, "intensity_bin_dict.csv"), dtype=np.int32, delimiter=",", skiprows=1, ndmin=1)
for d in data:
intensity_bin_dict[d[0]] = d[1]
else:
raise FileNotFoundError(f'intensity_bin_dict file not found at {input_path}')
return intensity_bin_dict
def get_intensity_adjustment(input_path):
pass
@nb.njit(cache=True)
def load_vuln_probability(vuln_array, vuln, vuln_id):
if vuln_array.shape[0] < vuln['damage_bin_id']:
raise Exception("vulnerability_id " + str(vuln_id) + " has damage_bin_id bigger that expected maximum")
if vuln['intensity_bin_id'] <= vuln_array.shape[1]: # intensity in vulnerability curve but not in the footprint, we can ignore
vuln_array[vuln['damage_bin_id'] - 1, vuln['intensity_bin_id'] - 1] = vuln['probability']
@nb.njit(cache=True)
def load_vulns_bin_idx(vulns_bin, vulns_idx_bin, vuln_dict,
num_damage_bins, num_intensity_bins, rowsize):
"""
Loads the vulnerability binary index file.
Args:
vulns_bin: (List[VulnerabilityRow]) vulnerability data from the vulnerability file
vulns_idx_bin: (List[VulnerabilityIndex]) vulnerability index data from the vulnerability idx file
vuln_dict: (Dict[int, int]) maps the vulnerability ID with the index in the vulnerability array
num_damage_bins: (int) number of damage bins in the data
        num_intensity_bins: (int) the number of intensity bins
        rowsize: (int) size in bytes of a single vulnerability row
    Returns: (Tuple[np.ndarray, np.ndarray]) 3-d vulnerability array indexed by [vulnerability index, damage bin, intensity bin],
             and the array of vulnerability IDs found in the file
"""
vuln_array = np.zeros((len(vuln_dict), num_damage_bins, num_intensity_bins), dtype=oasis_float)
vuln_ids = np.full(len(vuln_dict), null_index)
for idx_i in range(vulns_idx_bin.shape[0]):
vuln_idx = vulns_idx_bin[idx_i]
if vuln_idx['vulnerability_id'] in vuln_dict:
vuln_ids[vuln_dict[vuln_idx['vulnerability_id']]] = vuln_idx['vulnerability_id']
cur_vuln_array = vuln_array[vuln_dict[vuln_idx['vulnerability_id']]]
start = (vuln_idx['offset'] - vuln_offset) // rowsize
end = start + vuln_idx['size'] // rowsize
for vuln_i in range(start, end):
vuln = vulns_bin[vuln_i]
load_vuln_probability(cur_vuln_array, vuln, vuln_idx['vulnerability_id'])
return vuln_array, vuln_ids
@nb.njit(cache=True)
def load_vulns_bin_idx_adjusted(vulns_bin, vulns_idx_bin, vuln_dict,
num_damage_bins, num_intensity_bins, rowsize, adj_vuln_data=None):
"""
Loads the vulnerability binary index file, prioritizing the data in the adjustments file over the data in the
vulnerability file.
Args:
vulns_bin: (List[VulnerabilityRow]) vulnerability data from the vulnerability file
vulns_idx_bin: (List[VulnerabilityIndex]) vulnerability index data from the vulnerability idx file
vuln_dict: (Dict[int, int]) maps the vulnerability ID with the index in the vulnerability array
num_damage_bins: (int) number of damage bins in the data
        num_intensity_bins: (int) the number of intensity bins
        rowsize: (int) size in bytes of a single vulnerability row
        adj_vuln_data: (List[Vulnerability]) vulnerability adjustment data, sorted by vulnerability_id
    Returns: (Tuple[np.ndarray, np.ndarray]) 3-d vulnerability array indexed by [vulnerability index, damage bin, intensity bin],
             and the array of vulnerability IDs found in the file
"""
vuln_array = np.zeros((len(vuln_dict), num_damage_bins, num_intensity_bins), dtype=oasis_float)
vuln_ids = np.full(len(vuln_dict), null_index)
adj_vuln_index = 0
for idx_i in range(vulns_idx_bin.shape[0]):
vuln_idx = vulns_idx_bin[idx_i]
vuln_id = vuln_idx['vulnerability_id']
# Check if current vulnerability id is in the adjustment data
while adj_vuln_data is not None and adj_vuln_index < len(adj_vuln_data) and adj_vuln_data[adj_vuln_index]['vulnerability_id'] < vuln_id:
adj_vuln_index += 1
if vuln_id in vuln_dict:
vuln_ids[vuln_dict[vuln_id]] = vuln_id
cur_vuln_array = vuln_array[vuln_dict[vuln_id]]
start = (vuln_idx['offset'] - vuln_offset) // rowsize
end = start + vuln_idx['size'] // rowsize
# Apply data from vulns_bin or adj_vuln_data
for vuln_i in range(start, end):
if (adj_vuln_data is not None and adj_vuln_index < len(adj_vuln_data) and adj_vuln_data[adj_vuln_index]['vulnerability_id'] == vuln_id):
load_vuln_probability(cur_vuln_array, adj_vuln_data[adj_vuln_index], vuln_id)
adj_vuln_index += 1
else:
load_vuln_probability(cur_vuln_array, vulns_bin[vuln_i], vuln_id)
# Add remaining adj_vuln_data
while adj_vuln_data is not None and adj_vuln_index < len(adj_vuln_data):
adj_vuln = adj_vuln_data[adj_vuln_index]
vuln_id = adj_vuln['vulnerability_id']
if vuln_id in vuln_dict:
load_vuln_probability(vuln_array[vuln_dict[vuln_id]], adj_vuln, vuln_id)
adj_vuln_index += 1
return vuln_array, vuln_ids
@nb.njit(cache=True)
def load_vulns_bin(vulns_bin, vuln_dict, num_damage_bins, num_intensity_bins):
"""
Loads the vulnerability data grouped by the intensity and damage bins.
Args:
        vulns_bin: (List[Vulnerability]) vulnerability data from the vulnerability file
vuln_dict: (Dict[int, int]) maps the vulnerability ID with the index in the vulnerability array
num_damage_bins: (int) number of damage bins in the data
num_intensity_bins: (int) the number of intensity bins
    Returns: (Tuple[np.ndarray, np.ndarray]) 3-d vulnerability array indexed by [vulnerability index, damage bin, intensity bin],
             and the array of vulnerability IDs found in the file
"""
vuln_array = np.zeros((len(vuln_dict), num_damage_bins, num_intensity_bins), dtype=oasis_float)
vuln_ids = np.full(len(vuln_dict), null_index)
cur_vulnerability_id = -1
for vuln_i in range(vulns_bin.shape[0]):
vuln = vulns_bin[vuln_i]
if vuln['vulnerability_id'] != cur_vulnerability_id:
if vuln['vulnerability_id'] in vuln_dict:
cur_vulnerability_id = vuln['vulnerability_id']
vuln_ids[vuln_dict[cur_vulnerability_id]] = cur_vulnerability_id
cur_vuln_array = vuln_array[vuln_dict[cur_vulnerability_id]]
else:
cur_vulnerability_id = -1
if cur_vulnerability_id != -1:
load_vuln_probability(cur_vuln_array, vuln, cur_vulnerability_id)
return vuln_array, vuln_ids
@nb.njit(cache=True)
def load_vulns_bin_adjusted(vulns_bin, vuln_dict, num_damage_bins, num_intensity_bins, adj_vuln_data=None):
"""
Loads the vulnerability data grouped by the intensity and damage bins, prioritizing the data
in the adjustments file over the data in the vulnerability file.
Args:
        vulns_bin: (List[Vulnerability]) vulnerability data from the vulnerability file
vuln_dict: (Dict[int, int]) maps the vulnerability ID with the index in the vulnerability array
num_damage_bins: (int) number of damage bins in the data
num_intensity_bins: (int) the number of intensity bins
adj_vuln_data: (List[Vulnerability]) vulnerability adjustment data, sorted by vuln_id
    Returns: (Tuple[np.ndarray, np.ndarray]) 3-d vulnerability array indexed by [vulnerability index, damage bin, intensity bin],
             and the array of vulnerability IDs found in the file
"""
vuln_array = np.zeros((len(vuln_dict), num_damage_bins, num_intensity_bins), dtype=oasis_float)
vuln_ids = np.full(len(vuln_dict), null_index)
ids_to_replace = set()
adj_vuln_index = 0
# Create list of ids to replace if adj_vuln_data is provided
if adj_vuln_data is not None:
for adj_vuln in adj_vuln_data:
ids_to_replace.add(adj_vuln['vulnerability_id'])
vuln_i = 0
while vuln_i < len(vulns_bin):
vuln = vulns_bin[vuln_i]
vuln_id = vuln['vulnerability_id']
if vuln_id in vuln_dict:
vuln_ids[vuln_dict[vuln_id]] = vuln_id
if vuln_id in ids_to_replace:
# Advance to current vuln_id
while adj_vuln_index < len(adj_vuln_data) and adj_vuln_data[adj_vuln_index]['vulnerability_id'] < vuln_id:
adj_vuln_index += 1
# process current vuln_id
while adj_vuln_index < len(adj_vuln_data) and adj_vuln_data[adj_vuln_index]['vulnerability_id'] == vuln_id:
load_vuln_probability(vuln_array[vuln_dict[vuln_id]], adj_vuln_data[adj_vuln_index], vuln_id)
adj_vuln_index += 1
# Skip remaining vulns_bin entries with the same vulnerability_id
while vuln_i < len(vulns_bin) and vulns_bin[vuln_i]['vulnerability_id'] == vuln_id:
vuln_i += 1
continue
else:
# Use data from vulns_bin
load_vuln_probability(vuln_array[vuln_dict[vuln_id]], vuln, vuln_id)
vuln_i += 1
return vuln_array, vuln_ids
@nb.njit()
def update_vulns_dictionary(vuln_dict, vulns_id_array):
"""
Updates the indexes of the vulnerability IDs (usually used in loading vulnerability data from parquet file).
Args:
vuln_dict: (Dict[int, int]) vulnerability dict that maps the vulnerability IDs (key) with the index (value)
vulns_id_array: (List[int]) list of vulnerability IDs loaded from the parquet file
"""
for i in range(vulns_id_array.shape[0]):
vuln_dict[vulns_id_array[i]] = np.int32(i)
@nb.njit()
def update_vuln_array_with_adj_data(vuln_array, vuln_dict, adj_vuln_data):
"""
Update the vulnerability array with adjustment data (used for parquet loading).
Args:
vuln_array: (3D array) The vulnerability data array.
vuln_dict: (Dict[int, int]) Maps vulnerability IDs to indices in vuln_array.
adj_vuln_data: (List[Vulnerability]) The vulnerability adjustment data.
Returns: (3D array) The updated vulnerability data array.
"""
for adj_vuln in adj_vuln_data:
vuln_id = adj_vuln['vulnerability_id']
if vuln_id in vuln_dict:
load_vuln_probability(vuln_array[vuln_dict[vuln_id]], adj_vuln, vuln_id)
return vuln_array
@nb.njit()
def create_vulns_id(vuln_dict):
"""
    Creates an array of vulnerability IDs where the position in the array corresponds to the vulnerability index.
    Args:
        vuln_dict: (Dict) maps the vulnerability ID (key) with the vulnerability index (value)
Returns: (List[int]) list of vulnerability IDs
"""
vulns_id = np.empty(len(vuln_dict), dtype=np.int32)
for vuln_id, vuln_idx in vuln_dict.items():
vulns_id[vuln_idx] = vuln_id
return vulns_id
def get_vuln_rngadj_dict(run_dir, vuln_dict):
"""
Loads vulnerability adjustments from the analysis settings file.
Args:
        run_dir (str): path to the run directory (used to load the analysis settings)
        vuln_dict: (Dict[int, int]) maps the vulnerability ID with the index in the vulnerability array
    Returns: (Dict[nb_int32, nb_float64]) vulnerability adjustments dictionary
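    Example ``analysis_settings.json`` snippet (illustrative; maps vulnerability_id to an
    adjustment factor)::

        "vulnerability_adjustments": {
            "adjustments": {"1": 0.8, "4": 1.2}
        }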
"""
settings_path = os.path.join(run_dir, "analysis_settings.json")
vuln_adj_numba_dict = Dict.empty(key_type=nb_int32, value_type=nb_float64)
if not os.path.exists(settings_path):
logger.debug(f"analysis_settings.json not found in {run_dir}.")
return vuln_adj_numba_dict
    vulnerability_adjustments_field = AnalysisSettingSchema().get(settings_path, {}).get('vulnerability_adjustments', None)
if vulnerability_adjustments_field is not None:
adjustments = vulnerability_adjustments_field.get('adjustments', None)
else:
adjustments = None
if adjustments is None:
logger.debug(f"vulnerability_adjustments not found in {settings_path}.")
return vuln_adj_numba_dict
for key, value in adjustments.items():
if nb_int32(key) in vuln_dict.keys():
vuln_adj_numba_dict[nb_int32(key)] = nb_float64(value)
return vuln_adj_numba_dict
def get_vulns(
storage: BaseStorage, run_dir, vuln_dict, num_intensity_bins, ignore_file_type=set(), df_engine="oasis_data_manager.df_reader.reader.OasisPandasReader"):
"""
Loads the vulnerabilities from the file.
Args:
        storage: (BaseStorage) the storage manager for fetching model data
run_dir: (str) the path to the run folder (used to load the analysis settings)
vuln_dict: (Dict[int, int]) maps the vulnerability ID with the index in the vulnerability array
num_intensity_bins: (int) the number of intensity bins
        ignore_file_type: set(str) file extension to ignore when loading
        df_engine: (str) the engine to use when loading dataframes
    Returns: (Tuple[np.ndarray, np.ndarray, int]) vulnerability array, vulnerability IDs, number of damage bins
"""
input_files = set(storage.listdir())
vuln_adj = get_vulnerability_replacements(run_dir, vuln_dict)
if vulnerability_dataset in input_files and "parquet" not in ignore_file_type:
source_url = storage.get_storage_url(vulnerability_dataset, encode_params=False)[1]
with storage.open(parquetvulnerability_meta_filename, 'r') as outfile:
meta_data = json.load(outfile)
logger.debug(f"loading {source_url}")
df_reader_config = clean_config(InputReaderConfig(filepath=vulnerability_dataset, engine=df_engine))
df_reader_config["engine"]["options"]["storage"] = storage
reader = get_df_reader(df_reader_config, filters=[[('vulnerability_id', '==', vuln_id)] for vuln_id in vuln_dict.keys()])
df = reader.as_pandas()
num_damage_bins = meta_data['num_damage_bins']
vuln_array = np.vstack(df['vuln_array'].to_numpy()).reshape(len(df['vuln_array']),
num_damage_bins,
num_intensity_bins)
vuln_ids = df['vulnerability_id'].to_numpy()
missing_vuln_ids = set(vuln_dict).difference(vuln_ids)
if missing_vuln_ids:
raise Exception(f"Vulnerability_ids {missing_vuln_ids} are missing"
f" from {source_url}")
update_vulns_dictionary(vuln_dict, vuln_ids)
# update vulnerability array with adjustment data if present
if vuln_adj is not None and len(vuln_adj) > 0:
vuln_array = update_vuln_array_with_adj_data(vuln_array, vuln_dict, vuln_adj)
else:
if "vulnerability.bin" in input_files and 'bin' not in ignore_file_type:
source_url = storage.get_storage_url('vulnerability.bin', encode_params=False)[1]
logger.debug(f"loading {source_url}")
with storage.open("vulnerability.bin", 'rb') as f:
header = np.frombuffer(f.read(8), 'i4')
num_damage_bins = header[0]
if "vulnerability.idx" in input_files and 'idx' not in ignore_file_type:
logger.debug(f"loading {storage.get_storage_url('vulnerability.idx', encode_params=False)[1]}")
with storage.open("vulnerability.bin") as f:
vulns_bin = np.memmap(f, dtype=VulnerabilityRow, offset=4, mode='r')
with storage.open("vulnerability.idx") as f:
vulns_idx_bin = np.memmap(f, dtype=VulnerabilityIndex, mode='r')
if vuln_adj is not None and len(vuln_adj) > 0:
vuln_array, valid_vuln_ids = load_vulns_bin_idx_adjusted(vulns_bin, vulns_idx_bin, vuln_dict,
num_damage_bins, num_intensity_bins, VulnerabilityRow.dtype.itemsize, vuln_adj)
else:
vuln_array, valid_vuln_ids = load_vulns_bin_idx(vulns_bin, vulns_idx_bin, vuln_dict,
num_damage_bins, num_intensity_bins, VulnerabilityRow.dtype.itemsize)
else:
with storage.with_fileno("vulnerability.bin") as f:
vulns_bin = np.memmap(f, dtype=Vulnerability, offset=4, mode='r')
if vuln_adj is not None and len(vuln_adj) > 0:
vuln_array, valid_vuln_ids = load_vulns_bin_adjusted(vulns_bin, vuln_dict, num_damage_bins, num_intensity_bins, vuln_adj)
else:
vuln_array, valid_vuln_ids = load_vulns_bin(vulns_bin, vuln_dict, num_damage_bins, num_intensity_bins)
elif "vulnerability.csv" in input_files and "csv" not in ignore_file_type:
source_url = storage.get_storage_url('vulnerability.csv', encode_params=False)[1]
logger.debug(f"loading {source_url}")
with storage.open("vulnerability.csv") as f:
vuln_csv = np.loadtxt(f, dtype=Vulnerability, delimiter=",", skiprows=1, ndmin=1)
num_damage_bins = max(vuln_csv['damage_bin_id'])
if vuln_adj is not None and len(vuln_adj) > 0:
vuln_array, valid_vuln_ids = load_vulns_bin_adjusted(vuln_csv, vuln_dict, num_damage_bins, num_intensity_bins, vuln_adj)
else:
vuln_array, valid_vuln_ids = load_vulns_bin(vuln_csv, vuln_dict, num_damage_bins, num_intensity_bins)
else:
raise FileNotFoundError(f"vulnerability file not found at {storage.get_storage_url('', encode_params=False)[1]}")
missing_vuln_ids = set(vuln_dict).difference(valid_vuln_ids)
if missing_vuln_ids:
raise Exception(f"Vulnerability_ids {missing_vuln_ids} are missing"
f" from {source_url}")
vuln_ids = create_vulns_id(vuln_dict)
return vuln_array, vuln_ids, num_damage_bins
def get_vulnerability_replacements(run_dir, vuln_dict):
"""
Loads the vulnerability adjustment file.
Args:
        run_dir: (str) the path pointing to the run directory
        vuln_dict: (Dict[int, int]) maps the vulnerability ID with the index in the vulnerability array
Returns: (List[Vulnerability]) vulnerability replacement data
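    Example ``analysis_settings.json`` snippet (illustrative; each replacement row is
    [intensity_bin_id, damage_bin_id, probability], and inline ``replace_data`` takes
    precedence over a ``replace_file`` csv path)::

        "vulnerability_adjustments": {
            "replace_data": {"1": [[1, 1, 0.3], [1, 2, 0.7]]}
        }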
"""
settings_path = os.path.join(run_dir, "analysis_settings.json")
if not os.path.exists(settings_path):
logger.warning(f"analysis_settings.json not found in {run_dir}.")
return None
if not validate_vulnerability_replacements(settings_path):
return None
    vulnerability_replacements_key = AnalysisSettingSchema().get(settings_path, {}).get('vulnerability_adjustments')
# Inputting the data directly into the analysis_settings.json file takes precedence over the file path
vulnerability_replacements_field = vulnerability_replacements_key.get('replace_data', None)
if vulnerability_replacements_field is None:
vulnerability_replacements_field = vulnerability_replacements_key.get('replace_file', None)
if isinstance(vulnerability_replacements_field, dict):
# Convert dict to flat array equivalent to csv format
flat_data = []
for v_id, adjustments in vulnerability_replacements_field.items():
for adj in adjustments:
flat_data.append((v_id, *adj))
vuln_adj = np.array(flat_data, dtype=Vulnerability)
elif isinstance(vulnerability_replacements_field, str):
# Load csv file
absolute_path = os.path.abspath(vulnerability_replacements_field)
logger.debug(f"loading {absolute_path}")
vuln_adj = np.loadtxt(absolute_path, dtype=Vulnerability, delimiter=",", skiprows=1, ndmin=1)
vuln_adj = np.array([adj_vuln for adj_vuln in vuln_adj if adj_vuln['vulnerability_id'] in vuln_dict],
dtype=vuln_adj.dtype)
vuln_adj.sort(order='vulnerability_id')
logger.info("Vulnerability adjustments found in analysis settings.")
return vuln_adj
def get_mean_damage_bins(storage: BaseStorage, ignore_file_type=set()):
"""
Loads the mean damage bins from the damage_bin_dict file, namely, the `interpolation` value for each bin.
Args:
storage: (BaseStorage) the storage connector for fetching the model data
ignore_file_type: set(str) file extension to ignore when loading
    Returns: (np.ndarray[oasis_float]) the `interpolation` (mean damage) value of each damage bin
"""
return get_damage_bins(storage, ignore_file_type)['interpolation']
def get_damage_bins(storage: BaseStorage, ignore_file_type=set()):
"""
Loads the damage bins from the damage_bin_dict file.
Args:
storage: (BaseStorage) the storage connector for fetching the model data
ignore_file_type: set(str) file extension to ignore when loading
Returns: (List[Union[damagebindictionary]]) loaded data from the damage_bin_dict file
"""
input_files = set(storage.listdir())
if "damage_bin_dict.bin" in input_files and 'bin' not in ignore_file_type:
logger.debug(f"loading {storage.get_storage_url('damage_bin_dict.bin', encode_params=False)[1]}")
with storage.with_fileno("damage_bin_dict.bin") as f:
return np.fromfile(f, dtype=damagebindictionary)
elif "damage_bin_dict.csv" in input_files and 'csv' not in ignore_file_type:
logger.debug(f"loading {storage.get_storage_url('damage_bin_dict.csv', encode_params=False)[1]}")
with storage.open("damage_bin_dict.csv") as f:
return np.loadtxt(f, dtype=damagebindictionary, skiprows=1, delimiter=',', ndmin=1)
else:
raise FileNotFoundError(f"damage_bin_dict file not found at {storage.get_storage_url('', encode_params=False)[1]}")
@nb.njit(cache=True, fastmath=True)
def damage_bin_prob(p, intensities_min, intensities_max, vulns, intensities):
"""
Calculate the probability of an event happening and then causing damage.
Note: vulns is a 1-d array containing 1 damage bin of the damage probability distribution as a
function of hazard intensity.
Args:
p: (float) the probability to be updated
intensities_min: (int) minimum intensity bin id
intensities_max: (int) maximum intensity bin id
vulns: (List[float]) slice of damage probability distribution given hazard intensity
intensities: (List[float]) intensity probability distribution
Returns: (float) the updated probability
"""
i = intensities_min
while i < intensities_max:
p += vulns[i] * intensities[i]
i += 1
return p
@nb.njit(cache=True, fastmath=True)
def do_result(vulns_id, vuln_array, mean_damage_bins,
int32_mv, num_damage_bins,
intensities_min, intensities_max, intensities,
event_id, areaperil_id, vuln_i, cursor):
"""
Calculate the result concerning an event ID.
Args:
vulns_id: (List[int]) list of vulnerability IDs
vuln_array: (List[List[list]]) list of vulnerabilities and their data
mean_damage_bins: (List[float]) the mean of each damage bin (len(mean_damage_bins) == num_damage_bins)
        int32_mv: (np.ndarray) int32 view of the output buffer that the result is written into
num_damage_bins: (int) number of damage bins in the data
intensities_min: (int) minimum intensity bin id
intensities_max: (int) maximum intensity bin id
intensities: (List[float]) intensity probability distribution
event_id: (int) the event ID that concerns the result being calculated
areaperil_id: (List[int]) the areaperil ID that concerns the result being calculated
vuln_i: (int) the index concerning the vulnerability inside the vuln_array
        cursor: (int) the current write position (in int32 units) within int32_mv
    Returns: (int) the updated cursor position after the result has been written
"""
int32_mv[cursor], cursor = event_id, cursor + 1
int32_mv[cursor:cursor + areaperil_int_relative_size] = areaperil_id.view(oasis_int_dtype)
cursor += areaperil_int_relative_size
int32_mv[cursor], cursor = vulns_id[vuln_i], cursor + 1
cur_vuln_mat = vuln_array[vuln_i]
p = 0
cursor_start = cursor
cursor += 1
oasis_float_mv = int32_mv[cursor: cursor + num_damage_bins * results_relative_size].view(oasis_float)
result_cursor = 0
damage_bin_i = 0
while damage_bin_i < num_damage_bins:
p = damage_bin_prob(p, intensities_min, intensities_max, cur_vuln_mat[damage_bin_i], intensities)
oasis_float_mv[result_cursor], result_cursor = p, result_cursor + 1
oasis_float_mv[result_cursor], result_cursor = mean_damage_bins[damage_bin_i], result_cursor + 1
damage_bin_i += 1
if p > 0.999999940:
break
int32_mv[cursor_start] = damage_bin_i
return cursor + (result_cursor * oasis_float_relative_size)
@nb.njit()
def doCdf(event_id,
num_intensity_bins, footprint,
areaperil_to_vulns_idx_dict, areaperil_to_vulns_idx_array, areaperil_to_vulns,
vuln_array, vulns_id, num_damage_bins, mean_damage_bins,
int32_mv, max_result_relative_size):
"""
Calculates the cumulative distribution function (cdf) for an event ID.
Args:
        event_id: (int) the event ID that the CDF is being calculated for
num_intensity_bins: (int) the number of intensity bins for the CDF
footprint: (List[Tuple[int, int, float]]) information about the footprint with event_id, areaperil_id,
probability
        areaperil_to_vulns_idx_dict: (Dict[int, int]) maps the areaperil ID with its position in areaperil_to_vulns_idx_array
        areaperil_to_vulns_idx_array: (List[Tuple[int, int]]) start and end indexes into areaperil_to_vulns for each areaperil ID
areaperil_to_vulns: (List[int]) maps the areaperil ID to the vulnerability ID
        vuln_array: (np.ndarray) 3-d vulnerability array indexed by [vulnerability index, damage bin, intensity bin]
vulns_id: (List[int]) list of vulnerability IDs
num_damage_bins: (int) number of damage bins in the data
mean_damage_bins: (List[float]) the mean of each damage bin (len(mean_damage_bins) == num_damage_bins)
        int32_mv: (np.ndarray) int32 view of the output buffer that results are written into
        max_result_relative_size: (int) the maximum size (in int32 units) of a single result
    Yields: (int) the number of bytes written to the buffer each time it is ready to be flushed
"""
if not footprint.shape[0]:
return 0
intensities_min = num_intensity_bins
intensities_max = 0
intensities = np.zeros(num_intensity_bins, dtype=oasis_float)
areaperil_id = np.zeros(1, dtype=areaperil_int)
has_vuln = False
cursor = 0
for footprint_i in range(footprint.shape[0]):
event_row = footprint[footprint_i]
if areaperil_id[0] != event_row['areaperil_id']:
if has_vuln and intensities_min <= intensities_max:
areaperil_to_vulns_idx = areaperil_to_vulns_idx_array[areaperil_to_vulns_idx_dict[areaperil_id[0]]]
intensities_max += 1
for vuln_idx in range(areaperil_to_vulns_idx['start'], areaperil_to_vulns_idx['end']):
vuln_i = areaperil_to_vulns[vuln_idx]
if cursor + max_result_relative_size > buff_int_size:
yield cursor * oasis_int_size
cursor = 0
cursor = do_result(vulns_id, vuln_array, mean_damage_bins,
int32_mv, num_damage_bins,
intensities_min, intensities_max, intensities,
event_id, areaperil_id, vuln_i, cursor)
areaperil_id[0] = event_row['areaperil_id']
has_vuln = areaperil_id[0] in areaperil_to_vulns_idx_dict
if has_vuln:
intensities[intensities_min: intensities_max] = 0
intensities_min = num_intensity_bins
intensities_max = 0
if has_vuln:
if event_row['probability'] > 0:
intensity_bin_i = event_row['intensity_bin_id'] - 1
intensities[intensity_bin_i] = event_row['probability']
if intensity_bin_i > intensities_max:
intensities_max = intensity_bin_i
if intensity_bin_i < intensities_min:
intensities_min = intensity_bin_i
if has_vuln and intensities_min <= intensities_max:
areaperil_to_vulns_idx = areaperil_to_vulns_idx_array[areaperil_to_vulns_idx_dict[areaperil_id[0]]]
intensities_max += 1
for vuln_idx in range(areaperil_to_vulns_idx['start'], areaperil_to_vulns_idx['end']):
vuln_i = areaperil_to_vulns[vuln_idx]
if cursor + max_result_relative_size > buff_int_size:
yield cursor * oasis_int_size
cursor = 0
cursor = do_result(vulns_id, vuln_array, mean_damage_bins,
int32_mv, num_damage_bins,
intensities_min, intensities_max, intensities,
event_id, areaperil_id, vuln_i, cursor)
yield cursor * oasis_int_size
@nb.njit()
def convert_vuln_id_to_index(vuln_dict, areaperil_to_vulns):
for i in range(areaperil_to_vulns.shape[0]):
areaperil_to_vulns[i] = vuln_dict[areaperil_to_vulns[i]]
@redirect_logging(exec_name='modelpy')
def run(
run_dir,
file_in,
file_out,
ignore_file_type,
data_server,
peril_filter,
df_engine="oasis_data_manager.df_reader.reader.OasisPandasReader",
):
"""
Runs the main process of the getmodel process.
Args:
run_dir: (str) the directory of where the process is running
        file_in: (Optional[str]) the path to the input stream file (stdin is used if None)
        file_out: (Optional[str]) the path to the output stream file (stdout is used if None)
ignore_file_type: set(str) file extension to ignore when loading
data_server: (bool) if set to True runs the data server
        peril_filter (list): list of peril codes to include in the computation (if None, all perils will be included).
df_engine: (str) The engine to use when loading dataframes
Returns: None
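    Example (illustrative)::

        run(run_dir=".", file_in=None, file_out=None, ignore_file_type=set(),
            data_server=False, peril_filter=None)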
"""
model_storage = get_storage_from_config_path(
os.path.join(run_dir, 'model_storage.json'),
os.path.join(run_dir, 'static'),
)
input_path = os.path.join(run_dir, 'input')
ignore_file_type = set(ignore_file_type)
if data_server:
logger.debug("data server active")
FootprintLayerClient.register()
logger.debug("registered with data server")
atexit.register(FootprintLayerClient.unregister)
else:
logger.debug("data server not active")
with ExitStack() as stack:
if file_in is None:
streams_in = sys.stdin.buffer
else:
streams_in = stack.enter_context(open(file_in, 'rb'))
if file_out is None:
stream_out = sys.stdout.buffer
else:
stream_out = stack.enter_context(open(file_out, 'wb'))
event_id_mv = memoryview(bytearray(4))
event_ids = np.ndarray(1, buffer=event_id_mv, dtype='i4')
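        # event_ids aliases the 4-byte read buffer: each readinto below fills it with the next event_id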
# load keys.csv to determine included AreaPerilID from peril_filter
if peril_filter:
keys_df = pd.read_csv(os.path.join(input_path, 'keys.csv'), dtype=Keys)
valid_area_peril_id = keys_df.loc[keys_df['PerilID'].isin(peril_filter), 'AreaPerilID'].to_numpy()
logger.debug(
f'Peril specific run: ({peril_filter}), {len(valid_area_peril_id)} AreaPerilID included out of {len(keys_df)}')
else:
valid_area_peril_id = None
logger.debug('init items')
vuln_dict, areaperil_to_vulns_idx_dict, areaperil_to_vulns_idx_array, areaperil_to_vulns = get_items(
input_path, ignore_file_type, valid_area_peril_id)
logger.debug('init footprint')
footprint_obj = stack.enter_context(Footprint.load(model_storage, ignore_file_type, df_engine=df_engine))
if data_server:
num_intensity_bins: int = FootprintLayerClient.get_number_of_intensity_bins()
logger.info(f"got {num_intensity_bins} intensity bins from server")
else:
num_intensity_bins: int = footprint_obj.num_intensity_bins
logger.debug('init vulnerability')
vuln_array, vulns_id, num_damage_bins = get_vulns(model_storage, run_dir, vuln_dict,
num_intensity_bins, ignore_file_type, df_engine=df_engine)
convert_vuln_id_to_index(vuln_dict, areaperil_to_vulns)
logger.debug('init mean_damage_bins')
mean_damage_bins = get_mean_damage_bins(model_storage, ignore_file_type)
        # event_id, areaperil_id, vulnerability_id, num_result, [oasis_float] * num_result
        max_result_relative_size = 1 + areaperil_int_relative_size + 1 + 1 + num_damage_bins * results_relative_size
mv = memoryview(bytearray(buff_size))
int32_mv = np.ndarray(buff_size // np.int32().itemsize, buffer=mv, dtype=np.int32)
# header
stream_out.write(np.uint32(1).tobytes())
logger.debug('doCdf starting')
while True:
len_read = streams_in.readinto(event_id_mv)
if len_read == 0:
break
# get the next event_id from the input stream
event_id = event_ids[0]
if data_server:
event_footprint = FootprintLayerClient.get_event(event_id)
else:
event_footprint = footprint_obj.get_event(event_id)
if event_footprint is not None:
# compute effective damageability probability distribution
                # stream out: event_id, areaperil_id, number of damage bins, effective damageability cdf bins (bin_mean and prob_to)
for cursor_bytes in doCdf(event_id,
num_intensity_bins, event_footprint,
areaperil_to_vulns_idx_dict, areaperil_to_vulns_idx_array, areaperil_to_vulns,
vuln_array, vulns_id, num_damage_bins, mean_damage_bins,
int32_mv, max_result_relative_size):
if cursor_bytes:
stream_out.write(mv[:cursor_bytes])
else:
break