Source code for oasislmf.pytools.converters.csvtobin.utils.summarycalc
import numpy as np
from oasislmf.pytools.common.data import oasis_int, oasis_float
from oasislmf.pytools.common.event_stream import SUMMARY_STREAM_ID
from oasislmf.pytools.converters.csvtobin.utils.common import read_csv_as_ndarray
from oasislmf.pytools.converters.data import TOOL_INFO
[docs]
def summarycalc_tobin(stack, file_in, file_out, file_type, max_sample_index, summary_set_id):
headers = TOOL_INFO[file_type]["headers"]
dtype = TOOL_INFO[file_type]["dtype"]
data = read_csv_as_ndarray(stack, file_in, headers, dtype)
stream_agg_type = 1
stream_info = (SUMMARY_STREAM_ID << 24 | stream_agg_type)
# Write stream info byte
np.array([stream_info], dtype="i4").tofile(file_out)
# Write sample len byte
np.array([max_sample_index], dtype="i4").tofile(file_out)
# Write summary set id byte
np.array([summary_set_id], dtype="i4").tofile(file_out)
curr_event_id = -1
curr_summary_id = -1
curr_expval = -1
sidx_losses = []
sidx_loss_dtype = np.dtype([
("sidx", oasis_int),
("loss", oasis_float)]
)
for row in data:
event_id = row["EventId"]
summary_id = row["SummaryId"]
sidx = row["SampleId"]
loss = row["Loss"]
expval = row["ImpactedExposure"]
if (event_id != curr_event_id) or (summary_id != curr_summary_id) or (expval != curr_expval):
if curr_event_id != -1:
sidx_losses.append((0, 0))
np.array([curr_event_id, curr_summary_id], dtype=oasis_int).tofile(file_out)
np.array([curr_expval], dtype=oasis_float).tofile(file_out)
np.array(sidx_losses, dtype=sidx_loss_dtype).tofile(file_out)
curr_event_id = event_id
curr_summary_id = summary_id
curr_expval = expval
sidx_losses = []
if sidx <= max_sample_index:
sidx_losses.append((sidx, loss))
if curr_event_id != -1:
sidx_losses.append((0, 0))
np.array([curr_event_id, curr_summary_id], dtype=oasis_int).tofile(file_out)
np.array([curr_expval], dtype=oasis_float).tofile(file_out)
np.array(sidx_losses, dtype=sidx_loss_dtype).tofile(file_out)