Source code for oasislmf.pytools.converters.bintocsv.utils.vulnerability

import sys
import zlib
import numpy as np
from oasislmf.pytools.common.data import DEFAULT_BUFFER_SIZE, resolve_file, write_ndarray_to_fmt_csv
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.manager import VulnerabilityIndex, VulnerabilityRow_dtype
from oasislmf.utils.exceptions import OasisException


def _read_bin_idx_zip(file_in, idx_file_in, dtype):
    """Read a zlib-compressed vulnerability binary with the help of its index.

    Every index record supplies the ``vulnerability_id`` of a chunk, its byte
    ``offset`` and compressed ``size`` inside the binary, and the
    ``original_size`` of the chunk once decompressed. Each chunk is inflated,
    interpreted as ``VulnerabilityRow_dtype`` records and copied into the
    output array next to the vulnerability id it belongs to.

    Args:
        file_in: Binary input. Either ``sys.stdin.buffer`` (read fully into
            memory) or anything ``np.fromfile`` accepts.
        idx_file_in: Index file, memory-mapped as ``VulnerabilityIndex``
            records.
        dtype: Structured dtype of the returned array. It must expose the
            fields ``vulnerability_id``, ``intensity_bin_id``,
            ``damage_bin_id`` and ``probability``.

    Returns:
        np.ndarray: One record per decompressed vulnerability row, in index
        order.
    """
    if file_in == sys.stdin.buffer:
        bin_data = np.frombuffer(file_in.read(), dtype="u1")
    else:
        bin_data = np.fromfile(file_in, dtype="u1")

    # The index is only ever read here. np.memmap defaults to mode "r+",
    # which needs write permission on the file, so request read-only access.
    idx_data = np.memmap(idx_file_in, dtype=VulnerabilityIndex, mode="r")
    row_size = VulnerabilityRow_dtype.itemsize
    num_rows = int(np.sum(idx_data["original_size"])) // row_size

    data = np.empty(num_rows, dtype=dtype)
    data_index = 0
    for row in idx_data:
        # Work with Python ints so the running positions are not bound to the
        # fixed-width integer types of the index fields.
        offset = int(row["offset"])
        size = int(row["size"])
        num_data = int(row["original_size"]) // row_size

        unzipped_data = zlib.decompress(bin_data[offset:offset + size])
        vuln_row_data = np.frombuffer(unzipped_data, dtype=VulnerabilityRow_dtype)

        # Basic slicing returns a view, so these assignments fill ``data``.
        target = data[data_index:data_index + num_data]
        target["vulnerability_id"] = row["vulnerability_id"]
        target["intensity_bin_id"] = vuln_row_data["intensity_bin_id"]
        target["damage_bin_id"] = vuln_row_data["damage_bin_id"]
        target["probability"] = vuln_row_data["probability"]
        data_index += num_data
    return data


