import os
import sys
import numba as nb
import numpy as np
import pandas as pd
# Scalar types used throughout the oasis pytools, configurable via environment
# variables (read once at import time).
oasis_int = np.dtype(os.environ.get('OASIS_INT', 'i4'))  # integer ids (default int32)
nb_oasis_int = nb.from_dtype(oasis_int)  # numba-compatible equivalent
oasis_int_size = oasis_int.itemsize

oasis_float = np.dtype(os.environ.get('OASIS_FLOAT', 'f4'))  # loss/probability values (default float32)
nb_oasis_float = nb.from_dtype(oasis_float)
oasis_float_size = oasis_float.itemsize

areaperil_int = np.dtype(os.environ.get('AREAPERIL_TYPE', 'u4'))  # areaperil ids (default uint32)
nb_areaperil_int = nb.from_dtype(areaperil_int)
areaperil_int_size = areaperil_int.itemsize

# Sentinel for "no index"; for unsigned OASIS_INT types -1 wraps to the max value.
null_index = oasis_int.type(-1)

# A default buffer size for nd arrays to be initialised to
DEFAULT_BUFFER_SIZE = 1_000_000

# Mean type numbers for outputs (SampleType)
MEAN_TYPE_ANALYTICAL = 1
# Types
# Each *_output table is a list of (column_name, numpy dtype, printf-style fmt)
# triples describing an Oasis file layout; generate_output_metadata expands a
# table into (headers, dtype, fmt) used to read/write the corresponding files.
aggregatevulnerability_output = [
    ("aggregate_vulnerability_id", 'i4', "%d"),
    ("vulnerability_id", 'i4', "%d"),
]
aggregatevulnerability_headers, aggregatevulnerability_dtype, aggregatevulnerability_fmt = generate_output_metadata(aggregatevulnerability_output)

amplifications_output = [
    ("item_id", 'i4', "%d"),
    ("amplification_id", 'i4', "%d"),
]
amplifications_headers, amplifications_dtype, amplifications_fmt = generate_output_metadata(amplifications_output)

cdf_output = [
    ("event_id", 'i4', "%d"),
    ("areaperil_id", 'i4', "%d"),
    ("vulnerability_id", 'i4', "%d"),
    ("bin_index", 'i4', "%d"),
    ("prob_to", 'f4', "%f"),
    ("bin_mean", 'f4', "%f"),
]
cdf_headers, cdf_dtype, cdf_fmt = generate_output_metadata(cdf_output)

# NOTE(review): complex_items_meta_output is presumably defined elsewhere in
# this module — confirm it is in scope before this line.
complex_items_meta_headers, complex_items_meta_dtype, complex_items_meta_fmt = generate_output_metadata(complex_items_meta_output)

correlations_output = [
    ("item_id", 'i4', "%d"),
    ("peril_correlation_group", 'i4', "%d"),
    ("damage_correlation_value", oasis_float, "%f"),
    ("hazard_group_id", 'i4', "%d"),
    ("hazard_correlation_value", oasis_float, "%f"),
]
correlations_headers, correlations_dtype, correlations_fmt = generate_output_metadata(correlations_output)

coverages_output = [
    ("coverage_id", 'i4', "%d"),
    ("tiv", oasis_float, "%f"),
]
coverages_headers, coverages_dtype, coverages_fmt = generate_output_metadata(coverages_output)

damagebin_output = [
    ("bin_index", 'i4', "%d"),
    ("bin_from", oasis_float, "%f"),
    ("bin_to", oasis_float, "%f"),
    ("interpolation", oasis_float, "%f"),
    ("damage_type", 'i4', "%d"),
]
damagebin_headers, damagebin_dtype, damagebin_fmt = generate_output_metadata(damagebin_output)

eve_output = [
    ("event_id", oasis_int, "%d")
]
eve_headers, eve_dtype, eve_fmt = generate_output_metadata(eve_output)

# NOTE(review): footprint_event_output is presumably defined elsewhere in
# this module — confirm it is in scope before this line.
footprint_event_headers, footprint_event_dtype, footprint_event_fmt = generate_output_metadata(footprint_event_output)

