# Vulnerability binary → CSV converter.
#
# The binary vulnerability file is memory-mapped (np.memmap) so only pages actually
# read are loaded — memory usage is proportional to the rows being output, not file size.
# Three paths exist depending on idx_file_in and zip_files:
#
# No-idx path:
# The binary already contains vulnerability_id in every row (written as the full CSV
# dtype by csvtobin). Memory-mapped and written to CSV in fixed _BATCH_ROWS chunks.
#
# Non-zip idx path (_vulnerability_tocsv_bin):
# The binary is memory-mapped as u1 and viewed past the 4-byte header as
# VulnerabilityRow_dtype (zero-copy). A JIT inner loop (_fill_next_vuln_batch) packs
# complete vulnerabilities into a fixed pre-allocated output buffer (_BATCH_ROWS rows),
# adding vulnerability_id from the index. The buffer is flushed once per batch.
# Vulnerabilities whose row count exceeds the batch size are handled directly.
#
# Zip idx path (_vulnerability_tocsv_zip):
# The binary is memory-mapped as u1. Each vulnerability is decompressed individually
# with zlib. Since VulnerabilityIndex always carries original_size, a single reusable
# output buffer is pre-allocated to max(original_size) and reused for every
# vulnerability, avoiding a heap allocation per decompressed block.
import zlib
import numba as nb
import numpy as np
import sys
from oasislmf.pytools.common.data import resolve_file, write_ndarray_to_fmt_csv
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.manager import VulnerabilityIndex_dtype, VulnerabilityRow_dtype
from oasislmf.utils.exceptions import OasisException
_BATCH_ROWS = 1 << 13 # 8 K rows
_HEADER_SIZE = 4 # int32 max_damage_bin_idx
@nb.njit(cache=True, error_model="numpy")
def _fill_next_vuln_batch(
out_vuln_id, out_int_bin, out_dmg_bin, out_prob,
index_vuln_ids, elem_offsets, elem_counts,
fp_int_bin, fp_dmg_bin, fp_prob,
vuln_start,
):
"""Fill output arrays with complete vulnerabilities starting from *vuln_start*.
Stops when the batch buffer is full (never splits a vulnerability across batches)
or all vulnerabilities have been consumed.
Returns (rows_filled, vulns_consumed).
vulns_consumed == 0 means the next vulnerability alone exceeds the buffer; the
caller must handle it directly.
"""
max_rows = len(out_vuln_id)
out_idx = 0
i = vuln_start
while i < len(index_vuln_ids):
n = elem_counts[i]
if out_idx + n > max_rows:
break
elem_start = elem_offsets[i]
vid = index_vuln_ids[i]
for j in range(n):
out_vuln_id[out_idx] = vid
out_int_bin[out_idx] = fp_int_bin[elem_start + j]
out_dmg_bin[out_idx] = fp_dmg_bin[elem_start + j]
out_prob[out_idx] = fp_prob[elem_start + j]
out_idx += 1
i += 1
return out_idx, i - vuln_start
def _vulnerability_tocsv_bin(bin_data, idx_data, file_out, dtype, headers, fmt):
"""Non-zip idx path: JIT batch loop over vulnerabilities; one write per ~_BATCH_ROWS rows."""
item_size = VulnerabilityRow_dtype.itemsize
data_end = int(idx_data['offset'][-1] + idx_data['size'][-1])
vuln_data = bin_data[_HEADER_SIZE:data_end].view(VulnerabilityRow_dtype)
fp_int_bin = vuln_data['intensity_bin_id']
fp_dmg_bin = vuln_data['damage_bin_id']
fp_prob = vuln_data['probability']
elem_offsets = (idx_data['offset'].astype(np.int64) - _HEADER_SIZE) // item_size
elem_counts = idx_data['size'].astype(np.int64) // item_size
actual_batch_rows = min(_BATCH_ROWS, int(elem_counts.sum()))
batch_data = np.empty(actual_batch_rows, dtype=dtype)
vuln_cursor = 0
n_vulns = len(idx_data)
while vuln_cursor < n_vulns:
n_rows, consumed = _fill_next_vuln_batch(
batch_data['vulnerability_id'], batch_data['intensity_bin_id'],
batch_data['damage_bin_id'], batch_data['probability'],
idx_data['vulnerability_id'], elem_offsets, elem_counts,
fp_int_bin, fp_dmg_bin, fp_prob,
vuln_cursor,
)
if consumed == 0:
# Single vulnerability exceeds _BATCH_ROWS; allocate exactly and write directly
n = int(elem_counts[vuln_cursor])
es = int(elem_offsets[vuln_cursor])
large_buf = np.empty(n, dtype=dtype)
large_buf['vulnerability_id'] = int(idx_data['vulnerability_id'][vuln_cursor])
large_buf['intensity_bin_id'] = fp_int_bin[es:es + n]
large_buf['damage_bin_id'] = fp_dmg_bin[es:es + n]
large_buf['probability'] = fp_prob[es:es + n]
write_ndarray_to_fmt_csv(file_out, large_buf, headers, fmt)
consumed = 1
else:
write_ndarray_to_fmt_csv(file_out, batch_data[:n_rows], headers, fmt)
vuln_cursor += consumed
def _vulnerability_tocsv_zip(bin_data, idx_data, file_out, dtype, headers, fmt):
"""Zip path: decompress per vulnerability into a single reusable buffer."""
max_vuln_elems = int(idx_data['original_size'].max()) // VulnerabilityRow_dtype.itemsize
buf = np.empty(max_vuln_elems, dtype=dtype)
for row in idx_data:
vuln_id = int(row['vulnerability_id'])
offset = int(row['offset'])
size = int(row['size'])
n = int(row['original_size']) // VulnerabilityRow_dtype.itemsize
vuln_rows = np.frombuffer(
zlib.decompress(bin_data[offset:offset + size].tobytes()),
dtype=VulnerabilityRow_dtype,
)
buf_v = buf[:n]
buf_v['vulnerability_id'] = vuln_id
buf_v['intensity_bin_id'] = vuln_rows['intensity_bin_id']
buf_v['damage_bin_id'] = vuln_rows['damage_bin_id']
buf_v['probability'] = vuln_rows['probability']
write_ndarray_to_fmt_csv(file_out, buf_v, headers, fmt)
[docs]
def vulnerability_tocsv(stack, file_in, file_out, file_type, noheader, idx_file_in, zip_files):
headers = TOOL_INFO[file_type]["headers"]
dtype = TOOL_INFO[file_type]["dtype"]
fmt = TOOL_INFO[file_type]["fmt"]
file_in = resolve_file(file_in, "rb", stack)
if not noheader:
file_out.write(",".join(headers) + "\n")
if idx_file_in is None:
if zip_files:
raise OasisException(
f"Error: Cannot read zip files without provided idx_file_in zip path, currently {idx_file_in}"
)
# No-idx: binary contains vulnerability_id in every row (full CSV dtype)
if file_in == sys.stdin.buffer:
data = np.frombuffer(file_in.read(), dtype=dtype, offset=_HEADER_SIZE)
else:
data = np.memmap(file_in, dtype=dtype, mode='r', offset=_HEADER_SIZE)
for start in range(0, len(data), _BATCH_ROWS):
write_ndarray_to_fmt_csv(file_out, data[start:start + _BATCH_ROWS], headers, fmt)
else:
if file_in == sys.stdin.buffer:
bin_data = np.frombuffer(file_in.read(), dtype='u1')
else:
bin_data = np.memmap(file_in, dtype='u1', mode='r')
idx_data = np.memmap(idx_file_in, dtype=VulnerabilityIndex_dtype, mode='r')
if zip_files:
_vulnerability_tocsv_zip(bin_data, idx_data, file_out, dtype, headers, fmt)
else:
_vulnerability_tocsv_bin(bin_data, idx_data, file_out, dtype, headers, fmt)