Skip to content

Commit b11d10a

Browse files
Merge 8d336d5 into f2ccd9c
2 parents f2ccd9c + 8d336d5 commit b11d10a

28 files changed

+872
-125
lines changed

ydb/core/kqp/host/kqp_runner.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/kqp/opt/logical/kqp_opt_log.h>
77
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
88
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
9+
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
910

1011

1112
#include <ydb/core/kqp/opt/physical/kqp_opt_phy.h>
@@ -143,6 +144,7 @@ class TKqpRunner : public IKqpRunner {
143144
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
144145
sessionCtx->TablesPtr()))
145146
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
147+
, Pctx(TKqpProviderContext(*OptimizeCtx))
146148
{
147149
CreateGraphTransformer(typesCtx, sessionCtx, funcRegistry);
148150
}
@@ -259,7 +261,7 @@ class TKqpRunner : public IKqpRunner {
259261
.AddPostTypeAnnotation(/* forSubgraph */ true)
260262
.AddCommonOptimization()
261263
.Add(CreateKqpConstantFoldingTransformer(OptimizeCtx, *typesCtx, Config), "ConstantFolding")
262-
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config), "Statistics")
264+
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
263265
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
264266
.Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
265267
.Add(CreateKqpPhyOptTransformer(OptimizeCtx, *typesCtx), "KqpPhysicalOptimize")
@@ -293,7 +295,7 @@ class TKqpRunner : public IKqpRunner {
293295
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
294296
.AddPostTypeAnnotation()
295297
.Add(CreateKqpBuildPhysicalQueryTransformer(OptimizeCtx, BuildQueryCtx), "BuildPhysicalQuery")
296-
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config), "Statistics")
298+
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
297299
.Build(false);
298300

299301
auto physicalPeepholeTransformer = TTransformationPipeline(typesCtx)
@@ -355,6 +357,8 @@ class TKqpRunner : public IKqpRunner {
355357
TIntrusivePtr<TKqpOptimizeContext> OptimizeCtx;
356358
TIntrusivePtr<TKqpBuildQueryContext> BuildQueryCtx;
357359

360+
TKqpProviderContext Pctx;
361+
358362
TAutoPtr<IGraphTransformer> Transformer;
359363
};
360364

ydb/core/kqp/opt/kqp_query_plan.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
1515
#include <ydb/library/yql/dq/tasks/dq_tasks_graph.h>
1616
#include <ydb/library/yql/utils/plan/plan_utils.h>
17+
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
1718

1819
#include <library/cpp/json/writer/json.h>
1920
#include <library/cpp/json/json_reader.h>
@@ -1357,7 +1358,7 @@ class TxPlanSerializer {
13571358
}
13581359

13591360
void AddOptimizerEstimates(TOperator& op, const TExprBase& expr) {
1360-
if (!SerializerCtx.Config->HasOptEnableCostBasedOptimization()) {
1361+
if (SerializerCtx.Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel)==0) {
13611362
return;
13621363
}
13631364

ydb/core/kqp/opt/kqp_statistics_transformer.cpp

+6-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
#include <ydb/library/yql/dq/opt/dq_opt_stat.h>
44
#include <ydb/library/yql/core/yql_cost_function.h>
55

6+
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
7+
8+
69
#include <charconv>
710

811
using namespace NYql;
@@ -187,7 +190,7 @@ IGraphTransformer::TStatus TKqpStatisticsTransformer::DoTransform(TExprNode::TPt
187190
TExprNode::TPtr& output, TExprContext& ctx) {
188191

189192
output = input;
190-
if (!Config->HasOptEnableCostBasedOptimization()) {
193+
if (Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel) == 0) {
191194
return IGraphTransformer::TStatus::Ok;
192195
}
193196

@@ -238,6 +241,6 @@ bool TKqpStatisticsTransformer::AfterLambdasSpecific(const TExprNode::TPtr& inpu
238241
}
239242

240243
TAutoPtr<IGraphTransformer> NKikimr::NKqp::CreateKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
241-
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config) {
242-
return THolder<IGraphTransformer>(new TKqpStatisticsTransformer(kqpCtx, typeCtx, config));
244+
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx) {
245+
return THolder<IGraphTransformer>(new TKqpStatisticsTransformer(kqpCtx, typeCtx, config, pctx));
243246
}

ydb/core/kqp/opt/kqp_statistics_transformer.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/yql/core/yql_statistics.h>
66

77
#include <ydb/core/kqp/common/kqp_yql.h>
8+
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
89
#include <ydb/library/yql/core/yql_graph_transformer.h>
910
#include <ydb/library/yql/core/yql_expr_optimize.h>
1011
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
@@ -33,8 +34,8 @@ class TKqpStatisticsTransformer : public NYql::NDq::TDqStatisticsTransformerBase
3334

3435
public:
3536
TKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx,
36-
const TKikimrConfiguration::TPtr& config) :
37-
TDqStatisticsTransformerBase(&typeCtx),
37+
const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx) :
38+
TDqStatisticsTransformerBase(&typeCtx, pctx),
3839
Config(config),
3940
KqpCtx(*kqpCtx) {}
4041

