Mirror of https://github.com/TheAlgorithms/Python.git (synced 2025-07-07 11:37:36 +08:00)

Prep for Python 3.14: Rename compression to data_compression (#12725)

* Prep for Python 3.14: Rename compression to data_compression
* updating DIRECTORY.md

Co-authored-by: cclauss <cclauss@users.noreply.github.com>
data_compression/README.md (new file, 10 lines)
# Compression

Data compression is everywhere: you need it to store data without taking up too much space.

Compression either loses some data (lossy compression, such as .jpg) or it does not (lossless compression, such as .png).

Lossless compression is mainly used for archival purposes, as it stores data without losing any information about the archived file. Lossy compression, on the other hand, is used to transfer files where perfect quality is not required (e.g. images on Twitter).

* <https://www.sciencedirect.com/topics/computer-science/compression-algorithm>
* <https://en.wikipedia.org/wiki/Data_compression>
* <https://en.wikipedia.org/wiki/Pigeonhole_principle>
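
As a quick illustration of the lossless case, a round trip through Python's standard zlib module recovers every byte (zlib is not part of this directory; it is used here only for demonstration):

import zlib

data = b"AAAABBBCCDAA" * 100
packed = zlib.compress(data)
assert zlib.decompress(packed) == data  # lossless: the original comes back exactly
print(len(data), "->", len(packed))     # repetitive input compresses well
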
data_compression/__init__.py (new empty file)
data_compression/burrows_wheeler.py (new file, 177 lines)
"""
https://en.wikipedia.org/wiki/Burrows%E2%80%93Wheeler_transform

The Burrows-Wheeler transform (BWT, also called block-sorting compression)
rearranges a character string into runs of similar characters. This is useful
for compression, since it tends to be easy to compress a string that has runs
of repeated characters by techniques such as move-to-front transform and
run-length encoding. More importantly, the transformation is reversible,
without needing to store any additional data except the position of the first
original character. The BWT is thus a "free" method of improving the efficiency
of text compression algorithms, costing only some extra computation.
"""

from __future__ import annotations

from typing import TypedDict


class BWTTransformDict(TypedDict):
    bwt_string: str
    idx_original_string: int


def all_rotations(s: str) -> list[str]:
    """
    :param s: The string that will be rotated len(s) times.
    :return: A list with the rotations.
    :raises TypeError: If s is not an instance of str.
    Examples:

    >>> all_rotations("^BANANA|")  # doctest: +NORMALIZE_WHITESPACE
    ['^BANANA|', 'BANANA|^', 'ANANA|^B', 'NANA|^BA', 'ANA|^BAN', 'NA|^BANA',
    'A|^BANAN', '|^BANANA']
    >>> all_rotations("a_asa_da_casa")  # doctest: +NORMALIZE_WHITESPACE
    ['a_asa_da_casa', '_asa_da_casaa', 'asa_da_casaa_', 'sa_da_casaa_a',
    'a_da_casaa_as', '_da_casaa_asa', 'da_casaa_asa_', 'a_casaa_asa_d',
    '_casaa_asa_da', 'casaa_asa_da_', 'asaa_asa_da_c', 'saa_asa_da_ca',
    'aa_asa_da_cas']
    >>> all_rotations("panamabanana")  # doctest: +NORMALIZE_WHITESPACE
    ['panamabanana', 'anamabananap', 'namabananapa', 'amabananapan',
    'mabananapana', 'abananapanam', 'bananapanama', 'ananapanamab',
    'nanapanamaba', 'anapanamaban', 'napanamabana', 'apanamabanan']
    >>> all_rotations(5)
    Traceback (most recent call last):
    ...
    TypeError: The parameter s type must be str.
    """
    if not isinstance(s, str):
        raise TypeError("The parameter s type must be str.")

    return [s[i:] + s[:i] for i in range(len(s))]


def bwt_transform(s: str) -> BWTTransformDict:
    """
    :param s: The string that will be used at bwt algorithm
    :return: the string composed of the last char of each row of the ordered
        rotations and the index of the original string at ordered rotations list
    :raises TypeError: If the s parameter type is not str
    :raises ValueError: If the s parameter is empty
    Examples:

    >>> bwt_transform("^BANANA")
    {'bwt_string': 'BNN^AAA', 'idx_original_string': 6}
    >>> bwt_transform("a_asa_da_casa")
    {'bwt_string': 'aaaadss_c__aa', 'idx_original_string': 3}
    >>> bwt_transform("panamabanana")
    {'bwt_string': 'mnpbnnaaaaaa', 'idx_original_string': 11}
    >>> bwt_transform(4)
    Traceback (most recent call last):
    ...
    TypeError: The parameter s type must be str.
    >>> bwt_transform('')
    Traceback (most recent call last):
    ...
    ValueError: The parameter s must not be empty.
    """
    if not isinstance(s, str):
        raise TypeError("The parameter s type must be str.")
    if not s:
        raise ValueError("The parameter s must not be empty.")

    rotations = all_rotations(s)
    rotations.sort()  # sort the list of rotations in alphabetical order
    # make a string composed of the last char of each rotation
    response: BWTTransformDict = {
        "bwt_string": "".join([word[-1] for word in rotations]),
        "idx_original_string": rotations.index(s),
    }
    return response


def reverse_bwt(bwt_string: str, idx_original_string: int) -> str:
    """
    :param bwt_string: The string returned from bwt algorithm execution
    :param idx_original_string: A 0-based index of the string that was used to
        generate bwt_string at ordered rotations list
    :return: The string used to generate bwt_string when bwt was executed
    :raises TypeError: If the bwt_string parameter type is not str
    :raises ValueError: If the bwt_string parameter is empty
    :raises TypeError: If the idx_original_string type is not int or if it is
        not possible to cast it to int
    :raises ValueError: If the idx_original_string value is lower than 0 or
        greater than len(bwt_string) - 1

    >>> reverse_bwt("BNN^AAA", 6)
    '^BANANA'
    >>> reverse_bwt("aaaadss_c__aa", 3)
    'a_asa_da_casa'
    >>> reverse_bwt("mnpbnnaaaaaa", 11)
    'panamabanana'
    >>> reverse_bwt(4, 11)
    Traceback (most recent call last):
    ...
    TypeError: The parameter bwt_string type must be str.
    >>> reverse_bwt("", 11)
    Traceback (most recent call last):
    ...
    ValueError: The parameter bwt_string must not be empty.
    >>> reverse_bwt("mnpbnnaaaaaa", "asd")  # doctest: +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
    ...
    TypeError: The parameter idx_original_string type must be int or castable
    to int.
    >>> reverse_bwt("mnpbnnaaaaaa", -1)
    Traceback (most recent call last):
    ...
    ValueError: The parameter idx_original_string must not be lower than 0.
    >>> reverse_bwt("mnpbnnaaaaaa", 12)  # doctest: +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
    ...
    ValueError: The parameter idx_original_string must be lower than
    len(bwt_string).
    >>> reverse_bwt("mnpbnnaaaaaa", 11.0)
    'panamabanana'
    >>> reverse_bwt("mnpbnnaaaaaa", 11.4)
    'panamabanana'
    """
    if not isinstance(bwt_string, str):
        raise TypeError("The parameter bwt_string type must be str.")
    if not bwt_string:
        raise ValueError("The parameter bwt_string must not be empty.")
    try:
        idx_original_string = int(idx_original_string)
    except ValueError:
        raise TypeError(
            "The parameter idx_original_string type must be int or castable to int."
        )
    if idx_original_string < 0:
        raise ValueError("The parameter idx_original_string must not be lower than 0.")
    if idx_original_string >= len(bwt_string):
        raise ValueError(
            "The parameter idx_original_string must be lower than len(bwt_string)."
        )

    # Rebuild the sorted rotation table column by column: prepend the BWT
    # string to every row, re-sort, and repeat len(bwt_string) times.
    ordered_rotations = [""] * len(bwt_string)
    for _ in range(len(bwt_string)):
        for i in range(len(bwt_string)):
            ordered_rotations[i] = bwt_string[i] + ordered_rotations[i]
        ordered_rotations.sort()
    return ordered_rotations[idx_original_string]


if __name__ == "__main__":
    entry_msg = "Provide a string and I will generate its BWT transform: "
    s = input(entry_msg).strip()
    result = bwt_transform(s)
    print(
        f"Burrows Wheeler transform for string '{s}' results "
        f"in '{result['bwt_string']}'"
    )
    original_string = reverse_bwt(result["bwt_string"], result["idx_original_string"])
    print(
        f"Reversing Burrows Wheeler transform for entry '{result['bwt_string']}' "
        f"we get original string '{original_string}'"
    )
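
A quick round trip through the two functions above (a sketch, assuming the repository root is on sys.path):

from data_compression.burrows_wheeler import bwt_transform, reverse_bwt

result = bwt_transform("^BANANA")
# {'bwt_string': 'BNN^AAA', 'idx_original_string': 6} -- note the run of A's
assert reverse_bwt(result["bwt_string"], result["idx_original_string"]) == "^BANANA"
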
data_compression/huffman.py (new file, 92 lines)
from __future__ import annotations

import sys


class Letter:
    def __init__(self, letter: str, freq: int):
        self.letter: str = letter
        self.freq: int = freq
        self.bitstring: dict[str, str] = {}

    def __repr__(self) -> str:
        return f"{self.letter}:{self.freq}"


class TreeNode:
    def __init__(self, freq: int, left: Letter | TreeNode, right: Letter | TreeNode):
        self.freq: int = freq
        self.left: Letter | TreeNode = left
        self.right: Letter | TreeNode = right


def parse_file(file_path: str) -> list[Letter]:
    """
    Read the file and build a dict of all letters and their
    frequencies, then convert the dict into a list of Letters.
    """
    chars: dict[str, int] = {}
    with open(file_path) as f:
        while True:
            c = f.read(1)
            if not c:
                break
            chars[c] = chars[c] + 1 if c in chars else 1
    return sorted((Letter(c, f) for c, f in chars.items()), key=lambda x: x.freq)


def build_tree(letters: list[Letter]) -> Letter | TreeNode:
    """
    Run through the list of Letters and build the Huffman tree by
    repeatedly merging the two lowest-frequency nodes.
    """
    response: list[Letter | TreeNode] = list(letters)
    while len(response) > 1:
        left = response.pop(0)
        right = response.pop(0)
        total_freq = left.freq + right.freq
        node = TreeNode(total_freq, left, right)
        response.append(node)
        response.sort(key=lambda x: x.freq)
    return response[0]


def traverse_tree(root: Letter | TreeNode, bitstring: str) -> list[Letter]:
    """
    Recursively traverse the Huffman tree to set each
    Letter's bitstring dictionary, and return the list of Letters.
    """
    if isinstance(root, Letter):
        root.bitstring[root.letter] = bitstring
        return [root]
    treenode: TreeNode = root
    letters = []
    letters += traverse_tree(treenode.left, bitstring + "0")
    letters += traverse_tree(treenode.right, bitstring + "1")
    return letters


def huffman(file_path: str) -> None:
    """
    Parse the file, build the tree, then run through the file
    again, using the letters dictionary to find and print out the
    bitstring for each letter.
    """
    letters_list = parse_file(file_path)
    root = build_tree(letters_list)
    letters = {
        k: v for letter in traverse_tree(root, "") for k, v in letter.bitstring.items()
    }
    print(f"Huffman Coding of {file_path}: ")
    with open(file_path) as f:
        while True:
            c = f.read(1)
            if not c:
                break
            print(letters[c], end=" ")
    print()


if __name__ == "__main__":
    # pass the file path to the huffman function
    huffman(sys.argv[1])
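
The three helpers compose into a code table without going through the CLI. A small sketch (assuming the repository root is on sys.path; the tempfile usage is illustrative, not part of the module):

import tempfile

from data_compression.huffman import build_tree, parse_file, traverse_tree

with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("abracadabra")
    path = tmp.name

letters = parse_file(path)  # Letters sorted by ascending frequency
root = build_tree(letters)
codes = {k: v for letter in traverse_tree(root, "") for k, v in letter.bitstring.items()}
print(codes)  # frequent letters get short codes; exact codes depend on tie-breaking
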
New binary files (not shown):

* data_compression/image_data/PSNR-example-base.png (4.3 MiB)
* data_compression/image_data/PSNR-example-comp-10.jpg (104 KiB)
* data_compression/image_data/compressed_image.png (26 KiB)
* data_compression/image_data/example_image.jpg (29 KiB)
* data_compression/image_data/example_wikipedia_image.jpg (476 KiB)
* data_compression/image_data/original_image.png (82 KiB)
data_compression/lempel_ziv.py (new file, 125 lines)
"""
One of several implementations of the Lempel-Ziv-Welch compression algorithm
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""

import math
import os
import sys


def read_file_binary(file_path: str) -> str:
    """
    Reads the given file as bytes and returns them as a long string
    """
    result = ""
    try:
        with open(file_path, "rb") as binary_file:
            data = binary_file.read()
        for dat in data:
            curr_byte = f"{dat:08b}"
            result += curr_byte
        return result
    except OSError:
        print("File not accessible")
        sys.exit()


def add_key_to_lexicon(
    lexicon: dict[str, str], curr_string: str, index: int, last_match_id: str
) -> None:
    """
    Adds new strings (curr_string + "0", curr_string + "1") to the lexicon
    """
    lexicon.pop(curr_string)
    lexicon[curr_string + "0"] = last_match_id

    if math.log2(index).is_integer():
        # The code length grows by one bit, so pad every existing code
        # with a leading zero.
        for curr_key, value in lexicon.items():
            lexicon[curr_key] = f"0{value}"

    lexicon[curr_string + "1"] = bin(index)[2:]


def compress_data(data_bits: str) -> str:
    """
    Compresses given data_bits using the Lempel-Ziv-Welch compression algorithm
    and returns the result as a string
    """
    lexicon = {"0": "0", "1": "1"}
    result, curr_string = "", ""
    index = len(lexicon)

    for i in range(len(data_bits)):
        curr_string += data_bits[i]
        if curr_string not in lexicon:
            continue

        last_match_id = lexicon[curr_string]
        result += last_match_id
        add_key_to_lexicon(lexicon, curr_string, index, last_match_id)
        index += 1
        curr_string = ""

    # Pad the final partial phrase with zeros until it matches a known code.
    while curr_string != "" and curr_string not in lexicon:
        curr_string += "0"

    if curr_string != "":
        last_match_id = lexicon[curr_string]
        result += last_match_id

    return result


def add_file_length(source_path: str, compressed: str) -> str:
    """
    Adds the given file's length in front of the compressed string
    (using Elias gamma coding)
    """
    file_length = os.path.getsize(source_path)
    file_length_binary = bin(file_length)[2:]
    length_length = len(file_length_binary)

    return "0" * (length_length - 1) + file_length_binary + compressed


def write_file_binary(file_path: str, to_write: str) -> None:
    """
    Writes the given to_write string (should only consist of 0's and 1's) as
    bytes in the file
    """
    byte_length = 8
    try:
        with open(file_path, "wb") as opened_file:
            result_byte_array = [
                to_write[i : i + byte_length]
                for i in range(0, len(to_write), byte_length)
            ]

            if len(result_byte_array[-1]) % byte_length == 0:
                result_byte_array.append("10000000")
            else:
                result_byte_array[-1] += "1" + "0" * (
                    byte_length - len(result_byte_array[-1]) - 1
                )

            for elem in result_byte_array:
                opened_file.write(int(elem, 2).to_bytes(1, byteorder="big"))
    except OSError:
        print("File not accessible")
        sys.exit()


def compress(source_path: str, destination_path: str) -> None:
    """
    Reads the source file, compresses it and writes the compressed result to
    the destination file
    """
    data_bits = read_file_binary(source_path)
    compressed = compress_data(data_bits)
    compressed = add_file_length(source_path, compressed)
    write_file_binary(destination_path, compressed)


if __name__ == "__main__":
    compress(sys.argv[1], sys.argv[2])
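
The module is driven from the command line as `python lempel_ziv.py <source> <destination>`; the core routine can also be exercised directly on a bitstring (a small sketch, assuming the repository root is on sys.path):

from data_compression.lempel_ziv import compress_data

# compress_data consumes a string of '0'/'1' characters, such as the one
# produced by read_file_binary, and returns the LZW-coded bitstring.
print(compress_data("0100001100"))
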
data_compression/lempel_ziv_decompress.py (new file, 111 lines)
"""
One of several implementations of the Lempel-Ziv-Welch decompression algorithm
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""

import math
import sys


def read_file_binary(file_path: str) -> str:
    """
    Reads the given file as bytes and returns them as a long string
    """
    result = ""
    try:
        with open(file_path, "rb") as binary_file:
            data = binary_file.read()
        for dat in data:
            curr_byte = f"{dat:08b}"
            result += curr_byte
        return result
    except OSError:
        print("File not accessible")
        sys.exit()


def decompress_data(data_bits: str) -> str:
    """
    Decompresses given data_bits using the Lempel-Ziv-Welch compression
    algorithm and returns the result as a string
    """
    lexicon = {"0": "0", "1": "1"}
    result, curr_string = "", ""
    index = len(lexicon)

    for i in range(len(data_bits)):
        curr_string += data_bits[i]
        if curr_string not in lexicon:
            continue

        last_match_id = lexicon[curr_string]
        result += last_match_id
        lexicon[curr_string] = last_match_id + "0"

        if math.log2(index).is_integer():
            # The code length grows by one bit, so re-key every existing
            # entry with a leading zero (mirrors add_key_to_lexicon in the
            # compressor).
            new_lex = {}
            for curr_key in list(lexicon):
                new_lex["0" + curr_key] = lexicon.pop(curr_key)
            lexicon = new_lex

        lexicon[bin(index)[2:]] = last_match_id + "1"
        index += 1
        curr_string = ""
    return result


def write_file_binary(file_path: str, to_write: str) -> None:
    """
    Writes the given to_write string (should only consist of 0's and 1's) as
    bytes in the file
    """
    byte_length = 8
    try:
        with open(file_path, "wb") as opened_file:
            result_byte_array = [
                to_write[i : i + byte_length]
                for i in range(0, len(to_write), byte_length)
            ]

            if len(result_byte_array[-1]) % byte_length == 0:
                result_byte_array.append("10000000")
            else:
                result_byte_array[-1] += "1" + "0" * (
                    byte_length - len(result_byte_array[-1]) - 1
                )

            # The last element is the padding byte, so it is not written out.
            for elem in result_byte_array[:-1]:
                opened_file.write(int(elem, 2).to_bytes(1, byteorder="big"))
    except OSError:
        print("File not accessible")
        sys.exit()


def remove_prefix(data_bits: str) -> str:
    """
    Removes the Elias gamma size prefix that a compressed file should have
    and returns the result
    """
    counter = 0
    for letter in data_bits:
        if letter == "1":
            break
        counter += 1

    # An Elias gamma code is `counter` zeros followed by a (counter + 1)-bit
    # binary length, so strip 2 * counter + 1 bits in total.
    data_bits = data_bits[counter:]
    data_bits = data_bits[counter + 1 :]
    return data_bits


def decompress(source_path: str, destination_path: str) -> None:
    """
    Reads the source file, decompresses it and writes the result to the
    destination file
    """
    data_bits = read_file_binary(source_path)
    data_bits = remove_prefix(data_bits)
    decompressed = decompress_data(data_bits)
    write_file_binary(destination_path, decompressed)


if __name__ == "__main__":
    decompress(sys.argv[1], sys.argv[2])
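
A round-trip sketch tying the two Lempel-Ziv modules together (hypothetical file names, run from the repository root):

from data_compression import lempel_ziv, lempel_ziv_decompress

lempel_ziv.compress("raw.bin", "packed.lzw")  # Elias-gamma length prefix + LZW bitstream
lempel_ziv_decompress.decompress("packed.lzw", "restored.bin")
# restored.bin should now be byte-identical to raw.bin
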
data_compression/lz77.py (new file, 225 lines)
"""
LZ77 compression algorithm
- lossless data compression published in papers by Abraham Lempel and Jacob Ziv in 1977
- also known as LZ1 or sliding-window compression
- forms the basis for many variations including LZW, LZSS, LZMA and others

It uses a "sliding window" method. Within the sliding window we have:
- search buffer
- look ahead buffer
len(sliding_window) = len(search_buffer) + len(look_ahead_buffer)

LZ77 manages a dictionary that uses triples composed of:
- Offset: how far back in the search buffer the match starts, i.e. the
  distance between the current position and the start of the matched phrase.
- Length of the match: the number of characters that make up the phrase.
- Indicator: the next character to be encoded (the first character after
  the match).

As a file is parsed, the dictionary is dynamically updated to reflect the
compressed data contents and size.

Examples:
"cabracadabrarrarrad" <-> [(0, 0, 'c'), (0, 0, 'a'), (0, 0, 'b'), (0, 0, 'r'),
    (3, 1, 'c'), (2, 1, 'd'), (7, 4, 'r'), (3, 5, 'd')]
"ababcbababaa" <-> [(0, 0, 'a'), (0, 0, 'b'), (2, 2, 'c'), (4, 3, 'a'), (2, 2, 'a')]
"aacaacabcabaaac" <-> [(0, 0, 'a'), (1, 1, 'c'), (3, 4, 'b'), (3, 3, 'a'), (1, 2, 'c')]

Sources:
en.wikipedia.org/wiki/LZ77_and_LZ78
"""

from dataclasses import dataclass

__version__ = "0.1"
__author__ = "Lucia Harcekova"


@dataclass
class Token:
    """
    Dataclass representing the triplet (offset, length, indicator), called a
    token, which is used during LZ77 compression.
    """

    offset: int
    length: int
    indicator: str

    def __repr__(self) -> str:
        """
        >>> token = Token(1, 2, "c")
        >>> repr(token)
        '(1, 2, c)'
        >>> str(token)
        '(1, 2, c)'
        """
        return f"({self.offset}, {self.length}, {self.indicator})"


class LZ77Compressor:
    """
    Class containing compress and decompress methods using the LZ77
    compression algorithm.
    """

    def __init__(self, window_size: int = 13, lookahead_buffer_size: int = 6) -> None:
        self.window_size = window_size
        self.lookahead_buffer_size = lookahead_buffer_size
        self.search_buffer_size = self.window_size - self.lookahead_buffer_size

    def compress(self, text: str) -> list[Token]:
        """
        Compress the given string text using the LZ77 compression algorithm.

        Args:
            text: string to be compressed

        Returns:
            output: the compressed text as a list of Tokens

        >>> lz77_compressor = LZ77Compressor()
        >>> str(lz77_compressor.compress("ababcbababaa"))
        '[(0, 0, a), (0, 0, b), (2, 2, c), (4, 3, a), (2, 2, a)]'
        >>> str(lz77_compressor.compress("aacaacabcabaaac"))
        '[(0, 0, a), (1, 1, c), (3, 4, b), (3, 3, a), (1, 2, c)]'
        """

        output = []
        search_buffer = ""

        # while there are still characters in text to compress
        while text:
            # find the next encoding phrase
            # - triplet with offset, length, indicator (the next encoding character)
            token = self._find_encoding_token(text, search_buffer)

            # update the search buffer:
            # - add new characters from text into it
            # - check if its size exceeds the max search buffer size; if so,
            #   drop the oldest elements
            search_buffer += text[: token.length + 1]
            if len(search_buffer) > self.search_buffer_size:
                search_buffer = search_buffer[-self.search_buffer_size :]

            # update the text
            text = text[token.length + 1 :]

            # append the token to output
            output.append(token)

        return output

    def decompress(self, tokens: list[Token]) -> str:
        """
        Convert the list of tokens into an output string.

        Args:
            tokens: list containing triplets (offset, length, char)

        Returns:
            output: decompressed text

        Tests:
            >>> lz77_compressor = LZ77Compressor()
            >>> lz77_compressor.decompress([Token(0, 0, 'c'), Token(0, 0, 'a'),
            ...     Token(0, 0, 'b'), Token(0, 0, 'r'), Token(3, 1, 'c'),
            ...     Token(2, 1, 'd'), Token(7, 4, 'r'), Token(3, 5, 'd')])
            'cabracadabrarrarrad'
            >>> lz77_compressor.decompress([Token(0, 0, 'a'), Token(0, 0, 'b'),
            ...     Token(2, 2, 'c'), Token(4, 3, 'a'), Token(2, 2, 'a')])
            'ababcbababaa'
            >>> lz77_compressor.decompress([Token(0, 0, 'a'), Token(1, 1, 'c'),
            ...     Token(3, 4, 'b'), Token(3, 3, 'a'), Token(1, 2, 'c')])
            'aacaacabcabaaac'
        """

        output = ""

        for token in tokens:
            for _ in range(token.length):
                output += output[-token.offset]
            output += token.indicator

        return output

    def _find_encoding_token(self, text: str, search_buffer: str) -> Token:
        """Finds the encoding token for the first character in the text.

        Tests:
            >>> lz77_compressor = LZ77Compressor()
            >>> lz77_compressor._find_encoding_token("abrarrarrad", "abracad").offset
            7
            >>> lz77_compressor._find_encoding_token("adabrarrarrad", "cabrac").length
            1
            >>> lz77_compressor._find_encoding_token("abc", "xyz").offset
            0
            >>> lz77_compressor._find_encoding_token("", "xyz").offset
            Traceback (most recent call last):
            ...
            ValueError: We need some text to work with.
            >>> lz77_compressor._find_encoding_token("abc", "").offset
            0
        """

        if not text:
            raise ValueError("We need some text to work with.")

        # Initialise result parameters to default values
        length, offset = 0, 0

        if not search_buffer:
            return Token(offset, length, text[length])

        for i, character in enumerate(search_buffer):
            found_offset = len(search_buffer) - i
            if character == text[0]:
                found_length = self._match_length_from_index(text, search_buffer, 0, i)
                # if the found length is bigger than the current one, or if it's
                # equal (which means its offset is smaller), update offset and length
                if found_length >= length:
                    offset, length = found_offset, found_length

        # An indicator character must follow the match, so the match can
        # never cover the whole remaining text.
        length = min(length, len(text) - 1)
        return Token(offset, length, text[length])

    def _match_length_from_index(
        self, text: str, window: str, text_index: int, window_index: int
    ) -> int:
        """Calculate the longest possible match of text and window characters from
        text_index in text and window_index in window.

        Args:
            text: the text being compressed
            window: sliding window
            text_index: index of character in text
            window_index: index of character in sliding window

        Returns:
            The maximum match between text and window, from the given indexes.

        Tests:
            >>> lz77_compressor = LZ77Compressor(13, 6)
            >>> lz77_compressor._match_length_from_index("rarrad", "adabrar", 0, 4)
            5
            >>> lz77_compressor._match_length_from_index("adabrarrarrad",
            ...     "cabrac", 0, 1)
            1
        """
        # Stop at the end of the text or at the first mismatch.
        if text_index >= len(text) or text[text_index] != window[window_index]:
            return 0
        return 1 + self._match_length_from_index(
            text, window + text[text_index], text_index + 1, window_index + 1
        )


if __name__ == "__main__":
    from doctest import testmod

    testmod()
    # Initialize compressor class
    lz77_compressor = LZ77Compressor(window_size=13, lookahead_buffer_size=6)

    # Example
    TEXT = "cabracadabrarrarrad"
    compressed_text = lz77_compressor.compress(TEXT)
    print(lz77_compressor.compress("ababcbababaa"))
    decompressed_text = lz77_compressor.decompress(compressed_text)
    assert decompressed_text == TEXT, "The LZ77 algorithm returned an invalid result."
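
To make the back-reference step in `decompress` concrete, here is how a single token expands (a small sketch using the class above, assuming the repository root is on sys.path):

from data_compression.lz77 import LZ77Compressor, Token

lz77 = LZ77Compressor()
# After "ab", Token(offset=2, length=2, indicator='c') copies "ab" again
# (two characters starting two positions back), then appends 'c'.
assert lz77.decompress([Token(0, 0, "a"), Token(0, 0, "b"), Token(2, 2, "c")]) == "ababc"
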
data_compression/peak_signal_to_noise_ratio.py (new file, 46 lines)
"""
Peak signal-to-noise ratio - PSNR
https://en.wikipedia.org/wiki/Peak_signal-to-noise_ratio
Source:
https://tutorials.techonical.com/how-to-calculate-psnr-value-of-two-images-using-python
"""

import math
import os

import cv2
import numpy as np

PIXEL_MAX = 255.0


def peak_signal_to_noise_ratio(original: np.ndarray, contrast: np.ndarray) -> float:
    # Cast to float first: OpenCV loads images as uint8, and uint8
    # subtraction would wrap around instead of going negative.
    mse = np.mean((original.astype(np.float64) - contrast.astype(np.float64)) ** 2)
    if mse == 0:  # identical images: PSNR is infinite, return a large sentinel
        return 100

    return 20 * math.log10(PIXEL_MAX / math.sqrt(mse))


def main() -> None:
    dir_path = os.path.dirname(os.path.realpath(__file__))
    # Loading images (original image and compressed image)
    original = cv2.imread(os.path.join(dir_path, "image_data/original_image.png"))
    contrast = cv2.imread(os.path.join(dir_path, "image_data/compressed_image.png"), 1)

    original2 = cv2.imread(os.path.join(dir_path, "image_data/PSNR-example-base.png"))
    contrast2 = cv2.imread(
        os.path.join(dir_path, "image_data/PSNR-example-comp-10.jpg"), 1
    )

    # Value expected: 29.73 dB
    print("-- First Test --")
    print(f"PSNR value is {peak_signal_to_noise_ratio(original, contrast)} dB")

    # Value expected: 31.53 dB (Wikipedia example)
    print("\n-- Second Test --")
    print(f"PSNR value is {peak_signal_to_noise_ratio(original2, contrast2)} dB")


if __name__ == "__main__":
    main()
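
The formula can be sanity-checked on synthetic data without any image files (a sketch, assuming numpy is installed and the repository root is on sys.path):

import numpy as np

from data_compression.peak_signal_to_noise_ratio import peak_signal_to_noise_ratio

original = np.full((8, 8), 128.0)
noisy = original + 10.0  # constant error of 10 gives MSE = 100
print(peak_signal_to_noise_ratio(original, noisy))  # 20*log10(255/10) ~= 28.13 dB
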
data_compression/run_length_encoding.py (new file, 48 lines)
# https://en.wikipedia.org/wiki/Run-length_encoding


def run_length_encode(text: str) -> list:
    """
    Performs Run Length Encoding
    >>> run_length_encode("AAAABBBCCDAA")
    [('A', 4), ('B', 3), ('C', 2), ('D', 1), ('A', 2)]
    >>> run_length_encode("A")
    [('A', 1)]
    >>> run_length_encode("AA")
    [('A', 2)]
    >>> run_length_encode("AAADDDDDDFFFCCCAAVVVV")
    [('A', 3), ('D', 6), ('F', 3), ('C', 3), ('A', 2), ('V', 4)]
    """
    encoded = []
    count = 1

    for i in range(len(text)):
        if i + 1 < len(text) and text[i] == text[i + 1]:
            count += 1
        else:
            encoded.append((text[i], count))
            count = 1

    return encoded


def run_length_decode(encoded: list) -> str:
    """
    Performs Run Length Decoding
    >>> run_length_decode([('A', 4), ('B', 3), ('C', 2), ('D', 1), ('A', 2)])
    'AAAABBBCCDAA'
    >>> run_length_decode([('A', 1)])
    'A'
    >>> run_length_decode([('A', 2)])
    'AA'
    >>> run_length_decode([('A', 3), ('D', 6), ('F', 3), ('C', 3), ('A', 2), ('V', 4)])
    'AAADDDDDDFFFCCCAAVVVV'
    """
    return "".join(char * length for char, length in encoded)


if __name__ == "__main__":
    from doctest import testmod

    testmod(name="run_length_encode", verbose=True)
    testmod(name="run_length_decode", verbose=True)
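
Encoding and decoding are exact inverses, which a one-line round trip confirms (assuming the repository root is on sys.path):

from data_compression.run_length_encoding import run_length_decode, run_length_encode

text = "AAAABBBCCDAA"
assert run_length_decode(run_length_encode(text)) == text
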