Common functions and attributes that help read the event stream containing the losses.
import selectors
from select import select
import sys
import numpy as np
import numba as nb
from .data import oasis_int, oasis_int_size, oasis_float, oasis_float_size
# streams
PIPE_CAPACITY = 65536 # bytes; size of the read buffer allocated for each input stream
# stream source type
GUL_STREAM_ID = 1 # deprecated use LOSS_STREAM_ID
FM_STREAM_ID = 2 # deprecated use LOSS_STREAM_ID
# stream aggregation type (represents the time of aggregation used)
# special sample id
def stream_info_to_bytes(stream_source_type, stream_agg_type):
    """
    Build the 4-byte stream header from the stream source type and aggregation type.

    The header is made of the three low-order bytes of the aggregation type
    (little-endian) followed by a single byte holding the source type.

    Args:
        stream_source_type (np.int32): stream source type, stored in the last byte
        stream_agg_type (np.int32): stream aggregation type, stored in the first three bytes

    Returns:
        bytes: the 4-byte stream header
    """
    agg_bytes = np.array([stream_agg_type], '<i4').tobytes()
    source_byte = np.int8(stream_source_type).tobytes()
    return agg_bytes[:3] + source_byte
def bytes_to_stream_types(stream_header):
    """
    Decode the stream header into the stream source type and aggregation type.

    Args:
        stream_header (bytes): the 4-byte stream header

    Returns:
        tuple: (stream source type, stream aggregation type). The source type is
        decoded from the last byte as a signed 8-bit integer, the aggregation type
        from the first three bytes as a little-endian 32-bit integer.
    """
    source_part = stream_header[3:]
    # pad the three aggregation bytes with a zero high byte to get a full int32
    agg_part = stream_header[:3] + b'\x00'
    stream_source_type = np.frombuffer(source_part, 'i1')[0]
    stream_agg_type = np.frombuffer(agg_part, '<i4')[0]
    return stream_source_type, stream_agg_type
def read_stream_info(stream_obj):
    """
    Read the information that characterizes a stream from an open stream object.

    Eight bytes are consumed from the stream: four for the stream type header
    and four for the sample length.

    Args:
        stream_obj: open binary stream positioned at the start of the header

    Returns:
        tuple: (stream_source_type, stream_agg_type, len_sample)
    """
    type_header = stream_obj.read(4)
    stream_source_type, stream_agg_type = bytes_to_stream_types(type_header)
    sample_header = stream_obj.read(4)
    len_sample = np.frombuffer(sample_header, dtype=np.int32)[0]
    return stream_source_type, stream_agg_type, len_sample
def get_streams_in(files_in, stack):
    """
    Open the input streams described by files_in.

    Args:
        files_in: None to read from stdin, a list or tuple of paths to open each
            of them, or a single path
        stack: contextlib.ExitStack (or compatible) that takes ownership of every
            file opened here, so they are closed when the stack exits

    Returns:
        list: the open binary streams (stdin's buffer is not registered in the stack)
    """
    if files_in is None:
        return [sys.stdin.buffer]
    if isinstance(files_in, (list, tuple)):
        return [stack.enter_context(open(file_in, 'rb')) for file_in in files_in]
    # anything else is treated as a single path
    return [stack.enter_context(open(files_in, 'rb'))]
def get_and_check_header_in(streams_in):
    """
    Read the header of every input stream and check that they all agree.

    Args:
        streams_in: open streams positioned at the start of their header

    Returns:
        tuple: the (stream_source_type, stream_agg_type, len_sample) shared by all streams

    Raises:
        IOError: if the streams do not all carry the same header information
    """
    streams_info = []
    for stream_in in streams_in:
        streams_info.append(read_stream_info(stream_in))

    distinct_info = set(streams_info)
    if len(distinct_info) > 1:
        raise IOError(f"multiple stream type detected in streams {dict(enumerate(streams_info))}")
    return streams_info[0]
