Skip to content

Commit 45f8e08

Browse files
Merge to stable-24-3 missing commits for stream join (#9566)
1 parent 1283f9e commit 45f8e08

12 files changed

+335
-132
lines changed

ydb/core/kqp/common/kqp_yql.h

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName";
4848

4949
static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv;
5050
static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv;
51+
static constexpr std::string_view TKqpStreamLookupSemiJoinStrategyName = "LookupSemiJoinRows"sv;
5152

5253
struct TKqpReadTableSettings {
5354
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";

ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,10 @@
209209
{
210210
"Name": "TKqlStreamLookupIndex",
211211
"Base": "TKqlLookupIndexBase",
212-
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"}
212+
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"},
213+
"Children": [
214+
{"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
215+
]
213216
},
214217
{
215218
"Name": "TKqlEffectBase",

ydb/core/kqp/host/kqp_type_ann.cpp

+14-5
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,12 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
451451
TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
452452
const TKikimrTablesData& tablesData, bool withSystemColumns)
453453
{
454-
if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) || TKqlStreamLookupTable::Match(node.Get()) ? 4 : 3, ctx)) {
454+
const bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get()) || TKqlStreamLookupIndex::Match(node.Get());
455+
if (isStreamLookup && !EnsureArgsCount(*node, TKqlStreamLookupIndex::Match(node.Get()) ? 5 : 4, ctx)) {
456+
return TStatus::Error;
457+
}
458+
459+
if (!isStreamLookup && !EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
455460
return TStatus::Error;
456461
}
457462

@@ -495,14 +500,16 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
495500
YQL_ENSURE(lookupType);
496501

497502
const TStructExprType* structType = nullptr;
498-
bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get());
499503
if (isStreamLookup) {
500-
auto lookupStrategy = node->Child(TKqlStreamLookupTable::idx_LookupStrategy);
504+
auto lookupStrategy = node->Child(TKqlStreamLookupTable::Match(node.Get()) ?
505+
TKqlStreamLookupTable::idx_LookupStrategy : TKqlStreamLookupIndex::idx_LookupStrategy);
501506
if (!EnsureAtom(*lookupStrategy, ctx)) {
502507
return TStatus::Error;
503508
}
504509

505-
if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName) {
510+
if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName
511+
|| lookupStrategy->Content() == TKqpStreamLookupSemiJoinStrategyName) {
512+
506513
if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) {
507514
return TStatus::Error;
508515
}
@@ -1682,7 +1689,9 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
16821689

16831690
node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
16841691

1685-
} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName) {
1692+
} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName
1693+
|| lookupStrategy.Value() == TKqpStreamLookupSemiJoinStrategyName) {
1694+
16861695
if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
16871696
return TStatus::Error;
16881697
}

ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp

+49-26
Original file line numberDiff line numberDiff line change
@@ -412,44 +412,67 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
412412
}
413413

414414
TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
415-
if (!kqpCtx.IsScanQuery()) {
415+
if (!node.Maybe<TKqlStreamLookupIndex>()) {
416416
return node;
417417
}
418418

419-
if (auto maybeStreamLookupIndex = node.Maybe<TKqlStreamLookupIndex>()) {
420-
auto streamLookupIndex = maybeStreamLookupIndex.Cast();
419+
auto streamLookupIndex = node.Maybe<TKqlStreamLookupIndex>().Cast();
421420

422-
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
423-
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
421+
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
422+
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
424423

425-
const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
426-
if (!needDataRead) {
427-
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
428-
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
429-
.LookupKeys(streamLookupIndex.LookupKeys())
430-
.Columns(streamLookupIndex.Columns())
431-
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
432-
.Done();
433-
}
434-
435-
auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);
436-
437-
TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
424+
const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
425+
if (!needDataRead) {
426+
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
438427
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
439428
.LookupKeys(streamLookupIndex.LookupKeys())
440-
.Columns(keyColumnsList)
441-
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
429+
.Columns(streamLookupIndex.Columns())
430+
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
442431
.Done();
432+
}
443433

444-
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
445-
.Table(streamLookupIndex.Table())
446-
.LookupKeys(lookupIndexTable.Ptr())
447-
.Columns(streamLookupIndex.Columns())
448-
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
434+
auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);
435+
436+
TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
437+
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
438+
.LookupKeys(streamLookupIndex.LookupKeys())
439+
.Columns(keyColumnsList)
440+
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
441+
.Done();
442+
443+
TMaybeNode<TExprBase> lookupKeys;
444+
YQL_ENSURE(streamLookupIndex.LookupStrategy().Maybe<TCoAtom>());
445+
TString lookupStrategy = streamLookupIndex.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
446+
if (lookupStrategy == TKqpStreamLookupJoinStrategyName || lookupStrategy == TKqpStreamLookupSemiJoinStrategyName) {
447+
// Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>>>,
448+
// expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row>>,
449+
// so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row>>
450+
lookupKeys = Build<TCoMap>(ctx, node.Pos())
451+
.Input(lookupIndexTable)
452+
.Lambda()
453+
.Args({"tuple"})
454+
.Body<TExprList>()
455+
.Add<TCoNth>()
456+
.Tuple("tuple")
457+
.Index().Value("1").Build()
458+
.Build()
459+
.Add<TCoNth>()
460+
.Tuple("tuple")
461+
.Index().Value("0").Build()
462+
.Build()
463+
.Build()
464+
.Build()
449465
.Done();
466+
} else {
467+
lookupKeys = lookupIndexTable;
450468
}
451469

