Skip to content

Commit 2f48936

Browse files
Merge pull request #1 from LuciaHarcekova/lz77
Add LZ77 compression algorithm
2 parents 79ef431 + 848143b commit 2f48936

File tree

1 file changed

+197
-0
lines changed

1 file changed

+197
-0
lines changed

compression/lz77.py

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
"""
2+
LZ77 compression algorithm
3+
- lossless data compression published in papers by Abraham Lempel and Jacob Ziv in 1977
4+
- also known as LZ1 or sliding-window compression
5+
- forms the basis for many variations including LZW, LZSS, LZMA and others
6+
7+
It uses a “sliding window” method. Within the sliding window we have:
8+
- search buffer
9+
- look ahead buffer
10+
len(sliding_window) = len(search_buffer) + len(look_ahead_buffer)
11+
12+
LZ77 manages a dictionary that uses triples composed of:
13+
- Offset into search buffer, it's the distance from the current position back to
14+
the start of the matching phrase in the search buffer.
15+
- Length of the match, it's the number of characters that make up a phrase.
16+
- The indicator is represented by a character that is going to be encoded next.
17+
18+
As a file is parsed, the dictionary is dynamically updated to reflect the compressed
19+
data contents and size.
20+
21+
Examples:
22+
"cabracadabrarrarrad" <-> [(0, 0, 'c'), (0, 0, 'a'), (0, 0, 'b'), (0, 0, 'r'),
23+
(3, 1, 'c'), (2, 1, 'd'), (7, 4, 'r'), (3, 5, 'd')]
24+
"ababcbababaa" <-> [(0, 0, 'a'), (0, 0, 'b'), (2, 2, 'c'), (4, 3, 'a'), (2, 2, 'a')]
25+
"aacaacabcabaaac" <-> [(0, 0, 'a'), (1, 1, 'c'), (3, 4, 'b'), (3, 3, 'a'), (1, 2, 'c')]
26+
27+
Sources:
28+
en.wikipedia.org/wiki/LZ77_and_LZ78
29+
"""
30+
31+
from typing import List, Tuple
32+
33+
__version__ = '0.1'
34+
__author__ = 'Lucia Harcekova'
35+
36+
37+
class LZ77Compressor:
    """
    Class containing compress and decompress methods using LZ77 compression
    algorithm.

    Text is encoded as a list of ``(offset, length, indicator)`` tokens:
    ``offset`` is how many characters to step back in the already-produced
    output, ``length`` is how many characters to copy from there, and
    ``indicator`` is the literal character that follows the copied run.

    Note: ``lookahead_buffer_size`` is only used to derive
    ``search_buffer_size``; the length of a match is not limited by it.
    """

    def __init__(self, window_size=13, lookahead_buffer_size=6):
        self.window_size = window_size
        self.lookahead_buffer_size = lookahead_buffer_size
        # The search buffer is the part of the window left over once the
        # look-ahead buffer has been accounted for.
        self.search_buffer_size = self.window_size - self.lookahead_buffer_size

    def compress(self, text: str) -> List[Tuple[int, int, str]]:
        """This method compresses given string text using LZ77 compression algorithm.

        Args:
            text (str): string that's going to be compressed

        Returns:
            output (List[Tuple[int, int, str]]): the compressed text, an empty
                list for an empty string

        Tests:
            >>> lz77_compressor = LZ77Compressor(13, 6)
            >>> lz77_compressor.compress("ababcbababaa")
            [(0, 0, 'a'), (0, 0, 'b'), (2, 2, 'c'), (4, 3, 'a'), (2, 2, 'a')]
            >>> lz77_compressor.compress("aacaacabcabaaac")
            [(0, 0, 'a'), (1, 1, 'c'), (3, 4, 'b'), (3, 3, 'a'), (1, 2, 'c')]
            >>> lz77_compressor.compress("aaaa")
            [(0, 0, 'a'), (1, 2, 'a')]
            >>> lz77_compressor.compress("")
            []
        """
        output = []
        search_buffer = ""

        # Each iteration emits one token and consumes `length + 1` characters.
        while text:
            offset, length, indicator = self._find_encoding_token(
                text, search_buffer)
            consumed = length + 1

            # Slide the window: append what was just encoded, then drop the
            # oldest characters. Trimming by overflow count (instead of
            # slicing with a negative size) also behaves for a search buffer
            # size of 0, where `s[-0:]` would keep the whole string.
            search_buffer += text[:consumed]
            overflow = len(search_buffer) - self.search_buffer_size
            if overflow > 0:
                search_buffer = search_buffer[overflow:]

            text = text[consumed:]
            output.append((offset, length, indicator))

        return output

    def decompress(self, tokens: List[Tuple[int, int, str]]) -> str:
        """This method turns the list of tokens consisting of triplets of the form
        (offset, length, char), into an output string.

        Args:
            tokens (List[Tuple[int, int, str]]): Tokens (offset, length, char)

        Returns:
            output (str): The decompressed text

        Tests:
            >>> lz77_compressor = LZ77Compressor(13, 6)
            >>> lz77_compressor.decompress([(0, 0, 'c'), (0, 0, 'a'), (0, 0, 'b'),
            ...     (0, 0, 'r'), (3, 1, 'c'), (2, 1, 'd'), (7, 4, 'r'), (3, 5, 'd')])
            'cabracadabrarrarrad'
            >>> lz77_compressor.decompress([(0, 0, 'a'), (0, 0, 'b'), (2, 2, 'c'),
            ...     (4, 3, 'a'), (2, 2, 'a')])
            'ababcbababaa'
            >>> lz77_compressor.decompress([(0, 0, 'a'), (1, 1, 'c'), (3, 4, 'b'),
            ...     (3, 3, 'a'), (1, 2, 'c')])
            'aacaacabcabaaac'
            >>> lz77_compressor.decompress([])
            ''
        """
        # Collect single characters in a list and join once at the end rather
        # than growing a string with repeated concatenation.
        characters = []

        for offset, length, indicator in tokens:
            # Copy one character at a time: the copied run may overlap the
            # characters being produced (offset smaller than length).
            for _ in range(length):
                characters.append(characters[-offset])
            # extend() adds the indicator character by character, so indices
            # into `characters` always equal indices into the final string.
            characters.extend(indicator)

        return "".join(characters)

    def _find_encoding_token(self, text: str, search_buffer: str) \
            -> Tuple[int, int, str]:
        """Finds the encoding token for the first character in the text.

        The longest match wins; among equally long matches the one closest to
        the end of the search buffer (smallest offset) is chosen. A match never
        covers the last character of text, so the indicator is always a real
        character.

        Args:
            text (str): the remaining text to encode, must not be empty
            search_buffer (str): the most recently encoded characters

        Returns:
            Tuple[int, int, str]: Token (offset, length, indicator)

        Raises:
            IndexError: if text is empty

        Tests:
            >>> lz77_compressor = LZ77Compressor(13, 6)
            >>> lz77_compressor._find_encoding_token("abrarrarrad", "abracad")
            (7, 4, 'r')
            >>> lz77_compressor._find_encoding_token("adabrarrarrad", "cabrac")
            (2, 1, 'd')
            >>> lz77_compressor._find_encoding_token("a", "a")
            (0, 0, 'a')
        """
        # Initialise result parameters to default values
        length, offset = 0, 0

        if search_buffer == "":
            return offset, length, text[length]

        # Keep at least one character of text free to serve as the indicator.
        max_length = len(text) - 1

        for i, character in enumerate(search_buffer):
            if character != text[0]:
                continue
            found_length = min(
                self._match_length_from_index(text, search_buffer, 0, i),
                max_length)
            # `>=` lets a later (closer) candidate replace an equally long
            # earlier one; a match capped down to zero is not a match at all.
            if found_length > 0 and found_length >= length:
                offset, length = len(search_buffer) - i, found_length

        return offset, length, text[length]

    def _match_length_from_index(self, text: str,
                                 window: str, text_index: int,
                                 window_index: int) -> int:
        """Calculate the longest possible match of text and window characters from
        text_index in text and window_index in window.

        Every matched character is appended to the window as the match grows,
        so a match may run past the end of the original window into the text
        that is being matched.

        Args:
            text (str): text that is being matched against the window
            window (str): sliding window
            text_index (int): index of character in text
            window_index (int): index of character in sliding window

        Returns:
            int: The maximum match between text and window, from given indexes.

        Tests:
            >>> lz77_compressor = LZ77Compressor(13, 6)
            >>> lz77_compressor._match_length_from_index("rarrad", "adabrar", 0, 4)
            5
            >>> lz77_compressor._match_length_from_index("adabrarrarrad",
            ...     "cabrac", 0, 1)
            1
            >>> lz77_compressor._match_length_from_index("aaa", "a", 0, 0)
            3
        """
        if not text:
            return 0

        # Iterative on purpose: one recursive call per matched character
        # would exceed the interpreter's recursion limit on long runs.
        extended_window = list(window)
        matched = 0
        while text_index + matched < len(text):
            current = text[text_index + matched]
            if current != extended_window[window_index + matched]:
                break
            extended_window.append(current)
            matched += 1

        return matched
186+
187+
188+
if __name__ == '__main__':
    # Quick self-check: compress a sample string, decompress the tokens and
    # verify that the round trip reproduces the input.
    lz77_compressor = LZ77Compressor(window_size=13, lookahead_buffer_size=6)

    # Example
    TEXT = "cabracadabrarrarrad"
    compressed_text = lz77_compressor.compress(TEXT)
    decompressed_text = lz77_compressor.decompress(compressed_text)
    assert decompressed_text == TEXT, "The LZ77 algorithm returned an invalid result."

0 commit comments

Comments
 (0)