Source code for oasislmf.pytools.aal.manager

# aal/manager.py

import logging
import numpy as np
import numba as nb
import os
from contextlib import ExitStack
from pathlib import Path

from oasislmf.pytools.aal.utils import heap_pop, heap_push, init_heap, exact_binary_search
from oasislmf.pytools.common.data import (MEAN_TYPE_ANALYTICAL, MEAN_TYPE_SAMPLE, oasis_int, oasis_float,
                                          oasis_int_size, oasis_float_size, write_ndarray_to_fmt_csv)
from oasislmf.pytools.common.event_stream import (MEAN_IDX, MAX_LOSS_IDX, NUMBER_OF_AFFECTED_RISK_IDX, SUMMARY_STREAM_ID,
                                                  init_streams_in, mv_read)
from oasislmf.pytools.common.input_files import read_occurrence, read_periods
from oasislmf.pytools.utils import redirect_logging


[docs] logger = logging.getLogger(__name__)
# Total amount of memory AAL summary index should use before raising an error (GB)
[docs] OASIS_AAL_MEMORY = float(os.environ["OASIS_AAL_MEMORY"]) if "OASIS_AAL_MEMORY" in os.environ else 4
# Similar to aal_rec in ktools # summary_id can be infered from index # type can be infered from which array is using it _AAL_REC_DTYPE = np.dtype([ ('mean', np.float64), ('mean_squared', np.float64), ]) # Similar to aal_rec_period _AAL_REC_PERIOD_DTYPE = np.dtype( _AAL_REC_DTYPE.descr + [('mean_period', np.float64)] ) _SUMMARIES_DTYPE = np.dtype([ ("summary_id", np.int32), ("file_idx", np.int32), ("period_no", np.int32), ("file_offset", np.int64), ]) _SUMMARIES_DTYPE_size = _SUMMARIES_DTYPE.itemsize
[docs] AAL_output = [ ('SummaryId', oasis_int, '%d'), ('SampleType', oasis_int, '%d'), ('MeanLoss', oasis_float, '%.6f'), ('SDLoss', oasis_float, '%.6f'), ]
[docs] ALCT_output = [ ("SummaryId", oasis_float, '%d'), ("MeanLoss", oasis_float, '%.6f'), ("SDLoss", oasis_float, '%.6f'), ("SampleSize", oasis_float, '%d'), ("LowerCI", oasis_float, '%.6f'), ("UpperCI", oasis_float, '%.6f'), ("StandardError", oasis_float, '%.6f'), ("RelativeError", oasis_float, '%.6f'), ("VarElementHaz", oasis_float, '%.6f'), ("StandardErrorHaz", oasis_float, '%.6f'), ("RelativeErrorHaz", oasis_float, '%.6f'), ("VarElementVuln", oasis_float, '%.6f'), ("StandardErrorVuln", oasis_float, '%.6f'), ("RelativeErrorVuln", oasis_float, '%.6f'), ]
@nb.njit(cache=True, error_model="numpy")
[docs] def process_bin_file( fbin, offset, occ_map, unique_event_ids, event_id_counts, summaries_data, summaries_idx, file_index, sample_size ): """Reads summary<n>.bin file event_ids and summary_ids to populate summaries_data Args: fbin (np.memmap): summary binary memmap offset (int): file offset to read from occ_map (ndarray[occ_map_dtype]): numpy map of event_id, period_no, occ_date_id from the occurrence file unique_event_ids (ndarray[np.int32]): List of unique event_ids event_id_counts (ndarray[np.int32]): List of the counts of occurrences for each unique event_id in occ_map summaries_data (ndarray[_SUMMARIES_DTYPE]): Index summary data (summaries.idx data) summaries_idx (int): current index reached in summaries_data file_index (int): Summary bin file index sample_size (int): Sample size Returns: summaries_idx (int): current index reached in summaries_data resize_flag (bool): flag to indicate whether to resize summaries_data when full offset (int): file offset to read from """ while offset < len(fbin): cursor = offset event_id, cursor = mv_read(fbin, cursor, oasis_int, oasis_int_size) summary_id, cursor = mv_read(fbin, cursor, oasis_int, oasis_int_size) # Find the index of the event_id in the unique_event_ids array event_id_index = exact_binary_search(unique_event_ids, event_id) if event_id_index == len(unique_event_ids): # If the event_id doesn't exist in occ_map, continue with the next offset = cursor # Skip over Expval and losses _, offset = mv_read(fbin, offset, oasis_float, oasis_float_size) offset = skip_losses(fbin, offset) continue # Get the number of rows for the current event_id n_rows = event_id_counts[event_id_index] if summaries_idx + n_rows >= len(summaries_data): # Resize array if full return summaries_idx, True, offset # Now fill the summaries_data with the rows that match the current event_id current_row = 0 for row in occ_map: if row["event_id"] == event_id: summaries_data[summaries_idx]["summary_id"] = summary_id summaries_data[summaries_idx]["file_idx"] = file_index summaries_data[summaries_idx]["period_no"] = row["period_no"] summaries_data[summaries_idx]["file_offset"] = offset summaries_idx += 1 current_row += 1 if current_row >= n_rows: break offset = cursor # Read Expval _, offset = mv_read(fbin, offset, oasis_float, oasis_float_size) # Skip over losses offset = skip_losses(fbin, offset) return summaries_idx, False, offset
[docs] def sort_and_save_chunk(summaries_data, temp_file_path): """Sort a chunk of summaries data and save it to a temporary file. Args: summaries_data (ndarray[_SUMMARIES_DTYPE]): Indexed summary data temp_file_path (str | os.PathLike): Path to temporary file """ sort_columns = ["file_idx", "period_no", "summary_id"] sorted_indices = np.lexsort([summaries_data[col] for col in sort_columns]) sorted_chunk = summaries_data[sorted_indices] sorted_chunk.tofile(temp_file_path)
@nb.njit(cache=True, error_model="numpy")
[docs] def merge_sorted_chunks(memmaps): """ Merge sorted chunks using a k-way merge algorithm and yield next smallest row Args: memmaps (List[np.memmap]): List of temporary file memmaps Yields: smallest_row (ndarray[_SUMMARIES_DTYPE]): yields the next smallest row from sorted summaries partial files """ min_heap = init_heap() size = 0 # Initialize the min_heap with the first row of each memmap for i, mmap in enumerate(memmaps): if len(mmap) > 0: first_row = mmap[0] min_heap, size = heap_push(min_heap, size, np.array( [first_row["summary_id"], first_row["period_no"], first_row["file_idx"], i, 0], dtype=np.int32 )) # Perform the k-way merge while size > 0: # The min heap will store the smallest row at the top when popped element, min_heap, size = heap_pop(min_heap, size) file_idx = element[3] row_num = element[4] smallest_row = memmaps[file_idx][row_num] yield smallest_row # Push the next row from the same file into the heap if there are any more rows if row_num + 1 < len(memmaps[file_idx]): next_row = memmaps[file_idx][row_num + 1] min_heap, size = heap_push(min_heap, size, np.array( [next_row["summary_id"], next_row["period_no"], next_row["file_idx"], file_idx, row_num + 1], dtype=np.int32 ))
[docs] def get_summaries_data( path, files_handles, occ_map, unique_event_ids, event_id_counts, sample_size, aal_max_memory ): """Gets the indexed summaries data, ordered with k-way merge if not enough memory Args: path (os.PathLike): Path to the workspace folder containing summary binaries files_handles (List[np.memmap]): List of memmaps for summary files data occ_map (ndarray[occ_map_dtype]): numpy map of event_id, period_no, occ_date_id from the occurrence file unique_event_ids (ndarray[np.int32]): List of unique event_ids event_id_counts (ndarray[np.int32]): List of the counts of occurrences for each unique event_id in occ_map sample_size (int): Sample size aal_max_memory (float): OASIS_AAL_MEMORY value (has to be passed in as numba won't update from environment variable) Returns: memmaps (List[np.memmap]): List of temporary file memmaps max_summary_id (int): Max summary ID """ # Remove existing temp bdat files if exists for temp_file in path.glob("indexed_summaries.part*.bdat"): os.remove(temp_file) buffer_size = int(((aal_max_memory * (1024**3) // _SUMMARIES_DTYPE_size))) temp_files = [] chunk_index = 0 summaries_data = np.empty(buffer_size, dtype=_SUMMARIES_DTYPE) summaries_idx = 0 max_summary_id = 0 for file_index, fbin in enumerate(files_handles): offset = oasis_int_size * 3 # Summary stream header size while True: summaries_idx, resize_flag, offset = process_bin_file( fbin, offset, occ_map, unique_event_ids, event_id_counts, summaries_data, summaries_idx, file_index, sample_size ) # Write new summaries partial file when buffer size or end of summary file reached if resize_flag: temp_file_path = Path(path, f"indexed_summaries.part{chunk_index}.bdat") summaries_data = summaries_data[:summaries_idx] sort_and_save_chunk(summaries_data, temp_file_path) temp_files.append(temp_file_path) chunk_index += 1 summaries_idx = 0 max_summary_id = max(max_summary_id, np.max(summaries_data["summary_id"])) # End of file, move to next file if offset >= len(fbin): break # Write remaining summaries data to temporary file temp_file_path = Path(path, f"indexed_summaries.part{chunk_index}.bdat") summaries_data = summaries_data[:summaries_idx] sort_and_save_chunk(summaries_data, temp_file_path) max_summary_id = max(max_summary_id, np.max(summaries_data["summary_id"])) temp_files.append(temp_file_path) memmaps = [np.memmap(temp_file, mode="r", dtype=_SUMMARIES_DTYPE) for temp_file in temp_files] return memmaps, max_summary_id
[docs] def summary_index(path, occ_map, unique_event_ids, event_id_counts, stack): """Index the summary binary outputs Args: path (os.PathLike): Path to the workspace folder containing summary binaries occ_map (ndarray[occ_map_dtype]): numpy map of event_id, period_no, occ_date_id from the occurrence file unique_event_ids (ndarray[np.int32]): List of unique event_ids event_id_counts (ndarray[np.int32]): List of the counts of occurrences for each unique event_id in occ_map stack (ExitStack): Exit stack Returns: files_handles (List[np.memmap]): List of memmaps for summary files data sample_size (int): Sample size max_summary_id (int): Max summary ID memmaps (List[np.memmap]): List of temporary file memmaps """ # work folder for aal files aal_files_folder = Path(path, "aal_files") aal_files_folder.mkdir(parents=False, exist_ok=True) # Find summary binary files files = [file for file in path.glob("*.bin")] files_handles = [np.memmap(file, mode="r", dtype="u1") for file in files] streams_in, (stream_source_type, stream_agg_type, sample_size) = init_streams_in(files, stack) if stream_source_type != SUMMARY_STREAM_ID: raise RuntimeError(f"Error: Not a summary stream type {stream_source_type}") memmaps, max_summary_id = get_summaries_data( aal_files_folder, files_handles, occ_map, unique_event_ids, event_id_counts, sample_size, OASIS_AAL_MEMORY ) return files_handles, sample_size, max_summary_id, memmaps
[docs] def read_input_files(run_dir): """Reads all input files and returns a dict of relevant data Args: run_dir (str | os.PathLike): Path to directory containing required files structure Returns: file_data (Dict[str, Any]): A dict of relevent data extracted from files """ occ_map, date_algorithm, granular_date, no_of_periods = read_occurrence(Path(run_dir, "input")) period_weights = read_periods(no_of_periods, Path(run_dir, "input")) # Create a list of occurrences for each event_id # Creates an array that stores indices corresponding # to rows in occ_map that match each event_id event_ids = [row["event_id"] for row in occ_map] unique_event_ids = np.sort(np.unique(event_ids)) # Precompute the row counts for each event_id event_id_counts = np.zeros(len(unique_event_ids), dtype=np.int32) for row in occ_map: event_id_index = np.where(unique_event_ids == row["event_id"])[0][0] event_id_counts[event_id_index] += 1 file_data = { "occ_map": occ_map, "date_algorithm": date_algorithm, "granular_date": granular_date, "no_of_periods": no_of_periods, "period_weights": period_weights, "unique_event_ids": unique_event_ids, "event_id_counts": event_id_counts, } return file_data
[docs] def get_num_subsets(alct, sample_size, max_summary_id): """Gets the number of subsets required to generates the Sample AAL np map for subset sizes up to sample_size Example: sample_size[10], max_summary_id[2] generates following ndarray [ # subset_size, mean, mean_squared, mean_period [0, 0, 0], # subset_size = 1 , summary_id = 1 [0, 0, 0], # subset_size = 1 , summary_id = 2 [0, 0, 0], # subset_size = 2 , summary_id = 1 [0, 0, 0], # subset_size = 2 , summary_id = 2 [0, 0, 0], # subset_size = 4 , summary_id = 1 [0, 0, 0], # subset_size = 4 , summary_id = 2 [0, 0, 0], # subset_size = 10 , summary_id = 1, subset_size = sample_size [0, 0, 0], # subset_size = 10 , summary_id = 2, subset_size = sample_size ] Subset_size is implicit based on position in array, grouped by max_summary_id So first two arrays are subset_size 2^0 = 1 The next two arrays are subset_size 2^1 = 2 The next two arrays are subset_size 2^2 = 4 The last two arrays are subset_size = sample_size = 10 Doesn't generate one with subset_size 8 as double that is larger than sample_size Therefore this function returns 4, and the sample aal array is 4 * 2 Args: alct (bool): Boolean for ALCT output sample_size (int): Sample size max_summary_id (int): Max summary ID Returns: num_subsets (int): Number of subsets """ i = 0 if alct and sample_size > 1: while ((1 << i) + ((1 << i) - 1)) <= sample_size: i += 1 return i + 1
@nb.njit(cache=True, fastmath=True, error_model="numpy")
[docs] def get_weighted_means( vec_sample_sum_loss, weighting, sidx, end_sidx, ): """Get sum of weighted mean and weighted mean_squared Args: vec_sample_sum_loss (ndarray[_AAL_REC_DTYPE]): Vector for sample sum losses weighting (float): Weighting value sidx (int): start index end_sidx (int): end index Returns: weighted_mean (float): Sum weighted mean weighted_mean_squared (float): Sum weighted mean squared """ weighted_mean = 0 weighted_mean_squared = 0 while sidx < end_sidx: sumloss = vec_sample_sum_loss[sidx] weighted_mean += sumloss * weighting weighted_mean_squared += sumloss * sumloss * weighting sidx += 1 return weighted_mean, weighted_mean_squared
@nb.njit(cache=True, error_model="numpy")
[docs] def do_calc_end( period_no, no_of_periods, period_weights, sample_size, curr_summary_id, max_summary_id, vec_analytical_aal, vecs_sample_aal, vec_used_summary_id, vec_sample_sum_loss, ): """Updates Analytical and Sample AAL vectors from sample sum losses Args: period_no (int): Period Number no_of_periods (int): Number of periods period_weights (ndarray[period_weights_dtype]): Period Weights sample_size (int): Sample Size curr_summary_id (int): Current summary_id max_summary_id (int): Max summary_id vec_analytical_aal (ndarray[_AAL_REC_DTYPE]): Vector for Analytical AAL vecs_sample_aal (ndarray[_AAL_REC_PERIODS_DTYPE]): Vector for Sample AAL vec_used_summary_id (ndarray[bool]): vector to store if summary_id is used vec_sample_sum_loss (ndarray[_AAL_REC_DTYPE]): Vector for sample sum losses """ # Get weighting weighting = 1 if no_of_periods > 0: # period_no in period_weights if period_no > 0 and period_no <= no_of_periods: weighting = period_weights[period_no - 1][1] * no_of_periods else: weighting = 0 # Update Analytical AAL mean = vec_sample_sum_loss[0] # 0 index is where the analytical mean is stored vec_used_summary_id[curr_summary_id - 1] = True vec_analytical_aal[curr_summary_id - 1]["mean"] += mean * weighting vec_analytical_aal[curr_summary_id - 1]["mean_squared"] += mean * mean * weighting # Update Sample AAL # Get relevant indexes for curr_summary_id len_sample_aal = len(vecs_sample_aal) num_subsets = len_sample_aal // max_summary_id idxs = [i * max_summary_id + (curr_summary_id - 1) for i in range(num_subsets)] # Get sample aal idx for sample_size last_sample_aal = vecs_sample_aal[idxs[-1]] total_mean_by_period = 0 sidx = 1 aal_idx = 0 while sidx < sample_size + 1: # Iterate through aal_idx except the last one which is subset_size == sample_size while aal_idx < num_subsets - 1: curr_sample_aal = vecs_sample_aal[idxs[aal_idx]] # Calculate the subset_size and assign to sidx sidx = 1 << aal_idx end_sidx = sidx << 1 # Traverse sidx == subset_size to sidx == subset_size * 2 weighted_mean, weighted_mean_squared = get_weighted_means( vec_sample_sum_loss, weighting, sidx, end_sidx ) # Update sample size Sample AAL last_sample_aal["mean"] += weighted_mean last_sample_aal["mean_squared"] += weighted_mean_squared total_mean_by_period += weighted_mean # Update current Sample AAL curr_sample_aal["mean"] += weighted_mean curr_sample_aal["mean_squared"] += weighted_mean_squared # Update current Sample AAL mean_period curr_sample_aal["mean_period"] += weighted_mean * weighted_mean sidx = end_sidx aal_idx += 1 # Update sample size Sample AAL mean = vec_sample_sum_loss[sidx] total_mean_by_period += mean * weighting last_sample_aal["mean"] += mean * weighting last_sample_aal["mean_squared"] += mean * mean * weighting sidx += 1 # Update sample size Sample AAL mean_period last_sample_aal["mean_period"] += total_mean_by_period * total_mean_by_period vec_sample_sum_loss.fill(0)
@nb.njit(cache=True, error_model="numpy")
[docs] def read_losses(summary_fin, cursor, vec_sample_sum_loss): """Read losses from summary_fin starting at cursor, populate vec_sample_sum_loss Args: summary_fin (np.memmap): summary file memmap cursor (int): data offset for reading binary files (ndarray[_AAL_REC_DTYPE]): Vector for sample sum losses Returns: cursor (int): data offset for reading binary files """ # Max losses is sample_size + num special sidxs valid_buff = len(summary_fin) while True: if valid_buff - cursor < oasis_int_size + oasis_float_size: raise RuntimeError("Error: broken summary file, not enough data") sidx, cursor = mv_read(summary_fin, cursor, oasis_int, oasis_int_size) loss, cursor = mv_read(summary_fin, cursor, oasis_float, oasis_float_size) if sidx == 0: break if sidx == NUMBER_OF_AFFECTED_RISK_IDX or sidx == MAX_LOSS_IDX: continue if sidx == MEAN_IDX: sidx = 0 vec_sample_sum_loss[sidx] += loss return cursor
@nb.njit(cache=True, error_model="numpy")
[docs] def skip_losses(summary_fin, cursor): """Skip through losses in summary_fin starting at cursor Args: summary_fin (np.memmap): summary file memmap cursor (int): data offset for reading binary files Returns: cursor (int): data offset for reading binary files """ valid_buff = len(summary_fin) sidx = 1 while sidx: if valid_buff - cursor < oasis_int_size + oasis_float_size: raise RuntimeError("Error: broken summary file, not enough data") sidx, cursor = mv_read(summary_fin, cursor, oasis_int, oasis_int_size) cursor += oasis_float_size return cursor
@nb.njit(cache=True, error_model="numpy")
[docs] def run_aal( memmaps, no_of_periods, period_weights, sample_size, max_summary_id, files_handles, vec_analytical_aal, vecs_sample_aal, vec_used_summary_id, ): """Run AAL calculation loop to populate vec data Args: memmaps (List[np.memmap]): List of temporary file memmaps no_of_periods (int): Number of periods period_weights (ndarray[period_weights_dtype]): Period Weights sample_size (int): Sample Size max_summary_id (int): Max summary_id files_handles (List[np.memmap]): List of memmaps for summary files data vec_analytical_aal (ndarray[_AAL_REC_DTYPE]): Vector for Analytical AAL vecs_sample_aal (ndarray[_AAL_REC_PERIODS_DTYPE]): Vector for Sample AAL vec_used_summary_id (ndarray[bool]): vector to store if summary_id is used """ if len(memmaps) == 0: raise ValueError("File is empty or missing data") # Index 0 is mean vec_sample_sum_loss = np.zeros(sample_size + 1, dtype=np.float64) last_summary_id = -1 last_period_no = -1 for line in merge_sorted_chunks(memmaps): summary_id = line["summary_id"] file_idx = line["file_idx"] period_no = line["period_no"] file_offset = line["file_offset"] if last_summary_id != summary_id: if last_summary_id != -1: do_calc_end( last_period_no, no_of_periods, period_weights, sample_size, last_summary_id, max_summary_id, vec_analytical_aal, vecs_sample_aal, vec_used_summary_id, vec_sample_sum_loss, ) last_period_no = period_no last_summary_id = summary_id elif last_period_no != period_no: if last_period_no != -1: do_calc_end( last_period_no, no_of_periods, period_weights, sample_size, last_summary_id, max_summary_id, vec_analytical_aal, vecs_sample_aal, vec_used_summary_id, vec_sample_sum_loss, ) last_period_no = period_no summary_fin = files_handles[file_idx] # Read summary header values (event_id, summary_id, expval) cursor = file_offset + (2 * oasis_int_size) + oasis_float_size read_losses(summary_fin, cursor, vec_sample_sum_loss) if last_summary_id != -1: do_calc_end( last_period_no, no_of_periods, period_weights, sample_size, last_summary_id, max_summary_id, vec_analytical_aal, vecs_sample_aal, vec_used_summary_id, vec_sample_sum_loss, )
@nb.njit(cache=True, fastmath=True, error_model="numpy")
[docs] def calculate_mean_stddev( observable_sum, observable_squared_sum, number_of_observations ): """Compute the mean and standard deviation from the sum and squared sum of an observable Args: observable_sum (ndarray[oasis_float]): Observable sum observable_squared_sum (ndarray[oasis_float]): Observable squared sum number_of_observations (int | ndarray[int]): number of observations Returns: mean (ndarray[oasis_float]): Mean std (ndarray[oasis_float]): Standard Deviation """ mean = observable_sum / number_of_observations std = np.sqrt( ( observable_squared_sum - (observable_sum * observable_sum) / number_of_observations ) / (number_of_observations - 1) ) return mean, std
@nb.njit(cache=True, error_model="numpy")
[docs] def get_aal_data( vec_analytical_aal, vecs_sample_aal, vec_used_summary_id, sample_size, no_of_periods ): """Generate AAL csv data Args: vec_analytical_aal (ndarray[_AAL_REC_DTYPE]): Vector for Analytical AAL vecs_sample_aal (ndarray[_AAL_REC_PERIODS_DTYPE]): Vector for Sample AAL vec_used_summary_id (ndarray[bool]): vector to store if summary_id is used sample_size (int): Sample Size no_of_periods (int): Number of periods Returns: aal_data (List[Tuple]): AAL csv data """ aal_data = [] assert len(vec_analytical_aal) == len(vecs_sample_aal), \ f"Lengths of analytical ({len(vec_analytical_aal)}) and sample ({len(vecs_sample_aal)}) aal data differ" mean_analytical, std_analytical = calculate_mean_stddev( vec_analytical_aal["mean"], vec_analytical_aal["mean_squared"], no_of_periods, ) mean_sample, std_sample = calculate_mean_stddev( vecs_sample_aal["mean"], vecs_sample_aal["mean_squared"], no_of_periods * sample_size, ) for summary_idx in range(len(vec_analytical_aal)): if not vec_used_summary_id[summary_idx]: continue aal_data.append((summary_idx + 1, MEAN_TYPE_ANALYTICAL, mean_analytical[summary_idx], std_analytical[summary_idx])) for summary_idx in range(len(vecs_sample_aal)): if not vec_used_summary_id[summary_idx]: continue aal_data.append((summary_idx + 1, MEAN_TYPE_SAMPLE, mean_sample[summary_idx], std_sample[summary_idx])) return aal_data
@nb.njit(cache=True, error_model="numpy")
[docs] def get_aal_data_meanonly( vec_analytical_aal, vecs_sample_aal, vec_used_summary_id, sample_size, no_of_periods ): """Generate AAL csv data Args: vec_analytical_aal (ndarray[_AAL_REC_DTYPE]): Vector for Analytical AAL vecs_sample_aal (ndarray[_AAL_REC_PERIODS_DTYPE]): Vector for Sample AAL vec_used_summary_id (ndarray[bool]): vector to store if summary_id is used sample_size (int): Sample Size no_of_periods (int): Number of periods Returns: aal_data (List[Tuple]): AAL csv data """ aal_data = [] assert len(vec_analytical_aal) == len(vecs_sample_aal), \ f"Lengths of analytical ({len(vec_analytical_aal)}) and sample ({len(vecs_sample_aal)}) aal data differ" mean_analytical, std_analytical = calculate_mean_stddev( vec_analytical_aal["mean"], vec_analytical_aal["mean_squared"], no_of_periods, ) mean_sample, std_sample = calculate_mean_stddev( vecs_sample_aal["mean"], vecs_sample_aal["mean_squared"], no_of_periods * sample_size, ) # aalmeanonlycalc orders output data differently, this if condition is here to match the output to the ktools output for summary_idx in range(len(vec_analytical_aal)): if not vec_used_summary_id[summary_idx]: continue aal_data.append((summary_idx + 1, MEAN_TYPE_ANALYTICAL, mean_analytical[summary_idx])) aal_data.append((summary_idx + 1, MEAN_TYPE_SAMPLE, mean_sample[summary_idx])) return aal_data
@nb.njit(cache=True, fastmath=True, error_model="numpy")
[docs] def calculate_confidence_interval(std_err, confidence_level): """Calculate the confidence interval based on standard error and confidence level. Args: std_err (float): The standard error. confidence_level (float): The confidence level (e.g., 0.95 for 95%). Returns: confidence interval (float): The confidence interval. """ # Compute p-value above 0.5 p_value = (1 + confidence_level) / 2 p_value = np.sqrt(-2 * np.log(1 - p_value)) # Approximation formula for z-value from Abramowitz & Stegun, Handbook # of Mathematical Functions: with Formulas, Graphs, and Mathematical # Tables, Dover Publications (1965), eq. 26.2.23 # Also see John D. Cook Consulting, https://www.johndcook.com/blog/cpp_phi_inverse/ c = np.array([2.515517, 0.802853, 0.010328]) d = np.array([1.432788, 0.189269, 0.001308]) z_value = p_value - ( ((c[2] * p_value + c[1]) * p_value + c[0]) / (((d[2] * p_value + d[1]) * p_value + d[0]) * p_value + 1) ) # Return the confidence interval return std_err * z_value
@nb.njit(cache=True, error_model="numpy")
[docs] def get_alct_data( vecs_sample_aal, max_summary_id, sample_size, no_of_periods, confidence, ): """Generate ALCT csv data Args: vecs_sample_aal (ndarray[_AAL_REC_PERIODS_DTYPE]): Vector for Sample AAL max_summary_id (int): Max summary_id sample_size (int): Sample Size no_of_periods (int): Number of periods confidence (float): Confidence level between 0 and 1, default 0.95 Returns: alct_data (List[List]): ALCT csv data """ alct_data = [] num_subsets = len(vecs_sample_aal) // max_summary_id # Generate the subset sizes (last one is always sample_size) subset_sizes = np.array([2 ** i for i in range(num_subsets)]) subset_sizes[-1] = sample_size for summary_id in range(1, max_summary_id + 1): # Get idxs for summary_id across all subset_sizes idxs = np.array([i * max_summary_id + (summary_id - 1) for i in range(num_subsets)]) v_curr = vecs_sample_aal[idxs] mean, std = calculate_mean_stddev( v_curr["mean"], v_curr["mean_squared"], subset_sizes * no_of_periods, ) mean_period = v_curr["mean_period"] / (subset_sizes * subset_sizes) var_vuln = ( (v_curr["mean_squared"] - subset_sizes * mean_period) / (subset_sizes * no_of_periods - subset_sizes) ) / (subset_sizes * no_of_periods) var_haz = ( subset_sizes * (mean_period - no_of_periods * mean * mean) / (no_of_periods - 1) ) / (subset_sizes * no_of_periods) std_err = np.sqrt(var_vuln) ci = calculate_confidence_interval(std_err, confidence) std_err_haz = np.sqrt(var_haz) std_err_vuln = np.sqrt(var_vuln) lower_ci = np.where(ci > 0, mean - ci, 0) upper_ci = np.where(ci > 0, mean + ci, 0) curr_data = np.column_stack(( np.array([summary_id] * num_subsets), mean, std, subset_sizes, lower_ci, upper_ci, std_err, std_err / mean, var_haz, std_err_haz, std_err_haz / mean, var_vuln, std_err_vuln, std_err_vuln / mean, )) for row in curr_data: alct_data.append(row) return alct_data
[docs] def run(run_dir, subfolder, aal_output_file=None, alct_output_file=None, meanonly=False, noheader=False, confidence=0.95): """Runs AAL calculations Args: run_dir (str | os.PathLike): Path to directory containing required files structure subfolder (str): Workspace subfolder inside <run_dir>/work/<subfolder> aal_output_file (str, optional): Path to AAL output file. Defaults to None alct_output_file (str, optional): Path to ALCT output file. Defaults to None meanonly (bool): Boolean value to output AAL with mean only noheader (bool): Boolean value to skip header in output file confidence (float): Confidence level between 0 and 1, default 0.95 """ output_aal = aal_output_file is not None output_alct = alct_output_file is not None with ExitStack() as stack: workspace_folder = Path(run_dir, "work", subfolder) if not workspace_folder.is_dir(): raise RuntimeError(f"Error: Unable to open directory {workspace_folder}") file_data = read_input_files(run_dir) files_handles, sample_size, max_summary_id, memmaps = summary_index( workspace_folder, file_data["occ_map"], file_data["unique_event_ids"], file_data["event_id_counts"], stack ) # aal vec are Indexed on summary_id - 1 num_subsets = get_num_subsets(output_alct, sample_size, max_summary_id) vecs_sample_aal = np.zeros(num_subsets * max_summary_id, dtype=_AAL_REC_PERIOD_DTYPE) vec_analytical_aal = np.zeros(max_summary_id, dtype=_AAL_REC_DTYPE) vec_used_summary_id = np.zeros(max_summary_id, dtype=np.bool_) # Run AAL calculations, populate above vecs run_aal( memmaps, file_data["no_of_periods"], file_data["period_weights"], sample_size, max_summary_id, files_handles, vec_analytical_aal, # unique in output_aal vecs_sample_aal, vec_used_summary_id, ) # Initialise csv column names for output files output_files = {} if output_aal: aal_file = stack.enter_context(open(aal_output_file, "w")) if not noheader: if not meanonly: AAL_headers = [c[0] for c in AAL_output] csv_headers = ",".join(AAL_headers) aal_file.write(csv_headers + "\n") else: AAL_headers = [c[0] for c in AAL_output if c[0] != "SDLoss"] csv_headers = ",".join(AAL_headers) aal_file.write(csv_headers + "\n") output_files["aal"] = aal_file if output_alct: alct_file = stack.enter_context(open(alct_output_file, "w")) if not noheader: if not meanonly: ALCT_headers = [c[0] for c in ALCT_output] csv_headers = ",".join(ALCT_headers) alct_file.write(csv_headers + "\n") output_files["alct"] = alct_file # Output file data if not meanonly: AAL_fmt = ','.join([c[2] for c in AAL_output]) AAL_dtype = np.dtype([(c[0], c[1]) for c in AAL_output]) else: AAL_fmt = ','.join([c[2] for c in AAL_output if c[0] != "SDLoss"]) AAL_dtype = np.dtype([(c[0], c[1]) for c in AAL_output if c[0] != "SDLoss"]) ALCT_fmt = ','.join([c[2] for c in ALCT_output]) ALCT_dtype = np.dtype([(c[0], c[1]) for c in ALCT_output]) if output_aal: # Get Sample AAL data for subset_size == sample_size (last group of arrays) start_idx = (num_subsets - 1) * max_summary_id if not meanonly: aal_data = get_aal_data( vec_analytical_aal, vecs_sample_aal[start_idx:], vec_used_summary_id, sample_size, file_data["no_of_periods"], ) else: aal_data = get_aal_data_meanonly( vec_analytical_aal, vecs_sample_aal[start_idx:], vec_used_summary_id, sample_size, file_data["no_of_periods"], ) write_ndarray_to_fmt_csv(output_files["aal"], np.array(aal_data, dtype=AAL_dtype), AAL_headers, AAL_fmt) if output_alct: alct_data = get_alct_data( vecs_sample_aal, max_summary_id, sample_size, file_data["no_of_periods"], confidence ) write_ndarray_to_fmt_csv(output_files["alct"], np.array([tuple(arr) for arr in alct_data], dtype=ALCT_dtype), ALCT_headers, ALCT_fmt)
@redirect_logging(exec_name='aalpy')
[docs] def main(run_dir='.', subfolder=None, aal=None, alct=None, meanonly=False, noheader=False, confidence=0.95, **kwargs): run( run_dir, subfolder, aal_output_file=aal, meanonly=meanonly, alct_output_file=alct, noheader=noheader, confidence=confidence, )