diff --git a/ydb/library/yql/minikql/comp_nodes/packed_tuple/hashes_calc.h b/ydb/library/yql/minikql/comp_nodes/packed_tuple/hashes_calc.h new file mode 100644 index 000000000000..8f7b13d10102 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/packed_tuple/hashes_calc.h @@ -0,0 +1,155 @@ +#pragma once + +#include + +namespace NKikimr { +namespace NMiniKQL { +namespace NPackedTuple { + + + +// Calculates CRC32 of data using hardware acceleration instruction (Size <= 16) +template ui32 CalculateCRC32(const ui8 * data, ui32 initHash = 0) { + static_assert(Size <= 16, "Size for template CRC32 calculation should be <= 16 !"); + + using TSimdI8 = TTraits::TSimdI8; + + ui32 hash = initHash; + + if constexpr (Size == 1 ) { + hash = TSimdI8::CRC32u8(hash, *(ui8*) data); + } + + if constexpr (Size == 2 ) { + hash = TSimdI8::CRC32u16(hash, *(ui16*) data); + } + + if constexpr (Size == 3 ) { + + hash = TSimdI8::CRC32u16(hash, *(ui16*) data); + hash = TSimdI8::CRC32u8(hash, *(ui8*) (data+2)); + } + + if constexpr (Size == 4 ) { + hash = TSimdI8::CRC32u32(hash, *(ui32*) data); + } + + if constexpr (Size == 5 ) { + hash = TSimdI8::CRC32u32(hash, *(ui32*) data); + hash = TSimdI8::CRC32u8(hash, *(ui8*) (data+4)); + } + + if constexpr (Size == 6 ) { + hash = TSimdI8::CRC32u32(hash, *(ui32*) data); + hash = TSimdI8::CRC32u16(hash, *(ui16*) (data+4)); + } + + if constexpr (Size == 7 ) { + hash = TSimdI8::CRC32u32(hash, *(ui32*) data); + hash = TSimdI8::CRC32u16(hash, *(ui16*) (data+4)); + hash = TSimdI8::CRC32u8(hash, *(ui8*) (data+6)); + } + + if constexpr (Size == 8 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + } + + if constexpr (Size == 9 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + hash = TSimdI8::CRC32u8(hash, *(ui8*) (data+8)); + } + + if constexpr (Size == 10 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + hash = TSimdI8::CRC32u16(hash, *(ui16*) (data+8)); + } + + if constexpr (Size == 11 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + hash = TSimdI8::CRC32u16(hash, *(ui16*) (data+8)); + hash = TSimdI8::CRC32u8(hash, *(ui8*) (data+10)); + } + + if constexpr (Size == 12 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + hash = TSimdI8::CRC32u32(hash, *(ui32*) (data+8)); + } + + if constexpr (Size == 13 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + hash = TSimdI8::CRC32u32(hash, *(ui32*) (data+8)); + hash = TSimdI8::CRC32u8(hash, *(ui8*) (data+12)); + } + + if constexpr (Size == 14 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + hash = TSimdI8::CRC32u32(hash, *(ui32*) (data+8)); + hash = TSimdI8::CRC32u16(hash, *(ui16*) (data+12)); + } + + if constexpr (Size == 15 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + hash = TSimdI8::CRC32u32(hash, *(ui32*) (data+8)); + hash = TSimdI8::CRC32u16(hash, *(ui16*) (data+12)); + hash = TSimdI8::CRC32u8(hash, *(ui8*) (data+14)); + } + + if constexpr (Size == 16 ) { + hash = TSimdI8::CRC32u64(hash, *(ui64*) data); + hash = TSimdI8::CRC32u64(hash, *(ui64*) (data+8)); + } + + return hash; + +} + + +template +inline ui32 CalculateCRC32(const ui8 * data, ui32 size, ui32 hash = 0 ) { + + using TSimdI8 = TTraits::TSimdI8; + + while (size >= 8) { + hash = TSimdI8::CRC32u64(hash, ReadUnaligned(data)); + size -= 8; + data += 8; + } + + switch(size) { + case 7: + hash = TSimdI8::CRC32u32(hash, ReadUnaligned(data)); + data += 4; + [[fallthrough]]; + case 3: + hash = TSimdI8::CRC32u16(hash, ReadUnaligned(data)); + data += 2; + [[fallthrough]]; + case 1: + hash = TSimdI8::CRC32u8(hash, ReadUnaligned(data)); + break; + case 6: + hash = TSimdI8::CRC32u32(hash, ReadUnaligned(data)); + data += 4; + [[fallthrough]]; + case 2: + hash = TSimdI8::CRC32u16(hash, ReadUnaligned(data)); + break; + case 5: + hash = TSimdI8::CRC32u32(hash, ReadUnaligned(data)); + data += 4; + hash = TSimdI8::CRC32u8(hash, ReadUnaligned(data)); + break; + case 4: + hash = TSimdI8::CRC32u32(hash, ReadUnaligned(data)); + break; + case 0: + break; + } + return hash; + +} +} + +} + +} diff --git a/ydb/library/yql/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp b/ydb/library/yql/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp new file mode 100644 index 000000000000..c5ee98321599 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp @@ -0,0 +1,600 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include + +namespace NKikimr { +namespace NMiniKQL { +namespace NPackedTuple { + +using namespace std::chrono_literals; + +static volatile bool IsVerbose = false; +#define CTEST (IsVerbose ? Cerr : Cnull) + +namespace { + +template +void TestCalculateCRC32_Impl() { + std::mt19937_64 rng; // fixed-seed (0) prng + std::vector v(1024); + std::generate(v.begin(), v.end(), rng); + + ui64 nanoseconds = 0; + ui64 totalBytes = 0; + ui32 hash = 0; + for (ui32 test = 0; test < 65535; ++test) { + ui32 bytes = rng() % (sizeof(v[0])*v.size()); + + std::chrono::steady_clock::time_point begin01 = std::chrono::steady_clock::now(); + hash = CalculateCRC32((const ui8 *) v.data(), bytes, hash); + std::chrono::steady_clock::time_point end01 = std::chrono::steady_clock::now(); + + nanoseconds += std::chrono::duration_cast(end01 - begin01).count(); + totalBytes += bytes; + } + CTEST << "Hash: " << hash << Endl; + UNIT_ASSERT_VALUES_EQUAL(hash, 80113928); + CTEST << "Data Size: " << totalBytes << Endl; + CTEST << "Time for hash: " << ((nanoseconds + 999)/1000) << "[microseconds]" << Endl; + CTEST << "Calculating speed: " << totalBytes / ((nanoseconds + 999)/1000) << "MB/sec" << Endl; +} +} + +Y_UNIT_TEST_SUITE(TestHash) { + +Y_UNIT_TEST(TestCalculateCRC32Fallback) { + TestCalculateCRC32_Impl(); +} + +Y_UNIT_TEST(TestCalculateCRC32SSE42) { + if (NX86::HaveSSE42()) + TestCalculateCRC32_Impl(); + else + CTEST << "Skipped SSE42 test\n"; +} + +Y_UNIT_TEST(TestCalculateCRC32AVX2) { + if (NX86::HaveAVX2()) + TestCalculateCRC32_Impl(); + else + CTEST << "Skipped AVX2 test\n"; +} + +} + +Y_UNIT_TEST_SUITE(TupleLayout) { +Y_UNIT_TEST(CreateLayout) { + + TColumnDesc kc1, kc2, pc1, pc2, pc3; + + kc1.Role = EColumnRole::Key; + kc1.DataSize = 8; + + kc2.Role = EColumnRole::Key; + kc2.DataSize = 4; + + pc1.Role = EColumnRole::Payload; + pc1.DataSize = 16; + + pc2.Role = EColumnRole::Payload; + pc2.DataSize = 4; + + pc3.Role = EColumnRole::Payload; + pc3.DataSize = 8; + + std::vector columns{kc1, kc2, pc1, pc2, pc3}; + + auto tl = TTupleLayout::Create(columns); + UNIT_ASSERT(tl->TotalRowSize == 45); +} + +Y_UNIT_TEST(Pack) { + + TScopedAlloc alloc(__LOCATION__); + + TColumnDesc kc1, kc2, pc1, pc2; + + kc1.Role = EColumnRole::Key; + kc1.DataSize = 8; + + kc2.Role = EColumnRole::Key; + kc2.DataSize = 4; + + pc1.Role = EColumnRole::Payload; + pc1.DataSize = 8; + + pc2.Role = EColumnRole::Payload; + pc2.DataSize = 4; + + std::vector columns{kc1, kc2, pc1, pc2}; + + auto tl = TTupleLayout::Create(columns); + UNIT_ASSERT(tl->TotalRowSize == 29); + + const ui64 NTuples1 = 10e6; + + const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1; + + std::vector col1(NTuples1, 0); + std::vector col2(NTuples1, 0); + std::vector col3(NTuples1, 0); + std::vector col4(NTuples1, 0); + + std::vector res(Tuples1DataBytes + 64, 0); + + for (ui32 i = 0; i < NTuples1; ++i) { + col1[i] = i; + col2[i] = i; + col3[i] = i; + col4[i] = i; + } + + const ui8* cols[4]; + + cols[0] = (ui8*) col1.data(); + cols[1] = (ui8*) col2.data(); + cols[2] = (ui8*) col3.data(); + cols[3] = (ui8*) col4.data(); + + std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now(); + + std::vector colValid1((NTuples1 + 7)/8, ~0); + std::vector colValid2((NTuples1 + 7)/8, ~0); + std::vector colValid3((NTuples1 + 7)/8, ~0); + std::vector colValid4((NTuples1 + 7)/8, ~0); + const ui8 *colsValid[4] = { + colValid1.data(), + colValid2.data(), + colValid3.data(), + colValid4.data(), + }; + + std::vector> overflow; + tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1); + std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now(); + ui64 microseconds = std::chrono::duration_cast(end02 - begin02).count(); + if (microseconds == 0) microseconds = 1; + + CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl; + CTEST << "Data size = " << Tuples1DataBytes / (1024 * 1024) << "[MB]" << Endl; + CTEST << "Calculating speed = " << Tuples1DataBytes / microseconds << "MB/sec" << Endl; + CTEST << Endl; + + UNIT_ASSERT(true); + +} + +Y_UNIT_TEST(PackVarSize) { + + TScopedAlloc alloc(__LOCATION__); + + TColumnDesc kc1, kcv1, kcv2, kc2, pc1, pc2; + + kc1.Role = EColumnRole::Key; + kc1.DataSize = 8; + + kc2.Role = EColumnRole::Key; + kc2.DataSize = 4; + + pc1.Role = EColumnRole::Payload; + pc1.DataSize = 8; + + pc2.Role = EColumnRole::Payload; + pc2.DataSize = 4; + + kcv1.Role = EColumnRole::Key; + kcv1.DataSize = 8; + kcv1.SizeType = EColumnSizeType::Variable; + + kcv2.Role = EColumnRole::Key; + kcv2.DataSize = 16; + kcv2.SizeType = EColumnSizeType::Variable; + + pc1.Role = EColumnRole::Payload; + pc1.DataSize = 8; + + pc2.Role = EColumnRole::Payload; + pc2.DataSize = 4; + + std::vector columns{kc1, kc2, kcv1, kcv2, pc1, pc2}; + + auto tl = TTupleLayout::Create(columns); + CTEST << "TotalRowSize = " << tl->TotalRowSize << Endl; + UNIT_ASSERT_VALUES_EQUAL(tl->TotalRowSize, 54); + + const ui64 NTuples1 = 3; + + const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1; + + std::vector col1(NTuples1, 0); + std::vector col2(NTuples1, 0); + std::vector col3(NTuples1, 0); + std::vector col4(NTuples1, 0); + + std::vector vcol1(1, 0); + + std::vector vcol1data; + std::vector vcol2(1, 0); + std::vector vcol2data; + + std::vector res(Tuples1DataBytes + 64, 0); + std::vector vcol1str { + "abc", + "ABCDEFGHIJKLMNO", + "ZYXWVUTSPR" + }; + std::vector vcol2str { + "ABC", + "abcdefghijklmno", + "zyxwvutspr" + }; + for (auto &&str: vcol1str) { + for (auto c: str) + vcol1data.push_back(c); + vcol1.push_back(vcol1data.size()); + } + UNIT_ASSERT_VALUES_EQUAL(vcol1.size(), NTuples1 + 1); + for (auto &&str: vcol2str) { + for (auto c: str) + vcol2data.push_back(c); + vcol2.push_back(vcol2data.size()); + } + UNIT_ASSERT_VALUES_EQUAL(vcol2.size(), NTuples1 + 1); + for (ui32 i = 0; i < NTuples1; ++i) { + col1[i] = (1ull<<(sizeof(col1[0])*8 - 4)) + i + 1; + col2[i] = (2ull<<(sizeof(col2[0])*8 - 4)) + i + 1; + col3[i] = (3ull<<(sizeof(col3[0])*8 - 4)) + i + 1; + col4[i] = (4ull<<(sizeof(col4[0])*8 - 4)) + i + 1; + } + + const ui8* cols[4 + 2*2]; + + cols[0] = (ui8*) col1.data(); + cols[1] = (ui8*) col2.data(); + cols[2] = (ui8*) vcol1.data(); + cols[3] = (ui8*) vcol1data.data(); + cols[4] = (ui8*) vcol2.data(); + cols[5] = (ui8*) vcol2data.data(); + cols[6] = (ui8*) col3.data(); + cols[7] = (ui8*) col4.data(); + + std::vector> overflow; + std::vector colValid((NTuples1 + 7)/8, ~0); + const ui8 *colsValid[8] = { + colValid.data(), + colValid.data(), + colValid.data(), + nullptr, + colValid.data(), + nullptr, + colValid.data(), + colValid.data(), + }; + + std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now(); + tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1); + std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now(); + ui64 microseconds = std::chrono::duration_cast(end02 - begin02).count(); + + if (microseconds == 0) + microseconds = 1; + + CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl; +#ifndef NDEBUG + CTEST << "Result size = " << Tuples1DataBytes << Endl; + CTEST << "Result = "; + for (ui32 i = 0; i < Tuples1DataBytes; ++i) + CTEST << int(res[i]) << ' '; + CTEST << Endl; + CTEST << "Overflow size = " << overflow.size() << Endl; + CTEST << "Overflow = "; + for (auto c: overflow) + CTEST << int(c) << ' '; + CTEST << Endl; +#endif + static const ui8 expected_data[54*3] = { + // row1 + + 0xe2,0x47,0x16,0x6c, // hash + 0x1, 0, 0, 0x20, // col1 + 0x1, 0, 0, 0, 0, 0, 0, 0x10, // col2 + 0x3, 0x61, 0x62, 0x63, 0, 0, 0, 0, 0, // vcol1 + 0x3, 0x41, 0x42, 0x43, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // vcol2 + 0x3f, //NULL bitmap + 0x1, 0, 0, 0x40, // col3 + 0x1, 0, 0, 0, 0, 0, 0, 0x30, // col4 + // row2 + 0xc2, 0x1c, 0x1b, 0xa8, // hash + 0x2, 0, 0, 0x20, // col1 + 0x2, 0, 0, 0, 0, 0, 0, 0x10, // col2 + 0xff, 0, 0, 0, 0, 0xf, 0, 0, 0, // vcol1 [overflow offset, overflow size] + 0xf, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c, 0x6d, 0x6e, 0x6f, // vcol2 + 0x3f, // NULL bitmap + 0x2, 0, 0, 0x40, // col3 + 0x2, 0, 0, 0, 0, 0, 0, 0x30, // col4 + // row3 + 0xfa, 0x49, 0x5, 0xe9, // hash + 0x3, 0, 0, 0x20, // col1 + 0x3, 0, 0, 0, 0, 0, 0, 0x10, // col2 + 0xff, 0xf, 0, 0, 0, 0xa, 0, 0, 0, // vcol1 [overflow offset, overflow size] + 0xa, 0x7a, 0x79, 0x78, 0x77, 0x76, 0x75, 0x74, 0x73, 0x70, 0x72, 0, 0, 0, 0, 0, // vcol2 + 0x3f, // NULL bitmap + 0x3, 0, 0, 0x40, // col3 + 0x3, 0, 0, 0, 0, 0, 0, 0x30, // col4 + }; + static const ui8 expected_overflow[25] = { + 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x4a, 0x4b, 0x4c, 0x4d, 0x4e, 0x4f, + 0x5a, 0x59, 0x58, 0x57, 0x56, 0x55, 0x54, 0x53, 0x50, 0x52, + }; + UNIT_ASSERT_VALUES_EQUAL(sizeof(expected_data), tl->TotalRowSize*NTuples1); + UNIT_ASSERT_VALUES_EQUAL(overflow.size(), sizeof(expected_overflow)); + for (ui32 i = 0; i < sizeof(expected_data); ++i) + UNIT_ASSERT_VALUES_EQUAL(expected_data[i], res[i]); + for (ui32 i = 0; i < sizeof(expected_overflow); ++i) + UNIT_ASSERT_VALUES_EQUAL(expected_overflow[i], overflow[i]); +} + +Y_UNIT_TEST(PackVarSizeBig) { + + TScopedAlloc alloc(__LOCATION__); + + TColumnDesc kc1, kc2, kcv1; + + kc1.Role = EColumnRole::Key; + kc1.DataSize = 1; + + kc2.Role = EColumnRole::Key; + kc2.DataSize = 2; + + kcv1.Role = EColumnRole::Key; + kcv1.DataSize = 1000; + kcv1.SizeType = EColumnSizeType::Variable; + + std::vector columns{kc1, kc2, kcv1 }; + + auto tl = TTupleLayout::Create(columns); + //CTEST << "TotalRowSize = " << tl->TotalRowSize << Endl; + UNIT_ASSERT_VALUES_EQUAL(tl->TotalRowSize, 263); + + const ui64 NTuples1 = 2; + + const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1; + + std::vector col1(NTuples1, 0); + std::vector col2(NTuples1, 0); + + std::vector vcol1(1, 0); + + std::vector vcol1data; + + std::vector res(Tuples1DataBytes + 64, 0); + std::vector vcol1str { + "zaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbb" + "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabcdefghijklnmorstuvwxy", + "zaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbb" + "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbrstuv", + }; + for (auto &&str: vcol1str) { + for (auto c: str) + vcol1data.push_back(c); + vcol1.push_back(vcol1data.size()); + } + UNIT_ASSERT_VALUES_EQUAL(vcol1.size(), NTuples1 + 1); + for (ui32 i = 0; i < NTuples1; ++i) { + col1[i] = (1ull<<(sizeof(col1[0])*8 - 4)) + i + 1; + col2[i] = (2ull<<(sizeof(col2[0])*8 - 4)) + i + 1; + } + + const ui8* cols[2 + 1*2]; + + cols[0] = (ui8*) col1.data(); + cols[1] = (ui8*) col2.data(); + cols[2] = (ui8*) vcol1.data(); + cols[3] = (ui8*) vcol1data.data(); + + std::vector colValid((NTuples1 + 7)/8, ~0); + const ui8 *colsValid[2 + 1*2] = { + colValid.data(), + colValid.data(), + colValid.data(), + nullptr, + }; + std::vector> overflow; + + std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now(); + tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1); + std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now(); + ui64 microseconds = std::chrono::duration_cast(end02 - begin02).count(); + + CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl; +#ifndef NDEBUG + CTEST << "Result size = " << Tuples1DataBytes << Endl; + CTEST << "Result = "; + for (ui32 i = 0; i < Tuples1DataBytes; ++i) + CTEST << int(res[i]) << ' '; + CTEST << Endl; + CTEST << "Overflow size = " << overflow.size() << Endl; + CTEST << "Overflow = "; + for (auto c: overflow) + CTEST << int(c) << ' '; + CTEST << Endl; +#endif + static const ui8 expected_data[263*2] = { + // row1 + 0xe1,0x22,0x63,0xf5, // hash + 0x11, // col1 + 0x1, 0x20, // col2 + 0xff, 0, 0, 0, 0, 0xb, 0, 0, 0, // vcol2 [ overflow offset, overflow size ] + 0x7a, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, + 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c, + 0x7, // NULL bitmap + // row 2 + 0xab,0xa5,0x5f,0xd4, // hash + 0x12, // col1 + 0x2, 0x20, // col2 + 0xfe, 0x7a, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, + 0x61, 0x61, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, + 0x62, 0x62, 0x72, 0x73, 0x74, 0x75, 0x76, + 0x7, // NULLs bitmap + }; + static const ui8 expected_overflow[11] = { + 0x6e, 0x6d, 0x6f, 0x72, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79, + }; + UNIT_ASSERT_VALUES_EQUAL(sizeof(expected_data), tl->TotalRowSize*NTuples1); + UNIT_ASSERT_VALUES_EQUAL(overflow.size(), sizeof(expected_overflow)); + for (ui32 i = 0; i < sizeof(expected_data); ++i) + UNIT_ASSERT_VALUES_EQUAL(expected_data[i], res[i]); + for (ui32 i = 0; i < sizeof(expected_overflow); ++i) + UNIT_ASSERT_VALUES_EQUAL(expected_overflow[i], overflow[i]); +} +Y_UNIT_TEST(PackIsValidFuzz) { + + TScopedAlloc alloc(__LOCATION__); + + std::mt19937 rng; // fixed-seed (0) prng + std::vector columns; + std::vector> colsdata; + std::vector colsptr; + std::vector> isValidData; + std::vector isValidPtr; + + ui64 totalNanoseconds = 0; + ui64 totalSize = 0; + ui64 totalRows = 0; + for (ui32 test = 0; test < 10; ++test) { + ui32 rows = 1 + (rng() % 1000); + ui32 cols = 1 + (rng() % 100); + columns.resize(cols); + colsdata.resize(cols); + colsptr.resize(cols); + isValidData.resize(cols); + isValidPtr.resize(cols); + ui32 isValidSize = (rows + 7)/8; + totalRows += rows; + for (ui32 j = 0; j < cols; ++j) { + auto &col = columns[j]; + col.Role = (rng() % 10 < 1) ? EColumnRole::Key : EColumnRole::Payload; + col.DataSize = 1u <<(rng() % 16); + col.SizeType = EColumnSizeType::Fixed; + colsdata[j].resize(rows*col.DataSize); + colsptr[j] = colsdata[j].data(); + isValidData[j].resize(isValidSize); + isValidPtr[j] = isValidData[j].data(); + std::generate(isValidData[j].begin(), isValidData[j].end(), rng); + } + auto tl = TTupleLayout::Create(columns); + std::vector res; + for (ui32 subtest = 0; subtest < 20; ++subtest) { + ui32 subRows = 1 + (rows ? rng() % (rows - 1) : 0); + ui32 off = subRows != rows ? rng() % (rows - subRows) : 0; + std::vector> overflow; + totalSize += subRows*tl->TotalRowSize; + res.resize(subRows*tl->TotalRowSize); + + std::chrono::steady_clock::time_point begin01 = std::chrono::steady_clock::now(); + tl->Pack(colsptr.data(), isValidPtr.data(), res.data(), overflow, off, subRows); + std::chrono::steady_clock::time_point end01 = std::chrono::steady_clock::now(); + totalNanoseconds += std::chrono::duration_cast(end01 - begin01).count(); + + UNIT_ASSERT_VALUES_EQUAL(overflow.size(), 0); + auto resptr = res.data(); + for (ui32 row = 0; row < subRows; ++row, resptr += tl->TotalRowSize) { + for (ui32 j = 0; j < cols; ++j) { + auto &col = tl->Columns[j]; + UNIT_ASSERT_VALUES_EQUAL(((resptr[tl->BitmaskOffset + (j / 8)] >> (j % 8)) & 1), ((isValidData[col.OriginalIndex][(off + row) / 8] >> ((off + row) % 8)) & 1)); + } + } + } + } + + if (totalNanoseconds == 0) totalNanoseconds = 1; + + CTEST << "Time for " << totalRows << " transpose (external cycle)= " << (totalNanoseconds + 999)/1000 << "[microseconds]" << Endl; + CTEST << "Data size = " << totalSize / (1024 * 1024) << "[MB]" << Endl; + CTEST << "Calculating speed = " << totalSize / ((totalNanoseconds + 999)/1000) << "MB/sec" << Endl; + CTEST << Endl; +} +} + + + +} +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/comp_nodes/packed_tuple/tuple.cpp b/ydb/library/yql/minikql/comp_nodes/packed_tuple/tuple.cpp new file mode 100644 index 000000000000..d20cc784a7b3 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/packed_tuple/tuple.cpp @@ -0,0 +1,238 @@ +#include "tuple.h" +#include "hashes_calc.h" + +#include +#include +#include +#include + +#include +#include + +#include + +namespace NKikimr { +namespace NMiniKQL { +namespace NPackedTuple { + + THolder TTupleLayout::Create(const std::vector& columns) { + + if (NX86::HaveAVX2()) + return MakeHolder>(columns); + + if (NX86::HaveSSE42()) + return MakeHolder>(columns); + + return MakeHolder>(columns); + + } + + template + TTupleLayoutFallback::TTupleLayoutFallback(const std::vector& columns) : TTupleLayout(columns) { + + for (ui32 i = 0, idx = 0; i < OrigColumns.size(); ++i) { + auto &col = OrigColumns[i]; + + col.OriginalIndex = idx; + + if (col.SizeType == EColumnSizeType::Variable) { + // we cannot handle (rare) overflow strings unless we have at least space for header; + // size of inlined strings is limited to 254 bytes, limit maximum inline data size + col.DataSize = std::max(1 + 2*sizeof(ui32), std::min(255, col.DataSize)); + idx += 2; // Variable-size takes two columns: one for offsets, and another for payload + } else { + idx += 1; + } + + if (col.Role == EColumnRole::Key) { + KeyColumns.push_back(col); + } else { + PayloadColumns.push_back(col); + } + } + + KeyColumnsNum = KeyColumns.size(); + + auto ColumnDescLess = [](const TColumnDesc& a, const TColumnDesc& b) { + if (a.SizeType != b.SizeType) // Fixed first + return a.SizeType == EColumnSizeType::Fixed; + + if (a.DataSize == b.DataSize) + // relative order of (otherwise) same key columns must be preserved + return a.OriginalIndex < b.OriginalIndex; + + return a.DataSize < b.DataSize; + }; + + std::sort(KeyColumns.begin(), KeyColumns.end(), ColumnDescLess); + std::sort(PayloadColumns.begin(), PayloadColumns.end(), ColumnDescLess); + + KeyColumnsFixedEnd = 0; + + ui32 currOffset = 4; // crc32 hash in the beginning + KeyColumnsOffset = currOffset; + KeyColumnsFixedNum = KeyColumnsNum; + + for (ui32 i = 0; i < KeyColumnsNum; ++i) { + auto &col = KeyColumns[i]; + + if (col.SizeType == EColumnSizeType::Variable && KeyColumnsFixedEnd == 0) { + KeyColumnsFixedEnd = currOffset; + KeyColumnsFixedNum = i; + } + + col.ColumnIndex = i; + col.Offset = currOffset; + Columns.push_back(col); + currOffset += col.DataSize; + } + + KeyColumnsEnd = currOffset; + + if (KeyColumnsFixedEnd == 0) // >= 4 if was ever assigned + KeyColumnsFixedEnd = KeyColumnsEnd; + + KeyColumnsSize = KeyColumnsEnd - KeyColumnsOffset; + BitmaskOffset = currOffset; + + BitmaskSize = (OrigColumns.size() + 7) / 8; + + currOffset += BitmaskSize; + BitmaskEnd = currOffset; + + PayloadOffset = currOffset; + + for (ui32 i = 0; i < PayloadColumns.size(); ++i) { + auto &col = PayloadColumns[i]; + col.ColumnIndex = KeyColumnsNum + i; + col.Offset = currOffset; + Columns.push_back(col); + currOffset += col.DataSize; + } + + PayloadEnd = currOffset; + PayloadSize = PayloadEnd - PayloadOffset; + + TotalRowSize = currOffset; + + for (auto &col: Columns) { + if (col.SizeType == EColumnSizeType::Variable) { + VariableColumns_.push_back(col); + } else if (IsPowerOf2(col.DataSize) && col.DataSize < (1u<= 1 + 2*4 + // if size of payload is less than col.DataSize: + // u8 one byte of size (0..254) + // u8 [size] data + // u8 [DataSize - 1 - size] padding + // if size of payload is greater than DataSize: + // u8 = 255 + // u32 = offset in overflow buffer + // u32 = size + // u8 [DataSize - 1 - 2*4] initial bytes of data + // Data is expected to be consistent with isValidBitmask (0 for fixed-size, empty for variable-size) + template + void TTupleLayoutFallback::Pack( const ui8** columns, const ui8** isValidBitmask, ui8 * res, std::vector> &overflow, ui32 start, ui32 count) const { + + std::vector bitmaskMatrix(BitmaskSize); + + for (; count--; ++start, res += TotalRowSize) { + ui32 hash = 0; + auto bitmaskIdx = start / 8; + auto bitmaskShift = start % 8; + + bool anyOverflow = false; + + for (ui32 i = KeyColumnsFixedNum; i < KeyColumns.size(); ++i) { + auto& col = KeyColumns[i]; + ui32 dataOffset = ReadUnaligned(columns[col.OriginalIndex] + sizeof(ui32)*start); + ui32 nextOffset = ReadUnaligned(columns[col.OriginalIndex] + sizeof(ui32)*(start + 1)); + auto size = nextOffset - dataOffset; + + if (size >= col.DataSize) { + anyOverflow = true; + break; + } + } + + std::memset(res + BitmaskOffset, 0, BitmaskSize); + + for (ui32 i = 0; i < Columns.size(); ++i) { + auto& col = Columns[i]; + + res[BitmaskOffset + (i / 8)] |= ((isValidBitmask[col.OriginalIndex][bitmaskIdx] >> bitmaskShift) & 1u) << (i % 8); + } + + for (auto &col: FixedNPOTColumns_) { + std::memcpy(res + col.Offset, columns[col.OriginalIndex] + start*col.DataSize, col.DataSize); + } + +#define PackPOTColumn(POT) \ + for (auto &col: FixedPOTColumns_[POT]) { \ + std::memcpy(res + col.Offset, columns[col.OriginalIndex] + start*(1u<(columns[col.OriginalIndex] + sizeof(ui32)*start); + auto nextOffset = ReadUnaligned(columns[col.OriginalIndex] + sizeof(ui32)*(start + 1)); + auto size = nextOffset - dataOffset; + auto data = columns[col.OriginalIndex + 1] + dataOffset; + + if (size >= col.DataSize) { + res[col.Offset] = 255; + + ui32 prefixSize = (col.DataSize - 1 - 2*sizeof(ui32)); + auto overflowSize = size - prefixSize; + auto overflowOffset = overflow.size(); + + overflow.resize(overflowOffset + overflowSize); + + WriteUnaligned(res + col.Offset + 1 + 0*sizeof(ui32), overflowOffset); + WriteUnaligned(res + col.Offset + 1 + 1*sizeof(ui32), overflowSize); + std::memcpy(res + col.Offset + 1 + 2*sizeof(ui32), data, prefixSize); + std::memcpy(overflow.data() + overflowOffset, data + prefixSize, overflowSize); + } else { + Y_DEBUG_ABORT_UNLESS(size < 255); + res[col.Offset] = size; + std::memcpy(res + col.Offset + 1, data, size); + std::memset(res + col.Offset + 1 + size, 0, col.DataSize - (size + 1)); + } + + if (anyOverflow && col.Role == EColumnRole::Key) { + hash = CalculateCRC32((ui8 *)&size, hash); + hash = CalculateCRC32(data, size, hash); + } + } + + // isValid bitmap is NOT included into hashed data + if (anyOverflow) { + hash = CalculateCRC32(res + KeyColumnsOffset, KeyColumnsFixedEnd - KeyColumnsOffset, hash ); + } else { + hash = CalculateCRC32(res + KeyColumnsOffset, KeyColumnsEnd - KeyColumnsOffset); + } + WriteUnaligned(res, hash); + } + } +} +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/packed_tuple/tuple.h b/ydb/library/yql/minikql/comp_nodes/packed_tuple/tuple.h new file mode 100644 index 000000000000..aa4e48758ef7 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/packed_tuple/tuple.h @@ -0,0 +1,81 @@ +#pragma once + +#include +#include +#include +#include + +#include + +#include +#include + +namespace NKikimr { +namespace NMiniKQL { +namespace NPackedTuple { + +// Defines if data type of particular column variable or fixed +enum class EColumnSizeType {Fixed, Variable}; + +// Defines if particular column is key column or payload column +enum class EColumnRole {Key, Payload}; + +// Describes layout and size of particular column +struct TColumnDesc { + ui32 ColumnIndex = 0; // Index of the column in particular layout + ui32 OriginalIndex = 0; // Index of the column in input representation + EColumnRole Role = EColumnRole::Payload; // Role of the particular column in tuple (Key or Payload) + EColumnSizeType SizeType = EColumnSizeType::Fixed; // Fixed size or variable size column + ui32 DataSize = 0; // Size of the column in bytes for fixed size part + // Must be same for matching key columns + ui32 Offset = 0; // Offset in bytes for column value from the beginning of tuple +}; + +// Defines in memory layout of tuple. +struct TTupleLayout { + std::vector OrigColumns; // Columns description and order as passed during layout construction + std::vector Columns; // Vector describing all columns in order corresponding to tuple layout + std::vector KeyColumns; // Vector describing key columns + std::vector PayloadColumns; // Vector describing payload columns + ui32 KeyColumnsNum; // Total number of key columns + ui32 KeyColumnsSize; // Total size of all key columns in bytes + ui32 KeyColumnsOffset; // Start of row-packed keys data + ui32 KeyColumnsFixedEnd; // Offset in row-packed keys data of first variable key (can be same as KeyColumnsEnd, if there are none) + ui32 KeyColumnsFixedNum; // Number of fixed-size columns + ui32 KeyColumnsEnd; // First byte after key columns. Start of bitmask for row-based columns + ui32 BitmaskSize; // Size of bitmask for null values flag in columns + ui32 BitmaskOffset; // Offset of nulls bitmask. = KeyColumnsEnd + ui32 BitmaskEnd; // First byte after bitmask. = PayloadOffset + ui32 PayloadSize; // Total size in bytes of the payload columns + ui32 PayloadOffset; // Offset of payload values. = BitmaskEnd. + ui32 PayloadEnd; // First byte after payload + ui32 TotalRowSize; // Total size of bytes for packed row + + // Creates new tuple layout based on provided columns description. + static THolder Create(const std::vector& columns); + + TTupleLayout(const std::vector &columns):OrigColumns(columns) {} + virtual ~TTupleLayout() {} + + // Takes array of pointer to columns, array of validity bitmaps, + // outputs packed rows + virtual void Pack( const ui8** columns, const ui8** isValidBitmask, ui8 * res, std::vector> &overflow, ui32 start, ui32 count) const = 0; +}; + +template +struct TTupleLayoutFallback: public TTupleLayout { + + TTupleLayoutFallback(const std::vector& columns); + + void Pack( const ui8** columns, const ui8** isValidBitmask, ui8 * res, std::vector> &overflow, ui32 start, ui32 count) const override; + +private: + std::array, 5> FixedPOTColumns_; // Fixed-size columns for power-of-two sizes from 1 to 16 bytes + std::vector FixedNPOTColumns_; // Remaining fixed-size columns + std::vector VariableColumns_; // Variable-size columns only + using TSimdI8 = TTrait::TSimdI8; +}; + +} +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/packed_tuple/ut/ya.make b/ydb/library/yql/minikql/comp_nodes/packed_tuple/ut/ya.make new file mode 100644 index 000000000000..d43090af0af1 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/packed_tuple/ut/ya.make @@ -0,0 +1,43 @@ +UNITTEST_FOR(ydb/library/yql/minikql/comp_nodes/packed_tuple) + +IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +REQUIREMENTS(ram:32) + +OWNER( + g:yql + g:yql_ydb_core +) + +SRCS( + packed_tuple_ut.cpp +) + +PEERDIR( + ydb/library/yql/public/udf + ydb/library/yql/public/udf/arrow + ydb/library/yql/public/udf/service/exception_policy + ydb/library/yql/sql/pg_dummy +) + +CFLAGS( + -mprfchw +) + +YQL_LAST_ABI_VERSION() + +IF (MKQL_RUNTIME_VERSION) + CFLAGS( + -DMKQL_RUNTIME_VERSION=$MKQL_RUNTIME_VERSION + ) +ENDIF() + + +END() diff --git a/ydb/library/yql/minikql/comp_nodes/packed_tuple/ya.make b/ydb/library/yql/minikql/comp_nodes/packed_tuple/ya.make new file mode 100644 index 000000000000..6104ad0a3e0e --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/packed_tuple/ya.make @@ -0,0 +1,32 @@ +LIBRARY() + +OWNER( + g:yql + g:yql_ydb_core +) + +SRCS( + tuple.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/library/binary_json + ydb/library/yql/minikql + ydb/library/yql/utils + ydb/library/yql/utils/log + library/cpp/digest/crc32c +) + +CFLAGS( + -mprfchw + -DMKQL_DISABLE_CODEGEN +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +)