Source code for oasislmf.pytools.eve.manager

# evepy/manager.py

from contextlib import ExitStack
import logging
from pathlib import Path
import numpy as np

from oasislmf.pytools.utils import redirect_logging
from oasislmf.pytools.common.data import oasis_int, resolve_file

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Events file that `run` falls back to when it is given `input_file=None`.
DEFAULT_EVENTS_FILE = Path('input/events.bin')

# Fixed seed handed to `np.random.default_rng` by the random partitioners.
NUMPY_RANDOM_SEED = 723706
def read_events(input_file):
    """Load every event ID stored in a binary events file.

    The file is read in full and interpreted as a flat sequence of
    `oasis_int` values.

    Args:
        input_file (str | os.PathLike): Path to binary events file.

    Returns:
        np.ndarray: Array of event IDs with dtype `oasis_int`.
    """
    event_ids = np.fromfile(input_file, dtype=oasis_int)
    return event_ids
def stream_events(events, stream_out):
    """Stream the output events as raw 32-bit integers.

    Each event ID is converted to `np.int32` and its bytes are written to
    `stream_out` in iteration order.

    Args:
        events (Iterable): Iterable containing the events to stream. May be
            a numpy array or a lazy iterable such as a generator.
        stream_out (File object): File object with `write` method for
            handling output.
    """
    if isinstance(events, np.ndarray):
        # The whole partition is already in memory: serialise it with a
        # single write instead of one tiny write per event. The bytes
        # produced are the same as converting element by element.
        if events.size:
            stream_out.write(events.astype(np.int32, copy=False).tobytes())
        return

    # Lazy iterables are written item by item so that events can be
    # emitted as soon as they are produced.
    write = stream_out.write
    for event_id in events:
        write(np.int32(event_id).tobytes())
def calculate_events_per_process(n_events, total_processes):
    """Return the block size needed to spread the events over the processes.

    This is `n_events / total_processes` rounded up to a whole number, so
    `total_processes` blocks of the returned size cover every event.

    Args:
        n_events (int): Number of events to distribute.
        total_processes (int): Number of processes to distribute them over.

    Returns:
        int: Number of events per process.
    """
    block_size = n_events // total_processes
    if n_events % total_processes:
        # Leftover events need one extra slot per block.
        block_size += 1
    return block_size
def partition_events__no_shuffle(events, process_number, total_processes):
    """Split the events, in input order, into contiguous blocks per process.

    Only the block belonging to `process_number` is returned. Process
    numbers are 1-based: process 1 receives the first block.

    Args:
        events (np.array): Array of ordered event IDs.
        process_number (int): The process number to receive a partition of events.
        total_processes (int): Total number of processes to distribute the events over.

    Returns:
        The slice of `events` allocated to `process_number`.
    """
    block_size = calculate_events_per_process(len(events), total_processes)
    stop = process_number * block_size
    start = stop - block_size
    return events[start:stop]
def partition_events__random_builtin(events, process_number, total_processes):
    """Shuffle all events with numpy's builtin shuffle, then take one block.

    The generator is seeded with `NUMPY_RANDOM_SEED`, `events` is shuffled
    in place, and the block for `process_number` is selected with
    `partition_events__no_shuffle`.

    Note that this can be memory intensive. For `len(events) > 10**5`
    recommend using `partition_events__random`.

    Args:
        events (np.array): Array of ordered event IDs. Shuffled in place.
        process_number (int): The process number to receive a partition of events.
        total_processes (int): Total number of processes to distribute the events over.

    Returns:
        The block of shuffled events allocated to `process_number`.
    """
    np.random.default_rng(NUMPY_RANDOM_SEED).shuffle(events)
    return partition_events__no_shuffle(events, process_number, total_processes)
def partition_events__random(events, process_number, total_processes):
    """Shuffle the events randomly and allocate to each process.

    Only output the event IDs to the given `process_number`. Generates an
    iterator. Randomisation is implemented using the Fisher-Yates
    algorithm, seeded with `NUMPY_RANDOM_SEED`. `events` is shuffled in
    place as the iterator is consumed.

    Args:
        events (np.array): Array of ordered event IDs. Modified in place.
        process_number (int): The process number to receive a partition of events.
        total_processes (int): Total number of processes to distribute the events over.

    Yields:
        Event IDs whose shuffled position `i` satisfies
        `(i - process_number) % total_processes == 0`.
    """
    n_events = len(events)
    if n_events == 0:
        # Nothing to shuffle or emit. Without this guard the final
        # `events[0]` lookup raises IndexError for the process that owns
        # position 0 when the events file is empty.
        return

    rng = np.random.default_rng(NUMPY_RANDOM_SEED)
    for i in range(n_events - 1, 0, -1):
        j = rng.integers(0, i + 1)
        events[i], events[j] = events[j], events[i]
        # Position i is not touched again by later iterations, so its
        # event can be emitted straight away.
        if (i - process_number) % total_processes == 0:
            yield events[i]

    if process_number % total_processes == 0:  # Event at 0 index
        yield events[0]
