Source code for oasislmf.utils.compress
__all__ = [
'compress_string',
'decompress_string',
'CHUNK_SIZE',
]
import zlib
from .exceptions import OasisException
[docs]
CHUNK_SIZE = 5 * 10 ** 8 # 500 Mb
[docs]
def compress_string(st: str) -> bytes:
"""
Compresses strings using the zlib library.
Adapted from a StackOverflow.com solution by Dmitry Skryabin
https://stackoverflow.com/a/36056646/7556955
with a modification to set block/chunk size to 500 Mb (5 x 10^8 bytes).
:param s: Input string to be compressed
:type s: str
:return: Compressed string as bytes
:rtype: bytes
"""
_st = ''.join(st).encode('utf-8')
compressed = b''
begin = 0
compressor = zlib.compressobj()
try:
while begin < len(_st):
compressed += compressor.compress(_st[begin:begin + CHUNK_SIZE])
begin += CHUNK_SIZE
compressed += compressor.flush()
except zlib.error as e:
raise OasisException("Exception raised in 'compress_string'", e)
return compressed
[docs]
def decompress_string(bt: bytes) -> str:
"""
Decompresses zlib-compressed strings
:param bt: zlib-compressed string
:type bt: bytes
:return: Decompressed (Unicode) string
:rtype: str
"""
decompressor = zlib.decompressobj()
return decompressor.decompress(bt).decode('utf-8')