Source code for oasislmf.pytools.gul.manager

"""
This file is the entry point for the gul command for the package.

"""
import logging
import os
import sys
from contextlib import ExitStack
from select import select

import numpy as np
import pandas as pd
from numba import njit
from numba.typed import Dict, List

from oasis_data_manager.filestore.config import get_storage_from_config_path
from oasislmf.pytools.common.event_stream import (PIPE_CAPACITY, mv_write_item_header, mv_write_sidx_loss, mv_write_delimiter,
                                                  stream_info_to_bytes, LOSS_STREAM_ID, ITEM_STREAM)
from oasislmf.pytools.data_layer.oasis_files.correlations import Correlation, CorrelationsData
from oasislmf.pytools.getmodel.common import Keys, oasis_float
from oasislmf.pytools.getmodel.manager import Item, get_damage_bins
from oasislmf.pytools.gul.common import (SPECIAL_SIDX, CHANCE_OF_LOSS_IDX, ITEM_MAP_KEY_TYPE,
                                         ITEM_MAP_VALUE_TYPE, MAX_LOSS_IDX,
                                         MEAN_IDX, NUM_IDX, STD_DEV_IDX,
                                         TIV_IDX, coverage_type,
                                         gulSampleslevelHeader_size,
                                         gulSampleslevelRec_size)
from oasislmf.pytools.gul.core import (compute_mean_loss, get_gul, setmaxloss,
                                       split_tiv_classic,
                                       split_tiv_multiplicative)
from oasislmf.pytools.gul.io import read_getmodel_stream
from oasislmf.pytools.gul.random import (compute_norm_cdf_lookup,
                                         compute_norm_inv_cdf_lookup,
                                         generate_correlated_hash_vector,
                                         get_corr_rval, get_random_generator)
from oasislmf.pytools.gul.utils import append_to_dict_value, binary_search
from oasislmf.pytools.utils import redirect_logging

logger = logging.getLogger(__name__)

@njit(cache=True)
def adjust_byte_mv_size(byte_mv, max_bytes_per_coverage):
    """Adjust buff size so that the buffer fits the longest coverage.

    Args:
        byte_mv: numpy byte array
        max_bytes_per_coverage: max size possible to accommodate all the coverage in byte_mv

    Returns:
        byte_mv: numpy byte array
    """
    buff_size = byte_mv.shape[0]
    while buff_size < max_bytes_per_coverage:
        buff_size *= 2

    if byte_mv.shape[0] < buff_size:
        # create a new, bigger byte_mv
        byte_mv = np.empty(buff_size, dtype='b')

    return byte_mv
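# Illustrative sketch of the doubling behaviour above (not executed on import; the
# sizes are hypothetical). Starting from a 2 * PIPE_CAPACITY buffer, a request for
# 3 * PIPE_CAPACITY bytes grows the buffer to the next doubling, 4 * PIPE_CAPACITY:
#
#     byte_mv = np.empty(PIPE_CAPACITY * 2, dtype='b')
#     byte_mv = adjust_byte_mv_size(byte_mv, PIPE_CAPACITY * 3)
#     assert byte_mv.shape[0] == PIPE_CAPACITY * 4
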
def get_coverages(input_path, ignore_file_type=set()):
    """Load the coverages from the coverages file.

    Args:
        input_path (str): the path containing the coverage file.
        ignore_file_type (Set[str]): file extension to ignore when loading.

    Returns:
        numpy.array[oasis_float]: array with the coverage values for each coverage_id.
    """
    input_files = set(os.listdir(input_path))

    if "coverages.bin" in input_files and "bin" not in ignore_file_type:
        coverages_fname = os.path.join(input_path, 'coverages.bin')
        logger.debug(f"loading {coverages_fname}")
        coverages = np.fromfile(coverages_fname, dtype=oasis_float)

    elif "coverages.csv" in input_files and "csv" not in ignore_file_type:
        coverages_fname = os.path.join(input_path, 'coverages.csv')
        logger.debug(f"loading {coverages_fname}")
        coverages = np.loadtxt(coverages_fname, dtype=oasis_float, delimiter=",", skiprows=1, ndmin=1)[:, 1]

    else:
        raise FileNotFoundError(f'coverages file not found at {input_path}')

    return coverages
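# Usage sketch (illustrative; the path is hypothetical). Coverage ids are 1-based,
# so `run` below copies these TIVs into elements 1..N of the `coverages` array:
#
#     coverages_tiv = get_coverages('/path/to/run/input')
#     coverages = np.zeros(coverages_tiv.shape[0] + 1, coverage_type)
#     coverages[1:]['tiv'] = coverages_tiv
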
def gul_get_items(input_path, ignore_file_type=set()):
    """Load the items from the items file.

    Args:
        input_path (str): the path pointing to the file.
        ignore_file_type (Set[str]): file extension to ignore when loading.

    Returns:
        numpy.ndarray[Item]: 1-d structured array storing the items, with fields
          `id`, `coverage_id`, `areaperil_id`, `vulnerability_id`, `group_id`.
    """
    input_files = set(os.listdir(input_path))

    if "items.bin" in input_files and "bin" not in ignore_file_type:
        items_fname = os.path.join(input_path, 'items.bin')
        logger.debug(f"loading {items_fname}")
        items = np.memmap(items_fname, dtype=Item, mode='r')

    elif "items.csv" in input_files and "csv" not in ignore_file_type:
        items_fname = os.path.join(input_path, 'items.csv')
        logger.debug(f"loading {items_fname}")
        items = np.loadtxt(items_fname, dtype=Item, delimiter=",", skiprows=1, ndmin=1)

    else:
        raise FileNotFoundError(f'items file not found at {input_path}')

    return items
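# Usage sketch (illustrative; the path is hypothetical). The returned structured
# array exposes the fields used elsewhere in this module, e.g. 'id', 'coverage_id',
# 'areaperil_id', 'vulnerability_id' and 'group_id':
#
#     items = gul_get_items('/path/to/run/input')
#     logger.debug(f"loaded {items.shape[0]} items")
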
@njit(cache=True, fastmath=True)
def generate_item_map(items, coverages):
    """Generate item_map; requires items to be sorted.

    Args:
        items (numpy.ndarray[int32, int32, int32]): 1-d structured array storing
          `item_id`, `coverage_id`, `group_id` for all items.
          items need to be sorted by increasing areaperil_id, vulnerability_id
          in order to output the items in the correct order.
        coverages (numpy.ndarray[coverage_type]): structured array of coverage data;
          the `max_items` field of each coverage is incremented for every item that
          references it.

    Returns:
        item_map (Dict[ITEM_MAP_KEY_TYPE, ITEM_MAP_VALUE_TYPE]): dict storing the mapping
          between (areaperil_id, vulnerability_id) and item.
    """
    item_map = Dict.empty(ITEM_MAP_KEY_TYPE, List.empty_list(ITEM_MAP_VALUE_TYPE))
    Nitems = items.shape[0]

    for j in range(Nitems):
        append_to_dict_value(
            item_map,
            tuple((items[j]['areaperil_id'], items[j]['vulnerability_id'])),
            tuple((items[j]['id'], items[j]['coverage_id'], items[j]['group_id'])),
            ITEM_MAP_VALUE_TYPE
        )

        coverages[items[j]['coverage_id']]['max_items'] += 1

    return item_map
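# Usage sketch (illustrative), mirroring the calls made in `run` below: `items`
# must be sorted before building the map, and `coverages` must already have the
# `coverage_type` dtype because `generate_item_map` increments
# coverages[coverage_id]['max_items'] as a side effect:
#
#     items = np.sort(items, order=['areaperil_id', 'vulnerability_id'])
#     item_map = generate_item_map(items, coverages)
#     # item_map: (areaperil_id, vulnerability_id) -> list of (item_id, coverage_id, group_id)
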
@redirect_logging(exec_name='gulpy')
def run(run_dir, ignore_file_type, sample_size, loss_threshold, alloc_rule, debug,
        random_generator, peril_filter=[], file_in=None, file_out=None, ignore_correlation=False,
        **kwargs):
    """Execute the main gulpy workflow.

    Args:
        run_dir (str): the directory where the process is running.
        ignore_file_type (Set[str]): file extension to ignore when loading.
        sample_size (int): number of random samples to draw.
        loss_threshold (float): threshold above which losses are printed to the output stream.
        alloc_rule (int): back-allocation rule.
        debug (bool): if True, for each random sample, print to the output stream the random value
          instead of the loss.
        random_generator (int): random generator function id.
        peril_filter (list[str], optional): list of perils to include in the computation
          (all perils are included if empty). Defaults to [].
        file_in (str, optional): filename of input stream. Defaults to None.
        file_out (str, optional): filename of output stream. Defaults to None.
        ignore_correlation (bool): if True, do not compute correlated random samples.

    Raises:
        ValueError: if alloc_rule is not 0, 1, 2, or 3.

    Returns:
        int: 0 if no errors occurred.
    """
    logger.info("starting gulpy")

    model_storage = get_storage_from_config_path(
        os.path.join(run_dir, 'model_storage.json'),
        os.path.join(run_dir, 'static'),
    )
    input_path = os.path.join(run_dir, 'input')
    ignore_file_type = set(ignore_file_type)

    damage_bins = get_damage_bins(model_storage)

    # read coverages from file
    coverages_tiv = get_coverages(input_path)

    # load keys.csv to determine included AreaPerilID from peril_filter
    if peril_filter:
        keys_df = pd.read_csv(os.path.join(input_path, 'keys.csv'), dtype=Keys)
        valid_area_peril_id = keys_df.loc[keys_df['PerilID'].isin(peril_filter), 'AreaPerilID'].to_numpy()
        logger.debug(
            f'Peril specific run: ({peril_filter}), {len(valid_area_peril_id)} AreaPerilID included out of {len(keys_df)}')
    else:
        valid_area_peril_id = None

    # init the structures for the computation
    # coverages are numbered from 1, therefore we skip element 0 in `coverages`
    coverages = np.zeros(coverages_tiv.shape[0] + 1, coverage_type)
    coverages[1:]['tiv'] = coverages_tiv
    del coverages_tiv

    items = gul_get_items(input_path)

    # in-place sort items in order to store them in item_map in the desired order
    # currently numba only supports a simple call to np.sort() with no `order` keyword,
    # so we do the sort here.
    items = np.sort(items, order=['areaperil_id', 'vulnerability_id'])
    item_map = generate_item_map(items, coverages)

    # init array to store the coverages to be computed
    # coverages are numbered from 1, therefore skip element 0.
    compute = np.zeros(coverages.shape[0] + 1, items.dtype['coverage_id'])

    with ExitStack() as stack:
        # set up streams
        if file_in is None:
            streams_in = sys.stdin.buffer
        else:
            streams_in = stack.enter_context(open(file_in, 'rb'))

        if file_out is None or file_out == '-':
            stream_out = sys.stdout.buffer
        else:
            stream_out = stack.enter_context(open(file_out, 'wb'))

        select_stream_list = [stream_out]

        # prepare output buffer, write stream header
        stream_out.write(stream_info_to_bytes(LOSS_STREAM_ID, ITEM_STREAM))
        stream_out.write(np.int32(sample_size).tobytes())

        # set the random generator function
        generate_rndm = get_random_generator(random_generator)

        if alloc_rule not in [0, 1, 2, 3]:
            raise ValueError(f"Expect alloc_rule to be 0, 1, 2, or 3, got {alloc_rule}")

        cursor = 0

        # create the array to store the seeds
        seeds = np.zeros(len(np.unique(items['group_id'])), dtype=Item.dtype['group_id'])

        do_correlation = False
        if ignore_correlation:
            logger.info("Correlated random number generation: switched OFF because --ignore-correlation is True.")
        else:
            file_path = os.path.join(input_path, 'correlations.bin')
            data = CorrelationsData.from_bin(file_path=file_path).data
            Nperil_correlation_groups = len(data)
            logger.info(f"Detected {Nperil_correlation_groups} peril correlation groups.")

            if Nperil_correlation_groups > 0 and any(data['damage_correlation_value'] > 0):
                do_correlation = True
            else:
                logger.info("Correlated random number generation: switched OFF because 0 peril correlation groups were detected or "
                            "the correlation value is zero for all peril correlation groups.")

        if do_correlation:
            logger.info("Correlated random number generation: switched ON.")

            corr_data_by_item_id = np.ndarray(Nperil_correlation_groups + 1, dtype=Correlation)
            corr_data_by_item_id[0] = (0, 0., 0., 0, 0.)
            corr_data_by_item_id[1:]['peril_correlation_group'] = np.array(data['peril_correlation_group'])
            corr_data_by_item_id[1:]['damage_correlation_value'] = np.array(data['damage_correlation_value'])
            logger.info(
                f"Correlation values for {Nperil_correlation_groups} peril correlation groups have been imported."
            )

            unique_peril_correlation_groups = np.unique(corr_data_by_item_id[1:]['peril_correlation_group'])

            # pre-compute lookup tables for the Gaussian cdf and inverse cdf
            # Notes:
            #  - the size `arr_N` can be increased to achieve better resolution in the Gaussian cdf and inv cdf.
            #  - the function `get_corr_rval` to compute the correlated numbers is not affected by arr_N.
            arr_min, arr_max, arr_N = 1e-16, 1 - 1e-16, 1000000
            arr_min_cdf, arr_max_cdf = -20., 20.
            norm_inv_cdf = compute_norm_inv_cdf_lookup(arr_min, arr_max, arr_N)
            norm_cdf = compute_norm_cdf_lookup(arr_min_cdf, arr_max_cdf, arr_N)

            # buffer to be re-used to store all the correlated random values
            z_unif = np.zeros(sample_size, dtype='float64')

        else:
            # create dummy data structures with proper dtypes to allow correct numba compilation
            corr_data_by_item_id = np.ndarray(1, dtype=Correlation)
            arr_min, arr_max, arr_N = 0, 0, 0
            arr_min_cdf, arr_max_cdf = 0, 0
            norm_inv_cdf, norm_cdf = np.zeros(1, dtype='float64'), np.zeros(1, dtype='float64')
            z_unif = np.zeros(1, dtype='float64')

        # create buffer to be re-used to store all losses for one coverage
        losses_buffer = np.zeros((sample_size + NUM_IDX + 1, np.max(coverages[1:]['max_items'])), dtype=oasis_float)
        byte_mv = np.empty(PIPE_CAPACITY * 2, dtype='b')

        # maximum bytes to be written in the output stream for 1 item
        max_bytes_per_item = gulSampleslevelHeader_size + (sample_size + NUM_IDX + 1) * gulSampleslevelRec_size

        for event_data in read_getmodel_stream(streams_in, item_map, coverages, compute, seeds, valid_area_peril_id):
            event_id, compute_i, items_data, recs, rec_idx_ptr, rng_index = event_data

            # generation of "base" random values is done as before
            rndms_base = generate_rndm(seeds[:rng_index], sample_size)

            # to generate the correlated part, we do the hashing here for now (instead of in stream_to_data)
            # generate the correlated samples for the whole event, for all peril correlation groups
            if do_correlation:
                corr_seeds = generate_correlated_hash_vector(unique_peril_correlation_groups, event_id)
                eps_ij = generate_rndm(corr_seeds, sample_size, skip_seeds=1)
            else:
                # create dummy data structures with proper dtypes to allow correct numba compilation
                corr_seeds = np.zeros(1, dtype='int64')
                eps_ij = np.zeros((1, 1), dtype='float64')

            last_processed_coverage_ids_idx = 0

            # adjust buff size so that the buffer fits the longest coverage
            byte_mv = adjust_byte_mv_size(byte_mv, np.max(coverages['cur_items']) * max_bytes_per_item)

            while last_processed_coverage_ids_idx < compute_i:
                cursor, last_processed_coverage_ids_idx = compute_event_losses(
                    event_id, coverages, compute[:compute_i], items_data,
                    last_processed_coverage_ids_idx, sample_size, recs, rec_idx_ptr,
                    damage_bins, loss_threshold, losses_buffer, alloc_rule, do_correlation,
                    rndms_base, eps_ij, corr_data_by_item_id, arr_min, arr_max, arr_N,
                    norm_inv_cdf, arr_min_cdf, arr_max_cdf, norm_cdf, z_unif, debug,
                    max_bytes_per_item, byte_mv, cursor
                )

                # write the losses to the output stream
                write_start = 0
                while write_start < cursor:
                    select([], select_stream_list, select_stream_list)
                    write_start += stream_out.write(byte_mv[write_start:cursor].tobytes())

                cursor = 0

            logger.info(f"event {event_id} DONE")

    return 0
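# Example invocation (illustrative; paths and parameter values are hypothetical):
#
#     run(run_dir='/path/to/run', ignore_file_type=set(), sample_size=10,
#         loss_threshold=0.0, alloc_rule=1, debug=False, random_generator=1,
#         file_in='gul_input_stream.bin', file_out='gul_output_stream.bin',
#         ignore_correlation=True)
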
@njit(cache=True, fastmath=True)
def compute_event_losses(event_id, coverages, coverage_ids, items_data,
                         last_processed_coverage_ids_idx, sample_size, recs, rec_idx_ptr,
                         damage_bins, loss_threshold, losses, alloc_rule, do_correlation,
                         rndms_base, eps_ij, corr_data_by_item_id, arr_min, arr_max, arr_N,
                         norm_inv_cdf, arr_min_cdf, arr_max_cdf, norm_cdf, z_unif, debug,
                         max_bytes_per_item, byte_mv, cursor):
    """Compute losses for an event.

    Args:
        event_id (int32): event id.
        coverages (numpy.array[coverage_type]): structured array with the coverage data for each coverage_id.
        coverage_ids (numpy.array[int]): array of unique coverage ids used in this event.
        items_data (numpy.array[items_data_type]): items-related data.
        last_processed_coverage_ids_idx (int): index of the last coverage_id stored in `coverage_ids`
          that was fully processed and printed to the output stream.
        sample_size (int): number of random samples to draw.
        recs (numpy.array[ProbMean]): all the cdfs used in event_id.
        rec_idx_ptr (numpy.array[int]): array with the indices of `recs` where each cdf record starts.
        damage_bins (List[Union[damagebindictionaryCsv, damagebindictionary]]): loaded data from the
          damage_bin_dict file.
        loss_threshold (float): threshold above which losses are printed to the output stream.
        losses (numpy.array[oasis_float]): array (to be re-used) to store losses for all item_ids.
        alloc_rule (int): back-allocation rule.
        do_correlation (bool): if True, compute correlated random samples.
        rndms_base (numpy.array[float64]): 2d array of shape (number of seeds, sample_size) storing
          the random values drawn for each seed.
        eps_ij (numpy.array[float64]): correlated random values drawn for each peril correlation group.
        corr_data_by_item_id (numpy.array[Correlation]): peril correlation group and correlation value
          looked up by item id.
        arr_min, arr_max, arr_N: definition of the lookup grid of the Gaussian inverse cdf table.
        norm_inv_cdf (numpy.array[float64]): lookup table of the Gaussian inverse cdf.
        arr_min_cdf, arr_max_cdf: definition of the lookup grid of the Gaussian cdf table.
        norm_cdf (numpy.array[float64]): lookup table of the Gaussian cdf.
        z_unif (numpy.array[float64]): buffer (re-used) to store the correlated random values.
        debug (bool): if True, for each random sample, print to the output stream the random value
          instead of the loss.
        max_bytes_per_item (int): maximum bytes to be written in the output stream for an item.
        byte_mv (numpy.array): byte view of where the output is buffered.
        cursor (int): index of byte_mv where to start writing.

    Returns:
        int, int: updated values of cursor and last_processed_coverage_ids_idx.
    """
    for coverage_i in range(last_processed_coverage_ids_idx, coverage_ids.shape[0]):
        coverage = coverages[coverage_ids[coverage_i]]
        tiv = coverage['tiv']  # coverages are indexed from 1
        Nitem_ids = coverage['cur_items']
        exposureValue = tiv / Nitem_ids

        # estimate max number of bytes needed to output this coverage
        # conservatively assume all random samples are printed (losses > loss_threshold)
        # number of records of type gulSampleslevelRec_size is sample_size + 5 (negative sidx) + 1 (terminator line)
        est_cursor_bytes = Nitem_ids * max_bytes_per_item

        # return before processing this coverage if the number of free bytes left in the buffer
        # is not sufficient to write out the full coverage
        if cursor + est_cursor_bytes > byte_mv.shape[0]:
            return cursor, last_processed_coverage_ids_idx

        items = items_data[coverage['start_items']: coverage['start_items'] + coverage['cur_items']]

        for item_i in range(coverage['cur_items']):
            item = items[item_i]
            damagecdf_i = item['damagecdf_i']
            rng_index = item['rng_index']

            rec = recs[rec_idx_ptr[damagecdf_i]:rec_idx_ptr[damagecdf_i + 1]]
            prob_to = rec['prob_to']
            bin_mean = rec['bin_mean']
            Nbins = len(prob_to)

            # compute mean values
            gul_mean, std_dev, chance_of_loss, max_loss = compute_mean_loss(
                tiv, prob_to, bin_mean, Nbins, damage_bins[Nbins - 1]['bin_to'],
            )

            losses[MAX_LOSS_IDX, item_i] = max_loss
            losses[CHANCE_OF_LOSS_IDX, item_i] = chance_of_loss
            losses[TIV_IDX, item_i] = exposureValue
            losses[STD_DEV_IDX, item_i] = std_dev
            losses[MEAN_IDX, item_i] = gul_mean

            if sample_size > 0:
                if do_correlation:
                    item_corr_data = corr_data_by_item_id[item['item_id']]
                    rho = item_corr_data['damage_correlation_value']

                    if rho > 0:
                        peril_correlation_group = item_corr_data['peril_correlation_group']

                        get_corr_rval(
                            eps_ij[peril_correlation_group], rndms_base[rng_index],
                            rho, arr_min, arr_max, arr_N, norm_inv_cdf,
                            arr_min_cdf, arr_max_cdf, norm_cdf, sample_size, z_unif
                        )
                        rndms = z_unif
                    else:
                        rndms = rndms_base[rng_index]
                else:
                    rndms = rndms_base[rng_index]

                if debug:
                    for sample_idx in range(1, sample_size + 1):
                        rval = rndms[sample_idx - 1]
                        losses[sample_idx, item_i] = rval
                else:
                    for sample_idx in range(1, sample_size + 1):
                        # cap `rval` to the maximum `prob_to` value (which should be 1.)
                        rval = rndms[sample_idx - 1]

                        if rval >= prob_to[Nbins - 1]:
                            rval = prob_to[Nbins - 1] - 0.00000003
                            bin_idx = Nbins - 1
                        else:
                            # find the bin in which the random value `rval` falls into
                            # note that rec['bin_mean'] == damage_bins['interpolation'], therefore
                            # there's a 1:1 mapping between indices of rec and damage_bins
                            bin_idx = binary_search(rval, prob_to, Nbins)

                        # compute ground-up losses
                        gul = get_gul(
                            damage_bins['bin_from'][bin_idx], damage_bins['bin_to'][bin_idx],
                            bin_mean[bin_idx], prob_to[bin_idx - 1] * (bin_idx > 0),
                            prob_to[bin_idx], rval, tiv
                        )

                        if gul >= loss_threshold:
                            losses[sample_idx, item_i] = gul
                        else:
                            losses[sample_idx, item_i] = 0

        cursor = write_losses(event_id, sample_size, loss_threshold, losses[:, :items.shape[0]],
                              items['item_id'], alloc_rule, tiv, byte_mv, cursor)

        # register that another `coverage_id` has been processed
        last_processed_coverage_ids_idx += 1

    return cursor, last_processed_coverage_ids_idx
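# Layout of the `losses` buffer used above (allocated once in `run` as `losses_buffer`
# with shape (sample_size + NUM_IDX + 1, max items per coverage)): rows 1..sample_size
# hold the per-sample ground-up losses, while the rows addressed by MAX_LOSS_IDX,
# CHANCE_OF_LOSS_IDX, TIV_IDX, STD_DEV_IDX and MEAN_IDX hold the summary values that
# `write_losses` emits under the special (negative) sidx; each column corresponds to
# one item of the coverage being processed.
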
@njit(cache=True, fastmath=True)
def write_losses(event_id, sample_size, loss_threshold, losses, item_ids, alloc_rule, tiv,
                 byte_mv, cursor):
    """Write the computed losses.

    Args:
        event_id (int32): event id.
        sample_size (int): number of random samples to draw.
        loss_threshold (float): threshold above which losses are printed to the output stream.
        losses (numpy.array[oasis_float]): losses for all item_ids.
        item_ids (numpy.array[ITEM_ID_TYPE]): ids of items whose losses are in `losses`.
        alloc_rule (int): back-allocation rule.
        tiv (oasis_float): total insured value.
        byte_mv (numpy.ndarray): byte view of where the output is buffered.
        cursor (int): index of byte_mv where to start writing.

    Returns:
        int: updated value of cursor.
    """
    if alloc_rule == 2:
        losses[1:] = setmaxloss(losses[1:])

    if tiv > 0:
        # check whether the sum of losses-per-sample exceeds TIV;
        # if so, split TIV in proportion to the losses
        if alloc_rule in [1, 2]:
            # loop over all positive sidx samples
            for sample_i in range(1, losses.shape[0]):
                split_tiv_classic(losses[sample_i], tiv)

        elif alloc_rule == 3:
            # loop over all positive sidx samples
            for sample_i in range(1, losses.shape[0]):
                split_tiv_multiplicative(losses[sample_i], tiv)

    # output the losses for all the items
    for item_j in range(item_ids.shape[0]):
        # write header
        cursor = mv_write_item_header(byte_mv, cursor, event_id, item_ids[item_j])

        # write negative sidx
        for sample_idx in SPECIAL_SIDX:
            cursor = mv_write_sidx_loss(byte_mv, cursor, sample_idx, losses[sample_idx, item_j])

        # write the random samples (only those with losses above the threshold)
        for sample_idx in range(1, sample_size + 1):
            if losses[sample_idx, item_j] >= loss_threshold:
                cursor = mv_write_sidx_loss(byte_mv, cursor, sample_idx, losses[sample_idx, item_j])

        # write terminator for the samples for this item
        cursor = mv_write_delimiter(byte_mv, cursor)

    return cursor
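# For reference, each item written by `write_losses` above follows the gulcalc
# item-stream layout: an item header (event_id, item_id) from mv_write_item_header,
# then one (sidx, loss) record per special sidx and per sample whose loss is at or
# above `loss_threshold` from mv_write_sidx_loss, closed by the delimiter record
# written by mv_write_delimiter.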