fm_output = [
    ("event_id", 'i4', "%d"),
    ("output_id", 'i4', "%d"),
    ("sidx", 'i4', "%d"),
    ("loss", oasis_float, "%.2f"),
]
fm_headers, fm_dtype, fm_fmt = generate_output_metadata(fm_output)

fm_policytc_output = [
    ("level_id", 'i4', "%d"),
    ("agg_id", 'i4', "%d"),
    ("layer_id", 'i4', "%d"),
    ("profile_id", 'i4', "%d"),
]
fm_policytc_headers, fm_policytc_dtype, fm_policytc_fmt = generate_output_metadata(fm_policytc_output)

fm_profile_output = [
    ("profile_id", 'i4', "%d"),
    ("calcrule_id", 'i4', "%d"),
    ("deductible1", oasis_float, "%f"),
    ("deductible2", oasis_float, "%f"),
    ("deductible3", oasis_float, "%f"),
    ("attachment1", oasis_float, "%f"),
    ("limit1", oasis_float, "%f"),
    ("share1", oasis_float, "%f"),
    ("share2", oasis_float, "%f"),
    ("share3", oasis_float, "%f"),
]
fm_profile_headers, fm_profile_dtype, fm_profile_fmt = generate_output_metadata(fm_profile_output)

fm_profile_step_output = [
    ("profile_id", 'i4', "%d"),
    ("calcrule_id", 'i4', "%d"),
    ("deductible1", oasis_float, "%f"),
    ("deductible2", oasis_float, "%f"),
    ("deductible3", oasis_float, "%f"),
    ("attachment1", oasis_float, "%f"),
    ("limit1", oasis_float, "%f"),
    ("share1", oasis_float, "%f"),
    ("share2", oasis_float, "%f"),
    ("share3", oasis_float, "%f"),
    ("step_id", 'i4', "%d"),
    ("trigger_start", oasis_float, "%f"),
    ("trigger_end", oasis_float, "%f"),
    ("payout_start", oasis_float, "%f"),
    ("payout_end", oasis_float, "%f"),
    ("limit2", oasis_float, "%f"),
    ("scale1", oasis_float, "%f"),
    ("scale2", oasis_float, "%f"),
]
fm_profile_step_headers, fm_profile_step_dtype, fm_profile_step_fmt = generate_output_metadata(fm_profile_step_output)

fm_programme_output = [
    ("from_agg_id", 'i4', "%d"),
    ("level_id", 'i4', "%d"),
    ("to_agg_id", 'i4', "%d"),
]
fm_programme_headers, fm_programme_dtype, fm_programme_fmt = generate_output_metadata(fm_programme_output)

fm_summary_xref_output = [
    ("output", 'i4', "%d"),
    ("summary_id", 'i4', "%d"),
    ("summaryset_id", 'i4', "%d")
]
fm_summary_xref_headers, fm_summary_xref_dtype, fm_summary_xref_fmt = generate_output_metadata(fm_summary_xref_output)

fm_xref_output = [
    ("output", 'i4', "%d"),
    ("agg_id", 'i4', "%d"),
    ("layer_id", 'i4', "%d"),
]
fm_xref_headers, fm_xref_dtype, fm_xref_fmt = generate_output_metadata(fm_xref_output)

gul_output = [
    ("event_id", 'i4', "%d"),
    ("item_id", 'i4', "%d"),
    ("sidx", 'i4', "%d"),
    ("loss", oasis_float, "%.2f"),
]
gul_headers, gul_dtype, gul_fmt = generate_output_metadata(gul_output)

gul_summary_xref_output = [
    ("item_id", 'i4', "%d"),
    ("summary_id", 'i4', "%d"),
    ("summaryset_id", 'i4', "%d")
]
gul_summary_xref_headers, gul_summary_xref_dtype, gul_summary_xref_fmt = generate_output_metadata(gul_summary_xref_output)

