oasislmf.pytools.common.event_stream

Contain all common function and attribute to help read the event stream containing the losses

Attributes

Classes

EventReader

Abstract class to read event stream

Functions

stream_info_to_bytes(stream_source_type, stream_agg_type)

From Stream source type and aggregation type produce the stream header

bytes_to_stream_types(stream_header)

Read the stream header and return the information on stream type

read_stream_info(stream_obj)

from open stream object return the information that characterize the stream (stream_source_type, stream_agg_type, len_sample)

get_streams_in(files_in, stack)

get_and_check_header_in(streams_in)

init_streams_in(files_in, stack)

if files_in use stdin as stream in

mv_read(byte_mv, cursor, _dtype, itemsize)

read a certain dtype from numpy byte view starting at cursor, return the value and the index of the end of the object

mv_write(→ int)

load an object into the numpy byte view at index cursor, return the index of the end of the object

mv_write_summary_header(→ int)

write a summary header to the numpy byte view at index cursor, return the index of the end of the object

mv_write_item_header(→ int)

write a item header to the numpy byte view at index cursor, return the index of the end of the object

mv_write_sidx_loss(→ int)

write sidx and loss to the numpy byte view at index cursor, return the index of the end of the object

mv_write_delimiter(→ int)

write the item delimiter (0,0) to the numpy byte view at index cursor, return the index of the end of the object

write_mv_to_stream(stream, byte_mv, cursor)

Write numpy byte array view to stream

Module Contents

oasislmf.pytools.common.event_stream.PIPE_CAPACITY = 65536[source]
oasislmf.pytools.common.event_stream.CDF_STREAM_ID = 0[source]
oasislmf.pytools.common.event_stream.GUL_STREAM_ID = 1[source]
oasislmf.pytools.common.event_stream.FM_STREAM_ID = 2[source]
oasislmf.pytools.common.event_stream.LOSS_STREAM_ID = 2[source]
oasislmf.pytools.common.event_stream.SUMMARY_STREAM_ID = 3[source]
oasislmf.pytools.common.event_stream.ITEM_STREAM = 1[source]
oasislmf.pytools.common.event_stream.COVERAGE_STREAM = 2[source]
oasislmf.pytools.common.event_stream.MEAN_IDX = -1[source]
oasislmf.pytools.common.event_stream.STD_DEV_IDX = -2[source]
oasislmf.pytools.common.event_stream.TIV_IDX = -3[source]
oasislmf.pytools.common.event_stream.MAX_LOSS_IDX = -5[source]
oasislmf.pytools.common.event_stream.stream_info_to_bytes(stream_source_type, stream_agg_type)[source]

From Stream source type and aggregation type produce the stream header Args:

stream_source_type (np.int32): stream_agg_type (np.int32):

Returns:

return bytes

oasislmf.pytools.common.event_stream.bytes_to_stream_types(stream_header)[source]

Read the stream header and return the information on stream type Args:

stream_header: bytes

Returns:

(stream source type (np.int32), stream aggregation type (np.int32))

oasislmf.pytools.common.event_stream.read_stream_info(stream_obj)[source]

from open stream object return the information that characterize the stream (stream_source_type, stream_agg_type, len_sample) Args:

stream_obj: open stream

Returns:

(stream_source_type, stream_agg_type, len_sample) as np.int32 triplet

oasislmf.pytools.common.event_stream.get_streams_in(files_in, stack)[source]
oasislmf.pytools.common.event_stream.get_and_check_header_in(streams_in)[source]
oasislmf.pytools.common.event_stream.init_streams_in(files_in, stack)[source]

if files_in use stdin as stream in otherwise open each path in files_in, read the header, check that they are the same, and return the streams and their info Args:

files_in: none or a list of path stack: contextlib stack to add the open stream to

Returns:

list of open streams and their info

oasislmf.pytools.common.event_stream.mv_read(byte_mv, cursor, _dtype, itemsize)[source]

read a certain dtype from numpy byte view starting at cursor, return the value and the index of the end of the object Args:

byte_mv: numpy byte view cursor: index of where the object start _dtype: data type of the object itemsize: size of the data type

Returns:

(object value, end of object index)

