Source code for oasislmf.pytools.converters.bintocsv.utils.cdf

# CDF binary → CSV converter.
#
# Input is a getmodel binary stream (CDF_STREAM_ID) piped or read from file.
# read_getmodel_stream (Numba JIT in gul/io.py) parses the stream and yields
# one tuple per event.  Of the seven fields in that tuple, this converter
# uses four: event_id, damagecdfrecs, recs and rec_idx_ptr.
# damagecdfrecs is a structured array of (areaperil_id, vulnerability_id) per
# CDF group; recs is a flat ProbMean array; rec_idx_ptr is a numba.typed.List
# of start indices into recs for each group (length = n_groups + 1).
#
# Output paths:
#
#   Normal events (n_rows <= _BATCH_ROWS):
#     A JIT inner loop (_fill_cdf_batch) writes one event's rows directly into
#     a pre-allocated output buffer (batch_data, _BATCH_ROWS rows), starting at
#     the current write position.  The buffer is flushed to
#     write_ndarray_to_fmt_csv in one call per _BATCH_ROWS rows, amortising
#     per-call overhead across multiple events.  rec_idx_ptr is passed directly
#     as a numba.typed.List without conversion.
#
#   Oversized events (n_rows > _BATCH_ROWS):
#     An exact-size buffer is allocated and written in one call, then freed.

import logging
import numba as nb
import numpy as np
from pathlib import Path

from oasislmf.pytools.common.data import resolve_file, write_ndarray_to_fmt_csv, items_dtype
from oasislmf.pytools.common.input_files import read_coverages
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.gul.common import coverage_type
from oasislmf.pytools.gul.manager import generate_item_map, gul_get_items, read_getmodel_stream

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
_BATCH_ROWS = 1 << 13  # 8 K rows (8192)


@nb.njit(cache=True, error_model="numpy")
def _fill_cdf_batch(
    out_eid, out_apid, out_vid, out_binidx, out_prob_to, out_bin_mean,
    event_id, damagecdfrecs, recs, rec_idx_ptr,
    out_start,
):
    """Write one event's CDF rows into the output columns.

    Each group ``g`` owns the records ``recs[rec_idx_ptr[g]:rec_idx_ptr[g + 1]]``
    and takes its ``areaperil_id`` / ``vulnerability_id`` from
    ``damagecdfrecs[g]``.  One output row is produced per record; the bin
    index restarts at 1 for every group.

    Args:
        out_eid, out_apid, out_vid, out_binidx, out_prob_to, out_bin_mean:
            Output columns, written in place from index *out_start* onwards.
        event_id: Value stored in ``out_eid`` for every row written.
        damagecdfrecs: Per-group records with ``areaperil_id`` and
            ``vulnerability_id`` fields.
        recs: Flat records with ``prob_to`` and ``bin_mean`` fields.
        rec_idx_ptr: Group boundaries into *recs* (one more entry than
            there are groups).
        out_start: Index of the first output row to write.

    Returns:
        The next free output index (*out_start* plus the rows written).
    """
    cursor = out_start
    for group in range(len(rec_idx_ptr) - 1):
        header = damagecdfrecs[group]
        areaperil = header['areaperil_id']
        vulnerability = header['vulnerability_id']
        first = rec_idx_ptr[group]
        last = rec_idx_ptr[group + 1]
        for offset in range(last - first):
            rec = recs[first + offset]
            out_eid[cursor] = event_id
            out_apid[cursor] = areaperil
            out_vid[cursor] = vulnerability
            out_binidx[cursor] = offset + 1  # bin indices are 1-based per group
            out_prob_to[cursor] = rec['prob_to']
            out_bin_mean[cursor] = rec['bin_mean']
            cursor += 1
    return cursor
def _cdf_columns(buf):
    """Return the six column views of *buf* in the order _fill_cdf_batch expects."""
    return (
        buf['event_id'],
        buf['areaperil_id'],
        buf['vulnerability_id'],
        buf['bin_index'],
        buf['prob_to'],
        buf['bin_mean'],
    )