def init_streams_in(files_in, stack):
    """
    Open the input streams and read their common header.

    If files_in is None, stdin is used as the input stream; otherwise each path
    in files_in is opened. The header of every stream is then read and checked
    to be identical across streams.

    Args:
        files_in: None or a list of paths
        stack: contextlib stack the opened streams are added to

    Returns:
        tuple: (list of open streams, stream info shared by all the streams)
    """
    opened_streams = get_streams_in(files_in, stack)
    shared_info = get_and_check_header_in(opened_streams)
    return opened_streams, shared_info
@nb.jit(nopython=True, cache=True)
def mv_read(byte_mv, cursor, _dtype, itemsize):
    """
    Read one value of the given dtype from the numpy byte view, starting at cursor.

    Args:
        byte_mv: numpy byte view
        cursor: index where the object starts
        _dtype: data type of the object
        itemsize: size in bytes of the data type

    Returns:
        tuple: (object value, index of the end of the object)
    """
    end = cursor + itemsize
    value = byte_mv[cursor:end].view(_dtype)[0]
    return value, end
@nb.jit(nopython=True, cache=True)
def mv_write(byte_mv, cursor, _dtype, itemsize, value) -> int:
    """
    Store one value of the given dtype into the numpy byte view at index cursor.

    Args:
        byte_mv: numpy byte view
        cursor: index where the object starts
        _dtype: data type of the object
        itemsize: size in bytes of the data type
        value: value to write

    Returns:
        int: index of the end of the object
    """
    end = cursor + itemsize
    # reinterpret the target bytes as the requested dtype and write through the view
    target = byte_mv[cursor:end].view(_dtype)
    target[0] = value
    return end
@nb.jit(nopython=True, cache=True)
def mv_write_sidx_loss(byte_mv, cursor, sidx, loss) -> int:
    """
    Write a (sidx, loss) pair to the numpy byte view at index cursor.

    The sample id is written first as an oasis_int, immediately followed by the
    loss as an oasis_float.

    Args:
        byte_mv: numpy byte view
        cursor: index where the pair starts
        sidx: sample id
        loss: loss

    Returns:
        int: index of the end of the written pair
    """
    # the function is jitted exactly once: numba raises a TypeError when
    # a jit decorator is applied to an already jitted function
    cursor = mv_write(byte_mv, cursor, oasis_int, oasis_int_size, sidx)
    cursor = mv_write(byte_mv, cursor, oasis_float, oasis_float_size, loss)
    return cursor
@nb.jit(nopython=True, cache=True)
def mv_write_delimiter(byte_mv, cursor) -> int:
    """
    Write the item delimiter (0, 0) to the numpy byte view at index cursor.

    The delimiter is a zero oasis_int followed by a zero oasis_float.

    Args:
        byte_mv: numpy byte view
        cursor: index where the delimiter starts

    Returns:
        int: index of the end of the delimiter
    """
    after_sidx = mv_write(byte_mv, cursor, oasis_int, oasis_int_size, 0)
    return mv_write(byte_mv, after_sidx, oasis_float, oasis_float_size, 0)
