Source code for oasislmf.pytools.getmodel.footprint
"""
This file houses the classes that load footprint data from parquet, compressed binary, binary, and CSV files.
"""
import json
import logging
import mmap
import os
from contextlib import ExitStack
from typing import Dict, List, Union
from zlib import decompress
import numpy as np
import pandas as pd
import numba as nb
from oasis_data_manager.df_reader.config import clean_config, InputReaderConfig, get_df_reader
from oasis_data_manager.df_reader.reader import OasisReader
from oasis_data_manager.filestore.backends.base import BaseStorage
from .common import (FootprintHeader, EventIndexBin, EventIndexBinZ, Event, EventCSV,
                     footprint_filename, footprint_index_filename, zfootprint_filename, zfootprint_index_filename,
                     csvfootprint_filename, parquetfootprint_filename, parquetfootprint_meta_filename,
                     event_defintion_filename, hazard_case_filename, fp_format_priorities)

logger = logging.getLogger(__name__)

# Bit masks applied to the packed 'has_intensity_uncertainty' header field:
# bit 0 flags intensity uncertainty, bit 1 flags that an uncompressed size is stored.
intensityMask = 1
uncompressedMask = 1 << 1
class OasisFootPrintError(Exception):
"""
    Exception raised when footprint data cannot be loaded.
"""
def __init__(self, message: str) -> None:
"""
The constructor of the OasisFootPrintError class.
Args:
message: (str) the message to be raised
"""
super().__init__(message)
class Footprint:
"""
This class is the base class for the footprint loaders.
Attributes:
storage (BaseStorage): the storage object used to lookup files
stack (ExitStack): the context manager that combines other context managers and cleanup functions
"""
    def __init__(self, storage: BaseStorage, df_engine="oasis_data_manager.df_reader.reader.OasisPandasReader") -> None:
        """
        The constructor for the Footprint class.
        Args:
            storage (BaseStorage): the storage object used to lookup files
            df_engine (str): dotted path of the dataframe reader class used to read the footprint files
        """
        self.storage = storage
        self.stack = ExitStack()
        self.df_engine = df_engine
def __exit__(self, exc_type, exc_value, exc_traceback):
self.stack.__exit__(exc_type, exc_value, exc_traceback)
@staticmethod
def get_footprint_fmt_priorities():
"""
Get list of footprint file format classes in order of priority.
Returns: (list) footprint file format classes
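        For example, with a hypothetical priority list of ['parquet', 'bin'] in
        fp_format_priorities, this would return [FootprintParquet, FootprintBin].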
"""
format_to_class = {
'parquet': FootprintParquet, 'csv': FootprintCsv,
'binZ': FootprintBinZ, 'bin': FootprintBin,
'parquet_dynamic': FootprintParquetDynamic,
}
priorities = [format_to_class[fmt] for fmt in fp_format_priorities if fmt in format_to_class]
return priorities
@classmethod
def load(
cls,
storage: BaseStorage,
ignore_file_type=set(),
df_engine="oasis_data_manager.df_reader.reader.OasisPandasReader",
):
"""
Loads the loading classes defined in this file checking to see if the files are in the static path
whilst doing so. The loading goes through the hierarchy with the following order:
-> parquet
-> compressed binary file
-> binary file
-> CSV file
-> parquet (with dynamic generation)
If the compressed binary file is present, this will be loaded. If it is not, then the binary file will be loaded
and so on.
Args:
storage (BaseStorage): the storage object used to lookup files
ignore_file_type (Set[str]): type of file to be skipped in the hierarchy. This can be a choice of:
parquet
json
z
bin
idx
Returns: (Union[FootprintBinZ, FootprintBin, FootprintCsv]) the loaded class
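        Example (a minimal sketch; assumes storage is any configured BaseStorage
        implementation pointing at the model's static files):

            with Footprint.load(storage) as footprint:
                event = footprint.get_event(1)
                if event is not None:
                    print(event['areaperil_id'], event['intensity_bin_id'], event['probability'])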
"""
for footprint_class in cls.get_footprint_fmt_priorities():
for filename in footprint_class.footprint_filenames:
if (not storage.exists(filename) or filename.rsplit('.', 1)[-1] in ignore_file_type):
valid = False
break
else:
valid = True
if valid:
for filename in footprint_class.footprint_filenames:
logger.debug(f"loading {filename}")
return footprint_class(storage, df_engine=df_engine)
else:
if storage.isfile("footprint.parquet"):
raise OasisFootPrintError(
message="footprint.parquet needs to be partitioned in order to work, please see: "
"oasislmf.pytools.data_layer.conversions.footprint => convert_bin_to_parquet"
)
raise OasisFootPrintError(message="no valid footprint found")
    def get_df_reader(self, filepath, **kwargs) -> OasisReader:
        """
        Builds an OasisReader for the given file using the configured df_engine and the storage backend.
        Args:
            filepath: (str) path of the file to read, relative to the storage root
            **kwargs: extra options passed through to the reader
        Returns: (OasisReader) the configured dataframe reader
        """
        # load the base df engine config and add the connection parameters
df_reader_config = clean_config(InputReaderConfig(
filepath=filepath,
engine=self.df_engine
))
df_reader_config["engine"]["options"]["storage"] = self.storage
return get_df_reader(df_reader_config, **kwargs)
@staticmethod
def prepare_df_data(data_frame: pd.DataFrame) -> np.array:
"""
        Converts footprint data from a pandas DataFrame into a structured numpy array of Event records.
        Args:
            data_frame: (pd.DataFrame) footprint data with 'areaperil_id', 'intensity_bin_id' and 'probability' columns
        Returns: (np.array[Event]) the footprint data as an Event array
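        A minimal usage sketch (assumes a dataframe with the columns listed above; the
        Event fields shown are the ones populated by stitch_data):

            df = pd.DataFrame({
                "areaperil_id": [1, 1, 2],
                "intensity_bin_id": [1, 2, 1],
                "probability": [0.25, 0.75, 1.0],
            })
            events = Footprint.prepare_df_data(df)  # structured np.array with Event dtype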
"""
areaperil_id = data_frame["areaperil_id"].to_numpy()
intensity_bin_id = data_frame["intensity_bin_id"].to_numpy()
probability = data_frame["probability"].to_numpy()
buffer = np.empty(len(areaperil_id), dtype=Event)
outcome = stitch_data(areaperil_id, intensity_bin_id, probability, buffer)
return np.array(outcome, dtype=Event)
class FootprintCsv(Footprint):
"""
This class is responsible for loading footprint data from CSV.
Attributes (when in context):
        reader (OasisReader): dataframe reader over the CSV footprint data
num_intensity_bins (int): number of intensity bins in the data
has_intensity_uncertainty (bool): if the data has uncertainty
footprint_index (dict): map of footprint IDs with the index in the data
"""
def __enter__(self):
self.reader = self.get_df_reader("footprint.csv", dtype=EventCSV)
self.num_intensity_bins = self.reader.query(lambda df: df['intensity_bin_id'].max())
self.has_intensity_uncertainty = self.reader.query(
lambda df: df.groupby(['event_id', 'areaperil_id']).size().max() > 1
)
def _fn(df):
            # per event_id, compute the row count (size) and the offset of its first row
            footprint_index_df = df.groupby('event_id', as_index=False).size()
footprint_index_df['offset'] = footprint_index_df['size'].cumsum() - footprint_index_df['size']
footprint_index_df.set_index('event_id', inplace=True)
return footprint_index_df
self.footprint_index = self.reader.query(_fn).to_dict('index')
return self
def get_event(self, event_id):
"""
        Gets the event data for the event ID passed in.
Args:
event_id: (int) the ID belonging to the Event being extracted
        Returns: (np.array[Event]) the event that was extracted
"""
event_info = self.footprint_index.get(event_id)
if event_info is None:
return
else:
return self.prepare_df_data(self.reader.filter(lambda df: df[df["event_id"] == event_id]).as_pandas())
class FootprintBin(Footprint):
"""
    This class is responsible for loading the event data from the footprint binary files.
Attributes (when in context):
footprint (mmap.mmap): loaded data from the binary file which has header and then Event data
num_intensity_bins (int): number of intensity bins in the data
has_intensity_uncertainty (bool): if the data has uncertainty
footprint_index (dict): map of footprint IDs with the index in the data
"""
def __enter__(self):
footprint_file = self.stack.enter_context(self.storage.with_fileno(footprint_filename))
self.footprint = mmap.mmap(footprint_file.fileno(), length=0, access=mmap.ACCESS_READ)
footprint_header = np.frombuffer(bytearray(self.footprint[:FootprintHeader.size]), dtype=FootprintHeader)
self.num_intensity_bins = int(footprint_header['num_intensity_bins'])
self.has_intensity_uncertainty = int(footprint_header['has_intensity_uncertainty'] & intensityMask)
f = self.stack.enter_context(self.storage.with_fileno(footprint_index_filename))
footprint_mmap = np.memmap(f, dtype=EventIndexBin, mode='r')
self.footprint_index = pd.DataFrame(
footprint_mmap,
columns=footprint_mmap.dtype.names
).set_index('event_id').to_dict('index')
return self
def get_event(self, event_id):
"""
Gets the event from self.footprint based off the event ID passed in.
Args:
event_id: (int) the ID belonging to the Event being extracted
Returns: (np.array(Event)) the event that was extracted
"""
event_info = self.footprint_index.get(event_id)
if event_info is None:
return
else:
return np.frombuffer(self.footprint[event_info['offset']: event_info['offset'] + event_info['size']], Event)
class FootprintBinZ(Footprint):
"""
This class is responsible for loading event data from compressed event data.
Attributes (when in context):
zfootprint (mmap.mmap): loaded data from the compressed binary file which has header and then Event data
num_intensity_bins (int): number of intensity bins in the data
has_intensity_uncertainty (bool): if the data has uncertainty
        uncompressed_size (int): the size of the data when decompressed
        index_dtype (Union[EventIndexBinZ, EventIndexBin]): the dtype used for the footprint index
footprint_index (dict): map of footprint IDs with the index in the data
"""
def __enter__(self):
zfootprint_file = self.stack.enter_context(self.storage.with_fileno(zfootprint_filename))
self.zfootprint = mmap.mmap(zfootprint_file.fileno(), length=0, access=mmap.ACCESS_READ)
footprint_header = np.frombuffer(bytearray(self.zfootprint[:FootprintHeader.size]), dtype=FootprintHeader)
self.num_intensity_bins = int(footprint_header['num_intensity_bins'])
self.has_intensity_uncertainty = int(footprint_header['has_intensity_uncertainty'] & intensityMask)
self.uncompressed_size = int((footprint_header['has_intensity_uncertainty'] & uncompressedMask) >> 1)
if self.uncompressed_size:
self.index_dtype = EventIndexBinZ
else:
self.index_dtype = EventIndexBin
f = self.stack.enter_context(self.storage.with_fileno(zfootprint_index_filename))
zfootprint_mmap = np.memmap(f, dtype=self.index_dtype, mode='r')
self.footprint_index = pd.DataFrame(zfootprint_mmap, columns=zfootprint_mmap.dtype.names).set_index('event_id').to_dict('index')
return self
def get_event(self, event_id):
"""
Gets the event from self.zfootprint based off the event ID passed in.
Args:
event_id: (int) the ID belonging to the Event being extracted
Returns: (np.array[Event]) the event that was extracted
"""
event_info = self.footprint_index.get(event_id)
if event_info is None:
return
else:
zdata = self.zfootprint[event_info['offset']: event_info['offset'] + event_info['size']]
data = decompress(zdata)
return np.frombuffer(data, Event)
class FootprintParquet(Footprint):
"""
This class is responsible for loading event data from parquet event data.
Attributes (when in context):
num_intensity_bins (int): number of intensity bins in the data
has_intensity_uncertainty (bool): if the data has uncertainty
footprint_index (dict): map of footprint IDs with the index in the data
"""
def __enter__(self):
with self.storage.open(parquetfootprint_meta_filename, 'r') as outfile:
meta_data: Dict[str, Union[int, bool]] = json.load(outfile)
self.num_intensity_bins = int(meta_data['num_intensity_bins'])
self.has_intensity_uncertainty = int(meta_data['has_intensity_uncertainty'] & intensityMask)
return self
def get_event(self, event_id: int):
"""
Gets the event data from the partitioned parquet data file.
Args:
event_id: (int) the ID belonging to the Event being extracted
Returns: (np.array[Event]) the event that was extracted
"""
reader = self.get_df_reader("footprint.parquet", filters=[("event_id", "==", event_id)])
numpy_data = self.prepare_df_data(data_frame=reader.as_pandas())
return numpy_data
class FootprintParquetDynamic(Footprint):
"""
This class is responsible for loading event data from parquet dynamic event sets and maps
    It will build the footprint from the underlying event definition and hazard case files.
Attributes (when in context):
num_intensity_bins (int): number of intensity bins in the data
has_intensity_uncertainty (bool): if the data has uncertainty. Only "no" is supported
return periods (list): the list of return periods in the model. Not currently used
"""
footprint_filenames: List[str] = [event_defintion_filename, hazard_case_filename, parquetfootprint_meta_filename]
def __enter__(self):
with self.storage.open(parquetfootprint_meta_filename, 'r') as outfile:
meta_data: Dict[str, Union[int, bool]] = json.load(outfile)
self.num_intensity_bins = int(meta_data['num_intensity_bins'])
self.has_intensity_uncertainty = int(meta_data['has_intensity_uncertainty'] & intensityMask)
self.df_location_sections = pd.read_csv('input/sections.csv')
self.location_sections = set(list(self.df_location_sections['section_id']))
self.location_apids = pd.read_csv('input/items.csv', usecols=['areaperil_id']).areaperil_id.unique()
self.location_apids = pd.read_csv('input/keys.csv', usecols=['AreaPerilID']).AreaPerilID.unique()
return self
def get_event(self, event_id: int):
"""
        Builds the event footprint from the event definition and hazard case files for the event ID passed in.
Args:
event_id: (int) the ID belonging to the Event being extracted
Returns: (np.array[Event]) the event that was extracted
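        As an illustrative sketch of the interpolation performed below (hypothetical numbers):
        a row with from_intensity=3, to_intensity=7 and interpolation=0.5 yields
        intensity_bin_id = floor(3 + (7 - 3) * 0.5) = 5, with probability set to 1.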
"""
event_defintion_reader = self.get_df_reader(event_defintion_filename, filters=[("event_id", "==", event_id)])
df_event_defintion = event_defintion_reader.as_pandas()
event_sections = list(df_event_defintion['section_id'])
sections = list(set(event_sections) & self.location_sections)
if len(sections) > 0:
hazard_case_reader = self.get_df_reader(hazard_case_filename, filters=[("section_id", "in", sections)])
df_hazard_case = hazard_case_reader.as_pandas()
df_hazard_case = df_hazard_case[df_hazard_case['areaperil_id'].isin(self.location_apids)]
from_cols = ['areaperil_id', 'intensity']
to_cols = from_cols + ['interpolation', 'return_period']
df_hazard_case_from = df_hazard_case.merge(
df_event_defintion, left_on=['section_id', 'return_period'], right_on=['section_id', 'rp_from'])[from_cols].rename(
columns={'intensity': 'from_intensity'})
df_hazard_case_to = df_hazard_case.merge(
df_event_defintion, left_on=['section_id', 'return_period'], right_on=['section_id', 'rp_to'])[to_cols].rename(
columns={'intensity': 'to_intensity'})
df_footprint = df_hazard_case_from.merge(df_hazard_case_to, on='areaperil_id', how='outer')
df_footprint['from_intensity'] = df_footprint['from_intensity'].fillna(0)
if len(df_footprint.index) > 0:
df_footprint['intensity_bin_id'] = np.floor(df_footprint.from_intensity + (
(df_footprint.to_intensity - df_footprint.from_intensity) * df_footprint.interpolation))
df_footprint['intensity_bin_id'] = df_footprint['intensity_bin_id'].astype('int')
df_footprint['probability'] = 1
else:
df_footprint.loc[:, 'intensity_bin_id'] = []
df_footprint.loc[:, 'probability'] = []
numpy_data = self.prepare_df_data(data_frame=df_footprint[['areaperil_id', 'intensity_bin_id', 'probability', 'return_period']])
return numpy_data
@nb.njit(cache=True)
def stitch_data(areaperil_id, intensity_bin_id, probability, buffer):
"""
    Populates a structured Event array from three equal-length numpy arrays.
    Args:
        areaperil_id: (np.array) list of areaperil IDs
        intensity_bin_id: (np.array) list of intensity bin IDs
        probability: (np.array) list of probabilities
        buffer: (np.array[Event]) empty Event array to be populated from the previous arrays
    Returns: (np.array[Event]) the populated buffer
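    A minimal sketch of a direct call (this mirrors how prepare_df_data uses it; the
    inputs are assumed to be equal-length one-dimensional numpy arrays):

        buffer = np.empty(len(areaperil_id), dtype=Event)
        events = stitch_data(areaperil_id, intensity_bin_id, probability, buffer)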
"""
for x in range(0, len(buffer)):
buffer[x]['areaperil_id'] = areaperil_id[x]
buffer[x]['intensity_bin_id'] = intensity_bin_id[x]
buffer[x]['probability'] = probability[x]
return buffer