Source code for oasislmf.computation.generate.files

__all__ = [
    'GenerateFiles',
    'GenerateDummyModelFiles',
    'GenerateDummyOasisFiles'
]

import io
import json
import os
from pathlib import Path
from typing import List
import pandas as pd

from oasislmf.computation.base import ComputationStep
from oasislmf.computation.data.dummy_model.generate import (AmplificationsFile,
                                                            CoveragesFile,
                                                            DamageBinDictFile,
                                                            EventsFile,
                                                            FMPolicyTCFile,
                                                            FMProfileFile,
                                                            FMProgrammeFile,
                                                            FMSummaryXrefFile,
                                                            FMXrefFile,
                                                            FootprintBinFile,
                                                            GULSummaryXrefFile,
                                                            ItemsFile,
                                                            LossFactorsFile,
                                                            OccurrenceFile,
                                                            RandomFile,
                                                            VulnerabilityFile)
from oasislmf.computation.generate.keys import GenerateKeys
from oasislmf.preparation.correlations import map_data
from oasislmf.preparation.dir_inputs import (create_target_directory,
                                             prepare_input_files_directory)
from oasislmf.preparation.gul_inputs import (get_gul_input_items,
                                             process_group_id_cols,
                                             write_gul_input_files)
from oasislmf.preparation.il_inputs import (get_il_input_items,
                                            get_oed_hierarchy,
                                            write_il_input_files)
from oasislmf.preparation.reinsurance_layer import write_files_for_reinsurance
from oasislmf.preparation.summaries import (get_summary_mapping,
                                            merge_oed_to_mapping,
                                            write_exposure_summary,
                                            write_mapping_file,
                                            write_summary_levels)
from oasislmf.pytools.data_layer.oasis_files.correlations import \
    CorrelationsData
from oasislmf.utils.data import (establish_correlations, get_dataframe,
                                 get_exposure_data, get_json, get_utctimestamp,
                                 prepare_account_df, prepare_location_df,
                                 prepare_reinsurance_df, validate_vulnerability_replacements)
from oasislmf.utils.defaults import (DAMAGE_GROUP_ID_COLS,
                                     HAZARD_GROUP_ID_COLS,
                                     OASIS_FILES_PREFIXES, WRITE_CHUNKSIZE,
                                     get_default_accounts_profile,
                                     get_default_exposure_profile,
                                     get_default_fm_aggregation_profile)
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.inputs import str2bool
from ods_tools.oed.setting_schema import ModelSettingSchema, AnalysisSettingSchema


