Skip to content

Merging CBO into stable-24-3-8-analytics #8819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
bd7d66a
Enabled CBO (#5780)
pavelvelikhov Jul 8, 2024
b2ab2e5
Added table service flag for CBO (#6493)
pavelvelikhov Jul 10, 2024
dc1eae0
Fixed bugs in CBO statistics calculation (#6537)
pavelvelikhov Jul 11, 2024
37dfe2c
Enabled constant folding by default (#6509)
pavelvelikhov Jul 11, 2024
3246373
Fixed statistics calculation for index read (#6629)
pavelvelikhov Jul 12, 2024
bf51dd2
[CBO] Fix Selectivity (#6645)
pashandor789 Jul 15, 2024
9b7069e
Cached overridden statistics for CBO (#6791)
pavelvelikhov Jul 18, 2024
033c22d
Added parameter selectivity and boosted LookupJoin (#6874)
pavelvelikhov Jul 19, 2024
8551fdb
[CBO] Logging level fixed (#6911)
pashandor789 Jul 22, 2024
b01b3b3
[] Added query action check. (#7017)
pashandor789 Jul 23, 2024
e509c5e
Fixed a problem in RBO in MapJoin implementation (#7044)
pavelvelikhov Jul 26, 2024
7d434fb
Reenabled grace join pragma (#7160)
pavelvelikhov Jul 27, 2024
63105ce
Don't run CBO if statistics is not available (#7089)
pavelvelikhov Jul 28, 2024
6d7ab43
Removed a rule that turned Shuffle into Map breaking some TPCDS queri…
pavelvelikhov Jul 30, 2024
66a2d2d
Switched off GraceSelfJoinCore (#7308)
pavelvelikhov Jul 31, 2024
0761181
Added Query Hints for the Optimizer (#7629)
pavelvelikhov Aug 13, 2024
c0282dd
Cleaned up code that removes aliases from attributes (#7745)
pavelvelikhov Aug 13, 2024
95a0484
[CBO] Make optimizer get only columns stats that are used in request …
pashandor789 Aug 16, 2024
325affb
[CBO] Column shards tests added (#7972)
pashandor789 Aug 19, 2024
74698ad
Fixed a subtle bug in removing aliases (#8003)
pavelvelikhov Aug 19, 2024
2a796d0
Lowered threshold for MapJoin (#8147)
pavelvelikhov Aug 22, 2024
4eb4a9b
[CBO] Join order hints added (#8106)
pashandor789 Aug 26, 2024
f91d492
Added another cost based optimization level (#8348)
pavelvelikhov Aug 28, 2024
b621242
Stopped pushing null as arguments to TCoIf in column shards (#8536)
pavelvelikhov Aug 31, 2024
140c037
[KQP] Make KqpOlapFilter be shown in a query plan (#8504)
pashandor789 Sep 2, 2024
5b755d9
[CBO] Make hints process many trees. (#8554)
pashandor789 Sep 2, 2024
5cf466f
[CBO] Join Order test improvement (#8602)
pashandor789 Sep 5, 2024
3811053
Disabled spilling ut test: requires merging of new spilling code
pavelvelikhov Sep 5, 2024
7f31d1b
Set default optimization level to 2
pavelvelikhov Sep 6, 2024
0966da4
Muted QueryService test
pavelvelikhov Sep 6, 2024
839709d
Fix pushdown logical ops with scalar argument. (#6142)
Tony-Romanov Jul 2, 2024
efee80f
Fix test of exotic XOR. (#6317)
Tony-Romanov Jul 5, 2024
58ecfb4
Updated unit tests
pavelvelikhov Sep 6, 2024
b304730
Canonized dq_file tests
pavelvelikhov Sep 6, 2024
b5ab6fd
Canonized suite tests
pavelvelikhov Sep 6, 2024
16e7046
Canonized clickbench
pavelvelikhov Sep 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ ydb/core/external_sources *
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.*
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/join KqpJoinOrder.Chain65Nodes
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
Expand All @@ -28,6 +29,7 @@ ydb/core/kqp/ut/scheme [15/50]*
ydb/core/kqp/ut/scheme [44/50]*
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpUpdate
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service [38/50]*
ydb/core/persqueue/ut [37/40] chunk chunk
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
bool hasTrailingMessage = false;

auto& kqpResponse = record.GetResponse();
if (kqpResponse.GetYdbResults().size() > 1) {
if (kqpResponse.GetYdbResults().size() > 1 && QueryAction != NKikimrKqp::QUERY_ACTION_EXPLAIN) {
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
"Unexpected trailing message with multiple result sets.");
ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,8 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
kqpConfig.OldLookupJoinBehaviour = serviceConfig.GetOldLookupJoinBehaviour();
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
auto mkqlHeavyLimit = TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit();

bool enableQueryServiceSpilling = TableServiceConfig.GetEnableQueryServiceSpilling();
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();

TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
Expand Down Expand Up @@ -561,7 +563,9 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes) {
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||
TableServiceConfig.GetEnableConstantFolding() != enableConstantFolding) {

QueryCache.Clear();

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
auto s = resp.Simple;
result.Metadata->RecordsCount = s.RowCount;
result.Metadata->DataSize = s.BytesSize;
result.Metadata->StatsLoaded = true;
result.Metadata->StatsLoaded = response.Success;
promise.SetValue(result);
});

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ class TKqpHost : public IKqpHost {

void Init(EKikimrQueryType queryType) {
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry, ActorSystem);

ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class IKqpRunner : public TThrRefBase {

TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TActorSystem* actorSystem);

TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/kqp/opt/kqp_opt.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_log.h>
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
#include <ydb/core/kqp/opt/kqp_column_statistics_requester.h>
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>

Expand Down Expand Up @@ -137,7 +138,8 @@ class TKqpRunner : public IKqpRunner {
public:
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry,
TActorSystem* actorSystem)
: Gateway(gateway)
, Cluster(cluster)
, TypesCtx(*typesCtx)
Expand All @@ -148,7 +150,8 @@ class TKqpRunner : public IKqpRunner {
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
sessionCtx->TablesPtr(), sessionCtx->GetUserRequestContext()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
, Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel)))
, Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel)))
, ActorSystem(actorSystem)
{
CreateGraphTransformer(typesCtx, sessionCtx, funcRegistry);
}
Expand Down Expand Up @@ -297,6 +300,7 @@ class TKqpRunner : public IKqpRunner {
.AddPostTypeAnnotation(/* forSubgraph */ true)
.AddCommonOptimization()
.Add(CreateKqpConstantFoldingTransformer(OptimizeCtx, *typesCtx, Config), "ConstantFolding")
.Add(CreateKqpColumnStatisticsRequester(Config, *typesCtx, SessionCtx->Tables(), Cluster, ActorSystem), "ColumnStatisticsRequester")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
.Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
Expand Down Expand Up @@ -324,7 +328,7 @@ class TKqpRunner : public IKqpRunner {
Config),
"BuildPhysicalTxs")
.Build(false));

auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
Expand Down Expand Up @@ -399,15 +403,17 @@ class TKqpRunner : public IKqpRunner {
TKqpProviderContext Pctx;

TAutoPtr<IGraphTransformer> Transformer;

TActorSystem* ActorSystem;
};

} // namespace

TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TActorSystem* actorSystem)
{
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry, actorSystem);
}

} // namespace NKqp
Expand Down
253 changes: 253 additions & 0 deletions ydb/core/kqp/opt/kqp_column_statistics_requester.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
#include "kqp_column_statistics_requester.h"

#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/core/statistics/service/service.h>
#include <ydb/core/statistics/events.h>
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
#include <ydb/library/yql/core/yql_statistics.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/dq/opt/dq_opt_stat.h>
#include <ydb/library/yql/utils/log/log.h>

namespace NKikimr::NKqp {

using namespace NThreading;
using namespace NYql;

// Propagates the table associated with a callable's first child (its input) to
// the arguments of every lambda child of that callable.
//
// - If the input is a list, the j-th list item's table is assigned to the j-th
//   lambda argument.
// - Otherwise the input's table is assigned to the first lambda argument.
//
// Children that are not lambdas, and lambdas without arguments, are skipped.
// Lookups go through operator[], so an input without a known table maps the
// argument to a default-constructed (null) entry, exactly as before.
void TKqpColumnStatisticsRequester::PropagateTableToLambdaArgument(const TExprNode::TPtr& input) {
    if (input->ChildrenSize() < 2) {
        return;
    }

    auto callableInput = input->ChildRef(0);

    for (size_t i = 1; i < input->ChildrenSize(); ++i) {
        auto maybeLambda = TExprBase(input->ChildRef(i));
        if (!maybeLambda.Maybe<TCoLambda>()) {
            continue;
        }

        auto lambda = maybeLambda.Cast<TCoLambda>();
        const size_t argsCount = lambda.Args().Size();
        if (!argsCount) {
            continue;
        }

        if (callableInput->IsList()) {
            // The list may have more items than the lambda has arguments;
            // never index the arguments past their end.
            for (size_t j = 0; j < callableInput->ChildrenSize() && j < argsCount; ++j) {
                auto table = KqpTableByExprNode[callableInput->Child(j)];
                KqpTableByExprNode[lambda.Args().Arg(j).Ptr()] = table;
            }
        } else {
            auto table = KqpTableByExprNode[callableInput.Get()];
            KqpTableByExprNode[lambda.Args().Arg(0).Ptr()] = table;
        }
    }
}

// Requests column statistics (currently only count-min sketches) for the
// columns used by predicates in the expression and stores them in
// TypesCtx.ColumnStatisticsByTableName.
//
// The expression itself is never rewritten: `output` is always `input`, and the
// returned status is always Ok. A failure to load statistics is only logged.
//
// Steps:
//   1. Return early unless the cost based optimization level is positive and
//      the EnableColumnStatistics feature flag is set.
//   2. Walk the expression to fill ColumnsByTableName (see AfterLambdas).
//   3. Build one statistics request for every column that is not cached yet.
//   4. Send it to the statistics service through a helper actor and wait for
//      the reply with GetValueSync().
//   5. Merge the received statistics into TypesCtx.
IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
    Y_UNUSED(ctx);

    output = input;
    // Fall back to the service-configured default level, the same way the
    // runner does when it builds the provider context.
    auto optLvl = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel);
    auto enableColumnStats = Config->FeatureFlags.GetEnableColumnStatistics();
    if (!(optLvl > 0 && enableColumnStats)) {
        return IGraphTransformer::TStatus::Ok;
    }

    VisitExprLambdasLast(
        input,
        [&](const TExprNode::TPtr& input) {
            BeforeLambdas(input) || BeforeLambdasUnmatched(input);

            if (input->IsCallable()) {
                PropagateTableToLambdaArgument(input);
            }

            return true;
        },
        [&](const TExprNode::TPtr& input) {
            return AfterLambdas(input) || AfterLambdasUnmatched(input);
        }
    );

    if (ColumnsByTableName.empty()) {
        return IGraphTransformer::TStatus::Ok;
    }

    // Lets the response handler translate (path id, column tag) pairs back to
    // table and column names.
    struct TTableMeta {
        TString TableName;
        THashMap<ui32, TString> ColumnNameByTag;
    };
    THashMap<TPathId, TTableMeta> tableMetaByPathId;

    // TODO: Add other statistics, not only COUNT_MIN_SKETCH.
    auto getStatisticsRequest = MakeHolder<NStat::TEvStatistics::TEvGetStatistics>();
    getStatisticsRequest->StatType = NKikimr::NStat::EStatType::COUNT_MIN_SKETCH;

    for (const auto& [table, columns]: ColumnsByTableName) {
        auto tableMeta = Tables.GetTable(Cluster, table).Metadata;
        auto& columnsMeta = tableMeta->Columns;

        auto pathId = TPathId(tableMeta->PathId.OwnerId(), tableMeta->PathId.TableId());
        for (const auto& column: columns) {
            if (TypesCtx.ColumnStatisticsByTableName.contains(table) && TypesCtx.ColumnStatisticsByTableName[table]->Data.contains(column)) {
                continue;
            }

            // Look the column up without operator[] so that an unknown name is
            // skipped instead of being default-inserted into the table metadata.
            const auto columnMeta = columnsMeta.find(column);
            if (columnMeta == columnsMeta.end()) {
                YQL_CLOG(DEBUG, ProviderKikimr) << "Table: " + table + " doesn't contain " + column + " to request for column statistics";
                continue;
            }

            NKikimr::NStat::TRequest req;
            req.ColumnTag = columnMeta->second.Id;
            req.PathId = pathId;
            getStatisticsRequest->StatRequests.push_back(req);

            tableMetaByPathId[pathId].TableName = table;
            tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column;
        }
    }

    if (getStatisticsRequest->StatRequests.empty()) {
        return IGraphTransformer::TStatus::Ok;
    }

    using TRequest = NStat::TEvStatistics::TEvGetStatistics;
    using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult;
    struct TResult : public NYql::IKikimrGateway::TGenericResult {
        THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
    };

    auto promise = NewPromise<TResult>();
    auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)]
        (TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
            if (!response.Success) {
                promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!"));
                // The promise is fulfilled; it must not be set a second time below.
                return;
            }

            THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;

            for (auto&& stat: response.StatResponses) {
                auto& meta = tableMetaByPathId[stat.Req.PathId];
                auto& columnName = meta.ColumnNameByTag[stat.Req.ColumnTag.value()];
                auto& columnStatistics = columnStatisticsByTableName[meta.TableName].Data[columnName];
                columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin);
            }

            promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)});
    };
    auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId);
    IActor* requestHandler =
        new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback);
    auto actorId = ActorSystem
        ->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
    Y_UNUSED(actorId);

    // NOTE(review): this waits synchronously for the statistics service reply
    // inside the transformer; confirm that blocking here is acceptable.
    auto res = promise.GetFuture().GetValueSync();
    if (!res.Issues().Empty()) {
        TStringStream ss;
        res.Issues().PrintTo(ss);
        YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
        return IGraphTransformer::TStatus::Ok;
    }

    for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) {
        auto cached = TypesCtx.ColumnStatisticsByTableName.find(tableName);
        if (cached == TypesCtx.ColumnStatisticsByTableName.end()) {
            TypesCtx.ColumnStatisticsByTableName.insert(
                {tableName, new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))}
            );
            continue;
        }

        // The table already has cached statistics for other columns: add the
        // freshly loaded columns to that entry. A plain insert() would be a
        // no-op here and the new statistics would be lost.
        for (auto& [columnName, statistics]: columnStatistics.Data) {
            cached->second->Data[columnName] = std::move(statistics);
        }
    }

    return IGraphTransformer::TStatus::Ok;
}

// Records the table for nodes that identify one directly:
//   - a TKqpTable node is mapped to itself;
//   - a TKqpCnStreamLookup node is mapped to its Table() node.
// Returns true when the node was one of these, false otherwise.
bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) {
    if (TKqpTable::Match(input.Get())) {
        KqpTableByExprNode[input.Get()] = input.Get();
        return true;
    }

    if (auto streamLookup = TExprBase(input).Maybe<TKqpCnStreamLookup>()) {
        KqpTableByExprNode[input.Get()] = streamLookup.Cast().Table().Ptr();
        return true;
    }

    return false;
}

// Fallback for nodes BeforeLambdas did not recognise: the node inherits the
// table of its first child that already has an entry in KqpTableByExprNode.
// Always returns true, whether or not such a child was found.
bool TKqpColumnStatisticsRequester::BeforeLambdasUnmatched(const TExprNode::TPtr& input) {
    for (const auto& child: input->Children()) {
        if (!KqpTableByExprNode.contains(child)) {
            continue;
        }

        // Only the first mapped child is used.
        auto inheritedTable = KqpTableByExprNode[child];
        KqpTableByExprNode[input.Get()] = inheritedTable;
        break;
    }

    return true;
}

// Handles filter nodes and predicate flat maps: runs the predicate selectivity
// computer over the lambda body in its member-collecting mode and registers
// every collected member's column in ColumnsByTableName under the table the
// member is mapped to.
//
// Members without a known (non-null) table in KqpTableByExprNode are ignored.
// A member name of the form "table.column" is reduced to "column".
// Returns true for filters / predicate flat maps, false for any other node.
bool TKqpColumnStatisticsRequester::AfterLambdas(const TExprNode::TPtr& input) {
    const bool isFilter = TCoFilterBase::Match(input.Get());
    const bool isPredicateFlatMap =
        !isFilter &&
        TCoFlatMapBase::Match(input.Get()) &&
        IsPredicateFlatMap(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body().Ref());

    if (!isFilter && !isPredicateFlatMap) {
        return false;
    }

    // No statistics are supplied; the computer is only used to collect members.
    std::shared_ptr<TOptimizerStatistics> noStats = nullptr;
    auto selectivityComputer = NDq::TPredicateSelectivityComputer(noStats, true);

    if (isFilter) {
        selectivityComputer.Compute(TExprBase(input).Cast<TCoFilterBase>().Lambda().Body());
    } else {
        selectivityComputer.Compute(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body());
    }

    auto usedMembers = selectivityComputer.GetColumnStatsUsedMembers();
    for (const auto& usedMember: usedMembers.Data) {
        auto memberNode = TExprBase(usedMember.Member).Ptr();

        const auto tableEntry = KqpTableByExprNode.find(memberNode);
        if (tableEntry == KqpTableByExprNode.end() || tableEntry->second == nullptr) {
            continue;
        }

        auto tablePath = TExprBase(tableEntry->second).Cast<TKqpTable>().Path().StringValue();
        auto columnName = usedMember.Member.Name().StringValue();

        // Strip the "table." prefix, if any.
        const size_t dotPos = columnName.find('.');
        if (dotPos != TString::npos) {
            columnName = columnName.substr(dotPos + 1);
        }

        ColumnsByTableName[tablePath].insert(std::move(columnName));
    }

    return true;
}

// Fallback for nodes AfterLambdas did not handle: a node that has no entry in
// KqpTableByExprNode yet inherits the table of its first child that has one.
// Nodes that already have an entry are left untouched. Always returns true.
bool TKqpColumnStatisticsRequester::AfterLambdasUnmatched(const TExprNode::TPtr& input) {
    if (!KqpTableByExprNode.contains(input.Get())) {
        for (const auto& child: input->Children()) {
            if (!KqpTableByExprNode.contains(child)) {
                continue;
            }

            // Only the first mapped child is used.
            auto inheritedTable = KqpTableByExprNode[child];
            KqpTableByExprNode[input.Get()] = inheritedTable;
            break;
        }
    }

    return true;
}

// Factory for the column statistics requester transformer.
// Constructs a TKqpColumnStatisticsRequester from the given arguments and
// returns it as an IGraphTransformer.
//
// config      - KQP configuration, passed through to the requester.
// typesCtx    - type annotation context, passed through to the requester.
// tables      - table metadata storage, passed through to the requester.
// cluster     - cluster name, passed through to the requester.
// actorSystem - actor system, passed through to the requester.
//               NOTE(review): presumably must outlive the transformer - confirm.
TAutoPtr<IGraphTransformer> CreateKqpColumnStatisticsRequester(
const TKikimrConfiguration::TPtr& config,
TTypeAnnotationContext& typesCtx,
TKikimrTablesData& tables,
TString cluster,
TActorSystem* actorSystem
) {
return THolder<IGraphTransformer>(new TKqpColumnStatisticsRequester(config, typesCtx, tables, cluster, actorSystem));
}

} // end of NKikimr::NKqp
Loading
Loading