Source code for oasislmf.pytools.common.input_files

import logging
import numpy as np
from pathlib import Path

from oasislmf.pytools.common.data import load_as_ndarray
from oasislmf.pytools.common.event_stream import mv_read, oasis_int, oasis_float


[docs] logger = logging.getLogger(__name__)
# Input file names (input/<file_name>)
[docs] EVENTRATES_FILE = "event_rates.csv"
[docs] OCCURRENCE_FILE = "occurrence.bin"
[docs] PERIODS_FILE = "periods.bin"
[docs] QUANTILE_FILE = "quantile.bin"
[docs] RETURNPERIODS_FILE = "returnperiods.bin"
[docs] def read_event_rates(run_dir, filename=EVENTRATES_FILE): """Reads event rates from a CSV file Args: run_dir (str | os.PathLike): Path to input files dir filename (str | os.PathLike): event rates csv file name Returns: unique_event_ids (ndarray[oasis_int]): unique event ids event_rates (ndarray[oasis_float]): event rates """ event_rate_file = Path(run_dir, filename) data = load_as_ndarray( run_dir, filename[:-4], np.dtype([('event_id', oasis_int), ('rate', oasis_float)]), must_exist=False, col_map={"event_id": "EventIds", "rate": "Event_rates"} ) if data is None or data.size == 0: logger.info(f"Event rate file {event_rate_file} is empty, proceeding without event rates.") return np.array([], dtype=oasis_int), np.array([], dtype=oasis_float) unique_event_ids = data['event_id'] event_rates = data['rate'] # Make sure event_ids are sorted sort_idx = np.argsort(unique_event_ids) unique_event_ids = unique_event_ids[sort_idx] event_rates = event_rates[sort_idx] return unique_event_ids, event_rates
[docs] def read_quantile(sample_size, run_dir, filename=QUANTILE_FILE, return_empty=False): """Generate a quantile interval Dictionary based on sample size and quantile binary file Args: sample_size (int): Sample size run_dir (str | os.PathLike): Path to input files dir filename (str | os.PathLike): quantile binary file name return_empty (bool): return an empty intervals array regardless of the existence of the quantile binary Returns: intervals (quantile_interval_dtype): Numpy array emulating a dictionary for numba """ intervals = [] quantile_interval_dtype = np.dtype([ ('q', oasis_float), ('integer_part', oasis_int), ('fractional_part', oasis_float), ]) if return_empty: return np.array([], dtype=quantile_interval_dtype) data = load_as_ndarray(run_dir, filename[:-4], np.float32, must_exist=True) for q in data: # Calculate interval index and fractional part pos = (sample_size - 1) * q + 1 integer_part = int(pos) fractional_part = pos - integer_part intervals.append((q, integer_part, fractional_part)) # Convert to numpy array intervals = np.array(intervals, dtype=quantile_interval_dtype) return intervals
[docs] def read_occurrence(run_dir, filename=OCCURRENCE_FILE): """Read the occurrence binary file and returns an occurrence map Args: run_dir (str | os.PathLike): Path to input files dir filename (str | os.PathLike): occurrence binary file name Returns: occ_map (ndarray[occ_map_dtype]): numpy map of event_id, period_no, occ_date_id from the occurrence file """ occurrence_fp = Path(run_dir, filename) fin = np.memmap(occurrence_fp, mode="r", dtype="u1") cursor = 0 valid_buff = len(fin) if valid_buff - cursor < np.dtype(np.int32).itemsize: raise RuntimeError("Error: broken occurrence file, not enough data") date_opts, cursor = mv_read(fin, cursor, np.int32, np.dtype(np.int32).itemsize) date_algorithm = date_opts & 1 granular_date = date_opts >> 1 # (event_id: int, period_no: int, occ_date_id: int) record_size = np.dtype(np.int32).itemsize * 3 # (event_id: int, period_no: int, occ_date_id: long long) if granular_date: record_size = np.dtype(np.int32).itemsize * 2 + np.dtype(np.int64).itemsize # Should not get here if not date_algorithm and granular_date: raise RuntimeError("FATAL: Unknown date algorithm") # Extract no_of_periods if valid_buff - cursor < np.dtype(np.int32).itemsize: raise RuntimeError("Error: broken occurrence file, not enough data") no_of_periods, cursor = mv_read(fin, cursor, np.int32, np.dtype(np.int32).itemsize) num_records = (valid_buff - cursor) // record_size if (valid_buff - cursor) % record_size != 0: logger.warning( f"Occurrence File size (num_records: {num_records}) does not align with expected record size (record_size: {record_size})" ) occ_map_dtype = np.dtype([ ("event_id", np.int32), ("period_no", np.int32), ("occ_date_id", np.int32), ]) if granular_date: occ_map_dtype = np.dtype([ ("event_id", np.int32), ("period_no", np.int32), ("occ_date_id", np.int64), ]) occ_map = np.zeros(0, dtype=occ_map_dtype) if num_records > 0: occ_map = np.frombuffer(fin[cursor:cursor + num_records * record_size], dtype=occ_map_dtype) return occ_map, date_algorithm, granular_date, no_of_periods
[docs] def read_periods(no_of_periods, run_dir, filename=PERIODS_FILE): """Returns an array of period weights for each period between 1 and no_of_periods inclusive (with no gaps). Args: no_of_periods (int): Number of periods run_dir (str | os.PathLike): Path to input files dir filename (str | os.PathLike): periods binary file name Returns: period_weights (ndarray[period_weights_dtype]): Period weights """ periods_fp = Path(run_dir, filename) period_weights_dtype = np.dtype([ ("period_no", np.int32), ("weighting", np.float64), ]) if not periods_fp.exists(): # If no periods binary file found, the revert to using period weights reciprocal to no_of_periods logger.warning(f"Periods file not found at {periods_fp}, using reciprocal calculated period weights based on no_of_periods {no_of_periods}") period_weights = np.array( [(i + 1, 1 / no_of_periods) for i in range(no_of_periods)], dtype=period_weights_dtype ) return period_weights data = load_as_ndarray(run_dir, filename[:-4], period_weights_dtype, must_exist=True) # Less data than no_of_periods if len(data) != no_of_periods: raise RuntimeError(f"ERROR: no_of_periods does not match total period_no in period binary file {periods_fp}.") # Sort by period_no period_weights = np.sort(data, order="period_no") # Identify any missing periods expected_periods = np.arange(1, no_of_periods + 1) actual_periods = period_weights['period_no'] missing_periods = np.setdiff1d(expected_periods, actual_periods) if len(missing_periods) > 0: raise RuntimeError(f"ERROR: Missing period_no in period binary file {periods_fp}.") return period_weights
[docs] def read_return_periods(use_return_period_file, run_dir, filename=RETURNPERIODS_FILE): """Returns an array of return periods decreasing order with no duplicates. Args: use_return_period_file (bool): Bool to use Return Period File run_dir (str | os.PathLike): Path to input files dir filename (str | os.PathLike): return periods binary file name Returns: return_periods (ndarray[np.int32]): Return Periods use_return_period_file (bool): Bool to use Return Period File """ if not use_return_period_file: return np.array([], dtype=np.int32), use_return_period_file returnperiods_fp = Path(run_dir, filename) if not returnperiods_fp.exists(): raise RuntimeError(f"ERROR: Return Periods file not found at {returnperiods_fp}.") returnperiods = load_as_ndarray( run_dir, filename[:-4], np.int32, must_exist=False ) if len(returnperiods) == 0: logger.warning(f"WARNING: Empty return periods file at {returnperiods_fp}. Running without defined return periods option") return None, False # Check return periods validity # Return periods should be unique and decreasing in order if len(returnperiods) != len(np.unique(returnperiods)): raise RuntimeError(f"ERROR: Invalid return periods file. Duplicate return periods found: {returnperiods}") lastrp = -1 for rp in returnperiods: if lastrp != -1 and lastrp <= rp: raise RuntimeError(f"ERROR: Invalid return periods file. Non-decreasing return periods found: {returnperiods}") lastrp = rp return returnperiods, use_return_period_file