@@ -47,6 +48,6 @@ class TKqpStatisticsTransformer : public NYql::NDq::TDqStatisticsTransformerBase
4748
};
4849

4950
TAutoPtr<IGraphTransformer> CreateKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
50-
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config);
51+
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx);
5152
}
5253
}
+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
#include "kqp_opt_cbo.h"
2+
#include "kqp_opt_log_impl.h"
3+
4+
#include <ydb/library/yql/core/yql_opt_utils.h>
5+
#include <ydb/library/yql/utils/log/log.h>
6+
7+
8+
namespace NKikimr::NKqp::NOpt {
9+
10+
using namespace NYql;
11+
using namespace NYql::NCommon;
12+
using namespace NYql::NDq;
13+
using namespace NYql::NNodes;
14+
15+
namespace {
16+
17+
/**
18+
* KQP specific rule to check if a LookupJoin is applicable
19+
*/
20+
bool IsLookupJoinApplicableDetailed(const std::shared_ptr<NYql::TRelOptimizerNode>& node, const TVector<TString>& joinColumns, const TKqpProviderContext& ctx) {
21+
22+
auto rel = std::static_pointer_cast<TKqpRelOptimizerNode>(node);
23+
auto expr = TExprBase(rel->Node);
24+
25+
if (ctx.KqpCtx.IsScanQuery() && !ctx.KqpCtx.Config->EnableKqpScanQueryStreamIdxLookupJoin) {
26+
return false;
27+
}
28+
29+
if (find_if(joinColumns.begin(), joinColumns.end(), [&] (const TString& s) { return node->Stats->KeyColumns[0] == s;})) {
30+
return true;
31+
}
32+
33+
auto readMatch = MatchRead<TKqlReadTable>(expr);
34+
TMaybeNode<TKqlKeyInc> maybeTablePrefix;
35+
size_t prefixSize;
36+
37+
if (readMatch) {
38+
if (readMatch->FlatMap && !IsPassthroughFlatMap(readMatch->FlatMap.Cast(), nullptr)){
39+
return false;
40+
}
41+
auto read = readMatch->Read.Cast<TKqlReadTable>();
42+
maybeTablePrefix = GetRightTableKeyPrefix(read.Range());
43+
44+
if (!maybeTablePrefix) {
45+
return false;
46+
}
47+
48+
prefixSize = maybeTablePrefix.Cast().ArgCount();
49+
50+
if (!prefixSize) {
51+
return true;
52+
}
53+
}
54+
else {
55+
readMatch = MatchRead<TKqlReadTableRangesBase>(expr);
56+
if (readMatch) {
57+
if (readMatch->FlatMap && !IsPassthroughFlatMap(readMatch->FlatMap.Cast(), nullptr)){
58+
return false;
59+
}
60+
auto read = readMatch->Read.Cast<TKqlReadTableRangesBase>();
61+
if (TCoVoid::Match(read.Ranges().Raw())) {
62+
return true;
63+
} else {
64+
auto prompt = TKqpReadTableExplainPrompt::Parse(read);
65+
66+
if (prompt.PointPrefixLen != prompt.UsedKeyColumns.size()) {
67+
return false;
68+
}
69+
70+
if (prompt.ExpectedMaxRanges != TMaybe<ui64>(1)) {
71+
return false;
72+
}
73+
prefixSize = prompt.PointPrefixLen;
74+
}
75+
}
76+
}
77+
if (! readMatch) {
78+
return false;
79+
}
80+
81+
if (prefixSize < node->Stats->KeyColumns.size() && !(find_if(joinColumns.begin(), joinColumns.end(), [&] (const TString& s) {
82+
return node->Stats->KeyColumns[prefixSize] == s;
83+
}))){
84+
return false;
85+
}
86+
87+
return true;
88+
}
89+
90+
bool IsLookupJoinApplicable(std::shared_ptr<IBaseOptimizerNode> left,
91+
std::shared_ptr<IBaseOptimizerNode> right,
92+
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
93+
TKqpProviderContext& ctx) {
94+
95+
Y_UNUSED(left);
96+
97+
auto rightStats = right->Stats;
98+
99+
if (rightStats->Type != EStatisticsType::BaseTable) {
100+
return false;
101+
}
102+
if (joinConditions.size() > rightStats->KeyColumns.size()) {
103+
return false;
104+
}
105+
106+
for (auto [leftCol, rightCol] : joinConditions) {
107+
if (! find_if(rightStats->KeyColumns.begin(), rightStats->KeyColumns.end(),
108+
[rightCol] (const TString& s) {
109+
return rightCol.AttributeName == s;
110+
} )) {
111+
return false;
112+
}
113+
}
114+
115+
TVector<TString> joinKeys;
116+
for( auto [leftJc, rightJc] : joinConditions ) {
117+
joinKeys.emplace_back( rightJc.AttributeName);
118+
}
119+
120+
return IsLookupJoinApplicableDetailed(std::static_pointer_cast<TRelOptimizerNode>(right), joinKeys, ctx);
121+
}
122+
123+
}
124+
125+
bool TKqpProviderContext::IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
126+
const std::shared_ptr<IBaseOptimizerNode>& right,
127+
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
128+
EJoinImplType joinImpl) {
129+
130+
switch( joinImpl ) {
131+
case EJoinImplType::LookupJoin:
132+
return IsLookupJoinApplicable(left, right, joinConditions, *this);
133+
default:
134+
return true;
135+
}
136+
}
137+
138+
double TKqpProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinImplType joinImpl) const {
139+
140+
switch(joinImpl) {
141+
case EJoinImplType::LookupJoin:
142+
return -1;
143+
default:
144+
return leftStats.Nrows + 2.0 * rightStats.Nrows;
145+
}
146+
}
147+
148+
149+
}
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#pragma once
2+
3+
#include <ydb/library/yql/ast/yql_expr.h>
4+
#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
5+
6+
#include <ydb/core/kqp/opt/kqp_opt.h>
7+
8+
namespace NKikimr::NKqp::NOpt {
9+
10+
struct TKqpRelOptimizerNode : public NYql::TRelOptimizerNode {
11+
const NYql::TExprNode::TPtr Node;
12+
13+
TKqpRelOptimizerNode(TString label, std::shared_ptr<NYql::TOptimizerStatistics> stats, const NYql::TExprNode::TPtr node) :
14+
TRelOptimizerNode(label, stats), Node(node) { }
15+
};
16+
17+
struct TKqpProviderContext : public NYql::IProviderContext {
18+
TKqpProviderContext(const TKqpOptimizeContext& kqpCtx) : KqpCtx(kqpCtx) {}
19+
20+
virtual bool IsJoinApplicable(const std::shared_ptr<NYql::IBaseOptimizerNode>& left,
21+
const std::shared_ptr<NYql::IBaseOptimizerNode>& right,
22+
const std::set<std::pair<NYql::NDq::TJoinColumn, NYql::NDq::TJoinColumn>>& joinConditions,
23+
NYql::EJoinImplType joinImpl);
24+
25+
virtual double ComputeJoinCost(const NYql::TOptimizerStatistics& leftStats, const NYql::TOptimizerStatistics& rightStats, NYql::EJoinImplType joinImpl) const;
26+
27+
const TKqpOptimizeContext& KqpCtx;
28+
};
29+
30+
}

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "kqp_opt_log_rules.h"
2+
#include "kqp_opt_cbo.h"
23

