# Source code for oasislmf.pytools.converters.csvtobin.utils.vulnerability

# Vulnerability CSV → binary converter.
#
# The CSV is read in fixed-size chunks (via iter_csv_as_ndarray) so memory usage is O(chunk)
# regardless of file size. Two write paths exist depending on whether an index file is provided:
#
#   No-idx path (idx_file_out=None):
#     Each chunk is written directly as raw bytes (chunk.tofile). An optional rolling
#     intensity-bin completeness check compares each vulnerability against the previous one
#     in O(k) memory (k = number of distinct intensity bins), avoiding the O(V × k) dict
#     that would accumulate all vulnerabilities.
#
#   Idx path (idx_file_out provided):
#     Rows are buffered per vulnerability_id using a partial-vuln buffer. When the
#     vulnerability_id changes, all complete vulnerabilities in the chunk are flushed via
#     _flush_vuln (field copy → VulnerabilityRow_dtype, optional zlib compress, write, emit
#     one index entry). The last group in each chunk may be incomplete and is held in
#     partial_chunks until the next chunk arrives. Boundary detection uses a single np.diff
#     pass producing start/end slices in one vectorised step.
#
#   Validation (unless no_validation=True):
#     Streaming vectorised checks run over each chunk and carry scalar state across chunk
#     boundaries via (prev_vuln, prev_int, prev_dmg, prob_sum):
#       - vulnerability_id non-decreasing (np.diff < 0 check)
#       - damage_bin_id contiguous within each (vulnerability_id, intensity_bin_id) group
#       - damage_bin_id starts at 1 for each new group
#       - intensity_bin_id contiguous within each vulnerability_id
#       - probability sums to 1.0 per group (np.add.reduceat for complete groups)
#     The final open group's probability sum is checked after the last chunk.
#
# no_validation=True skips all validation and writes data in whatever order it appears in
# the CSV — the caller is responsible for correct sort order on the idx path.

import zlib
import numpy as np
from oasislmf.pytools.common.data import resolve_file
from oasislmf.pytools.converters.csvtobin.utils.common import iter_csv_as_ndarray
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.manager import VulnerabilityIndex_dtype, VulnerabilityRow_dtype
from oasislmf.utils.exceptions import OasisException


def _probability_error(vuln_id, int_bin_id, total):
    """Build the error for a (vulnerability_id, intensity_bin_id) group whose probabilities do not sum to 1."""
    return OasisException(
        f"Error: Probabilities for vulnerability_id {vuln_id} and intensity_bin_id {int_bin_id} do not sum to 1. "
        f"total probability = {total}."
    )


def _damage_step_error(vuln_id, int_bin_id, prev_dmg, dmg, row):
    """Build the error for a damage_bin_id that does not follow its predecessor by exactly 1."""
    return OasisException(
        f"Error: damage_bin_ids for vulnerability_id {vuln_id} and intensity_bin_id {int_bin_id} "
        f"are not contiguous at row {row}: {prev_dmg} followed by {dmg}."
    )


def _damage_start_error(vuln_id, int_bin_id, dmg, row):
    """Build the error for a (vulnerability_id, intensity_bin_id) group whose first damage_bin_id is not 1."""
    return OasisException(
        f"Error: First damage_bin_id for vulnerability_id {vuln_id} and intensity_bin_id {int_bin_id} "
        f"at row {row} is {dmg}, expected 1."
    )


def _intensity_step_error(vuln_id, prev_int, int_bin_id, row):
    """Build the error for an intensity_bin_id that does not follow its predecessor (same vuln) by exactly 1."""
    return OasisException(
        f"Error: intensity_bin_ids for vulnerability_id {vuln_id} are not contiguous at row {row}: "
        f"{prev_int} followed by {int_bin_id}."
    )


