Source code for oasislmf.pytools.converters.csvtobin.utils.lossfactors
# Loss-factors CSV → binary converter.
#
# The CSV is read in fixed-size chunks (iter_csv_as_ndarray), so memory usage is
# O(chunk_size + max_rows_per_event) regardless of file size. Events spanning chunk
# boundaries are buffered in partial_chunks and flushed once their final rows arrive.
# Input must be sorted by event_id.
# For each complete event an 8-byte header (event_id, count) is written followed by
# the amp_factor body as a contiguous structured-array block.
import numpy as np
from oasislmf.pytools.converters.csvtobin.utils.common import iter_csv_as_ndarray
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.pla.common import amp_factor_dtype
def _flush_event(event_id, rows, file_out):
count = len(rows)
np.array([event_id, count], dtype=np.int32).tofile(file_out)
body = np.empty(count, dtype=amp_factor_dtype)
body['amplification_id'] = rows['amplification_id']
body['factor'] = rows['factor']
body.tofile(file_out)
[docs]
def lossfactors_tobin(stack, file_in, file_out, file_type):
dtype = TOOL_INFO[file_type]["dtype"]
np.array([0], dtype=np.int32).tofile(file_out)
partial_event_id = None
partial_chunks = []
for chunk in iter_csv_as_ndarray(stack, file_in, dtype):
if len(chunk) == 0:
continue
event_ids = np.ascontiguousarray(chunk['event_id'])
pos = 0
if partial_event_id is not None:
end = int(np.searchsorted(event_ids, partial_event_id, side='right'))
partial_chunks.append(chunk[:end])
pos = end
if pos == len(chunk):
continue
_flush_event(partial_event_id, np.concatenate(partial_chunks), file_out)
partial_event_id = None
partial_chunks = []
remaining_ids = event_ids[pos:]
if len(remaining_ids) == 0:
continue
changes = np.flatnonzero(np.diff(remaining_ids)) + 1 if len(remaining_ids) > 1 \
else np.empty(0, dtype=np.intp)
rel_starts = np.concatenate([[np.intp(0)], changes])
rel_ends = np.append(changes, [np.intp(len(remaining_ids))])
n_complete = len(rel_starts) - 1
for i in range(n_complete):
s = pos + int(rel_starts[i])
e = pos + int(rel_ends[i])
_flush_event(int(event_ids[s]), chunk[s:e], file_out)
last_start = pos + int(rel_starts[-1])
partial_event_id = int(event_ids[last_start])
partial_chunks = [chunk[last_start:]]
if partial_event_id is not None:
_flush_event(partial_event_id, np.concatenate(partial_chunks), file_out)