# oasislmf.pytools.lec.aggreports.write_tables

import numba as nb
import numpy as np

from oasislmf.pytools.common.data import nb_oasis_int
from oasislmf.pytools.lec.data import (EPT_dtype, PSEPT_dtype, TAIL_valtype, NB_TAIL_valtype)
from oasislmf.pytools.lec.utils import (create_empty_array, get_wheatsheaf_items_idx, get_wheatsheaf_items_idx_data, resize_array)


@nb.njit(cache=True, error_model="numpy")
def get_loss(
    next_retperiod,
    last_retperiod,
    last_loss,
    curr_retperiod,
    curr_loss
):
    """Compute the loss at the next requested return period.

    Returns 0 for a zero return period or zero loss, the current loss when
    the return periods match exactly, a linearly interpolated loss when the
    requested return period lies beyond the current one, and -1 otherwise
    (current return period has not yet reached the requested one).

    Args:
        next_retperiod (float): Next return period
        last_retperiod (float): Previous return period
        last_loss (float): Previous loss value
        curr_retperiod (float): Current return period
        curr_loss (float): Current loss value
    Returns:
        loss (float): Loss value
    """
    # Degenerate inputs contribute no loss
    if curr_retperiod == 0 or curr_loss == 0:
        return 0
    if curr_retperiod == next_retperiod:
        return curr_loss
    if curr_retperiod < next_retperiod:
        # Linear interpolation between (curr_retperiod, curr_loss) and
        # (last_retperiod, last_loss); arithmetic kept in the original order
        return (
            (next_retperiod - curr_retperiod)
            * (last_loss - curr_loss)
            / (last_retperiod - curr_retperiod)
            + curr_loss
        )
    # Sentinel: requested return period not reached yet
    return -1
@nb.njit(cache=True, error_model="numpy")
def fill_tvar(
    tail,
    tail_sizes,
    summary_id,
    next_retperiod,
    tvar
):
    """Append a (return period, TVaR) entry to the tail vector for summary_id.

    Args:
        tail (nb.typed.Dict[nb_oasis_int, NB_TAIL_valtype]): Dict of Summary ID to vector of (return period, tvar) values
        tail_sizes (nb.typed.Dict[nb_oasis_int, nb.types.int64]): Dict of Summary ID to size of each tail array
        summary_id (int): Summary ID
        next_retperiod (float): Next Return Period
        tvar (float): Tail Value at Risk
    Returns:
        tail (nb.typed.Dict[nb_oasis_int, NB_TAIL_valtype]): Updated tail dict
        tail_sizes (nb.typed.Dict[nb_oasis_int, nb.types.int64]): Updated tail sizes dict
    """
    # Lazily create an empty tail vector on first use of this summary_id
    if summary_id not in tail:
        tail[summary_id] = create_empty_array(TAIL_valtype)
        tail_sizes[summary_id] = 0
    size = tail_sizes[summary_id]
    # Grow the backing array if needed before writing at position `size`
    arr = resize_array(tail[summary_id], size)
    arr[size]["retperiod"] = next_retperiod
    arr[size]["tvar"] = tvar
    # Store back the (possibly reallocated) array and bump the logical size
    tail[summary_id] = arr
    tail_sizes[summary_id] = size + 1
    return tail, tail_sizes
@nb.njit(cache=True, error_model="numpy")
def fill_tvar_wheatsheaf(
    tail,
    tail_sizes,
    summary_id,
    sidx,
    num_sidxs,
    next_retperiod,
    tvar
):
    """Append a (return period, TVaR) entry to the tail vector for a (summary_id, sidx) pair.

    Args:
        tail (nb.typed.Dict[nb_oasis_int, NB_TAIL_valtype]): Dict of (summary_id, sidx) pair to vector of (return period, tvar) values
        tail_sizes (nb.typed.Dict[nb_oasis_int, nb.types.int64]): Dict of (summary_id, sidx) pair to size of each tail array
        summary_id (int): Summary ID
        sidx (int): Sample ID
        num_sidxs (int): Number of sidxs to consider
        next_retperiod (float): Next Return Period
        tvar (float): Tail Value at Risk
    Returns:
        tail (nb.typed.Dict[nb_oasis_int, NB_TAIL_valtype]): Updated tail dict
        tail_sizes (nb.typed.Dict[nb_oasis_int, nb.types.int64]): Updated tail sizes dict
    """
    # Flatten the (summary_id, sidx) pair into a single dict key
    key = get_wheatsheaf_items_idx(summary_id, sidx, num_sidxs)
    if key not in tail:
        tail[key] = create_empty_array(TAIL_valtype)
        tail_sizes[key] = 0
    size = tail_sizes[key]
    # Grow the backing array if needed before writing at position `size`
    arr = resize_array(tail[key], size)
    arr[size]["retperiod"] = next_retperiod
    arr[size]["tvar"] = tvar
    tail[key] = arr
    tail_sizes[key] = size + 1
    return tail, tail_sizes
