# Source code for oasislmf.pytools.lec.aggreports.write_tables

import numba as nb
import numpy as np

from oasislmf.pytools.common.data import DEFAULT_BUFFER_SIZE
from oasislmf.pytools.lec.data import (EPT_dtype, PSEPT_dtype, TAIL_valtype)
from oasislmf.pytools.lec.utils import (get_wheatsheaf_items_idx, get_wheatsheaf_items_idx_data)


@nb.njit(cache=True, error_model="numpy")
def get_loss(
    next_retperiod,
    last_retperiod,
    last_loss,
    curr_retperiod,
    curr_loss
):
    """Compute the loss at the next return period from the current point,
    linearly interpolating against the previously computed point when needed.

    Args:
        next_retperiod (float): Next return period
        last_retperiod (float): Previous return period
        last_loss (float): Previous loss value
        curr_retperiod (float): Current return period
        curr_loss (float): Current loss value

    Returns:
        loss (float): Loss value (-1 when curr_retperiod exceeds next_retperiod)
    """
    # No meaningful loss when either the period or the loss itself is zero.
    if curr_retperiod == 0 or curr_loss == 0:
        return 0
    if curr_retperiod == next_retperiod:
        # Exact match: no interpolation required.
        return curr_loss
    if curr_retperiod < next_retperiod:
        # Linear interpolation between (curr_retperiod, curr_loss) and
        # (last_retperiod, last_loss).
        gradient = (last_loss - curr_loss) / (last_retperiod - curr_retperiod)
        return curr_loss + (next_retperiod - curr_retperiod) * gradient
    # curr_retperiod > next_retperiod: signal "not reached yet" to the caller.
    return -1
@nb.njit(cache=True, error_model="numpy")
def fill_tvar(
    tail,
    tail_sizes,
    tail_offsets,
    summary_id,
    next_retperiod,
    tvar
):
    """Append one (return period, TVaR) entry to the tail buffer of summary_id.

    Args:
        tail (ndarray[TAIL_valtype]): Flat array of (return period, tvar) values
        tail_sizes (ndarray[int64]): Array of current fill size per summary_id
        tail_offsets (ndarray[int64]): Array of start positions per summary_id in tail
        summary_id (int): Summary ID
        next_retperiod (float): Next Return Period
        tvar (float): Tail Value at Risk

    Returns:
        tail (ndarray[TAIL_valtype]): Flat array of (return period, tvar) values
        tail_sizes (ndarray[int64]): Array of current fill size per summary_id
    """
    # summary_id is 1-based; buffers are 0-based.
    slot = summary_id - 1
    write_at = tail_offsets[slot] + tail_sizes[slot]
    tail[write_at]["retperiod"] = next_retperiod
    tail[write_at]["tvar"] = tvar
    tail_sizes[slot] += 1
    return tail, tail_sizes
@nb.njit(cache=True, error_model="numpy")
def fill_tvar_wheatsheaf(
    tail,
    tail_sizes,
    tail_offsets,
    summary_id,
    sidx,
    num_sidxs,
    next_retperiod,
    tvar
):
    """Append one (return period, TVaR) entry to the tail buffer of the
    (summary_id, sidx) pair.

    Args:
        tail (ndarray[TAIL_valtype]): Flat array of (return period, tvar) values
        tail_sizes (ndarray[int64]): Array of current fill size per (summary_id, sidx) idx
        tail_offsets (ndarray[int64]): Array of start positions per idx in tail
        summary_id (int): Summary ID
        sidx (int): Sample ID
        num_sidxs (int): Number of sidxs to consider
        next_retperiod (float): Next Return Period
        tvar (float): Tail Value at Risk

    Returns:
        tail (ndarray[TAIL_valtype]): Flat array of (return period, tvar) values
        tail_sizes (ndarray[int64]): Array of current fill size per (summary_id, sidx) idx
    """
    # Flat index for the (summary_id, sidx) pair, as defined by the shared helper.
    slot = get_wheatsheaf_items_idx(summary_id, sidx, num_sidxs)
    write_at = tail_offsets[slot] + tail_sizes[slot]
    tail[write_at]["retperiod"] = next_retperiod
    tail[write_at]["tvar"] = tvar
    tail_sizes[slot] += 1
    return tail, tail_sizes
