import logging
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pyarrow as pa
from oasislmf.pytools.common.data import oasis_float, periods_dtype, write_ndarray_to_fmt_csv
from oasislmf.pytools.lec.aggreports.outputs.full_uncertainty import output_full_uncertainty
from oasislmf.pytools.lec.aggreports.outputs.mean_damage_ratio import output_mean_damage_ratio
from oasislmf.pytools.lec.aggreports.outputs.sample_mean import output_sample_mean, reorder_losses_by_summary_and_period
from oasislmf.pytools.lec.aggreports.outputs.wheatsheaf import fill_wheatsheaf_items
from oasislmf.pytools.lec.aggreports.outputs.wheatsheaf_mean import fill_wheatsheaf_mean_items, get_wheatsheaf_max_count
from oasislmf.pytools.lec.aggreports.write_tables import write_ept, write_ept_weighted, write_psept, write_psept_weighted, write_wheatsheaf_mean
from oasislmf.pytools.lec.data import (
AEP, AEPTVAR, AGG_FULL_UNCERTAINTY, AGG_SAMPLE_MEAN, AGG_WHEATSHEAF, AGG_WHEATSHEAF_MEAN,
FULL, MEANDR, MEANSAMPLE, OCC_FULL_UNCERTAINTY, OCC_SAMPLE_MEAN, OCC_WHEATSHEAF,
OCC_WHEATSHEAF_MEAN, OEP, OEPTVAR, PERSAMPLEMEAN,
)
from oasislmf.pytools.lec.data import LOSSVEC2MAP_dtype, MEANMAP_dtype, WHEATKEYITEMS_dtype, EPT_dtype, PSEPT_dtype
[docs]
logger = logging.getLogger(__name__)
@dataclass
[docs]
class LecConfig:
"""Shared per-run configuration passed to output helpers and output_for_summary_idx."""
[docs]
period_weights: np.ndarray
[docs]
use_return_period: bool
[docs]
returnperiods: np.ndarray
# Reusable output buffers, allocated once per run and shared across all generator
# calls (the write_* generators fully overwrite each row before yielding, so no
# re-zeroing is needed). Avoids a ~19 MB allocation on every per-summary call.
[docs]
ept_buffer: np.ndarray # must have dtype EPT_dtype
[docs]
psept_buffer: np.ndarray # must have dtype PSEPT_dtype
def __post_init__(self):
if self.period_weights.dtype != periods_dtype:
raise TypeError(f"period_weights dtype must be {periods_dtype}, got {self.period_weights.dtype}")
if self.returnperiods is not None and self.returnperiods.dtype != np.dtype('i4'):
raise TypeError(f"returnperiods dtype must be int32, got {self.returnperiods.dtype}")
if self.ept_buffer.dtype != EPT_dtype:
raise TypeError(f"ept_buffer dtype must be {EPT_dtype}, got {self.ept_buffer.dtype}")
if self.psept_buffer.dtype != PSEPT_dtype:
raise TypeError(f"psept_buffer dtype must be {PSEPT_dtype}, got {self.psept_buffer.dtype}")
[docs]
def make_output_fn(outmap, output_binary, output_parquet):
"""Return a callable(data, out_type) that writes data in the correct output format."""
def output_data(data, out_type):
if output_binary:
data.tofile(outmap[out_type]["file"])
elif output_parquet:
arrays = [pa.array(data[name]) for name in data.dtype.names]
data_table = pa.Table.from_arrays(arrays, schema=outmap[out_type]["schema"])
outmap[out_type]["file"].write_table(data_table)
else:
write_ndarray_to_fmt_csv(
outmap[out_type]["file"],
data,
outmap[out_type]["headers"],
outmap[out_type]["fmt"]
)
return output_data
# ---------------------------------------------------------------------------
# Shared compute-and-write helpers — arrays pre-allocated by caller
# (memmap for sequential, np.zeros for idx). alloc_* callables handle late-sized arrays.
# ---------------------------------------------------------------------------
def _write_mean_damage_ratio(
items, items_start_end, row_used_indices_mean, outloss_mean_vals,
config, eptype, eptype_tvar, output_fn,
):
has_weights, used_period_no = output_mean_damage_ratio(
items, items_start_end, row_used_indices_mean,
outloss_mean_vals, config.period_weights, config.max_summary_id,
)
unused_pw = config.period_weights[~used_period_no]
if has_weights:
gen = write_ept_weighted(
config.ept_buffer,
items, items_start_end, config.sample_size, MEANDR,
eptype, eptype_tvar, unused_pw,
config.use_return_period, config.returnperiods, config.max_summary_id,
)
else:
gen = write_ept(
config.ept_buffer,
items, items_start_end, config.no_of_periods, MEANDR,
eptype, eptype_tvar,
config.use_return_period, config.returnperiods, config.max_summary_id,
)
for data in gen:
output_fn(data, "ept")
def _write_full_uncertainty(
items, items_start_end, row_used_indices_sample, outloss_sample_vals,
config, eptype, eptype_tvar, output_fn,
):
has_weights, used_period_no = output_full_uncertainty(
items, items_start_end, row_used_indices_sample,
outloss_sample_vals, config.period_weights, config.max_summary_id, config.num_sidxs,
)
unused_pw = config.period_weights[~used_period_no]
if has_weights:
gen = write_ept_weighted(
config.ept_buffer,
items, items_start_end, 1, FULL,
eptype, eptype_tvar, unused_pw,
config.use_return_period, config.returnperiods, config.max_summary_id,
)
else:
gen = write_ept(
config.ept_buffer,
items, items_start_end, config.no_of_periods * config.sample_size, FULL,
eptype, eptype_tvar,
config.use_return_period, config.returnperiods, config.max_summary_id,
)
for data in gen:
output_fn(data, "ept")
def _write_wheatsheaf(
wheatsheaf_items, wheatsheaf_items_start_end, mean_map,
row_used_indices_sample, outloss_sample_vals,
config, eptype, eptype_tvar, do_wheat, do_wheat_mean, alloc_wm_items, output_fn,
):
"""alloc_wm_items: no-weights branch only; np.zeros for idx, memmap lambda for seq."""
has_weights, used_period_no = fill_wheatsheaf_items(
wheatsheaf_items, wheatsheaf_items_start_end, row_used_indices_sample,
outloss_sample_vals, config.period_weights, config.max_summary_id, config.num_sidxs,
)
unused_pw = config.period_weights[~used_period_no]
if has_weights:
if do_wheat:
gen = write_psept_weighted(
config.psept_buffer,
wheatsheaf_items, wheatsheaf_items_start_end, config.no_of_periods,
eptype, eptype_tvar, unused_pw,
config.use_return_period, config.returnperiods,
config.max_summary_id, config.num_sidxs, config.sample_size, mean_map=mean_map,
)
for data in gen:
output_fn(data, "psept")
if do_wheat_mean and mean_map is not None:
gen = write_wheatsheaf_mean(config.ept_buffer, mean_map, eptype, PERSAMPLEMEAN, config.max_summary_id)
for data in gen:
output_fn(data, "ept")
else:
if do_wheat:
gen = write_psept(
config.psept_buffer,
wheatsheaf_items, wheatsheaf_items_start_end, config.no_of_periods,
eptype, eptype_tvar,
config.use_return_period, config.returnperiods,
config.max_summary_id, config.num_sidxs,
)
for data in gen:
output_fn(data, "psept")
if do_wheat_mean:
maxcounts = get_wheatsheaf_max_count(
wheatsheaf_items, wheatsheaf_items_start_end, config.max_summary_id)
if np.any(maxcounts != -1):
wm_items = alloc_wm_items(np.sum(maxcounts[maxcounts != -1]))
wm_items_start_end = fill_wheatsheaf_mean_items(
wm_items, wheatsheaf_items, wheatsheaf_items_start_end,
maxcounts, config.max_summary_id, config.num_sidxs,
)
gen = write_ept(
config.ept_buffer,
wm_items, wm_items_start_end, config.no_of_periods,
PERSAMPLEMEAN, eptype, eptype_tvar,
config.use_return_period, config.returnperiods, config.max_summary_id,
sample_size=config.sample_size,
)
for data in gen:
output_fn(data, "ept")
def _write_sample_mean(
reordered_outlosses, row_used_indices_sample, outloss_sample_vals,
config, eptype, eptype_tvar, alloc_items, output_fn,
):
"""reordered_outlosses shape: no_of_periods * max_summary_id. alloc_items size is post-computation."""
reorder_losses_by_summary_and_period(
reordered_outlosses, row_used_indices_sample, outloss_sample_vals,
config.max_summary_id, config.no_of_periods, config.num_sidxs, config.sample_size,
)
row_used_ro = np.flatnonzero(reordered_outlosses["row_used"])
items = alloc_items(len(row_used_ro))
items_start_end = np.full((config.max_summary_id, 2), -1, dtype=np.int32)
has_weights, used_period_no = output_sample_mean(
items, items_start_end, row_used_ro, reordered_outlosses["value"],
config.period_weights, config.max_summary_id, config.no_of_periods,
)
unused_pw = config.period_weights[~used_period_no]
if has_weights:
gen = write_ept_weighted(
config.ept_buffer,
items, items_start_end, config.sample_size, MEANSAMPLE,
eptype, eptype_tvar, unused_pw,
config.use_return_period, config.returnperiods, config.max_summary_id,
)
else:
gen = write_ept(
config.ept_buffer,
items, items_start_end, config.no_of_periods, MEANSAMPLE,
eptype, eptype_tvar,
config.use_return_period, config.returnperiods, config.max_summary_id,
)
for data in gen:
output_fn(data, "ept")
# ---------------------------------------------------------------------------
# Sequential path — AggReports
# Allocates memmaps, calls the shared helpers above.
# ---------------------------------------------------------------------------
[docs]
class AggReports():
def __init__(
self,
outmap,
outloss_mean,
row_used_mean,
outloss_sample,
row_used_sample,
config,
lec_files_folder,
output_binary,
output_parquet,
):
[docs]
self.outloss_mean = outloss_mean
[docs]
self.outloss_sample = outloss_sample
[docs]
self.lec_files_folder = lec_files_folder
[docs]
self.row_used_indices_mean = np.flatnonzero(row_used_mean)
[docs]
self.row_used_indices_sample = np.flatnonzero(row_used_sample)
[docs]
self.output_data = make_output_fn(outmap, output_binary, output_parquet)
[docs]
def output_mean_damage_ratio(self, eptype, eptype_tvar, outloss_type):
"""Output Mean Damage Ratio
Mean Damage Losses - This means do the loss calculation for a year using the event mean
damage loss computed by numerical integration of the effective damageability distributions.
Args:
eptype (int): Exceedance Probability Type
eptype_tvar (int): Exceedance Probability Type (Tail Value at Risk)
outloss_type (string): Which loss to output
"""
if outloss_type not in ("agg_out_loss", "max_out_loss"):
raise ValueError(f"Error: Unknown outloss_type: {outloss_type}")
outloss_vals = self.outloss_mean[outloss_type]
row_used_indices = self.row_used_indices_mean
items = np.memmap(
Path(self.lec_files_folder, f"lec_mean_damage_ratio-{outloss_type}-items.bdat"),
dtype=LOSSVEC2MAP_dtype, mode="w+", shape=(len(row_used_indices),),
)
items_start_end = np.full((self.config.max_summary_id, 2), -1, dtype=np.int32)
_write_mean_damage_ratio(
items, items_start_end, row_used_indices, outloss_vals,
self.config, eptype, eptype_tvar, self.output_data,
)
[docs]
def output_full_uncertainty(self, eptype, eptype_tvar, outloss_type):
"""Output Full Uncertainty
Full Uncertainty – this means do the calculation across all samples (treating the samples
effectively as repeat years) - this is the most accurate of all the single EP Curves.
Args:
eptype (int): Exceedance Probability Type
eptype_tvar (int): Exceedance Probability Type (Tail Value at Risk)
outloss_type (string): Which loss to output
"""
if outloss_type not in ("agg_out_loss", "max_out_loss"):
raise ValueError(f"Error: Unknown outloss_type: {outloss_type}")
outloss_vals = self.outloss_sample[outloss_type]
row_used_indices = self.row_used_indices_sample
items = np.memmap(
Path(self.lec_files_folder, f"lec_full_uncertainty-{outloss_type}-items.bdat"),
dtype=LOSSVEC2MAP_dtype, mode="w+", shape=(len(row_used_indices),),
)
items_start_end = np.full((self.config.max_summary_id, 2), -1, dtype=np.int32)
_write_full_uncertainty(
items, items_start_end, row_used_indices, outloss_vals,
self.config, eptype, eptype_tvar, self.output_data,
)
[docs]
def output_wheatsheaf_and_wheatsheafmean(self, eptype, eptype_tvar, outloss_type, output_wheatsheaf, output_wheatsheaf_mean):
"""Output Wheatsheaf and Wheatsheaf Mean
Wheatsheaf, Per Sample EPT (PSEPT) – this means calculate the EP Curve for each sample and
leave it at the sample level of detail, resulting in multiple "curves".
Wheatsheaf Mean, Per Sample mean EPT – this means average the loss at each return period of
the Per Sample EPT.
Args:
eptype (int): Exceedance Probability Type
eptype_tvar (int): Exceedance Probability Type (Tail Value at Risk)
outloss_type (string): Which loss to output
output_wheatsheaf (bool): Bool to Output Wheatsheaf
output_wheatsheaf_mean (bool): Bool to Output Wheatsheaf Mean
"""
if outloss_type not in ("agg_out_loss", "max_out_loss"):
raise ValueError(f"Error: Unknown outloss_type: {outloss_type}")
outloss_vals = self.outloss_sample[outloss_type]
row_used_indices = self.row_used_indices_sample
wh_items = np.memmap(
Path(self.lec_files_folder, f"lec_wheatsheaf-items-{outloss_type}.bdat"),
dtype=WHEATKEYITEMS_dtype, mode="w+", shape=(len(row_used_indices),),
)
wh_items_start_end = np.full(
(self.config.max_summary_id * self.config.num_sidxs, 2), -1, dtype=np.int32,
)
mean_map = None
if output_wheatsheaf_mean:
mean_map = np.memmap(
Path(self.lec_files_folder, f"lec_wheatsheaf_mean-map-{outloss_type}.bdat"),
dtype=MEANMAP_dtype, mode="w+",
shape=(self.config.max_summary_id, len(self.config.returnperiods)),
)
wm_items_path = Path(self.lec_files_folder, f"lec_wheatsheaf_mean-items-{outloss_type}.bdat")
def alloc_wm_items(size): return np.memmap(
wm_items_path, dtype=LOSSVEC2MAP_dtype, mode="w+", shape=(size,)
)
_write_wheatsheaf(
wh_items, wh_items_start_end, mean_map, row_used_indices, outloss_vals,
self.config, eptype, eptype_tvar, output_wheatsheaf, output_wheatsheaf_mean,
alloc_wm_items, self.output_data,
)
[docs]
def output_sample_mean(self, eptype, eptype_tvar, outloss_type):
"""Output Sample Mean
Sample Mean Losses – this means do the loss calculation for a year using the statistical
sample event mean.
Args:
eptype (int): Exceedance Probability Type
eptype_tvar (int): Exceedance Probability Type (Tail Value at Risk)
outloss_type (string): Which loss to output
"""
if self.config.sample_size == 0:
logger.warning("aggreports.output_sample_mean, self.sample_size is 0, not outputting any sample mean")
return
if outloss_type not in ("agg_out_loss", "max_out_loss"):
raise ValueError(f"Error: Unknown outloss_type: {outloss_type}")
outloss_vals = self.outloss_sample[outloss_type]
reordered_outlosses = np.memmap(
Path(self.lec_files_folder, f"lec_sample_mean-reordered_outlosses-{outloss_type}.bdat"),
dtype=np.dtype([("row_used", np.bool_), ("value", oasis_float)]),
mode="w+",
shape=(self.config.no_of_periods * self.config.max_summary_id),
)
items_path = Path(self.lec_files_folder, f"lec_sample_mean-{outloss_type}-items.bdat")
def alloc_items(size): return np.memmap(
items_path, dtype=LOSSVEC2MAP_dtype, mode="w+", shape=(size,)
)
_write_sample_mean(
reordered_outlosses, self.row_used_indices_sample, outloss_vals,
self.config, eptype, eptype_tvar, alloc_items, self.output_data,
)
# ---------------------------------------------------------------------------
# Idx path — output_for_summary_idx
# Allocates small np.zeros arrays (single summary at a time), calls the same helpers.
# ---------------------------------------------------------------------------
[docs]
def output_for_summary_idx(
summary_id,
outloss_mean_s,
row_used_mean_s,
outloss_sample_s,
row_used_sample_s,
output_flags,
hasOCC,
hasAGG,
outmap,
config,
output_fn,
):
"""Output all report types for a single summary_id (idx path).
Called once per summary_id. config.max_summary_id must be 1; arrays are sized
for a single summary. Write generators emit SummaryId=1, corrected here before each write.
"""
row_used_indices_mean = np.flatnonzero(row_used_mean_s)
row_used_indices_sample = np.flatnonzero(row_used_sample_s)
if not len(row_used_indices_mean) and not len(row_used_indices_sample):
return
def _out(data, out_type):
if len(data):
data["SummaryId"] = summary_id
output_fn(data, out_type)
if outmap["ept"]["compute"] and (hasOCC or hasAGG):
for eptype, eptype_tvar, outloss_type, wanted in (
(OEP, OEPTVAR, "max_out_loss", hasOCC),
(AEP, AEPTVAR, "agg_out_loss", hasAGG),
):
if not wanted:
continue
items = np.zeros(len(row_used_indices_mean), dtype=LOSSVEC2MAP_dtype)
items_start_end = np.full((1, 2), -1, dtype=np.int32)
_write_mean_damage_ratio(
items, items_start_end, row_used_indices_mean,
outloss_mean_s[outloss_type], config, eptype, eptype_tvar, _out,
)
for flag, eptype, eptype_tvar, outloss_type in (
(OCC_FULL_UNCERTAINTY, OEP, OEPTVAR, "max_out_loss"),
(AGG_FULL_UNCERTAINTY, AEP, AEPTVAR, "agg_out_loss"),
):
if not output_flags[flag]:
continue
items = np.zeros(len(row_used_indices_sample), dtype=LOSSVEC2MAP_dtype)
items_start_end = np.full((1, 2), -1, dtype=np.int32)
_write_full_uncertainty(
items, items_start_end, row_used_indices_sample,
outloss_sample_s[outloss_type], config, eptype, eptype_tvar, _out,
)
for flag_w, flag_wm, eptype, eptype_tvar, outloss_type in (
(OCC_WHEATSHEAF, OCC_WHEATSHEAF_MEAN, OEP, OEPTVAR, "max_out_loss"),
(AGG_WHEATSHEAF, AGG_WHEATSHEAF_MEAN, AEP, AEPTVAR, "agg_out_loss"),
):
do_wheat = output_flags[flag_w]
do_wheat_mean = output_flags[flag_wm]
if not (do_wheat or do_wheat_mean):
continue
wheatsheaf_items = np.zeros(len(row_used_indices_sample), dtype=WHEATKEYITEMS_dtype)
wheatsheaf_items_start_end = np.full((config.num_sidxs, 2), -1, dtype=np.int32)
mean_map = np.zeros((1, len(config.returnperiods)), dtype=MEANMAP_dtype) if do_wheat_mean else None
_write_wheatsheaf(
wheatsheaf_items, wheatsheaf_items_start_end, mean_map,
row_used_indices_sample, outloss_sample_s[outloss_type],
config, eptype, eptype_tvar, do_wheat, do_wheat_mean,
lambda size: np.zeros(size, dtype=LOSSVEC2MAP_dtype),
_out,
)
if config.sample_size == 0:
return
for flag, eptype, eptype_tvar, outloss_type in (
(OCC_SAMPLE_MEAN, OEP, OEPTVAR, "max_out_loss"),
(AGG_SAMPLE_MEAN, AEP, AEPTVAR, "agg_out_loss"),
):
if not output_flags[flag]:
continue
reordered = np.zeros(
config.no_of_periods,
dtype=np.dtype([("row_used", np.bool_), ("value", oasis_float)]),
)
_write_sample_mean(
reordered, row_used_indices_sample, outloss_sample_s[outloss_type],
config, eptype, eptype_tvar,
lambda size: np.zeros(size, dtype=LOSSVEC2MAP_dtype),
_out,
)