from contextlib import ExitStack
import logging
import sys
import numba as nb
import numpy as np
from pathlib import Path
from oasislmf.pytools.common.data import (
load_as_ndarray, nb_oasis_int,
correlations_headers, correlations_dtype, coverages_headers,
occurrence_dtype, occurrence_granular_dtype, periods_dtype, quantile_dtype,
quantile_interval_dtype, returnperiods_dtype
)
from oasislmf.pytools.common.event_stream import mv_read, oasis_int, oasis_float
logger = logging.getLogger(__name__)
# Input file names (input/<file_name>)
AMPLIFICATIONS_FILE = "amplifications.bin"
CORRELATIONS_FILENAME = "correlations.bin"
COVERAGES_FILE = "coverages.bin"
EVENTRATES_FILE = "event_rates.csv"
FMPOLICYTC_FILE = "fm_policytc.bin"
FMPROGRAMME_FILE = "fm_programme.bin"
FMPROFILE_FILE = "fm_profile.bin"
FMSUMMARYXREF_FILE = "fmsummaryxref.bin"
FMXREF_FILE = "fmxref.bin"
GULSUMMARYXREF_FILE = "gulsummaryxref.bin"
ITEMS_FILE = "items.bin"
OCCURRENCE_FILE = "occurrence.bin"
PERIODS_FILE = "periods.bin"
QUANTILE_FILE = "quantile.bin"
RETURNPERIODS_FILE = "returnperiods.bin"
def read_amplifications(run_dir="", filename=AMPLIFICATIONS_FILE, use_stdin=False):
"""
Get array of amplification IDs from amplifications.bin, where index
corresponds to item ID.
amplifications.bin is binary file with layout:
reserved header (4-byte int),
item ID 1 (4-byte int), amplification ID a_1 (4-byte int),
...
item ID n (4-byte int), amplification ID a_n (4-byte int)
Args:
        run_dir (str): path to the directory containing the amplifications file
filename (str | os.PathLike): amplifications file name
use_stdin (bool): Use standard input for file data, ignores run_dir/filename. Defaults to False.
Returns:
items_amps (numpy.ndarray): array of amplification IDs, where index
corresponds to item ID
"""
header_size = 4
if use_stdin:
items_amps = np.frombuffer(sys.stdin.buffer.read(), dtype=np.int32, offset=header_size)
else:
amplification_file = Path(run_dir, filename)
if not amplification_file.exists():
            raise FileNotFoundError(f'amplifications file not found at {amplification_file}.')
items_amps = np.fromfile(amplification_file, dtype=np.int32, offset=header_size)
# Check item IDs start from 1 and are contiguous
if items_amps[0] != 1:
raise ValueError(f'First item ID is {items_amps[0]}. Expected 1.')
items_amps = items_amps.reshape(len(items_amps) // 2, 2)
if not np.all(items_amps[1:, 0] - items_amps[:-1, 0] == 1):
raise ValueError('Item IDs in amplifications file are not contiguous')
items_amps = np.concatenate((np.array([0]), items_amps[:, 1]))
return items_amps
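# Illustrative sketch (not part of the Oasis API): builds a minimal
# amplifications.bin in a temporary directory and reads it back. The reserved
# header value and the (item_id, amplification_id) pairs are made-up sample data.
def _demo_read_amplifications():
    import tempfile
    with tempfile.TemporaryDirectory() as tmp_dir:
        # reserved 4-byte header, then (item_id, amplification_id) pairs
        np.array([0, 1, 101, 2, 102, 3, 103], dtype=np.int32).tofile(
            Path(tmp_dir, AMPLIFICATIONS_FILE)
        )
        items_amps = read_amplifications(run_dir=tmp_dir)
        assert items_amps.tolist() == [0, 101, 102, 103]  # index == item ID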
def read_correlations(run_dir, ignore_file_type=set(), filename=CORRELATIONS_FILENAME):
"""Load the correlations from the correlations file.
Args:
run_dir (str): path to correlations file
ignore_file_type (Set[str]): file extension to ignore when loading.
filename (str | os.PathLike): correlations file name
    Returns:
        correlations (ndarray[correlations_dtype]): array of correlation records
"""
for ext in ["bin", "csv"]:
if ext in ignore_file_type:
continue
correlations_file = Path(run_dir, filename).with_suffix("." + ext)
if correlations_file.exists():
logger.debug(f"loading {correlations_file}")
if ext == "bin":
try:
correlations = np.memmap(correlations_file, dtype=correlations_dtype, mode='r')
except ValueError:
logger.debug("binary file is empty, numpy.memmap failed. trying to read correlations.csv.")
correlations = read_correlations(run_dir, ignore_file_type={'bin'}, filename=correlations_file.with_suffix(".csv").name)
elif ext == "csv":
# Check for header
with open(correlations_file, "r") as fin:
first_line = fin.readline()
first_line_elements = [header.strip() for header in first_line.strip().split(',')]
has_header = first_line_elements == correlations_headers
correlations = np.loadtxt(
correlations_file,
dtype=correlations_dtype,
delimiter=",",
skiprows=1 if has_header else 0,
ndmin=1
)
else:
raise RuntimeError(f"Cannot read correlations file of type {ext}. Not Implemented.")
return correlations
raise FileNotFoundError(f'correlations file not found at {run_dir}. Ignoring files with ext {ignore_file_type}.')
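# Illustrative usage (hypothetical paths): the binary is preferred when both
# files exist; passing ignore_file_type={"bin"} forces the CSV reader, e.g.
# when inspecting inputs by hand:
#
#     correlations = read_correlations("analysis/input")
#     correlations = read_correlations("analysis/input", ignore_file_type={"bin"})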
def read_coverages(run_dir="", ignore_file_type=set(), filename=COVERAGES_FILE, use_stdin=False):
"""Load the coverages from the coverages file.
Args:
        run_dir (str): path to the directory containing the coverages file
ignore_file_type (Set[str]): file extension to ignore when loading.
filename (str | os.PathLike): coverages file name
use_stdin (bool): Use standard input for file data, ignores run_dir/filename. Defaults to False.
Returns:
numpy.array[oasis_float]: array with the coverage values for each coverage_id.
"""
for ext in ["bin", "csv"]:
if ext in ignore_file_type:
continue
coverages_file = Path(run_dir, filename).with_suffix("." + ext)
if coverages_file.exists():
logger.debug(f"loading {coverages_file}")
if ext == "bin":
if use_stdin:
coverages = np.frombuffer(sys.stdin.buffer.read(), dtype=oasis_float)
else:
coverages = np.fromfile(coverages_file, dtype=oasis_float)
elif ext == "csv":
with ExitStack() as stack:
if use_stdin:
fin = sys.stdin
else:
fin = stack.enter_context(open(coverages_file, "r"))
lines = fin.readlines()
# Check for header
first_line_elements = [header.strip() for header in lines[0].strip().split(',')]
has_header = first_line_elements == coverages_headers
data_lines = lines[1:] if has_header else lines
                    coverages = np.loadtxt(
                        data_lines,
                        dtype=oasis_float,
                        delimiter=",",
                        ndmin=2  # keep 2D so the column slice works for single-row files
                    )[:, 1]
else:
raise RuntimeError(f"Cannot read coverages file of type {ext}. Not Implemented.")
return coverages
raise FileNotFoundError(f'coverages file not found at {run_dir}. Ignoring files with ext {ignore_file_type}.')
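# Illustrative sketch: writes a small coverages CSV (assuming the standard
# "coverage_id,tiv" header; the TIV values are sample data) and reads it
# back, skipping the binary lookup via ignore_file_type.
def _demo_read_coverages():
    import tempfile
    with tempfile.TemporaryDirectory() as tmp_dir:
        csv_path = Path(tmp_dir, COVERAGES_FILE).with_suffix(".csv")
        csv_path.write_text("coverage_id,tiv\n1,1000.0\n2,2500.0\n")
        coverages = read_coverages(run_dir=tmp_dir, ignore_file_type={"bin"})
        assert coverages.tolist() == [1000.0, 2500.0]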
def read_event_rates(run_dir, filename=EVENTRATES_FILE):
"""Reads event rates from a CSV file
Args:
run_dir (str | os.PathLike): Path to input files dir
filename (str | os.PathLike): event rates csv file name
Returns:
unique_event_ids (ndarray[oasis_int]): unique event ids
event_rates (ndarray[oasis_float]): event rates
"""
event_rate_file = Path(run_dir, filename)
data = load_as_ndarray(
run_dir,
filename[:-4],
np.dtype([('event_id', oasis_int), ('rate', oasis_float)]),
must_exist=False,
col_map={"event_id": "EventIds", "rate": "Event_rates"}
)
if data is None or data.size == 0:
logger.info(f"Event rate file {event_rate_file} is empty, proceeding without event rates.")
return np.array([], dtype=oasis_int), np.array([], dtype=oasis_float)
unique_event_ids = data['event_id']
event_rates = data['rate']
# Make sure event_ids are sorted
sort_idx = np.argsort(unique_event_ids)
unique_event_ids = unique_event_ids[sort_idx]
event_rates = event_rates[sort_idx]
return unique_event_ids, event_rates
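# A minimal sketch of the co-sorting step above: a single argsort of the
# event IDs reorders both columns consistently. The values are made up.
def _demo_event_rate_sort():
    event_ids = np.array([3, 1, 2], dtype=oasis_int)
    rates = np.array([0.3, 0.1, 0.2], dtype=oasis_float)
    sort_idx = np.argsort(event_ids)
    assert event_ids[sort_idx].tolist() == [1, 2, 3]
    assert np.allclose(rates[sort_idx], [0.1, 0.2, 0.3])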
def read_quantile(sample_size, run_dir, filename=QUANTILE_FILE, return_empty=False):
"""Generate a quantile interval Dictionary based on sample size and quantile binary file
Args:
sample_size (int): Sample size
run_dir (str | os.PathLike): Path to input files dir
filename (str | os.PathLike): quantile binary file name
return_empty (bool): return an empty intervals array regardless of the existence of the quantile binary
Returns:
intervals (quantile_interval_dtype): Numpy array emulating a dictionary for numba
"""
intervals = []
if return_empty:
return np.array([], dtype=quantile_interval_dtype)
data = load_as_ndarray(run_dir, filename[:-4], quantile_dtype, must_exist=True)
for row in data:
q = row["quantile"]
# Calculate interval index and fractional part
pos = (sample_size - 1) * q + 1
integer_part = int(pos)
fractional_part = pos - integer_part
intervals.append((q, integer_part, fractional_part))
# Convert to numpy array
intervals = np.array(intervals, dtype=quantile_interval_dtype)
return intervals
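# Worked example of the interval arithmetic above: with sample_size=100 and
# q=0.5, pos = 99 * 0.5 + 1 = 50.5, so the median is interpolated between
# the 50th and 51st ordered samples with a fractional weight of 0.5.
def _demo_quantile_interval(sample_size=100, q=0.5):
    pos = (sample_size - 1) * q + 1
    integer_part = int(pos)               # 50
    fractional_part = pos - integer_part  # 0.5
    return integer_part, fractional_part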
def read_occurrence_bin(run_dir="", filename=OCCURRENCE_FILE, use_stdin=False):
"""Read the occurrence binary file and returns an occurrence map
Args:
run_dir (str | os.PathLike): Path to input files dir
filename (str | os.PathLike): occurrence binary file name
use_stdin (bool): Use standard input for file data, ignores run_dir/filename. Defaults to False.
    Returns:
        occ_arr (ndarray): occurrence records (event_id, period_no, occ_date_id)
        date_algorithm (int): 1 if the date algorithm is used
        granular_date (int): non-zero if occ_date_id is stored in minutes (int64)
        no_of_periods (int): number of periods
"""
occurrence_fp = Path(run_dir, filename)
if use_stdin:
fin = np.frombuffer(sys.stdin.buffer.read(), dtype="u1")
else:
fin = np.memmap(occurrence_fp, mode="r", dtype="u1")
cursor = 0
valid_buff = len(fin)
if valid_buff - cursor < np.dtype(np.int32).itemsize:
raise RuntimeError("Error: broken occurrence file, not enough data")
date_opts, cursor = mv_read(fin, cursor, np.int32, np.dtype(np.int32).itemsize)
date_algorithm = date_opts & 1
granular_date = date_opts >> 1
# (event_id: int, period_no: int, occ_date_id: int)
record_size = np.dtype(np.int32).itemsize * 3
# (event_id: int, period_no: int, occ_date_id: long long)
if granular_date:
record_size = np.dtype(np.int32).itemsize * 2 + np.dtype(np.int64).itemsize
# Should not get here
if not date_algorithm and granular_date:
raise RuntimeError("FATAL: Unknown date algorithm")
# Extract no_of_periods
if valid_buff - cursor < np.dtype(np.int32).itemsize:
raise RuntimeError("Error: broken occurrence file, not enough data")
no_of_periods, cursor = mv_read(fin, cursor, np.int32, np.dtype(np.int32).itemsize)
num_records = (valid_buff - cursor) // record_size
if (valid_buff - cursor) % record_size != 0:
        logger.warning(
            "Occurrence file has trailing bytes that do not align with the expected "
            f"record size of {record_size} bytes; reading {num_records} whole records"
        )
occ_dtype = occurrence_dtype
if granular_date:
occ_dtype = occurrence_granular_dtype
occ_arr = np.zeros(0, dtype=occ_dtype)
if num_records > 0:
occ_arr = np.frombuffer(fin[cursor:cursor + num_records * record_size], dtype=occ_dtype)
return occ_arr, date_algorithm, granular_date, no_of_periods
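# Illustrative sketch: writes a minimal non-granular occurrence.bin
# (date_opts=1 sets the date-algorithm bit, granular bit off) followed by
# two made-up records, then parses it with read_occurrence_bin.
def _demo_read_occurrence_bin():
    import tempfile
    with tempfile.TemporaryDirectory() as tmp_dir:
        with open(Path(tmp_dir, OCCURRENCE_FILE), "wb") as fout:
            np.array([1, 10], dtype=np.int32).tofile(fout)  # date_opts, no_of_periods
            np.array([(1, 1, 730), (1, 2, 731)], dtype=occurrence_dtype).tofile(fout)
        occ_arr, date_algorithm, granular_date, no_of_periods = read_occurrence_bin(run_dir=tmp_dir)
        assert (date_algorithm, granular_date, no_of_periods) == (1, 0, 10)
        assert occ_arr["event_id"].tolist() == [1, 1]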
def read_occurrence(run_dir, filename=OCCURRENCE_FILE):
"""Read the occurrence binary file and returns an occurrence map
Args:
run_dir (str | os.PathLike): Path to input files dir
filename (str | os.PathLike): occurrence binary file name
    Returns:
        occ_map (nb.typed.Dict): map of event_id to array of (period_no, occ_date_id)
        date_algorithm (int): 1 if the date algorithm is used
        granular_date (int): non-zero if occ_date_id is stored in minutes
        no_of_periods (int): number of periods
"""
occ_arr, date_algorithm, granular_date, no_of_periods = read_occurrence_bin(run_dir, filename=filename)
occ_dtype = occurrence_dtype
if granular_date:
occ_dtype = occurrence_granular_dtype
occ_map_valtype = occ_dtype[["period_no", "occ_date_id"]]
NB_occ_map_valtype = nb.types.Array(nb.from_dtype(occ_map_valtype), 1, "C")
occ_map = _read_occ_arr(occ_arr, occ_map_valtype, NB_occ_map_valtype)
return occ_map, date_algorithm, granular_date, no_of_periods
@nb.njit(cache=True, error_model="numpy")
def _read_occ_arr(occ_arr, occ_map_valtype, NB_occ_map_valtype):
"""Reads occurrence file array and returns an occurrence map of event_id to list of (period_no, occ_date_id)
"""
occ_map = nb.typed.Dict.empty(nb_oasis_int, NB_occ_map_valtype)
occ_map_sizes = nb.typed.Dict.empty(nb_oasis_int, nb.types.int64)
for row in occ_arr:
event_id = row["event_id"]
if event_id not in occ_map:
occ_map[event_id] = np.zeros(8, dtype=occ_map_valtype)
occ_map_sizes[event_id] = 0
array = occ_map[event_id]
current_size = occ_map_sizes[event_id]
if current_size >= len(array): # Resize if the array is full
new_array = np.empty(len(array) * 2, dtype=occ_map_valtype)
new_array[:len(array)] = array
array = new_array
        array[current_size]["period_no"] = row["period_no"]
        array[current_size]["occ_date_id"] = row["occ_date_id"]
occ_map[event_id] = array
occ_map_sizes[event_id] += 1
for event_id in occ_map:
occ_map[event_id] = occ_map[event_id][:occ_map_sizes[event_id]]
return occ_map
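# A minimal sketch of the growth strategy in _read_occ_arr: each per-event
# array doubles in capacity when full and is trimmed afterwards, so n
# appends cost O(n) amortized instead of O(n^2) for repeated concatenation.
def _demo_amortized_append(values):
    arr = np.zeros(8, dtype=np.int64)
    size = 0
    for v in values:
        if size >= len(arr):  # full: double the capacity
            new_arr = np.empty(len(arr) * 2, dtype=np.int64)
            new_arr[:len(arr)] = arr
            arr = new_arr
        arr[size] = v
        size += 1
    return arr[:size]  # trim to the used length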
@nb.njit(cache=True)
def occ_get_date(occ_date_id, granular_date):
"""Returns date as year, month, day, hour, minute from occ_date_id
Args:
occ_date_id (np.int32 | np.int64): occurrence file date id (int64 for granular dates)
granular_date (bool): boolean for whether granular date should be extracted or not
Returns:
        (oasis_int, oasis_int, oasis_int, oasis_int, oasis_int): Returns year, month, day, hour, minute
"""
    # divisor is 1440 for granular (minute-resolution) ids, 1 otherwise;
    # integer division discards the time-of-day part recovered below
    days = occ_date_id // (1440 - 1439 * (not granular_date))
# Function void d(long long g, int& y, int& mm, int& dd) taken from pltcalc.cpp
y = (10000 * days + 14780) // 3652425
ddd = days - (365 * y + y // 4 - y // 100 + y // 400)
if ddd < 0:
y = y - 1
ddd = days - (365 * y + y // 4 - y // 100 + y // 400)
mi = (100 * ddd + 52) // 3060
mm = (mi + 2) % 12 + 1
y = y + (mi + 2) // 12
dd = ddd - (mi * 306 + 5) // 10 + 1
minutes = (occ_date_id % 1440) * granular_date
occ_hour = minutes // 60
occ_minutes = minutes % 60
return y, mm, dd, occ_hour, occ_minutes
@nb.njit(cache=True)
[docs]
def occ_get_date_id(granular_date, occ_year, occ_month, occ_day, occ_hour=0, occ_minute=0):
"""Returns the occ_date_id from year, month, day, hour, minute and whether it is a granular date
Args:
granular_date (bool): boolean for whether granular date should be extracted or not
occ_year (int): Occurrence Year.
occ_month (int): Occurrence Month.
occ_day (int): Occurrence Day.
occ_hour (int): Occurrence Hour. Defaults to 0.
occ_minute (int): Occurrence Minute. Defaults to 0.
Returns:
occ_date_id (np.int64): occurrence file date id (int64 for granular dates)
"""
occ_month = (occ_month + 9) % 12
occ_year = occ_year - occ_month // 10
occ_date_id = np.int64(
365 * occ_year + occ_year // 4 - occ_year // 100 + occ_year // 400 + (occ_month * 306 + 5) // 10 + (occ_day - 1)
)
    occ_date_id *= (1440 // (1440 - 1439 * granular_date))  # scale days to minutes for granular dates
occ_date_id += (60 * occ_hour + occ_minute)
return occ_date_id
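# Worked roundtrip of the two njit date helpers above, using sample values:
# 2023-07-15 10:30 encodes to a minute-resolution id in granular mode and
# decodes back to the same components.
def _demo_occ_date_roundtrip():
    occ_date_id = occ_get_date_id(True, 2023, 7, 15, occ_hour=10, occ_minute=30)
    y, mm, dd, occ_hour, occ_minute = occ_get_date(occ_date_id, True)
    assert (y, mm, dd, occ_hour, occ_minute) == (2023, 7, 15, 10, 30)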
def read_periods(no_of_periods, run_dir, filename=PERIODS_FILE):
"""Returns an array of period weights for each period between 1 and no_of_periods inclusive (with no gaps).
Args:
no_of_periods (int): Number of periods
run_dir (str | os.PathLike): Path to input files dir
filename (str | os.PathLike): periods binary file name
Returns:
period_weights (ndarray[periods_dtype]): Period weights
"""
periods_fp = Path(run_dir, filename)
if not periods_fp.exists():
        # If no periods binary file is found, revert to uniform period weights of 1 / no_of_periods
logger.warning(f"Periods file not found at {periods_fp}, using reciprocal calculated period weights based on no_of_periods {no_of_periods}")
period_weights = np.array(
[(i + 1, 1 / no_of_periods) for i in range(no_of_periods)],
dtype=periods_dtype
)
return period_weights
data = load_as_ndarray(run_dir, filename[:-4], periods_dtype, must_exist=True)
    # The number of rows must match no_of_periods exactly
if len(data) != no_of_periods:
raise RuntimeError(f"ERROR: no_of_periods does not match total period_no in period binary file {periods_fp}.")
# Sort by period_no
period_weights = np.sort(data, order="period_no")
# Identify any missing periods
expected_periods = np.arange(1, no_of_periods + 1)
actual_periods = period_weights['period_no']
missing_periods = np.setdiff1d(expected_periods, actual_periods)
if len(missing_periods) > 0:
raise RuntimeError(f"ERROR: Missing period_no in period binary file {periods_fp}.")
return period_weights
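# Illustrative sketch of the fallback above (assuming the standard
# period_no/weighting fields of periods_dtype): with no periods.bin in the
# directory, every period gets the reciprocal weight 1 / no_of_periods.
def _demo_read_periods_fallback():
    import tempfile
    with tempfile.TemporaryDirectory() as tmp_dir:
        period_weights = read_periods(4, tmp_dir)
        assert period_weights["period_no"].tolist() == [1, 2, 3, 4]
        assert np.allclose(period_weights["weighting"], 0.25)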
def read_returnperiods(use_return_period_file, run_dir, filename=RETURNPERIODS_FILE):
"""Returns an array of return periods decreasing order with no duplicates.
Args:
use_return_period_file (bool): Bool to use Return Period File
run_dir (str | os.PathLike): Path to input files dir
filename (str | os.PathLike): return periods binary file name
Returns:
return_periods (ndarray[np.int32]): Return Periods
use_return_period_file (bool): Bool to use Return Period File
"""
if not use_return_period_file:
return np.array([], dtype=returnperiods_dtype)["return_period"], use_return_period_file
returnperiods_fp = Path(run_dir, filename)
if not returnperiods_fp.exists():
raise RuntimeError(f"ERROR: Return Periods file not found at {returnperiods_fp}.")
returnperiods = load_as_ndarray(
run_dir,
filename[:-4],
returnperiods_dtype,
must_exist=False
)
if len(returnperiods) == 0:
logger.warning(f"WARNING: Empty return periods file at {returnperiods_fp}. Running without defined return periods option")
return None, False
# Check return periods validity
# Return periods should be unique and decreasing in order
if len(returnperiods) != len(np.unique(returnperiods)):
raise RuntimeError(f"ERROR: Invalid return periods file. Duplicate return periods found: {returnperiods}")
lastrp = -1
for rp in returnperiods["return_period"]:
if lastrp != -1 and lastrp <= rp:
raise RuntimeError(f"ERROR: Invalid return periods file. Non-decreasing return periods found: {returnperiods}")
lastrp = rp
return returnperiods["return_period"], use_return_period_file
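# Illustrative sketch: writes a strictly decreasing returnperiods.bin with
# sample values and validates it via read_returnperiods (this assumes
# load_as_ndarray resolves the ".bin" file from the extensionless name).
def _demo_read_returnperiods():
    import tempfile
    with tempfile.TemporaryDirectory() as tmp_dir:
        rps = np.array([(250,), (100,), (50,)], dtype=returnperiods_dtype)
        rps.tofile(Path(tmp_dir, RETURNPERIODS_FILE))
        return_periods, use_rp = read_returnperiods(True, tmp_dir)
        assert return_periods.tolist() == [250, 100, 50] and use_rp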