Skip to content

Commit 7c628c7

Browse files
authored
Blocks for first stages (#1404)
1 parent 658eb36 commit 7c628c7

File tree

4 files changed

+165
-32
lines changed

4 files changed

+165
-32
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,25 @@ bool NeedReportStats(const Ydb::Query::ExecuteQueryRequest& req) {
176176
}
177177
}
178178

179+
bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) {
180+
switch (req.exec_mode()) {
181+
case Ydb::Query::EXEC_MODE_EXPLAIN:
182+
return true;
183+
184+
case Ydb::Query::EXEC_MODE_EXECUTE:
185+
switch (req.stats_mode()) {
186+
case Ydb::Query::StatsMode::STATS_MODE_FULL:
187+
case Ydb::Query::StatsMode::STATS_MODE_PROFILE:
188+
return true;
189+
default:
190+
return false;
191+
}
192+
193+
default:
194+
return false;
195+
}
196+
}
197+
179198
class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
180199
public:
181200
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -382,6 +401,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
382401
if (NeedReportStats(*Request_->GetProtoRequest())) {
383402
hasTrailingMessage = true;
384403
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
404+
if (NeedReportAst(*Request_->GetProtoRequest())) {
405+
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
406+
}
385407
}
386408

387409
if (hasTrailingMessage) {

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ class TKqpRunner : public IKqpRunner {
138138
: Gateway(gateway)
139139
, Cluster(cluster)
140140
, TypesCtx(*typesCtx)
141+
, SessionCtx(sessionCtx)
142+
, FunctionRegistry(funcRegistry)
141143
, Config(sessionCtx->ConfigPtr())
142144
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
143145
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
@@ -192,14 +194,19 @@ class TKqpRunner : public IKqpRunner {
192194
YQL_ENSURE(IsIn({EKikimrQueryType::Query, EKikimrQueryType::Script}, TransformCtx->QueryCtx->Type));
193195
YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query));
194196

197+
TypesCtx.BlockEngineMode = NYql::EBlockEngineMode::Auto;
198+
195199
return PrepareQueryInternal(cluster, TKiDataQueryBlocks(query), ctx, settings);
196200
}
197201

198202
private:
203+
199204
TIntrusivePtr<TAsyncQueryResult> PrepareQueryInternal(const TString& cluster,
200205
const TKiDataQueryBlocks& dataQueryBlocks, TExprContext& ctx,
201206
const IKikimrQueryExecutor::TExecuteSettings& settings)
202207
{
208+
CreateGraphTransformer(&TypesCtx, SessionCtx, FunctionRegistry);
209+
203210
YQL_ENSURE(cluster == Cluster);
204211
YQL_ENSURE(!settings.CommitTx);
205212
YQL_ENSURE(!settings.RollbackTx);
@@ -311,7 +318,7 @@ class TKqpRunner : public IKqpRunner {
311318
TAutoPtr<IGraphTransformer> compilePhysicalQuery(new TCompilePhysicalQueryTransformer(Cluster,
312319
*TransformCtx,
313320
*OptimizeCtx,
314-
TypesCtx,
321+
*typesCtx,
315322
funcRegistry,
316323
Config));
317324

@@ -349,6 +356,8 @@ class TKqpRunner : public IKqpRunner {
349356
TIntrusivePtr<IKqpGateway> Gateway;
350357
TString Cluster;
351358
TTypeAnnotationContext& TypesCtx;
359+
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
360+
const NMiniKQL::IFunctionRegistry& FunctionRegistry;
352361
TKikimrConfiguration::TPtr Config;
353362

354363
TIntrusivePtr<TKqlTransformContext> TransformCtx;

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -908,7 +908,16 @@ static void FillPlan(const NYdb::NScripting::TYqlResultPart& streamPart, TCollec
908908
}
909909
}
910910

911-
static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& /*streamPart*/, TCollectedStreamResult& /*res*/) {}
911+
static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& streamPart, TCollectedStreamResult& res) {
912+
if (streamPart.GetStats() ) {
913+
res.QueryStats = NYdb::TProtoAccessor::GetProto(*streamPart.GetStats());
914+
915+
auto plan = res.QueryStats->query_plan();
916+
if (!plan.empty()) {
917+
res.PlanJson = plan;
918+
}
919+
}
920+
}
912921

913922
template<typename TIterator>
914923
TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 123 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -690,12 +690,27 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
690690
};
691691
}
692692