class EventReader:
    """
    Abstract class to read event streams.

    This class provides a generic interface to read multiple event streams using:
    - selector: handles back pressure, the program is paused and doesn't use resources
      if nothing is in the stream buffer
    - memoryview: reads a chunk (PIPE_CAPACITY) of data at a time, then works on it using
      a numpy byte view of this buffer

    To use it, those methods need to be implemented:
    - __init__(self, ...): the constructor with all data structures needed to read and store
      the event stream. read_streams logs through self.logger, so it must be set.
    - read_buffer(self, byte_mv, cursor, valid_buff, event_id, item_id, **kwargs):
        simply point to a local numba.jit function named read_buffer (a template is provided below).
        This function should implement the specific logic of where and how to store the event
        information. It returns (cursor, event_id, item_id, yield_event).

    Those two methods may be overridden:
    - item_exit(self):
        specific logic to do when an item is finished (only executed once the stream is finished
        but no 0,0 closure was present)
    - event_read_log(self, event_id):
        what kpi to log when a full event is read

    usage snippet:
        with ExitStack() as stack:
            streams_in, (stream_type, stream_agg_type, len_sample) = init_streams_in(files_in, stack)
            reader = CustomReader(<read relevant attributes>)
            for event_id in reader.read_streams(streams_in):
                <event logic>
    """

    @staticmethod
    def register_streams_in(selector_class, streams_in):
        """
        Create the selectors and the read buffers for each input stream.

        Data from input process is generally sent by event block, meaning once a stream receives
        data, the complete event is going to be sent in a short amount of time.
        Therefore, we can focus on each stream one by one using their specific selector
        'stream_selector'.

        Args:
            selector_class: selector class to instantiate (e.g. selectors.DefaultSelector)
            streams_in: streams to register

        Returns:
            tuple: (main selector with all the streams registered, list of the per-stream state dict)
        """
        main_selector = selector_class()
        stream_data = []
        for file_idx, stream_in in enumerate(streams_in):
            mv = memoryview(bytearray(PIPE_CAPACITY))
            byte_mv = np.frombuffer(buffer=mv, dtype='b')

            stream_selector = selector_class()
            stream_selector.register(stream_in, selectors.EVENT_READ)
            data = {
                'mv': mv,
                'byte_mv': byte_mv,
                'cursor': 0,
                'valid_buff': 0,
                'stream_selector': stream_selector,
                'file_idx': file_idx,
            }
            # keep track of the state so leftover buffered events can be parsed once the stream is closed
            stream_data.append(data)
            main_selector.register(stream_in, selectors.EVENT_READ, data)
        return main_selector, stream_data

    def read_streams(self, streams_in):
        """
        Read multiple input streams, yield each event id and load the relevant values according
        to the specific read_buffer implemented in the subclass.

        Args:
            streams_in: streams to read

        Returns:
            event id generator
        """
        try:
            main_selector, stream_data = self.register_streams_in(selectors.DefaultSelector, streams_in)
            self.logger.debug("Streams read with DefaultSelector")
        except PermissionError:  # Fall back option if stream_in contain regular files
            main_selector, stream_data = self.register_streams_in(selectors.SelectSelector, streams_in)
            self.logger.debug("Streams read with SelectSelector")

        try:
            # a stream is unregistered by read_event once fully read, the map is empty when all are done
            while main_selector.get_map():
                for sKey, _ in main_selector.select():
                    event = self.read_event(sKey.fileobj, main_selector, **sKey.data)
                    if event:
                        event_id, cursor, valid_buff = event
                        sKey.data['cursor'] = cursor
                        sKey.data['valid_buff'] = valid_buff
                        yield event_id

            # Stream is read, we need to check if there is remaining event to be parsed
            for data in stream_data:
                if data['cursor'] < data['valid_buff']:
                    byte_mv = data['byte_mv']
                    cursor = data['cursor']
                    valid_buff = data['valid_buff']
                    file_idx = data['file_idx']
                    yield_event = True
                    while yield_event:
                        cursor, event_id, item_id, yield_event = self.read_buffer(
                            byte_mv, cursor, valid_buff, 0, 0, file_idx=file_idx)
                        if event_id:
                            self.event_read_log(event_id)
                            yield event_id
        finally:
            # release the selectors even if the consumer stops iterating early (close is idempotent)
            for data in stream_data:
                data['stream_selector'].close()
            main_selector.close()

    def read_event(self, stream_in, main_selector, stream_selector, mv, byte_mv, cursor, valid_buff, file_idx):
        """
        Read one event from stream_in.
        Close the stream selector and remove the stream from main_selector when all is read.

        Args:
            stream_in: stream to read
            main_selector: selector that contains all the streams
            stream_selector: this stream selector
            mv: buffer memoryview
            byte_mv: numpy byte view of the buffer
            cursor: current cursor of the memory view
            valid_buff: valid data in memory view
            file_idx: file index

        Returns:
            (event_id, cursor, valid_buff) when an event has been read,
            None when the stream ended without a pending event
        """
        event_id = 0
        item_id = 0
        while True:
            if valid_buff < PIPE_CAPACITY:
                # wait for this specific stream so the event is read in one go without busy looping
                stream_selector.select()
                len_read = stream_in.readinto1(mv[valid_buff:])
                valid_buff += len_read

                if len_read == 0:  # end of stream
                    stream_selector.close()
                    main_selector.unregister(stream_in)
                    if event_id:
                        # the stream finished in the middle of an item (no 0,0 closure)
                        self.item_exit()
                        self.event_read_log(event_id)
                        return event_id, cursor, valid_buff
                    return None

            cursor, event_id, item_id, yield_event = self.read_buffer(
                byte_mv, cursor, valid_buff, event_id, item_id, file_idx=file_idx)

            if yield_event:
                # only pay for moving the unread bytes to the front when more than half the buffer is consumed
                if 2 * cursor > valid_buff:
                    mv[:valid_buff - cursor] = mv[cursor: valid_buff]
                    valid_buff -= cursor
                    cursor = 0
                self.event_read_log(event_id)
                return event_id, cursor, valid_buff

            # event not complete: move the unread bytes to the front to make room for the next read
            mv[:valid_buff - cursor] = mv[cursor: valid_buff]
            valid_buff -= cursor
            cursor = 0

    def read_buffer(self, byte_mv, cursor, valid_buff, event_id, item_id, **kwargs):
        """
        Parse the valid part of the buffer, must be implemented by the subclass.

        Returns:
            (cursor, event_id, item_id, yield_event)
        """
        raise NotImplementedError

    def item_exit(self):
        """Hook called when the stream ends while an item is still open, does nothing by default."""
        pass

    def event_read_log(self, event_id):
        """Hook called each time a full event has been read, does nothing by default."""
        pass
