LOCAL Student Department Advent Calendar Day 6
The article I was writing for day 11 grew longer than expected, so I'm using part of it to fill this empty slot.
Official site: zlib.net. zlib is a library of compression algorithms used by Zip and other formats, and it implements Deflate internally. Because it makes compressing binary data easy, it is sometimes used for communication payloads (I have never tried that myself). You will often run into it when dealing with file compression.
License: zlib is distributed under the zlib License, a fairly permissive license similar to MIT. Please check the license text for details.
See here for more information.
`compress(data: bytes, level: int = -1) -> bytes`
Compresses `data` and returns the compressed bytes. `level` is the compression level and takes values from -1 to 9; the default is -1, which is equivalent to 6 (as of December 5, 2019). Level 0 means no compression, and 9 gives the highest compression ratio. The higher the level, the longer compression takes, so in most cases you can leave the default.
compress()
```python
import zlib

data = b'test data\x00'  # arbitrary binary data
compressed = zlib.compress(data)
print(compressed)  # b'x\x9c+I-.QHI,Id\x00\x00\x159\x03{'
```
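To get a feel for the level trade-off, you can compare output sizes at a few levels. The input here is made up, and the exact sizes depend on the data, so treat this as a sketch:

```python
import zlib

data = b'repetitive data ' * 100

# Compressed size at a few levels; -1 (the default) behaves like 6.
sizes = {level: len(zlib.compress(data, level)) for level in (0, 1, 6, 9)}

# Level 0 stores the input uncompressed plus a small header,
# so its output is slightly larger than the input itself.
print(sizes)
```

Every level still round-trips through `decompress()`; only the size and speed differ.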
`decompress(data: bytes, wbits: int = 15, bufsize: int = 16384) -> bytes`
Decompresses `data` and returns the original bytes. The other arguments can usually be left at their defaults; `bufsize` is the initial size of the output buffer and is increased as needed.
decompress()
```python
import zlib

data = b'test data\x00'  # arbitrary binary data
decompressed = zlib.decompress(zlib.compress(data))
print(decompressed)  # b'test data\x00'
```
`compressobj(level: int = -1, method: int = 8, wbits: int = 15, memLevel: int = 8, strategy: int = 0, zdict: bytes = ...) -> _Compress`
Returns a compression object, used to compress data that cannot be held in memory all at once. `level` is the same as in `compress()`. `method` is the compression algorithm; as of December 5, 2019, the only supported value is `DEFLATED = 8`. `zdict` is a predefined compression dictionary: a sequence of bytes that you expect to appear frequently in the data.
compressobj()
```python
import zlib
import io

data_stream = io.BytesIO(b'test data\x00')
cobj = zlib.compressobj()
compressed = b''
while True:
    tmp = data_stream.read(64)
    if not tmp:
        compressed += cobj.flush()
        break
    compressed += cobj.compress(tmp)
print(compressed)  # b'x\x9c+I-.QHI,Id\x00\x00\x159\x03{'
```
Forgetting the final `flush()` can leave the compressed data incomplete.
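A quick way to see why: the compression object buffers input internally, so `compress()` alone may return nothing, and only `flush()` emits the buffered remainder. A minimal demonstration:

```python
import zlib

data = b'example payload ' * 10

cobj = zlib.compressobj()
partial = cobj.compress(data)  # may return b'' or a truncated stream
tail = cobj.flush()            # emits whatever is still buffered

# Only both parts together form a complete zlib stream.
assert zlib.decompress(partial + tail) == data

# Decompressing just the pre-flush part fails with zlib.error.
try:
    zlib.decompress(partial)
except zlib.error:
    print('incomplete stream without flush()')
```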
`decompressobj(wbits: int = 15, zdict: bytes = ...) -> _Decompress`
Returns a decompression object. `zdict` must be the same dictionary that was used in `compressobj()`. Also, do not modify the object passed as `zdict` between the call to `decompressobj()` and the first call to `decompress()`.
decompressobj()
```python
import zlib
import io

data_stream = io.BytesIO(zlib.compress(b'test data\x00'))
dobj = zlib.decompressobj()
decompressed = b''
while True:
    tmp = data_stream.read(64)
    if not tmp:
        decompressed += dobj.flush()
        break
    while True:
        if not tmp:
            break
        decompressed += dobj.decompress(tmp)
        tmp = dobj.unconsumed_tail
print(decompressed)  # b'test data\x00'
```
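Here is a small round-trip sketch of the `zdict` parameter mentioned above. The dictionary contents are made up for illustration; the point is only that the compressor and decompressor must be given identical bytes:

```python
import zlib

# Hypothetical dictionary: byte sequences expected to recur in the data.
zdict = b'"name": "value": '
payload = b'{"name": "temperature", "value": "23.5"}'

cobj = zlib.compressobj(zdict=zdict)
compressed = cobj.compress(payload) + cobj.flush()

# The decompressor must be constructed with the exact same dictionary.
dobj = zlib.decompressobj(zdict=zdict)
restored = dobj.decompress(compressed) + dobj.flush()
assert restored == payload
```

A good dictionary shrinks the output because early occurrences of those byte sequences can be encoded as back-references into the dictionary.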
Bytes that did not fit in the buffer and were therefore not processed by the `decompress()` call end up in `unconsumed_tail`.
Data is saved in the order header, filename & path, compressed_file, and this block is repeated once per file.
file_header
```
| 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 |
|---------------------------------------|
| name_len (uint32) | file_len (uint32) |
|---------------------------------------|
```
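As a sanity check of this layout, here is a sketch that builds one block and parses it back with the `struct` module. The filename and body below are placeholders, and little-endian byte order is an assumption (`ctypes.Structure`, used in the script later, serializes in native order, which is little-endian on common platforms):

```python
import struct

# Build one block: header, filename & path, then the compressed body.
name = b'dir/hello.txt'
body = b'compressed bytes would go here'
header = struct.pack('<II', len(name), len(body))  # name_len, file_len
block = header + name + body

# Parse the block back: fixed 8-byte header, then variable parts.
name_len, file_len = struct.unpack_from('<II', block, 0)
parsed_name = block[8:8 + name_len]
parsed_body = block[8 + name_len:8 + name_len + file_len]
assert parsed_name == name and parsed_body == body
```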
Run it with `python mcp.py TARGET [-o OUTPUT]`. `TARGET` is the path to the file or directory to compress.
I did not write this for real-world use, so if you use it, please do so at your own risk.
Decompression is covered in the day-11 Advent Calendar article.
mcp.py
```python
import sys
import argparse
import os
import zlib
from ctypes import Structure, c_uint32
import random
import string
import glob
import shutil

# Random name for the temporary working directory.
tmp_dir = ''.join(random.choices(
    string.ascii_letters + string.digits, k=64)) + '_mcptmp'


def main():
    p = argparse.ArgumentParser(
        description='Compress file and dir',
        usage='Add target to Command line arguments')
    p.add_argument('target', help='Compression target')
    p.add_argument('--out', '-o', help='Output file path',
                   default='compressed.mcp')
    if len(sys.argv) < 2:
        p.print_help()
        sys.exit(1)
    args = p.parse_args()  # parse once instead of calling parse_args() twice
    target = args.target
    out = args.out
    if os.path.isfile(target):
        _compress_file(target, out)
    elif os.path.isdir(target):
        _compress_dir(target, out)
    else:
        raise Exception('Argument error')


def _compress_file(path: str, out: str):
    os.makedirs(tmp_dir, exist_ok=True)  # the temp dir must exist first
    _create_mtp(os.path.basename(path), path)
    size = os.path.getsize(os.path.join(tmp_dir, os.path.basename(path)))
    with open(os.path.join(tmp_dir, os.path.basename(path)), 'rb') as t:
        with open(out, 'wb') as o:
            o.write(_make_file_header(size, os.path.basename(path)))
            while True:
                tmp = t.read(1024)
                if not tmp:
                    o.flush()
                    break
                o.write(tmp)


def _make_file_header(file_len: int, filename: str) -> bytes:
    filename_len = len(filename)
    return bytes(FileHeaderStructure(filename_len, file_len)) \
        + filename.encode('UTF-8')


def _compress_dir(path: str, out: str):
    # relpath handles targets given with or without a trailing separator
    files = [os.path.relpath(p, path) for p in glob.glob(
        os.path.join(path, '**'), recursive=True) if os.path.isfile(p)]
    for f in files:
        os.makedirs(os.path.join(tmp_dir, os.path.dirname(f)), exist_ok=True)
        _create_mtp(f, os.path.join(path, f))
    with open(out, 'wb') as o:
        for f in files:
            o.write(_make_file_header(
                os.path.getsize(os.path.join(tmp_dir, f)), f))
            with open(os.path.join(tmp_dir, f), 'rb') as t:
                while True:
                    tmp = t.read(1024)
                    if not tmp:
                        break
                    o.write(tmp)
        o.flush()


def _create_mtp(path: str, source: str):
    c = zlib.compressobj()
    with open(source, mode='rb') as f:
        with open(os.path.join(tmp_dir, path), mode='wb') as o:
            while True:
                t = f.read(1024)
                if not t:
                    o.write(c.flush())
                    break
                ced = c.compress(t)
                if ced:
                    o.write(ced)


def _rem_tmp():
    if os.path.isdir(tmp_dir):  # may not exist if nothing was compressed
        shutil.rmtree(tmp_dir)


class FileHeaderStructure(Structure):
    _fields_ = (
        ('filename_len', c_uint32),
        ('file_len', c_uint32)
    )


if __name__ == "__main__":
    main()
    _rem_tmp()
```
I couldn't think of a way to get the size after compression without writing it out, so I write the compressed data to a file and read that file's size. If you kept the compressed data in memory you could get the size with `len()`, but then there would be no point in using `compressobj()`...
Creating the header attached to the data in the file gave me the most trouble. I'm not used to doing this kind of thing in Python; it feels like a job for C++.
Python has no structs, but it seems you can create something similar with a class that inherits from `ctypes.Structure`: import it with `from ctypes import *` and describe the fields in `_fields_`.
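For example, the header structure from mcp.py serializes to raw bytes like this (field names and `c_uint32` match the script; native byte order and alignment apply):

```python
from ctypes import Structure, c_uint32, sizeof

class FileHeaderStructure(Structure):
    _fields_ = (
        ('filename_len', c_uint32),
        ('file_len', c_uint32),
    )

h = FileHeaderStructure(9, 1024)
raw = bytes(h)  # serialize: two uint32 fields -> 8 raw bytes
assert len(raw) == sizeof(FileHeaderStructure) == 8

# Rebuild the structure from raw bytes.
back = FileHeaderStructure.from_buffer_copy(raw)
assert back.filename_len == 9 and back.file_len == 1024
```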
There is also `struct.pack(format, values...)`. ~~It seems to support only integers (which is usually enough)~~ According to the documentation, it seems to support almost all major C types.
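A quick sketch of `struct.pack` with non-integer types, to back up that claim (format codes: `<` = little-endian, `I` = unsigned int, `f` = float, `5s` = 5-byte string, `?` = bool):

```python
import struct

# The same 8-byte header layout, without defining a ctypes class.
packed = struct.pack('<II', 9, 1024)
assert struct.unpack('<II', packed) == (9, 1024)

# struct handles most major C types, not just integers.
mixed = struct.pack('<f5s?', 1.5, b'hello', True)
assert struct.unpack('<f5s?', mixed) == (1.5, b'hello', True)
```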