Skip to content

Commit 4bf8208

Browse files
authored
KIKIMR-20543: ddl+dml in query service (#1444)
1 parent 02bbc51 commit 4bf8208

30 files changed

+985
-215
lines changed

ydb/core/kqp/common/compilation/events.h

+13-6
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,24 @@ namespace NKikimr::NKqp::NPrivateEvents {
1414

1515
struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
1616
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
17-
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, TInstant deadline,
17+
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, TInstant deadline,
1818
TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult,
1919
const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
20-
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false)
20+
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe<TQueryAst> queryAst = Nothing())
2121
: UserToken(userToken)
2222
, Uid(uid)
2323
, Query(std::move(query))
2424
, KeepInCache(keepInCache)
2525
, IsQueryActionPrepare(isQueryActionPrepare)
26+
, PerStatementResult(perStatementResult)
2627
, Deadline(deadline)
2728
, DbCounters(dbCounters)
2829
, UserRequestContext(userRequestContext)
2930
, Orbit(std::move(orbit))
3031
, TempTablesState(std::move(tempTablesState))
3132
, IntrestedInResult(std::move(intrestedInResult))
3233
, CollectDiagnostics(collectDiagnostics)
34+
, QueryAst(queryAst)
3335
{
3436
Y_ENSURE(Uid.Defined() != Query.Defined());
3537
}
@@ -39,6 +41,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
3941
TMaybe<TKqpQueryId> Query;
4042
bool KeepInCache = false;
4143
bool IsQueryActionPrepare = false;
44+
bool PerStatementResult = false;
4245
// it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
4346
TInstant Deadline;
4447
TKqpDbCountersPtr DbCounters;
@@ -51,6 +54,8 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
5154
std::shared_ptr<std::atomic<bool>> IntrestedInResult;
5255

5356
bool CollectDiagnostics = false;
57+
58+
TMaybe<TQueryAst> QueryAst;
5459
};
5560

5661
struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
@@ -103,12 +108,14 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev
103108
};
104109

105110
struct TEvParseResponse: public TEventLocal<TEvParseResponse, TKqpEvents::EvParseResponse> {
106-
TEvParseResponse(const TKqpQueryId& query, TMaybe<TQueryAst> astResult)
107-
: AstResult(std::move(astResult))
108-
, Query(query) {}
111+
TEvParseResponse(const TKqpQueryId& query, TVector<TQueryAst> astStatements, NLWTrace::TOrbit orbit = {})
112+
: AstStatements(std::move(astStatements))
113+
, Query(query)
114+
, Orbit(std::move(orbit)) {}
109115

110-
TMaybe<TQueryAst> AstResult;
116+
TVector<TQueryAst> AstStatements;
111117
TKqpQueryId Query;
118+
NLWTrace::TOrbit Orbit;
112119
};
113120

114121
struct TEvCompileInvalidateRequest: public TEventLocal<TEvCompileInvalidateRequest,

ydb/core/kqp/common/kqp_lwtrace_probes.h

+3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ struct TQueryAction {
6868
PROBE(KqpCompileServiceReplyFromCache, GROUPS("KQP"), \
6969
TYPES(), \
7070
NAMES()) \
71+
PROBE(KqpCompileServiceReplyStatements, GROUPS("KQP"), \
72+
TYPES(), \
73+
NAMES()) \
7174
PROBE(KqpCompileServiceReplyError, GROUPS("KQP"), \
7275
TYPES(), \
7376
NAMES()) \

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+45-22
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
5252
TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
5353
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
5454
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
55-
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult)
55+
bool perStatementResult, ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst)
5656
: Owner(owner)
5757
, ModuleResolverState(moduleResolverState)
5858
, Counters(counters)
5959
, FederatedQuerySetup(federatedQuerySetup)
6060
, Uid(uid)
6161
, QueryId(queryId)
62-
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes, astResult)
62+
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes, queryAst)
6363
, UserToken(userToken)
6464
, DbCounters(dbCounters)
6565
, Config(MakeIntrusive<TKikimrConfiguration>())
@@ -70,8 +70,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
7070
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
7171
, TempTablesState(std::move(tempTablesState))
7272
, CollectFullDiagnostics(collectFullDiagnostics)
73+
, PerStatementResult(perStatementResult)
7374
, CompileAction(compileAction)
74-
, AstResult(std::move(astResult))
75+
, QueryAst(std::move(queryAst))
7576
{
7677
Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), QueryId.Cluster, kqpSettings->Settings, false);
7778

@@ -127,26 +128,22 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
127128
}
128129