@nb.njit(cache=True, error_model="numpy")
def write_return_period_out(
    next_returnperiod_idx,
    last_computed_rp,
    last_computed_loss,
    curr_retperiod,
    curr_loss,
    summary_id,
    eptype,
    epcalc,
    max_retperiod,
    counter,
    tvar,
    tail,
    tail_sizes,
    returnperiods,
    mean_map=None,
    is_wheatsheaf=False,
    num_sidxs=-1,
):
    """Processes return periods and computes losses for a given summary, updating TVaR and mean map if required.

    Emits one output row per requested return period that falls between the
    previous and current observation, interpolating the loss via get_loss.

    Args:
        next_returnperiod_idx (int): Index of the next return period to process.
        last_computed_rp (float): Last computed return period
        last_computed_loss (float): Last computed loss value
        curr_retperiod (float): Current return period being processed.
        curr_loss (float): Loss associated with the current return period.
        summary_id (int): Identifier for the current summary.
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        epcalc (int): Type of exceedance probability calculation. NOTE(review):
            wheatsheaf callers pass the sample index (sidx) through this
            parameter — confirm against write_psept/write_psept_weighted.
        max_retperiod (int): Maximum return period to be used in calculations
        counter (int): Counter used for updating TVaR
        tvar (float): Tail Value at Risk
        tail (nb.typed.Dict[nb_oasis_int, NB_TAIL_valtype]): Dict of summary_id or (summary_id, sidx) pair to vector of (return period, tvar) values
        tail_sizes (nb.typed.Dict[nb_oasis_int, nb.types.int64]): Dict of summary_id or (summary_id, sidx) pair to size of each tail array
        returnperiods (ndarray[np.int32]): Return Periods array
        mean_map (ndarray[MEANMAP_dtype], optional): An array mapping used for mean loss calculations per Summary ID. Used for EPT output later. Defaults to None.
        is_wheatsheaf (bool, optional): If True, update the wheatsheaf TVaR structure.
        num_sidxs (int, optional): Number of sidxs to consider. Defaults to -1 if not is_wheatsheaf.
    Returns:
        rets (list[EPT_dtype]): Return period and Loss EPT data
        tail (nb.typed.Dict[nb_oasis_int, NB_TAIL_valtype]): Updated tail dict
        tail_sizes (nb.typed.Dict[nb_oasis_int, nb.types.int64]): Updated tail sizes dict
        next_returnperiod_idx (int): Index of the next unprocessed return period
        last_computed_rp (float): Last computed return period
        last_computed_loss (float): Last computed loss value
    """
    next_retperiod = 0
    rets = []
    while True:
        # All requested return periods have been emitted; nothing left to do
        if next_returnperiod_idx >= len(returnperiods):
            return rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss
        next_retperiod = returnperiods[next_returnperiod_idx]
        # Current observation has not yet reached the next requested return
        # period; stop and record it as the last computed point below
        if curr_retperiod > next_retperiod:
            break
        # Requested return period lies beyond the maximum computable one: skip
        if max_retperiod < next_retperiod:
            next_returnperiod_idx += 1
            continue
        loss = get_loss(
            next_retperiod,
            last_computed_rp,
            last_computed_loss,
            curr_retperiod,
            curr_loss
        )
        rets.append((summary_id, epcalc, eptype, next_retperiod, loss))
        if mean_map is not None:
            # Accumulate per-return-period sums/counts for a later mean EPT pass
            mean_map[summary_id - 1][next_returnperiod_idx]["retperiod"] = next_retperiod
            mean_map[summary_id - 1][next_returnperiod_idx]["mean"] += loss
            mean_map[summary_id - 1][next_returnperiod_idx]["count"] += 1
        if curr_retperiod != 0:
            # Incremental running-average update of the tail value at risk
            tvar = tvar - ((tvar - loss) / counter)
            if is_wheatsheaf:
                # epcalc carries the sidx in the wheatsheaf case (see docstring)
                tail, tail_sizes = fill_tvar_wheatsheaf(
                    tail,
                    tail_sizes,
                    summary_id,
                    epcalc,
                    num_sidxs,
                    next_retperiod,
                    tvar,
                )
            else:
                tail, tail_sizes = fill_tvar(
                    tail,
                    tail_sizes,
                    summary_id,
                    next_retperiod,
                    tvar,
                )
        next_returnperiod_idx += 1
        counter += 1
        # Mirrors the guard at the top of the loop against the same
        # next_retperiod value read before the index was advanced
        if curr_retperiod > next_retperiod:
            break
    if curr_retperiod > 0:
        # Remember this observation as the interpolation anchor for next call
        last_computed_rp = curr_retperiod
        last_computed_loss = curr_loss
    return rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss
