Source code for oasislmf.pytools.converters.bintocsv.utils.vulnerability
import sys
import zlib

import numpy as np

from oasislmf.pytools.common.data import DEFAULT_BUFFER_SIZE, resolve_file, write_ndarray_to_fmt_csv
from oasislmf.pytools.converters.data import TOOL_INFO
from oasislmf.pytools.getmodel.manager import VulnerabilityIndex, VulnerabilityRow_dtype
from oasislmf.utils.exceptions import OasisException


def _read_all_bytes(file_in):
    """Load the entire vulnerability binary into a 1-D ``uint8`` array.

    Args:
        file_in: ``sys.stdin.buffer`` or anything ``np.fromfile`` accepts.

    Returns:
        np.ndarray: raw bytes of the input, dtype ``"u1"``.
    """
    if file_in == sys.stdin.buffer:
        # stdin is consumed through its own read() and wrapped without copying.
        return np.frombuffer(file_in.read(), dtype="u1")
    return np.fromfile(file_in, dtype="u1")


def _open_index(idx_file_in):
    """Map the index file as an array of ``VulnerabilityIndex`` records.

    The index is only ever read, so it is mapped with ``mode="r"``. The
    ``np.memmap`` default (``"r+"``) requires write access and therefore fails
    on read-only index files or on handles opened with ``"rb"``.
    """
    return np.memmap(idx_file_in, dtype=VulnerabilityIndex, mode="r")


def _fill_rows(dest, vuln_id, rows):
    """Copy one vulnerability's rows into ``dest`` (a view of the output array).

    Args:
        dest: slice of the output array; written in place.
        vuln_id: value stored in the ``vulnerability_id`` field of every row.
        rows: ``VulnerabilityRow_dtype`` records providing the other fields.
    """
    dest["vulnerability_id"] = vuln_id
    for field in ("intensity_bin_id", "damage_bin_id", "probability"):
        dest[field] = rows[field]


def _read_bin_idx_zip(file_in, idx_file_in, dtype):
    """Read a zlib-compressed, indexed vulnerability binary.

    Each index record supplies ``vulnerability_id``, ``offset``, ``size``
    (compressed byte length) and ``original_size`` (decompressed byte length)
    of one chunk inside the binary.

    Args:
        file_in: binary input (``sys.stdin.buffer`` or a file accepted by
            ``np.fromfile``).
        idx_file_in: index file passed to ``np.memmap``.
        dtype: dtype of the returned array; must expose the fields
            ``vulnerability_id``, ``intensity_bin_id``, ``damage_bin_id`` and
            ``probability``.

    Returns:
        np.ndarray: one record per decompressed vulnerability row.
    """
    bin_data = _read_all_bytes(file_in)
    idx_data = _open_index(idx_file_in)

    row_size = VulnerabilityRow_dtype.itemsize
    num_rows = np.sum(idx_data["original_size"]) // row_size
    data = np.empty(num_rows, dtype=dtype)

    data_index = 0
    for row in idx_data:
        offset = row["offset"]
        num_data = row["original_size"] // row_size

        unzipped_data = zlib.decompress(bin_data[offset:offset + row["size"]])
        vuln_row_data = np.frombuffer(unzipped_data, dtype=VulnerabilityRow_dtype)

        _fill_rows(data[data_index:data_index + num_data], row["vulnerability_id"], vuln_row_data)
        data_index += num_data

    return data


def _read_bin_idx(file_in, idx_file_in, dtype):
    """Read an uncompressed, indexed vulnerability binary.

    Each index record supplies ``vulnerability_id``, ``offset`` and ``size``
    (byte length) of one chunk inside the binary.

    Args:
        file_in: binary input (``sys.stdin.buffer`` or a file accepted by
            ``np.fromfile``).
        idx_file_in: index file passed to ``np.memmap``.
        dtype: dtype of the returned array; must expose the fields
            ``vulnerability_id``, ``intensity_bin_id``, ``damage_bin_id`` and
            ``probability``.

    Returns:
        np.ndarray: one record per vulnerability row.
    """
    bin_data = _read_all_bytes(file_in)
    idx_data = _open_index(idx_file_in)

    row_size = VulnerabilityRow_dtype.itemsize
    num_rows = np.sum(idx_data["size"]) // row_size
    data = np.empty(num_rows, dtype=dtype)

    data_index = 0
    for row in idx_data:
        offset = row["offset"]
        size = row["size"]
        num_data = size // row_size

        # Reinterpret the raw bytes of this chunk as row records (no copy).
        vuln_row_data = bin_data[offset:offset + size].view(dtype=VulnerabilityRow_dtype)

        _fill_rows(data[data_index:data_index + num_data], row["vulnerability_id"], vuln_row_data)
        data_index += num_data

    return data


def _read_bin(file_in, dtype):
    """Read a vulnerability binary that has no index file.

    Args:
        file_in: binary input (``sys.stdin.buffer`` or a file accepted by
            ``np.fromfile``).
        dtype: record dtype of the rows following the header.

    Returns:
        np.ndarray: the records found after the 4-byte header.
    """
    # Offset 4 bytes for damage bins int32
    if file_in == sys.stdin.buffer:
        return np.frombuffer(file_in.read(), dtype=dtype, offset=4)
    return np.fromfile(file_in, dtype=dtype, offset=4)
def vulnerability_tocsv(stack, file_in, file_out, file_type, noheader, idx_file_in, zip_files):
    """Convert a vulnerability binary file to CSV.

    Args:
        stack: passed to ``resolve_file`` together with ``file_in``.
        file_in: binary input to convert.
        file_out: writable text output receiving the CSV.
        file_type: key into ``TOOL_INFO`` selecting headers, dtype and format.
        noheader: when true, the CSV header line is not written.
        idx_file_in: optional index file; when given (or when ``zip_files`` is
            set) the binary is read through the index.
        zip_files: when true, the binary is read as zlib-compressed chunks.

    Raises:
        OasisException: if ``zip_files`` is set but ``idx_file_in`` is None.
    """
    tool_info = TOOL_INFO[file_type]
    headers = tool_info["headers"]
    dtype = tool_info["dtype"]
    fmt = tool_info["fmt"]

    file_in = resolve_file(file_in, "rb", stack)

    if not noheader:
        file_out.write(",".join(headers) + "\n")

    if zip_files and idx_file_in is None:
        raise OasisException(f"Error: Cannot read zip files without provided idx_file_in zip path, currently {idx_file_in}")

    # Select the reader matching the layout of the input.
    if zip_files:
        data = _read_bin_idx_zip(file_in, idx_file_in, dtype)
    elif idx_file_in is not None:
        data = _read_bin_idx(file_in, idx_file_in, dtype)
    else:
        data = _read_bin(file_in, dtype)

    # Write the rows out in chunks of DEFAULT_BUFFER_SIZE.
    total = data.shape[0]
    chunk = DEFAULT_BUFFER_SIZE
    for lo in range(0, total, chunk):
        hi = min(lo + chunk, total)
        write_ndarray_to_fmt_csv(file_out, data[lo:hi], headers, fmt)