129130
private:
130-
void SetQueryAst(const TActorContext &ctx) {
131-
TString cluster = QueryId.Cluster;
131+
132+
TVector<TQueryAst> GetAstStatements(const TActorContext &ctx) {
133+
TString cluster = QueryId.Cluster;
132134
TString kqpTablePathPrefix = Config->_KqpTablePathPrefix.Get().GetRef();
133135
ui16 kqpYqlSyntaxVersion = Config->_KqpYqlSyntaxVersion.Get().GetRef();
134136
NSQLTranslation::EBindingsMode bindingsMode = Config->BindingsMode;
135137
bool isEnableExternalDataSources = AppData(ctx)->FeatureFlags.GetEnableExternalDataSources();
136138
bool isEnablePgConstsToParams = Config->EnablePgConstsToParams;
139+
bool perStatementExecution = Config->EnablePerStatementQueryExecution && PerStatementResult;
137140

138-
auto astResult = ParseQuery(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, QueryId.IsSql(), cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams);
139-
YQL_ENSURE(astResult.Ast);
140-
if (astResult.Ast->IsOk()) {
141-
AstResult = std::move(astResult);
142-
}
141+
return ParseStatements(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams, QueryId.IsSql(), perStatementExecution);
143142
}
144143

145144
void StartParsing(const TActorContext &ctx) {
146-
SetQueryAst(ctx);
147-
148145
Become(&TKqpCompileActor::CompileState);
149-
ReplyParseResult(ctx);
146+
ReplyParseResult(ctx, GetAstStatements(ctx));
150147
}
151148

152149
void StartCompilation(const TActorContext &ctx) {
@@ -352,15 +349,38 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
352349
<< ", at state:" << state);
353350
}
354351

355-
void ReplyParseResult(const TActorContext &ctx) {
352+
void ReplyParseResult(const TActorContext &ctx, TVector<TQueryAst>&& astStatements) {
356353
Y_UNUSED(ctx);
354+
355+
if (astStatements.empty()) {
356+
NYql::TIssue issue(NYql::TPosition(), "Parsing result of query is empty");
357+
ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue});
358+
return;
359+
}
360+
361+
for (size_t statementId = 0; statementId < astStatements.size(); ++statementId) {
362+
if (!astStatements[statementId].Ast || !astStatements[statementId].Ast->IsOk() || !astStatements[statementId].Ast->Root) {
363+
ALOG_ERROR(NKikimrServices::KQP_COMPILE_ACTOR, "Get parsing result with error"
364+
<< ", self: " << SelfId()
365+
<< ", owner: " << Owner
366+
<< ", statement id: " << statementId);
367+
368+
NYql::TIssue issue(NYql::TPosition(), "Error while parsing query.");
369+
for (const auto& i : astStatements[statementId].Ast->Issues) {
370+
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
371+
}
372+
373+
ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue});
374+
return;
375+
}
376+
}
377+
357378
ALOG_DEBUG(NKikimrServices::KQP_COMPILE_ACTOR, "Send parsing result"
358379
<< ", self: " << SelfId()
359380
<< ", owner: " << Owner
360-
<< (AstResult && AstResult->Ast->IsOk() ? ", parsing is successful" : ", parsing is not successful"));
381+
<< ", statements size: " << astStatements.size());
361382

362-
auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move(AstResult));
363-
AstResult = Nothing();
383+
auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move(astStatements));
364384
Send(Owner, responseEv.Release());
365385

366386
Counters->ReportCompileFinish(DbCounters);
@@ -379,8 +399,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
379399
KqpCompileResult->PreparedQuery = preparedQueryHolder;
380400
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());
381401

382-
if (AstResult) {
383-
KqpCompileResult->Ast = AstResult->Ast;
402+
if (QueryAst) {
403+
KqpCompileResult->Ast = QueryAst->Ast;
384404
}
385405
}
386406

@@ -482,8 +502,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
482502
TKqpTempTablesState::TConstPtr TempTablesState;
483503
bool CollectFullDiagnostics;
484504

505+
const bool PerStatementResult;
485506
ECompileActorAction CompileAction;
486-
TMaybe<TQueryAst> AstResult;
507+
TMaybe<TQueryAst> QueryAst;
487508
};
488509

489510
void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) {
@@ -512,6 +533,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
512533
kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode();
513534
kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams() && serviceConfig.GetEnableAstCache();
514535
kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit();
536+
kqpConfig.EnablePerStatementQueryExecution = serviceConfig.GetEnablePerStatementQueryExecution();
515537

516538
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
517539
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
@@ -527,14 +549,15 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
527549
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
528550
TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
529551
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
530-
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult, bool collectFullDiagnostics)
552+
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
553+
bool perStatementResult)
531554
{
532555
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig,
533556
moduleResolverState, counters,
534557
uid, query, userToken, dbCounters,
535558
federatedQuerySetup, userRequestContext,
536559
std::move(traceId), std::move(tempTablesState), collectFullDiagnostics,
537-
compileAction, std::move(astResult));
560+
perStatementResult, compileAction, std::move(queryAst));
538561
}
539562

540563
} // namespace NKqp

0 commit comments

Comments
 (0)