import os
import sys
import numba as nb
import numpy as np
import pandas as pd
oasis_int = np.dtype(os.environ.get('OASIS_INT', 'i4'))
nb_oasis_int = nb.from_dtype(oasis_int)
oasis_int_size = oasis_int.itemsize

oasis_float = np.dtype(os.environ.get('OASIS_FLOAT', 'f4'))
nb_oasis_float = nb.from_dtype(oasis_float)
oasis_float_size = oasis_float.itemsize

areaperil_int = np.dtype(os.environ.get('OASIS_AREAPERIL_TYPE', 'u4'))
nb_areaperil_int = nb.from_dtype(areaperil_int)
areaperil_int_size = areaperil_int.itemsize
null_index = oasis_int.type(-1)
# A default buffer size for nd arrays to be initialised to
DEFAULT_BUFFER_SIZE = 1_000_000

# Mean type numbers for outputs (SampleType)
MEAN_TYPE_ANALYTICAL = 1

# slice selecting the (name, dtype) part of a single type definition
NAME_DTYPE_SLICE = slice(2)

# single type definitions (alphabetical order), each a (column name, dtype, csv format) tuple
agg_id = ("agg_id", 'i4', "%d")
aggregate_vulnerability_id = ("aggregate_vulnerability_id", 'i4', "%d")
amplification_id = ("amplification_id", 'i4', "%d")
areaperil_id = ("areaperil_id", areaperil_int, "%d")
attachment1 = ("attachment1", oasis_float, "%f")
bin_from = ("bin_from", oasis_float, "%f")
bin_index = ("bin_index", 'i4', "%d")
bin_mean = ("bin_mean", 'f4', "%f")
bin_to = ("bin_to", oasis_float, "%f")
calcrule_id = ("calcrule_id", 'i4', "%d")
coverage_id = ("coverage_id", 'u4', "%u")
damage_bin_id = ("damage_bin_id", 'i4', "%d")
damage_correlation_value = ("damage_correlation_value", oasis_float, "%f")
damage_type = ("damage_type", 'i4', "%d")
deductible1 = ("deductible1", oasis_float, "%f")
deductible2 = ("deductible2", oasis_float, "%f")
deductible3 = ("deductible3", oasis_float, "%f")
event_id = ("event_id", 'i4', "%d")
factor = ("factor", 'f4', "%.2f")  # Amplification factor
from_agg_id = ("from_agg_id", 'i4', "%d")
group_id = ("group_id", 'u4', "%u")
hazard_correlation_value = ("hazard_correlation_value", oasis_float, "%f")
hazard_group_id = ("hazard_group_id", 'i4', "%d")
intensity_adjustment = ('intensity_adjustment', 'i4', "%d")
intensity_bin_id = ('intensity_bin_id', 'i4', "%d")
interpolation = ("interpolation", oasis_float, "%f")
item_id = ("item_id", 'i4', "%d")
item_return_period = ('return_period', 'i4', "%d")
layer_id = ("layer_id", 'i4', "%d")
level_id = ("level_id", 'i4', "%d")
limit1 = ("limit1", oasis_float, "%f")
limit2 = ("limit2", oasis_float, "%f")
loss = ("loss", oasis_float, "%.2f")
model_data_len = ("model_data_len", 'u4', "%u")
occ_date_id = ("occ_date_id", 'i4', "%d")
occ_date_id_granular = ("occ_date_id", 'i8', "%d")
output_id = ("output_id", 'i4', "%d")
output = ("output", output_id[1], output_id[2])
payout_end = ("payout_end", oasis_float, "%f")
payout_start = ("payout_start", oasis_float, "%f")
peril_correlation_group = ("peril_correlation_group", 'i4', "%d")
period_no = ("period_no", 'i4', "%d")
period_weighting = ("weighting", 'f8', "%0.9lf")
prob_to = ("prob_to", 'f4', "%f")
probability = ('probability', oasis_float, "%.6f")
profile_id = ("profile_id", 'i4', "%d")
quantile = ("quantile", 'f4', "%f")
quantile_fractional_part = ('fractional_part', oasis_float, "%f")
quantile_integer_part = ('integer_part', oasis_int, "%d")
random_no = ("random_no", 'f4', "%f")
return_period = ("return_period", 'i4', "%d")
scale1 = ("scale1", oasis_float, "%f")
scale2 = ("scale2", oasis_float, "%f")
section_id = ("section_id", oasis_int, "%d")
share1 = ("share1", oasis_float, "%f")
share2 = ("share2", oasis_float, "%f")
share3 = ("share3", oasis_float, "%f")
sidx = ("sidx", 'i4', "%d")
step_id = ("step_id", 'i4', "%d")
summary_id = ("summary_id", 'i4', "%d")
summaryset_id = ("summaryset_id", 'i4', "%d")
tiv = ("tiv", oasis_float, "%f")
to_agg_id = ("to_agg_id", 'i4', "%d")
trigger_end = ("trigger_end", oasis_float, "%f")
trigger_start = ("trigger_start", oasis_float, "%f")
vulnerability_id = ("vulnerability_id", 'i4', "%d")
vulnerability_weight = ("weight", oasis_float, "%f")
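

# Helper used by the output definitions below. A minimal reconstruction,
# assuming each single type definition above is a (column name, dtype,
# csv format) tuple: it returns the csv header names, the numpy record
# dtype and the csv row format string.
def generate_output_metadata(output_type):
    headers = [single_type[0] for single_type in output_type]
    dtype = np.dtype([single_type[NAME_DTYPE_SLICE] for single_type in output_type])
    fmt = ','.join([single_type[2] for single_type in output_type])
    return headers, dtype, fmt
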
# Types
aggregatevulnerability_output = [
aggregate_vulnerability_id,
vulnerability_id,
]
aggregatevulnerability_headers, aggregatevulnerability_dtype, aggregatevulnerability_fmt = generate_output_metadata(aggregatevulnerability_output)
amplifications_output = [
item_id,
amplification_id,
]
amplifications_headers, amplifications_dtype, amplifications_fmt = generate_output_metadata(amplifications_output)
cdf_output = [
event_id,
areaperil_id,
vulnerability_id,
bin_index,
prob_to,
bin_mean,
]
cdf_headers, cdf_dtype, cdf_fmt = generate_output_metadata(cdf_output)
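# NOTE: assumed definition; taken to be the fixed-width metadata preceding each
# variable-length model data blob in the complex items binary, with
# model_data_len giving the blob size in bytes.
complex_items_meta_output = [
    item_id,
    coverage_id,
    model_data_len,
    group_id,
]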
complex_items_meta_headers, complex_items_meta_dtype, complex_items_meta_fmt = generate_output_metadata(complex_items_meta_output)
correlations_output = [
item_id,
peril_correlation_group,
damage_correlation_value,
hazard_group_id,
hazard_correlation_value,
]
correlations_headers, correlations_dtype, correlations_fmt = generate_output_metadata(correlations_output)
coverages_output = [
coverage_id,
tiv,
]
coverages_headers, coverages_dtype, coverages_fmt = generate_output_metadata(coverages_output)
damagebin_output = [
bin_index,
bin_from,
bin_to,
interpolation,
damage_type,
]
damagebin_headers, damagebin_dtype, damagebin_fmt = generate_output_metadata(damagebin_output)
eve_output = [
event_id
]
eve_headers, eve_dtype, eve_fmt = generate_output_metadata(eve_output)
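# NOTE: assumed definition, matching the standard Oasis footprint record
# layout of (event_id, areaperil_id, intensity_bin_id, probability).
footprint_event_output = [
    event_id,
    areaperil_id,
    intensity_bin_id,
    probability,
]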
footprint_event_headers, footprint_event_dtype, footprint_event_fmt = generate_output_metadata(footprint_event_output)
fm_output = [
event_id,
output_id,
sidx,
loss,
]
fm_headers, fm_dtype, fm_fmt = generate_output_metadata(fm_output)
fm_policytc_output = [
level_id,
agg_id,
layer_id,
profile_id,
]
fm_policytc_headers, fm_policytc_dtype, fm_policytc_fmt = generate_output_metadata(fm_policytc_output)
fm_profile_output = [
profile_id,
calcrule_id,
deductible1,
deductible2,
deductible3,
attachment1,
limit1,
share1,
share2,
share3,
]
fm_profile_headers, fm_profile_dtype, fm_profile_fmt = generate_output_metadata(fm_profile_output)
fm_profile_step_output = [
profile_id,
calcrule_id,
deductible1,
deductible2,
deductible3,
attachment1,
limit1,
share1,
share2,
share3,
step_id,
trigger_start,
trigger_end,
payout_start,
payout_end,
limit2,
scale1,
scale2,
]
fm_profile_step_headers, fm_profile_step_dtype, fm_profile_step_fmt = generate_output_metadata(fm_profile_step_output)
fm_programme_output = [
from_agg_id,
level_id,
to_agg_id,
]
fm_programme_headers, fm_programme_dtype, fm_programme_fmt = generate_output_metadata(fm_programme_output)
fm_summary_xref_output = [
output,
summary_id,
summaryset_id
]
fm_summary_xref_headers, fm_summary_xref_dtype, fm_summary_xref_fmt = generate_output_metadata(fm_summary_xref_output)
fm_xref_output = [
output,
agg_id,
layer_id,
]
fm_xref_headers, fm_xref_dtype, fm_xref_fmt = generate_output_metadata(fm_xref_output)
gul_output = [
event_id,
item_id,
sidx,
loss,
]
gul_headers, gul_dtype, gul_fmt = generate_output_metadata(gul_output)
gul_summary_xref_output = [
item_id,
summary_id,
summaryset_id
]
gul_summary_xref_headers, gul_summary_xref_dtype, gul_summary_xref_fmt = generate_output_metadata(gul_summary_xref_output)
items_output = [
item_id,
coverage_id,
areaperil_id,
vulnerability_id,
group_id
]
items_headers, items_dtype, items_fmt = generate_output_metadata(items_output)
item_adjustment = [
item_id,
intensity_adjustment,
item_return_period
]
item_adjustment_headers, item_adjustment_dtype, item_adjustment_fmt = generate_output_metadata(item_adjustment)
lossfactors_output = [
event_id,
amplification_id,
factor,
]
lossfactors_headers, lossfactors_dtype, lossfactors_fmt = generate_output_metadata(lossfactors_output)
occurrence_output = [
event_id,
period_no,
occ_date_id,
]
occurrence_headers, occurrence_dtype, occurrence_fmt = generate_output_metadata(occurrence_output)
occurrence_granular_output = [
event_id,
period_no,
occ_date_id_granular,
]
occurrence_granular_headers, occurrence_granular_dtype, occurrence_granular_fmt = generate_output_metadata(occurrence_granular_output)
periods_output = [
period_no,
period_weighting,
]
periods_headers, periods_dtype, periods_fmt = generate_output_metadata(periods_output)
quantile_output = [
quantile,
]
quantile_headers, quantile_dtype, quantile_fmt = generate_output_metadata(quantile_output)
quantile_interval_output = quantile_output + [
quantile_integer_part,
quantile_fractional_part,
]
quantile_interval_headers, quantile_interval_dtype, quantile_interval_fmt = generate_output_metadata(quantile_interval_output)
random_output = [
random_no,
]
random_headers, random_dtype, random_fmt = generate_output_metadata(random_output)
returnperiods_output = [
return_period,
]
returnperiods_headers, returnperiods_dtype, returnperiods_fmt = generate_output_metadata(returnperiods_output)
vulnerability_output = [
vulnerability_id,
intensity_bin_id,
damage_bin_id,
probability,
]
vulnerability_headers, vulnerability_dtype, vulnerability_fmt = generate_output_metadata(vulnerability_output)
vulnerability_weight_output = [
areaperil_id,
vulnerability_id,
vulnerability_weight,
]
vulnerability_weight_headers, vulnerability_weight_dtype, vulnerability_weight_fmt = generate_output_metadata(vulnerability_weight_output)


def load_as_ndarray(dir_path, name, _dtype, must_exist=True, col_map=None):
    """
    Load a file as a numpy ndarray.
    Useful for multi-column files.

    Args:
        dir_path: path to the directory where the binary or csv file is stored
        name: name of the file (without extension)
        _dtype: np.dtype of the required ndarray
        must_exist: if True, raise FileNotFoundError when neither file is present
        col_map: optional re-mapping of ndarray field names to csv column names

    Returns:
        numpy ndarray
    """
    if os.path.isfile(os.path.join(dir_path, name + '.bin')):
        return np.fromfile(os.path.join(dir_path, name + '.bin'), dtype=_dtype)
    elif must_exist or os.path.isfile(os.path.join(dir_path, name + '.csv')):
        # csv columns can be out of order and have different names,
        # so we load with pandas and copy each column into the ndarray
        if col_map is None:
            col_map = {}
        with open(os.path.join(dir_path, name + '.csv')) as file_in:
            csv_dtype = {col_map.get(key, key): col_dtype for key, (col_dtype, _) in _dtype.fields.items()}
            df = pd.read_csv(file_in, delimiter=',', dtype=csv_dtype, usecols=list(csv_dtype.keys()))
        res = np.empty(df.shape[0], dtype=_dtype)
        for field_name in _dtype.names:
            res[field_name] = df[col_map.get(field_name, field_name)]
        return res
    else:
        return np.empty(0, dtype=_dtype)
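
# Usage sketch (hypothetical data directory 'input' and csv column name 'ItemID'):
#
#     items = load_as_ndarray('input', 'items', items_dtype,
#                             must_exist=False, col_map={'item_id': 'ItemID'})
#
# items.bin is read directly if present; otherwise items.csv is parsed, with the
# item_id field populated from the renamed 'ItemID' column.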


def load_as_array(dir_path, name, _dtype, must_exist=True):
    """
    Load a file as a single numpy array.
    Useful for files whose binary version contains only one type of value and
    where the index corresponds to an id.
    For example, coverages.bin only contains the tiv value for each coverage id,
    with coverage_id n stored at index n - 1.

    Args:
        dir_path: path to the directory where the binary or csv file is stored
        name: name of the file (without extension)
        _dtype: numpy dtype of the required array
        must_exist: if True, raise FileNotFoundError when neither file is present

    Returns:
        numpy array of dtype _dtype
    """
    fp = os.path.join(dir_path, name + '.bin')
    if os.path.isfile(fp):
        return np.fromfile(fp, dtype=_dtype)
    elif must_exist or os.path.isfile(os.path.join(dir_path, name + '.csv')):
        fp = os.path.join(dir_path, name + '.csv')
        with open(fp) as file_in:
            # skip the header row and read only the value (second) column
            return np.loadtxt(file_in, dtype=_dtype, delimiter=',', skiprows=1, usecols=1)
    else:
        return np.empty(0, dtype=_dtype)
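
# Usage sketch (hypothetical): tiv indexed by coverage_id, where coverage_id n
# sits at index n - 1:
#
#     tiv_by_coverage = load_as_array('input', 'coverages', oasis_float)
#     tiv_of_coverage_3 = tiv_by_coverage[2]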


def write_ndarray_to_fmt_csv(output_file, data, headers, row_fmt):
    """Writes a custom dtype array with headers to csv with the provided row_fmt str.
    This function is a faster replacement for np.savetxt, which formats each row
    one at a time before writing; here we build one large format string, format
    all the data at once, and write it in a single call.
    WARNING: untested with string types in custom data.

    Args:
        output_file (io.TextIOWrapper): CSV file
        data (ndarray[<custom dtype>]): Custom dtype ndarray with column names
        headers (list[str]): Column names for custom ndarray
        row_fmt (str): Format for each row in csv
    """
    if len(headers) != len(row_fmt.split(",")):
        raise RuntimeError(f"ERROR: write_ndarray_to_fmt_csv requires row_fmt ({row_fmt}) and headers ({headers}) to have the same length.")

    # Copy data column by column, as np.ravel does not work with custom dtype arrays.
    # np.empty defaults to np.float64; integer columns are still rendered correctly
    # because "%d" truncates floats during formatting.
    data_cpy = np.empty((data.shape[0], len(headers)))
    for i in range(len(headers)):
        data_cpy[:, i] = data[headers[i]]

    # Create one large formatted string and write it in one go
    final_fmt = "\n".join([row_fmt] * data_cpy.shape[0])
    str_data = final_fmt % tuple(np.ravel(data_cpy))
    output_file.write(str_data)
    output_file.write("\n")
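
# Usage sketch, reusing the gul metadata generated above (hypothetical output path):
#
#     losses = np.zeros(10, dtype=gul_dtype)
#     with open('gul.csv', 'w') as out:
#         out.write(','.join(gul_headers) + '\n')
#         write_ndarray_to_fmt_csv(out, losses, gul_headers, gul_fmt)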


float_equal_precision = np.finfo(oasis_float).eps


@nb.njit(cache=True)
def almost_equal(a, b):
    """True if a and b differ by less than the oasis_float machine epsilon."""
    return abs(a - b) < float_equal_precision
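
# Usage sketch: tolerance-based comparison of oasis_float values, callable from
# other njit'd functions (hypothetical variable):
#
#     if almost_equal(agg_loss, 0.0):
#         continue  # treat as zero loss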


def resolve_file(path, mode, stack):
    """Resolve a file path to an open file, or use standard input/output.

    Args:
        path (str | os.PathLike): File path, or "-" indicating standard input/output.
        mode (str): Mode to open the file with ("r", "rb", "w", "wb").
        stack (ExitStack): Context manager stack used to manage the file lifecycle.

    Returns:
        file (IO): A file-like object opened in the specified mode.
    """
    is_read = "r" in mode
    is_binary = "b" in mode
    if str(path) == "-":
        if is_read:
            return sys.stdin.buffer if is_binary else sys.stdin
        else:
            return sys.stdout.buffer if is_binary else sys.stdout
    else:
        return stack.enter_context(open(path, mode))
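
# Usage sketch (hypothetical paths): the same code path serves files and stdio:
#
#     from contextlib import ExitStack
#
#     with ExitStack() as stack:
#         stream_in = resolve_file('-', 'rb', stack)        # sys.stdin.buffer, not closed
#         file_out = resolve_file('out.bin', 'wb', stack)   # closed when the stack exits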