items_output = [
    ("item_id", 'i4', "%d"),
    ("coverage_id", 'i4', "%d"),
    ("areaperil_id", areaperil_int, "%u"),
    ("vulnerability_id", 'i4', "%d"),
    ("group_id", 'i4', "%d"),
]
items_headers, items_dtype, items_fmt = generate_output_metadata(items_output)

lossfactors_output = [
    ("event_id", 'i4', "%d"),
    ("amplification_id", 'i4', "%d"),
    ("factor", 'f4', "%.2f"),
]
lossfactors_headers, lossfactors_dtype, lossfactors_fmt = generate_output_metadata(lossfactors_output)

occurrence_output = [
    ("event_id", 'i4', "%d"),
    ("period_no", 'i4', "%d"),
    ("occ_date_id", 'i4', "%d"),
]
occurrence_headers, occurrence_dtype, occurrence_fmt = generate_output_metadata(occurrence_output)

# Granular occurrence uses a 64-bit occ_date_id.
occurrence_granular_output = [
    ("event_id", 'i4', "%d"),
    ("period_no", 'i4', "%d"),
    ("occ_date_id", 'i8', "%d"),
]
occurrence_granular_headers, occurrence_granular_dtype, occurrence_granular_fmt = generate_output_metadata(occurrence_granular_output)

periods_output = [
    ("period_no", 'i4', "%d"),
    ("weighting", 'f8', "%0.9lf"),
]
periods_headers, periods_dtype, periods_fmt = generate_output_metadata(periods_output)

quantile_output = [
    ("quantile", 'f4', "%f"),
]
quantile_headers, quantile_dtype, quantile_fmt = generate_output_metadata(quantile_output)

quantile_interval_output = quantile_output + [
    ('integer_part', oasis_int, "%d"),
    ('fractional_part', oasis_float, "%f"),
]
quantile_interval_headers, quantile_interval_dtype, quantile_interval_fmt = generate_output_metadata(quantile_interval_output)

random_output = [
    ("random_no", 'f4', "%f"),
]
random_headers, random_dtype, random_fmt = generate_output_metadata(random_output)

returnperiods_output = [
    ("return_period", 'i4', "%d"),
]
returnperiods_headers, returnperiods_dtype, returnperiods_fmt = generate_output_metadata(returnperiods_output)

vulnerability_output = [
    ("vulnerability_id", 'i4', "%d"),
    ("intensity_bin_id", 'i4', "%d"),
    ("damage_bin_id", 'i4', "%d"),
    ("probability", oasis_float, "%.6f"),
]
vulnerability_headers, vulnerability_dtype, vulnerability_fmt = generate_output_metadata(vulnerability_output)

vulnerability_weight_output = [
    ("areaperil_id", areaperil_int, "%d"),
    ("vulnerability_id", 'i4', "%d"),
    ("weight", oasis_float, "%f"),
]
vulnerability_weight_headers, vulnerability_weight_dtype, vulnerability_weight_fmt = generate_output_metadata(vulnerability_weight_output)
def load_as_ndarray(dir_path, name, _dtype, must_exist=True, col_map=None):
    """
    Load a file as a numpy ndarray.

    Useful for multi-column files. A binary file (`<name>.bin`) takes
    precedence over a csv file (`<name>.csv`).

    Args:
        dir_path: path to the directory where the binary or csv file is stored
        name: name of the file (without extension)
        _dtype: np.dtype describing the record layout
        must_exist: raise FileNotFoundError if no file is present
        col_map: name re-mapping to change name of csv columns
    Returns:
        numpy ndarray
    """
    if os.path.isfile(os.path.join(dir_path, name + '.bin')):
        return np.fromfile(os.path.join(dir_path, name + '.bin'), dtype=_dtype)
    elif must_exist or os.path.isfile(os.path.join(dir_path, name + '.csv')):
        # in csv, columns can be out of order and have different names,
        # so we load with pandas and copy each column into the ndarray
        if col_map is None:
            col_map = {}
        with open(os.path.join(dir_path, name + '.csv')) as file_in:
            csv_dtype = {col_map.get(key, key): col_dtype for key, (col_dtype, _) in _dtype.fields.items()}
            df = pd.read_csv(file_in, delimiter=',', dtype=csv_dtype, usecols=list(csv_dtype.keys()))
            res = np.empty(df.shape[0], dtype=_dtype)
            # use a distinct loop variable (the original shadowed the `name` parameter)
            for field in _dtype.names:
                res[field] = df[col_map.get(field, field)]
            return res
    else:
        return np.empty(0, dtype=_dtype)