# read_buffer template,
# add your input arguments and implement what is needed in the three steps ('do new item setup', 'do loss read', 'do item exit')
# @nb.jit(nopython=True, cache=True)
# def read_buffer(byte_mv, cursor, valid_buff, event_id, item_id, [array necessary to load and store the event data]):
# last_event_id = event_id
# while True:
# if item_id:
# if valid_buff - cursor < (oasis_int_size + oasis_float_size):
# break
# sidx, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size)
# if sidx:
# loss, cursor = mv_read(byte_mv, cursor, oasis_float, oasis_float_size)
# loss = 0 if np.isnan(loss) else loss
# ###### do loss read ######
# ##########
# else:
# ##### do item exit ####
# ##########
# cursor += oasis_float_size
# item_id = 0
# else:
# if valid_buff - cursor < 2 * oasis_int_size:
# break
# event_id, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size)
# if event_id != last_event_id:
# if last_event_id: # we have a new event we return the one we just finished
# return cursor - oasis_int_size, last_event_id, 0, 1
# else: # first pass we store the event we are reading
# last_event_id = event_id
# item_id, cursor = mv_read(byte_mv, cursor, oasis_int, oasis_int_size)
# ##### do new item setup #####
# ##########
# return cursor, event_id, item_id, 0
def write_mv_to_stream(stream, byte_mv, cursor):
    """
    Write the first `cursor` bytes of a numpy byte array view to a stream.

    - select is used to handle forward pressure: the call waits until the stream is
      reported writable (or in an exceptional state)
    - a while loop is used in case the stream is non-blocking, meaning the amount of
      bytes written by one call is not guaranteed to be the full remaining length

    Args:
        stream: stream to write to, must be usable with select (expose fileno())
        byte_mv: numpy byte view of the buffer to write
        cursor: amount of bytes to write

    Raises:
        IOError: if select reports an exceptional condition on the stream
    """
    written = 0
    while written < cursor:
        _, _, exceptional = select([], [stream], [stream])
        if exceptional:
            raise IOError(f'error with output stream, {exceptional}')
        n_written = stream.write(byte_mv[written:cursor].tobytes())
        # a non-blocking raw stream returns None when no byte could be written, just retry
        if n_written:
            written += n_written