# Source code for oasislmf.pytools.converters.csvtobin.utils.vulnerability
import zlib
import numba as nb
import numpy as np
from oasislmf.pytools.common.data import resolve_file
from oasislmf.pytools.converters.csvtobin.utils.common import read_csv_as_ndarray
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.manager import VulnerabilityIndex_dtype, VulnerabilityRow_dtype
from oasislmf.utils.exceptions import OasisException
@nb.njit(cache=True, error_model="numpy")
def _validate(data):
    """Validate a vulnerability table that is sorted by vulnerability_id.

    Checks that:
      - vulnerability_id values are non-decreasing,
      - damage_bin_id is contiguous and starts at 1 within each
        (vulnerability_id, intensity_bin_id) group,
      - intensity_bin_id is contiguous within each vulnerability_id,
      - probabilities in each (vulnerability_id, intensity_bin_id) group
        sum to 1 (within 1e-6).

    Args:
        data: structured ndarray with fields vulnerability_id,
            intensity_bin_id, damage_bin_id, probability.

    Raises:
        OasisException: on the first validation failure encountered.
    """
    if data.size == 0:
        return
    prev_vuln_id = data[0]["vulnerability_id"]
    prev_int_id = data[0]["intensity_bin_id"]
    prev_dmg_id = data[0]["damage_bin_id"]
    prob_sum = data[0]["probability"]
    # Check first damage_bin_id for each vuln_id/int_id starts at 1
    if prev_dmg_id != 1:
        raise OasisException(f"Error: First damage bin ID on line {0} is not 1 for vulnerability_id={prev_vuln_id}, intensity_bin_id={prev_int_id}.")
    next_expected_dmg_id = 2  # This is 2 as the first damage_bin_id must be 1
    for i in range(1, len(data)):
        row = data[i]
        v_id = row["vulnerability_id"]
        int_id = row["intensity_bin_id"]
        dmg_id = row["damage_bin_id"]
        prob = row["probability"]
        # Check vulnerability_id is non-decreasing
        if v_id < prev_vuln_id:
            raise OasisException(
                f"Error: Vulnerability IDs {prev_vuln_id} and {v_id} at line {i} are not in ascending order."
            )
        if v_id == prev_vuln_id and int_id == prev_int_id:
            # Check damage_bin_id is contiguous
            if dmg_id != next_expected_dmg_id:
                raise OasisException(f"Error: Non-contiguous damage bin IDs got {dmg_id}, expected {next_expected_dmg_id} on line {i}.")
            prob_sum += prob
            next_expected_dmg_id += 1
        else:
            # Check probabilities for each vuln_id/int_id group sum to 1
            # (fixed: second literal was missing its f prefix, so {prob_sum}
            # was emitted verbatim instead of interpolated)
            if not np.isclose(prob_sum, 1, atol=1e-6):
                raise OasisException(
                    f"Error: Probabilities for vulnerability_id {prev_vuln_id} and intensity_bin_id {prev_int_id} do not sum to 1."
                    f"total probability = {prob_sum}."
                )
            if v_id == prev_vuln_id:
                # Check intensity_bin_id is contiguous
                if int_id != prev_int_id + 1:
                    raise OasisException(
                        f"Error: Non contiguous intensity bin IDs, got {int_id}, expected {prev_int_id + 1} on line {i}."
                    )
            # Check first damage_bin_id for each vuln_id/int_id starts at 1
            if dmg_id != 1:
                raise OasisException(
                    f"Error: First damage bin ID on line {i} is not 1 for vulnerability_id={v_id}, intensity_bin_id={int_id}."
                )
            # Reset state
            prob_sum = prob
            next_expected_dmg_id = 2
        # Update prev ids
        prev_vuln_id = v_id
        prev_int_id = int_id
        prev_dmg_id = dmg_id
    # Check probabilities for the final vuln_id/int_id group sum to 1
    if not np.isclose(prob_sum, 1, atol=1e-6):
        raise OasisException(
            f"Error: Probabilities for vulnerability_id {prev_vuln_id} and intensity_bin_id {prev_int_id} do not sum to 1."
            f"total probability = {prob_sum}."
        )