@nb.njit(cache=True, error_model="numpy")
def write_tvar(
    epcalc,
    eptype_tvar,
    tail,
    tail_sizes,
):
    """Flatten the tail dict into EPT TVaR output rows, ordered by summary_id.

    Args:
        epcalc (int): Type of exceedance probability calculation.
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate (0 = OEP TVAR, 1 = AEP TVAR).
        tail (nb.typed.Dict[nb_oasis_int, NB_TAIL_valtype]): Dict of summary_id to vector of (return period, tvar) values
        tail_sizes (nb.typed.Dict[nb_oasis_int, nb.types.int64]): Dict of summary_id to size of each tail array
    Returns:
        rets (list[EPT_dtype]): Return period and Loss EPT data
    """
    out = []
    # Deterministic output order: ascending summary_id
    for summary_id in sorted(tail.keys()):
        entries = tail[summary_id]
        # Only the first tail_sizes[summary_id] slots hold valid data
        for j in range(tail_sizes[summary_id]):
            out.append((summary_id, epcalc, eptype_tvar, entries[j]["retperiod"], entries[j]["tvar"]))
    return out
@nb.njit(cache=True, error_model="numpy")
def write_tvar_wheatsheaf(
    num_sidxs,
    eptype_tvar,
    tail,
    tail_sizes,
):
    """Flatten the wheatsheaf tail dict into PSEPT TVaR output rows.

    Args:
        num_sidxs (int): Number of sidxs to consider.
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate (0 = OEP TVAR, 1 = AEP TVAR).
        tail (nb.typed.Dict[nb_oasis_int, NB_TAIL_valtype]): Dict of (summary_id, sidx) pair to vector of (return period, tvar) values
        tail_sizes (nb.typed.Dict[nb_oasis_int, nb.types.int64]): Dict of (summary_id, sidx) pair to size of each tail array
    Returns:
        rets (list[PSEPT_dtype]): Return period and Loss PSEPT data
    """
    out = []
    # Deterministic output order over the flattened (summary_id, sidx) keys
    for key in sorted(tail.keys()):
        # Recover the original pair from the flattened key
        sidx, summary_id = get_wheatsheaf_items_idx_data(key, num_sidxs)
        entries = tail[key]
        # Only the first tail_sizes[key] slots hold valid data
        for j in range(tail_sizes[key]):
            out.append((summary_id, sidx, eptype_tvar, entries[j]["retperiod"], entries[j]["tvar"]))
    return out
