oasislmf.pytools.fm.stream_sparse

This module handles reading loss data from the input stream (GUL or previous FM) and writing computed losses to the output stream. Data is stored using a CSR (Compressed Sparse Row) inspired format for memory efficiency.

Stream Format

Input/Output streams use the Oasis binary format:

  • Header: stream_type (4 bytes) + max_sidx (4 bytes)

  • Per item: event_id (4) + item_id (4) + [sidx (4) + loss (4)]* + delimiter (8)
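The byte layout above can be illustrated with a minimal packing sketch. `pack_item` is a hypothetical helper written for this example (the real reader and writer operate on numpy byte views, not `bytes` objects); field widths and the sidx == 0 delimiter follow the layout described here.

```python
import struct

# Hypothetical helper illustrating the per-item record layout:
# header, then (sidx, loss) pairs, then an 8-byte delimiter.
def pack_item(event_id, item_id, samples):
    buf = struct.pack("<ii", event_id, item_id)      # event_id (4) + item_id (4)
    for sidx, loss in samples:
        buf += struct.pack("<if", sidx, loss)        # sidx (4, int) + loss (4, float)
    return buf + struct.pack("<if", 0, 0.0)          # delimiter: sidx == 0 (8 bytes)

record = pack_item(1, 42, [(-1, 12.5), (1, 10.0)])
print(len(record))    # 8 + 2*8 + 8 = 32 bytes
```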

Special sample indices (sidx):

  • -5 (MAX_LOSS_IDX): Maximum possible loss

  • -4 (CHANCE_OF_LOSS_IDX): Probability of non-zero loss (pass-through value)

  • -3 (TIV_IDX): Total Insured Value

  • -2: Standard deviation (ignored in FM)

  • -1 (MEAN_IDX): Mean/expected loss

  • 1..N: Monte Carlo sample indices
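As a sketch, the special indices listed above can be expressed as module-style constants. The names follow this documentation; the actual definitions in oasislmf may live elsewhere and differ in form.

```python
# Special sidx constants as listed above (names follow the documentation;
# the actual oasislmf definitions may differ).
MAX_LOSS_IDX = -5
CHANCE_OF_LOSS_IDX = -4
TIV_IDX = -3
STD_DEV_IDX = -2
MEAN_IDX = -1

def is_special(sidx):
    """Monte Carlo samples use sidx >= 1; negative values are special."""
    return sidx < 0

print([is_special(s) for s in (MAX_LOSS_IDX, MEAN_IDX, 1)])    # [True, True, False]
```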

Sparse Storage

The CSR-inspired format stores data compactly:

  • sidx_indptr[i] points to the start of node i’s data in sidx_val

  • sidx_val contains the actual sidx values (only non-zero losses)

  • loss_val contains corresponding loss values

This is memory-efficient because most samples may have zero loss, and we only store the non-zero values.
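A minimal sketch of this layout, using plain Python lists in place of the preallocated numpy buffers the module actually uses: node i's samples live in `sidx_val[sidx_indptr[i]:sidx_indptr[i+1]]`, with their losses at the same positions in `loss_val`.

```python
# CSR-inspired layout sketch (plain lists stand in for numpy arrays).
sidx_indptr = [0, 2, 5]                    # node 0 has 2 samples, node 1 has 3
sidx_val    = [-1, 1, -1, 2, 4]            # sorted sidx values per node
loss_val    = [5.0, 10.0, 3.0, 6.0, 9.0]   # corresponding non-zero losses

def node_samples(i):
    """Return the (sidx, loss) pairs stored for node i."""
    start, end = sidx_indptr[i], sidx_indptr[i + 1]
    return list(zip(sidx_val[start:end], loss_val[start:end]))

print(node_samples(1))    # [(-1, 3.0), (2, 6.0), (4, 9.0)]
```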

Reading Flow

  1. Read event_id + item_id header

  2. For each (sidx, loss) pair until the delimiter: add the pair to the sparse arrays, maintaining sorted sidx order

  3. On new event or end of stream: signal event completion

Writing Flow

  1. Write stream header

  2. For each output node with losses:

     • Write item header (event_id, output_id, special sidx)

     • Write each (sidx, loss) pair where loss > 0

     • Write delimiter
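The writing flow above can be sketched end to end. This is illustrative only: the stream-type and max-sidx constants here are placeholders, and the real `EventWriterSparse` fills a shared byte buffer rather than concatenating `bytes`.

```python
import struct

# Sketch of the writing flow (placeholder constants; the real writer
# uses Oasis stream-type codes and a preallocated byte buffer).
STREAM_TYPE, MAX_SIDX = 2, 10

def write_stream(nodes):
    out = struct.pack("<ii", STREAM_TYPE, MAX_SIDX)          # stream header
    for event_id, output_id, pairs in nodes:
        out += struct.pack("<ii", event_id, output_id)       # item header
        for sidx, loss in pairs:
            if loss > 0:                                     # skip zero losses
                out += struct.pack("<if", sidx, loss)
        out += struct.pack("<if", 0, 0.0)                    # delimiter
    return out

stream = write_stream([(1, 3, [(-1, 4.2), (1, 0.0), (2, 8.0)])])
print(len(stream))    # 8 + 8 + 2*8 + 8 = 40 bytes (the zero loss is skipped)
```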

Attributes

Classes

FMReader

When reading the stream, relevant values are stored in a slightly modified version of the CSR sparse matrix format.

EventWriterSparse

EventWriterOrderedOutputSparse

Functions

reset_empty_items(compute_idx, sidx_indptr, sidx_val, ...)

Handle items that received no loss data (all samples were zero).

add_new_loss(sidx, loss, compute_i, sidx_indptr, ...)

Insert a (sidx, loss) pair into the sparse arrays, maintaining sorted sidx order.

event_log_msg(event_id, sidx_indptr, len_array, node_count)

read_buffer(byte_mv, cursor, valid_buff, event_id, ...)

Parse a buffer of stream data, populating sparse loss arrays.

load_event(byte_mv, event_id, nodes_array, ...)

get_compute_end(computes, compute_idx)

Module Contents

oasislmf.pytools.fm.stream_sparse.logger[source]
oasislmf.pytools.fm.stream_sparse.SPECIAL_SIDX_COUNT = 6[source]
oasislmf.pytools.fm.stream_sparse.ITEM_HEADER_SIZE[source]
oasislmf.pytools.fm.stream_sparse.SIDX_LOSS_WRITE_SIZE[source]
oasislmf.pytools.fm.stream_sparse.reset_empty_items(compute_idx, sidx_indptr, sidx_val, loss_val, computes)[source]

Handle items that received no loss data (all samples were zero).

If an item’s sidx range is empty (no non-zero losses), we still need a placeholder entry to maintain array consistency. This adds a single TIV_IDX (-3) entry with zero loss.
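A sketch of that placeholder logic, using Python lists instead of the module's preallocated numpy arrays (the real function's mechanics differ, but the invariant is the same: an empty sidx range gets one TIV_IDX entry with zero loss).

```python
TIV_IDX = -3

# Sketch: give an empty node a single (TIV_IDX, 0.0) placeholder entry
# so every node's indptr range stays consistent.
def reset_empty_item(node_i, sidx_indptr, sidx_val, loss_val):
    if sidx_indptr[node_i + 1] == sidx_indptr[node_i]:   # no non-zero losses
        insert_at = sidx_indptr[node_i]
        sidx_val.insert(insert_at, TIV_IDX)
        loss_val.insert(insert_at, 0.0)
        for j in range(node_i + 1, len(sidx_indptr)):
            sidx_indptr[j] += 1                          # shift later pointers

sidx_indptr = [0, 0, 1]          # node 0 is empty, node 1 has one sample
sidx_val, loss_val = [1], [5.0]
reset_empty_item(0, sidx_indptr, sidx_val, loss_val)
print(sidx_indptr, sidx_val, loss_val)   # [0, 1, 2] [-3, 1] [0.0, 5.0]
```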

oasislmf.pytools.fm.stream_sparse.add_new_loss(sidx, loss, compute_i, sidx_indptr, sidx_val, loss_val)[source]

Insert a (sidx, loss) pair into the sparse arrays, maintaining sorted sidx order.

The sidx values must be stored in sorted order for efficient lookup during computation. This function handles three cases:

  1. First value for this node: insert at current position

  2. Sidx > last sidx: append at end (common case, O(1))

  3. Sidx < last sidx: binary search for position, shift existing values (O(n))

Raises ValueError if duplicate sidx is detected (stream corruption).

Args:

sidx: Sample index to insert
loss: Loss value for this sample
compute_i: Current computation index (node being populated)
sidx_indptr: CSR pointers into sidx_val
sidx_val: Sample index values
loss_val: Loss values
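The three insertion cases can be sketched with Python lists standing in for the preallocated numpy arrays the real function mutates (`start` here plays the role of the node's indptr offset):

```python
import bisect

# Sketch of sorted insertion into the node's (sidx, loss) arrays.
def add_new_loss(sidx, loss, start, sidx_val, loss_val):
    if len(sidx_val) == start:                    # case 1: first value for node
        sidx_val.append(sidx); loss_val.append(loss)
    elif sidx > sidx_val[-1]:                     # case 2: append at end, O(1)
        sidx_val.append(sidx); loss_val.append(loss)
    else:                                         # case 3: binary search + shift, O(n)
        pos = bisect.bisect_left(sidx_val, sidx, lo=start)
        if pos < len(sidx_val) and sidx_val[pos] == sidx:
            raise ValueError("duplicate sidx: stream corruption")
        sidx_val.insert(pos, sidx); loss_val.insert(pos, loss)

sv, lv = [], []
for s, l in [(-1, 2.0), (3, 9.0), (1, 4.0)]:      # last pair arrives out of order
    add_new_loss(s, l, 0, sv, lv)
print(sv, lv)    # [-1, 1, 3] [2.0, 4.0, 9.0]
```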

oasislmf.pytools.fm.stream_sparse.event_log_msg(event_id, sidx_indptr, len_array, node_count)[source]
oasislmf.pytools.fm.stream_sparse.read_buffer(byte_mv, cursor, valid_buff, event_id, item_id, nodes_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, computes, compute_idx)[source]

Parse a buffer of stream data, populating sparse loss arrays.

This is the core stream parsing function. It handles the state machine for reading the Oasis binary stream format:

State: item_id == 0 (reading header)
  • Read event_id, item_id

  • Initialize node storage pointers

  • Add node to compute queue

State: item_id != 0 (reading item data)
  • Read (sidx, loss) pairs

  • sidx == 0: delimiter, item complete, return to header state

  • sidx == -2: standard deviation (ignored)

  • sidx == -4: chance of loss (stored in pass_through)

  • other: regular loss sample, add to sparse arrays

The function processes until:

  • Buffer exhausted (returns with done=0, will be called again)

  • New event detected (returns with done=1, event complete)

Args:

byte_mv: Memory view of the input buffer
cursor: Current read position in buffer
valid_buff: Number of valid bytes in buffer
event_id: Current event ID (0 on first call)
item_id: Current item ID (0 when reading header)
nodes_array: Node metadata for mapping item_id to storage
sidx_indexes: Maps node_id to sidx array position
sidx_indptr: CSR pointers for sidx
sidx_val: Sample index values
loss_indptr: CSR pointers for loss
loss_val: Loss values
pass_through: Chance-of-loss values per item
computes: Queue of items to compute
compute_idx: Computation state pointers

Returns:

(cursor, event_id, item_id, done): Updated state

  • done=1: Event complete, ready for computation

  • done=0: Need more data, call again with next buffer
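The two-state parsing loop can be sketched in plain Python (the real function is numba-compiled, resumes across buffers, routes sidx == -4 into pass_through, and signals new events; this simplified version just collects samples per item from one buffer):

```python
import struct

# Simplified sketch of the header/item-data state machine.
def read_buffer(buf, valid_buff):
    cursor, event_id, item_id = 0, 0, 0
    items = {}
    while cursor + 8 <= valid_buff:
        if item_id == 0:                               # header state
            event_id, item_id = struct.unpack_from("<ii", buf, cursor)
            items[item_id] = []
        else:                                          # item-data state
            sidx, loss = struct.unpack_from("<if", buf, cursor)
            if sidx == 0:                              # delimiter: back to header state
                item_id = 0
            elif sidx != -2:                           # standard deviation is ignored
                items[item_id].append((sidx, loss))
        cursor += 8
    return cursor, event_id, items

buf = (struct.pack("<ii", 1, 7)        # event_id=1, item_id=7
       + struct.pack("<if", 1, 10.0)   # one sample
       + struct.pack("<if", 0, 0.0))   # delimiter
print(read_buffer(buf, len(buf)))      # (24, 1, {7: [(1, 10.0)]})
```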

class oasislmf.pytools.fm.stream_sparse.FMReader(nodes_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, len_array, computes, compute_idx)[source]

Bases: oasislmf.pytools.common.event_stream.EventReader

When reading the stream, we store the relevant values in a slightly modified version of the CSR sparse matrix, where the column indices for row i are stored in indices[indptr[i]:indptr[i+1]] and their corresponding values are stored in data[indptr[i]:indptr[i+1]].

nodes_array: array containing all the static information on the nodes
loss_indptr: array containing the indexes of the beginning and end of the samples of an item
loss_sidx: array containing the sidx of the samples
loss_val: array containing the loss of the samples

nodes_array[source]
sidx_indexes[source]
sidx_indptr[source]
sidx_val[source]
loss_indptr[source]
loss_val[source]
pass_through[source]
len_array[source]
computes[source]
compute_idx[source]
logger[source]
read_buffer(byte_mv, cursor, valid_buff, event_id, item_id, **kwargs)[source]
item_exit()[source]
event_read_log(event_id)[source]
oasislmf.pytools.fm.stream_sparse.load_event(byte_mv, event_id, nodes_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, computes, compute_idx, output_array, layer_i, val_i)[source]
class oasislmf.pytools.fm.stream_sparse.EventWriterSparse(files_out, nodes_array, output_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, len_sample, computes)[source]
files_out[source]
nodes_array[source]
sidx_indexes[source]
sidx_indptr[source]
sidx_val[source]
loss_indptr[source]
loss_val[source]
pass_through[source]
len_sample[source]
computes[source]
output_array[source]
byte_mv[source]
write(event_id, compute_idx)[source]
oasislmf.pytools.fm.stream_sparse.get_compute_end(computes, compute_idx)[source]
class oasislmf.pytools.fm.stream_sparse.EventWriterOrderedOutputSparse(files_out, nodes_array, output_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, len_sample, computes)[source]

Bases: EventWriterSparse

write(event_id, compute_idx)[source]