Skip to content

Commit d554307

Browse files
pashandor789Pavel Ivanov
authored and
Pavel Ivanov
committed
[KQP] Make TKqpColumnStatisticsRequester async (#10224)
1 parent 07f6ccd commit d554307

File tree

2 files changed

+41
-19
lines changed

2 files changed

+41
-19
lines changed

ydb/core/kqp/opt/kqp_column_statistics_requester.cpp

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
100100
NKikimr::NStat::TRequest req;
101101
req.ColumnTag = columnsMeta[column].Id;
102102
req.PathId = pathId;
103-
getStatisticsRequest->StatRequests.push_back(req);
103+
getStatisticsRequest->StatRequests.push_back(std::move(req));
104104

105105
tableMetaByPathId[pathId].TableName = table;
106106
tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column;
@@ -113,15 +113,14 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
113113

114114
using TRequest = NStat::TEvStatistics::TEvGetStatistics;
115115
using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult;
116-
struct TResult : public NYql::IKikimrGateway::TGenericResult {
117-
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
118-
};
119116

120-
auto promise = NewPromise<TResult>();
117+
AsyncReadiness = NewPromise<void>();
118+
auto promise = NewPromise<TColumnStatisticsResponse>();
121119
auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)]
122-
(TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
120+
(TPromise<TColumnStatisticsResponse> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
123121
if (!response.Success) {
124-
promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!"));
122+
promise.SetValue(NYql::NCommon::ResultFromError<TColumnStatisticsResponse>("can't get column statistics!"));
123+
return;
125124
}
126125

127126
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
@@ -133,30 +132,39 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
133132
columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin);
134133
}
135134

136-
promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)});
135+
promise.SetValue(TColumnStatisticsResponse{.ColumnStatisticsByTableName = std::move(columnStatisticsByTableName)});
137136
};
138137
auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId);
139138
IActor* requestHandler =
140-
new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback);
141-
auto actorId = ActorSystem
139+
new TActorRequestHandler<TRequest, TResponse, TColumnStatisticsResponse>(statServiceId, getStatisticsRequest.Release(), promise, callback);
140+
ActorSystem
142141
->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
143-
Y_UNUSED(actorId);
144142

145-
auto res = promise.GetFuture().GetValueSync();
146-
if (!res.Issues().Empty()) {
147-
TStringStream ss;
148-
res.Issues().PrintTo(ss);
149-
YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
143+
promise.GetFuture().Subscribe([this](auto result){ ColumnStatisticsResponse = result.ExtractValue(); AsyncReadiness.SetValue(); });
144+
145+
return TStatus::Async;
146+
}
147+
148+
IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoApplyAsyncChanges(TExprNode::TPtr, TExprNode::TPtr&, TExprContext&) {
149+
Y_ENSURE(AsyncReadiness.IsReady() && ColumnStatisticsResponse.has_value());
150+
151+
if (!ColumnStatisticsResponse->Issues().Empty()) {
152+
TStringStream ss; ColumnStatisticsResponse->Issues().PrintTo(ss);
153+
YQL_CLOG(TRACE, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
150154
return IGraphTransformer::TStatus::Ok;
151155
}
152156

153-
for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) {
157+
for (auto&& [tableName, columnStatistics]: ColumnStatisticsResponse->ColumnStatisticsByTableName) {
154158
TypesCtx.ColumnStatisticsByTableName.insert(
155159
{std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))}
156160
);
157161
}
158162

159-
return IGraphTransformer::TStatus::Ok;
163+
return TStatus::Ok;
164+
}
165+
166+
TFuture<void> TKqpColumnStatisticsRequester::DoGetAsyncFuture(const TExprNode&) {
167+
return AsyncReadiness.GetFuture();
160168
}
161169

162170
bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) {

ydb/core/kqp/opt/kqp_column_statistics_requester.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ using namespace NYql::NNodes;
1818
* Then it requests column statistics for these attributes from the column statistics service
1919
* and stores it into a TTypeAnnotationContext.
2020
*/
21-
class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
21+
class TKqpColumnStatisticsRequester : public TGraphTransformerBase {
2222
public:
2323
TKqpColumnStatisticsRequester(
2424
const TKikimrConfiguration::TPtr& config,
@@ -37,6 +37,10 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
3737
// Main method of the transformer
3838
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
3939

40+
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final;
41+
42+
IGraphTransformer::TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
43+
4044
void Rewind() override {}
4145

4246
~TKqpColumnStatisticsRequester() override = default;
@@ -56,6 +60,16 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
5660
THashMap<TExprNode::TPtr, TExprNode::TPtr> KqpTableByExprNode;
5761
THashMap<TString, THashSet<TString>> ColumnsByTableName;
5862

63+
//////////////////////////////////////////////////////////////
64+
/* for waiting response with column statistics */
65+
struct TColumnStatisticsResponse : public NYql::IKikimrGateway::TGenericResult {
66+
THashMap<TString, TOptimizerStatistics::TColumnStatMap> ColumnStatisticsByTableName;
67+
};
68+
std::optional<TColumnStatisticsResponse> ColumnStatisticsResponse;
69+
NThreading::TPromise<void> AsyncReadiness;
70+
71+
//////////////////////////////////////////////////////////////
72+
5973
const TKikimrConfiguration::TPtr& Config;
6074
TTypeAnnotationContext& TypesCtx;
6175
TKikimrTablesData& Tables;

0 commit comments

Comments
 (0)