# Footprint CSV → binary converter.
#
# The CSV is read in fixed-size chunks (via iter_csv_as_ndarray) so memory usage is O(chunk)
# regardless of file size. Each chunk is processed as follows:
#
# 1. Validation (unless no_validation=True): three Numba JIT checks run over the chunk,
# carrying state across chunk boundaries via scalar "prev_*" variables —
# sort order, probability sums per (event_id, areaperil_id) group, and
# duplicate intensity_bin_id detection.
#
# 2. Event boundary detection: a single np.diff pass finds all event-id change points,
# producing start/end slices for every event in the chunk in one vectorised step.
#
# 3. Writing: complete events (all rows present in this chunk) are batch-converted and
# written in one tobytes()/write() call (non-zip), or compressed individually per
# event (zip). Events that span a chunk boundary are buffered in partial_chunks and
# flushed once their final rows arrive in the next chunk.
#
# 4. Index: one (event_id, offset, size[, decompressed_size]) entry per event is
# accumulated and written to the .idx file after all chunks are processed.
#
# no_validation=True skips step 1 and writes events in whatever order they appear in the
# CSV — the caller is responsible for ensuring the input is sorted by (event_id, areaperil_id).
import zlib
import numba as nb
import numpy as np
from oasislmf.pytools.common.data import resolve_file
from oasislmf.pytools.converters.csvtobin.utils.common import iter_csv_as_ndarray
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.common import Event_dtype, EventIndexBin_dtype, EventIndexBinZ_dtype
from oasislmf.utils.exceptions import OasisException
@nb.njit(cache=True, error_model="numpy")
def _check_sorted(event_ids, areaperil_ids, prev_event_id, prev_areaperil_id, first_chunk):
    """Verify rows are in non-decreasing (event_id, areaperil_id) order.

    The first row is also compared against the carried-over
    (prev_event_id, prev_areaperil_id) pair unless first_chunk is set.

    Returns (bad_idx, last_event_id, last_areaperil_id):
      * bad_idx is -1 when no violation was found, otherwise the index (within
        these arrays) of the first out-of-order row;
      * the two ids are those of the offending row on failure, of the final row
        on success, or the unchanged carry values when the arrays are empty.
    """
    n_rows = len(event_ids)
    if n_rows == 0:
        # Nothing to inspect: hand the carry state back untouched.
        return np.int64(-1), prev_event_id, prev_areaperil_id

    # Chunk boundary: row 0 must not sort before the last row already seen.
    if not first_chunk:
        head_event = event_ids[0]
        head_areaperil = areaperil_ids[0]
        event_regressed = head_event < prev_event_id
        areaperil_regressed = head_event == prev_event_id and head_areaperil < prev_areaperil_id
        if event_regressed or areaperil_regressed:
            return np.int64(0), head_event, head_areaperil

    # Interior of the chunk: compare each row with its predecessor.
    for row in range(1, n_rows):
        before = row - 1
        event_regressed = event_ids[row] < event_ids[before]
        areaperil_regressed = (event_ids[row] == event_ids[before]
                               and areaperil_ids[row] < areaperil_ids[before])
        if event_regressed or areaperil_regressed:
            return np.int64(row), event_ids[row], areaperil_ids[row]

    return np.int64(-1), event_ids[n_rows - 1], areaperil_ids[n_rows - 1]
