Skip to content

Commit e8c42a4

Browse files
pashandor789pavelvelikhovPavel IvanovilnurkhTony-Romanov
authored
Merge CBO + CBO-KQP improvements (#10400)
Co-authored-by: Pavel Velikhov <[email protected]> Co-authored-by: Pavel Ivanov <[email protected]> Co-authored-by: ilnurkh <[email protected]> Co-authored-by: Tony-Romanov <[email protected]>
1 parent 3b364c1 commit e8c42a4

File tree

66 files changed

+2572
-591
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2572
-591
lines changed

library/cpp/threading/future/core/future-inl.h

+22
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ namespace NThreading {
116116
bool HasException() const {
117117
return AtomicGet(State) == ExceptionSet;
118118
}
119+
bool IsReady() const {
120+
return AtomicGet(State) != NotReady;
121+
}
119122

120123
const T& GetValue(TDuration timeout = TDuration::Zero()) const {
121124
AccessValue(timeout, ValueRead);
@@ -297,6 +300,9 @@ namespace NThreading {
297300
bool HasException() const {
298301
return AtomicGet(State) == ExceptionSet;
299302
}
303+
bool IsReady() const {
304+
return AtomicGet(State) != NotReady;
305+
}
300306

301307
void GetValue(TDuration timeout = TDuration::Zero()) const {
302308
TAtomicBase state = AtomicGet(State);
@@ -583,6 +589,10 @@ namespace NThreading {
583589
inline bool TFuture<T>::HasException() const {
584590
return State && State->HasException();
585591
}
592+
template <typename T>
593+
inline bool TFuture<T>::IsReady() const {
594+
return State && State->IsReady();
595+
}
586596

587597
template <typename T>
588598
inline void TFuture<T>::Wait() const {
@@ -688,6 +698,9 @@ namespace NThreading {
688698
inline bool TFuture<void>::HasException() const {
689699
return State && State->HasException();
690700
}
701+
inline bool TFuture<void>::IsReady() const {
702+
return State && State->IsReady();
703+
}
691704

692705
inline void TFuture<void>::Wait() const {
693706
EnsureInitialized();
@@ -823,6 +836,11 @@ namespace NThreading {
823836
return State && State->HasException();
824837
}
825838

839+
template <typename T>
840+
inline bool TPromise<T>::IsReady() const {
841+
return State && State->IsReady();
842+
}
843+
826844
template <typename T>
827845
inline void TPromise<T>::SetException(const TString& e) {
828846
EnsureInitialized();
@@ -904,6 +922,10 @@ namespace NThreading {
904922
return State && State->HasException();
905923
}
906924

925+
inline bool TPromise<void>::IsReady() const {
926+
return State && State->IsReady();
927+
}
928+
907929
inline void TPromise<void>::SetException(const TString& e) {
908930
EnsureInitialized();
909931
State->SetException(std::make_exception_ptr(yexception() << e));

library/cpp/threading/future/core/future.h

+22
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ namespace NThreading {
9898
void TryRethrow() const;
9999
bool HasException() const;
100100

101+
// returns true if exception or value was set.
102+
// allows to check readiness without locking cheker-thread
103+
// NOTE: returns true even if value was extracted from promise
104+
// good replace for HasValue() || HasException()
105+
bool IsReady() const;
106+
101107
void Wait() const;
102108
bool Wait(TDuration timeout) const;
103109
bool Wait(TInstant deadline) const;
@@ -153,6 +159,11 @@ namespace NThreading {
153159
void TryRethrow() const;
154160
bool HasException() const;
155161

162+
// returns true if exception or value was set.
163+
// allows to check readiness without locking cheker-thread
164+
// good replace for HasValue() || HasException()
165+
bool IsReady() const;
166+
156167
void Wait() const;
157168
bool Wait(TDuration timeout) const;
158169
bool Wait(TInstant deadline) const;
@@ -216,6 +227,12 @@ namespace NThreading {
216227

217228
void TryRethrow() const;
218229
bool HasException() const;
230+
231+
// returns true if exception or value was set.
232+
// allows to check readiness without locking cheker-thread
233+
// NOTE: returns true even if value was extracted from promise
234+
// good replace for HasValue() || HasException()
235+
bool IsReady() const;
219236
void SetException(const TString& e);
220237
void SetException(std::exception_ptr e);
221238
bool TrySetException(std::exception_ptr e);
@@ -256,6 +273,11 @@ namespace NThreading {
256273

257274
void TryRethrow() const;
258275
bool HasException() const;
276+
277+
// returns true if exception or value was set.
278+
// allows to check readiness without locking cheker-thread
279+
// good replace for HasValue() || HasException()
280+
bool IsReady() const;
259281
void SetException(const TString& e);
260282
void SetException(std::exception_ptr e);
261283
bool TrySetException(std::exception_ptr e);

library/cpp/threading/future/future_ut.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ namespace {
105105

106106
future = MakeFuture(345);
107107
UNIT_ASSERT(future.HasValue());
108+
UNIT_ASSERT(future.IsReady());
108109
UNIT_ASSERT_EQUAL(future.GetValue(), 345);
109110
}
110111

@@ -115,6 +116,7 @@ namespace {
115116

116117
TFuture<void> future = promise.GetFuture();
117118
UNIT_ASSERT(future.HasValue());
119+
UNIT_ASSERT(future.IsReady());
118120

119121
future = MakeFuture();
120122
UNIT_ASSERT(future.HasValue());
@@ -523,6 +525,7 @@ namespace {
523525
{
524526
auto future1 = MakeErrorFuture<void>(std::make_exception_ptr(TFutureException()));
525527
UNIT_ASSERT(future1.HasException());
528+
UNIT_ASSERT(future1.IsReady());
526529
UNIT_CHECK_GENERATED_EXCEPTION(future1.GetValue(), TFutureException);
527530

528531
auto future2 = MakeErrorFuture<int>(std::make_exception_ptr(TFutureException()));
@@ -563,6 +566,7 @@ namespace {
563566
promise2.SetException("foo-exception");
564567
wait.Wait();
565568
UNIT_ASSERT(future2.HasException());
569+
UNIT_ASSERT(!future1.IsReady());
566570
UNIT_ASSERT(!future1.HasValue() && !future1.HasException());
567571
}
568572

ydb/core/kqp/opt/kqp_column_statistics_requester.cpp

+26-18
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
100100
NKikimr::NStat::TRequest req;
101101
req.ColumnTag = columnsMeta[column].Id;
102102
req.PathId = pathId;
103-
getStatisticsRequest->StatRequests.push_back(req);
103+
getStatisticsRequest->StatRequests.push_back(std::move(req));
104104

105105
tableMetaByPathId[pathId].TableName = table;
106106
tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column;
@@ -113,15 +113,14 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
113113

114114
using TRequest = NStat::TEvStatistics::TEvGetStatistics;
115115
using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult;
116-
struct TResult : public NYql::IKikimrGateway::TGenericResult {
117-
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
118-
};
119116

120-
auto promise = NewPromise<TResult>();
117+
AsyncReadiness = NewPromise<void>();
118+
auto promise = NewPromise<TColumnStatisticsResponse>();
121119
auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)]
122-
(TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
120+
(TPromise<TColumnStatisticsResponse> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
123121
if (!response.Success) {
124-
promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!"));
122+
promise.SetValue(NYql::NCommon::ResultFromError<TColumnStatisticsResponse>("can't get column statistics!"));
123+
return;
125124
}
126125

127126
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
@@ -133,30 +132,39 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
133132
columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin);
134133
}
135134

136-
promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)});
135+
promise.SetValue(TColumnStatisticsResponse{.ColumnStatisticsByTableName = std::move(columnStatisticsByTableName)});
137136
};
138137
auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId);
139138
IActor* requestHandler =
140-
new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback);
141-
auto actorId = ActorSystem
139+
new TActorRequestHandler<TRequest, TResponse, TColumnStatisticsResponse>(statServiceId, getStatisticsRequest.Release(), promise, callback);
140+
ActorSystem
142141
->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
143-
Y_UNUSED(actorId);
144142

145-
auto res = promise.GetFuture().GetValueSync();
146-
if (!res.Issues().Empty()) {
147-
TStringStream ss;
148-
res.Issues().PrintTo(ss);
149-
YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
143+
promise.GetFuture().Subscribe([this](auto result){ ColumnStatisticsResponse = result.ExtractValue(); AsyncReadiness.SetValue(); });
144+
145+
return TStatus::Async;
146+
}
147+
148+
IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoApplyAsyncChanges(TExprNode::TPtr, TExprNode::TPtr&, TExprContext&) {
149+
Y_ENSURE(AsyncReadiness.IsReady() && ColumnStatisticsResponse.has_value());
150+
151+
if (!ColumnStatisticsResponse->Issues().Empty()) {
152+
TStringStream ss; ColumnStatisticsResponse->Issues().PrintTo(ss);
153+
YQL_CLOG(TRACE, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
150154
return IGraphTransformer::TStatus::Ok;
151155
}
152156

153-
for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) {
157+
for (auto&& [tableName, columnStatistics]: ColumnStatisticsResponse->ColumnStatisticsByTableName) {
154158
TypesCtx.ColumnStatisticsByTableName.insert(
155159
{std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))}
156160
);
157161
}
158162

159-
return IGraphTransformer::TStatus::Ok;
163+
return TStatus::Ok;
164+
}
165+
166+
TFuture<void> TKqpColumnStatisticsRequester::DoGetAsyncFuture(const TExprNode&) {
167+
return AsyncReadiness.GetFuture();
160168
}
161169

162170
bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) {

ydb/core/kqp/opt/kqp_column_statistics_requester.h

+15-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ using namespace NYql::NNodes;
1818
* Then it requests column statistics for these attributes from the column statistics service
1919
* and stores it into a TTypeAnnotationContext.
2020
*/
21-
class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
21+
class TKqpColumnStatisticsRequester : public TGraphTransformerBase {
2222
public:
2323
TKqpColumnStatisticsRequester(
2424
const TKikimrConfiguration::TPtr& config,
@@ -37,6 +37,10 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
3737
// Main method of the transformer
3838
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
3939

40+
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final;
41+
42+
IGraphTransformer::TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
43+
4044
void Rewind() override {}
4145

4246
~TKqpColumnStatisticsRequester() override = default;
@@ -56,6 +60,16 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
5660
THashMap<TExprNode::TPtr, TExprNode::TPtr> KqpTableByExprNode;
5761
THashMap<TString, THashSet<TString>> ColumnsByTableName;
5862

63+
//////////////////////////////////////////////////////////////
64+
/* for waiting response with column statistics */
65+
struct TColumnStatisticsResponse : public NYql::IKikimrGateway::TGenericResult {
66+
THashMap<TString, TOptimizerStatistics::TColumnStatMap> ColumnStatisticsByTableName;
67+
};
68+
std::optional<TColumnStatisticsResponse> ColumnStatisticsResponse;
69+
NThreading::TPromise<void> AsyncReadiness;
70+
71+
//////////////////////////////////////////////////////////////
72+
5973
const TKikimrConfiguration::TPtr& Config;
6074
TTypeAnnotationContext& TypesCtx;
6175
TKikimrTablesData& Tables;

ydb/core/kqp/opt/kqp_opt.h

+8-32
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
1818
, QueryCtx(queryCtx)
1919
, Tables(tables)
2020
, UserRequestContext(userRequestContext)
21-
{
21+
{
2222
YQL_ENSURE(QueryCtx);
2323
YQL_ENSURE(Tables);
2424
}
@@ -31,9 +31,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
3131
int JoinsCount{};
3232
int EquiJoinsCount{};
3333
std::shared_ptr<NJson::TJsonValue> OverrideStatistics{};
34-
std::shared_ptr<NYql::TCardinalityHints> CardinalityHints{};
35-
std::shared_ptr<NYql::TJoinAlgoHints> JoinAlgoHints{};
36-
std::shared_ptr<NYql::TJoinOrderHints> JoinOrderHints{};
34+
std::shared_ptr<NYql::TOptimizerHints> Hints{};
3735

3836
std::shared_ptr<NJson::TJsonValue> GetOverrideStatistics() {
3937
if (Config->OptOverrideStatistics.Get()) {
@@ -49,37 +47,15 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
4947
}
5048
}
5149

52-
NYql::TCardinalityHints GetCardinalityHints() {
53-
if (Config->OptCardinalityHints.Get()) {
54-
if (!CardinalityHints) {
55-
CardinalityHints = std::make_shared<NYql::TCardinalityHints>(*Config->OptCardinalityHints.Get());
50+
NYql::TOptimizerHints GetOptimizerHints() {
51+
if (Config->OptimizerHints.Get()) {
52+
if (!Hints) {
53+
Hints = std::make_shared<NYql::TOptimizerHints>(*Config->OptimizerHints.Get());
5654
}
57-
return *CardinalityHints;
58-
} else {
59-
return NYql::TCardinalityHints();
60-
}
61-
}
62-
63-
NYql::TJoinAlgoHints GetJoinAlgoHints() {
64-
if (Config->OptJoinAlgoHints.Get()) {
65-
if (!JoinAlgoHints) {
66-
JoinAlgoHints = std::make_shared<NYql::TJoinAlgoHints>(*Config->OptJoinAlgoHints.Get());
67-
}
68-
return *JoinAlgoHints;
69-
} else {
70-
return NYql::TJoinAlgoHints();
55+
return *Hints;
7156
}
72-
}
7357

74-
NYql::TJoinOrderHints GetJoinOrderHints() {
75-
if (Config->OptJoinOrderHints.Get()) {
76-
if (!JoinOrderHints) {
77-
JoinOrderHints = std::make_shared<NYql::TJoinOrderHints>(*Config->OptJoinOrderHints.Get());
78-
}
79-
return *JoinOrderHints;
80-
} else {
81-
return NYql::TJoinOrderHints();
82-
}
58+
return NYql::TOptimizerHints();
8359
}
8460

8561
bool IsDataQuery() const {

0 commit comments

Comments
 (0)