# elt/manager.py
import logging
import numpy as np
import numba as nb
from contextlib import ExitStack
from pathlib import Path
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from oasislmf.pytools.common.data import (MEAN_TYPE_ANALYTICAL, MEAN_TYPE_SAMPLE, oasis_int, oasis_float,
oasis_int_size, oasis_float_size, write_ndarray_to_fmt_csv)
from oasislmf.pytools.common.event_stream import (MAX_LOSS_IDX, MEAN_IDX, EventReader, init_streams_in,
mv_read, SUMMARY_STREAM_ID)
from oasislmf.pytools.common.input_files import read_event_rates, read_quantile
from oasislmf.pytools.elt.data import MELT_dtype, MELT_fmt, MELT_headers, QELT_dtype, QELT_fmt, QELT_headers, SELT_dtype, SELT_fmt, SELT_headers
from oasislmf.pytools.utils import redirect_logging
logger = logging.getLogger(__name__)
class ELTReader(EventReader):
def __init__(self, len_sample, compute_selt, compute_melt, compute_qelt, unique_event_ids, event_rates, intervals):
# Buffer for SELT data
self.selt_data = np.zeros(1000000, dtype=SELT_dtype)
self.selt_idx = np.zeros(1, dtype=np.int64)
# Buffer for MELT data
self.melt_data = np.zeros(1000000, dtype=MELT_dtype)
self.melt_idx = np.zeros(1, dtype=np.int64)
# Buffer for QELT data
self.qelt_data = np.zeros(1000000, dtype=QELT_dtype)
self.qelt_idx = np.zeros(1, dtype=np.int64)
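# Numba-compatible scratch state passed into the njit read_buffer: per-event flags and
# accumulators (loss sums, non-zero sample count, max loss, analytical mean, per-sample
# losses) from which the MELT and QELT statistics are built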
read_buffer_state_dtype = np.dtype([
('len_sample', oasis_int),
('read_summary_set_id', np.bool_),
('reading_losses', np.bool_),
('compute_selt', np.bool_),
('compute_melt', np.bool_),
('compute_qelt', np.bool_),
('summary_id', oasis_int),
('impacted_exposure', oasis_float),
('sumloss', np.float64), # intermediate value; needs higher precision to avoid accumulating error
('sumlosssqr', np.float64), # intermediate value; needs higher precision to avoid accumulating error
('non_zero_samples', oasis_int),
('max_loss', oasis_float),
('analytical_mean', oasis_float),
('losses_vec', oasis_float, (len_sample,)),
])
self.state = np.zeros(1, dtype=read_buffer_state_dtype)[0]
self.state["len_sample"] = len_sample
self.state["reading_losses"] = False
self.state["read_summary_set_id"] = False
self.state["compute_selt"] = compute_selt
self.state["compute_melt"] = compute_melt
self.state["compute_qelt"] = compute_qelt
self.unique_event_ids = unique_event_ids.astype(oasis_int)
self.event_rates = event_rates.astype(oasis_float)
self.intervals = intervals
self.curr_file_idx = None # Current summary file idx being read
def get_data(self, out_type):
if out_type == "selt":
return self.selt_data
elif out_type == "melt":
return self.melt_data
elif out_type == "qelt":
return self.qelt_data
else:
raise RuntimeError(f"Unknown out_type {out_type}")
def get_data_idx(self, out_type):
if out_type == "selt":
return self.selt_idx
elif out_type == "melt":
return self.melt_idx
elif out_type == "qelt":
return self.qelt_idx
else:
raise RuntimeError(f"Unknown out_type {out_type}")
def read_buffer(self, byte_mv, cursor, valid_buff, event_id, item_id, file_idx):
# Check for a new file idx so the summary_set_id is read at the start of each summary file stream
# This is not done by init_streams_in, as the summary_set_id is specific to the summary stream only
if self.curr_file_idx is not None and self.curr_file_idx != file_idx:
self.curr_file_idx = file_idx
self.state["read_summary_set_id"] = False
else:
self.curr_file_idx = file_idx
# Pass state variables to read_buffer
cursor, event_id, item_id, ret = read_buffer(
byte_mv, cursor, valid_buff, event_id, item_id, self.selt_data, self.selt_idx,
self.state, self.melt_data, self.melt_idx, self.qelt_data, self.qelt_idx, self.intervals,
self.unique_event_ids, self.event_rates
)
return cursor, event_id, item_id, ret
@nb.njit(cache=True)
def _update_selt_data(
selt_data, si, event_id, summary_id,
sample_id,
loss,
impacted_exposure,
):
selt_data[si]['EventId'] = event_id
selt_data[si]['SummaryId'] = summary_id
selt_data[si]['SampleId'] = sample_id
selt_data[si]['Loss'] = loss
selt_data[si]['ImpactedExposure'] = impacted_exposure
@nb.njit(cache=True)
def _update_melt_data(
melt_data, mi, event_id, summary_id,
sample_type,
event_rate,
chance_of_loss,
meanloss,
sdloss,
maxloss,
footprint_exposure,
mean_impacted_exposure,
max_impacted_exposure
):
melt_data[mi]['EventId'] = event_id
melt_data[mi]['SummaryId'] = summary_id
melt_data[mi]['SampleType'] = sample_type
melt_data[mi]['EventRate'] = event_rate
melt_data[mi]['ChanceOfLoss'] = chance_of_loss
melt_data[mi]['MeanLoss'] = meanloss
melt_data[mi]['SDLoss'] = sdloss
melt_data[mi]['MaxLoss'] = maxloss
melt_data[mi]["FootprintExposure"] = footprint_exposure
melt_data[mi]["MeanImpactedExposure"] = mean_impacted_exposure
melt_data[mi]["MaxImpactedExposure"] = max_impacted_exposure
@nb.njit(cache=True)
def _update_qelt_data(
qelt_data, qi, event_id, summary_id,
quantile,
loss,
):
qelt_data[qi]["EventId"] = event_id
qelt_data[qi]["SummaryId"] = summary_id
qelt_data[qi]["Quantile"] = quantile
qelt_data[qi]["Loss"] = loss
@nb.njit(cache=True, error_model="numpy")
def read_buffer(
byte_mv, cursor, valid_buff, event_id, item_id,
selt_data, selt_idx, state,
melt_data, melt_idx,
qelt_data, qelt_idx, intervals,
unique_event_ids, event_rates
):
# Initialise idxs
last_event_id = event_id
si = selt_idx[0]
mi = melt_idx[0]
qi = qelt_idx[0]
# Helper functions
def _update_idxs():
selt_idx[0] = si
melt_idx[0] = mi
qelt_idx[0] = qi
def _reset_state():
state["reading_losses"] = False
state["non_zero_samples"] = 0
state["max_loss"] = 0
state["analytical_mean"] = 0
state["losses_vec"].fill(0)
state["sumloss"] = 0
state["sumlosssqr"] = 0
def _get_mean_and_sd_loss():
meanloss = state["sumloss"] / state["len_sample"]
if state["non_zero_samples"] > 1:
variance = (
state["sumlosssqr"] - (
(state["sumloss"] * state["sumloss"]) / state["len_sample"]
)
) / (state["len_sample"] - 1)
# Tolerance check: treat negligible relative variance as zero (floating-point noise)
if variance / state["sumlosssqr"] < 1e-7:
variance = 0
sdloss = np.sqrt(variance)
else:
sdloss = 0
return meanloss, sdloss
while cursor < valid_buff:
if not state["reading_losses"]:
# Read summary header
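# Stream layout: each summary file starts with a summary_set_id (oasis_int); each event
# block then has a header of event_id, summary_id (oasis_int) and impacted_exposure
# (oasis_float), followed by (sidx, loss) records terminated by sidx == 0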
if valid_buff - cursor >= 3 * oasis_int_size + oasis_float_size:
# Need to read summary_set_id from summary info first
if not state["read_summary_set_id"]:
_, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size)
state["read_summary_set_id"] = True
event_id_new, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size)
if last_event_id != 0 and event_id_new != last_event_id:
# New event, return to process the previous event
_update_idxs()
return cursor - oasis_int_size, last_event_id, item_id, 1
event_id = event_id_new
state["summary_id"], cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size)
state["impacted_exposure"], cursor = mv_read(byte_mv, cursor, oasis_float, oasis_float_size)
state["reading_losses"] = True
else:
break # Not enough for whole summary header
if state["reading_losses"]:
if valid_buff - cursor < oasis_int_size + oasis_float_size:
break # Not enough for whole record
# Read sidx
sidx, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size)
if sidx == 0: # sidx == 0, end of record
cursor += oasis_float_size # Read extra 0 for end of record
if state["compute_melt"]:
# Compute MELT statistics and store them
if state["non_zero_samples"] > 0:
meanloss, sdloss = _get_mean_and_sd_loss()
chance_of_loss = state["non_zero_samples"] / state["len_sample"]
mean_imp_exp = state["impacted_exposure"] * chance_of_loss
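# Look up the event rate for this event_id; searchsorted assumes unique_event_ids
# is sorted, and the rate falls back to NaN when the event has no known rate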
if unique_event_ids.size > 0:
idx_ev = np.searchsorted(unique_event_ids, event_id)
if idx_ev < unique_event_ids.size and unique_event_ids[idx_ev] == event_id:
event_rate = event_rates[idx_ev]
else:
event_rate = np.nan
else:
event_rate = np.nan
# Update MELT data (analytical mean)
_update_melt_data(
melt_data, mi,
event_id=event_id,
summary_id=state["summary_id"],
sample_type=MEAN_TYPE_ANALYTICAL,
event_rate=event_rate,
chance_of_loss=0,
meanloss=state["analytical_mean"],
sdloss=0,
maxloss=state["max_loss"],
footprint_exposure=state["impacted_exposure"],
mean_impacted_exposure=state["impacted_exposure"],
max_impacted_exposure=state["impacted_exposure"]
)
mi += 1
# Update MELT data (sample mean)
_update_melt_data(
melt_data, mi,
event_id=event_id,
summary_id=state["summary_id"],
sample_type=MEAN_TYPE_SAMPLE,
event_rate=event_rate,
chance_of_loss=chance_of_loss,
meanloss=meanloss,
sdloss=sdloss,
maxloss=state["max_loss"],
footprint_exposure=state["impacted_exposure"],
mean_impacted_exposure=mean_imp_exp,
max_impacted_exposure=state["impacted_exposure"]
)
mi += 1
if mi >= melt_data.shape[0]:
# Output array is full
_update_idxs()
return cursor, event_id, item_id, 1
# Update QELT data
if state["compute_qelt"]:
state["losses_vec"].sort()
# Calculate the loss for each quantile interval
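# Each interval carries the quantile q plus the integer and fractional parts locating it
# within the sorted losses; the loss is linearly interpolated between the two adjacent
# order statistics (or clamped to the largest loss at the top end)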
for i in range(len(intervals)):
q = intervals[i]["q"]
ipart = intervals[i]["integer_part"]
fpart = intervals[i]["fractional_part"]
if ipart == len(state["losses_vec"]):
loss = state["losses_vec"][ipart - 1]
else:
loss = (state["losses_vec"][ipart] - state["losses_vec"][ipart - 1]) * fpart + state["losses_vec"][ipart - 1]
_update_qelt_data(
qelt_data, qi,
event_id=event_id,
summary_id=state['summary_id'],
quantile=q,
loss=loss
)
qi += 1
if qi >= qelt_data.shape[0]:
# Output array is full
_update_idxs()
return cursor, event_id, item_id, 1
# Reset variables
_reset_state()
continue
# Read loss
loss, cursor = mv_read(byte_mv, cursor, oasis_float, oasis_float_size)
if sidx == MAX_LOSS_IDX:
state["max_loss"] = loss
else: # Normal data record
# Update SELT data
if state["compute_selt"]:
_update_selt_data(
selt_data, si,
event_id=event_id,
summary_id=state["summary_id"],
sample_id=sidx,
loss=loss,
impacted_exposure=state["impacted_exposure"]
)
si += 1
if si >= selt_data.shape[0]:
# Output array is full
_update_idxs()
return cursor, event_id, item_id, 1
if sidx > 0:
# Update MELT variables
state["sumloss"] += loss
state["sumlosssqr"] += loss * loss
if loss > 0:
state["non_zero_samples"] += 1
# Update QELT variables
state["losses_vec"][sidx - 1] = loss
if sidx == MEAN_IDX:
# Update MELT variables
state["analytical_mean"] = loss
else:
pass # Should never reach here anyway
# Update the indices
_update_idxs()
return cursor, event_id, item_id, 0
def run(
run_dir,
files_in,
selt_output_file=None,
melt_output_file=None,
qelt_output_file=None,
noheader=False,
output_format="csv",
):
"""Runs ELT calculations
Args:
run_dir (str | os.PathLike): Path to directory containing required files structure
files_in (str | os.PathLike): Path to summary binary input file
selt_output_file (str, optional): Path to SELT output file. Defaults to None.
melt_output_file (str, optional): Path to MELT output file. Defaults to None.
qelt_output_file (str, optional): Path to QELT output file. Defaults to None.
noheader (bool): If True, do not write header rows to the output files. Defaults to False.
output_format (str): Output format extension. Defaults to "csv".
"""
outmap = {
"selt": {
"compute": selt_output_file is not None,
"file_path": selt_output_file,
"fmt": SELT_fmt,
"headers": SELT_headers,
"file": None,
},
"melt": {
"compute": melt_output_file is not None,
"file_path": melt_output_file,
"fmt": MELT_fmt,
"headers": MELT_headers,
"file": None,
},
"qelt": {
"compute": qelt_output_file is not None,
"file_path": qelt_output_file,
"fmt": QELT_fmt,
"headers": QELT_headers,
"file": None,
},
}
output_format = "." + output_format
output_binary = output_format == ".bin"
output_parquet = output_format == ".parquet"
# Check for correct suffix
for path in [v["file_path"] for v in outmap.values()]:
if path is None:
continue
if Path(path).suffix == "": # Ignore suffix for pipes
continue
if (Path(path).suffix != output_format):
raise ValueError(f"Invalid file extension for {output_format}, got {path},")
if run_dir is None:
run_dir = './work'
if not any([v["compute"] for v in outmap.values()]):
logger.warning("No output files specified")
with ExitStack() as stack:
streams_in, (stream_source_type, stream_agg_type, len_sample) = init_streams_in(files_in, stack)
if stream_source_type != SUMMARY_STREAM_ID:
raise Exception(f"unsupported stream type {stream_source_type}, {stream_agg_type}")
file_data = read_input_files(
run_dir,
outmap["melt"]["compute"],
outmap["qelt"]["compute"],
len_sample
)
elt_reader = ELTReader(
len_sample,
outmap["selt"]["compute"],
outmap["melt"]["compute"],
outmap["qelt"]["compute"],
file_data["unique_event_ids"],
file_data["event_rates"],
file_data["intervals"]
)
# Initialise output files ELT
if output_binary:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
out_file = stack.enter_context(open(outmap[out_type]["file_path"], 'wb'))
outmap[out_type]["file"] = out_file
elif output_parquet:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
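# Build a typed (zero-filled) DataFrame from the structured buffer purely to derive
# the Arrow schema for the ParquetWriter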
temp_df = pd.DataFrame(elt_reader.get_data(out_type), columns=outmap[out_type]["headers"])
temp_table = pa.Table.from_pandas(temp_df)
out_file = pq.ParquetWriter(outmap[out_type]["file_path"], temp_table.schema)
outmap[out_type]["file"] = out_file
else:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
out_file = stack.enter_context(open(outmap[out_type]["file_path"], 'w'))
if not noheader:
csv_headers = ','.join(outmap[out_type]["headers"])
out_file.write(csv_headers + '\n')
outmap[out_type]["file"] = out_file
# Process summary files
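# read_streams yields event ids as events complete; flush each requested output buffer
# to its file, then reset its write index before the next event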
for event_id in elt_reader.read_streams(streams_in):
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
data_idx = elt_reader.get_data_idx(out_type)
data = elt_reader.get_data(out_type)[:data_idx[0]]
if outmap[out_type]["file"] is not None and data.size > 0:
if output_binary:
data.tofile(outmap[out_type]["file"])
elif output_parquet:
data_df = pd.DataFrame(data)
data_table = pa.Table.from_pandas(data_df)
outmap[out_type]["file"].write_table(data_table)
else:
write_ndarray_to_fmt_csv(
outmap[out_type]["file"],
data,
outmap[out_type]["headers"],
outmap[out_type]["fmt"]
)
data_idx[0] = 0
@redirect_logging(exec_name='eltpy')
def main(run_dir='.', files_in=None, selt=None, melt=None, qelt=None, noheader=False, ext="csv", **kwargs):
run(
run_dir,
files_in,
selt_output_file=selt,
melt_output_file=melt,
qelt_output_file=qelt,
noheader=noheader,
output_format=ext,
)
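# Example usage (a minimal sketch, not part of the oasislmf CLI): invoke the ELT
# calculation directly on a single summary binary and write CSV output. The file names
# and directory layout below are assumptions for illustration only; eltpy normally
# drives main() via its command-line entry point.
if __name__ == "__main__":
    run(
        "analysis/output/work",
        "analysis/output/summary_p1.bin",
        selt_output_file="analysis/output/selt.csv",
        melt_output_file="analysis/output/melt.csv",
        qelt_output_file="analysis/output/qelt.csv",
    )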