# Source code for oasislmf.pytools.converters.csvtobin.utils.vulnerability

import zlib
import numba as nb
import numpy as np
from oasislmf.pytools.common.data import resolve_file
from oasislmf.pytools.converters.csvtobin.utils.common import read_csv_as_ndarray
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.manager import VulnerabilityIndex_dtype, VulnerabilityRow_dtype
from oasislmf.utils.exceptions import OasisException


@nb.njit(cache=True, error_model="numpy")
def _validate(data):
    """Single-pass validation of a vulnerability table.

    Expects `data` sorted by (vulnerability_id, intensity_bin_id, damage_bin_id)
    and checks that:
      - vulnerability_id values are in non-decreasing order,
      - intensity_bin_id values are contiguous within each vulnerability_id,
      - damage_bin_id values start at 1 and are contiguous within each
        (vulnerability_id, intensity_bin_id) group,
      - probabilities within each group sum to 1 (atol=1e-6).

    Args:
        data (np.ndarray): structured array with fields "vulnerability_id",
            "intensity_bin_id", "damage_bin_id", "probability".

    Raises:
        OasisException: if any of the checks above fails.
    """
    if data.size == 0:
        return

    prev_vuln_id = data[0]["vulnerability_id"]
    prev_int_id = data[0]["intensity_bin_id"]
    prob_sum = data[0]["probability"]

    # Check first damage_bin_id for each vuln_id/int_id starts at 1
    if data[0]["damage_bin_id"] != 1:
        raise OasisException(f"Error: First damage bin ID on line 0 is not 1 for vulnerability_id={prev_vuln_id}, intensity_bin_id={prev_int_id}.")

    next_expected_dmg_id = 2  # This is 2 as the first damage_bin_id must be 1

    for i in range(1, len(data)):
        row = data[i]
        v_id = row["vulnerability_id"]
        int_id = row["intensity_bin_id"]
        dmg_id = row["damage_bin_id"]
        prob = row["probability"]

        # Check vulnerability_id is non-decreasing
        if v_id < prev_vuln_id:
            raise OasisException(
                f"Error: Vulnerability IDs {prev_vuln_id} and {v_id} at line {i} are not in ascending order."
            )

        if v_id == prev_vuln_id and int_id == prev_int_id:
            # Same group: check damage_bin_id is contiguous
            if dmg_id != next_expected_dmg_id:
                raise OasisException(f"Error: Non-contiguous damage bin IDs got {dmg_id}, expected {next_expected_dmg_id} on line {i}.")
            prob_sum += prob
            next_expected_dmg_id += 1
        else:
            # Group boundary: probabilities for the finished vuln_id/int_id group must sum to 1
            if not np.isclose(prob_sum, 1, atol=1e-6):
                raise OasisException(
                    f"Error: Probabilities for vulnerability_id {prev_vuln_id} and intensity_bin_id {prev_int_id} do not sum to 1. "
                    f"total probability = {prob_sum}."
                )

            if v_id == prev_vuln_id:
                # Check intensity_bin_id is contiguous within the same vulnerability_id
                if int_id != prev_int_id + 1:
                    raise OasisException(
                        f"Error: Non contiguous intensity bin IDs, got {int_id}, expected {prev_int_id + 1} on line {i}."
                    )

            # Check first damage_bin_id for each vuln_id/int_id starts at 1
            if dmg_id != 1:
                raise OasisException(
                    f"Error: First damage bin ID on line {i} is not 1 for vulnerability_id={v_id}, intensity_bin_id={int_id}."
                )

            # Reset state for the new group
            prob_sum = prob
            next_expected_dmg_id = 2

        # Update prev ids
        prev_vuln_id = v_id
        prev_int_id = int_id

    # Final group: probabilities must sum to 1
    if not np.isclose(prob_sum, 1, atol=1e-6):
        raise OasisException(
            f"Error: Probabilities for vulnerability_id {prev_vuln_id} and intensity_bin_id {prev_int_id} do not sum to 1. "
            f"total probability = {prob_sum}."
        )


