Skip to content

Commit 9030f7d

Browse files
committed
Add tests for LeftOnly join
1 parent 1ae62de commit 9030f7d

File tree

1 file changed

+112
-0
lines changed

1 file changed

+112
-0
lines changed

ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,118 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTest) {
229229
[](ui64 key) { return key < testSize; });
230230
UNIT_ASSERT_VALUES_EQUAL(dictSize, blockLength);
231231
}
232+
233+
Y_UNIT_TEST(TestLeftOnlyOnUint64) {
234+
TSetup<false> setup;
235+
TProgramBuilder& pb = *setup.PgmBuilder;
236+
237+
const TVector<ui64> dictKeys = {1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144};
238+
const auto dict = MakeStrDict(pb, dictKeys);
239+
240+
const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
241+
const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id);
242+
const auto ui64BlockType = pb.NewBlockType(ui64Type, TBlockType::EShape::Many);
243+
const auto strBlockType = pb.NewBlockType(strType, TBlockType::EShape::Many);
244+
const auto blockLenType = pb.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
245+
const auto structType = pb.NewStructType({
246+
{"key", ui64BlockType},
247+
{"subkey", ui64BlockType},
248+
{"payload", strBlockType},
249+
{"_yql_block_length", blockLenType}
250+
});
251+
const auto fields = NameToIndex(AS_TYPE(TStructType, structType));
252+
const auto listStructType = pb.NewListType(structType);
253+
254+
const auto leftArg = pb.Arg(listStructType);
255+
256+
const auto leftWideFlow = pb.ExpandMap(pb.ToFlow(leftArg),
257+
[&](TRuntimeNode item) -> TRuntimeNode::TList {
258+
return {
259+
pb.Member(item, "key"),
260+
pb.Member(item, "subkey"),
261+
pb.Member(item, "payload"),
262+
pb.Member(item, "_yql_block_length")
263+
};
264+
});
265+
266+
const auto joinNode = pb.BlockMapJoinCore(leftWideFlow, dict,
267+
EJoinKind::LeftOnly, {0});
268+
269+
const auto rootNode = pb.Collect(pb.NarrowMap(joinNode,
270+
[&](TRuntimeNode::TList items) -> TRuntimeNode {
271+
return pb.NewStruct(structType, {
272+
{"key", items[0]},
273+
{"subkey", items[1]},
274+
{"payload", items[2]},
275+
{"_yql_block_length", items.back()}
276+
});
277+
}));
278+
279+
const auto graph = setup.BuildGraph(rootNode, {leftArg.GetNode()});
280+
const auto& leftBlocks = graph->GetEntryPoint(0, true);
281+
const auto& holderFactory = graph->GetHolderFactory();
282+
auto& ctx = graph->GetContext();
283+
284+
constexpr size_t testSize = 256;
285+
TVector<ui64> keys(testSize);
286+
TVector<ui64> subkeys;
287+
std::iota(keys.begin(), keys.end(), 1);
288+
std::transform(keys.cbegin(), keys.cend(), std::back_inserter(subkeys),
289+
[](const auto& value) { return value * 1001; });
290+
291+
TVector<const char*> payloads;
292+
std::transform(keys.cbegin(), keys.cend(), std::back_inserter(payloads),
293+
[](const auto& value) { return twoLetterPayloads[value].c_str(); });
294+
295+
const size_t blockSize = 64;
296+
size_t current = 0;
297+
TDefaultListRepresentation leftListValues;
298+
while (current < testSize) {
299+
arrow::UInt64Builder keysBuilder(&ctx.ArrowMemoryPool);
300+
arrow::UInt64Builder subkeysBuilder(&ctx.ArrowMemoryPool);
301+
arrow::BinaryBuilder payloadsBuilder(&ctx.ArrowMemoryPool);
302+
ARROW_OK(keysBuilder.Reserve(blockSize));
303+
ARROW_OK(subkeysBuilder.Reserve(blockSize));
304+
ARROW_OK(payloadsBuilder.Reserve(blockSize));
305+
for (size_t i = 0; i < blockSize; i++, current++) {
306+
keysBuilder.UnsafeAppend(keys[current]);
307+
subkeysBuilder.UnsafeAppend(subkeys[current]);
308+
ARROW_OK(payloadsBuilder.Append(payloads[current], payloadSize));
309+
}
310+
std::shared_ptr<arrow::ArrayData> keysData;
311+
ARROW_OK(keysBuilder.FinishInternal(&keysData));
312+
std::shared_ptr<arrow::ArrayData> subkeysData;
313+
ARROW_OK(subkeysBuilder.FinishInternal(&subkeysData));
314+
std::shared_ptr<arrow::ArrayData> payloadsData;
315+
ARROW_OK(payloadsBuilder.FinishInternal(&payloadsData));
316+
317+
NUdf::TUnboxedValue* items = nullptr;
318+
const auto structObj = holderFactory.CreateDirectArrayHolder(fields.size(), items);
319+
items[fields.at("key")] = holderFactory.CreateArrowBlock(keysData);
320+
items[fields.at("subkey")] = holderFactory.CreateArrowBlock(subkeysData);
321+
items[fields.at("payload")] = holderFactory.CreateArrowBlock(payloadsData);
322+
items[fields.at("_yql_block_length")] = MakeBlockCount(holderFactory, blockSize);
323+
leftListValues = leftListValues.Append(std::move(structObj));
324+
}
325+
leftBlocks->SetValue(ctx, holderFactory.CreateDirectListHolder(std::move(leftListValues)));
326+
const auto joinIterator = graph->GetValue().GetListIterator();
327+
328+
NUdf::TUnboxedValue item;
329+
TVector<NUdf::TUnboxedValue> joinResult;
330+
while (joinIterator.Next(item)) {
331+
joinResult.push_back(item);
332+
}
333+
334+
UNIT_ASSERT_VALUES_EQUAL(joinResult.size(), 1);
335+
const auto blocks = joinResult.front();
336+
const auto blockLengthValue = blocks.GetElement(fields.at("_yql_block_length"));
337+
const auto blockLengthDatum = TArrowBlock::From(blockLengthValue).GetDatum();
338+
UNIT_ASSERT(blockLengthDatum.is_scalar());
339+
const auto blockLength = blockLengthDatum.scalar_as<arrow::UInt64Scalar>().value;
340+
const auto dictSize = std::count_if(dictKeys.cbegin(), dictKeys.cend(),
341+
[](ui64 key) { return key < testSize; });
342+
UNIT_ASSERT_VALUES_EQUAL(testSize - dictSize, blockLength);
343+
}
232344
} // Y_UNIT_TEST_SUITE
233345

234346
} // namespace NMiniKQL

0 commit comments

Comments
 (0)