# Source code for oasislmf.pytools.getmodel.vulnerability

"""
This file houses a series of functions that load, convert, and save vulnerability data with Parquet files.
"""
import argparse
import json
import logging
import os
import pathlib
from math import ceil
from typing import List, Tuple

import numba as nb
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

from oasislmf.pytools.common.data import oasis_float

from .common import Vulnerability

# Module-level logger; the NullHandler avoids "no handler found" warnings
# when this module is used as a library without logging configured.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

# Name of the input binary file holding raw vulnerability records.
vulnerability_filename = 'vulnerability.bin'
# Name of the output directory that holds the parquet parts.
vulnerability_dataset = "vulnerability_dataset.parquet"
# JSON side-car file storing the dataset meta data (counts of ids and bins).
parquetvulnerability_meta_filename = "vulnerability_parquet_meta.json"
# Template for per-block parquet file names inside the dataset directory.
vulnerability_parquet_filename = 'part_{}.parquet'
# Target in-memory size of one vulnerability block; overridable via env var.
vulnerability_bloc_size = int(os.environ.get('VULNERABILITY_BLOC_SIZE', 1e8))  # Bytes
@nb.njit(cache=True)
def get_vuln_info(vulns_bin) -> Tuple[int, int, int]:
    """
    Extracts meta data from the vulnerability data.

    Args:
        vulns_bin: (List[Vulnerability]) vulnerability data from the file

    Returns: (Tuple[int, int, int]) number of vulnerability IDs,
        number of intensity bins, number of damage bins
    """
    seen_vulnerability_ids = set()
    max_intensity_bin = 0
    max_damage_bin = 0

    # Single pass over every record: collect distinct ids and track the
    # largest bin index seen in each dimension.
    for record_i in range(vulns_bin.shape[0]):
        record = vulns_bin[record_i]
        seen_vulnerability_ids.add(record['vulnerability_id'])

        intensity_bin = record['intensity_bin_id']
        if intensity_bin > max_intensity_bin:
            max_intensity_bin = intensity_bin

        damage_bin = record['damage_bin_id']
        if damage_bin > max_damage_bin:
            max_damage_bin = damage_bin

    return len(seen_vulnerability_ids), max_intensity_bin, max_damage_bin
@nb.njit()
def get_array(vulns_bin, num_intensity_bins, num_damage_bins,
              max_vulnerability_id_size) -> Tuple[List[int], List[List[List[float]]]]:
    """
    Flattens the vulnerability data into a one-dimensional array for each
    vulnerability ID, yielding blocks of at most ``max_vulnerability_id_size``
    vulnerability ids at a time.

    NOTE => Numba: cannot cache generator for the moment.
    For this to work properly, data on the same vulnerability_id must all be
    in one contiguous block of ``vulns_bin``.

    Args:
        vulns_bin: (List[Vulnerability]) vulnerability data from file
        num_intensity_bins: (int) the number of intensity bins in the data
        num_damage_bins: (int) the number of damage bins in the data
        max_vulnerability_id_size: (int) the size of the vulnerability_ids array
            inside the function; this will be the size of the vulnerability IDs
            array that is returned

    Returns: (Tuple[List[int], List[List[List[float]]]]) array of vulnerability
        IDs, vulnerability data
    """
    vulnerability_ids = np.empty(max_vulnerability_id_size, dtype=np.int32)
    # Dense probability cube: one (damage x intensity) matrix per id slot.
    vuln_array = np.zeros((vulnerability_ids.shape[0], num_damage_bins, num_intensity_bins), dtype=oasis_float)
    cursor = 0
    vulnerability_id_index = 0
    vulnerability_id = -1  # sentinel: no id seen yet
    while cursor < vulns_bin.shape[0]:
        vuln: Vulnerability = vulns_bin[cursor]
        if vuln['vulnerability_id'] != vulnerability_id:
            # Starting a new vulnerability id.
            if vulnerability_id_index == max_vulnerability_id_size:
                # Block is full: emit it, then reset the buffers for the next block.
                yield vulnerability_ids, vuln_array
                vuln_array.fill(0)
                vulnerability_id_index = 0
            vulnerability_id = vulns_bin[cursor]['vulnerability_id']
            vulnerability_ids[vulnerability_id_index] = vulnerability_id
            cur_vuln_array = vuln_array[vulnerability_id_index]
            vulnerability_id_index += 1
        # Bin ids are 1-based in the binary data; array indices are 0-based.
        cur_vuln_array[vuln['damage_bin_id'] - 1, vuln['intensity_bin_id'] - 1] = vuln['probability']
        cursor += 1
    if vulnerability_id_index:
        # Emit the final, possibly partial, block (sliced to its actual size).
        yield vulnerability_ids[:vulnerability_id_index], vuln_array[:vulnerability_id_index]
