Skip to content

Commit 6ff0493

Browse files
feat(kqp): support stream lookup join for index table (#9429)
1 parent 893a1ee commit 6ff0493

File tree

6 files changed, with 245 additions and 128 deletions.

ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,10 @@
209209
{
210210
"Name": "TKqlStreamLookupIndex",
211211
"Base": "TKqlLookupIndexBase",
212-
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"}
212+
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"},
213+
"Children": [
214+
{"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
215+
]
213216
},
214217
{
215218
"Name": "TKqlEffectBase",

ydb/core/kqp/host/kqp_type_ann.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,12 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
451451
TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
452452
const TKikimrTablesData& tablesData, bool withSystemColumns)
453453
{
454-
if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) || TKqlStreamLookupTable::Match(node.Get()) ? 4 : 3, ctx)) {
454+
const bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get()) || TKqlStreamLookupIndex::Match(node.Get());
455+
if (isStreamLookup && !EnsureArgsCount(*node, TKqlStreamLookupIndex::Match(node.Get()) ? 5 : 4, ctx)) {
456+
return TStatus::Error;
457+
}
458+
459+
if (!isStreamLookup && !EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
455460
return TStatus::Error;
456461
}
457462

@@ -495,9 +500,9 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
495500
YQL_ENSURE(lookupType);
496501

497502
const TStructExprType* structType = nullptr;
498-
bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get());
499503
if (isStreamLookup) {
500-
auto lookupStrategy = node->Child(TKqlStreamLookupTable::idx_LookupStrategy);
504+
auto lookupStrategy = node->Child(TKqlStreamLookupTable::Match(node.Get()) ?
505+
TKqlStreamLookupTable::idx_LookupStrategy : TKqlStreamLookupIndex::idx_LookupStrategy);
501506
if (!EnsureAtom(*lookupStrategy, ctx)) {
502507
return TStatus::Error;
503508
}

ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -412,44 +412,67 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
412412
}
413413

414414
TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
415-
if (!kqpCtx.IsScanQuery()) {
415+
if (!node.Maybe<TKqlStreamLookupIndex>()) {
416416
return node;
417417
}
418418

419-
if (auto maybeStreamLookupIndex = node.Maybe<TKqlStreamLookupIndex>()) {
420-
auto streamLookupIndex = maybeStreamLookupIndex.Cast();
419+
auto streamLookupIndex = node.Maybe<TKqlStreamLookupIndex>().Cast();
421420

422-
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
423-
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
421+
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
422+
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
424423

425-
const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
426-
if (!needDataRead) {
427-
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
428-
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
429-
.LookupKeys(streamLookupIndex.LookupKeys())
430-
.Columns(streamLookupIndex.Columns())
431-
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
432-
.Done();
433-
}
434-
435-
auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);
436-
437-
TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
424+
const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
425+
if (!needDataRead) {
426+
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
438427
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
439428
.LookupKeys(streamLookupIndex.LookupKeys())
440-
.Columns(keyColumnsList)
441-
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
429+
.Columns(streamLookupIndex.Columns())
430+
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
442431
.Done();
432+
}
443433

444-
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
445-
.Table(streamLookupIndex.Table())
446-
.LookupKeys(lookupIndexTable.Ptr())
447-
.Columns(streamLookupIndex.Columns())
448-
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
434+
auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);
435+
436+
TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
437+
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
438+
.LookupKeys(streamLookupIndex.LookupKeys())
439+
.Columns(keyColumnsList)
440+
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
441+
.Done();
442+
443+
TMaybeNode<TExprBase> lookupKeys;
444+
YQL_ENSURE(streamLookupIndex.LookupStrategy().Maybe<TCoAtom>());
445+
TString lookupStrategy = streamLookupIndex.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
446+
if (lookupStrategy == TKqpStreamLookupJoinStrategyName || lookupStrategy == TKqpStreamLookupSemiJoinStrategyName) {
447+
// Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>>>,
448+
// expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row>>,
449+
// so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row>>
450+
lookupKeys = Build<TCoMap>(ctx, node.Pos())
451+
.Input(lookupIndexTable)
452+
.Lambda()
453+
.Args({"tuple"})
454+
.Body<TExprList>()
455+
.Add<TCoNth>()
456+
.Tuple("tuple")
457+
.Index().Value("1").Build()
458+
.Build()
459+
.Add<TCoNth>()
460+
.Tuple("tuple")
461+
.Index().Value("0").Build()
462+
.Build()
463+
.Build()
464+
.Build()
449465
.Done();
466+
} else {
467+
lookupKeys = lookupIndexTable;
450468
}
451469

452-
return node;
470+
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
471+
.Table(streamLookupIndex.Table())
472+
.LookupKeys(lookupKeys.Cast())
473+
.Columns(streamLookupIndex.Columns())
474+
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
475+
.Done();
453476
}
454477