34
#include <ydb/core/kqp/common/kqp_yql.h>
45
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
@@ -134,7 +135,11 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
134135

135136
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
136137
auto maxDPccpDPTableSize = Config->MaxDPccpDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPccpDPTableSize);
137-
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->HasOptEnableCostBasedOptimization(), maxDPccpDPTableSize);
138+
TKqpProviderContext providerContext(KqpCtx);
139+
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel),
140+
maxDPccpDPTableSize, providerContext, [](auto& rels, auto label, auto node, auto stat) {
141+
rels.emplace_back(std::make_shared<TKqpRelOptimizerNode>(TString(label), stat, node));
142+
});
138143
DumpAppliedRule("OptimizeEquiJoinWithCosts", node.Ptr(), output.Ptr(), ctx);
139144
return output;
140145
}

ydb/core/kqp/opt/logical/kqp_opt_log_impl.h

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ TMaybe<TKqpMatchReadResult> MatchRead(NYql::NNodes::TExprBase node) {
2222
return MatchRead(node, [] (NYql::NNodes::TExprBase node) { return node.Maybe<TRead>().IsValid(); });
2323
}
2424

25+
NYql::NNodes::TMaybeNode<NYql::NNodes::TKqlKeyInc> GetRightTableKeyPrefix(const NYql::NNodes::TKqlKeyRange& range);
26+
2527
} // NKikimr::NKqp::NOpt
2628

