# Vulnerability CSV → binary converter.
#
# The CSV is read in fixed-size chunks (via iter_csv_as_ndarray) so memory usage is O(chunk)
# regardless of file size. Two write paths exist depending on whether an index file is provided:
#
# No-idx path (idx_file_out=None):
# Each chunk is written directly as raw bytes (chunk.tofile). An optional rolling
# intensity-bin completeness check compares each vulnerability against the previous one
# in O(k) memory (k = number of distinct intensity bins), avoiding the O(V × k) dict
# that would accumulate all vulnerabilities.
#
# Idx path (idx_file_out provided):
# Rows are buffered per vulnerability_id using a partial-vuln buffer. When the
# vulnerability_id changes, all complete vulnerabilities in the chunk are flushed via
# _flush_vuln (field copy → VulnerabilityRow_dtype, optional zlib compress, write, emit
# one index entry). The last group in each chunk may be incomplete and is held in
# partial_chunks until the next chunk arrives. Boundary detection uses a single np.diff
# pass producing start/end slices in one vectorised step.
#
# Validation (unless no_validation=True):
# Streaming vectorised checks run over each chunk and carry scalar state across chunk
# boundaries via (prev_vuln, prev_int, prev_dmg, prob_sum):
# - vulnerability_id is non-decreasing
# - probability sums to 1.0 (atol=1e-6) per (vulnerability_id, intensity_bin_id) group
#   (np.add.reduceat for groups that are complete within a chunk)
# The final open group's probability sum is checked after the last chunk.
# NOTE: damage_bin_id contiguity / start-at-1 and intensity_bin_id contiguity are NOT
# checked by this validation; prev_dmg is carried in the state but currently unused.
#
# no_validation=True skips the validation checks above and writes data in whatever order it
# appears in the CSV — the caller is responsible for correct sort order on the idx path.
import zlib
import numpy as np
from oasislmf.pytools.common.data import resolve_file
from oasislmf.pytools.converters.csvtobin.utils.common import iter_csv_as_ndarray
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.manager import VulnerabilityIndex_dtype, VulnerabilityRow_dtype
from oasislmf.utils.exceptions import OasisException
def _validate_chunk(chunk, state, first_chunk, row_offset):
"""Streaming vectorized validation. Returns updated state = (prev_vuln, prev_int, prev_dmg, prob_sum)."""
if len(chunk) == 0:
return state
prev_vuln, prev_int, prev_dmg, prob_sum = state
n = len(chunk)
vuln = chunk['vulnerability_id']
intb = chunk['intensity_bin_id']
dmgb = chunk['damage_bin_id']
prob = chunk['probability']
# --- Row 0: cross-chunk boundary or first row ever ---
if first_chunk:
prob_sum = float(prob[0])
else:
if vuln[0] < prev_vuln:
raise OasisException(
f"Error: Vulnerability IDs {prev_vuln} and {vuln[0]} at row {row_offset} are not in ascending order."
)
new_group = (vuln[0] != prev_vuln) or (intb[0] != prev_int)
if new_group:
if not np.isclose(prob_sum, 1.0, atol=1e-6):
raise OasisException(
f"Error: Probabilities for vulnerability_id {prev_vuln} and intensity_bin_id {prev_int} do not sum to 1. "
f"total probability = {prob_sum}."
)
prob_sum = float(prob[0])
else:
prob_sum += float(prob[0])
if n == 1:
return int(vuln[0]), int(intb[0]), int(dmgb[0]), prob_sum
# --- Vectorized checks for rows 1..n-1 ---
dv = np.diff(vuln)
di = np.diff(intb)
bad = np.flatnonzero(dv < 0)
if bad.size:
i = int(bad[0]) + 1
raise OasisException(
f"Error: Vulnerability IDs {vuln[i - 1]} and {vuln[i]} at row {row_offset + i} are not in ascending order."
)
group_ends = np.union1d(np.flatnonzero(dv != 0), np.flatnonzero(di != 0))
new_group_rows = group_ends + 1
# Probability sums
if new_group_rows.size == 0:
prob_sum += float(np.sum(prob[1:]))
else:
# Finish accumulating the current group (rows 1..new_group_rows[0]-1)
if new_group_rows[0] > 1:
prob_sum += float(np.sum(prob[1:new_group_rows[0]]))
if not np.isclose(prob_sum, 1.0, atol=1e-6):
raise OasisException(
f"Error: Probabilities for vulnerability_id {vuln[0]} and intensity_bin_id {intb[0]} do not sum to 1. "
f"total probability = {prob_sum}."
)
# Sum all new groups; last group may be incomplete so check all but the last
rel_starts = new_group_rows - int(new_group_rows[0])
group_sums = np.add.reduceat(prob[new_group_rows[0]:], rel_starts)
if group_sums.size > 1:
bad = np.flatnonzero(~np.isclose(group_sums[:-1], 1.0, atol=1e-6))
if bad.size:
g = int(bad[0])
row = int(new_group_rows[g])
raise OasisException(
f"Error: Probabilities for vulnerability_id {vuln[row]} and intensity_bin_id {intb[row]} do not sum to 1. "
f"total probability = {group_sums[g]}."
)
prob_sum = float(group_sums[-1])
return int(vuln[-1]), int(intb[-1]), int(dmgb[-1]), prob_sum
def _check_int_bins_rolling(v_id, curr_int_ids, prev_int_ids, logger):
"""O(k) rolling int-bin completeness check. Compares current vuln's int-ids against
the previous vuln's. Returns curr_int_ids to use as prev on the next call."""
if prev_int_ids is not None:
missing = np.setdiff1d(prev_int_ids, curr_int_ids)
extra = np.setdiff1d(curr_int_ids, prev_int_ids)
if missing.size:
logger.warning(
f"WARNING: vulnerability_id {v_id} is missing intensity_bin_ids: {missing.tolist()}"
)
if extra.size:
logger.warning(
f"WARNING: vulnerability_id {v_id} has unexpected intensity_bin_ids: {extra.tolist()}"
)
if missing.size or extra.size:
logger.warning(
"All intensity bins must be present for each vulnerability ID in single peril models."
)
return curr_int_ids
def _flush_vuln(v_id, rows, file_out, idx_file_out, max_damage_bin_idx, zip_files, offset):
    """Write one vulnerability's rows as a data block plus one index entry.

    Only the 'intensity_bin_id', 'damage_bin_id' and 'probability' fields of ``rows`` are
    copied into a ``VulnerabilityRow_dtype`` array; any other fields are dropped. The
    block's bytes (zlib-compressed when ``zip_files``) are written to ``file_out`` and a
    ``(v_id, offset, size, dsize)`` record is appended to ``idx_file_out``. ``size`` is the
    number of bytes written; ``dsize`` is the uncompressed size when zipping, else 0.

    Args:
        v_id: vulnerability_id recorded in the index entry.
        rows: structured array holding this vulnerability's rows.
        file_out: binary file object receiving the data block.
        idx_file_out: file receiving the ``VulnerabilityIndex_dtype`` record.
        max_damage_bin_idx: inclusive upper bound on damage_bin_id.
        zip_files: compress the block with ``zlib.compress``.
        offset: byte offset in ``file_out`` at which this block starts.

    Returns:
        The offset just past the bytes written, i.e. ``offset + size``.

    Raises:
        OasisException: if any damage_bin_id exceeds ``max_damage_bin_idx``.
    """
    out = np.empty(len(rows), dtype=VulnerabilityRow_dtype)
    for field in ("intensity_bin_id", "damage_bin_id", "probability"):
        out[field] = rows[field]

    if (out["damage_bin_id"] > max_damage_bin_idx).any():
        raise OasisException(
            f"Error: Found damage_bin_id in data larger than max_damage_bin_idx: {max_damage_bin_idx}"
        )

    payload = out.tobytes()
    uncompressed_size = len(payload) if zip_files else 0
    if zip_files:
        payload = zlib.compress(payload)
    file_out.write(payload)

    written = len(payload)
    entry = np.array([(v_id, offset, written, uncompressed_size)], dtype=VulnerabilityIndex_dtype)
    entry.tofile(idx_file_out)
    return offset + written