455478
/// Can push flat map node to read from table using only columns available in table description

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos,
198198
.Columns(columns)
199199
.Index()
200200
.Build(indexName)
201+
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
201202
.Done();
202203
}
203204

@@ -336,6 +337,7 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) {
336337
TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
337338
const TDqJoin& join,
338339
TExprBase leftInput,
340+
const TString& indexName,
339341
const TPrefixLookup& rightLookup,
340342
const TKqpMatchReadResult& rightReadMatch,
341343
TExprContext& ctx)
@@ -399,19 +401,30 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
399401
? TKqpStreamLookupSemiJoinStrategyName
400402
: TKqpStreamLookupJoinStrategyName;
401403

402-
TExprBase lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
403-
.Table(rightLookup.MainTable)
404-
.LookupKeys(leftInput)
405-
.Columns(lookupColumns.Cast())
406-
.LookupStrategy().Build(strategy)
407-
.Done();
404+
TMaybeNode<TExprBase> lookupJoin;
405+
if (indexName) {
406+
lookupJoin = Build<TKqlStreamLookupIndex>(ctx, join.Pos())
407+
.Table(rightLookup.MainTable)
408+
.LookupKeys(leftInput)
409+
.Columns(lookupColumns.Cast())
410+
.Index().Build(indexName)
411+
.LookupStrategy().Build(strategy)
412+
.Done();
413+
} else {
414+
lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
415+
.Table(rightLookup.MainTable)
416+
.LookupKeys(leftInput)
417+
.Columns(lookupColumns.Cast())
418+
.LookupStrategy().Build(strategy)
419+
.Done();
420+
}
408421

409422
// Stream lookup join output: stream<tuple<left_row_struct, optional<right_row_struct>>>
410423
// so we should apply filters to second element of tuple for each row
411424

412425
if (extraRightFilter.IsValid()) {
413426
lookupJoin = Build<TCoMap>(ctx, join.Pos())
414-
.Input(lookupJoin)
427+
.Input(lookupJoin.Cast())
415428
.Lambda()
416429
.Args({"tuple"})
417430
.Body<TExprList>()
@@ -433,7 +446,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
433446

434447
if (rightReadMatch.ExtractMembers) {
435448
lookupJoin = Build<TCoMap>(ctx, join.Pos())
436-
.Input(lookupJoin)
449+
.Input(lookupJoin.Cast())
437450
.Lambda()
438451
.Args({"tuple"})
439452
.Body<TExprList>()
@@ -455,7 +468,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
455468

456469
if (rightReadMatch.FilterNullMembers) {
457470
lookupJoin = Build<TCoMap>(ctx, join.Pos())
458-
.Input(lookupJoin)
471+
.Input(lookupJoin.Cast())
459472
.Lambda()
460473
.Args({"tuple"})
461474
.Body<TExprList>()
@@ -477,7 +490,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
477490

478491
if (rightReadMatch.SkipNullMembers) {
479492
lookupJoin = Build<TCoMap>(ctx, join.Pos())
480-
.Input(lookupJoin)
493+
.Input(lookupJoin.Cast())
481494
.Lambda()
482495
.Args({"tuple"})
483496
.Body<TExprList>()
@@ -499,7 +512,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
499512

500513
if (rightReadMatch.FlatMap) {
501514
lookupJoin = Build<TCoMap>(ctx, join.Pos())
502-
.Input(lookupJoin)
515+
.Input(lookupJoin.Cast())
503516
.Lambda()
504517
.Args({"tuple"})
505518
.Body<TExprList>()
@@ -520,7 +533,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
520533
}
521534

522535
return Build<TKqlIndexLookupJoin>(ctx, join.Pos())
523-
.Input(lookupJoin)
536+
.Input(lookupJoin.Cast())
524537
.LeftLabel().Build(leftLabel)
525538
.RightLabel().Build(rightLabel)
526539
.JoinType(join.JoinType())
@@ -597,8 +610,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
597610
}
598611

599612
const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
600-
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
601-
&& !indexName;
613+
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin;
602614

603615
auto leftRowArg = Build<TCoArgument>(ctx, join.Pos())
604616
.Name("leftRowArg")
@@ -830,7 +842,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
830842
.Build()
831843
.Done();
832844

833-
return BuildKqpStreamIndexLookupJoin(join, leftInput, *prefixLookup, *rightReadMatch, ctx);
845+
return BuildKqpStreamIndexLookupJoin(join, leftInput, indexName, *prefixLookup, *rightReadMatch, ctx);
834846
}
835847

836848
auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos());

ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
237237
.LookupKeys(keys)
238238
.Index(indexName.Cast())
239239
.LookupKeys(keys)
240+
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
240241
.Done();
241242
}
242243
} else {

0 commit comments

Comments
 (0)