def cdf_tocsv(stack, file_in, file_out, file_type, noheader, run_dir):
    """Convert a getmodel CDF binary stream to CSV rows on *file_out*.

    Events are read with ``read_getmodel_stream`` and expanded into rows by
    ``_fill_cdf_batch``.  Rows are accumulated in a reusable buffer of
    ``_BATCH_ROWS`` rows and flushed whenever the next event would not fit;
    an event larger than the buffer gets its own exact-size buffer.

    Args:
        stack: Passed through to ``resolve_file`` (presumably a
            ``contextlib.ExitStack`` that owns the opened input - confirm).
        file_in: Input file reference, resolved with mode ``"rb"``.
        file_out: Writable text target; receives the header line and is
            handed to ``write_ndarray_to_fmt_csv``.
        file_type: Key into ``TOOL_INFO`` selecting headers, dtype and fmt.
        noheader: When true, the CSV header line is not written.
        run_dir: Run directory; items and coverages are read from its
            ``input`` sub-directory.
    """
    tool_info = TOOL_INFO[file_type]
    headers = tool_info["headers"]
    dtype = tool_info["dtype"]
    fmt = tool_info["fmt"]
    input_path = Path(run_dir, 'input')

    file_in = resolve_file(file_in, "rb", stack)

    if not noheader:
        file_out.write(",".join(headers) + "\n")

    # Build the inputs read_getmodel_stream requires.
    items = gul_get_items(input_path)
    items = np.sort(items, order=['areaperil_id', 'vulnerability_id'])
    coverages_tiv = read_coverages(input_path)
    # One extra leading slot, so the TIVs occupy indices 1..n.
    coverages = np.zeros(coverages_tiv.shape[0] + 1, coverage_type)
    coverages[1:]['tiv'] = coverages_tiv
    item_map_hm, item_map_hm_keys, item_map_ja_offsets = generate_item_map(items, coverages)
    del coverages_tiv
    # Zero-initialised arrays handed to the reader; this converter never
    # reads them back (presumably scratch space for the reader - confirm).
    compute = np.zeros(coverages.shape[0] + 1, items.dtype['coverage_id'])
    seeds = np.zeros(len(np.unique(items['group_id'])), dtype=items_dtype['group_id'])

    batch_data = np.empty(_BATCH_ROWS, dtype=dtype)
    # batch_data is never reallocated, so its column views can be built once.
    batch_cols = _cdf_columns(batch_data)
    batch_pos = 0

    event_stream = read_getmodel_stream(
        file_in, items, item_map_hm, item_map_hm_keys, item_map_ja_offsets,
        coverages, compute, seeds,
    )
    for event_data in event_stream:
        # Only four of the seven yielded fields are needed here.
        event_id, _, _, damagecdfrecs, recs, rec_idx_ptr, _ = event_data
        n_rows = len(recs)

        if batch_pos + n_rows > _BATCH_ROWS:
            # The event does not fit in the remaining space: flush first.
            if batch_pos > 0:
                write_ndarray_to_fmt_csv(file_out, batch_data[:batch_pos], headers, fmt)
                batch_pos = 0
            if n_rows > _BATCH_ROWS:
                # Single event exceeds the batch; allocate exactly and write directly.
                large_buf = np.empty(n_rows, dtype=dtype)
                n_written = _fill_cdf_batch(
                    *_cdf_columns(large_buf),
                    event_id, damagecdfrecs, recs, rec_idx_ptr,
                    0,
                )
                # large_buf comes from np.empty: emit only the rows that were
                # actually filled, exactly as the batched path does.
                if n_written > 0:
                    write_ndarray_to_fmt_csv(file_out, large_buf[:n_written], headers, fmt)
                continue

        batch_pos = _fill_cdf_batch(
            *batch_cols,
            event_id, damagecdfrecs, recs, rec_idx_ptr,
            batch_pos,
        )

    # Flush whatever is left after the last event.
    if batch_pos > 0:
        write_ndarray_to_fmt_csv(file_out, batch_data[:batch_pos], headers, fmt)