def _validate_chunk(chunk, state, first_chunk, row_offset):
    """Streaming, vectorised validation of one chunk of vulnerability rows.

    Checks performed, carrying scalar state across chunk boundaries:
      * vulnerability_id is non-decreasing;
      * damage_bin_id increases by exactly 1 within each (vulnerability_id, intensity_bin_id) group;
      * the first damage_bin_id of every group is 1;
      * intensity_bin_id increases by exactly 1 between consecutive groups of the same vulnerability_id;
      * the probabilities of every *completed* group sum to 1 (``atol=1e-6``).

    Args:
        chunk: structured array with fields ``vulnerability_id``, ``intensity_bin_id``,
            ``damage_bin_id`` and ``probability``.
        state: ``(prev_vuln, prev_int, prev_dmg, prob_sum)``. The first three fields describe the
            last row of the previous chunk. ``prob_sum`` is the running probability sum of the
            group still open at its end. Ignored when ``first_chunk`` is True.
        first_chunk: True for the first non-empty chunk of the file.
        row_offset: number of data rows preceding this chunk; used only in error messages.

    Returns:
        The updated state for the last row of ``chunk``. An empty chunk returns ``state``
        unchanged. The probability sum of the group open at the end of the chunk is NOT checked
        here; the caller must check it after the final chunk.

    Raises:
        OasisException: on the first violation found.
    """
    if len(chunk) == 0:
        return state

    prev_vuln, prev_int, prev_dmg, prob_sum = state
    n = len(chunk)
    vuln = chunk['vulnerability_id']
    intb = chunk['intensity_bin_id']
    dmgb = chunk['damage_bin_id']
    prob = chunk['probability']

    # --- Row 0: first row ever, or continuation across the chunk boundary ---
    if first_chunk:
        if dmgb[0] != 1:
            raise _damage_start_error(vuln[0], intb[0], dmgb[0], row_offset)
        prob_sum = float(prob[0])
    else:
        if vuln[0] < prev_vuln:
            raise OasisException(
                f"Error: Vulnerability IDs {prev_vuln} and {vuln[0]} at row {row_offset} are not in ascending order."
            )
        same_vuln = vuln[0] == prev_vuln
        if same_vuln and intb[0] == prev_int:
            # Same group continues from the previous chunk.
            if dmgb[0] != prev_dmg + 1:
                raise _damage_step_error(vuln[0], intb[0], prev_dmg, dmgb[0], row_offset)
            prob_sum += float(prob[0])
        else:
            # Row 0 opens a new group, so the previous chunk's last group is now complete.
            if not np.isclose(prob_sum, 1.0, atol=1e-6):
                raise _probability_error(prev_vuln, prev_int, prob_sum)
            if same_vuln and intb[0] != prev_int + 1:
                raise _intensity_step_error(vuln[0], prev_int, intb[0], row_offset)
            if dmgb[0] != 1:
                raise _damage_start_error(vuln[0], intb[0], dmgb[0], row_offset)
            prob_sum = float(prob[0])

    if n == 1:
        return int(vuln[0]), int(intb[0]), int(dmgb[0]), prob_sum

    # --- Vectorised checks for rows 1..n-1 (index k of a diff refers to row k + 1) ---
    dv = np.diff(vuln)
    di = np.diff(intb)
    dd = np.diff(dmgb)

    bad = np.flatnonzero(dv < 0)
    if bad.size:
        i = int(bad[0]) + 1
        raise OasisException(
            f"Error: Vulnerability IDs {vuln[i - 1]} and {vuln[i]} at row {row_offset + i} are not in ascending order."
        )

    same_vuln = dv == 0
    same_group = same_vuln & (di == 0)

    bad = np.flatnonzero(same_group & (dd != 1))
    if bad.size:
        i = int(bad[0]) + 1
        raise _damage_step_error(vuln[i], intb[i], dmgb[i - 1], dmgb[i], row_offset + i)

    bad = np.flatnonzero(same_vuln & (di != 0) & (di != 1))
    if bad.size:
        i = int(bad[0]) + 1
        raise _intensity_step_error(vuln[i], intb[i - 1], intb[i], row_offset + i)

    # Rows (within this chunk) that open a new group.
    new_group_rows = np.flatnonzero(~same_group) + 1

    bad = new_group_rows[dmgb[new_group_rows] != 1]
    if bad.size:
        i = int(bad[0])
        raise _damage_start_error(vuln[i], intb[i], dmgb[i], row_offset + i)

    # --- Probability sums ---
    if new_group_rows.size == 0:
        # The whole chunk continues the group that row 0 belongs to.
        prob_sum += float(np.sum(prob[1:]))
        return int(vuln[-1]), int(intb[-1]), int(dmgb[-1]), prob_sum

    first_new = int(new_group_rows[0])
    # Finish the group containing row 0 (rows 1..first_new-1; an empty slice sums to 0).
    prob_sum += float(np.sum(prob[1:first_new]))
    if not np.isclose(prob_sum, 1.0, atol=1e-6):
        raise _probability_error(vuln[0], intb[0], prob_sum)

    # One sum per group opened in this chunk; the last may continue into the next chunk,
    # so only the others are checked here.
    group_sums = np.add.reduceat(prob[first_new:], new_group_rows - first_new)
    bad = np.flatnonzero(~np.isclose(group_sums[:-1], 1.0, atol=1e-6))
    if bad.size:
        g = int(bad[0])
        row = int(new_group_rows[g])
        raise _probability_error(vuln[row], intb[row], group_sums[g])
    prob_sum = float(group_sums[-1])

    return int(vuln[-1]), int(intb[-1]), int(dmgb[-1]), prob_sum