def partition_events__round_robin(events, process_number, total_processes):
    """Deal the events out to the processes one at a time, in rotation.

    Only the events allocated to `process_number` are returned. Process
    numbers are 1-based: process 1 receives positions 0, `total_processes`,
    `2 * total_processes`, and so on.

    Args:
        events (np.array): Array of ordered event IDs.
        process_number (int): The process number to receive a partition of events.
        total_processes (int): Total number of processes to distribute the events over.

    Returns:
        np.array: The events at the positions allocated to `process_number`.
    """
    first_position = process_number - 1
    positions = np.arange(first_position, len(events), total_processes)
    return events[positions]
def run(
    input_file,
    process_number,
    total_processes,
    no_shuffle=False,
    randomise=False,
    randomise_builtin=False,
    output_file='-',
):
    """Generate event ID partitions as a binary data stream with shuffling.

    By default the events are shuffled by assigning to processes one by one
    cyclically.

    Args:
        input_file (str | os.PathLike): Path to binary events file. If None
            then defaults to DEFAULT_EVENTS_FILE.
        process_number (int): The process number to receive a partition of events.
        total_processes (int): Total number of processes to distribute the events over.
        no_shuffle (bool, optional): Disable shuffling events. Events are split
            and distributed into blocks in the order they are input. Takes
            priority over `randomise(_builtin)`.
        randomise (bool, optional): Shuffle events randomly in the blocks and
            stream events on the fly. If `no_shuffle` is `True` then it takes
            priority.
        randomise_builtin (bool, optional): Shuffle events randomly in the
            blocks using builtin shuffle. If `no_shuffle` or `randomise` is
            `True` then they take priority.
        output_file (str | os.PathLike): Path to output file. If '-' then
            outputs to stdout.

    Raises:
        FileNotFoundError: If `input_file` does not exist.
        ValueError: If `input_file` exists but is not a regular file.
    """
    # Fall back to the default events file, then validate the path.
    input_file = Path(DEFAULT_EVENTS_FILE if input_file is None else input_file)
    if not input_file.exists():
        raise FileNotFoundError(f"ERROR: File \'{input_file}\' does not exist.")
    if not input_file.is_file():
        raise ValueError(f"ERROR: \'{input_file}\' is not a file.")

    # Resolve conflicting options: no_shuffle > randomise > randomise_builtin.
    if no_shuffle and (randomise or randomise_builtin):
        logger.warning("Warning: `no_shuffle` and `randomise(_builtin)` options are incompatible. Ignoring `randomise(_builtin)`.")
        randomise = randomise_builtin = False
    if randomise and randomise_builtin:
        logger.warning("Warning: `randomise` and `randomise_builtin` options are incompatible. Ignoring `randomise_builtin`.")
        randomise_builtin = False

    # Pick the partitioning strategy; round robin is the default.
    if no_shuffle:
        partitioner = partition_events__no_shuffle
    elif randomise:
        partitioner = partition_events__random
    elif randomise_builtin:
        partitioner = partition_events__random_builtin
    else:
        partitioner = partition_events__round_robin

    events = read_events(input_file)
    event_partitions = partitioner(events, process_number, total_processes)

    with ExitStack() as stack:
        stream_out = resolve_file(output_file, 'wb', stack)
        stream_events(event_partitions, stream_out)
@redirect_logging(exec_name='evepy')
def main(
    input_file=None,
    process_number=None,
    total_processes=None,
    no_shuffle=False,
    randomise=False,
    randomise_builtin=False,
    output_file='-',
    **kwargs
):
    """Entry point: forward the named options to `run`.

    Any additional keyword arguments are accepted and ignored. See `run`
    for the meaning of each option.
    """
    run(
        input_file=input_file,
        process_number=process_number,
        total_processes=total_processes,
        no_shuffle=no_shuffle,
        randomise=randomise,
        randomise_builtin=randomise_builtin,
        output_file=output_file,
    )