Source code for oasislmf.computation.run.platform

__all__ = [
    'PlatformBase',
    'PlatformList',
    'PlatformGet',
    'PlatformRun',
    'PlatformDelete',
]


import getpass
import io
import os
import json

from tabulate import tabulate
from mimetypes import guess_extension

from requests.exceptions import HTTPError

from ...platform_api.client import APIClient
from ...utils.exceptions import OasisException
from ...utils.defaults import API_EXAMPLE_AUTH

from ..base import ComputationStep


[docs] class PlatformBase(ComputationStep): """ Base platform class to handle opening a client connection """
[docs] step_params = [ {'name': 'server_login_json', 'is_path': True, 'pre_exist': False, 'help': 'Source location CSV file path'}, {'name': 'server_url', 'default': 'http://localhost:8000', 'help': 'URL to Oasis Platform server, default is localhost'}, {'name': 'server_version', 'default': 'v2', 'help': "Version prefix for OasisPlatform server, 'v1' = single server run, 'v2' = distributed on cluster"}, ]
def __init__(self, **kwargs): super().__init__(**kwargs)
[docs] self.server = self.open_connection()
[docs] def load_credentials(self, login_arg): """ Load credentials from JSON file or Prompt for username / password Options: 1.'--server-login ./APIcredentials.json' 2. Load credentials from default config file '-C oasislmf.json' """ if isinstance(login_arg, str): with io.open(login_arg, encoding='utf-8') as f: return json.load(f) self.logger.info('API Login:') api_login = {} api_login['username'] = input('Username: ') api_login['password'] = getpass.getpass('Password: ') return api_login
[docs] def open_connection(self): try: # If no password given try the reference example return APIClient( api_url=self.server_url, api_ver=self.server_version, username=API_EXAMPLE_AUTH['user'], password=API_EXAMPLE_AUTH['pass'], ) except OasisException as e: if isinstance(e.original_exception, HTTPError): # Prompt for password and try to re-autehnticate self.logger.info("-- Authentication Required --") credentials = self.load_credentials(self.server_login_json) self.logger.info(f'Connecting to - {self.server_url}') return APIClient( api_url=self.server_url, api_ver=self.server_version, username=credentials['username'], password=credentials['password'], )
[docs] def tabulate_json(self, json_data, items): table_data = dict() for i in items: table_data[i] = list() for m in json_data: for k in table_data: # will have link+data if dict returned if isinstance(m[k], dict): table_data[k].append('Yes') # If none then no data elif m[k] is None: table_data[k].append('-') # If URL then something linked to field elif isinstance(m[k], str): if any(v in m[k] for v in ['http://', 'https://']): table_data[k].append('Linked') else: table_data[k].append(m[k]) # Fallback - add value as string else: table_data[k].append(str(m[k])) return table_data
[docs] def print_endpoint(self, attr, items): endpoint_obj = getattr(self.server, attr) self.logger.info(f'\nAvailable {attr}:') data = self.tabulate_json(endpoint_obj.get().json(), items) self.logger.info(tabulate(data, headers=items, tablefmt='psql')) return data
[docs] def select_id(self, msg, valid_ids): while True: try: value = str(input(f'Select {msg} ID: ')) except ValueError: self.logger.info('Invalid Response: {}'.format(value)) continue except KeyboardInterrupt: return -1 if value not in valid_ids: self.logger.info(f'id {value} not among the valid ids: {valid_ids} - ctrl-c to exit') continue else: break return int(value)
[docs] class PlatformList(PlatformBase): """ Return status and details from an Oasis Platform API server """
[docs] step_params = PlatformBase.step_params + [ {'name': 'models', 'flag': '-m', 'type': int, 'nargs': '+', 'help': 'List of model ids to print in detail'}, {'name': 'portfolios', 'flag': '-p', 'type': int, 'nargs': '+', 'help': 'List of portfolio ids to print in detail'}, {'name': 'analyses', 'flag': '-a', 'type': int, 'nargs': '+', 'help': 'List of analyses ids to print in detail'}, ]
[docs] def run(self): # Default to printing summary of API status if not any([self.models, self.portfolios, self.analyses]): self.print_endpoint('models', ['id', 'supplier_id', 'model_id', 'version_id']) self.print_endpoint('portfolios', ['id', 'name', 'location_file', 'accounts_file', 'reinsurance_info_file', 'reinsurance_scope_file']) self.print_endpoint('analyses', ['id', 'name', 'model', 'portfolio', 'status', 'input_file', 'output_file', 'run_log_file']) if self.models: for Id in self.models: msg = f'Model (id={Id}): \n' try: rsp = self.server.models.get(Id) self.logger.info(msg + json.dumps(rsp.json(), indent=4, sort_keys=True)) except HTTPError as e: self.logger.info(msg + e.response.text) if self.portfolios: for Id in self.portfolios: msg = f'Portfolio (id={Id}): \n' try: rsp = self.server.portfolios.get(Id) self.logger.info(msg + json.dumps(rsp.json(), indent=4, sort_keys=True)) except HTTPError as e: self.logger.info(msg + e.response.text) if self.analyses: for Id in self.analyses: msg = f'Analysis (id={Id}): \n' try: rsp = self.server.analyses.get(Id) self.logger.info(msg + json.dumps(rsp.json(), indent=4, sort_keys=True)) except HTTPError as e: self.logger.info(msg + e.response.text)
class PlatformRunInputs(PlatformBase): """ run generate inputs via the Oasis Platoform API """ step_params = PlatformBase.step_params + [ {'name': 'model_id', 'type': int, 'help': 'API `id` of a model to run an analysis with'}, {'name': 'portfolio_id', 'type': int, 'help': 'API `id` of a portfolio to run an analysis with'}, {'name': 'analysis_id', 'type': int, 'help': 'API `id` of an analysis to run'}, {'name': 'analysis_settings_json', 'flag': '-a', 'is_path': True, 'pre_exist': True, 'help': 'Analysis settings JSON file path'}, {'name': 'oed_location_csv', 'flag': '-x', 'is_path': True, 'pre_exist': True, 'help': 'Source location CSV file path'}, {'name': 'oed_accounts_csv', 'flag': '-y', 'is_path': True, 'pre_exist': True, 'help': 'Source accounts CSV file path'}, {'name': 'oed_info_csv', 'flag': '-i', 'is_path': True, 'pre_exist': True, 'help': 'Reinsurance info. CSV file path'}, {'name': 'oed_scope_csv', 'flag': '-s', 'is_path': True, 'pre_exist': True, 'help': 'Reinsurance scope CSV file path'}, ] def run(self): # Run Input geneneration from ID if self.analysis_id: try: status = self.server.analyses.status(self.analysis_id) if status in ['RUN_QUEUED', 'RUN_STARTED']: self.server.cancel_analysis(self.analysis_id) elif status in ['INPUTS_GENERATION_QUEUED', 'INPUTS_GENERATION_STARTED']: self.server.cancel_generate(self.analysis_id) self.server.run_generate(self.analysis_id) return self.analysis_id except HTTPError as e: raise OasisException(f'Error running analysis ({self.analysis_id}) - {e}') # Create Portfolio and Ananlysis, then run if not (self.portfolio_id or self.oed_location_csv or self.oed_accounts_csv): raise OasisException('Error: At least one of the following inputs is required [portfolio_id, oed_location_csv, oed_accounts_csv]') # when no model is selected prompt user for choice if not self.model_id: models = self.server.models.get().json() model_count = len(models) if model_count < 1: raise OasisException(f'No models found in API: {self.server_url}') if model_count == 1: self.model_id = models[0]['id'] if model_count > 1: models_table = self.print_endpoint('models', ['id', 'supplier_id', 'model_id', 'version_id']) self.model_id = self.select_id('models', models_table['id']) if self.model_id < 0: raise OasisException(' Model selection cancelled') # Select or create a portfilo if self.portfolio_id: portfolios = self.server.portfolios.get().json() if self.portfolio_id not in [p['id'] for p in portfolios]: raise OasisException(f'Portfolio "{self.portfolio_id}" not found in API: {self.server_url}') else: portfolio = self.server.upload_inputs( portfolio_id=None, location_fp=self.oed_location_csv, accounts_fp=self.oed_accounts_csv, ri_info_fp=self.oed_info_csv, ri_scope_fp=self.oed_scope_csv, ) self.portfolio_id = portfolio['id'] analysis = self.server.create_analysis( portfolio_id=self.portfolio_id, model_id=self.model_id, analysis_settings_fp=self.analysis_settings_json, ) self.analysis_id = analysis['id'] # Execure run self.server.run_generate(self.analysis_id) return self.analysis_id class PlatformRunLosses(PlatformBase): """ run generate losses via the Oasis Platoform API """ step_params = PlatformBase.step_params + [ {'name': 'analysis_id', 'type': int, 'required': True, 'help': 'API `id` of an analysis to run'}, {'name': 'output_dir', 'flag': '-o', 'is_path': True, 'pre_exist': True, 'help': 'Output data directory for results data (absolute or relative file path)', 'default': './'}, {'name': 'analysis_settings_json', 'flag': '-a', 'is_path': True, 'pre_exist': True, 'help': 'Analysis settings JSON file path'}, ] def run(self): self.server.run_analysis(self.analysis_id, self.analysis_settings_json) self.server.download_output(self.analysis_id, self.output_dir)
[docs] class PlatformRun(PlatformBase): """ End to End - run model via the Oasis Platoform API """
[docs] chained_commands = [PlatformRunInputs, PlatformRunLosses]
[docs] def run(self): self.kwargs['analysis_id'] = PlatformRunInputs(**self.kwargs).run() PlatformRunLosses(**self.kwargs).run()
[docs] class PlatformDelete(PlatformBase): """ Delete either a 'model', 'portfolio' or an 'analysis' from the API's Database """
[docs] step_params = PlatformBase.step_params + [ {'name': 'models', 'flag': '-m', 'type': int, 'nargs': '+', 'help': 'List of model ids to Delete.'}, {'name': 'portfolios', 'flag': '-p', 'type': int, 'nargs': '+', 'help': 'List of Portfolio ids to Delete'}, {'name': 'analyses', 'flag': '-a', 'type': int, 'nargs': '+', 'help': 'List of Analyses ids to Detele'}, ]
[docs] def delete_list(self, attr, id_list): if not all(isinstance(ID, int) for ID in id_list): raise OasisException(f"Invalid input, '{attr}', must be a list of type Int, not {id_list}") api_endpoint = getattr(self.server, attr) for Id in id_list: try: if api_endpoint.delete(Id).ok: self.logger.info(f'Deleted {attr}_id={Id}') except HTTPError as e: self.logger.error('Delete error {}_id={} - {}'.format(attr, Id, e)) continue
[docs] def run(self): if not any([self.models, self.portfolios, self.analyses]): raise OasisException("""Select item(s) to delete, list of either: --models MODEL_ID [MODEL_ID ... n], --portfolios PORTFOLIOS_ID [PORTFOLIO_ID ... n] --analyses ANALYSES_ID [ANALYSES_ID ... n] """) if self.models: self.delete_list('models', self.models) if self.portfolios: self.delete_list('portfolios', self.portfolios) if self.analyses: self.delete_list('analyses', self.analyses)
[docs] class PlatformGet(PlatformBase): """ Download file(s) from the api """
[docs] step_params = PlatformBase.step_params + [ {'name': 'output_dir', 'flag': '-o', 'is_path': True, 'pre_exist': True, 'help': 'Output data directory for results data (absolute or relative file path)', 'default': './'}, # Files for models object {'name': 'model_settings', 'type': int, 'nargs': '+', 'help': 'Model ids to download settings file.'}, {'name': 'model_versions', 'type': int, 'nargs': '+', 'help': 'Model ids to download versions file'}, # Files from portfolio {'name': 'portfolio_location_file', 'type': int, 'nargs': '+', 'help': 'Portfolio ids to download Location file'}, {'name': 'portfolio_accounts_file', 'type': int, 'nargs': '+', 'help': 'Portfolio ids to download Accounts file'}, {'name': 'portfolio_reinsurance_scope_file', 'type': int, 'nargs': '+', 'help': 'Portfolio ids to download RI scope file.'}, {'name': 'portfolio_reinsurance_info_file', 'type': int, 'nargs': '+', 'help': 'Portfolio ids to download RI info file.'}, # Files from an analyses {'name': 'analyses_settings_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download settings file'}, {'name': 'analyses_run_traceback_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download traceback logs'}, {'name': 'analyses_run_log_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download Ktools run logs'}, {'name': 'analyses_input_generation_traceback_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download input_generation traceback logs'}, {'name': 'analyses_input_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download Generated inputs tar'}, {'name': 'analyses_output_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download Output losses tar'}, {'name': 'analyses_summary_levels_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download summary levels file'}, {'name': 'analyses_lookup_validation_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download exposure summary'}, {'name': 'analyses_lookup_success_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download successful lookups'}, {'name': 'analyses_lookup_errors_file', 'type': int, 'nargs': '+', 'help': 'Analyses ids to download summary of failed lookups'}, ]
[docs] def extract_args(self, param_suffix): return {k.replace(param_suffix, ''): v for k, v in self.kwargs.items() if v and param_suffix in k}
[docs] def download(self, collection, req_files, chuck_size=1024): collection_obj = getattr(self.server, collection) for File in req_files: resource = getattr(collection_obj, File) for ID in req_files[File]: try: r = resource.get(ID) ext = guess_extension(r.headers['content-type'].partition(';')[0].strip()) filename = os.path.join(self.output_dir, f'{ID}_{collection}_{File}{ext}') with io.open(filename, 'wb') as f: for chunk in r.iter_content(chunk_size=chuck_size): f.write(chunk) self.logger.info(f'Downloaded: {File} from {collection}_id={ID} "{filename}"') except HTTPError as e: self.logger.error('Download failed: - {}'.format(e))
[docs] def run(self): model_files = self.extract_args('model_') portfolio_files = self.extract_args('portfolio_') analyses_files = self.extract_args('analyses_') # Check that at least one option is given if not any([model_files, portfolio_files, analyses_files]): raise OasisException('Select file for download e.g. "--analyses_output <id_1> .. <id_n>"') if model_files: self.download('models', model_files) if portfolio_files: self.download('portfolios', portfolio_files) if analyses_files: self.download('analyses', analyses_files)