# Source code for oasislmf.pytools.converters.bintocsv.utils.vulnerability

# Vulnerability binary → CSV converter.
#
# The binary vulnerability file is memory-mapped (np.memmap) so only pages actually
# read are loaded — memory usage is proportional to the rows being output, not file size.
# (When the input is stdin it is read fully into memory with np.frombuffer instead.)
# Three paths exist depending on idx_file_in and zip_files:
#
#   No-idx path:
#     The binary already contains vulnerability_id in every row (written as the full CSV
#     dtype by csvtobin). Memory-mapped and written to CSV in fixed _BATCH_ROWS chunks.
#
#   Non-zip idx path (_vulnerability_tocsv_bin):
#     The binary is memory-mapped as u1 and viewed past the 4-byte header as
#     VulnerabilityRow_dtype (zero-copy). A JIT inner loop (_fill_next_vuln_batch) packs
#     complete vulnerabilities into a fixed pre-allocated output buffer (_BATCH_ROWS rows),
#     adding vulnerability_id from the index. The buffer is flushed once per batch.
#     Vulnerabilities whose row count exceeds the batch size are handled directly.
#
#   Zip idx path (_vulnerability_tocsv_zip):
#     The binary is memory-mapped as u1. Each vulnerability is decompressed individually
#     with zlib. Since VulnerabilityIndex always carries original_size, a single reusable
#     output buffer is pre-allocated to hold max(original_size) bytes' worth of rows and
#     reused for every vulnerability, avoiding a new output array per decompressed block.

import zlib
import numba as nb
import numpy as np
import sys

from oasislmf.pytools.common.data import resolve_file, write_ndarray_to_fmt_csv
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.manager import VulnerabilityIndex_dtype, VulnerabilityRow_dtype
from oasislmf.utils.exceptions import OasisException

# Upper bound on the number of rows handed to the CSV writer in one batch
# (chunk size in the no-idx path, batch buffer capacity in the non-zip idx path).
_BATCH_ROWS = 2 ** 13  # 8192 rows
# Number of leading bytes skipped before the row data: an int32 max_damage_bin_idx.
_HEADER_SIZE = 4


@nb.njit(cache=True, error_model="numpy")
def _fill_next_vuln_batch(
    out_vuln_id, out_int_bin, out_dmg_bin, out_prob,
    index_vuln_ids, elem_offsets, elem_counts,
    fp_int_bin, fp_dmg_bin, fp_prob,
    vuln_start,
):
    """Pack whole vulnerabilities into the output columns, beginning at *vuln_start*.

    Vulnerabilities are copied one after another until either the next one would
    not fit in the remaining space of the output columns (a vulnerability is never
    split across two batches) or the index has been exhausted.

    Args:
        out_vuln_id, out_int_bin, out_dmg_bin, out_prob: output columns, all written
            at the same row positions; the length of *out_vuln_id* is the capacity.
        index_vuln_ids: vulnerability id of each index entry.
        elem_offsets: position of each entry's first row in the ``fp_*`` columns.
        elem_counts: number of rows belonging to each entry.
        fp_int_bin, fp_dmg_bin, fp_prob: source columns read row by row.
        vuln_start: index entry to start from.

    Returns:
        ``(rows_filled, vulns_consumed)``. ``vulns_consumed == 0`` means the entry at
        *vuln_start* does not fit in the buffer on its own, so the caller has to
        handle it directly.
    """
    capacity = len(out_vuln_id)
    n_entries = len(index_vuln_ids)
    rows_filled = 0
    cursor = vuln_start
    while cursor < n_entries:
        row_count = elem_counts[cursor]
        # Stop before a vulnerability that would overflow the batch buffer.
        if rows_filled + row_count > capacity:
            break
        current_id = index_vuln_ids[cursor]
        src = elem_offsets[cursor]
        for _ in range(row_count):
            out_vuln_id[rows_filled] = current_id
            out_int_bin[rows_filled] = fp_int_bin[src]
            out_dmg_bin[rows_filled] = fp_dmg_bin[src]
            out_prob[rows_filled] = fp_prob[src]
            rows_filled += 1
            src += 1
        cursor += 1
    return rows_filled, cursor - vuln_start


