"""
This file houses a series of functions that load, convert, and save vulnerability data with Parquet files.
"""
import argparse
import json
import logging
import os
import pathlib
from math import ceil
from typing import List, Tuple
import numba as nb
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from oasislmf.pytools.common.data import oasis_float
from .common import Vulnerability
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
vulnerability_filename = 'vulnerability.bin'
vulnerability_dataset = "vulnerability_dataset.parquet"
vulnerability_parquet_filename = 'part_{}.parquet'
# NOTE: this constant is referenced in vulnerability_to_parquet() below but its
# definition was missing from this section; the filename value here is an assumption.
parquetvulnerability_meta_filename = 'vulnerability_meta.json'
vulnerability_bloc_size = int(os.environ.get('VULNERABILITY_BLOC_SIZE', 1e8)) # Bytes
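# Illustrative sizing (assumed figures, not from the original source): with the default
# 1e8-byte block, 1,000 intensity bins, 100 damage bins, and a 4-byte oasis_float,
# each block holds 10**8 // (1000 * 100 * 4) = 250 vulnerability IDs.
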
@nb.njit(cache=True)
def get_vuln_info(vulns_bin) -> Tuple[int, int, int]:
    """
    Extracts metadata from the vulnerability data.

    Args:
        vulns_bin: (List[Vulnerability]) vulnerability data from the file

    Returns: (Tuple[int, int, int]) number of vulnerability IDs, number of intensity bins, number of damage bins
    """
    vulnerability_ids_set = set()
    num_intensity_bins = 0
    num_damage_bins = 0
    for vuln_i in range(vulns_bin.shape[0]):
        vuln = vulns_bin[vuln_i]
        vulnerability_ids_set.add(vuln['vulnerability_id'])
        # bin IDs are 1-based, so the largest ID seen is the bin count
        if num_intensity_bins < vuln['intensity_bin_id']:
            num_intensity_bins = vuln['intensity_bin_id']
        if num_damage_bins < vuln['damage_bin_id']:
            num_damage_bins = vuln['damage_bin_id']
    return len(vulnerability_ids_set), num_intensity_bins, num_damage_bins
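
# Minimal usage sketch (hypothetical sample values; assumes the Vulnerability dtype
# exposes the fields vulnerability_id, intensity_bin_id, damage_bin_id and probability):
#
#     sample = np.array([(1, 1, 1, 0.4), (1, 2, 2, 0.6), (2, 1, 1, 1.0)], dtype=Vulnerability)
#     num_ids, num_intensity, num_damage = get_vuln_info(sample)
#     # -> (2, 2, 2): two distinct IDs; the largest bin IDs seen are 2 and 2
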
@nb.njit()
def get_array(vulns_bin, num_intensity_bins, num_damage_bins, max_vulnerability_id_size) -> Tuple[List[int], List[List[List[float]]]]:
    """
    Flattens the vulnerability data into a one-dimensional array for each vulnerability ID.

    NOTE: Numba cannot cache a generator for the moment. For this to work properly,
    data for the same vulnerability_id must all be in one contiguous block.

    Args:
        vulns_bin: (List[Vulnerability]) vulnerability data from file
        num_intensity_bins: (int) the number of intensity bins in the data
        num_damage_bins: (int) the number of damage bins in the data
        max_vulnerability_id_size: (int) the size of the vulnerability_ids array inside the function; this will
            be the size of the vulnerability IDs array that is returned

    Returns: (Tuple[List[int], List[List[List[float]]]]) array of vulnerability IDs, vulnerability data
    """
    vulnerability_ids = np.empty(max_vulnerability_id_size, dtype=np.int32)
    vuln_array = np.zeros((vulnerability_ids.shape[0], num_damage_bins, num_intensity_bins), dtype=oasis_float)
    cursor = 0
    vulnerability_id_index = 0
    vulnerability_id = -1
    while cursor < vulns_bin.shape[0]:
        vuln: Vulnerability = vulns_bin[cursor]
        if vuln['vulnerability_id'] != vulnerability_id:
            if vulnerability_id_index == max_vulnerability_id_size:
                # the block is full: yield it, then reset the buffers for the next block
                yield vulnerability_ids, vuln_array
                vuln_array.fill(0)
                vulnerability_id_index = 0
            vulnerability_id = vulns_bin[cursor]['vulnerability_id']
            vulnerability_ids[vulnerability_id_index] = vulnerability_id
            cur_vuln_array = vuln_array[vulnerability_id_index]
            vulnerability_id_index += 1
        cur_vuln_array[vuln['damage_bin_id'] - 1, vuln['intensity_bin_id'] - 1] = vuln['probability']
        cursor += 1
    if vulnerability_id_index:
        # yield the final, possibly partial, block
        yield vulnerability_ids[:vulnerability_id_index], vuln_array[:vulnerability_id_index]
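
# Consumption sketch (hypothetical): each yielded pair is a block of up to
# max_vulnerability_id_size IDs alongside a (block, num_damage_bins, num_intensity_bins)
# probability cube. A caller stitching all blocks back together might do:
#
#     ids_blocks, cubes = [], []
#     for ids, cube in get_array(vulns_bin, num_intensity_bins, num_damage_bins, 250):
#         ids_blocks.append(ids.copy())   # copy: the buffers are reused between yields
#         cubes.append(cube.copy())
#     all_ids = np.concatenate(ids_blocks)
#     all_cubes = np.concatenate(cubes)
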
def iter_table(vulns_bin, num_intensity_bins, num_damage_bins, info, max_vulnerability_id_size):
    """
    Loops through the vulnerability data, converting it into one-dimensional arrays, which are in turn
    converted into PyArrow arrays and then into Tables.

    Args:
        vulns_bin: (List[Vulnerability]) vulnerability data from file
        num_intensity_bins: (int) the number of intensity bins in the data
        num_damage_bins: (int) the number of damage bins in the data
        info: (dict) metadata about the vulnerability data. It has to store:
            num_vulnerability_id => number of vulnerability IDs
            num_intensity_bins => number of intensity bins
            num_damage_bins => number of damage bins
        max_vulnerability_id_size: (int) the size of the vulnerability_ids array

    Yields: (pa.Table) a table of vulnerability IDs and their flattened vulnerability arrays
    """
    for vulnerability_ids, vuln_array in get_array(vulns_bin, num_intensity_bins, num_damage_bins, max_vulnerability_id_size):
        arr_vulnerability_ids = pa.array(vulnerability_ids)
        # each row holds a flattened (num_damage_bins x num_intensity_bins) matrix
        arr_vulnerability = pa.FixedSizeListArray.from_arrays(vuln_array.ravel(), num_intensity_bins * num_damage_bins)
        vulnerability_table = pa.Table.from_arrays([arr_vulnerability_ids, arr_vulnerability], names=[
            'vulnerability_id', 'vuln_array'], metadata=info)
        yield vulnerability_table
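
# Read-back sketch (hypothetical, using only standard pyarrow/numpy calls): a part file
# written from these tables can be restored to the original cube shape like so:
#
#     table = pq.read_table('part_0.parquet')
#     n_dmg = int(table.schema.metadata[b'num_damage_bins'])
#     n_int = int(table.schema.metadata[b'num_intensity_bins'])
#     values = table['vuln_array'].combine_chunks().flatten().to_numpy()
#     cube = values.reshape(-1, n_dmg, n_int)
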
def vulnerability_to_parquet(run_dir) -> None:
    """
    Converts the vulnerability data to Parquet files and saves them.

    Args:
        run_dir: (str) the directory of the data where this process is to take place

    Returns: None
    """
    logger.debug(f'retrieving vulnerability info from {os.path.join(run_dir, vulnerability_filename)}')
    # skip the 4-byte header at the start of vulnerability.bin
    vulns_bin = np.memmap(os.path.join(run_dir, vulnerability_filename), dtype=Vulnerability, offset=4, mode='r')
    num_vulnerability_id, num_intensity_bins, num_damage_bins = get_vuln_info(vulns_bin)
    info = {"num_vulnerability_id": str(num_vulnerability_id),
            "num_intensity_bins": str(num_intensity_bins),
            "num_damage_bins": str(num_damage_bins),
            }
    logger.debug(f'{info}')

    with open(pathlib.Path(os.path.join(run_dir, parquetvulnerability_meta_filename)), 'w') as f:
        json.dump({key: int(val) for key, val in info.items()}, f)

    dataset_path = pathlib.Path(os.path.join(run_dir, vulnerability_dataset))
    dataset_path.mkdir(exist_ok=True)
    # clear out any stale part files from a previous conversion
    for filepath in dataset_path.glob(vulnerability_parquet_filename.format('*')):
        os.remove(filepath)

    # size each block so that one block of vulnerability data fits in vulnerability_bloc_size bytes
    max_vulnerability_id_size = vulnerability_bloc_size // (num_intensity_bins * num_damage_bins * oasis_float.itemsize)
    num_step = ceil(num_vulnerability_id / max_vulnerability_id_size)
    for i, vuln_table in enumerate(iter_table(vulns_bin, num_intensity_bins, num_damage_bins, info, max_vulnerability_id_size)):
        logger.debug(f"step {i + 1}/{num_step}")
        pq.write_table(vuln_table, os.path.join(dataset_path, vulnerability_parquet_filename.format(i)))
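
# Typical call (illustrative): given a run directory containing a valid vulnerability.bin,
#
#     vulnerability_to_parquet('static')
#
# writes the metadata JSON plus one 'part_{i}.parquet' file per block under
# 'static/vulnerability_dataset.parquet/'.
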
parser = argparse.ArgumentParser()
parser.add_argument('-r', '--run-dir', help='path to the directory containing vulnerability.bin', default='static')
parser.add_argument('-v', '--logging-level',
                    help='logging level (debug:10, info:20, warning:30, error:40, critical:50)',
                    default=30, type=int)
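
# Example invocation (the script name is hypothetical; adjust to wherever this file lives):
#
#     python vulnerability_to_parquet.py -r static -v 10
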
def main():
    kwargs = vars(parser.parse_args())

    # attach a console handler to this module's logger
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logging_level = kwargs.pop('logging_level')
    logger.setLevel(logging_level)

    vulnerability_to_parquet(**kwargs)


if __name__ == "__main__":
    main()