# lec/manager.py
import logging
import numpy as np
import numba as nb
from contextlib import ExitStack
from pathlib import Path
from oasislmf.pytools.common.data import (oasis_int, oasis_float, oasis_int_size, oasis_float_size)
from oasislmf.pytools.common.event_stream import MAX_LOSS_IDX, MEAN_IDX, NUMBER_OF_AFFECTED_RISK_IDX, SUMMARY_STREAM_ID, init_streams_in, mv_read
from oasislmf.pytools.common.input_files import PERIODS_FILE, read_occurrence, read_periods, read_return_periods
from oasislmf.pytools.lec.data import (AEP, AEPTVAR, AGG_FULL_UNCERTAINTY, AGG_SAMPLE_MEAN, AGG_WHEATSHEAF, AGG_WHEATSHEAF_MEAN,
OCC_FULL_UNCERTAINTY, OCC_SAMPLE_MEAN, OCC_WHEATSHEAF, OCC_WHEATSHEAF_MEAN, OEP, OEPTVAR, OUTLOSS_DTYPE, EPT_headers, PSEPT_headers)
from oasislmf.pytools.lec.aggreports import AggReports
from oasislmf.pytools.lec.utils import get_outloss_mean_idx, get_outloss_sample_idx
from oasislmf.pytools.utils import redirect_logging
logger = logging.getLogger(__name__)
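
# Summary stream layout assumed by the parsing below: a header of 3 oasis_int
# values (stream id, sample size, summary set id), then per-event blocks of
# (event_id, summary_id, expval) followed by (sidx, loss) pairs terminated by
# a record with sidx == 0.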
@nb.njit(cache=True, error_model="numpy")
def get_max_summary_id(file_handles):
"""Get max summary_id from all summary files
Args:
file_handles (List[np.memmap]): List of memmaps for summary files data
Returns:
max_summary_id (int): Max summary ID
"""
max_summary_id = -1
for fin in file_handles:
cursor = oasis_int_size * 3
valid_buff = len(fin)
while cursor < valid_buff:
_, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
summary_id, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
_, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)
max_summary_id = max(max_summary_id, summary_id)
while cursor < valid_buff:
sidx, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
_, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)
if sidx == 0:
break
return max_summary_id
@nb.njit(cache=True, fastmath=True, error_model="numpy")
def do_lec_output_agg_summary(
summary_id,
sidx,
loss,
filtered_occ_map,
outloss_mean,
outloss_sample,
num_sidxs,
max_summary_id,
):
"""Populate outloss_mean and outloss_sample with aggregate and max losses
Args:
summary_id (oasis_int): summary_id
sidx (oasis_int): Sample ID
loss (oasis_float): Loss value
        filtered_occ_map (ndarray[occ_map_dtype]): Filtered numpy map of event_id, period_no, occ_date_id from the occurrence file
outloss_mean (ndarray[OUTLOSS_DTYPE]): ndarray indexed by summary_id, period_no containing aggregate and max losses
outloss_sample (ndarray[OUTLOSS_DTYPE]): ndarray indexed by summary_id, sidx, period_no containing aggregate and max losses
num_sidxs (int): Number of sidxs to consider for outloss_sample
max_summary_id (int): Max summary ID
"""
for row in filtered_occ_map:
period_no = row["period_no"]
if sidx == MEAN_IDX:
idx = get_outloss_mean_idx(period_no, summary_id, max_summary_id)
outloss = outloss_mean
else:
idx = get_outloss_sample_idx(period_no, sidx, summary_id, num_sidxs, max_summary_id)
outloss = outloss_sample
outloss[idx]["row_used"] = True
outloss[idx]["agg_out_loss"] += loss
max_out_loss = max(outloss[idx]["max_out_loss"], loss)
outloss[idx]["max_out_loss"] = max_out_loss
@nb.njit(cache=True, error_model="numpy")
def run_lec(
file_handles,
outloss_mean,
outloss_sample,
summary_ids,
occ_map,
use_return_period,
num_sidxs,
max_summary_id,
):
"""Process each summary file and populate outloss_mean and outloss_sample
Args:
file_handles (List[np.memmap]): List of memmaps for summary files data
outloss_mean (ndarray[OUTLOSS_DTYPE]): ndarray indexed by summary_id, period_no containing aggregate and max losses
outloss_sample (ndarray[OUTLOSS_DTYPE]): ndarray indexed by summary_id, sidx, period_no containing aggregate and max losses
summary_ids (ndarray[bool]): bool array marking which summary_ids are used
        occ_map (ndarray[occ_map_dtype]): numpy map of event_id, period_no, occ_date_id from the occurrence file
use_return_period (bool): Use Return Period file.
num_sidxs (int): Number of sidxs to consider for outloss_sample
max_summary_id (int): Max summary ID
"""
for fin in file_handles:
process_input_file(
fin,
outloss_mean,
outloss_sample,
summary_ids,
occ_map,
use_return_period,
num_sidxs,
max_summary_id,
)
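
# read_input_files() (used by run() below) loads the occurrence, period weights
# and return period inputs. Its body is not shown in this listing; based on how
# its return values are consumed, file_data is a dict carrying "occ_map",
# "no_of_periods", "period_weights" and "returnperiods", and the
# use_return_period, agg_wheatsheaf_mean and occ_wheatsheaf_mean flags may be
# adjusted before being returned.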
def run(
run_dir,
subfolder,
ept_output_file=None,
psept_output_file=None,
agg_full_uncertainty=False,
agg_wheatsheaf=False,
agg_sample_mean=False,
agg_wheatsheaf_mean=False,
occ_full_uncertainty=False,
occ_wheatsheaf=False,
occ_sample_mean=False,
occ_wheatsheaf_mean=False,
use_return_period=False,
noheader=False,
):
"""Runs LEC calculations
Args:
run_dir (str | os.PathLike): Path to directory containing required files structure
subfolder (str): Workspace subfolder inside <run_dir>/work/<subfolder>
ept_output_file (str, optional): Path to EPT output file. Defaults to None
psept_output_file (str, optional): Path to PSEPT output file. Defaults to None
agg_full_uncertainty (bool, optional): Aggregate Full Uncertainty. Defaults to False.
agg_wheatsheaf (bool, optional): Aggregate Wheatsheaf. Defaults to False.
agg_sample_mean (bool, optional): Aggregate Sample Mean. Defaults to False.
agg_wheatsheaf_mean (bool, optional): Aggregate Wheatsheaf Mean. Defaults to False.
occ_full_uncertainty (bool, optional): Occurrence Full Uncertainty. Defaults to False.
occ_wheatsheaf (bool, optional): Occurrence Wheatsheaf. Defaults to False.
occ_sample_mean (bool, optional): Occurrence Sample Mean. Defaults to False.
occ_wheatsheaf_mean (bool, optional): Occurrence Wheatsheaf Mean. Defaults to False.
use_return_period (bool, optional): Use Return Period file. Defaults to False.
        noheader (bool, optional): If True, do not write the CSV header row to the output files. Defaults to False.
"""
output_ept = ept_output_file is not None
output_psept = psept_output_file is not None
with ExitStack() as stack:
workspace_folder = Path(run_dir, "work", subfolder)
if not workspace_folder.is_dir():
raise RuntimeError(f"Error: Unable to open directory {workspace_folder}")
# work folder for lec files
lec_files_folder = Path(workspace_folder, "lec_files")
lec_files_folder.mkdir(parents=False, exist_ok=True)
# Find summary binary files
        files = list(workspace_folder.glob("*.bin"))
file_handles = [np.memmap(file, mode="r", dtype="u1") for file in files]
streams_in, (stream_source_type, stream_agg_type, sample_size) = init_streams_in(files, stack)
if stream_source_type != SUMMARY_STREAM_ID:
raise RuntimeError(f"Error: Not a summary stream type {stream_source_type}")
max_summary_id = get_max_summary_id(file_handles)
file_data, use_return_period, agg_wheatsheaf_mean, occ_wheatsheaf_mean = read_input_files(
run_dir,
use_return_period,
agg_wheatsheaf_mean,
occ_wheatsheaf_mean,
sample_size,
)
output_flags = [
agg_full_uncertainty,
agg_wheatsheaf,
agg_sample_mean,
agg_wheatsheaf_mean,
occ_full_uncertainty,
occ_wheatsheaf,
occ_sample_mean,
occ_wheatsheaf_mean,
]
# Check output_flags against output files
handles_agg = [AGG_FULL_UNCERTAINTY, AGG_SAMPLE_MEAN, AGG_WHEATSHEAF_MEAN]
handles_occ = [OCC_FULL_UNCERTAINTY, OCC_SAMPLE_MEAN, OCC_WHEATSHEAF_MEAN]
handles_psept = [AGG_WHEATSHEAF, OCC_WHEATSHEAF]
hasAGG = any([output_flags[idx] for idx in handles_agg])
hasOCC = any([output_flags[idx] for idx in handles_occ])
hasEPT = hasAGG or hasOCC
hasPSEPT = any([output_flags[idx] for idx in handles_psept])
output_ept = output_ept and hasEPT
output_psept = output_psept and hasPSEPT
if not output_ept:
logger.warning("WARNING: no valid output stream to fill EPT file")
if not output_psept:
logger.warning("WARNING: no valid output stream to fill PSEPT file")
if not (output_ept or output_psept):
return
# Create outloss array maps
# outloss_mean has only -1 SIDX
outloss_mean_file = Path(lec_files_folder, "lec_outloss_mean.bdat")
outloss_mean = np.memmap(
outloss_mean_file,
dtype=OUTLOSS_DTYPE,
mode="w+",
shape=(file_data["no_of_periods"] * max_summary_id),
)
# outloss_sample has all SIDXs plus -2 and -3
num_sidxs = sample_size + 2
outloss_sample_file = Path(lec_files_folder, "lec_outloss_sample.bdat")
outloss_sample = np.memmap(
outloss_sample_file,
dtype=OUTLOSS_DTYPE,
mode="w+",
shape=(file_data["no_of_periods"] * num_sidxs * max_summary_id),
)
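        # Index layout note: the flattened positions in outloss_mean and
        # outloss_sample come from get_outloss_mean_idx / get_outloss_sample_idx
        # (lec.utils), which combine period_no, summary_id and, for samples,
        # sidx into a single offset consistent with the shapes allocated above.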
# Array of Summary IDs found
summary_ids = np.zeros((max_summary_id), dtype=np.bool_)
# Run LEC calculations to populate outloss and summary_ids arrays
run_lec(
file_handles,
outloss_mean,
outloss_sample,
summary_ids,
file_data["occ_map"],
use_return_period,
num_sidxs,
max_summary_id,
)
# Setup output files and headers
output_files = {}
if output_ept:
ept_file = stack.enter_context(open(ept_output_file, "w"))
if not noheader:
csv_headers = ",".join(EPT_headers)
ept_file.write(csv_headers + "\n")
output_files["ept"] = ept_file
if output_psept:
psept_file = stack.enter_context(open(psept_output_file, "w"))
if not noheader:
csv_headers = ",".join(PSEPT_headers)
psept_file.write(csv_headers + "\n")
output_files["psept"] = psept_file
# Output aggregate reports to CSVs
agg = AggReports(
output_files,
outloss_mean,
outloss_sample,
file_data["period_weights"],
max_summary_id,
sample_size,
file_data["no_of_periods"],
num_sidxs,
use_return_period,
file_data["returnperiods"],
lec_files_folder,
)
# Output Mean Damage Ratio
if output_ept:
if hasOCC:
agg.output_mean_damage_ratio(OEP, OEPTVAR, "max_out_loss")
if hasAGG:
agg.output_mean_damage_ratio(AEP, AEPTVAR, "agg_out_loss")
# Output Full Uncertainty
if output_flags[OCC_FULL_UNCERTAINTY]:
agg.output_full_uncertainty(OEP, OEPTVAR, "max_out_loss")
if output_flags[AGG_FULL_UNCERTAINTY]:
agg.output_full_uncertainty(AEP, AEPTVAR, "agg_out_loss")
# Output Wheatsheaf and Wheatsheaf Mean
if output_flags[OCC_WHEATSHEAF] or output_flags[OCC_WHEATSHEAF_MEAN]:
agg.output_wheatsheaf_and_wheatsheafmean(
OEP, OEPTVAR, "max_out_loss",
output_flags[OCC_WHEATSHEAF], output_flags[OCC_WHEATSHEAF_MEAN]
)
if output_flags[AGG_WHEATSHEAF] or output_flags[AGG_WHEATSHEAF_MEAN]:
agg.output_wheatsheaf_and_wheatsheafmean(
AEP, AEPTVAR, "agg_out_loss",
output_flags[AGG_WHEATSHEAF], output_flags[AGG_WHEATSHEAF_MEAN]
)
# Output Sample Mean
if output_flags[OCC_SAMPLE_MEAN]:
agg.output_sample_mean(OEP, OEPTVAR, "max_out_loss")
if output_flags[AGG_SAMPLE_MEAN]:
agg.output_sample_mean(AEP, AEPTVAR, "agg_out_loss")
@redirect_logging(exec_name='lecpy')
def main(
run_dir='.',
subfolder=None,
ept=None,
psept=None,
agg_full_uncertainty=False,
agg_wheatsheaf=False,
agg_sample_mean=False,
agg_wheatsheaf_mean=False,
occ_full_uncertainty=False,
occ_wheatsheaf=False,
occ_sample_mean=False,
occ_wheatsheaf_mean=False,
use_return_period=False,
noheader=False,
**kwargs
):
run(
run_dir,
subfolder,
ept_output_file=ept,
psept_output_file=psept,
agg_full_uncertainty=agg_full_uncertainty,
agg_wheatsheaf=agg_wheatsheaf,
agg_sample_mean=agg_sample_mean,
agg_wheatsheaf_mean=agg_wheatsheaf_mean,
occ_full_uncertainty=occ_full_uncertainty,
occ_wheatsheaf=occ_wheatsheaf,
occ_sample_mean=occ_sample_mean,
occ_wheatsheaf_mean=occ_wheatsheaf_mean,
use_return_period=use_return_period,
noheader=noheader,
)
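
if __name__ == "__main__":
    # Illustrative invocation only; the subfolder and output paths below are
    # hypothetical. In normal use this module is driven by the lecpy command
    # line entry point, which forwards parsed arguments to main().
    main(
        run_dir=".",
        subfolder="summary_1",
        ept="ept.csv",
        psept="psept.csv",
        agg_full_uncertainty=True,
        occ_full_uncertainty=True,
    )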