# Source code for oasislmf.lookup.factory

__all__ = [
    'KeyServerFactory',
    'BasicKeyServer'
]

import copy
import csv
import json
import os
import sys
import types
import warnings

from collections import OrderedDict
from contextlib import ExitStack

import math
import numpy as np
import pandas as pd

from ..utils.data import get_json
from ..utils.exceptions import OasisException
from ..utils.log import oasis_log
from ..utils.path import import_from_string, get_custom_module, as_path
from ..utils.status import OASIS_KEYS_STATUS

from .builtin import PerilCoveredDeterministicLookup
from .builtin import Lookup as NewLookup

try:
    from billiard import cpu_count, Queue, Process
except ImportError:
    from multiprocessing import cpu_count, Queue, Process

from queue import Empty, Full

# add pickling support for traceback object
import tblib.pickling_support

tblib.pickling_support.install()


def with_error_queue(fct):
    def wrapped_fct(error_queue, *args, **kwargs):
        try:
            return fct(error_queue, *args, **kwargs)
        except Exception:
            error_queue.put(sys.exc_info())

    return wrapped_fct
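
# A minimal usage sketch for with_error_queue (an illustration, not part of the
# module): the wrapped function runs in a child process and any exception it
# raises is shipped back as a sys.exc_info() triple (made picklable by tblib
# below), so the parent can re-raise it with the original traceback. Assumes a
# fork-style start method, as the decorated function is not picklable under
# 'spawn'.
@with_error_queue
def _example_failing_task(error_queue, x):
    return 1 / x  # ZeroDivisionError for x == 0 ends up on error_queue


def _example_run_failing_task():
    error_queue = Queue()
    p = Process(target=_example_failing_task, args=(error_queue, 0))
    p.start()
    p.join()
    if not error_queue.empty():
        exc_info = error_queue.get()
        raise exc_info[1].with_traceback(exc_info[2])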


class KeyServerFactory(object):
    """
    A factory class to create the Key Server that will be used to generate the keys files.

    All Key Servers must implement the interface defined in lookup.interface.KeyServerInterface.
    Oasis provides a built-in Key Server that manages the generation of the key files from the
    keys provided by a built-in or a custom Key Lookup.

    The factory now returns a KeyServer object and not a KeyLookup. The parameters to pass
    have also been simplified; usage of all the parameters below is now deprecated:

    - complex_lookup_config_fp => pass the path to your complex lookup config directly in lookup_config_fp
    - lookup_module_path => set as key 'lookup_module_path' in the lookup config
    - model_keys_data_path => set as key 'keys_data_path' in the lookup config
    - model_version_file_path => set the model information ('supplier_id', 'model_id',
      'model_version') directly in the config
    """

    @classmethod
    def get_config(cls, config_fp):
        return as_path(os.path.dirname(config_fp), 'config_fp'), get_json(config_fp)
    @classmethod
    def get_model_info(cls, model_version_file_path):
        """
        Get model information from the model version file.
        """
        model_version_file_path = as_path(model_version_file_path, 'model_version_file_path',
                                          preexists=True, null_is_valid=False)
        with open(model_version_file_path, 'r', encoding='utf-8') as f:
            return next(csv.DictReader(
                f, fieldnames=['supplier_id', 'model_id', 'model_version']
            ))
    @classmethod
    def update_deprecated_args(cls, config_dir, config, complex_lookup_config_fp, model_keys_data_path,
                               model_version_file_path, lookup_module_path):
        if (complex_lookup_config_fp or model_keys_data_path or model_version_file_path or lookup_module_path):
            warnings.warn('usage of complex_lookup_config_fp, model_keys_data_path, '
                          'model_version_file_path and lookup_module_path is now deprecated; '
                          'those variables now need to be set in the lookup config '
                          '(see the key server documentation)')
        if complex_lookup_config_fp:
            config_dir, config = cls.get_config(complex_lookup_config_fp)
        if model_keys_data_path:
            config['keys_data_path'] = as_path(model_keys_data_path, 'model_keys_data_path', preexists=True)
        if model_version_file_path:
            config['model'] = cls.get_model_info(model_version_file_path)
        if lookup_module_path:
            config['lookup_module_path'] = lookup_module_path
        return config_dir, config
    @classmethod
    def create(
        cls,
        model_keys_data_path=None,
        model_version_file_path=None,
        lookup_module_path=None,
        lookup_config=None,
        lookup_config_json=None,
        lookup_config_fp=None,
        complex_lookup_config_fp=None,
        user_data_dir=None,
        output_directory=None,
    ):
        """
        Creates a keys lookup class instance for the given model and supplier -
        local file paths are required for the model keys data folder, the model
        version file and the Git repository for the model keys server.

        Returns a pair ``(model_info, klc)``, where ``model_info`` is a dictionary
        holding model information from the model version file and ``klc`` is the
        lookup service class instance for the model.
        """
        if lookup_config:
            config_dir = '.'
            config = lookup_config
        elif lookup_config_json:
            config_dir = '.'
            config = json.loads(lookup_config_json)
        elif lookup_config_fp:
            config_dir, config = cls.get_config(lookup_config_fp)
        else:  # no config
            config_dir, config = '.', {}

        if not config:
            config_dir, config = cls.update_deprecated_args(config_dir, config, complex_lookup_config_fp,
                                                            model_keys_data_path, model_version_file_path,
                                                            lookup_module_path)
        else:  # reproduce lookup_config overwriting complex_lookup_config_fp
            if complex_lookup_config_fp:
                complex_config_dir, complex_config = cls.get_config(complex_lookup_config_fp)
                config['complex_config_dir'] = complex_config_dir
                config['complex_config'] = complex_config
                complex_lookup_config_fp = None

        if config.get('key_server_module_path'):
            _KeyServer_module = get_custom_module(config.get('key_server_module_path'), 'key_server_module_path')
            _KeyServer = getattr(_KeyServer_module, '{}KeysServer'.format(config['model']['model_id']))
        else:
            _KeyServer = BasicKeyServer

        if _KeyServer.interface_version == '1':
            key_server = _KeyServer(config, config_dir=config_dir, user_data_dir=user_data_dir,
                                    output_dir=output_directory)
        else:
            raise OasisException(f"KeyServer interface version {_KeyServer.interface_version} not implemented")

        if complex_lookup_config_fp:
            key_server.complex_lookup_config_fp = complex_lookup_config_fp

        return config['model'], key_server
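
# A hypothetical usage sketch (the supplier/model names and the keys data path
# are illustrative, not part of the module): building a key server from an
# in-memory lookup config that selects the built-in lookup. create returns the
# model info dict plus a KeyServer instance (BasicKeyServer here, since no
# 'key_server_module_path' is set in the config).
def _example_create_key_server():
    config = {
        'model': {'supplier_id': 'MySupplier', 'model_id': 'MyModel', 'model_version': '1'},
        'builtin_lookup_type': 'new_lookup',
        'keys_data_path': '/path/to/keys_data',
    }
    model_info, key_server = KeyServerFactory.create(lookup_config=config)
    return model_info, key_server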
class BasicKeyServer:
    """
    A basic implementation of the KeyServerInterface.

    It will load the KeyLookup class from config['lookup_module_path'] if present,
    or use the built-in KeyLookup otherwise. The KeyLookup must implement the
    KeyLookupInterface.

    A multiprocess solution is provided if the KeyLookup implements the
    process_locations_multiproc method. Both the single-process and multiprocess
    solutions use a low amount of memory, as they process the keys in chunks of
    limited size.

    This class implements all the file-writing methods that were previously
    handled by the lookup factory.
    """
    interface_version = "1"
    valid_format = ['oasis', 'json']

    error_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('status', 'Status'),
        ('message', 'Message'),
    ])

    model_data_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('model_data', 'ModelData'),
    ])

    dynamic_model_data_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('area_peril_id', 'AreaPerilID'),
        ('vulnerability_id', 'VulnerabilityID'),
        ('intensity_adjustment', 'IntensityAdjustment'),
        ('return_period', 'ReturnPeriod'),
        ('section_id', 'section_id'),
    ])

    model_data_with_amplification_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('model_data', 'ModelData'),
        ('amplification_id', 'AmplificationID'),
    ])

    dynamic_model_with_amplification_data_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('area_peril_id', 'AreaPerilID'),
        ('vulnerability_id', 'VulnerabilityID'),
        ('intensity_adjustment', 'IntensityAdjustment'),
        ('return_period', 'ReturnPeriod'),
        ('section_id', 'section_id'),
        ('amplification_id', 'AmplificationID'),
    ])

    key_success_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('area_peril_id', 'AreaPerilID'),
        ('vulnerability_id', 'VulnerabilityID'),
    ])

    key_success_with_amplification_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('area_peril_id', 'AreaPerilID'),
        ('vulnerability_id', 'VulnerabilityID'),
        ('amplification_id', 'AmplificationID'),
    ])

    key_success_with_message_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('area_peril_id', 'AreaPerilID'),
        ('vulnerability_id', 'VulnerabilityID'),
        ('message', 'Message'),
    ])

    key_success_with_amplification_and_message_heading_row = OrderedDict([
        ('loc_id', 'LocID'),
        ('peril_id', 'PerilID'),
        ('coverage_type', 'CoverageTypeID'),
        ('area_peril_id', 'AreaPerilID'),
        ('vulnerability_id', 'VulnerabilityID'),
        ('amplification_id', 'AmplificationID'),
        ('message', 'Message'),
    ])

    min_bloc_size = 1000
    max_bloc_size = 10000
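
    # For reference, with key_success_heading_row above, the 'oasis' format
    # successes file written by write_oasis_keys_file looks like this (the
    # values are purely illustrative):
    #
    #     LocID,PerilID,CoverageTypeID,AreaPerilID,VulnerabilityID
    #     1,WTC,1,154,8
    #     2,WSS,3,155,9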
    def __init__(self, config, config_dir=None, user_data_dir=None, output_dir=None):
        self.config = config
        self.config_dir = config_dir or '.'
        self.user_data_dir = user_data_dir
        self.output_dir = output_dir
        self.lookup_cls = self.get_lookup_cls()
    def get_lookup_cls(self):
        if self.config.get('lookup_class'):
            lookup_cls = import_from_string(self.config.get('lookup_class'))
        elif self.config.get('lookup_module'):
            lookup_module = import_from_string(self.config.get('lookup_module'))
            lookup_cls = getattr(lookup_module, '{}KeysLookup'.format(self.config['model']['model_id']))
        elif self.config.get('lookup_module_path'):
            lookup_module_path = self.config.get('lookup_module_path')
            if not os.path.isabs(lookup_module_path):
                lookup_module_path = os.path.join(self.config_dir, lookup_module_path)
            lookup_module = get_custom_module(lookup_module_path, 'lookup_module_path')
            lookup_cls = getattr(lookup_module, '{}KeysLookup'.format(self.config['model']['model_id']))
        else:  # built-in lookup
            if self.config.get('builtin_lookup_type') == 'peril_covered_deterministic':
                lookup_cls = PerilCoveredDeterministicLookup
            elif self.config.get('builtin_lookup_type') == 'new_lookup':
                lookup_cls = NewLookup
            else:
                raise OasisException("Unrecognised lookup config file, or config file is from the "
                                     "deprecated built-in lookup module ('oasislmf<=1.16.0')")
        return lookup_cls
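
    # The resolution order above, as a sketch of each config option (all values
    # illustrative):
    #
    #     {'lookup_class': 'my_package.my_module.MyModelKeysLookup'}   # import the class directly
    #
    #     {'lookup_module': 'my_package.my_module',
    #      'model': {'model_id': 'MyModel', ...}}                      # resolves MyModelKeysLookup
    #
    #     {'lookup_module_path': 'src/keys_server/lookup.py',
    #      'model': {'model_id': 'MyModel', ...}}                      # path relative to config_dir
    #
    #     {'builtin_lookup_type': 'new_lookup', ...}                   # built-in Lookup class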
    @staticmethod
    def create_lookup(lookup_cls, config, config_dir, user_data_dir, output_dir, lookup_id):
        lookup_config = copy.deepcopy(config)
        lookup_config['lookup_id'] = lookup_id
        lookup_interface_version = getattr(lookup_cls, 'interface_version', '0')
        if lookup_interface_version == '1':
            return lookup_cls(config, config_dir=config_dir, user_data_dir=user_data_dir, output_dir=output_dir)
        elif lookup_interface_version == '0':
            warnings.warn('OasisLookupInterface (or OasisBaseKeysLookup) is now deprecated. '
                          'The interface for lookups is now lookup.interface.LookupInterface; '
                          'for similar functionality use lookup.base.AbstractBasicKeyLookup, '
                          'and for a multiprocess implementation add lookup.base.MultiprocLookupMixin')
            if not (config and output_dir):
                return lookup_cls(
                    keys_data_directory=config.get('keys_data_path'),
                    supplier=config['model']['supplier_id'],
                    model_name=config['model']['model_id'],
                    model_version=config['model']['model_version'],
                )
            elif not user_data_dir:
                return lookup_cls(
                    keys_data_directory=config.get('keys_data_path'),
                    supplier=config['model']['supplier_id'],
                    model_name=config['model']['model_id'],
                    model_version=config['model']['model_version'],
                    complex_lookup_config_fp=config_dir,
                    output_directory=output_dir,
                )
            else:
                return lookup_cls(
                    keys_data_directory=config.get('keys_data_path'),
                    supplier=config['model']['supplier_id'],
                    model_name=config['model']['model_id'],
                    model_version=config['model']['model_version'],
                    complex_lookup_config_fp=config_dir,
                    user_data_dir=user_data_dir,
                    output_directory=output_dir,
                )
        else:
            raise OasisException(f"lookup interface version {lookup_interface_version} not implemented")
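
    # A minimal sketch (an assumption, not shipped with oasislmf) of a custom
    # lookup class compatible with the interface_version '1' branch above: it
    # is constructed as lookup_cls(config, config_dir=..., user_data_dir=...,
    # output_dir=...) and must at least implement process_locations, e.g.:
    #
    #     class MyModelKeysLookup:
    #         interface_version = '1'
    #
    #         def __init__(self, config, config_dir=None, user_data_dir=None, output_dir=None):
    #             self.config = config
    #
    #         def process_locations(self, loc_df):
    #             # one success key per location, with the columns the writers expect
    #             return pd.DataFrame({
    #                 'loc_id': loc_df['loc_id'],
    #                 'peril_id': 'WTC',  # illustrative peril code
    #                 'coverage_type': 1,
    #                 'area_peril_id': 1,
    #                 'vulnerability_id': 1,
    #                 'status': OASIS_KEYS_STATUS['success']['id'],
    #             })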
    def get_locations(self, location_fp):
        """Load exposure data from location_fp and return the exposure dataframe."""
        raise NotImplementedError('oasislmf now uses ods_tools to pass locations to the KeyServer. '
                                  'This method needs to be implemented '
                                  'if you want to provide your own loader from a file path.')
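
    # A sketch (an assumption, not part of oasislmf) of a subclass supplying
    # its own loader; a 'loc_id' column is required downstream by
    # location_producer and the key file writers:
    #
    #     class CsvKeyServer(BasicKeyServer):
    #         def get_locations(self, location_fp):
    #             loc_df = pd.read_csv(location_fp)
    #             if 'loc_id' not in loc_df.columns:
    #                 loc_df['loc_id'] = loc_df.index + 1  # illustrative id assignment
    #             return loc_df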
    @staticmethod
    @with_error_queue
    def location_producer(error_queue, loc_df, part_count, loc_queue):
        loc_ids_parts = np.array_split(np.unique(loc_df['loc_id']), part_count)
        loc_df_parts = (loc_df[loc_df['loc_id'].isin(loc_ids_parts[i])] for i in range(part_count))
        loc_df_part = True
        while loc_df_part is not None:
            loc_df_part = next(loc_df_parts, None)
            while error_queue.empty():
                try:
                    loc_queue.put(loc_df_part, timeout=5)
                    break
                except Full:
                    pass
            else:
                return
    @staticmethod
    @with_error_queue
    def lookup_multiproc_worker(error_queue, lookup_cls, config, config_dir, user_data_dir, output_dir,
                                lookup_id, loc_queue, key_queue):
        lookup = BasicKeyServer.create_lookup(lookup_cls, config, config_dir, user_data_dir, output_dir, lookup_id)
        while True:
            while error_queue.empty():
                try:
                    loc_df_part = loc_queue.get(timeout=5)
                    break
                except Empty:
                    pass
            else:
                return
            if loc_df_part is None:
                loc_queue.put(None)
                key_queue.put(None)
                break
            while error_queue.empty():
                try:
                    key_queue.put(lookup.process_locations_multiproc(loc_df_part), timeout=5)
                    break
                except Full:
                    pass
            else:
                return
    @staticmethod
    def key_producer(key_queue, error_queue, worker_count):
        finished_workers = 0
        while finished_workers < worker_count and error_queue.empty():
            while error_queue.empty():
                try:
                    res = key_queue.get(timeout=5)
                    break
                except Empty:
                    pass
            else:
                break
            if res is None:
                finished_workers += 1
            else:
                yield res
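
    # The three static methods above form the pipeline assembled in
    # generate_key_files_multiproc:
    #
    #     location_producer --loc_queue--> lookup_multiproc_worker (x pool_count) --key_queue--> key_producer
    #
    # location_producer ends by putting a final None on loc_queue; each worker
    # that receives it puts the None back for its peers and a None on key_queue,
    # which key_producer uses to count finished workers. Every blocking loop
    # polls error_queue so the whole pipeline shuts down promptly on failure.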
    def get_success_heading_row(self, keys, keys_success_msg):
        has_amplification_id = 'amplification_id' in keys
        is_dynamic = 'intensity_adjustment' in keys and 'return_period' in keys
        is_complex = 'model_data' in keys
        if is_complex and has_amplification_id:
            return self.model_data_with_amplification_heading_row
        elif is_complex:
            return self.model_data_heading_row
        elif has_amplification_id and is_dynamic:
            return self.dynamic_model_with_amplification_data_heading_row
        elif is_dynamic:
            return self.dynamic_model_data_heading_row
        elif keys_success_msg and has_amplification_id:
            return self.key_success_with_amplification_and_message_heading_row
        elif keys_success_msg:
            return self.key_success_with_message_heading_row
        elif has_amplification_id:
            return self.key_success_with_amplification_heading_row
        else:
            return self.key_success_heading_row
    def write_json_keys_file(self, results, keys_success_msg, successes_fp, errors_fp):
        # no streaming implementation for the json format
        results = pd.concat((r for r in results if not r.empty))
        success = results['status'] == OASIS_KEYS_STATUS['success']['id']
        success_df = results[success]
        success_df.to_json(successes_fp, orient='records', indent=4, force_ascii=False)
        successes_count = success_df.shape[0]

        if errors_fp:
            errors_df = results[~success]
            errors_df.to_json(errors_fp, orient='records', indent=4, force_ascii=False)
            error_count = errors_df.shape[0]
        else:
            error_count = 0
        return successes_count, error_count
    def write_oasis_keys_file(self, results, keys_success_msg, successes_fp, errors_fp):
        with ExitStack() as stack:
            successes_file = stack.enter_context(open(successes_fp, 'w', encoding='utf-8'))
            if errors_fp:
                errors_file = stack.enter_context(open(errors_fp, 'w', encoding='utf-8'))
                errors_file.write(','.join(self.error_heading_row.values()) + '\n')
            else:
                errors_file = None
            success_heading_row = None
            successes_count = 0
            error_count = 0
            for i, result in enumerate(results):
                success = result['status'] == OASIS_KEYS_STATUS['success']['id']
                success_df = result[success]
                if not success_df.empty:
                    if success_heading_row is None:
                        success_heading_row = self.get_success_heading_row(result.columns, keys_success_msg)
                        write_header = True
                    else:
                        write_header = False
                    success_df[success_heading_row.keys()].rename(
                        columns=success_heading_row
                    ).to_csv(successes_file, index=False, header=write_header)
                    successes_count += success_df.shape[0]
                if errors_file:
                    errors_df = result[~success]
                    if 'message' not in errors_df.columns:
                        errors_df['message'] = ""  # if there is no error message column, fill with blanks to prevent a KeyError
                    errors_df[self.error_heading_row.keys()].rename(
                        columns=self.error_heading_row
                    ).to_csv(errors_file, index=False, header=False)
                    error_count += errors_df.shape[0]
        return successes_count, error_count
    def write_keys_file(self, results, successes_fp, errors_fp, output_format, keys_success_msg):
        if output_format not in self.valid_format:
            raise OasisException(f"Unrecognised lookup file output format {output_format} "
                                 f"- valid formats are {self.valid_format}")
        write = getattr(self, f'write_{output_format}_keys_file')
        successes_count, error_count = write(results, keys_success_msg, successes_fp, errors_fp)
        if errors_fp:
            return successes_fp, successes_count, errors_fp, error_count
        else:
            return successes_fp, successes_count
    def generate_key_files_singleproc(self, loc_df, successes_fp, errors_fp, output_format, keys_success_msg, **kwargs):
        if getattr(self, 'complex_lookup_config_fp', None):  # backward compatibility 1.15 hack
            config_dir = getattr(self, 'complex_lookup_config_fp', None)
        else:
            config_dir = self.config_dir
        lookup = self.create_lookup(self.lookup_cls, self.config, config_dir, self.user_data_dir,
                                    self.output_dir, lookup_id=None)
        key_results = lookup.process_locations(loc_df)

        def gen_results(results):
            if isinstance(results, pd.DataFrame):
                yield results
            elif isinstance(results, (list, tuple)):
                yield pd.DataFrame(results)
            elif isinstance(results, types.GeneratorType):
                results_part = pd.DataFrame.from_records(results, nrows=self.max_bloc_size)
                while not results_part.empty:
                    yield results_part
                    results_part = pd.DataFrame.from_records(results, nrows=self.max_bloc_size)
            else:
                raise OasisException(f"Unrecognised type for results: {type(results)}, "
                                     "expected DataFrame, list, tuple or generator")

        return self.write_keys_file(gen_results(key_results),
                                    successes_fp=successes_fp,
                                    errors_fp=errors_fp,
                                    output_format=output_format,
                                    keys_success_msg=keys_success_msg,
                                    )
    def generate_key_files_multiproc(self, loc_df, successes_fp, errors_fp, output_format, keys_success_msg,
                                     num_cores, num_partitions, **kwargs):
        """
        Generate the key files using multiprocessing: a producer process splits the
        locations into parts, worker processes run the lookup on each part, and the
        main process consumes the key results and writes the key files.
        """
        if getattr(self, 'complex_lookup_config_fp', None):  # backward compatibility 1.15 hack
            config_dir = getattr(self, 'complex_lookup_config_fp', None)
        else:
            config_dir = self.config_dir
        pool_count = num_cores if num_cores > 0 else cpu_count()
        if num_partitions > 0:
            part_count = num_partitions
        else:
            bloc_size = min(max(math.ceil(loc_df.shape[0] / pool_count), self.min_bloc_size), self.max_bloc_size)
            part_count = math.ceil(loc_df.shape[0] / bloc_size)
        pool_count = min(pool_count, part_count)
        if pool_count <= 1:
            return self.generate_key_files_singleproc(loc_df, successes_fp, errors_fp, output_format, keys_success_msg)

        loc_queue = Queue(maxsize=pool_count)
        key_queue = Queue(maxsize=pool_count)
        error_queue = Queue()
        location_producer = Process(target=self.location_producer, args=(error_queue, loc_df, part_count, loc_queue))
        workers = [Process(target=self.lookup_multiproc_worker,
                           args=(error_queue, self.lookup_cls, self.config, config_dir, self.user_data_dir,
                                 self.output_dir, lookup_id, loc_queue, key_queue))
                   for lookup_id in range(pool_count)]
        location_producer.start()
        [worker.start() for worker in workers]
        try:
            return self.write_keys_file(self.key_producer(key_queue, error_queue, worker_count=pool_count),
                                        successes_fp=successes_fp,
                                        errors_fp=errors_fp,
                                        output_format=output_format,
                                        keys_success_msg=keys_success_msg,
                                        )
        except Exception:
            error_queue.put(sys.exc_info())
        finally:
            for process in [location_producer] + workers:
                if process.is_alive():
                    process.terminate()
                    process.join()
            loc_queue.close()
            key_queue.close()
            if not error_queue.empty():
                exc_info = error_queue.get()
                raise exc_info[1].with_traceback(exc_info[2])
    @oasis_log()
    def generate_key_files(
        self,
        location_fp=None,
        successes_fp=None,
        errors_fp=None,
        output_format='oasis',
        keys_success_msg=False,
        multiproc_enabled=True,
        multiproc_num_cores=-1,
        multiproc_num_partitions=-1,
        location_df=None,
        **kwargs
    ):
        """
        Generate the key files by calling:

        1. get_locations, to get a locations object from location_fp
        2. process_locations or process_locations_multiproc, to get a results object from the locations object
        3. write_keys_file, to write the relevant files from the results object
        """
        successes_fp = as_path(successes_fp, 'successes_fp', preexists=False)
        errors_fp = as_path(errors_fp, 'errors_fp', preexists=False)

        if location_df is not None:
            locations = location_df
        else:
            # get_locations raises NotImplementedError unless overridden;
            # normally the exposure dataframe is passed in as location_df
            locations = self.get_locations(location_fp)

        if multiproc_enabled and hasattr(self.lookup_cls, 'process_locations_multiproc'):
            return self.generate_key_files_multiproc(locations,
                                                     successes_fp=successes_fp,
                                                     errors_fp=errors_fp,
                                                     output_format=output_format,
                                                     keys_success_msg=keys_success_msg,
                                                     num_cores=multiproc_num_cores,
                                                     num_partitions=multiproc_num_partitions)
        else:
            return self.generate_key_files_singleproc(locations,
                                                      successes_fp=successes_fp,
                                                      errors_fp=errors_fp,
                                                      output_format=output_format,
                                                      keys_success_msg=keys_success_msg,
                                                      )
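
# A hypothetical end-to-end sketch (file names are illustrative, not part of
# the module): generating the oasis-format key files for an exposure
# dataframe. When errors_fp is given, write_keys_file returns a 4-tuple;
# without it, a 2-tuple.
def _example_generate_key_files(key_server, loc_df):
    successes_fp, successes_count, errors_fp, error_count = key_server.generate_key_files(
        location_df=loc_df,
        successes_fp='keys.csv',
        errors_fp='keys-errors.csv',
        output_format='oasis',
        keys_success_msg=False,
    )
    return successes_count, error_count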