# aal/manager.py
import logging
import numpy as np
import numba as nb
import os
from contextlib import ExitStack
from pathlib import Path
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from oasislmf.pytools.aal.data import AAL_meanonly_dtype, AAL_meanonly_fmt, AAL_meanonly_headers, AAL_dtype, AAL_fmt, AAL_headers, ALCT_dtype, ALCT_fmt, ALCT_headers
from oasislmf.pytools.common.data import (MEAN_TYPE_ANALYTICAL, MEAN_TYPE_SAMPLE, oasis_int, oasis_float,
oasis_int_size, oasis_float_size, write_ndarray_to_fmt_csv)
from oasislmf.pytools.common.event_stream import (MEAN_IDX, MAX_LOSS_IDX, NUMBER_OF_AFFECTED_RISK_IDX, SUMMARY_STREAM_ID,
init_streams_in, mv_read)
from oasislmf.pytools.common.input_files import read_occurrence, read_periods
from oasislmf.pytools.common.utils.nb_heapq import heap_pop, heap_push, init_heap
from oasislmf.pytools.utils import redirect_logging
logger = logging.getLogger(__name__)
# Maximum amount of memory (GB) the AAL summary index buffer may use; when the
# buffer fills, sorted chunks are spilled to temporary files
OASIS_AAL_MEMORY = float(os.environ.get("OASIS_AAL_MEMORY", "4"))
# Similar to aal_rec in ktools
# summary_id can be inferred from index
# type can be inferred from which array uses it
_AAL_REC_DTYPE = np.dtype([
('mean', np.float64),
('mean_squared', np.float64),
])
# Similar to aal_rec_period
_AAL_REC_PERIOD_DTYPE = np.dtype(
_AAL_REC_DTYPE.descr + [('mean_period', np.float64)]
)
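# Indexing convention sketch (illustrative, derived from the code below): with
# max_summary_id = 2, the analytical record for summary_id s lives at
# vec_analytical_aal[s - 1], and the sample record for subset i at
# vecs_sample_aal[i * max_summary_id + (s - 1)]; the record "type" (analytical
# vs sample) is implied by which array holds it.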
_SUMMARIES_DTYPE = np.dtype([
("summary_id", np.int32),
("file_idx", np.int32),
("period_no", np.int32),
("file_offset", np.int64),
])
_SUMMARIES_DTYPE_size = _SUMMARIES_DTYPE.itemsize
@nb.njit(cache=True, error_model="numpy")
def process_bin_file(
fbin,
offset,
occ_map,
summaries_data,
summaries_idx,
file_index,
):
"""Reads summary<n>.bin file event_ids and summary_ids to populate summaries_data
Args:
fbin (np.memmap): summary binary memmap
offset (int): file offset to read from
occ_map (nb.typed.Dict): numpy map of event_id, period_no, occ_date_id from the occurrence file
summaries_data (ndarray[_SUMMARIES_DTYPE]): Index summary data (summaries.idx data)
summaries_idx (int): current index reached in summaries_data
file_index (int): Summary bin file index
Returns:
summaries_idx (int): current index reached in summaries_data
        resize_flag (bool): flag indicating summaries_data is full and the current chunk must be flushed
offset (int): file offset to read from
"""
while offset < len(fbin):
cursor = offset
event_id, cursor = mv_read(fbin, cursor, oasis_int, oasis_int_size)
summary_id, cursor = mv_read(fbin, cursor, oasis_int, oasis_int_size)
if event_id not in occ_map:
            # If the event_id doesn't exist in occ_map, skip this record and continue with the next
offset = cursor
# Skip over Expval and losses
_, offset = mv_read(fbin, offset, oasis_float, oasis_float_size)
offset = skip_losses(fbin, offset)
continue
# Get the number of rows for the current event_id
n_rows = occ_map[event_id].shape[0]
if summaries_idx + n_rows >= len(summaries_data):
# Resize array if full
return summaries_idx, True, offset
# Now fill the summaries_data with the rows that match the current event_id
current_row = 0
for row in occ_map[event_id]:
summaries_data[summaries_idx]["summary_id"] = summary_id
summaries_data[summaries_idx]["file_idx"] = file_index
summaries_data[summaries_idx]["period_no"] = row["period_no"]
summaries_data[summaries_idx]["file_offset"] = offset
summaries_idx += 1
current_row += 1
if current_row >= n_rows:
break
offset = cursor
# Read Expval
_, offset = mv_read(fbin, offset, oasis_float, oasis_float_size)
# Skip over losses
offset = skip_losses(fbin, offset)
return summaries_idx, False, offset
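# Record layout assumed by the reads above, for each event/summary pair in
# summary<n>.bin:
#   event_id   (oasis_int)
#   summary_id (oasis_int)
#   expval     (oasis_float)
#   (sidx, loss) pairs of (oasis_int, oasis_float), terminated by sidx == 0
# The file_offset stored in summaries_data points at the start of such a
# record, which is why run_aal later skips 2 * oasis_int_size +
# oasis_float_size to reach the loss pairs.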
def sort_and_save_chunk(summaries_data, temp_file_path):
"""Sort a chunk of summaries data and save it to a temporary file.
Args:
summaries_data (ndarray[_SUMMARIES_DTYPE]): Indexed summary data
temp_file_path (str | os.PathLike): Path to temporary file
"""
sort_columns = ["file_idx", "period_no", "summary_id"]
sorted_indices = np.lexsort([summaries_data[col] for col in sort_columns])
sorted_chunk = summaries_data[sorted_indices]
sorted_chunk.tofile(temp_file_path)
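# Note: np.lexsort treats the *last* key as the primary sort key, so the chunk
# above is ordered by summary_id, then period_no, then file_idx -- the same
# three-key ordering the k-way merge heap below compares on (num_compare=3).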
@nb.njit(cache=True, error_model="numpy")
def merge_sorted_chunks(memmaps):
"""
Merge sorted chunks using a k-way merge algorithm and yield next smallest row
Args:
memmaps (List[np.memmap]): List of temporary file memmaps
Yields:
smallest_row (ndarray[_SUMMARIES_DTYPE]): yields the next smallest row from sorted summaries partial files
"""
min_heap = init_heap(num_compare=3)
size = 0
# Initialize the min_heap with the first row of each memmap
for i, mmap in enumerate(memmaps):
if len(mmap) > 0:
first_row = mmap[0]
min_heap, size = heap_push(min_heap, size, np.array(
[first_row["summary_id"], first_row["period_no"], first_row["file_idx"], i, 0],
dtype=np.int32
))
# Perform the k-way merge
while size > 0:
# The min heap will store the smallest row at the top when popped
element, min_heap, size = heap_pop(min_heap, size)
file_idx = element[3]
row_num = element[4]
smallest_row = memmaps[file_idx][row_num]
yield smallest_row
# Push the next row from the same file into the heap if there are any more rows
if row_num + 1 < len(memmaps[file_idx]):
next_row = memmaps[file_idx][row_num + 1]
min_heap, size = heap_push(min_heap, size, np.array(
[next_row["summary_id"], next_row["period_no"], next_row["file_idx"], file_idx, row_num + 1],
dtype=np.int32
))
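# A minimal usage sketch (assuming `memmaps` comes from get_summaries_data;
# `consume` is a hypothetical callback, run_aal below is the real consumer):
#
#   for row in merge_sorted_chunks(memmaps):
#       # rows arrive ordered by (summary_id, period_no, file_idx)
#       consume(row["summary_id"], row["period_no"], row["file_offset"])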
def get_summaries_data(
path,
files_handles,
occ_map,
aal_max_memory
):
"""Gets the indexed summaries data, ordered with k-way merge if not enough memory
Args:
path (os.PathLike): Path to the workspace folder containing summary binaries
files_handles (List[np.memmap]): List of memmaps for summary files data
occ_map (nb.typed.Dict): numpy map of event_id, period_no, occ_date_id from the occurrence file
aal_max_memory (float): OASIS_AAL_MEMORY value (has to be passed in as numba won't update from environment variable)
Returns:
memmaps (List[np.memmap]): List of temporary file memmaps
max_summary_id (int): Max summary ID
"""
    # Remove existing temp bdat files if they exist
for temp_file in path.glob("indexed_summaries.part*.bdat"):
os.remove(temp_file)
    buffer_size = int(aal_max_memory * (1024**3) // _SUMMARIES_DTYPE_size)
temp_files = []
chunk_index = 0
summaries_data = np.empty(buffer_size, dtype=_SUMMARIES_DTYPE)
summaries_idx = 0
max_summary_id = 0
for file_index, fbin in enumerate(files_handles):
offset = oasis_int_size * 3 # Summary stream header size
while True:
summaries_idx, resize_flag, offset = process_bin_file(
fbin,
offset,
occ_map,
summaries_data,
summaries_idx,
file_index,
)
# Write new summaries partial file when buffer size or end of summary file reached
            if resize_flag:
                temp_file_path = Path(path, f"indexed_summaries.part{chunk_index}.bdat")
                chunk = summaries_data[:summaries_idx]
                sort_and_save_chunk(chunk, temp_file_path)
                temp_files.append(temp_file_path)
                chunk_index += 1
                if summaries_idx > 0:
                    max_summary_id = max(max_summary_id, np.max(chunk["summary_id"]))
                # Reset the write index but keep the full-size buffer; reusing the
                # shrunken slice would lose capacity on every flush
                summaries_idx = 0
# End of file, move to next file
if offset >= len(fbin):
break
    # Write remaining summaries data to a final temporary file
    temp_file_path = Path(path, f"indexed_summaries.part{chunk_index}.bdat")
    summaries_data = summaries_data[:summaries_idx]
    sort_and_save_chunk(summaries_data, temp_file_path)
    if summaries_idx > 0:
        max_summary_id = max(max_summary_id, np.max(summaries_data["summary_id"]))
    temp_files.append(temp_file_path)
memmaps = [np.memmap(temp_file, mode="r", dtype=_SUMMARIES_DTYPE) for temp_file in temp_files]
return memmaps, max_summary_id
def summary_index(path, occ_map, stack):
"""Index the summary binary outputs
Args:
path (os.PathLike): Path to the workspace folder containing summary binaries
occ_map (nb.typed.Dict): numpy map of event_id, period_no, occ_date_id from the occurrence file
stack (ExitStack): Exit stack
Returns:
files_handles (List[np.memmap]): List of memmaps for summary files data
sample_size (int): Sample size
max_summary_id (int): Max summary ID
memmaps (List[np.memmap]): List of temporary file memmaps
"""
# work folder for aal files
aal_files_folder = Path(path, "aal_files")
aal_files_folder.mkdir(parents=False, exist_ok=True)
# Find summary binary files
    files = list(path.glob("*.bin"))
files_handles = [np.memmap(file, mode="r", dtype="u1") for file in files]
streams_in, (stream_source_type, stream_agg_type, sample_size) = init_streams_in(files, stack)
if stream_source_type != SUMMARY_STREAM_ID:
raise RuntimeError(f"Error: Not a summary stream type {stream_source_type}")
memmaps, max_summary_id = get_summaries_data(
aal_files_folder,
files_handles,
occ_map,
OASIS_AAL_MEMORY
)
return files_handles, sample_size, max_summary_id, memmaps
def get_num_subsets(alct, sample_size, max_summary_id):
"""Gets the number of subsets required to generates the Sample AAL np map for subset sizes up to sample_size
Example: sample_size[10], max_summary_id[2] generates following ndarray
[
# subset_size, mean, mean_squared, mean_period
[0, 0, 0], # subset_size = 1 , summary_id = 1
[0, 0, 0], # subset_size = 1 , summary_id = 2
[0, 0, 0], # subset_size = 2 , summary_id = 1
[0, 0, 0], # subset_size = 2 , summary_id = 2
[0, 0, 0], # subset_size = 4 , summary_id = 1
[0, 0, 0], # subset_size = 4 , summary_id = 2
[0, 0, 0], # subset_size = 10 , summary_id = 1, subset_size = sample_size
[0, 0, 0], # subset_size = 10 , summary_id = 2, subset_size = sample_size
]
    The subset_size is implicit from position in the array, grouped by max_summary_id
    So the first two rows are subset_size 2^0 = 1
    The next two rows are subset_size 2^1 = 2
    The next two rows are subset_size 2^2 = 4
    The last two rows are subset_size = sample_size = 10
    No subset of size 8 is generated, as it would cover sidx values 8 to 15, which extends past sample_size
    Therefore this function returns 4, and the sample AAL array has 4 * 2 rows
Args:
alct (bool): Boolean for ALCT output
sample_size (int): Sample size
max_summary_id (int): Max summary ID
Returns:
num_subsets (int): Number of subsets
"""
i = 0
if alct and sample_size > 1:
while ((1 << i) + ((1 << i) - 1)) <= sample_size:
i += 1
return i + 1
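# Worked examples matching the docstring above:
#   >>> get_num_subsets(True, 10, 2)   # subsets of size 1, 2, 4, plus sample_size
#   4
#   >>> get_num_subsets(False, 10, 2)  # no ALCT output, single subset
#   1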
@nb.njit(cache=True, fastmath=True, error_model="numpy")
def get_weighted_means(
vec_sample_sum_loss,
weighting,
sidx,
end_sidx,
):
"""Get sum of weighted mean and weighted mean_squared
Args:
        vec_sample_sum_loss (ndarray[float64]): Vector for sample sum losses
weighting (float): Weighting value
sidx (int): start index
end_sidx (int): end index
Returns:
weighted_mean (float): Sum weighted mean
weighted_mean_squared (float): Sum weighted mean squared
"""
weighted_mean = 0
weighted_mean_squared = 0
while sidx < end_sidx:
sumloss = vec_sample_sum_loss[sidx]
weighted_mean += sumloss * weighting
weighted_mean_squared += sumloss * sumloss * weighting
sidx += 1
return weighted_mean, weighted_mean_squared
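# i.e. over s in [sidx, end_sidx):
#   weighted_mean         = sum(w * loss[s])
#   weighted_mean_squared = sum(w * loss[s] ** 2)
# where w is the period weighting passed in.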
@nb.njit(cache=True, error_model="numpy")
def do_calc_end(
period_no,
no_of_periods,
period_weights,
sample_size,
curr_summary_id,
max_summary_id,
vec_analytical_aal,
vecs_sample_aal,
vec_used_summary_id,
vec_sample_sum_loss,
):
"""Updates Analytical and Sample AAL vectors from sample sum losses
Args:
period_no (int): Period Number
no_of_periods (int): Number of periods
period_weights (ndarray[period_weights_dtype]): Period Weights
sample_size (int): Sample Size
curr_summary_id (int): Current summary_id
max_summary_id (int): Max summary_id
vec_analytical_aal (ndarray[_AAL_REC_DTYPE]): Vector for Analytical AAL
        vecs_sample_aal (ndarray[_AAL_REC_PERIOD_DTYPE]): Vector for Sample AAL
        vec_used_summary_id (ndarray[bool]): vector to store if summary_id is used
        vec_sample_sum_loss (ndarray[float64]): Vector for sample sum losses
"""
# Get weighting
weighting = 1
if no_of_periods > 0:
        # period_no must be a valid 1-based index into period_weights
if period_no > 0 and period_no <= no_of_periods:
weighting = period_weights[period_no - 1][1] * no_of_periods
else:
weighting = 0
# Update Analytical AAL
mean = vec_sample_sum_loss[0] # 0 index is where the analytical mean is stored
vec_used_summary_id[curr_summary_id - 1] = True
vec_analytical_aal[curr_summary_id - 1]["mean"] += mean * weighting
vec_analytical_aal[curr_summary_id - 1]["mean_squared"] += mean * mean * weighting
# Update Sample AAL
# Get relevant indexes for curr_summary_id
len_sample_aal = len(vecs_sample_aal)
num_subsets = len_sample_aal // max_summary_id
idxs = [i * max_summary_id + (curr_summary_id - 1) for i in range(num_subsets)]
# Get sample aal idx for sample_size
last_sample_aal = vecs_sample_aal[idxs[-1]]
total_mean_by_period = 0
sidx = 1
aal_idx = 0
while sidx < sample_size + 1:
# Iterate through aal_idx except the last one which is subset_size == sample_size
while aal_idx < num_subsets - 1:
curr_sample_aal = vecs_sample_aal[idxs[aal_idx]]
# Calculate the subset_size and assign to sidx
sidx = 1 << aal_idx
end_sidx = sidx << 1
# Traverse sidx == subset_size to sidx == subset_size * 2
weighted_mean, weighted_mean_squared = get_weighted_means(
vec_sample_sum_loss,
weighting,
sidx,
end_sidx
)
# Update sample size Sample AAL
last_sample_aal["mean"] += weighted_mean
last_sample_aal["mean_squared"] += weighted_mean_squared
total_mean_by_period += weighted_mean
# Update current Sample AAL
curr_sample_aal["mean"] += weighted_mean
curr_sample_aal["mean_squared"] += weighted_mean_squared
# Update current Sample AAL mean_period
curr_sample_aal["mean_period"] += weighted_mean * weighted_mean
sidx = end_sidx
aal_idx += 1
# Update sample size Sample AAL
mean = vec_sample_sum_loss[sidx]
total_mean_by_period += mean * weighting
last_sample_aal["mean"] += mean * weighting
last_sample_aal["mean_squared"] += mean * mean * weighting
sidx += 1
# Update sample size Sample AAL mean_period
last_sample_aal["mean_period"] += total_mean_by_period * total_mean_by_period
vec_sample_sum_loss.fill(0)
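# Worked example of the loop above with sample_size = 10 (num_subsets = 4):
# subset 0 accumulates sidx 1, subset 1 accumulates sidx 2-3, subset 2
# accumulates sidx 4-7, and the final subset (subset_size == sample_size)
# accumulates every sidx 1-10, with sidx 8-10 added one at a time by the tail
# of the outer loop.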
@nb.njit(cache=True, error_model="numpy")
def read_losses(summary_fin, cursor, vec_sample_sum_loss):
"""Read losses from summary_fin starting at cursor, populate vec_sample_sum_loss
Args:
summary_fin (np.memmap): summary file memmap
cursor (int): data offset for reading binary files
        vec_sample_sum_loss (ndarray[float64]): Vector for sample sum losses
Returns:
cursor (int): data offset for reading binary files
"""
# Max losses is sample_size + num special sidxs
valid_buff = len(summary_fin)
while True:
if valid_buff - cursor < oasis_int_size + oasis_float_size:
raise RuntimeError("Error: broken summary file, not enough data")
sidx, cursor = mv_read(summary_fin, cursor, oasis_int, oasis_int_size)
loss, cursor = mv_read(summary_fin, cursor, oasis_float, oasis_float_size)
if sidx == 0:
break
if sidx == NUMBER_OF_AFFECTED_RISK_IDX or sidx == MAX_LOSS_IDX:
continue
if sidx == MEAN_IDX:
sidx = 0
vec_sample_sum_loss[sidx] += loss
return cursor
@nb.njit(cache=True, error_model="numpy")
def skip_losses(summary_fin, cursor):
"""Skip through losses in summary_fin starting at cursor
Args:
summary_fin (np.memmap): summary file memmap
cursor (int): data offset for reading binary files
Returns:
cursor (int): data offset for reading binary files
"""
valid_buff = len(summary_fin)
sidx = 1
while sidx:
if valid_buff - cursor < oasis_int_size + oasis_float_size:
raise RuntimeError("Error: broken summary file, not enough data")
sidx, cursor = mv_read(summary_fin, cursor, oasis_int, oasis_int_size)
cursor += oasis_float_size
return cursor
@nb.njit(cache=True, error_model="numpy")
def run_aal(
memmaps,
no_of_periods,
period_weights,
sample_size,
max_summary_id,
files_handles,
vec_analytical_aal,
vecs_sample_aal,
vec_used_summary_id,
):
"""Run AAL calculation loop to populate vec data
Args:
memmaps (List[np.memmap]): List of temporary file memmaps
no_of_periods (int): Number of periods
period_weights (ndarray[period_weights_dtype]): Period Weights
sample_size (int): Sample Size
max_summary_id (int): Max summary_id
files_handles (List[np.memmap]): List of memmaps for summary files data
vec_analytical_aal (ndarray[_AAL_REC_DTYPE]): Vector for Analytical AAL
        vecs_sample_aal (ndarray[_AAL_REC_PERIOD_DTYPE]): Vector for Sample AAL
vec_used_summary_id (ndarray[bool]): vector to store if summary_id is used
"""
if len(memmaps) == 0:
raise ValueError("File is empty or missing data")
# Index 0 is mean
vec_sample_sum_loss = np.zeros(sample_size + 1, dtype=np.float64)
last_summary_id = -1
last_period_no = -1
for line in merge_sorted_chunks(memmaps):
summary_id = line["summary_id"]
file_idx = line["file_idx"]
period_no = line["period_no"]
file_offset = line["file_offset"]
if last_summary_id != summary_id:
if last_summary_id != -1:
do_calc_end(
last_period_no,
no_of_periods,
period_weights,
sample_size,
last_summary_id,
max_summary_id,
vec_analytical_aal,
vecs_sample_aal,
vec_used_summary_id,
vec_sample_sum_loss,
)
last_period_no = period_no
last_summary_id = summary_id
elif last_period_no != period_no:
if last_period_no != -1:
do_calc_end(
last_period_no,
no_of_periods,
period_weights,
sample_size,
last_summary_id,
max_summary_id,
vec_analytical_aal,
vecs_sample_aal,
vec_used_summary_id,
vec_sample_sum_loss,
)
last_period_no = period_no
summary_fin = files_handles[file_idx]
        # Skip the summary record header (event_id, summary_id, expval) to reach the loss pairs
cursor = file_offset + (2 * oasis_int_size) + oasis_float_size
read_losses(summary_fin, cursor, vec_sample_sum_loss)
if last_summary_id != -1:
do_calc_end(
last_period_no,
no_of_periods,
period_weights,
sample_size,
last_summary_id,
max_summary_id,
vec_analytical_aal,
vecs_sample_aal,
vec_used_summary_id,
vec_sample_sum_loss,
)
@nb.njit(cache=True, fastmath=True, error_model="numpy")
def calculate_mean_stddev(
observable_sum,
observable_squared_sum,
number_of_observations
):
"""Compute the mean and standard deviation from the sum and squared sum of an observable
Args:
observable_sum (ndarray[oasis_float]): Observable sum
observable_squared_sum (ndarray[oasis_float]): Observable squared sum
number_of_observations (int | ndarray[int]): number of observations
Returns:
mean (ndarray[oasis_float]): Mean
std (ndarray[oasis_float]): Standard Deviation
"""
mean = observable_sum / number_of_observations
    std = np.sqrt(
        (observable_squared_sum - (observable_sum * observable_sum) / number_of_observations)
        / (number_of_observations - 1)
    )
return mean, std
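# Worked example (plain numpy arrays, callable outside numba too): for samples
# [1, 2, 3, 4], sum = 10, squared sum = 30, n = 4:
#   >>> calculate_mean_stddev(np.array([10.0]), np.array([30.0]), 4)
#   (array([2.5]), array([1.29099445]))
# i.e. mean 2.5 and sample standard deviation sqrt(5 / 3).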
@nb.njit(cache=True, error_model="numpy")
def get_aal_data(
vec_analytical_aal,
vecs_sample_aal,
vec_used_summary_id,
sample_size,
no_of_periods
):
"""Generate AAL csv data
Args:
vec_analytical_aal (ndarray[_AAL_REC_DTYPE]): Vector for Analytical AAL
        vecs_sample_aal (ndarray[_AAL_REC_PERIOD_DTYPE]): Vector for Sample AAL
vec_used_summary_id (ndarray[bool]): vector to store if summary_id is used
sample_size (int): Sample Size
no_of_periods (int): Number of periods
Returns:
aal_data (List[Tuple]): AAL csv data
"""
aal_data = []
assert len(vec_analytical_aal) == len(vecs_sample_aal), \
f"Lengths of analytical ({len(vec_analytical_aal)}) and sample ({len(vecs_sample_aal)}) aal data differ"
mean_analytical, std_analytical = calculate_mean_stddev(
vec_analytical_aal["mean"],
vec_analytical_aal["mean_squared"],
no_of_periods,
)
mean_sample, std_sample = calculate_mean_stddev(
vecs_sample_aal["mean"],
vecs_sample_aal["mean_squared"],
no_of_periods * sample_size,
)
for summary_idx in range(len(vec_analytical_aal)):
if not vec_used_summary_id[summary_idx]:
continue
aal_data.append((summary_idx + 1, MEAN_TYPE_ANALYTICAL, mean_analytical[summary_idx], std_analytical[summary_idx]))
for summary_idx in range(len(vecs_sample_aal)):
if not vec_used_summary_id[summary_idx]:
continue
aal_data.append((summary_idx + 1, MEAN_TYPE_SAMPLE, mean_sample[summary_idx], std_sample[summary_idx]))
return aal_data
@nb.njit(cache=True, error_model="numpy")
def get_aal_data_meanonly(
vec_analytical_aal,
vecs_sample_aal,
vec_used_summary_id,
sample_size,
no_of_periods
):
"""Generate AAL csv data
Args:
vec_analytical_aal (ndarray[_AAL_REC_DTYPE]): Vector for Analytical AAL
        vecs_sample_aal (ndarray[_AAL_REC_PERIOD_DTYPE]): Vector for Sample AAL
vec_used_summary_id (ndarray[bool]): vector to store if summary_id is used
sample_size (int): Sample Size
no_of_periods (int): Number of periods
Returns:
aal_data (List[Tuple]): AAL csv data
"""
aal_data = []
assert len(vec_analytical_aal) == len(vecs_sample_aal), \
f"Lengths of analytical ({len(vec_analytical_aal)}) and sample ({len(vecs_sample_aal)}) aal data differ"
    # Standard deviations are discarded in the mean-only output
    mean_analytical, _ = calculate_mean_stddev(
        vec_analytical_aal["mean"],
        vec_analytical_aal["mean_squared"],
        no_of_periods,
    )
    mean_sample, _ = calculate_mean_stddev(
        vecs_sample_aal["mean"],
        vecs_sample_aal["mean_squared"],
        no_of_periods * sample_size,
    )
    # aalmeanonlycalc orders output data differently; analytical and sample rows
    # are interleaved per summary_id here to match the ktools output
for summary_idx in range(len(vec_analytical_aal)):
if not vec_used_summary_id[summary_idx]:
continue
aal_data.append((summary_idx + 1, MEAN_TYPE_ANALYTICAL, mean_analytical[summary_idx]))
aal_data.append((summary_idx + 1, MEAN_TYPE_SAMPLE, mean_sample[summary_idx]))
return aal_data
@nb.njit(cache=True, fastmath=True, error_model="numpy")
def calculate_confidence_interval(std_err, confidence_level):
"""Calculate the confidence interval based on standard error and confidence level.
Args:
std_err (float): The standard error.
confidence_level (float): The confidence level (e.g., 0.95 for 95%).
Returns:
confidence interval (float): The confidence interval.
"""
# Compute p-value above 0.5
p_value = (1 + confidence_level) / 2
p_value = np.sqrt(-2 * np.log(1 - p_value))
# Approximation formula for z-value from Abramowitz & Stegun, Handbook
# of Mathematical Functions: with Formulas, Graphs, and Mathematical
# Tables, Dover Publications (1965), eq. 26.2.23
# Also see John D. Cook Consulting, https://www.johndcook.com/blog/cpp_phi_inverse/
c = np.array([2.515517, 0.802853, 0.010328])
d = np.array([1.432788, 0.189269, 0.001308])
z_value = p_value - (
((c[2] * p_value + c[1]) * p_value + c[0]) /
(((d[2] * p_value + d[1]) * p_value + d[0]) * p_value + 1)
)
# Return the confidence interval
return std_err * z_value
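# Worked example: at confidence_level = 0.95 the rational approximation gives
# z ~= 1.9604 (exact normal quantile 1.95996), so
# calculate_confidence_interval(std_err, 0.95) is roughly 1.96 * std_err.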
@nb.njit(cache=True, error_model="numpy")
def get_alct_data(
vecs_sample_aal,
max_summary_id,
sample_size,
no_of_periods,
confidence,
):
"""Generate ALCT csv data
Args:
        vecs_sample_aal (ndarray[_AAL_REC_PERIOD_DTYPE]): Vector for Sample AAL
max_summary_id (int): Max summary_id
sample_size (int): Sample Size
no_of_periods (int): Number of periods
confidence (float): Confidence level between 0 and 1, default 0.95
Returns:
alct_data (List[List]): ALCT csv data
"""
alct_data = []
num_subsets = len(vecs_sample_aal) // max_summary_id
# Generate the subset sizes (last one is always sample_size)
subset_sizes = np.array([2 ** i for i in range(num_subsets)])
subset_sizes[-1] = sample_size
for summary_id in range(1, max_summary_id + 1):
# Get idxs for summary_id across all subset_sizes
idxs = np.array([i * max_summary_id + (summary_id - 1) for i in range(num_subsets)])
v_curr = vecs_sample_aal[idxs]
mean, std = calculate_mean_stddev(
v_curr["mean"],
v_curr["mean_squared"],
subset_sizes * no_of_periods,
)
mean_period = v_curr["mean_period"] / (subset_sizes * subset_sizes)
var_vuln = (
(v_curr["mean_squared"] - subset_sizes * mean_period)
/ (subset_sizes * no_of_periods - subset_sizes)
) / (subset_sizes * no_of_periods)
var_haz = (
subset_sizes * (mean_period - no_of_periods * mean * mean)
/ (no_of_periods - 1)
) / (subset_sizes * no_of_periods)
        std_err_haz = np.sqrt(var_haz)
        std_err_vuln = np.sqrt(var_vuln)
        # The confidence interval is based on the vulnerability standard error
        ci = calculate_confidence_interval(std_err_vuln, confidence)
lower_ci = np.where(ci > 0, mean - ci, 0)
upper_ci = np.where(ci > 0, mean + ci, 0)
curr_data = np.column_stack((
np.array([summary_id] * num_subsets),
mean,
std,
subset_sizes,
lower_ci,
upper_ci,
            std_err_vuln, std_err_vuln / mean,
var_haz, std_err_haz, std_err_haz / mean,
var_vuln, std_err_vuln, std_err_vuln / mean,
))
for row in curr_data:
alct_data.append(row)
return alct_data
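# Interpretation note (per the ktools ALCT decomposition implied above):
# var_vuln captures sampling (vulnerability) variance within periods, var_haz
# the variance of period means across periods (hazard), and the reported
# confidence interval is based on the vulnerability standard error.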
def run(
run_dir,
subfolder,
aal_output_file=None,
alct_output_file=None,
meanonly=False,
noheader=False,
confidence=0.95,
output_format="csv",
):
"""Runs AAL calculations
Args:
run_dir (str | os.PathLike): Path to directory containing required files structure
subfolder (str): Workspace subfolder inside <run_dir>/work/<subfolder>
aal_output_file (str, optional): Path to AAL output file. Defaults to None
alct_output_file (str, optional): Path to ALCT output file. Defaults to None
meanonly (bool): Boolean value to output AAL with mean only
noheader (bool): Boolean value to skip header in output file
confidence (float): Confidence level between 0 and 1, default 0.95
output_format (str): Output format extension. Defaults to "csv".
"""
outmap = {
"aal": {
"compute": aal_output_file is not None,
"file_path": aal_output_file,
"fmt": AAL_fmt if not meanonly else AAL_meanonly_fmt,
"headers": AAL_headers if not meanonly else AAL_meanonly_headers,
"file": None,
"dtype": AAL_dtype if not meanonly else AAL_meanonly_dtype,
},
"alct": {
"compute": alct_output_file is not None,
"file_path": alct_output_file,
"fmt": ALCT_fmt,
"headers": ALCT_headers,
"file": None,
"dtype": ALCT_dtype,
},
}
output_format = "." + output_format
output_binary = output_format == ".bin"
output_parquet = output_format == ".parquet"
# Check for correct suffix
for path in [v["file_path"] for v in outmap.values()]:
if path is None:
continue
if Path(path).suffix == "": # Ignore suffix for pipes
continue
        if Path(path).suffix != output_format:
            raise ValueError(f"Invalid file extension for {output_format}, got {path}")
    if not any(v["compute"] for v in outmap.values()):
logger.warning("No output files specified")
with ExitStack() as stack:
workspace_folder = Path(run_dir, "work", subfolder)
if not workspace_folder.is_dir():
raise RuntimeError(f"Error: Unable to open directory {workspace_folder}")
file_data = read_input_files(run_dir)
files_handles, sample_size, max_summary_id, memmaps = summary_index(
workspace_folder,
file_data["occ_map"],
stack
)
        # AAL vectors are indexed on summary_id - 1
num_subsets = get_num_subsets(outmap["alct"]["compute"], sample_size, max_summary_id)
vecs_sample_aal = np.zeros(num_subsets * max_summary_id, dtype=_AAL_REC_PERIOD_DTYPE)
vec_analytical_aal = np.zeros(max_summary_id, dtype=_AAL_REC_DTYPE)
vec_used_summary_id = np.zeros(max_summary_id, dtype=np.bool_)
# Run AAL calculations, populate above vecs
run_aal(
memmaps,
file_data["no_of_periods"],
file_data["period_weights"],
sample_size,
max_summary_id,
files_handles,
vec_analytical_aal, # unique in output_aal
vecs_sample_aal,
vec_used_summary_id,
)
# Initialise output files AAL
if output_binary:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
out_file = stack.enter_context(open(outmap[out_type]["file_path"], 'wb'))
outmap[out_type]["file"] = out_file
elif output_parquet:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
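                # Dummy rows used only to derive the parquet schema from the dtype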
temp_out_data = np.zeros(1000000, dtype=outmap[out_type]["dtype"])
temp_df = pd.DataFrame(temp_out_data, columns=outmap[out_type]["headers"])
temp_table = pa.Table.from_pandas(temp_df)
                out_file = stack.enter_context(pq.ParquetWriter(outmap[out_type]["file_path"], temp_table.schema))
outmap[out_type]["file"] = out_file
else:
for out_type in outmap:
if not outmap[out_type]["compute"]:
continue
out_file = stack.enter_context(open(outmap[out_type]["file_path"], 'w'))
if not noheader:
csv_headers = ','.join(outmap[out_type]["headers"])
out_file.write(csv_headers + '\n')
outmap[out_type]["file"] = out_file
def write_output(data, out_type):
if output_binary:
data.tofile(outmap[out_type]["file"])
elif output_parquet:
data_df = pd.DataFrame(data)
data_table = pa.Table.from_pandas(data_df)
outmap[out_type]["file"].write_table(data_table)
else:
write_ndarray_to_fmt_csv(
outmap[out_type]["file"],
data,
outmap[out_type]["headers"],
outmap[out_type]["fmt"]
)
if outmap["aal"]["compute"]:
# Get Sample AAL data for subset_size == sample_size (last group of arrays)
start_idx = (num_subsets - 1) * max_summary_id
aal_data_func = get_aal_data_meanonly if meanonly else get_aal_data
aal_data = aal_data_func(
vec_analytical_aal,
vecs_sample_aal[start_idx:],
vec_used_summary_id,
sample_size,
file_data["no_of_periods"],
)
aal_data = np.array(aal_data, dtype=outmap["aal"]["dtype"])
write_output(aal_data, "aal")
if outmap["alct"]["compute"]:
alct_data = get_alct_data(
vecs_sample_aal,
max_summary_id,
sample_size,
file_data["no_of_periods"],
confidence
)
alct_data = np.array([tuple(arr) for arr in alct_data], dtype=outmap["alct"]["dtype"])
write_output(alct_data, "alct")
@redirect_logging(exec_name='aalpy')
def main(run_dir='.', subfolder=None, aal=None, alct=None, meanonly=False, noheader=False, confidence=0.95, ext="csv", **kwargs):
run(
run_dir,
subfolder,
aal_output_file=aal,
meanonly=meanonly,
alct_output_file=alct,
noheader=noheader,
confidence=confidence,
output_format=ext,
)
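# A minimal invocation sketch (paths and the subfolder name are illustrative,
# not part of the module):
#
#   if __name__ == "__main__":
#       main(run_dir=".", subfolder="summary_1", aal="aal.csv", alct="alct.csv")
#
# This reads the summary binaries under ./work/summary_1 (plus the occurrence
# and period inputs) and writes the AAL and ALCT csv outputs.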