693-
void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes,
693+
template <typename TClient>
694+
auto StreamExplainQuery(const TString& query, TClient& client) {
695+
if constexpr (std::is_same_v<NYdb::NTable::TTableClient, TClient>) {
696+
TStreamExecScanQuerySettings scanSettings;
697+
scanSettings.Explain(true);
698+
return client.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
699+
} else {
700+
NYdb::NQuery::TExecuteQuerySettings scanSettings;
701+
scanSettings.ExecMode(NYdb::NQuery::EExecMode::Explain);
702+
return client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), scanSettings).GetValueSync();
703+
}
704+
}
705+
706+
template <typename TClient>
707+
void CheckPlanForAggregatePushdown(
708+
const TString& query,
709+
TClient& client,
710+
const std::vector<std::string>& expectedPlanNodes,
694711
const std::string& readNodeType)
695712
{
696-
TStreamExecScanQuerySettings scanSettings;
697-
scanSettings.Explain(true);
698-
auto res = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
713+
auto res = StreamExplainQuery(query, client);
699714
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
700715

701716
auto planRes = CollectStreamResult(res);
@@ -704,7 +719,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
704719
Cerr << planRes.PlanJson.GetOrElse("NO_PLAN") << Endl;
705720
Cerr << "AST:" << Endl;
706721
Cerr << ast << Endl;
707-
for (auto planNode : planNodes) {
722+
for (auto planNode : expectedPlanNodes) {
708723
UNIT_ASSERT_C(ast.find(planNode) != std::string::npos,
709724
TStringBuilder() << planNode << " was not found. Query: " << query);
710725
}
@@ -2437,10 +2452,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
24372452
ExpectedReply = value;
24382453
return *this;
24392454
}
2455+
24402456
TAggregationTestCase& AddExpectedPlanOptions(const std::string& value) {
24412457
ExpectedPlanOptions.emplace_back(value);
24422458
return *this;
24432459
}
2460+
24442461
const std::vector<std::string>& GetExpectedPlanOptions() const {
24452462
return ExpectedPlanOptions;
24462463
}
@@ -2548,7 +2565,28 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
25482565
TestAggregationsInternal(cases);
25492566
}
25502567

