# Source code for oasislmf.pytools.converters.bintocsv.utils.lossfactors

# Loss-factors binary → CSV converter.
#
# When the input is a regular file the binary is memory-mapped (np.memmap), so only
# the pages that are actually read are loaded — memory usage is proportional to the
# events being output, not file size. Input from stdin is read fully into memory.
# The format has no index: after a 4-byte opts header (int32), events are stored
# sequentially as [event_id i4][count i4][amplification_id i4, factor f4 × count].
# Events are iterated in one pass; their rows are accumulated into a pre-allocated
# output buffer (_BATCH_ROWS rows) that is flushed to write_ndarray_to_fmt_csv in
# one call per batch. Single events that exceed the buffer are written directly.

import sys
import numpy as np

from oasislmf.pytools.common.data import resolve_file, write_ndarray_to_fmt_csv
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.pla.common import amp_factor_dtype, event_count_dtype

_BATCH_ROWS = 1 << 13  # 8 K rows


def lossfactors_tocsv(stack, file_in, file_out, file_type, noheader):
    """Convert a loss-factors binary into CSV rows.

    The input is walked once from byte offset 4 (the leading int32 opts header
    is skipped). Each event is an ``event_count_dtype`` header followed by
    ``count`` records of ``amp_factor_dtype``. Every record becomes one output
    row carrying the owning event's ``event_id`` together with the record's
    ``amplification_id`` and ``factor``.

    Rows are accumulated in a reusable buffer of ``_BATCH_ROWS`` rows and handed
    to ``write_ndarray_to_fmt_csv`` one batch at a time. An event with more than
    ``_BATCH_ROWS`` records is written on its own from a dedicated buffer.

    Args:
        stack: Forwarded unchanged to ``resolve_file``.
        file_in: Input handle/path as accepted by ``resolve_file`` (opened "rb").
            When it resolves to ``sys.stdin.buffer`` the stream is read fully
            into memory; otherwise it is memory-mapped read-only.
        file_out: Writable text target; receives the header line via ``write``
            and is passed to ``write_ndarray_to_fmt_csv``.
        file_type: Key into ``TOOL_INFO`` selecting ``headers``, ``dtype`` and
            ``fmt`` for the output.
        noheader: When truthy, the CSV header line is not written.

    Raises:
        ValueError: If an event header declares a negative record count or more
            records than there are bytes left in the input (corrupt or
            truncated file).
    """
    tool_info = TOOL_INFO[file_type]
    headers = tool_info["headers"]
    dtype = tool_info["dtype"]
    fmt = tool_info["fmt"]

    file_in = resolve_file(file_in, "rb", stack)
    if file_in == sys.stdin.buffer:
        # stdin cannot be memory-mapped, so it is read in full.
        raw = np.frombuffer(file_in.read(), dtype='u1')
    else:
        raw = np.memmap(file_in, dtype='u1', mode='r')

    if not noheader:
        file_out.write(",".join(headers) + "\n")

    # Loop invariants, looked up once.
    hdr_size = event_count_dtype.itemsize
    rec_size = amp_factor_dtype.itemsize
    total_bytes = len(raw)

    batch_data = np.empty(_BATCH_ROWS, dtype=dtype)
    batch_pos = 0

    pos = 4  # skip opts header (int32)
    while pos + hdr_size <= total_bytes:
        hdr = raw[pos:pos + hdr_size].view(event_count_dtype)[0]
        event_id = int(hdr['event_id'])
        count = int(hdr['count'])
        pos += hdr_size

        body_end = pos + count * rec_size
        # A negative count would move the cursor backwards (re-reading the same
        # header forever); an oversized count would run past the end of the
        # data. Both indicate a corrupt or truncated file, so fail clearly.
        if count < 0 or body_end > total_bytes:
            raise ValueError(
                f"Corrupt or truncated loss-factors binary: event_id {event_id} "
                f"at byte offset {pos - hdr_size} declares {count} record(s) "
                f"but only {total_bytes - pos} byte(s) remain"
            )

        body = raw[pos:body_end].view(amp_factor_dtype)
        pos = body_end

        if batch_pos + count > _BATCH_ROWS:
            # Not enough room left: flush what has been accumulated so far.
            if batch_pos > 0:
                write_ndarray_to_fmt_csv(file_out, batch_data[:batch_pos], headers, fmt)
                batch_pos = 0

            if count > _BATCH_ROWS:
                # The event alone exceeds the batch buffer: write it directly.
                large_buf = np.empty(count, dtype=dtype)
                large_buf['event_id'] = event_id
                large_buf['amplification_id'] = body['amplification_id']
                large_buf['factor'] = body['factor']
                write_ndarray_to_fmt_csv(file_out, large_buf, headers, fmt)
                continue

        batch_end = batch_pos + count
        batch_data['event_id'][batch_pos:batch_end] = event_id
        batch_data['amplification_id'][batch_pos:batch_end] = body['amplification_id']
        batch_data['factor'][batch_pos:batch_end] = body['factor']
        batch_pos = batch_end

    # Flush whatever is left in the final, partially filled batch.
    if batch_pos > 0:
        write_ndarray_to_fmt_csv(file_out, batch_data[:batch_pos], headers, fmt)