def _validate_int_bins(data):
    """Warn about vulnerability IDs that do not cover every intensity bin.

    For each vulnerability_id present in `data`, compares its set of
    intensity_bin_ids against the union of intensity_bin_ids across the whole
    table and logs a warning listing any that are absent. Emits a summary
    warning if at least one vulnerability is incomplete. Never raises.

    Args:
        data (np.ndarray): structured array with fields "vulnerability_id"
            and "intensity_bin_id" (other fields ignored).
    """
    from oasislmf.pytools.converters.csvtobin.manager import logger

    all_int_ids = np.unique(data['intensity_bin_id'])

    num_incomplete = 0
    for vuln_id in np.unique(data['vulnerability_id']):
        present = np.unique(data["intensity_bin_id"][data["vulnerability_id"] == vuln_id])
        absent = np.setdiff1d(all_int_ids, present)
        if absent.size:
            num_incomplete += 1
            logger.warning(
                f"WARNING: vulnerability_id {vuln_id} is missing intensity_bin_ids: {absent.tolist()}"
            )

    if num_incomplete:
        logger.warning("All intensity bins must be present for each vulnerability ID in single peril models.")


def vulnerability_tobin(
    stack,
    file_in,
    file_out,
    file_type,
    idx_file_out,
    max_damage_bin_idx,
    no_validation,
    suppress_int_bin_checks,
    zip_files,
):
    """Convert a vulnerability CSV file to the binary format.

    Reads the CSV via `read_csv_as_ndarray` using the headers/dtype registered
    for `file_type` in TOOL_INFO, optionally validates it, writes an int32
    header containing `max_damage_bin_idx`, then writes the row data either
    directly to `file_out` (no index file) or per-vulnerability with an index
    file (optionally zlib-compressed).

    Args:
        stack: context/exit stack passed through to file-resolution helpers.
        file_in: input CSV file (path or handle, as accepted by the reader).
        file_out: binary output file handle.
        file_type: key into TOOL_INFO selecting headers and dtype.
        idx_file_out: index file path/handle, or None to write a flat bin file.
        max_damage_bin_idx (int): maximum allowed damage bin index, written
            as the bin-file header.
        no_validation (bool): skip all validation when True.
        suppress_int_bin_checks (bool): skip the intensity-bin coverage
            warnings when True.
        zip_files (bool): zlib-compress each vulnerability's chunk (requires
            idx_file_out).

    Raises:
        OasisException: if zip_files is set without idx_file_out, or a
            damage_bin_id exceeds max_damage_bin_idx in the indexed path.
    """
    headers = TOOL_INFO[file_type]["headers"]
    dtype = TOOL_INFO[file_type]["dtype"]
    data = read_csv_as_ndarray(stack, file_in, headers, dtype)

    if not no_validation:
        _validate(data)
        if not suppress_int_bin_checks:
            _validate_int_bins(data)

    # Write max_damage_bin to bin header
    np.array([max_damage_bin_idx], dtype=np.int32).tofile(file_out)

    if zip_files and idx_file_out is None:
        raise OasisException(f"Error: Cannot write zip files without provided idx_file_out zip path, currently {idx_file_out}")

    # Write straight to bin file as no idx file
    # NOTE(review): this path does not check damage_bin_id against
    # max_damage_bin_idx, unlike the indexed path below — confirm intended.
    if idx_file_out is None:
        data.tofile(file_out)
        return

    idx_file_out = resolve_file(idx_file_out, "wb", stack)

    unique_vulns = np.unique(data["vulnerability_id"])
    offset = np.dtype(np.int32).itemsize  # Header offset
    for v_id in unique_vulns:
        # Extract this vulnerability's rows and repack them in the binary row layout
        vuln_data = data[data["vulnerability_id"] == v_id]
        bin_data = np.empty(len(vuln_data), dtype=VulnerabilityRow_dtype)
        bin_data["intensity_bin_id"] = vuln_data["intensity_bin_id"]
        bin_data["damage_bin_id"] = vuln_data["damage_bin_id"]
        bin_data["probability"] = vuln_data["probability"]

        # np.any avoids Python-level iteration over the boolean array
        if np.any(bin_data["damage_bin_id"] > max_damage_bin_idx):
            raise OasisException(f"Error: Found damage_bin_id in data larger than max_damage_bin_idx: {max_damage_bin_idx}")

        bin_data = bin_data.tobytes()
        dsize = 0  # decompressed size; stays 0 when not zipping
        if zip_files:
            dsize = len(bin_data)
            bin_data = zlib.compress(bin_data)
        file_out.write(bin_data)
        size = len(bin_data)

        # One index record per vulnerability: (id, byte offset, stored size, decompressed size)
        np.array([(v_id, offset, size, dsize)], dtype=VulnerabilityIndex_dtype).tofile(idx_file_out)
        offset += size