oasislmf.pytools.fm.stream_sparse¶
This module handles reading loss data from the input stream (GUL or previous FM) and writing computed losses to the output stream. Data is stored using a CSR (Compressed Sparse Row) inspired format for memory efficiency.
Stream Format¶
Input/Output streams use the Oasis binary format:
- Header: stream_type (4 bytes) + max_sidx (4 bytes)
- Per item: event_id (4) + item_id (4) + [sidx (4) + loss (4)]* + delimiter (8)
Special sample indices (sidx):
- -5 (MAX_LOSS_IDX): Maximum possible loss
- -4 (CHANCE_OF_LOSS_IDX): Probability of non-zero loss (pass-through value)
- -3 (TIV_IDX): Total Insured Value
- -2: Standard deviation (ignored in FM)
- -1 (MEAN_IDX): Mean/expected loss
- 1..N: Monte Carlo sample indices
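As a concrete illustration of the per-item layout above, the sketch below packs and decodes one item with the stdlib struct module. The byte values are invented for the example; the real streams carry many items per buffer and a 4-byte stream-type header first.

```python
import struct

# One item, little-endian: event_id (i), item_id (i), then (sidx i, loss f)
# pairs, terminated by an 8-byte delimiter whose sidx is 0.
raw = struct.pack(
    "<ii" + "if" * 3,
    1, 42,        # event_id=1, item_id=42
    -1, 100.0,    # MEAN_IDX sample
    1, 95.5,      # first Monte Carlo sample
    0, 0.0,       # delimiter: sidx == 0 ends the item
)

event_id, item_id = struct.unpack_from("<ii", raw, 0)
offset = 8
samples = []
while True:
    sidx, loss = struct.unpack_from("<if", raw, offset)
    offset += 8
    if sidx == 0:  # delimiter reached
        break
    samples.append((sidx, loss))
```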
Sparse Storage¶
The CSR-inspired format stores data compactly:
- sidx_indptr[i] points to the start of node i’s data in sidx_val
- sidx_val contains the actual sidx values (only non-zero losses)
- loss_val contains the corresponding loss values
This is memory-efficient because most samples may have zero loss, and we only store the non-zero values.
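A minimal sketch of this layout, with illustrative values (not taken from a real event): node i's samples live in the half-open slice sidx_indptr[i]:sidx_indptr[i+1] of both value arrays.

```python
import numpy as np

# Three nodes; node 1 stored no samples, so its range is empty.
sidx_indptr = np.array([0, 2, 2, 5])
sidx_val = np.array([-1, 1, -3, 1, 2])          # sorted per node
loss_val = np.array([10.0, 12.5, 0.0, 3.0, 4.0])

def node_samples(i):
    """Return the (sidx, loss) pairs stored for node i."""
    start, end = sidx_indptr[i], sidx_indptr[i + 1]
    return list(zip(sidx_val[start:end].tolist(), loss_val[start:end].tolist()))
```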
Reading Flow¶
Read event_id + item_id header
For each (sidx, loss) pair until the delimiter: add to the sparse arrays, maintaining sorted sidx order
On new event or end of stream: signal event completion
Writing Flow¶
Write stream header
For each output node with losses:
- Write item header (event_id, output_id, special sidx)
- Write each (sidx, loss) pair where loss > 0
- Write delimiter
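The writing steps above can be sketched as follows; write_item is a hypothetical helper, not the module's API, and it shows only the per-item layout (header, non-zero pairs, delimiter), not the stream header or file handling.

```python
import struct

def write_item(buf, event_id, output_id, samples):
    """Append one output item in the Oasis stream layout: header,
    non-zero (sidx, loss) pairs, then the 8-byte delimiter (sidx == 0)."""
    buf += struct.pack("<ii", event_id, output_id)
    for sidx, loss in samples:
        if loss > 0:  # only non-zero losses are emitted
            buf += struct.pack("<if", sidx, loss)
    buf += struct.pack("<if", 0, 0.0)  # delimiter
    return buf

out = write_item(bytearray(), 1, 7, [(-1, 5.0), (1, 0.0), (2, 3.0)])
```

Note that the zero-loss sample (sidx 1) is skipped, so the item occupies 8 (header) + 16 (two pairs) + 8 (delimiter) = 32 bytes.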
Attributes¶
Classes¶
FMReader: when reading the stream, relevant values are stored into a slightly modified version of the CSR sparse matrix.
EventWriterSparse
EventWriterOrderedOutputSparse
Functions¶
reset_empty_items: Handle items that received no loss data (all samples were zero).
add_new_loss: Insert a (sidx, loss) pair into the sparse arrays, maintaining sorted sidx order.
event_log_msg
read_buffer: Parse a buffer of stream data, populating sparse loss arrays.
load_event
Module Contents¶
- oasislmf.pytools.fm.stream_sparse.reset_empty_items(compute_idx, sidx_indptr, sidx_val, loss_val, computes)[source]¶
Handle items that received no loss data (all samples were zero).
If an item’s sidx range is empty (no non-zero losses), we still need a placeholder entry to maintain array consistency. This adds a single TIV_IDX (-3) entry with zero loss.
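A simplified sketch of that placeholder logic, operating on illustrative module-level arrays rather than the function's real signature (the actual implementation is numba-compiled and takes the arrays as arguments):

```python
import numpy as np

TIV_IDX = -3  # special sidx from the stream format

# Illustrative preallocated arrays: node 0 has three samples, node 1 has none.
sidx_indptr = np.array([0, 3, 3])
sidx_val = np.zeros(8, dtype=np.int32)
loss_val = np.zeros(8, dtype=np.float64)
sidx_val[:3] = [-1, 1, 2]
loss_val[:3] = [1.0, 2.0, 3.0]

def reset_empty_item(node_i):
    """If node_i stored no samples, add a placeholder TIV_IDX entry with
    zero loss so downstream code always sees a non-empty sidx range."""
    start = sidx_indptr[node_i]
    if sidx_indptr[node_i + 1] == start:  # empty range detected
        sidx_val[start] = TIV_IDX
        loss_val[start] = 0.0
        sidx_indptr[node_i + 1] = start + 1

reset_empty_item(1)
```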
- oasislmf.pytools.fm.stream_sparse.add_new_loss(sidx, loss, compute_i, sidx_indptr, sidx_val, loss_val)[source]¶
Insert a (sidx, loss) pair into the sparse arrays, maintaining sorted sidx order.
The sidx values must be stored in sorted order for efficient lookup during computation. This function handles three cases:
First value for this node: insert at current position
Sidx > last sidx: append at end (common case, O(1))
Sidx < last sidx: binary search for position, shift existing values (O(n))
Raises ValueError if duplicate sidx is detected (stream corruption).
- Args:
sidx: Sample index to insert
loss: Loss value for this sample
compute_i: Current computation index (node being populated)
sidx_indptr: CSR pointers into sidx_val
sidx_val: Sample index values
loss_val: Loss values
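The three insertion cases can be sketched as below. This is a simplified stand-in, not the numba implementation: it works on a node's slice [node_start, node_end) and returns the new end pointer instead of updating sidx_indptr in place.

```python
import numpy as np

def add_new_loss(sidx, loss, node_start, node_end, sidx_val, loss_val):
    """Insert (sidx, loss) into the node's slice, keeping sidx sorted.
    Raises ValueError on a duplicate sidx (stream corruption)."""
    if node_end == node_start or sidx > sidx_val[node_end - 1]:
        pos = node_end  # first value, or in-order arrival: O(1) append
    else:
        pos = node_start + np.searchsorted(sidx_val[node_start:node_end], sidx)
        if sidx_val[pos] == sidx:
            raise ValueError(f"duplicate sidx {sidx}")
        # shift existing values right to make room: O(n)
        sidx_val[pos + 1:node_end + 1] = sidx_val[pos:node_end].copy()
        loss_val[pos + 1:node_end + 1] = loss_val[pos:node_end].copy()
    sidx_val[pos] = sidx
    loss_val[pos] = loss
    return node_end + 1

sidx_val = np.zeros(8, dtype=np.int32)
loss_val = np.zeros(8, dtype=np.float64)
end = 0
for s, l in [(-1, 1.0), (2, 3.0), (1, 2.0)]:  # last pair arrives out of order
    end = add_new_loss(s, l, 0, end, sidx_val, loss_val)
```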
- oasislmf.pytools.fm.stream_sparse.event_log_msg(event_id, sidx_indptr, len_array, node_count)[source]¶
- oasislmf.pytools.fm.stream_sparse.read_buffer(byte_mv, cursor, valid_buff, event_id, item_id, nodes_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, computes, compute_idx)[source]¶
Parse a buffer of stream data, populating sparse loss arrays.
This is the core stream parsing function. It handles the state machine for reading the Oasis binary stream format:
- State: item_id == 0 (reading header)
Read event_id, item_id
Initialize node storage pointers
Add node to compute queue
- State: item_id != 0 (reading item data)
Read (sidx, loss) pairs
sidx == 0: delimiter, item complete, return to header state
sidx == -2: standard deviation (ignored)
sidx == -4: chance of loss (stored in pass_through)
other: regular loss sample, add to sparse arrays
The function processes until:
- Buffer exhausted (returns with done=0, will be called again)
- New event detected (returns with done=1, event complete)
- Args:
byte_mv: Memory view of the input buffer
cursor: Current read position in buffer
valid_buff: Number of valid bytes in buffer
event_id: Current event ID (0 on first call)
item_id: Current item ID (0 when reading header)
nodes_array: Node metadata for mapping item_id to storage
sidx_indexes: Maps node_id to sidx array position
sidx_indptr: CSR pointers for sidx
sidx_val: Sample index values
loss_indptr: CSR pointers for loss
loss_val: Loss values
pass_through: Chance-of-loss values per item
computes: Queue of items to compute
compute_idx: Computation state pointers
- Returns:
(cursor, event_id, item_id, done): Updated state
- done=1: Event complete, ready for computation
- done=0: Need more data, call again with next buffer
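A toy version of the state machine described above, to make the two states concrete. It is heavily simplified: it ignores pass_through (sidx == -4), the CSR arrays, the new-event early return, and resuming mid-item across buffers.

```python
import struct

def read_items(byte_mv, valid_buff):
    """Parse complete items from a buffer: item_id == 0 means 'expecting
    a header'; sidx == 0 is the item delimiter."""
    cursor, item_id, event_id = 0, 0, 0
    items = {}
    while cursor + 8 <= valid_buff:
        if item_id == 0:  # header state
            event_id, item_id = struct.unpack_from("<ii", byte_mv, cursor)
            items[item_id] = []
        else:             # item-data state
            sidx, loss = struct.unpack_from("<if", byte_mv, cursor)
            if sidx == 0:       # delimiter: back to header state
                item_id = 0
            elif sidx != -2:    # standard deviation is ignored
                items[item_id].append((sidx, loss))
        cursor += 8
    return event_id, items

# Two items for event 1; item 11 carries an ignored std-dev sample.
buf = (struct.pack("<ii", 1, 10) + struct.pack("<if", -1, 1.0)
       + struct.pack("<if", 0, 0.0)
       + struct.pack("<ii", 1, 11) + struct.pack("<if", 1, 2.0)
       + struct.pack("<if", -2, 9.0) + struct.pack("<if", 0, 0.0))
event_id, items = read_items(buf, len(buf))
```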
- class oasislmf.pytools.fm.stream_sparse.FMReader(nodes_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, len_array, computes, compute_idx)[source]¶
Bases:
oasislmf.pytools.common.event_stream.EventReader
When reading the stream we store relevant values into a slightly modified version of the CSR sparse matrix, where the column indices for row i are stored in indices[indptr[i]:indptr[i+1]] and their corresponding values are stored in data[indptr[i]:indptr[i+1]].
nodes_array: array containing all the static information on the nodes
loss_indptr: array containing the indexes of the beginning and end of samples of an item
loss_sidx: array containing the sidx of the samples
loss_val: array containing the loss of the samples
- oasislmf.pytools.fm.stream_sparse.load_event(byte_mv, event_id, nodes_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, computes, compute_idx, output_array, layer_i, val_i)[source]¶
- class oasislmf.pytools.fm.stream_sparse.EventWriterSparse(files_out, nodes_array, output_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, len_sample, computes)[source]¶
- class oasislmf.pytools.fm.stream_sparse.EventWriterOrderedOutputSparse(files_out, nodes_array, output_array, sidx_indexes, sidx_indptr, sidx_val, loss_indptr, loss_val, pass_through, len_sample, computes)[source]¶
Bases:
EventWriterSparse