@nb.njit(cache=True, error_model="numpy")
def write_return_period_out(
    next_returnperiod_idx,
    last_computed_rp,
    last_computed_loss,
    curr_retperiod,
    curr_loss,
    summary_id,
    eptype,
    epcalc,
    max_retperiod,
    counter,
    tvar,
    tail,
    tail_sizes,
    tail_offsets,
    returnperiods,
    mean_map=None,
    is_wheatsheaf=False,
    num_sidxs=-1,
):
    """Processes return periods and computes losses for a given summary, updating
    TVaR and mean map if required.

    NOTE: when is_wheatsheaf is True, callers pass the sample index through the
    `epcalc` argument — it is forwarded as `sidx` to fill_tvar_wheatsheaf and
    emitted as the second field of each `rets` tuple.

    Args:
        next_returnperiod_idx (int): Index of the next return period to process.
        last_computed_rp (float): Last computed return period
        last_computed_loss (float): Last computed loss value
        curr_retperiod (float): Current return period being processed.
        curr_loss (float): Loss associated with the current return period.
        summary_id (int): Identifier for the current summary.
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        epcalc (int): Type of exceedance probability calculation (or sidx, see note).
        max_retperiod (int): Maximum return period to be used in calculations
        counter (int): Counter used for updating TVaR
        tvar (float): Tail Value at Risk
        tail (ndarray[TAIL_valtype]): Flat array of (return period, tvar) values
        tail_sizes (ndarray[int64]): Array of current fill size per summary/idx
        tail_offsets (ndarray[int64]): Array of start positions per summary/idx in tail
        returnperiods (ndarray[np.int32]): Return Periods array
        mean_map (ndarray[MEANMAP_dtype], optional): An array mapping used for mean
            loss calculations per Summary ID. Used for EPT output later. Defaults to None.
        is_wheatsheaf (bool, optional): If True, update the wheatsheaf TVaR structure.
        num_sidxs (int, optional): Number of sidxs to consider. Defaults to -1 if
            not is_wheatsheaf.

    Returns:
        rets (list[EPT_dtype]): Return period and Loss EPT data
        tail (ndarray[TAIL_valtype]): Flat array of (return period, tvar) values
        tail_sizes (ndarray[int64]): Array of current fill size per summary/idx
        next_returnperiod_idx (int): Index of the next return period to process.
        last_computed_rp (float): Last computed return period
        last_computed_loss (float): Last computed loss value
    """
    next_retperiod = 0
    rets = []
    while True:
        # All requested return periods emitted: done.
        if next_returnperiod_idx >= len(returnperiods):
            return rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss
        next_retperiod = returnperiods[next_returnperiod_idx]
        # Current point has not yet reached this return period; wait for a later call.
        if curr_retperiod > next_retperiod:
            break
        # Return periods above the maximum cannot be computed; skip them.
        if max_retperiod < next_retperiod:
            next_returnperiod_idx += 1
            continue
        # Interpolated loss for this return period.
        loss = get_loss(
            next_retperiod, last_computed_rp, last_computed_loss, curr_retperiod, curr_loss
        )
        rets.append((summary_id, epcalc, eptype, next_retperiod, loss))
        if mean_map is not None:
            # Accumulate per-return-period sums/counts for the wheatsheaf mean output.
            mean_map[summary_id - 1][next_returnperiod_idx]["retperiod"] = next_retperiod
            mean_map[summary_id - 1][next_returnperiod_idx]["mean"] += loss
            mean_map[summary_id - 1][next_returnperiod_idx]["count"] += 1
        if curr_retperiod != 0:
            # Incremental running-mean update of the TVaR with the new loss.
            tvar = tvar - ((tvar - loss) / counter)
            if is_wheatsheaf:
                # epcalc carries sidx here (see note in the docstring).
                tail, tail_sizes = fill_tvar_wheatsheaf(
                    tail,
                    tail_sizes,
                    tail_offsets,
                    summary_id,
                    epcalc,
                    num_sidxs,
                    next_retperiod,
                    tvar,
                )
            else:
                tail, tail_sizes = fill_tvar(
                    tail,
                    tail_sizes,
                    tail_offsets,
                    summary_id,
                    next_retperiod,
                    tvar,
                )
        next_returnperiod_idx += 1
        counter += 1
        if curr_retperiod > next_retperiod:
            break
    if curr_retperiod > 0:
        # Remember this point for interpolation on the next call.
        last_computed_rp = curr_retperiod
        last_computed_loss = curr_loss
    return rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss
@nb.njit(cache=True, error_model="numpy")
def write_tvar(
    epcalc,
    eptype_tvar,
    tail,
    tail_sizes,
    tail_offsets,
    max_summary_id,
):
    """Flatten the per-summary tail buffers into EPT rows of TVaR values.

    Args:
        epcalc (int): Type of exceedance probability calculation.
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate
            (0 = OEP TVAR, 1 = AEP TVAR).
        tail (ndarray[TAIL_valtype]): Flat array of (return period, tvar) values
        tail_sizes (ndarray[int64]): Array of current fill size per summary_id
        tail_offsets (ndarray[int64]): Array of start positions per summary_id in tail
        max_summary_id (int): Maximum summary ID

    Returns:
        rets (ndarray[EPT_dtype]): Return period and Loss EPT data
    """
    # One output row per filled tail entry across all summaries.
    out = np.empty(np.sum(tail_sizes), dtype=EPT_dtype)
    row = 0
    for sid_idx in range(max_summary_id):
        n_filled = tail_sizes[sid_idx]
        if n_filled == 0:
            continue
        base = tail_offsets[sid_idx]
        for k in range(n_filled):
            entry = tail[base + k]
            out[row]["SummaryId"] = sid_idx + 1
            out[row]["EPCalc"] = epcalc
            out[row]["EPType"] = eptype_tvar
            out[row]["ReturnPeriod"] = entry["retperiod"]
            out[row]["Loss"] = entry["tvar"]
            row += 1
    return out
