oasislmf.pytools.common.event_stream
====================================

.. py:module:: oasislmf.pytools.common.event_stream

.. autoapi-nested-parse::

   Contains the common functions and attributes used to read the event streams that carry the losses.



Attributes
----------

.. autoapisummary::

   oasislmf.pytools.common.event_stream.PIPE_CAPACITY
   oasislmf.pytools.common.event_stream.CDF_STREAM_ID
   oasislmf.pytools.common.event_stream.GUL_STREAM_ID
   oasislmf.pytools.common.event_stream.FM_STREAM_ID
   oasislmf.pytools.common.event_stream.LOSS_STREAM_ID
   oasislmf.pytools.common.event_stream.SUMMARY_STREAM_ID
   oasislmf.pytools.common.event_stream.ITEM_STREAM
   oasislmf.pytools.common.event_stream.COVERAGE_STREAM
   oasislmf.pytools.common.event_stream.MEAN_IDX
   oasislmf.pytools.common.event_stream.STD_DEV_IDX
   oasislmf.pytools.common.event_stream.TIV_IDX
   oasislmf.pytools.common.event_stream.MAX_LOSS_IDX


Classes
-------

.. autoapisummary::

   oasislmf.pytools.common.event_stream.EventReader


Functions
---------

.. autoapisummary::

   oasislmf.pytools.common.event_stream.stream_info_to_bytes
   oasislmf.pytools.common.event_stream.bytes_to_stream_types
   oasislmf.pytools.common.event_stream.read_stream_info
   oasislmf.pytools.common.event_stream.get_streams_in
   oasislmf.pytools.common.event_stream.get_and_check_header_in
   oasislmf.pytools.common.event_stream.init_streams_in
   oasislmf.pytools.common.event_stream.mv_read
   oasislmf.pytools.common.event_stream.mv_write
   oasislmf.pytools.common.event_stream.mv_write_summary_header
   oasislmf.pytools.common.event_stream.mv_write_item_header
   oasislmf.pytools.common.event_stream.mv_write_sidx_loss
   oasislmf.pytools.common.event_stream.mv_write_delimiter
   oasislmf.pytools.common.event_stream.write_mv_to_stream


Module Contents
---------------

.. py:data:: PIPE_CAPACITY
   :value: 65536


.. py:data:: CDF_STREAM_ID
   :value: 0


.. py:data:: GUL_STREAM_ID
   :value: 1


.. py:data:: FM_STREAM_ID
   :value: 2


.. py:data:: LOSS_STREAM_ID
   :value: 2


.. py:data:: SUMMARY_STREAM_ID
   :value: 3


.. py:data:: ITEM_STREAM
   :value: 1


.. py:data:: COVERAGE_STREAM
   :value: 2


.. py:data:: MEAN_IDX


.. py:data:: STD_DEV_IDX


.. py:data:: TIV_IDX


.. py:data:: MAX_LOSS_IDX


.. py:function:: stream_info_to_bytes(stream_source_type, stream_agg_type)

   Produce the stream header from the stream source type and the stream aggregation type.

   Args:
       stream_source_type (np.int32): stream source type
       stream_agg_type (np.int32): stream aggregation type

   Returns:
       the stream header as bytes


.. py:function:: bytes_to_stream_types(stream_header)

   Read the stream header and return the stream type information.

   Args:
       stream_header: bytes

   Returns:
       (stream source type (np.int32), stream aggregation type (np.int32))


.. py:function:: read_stream_info(stream_obj)

   From an open stream object, return the information that characterizes the stream
   (stream_source_type, stream_agg_type, len_sample).

   Args:
       stream_obj: open stream

   Returns:
       (stream_source_type, stream_agg_type, len_sample) as an np.int32 triplet


.. py:function:: get_streams_in(files_in, stack)


.. py:function:: get_and_check_header_in(streams_in)


.. py:function:: init_streams_in(files_in, stack)

   If files_in is None, use stdin as the input stream. Otherwise, open each path in files_in,
   read its header and check that all the headers are the same. Return the streams and their info.

   Args:
       files_in: None or a list of paths
       stack: contextlib stack to add the open streams to

   Returns:
       list of open streams and their info


.. py:function:: mv_read(byte_mv, cursor, _dtype, itemsize)

   Read a value of a given dtype from the NumPy byte view, starting at cursor.
   Return the value and the index of the end of the object.

   Args:
       byte_mv: NumPy byte view
       cursor: index where the object starts
       _dtype: data type of the object
       itemsize: size of the data type

   Returns:
       (object value, end of object index)


.. py:function:: mv_write(byte_mv, cursor, _dtype, itemsize, value) -> int

   Write an object into the NumPy byte view at index cursor and return the index of the end of the object.

   Args:
       byte_mv: NumPy byte view
       cursor: index where the object starts
       _dtype: data type of the object
       itemsize: size of the data type
       value: value to write

   Returns:
       end of object index


.. py:function:: mv_write_summary_header(byte_mv, cursor, event_id, summary_id, exposure_value) -> int

   Write a summary header into the NumPy byte view at index cursor and return the index of the end of the object.

   Args:
       byte_mv: NumPy byte view
       cursor: index where the object starts
       event_id: event id
       summary_id: summary id
       exposure_value: exposure value

   Returns:
       end of object index


.. py:function:: mv_write_item_header(byte_mv, cursor, event_id, item_id) -> int

   Write an item header into the NumPy byte view at index cursor and return the index of the end of the object.

   Args:
       byte_mv: NumPy byte view
       cursor: index where the object starts
       event_id: event id
       item_id: item id

   Returns:
       end of object index


.. py:function:: mv_write_sidx_loss(byte_mv, cursor, sidx, loss) -> int

   Write sidx and loss into the NumPy byte view at index cursor and return the index of the end of the object.

   Args:
       byte_mv: NumPy byte view
       cursor: index where the object starts
       sidx: sample id
       loss: loss

   Returns:
       end of object index


.. py:function:: mv_write_delimiter(byte_mv, cursor) -> int

   Write the item delimiter (0, 0) into the NumPy byte view at index cursor and return the index of the end of the delimiter.

   Args:
       byte_mv: NumPy byte view
       cursor: index where the delimiter starts

   Returns:
       end of delimiter index


.. py:class:: EventReader

   Abstract class to read event streams.

   This class provides a generic interface to read multiple event streams using:

   - selector: handles back pressure. The program is paused and uses no resources while nothing is in the stream buffer.
   - memoryview: reads a chunk (PIPE_CAPACITY) of data at a time, then works on it through a NumPy byte view of this buffer.

   To use the class, implement these methods:

   - ``__init__(self, ...)``: the constructor, which sets up all the data structures needed to read and store the event stream.
   - ``read_buffer(self, byte_mv, cursor, valid_buff, event_id, item_id)``: simply points to a local ``numba.jit`` function named ``read_buffer`` (a template is provided below). This function implements the specific logic of where and how to store the event information.

   These two methods may be overridden:

   - ``item_exit(self)``: specific logic to run when an item is finished. It is only executed when the stream ends without a closing (0, 0) delimiter.
   - ``event_read_log(self, event_id)``: which KPIs to log when a full event has been read.

   Usage snippet::

       from contextlib import ExitStack

       from oasislmf.pytools.common.event_stream import init_streams_in

       with ExitStack() as stack:
           streams_in, (stream_type, stream_agg_type, len_sample) = init_streams_in(files_in, stack)
           reader = CustomReader()  # a concrete EventReader subclass
           for event_id in reader.read_streams(streams_in):
               ...  # use the values loaded by read_buffer for event_id


   .. py:method:: register_streams_in(selector_class, streams_in)
      :staticmethod:

      Data from the input process is generally sent in event blocks. Once a stream receives data,
      the rest of the event follows within a short time. The reader can therefore focus on each
      stream in turn, using that stream's own selector, ``stream_selector``.


   .. py:method:: read_streams(streams_in)

      Read multiple input streams and yield each event id. The relevant values are loaded
      by the ``read_buffer`` that the subclass implements.

      Args:
          streams_in: streams to read

      Returns:
          event id generator


   .. py:method:: read_event(stream_in, main_selector, stream_selector, mv, byte_mv, cursor, valid_buff)

      Read one event from stream_in. When the whole stream has been read, close it and remove it from main_selector.

      Args:
          stream_in: stream to read
          main_selector: selector that contains all the streams
          stream_selector: the selector of this stream
          mv: buffer memoryview
          byte_mv: NumPy byte view of the buffer
          cursor: current cursor in the memory view
          valid_buff: amount of valid data in the memory view

      Returns:
          event_id, cursor, valid_buff


   .. py:method:: read_buffer(byte_mv, cursor, valid_buff, event_id, item_id)
      :abstractmethod:


   .. py:method:: item_exit()


   .. py:method:: event_read_log(event_id)


.. py:function:: write_mv_to_stream(stream, byte_mv, cursor)

   Write a NumPy byte array view to a stream.

   - Use select to handle forward pressure.
   - Use a while loop in case the stream is non-blocking. In that case, the number of bytes written by a single call is not guaranteed to equal cursor.

   Args:
       stream: stream to write to
       byte_mv: NumPy byte view of the buffer to write
       cursor: number of bytes to write
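
The page does not spell out the byte layout behind ``stream_info_to_bytes``, ``bytes_to_stream_types`` and ``read_stream_info``. The sketch below uses only the standard library. It assumes, without confirmation from this page, that the header is one little-endian 32-bit word with the source type in the high byte and the aggregation type in the low three bytes, and that ``len_sample`` follows as a little-endian int32. The helpers ``pack_stream_header``, ``unpack_stream_header`` and ``read_stream_info_sketch`` are hypothetical and are not part of the module.

.. code-block:: python

   import io
   import struct

   # Values copied from the module constants documented above.
   GUL_STREAM_ID = 1
   ITEM_STREAM = 1


   def pack_stream_header(stream_source_type, stream_agg_type):
       # Assumed layout: high byte = source type, low 3 bytes = aggregation type,
       # stored as one little-endian 32-bit integer.
       return struct.pack("<i", (stream_source_type << 24) | stream_agg_type)


   def unpack_stream_header(stream_header):
       # Inverse of pack_stream_header under the same assumed layout.
       (word,) = struct.unpack("<i", stream_header)
       return word >> 24, word & 0xFFFFFF


   def read_stream_info_sketch(stream_obj):
       # Assumption: len_sample follows the 4-byte header as a little-endian int32.
       source_type, agg_type = unpack_stream_header(stream_obj.read(4))
       (len_sample,) = struct.unpack("<i", stream_obj.read(4))
       return source_type, agg_type, len_sample


   header = pack_stream_header(GUL_STREAM_ID, ITEM_STREAM)
   stream = io.BytesIO(header + struct.pack("<i", 10))
   print(header)                           # b'\x01\x00\x00\x01'
   print(read_stream_info_sketch(stream))  # (1, 1, 10)

The sketch packs two small integers into one word, so a reader can identify the stream type from the first four bytes alone.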
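
``init_streams_in`` reads stdin when ``files_in`` is None. Otherwise it opens every path on the ``contextlib.ExitStack`` and checks that all the headers match. One way that behaviour might look is sketched below. ``sketch_init_streams_in`` is hypothetical and makes two assumptions. First, it reads an 8-byte header made of a packed type word followed by an int32 ``len_sample``. Second, it raises ``ValueError`` on a mismatch, which may differ from the module's actual error handling.

.. code-block:: python

   import os
   import struct
   import sys
   import tempfile
   from contextlib import ExitStack


   def sketch_init_streams_in(files_in, stack):
       if files_in is None:
           streams_in = [sys.stdin.buffer]  # binary stdin
       else:
           # Each file is closed automatically when the ExitStack exits.
           streams_in = [stack.enter_context(open(path, "rb")) for path in files_in]
       headers = [stream.read(8) for stream in streams_in]
       if any(header != headers[0] for header in headers[1:]):
           raise ValueError("input streams do not share the same header")
       # Assumed layout: type word (high byte = source type, low 3 bytes =
       # aggregation type) then len_sample, both little-endian int32.
       word, len_sample = struct.unpack("<ii", headers[0])
       return streams_in, (word >> 24, word & 0xFFFFFF, len_sample)


   with tempfile.TemporaryDirectory() as tmp:
       paths = []
       for name in ("a.bin", "b.bin"):
           path = os.path.join(tmp, name)
           with open(path, "wb") as f:
               f.write(struct.pack("<ii", (1 << 24) | 1, 10))
           paths.append(path)
       with ExitStack() as stack:
           streams_in, info = sketch_init_streams_in(paths, stack)
           print(len(streams_in), info)  # 2 (1, 1, 10)

Because the streams are registered on the caller's stack, they stay open for the whole ``with`` block and are closed together when it exits.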
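
``mv_read`` and the ``mv_write*`` functions treat a slice of the byte view as a typed value. The plain NumPy sketch below shows that idea. It writes one item block made of an item header, ``(sidx, loss)`` pairs and the ``(0, 0)`` delimiter, then reads the header back. The sketch rests on several assumptions: ids and sample indices are int32, losses are float32, and sample indices start at 1. The record types the module actually uses, and any numba compilation it applies, are not shown. The ``sketch_*`` names are hypothetical.

.. code-block:: python

   import numpy as np

   PIPE_CAPACITY = 65536  # buffer size, as documented above


   def sketch_mv_read(byte_mv, cursor, _dtype, itemsize):
       # View itemsize bytes starting at cursor as one value of _dtype.
       return byte_mv[cursor:cursor + itemsize].view(_dtype)[0], cursor + itemsize


   def sketch_mv_write(byte_mv, cursor, _dtype, itemsize, value):
       # Store value in the itemsize bytes starting at cursor; return the end index.
       byte_mv[cursor:cursor + itemsize].view(_dtype)[0] = value
       return cursor + itemsize


   def sketch_write_item(byte_mv, cursor, event_id, item_id, losses):
       # Item header: event_id then item_id (assumed int32).
       cursor = sketch_mv_write(byte_mv, cursor, np.int32, 4, event_id)
       cursor = sketch_mv_write(byte_mv, cursor, np.int32, 4, item_id)
       # One (sidx, loss) pair per sample; sidx assumed to start at 1.
       for sidx, loss in enumerate(losses, start=1):
           cursor = sketch_mv_write(byte_mv, cursor, np.int32, 4, sidx)
           cursor = sketch_mv_write(byte_mv, cursor, np.float32, 4, loss)
       # The (0, 0) delimiter closes the item.
       cursor = sketch_mv_write(byte_mv, cursor, np.int32, 4, 0)
       return sketch_mv_write(byte_mv, cursor, np.float32, 4, 0.0)


   byte_mv = np.zeros(PIPE_CAPACITY, dtype=np.uint8)
   end = sketch_write_item(byte_mv, 0, event_id=1, item_id=7, losses=[10.0, 20.0])
   print(end)  # 32
   event_id, cursor = sketch_mv_read(byte_mv, 0, np.int32, 4)
   item_id, cursor = sketch_mv_read(byte_mv, cursor, np.int32, 4)
   print(event_id, item_id)  # 1 7

Each helper returns the new cursor, so writes can be chained. The caller only needs ``byte_mv[:end]`` when handing the filled part of the buffer to a stream.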
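
The hypothetical pure-Python parser below illustrates the kind of logic a subclass's ``read_buffer`` carries. It is not the template the class docstring mentions, and it is not numba-compiled. It assumes an item stream made of an int32 ``(event_id, item_id)`` header, ``(sidx int32, loss float32)`` pairs and a ``(0, 0)`` delimiter. It consumes only whole 8-byte records between ``cursor`` and ``valid_buff``, and returns ``event_id`` and ``item_id`` so that parsing can resume after the buffer is refilled. Two further choices are assumptions: accumulating totals in a dict, and skipping negative ``sidx`` values on the guess that they mark special entries such as the ones ``MEAN_IDX`` refers to.

.. code-block:: python

   import struct

   import numpy as np


   def sketch_read_buffer(byte_mv, cursor, valid_buff, event_id, item_id, totals):
       # Consume complete 8-byte records in byte_mv[cursor:valid_buff].
       # item_id == 0 means the next record is an item header.
       while valid_buff - cursor >= 8:
           first = int(byte_mv[cursor:cursor + 4].view("<i4")[0])
           if item_id == 0:
               event_id = first
               item_id = int(byte_mv[cursor + 4:cursor + 8].view("<i4")[0])
           elif first == 0:
               item_id = 0  # (0, 0) delimiter: the item is finished
           elif first > 0:
               loss = float(byte_mv[cursor + 4:cursor + 8].view("<f4")[0])
               key = (event_id, item_id)
               totals[key] = totals.get(key, 0.0) + loss
           # else: negative sidx, assumed to be a special value, is ignored here
           cursor += 8
       # A trailing partial record is left unread for the next call.
       return cursor, event_id, item_id


   data = (struct.pack("<ii", 1, 1) + struct.pack("<if", 1, 10.0)
           + struct.pack("<if", 2, 20.0) + struct.pack("<if", 0, 0.0)
           + struct.pack("<ii", 1, 2) + struct.pack("<if", -1, 7.5)
           + struct.pack("<if", 1, 5.0) + struct.pack("<if", 0, 0.0))
   byte_mv = np.frombuffer(data, dtype=np.uint8)
   totals = {}
   # First "chunk": only 12 bytes are valid, so one record is read and 4 bytes wait.
   cursor, event_id, item_id = sketch_read_buffer(byte_mv, 0, 12, 0, 0, totals)
   print(cursor, event_id, item_id)  # 8 1 1
   # Next "chunk": the rest of the data is now valid.
   cursor, event_id, item_id = sketch_read_buffer(
       byte_mv, cursor, len(byte_mv), event_id, item_id, totals)
   print(totals)  # {(1, 1): 30.0, (1, 2): 5.0}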
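
The partial-write loop described for ``write_mv_to_stream`` can be sketched with ``select.select`` on a POSIX pipe. ``sketch_write_mv_to_stream`` is a hypothetical stand-in, not the module's code. On an unbuffered, non-blocking file object, ``write`` may return fewer bytes than requested, or ``None`` when nothing could be written. The loop therefore tracks how many bytes have gone out and waits on ``select`` before each attempt.

.. code-block:: python

   import os
   import select

   import numpy as np


   def sketch_write_mv_to_stream(stream, byte_mv, cursor):
       # Write byte_mv[:cursor] to stream, handling partial writes.
       written = 0
       while written < cursor:
           # Wait until the stream can accept data (forward pressure).
           select.select([], [stream], [])
           n = stream.write(byte_mv[written:cursor])
           if n:  # None or 0 means nothing was written this time
               written += n
       return written


   read_fd, write_fd = os.pipe()
   os.set_blocking(write_fd, False)  # exercise the non-blocking path
   writer = open(write_fd, "wb", buffering=0)
   payload = np.arange(256, dtype=np.uint8)
   sketch_write_mv_to_stream(writer, payload, 100)
   writer.close()
   print(os.read(read_fd, 1024) == bytes(range(100)))  # True
   os.close(read_fd)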