@nb.njit(cache=True, error_model="numpy")
def write_ept(
    items,
    items_start_end,
    max_retperiod,
    epcalc,
    eptype,
    eptype_tvar,
    use_return_period,
    returnperiods,
    max_summary_id,
    sample_size=1
):
    """Generate Loss Exceedance Curve values and Tail Value at Risk values
    based on items and epcalc/eptype/eptype_tvar

    The loss calculation follows these principles:
    - For Aggregate Loss Exceedance Curves (AEP): The sum of all losses within a period is calculated.
    - For Occurrence Loss Exceedance Curves (OEP): The maximum loss within a period is taken.
    - TVAR (Tail Conditional Expectation): Calculated as the average of losses exceeding a given return period.

    Args:
        items (ndarray[LOSSVEC2MAP_dtype]): Array mapping summary_id to loss value (and period_no/period_weighting where applicable)
        items_start_end (ndarray[np.int32]): An array marking where the start and end idxs are for each summary_id in the items array
        max_retperiod (int): Maximum return period to be used in calculations
        epcalc (int): Specifies the calculation method (mean damage loss, full uncertainty, per sample mean, sample mean).
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate (0 = OEP TVAR, 1 = AEP TVAR).
        use_return_period (bool): Use Return Period file.
        returnperiods (ndarray[np.int32]): Return Periods array
        max_summary_id (int): Maximum summary ID
        sample_size (int, optional): Sample Size. Defaults to 1.
    Yields:
        buffer (ndarray[EPT_dtype]): Buffered chunks of EPT data
    """
    # Output rows are accumulated in a fixed-size buffer and yielded in chunks
    buffer = np.zeros(1000000, dtype=EPT_dtype)
    bidx = 0
    if len(items) == 0 or sample_size == 0:
        return
    # tail/tail_sizes collect (retperiod, tvar) pairs per summary_id for the
    # final TVaR pass
    tail = nb.typed.Dict.empty(nb_oasis_int, NB_TAIL_valtype)
    tail_sizes = nb.typed.Dict.empty(nb_oasis_int, nb.types.int64)
    for summary_id in range(1, max_summary_id + 1):
        start, end = items_start_end[summary_id - 1]
        # start == -1 marks a summary_id with no items
        if start == -1:
            continue
        filtered_items = items[start:end]
        # Process losses in descending order so rank i maps to return period
        # max_retperiod / i
        sorted_idxs = np.argsort(filtered_items["value"])[::-1]
        sorted_items = filtered_items[sorted_idxs]
        next_returnperiod_idx = 0
        last_computed_rp = 0
        last_computed_loss = 0
        tvar = 0
        i = 1
        for item in sorted_items:
            value = item["value"] / sample_size
            retperiod = max_retperiod / i
            if use_return_period:
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        value,
                        summary_id,
                        eptype,
                        epcalc,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        returnperiods,
                    )
                    # Flush interpolated rows into the output buffer
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["EPCalc"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Incremental running-average update of the TVaR
                tvar = tvar - ((tvar - value) / i)
            else:
                tvar = tvar - ((tvar - value) / i)
                # No return-period file: record every observed point in the tail
                if summary_id not in tail:
                    tail[summary_id] = create_empty_array(TAIL_valtype)
                    tail_sizes[summary_id] = 0
                tail_arr = tail[summary_id]
                tail_arr = resize_array(tail_arr, tail_sizes[summary_id])
                tail_current_size = tail_sizes[summary_id]
                tail_arr[tail_current_size]["retperiod"] = retperiod
                tail_arr[tail_current_size]["tvar"] = tvar
                tail[summary_id] = tail_arr
                tail_sizes[summary_id] += 1
                # Emit the raw (retperiod, loss) point directly
                if bidx >= len(buffer):
                    yield buffer[:bidx]
                    bidx = 0
                buffer[bidx]["SummaryId"] = summary_id
                buffer[bidx]["EPCalc"] = epcalc
                buffer[bidx]["EPType"] = eptype
                buffer[bidx]["ReturnPeriod"] = retperiod
                buffer[bidx]["Loss"] = value
                bidx += 1
            i += 1
        if use_return_period:
            # Flush any remaining requested return periods with zero loss
            while True:
                retperiod = max_retperiod / i
                # NOTE(review): non-positive entries in returnperiods force a
                # zero return period; this read is unguarded by the length
                # check below — confirm returnperiods is never exhausted here
                if returnperiods[next_returnperiod_idx] <= 0:
                    retperiod = 0
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        0,
                        summary_id,
                        eptype,
                        epcalc,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        returnperiods,
                    )
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["EPCalc"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # TVaR decays towards zero as zero losses are averaged in
                tvar = tvar - (tvar / i)
                i += 1
                if next_returnperiod_idx >= len(returnperiods):
                    break
    # Append the TVaR rows for all summaries after the main pass
    rets = write_tvar(
        epcalc,
        eptype_tvar,
        tail,
        tail_sizes,
    )
    for ret in rets:
        if bidx >= len(buffer):
            yield buffer[:bidx]
            bidx = 0
        buffer[bidx]["SummaryId"] = ret[0]
        buffer[bidx]["EPCalc"] = ret[1]
        buffer[bidx]["EPType"] = ret[2]
        buffer[bidx]["ReturnPeriod"] = ret[3]
        buffer[bidx]["Loss"] = ret[4]
        bidx += 1
    # Final partial chunk
    yield buffer[:bidx]
@nb.njit(cache=True, error_model="numpy")
def write_ept_weighted(
    items,
    items_start_end,
    cum_weight_constant,
    epcalc,
    eptype,
    eptype_tvar,
    unused_period_weights,
    use_return_period,
    returnperiods,
    max_summary_id,
    sample_size=1
):
    """Generate Loss Exceedance Curve values and Tail Value at Risk values
    based on items and epcalc/eptype/eptype_tvar.

    This function calculates weighted exceedance probability tables using
    cumulative period weightings (`period_weighting`), which impact the
    calculation of return periods. The weighting allows for more accurate
    representation of losses when event periods have different probabilities
    or frequencies of occurrence.

    The loss calculation follows these principles:
    - For Aggregate Loss Exceedance Curves (AEP): The sum of all losses within a period is calculated.
    - For Occurrence Loss Exceedance Curves (OEP): The maximum loss within a period is taken.
    - TVAR (Tail Conditional Expectation): Calculated as the average of losses exceeding a given return period.

    Args:
        items (ndarray[LOSSVEC2MAP_dtype]): Array mapping summary_id to loss value (and period_no/period_weighting where applicable)
        items_start_end (ndarray[np.int32]): An array marking where the start and end idxs are for each summary_id in the items array
        cum_weight_constant (float): Constant factor for scaling cumulative period weights.
        epcalc (int): Specifies the calculation method (mean damage loss, full uncertainty, per sample mean, sample mean).
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate (0 = OEP TVAR, 1 = AEP TVAR).
        unused_period_weights (ndarray[float]): Array of unused period weights
        use_return_period (bool): Use Return Period file.
        returnperiods (ndarray[np.int32]): Return Periods array
        max_summary_id (int): Maximum summary ID
        sample_size (int, optional): Sample Size. Defaults to 1.
    Yields:
        buffer (ndarray[EPT_dtype]): Buffered chunks of EPT data
    """
    # Output rows are accumulated in a fixed-size buffer and yielded in chunks
    buffer = np.zeros(1000000, dtype=EPT_dtype)
    bidx = 0
    if len(items) == 0 or sample_size == 0:
        return
    # tail/tail_sizes collect (retperiod, tvar) pairs per summary_id for the
    # final TVaR pass
    tail = nb.typed.Dict.empty(nb_oasis_int, NB_TAIL_valtype)
    tail_sizes = nb.typed.Dict.empty(nb_oasis_int, nb.types.int64)
    for summary_id in range(1, max_summary_id + 1):
        start, end = items_start_end[summary_id - 1]
        # start == -1 marks a summary_id with no items
        if start == -1:
            continue
        filtered_items = items[start:end]
        # Process losses in descending order
        sorted_idxs = np.argsort(filtered_items["value"])[::-1]
        sorted_items = filtered_items[sorted_idxs]
        next_returnperiod_idx = 0
        last_computed_rp = 0
        last_computed_loss = 0
        tvar = 0
        i = 1
        cumulative_weighting = 0
        # max_retperiod is derived from the first (largest) weighted loss
        max_retperiod = 0
        largest_loss = False
        for item in sorted_items:
            value = item["value"] / sample_size
            cumulative_weighting += (item["period_weighting"] * cum_weight_constant)
            retperiod = max_retperiod / i
            if item["period_weighting"]:
                # Weighted return period is the reciprocal of the cumulative
                # exceedance probability
                retperiod = 1 / cumulative_weighting
                if not largest_loss:
                    max_retperiod = retperiod + 0.0001  # Add for floating point errors
                    largest_loss = True
            if use_return_period:
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        value,
                        summary_id,
                        eptype,
                        epcalc,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        returnperiods,
                    )
                    # Flush interpolated rows into the output buffer
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["EPCalc"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Incremental running-average update of the TVaR
                tvar = tvar - ((tvar - (value)) / i)
            else:
                tvar = tvar - ((tvar - (value)) / i)
                # No return-period file: record every observed point in the tail
                if summary_id not in tail:
                    tail[summary_id] = create_empty_array(TAIL_valtype)
                    tail_sizes[summary_id] = 0
                tail_arr = tail[summary_id]
                tail_arr = resize_array(tail_arr, tail_sizes[summary_id])
                tail_current_size = tail_sizes[summary_id]
                tail_arr[tail_current_size]["retperiod"] = retperiod
                tail_arr[tail_current_size]["tvar"] = tvar
                tail[summary_id] = tail_arr
                tail_sizes[summary_id] += 1
                # Emit the raw (retperiod, loss) point directly
                if bidx >= len(buffer):
                    yield buffer[:bidx]
                    bidx = 0
                buffer[bidx]["SummaryId"] = summary_id
                buffer[bidx]["EPCalc"] = epcalc
                buffer[bidx]["EPType"] = eptype
                buffer[bidx]["ReturnPeriod"] = retperiod
                buffer[bidx]["Loss"] = value
                bidx += 1
            i += 1
        if use_return_period:
            # Flush remaining requested return periods with zero loss, still
            # accumulating the weights of periods with no recorded loss
            unused_pw_idx = 0
            while True:
                retperiod = 0
                if unused_pw_idx < len(unused_period_weights):
                    cumulative_weighting += (
                        unused_period_weights[unused_pw_idx]["weighting"] * cum_weight_constant
                    )
                    retperiod = 1 / cumulative_weighting
                    unused_pw_idx += 1
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        0,
                        summary_id,
                        eptype,
                        epcalc,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        returnperiods,
                    )
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["EPCalc"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # TVaR decays towards zero as zero losses are averaged in
                tvar = tvar - (tvar / i)
                i += 1
                if next_returnperiod_idx >= len(returnperiods):
                    break
    # Append the TVaR rows for all summaries after the main pass
    rets = write_tvar(
        epcalc,
        eptype_tvar,
        tail,
        tail_sizes,
    )
    for ret in rets:
        if bidx >= len(buffer):
            yield buffer[:bidx]
            bidx = 0
        buffer[bidx]["SummaryId"] = ret[0]
        buffer[bidx]["EPCalc"] = ret[1]
        buffer[bidx]["EPType"] = ret[2]
        buffer[bidx]["ReturnPeriod"] = ret[3]
        buffer[bidx]["Loss"] = ret[4]
        bidx += 1
    # Final partial chunk
    yield buffer[:bidx]
@nb.njit(cache=True, error_model="numpy")
def write_psept(
    items,
    items_start_end,
    max_retperiod,
    eptype,
    eptype_tvar,
    use_return_period,
    returnperiods,
    max_summary_id,
    num_sidxs
):
    """Generate Per Sample Exceedance Probability Tables (PSEPT) for each
    individual sample, producing a separate loss exceedance curve for each
    sample, eptype, eptype_tvar.

    Args:
        items (ndarray[WHEATKEYITEMS_dtype]): Array mapping (summary_id, sidx) to loss value (and period_no/period_weighting where applicable)
        items_start_end (ndarray[np.int32]): An array marking where the start and end idxs are for each (summary_id, sidx) pair in the items array
        max_retperiod (int): Maximum return period to be used in calculations
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate (0 = OEP TVAR, 1 = AEP TVAR).
        use_return_period (bool): Use Return Period file.
        returnperiods (ndarray[np.int32]): Return Periods array
        max_summary_id (int): Maximum summary ID
        num_sidxs (int): Number of sidxs to consider
    Yields:
        buffer (ndarray[PSEPT_dtype]): Buffered chunks of PSEPT data
    """
    # Output rows are accumulated in a fixed-size buffer and yielded in chunks
    buffer = np.zeros(1000000, dtype=PSEPT_dtype)
    bidx = 0
    if len(items) == 0:
        return
    # tail/tail_sizes are keyed by the flattened (summary_id, sidx) index
    tail = nb.typed.Dict.empty(nb_oasis_int, NB_TAIL_valtype)
    tail_sizes = nb.typed.Dict.empty(nb_oasis_int, nb.types.int64)
    for idx in range(max_summary_id * num_sidxs):
        start, end = items_start_end[idx]
        # start == -1 marks a (summary_id, sidx) pair with no items
        if start == -1:
            continue
        sidx, summary_id = get_wheatsheaf_items_idx_data(idx, num_sidxs)
        filtered_items = items[start:end]
        # Process losses in descending order so rank i maps to return period
        # max_retperiod / i
        sorted_idxs = np.argsort(filtered_items["value"])[::-1]
        sorted_items = filtered_items[sorted_idxs]
        next_returnperiod_idx = 0
        last_computed_rp = 0
        last_computed_loss = 0
        tvar = 0
        i = 1
        for item in sorted_items:
            value = item["value"]
            retperiod = max_retperiod / i
            if use_return_period:
                if next_returnperiod_idx < len(returnperiods):
                    # sidx is passed through the epcalc slot (wheatsheaf mode)
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        value,
                        summary_id,
                        eptype,
                        sidx,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        returnperiods,
                        is_wheatsheaf=True,
                        num_sidxs=num_sidxs,
                    )
                    # Flush interpolated rows into the output buffer
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["SampleId"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Incremental running-average update of the TVaR
                tvar = tvar - ((tvar - value) / i)
            else:
                tvar = tvar - ((tvar - value) / i)
                # No return-period file: record every observed point in the tail
                if idx not in tail:
                    tail[idx] = create_empty_array(TAIL_valtype)
                    tail_sizes[idx] = 0
                tail_arr = tail[idx]
                tail_arr = resize_array(tail_arr, tail_sizes[idx])
                tail_current_size = tail_sizes[idx]
                tail_arr[tail_current_size]["retperiod"] = retperiod
                tail_arr[tail_current_size]["tvar"] = tvar
                tail[idx] = tail_arr
                tail_sizes[idx] += 1
                # Emit the raw (retperiod, loss) point directly
                if bidx >= len(buffer):
                    yield buffer[:bidx]
                    bidx = 0
                buffer[bidx]["SummaryId"] = summary_id
                buffer[bidx]["SampleId"] = sidx
                buffer[bidx]["EPType"] = eptype
                buffer[bidx]["ReturnPeriod"] = retperiod
                buffer[bidx]["Loss"] = value
                bidx += 1
            i += 1
        if use_return_period:
            # Flush any remaining requested return periods with zero loss
            while True:
                retperiod = max_retperiod / i
                # NOTE(review): this read is unguarded by the length check
                # below — confirm returnperiods is never exhausted here
                if returnperiods[next_returnperiod_idx] <= 0:
                    retperiod = 0
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        0,
                        summary_id,
                        eptype,
                        sidx,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        returnperiods,
                        is_wheatsheaf=True,
                        num_sidxs=num_sidxs,
                    )
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["SampleId"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # TVaR decays towards zero as zero losses are averaged in
                tvar = tvar - (tvar / i)
                i += 1
                if next_returnperiod_idx >= len(returnperiods):
                    break
    # Append the per-sample TVaR rows after the main pass
    rets = write_tvar_wheatsheaf(
        num_sidxs,
        eptype_tvar,
        tail,
        tail_sizes,
    )
    for ret in rets:
        if bidx >= len(buffer):
            yield buffer[:bidx]
            bidx = 0
        buffer[bidx]["SummaryId"] = ret[0]
        buffer[bidx]["SampleId"] = ret[1]
        buffer[bidx]["EPType"] = ret[2]
        buffer[bidx]["ReturnPeriod"] = ret[3]
        buffer[bidx]["Loss"] = ret[4]
        bidx += 1
    # Final partial chunk
    yield buffer[:bidx]
@nb.njit(cache=True, error_model="numpy")
def write_psept_weighted(
    items,
    items_start_end,
    max_retperiod,
    eptype,
    eptype_tvar,
    unused_period_weights,
    use_return_period,
    returnperiods,
    max_summary_id,
    num_sidxs,
    sample_size,
    mean_map=None
):
    """Generate Per Sample Exceedance Probability Tables (PSEPT) for each
    individual sample, producing a separate loss exceedance curve for each
    sample, eptype, eptype_tvar.

    Args:
        items (ndarray[WHEATKEYITEMS_dtype]): Array mapping (summary_id, sidx) to loss value (and period_no/period_weighting where applicable)
        items_start_end (ndarray[np.int32]): An array marking where the start and end idxs are for each (summary_id, sidx) pair in the items array
        max_retperiod (int): Maximum return period to be used in calculations
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate (0 = OEP TVAR, 1 = AEP TVAR).
        unused_period_weights (ndarray[float]): Array of unused period weights
        use_return_period (bool): Use Return Period file.
        returnperiods (ndarray[np.int32]): Return Periods array
        max_summary_id (int): Maximum summary ID
        num_sidxs (int): Number of sidxs to consider
        sample_size (int): Sample Size. Defaults to 1.
        mean_map (ndarray[MEANMAP_dtype], optional): An array mapping used for mean loss calculations per Summary ID. Used for EPT output later. Defaults to None.
    Yields:
        buffer (ndarray[PSEPT_dtype]): Buffered chunks of PSEPT data
    """
    # Output rows are accumulated in a fixed-size buffer and yielded in chunks
    buffer = np.zeros(1000000, dtype=PSEPT_dtype)
    bidx = 0
    if len(items) == 0:
        return
    # tail/tail_sizes are keyed by the flattened (summary_id, sidx) index
    tail = nb.typed.Dict.empty(nb_oasis_int, NB_TAIL_valtype)
    tail_sizes = nb.typed.Dict.empty(nb_oasis_int, nb.types.int64)
    for idx in range(max_summary_id * num_sidxs):
        start, end = items_start_end[idx]
        # start == -1 marks a (summary_id, sidx) pair with no items
        if start == -1:
            continue
        sidx, summary_id = get_wheatsheaf_items_idx_data(idx, num_sidxs)
        filtered_items = items[start:end]
        # Process losses in descending order
        sorted_idxs = np.argsort(filtered_items["value"])[::-1]
        sorted_items = filtered_items[sorted_idxs]
        next_returnperiod_idx = 0
        last_computed_rp = 0
        last_computed_loss = 0
        tvar = 0
        i = 1
        cumulative_weighting = 0
        # max_retperiod is derived from the first (largest) weighted loss
        max_retperiod = 0
        largest_loss = False
        for item in sorted_items:
            value = item["value"]
            # NOTE(review): weights are scaled by sample_size here, where the
            # EPT variant uses cum_weight_constant — confirm intentional
            cumulative_weighting += (item["period_weighting"] * sample_size)
            retperiod = max_retperiod / i
            if item["period_weighting"]:
                # Weighted return period is the reciprocal of the cumulative
                # exceedance probability
                retperiod = 1 / cumulative_weighting
                if not largest_loss:
                    max_retperiod = retperiod + 0.0001  # Add for floating point errors
                    largest_loss = True
            if use_return_period:
                if next_returnperiod_idx < len(returnperiods):
                    # sidx is passed through the epcalc slot (wheatsheaf mode)
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        value,
                        summary_id,
                        eptype,
                        sidx,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        returnperiods,
                        mean_map=mean_map,
                        is_wheatsheaf=True,
                        num_sidxs=num_sidxs,
                    )
                    # Flush interpolated rows into the output buffer
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["SampleId"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Incremental running-average update of the TVaR
                tvar = tvar - ((tvar - (value)) / i)
            else:
                tvar = tvar - ((tvar - (value)) / i)
                # No return-period file: record every observed point in the tail
                if idx not in tail:
                    tail[idx] = create_empty_array(TAIL_valtype)
                    tail_sizes[idx] = 0
                tail_arr = tail[idx]
                tail_arr = resize_array(tail_arr, tail_sizes[idx])
                tail_current_size = tail_sizes[idx]
                tail_arr[tail_current_size]["retperiod"] = retperiod
                tail_arr[tail_current_size]["tvar"] = tvar
                tail[idx] = tail_arr
                tail_sizes[idx] += 1
                # Emit the raw (retperiod, loss) point directly
                if bidx >= len(buffer):
                    yield buffer[:bidx]
                    bidx = 0
                buffer[bidx]["SummaryId"] = summary_id
                buffer[bidx]["SampleId"] = sidx
                buffer[bidx]["EPType"] = eptype
                buffer[bidx]["ReturnPeriod"] = retperiod
                buffer[bidx]["Loss"] = value
                bidx += 1
            i += 1
        if use_return_period:
            # Flush remaining requested return periods with zero loss, still
            # accumulating the weights of periods with no recorded loss
            unused_pw_idx = 0
            while True:
                retperiod = 0
                if unused_pw_idx < len(unused_period_weights):
                    cumulative_weighting += (
                        unused_period_weights[unused_pw_idx]["weighting"] * sample_size
                    )
                    retperiod = 1 / cumulative_weighting
                    unused_pw_idx += 1
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        0,
                        summary_id,
                        eptype,
                        sidx,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        returnperiods,
                        mean_map=mean_map,
                        is_wheatsheaf=True,
                        num_sidxs=num_sidxs,
                    )
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["SampleId"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # TVaR decays towards zero as zero losses are averaged in
                tvar = tvar - (tvar / i)
                i += 1
                if next_returnperiod_idx >= len(returnperiods):
                    break
    # Append the per-sample TVaR rows after the main pass
    rets = write_tvar_wheatsheaf(
        num_sidxs,
        eptype_tvar,
        tail,
        tail_sizes,
    )
    for ret in rets:
        if bidx >= len(buffer):
            yield buffer[:bidx]
            bidx = 0
        buffer[bidx]["SummaryId"] = ret[0]
        buffer[bidx]["SampleId"] = ret[1]
        buffer[bidx]["EPType"] = ret[2]
        buffer[bidx]["ReturnPeriod"] = ret[3]
        buffer[bidx]["Loss"] = ret[4]
        bidx += 1
    # Final partial chunk
    yield buffer[:bidx]
@nb.njit(cache=True, error_model="numpy")
def write_wheatsheaf_mean(
    mean_map,
    eptype,
    epcalc,
    max_summary_id,
):
    """Generate Wheatsheaf Mean Exceedance Probability Table (EPT) rows by
    averaging the accumulated losses per return period from a precomputed
    mean map.

    Args:
        mean_map (ndarray[MEANMAP_dtype]): An array mapping used for mean loss calculations per Summary ID.
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        epcalc (int): Specifies the calculation method (mean damage loss, full uncertainty, per sample mean, sample mean).
        max_summary_id (int): Maximum summary ID
    Yields:
        buffer (ndarray[EPT_dtype]): Buffered chunks of EPT data
    """
    if len(mean_map) == 0:
        return
    # Rows accumulate in a fixed-size buffer, yielded whenever it fills up
    buffer = np.zeros(1000000, dtype=EPT_dtype)
    bidx = 0
    for summary_id in range(1, max_summary_id + 1):
        row = mean_map[summary_id - 1]
        # Skip summaries that accumulated nothing at any return period
        if np.sum(row["count"]) == 0:
            continue
        for cell in row:
            if bidx >= len(buffer):
                yield buffer[:bidx]
                bidx = 0
            buffer[bidx]["SummaryId"] = summary_id
            buffer[bidx]["EPCalc"] = epcalc
            buffer[bidx]["EPType"] = eptype
            buffer[bidx]["ReturnPeriod"] = cell["retperiod"]
            # max(count, 1) guards the division when a cell has no samples
            buffer[bidx]["Loss"] = cell["mean"] / max(cell["count"], 1)
            bidx += 1
    # Final partial chunk
    yield buffer[:bidx]