# Footprint binary → CSV converter.
#
# The binary footprint is memory-mapped (np.memmap) so only pages that are actually
# read are loaded — memory usage is proportional to the events being output, not file size.
# Processing proceeds as follows:
#
# 1. Index loading: the .idx file is memory-mapped as EventIndexBin_dtype or
# EventIndexBinZ_dtype (when the binary carries a decompressed-size field).
# If the index is out of order it is sorted by event_id once upfront via argsort;
# the common case (already sorted) skips this entirely.
#
# 2. Event range filtering: when event_from_to is supplied the index is masked to the
# requested range in one vectorised step before any footprint data is touched.
#
# 3. Output — two paths depending on zip_files:
#
# Non-zip (_footprint_tocsv_bin): a JIT inner loop (_fill_next_batch) packs complete
# events into a fixed pre-allocated output buffer (_BATCH_ROWS rows) and flushes it
# to write_ndarray_to_fmt_csv in one call per batch. Single events that exceed the
# batch size are written directly with an exact-size allocation.
#
# Zip (_footprint_tocsv_zip): each event is decompressed individually with zlib. When
# the index carries a d_size field (decompressed size), a single reusable buffer is
# pre-allocated at the maximum event size; otherwise a new buffer is allocated per event.
import zlib
import numba as nb
import numpy as np
import re
import sys
from oasislmf.pytools.common.data import resolve_file, write_ndarray_to_fmt_csv
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.common import (
Event_dtype, EventIndexBin_dtype, EventIndexBinZ_dtype, FootprintHeader
)
# Capacity, in output rows, of the reusable batch buffer used by the non-zip path:
# rows are accumulated up to this limit before each write_ndarray_to_fmt_csv call.
# An event with more rows than the buffer holds is written on its own with an
# exact-size allocation instead.
_BATCH_ROWS = 1 << 13  # 8192 rows
@nb.njit(cache=True, error_model="numpy")
def _fill_next_batch(
    out_event_id, out_areaperil_id, out_intensity_bin_id, out_probability,
    index_event_ids, elem_offsets, event_elem_counts,
    fp_areaperil_id, fp_intensity_bin_id, fp_probability,
    event_start,
):
    """Pack whole events into the output columns, beginning at *event_start*.

    Events are copied one after another until either the index is exhausted or
    the next event would not fit in the space remaining; an event is never
    split between two batches.

    Args:
        out_event_id, out_areaperil_id, out_intensity_bin_id, out_probability:
            output columns, written in place; the length of ``out_event_id``
            is the batch capacity.
        index_event_ids: event id of each index entry.
        elem_offsets: position of each event's first record in the ``fp_*`` columns.
        event_elem_counts: number of records belonging to each event.
        fp_areaperil_id, fp_intensity_bin_id, fp_probability: footprint record columns.
        event_start: index entry at which to start.

    Returns:
        (rows_filled, events_consumed). ``events_consumed == 0`` means the event
        at *event_start* is larger than the whole buffer and must be handled by
        the caller.
    """
    capacity = len(out_event_id)
    n_events = len(index_event_ids)
    rows_filled = 0
    events_consumed = 0
    for ev in range(event_start, n_events):
        count = event_elem_counts[ev]
        if rows_filled + count > capacity:
            # Next event does not fit: leave it for the following batch.
            break
        first = elem_offsets[ev]
        event_id = index_event_ids[ev]
        for src in range(first, first + count):
            out_event_id[rows_filled] = event_id
            out_areaperil_id[rows_filled] = fp_areaperil_id[src]
            out_intensity_bin_id[rows_filled] = fp_intensity_bin_id[src]
            out_probability[rows_filled] = fp_probability[src]
            rows_filled += 1
        events_consumed += 1
    return rows_filled, events_consumed