452-
return node;
470+
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
471+
.Table(streamLookupIndex.Table())
472+
.LookupKeys(lookupKeys.Cast())
473+
.Columns(streamLookupIndex.Columns())
474+
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
475+
.Done();
453476
}
454477

455478
/// Can push flat map node to read from table using only columns available in table description

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

+31-15
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos,
198198
.Columns(columns)
199199
.Index()
200200
.Build(indexName)
201+
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
201202
.Done();
202203
}
203204

@@ -336,6 +337,7 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) {
336337
TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
337338
const TDqJoin& join,
338339
TExprBase leftInput,
340+
const TString& indexName,
339341
const TPrefixLookup& rightLookup,
340342
const TKqpMatchReadResult& rightReadMatch,
341343
TExprContext& ctx)
@@ -395,19 +397,34 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
395397
}
396398
}
397399

398-
TExprBase lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
399-
.Table(rightLookup.MainTable)
400-
.LookupKeys(leftInput)
401-
.Columns(lookupColumns.Cast())
402-
.LookupStrategy().Build(TKqpStreamLookupJoinStrategyName)
403-
.Done();
400+
auto strategy = join.JoinType().Value() == "LeftSemi"
401+
? TKqpStreamLookupSemiJoinStrategyName
402+
: TKqpStreamLookupJoinStrategyName;
403+
404+
TMaybeNode<TExprBase> lookupJoin;
405+
if (indexName) {
406+
lookupJoin = Build<TKqlStreamLookupIndex>(ctx, join.Pos())
407+
.Table(rightLookup.MainTable)
408+
.LookupKeys(leftInput)
409+
.Columns(lookupColumns.Cast())
410+
.Index().Build(indexName)
411+
.LookupStrategy().Build(strategy)
412+
.Done();
413+
} else {
414+
lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
415+
.Table(rightLookup.MainTable)
416+
.LookupKeys(leftInput)
417+
.Columns(lookupColumns.Cast())
418+
.LookupStrategy().Build(strategy)
419+
.Done();
420+
}
404421

405422
// Stream lookup join output: stream<tuple<left_row_struct, optional<right_row_struct>>>
406423
// so we should apply filters to second element of tuple for each row
407424

