oasislmf.pytools.common.event_stream
Contains the common functions and attributes used to read the event stream containing the losses.
Attributes
Classes
EventReader | Abstract class to read event streams
Functions
stream_info_to_bytes | Produce the stream header from the stream source type and aggregation type
bytes_to_stream_types | Read the stream header and return the stream type information
read_stream_info | From an open stream object, return the information that characterizes the stream (stream_source_type, stream_agg_type, len_sample)
init_streams_in | If files_in is None, use stdin as the input stream; otherwise open each path in files_in
mv_read | Read a value of a given dtype from the numpy byte view starting at cursor; return the value and the index of the end of the object
mv_write | Load an object into the numpy byte view at index cursor; return the index of the end of the object
mv_write_summary_header | Write a summary header to the numpy byte view at index cursor; return the index of the end of the object
mv_write_item_header | Write an item header to the numpy byte view at index cursor; return the index of the end of the object
mv_write_sidx_loss | Write sidx and loss to the numpy byte view at index cursor; return the index of the end of the object
mv_write_delimiter | Write the item delimiter (0,0) to the numpy byte view at index cursor; return the index of the end of the object
write_mv_to_stream | Write a numpy byte array view to a stream
Module Contents
- oasislmf.pytools.common.event_stream.stream_info_to_bytes(stream_source_type, stream_agg_type)
Produce the stream header from the stream source type and aggregation type.
Args:
    stream_source_type (np.int32): stream source type
    stream_agg_type (np.int32): stream aggregation type
- Returns:
the stream header as bytes
- oasislmf.pytools.common.event_stream.bytes_to_stream_types(stream_header)
Read the stream header and return the stream type information.
Args:
    stream_header: bytes
- Returns:
(stream source type (np.int32), stream aggregation type (np.int32))
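The two header functions above are inverses of each other. The sketch below illustrates the round trip in plain Python. It assumes a 4-byte header made of 3 little-endian bytes of aggregation type followed by 1 byte of source type; this layout is an assumption and is not stated on this page. The names `pack_stream_header` and `unpack_stream_header` are hypothetical stand-ins, not the library functions.

```python
def pack_stream_header(stream_source_type: int, stream_agg_type: int) -> bytes:
    # Assumed layout: aggregation type in the 3 low bytes, source type in the last byte.
    return stream_agg_type.to_bytes(3, "little") + stream_source_type.to_bytes(1, "little")


def unpack_stream_header(stream_header: bytes):
    # Inverse of pack_stream_header under the same assumed layout.
    stream_agg_type = int.from_bytes(stream_header[:3], "little")
    stream_source_type = stream_header[3]
    return stream_source_type, stream_agg_type


header = pack_stream_header(2, 1)
source_type, agg_type = unpack_stream_header(header)
```

With the real module, the equivalent check would be that `bytes_to_stream_types(stream_info_to_bytes(a, b))` returns `(a, b)`.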
- oasislmf.pytools.common.event_stream.read_stream_info(stream_obj)
From an open stream object, return the information that characterizes the stream (stream_source_type, stream_agg_type, len_sample).
Args:
    stream_obj: open stream
- Returns:
(stream_source_type, stream_agg_type, len_sample) as an np.int32 triplet
- oasislmf.pytools.common.event_stream.init_streams_in(files_in, stack)
If files_in is None, use stdin as the input stream; otherwise open each path in files_in, read its header, check that all headers are the same, and return the streams and their info.
Args:
    files_in: None or a list of paths
    stack: contextlib stack to add the open streams to
- Returns:
list of open streams and their info
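The behaviour described for init_streams_in can be sketched as follows. This is an illustration only, not the library code: `read_info` and `open_streams` are hypothetical names, and the 8-byte preamble (4-byte stream header followed by a 4-byte little-endian len_sample) is an assumption.

```python
import sys
from contextlib import ExitStack


def read_info(stream):
    # Assumed preamble: 4-byte stream header, then a 4-byte signed sample count.
    header = stream.read(4)
    len_sample = int.from_bytes(stream.read(4), "little", signed=True)
    return header[3], int.from_bytes(header[:3], "little"), len_sample


def open_streams(files_in, stack):
    if files_in is None:
        streams = [sys.stdin.buffer]  # fall back to standard input
    else:
        # The stack closes every file when the caller's with-block exits.
        streams = [stack.enter_context(open(path, "rb")) for path in files_in]
    infos = [read_info(stream) for stream in streams]
    if any(info != infos[0] for info in infos):
        raise ValueError(f"input streams have different headers: {infos}")
    return streams, infos[0]
```

Passing the `ExitStack` in, rather than opening files inside a local `with` block, keeps the streams open for as long as the caller needs them.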
- oasislmf.pytools.common.event_stream.mv_read(byte_mv, cursor, _dtype, itemsize)
Read a value of a given dtype from the numpy byte view starting at cursor; return the value and the index of the end of the object.
Args:
    byte_mv: numpy byte view
    cursor: index where the object starts
    _dtype: data type of the object
    itemsize: size of the data type
- Returns:
(object value, end of object index)
- oasislmf.pytools.common.event_stream.mv_write(byte_mv, cursor, _dtype, itemsize, value) -> int
Load an object into the numpy byte view at index cursor; return the index of the end of the object.
Args:
    byte_mv: numpy byte view
    cursor: index where the object starts
    _dtype: data type of the object
    itemsize: size of the data type
    value: value to write
- Returns:
end of object index
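The cursor convention shared by mv_read and mv_write (take a start index, return the index just past the object) can be illustrated with the standard `struct` module on a `bytearray`. This is a sketch of the pattern only: the library works on a numpy byte view, and `buf_write` and `buf_read` are hypothetical names.

```python
import struct


def buf_write(buf, cursor, fmt, value):
    # Pack value at cursor and return the index just past it.
    struct.pack_into(fmt, buf, cursor, value)
    return cursor + struct.calcsize(fmt)


def buf_read(buf, cursor, fmt):
    # Unpack one value at cursor and return it with the index just past it.
    (value,) = struct.unpack_from(fmt, buf, cursor)
    return value, cursor + struct.calcsize(fmt)


buf = bytearray(16)
write_cursor = buf_write(buf, 0, "<i", 42)              # 4-byte int
write_cursor = buf_write(buf, write_cursor, "<f", 1.5)  # 4-byte float

value, read_cursor = buf_read(buf, 0, "<i")
loss, read_cursor = buf_read(buf, read_cursor, "<f")
```

Because every call returns the next cursor, reads and writes chain without the caller tracking item sizes.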
- oasislmf.pytools.common.event_stream.mv_write_summary_header(byte_mv, cursor, event_id, summary_id, exposure_value) -> int
Write a summary header to the numpy byte view at index cursor; return the index of the end of the object.
Args:
    byte_mv: numpy byte view
    cursor: index where the object starts
    event_id: event id
    summary_id: summary id
    exposure_value: exposure value
- Returns:
end of object index
- oasislmf.pytools.common.event_stream.mv_write_item_header(byte_mv, cursor, event_id, item_id) -> int
Write an item header to the numpy byte view at index cursor; return the index of the end of the object.
Args:
    byte_mv: numpy byte view
    cursor: index where the object starts
    event_id: event id
    item_id: item id
- Returns:
end of object index
- oasislmf.pytools.common.event_stream.mv_write_sidx_loss(byte_mv, cursor, sidx, loss) -> int
Write sidx and loss to the numpy byte view at index cursor; return the index of the end of the object.
Args:
    byte_mv: numpy byte view
    cursor: index where the object starts
    sidx: sample id
    loss: loss
- Returns:
end of object index
- oasislmf.pytools.common.event_stream.mv_write_delimiter(byte_mv, cursor) -> int
Write the item delimiter (0,0) to the numpy byte view at index cursor; return the index of the end of the object.
Args:
    byte_mv: numpy byte view
    cursor: index where the object starts
- Returns:
end of delimiter index
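Taken together, the item header, sidx/loss and delimiter writers build one item record: a header, a run of (sidx, loss) pairs, then the (0,0) delimiter. The sketch below shows that sequence with `struct` on a `bytearray`. The field types (little-endian 4-byte integers for ids and sidx, a 4-byte float for loss) are assumptions, and the function names are hypothetical stand-ins for the mv_write_* functions.

```python
import struct


def write_item_header(buf, cursor, event_id, item_id):
    struct.pack_into("<ii", buf, cursor, event_id, item_id)
    return cursor + 8


def write_sidx_loss(buf, cursor, sidx, loss):
    struct.pack_into("<if", buf, cursor, sidx, loss)
    return cursor + 8


def write_delimiter(buf, cursor):
    # The (0,0) pair marks the end of the item's samples.
    struct.pack_into("<if", buf, cursor, 0, 0.0)
    return cursor + 8


buf = bytearray(64)
cursor = write_item_header(buf, 0, 1, 7)
for sidx, loss in [(1, 100.0), (2, 250.5)]:
    cursor = write_sidx_loss(buf, cursor, sidx, loss)
cursor = write_delimiter(buf, cursor)
```

After the loop, `cursor` is the number of valid bytes in `buf`, which is the value a writer such as write_mv_to_stream expects.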
- class oasislmf.pytools.common.event_stream.EventReader
Abstract class to read event streams.
This class provides a generic interface to read multiple event streams using:
- selector: handles back pressure; the program is paused and uses no resources while nothing is in the stream buffer
- memoryview: reads a chunk (PIPE_CAPACITY) of data at a time, then works on it through a numpy byte view of this buffer
To use it, the following methods need to be implemented:
- __init__(self, …): the constructor, with all the data structures needed to read and store the event stream
- read_buffer(self, byte_mv, cursor, valid_buff, event_id, item_id): simply points to a local numba.jit function named read_buffer (a template is provided below); this function should implement the specific logic of where and how to store the event information
These two methods may be overridden:
- item_exit(self): specific logic to run when an item is finished (only executed once the stream is finished but no 0,0 closure was present)
- event_read_log(self): which KPIs to log when a full event is read
Usage snippet:
    from contextlib import ExitStack
    from oasislmf.pytools.common.event_stream import init_streams_in

    with ExitStack() as stack:
        streams_in, (stream_type, stream_agg_type, len_sample) = init_streams_in(files_in, stack)
        reader = CustomReader(<read relevant attributes>)
        for event_id in reader.read_streams(streams_in):
            <event logic>
- static register_streams_in(selector_class, streams_in)
Data from the input process is generally sent in event blocks, meaning that once a stream receives data, the complete event will be sent within a short time. Therefore, we can focus on each stream one at a time using its specific selector, 'stream_selector'.
- read_streams(streams_in)
Read multiple input streams, yield each event id, and load the relevant values according to the specific read_buffer implemented in the subclass.
Args:
    streams_in: streams to read
- Returns:
event id generator
- read_event(stream_in, main_selector, stream_selector, mv, byte_mv, cursor, valid_buff)
Read one event from stream_in; close the stream and remove it from main_selector once everything has been read.
Args:
    stream_in: stream to read
    main_selector: selector that contains all the streams
    stream_selector: this stream's selector
    mv: buffer memoryview
    byte_mv: numpy byte view of the buffer
    cursor: current cursor of the memory view
    valid_buff: valid data in the memory view
- Returns:
event_id, cursor, valid_buff
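The selector and memoryview approach described for EventReader can be illustrated with the standard `selectors` module. This is a minimal sketch under stated assumptions, not the class's implementation: `read_chunks` is a hypothetical helper, the 65536-byte `PIPE_CAPACITY` value is assumed, and the stream is an unbuffered blocking binary stream on a POSIX system.

```python
import os
import selectors

PIPE_CAPACITY = 65536  # assumed chunk size for this sketch


def read_chunks(stream):
    buf = bytearray(PIPE_CAPACITY)
    mv = memoryview(buf)  # reused for every read, so no per-chunk allocation
    selector = selectors.DefaultSelector()
    selector.register(stream, selectors.EVENT_READ)
    try:
        while True:
            selector.select()        # sleep until data arrives or the writer closes
            n = stream.readinto(mv)  # fill the buffer with up to PIPE_CAPACITY bytes
            if not n:                # 0 bytes means end of stream
                break
            yield bytes(mv[:n])
    finally:
        selector.unregister(stream)
        selector.close()


read_fd, write_fd = os.pipe()
os.write(write_fd, b"hello")
os.close(write_fd)
with os.fdopen(read_fd, "rb", buffering=0) as reader:
    received = b"".join(read_chunks(reader))
```

Blocking in `selector.select()` is what lets the reader use no CPU while the upstream process has nothing to send.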
- oasislmf.pytools.common.event_stream.write_mv_to_stream(stream, byte_mv, cursor)
Write a numpy byte array view to a stream:
- use select to handle forward pressure
- use a while loop in case the stream is non-blocking (meaning the number of bytes written is not guaranteed to equal cursor)
Args:
    stream: stream to write to
    byte_mv: numpy byte view of the buffer to write
    cursor: number of bytes to write
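The select-plus-loop pattern described for write_mv_to_stream can be sketched on a raw file descriptor. This is an illustration under stated assumptions, not the library code: `write_all` is a hypothetical helper, it takes a file descriptor rather than a stream object, and it assumes a POSIX system where `select` works on pipes.

```python
import os
import select


def write_all(fd, data, nbytes):
    """Write the first nbytes of data to fd, retrying after partial writes."""
    view = memoryview(data)[:nbytes]
    written = 0
    while written < nbytes:
        # Wait until the descriptor can accept more bytes (forward pressure).
        select.select([], [fd], [])
        try:
            # A non-blocking descriptor may accept fewer bytes than requested.
            written += os.write(fd, view[written:])
        except BlockingIOError:
            continue  # not ready after all; wait again
    return written


read_fd, write_fd = os.pipe()
os.set_blocking(write_fd, False)
payload = bytearray(b"abc" * 100)
count = write_all(write_fd, payload, 250)
os.close(write_fd)
echoed = os.read(read_fd, 1000)
os.close(read_fd)
```

Slicing the memoryview avoids copying the unsent remainder on each iteration.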