def _check_event_from_to(event_from_to):
    """Parse an optional ``"<from>-<to>"`` event range string.

    Args:
        event_from_to: ``None`` for "no range", or a string made of two
            non-negative integers separated by a single hyphen.

    Returns:
        A 3-tuple ``(no_range, from_event, to_event)``. ``no_range`` is True
        only when *event_from_to* is None, in which case both bounds are -1.

    Raises:
        ValueError: if the string is not of the expected form, or if the first
            bound is greater than the second.
    """
    if event_from_to is None:
        return True, -1, -1

    parsed = re.fullmatch(r'(\d+)-(\d+)', event_from_to)
    if parsed is None:
        raise ValueError(f"Invalid format for event_from_to string: {event_from_to}. String must be of format \"[int1]-[int2]\"")

    lower = int(parsed.group(1))
    upper = int(parsed.group(2))
    if lower > upper:
        raise ValueError(f"Invalid event range: {lower} > {upper}")
    return False, lower, upper
def _read_footprint_zips(stack, file_in, idx_file_in):
    """Open a compressed footprint together with its index.

    The footprint is returned as a flat ``u1`` array: read fully into memory
    when the resolved file is ``sys.stdin.buffer``, memory-mapped read-only
    otherwise. Bit 1 of the header's ``has_intensity_uncertainty`` field picks
    the index record layout: ``EventIndexBinZ_dtype`` when set,
    ``EventIndexBin_dtype`` when clear.

    Args:
        stack: passed through to ``resolve_file`` for both files.
        file_in: footprint input, as accepted by ``resolve_file``.
        idx_file_in: index input, as accepted by ``resolve_file``.

    Returns:
        (footprint, footprint_index); the index is always memory-mapped.
    """
    fp_handle = resolve_file(file_in, mode="rb", stack=stack)
    if fp_handle == sys.stdin.buffer:
        raw = np.frombuffer(fp_handle.read(), dtype="u1")
    else:
        raw = np.memmap(fp_handle, dtype="u1", mode='r')

    # Decode the fixed-size header at the start of the footprint.
    header_bytes = raw[:FootprintHeader.size].tobytes()
    header = np.frombuffer(header_bytes, dtype=FootprintHeader)
    flags = header['has_intensity_uncertainty'].item()
    has_decompressed_size = int(flags & (1 << 1))
    index_layout = EventIndexBinZ_dtype if has_decompressed_size else EventIndexBin_dtype

    idx_handle = resolve_file(idx_file_in, mode="rb", stack=stack)
    index = np.memmap(idx_handle, dtype=index_layout, mode='r')
    return raw, index
def _read_footprint_bins(stack, file_in, idx_file_in):
    """Open an uncompressed footprint together with its index.

    The footprint is returned as a flat ``u1`` array: read fully into memory
    when the resolved file is ``sys.stdin.buffer``, memory-mapped read-only
    otherwise. The index is memory-mapped as ``EventIndexBin_dtype`` records.

    Args:
        stack: passed through to ``resolve_file`` for both files.
        file_in: footprint input, as accepted by ``resolve_file``.
        idx_file_in: index input, as accepted by ``resolve_file``.

    Returns:
        (footprint, footprint_index).
    """
    fp_handle = resolve_file(file_in, mode="rb", stack=stack)
    if fp_handle == sys.stdin.buffer:
        raw = np.frombuffer(fp_handle.read(), dtype="u1")
    else:
        raw = np.memmap(fp_handle, dtype="u1", mode='r')

    idx_handle = resolve_file(idx_file_in, mode="rb", stack=stack)
    index = np.memmap(idx_handle, dtype=EventIndexBin_dtype, mode='r')
    return raw, index
