Source code for oasislmf.pytools.data_layer.conversions.footprint

import json
import os
import argparse
import shutil
from tqdm.auto import tqdm
from contextlib import ExitStack

import pandas as pd
import pyarrow as pa
from pyarrow import parquet as pq

from oasis_data_manager.filestore.backends.local import LocalStorage
from oasislmf.pytools.getmodel.footprint import Footprint


def convert_bin_to_parquet_event(static_path: str, **kwargs) -> None:
    """
    Converts the footprint data from the binary files to a parquet dataset
    partitioned by event_id.

    Args:
        static_path: (str) the path to the folder containing the static files

    Returns: None
    """
    root_path = os.path.join(static_path, 'footprint.parquet')
    try:
        shutil.rmtree(root_path)
    except FileNotFoundError:
        pass

    with ExitStack() as stack:
        storage = LocalStorage(
            root_dir=static_path,
            cache_dir=None,
        )
        footprint_obj = stack.enter_context(
            Footprint.load(storage, ignore_file_type={'csv', 'parquet'})
        )
        index_data = footprint_obj.footprint_index
        meta_data = {
            "num_intensity_bins": footprint_obj.num_intensity_bins,
            "has_intensity_uncertainty": bool(footprint_obj.has_intensity_uncertainty == 1)
        }

        # Write each event's rows as one partition of the parquet dataset.
        for event_id in tqdm(index_data.keys(), desc="processing events"):
            data_slice = footprint_obj.get_event(event_id)
            df = pd.DataFrame(data_slice)
            df["event_id"] = event_id

            pq.write_to_dataset(
                pa.Table.from_pandas(df),
                root_path=root_path,
                partition_cols=['event_id'],
                compression="BROTLI"
            )

        with storage.open('footprint_parquet_meta.json', 'w') as outfile:
            json.dump(meta_data, outfile)
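For reference, a minimal sketch (not part of the module) of reading one event back from the dataset written above, using pyarrow's partition filters; the static path and event id are illustrative placeholders:

    from pyarrow import parquet as pq

    # Root of the per-event dataset produced by convert_bin_to_parquet_event
    # (placeholder path); the filter prunes the read to a single partition.
    root_path = '/path/to/static/footprint.parquet'
    table = pq.read_table(root_path, filters=[('event_id', '=', 1)])
    event_df = table.to_pandas()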
def convert_bin_to_parquet_chunk(static_path, chunk_size, **kwargs) -> None:
    """
    Converts the footprint data from the binary files to parquet chunks,
    each targeting roughly chunk_size bytes of raw data.

    Args:
        static_path: (str) the path to the folder containing the static files
        chunk_size: target raw size of each partition in bytes

    Returns: None
    """
    root_path = os.path.join(static_path, 'footprint_chunk')
    try:
        shutil.rmtree(root_path)
    except FileNotFoundError:
        pass
    os.mkdir(root_path)

    # Running mean of per-event areaperil_id density, stored as (mean, count).
    density = (0.0, 0)

    with ExitStack() as stack:
        storage = LocalStorage(
            root_dir=static_path,
            cache_dir=None,
        )
        footprint_obj = stack.enter_context(
            Footprint.load(storage, ignore_file_type={'csv', 'parquet'})
        )
        index_data = footprint_obj.footprint_index
        meta_data = {
            "num_intensity_bins": footprint_obj.num_intensity_bins,
            "has_intensity_uncertainty": bool(footprint_obj.has_intensity_uncertainty == 1)
        }

        # First pass: record the areaperil_id range of each event and
        # accumulate the density statistic.
        event_data = []
        for event_id in tqdm(index_data.keys(), desc="parsing index file"):
            data_slice = footprint_obj.get_event(event_id)
            df = pd.DataFrame(data_slice)
            min_areaperil_id = min(df['areaperil_id'])
            max_areaperil_id = max(df['areaperil_id'])
            event_data.append((min_areaperil_id, max_areaperil_id, event_id))
            if df.shape[0]:
                if max_areaperil_id == min_areaperil_id:
                    cur_density = 1
                else:
                    cur_density = df.shape[0] / (max_areaperil_id - min_areaperil_id)
                density = ((density[0] * density[1] + cur_density) / (density[1] + 1), density[1] + 1)

        event_data.sort(key=lambda x: x[0])

        # Second pass: accumulate events (sorted by min areaperil_id) until the
        # in-memory size reaches chunk_size, then write the chunk to parquet.
        current_chunk = []
        current_size = 0
        count = 1
        footprint_lookup = []
        for min_apid, max_apid, event_id in tqdm(event_data, desc="processing chunks"):
            footprint_lookup.append({
                'event_id': event_id,
                'partition': count,
                'min_areaperil_id': min_apid,
                'max_areaperil_id': max_apid
            })
            data_slice = footprint_obj.get_event(event_id)
            df = pd.DataFrame(data_slice)
            df["event_id"] = event_id
            current_chunk.append(df)
            current_size += df.memory_usage(deep=True).sum()
            if current_size < chunk_size:
                continue

            pq.write_table(
                pa.Table.from_pandas(
                    pd.concat(current_chunk, ignore_index=True)
                ),
                os.path.join(root_path, f'footprint_{count}.parquet'),
                compression="BROTLI"
            )
            current_chunk = []
            current_size = 0
            count += 1

        # Flush any remaining events into a final chunk.
        if current_chunk:
            pq.write_table(
                pa.Table.from_pandas(
                    pd.concat(current_chunk, ignore_index=True)
                ),
                os.path.join(root_path, f'footprint_{count}.parquet'),
                compression='BROTLI'
            )

    footprint_lookup_df = pd.DataFrame(footprint_lookup)
    footprint_lookup_df.to_parquet(f'{static_path}/footprint_lookup.parquet', index=False)

    # Also write the min and max areaperil ids (without the partition column)
    # to binary and csv lookup files.
    no_partition = footprint_lookup_df.drop(columns=['partition'])
    no_partition.to_pickle(f'{static_path}/footprint_lookup.bin')
    no_partition.to_csv(f'{static_path}/footprint_lookup.csv', index=False)

    with open(f'{static_path}/footprint_parquet_meta.json', 'w') as outfile:
        json.dump(meta_data, outfile)

    print(f"event area_peril_id density is {density[0]}")
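As an illustration only (not part of the module), the chunked output can be combined with footprint_lookup.parquet to locate and read the chunk holding a given event; the static path and event id below are placeholders:

    import os
    import pandas as pd

    static_path = '/path/to/static'   # placeholder
    event_id = 1                      # placeholder

    # Look up which chunk the event was written to, then filter the chunk
    # down to that event's rows.
    lookup = pd.read_parquet(os.path.join(static_path, 'footprint_lookup.parquet'))
    partition = int(lookup.loc[lookup['event_id'] == event_id, 'partition'].iloc[0])
    chunk = pd.read_parquet(os.path.join(static_path, 'footprint_chunk', f'footprint_{partition}.parquet'))
    event_df = chunk[chunk['event_id'] == event_id]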
default_chunk_size = 1024 * 1024 * 8
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--static-path', help='path to the folder containing the footprint files', default='.')
parser.add_argument('--chunk', help='if set, create a chunked footprint with partitions of roughly chunk-size raw bytes',
                    action='store_true')
parser.add_argument('--chunk-size', help=f'target raw size in bytes for each parquet chunk, default {default_chunk_size // (1024 * 1024)}MB',
                    type=int, default=default_chunk_size)
def main():
    kwargs = vars(parser.parse_args())
    if kwargs['static_path'] == '.':
        kwargs['static_path'] = str(os.getcwd())

    if kwargs['chunk']:
        convert_bin_to_parquet_chunk(**kwargs)
    else:
        convert_bin_to_parquet_event(**kwargs)
if __name__ == '__main__':
    main()
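Assuming the module is invoked directly (for example with python -m; the static path below is a placeholder), typical command lines would look like:

    # One parquet partition per event_id under <static>/footprint.parquet
    python -m oasislmf.pytools.data_layer.conversions.footprint -p /path/to/static

    # Chunked output under <static>/footprint_chunk, targeting ~8 MB of raw data per chunk
    python -m oasislmf.pytools.data_layer.conversions.footprint -p /path/to/static --chunk --chunk-size 8388608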