@nb.njit(cache=True, error_model="numpy")
def write_tvar_wheatsheaf(
    num_sidxs,
    eptype_tvar,
    tail,
    tail_sizes,
    tail_offsets,
    total_idxs,
):
    """Flatten the per-(summary_id, sidx) tail buffers into PSEPT rows of TVaR values.

    Args:
        num_sidxs (int): Number of sidxs to consider.
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate
            (0 = OEP TVAR, 1 = AEP TVAR).
        tail (ndarray[TAIL_valtype]): Flat array of (return period, tvar) values
        tail_sizes (ndarray[int64]): Array of current fill size per (summary_id, sidx) idx
        tail_offsets (ndarray[int64]): Array of start positions per idx in tail
        total_idxs (int): Total number of (summary_id, sidx) index entries

    Returns:
        rets (ndarray[PSEPT_dtype]): Return period and Loss PSEPT data
    """
    # One output row per filled tail entry across all (summary_id, sidx) pairs.
    out = np.empty(np.sum(tail_sizes), dtype=PSEPT_dtype)
    row = 0
    for pair_idx in range(total_idxs):
        n_filled = tail_sizes[pair_idx]
        if n_filled == 0:
            continue
        # Recover the (sidx, summary_id) pair from the flat index.
        sidx, summary_id = get_wheatsheaf_items_idx_data(pair_idx, num_sidxs)
        base = tail_offsets[pair_idx]
        for k in range(n_filled):
            entry = tail[base + k]
            out[row]["SummaryId"] = summary_id
            out[row]["SampleId"] = sidx
            out[row]["EPType"] = eptype_tvar
            out[row]["ReturnPeriod"] = entry["retperiod"]
            out[row]["Loss"] = entry["tvar"]
            row += 1
    return out
