"""
Module for the built-in Lookup Class
in the future we may want to improve on the management of files used to generate the keys
tutorial for pandas and parquet https://towardsdatascience.com/a-gentle-introduction-to-apache-arrow-with-apache-spark-and-pandas-bb19ffe0ddae
"""
import warnings
import numba as nb
import numpy as np
import pandas as pd
from ods_tools.oed import fill_empty, is_empty
try: # needed for rtree
from shapely.geometry import Point
# Hide numerous warnings similar to:
# > ...lib64/python3.8/site-packages/geopandas/_compat.py:112: UserWarning: The Shapely GEOS
# > version (3.8.0-CAPI-1.13.1 ) is incompatible with the GEOS version PyGEOS was compiled with
# > (3.10.3-CAPI-1.16.1). Conversions between both will be slow.
# We're not in a position to fix these without compiling shapely and pygeos from source.
# We're also not aware of any performance issues caused by this.
# Upgrading to Shapely 2 will likely address this issue.
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=UserWarning, module="geopandas._compat",
message="The Shapely GEOS version")
import geopandas as gpd
try: # needed only for min distance
from sklearn.neighbors import BallTree
except ImportError:
except ImportError:
Point = gdp = None
import math
import re
from oasislmf.lookup.base import AbstractBasicKeyLookup, MultiprocLookupMixin
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.peril import get_peril_groups_df
from oasislmf.utils.status import OASIS_KEYS_STATUS, OASIS_UNKNOWN_ID
[docs]
OPT_INSTALL_MESSAGE = "install oasislmf with extra packages by running 'pip install oasislmf[extra]'"
[docs]
def get_nearest(src_points, candidates, k_neighbors=1):
"""Find nearest neighbors for all source points from a set of candidate points"""
# Create tree from the candidate points
tree = BallTree(candidates, leaf_size=15, metric='haversine')
# Find closest points and distances
distances, indices = tree.query(src_points, k=k_neighbors)
# Transpose to get distances and indices into arrays
distances = distances.transpose()
indices = indices.transpose()
# Get closest indices and distances (i.e. array at index 0)
# note: for the second closest points, you would take index 1, etc.
closest = indices[0]
closest_dist = distances[0]
# Return indices and distances
return (closest, closest_dist)
[docs]
def nearest_neighbor(left_gdf, right_gdf, return_dist=False):
"""
For each point in left_gdf, find closest point in right GeoDataFrame and return them.
NOTICE: Assumes that the input Points are in WGS84 projection (lat/lon).
"""
left_geom_col = left_gdf.geometry.name
right_geom_col = right_gdf.geometry.name
# Ensure that index in right gdf is formed of sequential numbers
right = right_gdf.copy().reset_index(drop=True)
# Parse coordinates from points and insert them into a numpy array as RADIANS
left_radians = np.array(left_gdf[left_geom_col].apply(lambda geom: (geom.x * np.pi / 180, geom.y * np.pi / 180)).to_list())
right_radians = np.array(right[right_geom_col].apply(lambda geom: (geom.x * np.pi / 180, geom.y * np.pi / 180)).to_list())
# Find the nearest points
# -----------------------
# closest ==> index in right_gdf that corresponds to the closest point
# dist ==> distance between the nearest neighbors (in meters)
closest, dist = get_nearest(src_points=left_radians, candidates=right_radians)
# Return points from right GeoDataFrame that are closest to points in left GeoDataFrame
closest_points = right.loc[closest]
# Ensure that the index corresponds the one in left_gdf
closest_points = closest_points.set_index(left_gdf.index)
# Add distance if requested
if return_dist:
# Convert to meters from radians
earth_radius = 6371000 # meters
closest_points['distance'] = dist * earth_radius
return closest_points
[docs]
key_columns = ['loc_id', 'peril_id', 'coverage_type', 'area_peril_id', 'vulnerability_id', 'status', 'message']
[docs]
class PerilCoveredDeterministicLookup(AbstractBasicKeyLookup):
[docs]
multiproc_enabled = False
[docs]
def process_locations(self, locations):
peril_groups_df = get_peril_groups_df()
model_perils_covered = np.unique(pd.DataFrame({'peril_group_id': self.config['model_perils_covered']})
.merge(peril_groups_df)['peril_id'])
split_df = locations['LocPerilsCovered'].str.split(';').apply(pd.Series).stack()
split_df.index = split_df.index.droplevel(-1)
split_df.name = 'peril_group_id'
keys_df = locations.join(split_df).merge(peril_groups_df)[['loc_id', 'peril_id']]
coverage_df = pd.DataFrame({'coverage_type': self.config['supported_oed_coverage_types']}, dtype='Int32')
keys_df = keys_df.sort_values('loc_id').merge(coverage_df, how="cross")
success_df = keys_df['peril_id'].isin(model_perils_covered)
success_df_len = keys_df[success_df].shape[0]
keys_df.loc[success_df, 'area_peril_id'] = np.arange(1, success_df_len + 1)
keys_df.loc[success_df, 'vulnerability_id'] = np.arange(1, success_df_len + 1)
keys_df.loc[success_df, 'status'] = OASIS_KEYS_STATUS['success']['id']
keys_df.loc[~keys_df['peril_id'].isin(model_perils_covered), ['status', 'message']
] = OASIS_KEYS_STATUS['noreturn']['id'], 'unsuported peril_id'
keys_df[['area_peril_id', 'vulnerability_id']] = keys_df[['area_peril_id', 'vulnerability_id']].astype('Int32')
return keys_df
[docs]
class Lookup(AbstractBasicKeyLookup, MultiprocLookupMixin):
"""
Built-in Lookup class that implement the OasisLookupInterface
The aim of this class is to provide a data driven lookup capability that will be both flexible and efficient.
it provide several generic function factory that can be define in the config under the "step_definition" key (ex:)
"step_definition": {
"split_loc_perils_covered":{
"type": "split_loc_perils_covered" ,
"columns": ["locperilscovered"],
"parameters": {
"model_perils_covered": ["WTC", "WSS"]
}
},
"vulnerability": {
"type": "merge",
"columns": ["peril_id", "coverage_type", "occupancycode"],
"parameters": {"file_path": "%%KEYS_DATA_PATH%%/vulnerability_dict.csv",
"id_columns": ["vulnerability_id"]
}
}
}
mapper key: is called the step_name,
it will be added the the lookup object method once the function has been built
it can take any value but make sure it doesn't collide with already existing method
type: define the function factory to call.
in the class for type <fct_type> the function factory called will be build_<fct_type>
ex: "type": "merge" => build_merge
columns: are the column required to be able to apply the step.
those are quite important as any column (except 'loc_id')
from the original Locations Dataframe that is not in any step will be drop to reduce memory consumption
parameters: the parameter passed the the function factory.
Once all the functions have been defined, the order in which they must be applied is defined in the config
under the "strategy" key (ex:)
"strategy": ["split_loc_perils_covered", "vulnerability"]
It is totally possible to subclass Lookup in order to create your custom step or function factory
for custom step:
add your function definition to the "mapper"with no parameters
"my_custom_step": {
"type": "custom_type" ,
"columns": [...],
}
simply add it to your "strategy": ["split_loc_perils_covered", "vulnerability", "my_custom_step"]
and code the function in your subclass
class MyLookup(Lookup):
@staticmethod
def my_custom_step(locations):
<do something on locations>
return modified_locations
for function factory:
add your function definition to the "step_definition" with the required parameters
"my_custom_step": {
"type": "custom_type" ,
"columns": [...],
"parameters": {
"param1": "value1"
}
}
add your step to "strategy": ["split_loc_perils_covered", "vulnerability", "my_custom_step"]
and code the function factory in your subclass
class MyLookup(Lookup):
def build_custom_type(self, param1):
def fct(locations):
<do something on locations that depend on param1>
return modified_locations
return fct
"""
[docs]
interface_version = "1"
[docs]
def set_step_function(self, step_name, step_config, function_being_set=None):
"""
set the step as a function of the lookup object if it's not already done and return it.
if the step is composed of several child steps, it will set the child steps recursively.
Args:
step_name (str): name of the strategy for this step
step_config (dict): config of the strategy for this step
function_being_set (set, None): set of all the strategy that are parent of this step
Returns:
function: function corresponding this step
"""
if hasattr(self, step_name):
step_function = getattr(self, step_name)
else:
if step_config['type'] == 'combine': # we need to build the child function
if function_being_set is None: # make sure we catch cyclic strategy definition
function_being_set = {step_name}
elif step_name in function_being_set:
raise OasisException(f"lookup config has a cyclic strategy definition {function_being_set} then {step_name} again")
else:
function_being_set.add(step_name)
functions = []
for child_step_name in step_config["parameters"]['strategy']:
child_fct = self.set_step_function(
step_name=child_step_name,
step_config=self.config['step_definition'][child_step_name],
function_being_set=function_being_set)
functions.append({'function': child_fct, 'columns': set(step_config.get("columns", []))})
step_config['parameters']['strategy'] = functions
step_function = getattr(self, f"build_{step_config['type']}")(**step_config['parameters'])
setattr(self, step_name, step_function)
return step_function
[docs]
def process_locations(self, locations):
# drop all unused columns and remove duplicate rows, find and rename useful columns
lower_case_column_map = {column.lower(): column for column in locations.columns}
useful_cols = set(['loc_id'] + sum((step_config.get("columns", []) for step_config in self.config['step_definition'].values()), []))
useful_cols_map = {lower_case_column_map[useful_col.lower()]: useful_col
for useful_col in useful_cols
if useful_col.lower() in lower_case_column_map}
locations = locations.rename(columns=useful_cols_map)
locations = locations[list(useful_cols.intersection(locations.columns))].drop_duplicates()
# set default status and message
locations['status'] = OASIS_KEYS_STATUS['success']['id']
locations['message'] = ''
# process each step of the strategy
for step_name in self.config["strategy"]:
step_config = self.config['step_definition'][step_name]
needed_column = set(step_config.get("columns", []))
if not needed_column.issubset(locations.columns):
raise OasisException(
f"Key Server Issue: missing columns {needed_column.difference(locations.columns)} for step {step_name}")
step_function = self.set_step_function(step_name, step_config)
locations = step_function(locations)
key_columns = [
'loc_id', 'peril_id', 'coverage_type', 'area_peril_id',
'vulnerability_id', 'status', 'message'
]
additional_columns = ['amplification_id', 'model_data', 'section_id', 'intensity_adjustment', 'return_period']
for col in additional_columns:
if col in locations.columns:
key_columns += [col]
locations = locations[key_columns]
# check all ids are of the good type
id_cols = ['coverage_type', 'area_peril_id', 'vulnerability_id']
if 'amplification_id' in locations.columns:
id_cols += ['amplification_id']
self.set_id_columns(locations, id_cols)
# check all success location have all ids set correctly
success_locations = locations.loc[locations['status'] == OASIS_KEYS_STATUS['success']['id']]
for id_col in id_cols:
unknown_ids = success_locations[id_col] == OASIS_UNKNOWN_ID
fail_locations = success_locations.loc[unknown_ids].index
locations.loc[fail_locations, ['status', 'message']] = OASIS_KEYS_STATUS['fail'][
'id'], f'{id_col} has an unknown id'
success_locations = success_locations.loc[~unknown_ids]
return locations
[docs]
def to_abs_filepath(self, filepath):
"""
replace placeholder r'%%(.+?)%%' (ex: %%KEYS_DATA_PATH%%) with the path set in self.config
Args:
filepath (str): filepath with potentially a placeholder
Returns:
str: filepath where placeholder are replace their actual value.
"""
placeholder_keys = set(re.findall(r'%%(.+?)%%', filepath))
for placeholder_key in placeholder_keys:
filepath = filepath.replace(f'%%{placeholder_key}%%', self.config[placeholder_key.lower()])
return filepath
@staticmethod
[docs]
def set_id_columns(df, id_columns):
"""
in Dataframes, only float column can have nan values. So after a left join for example if you have nan values
that will change the type of the original column into float.
this function replace the nan value with the OASIS_UNKNOWN_ID and reset the column type to int
"""
for col in id_columns:
if col not in df:
df[col] = OASIS_UNKNOWN_ID
else:
df[col] = df[col].astype('Int64')
df.loc[df[col].isna(), col] = OASIS_UNKNOWN_ID
return df
[docs]
def build_interval_to_index(self, value_column_name, sorted_array, index_column_name=None, side='left'):
"""
Allow to map a value column to an index according to it's index in the interval defined by sorted_array.
nan value are kept as nan
Args:
value_column_name: name of the column to map
sorted_array: sorted value that define the interval to map to
index_column_name: name of the output column
side: define what index is returned (left or right) in case of equality with one of the interval boundary
Returns:
function: return the mapping function
"""
if isinstance(sorted_array, list):
pass
elif isinstance(sorted_array, str):
sorted_array = [float(val) for val in open(self.to_abs_filepath(sorted_array)) if val.strip()]
else:
raise OasisException("sorted_array must be a list of the interval sorted or a path to a csv file containing those interval")
if index_column_name is None:
index_column_name = value_column_name + '_idx'
def fct(locations):
locations[index_column_name] = np.searchsorted(sorted_array, locations[value_column_name], side=side)
empty_values = is_empty(locations, value_column_name)
locations.loc[empty_values, index_column_name] = locations.loc[empty_values, value_column_name]
return locations
return fct
@staticmethod
[docs]
def build_combine(id_columns, strategy):
"""
build a function that will combine several strategy trying to achieve the same purpose by different mean into one.
for example, finding the correct area_peril_id for a location with one method using (latitude, longitude)
and one using postcode.
each strategy will be applied sequentially on the location that steal have OASIS_UNKNOWN_ID in their id_columns after the precedent strategy
Args:
id_columns (list): columns that will be checked to determine if a strategy has succeeded
strategy (list): list of strategy to apply
Returns:
function: function combining all strategies
"""
def fct(locations):
initial_columns = locations.columns
result = []
for child_strategy in strategy:
if not child_strategy['columns'].issubset(locations.columns): # needed column not present to run this strategy
continue
locations = child_strategy['function'](locations)
is_valid = (locations[id_columns] != OASIS_UNKNOWN_ID).any(axis=1)
result.append(locations[is_valid])
locations = locations[~is_valid][initial_columns]
result.append(locations)
return Lookup.set_id_columns(pd.concat(result), id_columns)
return fct
@staticmethod
[docs]
def build_split_loc_perils_covered(model_perils_covered=None):
"""
split the value of LocPerilsCovered into multiple line, taking peril group into account
drop all line that are not in the list model_perils_covered
usefull inspirational code:
https://stackoverflow.com/questions/17116814/pandas-how-do-i-split-text-in-a-column-into-multiple-rows
"""
peril_groups_df = get_peril_groups_df()
def fct(locations):
for col in locations.columns:
if col.lower() == 'locperilscovered':
loc_perils_covered_column = col
break
else:
raise OasisException('missing LocPerilsCovered column in location')
split_df = locations[loc_perils_covered_column].astype(str).str.split(';').apply(pd.Series, 1).stack()
split_df.index = split_df.index.droplevel(-1)
split_df.name = 'peril_group_id'
location = locations.join(split_df).merge(peril_groups_df)
if model_perils_covered:
df_model_perils_covered = pd.Series(model_perils_covered)
df_model_perils_covered.name = 'model_perils_covered'
location = location.merge(df_model_perils_covered, left_on='peril_id', right_on='model_perils_covered')
return location
return fct
@staticmethod
[docs]
def build_prepare(**kwargs):
"""
Prepare the dataframe by setting default, min and max values and type
support several simple DataFrame preparation:
default: create the column if missing and replace the nan value with the default value
max: truncate the values in a column to the specified max
min: truncate the values in a column to the specified min
type: convert the type of the column to the specified numpy dtype
Note that we use the string representation of numpy dtype available at
https://numpy.org/doc/stable/reference/arrays.dtypes.html#arrays-dtypes-constructing
"""
def prepare(locations):
for column_name, preparations in kwargs.items():
if "default" in preparations:
if column_name not in locations.columns:
locations[column_name] = preparations["default"]
else:
fill_empty(locations, column_name, preparations["default"])
if 'max' in preparations:
locations.loc[locations[column_name] > preparations['max'], column_name] = preparations['max']
if 'min' in preparations:
locations.loc[locations[column_name] > preparations['min'], column_name] = preparations['min']
if 'type' in preparations:
locations[column_name] = locations[column_name].astype(preparations['type'])
return locations
return prepare
[docs]
def build_rtree(self, file_path, file_type, id_columns, area_peril_read_params=None, nearest_neighbor_min_distance=-1):
"""
Function Factory to associate location to area_peril based on the rtree method
!!!
please note that this method is quite time consuming (specialy if you use the nearest point option
if your peril_area are square you should use area_peril function fixed_size_geo_grid
!!!
file_path: is the path to the file containing the area_peril_dictionary.
this file must be a geopandas Dataframe with a valid geometry.
an example on how to create such dataframe is available in PiWind
if you are new to geo data (in python) and want to learn more, you may have a look at this excellent course:
https://automating-gis-processes.github.io/site/index.html
file_type: can be any format readable by geopandas ('file', 'parquet', ...)
see: https://geopandas.readthedocs.io/en/latest/docs/reference/io.html
you may have to install additional library such as pyarrow for parquet
id_columns: column to transform to an 'id_column' (type int32 with nan replace by -1)
nearest_neighbor_min_distance: option to compute the nearest point if intersection method fails
we use:
https://automating-gis-processes.github.io/site/notebooks/L3/nearest-neighbor-faster.html
but alternatives can be found here:
https://gis.stackexchange.com/questions/222315/geopandas-find-nearest-point-in-other-dataframe
"""
if Point is None:
raise OasisException(f"shapely and geopandas modules are needed for rtree, {OPT_INSTALL_MESSAGE}")
if hasattr(gpd, f"read_{file_type}"):
if area_peril_read_params is None:
area_peril_read_params = {}
gdf_area_peril = getattr(gpd, f"read_{file_type}")(self.to_abs_filepath(file_path), **area_peril_read_params)
else:
raise OasisException(f"Unregognised Geopandas read type {file_type}")
if nearest_neighbor_min_distance > 0:
if BallTree is None:
raise OasisException(f"scikit-learn modules are needed for rtree with nearest_neighbor_min_distance, {OPT_INSTALL_MESSAGE}")
gdf_area_peril['center'] = gdf_area_peril.centroid
base_geometry_name = gdf_area_peril.geometry.name
def get_area(locations, gdf_area_peril):
# this conversion could be done in a separate step allowing more posibilities for the geometry
null_gdf = locations["longitude"].isna() | locations["latitude"].isna()
null_gdf_loc = locations[null_gdf]
if not null_gdf_loc.empty:
gdf_loc = gpd.GeoDataFrame(locations[~null_gdf], columns=locations.columns)
else:
gdf_loc = gpd.GeoDataFrame(locations, columns=locations.columns)
gdf_loc["loc_geometry"] = gdf_loc.apply(lambda row: Point(row["longitude"], row["latitude"]),
axis=1,
result_type='reduce')
gdf_loc = gdf_loc.set_geometry('loc_geometry')
gdf_loc = gpd.sjoin(gdf_loc, gdf_area_peril, 'left')
if nearest_neighbor_min_distance > 0:
gdf_loc_na = gdf_loc.loc[gdf_loc['index_right'].isna()]
if gdf_loc_na.shape[0]:
gdf_area_peril.set_geometry('center', inplace=True)
nearest_neighbor_df = nearest_neighbor(gdf_loc_na, gdf_area_peril, return_dist=True)
gdf_area_peril.set_geometry(base_geometry_name, inplace=True)
valid_nearest_neighbor = nearest_neighbor_df['distance'] <= nearest_neighbor_min_distance
common_col = list(set(gdf_loc_na.columns) & set(nearest_neighbor_df.columns))
gdf_loc.loc[valid_nearest_neighbor.index, common_col] = nearest_neighbor_df.loc[valid_nearest_neighbor, common_col]
if not null_gdf_loc.empty:
gdf_loc = pd.concat([gdf_loc, null_gdf_loc])
self.set_id_columns(gdf_loc, id_columns)
# index column are created during the sjoin, we can drop them
gdf_loc = gdf_loc.drop(columns=['index_right', 'index_left'], errors='ignore')
return gdf_loc
def fct(locations):
if 'peril_id' in gdf_area_peril.columns:
peril_id_covered = np.unique(gdf_area_peril['peril_id'])
res = [locations[~locations['peril_id'].isin(peril_id_covered)]]
for peril_id in peril_id_covered:
res.append(get_area(locations.loc[locations['peril_id'] == peril_id],
gdf_area_peril.loc[gdf_area_peril['peril_id'] == peril_id].drop(columns=['peril_id'])))
return pd.concat(res).reset_index()
else:
return get_area(locations, gdf_area_peril)
return fct
@staticmethod
[docs]
def build_fixed_size_geo_grid(lat_min, lat_max, lon_min, lon_max, arc_size, lat_reverse=False, lon_reverse=False):
"""
associate an id to each square of the grid define by the limit of lat and lon
reverse allow to change the ordering of id from (min to max) to (max to min)
"""
lat_cell_size = arc_size
lon_cell_size = arc_size
size_lat = math.ceil((lat_max - lat_min) / arc_size)
if lat_reverse:
@nb.njit()
def lat_id(lat):
return math.floor((lat_max - lat) / lat_cell_size)
else:
@nb.njit()
def lat_id(lat):
return math.floor((lat - lat_min) / lat_cell_size)
if lon_reverse:
@nb.njit()
def lon_id(lon):
return math.floor((lon_max - lon) / lon_cell_size)
else:
@nb.njit()
def lon_id(lon):
return math.floor((lon - lon_min) / lon_cell_size)
@nb.jit
def jit_geo_grid_lookup(lat, lon):
area_peril_id = np.empty_like(lat, dtype=np.int64)
for i in range(lat.shape[0]):
if lat_min < lat[i] < lat_max and lon_min < lon[i] < lon_max:
area_peril_id[i] = int(lat_id(lat[i]) + lon_id(lon[i]) * size_lat)
else:
area_peril_id[i] = OASIS_UNKNOWN_ID
return area_peril_id
def geo_grid_lookup(locations):
locations['area_peril_id'] = jit_geo_grid_lookup(locations['latitude'].to_numpy(),
locations['longitude'].to_numpy())
return locations
return geo_grid_lookup
[docs]
def build_merge(self, file_path, id_columns=[], **kwargs):
"""
this method will merge the locations Dataframe with the Dataframe present in file_path
All non match column present in id_columns will be set to -1
this is an efficient way to map a combination of column that have a finite scope to an idea.
"""
df_to_merge = pd.read_csv(self.to_abs_filepath(file_path), **kwargs)
df_to_merge.rename(columns={column: column.lower() for column in df_to_merge.columns}, inplace=True)
def merge(locations: pd.DataFrame):
rename_map = {col.lower(): col for col in locations.columns if col.lower() in df_to_merge.columns}
locations = locations.merge(df_to_merge.rename(columns=rename_map), how='left')
self.set_id_columns(locations, id_columns)
return locations
return merge
@staticmethod
[docs]
def build_simple_pivot(pivots, remove_pivoted_col=True):
"""
allow to pivot columns of the locations dataframe into multiple rows
each pivot in the pivot list may define:
"on": to rename a column into a new one
"new_cols": to create a new column with a certain values
ex:
"pivots": [{"on": {"vuln_str": "vulnerability_id"},
"new_cols": {"coverage_type": 1}},
{"on": {"vuln_con": "vulnerability_id"},
"new_cols": {"coverage_type": 3}},
],
loc_id vuln_str vuln_con
1 3 2
2 18 4
=>
loc_id vuln_str vuln_con vulnerability_id coverage_type
1 3 2 3 1
2 18 4 18 1
1 3 2 2 3
2 18 4 4 3
"""
def simple_pivot(locations):
pivoted_dfs = []
pivoted_cols = set()
for pivot in pivots:
pivot_df = locations.copy()
for old_name, new_name in pivot.get("on", {}).items():
pivot_df[new_name]
pivoted_cols.add(old_name)
for col_name, value in pivot.get("new_cols", {}).items():
pivot_df[col_name] = value
pivoted_dfs.append(pivot_df)
locations = pd.concat(pivoted_dfs)
if remove_pivoted_col:
locations.drop(columns=pivoted_cols, inplace=True)
return locations
return simple_pivot
@staticmethod
[docs]
def build_model_data(columns):
"""
Serialises specified columns from the OED file into a model_data dict
"""
lst_model_data = []
def model_data(locations):
# could improve with apply lambda
for index, i in locations.iterrows():
tmp_dict = {}
for col in columns:
tmp_dict[col] = i[col]
lst_model_data.append(tmp_dict)
locations['model_data'] = lst_model_data
return locations
return model_data
@staticmethod
[docs]
def build_dynamic_model_adjustment(intensity_adjustment_col, return_period_col):
"""
Converts specified columns from the OED file into intensity adjustments and
return period protection.
"""
lst_intensity_adjustment = []
lst_return_period = []
def adjustments(locations):
for index, row in locations.iterrows():
intensity_adjustment = row[intensity_adjustment_col]
return_period = row[return_period_col]
lst_intensity_adjustment.append(intensity_adjustment)
lst_return_period.append(return_period)
locations['intensity_adjustment'] = lst_intensity_adjustment
locations['return_period'] = lst_return_period
return locations
return adjustments