408425
if (extraRightFilter.IsValid()) {
409426
lookupJoin = Build<TCoMap>(ctx, join.Pos())
410-
.Input(lookupJoin)
427+
.Input(lookupJoin.Cast())
411428
.Lambda()
412429
.Args({"tuple"})
413430
.Body<TExprList>()
@@ -429,7 +446,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
429446

430447
if (rightReadMatch.ExtractMembers) {
431448
lookupJoin = Build<TCoMap>(ctx, join.Pos())
432-
.Input(lookupJoin)
449+
.Input(lookupJoin.Cast())
433450
.Lambda()
434451
.Args({"tuple"})
435452
.Body<TExprList>()
@@ -451,7 +468,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
451468

452469
if (rightReadMatch.FilterNullMembers) {
453470
lookupJoin = Build<TCoMap>(ctx, join.Pos())
454-
.Input(lookupJoin)
471+
.Input(lookupJoin.Cast())
455472
.Lambda()
456473
.Args({"tuple"})
457474
.Body<TExprList>()
@@ -473,7 +490,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
473490

474491
if (rightReadMatch.SkipNullMembers) {
475492
lookupJoin = Build<TCoMap>(ctx, join.Pos())
476-
.Input(lookupJoin)
493+
.Input(lookupJoin.Cast())
477494
.Lambda()
478495
.Args({"tuple"})
479496
.Body<TExprList>()
@@ -495,7 +512,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
495512

496513
if (rightReadMatch.FlatMap) {
497514
lookupJoin = Build<TCoMap>(ctx, join.Pos())
498-
.Input(lookupJoin)
515+
.Input(lookupJoin.Cast())
499516
.Lambda()
500517
.Args({"tuple"})
501518
.Body<TExprList>()
@@ -516,7 +533,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
516533
}
517534

518535
return Build<TKqlIndexLookupJoin>(ctx, join.Pos())
519-
.Input(lookupJoin)
536+
.Input(lookupJoin.Cast())
520537
.LeftLabel().Build(leftLabel)
521538
.RightLabel().Build(rightLabel)
522539
.JoinType(join.JoinType())
@@ -597,8 +614,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
597614
}
598615

599616
const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
600-
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
601-
&& !indexName;
617+
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin;
602618

603619
auto leftRowArg = Build<TCoArgument>(ctx, join.Pos())
604620
.Name("leftRowArg")
@@ -833,7 +849,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
833849
.Build()
834850
.Done();
835851

836-
return BuildKqpStreamIndexLookupJoin(join, leftInput, *prefixLookup, *rightReadMatch, ctx);
852+
return BuildKqpStreamIndexLookupJoin(join, leftInput, indexName, *prefixLookup, *rightReadMatch, ctx);
837853
}
838854

839855
auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos());

ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
387387
.LookupKeys(keys)
388388
.Index(indexName.Cast())
389389
.LookupKeys(keys)
390+
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
390391
.Done();
391392
}
392393
} else {

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(const std::string_view
9191
lookupStrategy = NKqpProto::EStreamLookupStrategy::LOOKUP;
9292
} else if (strategy == "LookupJoinRows"sv) {
9393
lookupStrategy = NKqpProto::EStreamLookupStrategy::JOIN;
94+
} else if (strategy == "LookupSemiJoinRows"sv) {
95+
lookupStrategy = NKqpProto::EStreamLookupStrategy::SEMI_JOIN;
9496
}
9597

9698
YQL_ENSURE(lookupStrategy != NKqpProto::EStreamLookupStrategy::UNSPECIFIED,
@@ -1274,7 +1276,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
12741276

12751277
break;
12761278
}
1277-
case NKqpProto::EStreamLookupStrategy::JOIN: {
1279+
case NKqpProto::EStreamLookupStrategy::JOIN:
1280+
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: {
12781281
YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple);
12791282
const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
12801283
YQL_ENSURE(inputTupleType->GetSize() == 2);

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

+8-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
142142
, HolderFactory(holderFactory)
143143
, InputDesc(inputDesc)
144144
, TablePath(settings.GetTable().GetPath())
145-
, TableId(MakeTableId(settings.GetTable())) {
145+
, TableId(MakeTableId(settings.GetTable()))
146+
, Strategy(settings.GetLookupStrategy()) {
146147

147148
KeyColumns.reserve(settings.GetKeyColumns().size());
148149
i32 keyOrder = 0;
@@ -748,6 +749,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
748749
auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
749750
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
750751

752+
if (Strategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) {
753+
// Semi join should return one result row per key
754+
continue;
755+
}
756+
751757
TReadResultStats rowStats;
752758
i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
753759
auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId);
@@ -962,6 +968,7 @@ std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKq
962968
case NKqpProto::EStreamLookupStrategy::LOOKUP:
963969
return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
964970
case NKqpProto::EStreamLookupStrategy::JOIN:
971+
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN:
965972
return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
966973
default:
967974
return {};

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class TKqpStreamLookupWorker {
7171
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
7272
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
7373
std::vector<TSysTables::TTableColumnInfo> Columns;
74+
const NKqpProto::EStreamLookupStrategy Strategy;
7475
};
7576

7677
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,

0 commit comments

Comments
 (0)