Source code for oasislmf.preparation.gul_inputs

__all__ = [
    'get_gul_input_items',
    'write_amplifications_file',
    'write_coverages_file',
    'write_gul_input_files',
    'write_items_file',
    'write_complex_items_file',
    'write_sections_file'
]
import copy
import os
import sys
import warnings
from collections import OrderedDict

import pandas as pd
import numpy as np

from oasislmf.pytools.data_layer.oasis_files.correlations import \
    CorrelationsData
from oasislmf.utils.coverages import SUPPORTED_COVERAGE_TYPES
from oasislmf.utils.data import (factorize_ndarray, merge_dataframes,
                                 set_dataframe_column_dtypes)
from oasislmf.utils.defaults import (CORRELATION_GROUP_ID,
                                     DAMAGE_GROUP_ID_COLS,
                                     HAZARD_GROUP_ID_COLS,
                                     OASIS_FILES_PREFIXES, SOURCE_IDX,
                                     get_default_exposure_profile)
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.fm import SUPPORTED_FM_LEVELS
from oasislmf.utils.log import oasis_log
from oasislmf.utils.path import as_path
from oasislmf.utils.profiles import (
    get_fm_terms_oed_columns, get_grouped_fm_profile_by_level_and_term_group,
    get_grouped_fm_terms_by_level_and_term_group, get_oed_hierarchy)

pd.options.mode.chained_assignment = None
warnings.simplefilter(action='ignore', category=FutureWarning)

VALID_OASIS_GROUP_COLS = [
    'item_id',
    'peril_id',
    'coverage_id',
    'coverage_type_id',
    'peril_correlation_group',
    'building_id',
    'risk_id'
]

PERIL_CORRELATION_GROUP_COL = 'peril_correlation_group'


def process_group_id_cols(group_id_cols, exposure_df_columns, has_correlation_groups):
    """
    Cleans out columns that are not valid Oasis group columns.

    A valid group ID column must either
    1. exist in the location file, or
    2. be listed as a valid internal Oasis column (see VALID_OASIS_GROUP_COLS)

    Args:
        group_id_cols: (List[str]) the ID columns that are going to be filtered
        exposure_df_columns: (List[str]) the columns in the exposure dataframe
        has_correlation_groups: (bool) if True, hashing takes correlations into account, so the
                             "peril_correlation_group" column is added

    Returns: (List[str]) the filtered columns
    """
    for col in list(group_id_cols):
        if col not in list(exposure_df_columns) + VALID_OASIS_GROUP_COLS:
            warnings.warn('Column {} not found in loc file, or a valid internal oasis column'.format(col))
            group_id_cols.remove(col)

    if PERIL_CORRELATION_GROUP_COL not in group_id_cols and has_correlation_groups is True:
        group_id_cols.append(PERIL_CORRELATION_GROUP_COL)

    return group_id_cols
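
# Example (illustrative sketch, not part of the module): given a location frame with
# columns ['PortNumber', 'AccNumber', 'LocNumber'], a request for
# ['PortNumber', 'UnknownCol', 'coverage_id'] drops 'UnknownCol' with a warning,
# keeps 'coverage_id' (a valid internal column), and appends 'peril_correlation_group'
# when has_correlation_groups=True:
#
#     cols = process_group_id_cols(
#         group_id_cols=['PortNumber', 'UnknownCol', 'coverage_id'],
#         exposure_df_columns=['PortNumber', 'AccNumber', 'LocNumber'],
#         has_correlation_groups=True,
#     )
#     # cols -> ['PortNumber', 'coverage_id', 'peril_correlation_group']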


@oasis_log
def get_gul_input_items(
    location_df,
    keys_df,
    correlations=False,
    peril_correlation_group_df=None,
    exposure_profile=get_default_exposure_profile(),
    damage_group_id_cols=None,
    hazard_group_id_cols=None,
    do_disaggregation=True
):
    """
    Generates and returns a Pandas dataframe of GUL input items.

    :param location_df: Location (exposure) dataframe
    :type location_df: pandas.DataFrame

    :param keys_df: Keys dataframe
    :type keys_df: pandas.DataFrame

    :param correlations: If True, merge the peril correlation group settings into the GUL inputs
    :type correlations: bool

    :param peril_correlation_group_df: Peril correlation group settings dataframe
    :type peril_correlation_group_df: pandas.DataFrame

    :param exposure_profile: Exposure profile
    :type exposure_profile: dict

    :param damage_group_id_cols: Columns to be used to generate a hashed damage group id.
    :type damage_group_id_cols: list[str]

    :param hazard_group_id_cols: Columns to be used to generate a hashed hazard group id.
    :type hazard_group_id_cols: list[str]

    :param do_disaggregation: If True, disaggregates by the number of buildings
    :type do_disaggregation: bool

    :return: GUL inputs dataframe
    :rtype: pandas.DataFrame
    """
    # Get the grouped exposure profile - this describes the financial terms
    # to be found in the source exposure file, which are for the following
    # FM levels: site coverage (# 1), site pd (# 2), site all (# 3). It also
    # describes the OED hierarchy terms present in the exposure file, namely
    # portfolio num., acc. num., loc. num., and cond. num.
    profile = get_grouped_fm_profile_by_level_and_term_group(exposure_profile=exposure_profile)

    if not profile:
        raise OasisException(
            'Source exposure profile is possibly missing FM term information: '
            'FM term definitions for TIV, limit, deductible, attachment and/or share.'
        )

    # Get the OED hierarchy terms profile - this defines the column names for loc.
    # ID, acc. ID, policy no. and portfolio no., as used in the source exposure
    # and accounts files. This is to ensure that the method never makes hard
    # coded references to the corresponding columns in the source files, as
    # that would mean that changes to these column names in the source files
    # may break the method
    oed_hierarchy = get_oed_hierarchy(exposure_profile=exposure_profile)
    loc_num = oed_hierarchy['locnum']['ProfileElementName']
    acc_num = oed_hierarchy['accnum']['ProfileElementName']
    portfolio_num = oed_hierarchy['portnum']['ProfileElementName']

    # The (site) coverage FM level ID (# 1 in the OED FM levels hierarchy)
    cov_level_id = SUPPORTED_FM_LEVELS['site coverage']['id']

    # Get the TIV column names and corresponding coverage types
    tiv_terms = OrderedDict({v['tiv']['CoverageTypeID']: v['tiv']['ProfileElementName'] for k, v in profile[cov_level_id].items()})
    tiv_cols = list(tiv_terms.values())

    # Get the list of coverage type IDs - financial terms for the coverage
    # level are grouped by coverage type ID in the grouped version of the
    # exposure profile (profile of the financial terms sourced from the
    # source exposure file)
    cov_types = [v['id'] for v in SUPPORTED_COVERAGE_TYPES.values()]

    # Get the FM terms profile (this is a simplified view of the main grouped
    # profile, containing only information about the financial terms), and
    # the list of OED column names for the financial terms for the site coverage
    # (# 1) FM level
    fm_terms = get_grouped_fm_terms_by_level_and_term_group(grouped_profile_by_level_and_term_group=profile, lowercase=False)
    terms_floats = ['deductible', 'deductible_min', 'deductible_max', 'limit']
    terms_ints = ['ded_code', 'ded_type', 'lim_code', 'lim_type']
    terms = terms_floats + terms_ints
    term_cols_floats = get_fm_terms_oed_columns(
        fm_terms,
        levels=['site coverage'],
        term_group_ids=cov_types,
        terms=terms_floats
    )
    term_cols_ints = get_fm_terms_oed_columns(
        fm_terms,
        levels=['site coverage'],
        term_group_ids=cov_types,
        terms=terms_ints
    )
    term_cols = term_cols_floats + term_cols_ints

    # Create the basic GUL inputs dataframe from merging the exposure and
    # keys dataframes on loc. number/loc. ID; filter out any rows with
    # zeros for TIVs for all coverage types, and replace any nulls in the
    # cond. num. and TIV columns with zeros

    # add default values if missing
    if 'IsAggregate' not in location_df.columns:
        location_df['IsAggregate'] = 0
    else:
        location_df['IsAggregate'].fillna(0, inplace=True)

    # Make sure NumberOfBuildings is there and filled (not mandatory), otherwise assume NumberOfBuildings = 1
    if 'NumberOfBuildings' not in location_df.columns:
        location_df['NumberOfBuildings'] = 1
    else:
        location_df['NumberOfBuildings'] = location_df['NumberOfBuildings'].fillna(1)

    # Select only the columns required. This reduces memory use significantly for portfolios
    # that include many OED columns.
    exposure_df_gul_inputs_cols = ['loc_id', portfolio_num, acc_num, loc_num,
                                   'NumberOfBuildings', 'IsAggregate', 'LocPeril'] + term_cols + tiv_cols
    if SOURCE_IDX['loc'] in location_df:
        exposure_df_gul_inputs_cols += [SOURCE_IDX['loc']]

    # It is assumed that correlations are False for now; correlations for group ID hashing are assessed later on in
    # the process, to re-hash the group ID with the correlation "peril_correlation_group" column. This is because
    # the correlations merge happens later in the process, leading to a chicken-and-egg problem.
    # group_id_cols = process_group_id_cols(group_id_cols=group_id_cols,
    #                                       exposure_df_columns=list(exposure_df.columns),
    #                                       has_correlation_groups=False)

    # set damage_group_id_cols
    if not damage_group_id_cols:
        # damage_group_id_cols is None or an empty list
        damage_group_id_cols = DAMAGE_GROUP_ID_COLS
    else:
        # remove any duplicate column names used to assign group_id
        damage_group_id_cols = list(set(damage_group_id_cols))

    # only add damage group col if not an internal oasis col or if not present already in exposure_df_gul_inputs_cols
    for col in damage_group_id_cols:
        if col in VALID_OASIS_GROUP_COLS:
            pass
        elif col not in exposure_df_gul_inputs_cols:
            exposure_df_gul_inputs_cols.append(col)

    # set hazard_group_id_cols
    if not hazard_group_id_cols:
        # hazard_group_id_cols is None or an empty list
        hazard_group_id_cols = HAZARD_GROUP_ID_COLS
    else:
        # remove any duplicate column names used to assign group_id
        hazard_group_id_cols = list(set(hazard_group_id_cols))

    # only add hazard group col if not an internal oasis col or if not present already in exposure_df_gul_inputs_cols
    for col in hazard_group_id_cols:
        if col in VALID_OASIS_GROUP_COLS:
            pass
        elif col not in exposure_df_gul_inputs_cols:
            exposure_df_gul_inputs_cols.append(col)

    # Check if correlation group field is used to drive damage group id
    # and test that it's present and populated with integers
    correlation_group_id = CORRELATION_GROUP_ID
    correlation_field = correlation_group_id[0]
    correlation_check = False
    if damage_group_id_cols == correlation_group_id:
        if correlation_field in location_df.columns:
            if location_df[correlation_field].astype('uint32').isnull().sum() == 0:
                correlation_check = True

    query_nonzero_tiv = " | ".join(f"({tiv_col} != 0)" for tiv_col in tiv_cols)
    for tiv_col in tiv_cols:
        if tiv_col not in location_df.columns:
            location_df[tiv_col] = 0
    location_df.loc[:, tiv_cols] = location_df.loc[:, tiv_cols].fillna(0.0)
    location_df.query(query_nonzero_tiv, inplace=True, engine='numexpr')

    gul_inputs_df = location_df[list(set(exposure_df_gul_inputs_cols).intersection(location_df.columns))]
    gul_inputs_df.drop_duplicates('loc_id', inplace=True, ignore_index=True)

    # Rename the main keys dataframe columns - this is due to the fact that the
    # keys file headers use camel case, and don't use underscored names, which
    # is the convention used for the GUL and IL inputs dataframes in the MDK
    keys_df.rename(
        columns={
            'locid': 'loc_id' if 'loc_id' not in keys_df else 'locid',
            'perilid': 'peril_id',
            'coveragetypeid': 'coverage_type_id',
            'areaperilid': 'areaperil_id',
            'vulnerabilityid': 'vulnerability_id',
            'amplificationid': 'amplification_id',
            'modeldata': 'model_data',
            'intensityadjustment': 'intensity_adjustment',
            'returnperiod': 'return_period'
        },
        inplace=True,
        copy=False  # Pandas copies column data by default on rename
    )

    # If the keys file relates to a complex/custom model then look for a
    # ``modeldata`` column in the keys file, and ignore the area peril
    # and vulnerability ID columns, unless it's the dynamic model generator which
    # uses them
    if 'model_data' in keys_df and 'areaperil_id' not in keys_df and 'vulnerability_id' not in keys_df:
        keys_df['areaperil_id'] = keys_df['vulnerability_id'] = -1

    gul_inputs_df = merge_dataframes(
        keys_df,
        gul_inputs_df,
        join_on='loc_id',
        how='inner',
    )
    if gul_inputs_df.empty:
        raise OasisException(
            'Inner merge of the exposure file dataframe '
            'and the keys file dataframe on loc. number/loc. ID '
            'is empty - '
            'please check that the loc. number and loc. ID columns '
            'in the exposure and keys files respectively have a non-empty '
            'intersection'
        )

    # Free memory after merge, before memory-intensive restructuring of data
    del keys_df

    # make query to retain only rows with positive TIV for each coverage type, e.g.: (coverage_type_id == 1 and BuildingsTIV > 0.0) or (...)
    positive_TIV_query = " or ".join(
        map(lambda cov_type: f"(coverage_type_id == {cov_type} and {tiv_terms[cov_type]} > 0.0)", gul_inputs_df.coverage_type_id.unique()))

    gul_inputs_df.loc[:, list(tiv_terms.values())] = gul_inputs_df.loc[:, list(tiv_terms.values())].fillna(0)  # convert null TIV values to 0
    gul_inputs_df.query(positive_TIV_query, inplace=True)  # remove rows with TIV=null or TIV=0

    if gul_inputs_df.empty:
        raise OasisException('Empty gul_inputs_df dataframe after dropping rows with zero tiv: please check the exposure input files')

    # prepare column mappings for all coverage types
    cols_by_cov_type = {}
    for cov_type in gul_inputs_df.coverage_type_id.unique():
        tiv_col = tiv_terms[cov_type]
        other_cov_types = [v['id'] for v in SUPPORTED_COVERAGE_TYPES.values() if v['id'] != cov_type]
        other_cov_type_term_cols = get_fm_terms_oed_columns(fm_terms=fm_terms, levels=['site coverage'],
                                                            term_group_ids=other_cov_types, terms=terms)
        is_bi_coverage = cov_type == SUPPORTED_COVERAGE_TYPES['bi']['id']

        # store for cov_type
        cov_type_terms = [t for t in terms if fm_terms[cov_level_id][cov_type].get(t)]
        cov_type_term_cols = get_fm_terms_oed_columns(fm_terms, levels=['site coverage'], term_group_ids=[cov_type], terms=cov_type_terms)
        column_mapping_dict = {
            generic_col: cov_col
            for generic_col, cov_col in zip(cov_type_term_cols, cov_type_terms)
            if generic_col in gul_inputs_df.columns
        }
        cols_by_cov_type[cov_type] = {
            'to_drop': other_cov_types + other_cov_type_term_cols,
            'is_bi_coverage': is_bi_coverage,
            'column_mapping_dict': column_mapping_dict,
            'tiv_col': tiv_col
        }

    # coverage unpacking and disaggregation loop:
    # - one row representing N coverages is transformed into N rows, one per coverage.
    # - if NumberOfBuildings > 1, on top of unpacking the coverage, it performs the disaggregation of the items
    #   by repeating the rows `NumberOfBuildings` times and assigning to each row a unique `building_id` number,
    #   useful for generating `item_id` later.
    # - group the rows in the GUL inputs table by coverage type
    # - set the IL terms (and BI coverage boolean) in each group and update the corresponding frame section in the GUL inputs table
    gul_inputs_reformatted_chunks = []
    terms_found = set()

    if do_disaggregation:
        # split TIV
        gul_inputs_df[tiv_cols] = gul_inputs_df[tiv_cols].div(np.maximum(1, gul_inputs_df['NumberOfBuildings']), axis=0)

    for (number_of_buildings, cov_type), cov_type_group in gul_inputs_df.groupby(by=['NumberOfBuildings', 'coverage_type_id'], sort=True):
        # drop columns corresponding to other cov types
        cov_type_group.drop(
            columns=cols_by_cov_type[cov_type]['to_drop'],
            errors="ignore",  # Ignore if any of these cols don't exist
            inplace=True
        )

        # check if coverage type is "bi"
        cov_type_group['is_bi_coverage'] = cols_by_cov_type[cov_type]['is_bi_coverage']
        cov_type_group.rename(columns=cols_by_cov_type[cov_type]['column_mapping_dict'], inplace=True, copy=False)
        cov_type_group['tiv'] = cov_type_group[cols_by_cov_type[cov_type]['tiv_col']]
        cov_type_group['coverage_type_id'] = cov_type
        terms_found.update(cols_by_cov_type[cov_type]['column_mapping_dict'].values())

        if do_disaggregation:
            # if NumberOfBuildings == 0: still add one entry
            disagg_df_chunk = (cov_type_group.reset_index()
                               .join(pd.DataFrame({'building_id': range(1, max(number_of_buildings, 1) + 1)}), how='cross')
                               .set_index('index'))
        else:
            disagg_df_chunk = cov_type_group.copy().assign(building_id=1)

        gul_inputs_reformatted_chunks.append(disagg_df_chunk)

    # concatenate all the unpacked chunks. Sort by index to preserve `item_id` order as in the original code
    gul_inputs_df = (
        pd.concat(gul_inputs_reformatted_chunks)
        .fillna(value={c: 0 for c in terms_found})
        .sort_index()
        .reset_index(drop=True)
        .fillna(value={c: 0 for c in set(gul_inputs_df.columns).intersection(set(term_cols_ints + terms_ints))})
    )

    # set default values and data types for BI coverage boolean, TIV, deductibles and limit
    dtypes = {
        **{t: 'uint8' for t in term_cols_ints + terms_ints},
        **{'is_bi_coverage': 'bool'}
    }
    gul_inputs_df = set_dataframe_column_dtypes(gul_inputs_df, dtypes)

    # add risk_id to gul_inputs_df
    gul_inputs_df[['risk_id', 'NumberOfRisks']] = gul_inputs_df[['building_id', 'NumberOfBuildings']]
    gul_inputs_df.loc[gul_inputs_df['IsAggregate'] == 0, ['risk_id', 'NumberOfRisks']] = 1, 1
    gul_inputs_df.loc[gul_inputs_df['NumberOfRisks'] == 0, 'NumberOfRisks'] = 1

    # set `item_id` and `coverage_id`
    gul_inputs_df['item_id'] = factorize_ndarray(
        gul_inputs_df.loc[:, ['loc_id', 'peril_id', 'coverage_type_id', 'building_id']].values, col_idxs=range(4))[0]
    gul_inputs_df['coverage_id'] = factorize_ndarray(
        gul_inputs_df.loc[:, ['loc_id', 'building_id', 'coverage_type_id']].values, col_idxs=range(3))[0]

    # set default data types
    gul_inputs_df = set_dataframe_column_dtypes(gul_inputs_df, {'item_id': 'int32', 'coverage_id': 'int32'})

    # Set the group ID
    # If the group id is set according to the correlation group field then map this field
    # directly, otherwise create an index of the group id fields
    # keep group_id consistent by lower-casing column names and sorting
    damage_group_id_cols_map = {c: c.lower() for c in sorted(damage_group_id_cols)}  # mapping from PascalCase -> 'lower_case'
    hazard_group_id_cols_map = {c: c.lower() for c in sorted(hazard_group_id_cols)}  # mapping from PascalCase -> 'lower_case'

    if correlation_check is True:
        gul_inputs_df['group_id'] = gul_inputs_df[correlation_group_id]

    if correlations:
        # do merge with peril correlation df
        gul_inputs_df = gul_inputs_df.merge(peril_correlation_group_df, left_on='peril_id', right_on='id').reset_index()
    else:
        gul_inputs_df[["peril_correlation_group", "damage_correlation_value", "hazard_correlation_value"]] = 0

    gul_inputs_df["group_id"] = (
        pd.util.hash_pandas_object(
            gul_inputs_df.rename(columns=damage_group_id_cols_map)[sorted(list(damage_group_id_cols_map.values()))],
            index=False).to_numpy() >> 33
    ).astype('uint32')
    gul_inputs_df["hazard_group_id"] = (
        pd.util.hash_pandas_object(
            gul_inputs_df.rename(columns=hazard_group_id_cols_map)[sorted(list(hazard_group_id_cols_map.values()))],
            index=False).to_numpy() >> 33
    ).astype('uint32')

    # Select only required columns
    # Order here matches test output expectations
    keyscols = ['peril_id', 'coverage_type_id', 'tiv', 'areaperil_id', 'vulnerability_id']
    additionalcols = ['amplification_id', 'section_id', 'intensity_adjustment', 'return_period']
    for col in additionalcols:
        if col in gul_inputs_df.columns:
            keyscols += [col]
    usecols = (
        ['loc_id', portfolio_num, acc_num, loc_num] +
        ([SOURCE_IDX['loc']] if SOURCE_IDX['loc'] in gul_inputs_df else []) +
        keyscols +
        terms +
        (['model_data'] if 'model_data' in gul_inputs_df else []) +
        # disagg_id is needed for fm_summary_map
        ['is_bi_coverage', 'group_id', 'coverage_id', 'item_id', 'status',
         'building_id', 'NumberOfBuildings', 'IsAggregate', 'LocPeril'] +
        tiv_cols +
        ["peril_correlation_group", "damage_correlation_value", 'hazard_group_id', "hazard_correlation_value"]
    )
    usecols = [col for col in usecols if col in gul_inputs_df]
    gul_inputs_df = (
        gul_inputs_df
        [usecols]
        .drop_duplicates(subset='item_id')
        .sort_values("item_id")
        .reset_index()
    )

    return gul_inputs_df

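
# Example (illustrative sketch, not part of the module): `location_df` is typically
# produced by the exposure pre-processing step (it must already contain a 'loc_id'
# column plus the OED TIV and terms columns), and `keys_df` by the model lookup.
# A minimal call might look like:
#
#     gul_inputs_df = get_gul_input_items(
#         location_df,
#         keys_df,
#         damage_group_id_cols=['PortNumber', 'AccNumber', 'LocNumber'],
#         do_disaggregation=True,
#     )
#
# The result has one row per (location, peril, coverage type, building), with
# 'item_id', 'coverage_id' and 'tiv' set; 'group_id' and 'hazard_group_id' are
# 32-bit hashes of the configured group columns (the 64-bit pandas hash shifted
# right by 33 bits so the values fit in an unsigned 32-bit integer).
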
@oasis_log
def write_complex_items_file(gul_inputs_df, complex_items_fp, chunksize=100000):
    """
    Writes a complex model items file.

    :param gul_inputs_df: GUL inputs dataframe
    :type gul_inputs_df: pandas.DataFrame

    :param complex_items_fp: Complex/custom model items file path
    :type complex_items_fp: str

    :return: Complex/custom model items file path
    :rtype: str
    """
    try:
        gul_inputs_df.loc[:, ['item_id', 'coverage_id', 'model_data', 'group_id']].drop_duplicates().to_csv(
            path_or_buf=complex_items_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(complex_items_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_complex_items_file'", e)

    return complex_items_fp

@oasis_log
def write_sections_file(gul_inputs_df, sections_fp, chunksize=100000):
    """
    Writes a section ID file based on the input location area perils.

    :param gul_inputs_df: GUL inputs dataframe
    :type gul_inputs_df: pandas.DataFrame

    :param sections_fp: Sections file path to output
    :type sections_fp: str
    """
    try:
        gul_inputs_df.loc[:, ['section_id']].drop_duplicates().to_csv(
            path_or_buf=sections_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(sections_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_sections_file'", e)

@oasis_log
def write_item_adjustments_file(gul_inputs_df, item_adjustments_fp, chunksize=100000):
    """
    Writes an item adjustments file based on the GUL inputs.

    :param gul_inputs_df: GUL inputs dataframe
    :type gul_inputs_df: pandas.DataFrame

    :param item_adjustments_fp: item_adjustments file path to output
    :type item_adjustments_fp: str
    """
    try:
        gul_inputs_df.loc[:, ['item_id', 'intensity_adjustment', 'return_period']].drop_duplicates().to_csv(
            path_or_buf=item_adjustments_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(item_adjustments_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_item_adjustments_file'", e)


@oasis_log
def write_amplifications_file(gul_inputs_df, amplifications_fp, chunksize=100000):
    """
    Writes an amplifications file. This is the mapping between item IDs and
    amplification IDs.

    :param gul_inputs_df: GUL inputs dataframe
    :type gul_inputs_df: pandas.DataFrame

    :param amplifications_fp: amplifications file path
    :type amplifications_fp: str

    :return: amplifications file path
    :rtype: str
    """
    try:
        gul_inputs_df.loc[:, ['item_id', 'amplification_id']].drop_duplicates().to_csv(
            path_or_buf=amplifications_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(amplifications_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_amplifications_file'", e)

    return amplifications_fp

@oasis_log
def write_items_file(gul_inputs_df, items_fp, chunksize=100000):
    """
    Writes an items file.

    :param gul_inputs_df: GUL inputs dataframe
    :type gul_inputs_df: pandas.DataFrame

    :param items_fp: Items file path
    :type items_fp: str

    :return: Items file path
    :rtype: str
    """
    try:
        gul_inputs_df.loc[:, ['item_id', 'coverage_id', 'areaperil_id', 'vulnerability_id', 'group_id']].drop_duplicates().to_csv(
            path_or_buf=items_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(items_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_items_file'", e)

    return items_fp

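# Example (illustrative sketch, not part of the module): assuming `gul_inputs_df`
# was produced by get_gul_input_items above, and 'work/input/items.csv' is a
# hypothetical output path:
#
#     write_items_file(gul_inputs_df, items_fp='work/input/items.csv')
#     # writes the de-duplicated columns item_id, coverage_id, areaperil_id,
#     # vulnerability_id and group_id to the CSV file
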
@oasis_log
def write_coverages_file(gul_inputs_df, coverages_fp, chunksize=100000):
    """
    Writes a coverages file.

    :param gul_inputs_df: GUL inputs dataframe
    :type gul_inputs_df: pandas.DataFrame

    :param coverages_fp: Coverages file path
    :type coverages_fp: str

    :return: Coverages file path
    :rtype: str
    """
    try:
        gul_inputs_df.loc[:, ['coverage_id', 'tiv']].drop_duplicates().to_csv(
            path_or_buf=coverages_fp,
            encoding='utf-8',
            mode=('w' if os.path.exists(coverages_fp) else 'a'),
            chunksize=chunksize,
            index=False
        )
    except (IOError, OSError) as e:
        raise OasisException("Exception raised in 'write_coverages_file'", e)

    return coverages_fp

@oasis_log
def write_gul_input_files(
    gul_inputs_df,
    target_dir,
    correlations_df,
    output_dir,
    oasis_files_prefixes=OASIS_FILES_PREFIXES['gul'],
    chunksize=(2 * 10 ** 5),
):
    """
    Writes the standard Oasis GUL input files to a target directory, using a
    pre-generated dataframe of GUL input items. The files written are
    ::

        items.csv
        coverages.csv

    and optionally a complex items file in case of a complex/custom model.

    :param gul_inputs_df: GUL inputs dataframe
    :type gul_inputs_df: pandas.DataFrame

    :param target_dir: Target directory in which to write the files
    :type target_dir: str

    :param correlations_df: Correlations dataframe (written out as correlations.bin/.csv)
    :type correlations_df: pandas.DataFrame

    :param output_dir: Output directory in which to write the correlations files
    :type output_dir: str

    :param oasis_files_prefixes: Oasis GUL input file name prefixes
    :type oasis_files_prefixes: dict

    :param chunksize: The chunk size to use when writing out the input files
    :type chunksize: int

    :return: GUL input files dict
    :rtype: dict
    """
    # Clean the target directory path
    target_dir = as_path(target_dir, 'Target GUL input files directory', is_dir=True, preexists=False)

    oasis_files_prefixes = copy.deepcopy(oasis_files_prefixes)

    if correlations_df is None:
        correlations_df = pd.DataFrame(columns=CorrelationsData.COLUMNS)

    # write the correlations to a binary file
    correlation_data_handle = CorrelationsData(data=correlations_df)
    correlation_data_handle.to_bin(file_path=f"{output_dir}/correlations.bin")
    correlation_data_handle.to_csv(file_path=f"{output_dir}/correlations.csv")

    # Set chunk size for writing the CSV files - default is the minimum of the
    # given chunk size or the GUL inputs frame size
    chunksize = chunksize or min(chunksize, len(gul_inputs_df))

    # If no complex model data present then remove the corresponding file
    # name from the files prefixes dict, which is used for writing the
    # GUL input files
    if 'model_data' not in gul_inputs_df:
        oasis_files_prefixes.pop('complex_items', None)

    # If no amplification IDs then remove corresponding file name from files
    # prefixes dict
    if 'amplification_id' not in gul_inputs_df:
        oasis_files_prefixes.pop('amplifications', None)

    # If no section IDs then remove corresponding file name from files
    # prefixes dict
    if 'section_id' not in gul_inputs_df:
        oasis_files_prefixes.pop('sections', None)

    # If no adjustments data then remove corresponding file name from files
    # prefixes dict
    if 'intensity_adjustment' not in gul_inputs_df:
        oasis_files_prefixes.pop('item_adjustments', None)

    # A dict of GUL input file names and file paths
    gul_input_files = {
        fn: os.path.join(target_dir, '{}.csv'.format(oasis_files_prefixes[fn]))
        for fn in oasis_files_prefixes
    }

    this_module = sys.modules[__name__]

    # Write the files serially
    for fn in gul_input_files:
        getattr(this_module, 'write_{}_file'.format(fn))(gul_inputs_df.copy(deep=True), gul_input_files[fn], chunksize)

    return gul_input_files
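
# Example (illustrative sketch, not part of the module): writing the GUL files for a
# prepared inputs frame. 'work/input' is a hypothetical directory; correlations_df may
# be None when the model provides no correlation settings, in which case empty
# correlations.bin/.csv files are still written to output_dir.
#
#     gul_input_files = write_gul_input_files(
#         gul_inputs_df,
#         target_dir='work/input',
#         correlations_df=None,
#         output_dir='work/input',
#     )
#     # e.g. {'items': 'work/input/items.csv', 'coverages': 'work/input/coverages.csv', ...}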