# lec/manager.py
import logging
import numpy as np
import numba as nb
from contextlib import ExitStack
from pathlib import Path
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from oasislmf.pytools.common.data import (oasis_int, oasis_float, oasis_int_size, oasis_float_size)
from oasislmf.pytools.common.event_stream import MAX_LOSS_IDX, MEAN_IDX, NUMBER_OF_AFFECTED_RISK_IDX, SUMMARY_STREAM_ID, init_streams_in, mv_read
from oasislmf.pytools.common.input_files import PERIODS_FILE, read_occurrence, read_periods, read_return_periods
from oasislmf.pytools.lec.data import (AEP, AEPTVAR, AGG_FULL_UNCERTAINTY, AGG_SAMPLE_MEAN, AGG_WHEATSHEAF, AGG_WHEATSHEAF_MEAN,
OCC_FULL_UNCERTAINTY, OCC_SAMPLE_MEAN, OCC_WHEATSHEAF, OCC_WHEATSHEAF_MEAN, OEP, OEPTVAR,
OUTLOSS_DTYPE, EPT_dtype, EPT_fmt, EPT_headers, PSEPT_dtype, PSEPT_fmt, PSEPT_headers)
from oasislmf.pytools.lec.aggreports import AggReports
from oasislmf.pytools.lec.utils import get_outloss_mean_idx, get_outloss_sample_idx
from oasislmf.pytools.utils import redirect_logging
[docs]
logger = logging.getLogger(__name__)
@nb.njit(cache=True, error_model="numpy")
[docs]
def get_max_summary_id(file_handles):
"""Get max summary_id from all summary files
Args:
file_handles (List[np.memmap]): List of memmaps for summary files data
Returns:
max_summary_id (int): Max summary ID
"""
max_summary_id = -1
for fin in file_handles:
cursor = oasis_int_size * 3
valid_buff = len(fin)
while cursor < valid_buff:
_, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
summary_id, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
_, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)
max_summary_id = max(max_summary_id, summary_id)
while cursor < valid_buff:
sidx, cursor = mv_read(fin, cursor, oasis_int, oasis_int_size)
_, cursor = mv_read(fin, cursor, oasis_float, oasis_float_size)
if sidx == 0:
break
return max_summary_id
@nb.njit(cache=True, fastmath=True, error_model="numpy")
[docs]
def do_lec_output_agg_summary(
summary_id,
sidx,
loss,
filtered_occ_map,
outloss_mean,
outloss_sample,
num_sidxs,
max_summary_id,
):
"""Populate outloss_mean and outloss_sample with aggregate and max losses
Args:
summary_id (oasis_int): summary_id
sidx (oasis_int): Sample ID
loss (oasis_float): Loss value
filtered_occ_map (ndarray[occ_map_dtype]): Filtered numpy map of event_id, period_no, occ_date_id from the occurrence file_
outloss_mean (ndarray[OUTLOSS_DTYPE]): ndarray indexed by summary_id, period_no containing aggregate and max losses
outloss_sample (ndarray[OUTLOSS_DTYPE]): ndarray indexed by summary_id, sidx, period_no containing aggregate and max losses
num_sidxs (int): Number of sidxs to consider for outloss_sample
max_summary_id (int): Max summary ID
"""
for row in filtered_occ_map:
period_no = row["period_no"]
if sidx == MEAN_IDX:
idx = get_outloss_mean_idx(period_no, summary_id, max_summary_id)
outloss = outloss_mean
else:
idx = get_outloss_sample_idx(period_no, sidx, summary_id, num_sidxs, max_summary_id)
outloss = outloss_sample
outloss[idx]["row_used"] = True
outloss[idx]["agg_out_loss"] += loss
max_out_loss = max(outloss[idx]["max_out_loss"], loss)
outloss[idx]["max_out_loss"] = max_out_loss
@nb.njit(cache=True, error_model="numpy")
@nb.njit(cache=True, error_model="numpy")
[docs]
def run_lec(
file_handles,
outloss_mean,
outloss_sample,
summary_ids,
occ_map,
use_return_period,
num_sidxs,
max_summary_id,
):
"""Process each summary file and populate outloss_mean and outloss_sample
Args:
file_handles (List[np.memmap]): List of memmaps for summary files data
outloss_mean (ndarray[OUTLOSS_DTYPE]): ndarray indexed by summary_id, period_no containing aggregate and max losses
outloss_sample (ndarray[OUTLOSS_DTYPE]): ndarray indexed by summary_id, sidx, period_no containing aggregate and max losses
summary_ids (ndarray[bool]): bool array marking which summary_ids are used
occ_map (nb.typed.Dict): numpy map of event_id, period_no, occ_date_id from the occurrence file
use_return_period (bool): Use Return Period file.
num_sidxs (int): Number of sidxs to consider for outloss_sample
max_summary_id (int): Max summary ID
"""
for fin in file_handles:
process_input_file(
fin,
outloss_mean,
outloss_sample,
summary_ids,
occ_map,
use_return_period,
num_sidxs,
max_summary_id,
)
[docs]
def run(
run_dir,
subfolder,
ept_output_file=None,
psept_output_file=None,
agg_full_uncertainty=False,
agg_wheatsheaf=False,
agg_sample_mean=False,
agg_wheatsheaf_mean=False,
occ_full_uncertainty=False,
occ_wheatsheaf=False,
occ_sample_mean=False,
occ_wheatsheaf_mean=False,
use_return_period=False,
noheader=False,
output_format="csv",
):
"""Runs LEC calculations
Args:
run_dir (str | os.PathLike): Path to directory containing required files structure
subfolder (str): Workspace subfolder inside <run_dir>/work/<subfolder>
ept_output_file (str, optional): Path to EPT output file. Defaults to None
psept_output_file (str, optional): Path to PSEPT output file. Defaults to None
agg_full_uncertainty (bool, optional): Aggregate Full Uncertainty. Defaults to False.
agg_wheatsheaf (bool, optional): Aggregate Wheatsheaf. Defaults to False.
agg_sample_mean (bool, optional): Aggregate Sample Mean. Defaults to False.
agg_wheatsheaf_mean (bool, optional): Aggregate Wheatsheaf Mean. Defaults to False.
occ_full_uncertainty (bool, optional): Occurrence Full Uncertainty. Defaults to False.
occ_wheatsheaf (bool, optional): Occurrence Wheatsheaf. Defaults to False.
occ_sample_mean (bool, optional): Occurrence Sample Mean. Defaults to False.
occ_wheatsheaf_mean (bool, optional): Occurrence Wheatsheaf Mean. Defaults to False.
use_return_period (bool, optional): Use Return Period file. Defaults to False.
noheader (bool): Boolean value to skip header in output file
output_format (str): Output format extension. Defaults to "csv".
"""
outmap = {
"ept": {
"compute": ept_output_file is not None,
"file_path": ept_output_file,
"fmt": EPT_fmt,
"headers": EPT_headers,
"file": None,
"dtype": EPT_dtype,
},
"psept": {
"compute": psept_output_file is not None,
"file_path": psept_output_file,
"fmt": PSEPT_fmt,
"headers": PSEPT_headers,
"file": None,
"dtype": PSEPT_dtype,
},
}
output_format = "." + output_format
output_binary = output_format == ".bin"
output_parquet = output_format == ".parquet"
# Check for correct suffix
for path in [v["file_path"] for v in outmap.values()]:
if path is None:
continue
if Path(path).suffix == "": # Ignore suffix for pipes
continue
if (Path(path).suffix != output_format):
raise ValueError(f"Invalid file extension for {output_format}, got {path},")
if not all([v["compute"] for v in outmap.values()]):
logger.warning("No output files specified")
with ExitStack() as stack:
workspace_folder = Path(run_dir, "work", subfolder)
if not workspace_folder.is_dir():
raise RuntimeError(f"Error: Unable to open directory {workspace_folder}")
# work folder for lec files
lec_files_folder = Path(workspace_folder, "lec_files")
lec_files_folder.mkdir(parents=False, exist_ok=True)
# Find summary binary files
files = [file for file in workspace_folder.glob("*.bin")]
file_handles = [np.memmap(file, mode="r", dtype="u1") for file in files]
streams_in, (stream_source_type, stream_agg_type, sample_size) = init_streams_in(files, stack)
if stream_source_type != SUMMARY_STREAM_ID:
raise RuntimeError(f"Error: Not a summary stream type {stream_source_type}")
max_summary_id = get_max_summary_id(file_handles)
file_data, use_return_period, agg_wheatsheaf_mean, occ_wheatsheaf_mean = read_input_files(
run_dir,
use_return_period,
agg_wheatsheaf_mean,
occ_wheatsheaf_mean,
sample_size,
)
output_flags = [
agg_full_uncertainty,
agg_wheatsheaf,
agg_sample_mean,
agg_wheatsheaf_mean,
occ_full_uncertainty,
occ_wheatsheaf,
occ_sample_mean,
occ_wheatsheaf_mean,
]
# Check output_flags against output files
handles_agg = [AGG_FULL_UNCERTAINTY, AGG_SAMPLE_MEAN, AGG_WHEATSHEAF_MEAN]
handles_occ = [OCC_FULL_UNCERTAINTY, OCC_SAMPLE_MEAN, OCC_WHEATSHEAF_MEAN]
handles_psept = [AGG_WHEATSHEAF, OCC_WHEATSHEAF]
hasAGG = any([output_flags[idx] for idx in handles_agg])
hasOCC = any([output_flags[idx] for idx in handles_occ])
hasEPT = hasAGG or hasOCC
hasPSEPT = any([output_flags[idx] for idx in handles_psept])
outmap["ept"]["compute"] = outmap["ept"]["compute"] and hasEPT
outmap["psept"]["compute"] = outmap["psept"]["compute"] and hasPSEPT
if not outmap["ept"]["compute"]:
logger.warning("WARNING: no valid output stream to fill EPT file")
if not outmap["psept"]["compute"]:
logger.warning("WARNING: no valid output stream to fill PSEPT file")
if not (outmap["ept"]["compute"] or outmap["psept"]["compute"]):
return
# Create outloss array maps
# outloss_mean has only -1 SIDX
outloss_mean_file = Path(lec_files_folder, "lec_outloss_mean.bdat")
outloss_mean = np.memmap(
outloss_mean_file,
dtype=OUTLOSS_DTYPE,
mode="w+",
shape=(file_data["no_of_periods"] * max_summary_id),
)
# outloss_sample has all SIDXs plus -2 and -3
num_sidxs = sample_size + 2
outloss_sample_file = Path(lec_files_folder, "lec_outloss_sample.bdat")
outloss_sample = np.memmap(
outloss_sample_file,
dtype=OUTLOSS_DTYPE,
mode="w+",
shape=(file_data["no_of_periods"] * num_sidxs * max_summary_id),
)
# Array of Summary IDs found
summary_ids = np.zeros((max_summary_id), dtype=np.bool_)
# Run LEC calculations to populate outloss and summary_ids arrays
run_lec(
file_handles,
outloss_mean,
outloss_sample,
summary_ids,
file_data["occ_map"],
use_return_period,
num_sidxs,
max_summary_id,
)
# Initialise output files LEC
if output_binary:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
out_file = stack.enter_context(open(outmap[out_type]["file_path"], 'wb'))
outmap[out_type]["file"] = out_file
elif output_parquet:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
temp_out_data = np.zeros(1000000, dtype=outmap[out_type]["dtype"])
temp_df = pd.DataFrame(temp_out_data, columns=outmap[out_type]["headers"])
temp_table = pa.Table.from_pandas(temp_df)
out_file = pq.ParquetWriter(outmap[out_type]["file_path"], temp_table.schema)
outmap[out_type]["file"] = out_file
else:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
out_file = stack.enter_context(open(outmap[out_type]["file_path"], 'w'))
if not noheader:
csv_headers = ','.join(outmap[out_type]["headers"])
out_file.write(csv_headers + '\n')
outmap[out_type]["file"] = out_file
# Output aggregate reports to CSVs
agg = AggReports(
outmap,
outloss_mean,
outloss_sample,
file_data["period_weights"],
max_summary_id,
sample_size,
file_data["no_of_periods"],
num_sidxs,
use_return_period,
file_data["returnperiods"],
lec_files_folder,
output_binary,
output_parquet,
)
# Output Mean Damage Ratio
if outmap["ept"]["compute"]:
if hasOCC:
agg.output_mean_damage_ratio(OEP, OEPTVAR, "max_out_loss")
if hasAGG:
agg.output_mean_damage_ratio(AEP, AEPTVAR, "agg_out_loss")
# Output Full Uncertainty
if output_flags[OCC_FULL_UNCERTAINTY]:
agg.output_full_uncertainty(OEP, OEPTVAR, "max_out_loss")
if output_flags[AGG_FULL_UNCERTAINTY]:
agg.output_full_uncertainty(AEP, AEPTVAR, "agg_out_loss")
# Output Wheatsheaf and Wheatsheaf Mean
if output_flags[OCC_WHEATSHEAF] or output_flags[OCC_WHEATSHEAF_MEAN]:
agg.output_wheatsheaf_and_wheatsheafmean(
OEP, OEPTVAR, "max_out_loss",
output_flags[OCC_WHEATSHEAF], output_flags[OCC_WHEATSHEAF_MEAN]
)
if output_flags[AGG_WHEATSHEAF] or output_flags[AGG_WHEATSHEAF_MEAN]:
agg.output_wheatsheaf_and_wheatsheafmean(
AEP, AEPTVAR, "agg_out_loss",
output_flags[AGG_WHEATSHEAF], output_flags[AGG_WHEATSHEAF_MEAN]
)
# Output Sample Mean
if output_flags[OCC_SAMPLE_MEAN]:
agg.output_sample_mean(OEP, OEPTVAR, "max_out_loss")
if output_flags[AGG_SAMPLE_MEAN]:
agg.output_sample_mean(AEP, AEPTVAR, "agg_out_loss")
@redirect_logging(exec_name='lecpy')
[docs]
def main(
run_dir='.',
subfolder=None,
ept=None,
psept=None,
agg_full_uncertainty=False,
agg_wheatsheaf=False,
agg_sample_mean=False,
agg_wheatsheaf_mean=False,
occ_full_uncertainty=False,
occ_wheatsheaf=False,
occ_sample_mean=False,
occ_wheatsheaf_mean=False,
use_return_period=False,
noheader=False,
ext="csv",
**kwargs
):
run(
run_dir,
subfolder,
ept_output_file=ept,
psept_output_file=psept,
agg_full_uncertainty=agg_full_uncertainty,
agg_wheatsheaf=agg_wheatsheaf,
agg_sample_mean=agg_sample_mean,
agg_wheatsheaf_mean=agg_wheatsheaf_mean,
occ_full_uncertainty=occ_full_uncertainty,
occ_wheatsheaf=occ_wheatsheaf,
occ_sample_mean=occ_sample_mean,
occ_wheatsheaf_mean=occ_wheatsheaf_mean,
use_return_period=use_return_period,
noheader=noheader,
output_format=ext,
)