Skip to content

Merge to stable-24-3 missing commits for stream join #9566

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_yql.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName";

static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv;
static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv;
static constexpr std::string_view TKqpStreamLookupSemiJoinStrategyName = "LookupSemiJoinRows"sv;

struct TKqpReadTableSettings {
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@
{
"Name": "TKqlStreamLookupIndex",
"Base": "TKqlLookupIndexBase",
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"}
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"},
"Children": [
{"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
]
},
{
"Name": "TKqlEffectBase",
Expand Down
19 changes: 14 additions & 5 deletions ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,12 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns)
{
if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) || TKqlStreamLookupTable::Match(node.Get()) ? 4 : 3, ctx)) {
const bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get()) || TKqlStreamLookupIndex::Match(node.Get());
if (isStreamLookup && !EnsureArgsCount(*node, TKqlStreamLookupIndex::Match(node.Get()) ? 5 : 4, ctx)) {
return TStatus::Error;
}

if (!isStreamLookup && !EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -495,14 +500,16 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
YQL_ENSURE(lookupType);

const TStructExprType* structType = nullptr;
bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get());
if (isStreamLookup) {
auto lookupStrategy = node->Child(TKqlStreamLookupTable::idx_LookupStrategy);
auto lookupStrategy = node->Child(TKqlStreamLookupTable::Match(node.Get()) ?
TKqlStreamLookupTable::idx_LookupStrategy : TKqlStreamLookupIndex::idx_LookupStrategy);
if (!EnsureAtom(*lookupStrategy, ctx)) {
return TStatus::Error;
}

if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName) {
if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName
|| lookupStrategy->Content() == TKqpStreamLookupSemiJoinStrategyName) {

if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) {
return TStatus::Error;
}
Expand Down Expand Up @@ -1682,7 +1689,9 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext

node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));

} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName) {
} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName
|| lookupStrategy.Value() == TKqpStreamLookupSemiJoinStrategyName) {

if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
Expand Down
75 changes: 49 additions & 26 deletions ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,44 +412,67 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
}

TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
if (!kqpCtx.IsScanQuery()) {
if (!node.Maybe<TKqlStreamLookupIndex>()) {
return node;
}

if (auto maybeStreamLookupIndex = node.Maybe<TKqlStreamLookupIndex>()) {
auto streamLookupIndex = maybeStreamLookupIndex.Cast();
auto streamLookupIndex = node.Maybe<TKqlStreamLookupIndex>().Cast();

const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());

const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
if (!needDataRead) {
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}

auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);

TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
if (!needDataRead) {
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Done();
}

return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(streamLookupIndex.Table())
.LookupKeys(lookupIndexTable.Ptr())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);

TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Done();

TMaybeNode<TExprBase> lookupKeys;
YQL_ENSURE(streamLookupIndex.LookupStrategy().Maybe<TCoAtom>());
TString lookupStrategy = streamLookupIndex.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
if (lookupStrategy == TKqpStreamLookupJoinStrategyName || lookupStrategy == TKqpStreamLookupSemiJoinStrategyName) {
// The index lookup (lookupIndexTable) yields list<tuple<left_row, optional<main_table_pk>>>,
// whereas the stream join over the main table expects list<tuple<optional<main_table_pk>, left_row>>,
// so the two tuple elements of every row are swapped before being used as lookup keys.
lookupKeys = Build<TCoMap>(ctx, node.Pos())
.Input(lookupIndexTable)
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
.Add<TCoNth>()
.Tuple("tuple")
.Index().Value("1").Build()
.Build()
.Add<TCoNth>()
.Tuple("tuple")
.Index().Value("0").Build()
.Build()
.Build()
.Build()
.Done();
} else {
lookupKeys = lookupIndexTable;
}

return node;
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(streamLookupIndex.Table())
.LookupKeys(lookupKeys.Cast())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Done();
}

/// Can push flat map node to read from table using only columns available in table description
Expand Down
46 changes: 31 additions & 15 deletions ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos,
.Columns(columns)
.Index()
.Build(indexName)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}

Expand Down Expand Up @@ -336,6 +337,7 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) {
TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
const TDqJoin& join,
TExprBase leftInput,
const TString& indexName,
const TPrefixLookup& rightLookup,
const TKqpMatchReadResult& rightReadMatch,
TExprContext& ctx)
Expand Down Expand Up @@ -395,19 +397,34 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
}
}