@nb.njit(cache=True, error_model="numpy")
def _check_prob_sums(event_ids, areaperil_ids, probs,
                     prev_event_id, prev_areaperil_id, running_sum, first_chunk,
                     atol=1e-6):
    """Incremental probability-sum check, assuming rows are grouped by
    (event_id, areaperil_id).

    Probabilities are accumulated per group; whenever the group key changes, the
    sum of the group that just ended must be within atol of 1.0.

    Args:
        event_ids, areaperil_ids, probs: parallel arrays for the current chunk.
        prev_event_id, prev_areaperil_id: key of the group still open at the end
            of the previous chunk (ignored when first_chunk is True).
        running_sum: probability accumulated so far for that open group.
        first_chunk: True when no carry state exists yet.
        atol: absolute tolerance on the deviation from 1.0.

    Returns:
        (bad_idx, last_event_id, last_areaperil_id, running_sum).
        bad_idx == -1 means every *closed* group was valid. Otherwise bad_idx is
        the index of the last row of the failing group and the returned ids are
        that group's key. If the failing group ended in the previous chunk (its
        last row is not in these arrays), bad_idx is 0 — it is never -1, because
        -1 is reserved for "valid".
        The final group is not finalised here — check it after the last chunk.
    """
    if len(event_ids) == 0:
        return np.int64(-1), prev_event_id, prev_areaperil_id, running_sum
    if first_chunk:
        # Seed the carry state from the very first row.
        prev_event_id = event_ids[0]
        prev_areaperil_id = areaperil_ids[0]
        running_sum = np.float64(probs[0])
        i_start = 1
    else:
        i_start = 0
    for i in range(i_start, len(event_ids)):
        if event_ids[i] != prev_event_id or areaperil_ids[i] != prev_areaperil_id:
            # Group key changed: the previous group is complete, validate it.
            if abs(running_sum - 1.0) > atol:
                # When i == 0 the failing group ended in the previous chunk;
                # i - 1 would be -1, which callers interpret as "valid" and
                # would silently drop the error. Clamp to 0 so the failure is
                # always signalled.
                bad_idx = i - 1 if i > 0 else 0
                return np.int64(bad_idx), prev_event_id, prev_areaperil_id, running_sum
            running_sum = np.float64(probs[i])
            prev_event_id = event_ids[i]
            prev_areaperil_id = areaperil_ids[i]
        else:
            running_sum += probs[i]
    return np.int64(-1), prev_event_id, prev_areaperil_id, running_sum
@nb.njit(cache=True, error_model="numpy")
def _check_duplicates(event_ids, areaperil_ids, intensity_bin_ids,
                      prev_event_id, prev_areaperil_id, prev_intensity_bin_id, first_chunk):
    """Detect a repeated (event_id, areaperil_id, intensity_bin_id) triple on
    consecutive rows.

    Only adjacent rows are compared (including row 0 against the carried-over
    previous triple unless first_chunk is set), so a duplicate is found only when
    the two equal rows sit next to each other.

    Returns (bad_idx, last_event_id, last_areaperil_id, last_intensity_bin_id):
    bad_idx is -1 when no duplicate was found, otherwise the index of the second
    row of the duplicated pair; the ids are those of that row on failure, of the
    final row on success, or the unchanged carry values for empty input.
    """
    n_rows = len(event_ids)
    if n_rows == 0:
        return np.int64(-1), prev_event_id, prev_areaperil_id, prev_intensity_bin_id

    # Chunk boundary: row 0 versus the last row of the previous chunk.
    if not first_chunk:
        same_group = event_ids[0] == prev_event_id and areaperil_ids[0] == prev_areaperil_id
        if same_group and intensity_bin_ids[0] == prev_intensity_bin_id:
            return np.int64(0), event_ids[0], areaperil_ids[0], intensity_bin_ids[0]

    for row in range(1, n_rows):
        before = row - 1
        # Any differing component means this row is not a repeat of the previous one.
        if event_ids[row] != event_ids[before]:
            continue
        if areaperil_ids[row] != areaperil_ids[before]:
            continue
        if intensity_bin_ids[row] != intensity_bin_ids[before]:
            continue
        return np.int64(row), event_ids[row], areaperil_ids[row], intensity_bin_ids[row]

    tail = n_rows - 1
    return np.int64(-1), event_ids[tail], areaperil_ids[tail], intensity_bin_ids[tail]
@nb.njit(cache=True, error_model="numpy")
def _exceeds_max_intensity(intensity_bin_ids, max_val):
    """Return True as soon as one intensity_bin_id is strictly greater than
    max_val; return False if none is (including for empty input)."""
    for pos in range(len(intensity_bin_ids)):
        if intensity_bin_ids[pos] > max_val:
            return True
    return False