def _vulnerability_tocsv_bin(bin_data, idx_data, file_out, dtype, headers, fmt):
    """Non-zip idx path: JIT batch loop over vulnerabilities; one write per ~_BATCH_ROWS rows.

    Args:
        bin_data: the vulnerability binary as a 1-D ``u1`` array, header included.
        idx_data: structured index array; the ``vulnerability_id``, ``offset`` and
            ``size`` fields are read (``offset``/``size`` are byte values into
            *bin_data*).
        file_out: output handle passed to ``write_ndarray_to_fmt_csv``.
        dtype: structured dtype of the output rows; must provide the fields
            ``vulnerability_id``, ``intensity_bin_id``, ``damage_bin_id`` and
            ``probability``.
        headers: passed unchanged to ``write_ndarray_to_fmt_csv``.
        fmt: passed unchanged to ``write_ndarray_to_fmt_csv``.

    Returns:
        None. Nothing is written when the index is empty.
    """
    n_vulns = len(idx_data)
    if n_vulns == 0:
        # No indexed vulnerabilities: nothing to write (and no last entry to read).
        return

    item_size = VulnerabilityRow_dtype.itemsize
    byte_offsets = idx_data['offset'].astype(np.int64)
    byte_sizes = idx_data['size'].astype(np.int64)

    # End of the furthest block referenced by any entry. Using the maximum rather
    # than the last entry keeps the view complete even if the index is not ordered
    # by offset; for an ordered index the two are the same value.
    data_end = int((byte_offsets + byte_sizes).max())
    vuln_data = bin_data[_HEADER_SIZE:data_end].view(VulnerabilityRow_dtype)
    fp_int_bin = vuln_data['intensity_bin_id']
    fp_dmg_bin = vuln_data['damage_bin_id']
    fp_prob = vuln_data['probability']

    # Convert byte offsets/sizes into row positions/counts relative to vuln_data.
    elem_offsets = (byte_offsets - _HEADER_SIZE) // item_size
    elem_counts = byte_sizes // item_size
    index_vuln_ids = idx_data['vulnerability_id']

    # The batch buffer never needs to be larger than the total number of rows.
    actual_batch_rows = min(_BATCH_ROWS, int(elem_counts.sum()))
    batch_data = np.empty(actual_batch_rows, dtype=dtype)
    vuln_cursor = 0

    while vuln_cursor < n_vulns:
        n_rows, consumed = _fill_next_vuln_batch(
            batch_data['vulnerability_id'], batch_data['intensity_bin_id'],
            batch_data['damage_bin_id'], batch_data['probability'],
            index_vuln_ids, elem_offsets, elem_counts,
            fp_int_bin, fp_dmg_bin, fp_prob,
            vuln_cursor,
        )
        if consumed == 0:
            # Single vulnerability exceeds the batch buffer; allocate exactly and write directly
            n = int(elem_counts[vuln_cursor])
            es = int(elem_offsets[vuln_cursor])
            large_buf = np.empty(n, dtype=dtype)
            large_buf['vulnerability_id'] = int(index_vuln_ids[vuln_cursor])
            large_buf['intensity_bin_id'] = fp_int_bin[es:es + n]
            large_buf['damage_bin_id'] = fp_dmg_bin[es:es + n]
            large_buf['probability'] = fp_prob[es:es + n]
            write_ndarray_to_fmt_csv(file_out, large_buf, headers, fmt)
            consumed = 1
        else:
            write_ndarray_to_fmt_csv(file_out, batch_data[:n_rows], headers, fmt)
        vuln_cursor += consumed