def _check_int_bins_rolling(v_id, curr_int_ids, prev_int_ids, logger):
    """Warn when a vulnerability's intensity bins differ from the previous vulnerability's.

    Only the immediately preceding vulnerability's bins are kept, so memory is O(k) in the
    number of distinct intensity bins rather than growing with the number of vulnerabilities.

    Args:
        v_id: vulnerability_id being checked (used in the warning text).
        curr_int_ids: intensity_bin_ids of ``v_id``. The callers in this module pass
            ``np.unique`` output.
        prev_int_ids: intensity_bin_ids of the previous vulnerability, or None for the first.
        logger: object whose ``warning`` method receives the messages.

    Returns:
        ``curr_int_ids``, unchanged, to pass back as ``prev_int_ids`` on the next call.
    """
    if prev_int_ids is None:
        return curr_int_ids

    absent = np.setdiff1d(prev_int_ids, curr_int_ids)
    unexpected = np.setdiff1d(curr_int_ids, prev_int_ids)

    mismatch = False
    if absent.size:
        mismatch = True
        logger.warning(
            f"WARNING: vulnerability_id {v_id} is missing intensity_bin_ids: {absent.tolist()}"
        )
    if unexpected.size:
        mismatch = True
        logger.warning(
            f"WARNING: vulnerability_id {v_id} has unexpected intensity_bin_ids: {unexpected.tolist()}"
        )
    if mismatch:
        logger.warning(
            "All intensity bins must be present for each vulnerability ID in single peril models."
        )
    return curr_int_ids


def _flush_vuln(v_id, rows, file_out, idx_file_out, max_damage_bin_idx, zip_files, offset):
    """Write all rows of one vulnerability to ``file_out`` and append its index entry.

    The ``intensity_bin_id``, ``damage_bin_id`` and ``probability`` fields of ``rows`` are
    copied into a ``VulnerabilityRow_dtype`` array. That array's bytes are written, zlib-compressed
    first when ``zip_files`` is true. One ``VulnerabilityIndex_dtype`` record
    ``(v_id, offset, size, dsize)`` is then written to ``idx_file_out``. ``size`` is the number
    of bytes written. ``dsize`` is the uncompressed byte count when zipping, else 0.

    Returns:
        The byte offset immediately after the data just written.

    Raises:
        OasisException: if any damage_bin_id exceeds ``max_damage_bin_idx``. Nothing is
            written in that case.
    """
    packed = np.empty(len(rows), dtype=VulnerabilityRow_dtype)
    for field in ("intensity_bin_id", "damage_bin_id", "probability"):
        packed[field] = rows[field]

    # Checked after the dtype conversion so the comparison sees the stored values.
    if (packed["damage_bin_id"] > max_damage_bin_idx).any():
        raise OasisException(
            f"Error: Found damage_bin_id in data larger than max_damage_bin_idx: {max_damage_bin_idx}"
        )

    raw = packed.tobytes()
    uncompressed_size = len(raw) if zip_files else 0
    payload = zlib.compress(raw) if zip_files else raw

    file_out.write(payload)
    written = len(payload)
    index_entry = np.array(
        [(v_id, offset, written, uncompressed_size)], dtype=VulnerabilityIndex_dtype
    )
    index_entry.tofile(idx_file_out)
    return offset + written


