Source code for oasislmf.pytools.converters.bintoparquet.manager

#!/usr/bin/env python

from contextlib import ExitStack
import logging
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

from oasislmf.pytools.common.data import DEFAULT_BUFFER_SIZE, resolve_file
from oasislmf.pytools.converters.data import TOOL_INFO

[docs] logger = logging.getLogger(__name__)
[docs] def default_toparquet(stack, file_in, file_out, file_type): headers = TOOL_INFO[file_type]["headers"] dtype = TOOL_INFO[file_type]["dtype"] file_in = resolve_file(file_in, "rb", stack) chunk_bytes = DEFAULT_BUFFER_SIZE * dtype.itemsize schema = pa.schema([(col, pa.array(np.empty(0, dtype=dtype.fields[col][0])).type) for col in headers]) writer = pq.ParquetWriter(file_out, schema) try: while True: raw = file_in.read(chunk_bytes) if not raw: break chunk = np.frombuffer(raw, dtype=dtype) writer.write_table(pa.Table.from_arrays([pa.array(chunk[col]) for col in headers], schema=schema)) finally: writer.close()
[docs] def bintoparquet(file_in, file_out, file_type, **kwargs): """Convert bin file to parquet file based on file type Args: file_in (str | os.PathLike): Input file path file_out (str | os.PathLike): Output file path file_type (str): File type str from SUPPORTED_BINTOPARQUET """ with ExitStack() as stack: file_out = resolve_file(file_out, "wb", stack) default_toparquet(stack, file_in, file_out, file_type, **kwargs)