def _read_bin_idx(file_in, idx_file_in, dtype):
    """Read an uncompressed, indexed vulnerability binary.

    Every index record supplies the ``vulnerability_id`` of a chunk together
    with its byte ``offset`` and ``size`` inside the binary. Each chunk is
    reinterpreted in place as ``VulnerabilityRow_dtype`` records and copied
    into the output array next to the vulnerability id it belongs to.

    Args:
        file_in: Binary input. Either ``sys.stdin.buffer`` (read fully into
            memory) or anything ``np.fromfile`` accepts.
        idx_file_in: Index file, memory-mapped as ``VulnerabilityIndex``
            records.
        dtype: Structured dtype of the returned array. It must expose the
            fields ``vulnerability_id``, ``intensity_bin_id``,
            ``damage_bin_id`` and ``probability``.

    Returns:
        np.ndarray: One record per vulnerability row, in index order.
    """
    if file_in == sys.stdin.buffer:
        bin_data = np.frombuffer(file_in.read(), dtype="u1")
    else:
        bin_data = np.fromfile(file_in, dtype="u1")

    # The index is only ever read here. np.memmap defaults to mode "r+",
    # which needs write permission on the file, so request read-only access.
    idx_data = np.memmap(idx_file_in, dtype=VulnerabilityIndex, mode="r")
    row_size = VulnerabilityRow_dtype.itemsize
    num_rows = int(np.sum(idx_data["size"])) // row_size

    data = np.empty(num_rows, dtype=dtype)
    data_index = 0
    for row in idx_data:
        # Work with Python ints so the running positions are not bound to the
        # fixed-width integer types of the index fields.
        offset = int(row["offset"])
        size = int(row["size"])
        num_data = size // row_size

        # Reinterpret the raw bytes of this chunk as row records (no copy).
        vuln_row_data = bin_data[offset:offset + size].view(dtype=VulnerabilityRow_dtype)

        # Basic slicing returns a view, so these assignments fill ``data``.
        target = data[data_index:data_index + num_data]
        target["vulnerability_id"] = row["vulnerability_id"]
        target["intensity_bin_id"] = vuln_row_data["intensity_bin_id"]
        target["damage_bin_id"] = vuln_row_data["damage_bin_id"]
        target["probability"] = vuln_row_data["probability"]
        data_index += num_data
    return data


def _read_bin(file_in, dtype):
    # Offet 4 bytes for damage bins int32
    if file_in == sys.stdin.buffer:
        return np.frombuffer(file_in.read(), dtype=dtype, offset=4)
    else:
        return np.fromfile(file_in, dtype=dtype, offset=4)


def vulnerability_tocsv(stack, file_in, file_out, file_type, noheader, idx_file_in, zip_files):
    """Convert a vulnerability binary file to CSV.

    The reader is chosen from the arguments: a compressed indexed read when
    ``zip_files`` is truthy, an indexed read when only ``idx_file_in`` is
    given, and a plain read otherwise. The rows are then written out in
    slices of at most ``DEFAULT_BUFFER_SIZE`` records.

    Args:
        stack: Passed through to ``resolve_file`` (presumably a context stack
            that owns the opened input; confirm against ``resolve_file``).
        file_in: Input to resolve with ``resolve_file`` in ``"rb"`` mode.
        file_out: Text output object; the header is written with its
            ``write`` method and rows via ``write_ndarray_to_fmt_csv``.
        file_type: Key into ``TOOL_INFO`` selecting headers, dtype and format.
        noheader: When truthy, the CSV header line is not written.
        idx_file_in: Index file for indexed binaries, or ``None``.
        zip_files: Truthy when the binary is compressed; requires
            ``idx_file_in``.

    Raises:
        OasisException: If ``zip_files`` is truthy and ``idx_file_in`` is
            ``None``. The header line has already been written by then.
    """
    tool_info = TOOL_INFO[file_type]
    headers = tool_info["headers"]
    dtype = tool_info["dtype"]
    fmt = tool_info["fmt"]

    file_in = resolve_file(file_in, "rb", stack)

    if not noheader:
        file_out.write(",".join(headers) + "\n")

    # Compressed input cannot be located without the index file.
    if zip_files and idx_file_in is None:
        raise OasisException(f"Error: Cannot read zip files without provided idx_file_in zip path, currently {idx_file_in}")

    if zip_files:
        data = _read_bin_idx_zip(file_in, idx_file_in, dtype)
    elif idx_file_in is not None:
        data = _read_bin_idx(file_in, idx_file_in, dtype)
    else:
        data = _read_bin(file_in, dtype)

    # Emit the rows slice by slice rather than in a single call.
    total_rows = data.shape[0]
    for lower in range(0, total_rows, DEFAULT_BUFFER_SIZE):
        upper = min(lower + DEFAULT_BUFFER_SIZE, total_rows)
        write_ndarray_to_fmt_csv(file_out, data[lower:upper], headers, fmt)