Source code for oasislmf.pytools.converters.csvtobin.utils.footprint
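"""Convert footprint CSV data to the Oasis footprint binary file and its event index file."""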
import zlib

import numpy as np
import pandas as pd

from oasislmf.pytools.common.data import resolve_file
from oasislmf.pytools.converters.csvtobin.utils.common import read_csv_as_ndarray
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.common import Event_dtype, EventIndexBin_dtype, EventIndexBinZ_dtype
from oasislmf.utils.exceptions import OasisException


def _validate(data):
    """Check that the footprint data is well formed before writing the binary.

    Raises OasisException if group probabilities do not sum to 1, if rows are not
    sorted by (event_id, areaperil_id), or if an intensity bin is duplicated
    within a group.
    """
    df = pd.DataFrame(data)

    # Check probability sums to 1 for each (event_id, areaperil_id) group
    prob_sums = df.groupby(["event_id", "areaperil_id"])["probability"].sum()
    invalid_sums = prob_sums[~np.isclose(prob_sums, 1, atol=1e-6)]
    if not invalid_sums.empty:
        error_msg = "\n".join([
            f"Group (event_id={idx[0]}, areaperil_id={idx[1]}) has prob sum = {val:.6f}"
            for idx, val in invalid_sums.items()
        ])
        raise OasisException(f"Error: Probabilities do not sum to 1 for the following groups: \n{error_msg}")

    # Check rows are sorted by event_id, areaperil_id
    expected_order = df.sort_values(['event_id', 'areaperil_id']).reset_index(drop=True)
    if not df[['event_id', 'areaperil_id']].equals(expected_order[['event_id', 'areaperil_id']]):
        unordered_rows = df[['event_id', 'areaperil_id']].ne(expected_order[['event_id', 'areaperil_id']]).any(axis=1)
        mismatch_indices = df.index[unordered_rows].tolist()
        raise OasisException(f"IDs not in ascending order. First few mismatched indices: \n{df.iloc[mismatch_indices[:10]]}")

    # Check each intensity bin appears at most once per (event_id, areaperil_id) group
    duplicates = df.duplicated(subset=['event_id', 'areaperil_id', 'intensity_bin_id'], keep=False)
    if duplicates.any():
        dup_rows = df[duplicates]
        error_msg = dup_rows[['event_id', 'areaperil_id', 'intensity_bin_id']].drop_duplicates(keep="last").to_string()
        raise OasisException(f"Error: Duplicate intensity bins found: \n{error_msg}")


def footprint_tobin(
    stack, file_in, file_out, file_type,
    idx_file_out,
    zip_files,
    max_intensity_bin_idx,
    no_intensity_uncertainty,
    decompressed_size,
    no_validation
):
    """Convert a footprint CSV to the footprint binary file, writing per-event
    blocks to file_out and an event index to idx_file_out.
    """
    headers = TOOL_INFO[file_type]["headers"]
    dtype = TOOL_INFO[file_type]["dtype"]

    idx_file_out = resolve_file(idx_file_out, "wb", stack)
    data = read_csv_as_ndarray(stack, file_in, headers, dtype)

    if not no_validation:
        _validate(data)

    # Write bin file header: max intensity bin index, then an options field with
    # bit 0 set when intensity uncertainty is present and bit 1 set when the
    # index records also store the decompressed block size
    np.array([max_intensity_bin_idx], dtype=np.int32).tofile(file_out)
    zip_opts = decompressed_size << 1 | (not no_intensity_uncertainty)
    np.array([zip_opts], dtype=np.int32).tofile(file_out)
    offset = np.dtype(np.int32).itemsize * 2

    # Write one block of Event_dtype records per event and record its byte
    # offset and size in the index file
    unique_events = np.unique(data["event_id"])
    for event_id in unique_events:
        event_mask = data["event_id"] == event_id
        event_data = data[event_mask]

        bin_data = np.empty(len(event_data), dtype=Event_dtype)
        bin_data["areaperil_id"] = event_data["areaperil_id"]
        bin_data["intensity_bin_id"] = event_data["intensity_bin_id"]
        bin_data["probability"] = event_data["probability"]

        if np.any(bin_data["intensity_bin_id"] > max_intensity_bin_idx):
            raise OasisException(f"Error: Found intensity_bin_id in data larger than max_intensity_bin_idx: {max_intensity_bin_idx}")

        bin_data = bin_data.tobytes()
        dsize = len(bin_data)
        if zip_files:
            bin_data = zlib.compress(bin_data)
        file_out.write(bin_data)
        size = len(bin_data)

        if decompressed_size:
            np.array([(event_id, offset, size, dsize)], dtype=EventIndexBinZ_dtype).tofile(idx_file_out)
        else:
            np.array([(event_id, offset, size)], dtype=EventIndexBin_dtype).tofile(idx_file_out)
        offset += size
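

# A minimal sketch (not part of the original module) of how the files written
# above can be read back: the index gives the byte offset and size of each
# event block in the .bin file. It assumes the EventIndexBin_dtype /
# EventIndexBinZ_dtype structured dtypes expose "event_id", "offset" and "size"
# fields, as implied by the tuples written above.
def _read_event_example(bin_path, idx_path, event_id, zipped=False, idx_has_dsize=False):
    """Illustrative only: locate one event via the index and decode its block."""
    idx_dtype = EventIndexBinZ_dtype if idx_has_dsize else EventIndexBin_dtype
    index = np.fromfile(idx_path, dtype=idx_dtype)
    rec = index[index["event_id"] == event_id][0]
    with open(bin_path, "rb") as f:
        f.seek(int(rec["offset"]))
        payload = f.read(int(rec["size"]))
    if zipped:
        payload = zlib.decompress(payload)
    return np.frombuffer(payload, dtype=Event_dtype)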