Skip to content

Commit 6e9dd6a

Browse files
authored
Several CTAS (#6330)
1 parent f94fdc3 commit 6e9dd6a

File tree

4 files changed

+79
-6
lines changed

4 files changed

+79
-6
lines changed

ydb/core/kqp/common/compilation/events.h

+9-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
7676
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
7777
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
7878
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
79-
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing())
79+
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing(),
80+
bool split = false, NYql::TExprContext* splitCtx = nullptr, NYql::TExprNode::TPtr splitExpr = nullptr)
8081
: UserToken(userToken)
8182
, Uid(uid)
8283
, Query(query)
@@ -90,6 +91,9 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
9091
, TempTablesState(std::move(tempTablesState))
9192
, IntrestedInResult(std::move(intrestedInResult))
9293
, QueryAst(queryAst)
94+
, Split(split)
95+
, SplitCtx(splitCtx)
96+
, SplitExpr(splitExpr)
9397
{
9498
}
9599

@@ -110,6 +114,10 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
110114
std::shared_ptr<std::atomic<bool>> IntrestedInResult;
111115

112116
TMaybe<TQueryAst> QueryAst;
117+
bool Split = false;
118+
119+
NYql::TExprContext* SplitCtx = nullptr;
120+
NYql::TExprNode::TPtr SplitExpr = nullptr;
113121
};
114122

115123
struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
175175
YQL_ENSURE(PerStatementResult);
176176

177177
const auto prepareSettings = PrepareCompilationSettings(ctx);
178-
179-
auto result = KqpHost->SplitQuery(QueryId.Text, prepareSettings);
178+
auto result = KqpHost->SplitQuery(QueryRef, prepareSettings);
180179

181180
Become(&TKqpCompileActor::CompileState);
182181
ReplySplitResult(ctx, std::move(result));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

+14-3
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
611611
<< ", queryUid: " << (request.Uid ? *request.Uid : "<empty>")
612612
<< ", queryText: \"" << (request.Query ? EscapeC(request.Query->Text) : "<empty>") << "\""
613613
<< ", keepInCache: " << request.KeepInCache
614+
<< ", split: " << request.Split
614615
<< *request.UserRequestContext);
615616

616617
*Counters->CompileQueryCacheSize = QueryCache.Size();
@@ -697,7 +698,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
697698
request.Deadline,
698699
ev->Get()->Split
699700
? ECompileActorAction::SPLIT
700-
: TableServiceConfig.GetEnableAstCache()
701+
: (TableServiceConfig.GetEnableAstCache() && !request.QueryAst)
701702
? ECompileActorAction::PARSE
702703
: ECompileActorAction::COMPILE);
703704
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
@@ -760,7 +761,16 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
760761

761762
NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
762763

763-
TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE);
764+
TKqpCompileSettings compileSettings(
765+
true,
766+
request.IsQueryActionPrepare,
767+
false,
768+
request.Deadline,
769+
ev->Get()->Split
770+
? ECompileActorAction::SPLIT
771+
: (TableServiceConfig.GetEnableAstCache() && !request.QueryAst)
772+
? ECompileActorAction::PARSE
773+
: ECompileActorAction::COMPILE);
764774
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query,
765775
compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName,
766776
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
@@ -824,6 +834,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
824834
if (compileResult->NeedToSplit) {
825835
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
826836
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
837+
ProcessQueue(ctx);
827838
return;
828839
}
829840

@@ -961,7 +972,6 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
961972
compileRequest.Orbit,
962973
compileRequest.Query.UserSid);
963974

964-
compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE;
965975
compileRequest.QueryAst = std::move(queryAst);
966976

967977
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
@@ -994,6 +1004,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
9941004
return;
9951005
}
9961006

1007+
compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE;
9971008
CompileByAst(astStatements.front(), compileRequest, ctx);
9981009
}
9991010

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

+55
Original file line numberDiff line numberDiff line change
@@ -2566,6 +2566,61 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
25662566
}
25672567
}
25682568

2569+
Y_UNIT_TEST(SeveralCTAS) {
2570+
NKikimrConfig::TAppConfig appConfig;
2571+
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
2572+
appConfig.MutableTableServiceConfig()->SetEnableAstCache(true);
2573+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
2574+
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
2575+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
2576+
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
2577+
auto setting = NKikimrKqp::TKqpSetting();
2578+
auto serverSettings = TKikimrSettings()
2579+
.SetAppConfig(appConfig)
2580+
.SetKqpSettings({setting})
2581+
.SetWithSampleTables(false)
2582+
.SetEnableTempTables(true);
2583+
2584+
TKikimrRunner kikimr(serverSettings);
2585+
auto db = kikimr.GetQueryClient();
2586+
2587+
{
2588+
auto result = db.ExecuteQuery(R"(
2589+
CREATE TABLE Table1 (
2590+
PRIMARY KEY (Key)
2591+
) AS SELECT 1u AS Key, "1" AS Value1, "1" AS Value2;
2592+
CREATE TABLE Table2 (
2593+
PRIMARY KEY (Key)
2594+
) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
2595+
CREATE TABLE Table3 (
2596+
PRIMARY KEY (Key)
2597+
) AS SELECT * FROM Table2 UNION ALL SELECT * FROM Table1;
2598+
SELECT * FROM Table1 ORDER BY Key;
2599+
SELECT * FROM Table2 ORDER BY Key;
2600+
SELECT * FROM Table3 ORDER BY Key;
2601+
)", TTxControl::NoTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(500))).ExtractValueSync();
2602+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2603+
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3);
2604+
// Results are empty. Snapshot was taken before tables were created, so we don't see changes after snapshot.
2605+
// This will be fixed in future, for example, by implicit commit before/after each ddl statement.
2606+
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
2607+
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1)));
2608+
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(2)));
2609+
2610+
result = db.ExecuteQuery(R"(
2611+
SELECT * FROM Table1 ORDER BY Key;
2612+
SELECT * FROM Table2 ORDER BY Key;
2613+
SELECT * FROM Table3 ORDER BY Key;
2614+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2615+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2616+
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3);
2617+
CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
2618+
CompareYson(R"([[[2u];["2"];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
2619+
// Also empty now(
2620+
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(2)));
2621+
}
2622+
}
2623+
25692624
Y_UNIT_TEST(TableSink_ReplaceFromSelectOlap) {
25702625
NKikimrConfig::TAppConfig appConfig;
25712626
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);

0 commit comments

Comments
 (0)