def _validate_int_bins(data):
    """Warn about vulnerability IDs that lack intensity bins seen elsewhere.

    Compares each vulnerability's set of intensity_bin_id values against the
    union across the whole table and logs a warning per incomplete
    vulnerability, plus a trailing summary warning if any were incomplete.
    Only warns — never raises.
    """
    from oasislmf.pytools.converters.csvtobin.manager import logger
    bins_everywhere = np.unique(data['intensity_bin_id'])
    found_gap = False
    for vuln_id in np.unique(data['vulnerability_id']):
        selected = data["intensity_bin_id"][data["vulnerability_id"] == vuln_id]
        absent = np.setdiff1d(bins_everywhere, np.unique(selected))
        if absent.size == 0:
            continue
        found_gap = True
        logger.warning(
            f"WARNING: vulnerability_id {vuln_id} is missing intensity_bin_ids: {absent.tolist()}"
        )
    if found_gap:
        logger.warning("All intensity bins must be present for each vulnerability ID in single peril models.")
# [docs]  (Sphinx cross-reference artifact from the rendered source page)
def vulnerability_tobin(
    stack,
    file_in,
    file_out,
    file_type,
    idx_file_out,
    max_damage_bin_idx,
    no_validation,
    suppress_int_bin_checks,
    zip_files
):
    """Convert a vulnerability CSV to binary format.

    Writes a 4-byte int32 header holding max_damage_bin_idx, then either the
    full structured array (no idx file) or, when idx_file_out is given, one
    VulnerabilityRow_dtype record block per vulnerability_id (optionally
    zlib-compressed) with a VulnerabilityIndex_dtype entry per block.

    Args:
        stack: ExitStack used by resolve_file to manage file handles.
        file_in: input CSV path/handle.
        file_out: open binary output file.
        file_type: key into TOOL_INFO for headers/dtype.
        idx_file_out: optional index file target; required when zip_files.
        max_damage_bin_idx: largest allowed damage_bin_id, written as header.
        no_validation: skip _validate when True.
        suppress_int_bin_checks: skip _validate_int_bins when True.
        zip_files: zlib-compress each per-vulnerability block when True.

    Raises:
        OasisException: on validation failure, damage bins exceeding
            max_damage_bin_idx, or zip_files without idx_file_out.
    """
    headers = TOOL_INFO[file_type]["headers"]
    dtype = TOOL_INFO[file_type]["dtype"]
    data = read_csv_as_ndarray(stack, file_in, headers, dtype)

    if not no_validation:
        _validate(data)
        if not suppress_int_bin_checks:
            _validate_int_bins(data)

    # Check the whole table up front so the limit is enforced on BOTH output
    # paths (previously only the idx path checked, and only after some blocks
    # had already been written). np.any avoids Python-level iteration.
    if data.size and np.any(data["damage_bin_id"] > max_damage_bin_idx):
        raise OasisException(f"Error: Found damage_bin_id in data larger than max_damage_bin_idx: {max_damage_bin_idx}")

    if zip_files and idx_file_out is None:
        raise OasisException(f"Error: Cannot write zip files without provided idx_file_out zip path, currently {idx_file_out}")

    # Write max_damage_bin to bin header
    np.array([max_damage_bin_idx], dtype=np.int32).tofile(file_out)

    # Write straight to bin file as no idx file
    if idx_file_out is None:
        data.tofile(file_out)
        return

    idx_file_out = resolve_file(idx_file_out, "wb", stack)
    unique_vulns = np.unique(data["vulnerability_id"])
    offset = np.dtype(np.int32).itemsize  # Header offset
    for v_id in unique_vulns:
        vuln_data = data[data["vulnerability_id"] == v_id]
        # Re-pack into the on-disk row layout (drops vulnerability_id, which
        # lives in the index entry instead).
        bin_data = np.empty(len(vuln_data), dtype=VulnerabilityRow_dtype)
        bin_data["intensity_bin_id"] = vuln_data["intensity_bin_id"]
        bin_data["damage_bin_id"] = vuln_data["damage_bin_id"]
        bin_data["probability"] = vuln_data["probability"]
        payload = bin_data.tobytes()
        dsize = 0  # uncompressed size; 0 signals "not compressed"
        if zip_files:
            dsize = len(payload)
            payload = zlib.compress(payload)
        file_out.write(payload)
        size = len(payload)
        np.array([(v_id, offset, size, dsize)], dtype=VulnerabilityIndex_dtype).tofile(idx_file_out)
        offset += size