TExprBase lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
.Table(rightLookup.MainTable)
.LookupKeys(leftInput)
.Columns(lookupColumns.Cast())
.LookupStrategy().Build(TKqpStreamLookupJoinStrategyName)
.Done();
auto strategy = join.JoinType().Value() == "LeftSemi"
? TKqpStreamLookupSemiJoinStrategyName
: TKqpStreamLookupJoinStrategyName;

TMaybeNode<TExprBase> lookupJoin;
if (indexName) {
lookupJoin = Build<TKqlStreamLookupIndex>(ctx, join.Pos())
.Table(rightLookup.MainTable)
.LookupKeys(leftInput)
.Columns(lookupColumns.Cast())
.Index().Build(indexName)
.LookupStrategy().Build(strategy)
.Done();
} else {
lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
.Table(rightLookup.MainTable)
.LookupKeys(leftInput)
.Columns(lookupColumns.Cast())
.LookupStrategy().Build(strategy)
.Done();
}

// Stream lookup join output: stream<tuple<left_row_struct, optional<right_row_struct>>>,
// so the filters below must be applied to the second element of the tuple for each row.

if (extraRightFilter.IsValid()) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -429,7 +446,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(

if (rightReadMatch.ExtractMembers) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -451,7 +468,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(

if (rightReadMatch.FilterNullMembers) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -473,7 +490,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(

if (rightReadMatch.SkipNullMembers) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -495,7 +512,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(

if (rightReadMatch.FlatMap) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -516,7 +533,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
}

return Build<TKqlIndexLookupJoin>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.LeftLabel().Build(leftLabel)
.RightLabel().Build(rightLabel)
.JoinType(join.JoinType())
Expand Down Expand Up @@ -597,8 +614,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
}

const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
&& !indexName;
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin;

auto leftRowArg = Build<TCoArgument>(ctx, join.Pos())
.Name("leftRowArg")
Expand Down Expand Up @@ -833,7 +849,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
.Build()
.Done();

return BuildKqpStreamIndexLookupJoin(join, leftInput, *prefixLookup, *rightReadMatch, ctx);
return BuildKqpStreamIndexLookupJoin(join, leftInput, indexName, *prefixLookup, *rightReadMatch, ctx);
}

auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos());
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
.LookupKeys(keys)
.Index(indexName.Cast())
.LookupKeys(keys)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}
} else {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(const std::string_view
lookupStrategy = NKqpProto::EStreamLookupStrategy::LOOKUP;
} else if (strategy == "LookupJoinRows"sv) {
lookupStrategy = NKqpProto::EStreamLookupStrategy::JOIN;
} else if (strategy == "LookupSemiJoinRows"sv) {
lookupStrategy = NKqpProto::EStreamLookupStrategy::SEMI_JOIN;
}

YQL_ENSURE(lookupStrategy != NKqpProto::EStreamLookupStrategy::UNSPECIFIED,
Expand Down Expand Up @@ -1274,7 +1276,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {

break;
}
case NKqpProto::EStreamLookupStrategy::JOIN: {
case NKqpProto::EStreamLookupStrategy::JOIN:
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: {
YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple);
const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
YQL_ENSURE(inputTupleType->GetSize() == 2);
Expand Down
9 changes: 8 additions & 1 deletion ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
, HolderFactory(holderFactory)
, InputDesc(inputDesc)
, TablePath(settings.GetTable().GetPath())
, TableId(MakeTableId(settings.GetTable())) {
, TableId(MakeTableId(settings.GetTable()))
, Strategy(settings.GetLookupStrategy()) {

KeyColumns.reserve(settings.GetKeyColumns().size());
i32 keyOrder = 0;
Expand Down Expand Up @@ -748,6 +749,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());

if (Strategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) {
// Semi join should return one result row per key
continue;
}

TReadResultStats rowStats;
i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId);
Expand Down Expand Up @@ -962,6 +968,7 @@ std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKq
case NKqpProto::EStreamLookupStrategy::LOOKUP:
return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
case NKqpProto::EStreamLookupStrategy::JOIN:
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN:
return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
default:
return {};
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class TKqpStreamLookupWorker {
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
std::vector<TSysTables::TTableColumnInfo> Columns;
const NKqpProto::EStreamLookupStrategy Strategy;
};

std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
Expand Down
Loading
Loading