def _vulnerability_tocsv_zip(bin_data, idx_data, file_out, dtype, headers, fmt):
    """Zip path: decompress per vulnerability into a single reusable buffer.

    Args:
        bin_data: the vulnerability binary as a 1-D ``u1`` array.
        idx_data: structured index array; for each entry the fields
            ``vulnerability_id``, ``offset``, ``size`` (byte range of the compressed
            block in *bin_data*) and ``original_size`` (bytes after decompression)
            are read.
        file_out: output handle passed to ``write_ndarray_to_fmt_csv``.
        dtype: structured dtype of the output rows; must provide the fields
            ``vulnerability_id``, ``intensity_bin_id``, ``damage_bin_id`` and
            ``probability``.
        headers: passed unchanged to ``write_ndarray_to_fmt_csv``.
        fmt: passed unchanged to ``write_ndarray_to_fmt_csv``.

    Returns:
        None. Nothing is written when the index is empty.

    Raises:
        ValueError: if a decompressed block does not contain the number of rows
            announced by its ``original_size``.
    """
    if len(idx_data) == 0:
        # No indexed vulnerabilities: nothing to write, and max() of an empty
        # column would raise.
        return

    row_size = VulnerabilityRow_dtype.itemsize
    # One output buffer, large enough for the biggest vulnerability, reused throughout.
    max_vuln_elems = int(idx_data['original_size'].max()) // row_size
    buf = np.empty(max_vuln_elems, dtype=dtype)

    for row in idx_data:
        vuln_id = int(row['vulnerability_id'])
        offset = int(row['offset'])
        size = int(row['size'])
        n = int(row['original_size']) // row_size
        vuln_rows = np.frombuffer(
            zlib.decompress(bin_data[offset:offset + size].tobytes()),
            dtype=VulnerabilityRow_dtype,
        )
        if len(vuln_rows) != n:
            # Without this check a one-row block would be silently broadcast over
            # (or dropped from) the n-row slice by the field assignments below.
            raise ValueError(
                f"vulnerability {vuln_id}: decompressed block holds {len(vuln_rows)} rows "
                f"but the index original_size implies {n}"
            )
        buf_v = buf[:n]
        buf_v['vulnerability_id'] = vuln_id
        buf_v['intensity_bin_id'] = vuln_rows['intensity_bin_id']
        buf_v['damage_bin_id'] = vuln_rows['damage_bin_id']
        buf_v['probability'] = vuln_rows['probability']
        write_ndarray_to_fmt_csv(file_out, buf_v, headers, fmt)


def vulnerability_tocsv(stack, file_in, file_out, file_type, noheader, idx_file_in, zip_files):
    """Convert a vulnerability binary file to CSV.

    Three paths are taken depending on *idx_file_in* and *zip_files*:

    * no index file: the binary rows already use the full CSV dtype and are written
      in chunks of ``_BATCH_ROWS`` rows;
    * index file, not zipped: delegated to ``_vulnerability_tocsv_bin``;
    * index file, zipped: delegated to ``_vulnerability_tocsv_zip``.

    Args:
        stack: passed to ``resolve_file`` together with *file_in*.
        file_in: input to resolve with ``resolve_file(file_in, "rb", stack)``. When
            the resolved object is ``sys.stdin.buffer`` the whole input is read into
            memory; otherwise it is memory-mapped read-only.
        file_out: writable text handle receiving the CSV.
        file_type: key into ``TOOL_INFO`` giving ``headers``, ``dtype`` and ``fmt``.
        noheader: when true, the CSV header line is not written.
        idx_file_in: index file to memory-map as ``VulnerabilityIndex_dtype``, or
            ``None`` when the binary has no index.
        zip_files: whether the indexed blocks are zlib-compressed.

    Raises:
        OasisException: if *zip_files* is set but *idx_file_in* is ``None``.
    """
    # Reject the invalid argument combination up front, before the input is
    # resolved or a header line has been written to file_out.
    if idx_file_in is None and zip_files:
        raise OasisException(
            f"Error: Cannot read zip files without provided idx_file_in zip path, currently {idx_file_in}"
        )

    tool_info = TOOL_INFO[file_type]
    headers = tool_info["headers"]
    dtype = tool_info["dtype"]
    fmt = tool_info["fmt"]

    file_in = resolve_file(file_in, "rb", stack)
    # stdin is read in full with np.frombuffer instead of being memory-mapped.
    from_stdin = file_in is sys.stdin.buffer

    if not noheader:
        file_out.write(",".join(headers) + "\n")

    if idx_file_in is None:
        # No-idx: binary contains vulnerability_id in every row (full CSV dtype)
        if from_stdin:
            data = np.frombuffer(file_in.read(), dtype=dtype, offset=_HEADER_SIZE)
        else:
            data = np.memmap(file_in, dtype=dtype, mode='r', offset=_HEADER_SIZE)
        for start in range(0, len(data), _BATCH_ROWS):
            write_ndarray_to_fmt_csv(file_out, data[start:start + _BATCH_ROWS], headers, fmt)
        return

    # Indexed paths work on raw bytes; the helpers interpret them via the index.
    if from_stdin:
        bin_data = np.frombuffer(file_in.read(), dtype='u1')
    else:
        bin_data = np.memmap(file_in, dtype='u1', mode='r')
    idx_data = np.memmap(idx_file_in, dtype=VulnerabilityIndex_dtype, mode='r')

    if zip_files:
        _vulnerability_tocsv_zip(bin_data, idx_data, file_out, dtype, headers, fmt)
    else:
        _vulnerability_tocsv_bin(bin_data, idx_data, file_out, dtype, headers, fmt)