2729

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

+19-19
Original file line numberDiff line numberDiff line change
@@ -167,25 +167,6 @@ TDqJoin FlipLeftSemiJoin(const TDqJoin& join, TExprContext& ctx) {
167167
.Done();
168168
}
169169

170-
TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) {
171-
if (!range.From().Maybe<TKqlKeyInc>() || !range.To().Maybe<TKqlKeyInc>()) {
172-
return {};
173-
}
174-
auto rangeFrom = range.From().Cast<TKqlKeyInc>();
175-
auto rangeTo = range.To().Cast<TKqlKeyInc>();
176-
177-
if (rangeFrom.ArgCount() != rangeTo.ArgCount()) {
178-
return {};
179-
}
180-
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
181-
if (rangeFrom.Arg(i).Raw() != rangeTo.Arg(i).Raw()) {
182-
return {};
183-
}
184-
}
185-
186-
return rangeFrom;
187-
}
188-
189170
TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos,
190171
const TKqpTable& table, const TCoAtomList& columns,
191172
const TExprBase& keysToLookup, const TVector<TCoAtom>& skipNullColumns, const TString& indexName,
@@ -859,6 +840,25 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
859840

860841
} // anonymous namespace
861842

843+
// Returns the lower bound of `range` when both bounds are TKqlKeyInc nodes with the
// same number of arguments and, position by position, their arguments have equal
// Raw() values. Returns an empty TMaybeNode in every other case.
TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) {
    if (!range.From().Maybe<TKqlKeyInc>()) {
        return {};
    }
    if (!range.To().Maybe<TKqlKeyInc>()) {
        return {};
    }

    auto lowerBound = range.From().Cast<TKqlKeyInc>();
    auto upperBound = range.To().Cast<TKqlKeyInc>();

    const auto argCount = lowerBound.ArgCount();
    if (argCount != upperBound.ArgCount()) {
        return {};
    }

    for (ui32 idx = 0; idx < argCount; ++idx) {
        const bool sameArg = lowerBound.Arg(idx).Raw() == upperBound.Arg(idx).Raw();
        if (!sameArg) {
            return {};
        }
    }

    return lowerBound;
}
861+
862862
TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx)
863863
{
864864
if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->EnableKqpScanQueryStreamIdxLookupJoin) || !node.Maybe<TDqJoin>()) {

ydb/core/kqp/opt/logical/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ SRCS(
1212
kqp_opt_log_sqlin.cpp
1313
kqp_opt_log_sqlin_compact.cpp
1414
kqp_opt_log.cpp
15+
kqp_opt_cbo.cpp
1516
)
1617

1718
PEERDIR(

0 commit comments

Comments
 (0)