import numpy as np
from numpy.testing import assert_allclose
from oasislmf.pytools.common.data import oasis_float
from .stream_sparse import event_agg_dtype, sidx_loss_dtype
from .common import EXTRA_VALUES
from .financial_structure import load_static
[docs]
def stream_to_dict_array(stream_obj):
stream_type = stream_obj.read(4)
len_sample = np.frombuffer(stream_obj.read(4), dtype=np.int32)[0]
buf = bytearray(8)
mv = memoryview(buf)
event_agg = np.ndarray(1, buffer=mv, dtype=event_agg_dtype)
sidx_loss = np.ndarray(1, buffer=mv, dtype=sidx_loss_dtype)
event_id_last = 0
event_id, agg_id = 0, 0
dict_array = {}
while stream_obj.readinto(mv):
if agg_id:
sidx, loss = sidx_loss[0]
if sidx == -3:
sidx = 0
elif sidx == -2:
continue
elif sidx == 0:
agg_id = 0
continue
cur_array[sidx] = 0 if np.isnan(loss) else loss
else:
event_id, agg_id = event_agg[0]
# if event_id_last != event_id:
# if event_id_last:
# break
# else:
# event_id_last = event_id
cur_array = np.zeros(len_sample + EXTRA_VALUES, dtype=oasis_float)
dict_array[(event_id, agg_id)] = cur_array
return stream_type, len_sample, dict_array
[docs]
def round_dict_array(dict_array, precision):
for key, values in dict_array.items():
values.round(decimals=precision, out=values)
[docs]
def dict_array_to_np_array(dict_array, len_sample):
res = np.empty(len(dict_array), dtype=np.dtype(f"i4, i4, ({len_sample + EXTRA_VALUES})f4"))
for i, (event_id, agg_id) in enumerate(sorted(dict_array)):
res[i] = event_id, agg_id, dict_array[(event_id, agg_id)]
return res
[docs]
def compare_streams(gul_stream, fm_stream_obj1, fm_stream_obj2, precision):
fm_programme, fm_policytc, fm_profile, fm_xref, _, _ = load_static('./input')
_, _, dict_array_gul = stream_to_dict_array(gul_stream)
stream_type1, len_sample1, dict_array1 = stream_to_dict_array(fm_stream_obj1)
stream_type2, len_sample2, dict_array2 = stream_to_dict_array(fm_stream_obj2)
if stream_type1 != stream_type2:
return f"stream have different type: {stream_type1}, {stream_type2}"
if len_sample1 != len_sample2:
return f"stream have different len_sample: {len_sample1}, {len_sample2}"
keys1 = set(dict_array1)
keys2 = set(dict_array2)
missing_in_1 = keys2 - keys1
missing_in_2 = keys1 - keys2
if missing_in_1 or missing_in_2:
msg = "some event_id, agg_id are not matching\n"
for i, missing in enumerate([missing_in_1, missing_in_2]):
if missing:
msg += f" {len(missing)} missing in {i+1} : {sorted(missing)[:10]}" \
f"{'...' if len(missing)>10 else ''}\n"
return msg
# round_dict_array(dict_array1, precision)
# round_dict_array(dict_array2, precision)
msg_list = []
mismatch = 0
for i, key in enumerate(dict_array1):
try:
assert_allclose(dict_array1[key], dict_array2[key], precision)
except AssertionError:
mismatch += 1
msg_list.append(f"value mismatch for {key} index {i}:\n\t{dict_array_gul.get(key)}\n\t{dict_array1[key]}\n\t{dict_array2[key]}")
output_id, agg_id, layer_id = np.extract(fm_xref['output_id'] == key[1], fm_xref)[0]
cur_level, cur_agg_id = 1, agg_id
while True:
policytc = np.extract(np.logical_and(fm_policytc['level_id'] == cur_level,
fm_policytc['agg_id'] == agg_id,
np.logical_or(fm_policytc['layer_id'] == layer_id, fm_policytc['layer_id'] == 1)),
fm_policytc)
if policytc.shape[0] > 1:
profile_id = np.extract(policytc['layer_id'] == layer_id, policytc['profile_id'])
true_layer = layer_id
else:
profile_id = np.extract(policytc['layer_id'] == 1, policytc['profile_id'])
true_layer = 1
profile = np.extract(fm_profile['profile_id'] == profile_id, fm_profile)
msg_list.append(str((cur_level, agg_id, true_layer, profile)))
cur_level += 1
parent = np.extract(np.logical_and(fm_programme['from_agg_id'] == agg_id,
fm_programme['level_id'] == cur_level),
fm_programme)
if not parent:
break
else:
brothers = np.extract(np.logical_and(fm_programme['to_agg_id'] == parent['to_agg_id'],
fm_programme['level_id'] == cur_level),
fm_programme)
msg_list.append(f"prothers {brothers}")
if mismatch > 10:
msg_list.append("...")
break
return "\n".join(msg_list)