|
| 1 | +#include "kqp_column_statistics_requester.h" |
| 2 | + |
| 3 | +#include <ydb/library/yql/core/yql_expr_optimize.h> |
| 4 | +#include <ydb/core/statistics/service/service.h> |
| 5 | +#include <ydb/core/statistics/events.h> |
| 6 | +#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h> |
| 7 | +#include <ydb/library/yql/core/yql_statistics.h> |
| 8 | +#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> |
| 9 | +#include <ydb/library/yql/dq/opt/dq_opt_stat.h> |
| 10 | +#include <ydb/library/yql/utils/log/log.h> |
| 11 | + |
| 12 | +namespace NKikimr::NKqp { |
| 13 | + |
| 14 | +using namespace NThreading; |
| 15 | +using namespace NYql; |
| 16 | + |
| 17 | +void TKqpColumnStatisticsRequester::PropagateTableToLambdaArgument(const TExprNode::TPtr& input) { |
| 18 | + if (input->ChildrenSize() < 2) { |
| 19 | + return; |
| 20 | + } |
| 21 | + |
| 22 | + auto callableInput = input->ChildRef(0); |
| 23 | + |
| 24 | + |
| 25 | + for (size_t i = 1; i < input->ChildrenSize(); ++i) { |
| 26 | + auto maybeLambda = TExprBase(input->ChildRef(i)); |
| 27 | + if (!maybeLambda.Maybe<TCoLambda>()) { |
| 28 | + continue; |
| 29 | + } |
| 30 | + |
| 31 | + auto lambda = maybeLambda.Cast<TCoLambda>(); |
| 32 | + if (!lambda.Args().Size()){ |
| 33 | + continue; |
| 34 | + } |
| 35 | + |
| 36 | + if (callableInput->IsList()){ |
| 37 | + for (size_t j = 0; j < callableInput->ChildrenSize(); ++j){ |
| 38 | + KqpTableByExprNode[lambda.Args().Arg(j).Ptr()] = KqpTableByExprNode[callableInput->Child(j)]; |
| 39 | + } |
| 40 | + } else { |
| 41 | + KqpTableByExprNode[lambda.Args().Arg(0).Ptr()] = KqpTableByExprNode[callableInput.Get()]; |
| 42 | + } |
| 43 | + } |
| 44 | +} |
| 45 | + |
| 46 | +IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { |
| 47 | + Y_UNUSED(ctx); |
| 48 | + |
| 49 | + output = input; |
| 50 | + auto optLvl = Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel); |
| 51 | + auto enableColumnStats = Config->FeatureFlags.GetEnableColumnStatistics(); |
| 52 | + if (!(optLvl > 0 && enableColumnStats)) { |
| 53 | + return IGraphTransformer::TStatus::Ok; |
| 54 | + } |
| 55 | + |
| 56 | + VisitExprLambdasLast( |
| 57 | + input, |
| 58 | + [&](const TExprNode::TPtr& input) { |
| 59 | + BeforeLambdas(input) || BeforeLambdasUnmatched(input); |
| 60 | + |
| 61 | + if (input->IsCallable()) { |
| 62 | + PropagateTableToLambdaArgument(input); |
| 63 | + } |
| 64 | + |
| 65 | + return true; |
| 66 | + }, |
| 67 | + [&](const TExprNode::TPtr& input) { |
| 68 | + return AfterLambdas(input) || AfterLambdasUnmatched(input); |
| 69 | + } |
| 70 | + ); |
| 71 | + |
| 72 | + if (ColumnsByTableName.empty()) { |
| 73 | + return IGraphTransformer::TStatus::Ok; |
| 74 | + } |
| 75 | + |
| 76 | + struct TTableMeta { |
| 77 | + TString TableName; |
| 78 | + THashMap<ui32, TString> ColumnNameByTag; |
| 79 | + }; |
| 80 | + THashMap<TPathId, TTableMeta> tableMetaByPathId; |
| 81 | + |
| 82 | + // TODO: Add other statistics, not only COUNT_MIN_SKETCH. |
| 83 | + auto getStatisticsRequest = MakeHolder<NStat::TEvStatistics::TEvGetStatistics>(); |
| 84 | + getStatisticsRequest->StatType = NKikimr::NStat::EStatType::COUNT_MIN_SKETCH; |
| 85 | + |
| 86 | + for (const auto& [table, columns]: ColumnsByTableName) { |
| 87 | + auto tableMeta = Tables.GetTable(Cluster, table).Metadata; |
| 88 | + auto& columnsMeta = tableMeta->Columns; |
| 89 | + |
| 90 | + auto pathId = TPathId(tableMeta->PathId.OwnerId(), tableMeta->PathId.TableId()); |
| 91 | + for (const auto& column: columns) { |
| 92 | + if (TypesCtx.ColumnStatisticsByTableName.contains(table) && TypesCtx.ColumnStatisticsByTableName[table]->Data.contains(column)) { |
| 93 | + continue; |
| 94 | + } |
| 95 | + |
| 96 | + if (!columns.contains(column)) { |
| 97 | + YQL_CLOG(DEBUG, ProviderKikimr) << "Table: " + table + " doesn't contain " + column + " to request for column statistics"; |
| 98 | + } |
| 99 | + |
| 100 | + NKikimr::NStat::TRequest req; |
| 101 | + req.ColumnTag = columnsMeta[column].Id; |
| 102 | + req.PathId = pathId; |
| 103 | + getStatisticsRequest->StatRequests.push_back(req); |
| 104 | + |
| 105 | + tableMetaByPathId[pathId].TableName = table; |
| 106 | + tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column; |
| 107 | + } |
| 108 | + } |
| 109 | + |
| 110 | + if (getStatisticsRequest->StatRequests.empty()) { |
| 111 | + return IGraphTransformer::TStatus::Ok; |
| 112 | + } |
| 113 | + |
| 114 | + using TRequest = NStat::TEvStatistics::TEvGetStatistics; |
| 115 | + using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult; |
| 116 | + struct TResult : public NYql::IKikimrGateway::TGenericResult { |
| 117 | + THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName; |
| 118 | + }; |
| 119 | + |
| 120 | + auto promise = NewPromise<TResult>(); |
| 121 | + auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)] |
| 122 | + (TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable { |
| 123 | + if (!response.Success) { |
| 124 | + promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!")); |
| 125 | + } |
| 126 | + |
| 127 | + THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName; |
| 128 | + |
| 129 | + for (auto&& stat: response.StatResponses) { |
| 130 | + auto meta = tableMetaByPathId[stat.Req.PathId]; |
| 131 | + auto columnName = meta.ColumnNameByTag[stat.Req.ColumnTag.value()]; |
| 132 | + auto& columnStatistics = columnStatisticsByTableName[meta.TableName].Data[columnName]; |
| 133 | + columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin); |
| 134 | + } |
| 135 | + |
| 136 | + promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)}); |
| 137 | + }; |
| 138 | + auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId); |
| 139 | + IActor* requestHandler = |
| 140 | + new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback); |
| 141 | + auto actorId = ActorSystem |
| 142 | + ->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); |
| 143 | + Y_UNUSED(actorId); |
| 144 | + |
| 145 | + auto res = promise.GetFuture().GetValueSync(); |
| 146 | + if (!res.Issues().Empty()) { |
| 147 | + TStringStream ss; |
| 148 | + res.Issues().PrintTo(ss); |
| 149 | + YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str(); |
| 150 | + return IGraphTransformer::TStatus::Ok; |
| 151 | + } |
| 152 | + |
| 153 | + for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) { |
| 154 | + TypesCtx.ColumnStatisticsByTableName.insert( |
| 155 | + {std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))} |
| 156 | + ); |
| 157 | + } |
| 158 | + |
| 159 | + return IGraphTransformer::TStatus::Ok; |
| 160 | +} |
| 161 | + |
| 162 | +bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) { |
| 163 | + bool matched = true; |
| 164 | + |
| 165 | + if (TKqpTable::Match(input.Get())) { |
| 166 | + KqpTableByExprNode[input.Get()] = input.Get(); |
| 167 | + } else if (auto maybeStreamLookup = TExprBase(input).Maybe<TKqpCnStreamLookup>()) { |
| 168 | + KqpTableByExprNode[input.Get()] = maybeStreamLookup.Cast().Table().Ptr(); |
| 169 | + } else { |
| 170 | + matched = false; |
| 171 | + } |
| 172 | + |
| 173 | + return matched; |
| 174 | +} |
| 175 | + |
| 176 | +bool TKqpColumnStatisticsRequester::BeforeLambdasUnmatched(const TExprNode::TPtr& input) { |
| 177 | + for (const auto& node: input->Children()) { |
| 178 | + if (KqpTableByExprNode.contains(node)) { |
| 179 | + KqpTableByExprNode[input.Get()] = KqpTableByExprNode[node]; |
| 180 | + return true; |
| 181 | + } |
| 182 | + } |
| 183 | + |
| 184 | + return true; |
| 185 | +} |
| 186 | + |
| 187 | +bool TKqpColumnStatisticsRequester::AfterLambdas(const TExprNode::TPtr& input) { |
| 188 | + bool matched = true; |
| 189 | + |
| 190 | + if ( |
| 191 | + TCoFilterBase::Match(input.Get()) || |
| 192 | + TCoFlatMapBase::Match(input.Get()) && IsPredicateFlatMap(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body().Ref()) |
| 193 | + ) { |
| 194 | + std::shared_ptr<TOptimizerStatistics> dummyStats = nullptr; |
| 195 | + auto computer = NDq::TPredicateSelectivityComputer(dummyStats, true); |
| 196 | + |
| 197 | + if (TCoFilterBase::Match(input.Get())) { |
| 198 | + computer.Compute(TExprBase(input).Cast<TCoFilterBase>().Lambda().Body()); |
| 199 | + } else if (TCoFlatMapBase::Match(input.Get())) { |
| 200 | + computer.Compute(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body()); |
| 201 | + } else { |
| 202 | + Y_ENSURE(false); |
| 203 | + } |
| 204 | + |
| 205 | + auto columnStatsUsedMembers = computer.GetColumnStatsUsedMembers(); |
| 206 | + for (const auto& item: columnStatsUsedMembers.Data) { |
| 207 | + auto exprNode = TExprBase(item.Member).Ptr(); |
| 208 | + if (!KqpTableByExprNode.contains(exprNode) || KqpTableByExprNode[exprNode] == nullptr) { |
| 209 | + continue; |
| 210 | + } |
| 211 | + |
| 212 | + auto table = TExprBase(KqpTableByExprNode[exprNode]).Cast<TKqpTable>().Path().StringValue(); |
| 213 | + auto column = item.Member.Name().StringValue(); |
| 214 | + size_t pointPos = column.find('.'); // table.column |
| 215 | + if (pointPos != TString::npos) { |
| 216 | + column = column.substr(pointPos + 1); |
| 217 | + } |
| 218 | + |
| 219 | + ColumnsByTableName[table].insert(std::move(column)); |
| 220 | + } |
| 221 | + } else { |
| 222 | + matched = false; |
| 223 | + } |
| 224 | + |
| 225 | + return matched; |
| 226 | +} |
| 227 | + |
| 228 | +bool TKqpColumnStatisticsRequester::AfterLambdasUnmatched(const TExprNode::TPtr& input) { |
| 229 | + if (KqpTableByExprNode.contains(input.Get())) { |
| 230 | + return true; |
| 231 | + } |
| 232 | + |
| 233 | + for (const auto& node: input->Children()) { |
| 234 | + if (KqpTableByExprNode.contains(node)) { |
| 235 | + KqpTableByExprNode[input.Get()] = KqpTableByExprNode[node]; |
| 236 | + return true; |
| 237 | + } |
| 238 | + } |
| 239 | + |
| 240 | + return true; |
| 241 | +} |
| 242 | + |
| 243 | +TAutoPtr<IGraphTransformer> CreateKqpColumnStatisticsRequester( |
| 244 | + const TKikimrConfiguration::TPtr& config, |
| 245 | + TTypeAnnotationContext& typesCtx, |
| 246 | + TKikimrTablesData& tables, |
| 247 | + TString cluster, |
| 248 | + TActorSystem* actorSystem |
| 249 | +) { |
| 250 | + return THolder<IGraphTransformer>(new TKqpColumnStatisticsRequester(config, typesCtx, tables, cluster, actorSystem)); |
| 251 | +} |
| 252 | + |
| 253 | +} // end of NKikimr::NKqp |
0 commit comments