# Source code for oasislmf.pytools.lec.manager

# lec/manager.py

import logging
import numpy as np
import numba as nb
from contextlib import ExitStack
from pathlib import Path

from oasislmf.pytools.common.data import (oasis_int, oasis_float, oasis_int_size, oasis_float_size)
from oasislmf.pytools.common.event_stream import MAX_LOSS_IDX, MEAN_IDX, NUMBER_OF_AFFECTED_RISK_IDX, SUMMARY_STREAM_ID, init_streams_in, mv_read
from oasislmf.pytools.common.input_files import PERIODS_FILE, read_occurrence, read_periods, read_return_periods
from oasislmf.pytools.lec.data import (AEP, AEPTVAR, AGG_FULL_UNCERTAINTY, AGG_SAMPLE_MEAN, AGG_WHEATSHEAF, AGG_WHEATSHEAF_MEAN,
                                       OCC_FULL_UNCERTAINTY, OCC_SAMPLE_MEAN, OCC_WHEATSHEAF, OCC_WHEATSHEAF_MEAN, OEP, OEPTVAR, OUTLOSS_DTYPE, EPT_headers, PSEPT_headers)
from oasislmf.pytools.lec.aggreports import AggReports
from oasislmf.pytools.lec.utils import get_outloss_mean_idx, get_outloss_sample_idx
from oasislmf.pytools.utils import redirect_logging


# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
def read_input_files(
    run_dir,
    use_return_period,
    agg_wheatsheaf_mean,
    occ_wheatsheaf_mean,
    sample_size,
):
    """Read all LEC input files and return the relevant data.

    Args:
        run_dir (str | os.PathLike): Path to directory containing required files structure
        use_return_period (bool): Use Return Period file.
        agg_wheatsheaf_mean (bool): Aggregate Wheatsheaf Mean.
        occ_wheatsheaf_mean (bool): Occurrence Wheatsheaf Mean.
        sample_size (int): Sample Size.
    Returns:
        file_data (Dict[str, Any]): A dict of relevant data extracted from files
        use_return_period (bool): Use Return Period file.
        agg_wheatsheaf_mean (bool): Aggregate Wheatsheaf Mean.
        occ_wheatsheaf_mean (bool): Occurrence Wheatsheaf Mean.
    """
    input_dir = Path(run_dir, "input")

    occ_map, date_algorithm, granular_date, no_of_periods = read_occurrence(input_dir)
    period_weights = read_periods(no_of_periods, input_dir)
    if Path(input_dir, PERIODS_FILE).exists():
        # Normalise the period weights by the sample size
        period_weights["weighting"] /= sample_size
    else:
        # No explicit periods file: treat weights as empty/uniform
        period_weights = np.array([], dtype=period_weights.dtype)

    returnperiods, use_return_period = read_return_periods(use_return_period, input_dir)

    # Non-uniform period weights for Wheatsheaf/per sample mean output are only
    # supported when a return periods file is supplied; otherwise disable that output.
    if agg_wheatsheaf_mean or occ_wheatsheaf_mean:
        has_weights = len(period_weights) > 0
        if has_weights and not use_return_period:
            logger.warning("WARNING: Return periods file must be present if you wish to use non-uniform period weights for Wheatsheaf mean/per sample mean output.")
            logger.info("INFO: Wheatsheaf mean/per sample mean output will not be produced.")
            agg_wheatsheaf_mean = False
            occ_wheatsheaf_mean = False
        elif has_weights:
            logger.info("INFO: Tail Value at Risk for Wheatsheaf mean/per sample mean is not supported if you wish to use non-uniform period weights.")

    file_data = {
        "occ_map": occ_map,
        "date_algorithm": date_algorithm,
        "granular_date": granular_date,
        "no_of_periods": no_of_periods,
        "period_weights": period_weights,
        "returnperiods": returnperiods,
    }
    return file_data, use_return_period, agg_wheatsheaf_mean, occ_wheatsheaf_mean
@nb.njit(cache=True, error_model="numpy")
def get_max_summary_id(file_handles):
    """Find the maximum summary_id across all summary files.

    Args:
        file_handles (List[np.memmap]): List of memmaps for summary files data
    Returns:
        max_summary_id (int): Max summary ID
    """
    max_summary_id = -1
    for fin in file_handles:
        valid_buff = len(fin)
        # Skip the stream header (stream_type, sample_size, summary_set_id)
        cursor = oasis_int_size * 3
        while cursor < valid_buff:
            # Record header: event_id, summary_id, expval
            _, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
            summary_id, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
            _, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)
            if summary_id > max_summary_id:
                max_summary_id = summary_id
            # Skip (sidx, loss) pairs until the sidx == 0 terminator
            while cursor < valid_buff:
                sidx, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
                _, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)
                if sidx == 0:
                    break
    return max_summary_id