2551-
void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases) {
2568+
template <typename TClient>
2569+
auto StreamExecuteQuery(const TAggregationTestCase& testCase, TClient& client) {
2570+
if constexpr (std::is_same_v<NYdb::NTable::TTableClient, TClient>) {
2571+
return client.StreamExecuteScanQuery(testCase.GetFixedQuery()).GetValueSync();
2572+
} else {
2573+
return client.StreamExecuteQuery(
2574+
testCase.GetFixedQuery(),
2575+
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
2576+
}
2577+
}
2578+
2579+
template <typename TClient>
2580+
void RunTestCaseWithClient(const TAggregationTestCase& testCase, TClient& client) {
2581+
auto it = StreamExecuteQuery(testCase, client);
2582+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
2583+
TString result = StreamResultToYson(it);
2584+
if (!testCase.GetExpectedReply().empty()) {
2585+
CompareYson(result, testCase.GetExpectedReply());
2586+
}
2587+
}
2588+
2589+
void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases, const bool genericQuery) {
25522590
auto settings = TKikimrSettings()
25532591
.SetWithSampleTables(false)
25542592
.SetForceColumnTablesCompositeMarks(true);
@@ -2564,17 +2602,20 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
25642602
WriteTestDataForClickBench(kikimr, "/Root/benchTable", 0, 1000000 + i * 1000000, iterationPackSize);
25652603
}
25662604

2567-
for (auto&& i : cases) {
2568-
const TString queryFixed = i.GetFixedQuery();
2569-
{
2570-
auto it = tableClient.StreamExecuteScanQuery(queryFixed).GetValueSync();
2571-
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
2572-
TString result = StreamResultToYson(it);
2573-
if (!i.GetExpectedReply().empty()) {
2574-
CompareYson(result, i.GetExpectedReply());
2575-
}
2605+
if (!genericQuery) {
2606+
auto tableClient = kikimr.GetTableClient();
2607+
for (auto&& i : cases) {
2608+
const TString queryFixed = i.GetFixedQuery();
2609+
RunTestCaseWithClient(i, tableClient);
2610+
CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
2611+
}
2612+
} else {
2613+
auto queryClient = kikimr.GetQueryClient();
2614+
for (auto&& i : cases) {
2615+
const TString queryFixed = i.GetFixedQuery();
2616+
RunTestCaseWithClient(i, queryClient);
2617+
CheckPlanForAggregatePushdown(queryFixed, queryClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
25762618
}
2577-
CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
25782619
}
25792620
}
25802621

@@ -2637,12 +2678,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
26372678
}
26382679
}
26392680

2640-
void TestClickBench(const std::vector<TAggregationTestCase>& cases) {
2641-
TestClickBenchBase(cases);
2642-
TestClickBenchInternal(cases);
2681+
void TestClickBench(const std::vector<TAggregationTestCase>& cases, const bool genericQuery = false) {
2682+
TestClickBenchBase(cases, genericQuery);
2683+
if (!genericQuery) {
2684+
TestClickBenchInternal(cases);
2685+
}
26432686
}
26442687

2645-
void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases) {
2688+
void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases, const bool genericQuery = false) {
26462689
auto settings = TKikimrSettings()
26472690
.SetWithSampleTables(false)
26482691
.SetForceColumnTablesCompositeMarks(true);
@@ -2656,17 +2699,20 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
26562699
WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls");
26572700
}
26582701

2659-
for (auto&& i : cases) {
2660-
const TString queryFixed = i.GetFixedQuery();
2661-
{
2662-
auto it = tableClient.StreamExecuteScanQuery(queryFixed).GetValueSync();
2663-
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
2664-
TString result = StreamResultToYson(it);
2665-
if (!i.GetExpectedReply().empty()) {
2666-
CompareYson(result, i.GetExpectedReply());
2667-
}
2702+
if (!genericQuery) {
2703+
auto tableClient = kikimr.GetTableClient();
2704+
for (auto&& i : cases) {
2705+
RunTestCaseWithClient(i, tableClient);
2706+
CheckPlanForAggregatePushdown(i.GetFixedQuery(), tableClient,
2707+
i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
2708+
}
2709+
} else {
2710+
auto queryClient = kikimr.GetQueryClient();
2711+
for (auto&& i : cases) {
2712+
RunTestCaseWithClient(i, queryClient);
2713+
CheckPlanForAggregatePushdown(i.GetFixedQuery(), queryClient,
2714+
i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
26682715
}
2669-
CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
26702716
}
26712717
}
26722718

@@ -5883,6 +5929,53 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
58835929
CompareYson(output, R"([])");
58845930
}
58855931
}
5932+
5933+
Y_UNIT_TEST(BlockGenericWithDistinct) {
5934+
TAggregationTestCase testCase;
5935+
testCase.SetQuery(R"(
5936+
SELECT
5937+
COUNT(DISTINCT id)
5938+
FROM `/Root/tableWithNulls`
5939+
WHERE level = 5 AND Cast(id AS String) = "5";
5940+
)")
5941+
.AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
5942+
.AddExpectedPlanOptions("WideFromBlocks")
5943+
.SetExpectedReply("[[1u]]");
5944+
TestTableWithNulls({ testCase }, /* generic */ true);
5945+
}
5946+
5947+
Y_UNIT_TEST(BlockGenericSimpleAggregation) {
5948+
TAggregationTestCase testCase;
5949+
testCase.SetQuery(R"(
5950+
SELECT
5951+
level, COUNT(*), SUM(id)
5952+
FROM `/Root/tableWithNulls`
5953+
WHERE level = 5
5954+
GROUP BY level
5955+
ORDER BY level;
5956+
)")
5957+
.AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
5958+
.AddExpectedPlanOptions("WideFromBlocks")
5959+
.SetExpectedReply(R"([[[5];1u;5]])");
5960+
5961+
TestTableWithNulls({ testCase }, /* generic */ true);
5962+
}
5963+
5964+
Y_UNIT_TEST(BlockGenericSelectAll) {
5965+
TAggregationTestCase testCase;
5966+
testCase.SetQuery(R"(
5967+
SELECT
5968+
id, resource_id, level
5969+
FROM `/Root/tableWithNulls`
5970+
WHERE level != 5 OR level IS NULL
5971+
ORDER BY id, resource_id, level;
5972+
)")
5973+
.AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
5974+
.AddExpectedPlanOptions("WideFromBlocks")
5975+
.SetExpectedReply(R"([[1;#;[1]];[2;#;[2]];[3;#;[3]];[4;#;[4]];[6;["6"];#];[7;["7"];#];[8;["8"];#];[9;["9"];#];[10;["10"];#]])");
5976+
5977+
TestTableWithNulls({ testCase }, /* generic */ true);
5978+
}
58865979
}
58875980

58885981
} // namespace NKqp

0 commit comments

Comments
 (0)