def _run_bounds(values):
    """Return ``(starts, ends)`` index arrays delimiting maximal runs of equal adjacent values.

    Runs are found by comparing neighbours, so unsorted input is handled: a value that
    reappears after a different one starts a new run. ``values`` is expected to be non-empty.
    """
    changes = np.flatnonzero(values[1:] != values[:-1]) + 1
    starts = np.concatenate(([0], changes)).astype(np.intp)
    ends = np.concatenate((changes, [len(values)])).astype(np.intp)
    return starts, ends


def _leading_run_length(values, value):
    """Return how many leading entries of ``values`` are equal to ``value``."""
    mismatches = np.flatnonzero(values != value)
    return int(mismatches[0]) if mismatches.size else len(values)


def vulnerability_tobin(
    stack,
    file_in,
    file_out,
    file_type,
    idx_file_out,
    max_damage_bin_idx,
    no_validation,
    suppress_int_bin_checks,
    zip_files,
):
    """Convert a vulnerability CSV into the binary vulnerability format.

    The output begins with ``max_damage_bin_idx`` as an int32 header. Rows are then written in
    one of two ways:

    * ``idx_file_out`` is None: each CSV chunk is written as raw records of the CSV dtype.
    * otherwise: rows are grouped per vulnerability_id and written via ``_flush_vuln``
      (``VulnerabilityRow_dtype``, optionally zlib-compressed), with one index entry per group.

    In both paths, any damage_bin_id greater than ``max_damage_bin_idx`` is rejected.

    Args:
        stack: passed to ``resolve_file`` / ``iter_csv_as_ndarray`` (presumably a
            ``contextlib.ExitStack`` owning opened files; confirm against callers).
        file_in: CSV source passed to ``iter_csv_as_ndarray``.
        file_out: binary output, written with ``ndarray.tofile`` / ``write``.
        file_type: key into ``TOOL_INFO`` selecting the CSV dtype.
        idx_file_out: index output passed to ``resolve_file(..., "wb", stack)``, or None.
        max_damage_bin_idx: header value and upper bound for damage_bin_id.
        no_validation: skip ``_validate_chunk`` and the final probability-sum check.
        suppress_int_bin_checks: skip the rolling intensity-bin completeness warnings.
        zip_files: zlib-compress each vulnerability block (requires ``idx_file_out``).

    Raises:
        OasisException: on invalid arguments or data.
    """
    from oasislmf.pytools.converters.csvtobin.manager import logger

    # Validate arguments before anything is written to file_out.
    if zip_files and idx_file_out is None:
        raise OasisException(
            f"Error: Cannot write zip files without provided idx_file_out zip path, currently {idx_file_out}"
        )

    dtype = TOOL_INFO[file_type]["dtype"]

    # Write max_damage_bin header
    np.array([max_damage_bin_idx], dtype=np.int32).tofile(file_out)

    has_idx = idx_file_out is not None
    if has_idx:
        idx_file_out = resolve_file(idx_file_out, "wb", stack)

    check_int_bins = not suppress_int_bin_checks
    first_chunk = True
    any_data = False
    row_offset = 0
    state = (0, 0, 0, 0.0)  # (prev_vuln, prev_int, prev_dmg, prob_sum)
    offset = np.dtype(np.int32).itemsize  # byte offset of the next block, past the header

    # Idx path: the last vulnerability of a chunk may continue into the next chunk.
    partial_vuln_id = None
    partial_chunks = []
    # Intensity bins of the last *completed* vulnerability (O(k) rolling check).
    prev_int_ids = None
    # No-idx path: intensity bins seen so far for the most recent vulnerability, which may
    # continue into the next chunk. It is only checked once the vulnerability is complete,
    # so a vulnerability split across chunks is not reported as two incomplete ones.
    pending_vuln_id = None
    pending_int_ids = None

    def emit(v_id, rows):
        """Write one complete vulnerability (idx path) and run its intensity-bin check."""
        nonlocal offset, prev_int_ids
        offset = _flush_vuln(v_id, rows, file_out, idx_file_out, max_damage_bin_idx, zip_files, offset)
        if check_int_bins:
            curr = np.unique(rows['intensity_bin_id'])
            prev_int_ids = _check_int_bins_rolling(v_id, curr, prev_int_ids, logger)

    for chunk in iter_csv_as_ndarray(stack, file_in, dtype):
        if len(chunk) == 0:
            continue
        if not no_validation:
            state = _validate_chunk(chunk, state, first_chunk, row_offset)
        first_chunk = False
        any_data = True
        row_offset += len(chunk)

        vuln_ids = chunk['vulnerability_id']

        # --- No-idx path: write the raw chunk directly ---
        if not has_idx:
            if np.any(chunk['damage_bin_id'] > max_damage_bin_idx):
                raise OasisException(
                    f"Error: Found damage_bin_id in data larger than max_damage_bin_idx: {max_damage_bin_idx}"
                )
            chunk.tofile(file_out)
            if check_int_bins:
                intb = chunk['intensity_bin_id']
                for s, e in zip(*_run_bounds(vuln_ids)):
                    v_id = int(vuln_ids[s])
                    curr = np.unique(intb[s:e])
                    if v_id == pending_vuln_id:
                        # Only possible for the first run: it continues the previous chunk.
                        curr = np.union1d(pending_int_ids, curr)
                    elif pending_vuln_id is not None:
                        prev_int_ids = _check_int_bins_rolling(
                            pending_vuln_id, pending_int_ids, prev_int_ids, logger
                        )
                    pending_vuln_id, pending_int_ids = v_id, curr
            continue

        # --- Idx path: buffer per vulnerability, flush when it changes ---
        pos = 0
        if partial_vuln_id is not None:
            # Count the leading rows that continue the buffered vulnerability. This does not
            # assume sorted ids, unlike searchsorted (relevant when no_validation is set).
            pos = _leading_run_length(vuln_ids, partial_vuln_id)
            if pos:
                partial_chunks.append(chunk[:pos])
            if pos == len(chunk):
                continue
            # The buffered vulnerability is now complete.
            emit(partial_vuln_id, np.concatenate(partial_chunks))
            partial_vuln_id = None
            partial_chunks = []

        starts, ends = _run_bounds(vuln_ids[pos:])
        starts += pos
        ends += pos
        # Every run but the last is complete within this chunk.
        for s, e in zip(starts[:-1], ends[:-1]):
            emit(int(vuln_ids[s]), chunk[s:e])
        # Buffer the last run; it may continue into the next chunk.
        last_start = int(starts[-1])
        partial_vuln_id = int(vuln_ids[last_start])
        partial_chunks = [chunk[last_start:]]

    # Flush / check the final vulnerability.
    if has_idx and partial_vuln_id is not None:
        emit(partial_vuln_id, np.concatenate(partial_chunks))
    elif not has_idx and pending_vuln_id is not None:
        _check_int_bins_rolling(pending_vuln_id, pending_int_ids, prev_int_ids, logger)

    # Check final probability group
    if not no_validation and any_data:
        prev_vuln, prev_int, _, prob_sum = state
        if not np.isclose(prob_sum, 1.0, atol=1e-6):
            raise OasisException(
                f"Error: Probabilities for vulnerability_id {prev_vuln} and intensity_bin_id {prev_int} do not sum to 1. "
                f"total probability = {prob_sum}."
            )