def _validate_chunk(chunk, event_ids, areaperil_ids, first_chunk,
                    prev_sort_event, prev_sort_areaperil,
                    prev_prob_event, prev_prob_areaperil, running_sum,
                    prev_dup_event, prev_dup_areaperil, prev_dup_intensity):
    """Apply the sort-order, probability-sum and duplicate checks to one chunk.

    Each check receives its own carry state (the prev_* / running_sum arguments)
    and hands back the updated state, which is returned to the caller as an
    8-tuple in the same order as the corresponding parameters.

    Row numbers in error messages are indices into this chunk.

    Raises:
        OasisException: on the first check that reports a failing row.
    """
    # 1) Ordering by (event_id, areaperil_id).
    sort_result = _check_sorted(
        event_ids, areaperil_ids,
        prev_sort_event, prev_sort_areaperil, first_chunk,
    )
    bad_idx, prev_sort_event, prev_sort_areaperil = sort_result
    if bad_idx != -1:
        message = f"IDs not in ascending order at row {bad_idx}: {chunk[bad_idx]}"
        raise OasisException(message)

    # 2) Probabilities of every closed (event_id, areaperil_id) group sum to 1.
    probabilities = np.ascontiguousarray(chunk["probability"])
    prob_result = _check_prob_sums(
        event_ids, areaperil_ids, probabilities,
        prev_prob_event, prev_prob_areaperil, running_sum, first_chunk,
    )
    bad_idx, prev_prob_event, prev_prob_areaperil, running_sum = prob_result
    if bad_idx != -1:
        message = (
            f"Probabilities do not sum to 1 for group ending at row {bad_idx}: "
            f"event_id={prev_prob_event}, areaperil_id={prev_prob_areaperil}"
        )
        raise OasisException(message)

    # 3) No repeated intensity_bin_id on consecutive rows of a group.
    intensity_bins = np.ascontiguousarray(chunk["intensity_bin_id"])
    dup_result = _check_duplicates(
        event_ids, areaperil_ids, intensity_bins,
        prev_dup_event, prev_dup_areaperil, prev_dup_intensity, first_chunk,
    )
    bad_idx, prev_dup_event, prev_dup_areaperil, prev_dup_intensity = dup_result
    if bad_idx != -1:
        message = (
            f"Duplicate intensity_bin_id at row {bad_idx}: "
            f"event_id={chunk['event_id'][bad_idx]}, areaperil_id={chunk['areaperil_id'][bad_idx]}, "
            f"intensity_bin_id={chunk['intensity_bin_id'][bad_idx]}"
        )
        raise OasisException(message)

    carry_state = (
        prev_sort_event, prev_sort_areaperil,
        prev_prob_event, prev_prob_areaperil, running_sum,
        prev_dup_event, prev_dup_areaperil, prev_dup_intensity,
    )
    return carry_state
def _flush_event(event_id, rows, file_out, idx_entries,
                 max_intensity_bin_idx, zip_files, decompressed_size, offset):
    """Serialise one event's rows to file_out and record its index entry.

    The rows are copied into an Event_dtype array, checked against
    max_intensity_bin_idx, turned into bytes, zlib-compressed when zip_files is
    truthy, and written. An entry (event_id, offset, size) is appended to
    idx_entries, with the uncompressed byte count added as a fourth element when
    decompressed_size is truthy.

    Intended for events spanning chunk boundaries and for the zip path, where
    each event is compressed on its own.

    Returns:
        offset + number of bytes written (presumably the next event's offset —
        confirm against the caller).

    Raises:
        OasisException: if any intensity_bin_id exceeds max_intensity_bin_idx;
            nothing is written in that case.
    """
    event_records = np.empty(len(rows), dtype=Event_dtype)
    for field in ("areaperil_id", "intensity_bin_id", "probability"):
        event_records[field] = rows[field]

    too_large = _exceeds_max_intensity(event_records["intensity_bin_id"], max_intensity_bin_idx)
    if too_large:
        raise OasisException(
            f"Error: Found intensity_bin_idx in data larger than max_intensity_bin_idx: {max_intensity_bin_idx}"
        )

    raw_bytes = event_records.tobytes()
    raw_size = len(raw_bytes)
    payload = zlib.compress(raw_bytes) if zip_files else raw_bytes
    file_out.write(payload)
    written = len(payload)

    # Index entry carries the uncompressed size only when requested.
    entry = (event_id, offset, written, raw_size) if decompressed_size else (event_id, offset, written)
    idx_entries.append(entry)
    return offset + written