[docs] class GenerateFiles(ComputationStep): """ Generates the standard Oasis GUL input files + optionally the IL/FM input files and the RI input files. """
[docs] step_params = [ # Command line options {'name': 'oasis_files_dir', 'flag': '-o', 'is_path': True, 'pre_exist': False, 'help': 'Path to the directory in which to generate the Oasis files'}, {'name': 'keys_data_csv', 'flag': '-z', 'is_path': True, 'pre_exist': True, 'help': 'Pre-generated keys CSV file path'}, {'name': 'analysis_settings_json', 'flag': '-a', 'is_path': True, 'pre_exist': True, 'required': False, 'help': 'Analysis settings JSON file path'}, {'name': 'keys_errors_csv', 'is_path': True, 'pre_exist': True, 'help': 'Pre-generated keys errors CSV file path'}, {'name': 'profile_loc_json', 'is_path': True, 'pre_exist': True, 'help': 'Source (OED) exposure profile JSON path'}, {'name': 'profile_acc_json', 'flag': '-b', 'is_path': True, 'pre_exist': True, 'help': 'Source (OED) accounts profile JSON path'}, {'name': 'profile_fm_agg_json', 'is_path': True, 'pre_exist': True, 'help': 'FM (OED) aggregation profile path'}, {'name': 'oed_schema_info', 'is_path': True, 'pre_exist': True, 'help': 'path to custom oed_schema'}, {'name': 'currency_conversion_json', 'is_path': True, 'pre_exist': True, 'help': 'settings to perform currency conversion of oed files'}, {'name': 'reporting_currency', 'help': 'currency to use in the results reported'}, {'name': 'oed_location_csv', 'flag': '-x', 'is_path': True, 'pre_exist': True, 'help': 'Source location CSV file path'}, {'name': 'oed_accounts_csv', 'flag': '-y', 'is_path': True, 'pre_exist': True, 'help': 'Source accounts CSV file path'}, {'name': 'oed_info_csv', 'flag': '-i', 'is_path': True, 'pre_exist': True, 'help': 'Reinsurance info. CSV file path'}, {'name': 'oed_scope_csv', 'flag': '-s', 'is_path': True, 'pre_exist': True, 'help': 'Reinsurance scope CSV file path'}, {'name': 'check_oed', 'type': str2bool, 'const': True, 'nargs': '?', 'default': True, 'help': 'if True check input oed files'}, {'name': 'disable_summarise_exposure', 'flag': '-S', 'default': False, 'type': str2bool, 'const': True, 'nargs': '?', 'help': 'Disables creation of an exposure summary report'}, {'name': 'damage_group_id_cols', 'flag': '-G', 'nargs': '+', 'help': 'Columns from loc file to set group_id', 'default': DAMAGE_GROUP_ID_COLS}, {'name': 'hazard_group_id_cols', 'flag': '-H', 'nargs': '+', 'help': 'Columns from loc file to set hazard_group_id', 'default': HAZARD_GROUP_ID_COLS}, {'name': 'lookup_multiprocessing', 'type': str2bool, 'const': False, 'nargs': '?', 'default': False, 'help': 'Flag to enable/disable lookup multiprocessing'}, {'name': 'do_disaggregation', 'type': str2bool, 'const': True, 'nargs': '?', 'default': True, 'help': 'if True run the oasis disaggregation.'}, # Manager only options (pass data directy instead of filepaths) {'name': 'lookup_config'}, {'name': 'lookup_complex_config'}, {'name': 'write_ri_tree', 'default': False}, {'name': 'verbose', 'default': False}, {'name': 'write_chunksize', 'type': int, 'default': WRITE_CHUNKSIZE}, {'name': 'oasis_files_prefixes', 'default': OASIS_FILES_PREFIXES}, {'name': 'profile_loc', 'default': get_default_exposure_profile()}, {'name': 'profile_acc', 'default': get_default_accounts_profile()}, {'name': 'profile_fm_agg', 'default': get_default_fm_aggregation_profile()}, {'name': 'location', 'type': str, 'nargs': '+', 'help': 'A set of locations to include in the files'}, {'name': 'portfolio', 'type': str, 'nargs': '+', 'help': 'A set of portfolios to include in the files'}, {'name': 'account', 'type': str, 'nargs': '+', 'help': 'A set of locations to include in the files'}, {'name': 'base_df_engine', 'type': str, 'default': 'oasis_data_manager.df_reader.reader.OasisPandasReader', 'help': 'The default dataframe reading engine to use when loading files'}, {'name': 'exposure_df_engine', 'type': str, 'default': None, 'help': 'The dataframe reading engine to use when loading exposure files'}, ]
[docs] chained_commands = [ GenerateKeys ]
def _get_output_dir(self): if self.oasis_files_dir: return self.oasis_files_dir utcnow = get_utctimestamp(fmt='%Y%m%d%H%M%S') return os.path.join(os.getcwd(), 'runs', 'files-{}'.format(utcnow))
[docs] def get_exposure_data_config(self): return { 'location': self.oed_location_csv, 'account': self.oed_accounts_csv, 'ri_info': self.oed_info_csv, 'ri_scope': self.oed_scope_csv, 'oed_schema_info': self.oed_schema_info, 'currency_conversion': self.currency_conversion_json, 'check_oed': self.check_oed, 'use_field': True, 'location_numbers': self.location, 'portfolio_numbers': self.portfolio, 'account_numbers': self.account, 'base_df_engine': self.base_df_engine, 'exposure_df_engine': self.exposure_df_engine, }
[docs] def run(self): self.logger.info('\nProcessing arguments - Creating Oasis Files') if not (self.keys_data_csv or self.lookup_config_json or (self.lookup_data_dir and self.model_version_csv and self.lookup_module_path)): raise OasisException( 'No pre-generated keys file provided, and no lookup assets ' 'provided to generate a keys file - if you do not have a ' 'pre-generated keys file then lookup assets must be provided - ' 'for a built-in lookup the lookup config. JSON file path must ' 'be provided, or for custom lookups the keys data path + model ' 'version file path + lookup package path must be provided' ) self.oasis_files_dir = self._get_output_dir() exposure_data = get_exposure_data(self, add_internal_col=True) self.kwargs['exposure_data'] = exposure_data il = bool(exposure_data.account) ri = exposure_data.ri_info and exposure_data.ri_scope and il self.logger.info('\nGenerating Oasis files (GUL=True, IL={}, RIL={})'.format(il, ri)) summarise_exposure = not self.disable_summarise_exposure validate_vulnerability_replacements(self.analysis_settings_json) # Prepare the target directory and copy the source files, profiles and # model version into it target_dir = prepare_input_files_directory( target_dir=self.oasis_files_dir, exposure_data=exposure_data, exposure_profile_fp=self.profile_loc_json, keys_fp=self.keys_data_csv, keys_errors_fp=self.keys_errors_csv, lookup_config_fp=self.lookup_config_json, model_version_fp=self.model_version_csv, complex_lookup_config_fp=self.lookup_complex_config_json, accounts_profile_fp=self.profile_acc_json, fm_aggregation_profile_fp=self.profile_fm_agg_json, ) # Get the profiles defining the exposure and accounts files, ID related # terms in these files, and FM aggregation hierarchy location_profile = get_json(src_fp=self.profile_loc_json) if self.profile_loc_json else self.profile_loc accounts_profile = get_json(src_fp=self.profile_acc_json) if self.profile_acc_json else self.profile_acc oed_hierarchy = get_oed_hierarchy(location_profile, accounts_profile) fm_aggregation_profile = get_json(src_fp=self.profile_fm_agg_json) if self.profile_fm_agg_json else self.profile_fm_agg # force fm_agg level keys to type int: if any(isinstance(lvl, str) for lvl in fm_aggregation_profile.keys()): fm_aggregation_profile = {int(k): v for k, v in fm_aggregation_profile.items()} if self.reporting_currency: exposure_data.reporting_currency = self.reporting_currency exposure_data.save(target_dir, self.reporting_currency, save_config=True) exposure_data.location.dataframe = prepare_location_df(exposure_data.location.dataframe) location_df = exposure_data.location.dataframe if il: exposure_data.account.dataframe = prepare_account_df(exposure_data.account.dataframe) account_df = exposure_data.account.dataframe else: account_df = None # If a pre-generated keys file path has not been provided, # then it is asssumed some model lookup assets have been provided, so # as to allow the lookup to be instantiated and called to generated # the keys file. _keys_fp = _keys_errors_fp = None if not self.keys_data_csv: _keys_fp = self.kwargs['keys_data_csv'] = os.path.join(target_dir, 'keys.csv') _keys_errors_fp = self.kwargs['keys_errors_csv'] = os.path.join(target_dir, 'keys-errors.csv') GenerateKeys(**self.kwargs).run() else: _keys_fp = os.path.join(target_dir, os.path.basename(self.keys_data_csv)) if self.keys_errors_csv: _keys_errors_fp = os.path.join(target_dir, os.path.basename(self.keys_errors_csv)) # Load keys file **** WARNING - REFACTOR THIS **** dtypes = { 'locid': 'int64', 'perilid': 'category', 'coveragetypeid': 'uint8', 'areaperilid': 'uint64', 'vulnerabilityid': 'uint32', 'modeldata': 'str' } keys_error_fp = os.path.join(os.path.dirname(_keys_fp), 'keys-errors.csv') if _keys_fp else 'Missing' missing_keys_msg = 'No successful lookup results found in the keys file - ' missing_keys_msg += 'Check the `keys-errors.csv` file for details. \n File path: {}'.format(keys_error_fp) keys_df = get_dataframe( src_fp=_keys_fp, col_dtypes=dtypes, empty_data_error_msg=missing_keys_msg, memory_map=True ) # ************************************************ # check that all loc_ids have been returned from keys lookup try: if self.keys_errors_csv: keys_errors_df = get_dataframe(src_fp=self.keys_errors_csv, memory_map=True) else: keys_errors_df = get_dataframe(src_fp=_keys_errors_fp, memory_map=True) except OasisException: # Assume empty file on read error. keys_errors_df = pd.DataFrame(columns=['locid']) returned_locid = set(keys_errors_df['locid']).union(set(keys_df['locid'])) del keys_errors_df missing_ids = set(location_df['loc_id']).difference(returned_locid) if len(missing_ids) > 0: raise OasisException(f'Lookup error: missing "loc_id" values from keys return: {list(missing_ids)}') # Columns from loc file to assign group_id model_damage_group_fields = [] model_hazard_group_fields = [] correlations: bool = False model_settings = None analysis_settings = None correlations_analysis_settings = None # If analysis settings file contains correlation settings, they will overwrite the ones in model settings if self.analysis_settings_json is not None: analysis_settings = AnalysisSettingSchema().get(self.analysis_settings_json) correlations_analysis_settings = analysis_settings.get('model_settings', {}).get('correlation_settings', None) if self.model_settings_json is not None: model_settings = ModelSettingSchema().get(self.model_settings_json) if correlations_analysis_settings is not None: model_settings['model_settings']['correlation_settings'] = correlations_analysis_settings correlations = establish_correlations(model_settings=model_settings) try: model_damage_group_fields = model_settings["data_settings"].get("damage_group_fields") except (KeyError, AttributeError, OasisException) as e: self.logger.warn(f'WARNING: Failed to load "damage_group_fields", file: {self.model_settings_json}, error: {e}') try: model_hazard_group_fields = model_settings["data_settings"].get("hazard_group_fields") except (KeyError, AttributeError, OasisException) as e: self.logger.warn(f'WARNING: Failed to load "hazard_group_fields", file: {self.model_settings_json}, error: {e}') # load group columns from model_settings.json if not set in kwargs (CLI) if model_damage_group_fields and not self.kwargs.get('group_id_cols'): damage_group_id_cols = model_damage_group_fields # otherwise load group cols from args else: damage_group_id_cols = self.damage_group_id_cols # load hazard group columns from model_settings.json if not set in kwargs (CLI) if model_hazard_group_fields and not self.kwargs.get('hazard_group_id_cols'): hazard_group_id_cols = model_hazard_group_fields # otherwise load group cols from args else: hazard_group_id_cols = self.hazard_group_id_cols # Workaround -- check model_settings and analysis_settings for 'do_disaggregation' # once general mechanism is done to work with any setting, this should be removed # see: https://github.com/OasisLMF/OasisLMF/pull/1552 settings_do_disaggregation = None if analysis_settings: settings_do_disaggregation = analysis_settings.get('do_disaggregation', None) elif model_settings and settings_do_disaggregation is None: settings_do_disaggregation = model_settings.get('model_settings', {}).get('do_disaggregation', None) if settings_do_disaggregation is not None: self.do_disaggregation = settings_do_disaggregation damage_group_id_cols: List[str] = process_group_id_cols(group_id_cols=damage_group_id_cols, exposure_df_columns=location_df, has_correlation_groups=correlations) hazard_group_id_cols: List[str] = process_group_id_cols(group_id_cols=hazard_group_id_cols, exposure_df_columns=location_df, has_correlation_groups=correlations) gul_inputs_df = get_gul_input_items( location_df, keys_df, peril_correlation_group_df=map_data(data=model_settings, logger=self.logger), correlations=correlations, exposure_profile=location_profile, damage_group_id_cols=damage_group_id_cols, hazard_group_id_cols=hazard_group_id_cols, do_disaggregation=self.do_disaggregation ) # If not in det. loss gen. scenario, write exposure summary file if summarise_exposure: write_exposure_summary( target_dir, location_df, keys_fp=_keys_fp, keys_errors_fp=_keys_errors_fp, exposure_profile=location_profile, ) # If exposure summary set, write valid columns for summary levels to file if summarise_exposure: write_summary_levels(location_df, account_df, exposure_data, target_dir) # Write the GUL input files files_prefixes = self.oasis_files_prefixes gul_input_files = write_gul_input_files( gul_inputs_df, target_dir, correlations_df=gul_inputs_df[CorrelationsData.COLUMNS], output_dir=self._get_output_dir(), oasis_files_prefixes=files_prefixes['gul'], chunksize=self.write_chunksize, ) gul_summary_mapping = get_summary_mapping(gul_inputs_df, oed_hierarchy) write_mapping_file(gul_summary_mapping, target_dir) del gul_summary_mapping # If no source accounts file path has been provided assume that IL # input files, and therefore also RI input files, are not needed if not il: # Write `summary_map.csv` for GUL only self.logger.info('\nOasis files generated: {}'.format(json.dumps(gul_input_files, indent=4))) return gul_input_files # Get the IL input items il_inputs_df = get_il_input_items( gul_inputs_df=gul_inputs_df.copy(), locations_df=exposure_data.location.dataframe, accounts_df=exposure_data.account.dataframe, oed_schema=exposure_data.oed_schema, exposure_profile=location_profile, accounts_profile=accounts_profile, fm_aggregation_profile=fm_aggregation_profile, do_disaggregation=self.do_disaggregation, ) # Write the IL/FM input files il_input_files = write_il_input_files( il_inputs_df, target_dir, oasis_files_prefixes=files_prefixes['il'], chunksize=self.write_chunksize ) fm_summary_mapping = get_summary_mapping(il_inputs_df, oed_hierarchy, is_fm_summary=True) write_mapping_file(fm_summary_mapping, target_dir, is_fm_summary=True) # Combine the GUL and IL input file paths into a single dict (for convenience) oasis_files = {**gul_input_files, **il_input_files} # If no RI input file paths (info. and scope) have been provided then # no RI input files are needed, just return the GUL and IL Oasis files if not ri: self.logger.info('\nOasis files generated: {}'.format(json.dumps(oasis_files, indent=4))) return oasis_files exposure_data.ri_info.dataframe, exposure_data.ri_scope.dataframe = prepare_reinsurance_df(exposure_data.ri_info.dataframe, exposure_data.ri_scope.dataframe) # Write the RI input files, and write the returned RI layer info. as a # file, which can be reused by the model runner (in the model execution # stage) to set the number of RI iterations fm_summary_mapping['loc_id'] = fm_summary_mapping['loc_id'].astype(exposure_data.location.dataframe['loc_id'].dtype) xref_descriptions_df = merge_oed_to_mapping( fm_summary_mapping, exposure_data.location.dataframe, ['loc_id'], {'LocGroup': '', 'ReinsTag': '', 'CountryCode': ''}) xref_descriptions_df[['PortNumber', 'AccNumber', 'PolNumber']] = xref_descriptions_df[['PortNumber', 'AccNumber', 'PolNumber']].astype(str) xref_descriptions_df = merge_oed_to_mapping( xref_descriptions_df, exposure_data.account.dataframe, ['PortNumber', 'AccNumber', 'PolNumber'], { 'CedantName': '', 'ProducerName': '', 'LOB': '', 'PolInceptionDate': '', 'PolExpiryDate': '', } ) xref_descriptions_df = xref_descriptions_df.sort_values(by='agg_id') del fm_summary_mapping self.kwargs['oed_info_csv'] = exposure_data.ri_info ri_layers = write_files_for_reinsurance( exposure_data.ri_info.dataframe, exposure_data.ri_scope.dataframe, xref_descriptions_df, target_dir, oasis_files['fm_xref'], self.logger ) with io.open(os.path.join(target_dir, 'ri_layers.json'), 'w', encoding='utf-8') as f: f.write(json.dumps(ri_layers, ensure_ascii=False, indent=4)) oasis_files['ri_layers'] = os.path.abspath(f.name) for layer, layer_info in ri_layers.items(): oasis_files['RI_{}'.format(layer)] = layer_info['directory'] self.logger.info('\nOasis files generated: {}'.format(json.dumps(oasis_files, indent=4))) return oasis_files
[docs] class GenerateDummyModelFiles(ComputationStep): """ Generates dummy model files. """ # Command line options
[docs] step_params = [ {'name': 'target_dir', 'flag': '-o', 'is_path': True, 'pre_exist': False, 'help': 'Path to the directory in which to generate the Model files'}, {'name': 'num_vulnerabilities', 'flag': '-v', 'required': True, 'type': int, 'help': 'Number of vulnerabilities'}, {'name': 'num_intensity_bins', 'flag': '-i', 'required': True, 'type': int, 'help': 'Number of intensity bins'}, {'name': 'num_damage_bins', 'flag': '-d', 'required': True, 'type': int, 'help': 'Number of damage bins'}, {'name': 'vulnerability_sparseness', 'flag': '-s', 'required': False, 'type': float, 'default': 1.0, 'help': 'Percentage of bins normalised to range [0,1] impacted for a vulnerability at an intensity level'}, {'name': 'num_events', 'flag': '-e', 'required': True, 'type': int, 'help': 'Number of events'}, {'name': 'num_areaperils', 'flag': '-a', 'required': True, 'type': int, 'help': 'Number of areaperils'}, {'name': 'areaperils_per_event', 'flag': '-A', 'required': False, 'type': int, 'default': None, 'help': 'Number of areaperils impacted per event'}, {'name': 'intensity_sparseness', 'flag': '-S', 'required': False, 'type': float, 'default': 1.0, 'help': 'Percentage of bins normalised to range [0,1] impacted for an event and areaperil'}, {'name': 'no_intensity_uncertainty', 'flag': '-u', 'required': False, 'default': False, 'action': 'store_true', 'help': 'No intensity uncertainty flag'}, {'name': 'num_periods', 'flag': '-p', 'required': True, 'type': int, 'help': 'Number of periods'}, {'name': 'periods_per_event_mean', 'flag': '-P', 'required': False, 'type': int, 'default': 1, 'help': 'Mean of truncated normal distribution sampled to determine number of periods per event'}, {'name': 'periods_per_event_stddev', 'flag': '-Q', 'required': False, 'type': float, 'default': 0.0, 'help': 'Standard deviation of truncated normal distribution sampled to determine number of periods per event'}, {'name': 'num_amplifications', 'flag': '-m', 'required': False, 'type': int, 'default': 0, 'help': 'Number of amplifications'}, {'name': 'min_pla_factor', 'flag': '-f', 'required': False, 'type': float, 'default': 0.875, 'help': 'Minimum Post Loss Amplification Factor'}, {'name': 'max_pla_factor', 'flag': '-F', 'required': False, 'type': float, 'default': 1.5, 'help': 'Maximum Post Loss Amplification Factor'}, {'name': 'num_randoms', 'flag': '-r', 'required': False, 'type': int, 'default': 0, 'help': 'Number of random numbers'}, {'name': 'random_seed', 'flag': '-R', 'required': False, 'type': int, 'default': - 1, 'help': 'Random seed (-1 for 1234 (default), 0 for current system time'} ]
def _validate_input_arguments(self): if self.vulnerability_sparseness > 1.0 or self.vulnerability_sparseness < 0.0: raise OasisException('Invalid value for --vulnerability-sparseness') if self.intensity_sparseness > 1.0 or self.intensity_sparseness < 0.0: raise OasisException('Invalid value for --intensity-sparseness') if not self.areaperils_per_event: self.areaperils_per_event = self.num_areaperils if self.areaperils_per_event > self.num_areaperils: raise OasisException('Number of areaperils per event exceeds total number of areaperils') if self.num_amplifications < 0.0: raise OasisException('Invalid value for --num-amplifications') if self.max_pla_factor < self.min_pla_factor: raise OasisException('Value for --max-pla-factor must be greater than that for --min-pla-factor') if self.min_pla_factor < 0: raise OasisException('Invalid value for --min-pla-factor') if self.random_seed < -1: raise OasisException('Invalid random seed') def _create_target_directory(self, label): utcnow = get_utctimestamp(fmt='%Y%m%d%H%M%S') if not self.target_dir: self.target_dir = os.path.join(os.getcwd(), 'runs', f'test-{label}-{utcnow}') self.target_dir = create_target_directory( self.target_dir, 'target test model files directory' ) def _prepare_run_directory(self): self.input_dir = os.path.join(self.target_dir, 'input') self.static_dir = os.path.join(self.target_dir, 'static') directories = [ self.input_dir, self.static_dir ] for directory in directories: if not os.path.exists(directory): Path(directory).mkdir(parents=True, exist_ok=True) def _set_footprint_files_inputs(self): self.footprint_files_inputs = { 'num_events': self.num_events, 'num_areaperils': self.num_areaperils, 'areaperils_per_event': self.areaperils_per_event, 'num_intensity_bins': self.num_intensity_bins, 'intensity_sparseness': self.intensity_sparseness, 'no_intensity_uncertainty': self.no_intensity_uncertainty, 'directory': self.static_dir } def _set_periods_per_event_parameters(self): self.periods_per_event_parameters = { 'mean': self.periods_per_event_mean, 'stddev': self.periods_per_event_stddev } def _get_model_file_objects(self): # vulnerability.bin, events.bin, footprint.bin, footprint.idx, # damage_bin_dict.bin and occurrence.bin self._set_footprint_files_inputs() self._set_periods_per_event_parameters() self.model_files = [ VulnerabilityFile( self.num_vulnerabilities, self.num_intensity_bins, self.num_damage_bins, self.vulnerability_sparseness, self.random_seed, self.static_dir ), EventsFile(self.num_events, self.input_dir), FootprintBinFile( **self.footprint_files_inputs, random_seed=self.random_seed ), DamageBinDictFile(self.num_damage_bins, self.static_dir), OccurrenceFile( self.num_events, self.num_periods, self.random_seed, self.input_dir, **self.periods_per_event_parameters ) ] if self.num_amplifications > 0: self.model_files += [ LossFactorsFile( self.num_events, self.num_amplifications, self.min_pla_factor, self.max_pla_factor, self.random_seed, self.static_dir ) ] if self.num_randoms > 0: self.model_files += [ RandomFile(self.num_randoms, self.random_seed, self.static_dir) ]
[docs] def run(self): self.logger.info('\nProcessing arguments - Creating Dummy Model Files') self._validate_input_arguments() self._create_target_directory(label='files') self._prepare_run_directory() self._get_model_file_objects() for model_file in self.model_files: self.logger.info(f'Writing {model_file.file_name}') model_file.write_file() self.logger.info(f'\nDummy Model files generated in {self.target_dir}')
[docs] class GenerateDummyOasisFiles(GenerateDummyModelFiles): """ Generates dummy model and Oasis GUL input files + optionally the IL/FM input files. """
[docs] step_params = [ {'name': 'num_locations', 'flag': '-l', 'required': True, 'type': int, 'help': 'Number of locations'}, {'name': 'coverages_per_location', 'flag': '-c', 'required': True, 'type': int, 'help': 'Number of coverage types per location'}, {'name': 'num_layers', 'flag': '-L', 'required': False, 'type': int, 'default': 1, 'help': 'Number of layers'} ]
[docs] chained_commands = [GenerateDummyModelFiles]
def _validate_input_arguments(self): super()._validate_input_arguments() if self.coverages_per_location > 4 or self.coverages_per_location < 1: raise OasisException('Number of supported coverage types is 1 to 4') def _get_gul_file_objects(self): # coverages.bin, items.bin and gulsummaryxref.bin self.gul_files = [ CoveragesFile( self.num_locations, self.coverages_per_location, self.random_seed, self.input_dir ), ItemsFile( self.num_locations, self.coverages_per_location, self.num_areaperils, self.num_vulnerabilities, self.random_seed, self.input_dir ), GULSummaryXrefFile( self.num_locations, self.coverages_per_location, self.input_dir ) ] if self.num_amplifications > 0: self.gul_files += [ AmplificationsFile( self.num_locations, self.coverages_per_location, self.num_amplifications, self.random_seed, self.input_dir ) ] def _get_fm_file_objects(self): # fm_programme.bin, fm_policytc.bin, fm_profile.bin, fm_xref.bin and # fmsummaryxref.bin self.fm_files = [ FMProgrammeFile( self.num_locations, self.coverages_per_location, self.input_dir ), FMPolicyTCFile( self.num_locations, self.coverages_per_location, self.num_layers, self.input_dir ), FMProfileFile(self.num_layers, self.input_dir), FMXrefFile( self.num_locations, self.coverages_per_location, self.num_layers, self.input_dir ), FMSummaryXrefFile( self.num_locations, self.coverages_per_location, self.num_layers, self.input_dir ) ]
[docs] def run(self): self.logger.info('\nProcessing arguments - Creating Model & Test Oasis Files') self._validate_input_arguments() self._create_target_directory(label='files') self._prepare_run_directory() self._get_model_file_objects() self._get_gul_file_objects() self._get_fm_file_objects() output_files = self.model_files + self.gul_files + self.fm_files for output_file in output_files: self.logger.info(f'Writing {output_file.file_name}') output_file.write_file() self.logger.info(f'\nDummy Model and Oasis files generated in {self.target_dir}')