Source code for oasislmf.pytools.common.input_files

from contextlib import ExitStack
import logging
import sys
import numba as nb
import numpy as np
from pathlib import Path

from oasislmf.pytools.common.data import (
    load_as_ndarray, nb_oasis_int,
    correlations_headers, correlations_dtype, coverages_headers,
    occurrence_dtype, occurrence_granular_dtype, periods_dtype, quantile_dtype,
    quantile_interval_dtype, returnperiods_dtype
)
from oasislmf.pytools.common.event_stream import mv_read, oasis_int, oasis_float


logger = logging.getLogger(__name__)
# Input file names (input/<file_name>)
AMPLIFICATIONS_FILE = "amplifications.bin"
CORRELATIONS_FILENAME = "correlations.bin"
COVERAGES_FILE = "coverages.bin"
EVENTRATES_FILE = "event_rates.csv"
FMPOLICYTC_FILE = "fm_policytc.bin"
FMPROGRAMME_FILE = "fm_programme.bin"
FMPROFILE_FILE = "fm_profile.bin"
FMSUMMARYXREF_FILE = "fmsummaryxref.bin"
FMXREF_FILE = "fmxref.bin"
GULSUMMARYXREF_FILE = "gulsummaryxref.bin"
ITEMS_FILE = "items.bin"
OCCURRENCE_FILE = "occurrence.bin"
PERIODS_FILE = "periods.bin"
QUANTILE_FILE = "quantile.bin"
RETURNPERIODS_FILE = "returnperiods.bin"
def read_amplifications(run_dir="", filename=AMPLIFICATIONS_FILE, use_stdin=False):
    """Get array of amplification IDs from amplifications.bin, where the index
    corresponds to the item ID.

    amplifications.bin is a binary file with layout:
        reserved header (4-byte int),
        item ID 1 (4-byte int), amplification ID a_1 (4-byte int),
        ...
        item ID n (4-byte int), amplification ID a_n (4-byte int)

    Args:
        run_dir (str): path to the directory containing the amplifications file
        filename (str | os.PathLike): amplifications file name
        use_stdin (bool): Use standard input for file data, ignoring run_dir/filename. Defaults to False.

    Returns:
        items_amps (numpy.ndarray): array of amplification IDs, where the index corresponds to the item ID
    """
    header_size = 4
    if use_stdin:
        items_amps = np.frombuffer(sys.stdin.buffer.read(), dtype=np.int32, offset=header_size)
    else:
        amplification_file = Path(run_dir, filename)
        if not amplification_file.exists():
            raise FileNotFoundError('amplifications file not found.')
        items_amps = np.fromfile(amplification_file, dtype=np.int32, offset=header_size)

    # Check item IDs start from 1 and are contiguous
    if items_amps[0] != 1:
        raise ValueError(f'First item ID is {items_amps[0]}. Expected 1.')
    items_amps = items_amps.reshape(len(items_amps) // 2, 2)
    if not np.all(items_amps[1:, 0] - items_amps[:-1, 0] == 1):
        raise ValueError('Item IDs in amplifications file are not contiguous')

    # Prepend a zero so that items_amps[item_id] indexes directly by item ID
    items_amps = np.concatenate((np.array([0]), items_amps[:, 1]))

    return items_amps
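# Illustrative usage sketch (not part of the original module): build a tiny
# amplifications.bin matching the layout documented above and read it back.
def _example_read_amplifications():  # hypothetical helper, for illustration only
    import tempfile
    with tempfile.TemporaryDirectory() as run_dir:
        # reserved header int, then (item_id, amplification_id) pairs for items 1..3
        np.array([0, 1, 101, 2, 102, 3, 103], dtype=np.int32).tofile(Path(run_dir, AMPLIFICATIONS_FILE))
        items_amps = read_amplifications(run_dir)
        assert list(items_amps) == [0, 101, 102, 103]  # index 0 is padding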
def read_correlations(run_dir, ignore_file_type=set(), filename=CORRELATIONS_FILENAME):
    """Load the correlations from the correlations file.

    Args:
        run_dir (str): path to the directory containing the correlations file
        ignore_file_type (Set[str]): file extensions to ignore when loading.
        filename (str | os.PathLike): correlations file name

    Returns:
        correlations (numpy.ndarray[correlations_dtype]): correlations table
    """
    for ext in ["bin", "csv"]:
        if ext in ignore_file_type:
            continue
        correlations_file = Path(run_dir, filename).with_suffix("." + ext)
        if correlations_file.exists():
            logger.debug(f"loading {correlations_file}")
            if ext == "bin":
                try:
                    correlations = np.memmap(correlations_file, dtype=correlations_dtype, mode='r')
                except ValueError:
                    logger.debug("binary file is empty, numpy.memmap failed. trying to read correlations.csv.")
                    correlations = read_correlations(
                        run_dir,
                        ignore_file_type={'bin'},
                        filename=correlations_file.with_suffix(".csv").name
                    )
            elif ext == "csv":
                # Check for a header row before loading
                with open(correlations_file, "r") as fin:
                    first_line = fin.readline()
                first_line_elements = [header.strip() for header in first_line.strip().split(',')]
                has_header = first_line_elements == correlations_headers
                correlations = np.loadtxt(
                    correlations_file,
                    dtype=correlations_dtype,
                    delimiter=",",
                    skiprows=1 if has_header else 0,
                    ndmin=1
                )
            else:
                raise RuntimeError(f"Cannot read correlations file of type {ext}. Not implemented.")
            return correlations
    raise FileNotFoundError(f'correlations file not found at {run_dir}. Ignoring files with ext {ignore_file_type}.')
def read_coverages(run_dir="", ignore_file_type=set(), filename=COVERAGES_FILE, use_stdin=False):
    """Load the coverages from the coverages file.

    Args:
        run_dir (str): path to the directory containing the coverages file
        ignore_file_type (Set[str]): file extensions to ignore when loading.
        filename (str | os.PathLike): coverages file name
        use_stdin (bool): Use standard input for file data, ignoring run_dir/filename. Defaults to False.

    Returns:
        numpy.array[oasis_float]: array with the coverage values for each coverage_id.
    """
    for ext in ["bin", "csv"]:
        if ext in ignore_file_type:
            continue
        coverages_file = Path(run_dir, filename).with_suffix("." + ext)
        if coverages_file.exists():
            logger.debug(f"loading {coverages_file}")
            if ext == "bin":
                if use_stdin:
                    coverages = np.frombuffer(sys.stdin.buffer.read(), dtype=oasis_float)
                else:
                    coverages = np.fromfile(coverages_file, dtype=oasis_float)
            elif ext == "csv":
                with ExitStack() as stack:
                    if use_stdin:
                        fin = sys.stdin
                    else:
                        fin = stack.enter_context(open(coverages_file, "r"))
                    lines = fin.readlines()

                # Check for a header row
                first_line_elements = [header.strip() for header in lines[0].strip().split(',')]
                has_header = first_line_elements == coverages_headers
                data_lines = lines[1:] if has_header else lines

                # ndmin=2 keeps a single-row csv two-dimensional so the column
                # slice below still works
                coverages = np.loadtxt(
                    data_lines,
                    dtype=oasis_float,
                    delimiter=",",
                    ndmin=2
                )[:, 1]
            else:
                raise RuntimeError(f"Cannot read coverages file of type {ext}. Not implemented.")
            return coverages
    raise FileNotFoundError(f'coverages file not found at {run_dir}. Ignoring files with ext {ignore_file_type}.')
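# Illustrative usage sketch (not part of the original module): coverages.bin is
# read as a flat oasis_float array; position i is assumed to hold the value
# for coverage_id i + 1.
def _example_read_coverages():  # hypothetical helper, for illustration only
    import tempfile
    with tempfile.TemporaryDirectory() as run_dir:
        np.array([100.0, 250.0, 75.5], dtype=oasis_float).tofile(Path(run_dir, COVERAGES_FILE))
        coverages = read_coverages(run_dir)
        assert np.allclose(coverages, [100.0, 250.0, 75.5])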
def read_event_rates(run_dir, filename=EVENTRATES_FILE):
    """Reads event rates from a CSV file

    Args:
        run_dir (str | os.PathLike): Path to input files dir
        filename (str | os.PathLike): event rates csv file name

    Returns:
        unique_event_ids (ndarray[oasis_int]): unique event ids
        event_rates (ndarray[oasis_float]): event rates
    """
    event_rate_file = Path(run_dir, filename)
    data = load_as_ndarray(
        run_dir,
        filename[:-4],
        np.dtype([('event_id', oasis_int), ('rate', oasis_float)]),
        must_exist=False,
        col_map={"event_id": "EventIds", "rate": "Event_rates"}
    )
    if data is None or data.size == 0:
        logger.info(f"Event rate file {event_rate_file} is empty, proceeding without event rates.")
        return np.array([], dtype=oasis_int), np.array([], dtype=oasis_float)

    unique_event_ids = data['event_id']
    event_rates = data['rate']

    # Make sure event_ids are sorted
    sort_idx = np.argsort(unique_event_ids)
    unique_event_ids = unique_event_ids[sort_idx]
    event_rates = event_rates[sort_idx]
    return unique_event_ids, event_rates
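# Illustrative usage sketch (not part of the original module): the csv columns
# are EventIds/Event_rates (see col_map above); rows come back sorted by
# event id.
def _example_read_event_rates():  # hypothetical helper, for illustration only
    import tempfile
    with tempfile.TemporaryDirectory() as run_dir:
        with open(Path(run_dir, EVENTRATES_FILE), "w") as f:
            f.write("EventIds,Event_rates\n3,0.1\n1,0.3\n2,0.2\n")
        event_ids, rates = read_event_rates(run_dir)
        assert list(event_ids) == [1, 2, 3]
        assert np.allclose(rates, [0.3, 0.2, 0.1])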
def read_quantile(sample_size, run_dir, filename=QUANTILE_FILE, return_empty=False):
    """Generate a quantile intervals array based on sample size and the quantile binary file

    Args:
        sample_size (int): Sample size
        run_dir (str | os.PathLike): Path to input files dir
        filename (str | os.PathLike): quantile binary file name
        return_empty (bool): return an empty intervals array regardless of the existence of the quantile binary

    Returns:
        intervals (ndarray[quantile_interval_dtype]): Numpy array emulating a dictionary for numba
    """
    intervals = []
    if return_empty:
        return np.array([], dtype=quantile_interval_dtype)
    data = load_as_ndarray(run_dir, filename[:-4], quantile_dtype, must_exist=True)
    for row in data:
        q = row["quantile"]
        # Calculate interval index and fractional part
        pos = (sample_size - 1) * q + 1
        integer_part = int(pos)
        fractional_part = pos - integer_part
        intervals.append((q, integer_part, fractional_part))

    # Convert to numpy array
    intervals = np.array(intervals, dtype=quantile_interval_dtype)
    return intervals
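# Illustrative sketch (not part of the original module): how a quantile q maps
# onto an interval of ordered samples. With sample_size=10 and q=0.5,
# pos = (10 - 1) * 0.5 + 1 = 5.5, so the median interpolates halfway between
# the 5th and 6th ordered samples.
def _example_quantile_interval():  # hypothetical helper, for illustration only
    sample_size, q = 10, 0.5
    pos = (sample_size - 1) * q + 1
    integer_part = int(pos)               # 5: lower sample index (1-based)
    fractional_part = pos - integer_part  # 0.5: interpolation weight
    assert (integer_part, fractional_part) == (5, 0.5)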
def read_occurrence_bin(run_dir="", filename=OCCURRENCE_FILE, use_stdin=False):
    """Read the occurrence binary file and return the raw occurrence records

    Args:
        run_dir (str | os.PathLike): Path to input files dir
        filename (str | os.PathLike): occurrence binary file name
        use_stdin (bool): Use standard input for file data, ignoring run_dir/filename. Defaults to False.

    Returns:
        occ_arr (ndarray[occurrence_dtype]): occurrence records (event_id, period_no, occ_date_id)
        date_algorithm (int): whether the date algorithm is used
        granular_date (int): whether dates are granular (minute resolution)
        no_of_periods (int): number of periods
    """
    occurrence_fp = Path(run_dir, filename)
    if use_stdin:
        fin = np.frombuffer(sys.stdin.buffer.read(), dtype="u1")
    else:
        fin = np.memmap(occurrence_fp, mode="r", dtype="u1")
    cursor = 0
    valid_buff = len(fin)

    # Extract date options
    if valid_buff - cursor < np.dtype(np.int32).itemsize:
        raise RuntimeError("Error: broken occurrence file, not enough data")
    date_opts, cursor = mv_read(fin, cursor, np.int32, np.dtype(np.int32).itemsize)
    date_algorithm = date_opts & 1
    granular_date = date_opts >> 1

    # (event_id: int, period_no: int, occ_date_id: int)
    record_size = np.dtype(np.int32).itemsize * 3
    # (event_id: int, period_no: int, occ_date_id: long long)
    if granular_date:
        record_size = np.dtype(np.int32).itemsize * 2 + np.dtype(np.int64).itemsize
    # Should not get here
    if not date_algorithm and granular_date:
        raise RuntimeError("FATAL: Unknown date algorithm")

    # Extract no_of_periods
    if valid_buff - cursor < np.dtype(np.int32).itemsize:
        raise RuntimeError("Error: broken occurrence file, not enough data")
    no_of_periods, cursor = mv_read(fin, cursor, np.int32, np.dtype(np.int32).itemsize)

    num_records = (valid_buff - cursor) // record_size
    if (valid_buff - cursor) % record_size != 0:
        logger.warning(
            f"Occurrence file size does not align with the expected record size "
            f"(num_records: {num_records}, record_size: {record_size}); trailing bytes are ignored"
        )

    occ_dtype = occurrence_dtype
    if granular_date:
        occ_dtype = occurrence_granular_dtype

    occ_arr = np.zeros(0, dtype=occ_dtype)
    if num_records > 0:
        occ_arr = np.frombuffer(fin[cursor:cursor + num_records * record_size], dtype=occ_dtype)

    return occ_arr, date_algorithm, granular_date, no_of_periods
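# Illustrative usage sketch (not part of the original module): write a minimal
# non-granular occurrence.bin (date_opts, no_of_periods, then fixed-size
# records) and parse it back. The occ_date_id values are arbitrary.
def _example_read_occurrence_bin():  # hypothetical helper, for illustration only
    import tempfile
    with tempfile.TemporaryDirectory() as run_dir:
        with open(Path(run_dir, OCCURRENCE_FILE), "wb") as f:
            f.write(np.array([1, 10], dtype=np.int32).tobytes())  # date_opts=1 (date algorithm, not granular), 10 periods
            f.write(np.array([(1, 1, 730120), (1, 2, 730484)], dtype=occurrence_dtype).tobytes())
        occ_arr, date_algorithm, granular_date, no_of_periods = read_occurrence_bin(run_dir)
        assert (date_algorithm, granular_date, no_of_periods) == (1, 0, 10)
        assert len(occ_arr) == 2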
def read_occurrence(run_dir, filename=OCCURRENCE_FILE):
    """Read the occurrence binary file and return an occurrence map

    Args:
        run_dir (str | os.PathLike): Path to input files dir
        filename (str | os.PathLike): occurrence binary file name

    Returns:
        occ_map (nb.typed.Dict): map of event_id to (period_no, occ_date_id) records from the occurrence file
        date_algorithm (int): whether the date algorithm is used
        granular_date (int): whether dates are granular (minute resolution)
        no_of_periods (int): number of periods
    """
    occ_arr, date_algorithm, granular_date, no_of_periods = read_occurrence_bin(run_dir, filename=filename)
    occ_dtype = occurrence_dtype
    if granular_date:
        occ_dtype = occurrence_granular_dtype
    occ_map_valtype = occ_dtype[["period_no", "occ_date_id"]]
    NB_occ_map_valtype = nb.types.Array(nb.from_dtype(occ_map_valtype), 1, "C")
    occ_map = _read_occ_arr(occ_arr, occ_map_valtype, NB_occ_map_valtype)
    return occ_map, date_algorithm, granular_date, no_of_periods
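# Illustrative usage sketch (not part of the original module): with the same
# file as in _example_read_occurrence_bin above, occ_map groups the records by
# event_id, so occ_map[1] is an array of two (period_no, occ_date_id) pairs.
def _example_read_occurrence():  # hypothetical helper, for illustration only
    import tempfile
    with tempfile.TemporaryDirectory() as run_dir:
        with open(Path(run_dir, OCCURRENCE_FILE), "wb") as f:
            f.write(np.array([1, 10], dtype=np.int32).tobytes())
            f.write(np.array([(1, 1, 730120), (1, 2, 730484)], dtype=occurrence_dtype).tobytes())
        occ_map, _, _, _ = read_occurrence(run_dir)
        assert len(occ_map[1]) == 2
        assert occ_map[1][0]["period_no"] == 1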
@nb.njit(cache=True, error_model="numpy")
def _read_occ_arr(occ_arr, occ_map_valtype, NB_occ_map_valtype):
    """Reads the occurrence file array and returns an occurrence map of
    event_id to an array of (period_no, occ_date_id) records
    """
    occ_map = nb.typed.Dict.empty(nb_oasis_int, NB_occ_map_valtype)
    occ_map_sizes = nb.typed.Dict.empty(nb_oasis_int, nb.types.int64)
    for row in occ_arr:
        event_id = row["event_id"]
        if event_id not in occ_map:
            # Start each event with capacity for 8 records
            occ_map[event_id] = np.zeros(8, dtype=occ_map_valtype)
            occ_map_sizes[event_id] = 0
        array = occ_map[event_id]
        current_size = occ_map_sizes[event_id]
        if current_size >= len(array):
            # Double the capacity when the array is full
            new_array = np.empty(len(array) * 2, dtype=occ_map_valtype)
            new_array[:len(array)] = array
            array = new_array
        array[current_size]["period_no"] = row["period_no"]
        array[current_size]["occ_date_id"] = row["occ_date_id"]
        occ_map[event_id] = array
        occ_map_sizes[event_id] += 1
    # Trim each event's array down to its populated length
    for event_id in occ_map:
        occ_map[event_id] = occ_map[event_id][:occ_map_sizes[event_id]]
    return occ_map


@nb.njit(cache=True)
def occ_get_date(occ_date_id, granular_date):
    """Returns the date as year, month, day, hour, minute from occ_date_id

    Args:
        occ_date_id (np.int32 | np.int64): occurrence file date id (int64 for granular dates)
        granular_date (bool): whether the date id encodes a granular (minute resolution) date

    Returns:
        (oasis_int, oasis_int, oasis_int, oasis_int, oasis_int): year, month, day, hour, minute
    """
    # Granular date ids count minutes, non-granular ones count days
    days = occ_date_id // (1440 - 1439 * (not granular_date))
    # Function void d(long long g, int& y, int& mm, int& dd) taken from pltcalc.cpp
    y = (10000 * days + 14780) // 3652425
    ddd = days - (365 * y + y // 4 - y // 100 + y // 400)
    if ddd < 0:
        y = y - 1
        ddd = days - (365 * y + y // 4 - y // 100 + y // 400)
    mi = (100 * ddd + 52) // 3060
    mm = (mi + 2) % 12 + 1
    y = y + (mi + 2) // 12
    dd = ddd - (mi * 306 + 5) // 10 + 1

    minutes = (occ_date_id % 1440) * granular_date
    occ_hour = minutes // 60
    occ_minutes = minutes % 60
    return y, mm, dd, occ_hour, occ_minutes
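# Illustrative sketch (not part of the original module): a granular
# occ_date_id packs minutes into the low bits, with days * 1440 above them.
# Day number 737790 corresponds to 2020-03-01 under the algorithm above.
def _example_occ_get_date():  # hypothetical helper, for illustration only
    occ_date_id = 737790 * 1440 + 90  # day 737790, time 01:30
    assert occ_get_date(occ_date_id, True) == (2020, 3, 1, 1, 30)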
@nb.njit(cache=True)
def occ_get_date_id(granular_date, occ_year, occ_month, occ_day, occ_hour=0, occ_minute=0):
    """Returns the occ_date_id from year, month, day, hour, minute and whether it is a granular date

    Args:
        granular_date (bool): whether to produce a granular (minute resolution) date id
        occ_year (int): Occurrence Year.
        occ_month (int): Occurrence Month.
        occ_day (int): Occurrence Day.
        occ_hour (int): Occurrence Hour. Defaults to 0.
        occ_minute (int): Occurrence Minute. Defaults to 0.

    Returns:
        occ_date_id (np.int64): occurrence file date id (int64 for granular dates)
    """
    # Shift the year so it starts in March, making February the last month
    occ_month = (occ_month + 9) % 12
    occ_year = occ_year - occ_month // 10
    occ_date_id = np.int64(
        365 * occ_year + occ_year // 4 - occ_year // 100 + occ_year // 400
        + (occ_month * 306 + 5) // 10
        + (occ_day - 1)
    )
    # Granular date ids count minutes rather than days
    occ_date_id *= (1440 // (1440 - 1439 * granular_date))
    occ_date_id += (60 * occ_hour + occ_minute)
    return occ_date_id
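# Illustrative sketch (not part of the original module): occ_get_date_id and
# occ_get_date are inverses of each other, in both the daily and the granular
# (minute resolution) encodings.
def _example_occ_date_roundtrip():  # hypothetical helper, for illustration only
    day_id = occ_get_date_id(False, 2020, 3, 1)
    assert day_id == 737790
    assert occ_get_date(day_id, False) == (2020, 3, 1, 0, 0)
    minute_id = occ_get_date_id(True, 2020, 3, 1, occ_hour=1, occ_minute=30)
    assert occ_get_date(minute_id, True) == (2020, 3, 1, 1, 30)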
def read_periods(no_of_periods, run_dir, filename=PERIODS_FILE):
    """Returns an array of period weights for each period between 1 and no_of_periods
    inclusive (with no gaps).

    Args:
        no_of_periods (int): Number of periods
        run_dir (str | os.PathLike): Path to input files dir
        filename (str | os.PathLike): periods binary file name

    Returns:
        period_weights (ndarray[periods_dtype]): Period weights
    """
    periods_fp = Path(run_dir, filename)

    if not periods_fp.exists():
        # If no periods binary file is found, revert to uniform period weights of 1 / no_of_periods
        logger.warning(
            f"Periods file not found at {periods_fp}, using uniform period weights of 1 / no_of_periods ({no_of_periods})"
        )
        period_weights = np.array(
            [(i + 1, 1 / no_of_periods) for i in range(no_of_periods)],
            dtype=periods_dtype
        )
        return period_weights

    data = load_as_ndarray(run_dir, filename[:-4], periods_dtype, must_exist=True)

    # The number of rows must match no_of_periods
    if len(data) != no_of_periods:
        raise RuntimeError(f"ERROR: no_of_periods does not match total period_no in period binary file {periods_fp}.")

    # Sort by period_no
    period_weights = np.sort(data, order="period_no")

    # Identify any missing periods
    expected_periods = np.arange(1, no_of_periods + 1)
    actual_periods = period_weights['period_no']
    missing_periods = np.setdiff1d(expected_periods, actual_periods)
    if len(missing_periods) > 0:
        raise RuntimeError(f"ERROR: Missing period_no in period binary file {periods_fp}.")

    return period_weights
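# Illustrative usage sketch (not part of the original module): with no
# periods.bin present, read_periods falls back to uniform weights.
def _example_read_periods_fallback():  # hypothetical helper, for illustration only
    import tempfile
    with tempfile.TemporaryDirectory() as run_dir:
        period_weights = read_periods(4, run_dir)
        assert list(period_weights["period_no"]) == [1, 2, 3, 4]
        assert abs(period_weights[0][1] - 0.25) < 1e-9  # each weight is 1/4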
def read_returnperiods(use_return_period_file, run_dir, filename=RETURNPERIODS_FILE):
    """Returns an array of return periods in decreasing order with no duplicates.

    Args:
        use_return_period_file (bool): Bool to use Return Period File
        run_dir (str | os.PathLike): Path to input files dir
        filename (str | os.PathLike): return periods binary file name

    Returns:
        return_periods (ndarray[np.int32]): Return Periods
        use_return_period_file (bool): Bool to use Return Period File
    """
    if not use_return_period_file:
        return np.array([], dtype=returnperiods_dtype)["return_period"], use_return_period_file

    returnperiods_fp = Path(run_dir, filename)
    if not returnperiods_fp.exists():
        raise RuntimeError(f"ERROR: Return Periods file not found at {returnperiods_fp}.")

    returnperiods = load_as_ndarray(
        run_dir,
        filename[:-4],
        returnperiods_dtype,
        must_exist=False
    )

    if len(returnperiods) == 0:
        logger.warning(f"WARNING: Empty return periods file at {returnperiods_fp}. Running without defined return periods option")
        return None, False

    # Check return periods validity
    # Return periods should be unique and strictly decreasing
    if len(returnperiods) != len(np.unique(returnperiods)):
        raise RuntimeError(f"ERROR: Invalid return periods file. Duplicate return periods found: {returnperiods}")
    lastrp = -1
    for rp in returnperiods["return_period"]:
        if lastrp != -1 and lastrp <= rp:
            raise RuntimeError(f"ERROR: Invalid return periods file. Non-decreasing return periods found: {returnperiods}")
        lastrp = rp

    return returnperiods["return_period"], use_return_period_file
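# Illustrative usage sketch (not part of the original module): a valid
# returnperiods.bin holds unique, strictly decreasing values. The raw int32
# record layout assumed here matches the single return_period field used above.
def _example_read_returnperiods():  # hypothetical helper, for illustration only
    import tempfile
    with tempfile.TemporaryDirectory() as run_dir:
        np.array([(250,), (100,), (50,), (10,)], dtype=returnperiods_dtype).tofile(
            Path(run_dir, RETURNPERIODS_FILE)
        )
        rps, use_rp = read_returnperiods(True, run_dir)
        assert use_rp and list(rps) == [250, 100, 50, 10]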