def load_as_array(dir_path, name, _dtype, must_exist=True):
    """
    Load a file as a single (flat) numpy array.

    Useful for files whose binary version contains only one type of value,
    where the index corresponds to an id. For example coverage.bin only
    contains the tiv value for each coverage id; coverage_id n corresponds
    to index n-1. For the csv version, only the second column is read.

    Args:
        dir_path: path to the directory where the binary or csv file is stored
        name: name of the file (without extension)
        _dtype: numpy dtype of the required array
        must_exist: raise FileNotFoundError if no file is present
    Returns:
        numpy array of dtype type
    """
    bin_path = os.path.join(dir_path, name + '.bin')
    if os.path.isfile(bin_path):
        return np.fromfile(bin_path, dtype=_dtype)
    csv_path = os.path.join(dir_path, name + '.csv')
    if must_exist or os.path.isfile(csv_path):
        with open(csv_path) as file_in:
            # skip the header row, keep only the value (second) column
            return np.loadtxt(file_in, dtype=_dtype, delimiter=',', skiprows=1, usecols=1)
    return np.empty(0, dtype=_dtype)
def write_ndarray_to_fmt_csv(output_file, data, headers, row_fmt):
    """Write a custom-dtype array with headers to csv using the provided row_fmt str.

    This function is a faster replacement for np.savetxt: it builds one large
    format string covering every row and performs a single `%` formatting pass
    and a single write, instead of formatting row by row.

    WARNING: untested with string types in custom data.

    Args:
        output_file (io.TextIOWrapper): CSV file
        data (ndarray[<custom dtype>]): Custom dtype ndarray with column names
        headers (list[str]): Column names for custom ndarray
        row_fmt (str): Format for each row in csv (comma separated, one
            specifier per header)
    Raises:
        RuntimeError: if row_fmt does not have one specifier per header.
    """
    if len(headers) != len(row_fmt.split(",")):
        raise RuntimeError(f"ERROR: write_ndarray_to_fmt_csv requires row_fmt ({row_fmt}) and headers ({headers}) to have the same length.")
    # Copy data column by column: np.ravel does not work with custom dtype arrays.
    # Default dtype of np.empty is np.float64, so every column is widened to float.
    data_cpy = np.empty((data.shape[0], len(headers)))
    for col_idx, header in enumerate(headers):
        data_cpy[:, col_idx] = data[header]
    # One big format string for all rows, formatted and written in one go
    final_fmt = "\n".join([row_fmt] * data_cpy.shape[0])
    output_file.write(final_fmt % tuple(np.ravel(data_cpy)))
    output_file.write("\n")
# Tolerance for float equality, tied to the machine epsilon of the configured
# oasis_float precision.
float_equal_precision = np.finfo(oasis_float).eps


@nb.njit(cache=True)
def almost_equal(a, b):
    """Return True when a and b differ by less than float_equal_precision."""
    return abs(a - b) < float_equal_precision
def resolve_file(path, mode, stack):
    """Resolve a file path to an open file, or use standard input/output.

    Args:
        path (str | os.PathLike): File path or "-" indicating standard input/output.
        mode (str): Mode to open file ("r", "rb", "w", "wb").
        stack (ExitStack): Context manager stack used to manage file lifecycle.
    Returns:
        file (IO): A file-like object opened in the specified mode.
    """
    is_read = "r" in mode
    is_binary = "b" in mode
    if str(path) == "-":
        # std streams are returned directly (not registered on the stack),
        # so they are never closed by the caller's ExitStack
        if is_read:
            return sys.stdin.buffer if is_binary else sys.stdin
        else:
            return sys.stdout.buffer if is_binary else sys.stdout
    else:
        return stack.enter_context(open(path, mode))