Skip to content

Merge CBO + CBO-KQP improvements #10400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
63507d1
Fix Olap Filter selectivity computation (#9021)
pavelvelikhov Sep 12, 2024
463649e
Fixed computation of ByteSize for CBO for OLAP tables (#9626)
pavelvelikhov Sep 23, 2024
ac5d781
Lowered MapJoin threshold to avoid excessive broadcasts (#10288)
pavelvelikhov Oct 10, 2024
8225e62
Added ensure Node != nullptr. (#8855)
pashandor789 Sep 9, 2024
217b631
[CBO] Hints parser added (#9252)
pashandor789 Sep 17, 2024
63a1d74
[CBO] hints parser fixes (#9463)
pashandor789 Sep 19, 2024
c683ef1
[KQP] Plan inf rec fix (#9519)
pashandor789 Sep 20, 2024
2897dbd
[CLI] Bugfix: remove item prefix correctly (#9663)
pashandor789 Sep 24, 2024
41fb53f
[KQP] Many result sets added (#9696)
pashandor789 Sep 24, 2024
42e64fe
[KQP] CBO hints warnings added (#9701)
pashandor789 Sep 24, 2024
9085ecf
[CBO] Ignore IsJoinApplicable with hints. Improve hints warning messa…
pashandor789 Sep 26, 2024
e8290ec
[CBO] Asan test memory leak fix (#9791)
pashandor789 Sep 26, 2024
b048dc7
[CBO] Hint syntax fix (#9875)
pashandor789 Sep 30, 2024
994e011
[KQP] Fix plan olap filter selectivity estimation (#9936)
pashandor789 Oct 1, 2024
abe5dbf
[CBO] Fix LeftOnly (LeftAntiJoin) algebraic matrix (#9974)
pashandor789 Oct 2, 2024
bf47906
[KQP] turn off CBO IsLookupJoinApplicable for not row column. (#10141)
pashandor789 Oct 7, 2024
90fd6bf
[KQP] Binary symbols in range breaks plan json and make server crash …
pashandor789 Oct 7, 2024
07f6ccd
[KQP] Fix comparison attributes OLAP (#10192)
pashandor789 Oct 7, 2024
d554307
[KQP] Make TKqpColumnStatisticsRequester async (#10224)
pashandor789 Oct 9, 2024
70c8b76
TFuture add IsReady method
Jul 26, 2024
439089a
Don't lose 'any' flag after CBO. (#8674)
Tony-Romanov Sep 24, 2024
93ee979
[] fix
Oct 14, 2024
e057121
[] fix
Oct 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions library/cpp/threading/future/core/future-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ namespace NThreading {
bool HasException() const {
return AtomicGet(State) == ExceptionSet;
}
bool IsReady() const {
return AtomicGet(State) != NotReady;
}

const T& GetValue(TDuration timeout = TDuration::Zero()) const {
AccessValue(timeout, ValueRead);
Expand Down Expand Up @@ -297,6 +300,9 @@ namespace NThreading {
bool HasException() const {
return AtomicGet(State) == ExceptionSet;
}
bool IsReady() const {
return AtomicGet(State) != NotReady;
}

void GetValue(TDuration timeout = TDuration::Zero()) const {
TAtomicBase state = AtomicGet(State);
Expand Down Expand Up @@ -583,6 +589,10 @@ namespace NThreading {
inline bool TFuture<T>::HasException() const {
return State && State->HasException();
}
template <typename T>
inline bool TFuture<T>::IsReady() const {
return State && State->IsReady();
}

template <typename T>
inline void TFuture<T>::Wait() const {
Expand Down Expand Up @@ -688,6 +698,9 @@ namespace NThreading {
inline bool TFuture<void>::HasException() const {
return State && State->HasException();
}
inline bool TFuture<void>::IsReady() const {
return State && State->IsReady();
}

inline void TFuture<void>::Wait() const {
EnsureInitialized();
Expand Down Expand Up @@ -823,6 +836,11 @@ namespace NThreading {
return State && State->HasException();
}

template <typename T>
inline bool TPromise<T>::IsReady() const {
return State && State->IsReady();
}

template <typename T>
inline void TPromise<T>::SetException(const TString& e) {
EnsureInitialized();
Expand Down Expand Up @@ -904,6 +922,10 @@ namespace NThreading {
return State && State->HasException();
}

inline bool TPromise<void>::IsReady() const {
return State && State->IsReady();
}

inline void TPromise<void>::SetException(const TString& e) {
EnsureInitialized();
State->SetException(std::make_exception_ptr(yexception() << e));
Expand Down
22 changes: 22 additions & 0 deletions library/cpp/threading/future/core/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ namespace NThreading {
void TryRethrow() const;
bool HasException() const;

// returns true if exception or value was set.
// allows to check readiness without locking cheker-thread
// NOTE: returns true even if value was extracted from promise
// good replace for HasValue() || HasException()
bool IsReady() const;

void Wait() const;
bool Wait(TDuration timeout) const;
bool Wait(TInstant deadline) const;
Expand Down Expand Up @@ -153,6 +159,11 @@ namespace NThreading {
void TryRethrow() const;
bool HasException() const;

// returns true if exception or value was set.
// allows to check readiness without locking cheker-thread
// good replace for HasValue() || HasException()
bool IsReady() const;

void Wait() const;
bool Wait(TDuration timeout) const;
bool Wait(TInstant deadline) const;
Expand Down Expand Up @@ -216,6 +227,12 @@ namespace NThreading {

void TryRethrow() const;
bool HasException() const;

// returns true if exception or value was set.
// allows to check readiness without locking cheker-thread
// NOTE: returns true even if value was extracted from promise
// good replace for HasValue() || HasException()
bool IsReady() const;
void SetException(const TString& e);
void SetException(std::exception_ptr e);
bool TrySetException(std::exception_ptr e);
Expand Down Expand Up @@ -256,6 +273,11 @@ namespace NThreading {

void TryRethrow() const;
bool HasException() const;

// returns true if exception or value was set.
// allows to check readiness without locking cheker-thread
// good replace for HasValue() || HasException()
bool IsReady() const;
void SetException(const TString& e);
void SetException(std::exception_ptr e);
bool TrySetException(std::exception_ptr e);
Expand Down
4 changes: 4 additions & 0 deletions library/cpp/threading/future/future_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ namespace {

future = MakeFuture(345);
UNIT_ASSERT(future.HasValue());
UNIT_ASSERT(future.IsReady());
UNIT_ASSERT_EQUAL(future.GetValue(), 345);
}

Expand All @@ -115,6 +116,7 @@ namespace {

TFuture<void> future = promise.GetFuture();
UNIT_ASSERT(future.HasValue());
UNIT_ASSERT(future.IsReady());

future = MakeFuture();
UNIT_ASSERT(future.HasValue());
Expand Down Expand Up @@ -523,6 +525,7 @@ namespace {
{
auto future1 = MakeErrorFuture<void>(std::make_exception_ptr(TFutureException()));
UNIT_ASSERT(future1.HasException());
UNIT_ASSERT(future1.IsReady());
UNIT_CHECK_GENERATED_EXCEPTION(future1.GetValue(), TFutureException);

auto future2 = MakeErrorFuture<int>(std::make_exception_ptr(TFutureException()));
Expand Down Expand Up @@ -563,6 +566,7 @@ namespace {
promise2.SetException("foo-exception");
wait.Wait();
UNIT_ASSERT(future2.HasException());
UNIT_ASSERT(!future1.IsReady());
UNIT_ASSERT(!future1.HasValue() && !future1.HasException());
}

Expand Down
44 changes: 26 additions & 18 deletions ydb/core/kqp/opt/kqp_column_statistics_requester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
NKikimr::NStat::TRequest req;
req.ColumnTag = columnsMeta[column].Id;
req.PathId = pathId;
getStatisticsRequest->StatRequests.push_back(req);
getStatisticsRequest->StatRequests.push_back(std::move(req));

tableMetaByPathId[pathId].TableName = table;
tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column;
Expand All @@ -113,15 +113,14 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:

using TRequest = NStat::TEvStatistics::TEvGetStatistics;
using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult;
struct TResult : public NYql::IKikimrGateway::TGenericResult {
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
};

auto promise = NewPromise<TResult>();
AsyncReadiness = NewPromise<void>();
auto promise = NewPromise<TColumnStatisticsResponse>();
auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)]
(TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
(TPromise<TColumnStatisticsResponse> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
if (!response.Success) {
promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!"));
promise.SetValue(NYql::NCommon::ResultFromError<TColumnStatisticsResponse>("can't get column statistics!"));
return;
}

THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
Expand All @@ -133,30 +132,39 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin);
}

promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)});
promise.SetValue(TColumnStatisticsResponse{.ColumnStatisticsByTableName = std::move(columnStatisticsByTableName)});
};
auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId);
IActor* requestHandler =
new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback);
auto actorId = ActorSystem
new TActorRequestHandler<TRequest, TResponse, TColumnStatisticsResponse>(statServiceId, getStatisticsRequest.Release(), promise, callback);
ActorSystem
->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
Y_UNUSED(actorId);

auto res = promise.GetFuture().GetValueSync();
if (!res.Issues().Empty()) {
TStringStream ss;
res.Issues().PrintTo(ss);
YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
promise.GetFuture().Subscribe([this](auto result){ ColumnStatisticsResponse = result.ExtractValue(); AsyncReadiness.SetValue(); });

return TStatus::Async;
}

IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoApplyAsyncChanges(TExprNode::TPtr, TExprNode::TPtr&, TExprContext&) {
Y_ENSURE(AsyncReadiness.IsReady() && ColumnStatisticsResponse.has_value());

if (!ColumnStatisticsResponse->Issues().Empty()) {
TStringStream ss; ColumnStatisticsResponse->Issues().PrintTo(ss);
YQL_CLOG(TRACE, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
return IGraphTransformer::TStatus::Ok;
}

for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) {
for (auto&& [tableName, columnStatistics]: ColumnStatisticsResponse->ColumnStatisticsByTableName) {
TypesCtx.ColumnStatisticsByTableName.insert(
{std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))}
);
}

return IGraphTransformer::TStatus::Ok;
return TStatus::Ok;
}

TFuture<void> TKqpColumnStatisticsRequester::DoGetAsyncFuture(const TExprNode&) {
return AsyncReadiness.GetFuture();
}

bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/kqp/opt/kqp_column_statistics_requester.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ using namespace NYql::NNodes;
* Then it requests column statistics for these attributes from the column statistics service
* and stores it into a TTypeAnnotationContext.
*/
class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
class TKqpColumnStatisticsRequester : public TGraphTransformerBase {
public:
TKqpColumnStatisticsRequester(
const TKikimrConfiguration::TPtr& config,
Expand All @@ -37,6 +37,10 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
// Main method of the transformer
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;

NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final;

IGraphTransformer::TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;

void Rewind() override {}

~TKqpColumnStatisticsRequester() override = default;
Expand All @@ -56,6 +60,16 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
THashMap<TExprNode::TPtr, TExprNode::TPtr> KqpTableByExprNode;
THashMap<TString, THashSet<TString>> ColumnsByTableName;

//////////////////////////////////////////////////////////////
/* for waiting response with column statistics */
struct TColumnStatisticsResponse : public NYql::IKikimrGateway::TGenericResult {
THashMap<TString, TOptimizerStatistics::TColumnStatMap> ColumnStatisticsByTableName;
};
std::optional<TColumnStatisticsResponse> ColumnStatisticsResponse;
NThreading::TPromise<void> AsyncReadiness;

//////////////////////////////////////////////////////////////

const TKikimrConfiguration::TPtr& Config;
TTypeAnnotationContext& TypesCtx;
TKikimrTablesData& Tables;
Expand Down
40 changes: 8 additions & 32 deletions ydb/core/kqp/opt/kqp_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
, QueryCtx(queryCtx)
, Tables(tables)
, UserRequestContext(userRequestContext)
{
{
YQL_ENSURE(QueryCtx);
YQL_ENSURE(Tables);
}
Expand All @@ -31,9 +31,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
int JoinsCount{};
int EquiJoinsCount{};
std::shared_ptr<NJson::TJsonValue> OverrideStatistics{};
std::shared_ptr<NYql::TCardinalityHints> CardinalityHints{};
std::shared_ptr<NYql::TJoinAlgoHints> JoinAlgoHints{};
std::shared_ptr<NYql::TJoinOrderHints> JoinOrderHints{};
std::shared_ptr<NYql::TOptimizerHints> Hints{};

std::shared_ptr<NJson::TJsonValue> GetOverrideStatistics() {
if (Config->OptOverrideStatistics.Get()) {
Expand All @@ -49,37 +47,15 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
}
}

NYql::TCardinalityHints GetCardinalityHints() {
if (Config->OptCardinalityHints.Get()) {
if (!CardinalityHints) {
CardinalityHints = std::make_shared<NYql::TCardinalityHints>(*Config->OptCardinalityHints.Get());
NYql::TOptimizerHints GetOptimizerHints() {
if (Config->OptimizerHints.Get()) {
if (!Hints) {
Hints = std::make_shared<NYql::TOptimizerHints>(*Config->OptimizerHints.Get());
}
return *CardinalityHints;
} else {
return NYql::TCardinalityHints();
}
}

NYql::TJoinAlgoHints GetJoinAlgoHints() {
if (Config->OptJoinAlgoHints.Get()) {
if (!JoinAlgoHints) {
JoinAlgoHints = std::make_shared<NYql::TJoinAlgoHints>(*Config->OptJoinAlgoHints.Get());
}
return *JoinAlgoHints;
} else {
return NYql::TJoinAlgoHints();
return *Hints;
}
}

NYql::TJoinOrderHints GetJoinOrderHints() {
if (Config->OptJoinOrderHints.Get()) {
if (!JoinOrderHints) {
JoinOrderHints = std::make_shared<NYql::TJoinOrderHints>(*Config->OptJoinOrderHints.Get());
}
return *JoinOrderHints;
} else {
return NYql::TJoinOrderHints();
}
return NYql::TOptimizerHints();
}

bool IsDataQuery() const {
Expand Down
Loading
Loading