@nb.njit(cache=True, fastmath=True, error_model="numpy")
def do_lec_output_agg_summary(
    summary_id,
    sidx,
    loss,
    filtered_occ_map,
    outloss_mean,
    outloss_sample,
    num_sidxs,
    max_summary_id,
):
    """Accumulate aggregate and maximum losses into outloss_mean/outloss_sample.

    Args:
        summary_id (oasis_int): summary_id
        sidx (oasis_int): Sample ID
        loss (oasis_float): Loss value
        filtered_occ_map (ndarray[occ_map_dtype]): Occurrence rows matching this event
        outloss_mean (ndarray[OUTLOSS_DTYPE]): losses indexed by summary_id, period_no
        outloss_sample (ndarray[OUTLOSS_DTYPE]): losses indexed by summary_id, sidx, period_no
        num_sidxs (int): Number of sidxs to consider for outloss_sample
        max_summary_id (int): Max summary ID
    """
    for occ_row in filtered_occ_map:
        period_no = occ_row["period_no"]
        # The mean sample goes to the mean table; all other sidxs to the sample table
        if sidx == MEAN_IDX:
            target = outloss_mean
            pos = get_outloss_mean_idx(period_no, summary_id, max_summary_id)
        else:
            target = outloss_sample
            pos = get_outloss_sample_idx(period_no, sidx, summary_id, num_sidxs, max_summary_id)
        target[pos]["row_used"] = True
        target[pos]["agg_out_loss"] += loss
        if loss > target[pos]["max_out_loss"]:
            target[pos]["max_out_loss"] = loss
@nb.njit(cache=True, error_model="numpy")
def process_input_file(
    fin,
    outloss_mean,
    outloss_sample,
    summary_ids,
    occ_map,
    use_return_period,
    num_sidxs,
    max_summary_id,
):
    """Read one summary file and accumulate its losses into the outloss arrays.

    Args:
        fin (np.memmap): summary binary memmap
        outloss_mean (ndarray[OUTLOSS_DTYPE]): losses indexed by summary_id, period_no
        outloss_sample (ndarray[OUTLOSS_DTYPE]): losses indexed by summary_id, sidx, period_no
        summary_ids (ndarray[bool]): bool array marking which summary_ids are used
        occ_map (ndarray[occ_map_dtype]): occurrence map (event_id, period_no, occ_date_id)
        use_return_period (bool): Use Return Period file.
        num_sidxs (int): Number of sidxs to consider for outloss_sample
        max_summary_id (int): Max summary ID
    """
    valid_buff = len(fin)
    # Skip the stream header (stream_type, sample_size, summary_set_id)
    cursor = oasis_int_size * 3
    while cursor < valid_buff:
        # Record header: event_id, summary_id, expval
        event_id, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
        summary_id, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
        _, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)

        filtered_occ_map = occ_map[occ_map["event_id"] == event_id]
        if len(filtered_occ_map) == 0:
            # Unknown event_id: skip samples up to the sidx == 0 terminator
            while cursor < valid_buff:
                sidx, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
                _, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)
                if sidx == 0:
                    break
            continue

        summary_ids[summary_id - 1] = True
        while cursor < valid_buff:
            sidx, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
            loss, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)
            if sidx == 0:
                break
            # Special sidxs carry metadata, not losses
            if sidx == NUMBER_OF_AFFECTED_RISK_IDX or sidx == MAX_LOSS_IDX:
                continue
            # Zero losses still matter when return periods are in use
            if loss > 0 or use_return_period:
                do_lec_output_agg_summary(
                    summary_id,
                    sidx,
                    loss,
                    filtered_occ_map,
                    outloss_mean,
                    outloss_sample,
                    num_sidxs,
                    max_summary_id,
                )
@nb.njit(cache=True, error_model="numpy")
def run_lec(
    file_handles,
    outloss_mean,
    outloss_sample,
    summary_ids,
    occ_map,
    use_return_period,
    num_sidxs,
    max_summary_id,
):
    """Process every summary file, accumulating losses into the outloss arrays.

    Args:
        file_handles (List[np.memmap]): List of memmaps for summary files data
        outloss_mean (ndarray[OUTLOSS_DTYPE]): losses indexed by summary_id, period_no
        outloss_sample (ndarray[OUTLOSS_DTYPE]): losses indexed by summary_id, sidx, period_no
        summary_ids (ndarray[bool]): bool array marking which summary_ids are used
        occ_map (ndarray[occ_map_dtype]): occurrence map (event_id, period_no, occ_date_id)
        use_return_period (bool): Use Return Period file.
        num_sidxs (int): Number of sidxs to consider for outloss_sample
        max_summary_id (int): Max summary ID
    """
    for handle in file_handles:
        process_input_file(
            handle,
            outloss_mean,
            outloss_sample,
            summary_ids,
            occ_map,
            use_return_period,
            num_sidxs,
            max_summary_id,
        )