oasislmf.pytools.common.event_stream.mv_write(byte_mv, cursor, _dtype, itemsize, value) int[source]

load an object into the numpy byte view at index cursor, return the index of the end of the object Args:

byte_mv: numpy byte view cursor: index of where the object start _dtype: data type of the object itemsize: size of the data type value: value to write

Returns:

end of object index

oasislmf.pytools.common.event_stream.mv_write_summary_header(byte_mv, cursor, event_id, summary_id, exposure_value) int[source]

write a summary header to the numpy byte view at index cursor, return the index of the end of the object Args:

byte_mv: numpy byte view cursor: index of where the object start event_id: event id summary_id: summary id exposure_value: exposure value

Returns:

end of object index

oasislmf.pytools.common.event_stream.mv_write_item_header(byte_mv, cursor, event_id, item_id) int[source]

write a item header to the numpy byte view at index cursor, return the index of the end of the object Args:

byte_mv: numpy byte view cursor: index of where the object start event_id: event id item_id: item id

Returns:

end of object index

oasislmf.pytools.common.event_stream.mv_write_sidx_loss(byte_mv, cursor, sidx, loss) int[source]

write sidx and loss to the numpy byte view at index cursor, return the index of the end of the object Args:

byte_mv: numpy byte view cursor: index of where the object start sidx: sample id loss: loss

Returns:

end of object index

oasislmf.pytools.common.event_stream.mv_write_delimiter(byte_mv, cursor) int[source]

write the item delimiter (0,0) to the numpy byte view at index cursor, return the index of the end of the object Args:

byte_mv: numpy byte view cursor: index of where the object start

Returns:

end of delimiter index

class oasislmf.pytools.common.event_stream.EventReader[source]

Abstract class to read event stream

This class provide a generic interface to read multiple event stream using: - selector : handle back pressure, the program is paused and don’t use resource if nothing is in the stream buffer - memoryview : read a chuck (PIPE_CAPACITY) of data at a time then work on it using a numpy byte view of this buffer

To use those methods need to be implemented: - __init__(self, …) the constructor with all data structure needed to read and store the event stream - read_buffer(self, byte_mv, cursor, valid_buff, event_id, item_id)

simply point to a local numba.jit function name read_buffer (a template is provided bellow) this function should implement the specific logic of where and how to store the event information.

Those to method may be overwritten - item_exit(self):

specific logic to do when an item is finished (only executed once the stream is finished but no 0,0 closure was present)

  • event_read_log(self):

    what kpi to log when a full event is read

usage snippet:
with ExitStack() as stack:

streams_in, (stream_type, stream_agg_type, len_sample) = init_streams_in(files_in, stack) reader = CustomReader(<read relevant attributes>) for event_id in reader.read_streams(streams_in):

<event logic>

static register_streams_in(selector_class, streams_in)[source]

Data from input process is generally sent by event block, meaning once a stream receive data, the complete event is going to be sent in a short amount of time. Therefore, we can focus on each stream one by one using their specific selector ‘stream_selector’.

read_streams(streams_in)[source]

read multiple stream input, yield each event id and load relevant value according to the specific read_buffer implemented in subclass Args:

streams_in: streams to read

Returns:

event id generator

read_event(stream_in, main_selector, stream_selector, mv, byte_mv, cursor, valid_buff)[source]

read one event from stream_in close and remove the stream from main_selector when all is read Args:

stream_in: stream to read main_selector: selector that contain all the streams stream_selector: this stream selector mv: buffer memoryview byte_mv: numpy byte view of the buffer cursor: current cursor of the memory view valid_buff: valid data in memory view

Returns:

event_id, cursor, valid_buff

abstract read_buffer(byte_mv, cursor, valid_buff, event_id, item_id)[source]
item_exit()[source]
event_read_log(event_id)[source]
oasislmf.pytools.common.event_stream.write_mv_to_stream(stream, byte_mv, cursor)[source]

Write numpy byte array view to stream - use select to handle forward pressure - use a while loop in case the stream is non-blocking (meaning the ammount of byte written is not guarantied to be cursor len) Args:

stream: stream to write to byte_mv: numpy byte view of the buffer to write cursor: ammount of byte to write