Skip to content

Commit 332cf0a

Browse files
committed
Add tests for _ZlibDecompressor
1 parent 9d60339 commit 332cf0a

File tree

1 file changed

+175
-7
lines changed

1 file changed

+175
-7
lines changed

Lib/test/test_zlib.py

+175-7
Original file line numberDiff line numberDiff line change
@@ -871,13 +871,6 @@ def test_wbits(self):
871871
)
872872
self.assertEqual(expected, actual)
873873

874-
def choose_lines(source, number, seed=None, generator=random):
875-
"""Return a list of number lines randomly chosen from the source"""
876-
if seed is not None:
877-
generator.seed(seed)
878-
sources = source.split('\n')
879-
return [generator.choice(sources) for n in range(number)]
880-
881874

882875
HAMLET_SCENE = b"""
883876
LAERTES
@@ -944,6 +937,181 @@ def choose_lines(source, number, seed=None, generator=random):
944937
"""
945938

946939

940+
class ZlibDecompressorTest():
941+
# Test adopted from test_bz2.py
942+
TEXT = HAMLET_SCENE
943+
DATA = zlib.compress(HAMLET_SCENE)
944+
BAD_DATA = b"Not a valid deflate block"
945+
def test_Constructor(self):
946+
self.assertRaises(TypeError, zlib._ZlibDecompressor, 42)
947+
948+
def testDecompress(self):
949+
zlibd = zlib._ZlibDecompressor()
950+
self.assertRaises(TypeError, zlibd.decompress)
951+
text = zlibd.decompress(self.DATA)
952+
self.assertEqual(text, self.TEXT)
953+
954+
def testDecompressChunks10(self):
955+
zlibd = zlib._ZlibDecompressor()
956+
text = b''
957+
n = 0
958+
while True:
959+
str = self.DATA[n*10:(n+1)*10]
960+
if not str:
961+
break
962+
text += zlibd.decompress(str)
963+
n += 1
964+
self.assertEqual(text, self.TEXT)
965+
966+
def testDecompressUnusedData(self):
967+
zlibd = zlib._ZlibDecompressor()
968+
unused_data = b"this is unused data"
969+
text = zlibd.decompress(self.DATA+unused_data)
970+
self.assertEqual(text, self.TEXT)
971+
self.assertEqual(zlibd.unused_data, unused_data)
972+
973+
def testEOFError(self):
974+
zlibd = zlib._ZlibDecompressor()
975+
text = zlibd.decompress(self.DATA)
976+
self.assertRaises(EOFError, zlibd.decompress, b"anything")
977+
self.assertRaises(EOFError, zlibd.decompress, b"")
978+
979+
@support.skip_if_pgo_task
980+
@bigmemtest(size=_4G + 100, memuse=3.3)
981+
def testDecompress4G(self, size):
982+
# "Test zlib._ZlibDecompressor.decompress() with >4GiB input"
983+
blocksize = 10 * 1024 * 1024
984+
block = random.randbytes(blocksize)
985+
try:
986+
data = block * (size // blocksize + 1)
987+
compressed = zlib.compress(data)
988+
zlibd = zlib._ZlibDecompressor()
989+
decompressed = zlibd.decompress(compressed)
990+
self.assertTrue(decompressed == data)
991+
finally:
992+
data = None
993+
compressed = None
994+
decompressed = None
995+
996+
def testPickle(self):
997+
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
998+
with self.assertRaises(TypeError):
999+
pickle.dumps(zlib._ZlibDecompressor(), proto)
1000+
1001+
def testDecompressorChunksMaxsize(self):
1002+
zlibd = zlib._ZlibDecompressor()
1003+
max_length = 100
1004+
out = []
1005+
1006+
# Feed some input
1007+
len_ = len(self.BIG_DATA) - 64
1008+
out.append(zlibd.decompress(self.BIG_DATA[:len_],
1009+
max_length=max_length))
1010+
self.assertFalse(zlibd.needs_input)
1011+
self.assertEqual(len(out[-1]), max_length)
1012+
1013+
# Retrieve more data without providing more input
1014+
out.append(zlibd.decompress(b'', max_length=max_length))
1015+
self.assertFalse(zlibd.needs_input)
1016+
self.assertEqual(len(out[-1]), max_length)
1017+
1018+
# Retrieve more data while providing more input
1019+
out.append(zlibd.decompress(self.BIG_DATA[len_:],
1020+
max_length=max_length))
1021+
self.assertLessEqual(len(out[-1]), max_length)
1022+
1023+
# Retrieve remaining uncompressed data
1024+
while not zlibd.eof:
1025+
out.append(zlibd.decompress(b'', max_length=max_length))
1026+
self.assertLessEqual(len(out[-1]), max_length)
1027+
1028+
out = b"".join(out)
1029+
self.assertEqual(out, self.BIG_TEXT)
1030+
self.assertEqual(zlibd.unused_data, b"")
1031+
1032+
def test_decompressor_inputbuf_1(self):
1033+
# Test reusing input buffer after moving existing
1034+
# contents to beginning
1035+
zlibd = zlib._ZlibDecompressor()
1036+
out = []
1037+
1038+
# Create input buffer and fill it
1039+
self.assertEqual(zlibd.decompress(self.DATA[:100],
1040+
max_length=0), b'')
1041+
1042+
# Retrieve some results, freeing capacity at beginning
1043+
# of input buffer
1044+
out.append(zlibd.decompress(b'', 2))
1045+
1046+
# Add more data that fits into input buffer after
1047+
# moving existing data to beginning
1048+
out.append(zlibd.decompress(self.DATA[100:105], 15))
1049+
1050+
# Decompress rest of data
1051+
out.append(zlibd.decompress(self.DATA[105:]))
1052+
self.assertEqual(b''.join(out), self.TEXT)
1053+
1054+
def test_decompressor_inputbuf_2(self):
1055+
# Test reusing input buffer by appending data at the
1056+
# end right away
1057+
zlibd = zlib._ZlibDecompressor()
1058+
out = []
1059+
1060+
# Create input buffer and empty it
1061+
self.assertEqual(zlibd.decompress(self.DATA[:200],
1062+
max_length=0), b'')
1063+
out.append(zlibd.decompress(b''))
1064+
1065+
# Fill buffer with new data
1066+
out.append(zlibd.decompress(self.DATA[200:280], 2))
1067+
1068+
# Append some more data, not enough to require resize
1069+
out.append(zlibd.decompress(self.DATA[280:300], 2))
1070+
1071+
# Decompress rest of data
1072+
out.append(zlibd.decompress(self.DATA[300:]))
1073+
self.assertEqual(b''.join(out), self.TEXT)
1074+
1075+
def test_decompressor_inputbuf_3(self):
1076+
# Test reusing input buffer after extending it
1077+
1078+
zlibd = zlib._ZlibDecompressor()
1079+
out = []
1080+
1081+
# Create almost full input buffer
1082+
out.append(zlibd.decompress(self.DATA[:200], 5))
1083+
1084+
# Add even more data to it, requiring resize
1085+
out.append(zlibd.decompress(self.DATA[200:300], 5))
1086+
1087+
# Decompress rest of data
1088+
out.append(zlibd.decompress(self.DATA[300:]))
1089+
self.assertEqual(b''.join(out), self.TEXT)
1090+
1091+
def test_failure(self):
1092+
zlibd = zlib._ZlibDecompressor()
1093+
self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)
1094+
# Previously, a second call could crash due to internal inconsistency
1095+
self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)
1096+
1097+
@support.refcount_test
1098+
def test_refleaks_in___init__(self):
1099+
gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
1100+
zlibd = zlib._ZlibDecompressor()
1101+
refs_before = gettotalrefcount()
1102+
for i in range(100):
1103+
zlibd.__init__()
1104+
self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)
1105+
1106+
1107+
def choose_lines(source, number, seed=None, generator=random):
1108+
"""Return a list of number lines randomly chosen from the source"""
1109+
if seed is not None:
1110+
generator.seed(seed)
1111+
sources = source.split('\n')
1112+
return [generator.choice(sources) for n in range(number)]
1113+
1114+
9471115
class CustomInt:
9481116
def __index__(self):
9491117
return 100

0 commit comments

Comments
 (0)