@nb.njit(cache=True, error_model="numpy")
def write_ept(
    items,
    items_start_end,
    max_retperiod,
    epcalc,
    eptype,
    eptype_tvar,
    use_return_period,
    returnperiods,
    max_summary_id,
    sample_size=1
):
    """Generate Loss Exceedance Curve values and Tail Value at Risk values based on
    items and epcalc/eptype/eptype_tvar

    The loss calculation follows these principles:
    - For Aggregate Loss Exceedance Curves (AEP): The sum of all losses within a period is calculated.
    - For Occurrence Loss Exceedance Curves (OEP): The maximum loss within a period is taken.
    - TVAR (Tail Conditional Expectation): Calculated as the average of losses exceeding a given return period.

    Args:
        items (ndarray[LOSSVEC2MAP_dtype]): Array mapping summary_id to loss value
            (and period_no/period_weighting where applicable)
        items_start_end (ndarray[np.int32]): An array marking where the start and end
            idxs are for each summary_id in the items array
        max_retperiod (int): Maximum return period to be used in calculations
        epcalc (int): Specifies the calculation method (mean damage loss, full
            uncertainty, per sample mean, sample mean).
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate
            (0 = OEP TVAR, 1 = AEP TVAR).
        use_return_period (bool): Use Return Period file.
        returnperiods (ndarray[np.int32]): Return Periods array
        max_summary_id (int): Maximum summary ID
        sample_size (int, optional): Sample Size. Defaults to 1.

    Yields:
        buffer (ndarray[EPT_dtype]): Buffered chunks of EPT data
    """
    buffer = np.zeros(DEFAULT_BUFFER_SIZE, dtype=EPT_dtype)
    bidx = 0
    if len(items) == 0 or sample_size == 0:
        return

    # Preallocate the flat tail buffers: with a return-period file every summary
    # gets a fixed-size slab; otherwise sizes follow the per-summary item counts.
    tail_sizes = np.zeros(max_summary_id, dtype=np.int64)
    tail_offsets = np.zeros(max_summary_id + 1, dtype=np.int64)
    if use_return_period:
        rp_len = len(returnperiods) if len(returnperiods) > 0 else 1
        for s in range(max_summary_id + 1):
            tail_offsets[s] = s * rp_len
        tail = np.zeros(max_summary_id * rp_len, dtype=TAIL_valtype)
    else:
        for s in range(max_summary_id):
            start, end = items_start_end[s]
            if start != -1:
                tail_offsets[s + 1] = tail_offsets[s] + (end - start)
            else:
                tail_offsets[s + 1] = tail_offsets[s]
        total_tail = tail_offsets[max_summary_id]
        # Size-1 dummy so numba always sees a non-empty typed array.
        tail = np.zeros(total_tail if total_tail > 0 else 1, dtype=TAIL_valtype)

    for summary_id in range(1, max_summary_id + 1):
        start, end = items_start_end[summary_id - 1]
        if start == -1:
            # No items for this summary.
            continue
        filtered_items = items[start:end]
        # Losses processed in descending order: rank i maps to return period max_retperiod / i.
        sorted_idxs = np.argsort(filtered_items["value"])[::-1]
        sorted_items = filtered_items[sorted_idxs]
        next_returnperiod_idx = 0
        last_computed_rp = 0
        last_computed_loss = 0
        tvar = 0
        i = 1
        for item in sorted_items:
            value = item["value"] / sample_size
            retperiod = max_retperiod / i
            if use_return_period:
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        value,
                        summary_id,
                        eptype,
                        epcalc,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        tail_offsets,
                        returnperiods,
                    )
                    for ret in rets:
                        # Flush the buffer when full before appending.
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["EPCalc"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Incremental running-mean update of the TVaR.
                tvar = tvar - ((tvar - value) / i)
            else:
                tvar = tvar - ((tvar - value) / i)
                # No return-period file: record every rank directly in the tail.
                insert_pos = tail_offsets[summary_id - 1] + tail_sizes[summary_id - 1]
                tail[insert_pos]["retperiod"] = retperiod
                tail[insert_pos]["tvar"] = tvar
                tail_sizes[summary_id - 1] += 1
                if bidx >= len(buffer):
                    yield buffer[:bidx]
                    bidx = 0
                buffer[bidx]["SummaryId"] = summary_id
                buffer[bidx]["EPCalc"] = epcalc
                buffer[bidx]["EPType"] = eptype
                buffer[bidx]["ReturnPeriod"] = retperiod
                buffer[bidx]["Loss"] = value
                bidx += 1
            i += 1
        if use_return_period:
            # Emit the remaining requested return periods with zero loss.
            while True:
                if next_returnperiod_idx >= len(returnperiods):
                    break
                retperiod = max_retperiod / i
                if returnperiods[next_returnperiod_idx] <= 0:
                    retperiod = 0
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        0,
                        summary_id,
                        eptype,
                        epcalc,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        tail_offsets,
                        returnperiods,
                    )
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["EPCalc"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Zero loss beyond the observed data: mean decays toward zero.
                tvar = tvar - (tvar / i)
                i += 1

    # Append the accumulated TVaR rows after all summaries are processed.
    rets = write_tvar(
        epcalc,
        eptype_tvar,
        tail,
        tail_sizes,
        tail_offsets,
        max_summary_id,
    )
    for ret in rets:
        if bidx >= len(buffer):
            yield buffer[:bidx]
            bidx = 0
        buffer[bidx]["SummaryId"] = ret["SummaryId"]
        buffer[bidx]["EPCalc"] = ret["EPCalc"]
        buffer[bidx]["EPType"] = ret["EPType"]
        buffer[bidx]["ReturnPeriod"] = ret["ReturnPeriod"]
        buffer[bidx]["Loss"] = ret["Loss"]
        bidx += 1
    yield buffer[:bidx]
@nb.njit(cache=True, error_model="numpy")
def write_ept_weighted(
    items,
    items_start_end,
    cum_weight_constant,
    epcalc,
    eptype,
    eptype_tvar,
    unused_period_weights,
    use_return_period,
    returnperiods,
    max_summary_id,
    sample_size=1
):
    """Generate Loss Exceedance Curve values and Tail Value at Risk values based on
    items and epcalc/eptype/eptype_tvar.

    This function calculates weighted exceedance probability tables using cumulative
    period weightings (`period_weighting`), which impact the calculation of return
    periods. The weighting allows for more accurate representation of losses when
    event periods have different probabilities or frequencies of occurrence.

    The loss calculation follows these principles:
    - For Aggregate Loss Exceedance Curves (AEP): The sum of all losses within a period is calculated.
    - For Occurrence Loss Exceedance Curves (OEP): The maximum loss within a period is taken.
    - TVAR (Tail Conditional Expectation): Calculated as the average of losses exceeding a given return period.

    Args:
        items (ndarray[LOSSVEC2MAP_dtype]): Array mapping summary_id to loss value
            (and period_no/period_weighting where applicable)
        items_start_end (ndarray[np.int32]): An array marking where the start and end
            idxs are for each summary_id in the items array
        cum_weight_constant (float): Constant factor for scaling cumulative period weights.
        epcalc (int): Specifies the calculation method (mean damage loss, full
            uncertainty, per sample mean, sample mean).
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate
            (0 = OEP TVAR, 1 = AEP TVAR).
        unused_period_weights (ndarray[float]): Array of unused period weights
        use_return_period (bool): Use Return Period file.
        returnperiods (ndarray[np.int32]): Return Periods array
        max_summary_id (int): Maximum summary ID
        sample_size (int, optional): Sample Size. Defaults to 1.

    Yields:
        buffer (ndarray[EPT_dtype]): Buffered chunks of EPT data
    """
    buffer = np.zeros(DEFAULT_BUFFER_SIZE, dtype=EPT_dtype)
    bidx = 0
    if len(items) == 0 or sample_size == 0:
        return

    # Preallocate the flat tail buffers: with a return-period file every summary
    # gets a fixed-size slab; otherwise sizes follow the per-summary item counts.
    tail_sizes = np.zeros(max_summary_id, dtype=np.int64)
    tail_offsets = np.zeros(max_summary_id + 1, dtype=np.int64)
    if use_return_period:
        rp_len = len(returnperiods) if len(returnperiods) > 0 else 1
        for s in range(max_summary_id + 1):
            tail_offsets[s] = s * rp_len
        tail = np.zeros(max_summary_id * rp_len, dtype=TAIL_valtype)
    else:
        for s in range(max_summary_id):
            start, end = items_start_end[s]
            if start != -1:
                tail_offsets[s + 1] = tail_offsets[s] + (end - start)
            else:
                tail_offsets[s + 1] = tail_offsets[s]
        total_tail = tail_offsets[max_summary_id]
        # Size-1 dummy so numba always sees a non-empty typed array.
        tail = np.zeros(total_tail if total_tail > 0 else 1, dtype=TAIL_valtype)

    for summary_id in range(1, max_summary_id + 1):
        start, end = items_start_end[summary_id - 1]
        if start == -1:
            # No items for this summary.
            continue
        filtered_items = items[start:end]
        # Losses processed in descending order.
        sorted_idxs = np.argsort(filtered_items["value"])[::-1]
        sorted_items = filtered_items[sorted_idxs]
        next_returnperiod_idx = 0
        last_computed_rp = 0
        last_computed_loss = 0
        tvar = 0
        i = 1
        cumulative_weighting = 0
        max_retperiod = 0
        largest_loss = False
        for item in sorted_items:
            value = item["value"] / sample_size
            cumulative_weighting += (item["period_weighting"] * cum_weight_constant)
            retperiod = max_retperiod / i
            if item["period_weighting"]:
                # Weighted case: return period is the reciprocal of the cumulative weight.
                retperiod = 1 / cumulative_weighting
                if not largest_loss:
                    max_retperiod = retperiod + 0.0001  # Add for floating point errors
                    largest_loss = True
            if use_return_period:
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        value,
                        summary_id,
                        eptype,
                        epcalc,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        tail_offsets,
                        returnperiods,
                    )
                    for ret in rets:
                        # Flush the buffer when full before appending.
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["EPCalc"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Incremental running-mean update of the TVaR.
                tvar = tvar - ((tvar - (value)) / i)
            else:
                tvar = tvar - ((tvar - (value)) / i)
                # No return-period file: record every rank directly in the tail.
                insert_pos = tail_offsets[summary_id - 1] + tail_sizes[summary_id - 1]
                tail[insert_pos]["retperiod"] = retperiod
                tail[insert_pos]["tvar"] = tvar
                tail_sizes[summary_id - 1] += 1
                if bidx >= len(buffer):
                    yield buffer[:bidx]
                    bidx = 0
                buffer[bidx]["SummaryId"] = summary_id
                buffer[bidx]["EPCalc"] = epcalc
                buffer[bidx]["EPType"] = eptype
                buffer[bidx]["ReturnPeriod"] = retperiod
                buffer[bidx]["Loss"] = value
                bidx += 1
            i += 1
        if use_return_period:
            # Emit the remaining requested return periods with zero loss, continuing
            # to advance the cumulative weighting with the unused period weights.
            unused_pw_idx = 0
            while True:
                if next_returnperiod_idx >= len(returnperiods):
                    break
                retperiod = 0
                if unused_pw_idx < len(unused_period_weights):
                    cumulative_weighting += (
                        unused_period_weights[unused_pw_idx]["weighting"] * cum_weight_constant
                    )
                    retperiod = 1 / cumulative_weighting
                    unused_pw_idx += 1
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        0,
                        summary_id,
                        eptype,
                        epcalc,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        tail_offsets,
                        returnperiods,
                    )
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["EPCalc"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Zero loss beyond the observed data: mean decays toward zero.
                tvar = tvar - (tvar / i)
                i += 1

    # Append the accumulated TVaR rows after all summaries are processed.
    rets = write_tvar(
        epcalc,
        eptype_tvar,
        tail,
        tail_sizes,
        tail_offsets,
        max_summary_id,
    )
    for ret in rets:
        if bidx >= len(buffer):
            yield buffer[:bidx]
            bidx = 0
        buffer[bidx]["SummaryId"] = ret["SummaryId"]
        buffer[bidx]["EPCalc"] = ret["EPCalc"]
        buffer[bidx]["EPType"] = ret["EPType"]
        buffer[bidx]["ReturnPeriod"] = ret["ReturnPeriod"]
        buffer[bidx]["Loss"] = ret["Loss"]
        bidx += 1
    yield buffer[:bidx]
@nb.njit(cache=True, error_model="numpy")
def write_psept(
    items,
    items_start_end,
    max_retperiod,
    eptype,
    eptype_tvar,
    use_return_period,
    returnperiods,
    max_summary_id,
    num_sidxs
):
    """Generate Per Sample Exceedance Probability Tables (PSEPT) for each individual
    sample, producing a separate loss exceedance curve for each sample, eptype,
    eptype_tvar.

    Args:
        items (ndarray[WHEATKEYITEMS_dtype]): Array mapping (summary_id, sidx) to loss
            value (and period_no/period_weighting where applicable)
        items_start_end (ndarray[np.int32]): An array marking where the start and end
            idxs are for each (summary_id, sidx) pair in the items array
        max_retperiod (int): Maximum return period to be used in calculations
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate
            (0 = OEP TVAR, 1 = AEP TVAR).
        use_return_period (bool): Use Return Period file.
        returnperiods (ndarray[np.int32]): Return Periods array
        max_summary_id (int): Maximum summary ID
        num_sidxs (int): Number of sidxs to consider

    Yields:
        buffer (ndarray[PSEPT_dtype]): Buffered chunks of PSEPT data
    """
    buffer = np.zeros(DEFAULT_BUFFER_SIZE, dtype=PSEPT_dtype)
    bidx = 0
    if len(items) == 0:
        return

    # Preallocate the flat tail buffers, one slab per (summary_id, sidx) pair.
    total_idxs = max_summary_id * num_sidxs
    tail_sizes = np.zeros(total_idxs, dtype=np.int64)
    tail_offsets = np.zeros(total_idxs + 1, dtype=np.int64)
    if use_return_period:
        rp_len = len(returnperiods) if len(returnperiods) > 0 else 1
        for s in range(total_idxs + 1):
            tail_offsets[s] = s * rp_len
        tail = np.zeros(total_idxs * rp_len, dtype=TAIL_valtype)
    else:
        for s in range(total_idxs):
            start, end = items_start_end[s]
            if start != -1:
                tail_offsets[s + 1] = tail_offsets[s] + (end - start)
            else:
                tail_offsets[s + 1] = tail_offsets[s]
        total_tail = tail_offsets[total_idxs]
        # Size-1 dummy so numba always sees a non-empty typed array.
        tail = np.zeros(total_tail if total_tail > 0 else 1, dtype=TAIL_valtype)

    for idx in range(total_idxs):
        start, end = items_start_end[idx]
        if start == -1:
            # No items for this (summary_id, sidx) pair.
            continue
        sidx, summary_id = get_wheatsheaf_items_idx_data(idx, num_sidxs)
        filtered_items = items[start:end]
        # Losses processed in descending order: rank i maps to return period max_retperiod / i.
        sorted_idxs = np.argsort(filtered_items["value"])[::-1]
        sorted_items = filtered_items[sorted_idxs]
        next_returnperiod_idx = 0
        last_computed_rp = 0
        last_computed_loss = 0
        tvar = 0
        i = 1
        for item in sorted_items:
            value = item["value"]
            retperiod = max_retperiod / i
            if use_return_period:
                if next_returnperiod_idx < len(returnperiods):
                    # NOTE: sidx is passed in the epcalc position; it comes back as ret[1].
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        value,
                        summary_id,
                        eptype,
                        sidx,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        tail_offsets,
                        returnperiods,
                        is_wheatsheaf=True,
                        num_sidxs=num_sidxs,
                    )
                    for ret in rets:
                        # Flush the buffer when full before appending.
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["SampleId"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Incremental running-mean update of the TVaR.
                tvar = tvar - ((tvar - value) / i)
            else:
                tvar = tvar - ((tvar - value) / i)
                # No return-period file: record every rank directly in the tail.
                insert_pos = tail_offsets[idx] + tail_sizes[idx]
                tail[insert_pos]["retperiod"] = retperiod
                tail[insert_pos]["tvar"] = tvar
                tail_sizes[idx] += 1
                if bidx >= len(buffer):
                    yield buffer[:bidx]
                    bidx = 0
                buffer[bidx]["SummaryId"] = summary_id
                buffer[bidx]["SampleId"] = sidx
                buffer[bidx]["EPType"] = eptype
                buffer[bidx]["ReturnPeriod"] = retperiod
                buffer[bidx]["Loss"] = value
                bidx += 1
            i += 1
        if use_return_period:
            # Emit the remaining requested return periods with zero loss.
            while True:
                if next_returnperiod_idx >= len(returnperiods):
                    break
                retperiod = max_retperiod / i
                if returnperiods[next_returnperiod_idx] <= 0:
                    retperiod = 0
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        0,
                        summary_id,
                        eptype,
                        sidx,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        tail_offsets,
                        returnperiods,
                        is_wheatsheaf=True,
                        num_sidxs=num_sidxs,
                    )
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["SampleId"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Zero loss beyond the observed data: mean decays toward zero.
                tvar = tvar - (tvar / i)
                i += 1

    # Append the accumulated TVaR rows after all (summary_id, sidx) pairs are processed.
    rets = write_tvar_wheatsheaf(
        num_sidxs,
        eptype_tvar,
        tail,
        tail_sizes,
        tail_offsets,
        total_idxs,
    )
    for ret in rets:
        if bidx >= len(buffer):
            yield buffer[:bidx]
            bidx = 0
        buffer[bidx]["SummaryId"] = ret["SummaryId"]
        buffer[bidx]["SampleId"] = ret["SampleId"]
        buffer[bidx]["EPType"] = ret["EPType"]
        buffer[bidx]["ReturnPeriod"] = ret["ReturnPeriod"]
        buffer[bidx]["Loss"] = ret["Loss"]
        bidx += 1
    yield buffer[:bidx]
@nb.njit(cache=True, error_model="numpy")
def write_psept_weighted(
    items,
    items_start_end,
    max_retperiod,
    eptype,
    eptype_tvar,
    unused_period_weights,
    use_return_period,
    returnperiods,
    max_summary_id,
    num_sidxs,
    sample_size,
    mean_map=None
):
    """Generate Per Sample Exceedance Probability Tables (PSEPT) for each individual
    sample, producing a separate loss exceedance curve for each sample, eptype,
    eptype_tvar, with period-weighted return periods.

    NOTE(review): the `max_retperiod` parameter is reassigned to 0 for every
    (summary_id, sidx) pair below and then derived from the first (largest) loss's
    weighted return period, so the incoming value is never used — presumably kept
    for signature parity with write_psept; confirm before removing.

    Args:
        items (ndarray[WHEATKEYITEMS_dtype]): Array mapping (summary_id, sidx) to loss
            value (and period_no/period_weighting where applicable)
        items_start_end (ndarray[np.int32]): An array marking where the start and end
            idxs are for each (summary_id, sidx) pair in the items array
        max_retperiod (int): Maximum return period to be used in calculations
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        eptype_tvar (int): Type of Tail Value-at-Risk (TVAR) to calculate
            (0 = OEP TVAR, 1 = AEP TVAR).
        unused_period_weights (ndarray[float]): Array of unused period weights
        use_return_period (bool): Use Return Period file.
        returnperiods (ndarray[np.int32]): Return Periods array
        max_summary_id (int): Maximum summary ID
        num_sidxs (int): Number of sidxs to consider
        sample_size (int): Sample Size. Defaults to 1.
        mean_map (ndarray[MEANMAP_dtype], optional): An array mapping used for mean
            loss calculations per Summary ID. Used for EPT output later. Defaults to None.

    Yields:
        buffer (ndarray[PSEPT_dtype]): Buffered chunks of PSEPT data
    """
    buffer = np.zeros(DEFAULT_BUFFER_SIZE, dtype=PSEPT_dtype)
    bidx = 0
    if len(items) == 0:
        return

    # Preallocate the flat tail buffers, one slab per (summary_id, sidx) pair.
    total_idxs = max_summary_id * num_sidxs
    tail_sizes = np.zeros(total_idxs, dtype=np.int64)
    tail_offsets = np.zeros(total_idxs + 1, dtype=np.int64)
    if use_return_period:
        rp_len = len(returnperiods) if len(returnperiods) > 0 else 1
        for s in range(total_idxs + 1):
            tail_offsets[s] = s * rp_len
        tail = np.zeros(total_idxs * rp_len, dtype=TAIL_valtype)
    else:
        for s in range(total_idxs):
            start, end = items_start_end[s]
            if start != -1:
                tail_offsets[s + 1] = tail_offsets[s] + (end - start)
            else:
                tail_offsets[s + 1] = tail_offsets[s]
        total_tail = tail_offsets[total_idxs]
        # Size-1 dummy so numba always sees a non-empty typed array.
        tail = np.zeros(total_tail if total_tail > 0 else 1, dtype=TAIL_valtype)

    for idx in range(total_idxs):
        start, end = items_start_end[idx]
        if start == -1:
            # No items for this (summary_id, sidx) pair.
            continue
        sidx, summary_id = get_wheatsheaf_items_idx_data(idx, num_sidxs)
        filtered_items = items[start:end]
        # Losses processed in descending order.
        sorted_idxs = np.argsort(filtered_items["value"])[::-1]
        sorted_items = filtered_items[sorted_idxs]
        next_returnperiod_idx = 0
        last_computed_rp = 0
        last_computed_loss = 0
        tvar = 0
        i = 1
        cumulative_weighting = 0
        max_retperiod = 0
        largest_loss = False
        for item in sorted_items:
            value = item["value"]
            cumulative_weighting += (item["period_weighting"] * sample_size)
            retperiod = max_retperiod / i
            if item["period_weighting"]:
                # Weighted case: return period is the reciprocal of the cumulative weight.
                retperiod = 1 / cumulative_weighting
                if not largest_loss:
                    max_retperiod = retperiod + 0.0001  # Add for floating point errors
                    largest_loss = True
            if use_return_period:
                if next_returnperiod_idx < len(returnperiods):
                    # NOTE: sidx is passed in the epcalc position; it comes back as ret[1].
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        value,
                        summary_id,
                        eptype,
                        sidx,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        tail_offsets,
                        returnperiods,
                        mean_map=mean_map,
                        is_wheatsheaf=True,
                        num_sidxs=num_sidxs,
                    )
                    for ret in rets:
                        # Flush the buffer when full before appending.
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["SampleId"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Incremental running-mean update of the TVaR.
                tvar = tvar - ((tvar - (value)) / i)
            else:
                tvar = tvar - ((tvar - (value)) / i)
                # No return-period file: record every rank directly in the tail.
                insert_pos = tail_offsets[idx] + tail_sizes[idx]
                tail[insert_pos]["retperiod"] = retperiod
                tail[insert_pos]["tvar"] = tvar
                tail_sizes[idx] += 1
                if bidx >= len(buffer):
                    yield buffer[:bidx]
                    bidx = 0
                buffer[bidx]["SummaryId"] = summary_id
                buffer[bidx]["SampleId"] = sidx
                buffer[bidx]["EPType"] = eptype
                buffer[bidx]["ReturnPeriod"] = retperiod
                buffer[bidx]["Loss"] = value
                bidx += 1
            i += 1
        if use_return_period:
            # Emit the remaining requested return periods with zero loss, continuing
            # to advance the cumulative weighting with the unused period weights.
            unused_pw_idx = 0
            while True:
                if next_returnperiod_idx >= len(returnperiods):
                    break
                retperiod = 0
                if unused_pw_idx < len(unused_period_weights):
                    cumulative_weighting += (
                        unused_period_weights[unused_pw_idx]["weighting"] * sample_size
                    )
                    retperiod = 1 / cumulative_weighting
                    unused_pw_idx += 1
                if next_returnperiod_idx < len(returnperiods):
                    rets, tail, tail_sizes, next_returnperiod_idx, last_computed_rp, last_computed_loss = write_return_period_out(
                        next_returnperiod_idx,
                        last_computed_rp,
                        last_computed_loss,
                        retperiod,
                        0,
                        summary_id,
                        eptype,
                        sidx,
                        max_retperiod,
                        i,
                        tvar,
                        tail,
                        tail_sizes,
                        tail_offsets,
                        returnperiods,
                        mean_map=mean_map,
                        is_wheatsheaf=True,
                        num_sidxs=num_sidxs,
                    )
                    for ret in rets:
                        if bidx >= len(buffer):
                            yield buffer[:bidx]
                            bidx = 0
                        buffer[bidx]["SummaryId"] = ret[0]
                        buffer[bidx]["SampleId"] = ret[1]
                        buffer[bidx]["EPType"] = ret[2]
                        buffer[bidx]["ReturnPeriod"] = ret[3]
                        buffer[bidx]["Loss"] = ret[4]
                        bidx += 1
                # Zero loss beyond the observed data: mean decays toward zero.
                tvar = tvar - (tvar / i)
                i += 1

    # Append the accumulated TVaR rows after all (summary_id, sidx) pairs are processed.
    rets = write_tvar_wheatsheaf(
        num_sidxs,
        eptype_tvar,
        tail,
        tail_sizes,
        tail_offsets,
        total_idxs,
    )
    for ret in rets:
        if bidx >= len(buffer):
            yield buffer[:bidx]
            bidx = 0
        buffer[bidx]["SummaryId"] = ret["SummaryId"]
        buffer[bidx]["SampleId"] = ret["SampleId"]
        buffer[bidx]["EPType"] = ret["EPType"]
        buffer[bidx]["ReturnPeriod"] = ret["ReturnPeriod"]
        buffer[bidx]["Loss"] = ret["Loss"]
        bidx += 1
    yield buffer[:bidx]
@nb.njit(cache=True, error_model="numpy")
def write_wheatsheaf_mean(
    mean_map,
    eptype,
    epcalc,
    max_summary_id,
):
    """Generate Wheatsheaf Mean Exceedance Probability Table (EPT) rows by averaging
    the accumulated losses for each return period in a precomputed mean map.

    Args:
        mean_map (ndarray[MEANMAP_dtype]): An array mapping used for mean loss
            calculations per Summary ID.
        eptype (int): Type of exceedance probability (0 = OEP, 1 = AEP).
        epcalc (int): Specifies the calculation method (mean damage loss, full
            uncertainty, per sample mean, sample mean).
        max_summary_id (int): Maximum summary ID

    Yields:
        buffer (ndarray[EPT_dtype]): Buffered chunks of EPT data
    """
    if len(mean_map) == 0:
        return
    out = np.zeros(DEFAULT_BUFFER_SIZE, dtype=EPT_dtype)
    n = 0
    for summary_id in range(1, max_summary_id + 1):
        row = mean_map[summary_id - 1]
        # Skip summaries with no accumulated samples at all.
        if np.sum(row["count"]) == 0:
            continue
        for cell in row:
            if cell["count"] == 0:
                continue
            # Flush the buffer when full before appending.
            if n >= len(out):
                yield out[:n]
                n = 0
            out[n]["SummaryId"] = summary_id
            out[n]["EPCalc"] = epcalc
            out[n]["EPType"] = eptype
            out[n]["ReturnPeriod"] = cell["retperiod"]
            # count > 0 is guaranteed by the guard above; max() kept defensively.
            out[n]["Loss"] = cell["mean"] / max(cell["count"], 1)
            n += 1
    yield out[:n]