Skip to content

Commit a303aa8

Browse files
authored
lz4 decompressor has been fixed (#6414)
1 parent 3b6fd61 commit a303aa8

File tree

7 files changed

+106
-20
lines changed

7 files changed

+106
-20
lines changed

ydb/library/yql/providers/s3/compressors/lz4io.cpp

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ constexpr ui32 LegacyMagicNumber = 0x184C2102U;
2222
constexpr size_t LegacyBlockSize = 8_MB;
2323
constexpr size_t FrameMaxBlockSize = 4_MB;
2424

25-
void WriteLE32 (void* p, ui32 value32)
25+
void WriteLE32(void* p, ui32 value32)
2626
{
2727
const auto dstPtr = static_cast<unsigned char*>(p);
2828
dstPtr[0] = (unsigned char)value32;
@@ -31,7 +31,7 @@ void WriteLE32 (void* p, ui32 value32)
3131
dstPtr[3] = (unsigned char)(value32 >> 24U);
3232
}
3333

34-
ui32 ReadLE32 (const void* s) {
34+
ui32 ReadLE32(const void* s) {
3535
const auto srcPtr = static_cast<const unsigned char*>(s);
3636
ui32 value32 = srcPtr[0];
3737
value32 += (ui32)srcPtr[1] << 8U;
@@ -112,29 +112,35 @@ bool TReadBuffer::nextImpl() {
112112
}
113113

114114
size_t TReadBuffer::DecompressFrame() {
115-
if (NextToLoad > InBuffer.size()) {
116-
InBuffer.resize(NextToLoad);
117-
}
118-
119-
if (Pos >= Remaining) {
120-
for (auto toRead = NextToLoad; toRead > 0U;) {
121-
const auto sizeCheck = Source.read(InBuffer.data() + NextToLoad - toRead, toRead);
122-
YQL_ENSURE(sizeCheck > 0U && sizeCheck <= toRead, "Cannot access compressed block.");
123-
toRead -= sizeCheck;
115+
while (NextToLoad) {
116+
if (NextToLoad > InBuffer.size()) {
117+
InBuffer.resize(NextToLoad);
124118
}
125119

126-
Pos = 0ULL;
127-
Remaining = NextToLoad;
128-
}
120+
if (Pos >= Remaining) {
121+
for (auto toRead = NextToLoad; toRead > 0U;) {
122+
const auto sizeCheck = Source.read(InBuffer.data() + NextToLoad - toRead, toRead);
123+
YQL_ENSURE(sizeCheck > 0U && sizeCheck <= toRead, "Cannot access compressed block.");
124+
toRead -= sizeCheck;
125+
}
126+
127+
Pos = 0ULL;
128+
Remaining = NextToLoad;
129+
}
129130

130-
if (Pos < Remaining) {
131131
auto decodedBytes = OutBuffer.size();
132-
NextToLoad = LZ4F_decompress_usingDict(Ctx, OutBuffer.data(), &decodedBytes, InBuffer.data() + Pos, &Remaining, nullptr, 0ULL, nullptr);
133-
YQL_ENSURE(!LZ4F_isError(NextToLoad), "Decompression error: " << LZ4F_getErrorName(NextToLoad));
134-
Pos += Remaining;
132+
while (Pos < Remaining || (decodedBytes == OutBuffer.size())) {
133+
decodedBytes = OutBuffer.size();
134+
NextToLoad = LZ4F_decompress_usingDict(Ctx, OutBuffer.data(), &decodedBytes, InBuffer.data() + Pos, &Remaining, nullptr, 0ULL, nullptr);
135+
YQL_ENSURE(!LZ4F_isError(NextToLoad), "Decompression error: " << LZ4F_getErrorName(NextToLoad));
136+
Pos += Remaining;
137+
138+
if (decodedBytes)
139+
return decodedBytes;
135140

136-
if (decodedBytes)
137-
return decodedBytes;
141+
if (!NextToLoad)
142+
return decodedBytes;
143+
}
138144
}
139145

140146
return 0ULL;
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#include <ydb/library/yql/providers/s3/compressors/lz4io.h>
2+
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBufferFromFile.h>
3+
4+
#include <library/cpp/scheme/scheme.h>
5+
#include <library/cpp/testing/common/env.h>
6+
#include <library/cpp/testing/unittest/registar.h>
7+
8+
namespace NYql::NCompressors {
9+
10+
namespace {
11+
TString GetResourcePath(const TString& path) {
12+
return ArcadiaSourceRoot() + "/ydb/library/yql/providers/s3/compressors/ut/test_compression_data/" + path;
13+
}
14+
}
15+
16+
Y_UNIT_TEST_SUITE(TCompressorTests) {
17+
Y_UNIT_TEST(SuccessLz4) {
18+
NDB::ReadBufferFromFile buffer(GetResourcePath("test.json.lz4"));
19+
auto decompressorBuffer = std::make_unique<NLz4::TReadBuffer>(buffer);
20+
21+
char str[256] = {};
22+
decompressorBuffer->read(str, 256);
23+
UNIT_ASSERT_VALUES_EQUAL(NSc::TValue::FromJsonThrow(str), NSc::TValue::FromJsonThrow(R"([
24+
{
25+
"id": 0,
26+
"description": "yq",
27+
"info": "abc"
28+
}
29+
])"));
30+
}
31+
32+
Y_UNIT_TEST(WrongMagicLz4) {
33+
NDB::ReadBufferFromFile buffer(GetResourcePath("test.json"));
34+
UNIT_ASSERT_EXCEPTION_CONTAINS(std::make_unique<NLz4::TReadBuffer>(buffer), yexception, "TReadBuffer(): requirement StreamType != EStreamType::Unknown failed, message: Wrong magic.");
35+
}
36+
37+
Y_UNIT_TEST(ErrorLz4) {
38+
NDB::ReadBufferFromFile buffer(GetResourcePath("test.broken.lz4"));
39+
auto decompressorBuffer = std::make_unique<NLz4::TReadBuffer>(buffer);
40+
char str[256] = {};
41+
UNIT_ASSERT_EXCEPTION_CONTAINS(decompressorBuffer->read(str, 256), yexception, "DecompressFrame(): requirement !LZ4F_isError(NextToLoad) failed, message: Decompression error: ERROR_reservedFlag_set");
42+
}
43+
}
44+
45+
}
Binary file not shown.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[
2+
{
3+
"id": 0,
4+
"description": "yq",
5+
"info": "abc"
6+
}
7+
]
8+
Binary file not shown.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
IF (NOT OS_WINDOWS AND CLANG AND NOT WITH_VALGRIND)
2+
3+
UNITTEST_FOR(ydb/library/yql/providers/s3/compressors)
4+
5+
SRCS(
6+
decompressor_ut.cpp
7+
)
8+
9+
PEERDIR(
10+
library/cpp/scheme
11+
ydb/library/yql/public/udf/service/stub
12+
ydb/library/yql/udfs/common/clickhouse/client
13+
)
14+
15+
ADDINCL(
16+
ydb/library/yql/udfs/common/clickhouse/client/base
17+
ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random
18+
ydb/library/yql/udfs/common/clickhouse/client/src
19+
)
20+
21+
END()
22+
23+
ENDIF()
24+

ydb/library/yql/providers/s3/compressors/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,6 @@ YQL_LAST_ABI_VERSION()
3636

3737
END()
3838

39+
RECURSE_FOR_TESTS(
40+
ut
41+
)

0 commit comments

Comments
 (0)