def _footprint_tocsv_bin(footprint, sorted_index, file_out, dtype, headers, fmt):
    """Non-zip path: write events to CSV in batches filled by ``_fill_next_batch``.

    Args:
        footprint: flat ``u1`` array holding the header followed by Event records.
        sorted_index: index records with ``event_id``, ``offset`` and ``size`` fields.
        file_out: destination passed to ``write_ndarray_to_fmt_csv``.
        dtype: structured dtype of the output rows (``event_id``,
            ``areaperil_id``, ``intensity_bin_id``, ``probability``).
        headers: passed to ``write_ndarray_to_fmt_csv``.
        fmt: passed to ``write_ndarray_to_fmt_csv``.
    """
    hdr_bytes = FootprintHeader.size
    rec_bytes = Event_dtype.itemsize

    # Reinterpret everything after the header as Event records (a view, no copy).
    records = footprint[hdr_bytes:].view(Event_dtype)
    src_areaperil = records['areaperil_id']
    src_intensity = records['intensity_bin_id']
    src_probability = records['probability']

    # Convert byte offsets/sizes from the index into record positions/counts.
    first_record = (sorted_index['offset'].astype(np.int64) - hdr_bytes) // rec_bytes
    record_counts = sorted_index['size'].astype(np.int64) // rec_bytes

    # The buffer never needs to be larger than the total number of rows.
    capacity = min(_BATCH_ROWS, int(record_counts.sum()))
    batch = np.empty(capacity, dtype=dtype)
    col_event = batch['event_id']
    col_areaperil = batch['areaperil_id']
    col_intensity = batch['intensity_bin_id']
    col_probability = batch['probability']
    index_event_ids = sorted_index['event_id']

    total_events = len(sorted_index)
    cursor = 0
    while cursor < total_events:
        filled, taken = _fill_next_batch(
            col_event, col_areaperil, col_intensity, col_probability,
            index_event_ids, first_record, record_counts,
            src_areaperil, src_intensity, src_probability,
            cursor,
        )
        if taken != 0:
            write_ndarray_to_fmt_csv(file_out, batch[:filled], headers, fmt)
            cursor += taken
            continue

        # The event at the cursor is bigger than the batch buffer: give it a
        # buffer of exactly its own size and write it by itself.
        count = int(record_counts[cursor])
        start = int(first_record[cursor])
        stop = start + count
        oversized = np.empty(count, dtype=dtype)
        oversized['event_id'] = int(sorted_index['event_id'][cursor])
        oversized['areaperil_id'] = src_areaperil[start:stop]
        oversized['intensity_bin_id'] = src_intensity[start:stop]
        oversized['probability'] = src_probability[start:stop]
        write_ndarray_to_fmt_csv(file_out, oversized, headers, fmt)
        cursor += 1
def _footprint_tocsv_zip(footprint, sorted_index, file_out, dtype, headers, fmt):
    """Zip path: decompress each event with zlib and write it to CSV.

    When the index carries a 'd_size' (decompressed size) field, one reusable
    output buffer sized for the largest event is allocated up front. Without
    that field only the compressed size is known, so a buffer is allocated per
    event once its decompressed length is available.

    Args:
        footprint: flat ``u1`` array holding the compressed footprint bytes.
        sorted_index: index records with ``event_id``, ``offset`` and ``size``
            fields, and optionally ``d_size``.
        file_out: destination passed to ``write_ndarray_to_fmt_csv``.
        dtype: structured dtype of the output rows (``event_id``,
            ``areaperil_id``, ``intensity_bin_id``, ``probability``).
        headers: passed to ``write_ndarray_to_fmt_csv``.
        fmt: passed to ``write_ndarray_to_fmt_csv``.

    Raises:
        zlib.error: if an event's bytes cannot be decompressed.
        ValueError: if a decompressed event is not a whole number of Event records.
    """
    # No events selected: nothing to write. Returning here also keeps the
    # d_size ``.max()`` below from raising on a zero-size array.
    if len(sorted_index) == 0:
        return

    if 'd_size' in sorted_index.dtype.names:
        max_event_elems = int(sorted_index['d_size'].max()) // Event_dtype.itemsize
        event_csv_data = np.empty(max_event_elems, dtype=dtype)
    else:
        event_csv_data = None

    for row in sorted_index:
        event_id = int(row['event_id'])
        offset = int(row['offset'])
        compressed_size = int(row['size'])
        event_data = np.frombuffer(
            zlib.decompress(footprint[offset:offset + compressed_size].tobytes()),
            dtype=Event_dtype,
        )
        n = len(event_data)
        if event_csv_data is not None and n <= len(event_csv_data):
            buf = event_csv_data[:n]
        else:
            # Either no shared buffer exists, or the event decompressed to more
            # rows than the largest d_size implied; a short slice of the shared
            # buffer would make the field assignments below fail.
            buf = np.empty(n, dtype=dtype)
        buf['event_id'] = event_id
        buf['areaperil_id'] = event_data['areaperil_id']
        buf['intensity_bin_id'] = event_data['intensity_bin_id']
        buf['probability'] = event_data['probability']
        write_ndarray_to_fmt_csv(file_out, buf, headers, fmt)