Source code for oasislmf.pytools.gul.io

"""
This file contains the utilities for all the I/O necessary in gulpy.

"""
from select import select

import numpy as np
from numba import njit
from numba.typed import Dict, List
from numba.types import int8 as nb_int8
from numba.types import int32 as nb_int32
from numba.types import int64 as nb_int64

from oasislmf.pytools.common.data import areaperil_int, areaperil_int_size, oasis_int, oasis_int_size, oasis_float, oasis_float_size
from oasislmf.pytools.common.event_stream import PIPE_CAPACITY, bytes_to_stream_types, mv_read, CDF_STREAM_ID
from oasislmf.pytools.gul.common import (NP_BASE_ARRAY_SIZE, ProbMean,
                                         ProbMean_size,
                                         damagecdfrec_stream, items_data_type)
from oasislmf.pytools.gul.random import generate_hash


@njit(cache=True)
[docs] def gen_structs(): """Generate some data structures needed for the whole computation. Returns: Dict(int,int), List: map of group ids to random seeds, list storing the index where a specific cdf record starts in the `rec` numpy array. """ group_id_rng_index = Dict.empty(nb_int32, nb_int64) rec_idx_ptr = List([0]) return group_id_rng_index, rec_idx_ptr
@njit(cache=True)
[docs] def gen_valid_area_peril(valid_area_peril_id): valid_area_peril_dict = Dict() for i in range(valid_area_peril_id.shape[0]): valid_area_peril_dict[valid_area_peril_id[i]] = nb_int8(0) return valid_area_peril_dict
[docs] def read_getmodel_stream(stream_in, item_map, coverages, compute, seeds, valid_area_peril_id=None, buff_size=PIPE_CAPACITY): """Read the getmodel output stream yielding data event by event. Args: stream_in (buffer-like): input stream, e.g. `sys.stdin.buffer`. item_map (Dict[ITEM_MAP_KEY_TYPE, ITEM_MAP_VALUE_TYPE]): dict storing the mapping between areaperil_id, vulnerability_id to item. coverages (numpy.ndarray[coverage_type]): array with coverage data. compute (numpy.array[int]): list of coverages to be computed. seeds (numpy.array[int]): the random seeds for each coverage_id. buff_size (int): size in bytes of the read buffer (see note). valid_area_peril_id (list[int]): list of valid areaperil_ids. Raises: ValueError: If the stream type is not 1. Yields: int, int, numpy.array[items_data_type], numpy.array[oasis_float], numpy.array[int], int: event_id, index of the last coverage_id stored in compute, item-related data, cdf records, array with the indices of `rec` where each cdf record starts, number of unique random seeds computed so far. Note: It is advisable to set buff_size as 2x the maximum pipe limit (65536 bytes) to ensure that the stream is always read in the biggest possible chunks, which nominally is the largest between the pipe limit and the remaining memory to fill the memoryview. """ # determine stream type stream_source_type, stream_agg_type = bytes_to_stream_types(stream_in.read(4)) # see https://github.com/OasisLMF/ktools/blob/master/docs/md/CoreComponents.md if stream_source_type != CDF_STREAM_ID: raise ValueError(f"FATAL: Invalid stream type: expect {CDF_STREAM_ID}, got {stream_source_type}.") # maximum number of entries is buff_size divided by the minimum entry size # (corresponding to a 1-bin only cdf) min_size_cdf_entry = damagecdfrec_stream.size + 4 + ProbMean.size # each record from getmodel stream is expected to contain: # 1 damagecdfrec_stream obj, 1 int (Nbins), a number `Nbins` of ProbMean objects # init the memory view to store the stream mv = memoryview(bytearray(buff_size)) byte_mv = np.frombuffer(buffer=mv, dtype='b') valid_buf = 0 last_event_id = -1 len_read = 1 if valid_area_peril_id is not None: valid_area_peril_dict = gen_valid_area_peril(valid_area_peril_id) else: valid_area_peril_dict = None # init data structures group_id_rng_index, rec_idx_ptr = gen_structs() rng_index = 0 damagecdf_i = 0 compute_i = 0 items_data_i = 0 coverages['cur_items'].fill(0) recs = [] items_data = np.empty(2 ** NP_BASE_ARRAY_SIZE, dtype=items_data_type) select_stream_list = [stream_in] while True: if valid_buf < buff_size and len_read: select(select_stream_list, [], select_stream_list) # read the stream from valid_buf onwards len_read = stream_in.readinto1(mv[valid_buf:]) valid_buf += len_read if valid_buf == 0: # the stream has ended and all the data has been read if last_event_id != -1: yield last_event_id, compute_i, items_data, np.concatenate(recs), rec_idx_ptr, rng_index break # read the streamed data into formatted data cursor, yield_event, event_id, rec, rec_idx_ptr, last_event_id, compute_i, items_data_i, items_data, rng_index, group_id_rng_index, damagecdf_i = stream_to_data( byte_mv, valid_buf, min_size_cdf_entry, last_event_id, item_map, coverages, valid_area_peril_dict, compute_i, compute, items_data_i, items_data, seeds, rng_index, group_id_rng_index, damagecdf_i, rec_idx_ptr ) if cursor == 0 and len_read == 0: # here valid buff > 0, but not enough data to have a full item => the stream stopped prematurely raise Exception( f"cdf input stream ended prematurely, event_id: {event_id}, valid_buf: {valid_buf}, buff_size: {buff_size}" ) # persist the cdf records read from the stream recs.append(rec) if yield_event: # event is fully read yield last_event_id, compute_i, items_data, np.concatenate(recs), rec_idx_ptr, rng_index # init the data structures for the next event last_event_id = event_id # clear data structures group_id_rng_index, rec_idx_ptr = gen_structs() rng_index = 0 damagecdf_i = 0 compute_i = 0 items_data_i = 0 coverages['cur_items'].fill(0) recs = [] # move the un-read data to the beginning of the memoryview mv[:valid_buf - cursor] = mv[cursor:valid_buf] # update the length of the valid data valid_buf -= cursor
@njit(cache=True, fastmath=True)
[docs] def stream_to_data(byte_mv, valid_buf, size_cdf_entry, last_event_id, item_map, coverages, valid_area_peril_dict, compute_i, compute, items_data_i, items_data, seeds, rng_index, group_id_rng_index, damagecdf_i, rec_idx_ptr): """Parse streamed data into data arrays. Args: byte_mv (ndarray): byte view of the buffer valid_buf (int): number of bytes with valid data size_cdf_entry (int): size (in bytes) of a single record last_event_id (int): event_id of the last event that was completed item_map (Dict[ITEM_MAP_KEY_TYPE, ITEM_MAP_VALUE_TYPE]): dict storing the mapping between areaperil_id, vulnerability_id to item. coverages (numpy.ndarray[coverage_type]): array with coverage data. compute_i (int): index of the last coverage id stored in `compute`. compute (numpy.array[int]): list of coverage ids to be computed. items_data_i (int): index of the last items_data_i stored in `items_data`. items_data (numpy.array[items_data_type]): item-related data. seeds (numpy.array[int]): the random seeds for each coverage_id. rng_index (int): number of unique random seeds computed so far. group_id_rng_index (Dict([int,int])): map of group ids to random seeds. damagecdf_i (int): index of the last cdf record that has been read from stream and stored in `rec`. rec_idx_ptr (numpy.array[int]): array with the indices of `rec` where each cdf record starts. Returns: int, bool, int, numpy.array[ProbMean], numpy.array[int], int, int, int, numpy.array[items_data_type], int, Dict[ITEM_MAP_KEY_TYPE, ITEM_MAP_VALUE_TYPE]), int: number of int numbers read from the int32_mv ndarray, whether the current event (id=`event_id`) has been fully read, cdf record, array with the indices of `rec` where each cdf record starts, last or current event id, index of the last coverage id stored in `compute`, index of the last items_data_i stored in `items_data`, item-related data, number of unique random seeds computed so far, map of group ids to random seeds, index of the last cdf record that has been read from stream and stored in `rec`W """ yield_event = False # `rec` is a temporary buffer to store the cdf being read # conservative choice: size `rec` as if the entire buffer is filled with cdf bins rec = np.zeros(valid_buf // ProbMean_size, dtype=ProbMean) # int32 memoryview cursor cursor = 0 # init a counter for the local `rec` array last_rec_idx_ptr = 0 rec_valid_len = 0 while cursor + size_cdf_entry <= valid_buf: event_cursor = cursor event_id, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size) if event_id != last_event_id: # a new event has started if last_event_id > 0: # if this is not the beginning of the very first event, yield the event that was just completed yield_event = True cursor = event_cursor break last_event_id = event_id areaperil_id, cursor = mv_read(byte_mv, cursor, areaperil_int, areaperil_int_size) vulnerability_id, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size) Nbins_to_read, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size) if cursor + Nbins_to_read * ProbMean_size > valid_buf: # if the next cdf record is not fully contained in the valid buf, then # get more data in the buffer and put cursor back at the beginning of this cdf cursor = event_cursor break # peril filter if valid_area_peril_dict is not None: if areaperil_id not in valid_area_peril_dict: cursor += ProbMean_size * Nbins_to_read continue # read damage cdf bins start_rec = last_rec_idx_ptr end_rec = start_rec + Nbins_to_read for j in range(start_rec, end_rec, 1): rec[j]['prob_to'], cursor = mv_read(byte_mv, cursor, oasis_float, oasis_float_size) rec[j]['bin_mean'], cursor = mv_read(byte_mv, cursor, oasis_float, oasis_float_size) rec_idx_ptr.append(rec_idx_ptr[-1] + Nbins_to_read) last_rec_idx_ptr = end_rec rec_valid_len += Nbins_to_read # register the items to their coverage item_key = tuple((areaperil_id, vulnerability_id)) for item in item_map[item_key]: item_id, coverage_id, group_id = item # if this group_id was not seen yet, process it. # it assumes that hash only depends on event_id and group_id # and that only 1 event_id is processed at a time. if group_id not in group_id_rng_index: group_id_rng_index[group_id] = rng_index seeds[rng_index] = generate_hash(group_id, last_event_id) this_rng_index = rng_index rng_index += 1 else: this_rng_index = group_id_rng_index[group_id] coverage = coverages[coverage_id] if coverage['cur_items'] == 0: # no items were collected for this coverage yet: set up the structure compute[compute_i], compute_i = coverage_id, compute_i + 1 while items_data.shape[0] < items_data_i + coverage['max_items']: # if items_data needs to be larger to store all the items, double it in size temp_items_data = np.empty(items_data.shape[0] * 2, dtype=items_data.dtype) temp_items_data[:items_data_i] = items_data[:items_data_i] items_data = temp_items_data coverage['start_items'], items_data_i = items_data_i, items_data_i + coverage['max_items'] # append the data of this item item_i = coverage['start_items'] + coverage['cur_items'] items_data[item_i]['item_id'] = item_id items_data[item_i]['damagecdf_i'] = damagecdf_i items_data[item_i]['rng_index'] = this_rng_index coverage['cur_items'] += 1 damagecdf_i += 1 return cursor, yield_event, event_id, rec[:rec_valid_len], rec_idx_ptr, last_event_id, compute_i, items_data_i, items_data, rng_index, group_id_rng_index, damagecdf_i