def iter_table(vulns_bin, num_intensity_bins, num_damage_bins, info, max_vulnerability_id_size):
    """
    Loops through the vulnerability data, converting it into one-dimensional
    arrays, in-turn converting this into PyArrow arrays and then into Tables.

    Args:
        vulns_bin: (List[Vulnerability]) vulnerability data from file
        num_intensity_bins: (int) the number of intensity bins in the data
        num_damage_bins: (int) the number of damage bins in the data
        info: (dict) meta data around the vulnerability data. It has to store:
            num_vulnerability_id => number of vulnerability IDs
            num_intensity_bins => number of intensity bins
            num_damage_bins => number of damage bins
        max_vulnerability_id_size: (int) the size of the vulnerability_ids array

    Yields: (pa.Table) one table per block produced by ``get_array``
    """
    # Each per-id matrix is flattened into a fixed-size list of this length.
    flat_matrix_len = num_intensity_bins * num_damage_bins
    column_names = ['vulnerability_id', 'vuln_array']

    blocks = get_array(vulns_bin, num_intensity_bins, num_damage_bins, max_vulnerability_id_size)
    for block_ids, block_array in blocks:
        id_column = pa.array(block_ids)
        vuln_column = pa.FixedSizeListArray.from_arrays(block_array.ravel(), flat_matrix_len)
        yield pa.Table.from_arrays([id_column, vuln_column], names=column_names, metadata=info)
def vulnerability_to_parquet(run_dir) -> None:
    """
    Converts the vulnerability data to parquet files and saves them, together
    with a JSON meta-data side-car file.

    Args:
        run_dir: (str) the directory of the data where this process is to take place

    Returns: None
    """
    bin_path = os.path.join(run_dir, vulnerability_filename)
    logger.debug(f'retrieving vulnerability info from {bin_path}')
    # offset=4 skips the 4-byte header of the binary file; records are mapped lazily.
    vulns_bin = np.memmap(bin_path, dtype=Vulnerability, offset=4, mode='r')

    num_vulnerability_id, num_intensity_bins, num_damage_bins = get_vuln_info(vulns_bin)
    # Values are stored as strings because they are also used as parquet
    # table metadata (which requires string values).
    info = {
        "num_vulnerability_id": str(num_vulnerability_id),
        "num_intensity_bins": str(num_intensity_bins),
        "num_damage_bins": str(num_damage_bins),
    }
    logger.debug(f'{info}')

    # Persist the meta data (as integers) in the JSON side-car file.
    meta_path = pathlib.Path(os.path.join(run_dir, parquetvulnerability_meta_filename))
    with open(meta_path, 'w') as meta_file:
        json.dump({name: int(value) for name, value in info.items()}, meta_file)

    # Ensure the dataset directory exists and clear any stale parquet parts.
    dataset_path = pathlib.Path(os.path.join(run_dir, vulnerability_dataset))
    dataset_path.mkdir(exist_ok=True)
    for stale_part in dataset_path.glob(vulnerability_parquet_filename.format('*')):
        stale_part.unlink()

    # Number of vulnerability ids whose dense matrices fit in one block of
    # vulnerability_bloc_size bytes.
    max_vulnerability_id_size = vulnerability_bloc_size // (num_intensity_bins * num_damage_bins * oasis_float.itemsize)
    num_step = ceil(num_vulnerability_id / max_vulnerability_id_size)

    tables = iter_table(vulns_bin, num_intensity_bins, num_damage_bins, info, max_vulnerability_id_size)
    for step, vuln_table in enumerate(tables):
        logger.debug(f"step {step + 1}/{num_step}")
        pq.write_table(vuln_table, os.path.join(dataset_path, vulnerability_parquet_filename.format(step)))
# Command-line interface used when this module is executed as a script.
parser = argparse.ArgumentParser()
parser.add_argument('-r', '--run-dir', help='path to the directory containing vulnerability.bin', default='static')
parser.add_argument('-v', '--logging-level',
                    help='logging level (debug:10, info:20, warning:30, error:40, critical:50)',
                    default=30, type=int)
def main():
    """Script entry point: configure console logging, then run the conversion."""
    args = vars(parser.parse_args())

    # Attach a stream handler so log records are visible on the console.
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)
    logger.setLevel(args.pop('logging_level'))

    # Remaining parsed arguments map directly onto the converter's signature.
    vulnerability_to_parquet(**args)
# Allow running the module directly as a conversion script.
if __name__ == "__main__":
    main()