import contextlib
import io
import logging
import multiprocessing
import os
import random
import re
import shutil
import string
from collections import Counter
from functools import partial
import pandas as pd
from ..utils.defaults import (EVE_DEFAULT_SHUFFLE, EVE_FISHER_YATES,
EVE_NO_SHUFFLE, EVE_ROUND_ROBIN, EVE_STD_SHUFFLE,
KTOOL_N_FM_PER_LB, KTOOL_N_GUL_PER_LB,
KTOOLS_ALLOC_GUL_DEFAULT,
KTOOLS_ALLOC_IL_DEFAULT, KTOOLS_ALLOC_RI_DEFAULT)
from ..utils.exceptions import OasisException
logger = logging.getLogger(__name__)
RUNTYPE_GROUNDUP_LOSS = 'gul'
RUNTYPE_LOAD_BALANCED_LOSS = 'lb'
RUNTYPE_INSURED_LOSS = 'il'
RUNTYPE_REINSURANCE_LOSS = 'ri'
RUNTYPE_REINSURANCE_GROSS_LOSS = 'rl'
RUNTYPE_FULL_CORRELATION = 'fc'
REINSURANCE_RUNTYPES = [
RUNTYPE_REINSURANCE_LOSS,
RUNTYPE_REINSURANCE_GROSS_LOSS
]
WAIT_PROCESSING_SWITCHES = {
'full_uncertainty_aep': '-F',
'wheatsheaf_aep': '-W',
'sample_mean_aep': '-S',
'full_uncertainty_oep': '-f',
'wheatsheaf_oep': '-w',
'sample_mean_oep': '-s',
'wheatsheaf_mean_aep': '-M',
'wheatsheaf_mean_oep': '-m',
}
ORD_EPT_OUTPUT_SWITCHES = {
"ept_full_uncertainty_aep": '-F',
"ept_full_uncertainty_oep": '-f',
"ept_mean_sample_aep": '-S',
"ept_mean_sample_oep": '-s',
"ept_per_sample_mean_aep": '-M',
"ept_per_sample_mean_oep": '-m',
}
ORD_PSEPT_OUTPUT_SWITCHES = {
"psept_aep": '-W',
"psept_oep": '-w',
}
ORD_LECCALC = {**ORD_EPT_OUTPUT_SWITCHES, **ORD_PSEPT_OUTPUT_SWITCHES}
ORD_ALT_OUTPUT_SWITCHES = {
"alt_period": {
'csv_flag': '-o', 'parquet_flag': '-p', 'alct_flag': '-c',
'alct_confidence_level': '-l'
}
}
ORD_ALT_MEANONLY_OUTPUT_SWITCHES = {
"alt_meanonly": {
'csv_flag': '-o', 'parquet_flag': '-p'
}
}
ORD_PLT_OUTPUT_SWITCHES = {
"plt_sample": {
'csv_flag': '-S', 'ktools_exe': 'pltcalc', 'table_name': 'splt',
'parquet_flag': '-s', 'kat_flag': '-S'
},
"plt_quantile": {
'csv_flag': '-Q', 'ktools_exe': 'pltcalc', 'table_name': 'qplt',
'parquet_flag': '-q', 'kat_flag': '-Q'
},
"plt_moment": {
'csv_flag': '-M', 'ktools_exe': 'pltcalc', 'table_name': 'mplt',
'parquet_flag': '-m', 'kat_flag': '-M'
}
}
ORD_ELT_OUTPUT_SWITCHES = {
"elt_quantile": {
'csv_flag': '-Q', 'ktools_exe': 'eltcalc', 'table_name': 'qelt',
'parquet_flag': '-q', 'kat_flag': '-q'
},
"elt_moment": {
'csv_flag': '-M', 'ktools_exe': 'eltcalc', 'table_name': 'melt',
'parquet_flag': '-m', 'kat_flag': '-m'
}
}
ORD_SELT_OUTPUT_SWITCH = {
"elt_sample": {
'csv_flag': '-o', 'ktools_exe': 'summarycalctocsv', 'table_name': 'selt',
'parquet_flag': '-p', 'kat_flag': '-s'
}
}
OUTPUT_SWITCHES = {
"plt_ord": ORD_PLT_OUTPUT_SWITCHES,
"elt_ord": ORD_ELT_OUTPUT_SWITCHES,
"selt_ord": ORD_SELT_OUTPUT_SWITCH
}
EVE_SHUFFLE_OPTIONS = {
EVE_NO_SHUFFLE: {'eve': '-n ', 'kat_sorting': False},
EVE_ROUND_ROBIN: {'eve': '', 'kat_sorting': True},
EVE_FISHER_YATES: {'eve': '-r ', 'kat_sorting': False},
EVE_STD_SHUFFLE: {'eve': '-R ', 'kat_sorting': False}
}
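# Illustrative note: the 'eve' value above is prepended to the process id when
# the event stream command is built (see get_getmodel_itm_cmd below), so with
# EVE_FISHER_YATES selected, process 1 of 20 starts its pipeline with
# `eve -r 1 20 | ...`, while EVE_ROUND_ROBIN (empty flag) gives `eve 1 20 | ...`.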
SUMMARY_TYPES = ['eltcalc', 'summarycalc', 'pltcalc']
TRAP_FUNC = """
touch $LOG_DIR/stderror.err
ktools_monitor.sh $$ $LOG_DIR & pid0=$!
exit_handler(){
exit_code=$?
# disable handler
trap - QUIT HUP INT KILL TERM ERR EXIT
kill -9 $pid0 2> /dev/null
if [ "$exit_code" -gt 0 ]; then
# Error - run process clean up
echo 'Ktools Run Error - exitcode='$exit_code
set +x
group_pid=$(ps -p $$ -o pgid --no-headers)
sess_pid=$(ps -p $$ -o sess --no-headers)
script_pid=$$
printf "Script PID:%d, GPID:%s, SPID:%d\n" $script_pid $group_pid $sess_pid >> $LOG_DIR/killout.txt
ps -jf f -g $sess_pid > $LOG_DIR/subprocess_list
PIDS_KILL=$(pgrep -a --pgroup $group_pid | awk \'BEGIN { FS = "[ \\t\\n]+" }{ if ($1 >= \'$script_pid\') print}\' | grep -v celery | egrep -v *\\\.log$ | egrep -v *startup.sh$ | sort -n -r)
echo "$PIDS_KILL" >> $LOG_DIR/killout.txt
kill -9 $(echo "$PIDS_KILL" | awk \'BEGIN { FS = "[ \\t\\n]+" }{ print $1 }\') 2>/dev/null
exit $exit_code
else
# script successful
exit 0
fi
}
trap exit_handler QUIT HUP INT KILL TERM ERR EXIT"""
def get_check_function(custom_gulcalc_log_start=None, custom_gulcalc_log_finish=None):
"""Creates a bash function to check the logs to ensure same number of process started and finsished.
Args:
custom_gulcalc_log_start (str): Custom message printed to the logs when a process starts.
custom_gulcalc_log_finish (str): Custom message printed to the logs when a process ends.
"""
check_function = """
check_complete(){
set +e
proc_list="eve getmodel gulcalc fmcalc summarycalc eltcalc aalcalc aalcalcmeanonly leccalc pltcalc ordleccalc modelpy gulpy fmpy gulmc summarypy"
has_error=0
for p in $proc_list; do
started=$(find log -name "${p}_[0-9]*.log" | wc -l)
finished=$(find log -name "${p}_[0-9]*.log" -exec grep -l "finish" {} + | wc -l)
if [ "$finished" -lt "$started" ]; then
echo "[ERROR] $p - $((started-finished)) processes lost"
has_error=1
elif [ "$started" -gt 0 ]; then
echo "[OK] $p"
fi
done
"""
# Add in check for custom gulcalc if settings are provided
if custom_gulcalc_log_start and custom_gulcalc_log_finish:
check_function += f"""
started=$( grep "{custom_gulcalc_log_start}" log/gul_stderror.err | wc -l)
finished=$( grep "{custom_gulcalc_log_finish}" log/gul_stderror.err | wc -l)
if [ "$finished" -lt "$started" ]; then
echo "[ERROR] gulcalc - $((started-finished)) processes lost"
has_error=1
elif [ "$started" -gt 0 ]; then
echo "[OK] gulcalc"
fi
"""
check_function += """ if [ "$has_error" -ne 0 ]; then
false # raise non-zero exit code
else
echo 'Run Completed'
fi
}"""
return check_function
BASH_TRACE = """
# --- Redirect Bash trace to file ---
bash_logging_supported(){
local BASH_VER_MAJOR=${BASH_VERSION:0:1}
local BASH_VER_MINOR=${BASH_VERSION:2:1}
if [[ "$BASH_VER_MAJOR" -gt 4 ]]; then
echo 1; exit
fi
if [[ $BASH_VER_MAJOR -eq 4 ]] && [[ $BASH_VER_MINOR -gt 3 ]]; then
echo 1; exit
fi
echo 0
}
if [ $(bash_logging_supported) == 1 ]; then
exec > >(tee -ia $LOG_DIR/bash.log)
exec 2> >(tee -ia $LOG_DIR/bash.log >& 2)
exec 19> $LOG_DIR/bash.log
export BASH_XTRACEFD="19"
set -x
else
echo "WARNING: logging disabled, bash version '$BASH_VERSION' is not supported, minimum requirement is bash v4.4"
fi """
def process_range(max_process_id, process_number=None):
"""
Creates an iterable over all the process IDs. If a process number is set,
an iterable containing only that number is returned.
This allows the loss generation to be run in separate processes
rather than across multiple cores.
:param max_process_id: The largest process number
:param process_number: If set iterable only containing this number is returned
:return: iterable containing all the process numbers to process
"""
if process_number is not None:
return [process_number]
else:
return range(1, max_process_id + 1)
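# A minimal illustrative usage (doctest-style):
#   >>> list(process_range(3))
#   [1, 2, 3]
#   >>> list(process_range(3, process_number=2))
#   [2]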
def get_modelcmd(modelpy: bool, server=False, peril_filter=[]) -> str:
"""
Gets the getmodel command line argument for the bash script.
Args:
modelpy: (bool) if the getmodel Python setting is True or not
server: (bool) if set then enable 'TCP' ipc server/client mode
peril_filter: (list) list of perils to include (all included if empty)
Returns: the Python getmodel command if modelpy is True, the C++ getmodel command otherwise
"""
py_cmd = 'modelpy'
cpp_cmd = 'getmodel'
if modelpy is True:
if server is True:
py_cmd = f'{py_cmd} --data-server'
if peril_filter:
py_cmd = f"{py_cmd} --peril-filter {' '.join(peril_filter)}"
return py_cmd
else:
return cpp_cmd
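# Illustrative usage (doctest-style; the peril codes are hypothetical):
#   >>> get_modelcmd(False)
#   'getmodel'
#   >>> get_modelcmd(True, server=True, peril_filter=['WSS', 'WTC'])
#   'modelpy --data-server --peril-filter WSS WTC'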
def get_gulcmd(gulpy, gulpy_random_generator, gulmc, gulmc_random_generator, gulmc_effective_damageability, gulmc_vuln_cache_size, modelpy_server, peril_filter, model_df_engine='oasis_data_manager.df_reader.reader.OasisPandasReader', dynamic_footprint=False):
"""Get the ground-up loss calculation command.
Args:
gulpy (bool): if True, return the python command name, else the c++ one.
Returns:
str: the ground-up loss calculation command
"""
if gulpy and gulmc:
raise ValueError("Expect either gulpy or gulmc to be True, got both True.")
if gulpy:
cmd = f'gulpy --random-generator={gulpy_random_generator}'
elif gulmc:
cmd = f"gulmc --random-generator={gulmc_random_generator} {'--data-server'*modelpy_server} --model-df-engine=\'{model_df_engine}\'"
if peril_filter:
cmd += f" --peril-filter {' '.join(peril_filter)}"
if gulmc_effective_damageability:
cmd += " --effective-damageability"
if gulmc_vuln_cache_size:
cmd += f" --vuln-cache-size {gulmc_vuln_cache_size}"
if dynamic_footprint:
cmd += " --dynamic-footprint True"
else:
cmd = 'gulcalc'
return cmd
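# Illustrative usage (doctest-style), covering the gulcalc and gulpy branches:
#   >>> get_gulcmd(False, 1, False, 1, False, 0, False, [])
#   'gulcalc'
#   >>> get_gulcmd(True, 1, False, 1, False, 0, False, [])
#   'gulpy --random-generator=1'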
def get_fmcmd(fmpy, fmpy_low_memory=False, fmpy_sort_output=False):
if fmpy:
cmd = 'fmpy'
if fmpy_low_memory:
cmd += ' -l'
if fmpy_sort_output:
cmd += ' --sort-output'
return cmd
else:
return 'fmcalc'
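# Illustrative usage (doctest-style):
#   >>> get_fmcmd(True, fmpy_low_memory=True, fmpy_sort_output=True)
#   'fmpy -l --sort-output'
#   >>> get_fmcmd(False)
#   'fmcalc'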
def print_command(command_file, cmd):
"""
Writes the supplied command to the end of the generated script
:param command_file: File to append command to.
:param cmd: The command to append
"""
with io.open(command_file, "a", encoding='utf-8') as myfile:
myfile.writelines(cmd + "\n")
def leccalc_enabled(summary_options):
"""
Checks if leccalc is enabled in a summaries section
:param summary_options: Summaries section from an analysis_settings file
:type summary_options: dict
Example:
{
"aalcalc": true,
"eltcalc": true,
"id": 1,
"lec_output": true,
"leccalc": {
"full_uncertainty_aep": true,
"full_uncertainty_oep": true,
"return_period_file": true
}
}
:return: True if leccalc is enabled, False otherwise.
"""
lec_options = summary_options.get('leccalc', {})
lec_boolean = summary_options.get('lec_output', False)
# Disabled if leccalc flag is missing or false
if not lec_boolean:
return False
# Backwards compatibility for nested "outputs" keys in lec_options
if "outputs" in lec_options:
lec_options = lec_options["outputs"]
# Enabled if at least one option is selected
for output_opt in lec_options:
if output_opt in WAIT_PROCESSING_SWITCHES and lec_options[output_opt]:
return True
return False
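# Illustrative usage (doctest-style): enabled only when 'lec_output' is true
# and at least one recognised leccalc option is set.
#   >>> leccalc_enabled(
#   ...     {'lec_output': True, 'leccalc': {'full_uncertainty_aep': True}})
#   True
#   >>> leccalc_enabled(
#   ...     {'lec_output': False, 'leccalc': {'full_uncertainty_aep': True}})
#   False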
def ord_enabled(summary_options, ORD_SWITCHES):
"""
Checks if ORD output is enabled in a summaries section
:param summary_options: Summaries section from an analysis_settings file
:type summary_options: dict
:param ORD_SWITCHES: Options from the analysis_settings 'Summaries' section to search
:type ORD_SWITCHES: dict
Example:
{
"id": 1,
"ord_output": {
"ept_full_uncertainty_aep": true,
"ept_full_uncertainty_oep": true,
"ept_mean_sample_aep": true,
"ept_mean_sample_oep": true,
"ept_per_sample_mean_aep": true,
"ept_per_sample_mean_oep": true,
"psept_aep": true,
"psept_oep": true,
"return_period_file": true
}
}
:return: True if ORD output is enabled, False otherwise.
"""
ord_options = summary_options.get('ord_output', {})
for output_opt in ord_options:
if output_opt in ORD_SWITCHES and ord_options[output_opt]:
return True
return False
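# Illustrative usage (doctest-style), using the combined ORD_LECCALC switches:
#   >>> ord_enabled({'ord_output': {'psept_aep': True}}, ORD_LECCALC)
#   True
#   >>> ord_enabled({'ord_output': {'psept_aep': False}}, ORD_LECCALC)
#   False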
def do_post_wait_processing(
runtype,
analysis_settings,
filename,
process_counter,
work_sub_dir='',
output_dir='output/',
stderr_guard=True,
inuring_priority=None
):
if '{}_summaries'.format(runtype) not in analysis_settings:
return
if not inuring_priority:
inuring_priority = ''
for summary in analysis_settings['{}_summaries'.format(runtype)]:
if "id" in summary:
summary_set = summary['id']
# ktools ORIG - aalcalc
if summary.get('aalcalc'):
cmd = 'aalcalc -K{}{}_{}S{}_summaryaalcalc'.format(
work_sub_dir,
runtype,
inuring_priority,
summary_set
)
process_counter['lpid_monitor_count'] += 1
cmd = '{} > {}{}_{}S{}_aalcalc.csv'.format(
cmd, output_dir, runtype, inuring_priority, summary_set
)
if stderr_guard:
cmd = '( {} ) 2>> $LOG_DIR/stderror.err & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
else:
cmd = '{} & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
print_command(filename, cmd)
# ORD - PALT
if ord_enabled(summary, ORD_ALT_OUTPUT_SWITCHES):
cmd = 'aalcalc -K{}{}_{}S{}_summary_palt'.format(
work_sub_dir, runtype, inuring_priority, summary_set
)
palt_outfile_stem = '{}{}_{}S{}_palt'.format(
output_dir, runtype, inuring_priority, summary_set
)
alct_outfile_stem = '{}{}_{}S{}_alct'.format(
output_dir, runtype, inuring_priority, summary_set
)
alct_file_extension = ".csv"
if summary.get('ord_output', {}).get('parquet_format'):
alct_file_extension = ".parquet"
if summary.get('ord_output', {}).get('alct_convergence'):
cmd = '{} {} {}{}'.format(
cmd,
ORD_ALT_OUTPUT_SWITCHES.get('alt_period', {}).get('alct_flag', ''),
alct_outfile_stem,
alct_file_extension
)
if summary.get('ord_output', {}).get('alct_confidence'):
cmd = '{} {} {}'.format(
cmd,
ORD_ALT_OUTPUT_SWITCHES.get('alt_period', {}).get('alct_confidence_level', ''),
summary.get('ord_output', {}).get('alct_confidence')
)
if summary.get('ord_output', {}).get('parquet_format'):
cmd = '{} {}'.format(
cmd,
ORD_ALT_OUTPUT_SWITCHES.get('alt_period', {}).get('parquet_flag', '')
)
cmd = '{} {}.parquet'.format(cmd, palt_outfile_stem)
else:
cmd = '{} {}'.format(
cmd,
ORD_ALT_OUTPUT_SWITCHES.get('alt_period', {}).get('csv_flag', '')
)
cmd = '{} > {}.csv'.format(cmd, palt_outfile_stem)
process_counter['lpid_monitor_count'] += 1
if stderr_guard:
cmd = '( {} ) 2>> $LOG_DIR/stderror.err & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
else:
cmd = '{} & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
print_command(filename, cmd)
# ktools ORIG - aalcalcmeanonly
if summary.get('aalcalcmeanonly'):
cmd = 'aalcalcmeanonly -K{}{}_{}S{}_summaryaalcalcmeanonly'.format(
work_sub_dir, runtype, inuring_priority, summary_set
)
process_counter['lpid_monitor_count'] += 1
cmd = '{} > {}{}_{}S{}_aalcalcmeanonly.csv'.format(
cmd, output_dir, runtype, inuring_priority, summary_set
)
if stderr_guard:
cmd = '( {} ) 2>> $LOG_DIR/stderror.err & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
else:
cmd = '{} & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
print_command(filename, cmd)
# ORD - aalcalcmeanonly
if ord_enabled(summary, ORD_ALT_MEANONLY_OUTPUT_SWITCHES):
cmd = 'aalcalcmeanonly -K{}{}_{}S{}_summary_altmeanonly'.format(
work_sub_dir, runtype, inuring_priority, summary_set
)
altmeanonly_outfile_stem = '{}{}_{}S{}_altmeanonly'.format(
output_dir, runtype, inuring_priority, summary_set
)
if summary.get('ord_output', {}).get('parquet_format'):
cmd = '{} {}'.format(
cmd,
ORD_ALT_MEANONLY_OUTPUT_SWITCHES.get('alt_meanonly', {}).get('parquet_flag', '')
)
cmd = '{} {}.parquet'.format(cmd, altmeanonly_outfile_stem)
else:
cmd = '{} {}'.format(
cmd,
ORD_ALT_MEANONLY_OUTPUT_SWITCHES.get('alt_meanonly', {}).get('csv_flag', '')
)
cmd = '{} > {}.csv'.format(cmd, altmeanonly_outfile_stem)
process_counter['lpid_monitor_count'] += 1
if stderr_guard:
cmd = '( {} ) 2>> $LOG_DIR/stderror.err & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
else:
cmd = '{} & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
print_command(filename, cmd)
# ORD - PSEPT,EPT
if ord_enabled(summary, ORD_LECCALC):
ord_outputs = summary.get('ord_output', {})
ept_output = False
psept_output = False
cmd = 'ordleccalc {} -K{}{}_{}S{}_summaryleccalc'.format(
'-r' if ord_outputs.get('return_period_file') else '',
work_sub_dir,
runtype,
inuring_priority,
summary_set
)
process_counter['lpid_monitor_count'] += 1
for option, active in sorted(ord_outputs.items()):
# Add EPT switches
if active and option in ORD_EPT_OUTPUT_SWITCHES:
switch = ORD_EPT_OUTPUT_SWITCHES.get(option, '')
cmd = '{} {}'.format(cmd, switch)
if not ept_output:
ept_output = True
# Add PSEPT switches
if active and option in ORD_PSEPT_OUTPUT_SWITCHES:
switch = ORD_PSEPT_OUTPUT_SWITCHES.get(option, '')
cmd = '{} {}'.format(cmd, switch)
if not psept_output:
psept_output = True
ept_output_flag = '-O'
psept_output_flag = '-o'
outfile_ext = 'csv'
if summary.get('ord_output', {}).get('parquet_format'):
ept_output_flag = '-P'
psept_output_flag = '-p'
outfile_ext = 'parquet'
if ept_output:
cmd = '{} {} {}{}_{}S{}_ept.{}'.format(
cmd, ept_output_flag, output_dir, runtype,
inuring_priority, summary_set, outfile_ext
)
if psept_output:
cmd = '{} {} {}{}_{}S{}_psept.{}'.format(
cmd, psept_output_flag, output_dir, runtype,
inuring_priority, summary_set, outfile_ext
)
if stderr_guard:
cmd = '( {} ) 2>> $LOG_DIR/stderror.err & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
else:
cmd = '{} & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
print_command(filename, cmd)
# ktools ORIG - Leccalc
if leccalc_enabled(summary):
leccalc = summary.get('leccalc', {})
cmd = 'leccalc {} -K{}{}_{}S{}_summaryleccalc'.format(
'-r' if leccalc.get('return_period_file') else '',
work_sub_dir,
runtype,
inuring_priority,
summary_set
)
# Note: Backwards compatibility of "outputs" in lec_options
if "outputs" in leccalc:
leccalc = leccalc["outputs"]
process_counter['lpid_monitor_count'] += 1
for option, active in sorted(leccalc.items()):
if active and option in WAIT_PROCESSING_SWITCHES:
switch = WAIT_PROCESSING_SWITCHES.get(option, '')
cmd = '{} {} {}{}_{}S{}_leccalc_{}.csv'.format(
cmd, switch, output_dir, runtype,
inuring_priority, summary_set, option
)
if stderr_guard:
cmd = '( {} ) 2>> $LOG_DIR/stderror.err & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
else:
cmd = '{} & lpid{}=$!'.format(cmd, process_counter['lpid_monitor_count'])
print_command(filename, cmd)
def get_fifo_name(fifo_dir, producer, producer_id, consumer=''):
"""Standard name for FIFO"""
if consumer:
return f'{fifo_dir}{producer}_{consumer}_P{producer_id}'
else:
return f'{fifo_dir}{producer}_P{producer_id}'
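# Illustrative usage (doctest-style):
#   >>> get_fifo_name('fifo/', 'gul', 1)
#   'fifo/gul_P1'
#   >>> get_fifo_name('fifo/', 'gul', 1, consumer='S1_summary')
#   'fifo/gul_S1_summary_P1'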
def do_fifo_exec(producer, producer_id, filename, fifo_dir, action='mkfifo', consumer=''):
print_command(filename, f'{action} {get_fifo_name(fifo_dir, producer, producer_id, consumer)}')
def do_fifos_exec(runtype, max_process_id, filename, fifo_dir, process_number=None, action='mkfifo', consumer=''):
for process_id in process_range(max_process_id, process_number):
do_fifo_exec(runtype, process_id, filename, fifo_dir, action, consumer)
print_command(filename, '')
def do_fifos_exec_full_correlation(
runtype, max_process_id, filename, fifo_dir, process_number=None, action='mkfifo'):
for process_id in process_range(max_process_id, process_number):
print_command(filename, '{} {}{}_sumcalc_P{}'.format(
action, fifo_dir, runtype, process_id
))
print_command(filename, '')
for process_id in process_range(max_process_id, process_number):
print_command(filename, '{} {}{}_fmcalc_P{}'.format(
action, fifo_dir, runtype, process_id
))
print_command(filename, '')
def do_fifos_calc(runtype, analysis_settings, max_process_id, filename, fifo_dir='fifo/', process_number=None, consumer_prefix=None, action='mkfifo'):
summaries = analysis_settings.get('{}_summaries'.format(runtype))
if not summaries:
return
if not consumer_prefix:
consumer_prefix = ''
for process_id in process_range(max_process_id, process_number):
for summary in summaries:
if 'id' in summary:
summary_set = summary['id']
do_fifo_exec(runtype, process_id, filename, fifo_dir, action, f'{consumer_prefix}S{summary_set}_summary')
if leccalc_enabled(summary) or ord_enabled(summary, ORD_LECCALC) or summary.get('aalcalc') or ord_enabled(summary, ORD_ALT_OUTPUT_SWITCHES):
idx_fifo = get_fifo_name(fifo_dir, runtype, process_id, f'{consumer_prefix}S{summary_set}_summary')
idx_fifo += '.idx'
print_command(filename, f'mkfifo {idx_fifo}')
for summary_type in SUMMARY_TYPES:
if summary.get(summary_type):
do_fifo_exec(runtype, process_id, filename, fifo_dir, action, f'{consumer_prefix}S{summary_set}_{summary_type}')
for ord_type, output_switch in OUTPUT_SWITCHES.items():
for ord_table in output_switch.keys():
if summary.get('ord_output', {}).get(ord_table):
do_fifo_exec(runtype, process_id, filename, fifo_dir, action, f'{consumer_prefix}S{summary_set}_{ord_type}')
break
print_command(filename, '')
def create_workfolders(
runtype,
analysis_settings,
filename,
work_dir='work/',
inuring_priority=None
):
summaries = analysis_settings.get('{}_summaries'.format(runtype))
if not summaries:
return
if not inuring_priority:
inuring_priority = ''
for summary in summaries:
if 'id' in summary:
summary_set = summary['id']
# EDIT: leccalc and ordleccalc share the same summarycalc binary data
# only create the workfolders once if either option is selected
if leccalc_enabled(summary) or ord_enabled(summary, ORD_LECCALC):
print_command(
filename,
'mkdir -p {}{}_{}S{}_summaryleccalc'.format(work_dir, runtype, inuring_priority, summary_set)
)
if summary.get('aalcalc'):
print_command(
filename,
'mkdir -p {}{}_{}S{}_summaryaalcalc'.format(work_dir, runtype, inuring_priority, summary_set)
)
if summary.get('ord_output', {}).get('alt_period'):
print_command(
filename,
'mkdir -p {}{}_{}S{}_summary_palt'.format(work_dir, runtype, inuring_priority, summary_set)
)
if summary.get('aalcalcmeanonly'):
print_command(
filename,
'mkdir -p {}{}_{}S{}_summaryaalcalcmeanonly'.format(work_dir, runtype, inuring_priority, summary_set)
)
if summary.get('ord_output', {}).get('alt_meanonly'):
print_command(
filename,
'mkdir -p {}{}_{}S{}_summary_altmeanonly'.format(work_dir, runtype, inuring_priority, summary_set)
)
def do_kats(
runtype,
analysis_settings,
max_process_id,
filename,
process_counter,
work_dir='work/kat/',
output_dir='output/',
sort_by_event=False,
process_number=None,
inuring_priority=None
):
summaries = analysis_settings.get('{}_summaries'.format(runtype))
if not summaries:
return False
if not inuring_priority:
inuring_priority = ''
anykats = False
for summary in summaries:
if 'id' in summary:
summary_set = summary['id']
if summary.get('eltcalc'):
anykats = True
cmd = 'kat' if sort_by_event else 'kat -u'
for process_id in process_range(max_process_id, process_number):
cmd = '{} {}{}_{}S{}_eltcalc_P{}'.format(
cmd, work_dir, runtype, inuring_priority,
summary_set, process_id
)
process_counter['kpid_monitor_count'] += 1
cmd = '{} > {}{}_{}S{}_eltcalc.csv & kpid{}=$!'.format(
cmd, output_dir, runtype, inuring_priority, summary_set,
process_counter['kpid_monitor_count']
)
print_command(filename, cmd)
if summary.get('pltcalc'):
anykats = True
cmd = 'kat' if sort_by_event else 'kat -u'
for process_id in process_range(max_process_id, process_number):
cmd = '{} {}{}_{}S{}_pltcalc_P{}'.format(
cmd, work_dir, runtype, inuring_priority,
summary_set, process_id
)
process_counter['kpid_monitor_count'] += 1
cmd = '{} > {}{}_{}S{}_pltcalc.csv & kpid{}=$!'.format(
cmd, output_dir, runtype, inuring_priority, summary_set,
process_counter['kpid_monitor_count']
)
print_command(filename, cmd)
if summary.get("summarycalc"):
anykats = True
cmd = 'kat' if sort_by_event else 'kat -u'
for process_id in process_range(max_process_id, process_number):
cmd = '{} {}{}_{}S{}_summarycalc_P{}'.format(
cmd, work_dir, runtype, inuring_priority,
summary_set, process_id
)
process_counter['kpid_monitor_count'] += 1
cmd = '{} > {}{}_{}S{}_summarycalc.csv & kpid{}=$!'.format(
cmd, output_dir, runtype, inuring_priority, summary_set,
process_counter['kpid_monitor_count']
)
print_command(filename, cmd)
for ord_type, output_switch in OUTPUT_SWITCHES.items():
for ord_table, v in output_switch.items():
if summary.get('ord_output', {}).get(ord_table):
anykats = True
cmd = 'kat' if sort_by_event else 'kat -u'
outfile_flag = '>'
outfile_ext = 'csv'
if summary.get('ord_output', {}).get('parquet_format'):
cmd = f'katparquet {v["kat_flag"]}'
outfile_flag = '-o'
outfile_ext = 'parquet'
for process_id in process_range(max_process_id, process_number):
cmd = f'{cmd} {work_dir}{runtype}_{inuring_priority}S{summary_set}_{ord_table}_P{process_id}'
process_counter['kpid_monitor_count'] += 1
cmd = f'{cmd} {outfile_flag} {output_dir}{runtype}_{inuring_priority}S{summary_set}_{v["table_name"]}.{outfile_ext}'
cmd = f'{cmd} & kpid{process_counter["kpid_monitor_count"]}=$!'
print_command(filename, cmd)
return anykats
def do_summarycalcs(
runtype,
analysis_settings,
process_id,
filename,
summarypy,
fifo_dir='fifo/',
stderr_guard=True,
num_reinsurance_iterations=0,
gul_legacy_stream=None,
gul_full_correlation=False,
inuring_priority=None,
):
summaries = analysis_settings.get('{}_summaries'.format(runtype))
if not summaries:
return
if process_id == 1:
print_command(filename, '')
if summarypy:
summarycalc_switch = f'-t {runtype}'
else:
summarycalc_switch = '-f'
if runtype == RUNTYPE_GROUNDUP_LOSS:
if gul_legacy_stream:
# gul coverage stream
summarycalc_switch = '-g'
else:
# Accept item stream only
summarycalc_switch = '-i'
summarycalc_directory_switch = ""
inuring_priority_text = '' # Only relevant for reinsurance
if runtype == RUNTYPE_REINSURANCE_LOSS:
if inuring_priority.get('level'):
# Net reinsurance losses require zero losses in summarycalc output
# Text field for final inuring priority is empty string
summarycalc_directory_switch = f"-z -p {os.path.join('input', 'RI_' + str(inuring_priority['level']))}"
inuring_priority_text = inuring_priority['text']
if runtype == RUNTYPE_REINSURANCE_GROSS_LOSS:
# Gross reinsurance losses do not require zero losses in summarycalc output
summarycalc_directory_switch = f"-p {os.path.join('input', 'RI_' + str(inuring_priority['level']))}"
inuring_priority_text = inuring_priority['text']
input_filename_component = ''
if gul_full_correlation:
input_filename_component = '_sumcalc'
# Use -m flag to create summary index files
# This is likely to become default in future ktools releases
cmd = 'summarypy' if summarypy else 'summarycalc'
cmd = f'{cmd} -m {summarycalc_switch} {summarycalc_directory_switch}'
for summary in summaries:
if 'id' in summary:
summary_set = summary['id']
cmd = '{0} -{1} {4}{2}_{5}S{1}_summary_P{3}'.format(
cmd, summary_set, runtype, process_id, fifo_dir,
inuring_priority_text
)
cmd = '{0} < {1}{2}{3}_{5}P{4}'.format(
cmd, fifo_dir, runtype, input_filename_component, process_id,
inuring_priority_text
)
cmd = '( {0} ) 2>> $LOG_DIR/stderror.err &'.format(cmd) if stderr_guard else '{0} &'.format(cmd) # Wrap in subshell and pipe stderr to file
print_command(filename, cmd)
def do_tees(
runtype,
analysis_settings,
process_id,
filename,
process_counter,
fifo_dir='fifo/',
work_dir='work/',
inuring_priority=None
):
summaries = analysis_settings.get('{}_summaries'.format(runtype))
if not summaries:
return
if not inuring_priority:
inuring_priority = ''
if process_id == 1:
print_command(filename, '')
for summary in summaries:
if 'id' in summary:
process_counter['pid_monitor_count'] += 1
summary_set = summary['id']
cmd = f'tee < {get_fifo_name(fifo_dir, runtype, process_id, f"{inuring_priority}S{summary_set}_summary")}'
if leccalc_enabled(summary) or ord_enabled(summary, ORD_LECCALC) or summary.get('aalcalc') or ord_enabled(summary, ORD_ALT_OUTPUT_SWITCHES):
cmd_idx = cmd + '.idx'
for summary_type in SUMMARY_TYPES:
if summary.get(summary_type):
cmd = f'{cmd} {get_fifo_name(fifo_dir, runtype, process_id, f"{inuring_priority}S{summary_set}_{summary_type}")}'
for ord_type, output_switch in OUTPUT_SWITCHES.items():
for ord_table in output_switch.keys():
if summary.get('ord_output', {}).get(ord_table):
cmd = f'{cmd} {get_fifo_name(fifo_dir, runtype, process_id, f"{inuring_priority}S{summary_set}_{ord_type}")}'
break
if summary.get('aalcalc'):
aalcalc_out = f'{work_dir}{runtype}_{inuring_priority}S{summary_set}_summaryaalcalc/P{process_id}'
cmd = f'{cmd} {aalcalc_out}.bin'
cmd_idx = f'{cmd_idx} {aalcalc_out}.idx'
if summary.get('ord_output', {}).get('alt_period'):
aalcalc_ord_out = f'{work_dir}{runtype}_{inuring_priority}S{summary_set}_summary_palt/P{process_id}'
cmd = f'{cmd} {aalcalc_ord_out}.bin'
cmd_idx = f'{cmd_idx} {aalcalc_ord_out}.idx'
if summary.get('aalcalcmeanonly'):
aalcalcmeanonly_out = f'{work_dir}{runtype}_{inuring_priority}S{summary_set}_summaryaalcalcmeanonly/P{process_id}'
cmd = f'{cmd} {aalcalcmeanonly_out}.bin'
if summary.get('ord_output', {}).get('alt_meanonly'):
aalcalcmeanonly_ord_out = f'{work_dir}{runtype}_{inuring_priority}S{summary_set}_summary_altmeanonly/P{process_id}'
cmd = f'{cmd} {aalcalcmeanonly_ord_out}.bin'
# leccalc and ordleccalc share the same summarycalc binary data
# only create the workfolders once if either option is selected
if leccalc_enabled(summary) or ord_enabled(summary, ORD_LECCALC):
leccalc_out = f'{work_dir}{runtype}_{inuring_priority}S{summary_set}_summaryleccalc/P{process_id}'
cmd = f'{cmd} {leccalc_out}.bin'
cmd_idx = f'{cmd_idx} {leccalc_out}.idx'
cmd = '{} > /dev/null & pid{}=$!'.format(cmd, process_counter['pid_monitor_count'])
print_command(filename, cmd)
if leccalc_enabled(summary) or ord_enabled(summary, ORD_LECCALC) or summary.get('aalcalc') or ord_enabled(summary, ORD_ALT_OUTPUT_SWITCHES):
process_counter['pid_monitor_count'] += 1
cmd_idx = '{} > /dev/null & pid{}=$!'.format(cmd_idx, process_counter['pid_monitor_count'])
print_command(filename, cmd_idx)
def do_tees_fc_sumcalc_fmcalc(process_id, filename, correlated_output_stems):
if process_id == 1:
print_command(filename, '')
cmd = 'tee < {0}{1}'.format(
correlated_output_stems['gulcalc_output'], process_id
)
cmd = '{0} {1}{3} {2}{3} > /dev/null &'.format(
cmd,
correlated_output_stems['sumcalc_input'],
correlated_output_stems['fmcalc_input'],
process_id
)
print_command(filename, cmd)
def do_ord(
runtype,
analysis_settings,
process_id,
filename,
process_counter,
fifo_dir='fifo/',
work_dir='work/',
stderr_guard=True,
inuring_priority=None
):
summaries = analysis_settings.get('{}_summaries'.format(runtype))
if not summaries:
return
if not inuring_priority:
inuring_priority = ''
if process_id == 1:
print_command(filename, '')
for summary in summaries:
if 'id' in summary:
summary_set = summary['id']
for ord_type, output_switch in OUTPUT_SWITCHES.items():
cmd = ''
fifo_out_name = ''
skip_line = True
for ord_table, flag_proc in output_switch.items():
if summary.get('ord_output', {}).get(ord_table):
if process_id != 1 and skip_line:
if ord_type == 'plt_ord':
cmd += ' -H'
else:
cmd += ' -s'
skip_line = False
if summary.get('ord_output', {}).get('parquet_format'):
cmd += f' {flag_proc["parquet_flag"]}'
else:
cmd += f' {flag_proc["csv_flag"]}'
fifo_out_name = get_fifo_name(f'{work_dir}kat/', runtype, process_id, f'{inuring_priority}S{summary_set}_{ord_table}')
if ord_type != 'selt_ord' or summary.get('ord_output', {}).get('parquet_format'):
cmd = f'{cmd} {fifo_out_name}'
if cmd:
fifo_in_name = get_fifo_name(fifo_dir, runtype, process_id, f'{inuring_priority}S{summary_set}_{ord_type}')
cmd = f'{cmd} < {fifo_in_name}'
if ord_type == 'selt_ord' and not summary.get('ord_output', {}).get('parquet_format'):
cmd = f'{cmd} > {fifo_out_name}'
process_counter['pid_monitor_count'] += 1
cmd = f'{flag_proc["ktools_exe"]}{cmd}'
if stderr_guard:
cmd = f'( {cmd} ) 2>> $LOG_DIR/stderror.err & pid{process_counter["pid_monitor_count"]}=$!'
else:
cmd = f'{cmd} & pid{process_counter["pid_monitor_count"]}=$!'
print_command(filename, cmd)
def do_any(
runtype,
analysis_settings,
process_id,
filename,
process_counter,
fifo_dir='fifo/',
work_dir='work/',
stderr_guard=True,
inuring_priority=None
):
summaries = analysis_settings.get('{}_summaries'.format(runtype))
if not summaries:
return
if not inuring_priority:
inuring_priority = ''
if process_id == 1:
print_command(filename, '')
for summary in summaries:
if 'id' in summary:
summary_set = summary['id']
for summary_type in SUMMARY_TYPES:
if summary.get(summary_type):
# cmd exception for summarycalc
if summary_type == 'summarycalc':
cmd = 'summarycalctocsv'
else:
cmd = summary_type
if process_id != 1:
if summary_type == 'pltcalc':
cmd += ' -H'
else:
cmd += ' -s'
process_counter['pid_monitor_count'] += 1
fifo_in_name = get_fifo_name(fifo_dir, runtype, process_id, f'{inuring_priority}S{summary_set}_{summary_type}')
fifo_out_name = get_fifo_name(f'{work_dir}kat/', runtype, process_id, f'{inuring_priority}S{summary_set}_{summary_type}')
cmd = f'{cmd} < {fifo_in_name} > {fifo_out_name}'
if stderr_guard:
cmd = f'( {cmd} ) 2>> $LOG_DIR/stderror.err & pid{process_counter["pid_monitor_count"]}=$!'
else:
cmd = f'{cmd} & pid{process_counter["pid_monitor_count"]}=$!'
print_command(filename, cmd)
def get_ri_inuring_priorities(analysis_settings, num_reinsurance_iterations):
intermediate_inuring_priorities = set(analysis_settings.get('ri_inuring_priorities', []))
ri_inuring_priorities = [
{
'text': INTERMEDIATE_INURING_PRIORITY_PREFIX + str(inuring_priority) + '_',
'level': inuring_priority
} for inuring_priority in intermediate_inuring_priorities if inuring_priority < num_reinsurance_iterations
]
ri_inuring_priorities.append({'text': '', 'level': num_reinsurance_iterations}) # Final inuring priority
return ri_inuring_priorities
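# Illustrative: with {'ri_inuring_priorities': [1]} in the analysis settings
# and num_reinsurance_iterations=2, this returns
#   [{'text': INTERMEDIATE_INURING_PRIORITY_PREFIX + '1_', 'level': 1},
#    {'text': '', 'level': 2}]
# i.e. one entry per requested intermediate priority plus the final priority
# (the prefix constant is defined elsewhere in the module).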
def get_rl_inuring_priorities(num_reinsurance_iterations):
rl_inuring_priorities = [
{
'text': INTERMEDIATE_INURING_PRIORITY_PREFIX + str(inuring_priority) + '_',
'level': inuring_priority
} for inuring_priority in range(1, num_reinsurance_iterations + 1)
]
return rl_inuring_priorities
def rl(
analysis_settings,
max_process_id,
filename,
process_counter,
num_reinsurance_iterations,
summarypy,
fifo_dir='fifo/',
work_dir='work/',
stderr_guard=True,
process_number=None
):
for inuring_priority in get_rl_inuring_priorities(num_reinsurance_iterations):
for process_id in process_range(max_process_id, process_number):
do_any(
RUNTYPE_REINSURANCE_GROSS_LOSS, analysis_settings, process_id,
filename, process_counter, fifo_dir, work_dir, stderr_guard,
inuring_priority=inuring_priority['text']
)
for process_id in process_range(max_process_id, process_number):
do_ord(
RUNTYPE_REINSURANCE_GROSS_LOSS, analysis_settings, process_id,
filename, process_counter, fifo_dir, work_dir, stderr_guard,
inuring_priority=inuring_priority['text']
)
for process_id in process_range(max_process_id, process_number):
do_tees(
RUNTYPE_REINSURANCE_GROSS_LOSS, analysis_settings, process_id,
filename, process_counter, fifo_dir, work_dir,
inuring_priority=inuring_priority['text']
)
for process_id in process_range(max_process_id, process_number):
do_summarycalcs(
summarypy=summarypy,
runtype=RUNTYPE_REINSURANCE_GROSS_LOSS,
analysis_settings=analysis_settings,
process_id=process_id,
filename=filename,
fifo_dir=fifo_dir,
stderr_guard=stderr_guard,
num_reinsurance_iterations=num_reinsurance_iterations,
inuring_priority=inuring_priority
)
def ri(
analysis_settings,
max_process_id,
filename,
process_counter,
num_reinsurance_iterations,
summarypy,
fifo_dir='fifo/',
work_dir='work/',
stderr_guard=True,
process_number=None
):
for inuring_priority in get_ri_inuring_priorities(analysis_settings, num_reinsurance_iterations):
for process_id in process_range(max_process_id, process_number):
do_any(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, process_id,
filename, process_counter, fifo_dir, work_dir, stderr_guard,
inuring_priority=inuring_priority['text']
)
for process_id in process_range(max_process_id, process_number):
do_ord(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, process_id,
filename, process_counter, fifo_dir, work_dir, stderr_guard,
inuring_priority=inuring_priority['text']
)
for process_id in process_range(max_process_id, process_number):
do_tees(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, process_id,
filename, process_counter, fifo_dir, work_dir,
inuring_priority=inuring_priority['text']
)
# TODO => insert server here
for process_id in process_range(max_process_id, process_number):
do_summarycalcs(
summarypy=summarypy,
runtype=RUNTYPE_REINSURANCE_LOSS,
analysis_settings=analysis_settings,
process_id=process_id,
filename=filename,
fifo_dir=fifo_dir,
stderr_guard=stderr_guard,
num_reinsurance_iterations=num_reinsurance_iterations,
inuring_priority=inuring_priority
)
def il(analysis_settings, max_process_id, filename, process_counter, summarypy, fifo_dir='fifo/', work_dir='work/', stderr_guard=True, process_number=None):
for process_id in process_range(max_process_id, process_number):
do_any(RUNTYPE_INSURED_LOSS, analysis_settings, process_id, filename, process_counter, fifo_dir, work_dir, stderr_guard)
for process_id in process_range(max_process_id, process_number):
do_ord(RUNTYPE_INSURED_LOSS, analysis_settings, process_id, filename, process_counter, fifo_dir, work_dir, stderr_guard)
for process_id in process_range(max_process_id, process_number):
do_tees(RUNTYPE_INSURED_LOSS, analysis_settings, process_id, filename, process_counter, fifo_dir, work_dir)
for process_id in process_range(max_process_id, process_number):
do_summarycalcs(
summarypy=summarypy,
runtype=RUNTYPE_INSURED_LOSS,
analysis_settings=analysis_settings,
process_id=process_id,
filename=filename,
fifo_dir=fifo_dir,
stderr_guard=stderr_guard,
)
def do_gul(
analysis_settings,
max_process_id,
filename,
process_counter,
summarypy,
fifo_dir='fifo/',
work_dir='work/',
gul_legacy_stream=None,
stderr_guard=True,
process_number=None,
):
for process_id in process_range(max_process_id, process_number):
do_any(RUNTYPE_GROUNDUP_LOSS, analysis_settings, process_id, filename, process_counter, fifo_dir, work_dir, stderr_guard)
for process_id in process_range(max_process_id, process_number):
do_ord(RUNTYPE_GROUNDUP_LOSS, analysis_settings, process_id, filename, process_counter, fifo_dir, work_dir, stderr_guard)
for process_id in process_range(max_process_id, process_number):
do_tees(RUNTYPE_GROUNDUP_LOSS, analysis_settings, process_id, filename, process_counter, fifo_dir, work_dir)
for process_id in process_range(max_process_id, process_number):
do_summarycalcs(
summarypy=summarypy,
runtype=RUNTYPE_GROUNDUP_LOSS,
analysis_settings=analysis_settings,
process_id=process_id,
filename=filename,
gul_legacy_stream=gul_legacy_stream,
fifo_dir=fifo_dir,
stderr_guard=stderr_guard
)
def do_gul_full_correlation(
analysis_settings,
max_process_id,
filename,
process_counter,
fifo_dir='fifo/full_correlation/',
work_dir='work/full_correlation/',
gul_legacy_stream=None,
stderr_guard=None,
process_number=None,
):
for process_id in process_range(max_process_id, process_number):
do_any(
RUNTYPE_GROUNDUP_LOSS, analysis_settings, process_id, filename,
process_counter, fifo_dir, work_dir
)
for process_id in process_range(max_process_id, process_number):
do_tees(
RUNTYPE_GROUNDUP_LOSS, analysis_settings, process_id, filename,
process_counter, fifo_dir, work_dir
)
print_command(filename, '')
def do_waits(wait_variable, wait_count, filename):
"""
Add waits to the script
:param wait_variable: The type of wait
:type wait_variable: str
:param wait_count: The number of processes to wait for
:type wait_count: int
:param filename: Script to add waits to
:type filename: str
"""
if wait_count > 0:
cmd = 'wait'
for pid in range(1, wait_count + 1):
cmd = '{} ${}{}'.format(cmd, wait_variable, pid)
print_command(filename, cmd)
print_command(filename, '')
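# Illustrative: do_waits('pid', 2, filename) appends the following line to the
# generated script, blocking until both tracked background processes exit:
#   wait $pid1 $pid2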
def do_pwaits(filename, process_counter):
"""
Add pwaits to the script
"""
do_waits('pid', process_counter['pid_monitor_count'], filename)
def do_awaits(filename, process_counter):
"""
Add awaits to the script
"""
do_waits('apid', process_counter['apid_monitor_count'], filename)
def do_lwaits(filename, process_counter):
"""
Add lwaits to the script
"""
do_waits('lpid', process_counter['lpid_monitor_count'], filename)
def do_kwaits(filename, process_counter):
"""
Add kwaits to the script
"""
do_waits('kpid', process_counter['kpid_monitor_count'], filename)
def get_getmodel_itm_cmd(
number_of_samples,
gul_threshold,
use_random_number_file,
gul_alloc_rule,
item_output,
process_id,
max_process_id,
correlated_output,
eve_shuffle_flag,
modelpy=False,
modelpy_server=False,
peril_filter=[],
gulpy=False,
gulpy_random_generator=1,
gulmc=False,
gulmc_random_generator=1,
gulmc_effective_damageability=False,
gulmc_vuln_cache_size=200,
model_df_engine='oasis_data_manager.df_reader.reader.OasisPandasReader',
dynamic_footprint=False,
**kwargs):
"""
Gets the getmodel ktools command (version 3.1.0+) for the gulcalc item stream
:param number_of_samples: The number of samples to run
:type number_of_samples: int
:param gul_threshold: The GUL threshold to use
:type gul_threshold: float
:param use_random_number_file: flag to use the random number file
:type use_random_number_file: bool
:param gul_alloc_rule: back allocation rule for gulcalc
:type gul_alloc_rule: int
:param item_output: The item output
:type item_output: str
:param eve_shuffle_flag: The event shuffling rule
:type eve_shuffle_flag: str
:param model_df_engine: The engine to use when loading dataframes
:type model_df_engine: str
:return: The generated getmodel command
"""
cmd = f'eve {eve_shuffle_flag}{process_id} {max_process_id} | '
if gulmc is True:
cmd += f'{get_gulcmd(gulpy, gulpy_random_generator, gulmc, gulmc_random_generator, gulmc_effective_damageability, gulmc_vuln_cache_size, modelpy_server, peril_filter, model_df_engine=model_df_engine, dynamic_footprint=dynamic_footprint)} -S{number_of_samples} -L{gul_threshold}'
else:
cmd += f'{get_modelcmd(modelpy, modelpy_server, peril_filter)} | {get_gulcmd(gulpy, gulpy_random_generator, False, 0, False, 0, False, [], model_df_engine=model_df_engine)} -S{number_of_samples} -L{gul_threshold}'
if use_random_number_file:
if not gulpy and not gulmc:
# append this arg only if gulcalc is used
cmd = '{} -r'.format(cmd)
if correlated_output != '':
if not gulpy and not gulmc:
# append this arg only if gulcalc is used
cmd = '{} -j {}'.format(cmd, correlated_output)
cmd = '{} -a{}'.format(cmd, gul_alloc_rule)
if not gulpy and not gulmc:
# append this arg only if gulcalc is used
cmd = '{} -i {}'.format(cmd, item_output)
else:
cmd = '{} {}'.format(cmd, item_output)
return cmd
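# Illustrative output, assuming the default C++ toolchain (modelpy, gulpy and
# gulmc all False), 100 samples, threshold 0, alloc rule 1, item_output='-',
# an empty eve shuffle flag (round-robin option) and process 1 of 20:
#   eve 1 20 | getmodel | gulcalc -S100 -L0 -a1 -i -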
def get_getmodel_cov_cmd(
number_of_samples,
gul_threshold,
use_random_number_file,
coverage_output,
item_output,
process_id,
max_process_id,
eve_shuffle_flag,
modelpy=False,
modelpy_server=False,
peril_filter=[],
gulpy=False,
gulpy_random_generator=1,
gulmc=False,
gulmc_random_generator=1,
gulmc_effective_damageability=False,
gulmc_vuln_cache_size=200,
model_df_engine='oasis_data_manager.df_reader.reader.OasisPandasReader',
dynamic_footprint=False,
**kwargs) -> str:
"""
Gets the getmodel ktools command (version < 3.0.8) for the gulcalc coverage stream
:param number_of_samples: The number of samples to run
:type number_of_samples: int
:param gul_threshold: The GUL threshold to use
:type gul_threshold: float
:param use_random_number_file: flag to use the random number file
:type use_random_number_file: bool
:param coverage_output: The coverage output
:type coverage_output: str
:param item_output: The item output
:type item_output: str
:param eve_shuffle_flag: The event shuffling rule
:type eve_shuffle_flag: str
:param model_df_engine: The engine to use when loading dataframes
:type model_df_engine: str
:return: (str) The generated getmodel command
"""
cmd = f'eve {eve_shuffle_flag}{process_id} {max_process_id} | '
if gulmc is True:
cmd += f'{get_gulcmd(gulpy, gulpy_random_generator, gulmc, gulmc_random_generator, gulmc_effective_damageability, gulmc_vuln_cache_size, modelpy_server, peril_filter, model_df_engine=model_df_engine, dynamic_footprint=dynamic_footprint)} -S{number_of_samples} -L{gul_threshold}'
else:
cmd += f'{get_modelcmd(modelpy, modelpy_server, peril_filter)} | {get_gulcmd(gulpy, gulpy_random_generator, False, 0, False, 0, False, [], model_df_engine=model_df_engine)} -S{number_of_samples} -L{gul_threshold}'
if use_random_number_file:
if not gulpy and not gulmc:
# append this arg only if gulcalc is used
cmd = '{} -r'.format(cmd)
if coverage_output != '':
if not gulpy and not gulmc:
# append this arg only if gulcalc is used
cmd = '{} -c {}'.format(cmd, coverage_output)
if not gulpy and not gulmc:
# append this arg only if gulcalc is used
if item_output != '':
cmd = '{} -i {}'.format(cmd, item_output)
else:
cmd = '{} {}'.format(cmd, item_output)
return cmd
def add_pid_to_shell_command(cmd, process_counter):
"""
Add a variable to the end of a command in order to track the ID of the process executing it.
Each time this function is called, the counter `process_counter` is incremented.
Args:
cmd (str): the command whose process ID is to be stored in a variable.
process_counter (Counter or dict): the number of process IDs that are being tracked.
Returns:
cmd (str): the updated command string.
"""
process_counter["pid_monitor_count"] += 1
cmd = f'{cmd} pid{process_counter["pid_monitor_count"]}=$!'
return cmd
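# Illustrative usage (doctest-style), with Counter imported above:
#   >>> counter = Counter()
#   >>> add_pid_to_shell_command('( eve ... ) &', counter)
#   '( eve ... ) & pid1=$!'
#   >>> counter['pid_monitor_count']
#   1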
def get_main_cmd_ri_stream(
cmd,
process_id,
il_output,
il_alloc_rule,
ri_alloc_rule,
num_reinsurance_iterations,
fifo_dir='fifo/',
stderr_guard=True,
from_file=False,
fmpy=True,
fmpy_low_memory=False,
fmpy_sort_output=False,
step_flag='',
process_counter=None,
ri_inuring_priorities=None,
rl_inuring_priorities=None
):
"""
Gets the fmcalc ktools command reinsurance stream
:param cmd: either gulcalc command stream or correlated output file
:type cmd: str
:param process_id: ID corresponding to thread
:type process_id: int
:param il_output: If insured loss outputs required
:type il_output: Boolean
:param il_alloc_rule: insured loss allocation rule for fmcalc
:type il_alloc_rule: int
:param ri_alloc_rule: reinsurance allocation rule for fmcalc
:type ri_alloc_rule: int
:param num_reinsurance_iterations: number of reinsurance iterations
:type num_reinsurance_iterations: int
:param fifo_dir: path to fifo directory
:type fifo_dir: str
:param stderr_guard: send stderr output to log file
:type stderr_guard: bool
:param from_file: must be true if cmd is a file and false if it can be piped
:type from_file: bool
:param ri_inuring_priorities: Inuring priorities where net output has been requested
:type ri_inuring_priorities: dict
:param rl_inuring_priorities: Inuring priorities where gross output has been requested
:type rl_inuring_priorities: dict
"""
if from_file:
main_cmd = f'{get_fmcmd(fmpy, fmpy_low_memory, fmpy_sort_output)} -a{il_alloc_rule}{step_flag} < {cmd}'
else:
main_cmd = f'{cmd} | {get_fmcmd(fmpy, fmpy_low_memory, fmpy_sort_output)} -a{il_alloc_rule}{step_flag}'
if il_output:
main_cmd += f" | tee {get_fifo_name(fifo_dir, RUNTYPE_INSURED_LOSS, process_id)}"
for i in range(1, num_reinsurance_iterations + 1):
main_cmd += f" | {get_fmcmd(fmpy, fmpy_low_memory, fmpy_sort_output)} -a{ri_alloc_rule} -p {os.path.join('input', 'RI_'+str(i))}"
if rl_inuring_priorities: # If rl output is requested then produce gross output at all inuring priorities
main_cmd += f" -o {get_fifo_name(fifo_dir, RUNTYPE_REINSURANCE_GROSS_LOSS, process_id, consumer=rl_inuring_priorities[i].rstrip('_'))}"
if i < num_reinsurance_iterations: # Net output required to process next inuring priority
main_cmd += ' -n -'
if i in ri_inuring_priorities.keys():
if i == num_reinsurance_iterations: # Final inuring priority always produces net output if ri output requested
ri_fifo_name = get_fifo_name(fifo_dir, RUNTYPE_REINSURANCE_LOSS, process_id)
main_cmd += f" -n - > {ri_fifo_name}"
else:
main_cmd += f" | tee {get_fifo_name(fifo_dir, RUNTYPE_REINSURANCE_LOSS, process_id, consumer=ri_inuring_priorities[i].rstrip('_'))}"
main_cmd = f'( {main_cmd} ) 2>> $LOG_DIR/stderror.err' if stderr_guard else f'{main_cmd}'
main_cmd = f'( {main_cmd} ) &'
if process_counter is not None:
main_cmd = add_pid_to_shell_command(main_cmd, process_counter)
return main_cmd
def get_main_cmd_il_stream(
cmd,
process_id,
il_alloc_rule,
fifo_dir='fifo/',
stderr_guard=True,
from_file=False,
fmpy=True,
fmpy_low_memory=False,
fmpy_sort_output=False,
step_flag='',
process_counter=None,
):
"""
Gets the fmcalc ktools command insured losses stream
:param cmd: either gulcalc command stream or correlated output file
:type cmd: str
:param process_id: ID corresponding to thread
:type process_id: int
:param il_alloc_rule: insured loss allocation rule for fmcalc
:type il_alloc_rule: int
:param fifo_dir: path to fifo directory
:type fifo_dir: str
:param stderr_guard: send stderr output to log file
:type stderr_guard: bool
:param from_file: must be true if cmd is a file and false if it can be piped
:type from_file: bool
:return: generated fmcalc command as str
"""
il_fifo_name = get_fifo_name(fifo_dir, RUNTYPE_INSURED_LOSS, process_id)
if from_file:
main_cmd = f'{get_fmcmd(fmpy, fmpy_low_memory, fmpy_sort_output)} -a{il_alloc_rule}{step_flag} < {cmd} > {il_fifo_name}'
else:
# need extra space at the end to pass test
main_cmd = f'{cmd} | {get_fmcmd(fmpy, fmpy_low_memory, fmpy_sort_output)} -a{il_alloc_rule}{step_flag} > {il_fifo_name} '
main_cmd = f'( {main_cmd} ) 2>> $LOG_DIR/stderror.err' if stderr_guard else f'{main_cmd}'
main_cmd = f'( {main_cmd} ) &'
if process_counter is not None:
main_cmd = add_pid_to_shell_command(main_cmd, process_counter)
return main_cmd
def get_main_cmd_gul_stream(
cmd,
process_id,
fifo_dir='fifo/',
stderr_guard=True,
consumer='',
process_counter=None,
):
"""
Gets the command to output ground up losses
:param cmd: either gulcalc command stream or correlated output file
:type cmd: str
:param process_id: ID corresponding to thread
:type process_id: int
:param fifo_dir: path to fifo directory
:type fifo_dir: str
:param stderr_guard: send stderr output to log file
:type stderr_guard: bool
:param consumer: optional name of the consumer of the stream
:type consumer: string
:return: generated command as str
"""
gul_fifo_name = get_fifo_name(fifo_dir, RUNTYPE_GROUNDUP_LOSS, process_id, consumer)
main_cmd = f'{cmd} > {gul_fifo_name} '
main_cmd = f'( {main_cmd} ) 2>> $LOG_DIR/stderror.err' if stderr_guard else f'{main_cmd}'
main_cmd = f'( {main_cmd} ) & '
if process_counter is not None:
main_cmd = add_pid_to_shell_command(main_cmd, process_counter)
return main_cmd
def get_complex_model_cmd(custom_gulcalc_cmd, analysis_settings):
# If `custom_gulcalc_cmd` is set then always run as a complex model
# and raise an exception when not found in PATH
if custom_gulcalc_cmd:
if not shutil.which(custom_gulcalc_cmd):
raise OasisException(
'Run error: Custom Gulcalc command "{}" explicitly set but not found in path.'.format(custom_gulcalc_cmd)
)
# when not set then fallback to previous behaviour:
# Check if a custom binary `<supplier>_<model>_gulcalc` exists in PATH
else:
inferred_gulcalc_cmd = "{}_{}_gulcalc".format(
analysis_settings.get('model_supplier_id'),
analysis_settings.get('model_name_id'))
if shutil.which(inferred_gulcalc_cmd):
custom_gulcalc_cmd = inferred_gulcalc_cmd
if custom_gulcalc_cmd:
def custom_get_getmodel_cmd(
number_of_samples,
gul_threshold,
use_random_number_file,
coverage_output,
item_output,
process_id,
max_process_id,
gul_alloc_rule,
stderr_guard,
gul_legacy_stream=False,
**kwargs
):
cmd = "{} -e {} {} -a {} -p {}".format(
custom_gulcalc_cmd,
process_id,
max_process_id,
os.path.abspath("analysis_settings.json"),
"input")
if gul_legacy_stream and coverage_output != '':
cmd = '{} -c {}'.format(cmd, coverage_output)
if item_output != '':
cmd = '{} -i {}'.format(cmd, item_output)
if stderr_guard:
cmd = '({}) 2>> $LOG_DIR/gul_stderror.err'.format(cmd)
return cmd
else:
custom_get_getmodel_cmd = None
return custom_get_getmodel_cmd
def do_computes(outputs):
if len(outputs) == 0:
return
for output in outputs:
filename = output['compute_args']['filename']
print_command(filename, '')
print_command(
filename,
'# --- Do {} loss computes ---'.format(output['loss_type'])
)
output['compute_fun'](**output['compute_args'])
def get_main_cmd_lb(num_lb, num_in_per_lb, num_out_per_lb, get_input_stream_name, get_output_stream_name, stderr_guard):
in_id = 1
out_id = 1
for _ in range(num_lb):
lb_in_l = []
for _ in range(num_in_per_lb):
lb_in_fifo_name = get_input_stream_name(producer_id=in_id)
in_id += 1
lb_in_l.append(lb_in_fifo_name)
lb_in = ' '.join(lb_in_l)
lb_out_l = []
for _ in range(num_out_per_lb):
lb_out_fifo_name = get_output_stream_name(producer_id=out_id)
out_id += 1
lb_out_l.append(lb_out_fifo_name)
lb_out = ' '.join(lb_out_l)
lb_main_cmd = f"load_balancer -i {lb_in} -o {lb_out}"
lb_main_cmd = f'( {lb_main_cmd} ) 2>> $LOG_DIR/stderror.err &' if stderr_guard else f'{lb_main_cmd} &'
yield lb_main_cmd
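# Illustrative usage: one load balancer fed by two gulcalc FIFOs and feeding
# two fmcalc FIFOs (the stream-name callables here are hypothetical):
#   >>> next(get_main_cmd_lb(
#   ...     1, 2, 2,
#   ...     lambda producer_id: f'fifo/gul_lb_P{producer_id}',
#   ...     lambda producer_id: f'fifo/lb_il_P{producer_id}',
#   ...     False))
#   'load_balancer -i fifo/gul_lb_P1 fifo/gul_lb_P2 -o fifo/lb_il_P1 fifo/lb_il_P2 &'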
def get_pla_cmd(pla, secondary_factor, uniform_factor):
"""
Determine whether Post Loss Amplification should be implemented and issue
plapy command.
Args:
pla (bool): flag to apply post loss amplification
secondary_factor (float): secondary factor to apply to post loss
amplification
uniform_factor (float): uniform factor to apply across all losses
Returns:
pla_cmd (str): post loss amplification command
"""
pla_cmd = ' | plapy' * pla
if pla:
if uniform_factor > 0:
pla_cmd += f' -F {uniform_factor}'
elif secondary_factor != 1:
pla_cmd += f' -f {secondary_factor}'
return pla_cmd
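# Illustrative usage (doctest-style); note the ' | plapy' * pla trick yields
# an empty string when pla is False:
#   >>> get_pla_cmd(False, 1.0, 0.0)
#   ''
#   >>> get_pla_cmd(True, 1.0, 1.5)
#   ' | plapy -F 1.5'
#   >>> get_pla_cmd(True, 0.8, 0.0)
#   ' | plapy -f 0.8'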
def bash_params(
analysis_settings,
max_process_id=-1,
number_of_processes=-1,
num_reinsurance_iterations=0,
model_storage_json=None,
fifo_tmp_dir=True,
gul_alloc_rule=None,
il_alloc_rule=None,
ri_alloc_rule=None,
num_gul_per_lb=None,
num_fm_per_lb=None,
stderr_guard=True,
gul_legacy_stream=False,
bash_trace=False,
filename='run_ktools.sh',
_get_getmodel_cmd=None,
custom_gulcalc_cmd=None,
custom_gulcalc_log_start=None,
custom_gulcalc_log_finish=None,
custom_args={},
fmpy=True,
fmpy_low_memory=False,
fmpy_sort_output=False,
event_shuffle=None,
modelpy=False,
gulpy=False,
gulpy_random_generator=1,
gulmc=False,
gulmc_random_generator=1,
gulmc_effective_damageability=False,
gulmc_vuln_cache_size=200,
# new options
process_number=None,
remove_working_files=True,
model_run_dir='',
model_py_server=False,
summarypy=False,
peril_filter=[],
exposure_df_engine="oasis_data_manager.df_reader.reader.OasisPandasReader",
model_df_engine="oasis_data_manager.df_reader.reader.OasisPandasReader",
dynamic_footprint=False,
**kwargs
):
bash_params = {}
bash_params['max_process_id'] = max_process_id if max_process_id > 0 else multiprocessing.cpu_count()
bash_params['number_of_processes'] = number_of_processes if number_of_processes > 0 else multiprocessing.cpu_count()
bash_params['process_counter'] = Counter()
bash_params['num_reinsurance_iterations'] = num_reinsurance_iterations
bash_params['fifo_tmp_dir'] = fifo_tmp_dir
bash_params['gul_legacy_stream'] = gul_legacy_stream
bash_params['bash_trace'] = bash_trace
bash_params['filename'] = filename
bash_params['custom_args'] = custom_args
bash_params['modelpy'] = modelpy
bash_params['gulpy'] = gulpy
bash_params['gulpy_random_generator'] = gulpy_random_generator
bash_params['gulmc'] = gulmc
bash_params['gulmc_random_generator'] = gulmc_random_generator
bash_params['gulmc_effective_damageability'] = gulmc_effective_damageability
bash_params['gulmc_vuln_cache_size'] = gulmc_vuln_cache_size
bash_params['fmpy'] = fmpy
bash_params['fmpy_low_memory'] = fmpy_low_memory
bash_params['fmpy_sort_output'] = fmpy_sort_output
bash_params['process_number'] = process_number
bash_params['remove_working_files'] = remove_working_files
bash_params['model_run_dir'] = model_run_dir
if model_storage_json:
bash_params['model_storage_json'] = model_storage_json
bash_params['gul_threshold'] = analysis_settings.get('gul_threshold', 0)
bash_params['number_of_samples'] = analysis_settings.get('number_of_samples', 0)
bash_params["static_path"] = os.path.join(model_run_dir, "static/")
bash_params["model_py_server"] = model_py_server
bash_params['summarypy'] = summarypy if not gul_legacy_stream else False # summarypy doesn't support gul_legacy_stream
bash_params["peril_filter"] = peril_filter
# set complex model gulcalc command
if not _get_getmodel_cmd and custom_gulcalc_cmd:
bash_params['_get_getmodel_cmd'] = get_complex_model_cmd(custom_gulcalc_cmd, analysis_settings)
else:
bash_params['_get_getmodel_cmd'] = _get_getmodel_cmd
# Set custom gulcalc log statement checks
bash_params['custom_gulcalc_log_start'] = custom_gulcalc_log_start or analysis_settings.get('model_custom_gulcalc_log_start')
bash_params['custom_gulcalc_log_finish'] = custom_gulcalc_log_finish or analysis_settings.get('model_custom_gulcalc_log_finish')
# Set fifo dirs
if fifo_tmp_dir:
bash_params['fifo_queue_dir'] = '/tmp/{}/fifo/'.format(''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)))
else:
bash_params['fifo_queue_dir'] = os.path.join(model_run_dir, 'fifo/')
# set work dir
if process_number:
work_base_dir = f'{process_number}.work/'
else:
work_base_dir = 'work/'
# set dirs
bash_params['stderr_guard'] = stderr_guard
bash_params['gul_item_stream'] = not gul_legacy_stream
bash_params['work_dir'] = os.path.join(model_run_dir, work_base_dir)
bash_params['work_kat_dir'] = os.path.join(model_run_dir, os.path.join(work_base_dir, 'kat/'))
bash_params['work_full_correlation_dir'] = os.path.join(model_run_dir, os.path.join(work_base_dir, 'full_correlation/'))
bash_params['work_full_correlation_kat_dir'] = os.path.join(model_run_dir, os.path.join(work_base_dir, 'full_correlation/kat/'))
bash_params['output_dir'] = os.path.join(model_run_dir, 'output/')
bash_params['output_full_correlation_dir'] = os.path.join(model_run_dir, 'output/full_correlation/')
bash_params['fifo_full_correlation_dir'] = os.path.join(bash_params['fifo_queue_dir'], 'full_correlation/')
# Set default alloc/shuffle rules if missing
bash_params['gul_alloc_rule'] = gul_alloc_rule if isinstance(gul_alloc_rule, int) else KTOOLS_ALLOC_GUL_DEFAULT
bash_params['il_alloc_rule'] = il_alloc_rule if isinstance(il_alloc_rule, int) else KTOOLS_ALLOC_IL_DEFAULT
bash_params['ri_alloc_rule'] = ri_alloc_rule if isinstance(ri_alloc_rule, int) else KTOOLS_ALLOC_RI_DEFAULT
bash_params['num_gul_per_lb'] = num_gul_per_lb if isinstance(num_gul_per_lb, int) else KTOOL_N_GUL_PER_LB
bash_params['num_fm_per_lb'] = num_fm_per_lb if isinstance(num_fm_per_lb, int) else KTOOL_N_FM_PER_LB
# Get event shuffle flags
event_shuffle_rule = event_shuffle if isinstance(event_shuffle, int) else EVE_DEFAULT_SHUFFLE
bash_params['event_shuffle'] = event_shuffle_rule
if event_shuffle_rule in EVE_SHUFFLE_OPTIONS:
bash_params['eve_shuffle_flag'] = EVE_SHUFFLE_OPTIONS[event_shuffle_rule]['eve']
bash_params['kat_sort_by_event'] = EVE_SHUFFLE_OPTIONS[event_shuffle_rule]['kat_sorting']
else:
raise OasisException(f'Error: unknown event shuffle rule "{event_shuffle}", expected a value in [0..{EVE_STD_SHUFFLE}]')
# set random num file option
use_random_number_file = False
if 'model_settings' in analysis_settings and analysis_settings['model_settings'].get('use_random_number_file'):
use_random_number_file = True
bash_params['use_random_number_file'] = use_random_number_file
# set full_correlation option
full_correlation = False
if 'full_correlation' in analysis_settings:
if _get_getmodel_cmd is None and bash_params['gul_item_stream']:
full_correlation = analysis_settings['full_correlation']
if full_correlation and gulmc:
full_correlation = False
logger.info("full_correlation has been disable as it isn't compatible with gulmc, see oasislmf correlation documentation.")
bash_params['full_correlation'] = full_correlation
# Check output settings coherence: each output must be enabled AND have at least one summaries section
for mod in ['gul', 'il', 'ri', 'rl']:
if analysis_settings.get(f'{mod}_output') and not analysis_settings.get(f'{mod}_summaries'):
logger.warning(f'{mod}_output set to True but there is no {mod}_summaries')
analysis_settings[f'{mod}_output'] = False
# additional check for ri and rl, must have num_reinsurance_iterations
if not num_reinsurance_iterations:
for runtype in REINSURANCE_RUNTYPES:
if analysis_settings.get(f'{runtype}_output', False):
logger.warning(f'{runtype}_output set to True but there are no reinsurance layers')
analysis_settings[f'{runtype}_output'] = False
if not any(analysis_settings.get(f'{mod}_output') for mod in ['gul', 'il', 'ri', 'rl']):
raise OasisException('No valid output settings')
# Attach the analysis settings and output flags from 'analysis_settings'
bash_params['analysis_settings'] = analysis_settings
bash_params['gul_output'] = analysis_settings.get('gul_output', False)
bash_params['il_output'] = analysis_settings.get('il_output', False)
bash_params['ri_output'] = analysis_settings.get('ri_output', False)
bash_params['rl_output'] = analysis_settings.get('rl_output', False)
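# gul output needs its own summary fifo only when the gul stream is also consumed downstream by il/ri/rl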
bash_params['need_summary_fifo_for_gul'] = bash_params['gul_output'] and (
bash_params['il_output'] or bash_params['ri_output'] or bash_params['rl_output']
)
bash_params['exposure_df_engine'] = exposure_df_engine
bash_params['model_df_engine'] = model_df_engine
bash_params['dynamic_footprint'] = dynamic_footprint
return bash_params
@contextlib.contextmanager
[docs]
def bash_wrapper(
filename,
bash_trace,
stderr_guard,
log_sub_dir=None,
process_number=None,
custom_gulcalc_log_start=None,
custom_gulcalc_log_finish=None
):
# Header
print_command(filename, '#!/bin/bash')
print_command(filename, 'SCRIPT=$(readlink -f "$0") && cd $(dirname "$SCRIPT")')
print_command(filename, '')
print_command(filename, '# --- Script Init ---')
print_command(filename, 'set -euET -o pipefail')
print_command(filename, 'shopt -s inherit_errexit 2>/dev/null || echo "WARNING: Unable to set inherit_errexit; possibly unsupported by this shell, subprocess failures may not be detected."')
print_command(filename, '')
if process_number:
print_command(filename, f'LOG_DIR=log/{log_sub_dir}')
else:
print_command(filename, 'LOG_DIR=log')
print_command(filename, 'mkdir -p $LOG_DIR')
print_command(filename, 'rm -R -f $LOG_DIR/*')
print_command(filename, '')
# Trap func and logging
if bash_trace:
print_command(filename, BASH_TRACE)
if stderr_guard:
print_command(filename, TRAP_FUNC)
print_command(filename, get_check_function(custom_gulcalc_log_start, custom_gulcalc_log_finish))
# Script content
yield
# Script footer
if stderr_guard and not process_number:
# run dropped-process check (single script run)
print_command(filename, '')
print_command(filename, 'check_complete')
elif stderr_guard and process_number:
# check stderror.err before exit (fallback check in case of short run)
print_command(filename, 'if [ -s $LOG_DIR/stderror.err ]; then')
print_command(filename, ' echo "Error detected in $LOG_DIR/stderror.err"')
print_command(filename, ' exit 1')
print_command(filename, 'fi')
# check for empty work bin files
print_command(filename, f'CHUNK_BINS=(`find {process_number}.work -name \'P{process_number}.bin\' | sort -r`)')
print_command(filename, 'echo " === Checking analysis output chunks === "')
print_command(filename, 'for b in "${CHUNK_BINS[@]}"; do')
print_command(filename, ' wc -c $b')
print_command(filename, 'done')
print_command(filename, '')
print_command(filename, '# exit error if empty')
print_command(filename, 'for b in "${CHUNK_BINS[@]}"; do')
print_command(filename, ' if [ ! -s $b ]; then')
print_command(filename, ' echo "Chunk output error: File \'$b\' is empty"')
print_command(filename, ' exit 1')
print_command(filename, ' fi')
print_command(filename, 'done')
print_command(filename, 'echo "Chunk output check [OK]"')
[docs]
def create_bash_analysis(
process_counter,
max_process_id,
num_reinsurance_iterations,
fifo_tmp_dir,
bash_trace,
filename,
_get_getmodel_cmd,
custom_args,
fmpy,
fmpy_low_memory,
fmpy_sort_output,
process_number,
remove_working_files,
model_run_dir,
fifo_queue_dir,
fifo_full_correlation_dir,
stderr_guard,
gul_item_stream,
work_dir,
work_kat_dir,
work_full_correlation_dir,
work_full_correlation_kat_dir,
output_dir,
output_full_correlation_dir,
gul_alloc_rule,
il_alloc_rule,
ri_alloc_rule,
num_gul_per_lb,
num_fm_per_lb,
event_shuffle,
eve_shuffle_flag,
kat_sort_by_event,
gul_threshold,
number_of_samples,
use_random_number_file,
full_correlation,
gul_output,
il_output,
ri_output,
rl_output,
need_summary_fifo_for_gul,
analysis_settings,
modelpy,
gulpy,
gulpy_random_generator,
gulmc,
gulmc_random_generator,
gulmc_effective_damageability,
gulmc_vuln_cache_size,
model_py_server,
peril_filter,
summarypy,
gul_legacy_stream=False,
model_df_engine='oasis_data_manager.df_reader.reader.OasisPandasReader',
dynamic_footprint=False,
**kwargs
):
process_counter = process_counter or Counter()
custom_args = custom_args or {}
# WORKAROUND: disable the load balancer if a chunk number is set
# remove this limit once the load balancer supports analysis chunks
if process_number is not None:
num_gul_per_lb = 0
num_fm_per_lb = 0
print_command(filename, '# --- Setup run dirs ---')
print_command(filename, '')
print_command(filename, "find output -type f -not -name '*summary-info*' -not -name '*.json' -exec rm -R -f {} +")
if full_correlation:
print_command(filename, 'mkdir -p {}'.format(output_full_correlation_dir))
print_command(filename, '')
if not fifo_tmp_dir:
if not process_number:
print_command(filename, 'rm -R -f {}*'.format(fifo_queue_dir))
else:
print_command(
filename, f"find {fifo_queue_dir} \( -name '*P{process_number}[^0-9]*' -o -name '*P{process_number}' \)" + " -exec rm -R -f {} +")
if full_correlation:
print_command(filename, 'mkdir -p {}'.format(fifo_full_correlation_dir))
# if not process_number:
print_command(filename, 'rm -R -f {}*'.format(work_dir))
# else:
# print_command(filename, f"find {work_dir} \( -name '*P{process_number}[^0-9]*' -o -name '*P{process_number}' \)" + " -exec rm -R -f {} +")
print_command(filename, 'mkdir -p {}'.format(work_kat_dir))
if full_correlation:
print_command(filename, 'mkdir -p {}'.format(work_full_correlation_dir))
print_command(
filename, 'mkdir -p {}'.format(work_full_correlation_kat_dir)
)
if model_py_server:
print_command(filename, '# --- run data server ---')
print_command(command_file=filename, cmd=f"servedata {kwargs['static_path']} {max_process_id} &")
# print_command(command_file=filename, cmd="while ! nc -vz localhost 8080 < /dev/null > /dev/null 2>&1; do")
# print_command(command_file=filename, cmd=" printf '.'")
# print_command(command_file=filename, cmd=" sleep 2")
# print_command(command_file=filename, cmd="done")
print_command(filename, '')
if fmpy:
if il_output or ri_output or rl_output:
print_command(
filename, f'#{get_fmcmd(fmpy)} -a{il_alloc_rule} --create-financial-structure-files'
)
if ri_output or rl_output:
for i in range(1, num_reinsurance_iterations + 1):
print_command(
filename, f"#{get_fmcmd(fmpy)} -a{ri_alloc_rule} --create-financial-structure-files -p {os.path.join('input', 'RI_'+str(i))}")
# Create FIFOs under /tmp/* (Windows support)
if fifo_tmp_dir:
# workaround to match bash tests
if not process_number:
if fifo_queue_dir.endswith('fifo/'):
print_command(filename, 'rm -R -f {}'.format(fifo_queue_dir[:-5]))
else:
print_command(filename, 'rm -R -f {}'.format(fifo_queue_dir))
print_command(filename, 'mkdir -p {}'.format(fifo_queue_dir))
if full_correlation:
print_command(filename, 'mkdir -p {}'.format(fifo_full_correlation_dir))
# Create workfolders
if gul_output:
create_workfolders(RUNTYPE_GROUNDUP_LOSS, analysis_settings, filename, work_dir)
if full_correlation:
create_workfolders(
RUNTYPE_GROUNDUP_LOSS, analysis_settings,
filename, work_full_correlation_dir
)
if il_output:
create_workfolders(RUNTYPE_INSURED_LOSS, analysis_settings, filename, work_dir)
if full_correlation:
create_workfolders(
RUNTYPE_INSURED_LOSS, analysis_settings,
filename, work_full_correlation_dir
)
if ri_output:
for inuring_priority in get_ri_inuring_priorities(analysis_settings, num_reinsurance_iterations):
create_workfolders(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, filename, work_dir,
inuring_priority=inuring_priority['text']
)
if full_correlation:
create_workfolders(
RUNTYPE_REINSURANCE_LOSS, analysis_settings,
filename, work_full_correlation_dir
)
if rl_output:
for inuring_priority in get_rl_inuring_priorities(num_reinsurance_iterations):
create_workfolders(
RUNTYPE_REINSURANCE_GROSS_LOSS, analysis_settings, filename,
work_dir, inuring_priority=inuring_priority['text']
)
print_command(filename, '')
# infer the number of calc blocks and FIFOs to create (no load balancer for the old stream option)
if num_gul_per_lb and num_fm_per_lb and (il_output or ri_output) and gul_item_stream:
block_process_size = num_gul_per_lb + (num_fm_per_lb * (2 if ri_output else 1))
num_lb = (max_process_id - 1) // block_process_size + 1
num_gul_output = num_lb * num_gul_per_lb
num_fm_output = num_lb * num_fm_per_lb
else:
num_lb = 0
num_gul_output = num_fm_output = max_process_id
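# Worked example (illustrative): with max_process_id=12, num_gul_per_lb=4,
# num_fm_per_lb=2 and ri_output=True, block_process_size = 4 + 2 * 2 = 8,
# so num_lb = (12 - 1) // 8 + 1 = 2, num_gul_output = 2 * 4 = 8 and
# num_fm_output = 2 * 2 = 4.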
fifo_dirs = [fifo_queue_dir]
if full_correlation:
fifo_dirs.append(fifo_full_correlation_dir)
if (il_output or ri_output) and (gul_output or not num_lb):
# create fifo for il or ri full correlation compute
do_fifos_exec(RUNTYPE_GROUNDUP_LOSS, num_gul_output, filename, fifo_full_correlation_dir,
process_number, consumer=RUNTYPE_FULL_CORRELATION)
for fifo_dir in fifo_dirs:
# create fifos for summarycalc
if gul_output:
do_fifos_exec(RUNTYPE_GROUNDUP_LOSS, num_gul_output, filename, fifo_dir, process_number)
do_fifos_calc(RUNTYPE_GROUNDUP_LOSS, analysis_settings, num_gul_output, filename, fifo_dir, process_number)
if il_output:
do_fifos_exec(RUNTYPE_INSURED_LOSS, num_fm_output, filename, fifo_dir, process_number)
do_fifos_calc(RUNTYPE_INSURED_LOSS, analysis_settings, num_fm_output, filename, fifo_dir, process_number)
if ri_output:
for inuring_priority in get_ri_inuring_priorities(analysis_settings, num_reinsurance_iterations):
do_fifos_exec(
RUNTYPE_REINSURANCE_LOSS, num_fm_output, filename,
fifo_dir, process_number,
consumer=inuring_priority['text'].rstrip('_')
)
do_fifos_calc(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, num_fm_output,
filename, fifo_dir, process_number,
consumer_prefix=inuring_priority['text']
)
if rl_output:
for inuring_priority in get_rl_inuring_priorities(num_reinsurance_iterations):
do_fifos_exec(
RUNTYPE_REINSURANCE_GROSS_LOSS, num_fm_output, filename,
fifo_dir, process_number,
consumer=inuring_priority['text'].rstrip('_')
)
do_fifos_calc(
RUNTYPE_REINSURANCE_GROSS_LOSS, analysis_settings,
num_fm_output, filename, fifo_dir, process_number,
consumer_prefix=inuring_priority['text']
)
# create fifos for the load balancer
if num_lb:
do_fifos_exec(RUNTYPE_GROUNDUP_LOSS, num_gul_output, filename, fifo_dir, process_number, consumer=RUNTYPE_LOAD_BALANCED_LOSS)
do_fifos_exec(RUNTYPE_LOAD_BALANCED_LOSS, num_fm_output, filename, fifo_dir, process_number, consumer=RUNTYPE_INSURED_LOSS)
print_command(filename, '')
dirs = [(fifo_queue_dir, work_dir)]
if full_correlation:
dirs.append((fifo_full_correlation_dir, work_full_correlation_dir))
compute_outputs = []
for (_fifo_dir, _work_dir) in dirs: # create Summarycalc
if rl_output:
rl_computes = {
'loss_type': 'reinsurance gross',
'compute_fun': rl,
'compute_args': {
'analysis_settings': analysis_settings,
'max_process_id': num_fm_output,
'filename': filename,
'process_counter': process_counter,
'summarypy': summarypy,
'num_reinsurance_iterations': num_reinsurance_iterations,
'fifo_dir': _fifo_dir,
'work_dir': _work_dir,
'stderr_guard': stderr_guard,
'process_number': process_number
}
}
compute_outputs.append(rl_computes)
if ri_output:
ri_computes = {
'loss_type': 'reinsurance',
'compute_fun': ri,
'compute_args': {
'analysis_settings': analysis_settings,
'max_process_id': num_fm_output,
'filename': filename,
'process_counter': process_counter,
'summarypy': summarypy,
'num_reinsurance_iterations': num_reinsurance_iterations,
'fifo_dir': _fifo_dir,
'work_dir': _work_dir,
'stderr_guard': stderr_guard,
'process_number': process_number
}
}
compute_outputs.append(ri_computes)
if il_output:
il_computes = {
'loss_type': 'insured',
'compute_fun': il,
'compute_args': {
'analysis_settings': analysis_settings,
'max_process_id': num_fm_output,
'filename': filename,
'process_counter': process_counter,
'summarypy': summarypy,
'fifo_dir': _fifo_dir,
'work_dir': _work_dir,
'stderr_guard': stderr_guard,
'process_number': process_number
}
}
compute_outputs.append(il_computes)
if gul_output:
gul_computes = {
'loss_type': 'ground up',
'compute_fun': do_gul,
'compute_args': {
'analysis_settings': analysis_settings,
'max_process_id': num_gul_output,
'filename': filename,
'process_counter': process_counter,
'summarypy': summarypy,
'fifo_dir': _fifo_dir,
'work_dir': _work_dir,
'gul_legacy_stream': gul_legacy_stream,
'stderr_guard': stderr_guard,
'process_number': process_number
}
}
compute_outputs.append(gul_computes)
do_computes(compute_outputs)
print_command(filename, '')
# create all gul streams
get_gul_stream_cmds = {}
# WARNING: this probably won't work well with the load balancer (needs a guard / edit)
# for gul_id in range(1, num_gul_output + 1):
for gul_id in process_range(num_gul_output, process_number):
getmodel_args = {
'number_of_samples': number_of_samples,
'gul_threshold': gul_threshold,
'use_random_number_file': use_random_number_file,
'gul_alloc_rule': gul_alloc_rule,
'gul_legacy_stream': gul_legacy_stream,
'process_id': gul_id,
'max_process_id': num_gul_output,
'stderr_guard': stderr_guard,
'eve_shuffle_flag': eve_shuffle_flag,
'modelpy': modelpy,
'gulpy': gulpy,
'gulpy_random_generator': gulpy_random_generator,
'gulmc': gulmc,
'gulmc_random_generator': gulmc_random_generator,
'gulmc_effective_damageability': gulmc_effective_damageability,
'gulmc_vuln_cache_size': gulmc_vuln_cache_size,
'modelpy_server': model_py_server,
'peril_filter': peril_filter,
"model_df_engine": model_df_engine,
'dynamic_footprint': dynamic_footprint
}
# Establish whether the items-to-amplifications map file is present
pla = os.path.isfile(
os.path.join(os.getcwd(), 'input/amplifications.bin')
)
# GUL item stream (new) vs coverage stream (older)
gul_fifo_name = get_fifo_name(fifo_queue_dir, RUNTYPE_GROUNDUP_LOSS, gul_id)
if gul_item_stream:
getmodel_args['coverage_output'] = ''
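# bool multiplication: evaluates to '-' (the explicit stdout item-output
# marker) only when neither gulpy nor gulmc is used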
getmodel_args['item_output'] = '-' * (not gulpy and not gulmc)
getmodel_args['item_output'] = getmodel_args['item_output'] + get_pla_cmd(
analysis_settings.get('pla', False),
analysis_settings.get('pla_secondary_factor', 1),
analysis_settings.get('pla_uniform_factor', 0)
)
if need_summary_fifo_for_gul:
getmodel_args['item_output'] = '{} | tee {}'.format(getmodel_args['item_output'], gul_fifo_name)
_get_getmodel_cmd = (_get_getmodel_cmd or get_getmodel_itm_cmd)
else:
if need_summary_fifo_for_gul:
getmodel_args['coverage_output'] = f'{gul_fifo_name}'
getmodel_args['item_output'] = '-'
elif gul_output: # only gul direct stdout to summary
getmodel_args['coverage_output'] = '-'
getmodel_args['item_output'] = ''
else: # direct stdout to il
getmodel_args['coverage_output'] = ''
getmodel_args['item_output'] = '-'
_get_getmodel_cmd = (_get_getmodel_cmd or get_getmodel_cov_cmd)
# gulcalc output file for fully correlated output
if full_correlation:
fc_gul_fifo_name = get_fifo_name(fifo_full_correlation_dir, RUNTYPE_GROUNDUP_LOSS, gul_id)
if need_summary_fifo_for_gul: # need both streams: one for summary and one for il
getmodel_args['correlated_output'] = get_fifo_name(fifo_full_correlation_dir, RUNTYPE_GROUNDUP_LOSS, gul_id,
consumer=RUNTYPE_FULL_CORRELATION)
if num_lb:
tee_output = get_fifo_name(fifo_full_correlation_dir, RUNTYPE_GROUNDUP_LOSS, gul_id,
consumer=RUNTYPE_LOAD_BALANCED_LOSS)
tee_cmd = f"tee < {getmodel_args['correlated_output']} {fc_gul_fifo_name} > {tee_output} &"
print_command(filename, tee_cmd)
else:
tee_output = get_fifo_name(fifo_full_correlation_dir, RUNTYPE_GROUNDUP_LOSS, gul_id,
consumer=RUNTYPE_INSURED_LOSS)
tee_cmd = f"tee < {getmodel_args['correlated_output']} {fc_gul_fifo_name} "
get_gul_stream_cmds.setdefault(fifo_full_correlation_dir, []).append((tee_cmd, False))
elif gul_output: # only gul direct correlated_output to summary
getmodel_args['correlated_output'] = fc_gul_fifo_name
else:
if num_lb:
getmodel_args['correlated_output'] = get_fifo_name(fifo_full_correlation_dir, RUNTYPE_GROUNDUP_LOSS, gul_id,
consumer=RUNTYPE_LOAD_BALANCED_LOSS)
else:
getmodel_args['correlated_output'] = get_fifo_name(fifo_full_correlation_dir, RUNTYPE_GROUNDUP_LOSS, gul_id,
consumer=RUNTYPE_FULL_CORRELATION)
get_gul_stream_cmds.setdefault(fifo_full_correlation_dir, []).append((getmodel_args['correlated_output'], True))
else:
getmodel_args['correlated_output'] = ''
getmodel_args.update(custom_args)
getmodel_cmd = _get_getmodel_cmd(**getmodel_args)
if num_lb: # print main_cmd_gul_stream; get_gul_stream_cmds is updated afterwards by the main lb block
main_cmd_gul_stream = get_main_cmd_gul_stream(
getmodel_cmd, gul_id, fifo_queue_dir, stderr_guard, RUNTYPE_LOAD_BALANCED_LOSS
)
print_command(filename, main_cmd_gul_stream)
else:
get_gul_stream_cmds.setdefault(fifo_queue_dir, []).append((getmodel_cmd, False))
if num_lb: # create load balancer cmds
for fifo_dir in fifo_dirs:
get_gul_stream_cmds[fifo_dir] = [
(get_fifo_name(fifo_dir, RUNTYPE_LOAD_BALANCED_LOSS, fm_id, RUNTYPE_INSURED_LOSS), True) for
fm_id in range(1, num_fm_output + 1)]
# print the load balancing command
get_input_stream_name = partial(get_fifo_name,
fifo_dir=fifo_dir,
producer=RUNTYPE_GROUNDUP_LOSS,
consumer=RUNTYPE_LOAD_BALANCED_LOSS)
get_output_stream_name = partial(get_fifo_name,
fifo_dir=fifo_dir,
producer=RUNTYPE_LOAD_BALANCED_LOSS,
consumer=RUNTYPE_INSURED_LOSS)
for lb_main_cmd in get_main_cmd_lb(num_lb, num_gul_per_lb, num_fm_per_lb, get_input_stream_name,
get_output_stream_name, stderr_guard):
print_command(filename, lb_main_cmd)
# Establish whether step policies are present
step_flag = ''
try:
pd.read_csv(
os.path.join(os.getcwd(), 'input/fm_profile.csv')
)['step_id']
except (OSError, FileNotFoundError, KeyError):
pass
else:
step_flag = ' -S'
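# step_flag (' -S') is appended to the fm commands in the il/ri streams
# below when step policies are present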
for fifo_dir, gul_streams in get_gul_stream_cmds.items():
for i, (getmodel_cmd, from_file) in enumerate(gul_streams):
# THIS NEEDS EDIT: temporary workaround for distributed work chunks
if process_number is not None:
process_id = process_number
else:
process_id = i + 1
#######################################################
if ri_output or rl_output:
main_cmd = get_main_cmd_ri_stream(
getmodel_cmd,
process_id,
il_output,
il_alloc_rule,
ri_alloc_rule,
num_reinsurance_iterations,
fifo_dir,
stderr_guard,
from_file,
fmpy,
fmpy_low_memory,
fmpy_sort_output,
step_flag,
process_counter=process_counter,
ri_inuring_priorities={ip['level']: ip['text'] for ip in get_ri_inuring_priorities(
analysis_settings, num_reinsurance_iterations) if ip['level'] and ri_output},
rl_inuring_priorities={ip['level']: ip['text'] for ip in get_rl_inuring_priorities(num_reinsurance_iterations) if rl_output}
)
print_command(filename, main_cmd)
elif il_output:
main_cmd = get_main_cmd_il_stream(
getmodel_cmd, process_id, il_alloc_rule, fifo_dir,
stderr_guard,
from_file,
fmpy,
fmpy_low_memory,
fmpy_sort_output,
step_flag,
process_counter=process_counter
)
print_command(filename, main_cmd)
else:
main_cmd = get_main_cmd_gul_stream(
cmd=getmodel_cmd,
process_id=process_id,
fifo_dir=fifo_dir,
stderr_guard=stderr_guard,
process_counter=process_counter,
)
print_command(filename, main_cmd)
print_command(filename, '')
do_pwaits(filename, process_counter)
[docs]
def create_bash_outputs(
process_counter,
fifo_tmp_dir,
filename,
remove_working_files,
fifo_queue_dir,
stderr_guard,
work_dir,
work_full_correlation_dir,
output_dir,
output_full_correlation_dir,
full_correlation,
gul_output,
il_output,
ri_output,
rl_output,
analysis_settings,
num_reinsurance_iterations,
num_gul_per_lb,
num_fm_per_lb,
max_process_id,
work_kat_dir,
kat_sort_by_event,
gul_item_stream,
work_full_correlation_kat_dir,
**kwargs
):
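# NOTE: the load balancer parameters are zeroed whenever max_process_id is
# set, which forces num_lb = 0 in the sizing logic below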
if max_process_id is not None:
num_gul_per_lb = 0
num_fm_per_lb = 0
# infer the number of calc blocks and FIFOs to create (no load balancer for the old stream option)
if num_gul_per_lb and num_fm_per_lb and (il_output or ri_output) and gul_item_stream:
block_process_size = num_gul_per_lb + (num_fm_per_lb * (2 if ri_output else 1))
num_lb = (max_process_id - 1) // block_process_size + 1
num_gul_output = num_lb * num_gul_per_lb
num_fm_output = num_lb * num_fm_per_lb
else:
num_lb = 0
num_gul_output = num_fm_output = max_process_id
# Output Kats
if rl_output:
print_command(filename, '')
print_command(filename, '# --- Do reinsurance gross loss kats ---')
print_command(filename, '')
for inuring_priority in get_rl_inuring_priorities(num_reinsurance_iterations):
do_kats(
RUNTYPE_REINSURANCE_GROSS_LOSS, analysis_settings,
num_fm_output, filename, process_counter, work_kat_dir,
output_dir, kat_sort_by_event,
inuring_priority=inuring_priority['text']
)
if ri_output:
print_command(filename, '')
print_command(filename, '# --- Do reinsurance loss kats ---')
print_command(filename, '')
for inuring_priority in get_ri_inuring_priorities(analysis_settings, num_reinsurance_iterations):
do_kats(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, num_fm_output,
filename, process_counter, work_kat_dir, output_dir, kat_sort_by_event,
inuring_priority=inuring_priority['text']
)
if full_correlation:
print_command(filename, '')
print_command(
filename,
'# --- Do reinsurance loss kats for fully correlated output ---'
)
print_command(filename, '')
do_kats(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, num_fm_output,
filename, process_counter, work_full_correlation_kat_dir,
output_full_correlation_dir, kat_sort_by_event,
)
if il_output:
print_command(filename, '')
print_command(filename, '# --- Do insured loss kats ---')
print_command(filename, '')
do_kats(
RUNTYPE_INSURED_LOSS, analysis_settings, num_fm_output, filename,
process_counter, work_kat_dir, output_dir, kat_sort_by_event,
)
if full_correlation:
print_command(filename, '')
print_command(
filename,
'# --- Do insured loss kats for fully correlated output ---'
)
print_command(filename, '')
do_kats(
RUNTYPE_INSURED_LOSS, analysis_settings, num_fm_output,
filename, process_counter, work_full_correlation_kat_dir,
output_full_correlation_dir, kat_sort_by_event,
)
if gul_output:
print_command(filename, '')
print_command(filename, '# --- Do ground up loss kats ---')
print_command(filename, '')
do_kats(
RUNTYPE_GROUNDUP_LOSS, analysis_settings, num_gul_output, filename,
process_counter, work_kat_dir, output_dir, kat_sort_by_event,
)
if full_correlation:
print_command(filename, '')
print_command(
filename,
'# --- Do ground up loss kats for fully correlated output ---'
)
print_command(filename, '')
do_kats(
RUNTYPE_GROUNDUP_LOSS, analysis_settings, num_gul_output,
filename, process_counter, work_full_correlation_kat_dir,
output_full_correlation_dir, kat_sort_by_event,
)
do_kwaits(filename, process_counter)
# Output calcs
print_command(filename, '')
if rl_output:
for inuring_priority in get_rl_inuring_priorities(num_reinsurance_iterations):
do_post_wait_processing(
RUNTYPE_REINSURANCE_GROSS_LOSS, analysis_settings, filename,
process_counter, '', output_dir, stderr_guard,
inuring_priority=inuring_priority['text']
)
if ri_output:
for inuring_priority in get_ri_inuring_priorities(analysis_settings, num_reinsurance_iterations):
do_post_wait_processing(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, filename,
process_counter, '', output_dir, stderr_guard,
inuring_priority=inuring_priority['text']
)
if il_output:
do_post_wait_processing(
RUNTYPE_INSURED_LOSS, analysis_settings, filename, process_counter, '',
output_dir, stderr_guard
)
if gul_output:
do_post_wait_processing(
RUNTYPE_GROUNDUP_LOSS, analysis_settings, filename, process_counter, '',
output_dir, stderr_guard
)
if full_correlation:
work_sub_dir = re.sub('^work/', '', work_full_correlation_dir)
if ri_output:
do_post_wait_processing(
RUNTYPE_REINSURANCE_LOSS, analysis_settings, filename, process_counter,
work_sub_dir, output_full_correlation_dir, stderr_guard
)
if il_output:
do_post_wait_processing(
RUNTYPE_INSURED_LOSS, analysis_settings, filename, process_counter,
work_sub_dir, output_full_correlation_dir, stderr_guard
)
if gul_output:
do_post_wait_processing(
RUNTYPE_GROUNDUP_LOSS, analysis_settings, filename, process_counter,
work_sub_dir, output_full_correlation_dir, stderr_guard
)
do_awaits(filename, process_counter) # waits for aalcalc
do_lwaits(filename, process_counter) # waits for leccalc
if remove_working_files:
print_command(filename, 'rm -R -f {}'.format(os.path.join(work_dir, '*')))
if fifo_tmp_dir:
# workaround to match bash tests
if fifo_queue_dir.endswith('fifo/'):
print_command(filename, 'rm -R -f {}'.format(fifo_queue_dir[:-5]))
else:
print_command(filename, 'rm -R -f {}'.format(fifo_queue_dir))
else:
print_command(filename, 'rm -R -f {}'.format(os.path.join(fifo_queue_dir, '*')))
# ========================================================================== #
# COMPATIBILITY ONLY - used to support older model runners
# ========================================================================== #
[docs]
def genbash(
max_process_id,
analysis_settings,
num_reinsurance_iterations=0,
fifo_tmp_dir=True,
gul_alloc_rule=None,
il_alloc_rule=None,
ri_alloc_rule=None,
num_gul_per_lb=None,
num_fm_per_lb=None,
stderr_guard=True,
gul_legacy_stream=False,
bash_trace=False,
filename='run_ktools.sh',
_get_getmodel_cmd=None,
custom_gulcalc_log_start=None,
custom_gulcalc_log_finish=None,
custom_args={},
fmpy=True,
fmpy_low_memory=False,
fmpy_sort_output=False,
event_shuffle=None,
modelpy=False,
gulpy=False,
gulpy_random_generator=1,
gulmc=False,
gulmc_random_generator=1,
gulmc_effective_damageability=False,
gulmc_vuln_cache_size=200,
model_py_server=False,
peril_filter=[],
summarypy=False,
base_df_engine='oasis_data_manager.df_reader.reader.OasisPandasReader',
model_df_engine=None,
dynamic_footprint=False
):
"""
Generates a bash script containing ktools calculation instructions for an
Oasis model.
:param max_process_id: The number of processes to create
:type max_process_id: int
:param analysis_settings: The analysis settings
:type analysis_settings: dict
:param filename: The output file name
:type filename: string
:param num_reinsurance_iterations: The number of reinsurance iterations
:type num_reinsurance_iterations: int
:param fifo_tmp_dir: When set to True, create and use FIFO queues under `/tmp/<random>/fifo`; if False, run in './fifo'
:type fifo_tmp_dir: boolean
:param gul_alloc_rule: Allocation rule (None or 1) for gulcalc; if not set, defaults to the coverage stream
:type gul_alloc_rule: int
:param il_alloc_rule: Allocation rule (0, 1 or 2) for fmcalc
:type il_alloc_rule: int
:param ri_alloc_rule: Allocation rule (0, 1 or 2) for fmcalc
:type ri_alloc_rule: int
:param num_gul_per_lb: The number of gulcalc processes per load balancer
:type num_gul_per_lb: int
:param num_fm_per_lb: The number of fmcalc processes per load balancer
:type num_fm_per_lb: int
:param _get_getmodel_cmd: Method for getting the getmodel command, by default
``GenerateLossesCmd.get_getmodel_cmd`` is used.
:type _get_getmodel_cmd: callable
:param base_df_engine: The engine to use when loading dataframes.
:type base_df_engine: str
:param model_df_engine: The engine to use when loading model dataframes.
:type model_df_engine: str
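Example: a minimal, illustrative invocation (the analysis settings below
are a hypothetical fragment, not a complete settings file)::

    genbash(
        max_process_id=4,
        analysis_settings={
            'gul_output': True,
            'gul_summaries': [{'id': 1, 'eltcalc': True}],
        },
        filename='run_ktools.sh',
    )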
"""
model_df_engine = model_df_engine or base_df_engine
params = bash_params(
max_process_id=max_process_id,
analysis_settings=analysis_settings,
num_reinsurance_iterations=num_reinsurance_iterations,
fifo_tmp_dir=fifo_tmp_dir,
gul_alloc_rule=gul_alloc_rule,
il_alloc_rule=il_alloc_rule,
ri_alloc_rule=ri_alloc_rule,
num_gul_per_lb=num_gul_per_lb,
num_fm_per_lb=num_fm_per_lb,
stderr_guard=stderr_guard,
gul_legacy_stream=gul_legacy_stream,
bash_trace=bash_trace,
filename=filename,
_get_getmodel_cmd=_get_getmodel_cmd,
custom_gulcalc_log_start=custom_gulcalc_log_start,
custom_gulcalc_log_finish=custom_gulcalc_log_finish,
custom_args=custom_args,
fmpy=fmpy,
fmpy_low_memory=fmpy_low_memory,
fmpy_sort_output=fmpy_sort_output,
event_shuffle=event_shuffle,
modelpy=modelpy,
gulpy=gulpy,
gulpy_random_generator=gulpy_random_generator,
gulmc=gulmc,
gulmc_random_generator=gulmc_random_generator,
gulmc_effective_damageability=gulmc_effective_damageability,
gulmc_vuln_cache_size=gulmc_vuln_cache_size,
model_py_server=model_py_server,
peril_filter=peril_filter,
summarypy=summarypy,
model_df_engine=model_df_engine,
dynamic_footprint=dynamic_footprint
)
# remove the file if it already exists
if os.path.exists(filename):
os.remove(filename)
with bash_wrapper(
filename,
bash_trace,
stderr_guard,
custom_gulcalc_log_start=params['custom_gulcalc_log_start'],
custom_gulcalc_log_finish=params['custom_gulcalc_log_finish'],
):
create_bash_analysis(**params)
create_bash_outputs(**params)
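# The generated script is then typically executed from the model run
# directory, e.g. `bash run_ktools.sh` (using whichever filename was passed in).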