def run(
    run_dir,
    subfolder,
    ept_output_file=None,
    psept_output_file=None,
    agg_full_uncertainty=False,
    agg_wheatsheaf=False,
    agg_sample_mean=False,
    agg_wheatsheaf_mean=False,
    occ_full_uncertainty=False,
    occ_wheatsheaf=False,
    occ_sample_mean=False,
    occ_wheatsheaf_mean=False,
    use_return_period=False,
    noheader=False,
):
    """Run LEC calculations and write EPT/PSEPT CSV reports.

    Args:
        run_dir (str | os.PathLike): Path to directory containing required files structure
        subfolder (str): Workspace subfolder inside <run_dir>/work/<subfolder>
        ept_output_file (str, optional): Path to EPT output file. Defaults to None
        psept_output_file (str, optional): Path to PSEPT output file. Defaults to None
        agg_full_uncertainty (bool, optional): Aggregate Full Uncertainty. Defaults to False.
        agg_wheatsheaf (bool, optional): Aggregate Wheatsheaf. Defaults to False.
        agg_sample_mean (bool, optional): Aggregate Sample Mean. Defaults to False.
        agg_wheatsheaf_mean (bool, optional): Aggregate Wheatsheaf Mean. Defaults to False.
        occ_full_uncertainty (bool, optional): Occurrence Full Uncertainty. Defaults to False.
        occ_wheatsheaf (bool, optional): Occurrence Wheatsheaf. Defaults to False.
        occ_sample_mean (bool, optional): Occurrence Sample Mean. Defaults to False.
        occ_wheatsheaf_mean (bool, optional): Occurrence Wheatsheaf Mean. Defaults to False.
        use_return_period (bool, optional): Use Return Period file. Defaults to False.
        noheader (bool): Boolean value to skip header in output file
    """
    output_ept = ept_output_file is not None
    output_psept = psept_output_file is not None

    with ExitStack() as stack:
        workspace_folder = Path(run_dir, "work", subfolder)
        if not workspace_folder.is_dir():
            raise RuntimeError(f"Error: Unable to open directory {workspace_folder}")

        # Work folder for intermediate lec files
        lec_files_folder = Path(workspace_folder, "lec_files")
        lec_files_folder.mkdir(parents=False, exist_ok=True)

        # Locate the summary binary files and map them read-only
        summary_files = list(workspace_folder.glob("*.bin"))
        file_handles = [np.memmap(p, mode="r", dtype="u1") for p in summary_files]
        streams_in, (stream_source_type, stream_agg_type, sample_size) = init_streams_in(summary_files, stack)
        if stream_source_type != SUMMARY_STREAM_ID:
            raise RuntimeError(f"Error: Not a summary stream type {stream_source_type}")
        max_summary_id = get_max_summary_id(file_handles)

        file_data, use_return_period, agg_wheatsheaf_mean, occ_wheatsheaf_mean = read_input_files(
            run_dir,
            use_return_period,
            agg_wheatsheaf_mean,
            occ_wheatsheaf_mean,
            sample_size,
        )

        # Flag order matches the AGG_*/OCC_* index constants
        output_flags = [
            agg_full_uncertainty,
            agg_wheatsheaf,
            agg_sample_mean,
            agg_wheatsheaf_mean,
            occ_full_uncertainty,
            occ_wheatsheaf,
            occ_sample_mean,
            occ_wheatsheaf_mean,
        ]

        # Decide which output files can actually be populated from the flags
        handles_agg = [AGG_FULL_UNCERTAINTY, AGG_SAMPLE_MEAN, AGG_WHEATSHEAF_MEAN]
        handles_occ = [OCC_FULL_UNCERTAINTY, OCC_SAMPLE_MEAN, OCC_WHEATSHEAF_MEAN]
        handles_psept = [AGG_WHEATSHEAF, OCC_WHEATSHEAF]
        hasAGG = any(output_flags[idx] for idx in handles_agg)
        hasOCC = any(output_flags[idx] for idx in handles_occ)
        hasEPT = hasAGG or hasOCC
        hasPSEPT = any(output_flags[idx] for idx in handles_psept)

        output_ept = output_ept and hasEPT
        output_psept = output_psept and hasPSEPT
        if not output_ept:
            logger.warning("WARNING: no valid output stream to fill EPT file")
        if not output_psept:
            logger.warning("WARNING: no valid output stream to fill PSEPT file")
        if not (output_ept or output_psept):
            return

        # Create outloss array maps backed by files on disk.
        # outloss_mean has only the -1 (mean) SIDX.
        outloss_mean_file = Path(lec_files_folder, "lec_outloss_mean.bdat")
        outloss_mean = np.memmap(
            outloss_mean_file,
            dtype=OUTLOSS_DTYPE,
            mode="w+",
            shape=(file_data["no_of_periods"] * max_summary_id),
        )
        # outloss_sample has all SIDXs plus -2 and -3
        num_sidxs = sample_size + 2
        outloss_sample_file = Path(lec_files_folder, "lec_outloss_sample.bdat")
        outloss_sample = np.memmap(
            outloss_sample_file,
            dtype=OUTLOSS_DTYPE,
            mode="w+",
            shape=(file_data["no_of_periods"] * num_sidxs * max_summary_id),
        )

        # Marks which summary IDs were seen in the input
        summary_ids = np.zeros((max_summary_id), dtype=np.bool_)

        # Populate the outloss arrays and summary_ids from every summary file
        run_lec(
            file_handles,
            outloss_mean,
            outloss_sample,
            summary_ids,
            file_data["occ_map"],
            use_return_period,
            num_sidxs,
            max_summary_id,
        )

        # Open output files and write CSV headers unless suppressed
        output_files = {}
        if output_ept:
            ept_file = stack.enter_context(open(ept_output_file, "w"))
            if not noheader:
                ept_file.write(",".join(EPT_headers) + "\n")
            output_files["ept"] = ept_file
        if output_psept:
            psept_file = stack.enter_context(open(psept_output_file, "w"))
            if not noheader:
                psept_file.write(",".join(PSEPT_headers) + "\n")
            output_files["psept"] = psept_file

        # Aggregate report writer over the populated outloss arrays
        agg = AggReports(
            output_files,
            outloss_mean,
            outloss_sample,
            file_data["period_weights"],
            max_summary_id,
            sample_size,
            file_data["no_of_periods"],
            num_sidxs,
            use_return_period,
            file_data["returnperiods"],
            lec_files_folder,
        )

        # Output Mean Damage Ratio
        if output_ept:
            if hasOCC:
                agg.output_mean_damage_ratio(OEP, OEPTVAR, "max_out_loss")
            if hasAGG:
                agg.output_mean_damage_ratio(AEP, AEPTVAR, "agg_out_loss")

        # Output Full Uncertainty
        if output_flags[OCC_FULL_UNCERTAINTY]:
            agg.output_full_uncertainty(OEP, OEPTVAR, "max_out_loss")
        if output_flags[AGG_FULL_UNCERTAINTY]:
            agg.output_full_uncertainty(AEP, AEPTVAR, "agg_out_loss")

        # Output Wheatsheaf and Wheatsheaf Mean
        if output_flags[OCC_WHEATSHEAF] or output_flags[OCC_WHEATSHEAF_MEAN]:
            agg.output_wheatsheaf_and_wheatsheafmean(
                OEP,
                OEPTVAR,
                "max_out_loss",
                output_flags[OCC_WHEATSHEAF],
                output_flags[OCC_WHEATSHEAF_MEAN],
            )
        if output_flags[AGG_WHEATSHEAF] or output_flags[AGG_WHEATSHEAF_MEAN]:
            agg.output_wheatsheaf_and_wheatsheafmean(
                AEP,
                AEPTVAR,
                "agg_out_loss",
                output_flags[AGG_WHEATSHEAF],
                output_flags[AGG_WHEATSHEAF_MEAN],
            )

        # Output Sample Mean
        if output_flags[OCC_SAMPLE_MEAN]:
            agg.output_sample_mean(OEP, OEPTVAR, "max_out_loss")
        if output_flags[AGG_SAMPLE_MEAN]:
            agg.output_sample_mean(AEP, AEPTVAR, "agg_out_loss")
@redirect_logging(exec_name='lecpy')
def main(
    run_dir='.',
    subfolder=None,
    ept=None,
    psept=None,
    agg_full_uncertainty=False,
    agg_wheatsheaf=False,
    agg_sample_mean=False,
    agg_wheatsheaf_mean=False,
    occ_full_uncertainty=False,
    occ_wheatsheaf=False,
    occ_sample_mean=False,
    occ_wheatsheaf_mean=False,
    use_return_period=False,
    noheader=False,
    **kwargs
):
    """CLI entry point: forward arguments to :func:`run` with logging redirected."""
    run(
        run_dir,
        subfolder,
        ept_output_file=ept,
        psept_output_file=psept,
        agg_full_uncertainty=agg_full_uncertainty,
        agg_wheatsheaf=agg_wheatsheaf,
        agg_sample_mean=agg_sample_mean,
        agg_wheatsheaf_mean=agg_wheatsheaf_mean,
        occ_full_uncertainty=occ_full_uncertainty,
        occ_wheatsheaf=occ_wheatsheaf,
        occ_sample_mean=occ_sample_mean,
        occ_wheatsheaf_mean=occ_wheatsheaf_mean,
        use_return_period=use_return_period,
        noheader=noheader,
    )