def _finish_vuln(v_id, parts, int_ids, prev_int_ids, offset, file_out, idx_file_out,
                 max_damage_bin_idx, zip_files, logger):
    """Complete one vulnerability once its contiguous run of rows has ended.

    On the idx path (``parts`` non-empty) the buffered row slices are concatenated and
    written via ``_flush_vuln``. On the no-idx path ``parts`` is empty because the rows
    were already written raw. ``int_ids`` holds the vulnerability's distinct
    intensity_bin_ids, or None when int-bin checks are suppressed.

    Returns the updated ``(offset, prev_int_ids)``.
    """
    if parts:
        rows = parts[0] if len(parts) == 1 else np.concatenate(parts)
        offset = _flush_vuln(v_id, rows, file_out, idx_file_out,
                             max_damage_bin_idx, zip_files, offset)
    if int_ids is not None:
        prev_int_ids = _check_int_bins_rolling(v_id, int_ids, prev_int_ids, logger)
    return offset, prev_int_ids


def vulnerability_tobin(
    stack,
    file_in,
    file_out,
    file_type,
    idx_file_out,
    max_damage_bin_idx,
    no_validation,
    suppress_int_bin_checks,
    zip_files
):
    """Convert a vulnerability CSV to the binary vulnerability format.

    Writes an int32 ``max_damage_bin_idx`` header to ``file_out``, then the data.
    - Without ``idx_file_out``: each CSV chunk is written raw in the CSV dtype.
    - With ``idx_file_out``: each vulnerability's rows are written as one
      ``VulnerabilityRow_dtype`` block (zlib-compressed if ``zip_files``), and one
      ``VulnerabilityIndex_dtype`` entry per vulnerability goes to the index file.

    Args:
        stack: passed to ``resolve_file`` / ``iter_csv_as_ndarray``. It is presumably a
            ``contextlib.ExitStack`` owning opened files (TODO confirm).
        file_in: CSV input, read in chunks by ``iter_csv_as_ndarray``.
        file_out: writable binary file object for the data.
        file_type: key into ``TOOL_INFO`` selecting the CSV row dtype.
        idx_file_out: index output, resolved with ``resolve_file(..., "wb", stack)``. Pass
            None for the no-idx format.
        max_damage_bin_idx: written as the header; every damage_bin_id must be <= it.
        no_validation: skip the ordering / probability-sum validation.
        suppress_int_bin_checks: skip the warning-only intensity-bin completeness check.
        zip_files: compress each vulnerability block; requires ``idx_file_out``.

    Raises:
        OasisException: if ``zip_files`` is given without ``idx_file_out``, if a
            damage_bin_id exceeds ``max_damage_bin_idx``, or if validation fails.
    """
    from oasislmf.pytools.converters.csvtobin.manager import logger

    # Reject an invalid argument combination before anything is written to file_out.
    if zip_files and idx_file_out is None:
        raise OasisException(
            f"Error: Cannot write zip files without provided idx_file_out zip path, currently {idx_file_out}"
        )

    dtype = TOOL_INFO[file_type]["dtype"]

    # Write max_damage_bin header
    np.array([max_damage_bin_idx], dtype=np.int32).tofile(file_out)

    has_idx = idx_file_out is not None
    if has_idx:
        idx_file_out = resolve_file(idx_file_out, "wb", stack)
    check_int_bins = not suppress_int_bin_checks

    first_chunk = True
    any_data = False
    row_offset = 0
    state = (0, 0, 0, 0.0)  # (prev_vuln, prev_int, prev_dmg, prob_sum)
    offset = np.dtype(np.int32).itemsize  # next data-block offset in file_out, past header
    prev_int_ids = None  # O(k) rolling int-bin state

    # The "open" vulnerability is the last run of the latest chunk. Its rows may continue in
    # the next chunk, so it is only flushed / int-bin-checked once a different
    # vulnerability_id (or the end of input) is reached. Checking it per chunk would treat a
    # vulnerability split across chunks as two partial ones and warn spuriously.
    open_vuln_id = None
    open_parts = []      # idx path only: row slices buffered for open_vuln_id
    open_int_ids = None  # distinct intensity_bin_ids seen so far for open_vuln_id

    for chunk in iter_csv_as_ndarray(stack, file_in, dtype):
        if len(chunk) == 0:
            continue
        if not no_validation:
            state = _validate_chunk(chunk, state, first_chunk, row_offset)
        first_chunk = False
        any_data = True
        row_offset += len(chunk)

        if not has_idx:
            # No-idx path: rows are written raw. Enforce the same damage_bin_id bound that
            # _flush_vuln enforces on the idx path, so the data never contradicts the header.
            if np.any(chunk['damage_bin_id'] > max_damage_bin_idx):
                raise OasisException(
                    f"Error: Found damage_bin_id in data larger than max_damage_bin_idx: {max_damage_bin_idx}"
                )
            chunk.tofile(file_out)
            if not check_int_bins:
                continue  # nothing needs tracking per vulnerability

        vuln_ids = chunk['vulnerability_id']
        # Contiguous runs of equal vulnerability_id: run i covers rows [starts[i], ends[i]).
        starts = np.concatenate(([0], np.flatnonzero(np.diff(vuln_ids)) + 1))
        ends = np.append(starts[1:], len(chunk))
        for s, e in zip(starts.tolist(), ends.tolist()):
            v_id = int(vuln_ids[s])
            if v_id != open_vuln_id:
                # A new run starts, so the open vulnerability (if any) is complete.
                if open_vuln_id is not None:
                    offset, prev_int_ids = _finish_vuln(
                        open_vuln_id, open_parts, open_int_ids, prev_int_ids, offset,
                        file_out, idx_file_out, max_damage_bin_idx, zip_files, logger,
                    )
                open_vuln_id, open_parts, open_int_ids = v_id, [], None
            rows = chunk[s:e]
            if has_idx:
                open_parts.append(rows)
            if check_int_bins:
                run_int_ids = np.unique(rows['intensity_bin_id'])
                open_int_ids = (run_int_ids if open_int_ids is None
                                else np.union1d(open_int_ids, run_int_ids))

    # End of input: the open vulnerability is complete.
    if open_vuln_id is not None:
        _finish_vuln(open_vuln_id, open_parts, open_int_ids, prev_int_ids, offset,
                     file_out, idx_file_out, max_damage_bin_idx, zip_files, logger)

    # Check final probability group
    if not no_validation and any_data:
        prev_vuln, prev_int, _, prob_sum = state
        if not np.isclose(prob_sum, 1.0, atol=1e-6):
            raise OasisException(
                f"Error: Probabilities for vulnerability_id {prev_vuln} and intensity_bin_id {prev_int} do not sum to 1. "
                f"total probability = {prob_sum}."
            )