Skip to content

Commit 5b11bce

Browse files
shnikdGazizonoki
andauthored
Support QueryMeta and diagnostics (#11371)
Co-authored-by: Bulat <[email protected]>
1 parent 010e83f commit 5b11bce

File tree

23 files changed

+542
-68
lines changed

23 files changed

+542
-68
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,25 @@ bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) {
179179
}
180180
}
181181

182+
bool NeedCollectDiagnostics(const Ydb::Query::ExecuteQueryRequest& req) {
183+
switch (req.exec_mode()) {
184+
case Ydb::Query::EXEC_MODE_EXPLAIN:
185+
return true;
186+
187+
case Ydb::Query::EXEC_MODE_EXECUTE:
188+
switch (req.stats_mode()) {
189+
case Ydb::Query::StatsMode::STATS_MODE_FULL:
190+
case Ydb::Query::StatsMode::STATS_MODE_PROFILE:
191+
return true;
192+
default:
193+
return false;
194+
}
195+
196+
default:
197+
return false;
198+
}
199+
}
200+
182201
class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
183202
public:
184203
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -284,6 +303,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
284303
req->pool_id());
285304

286305
ev->SetProgressStatsPeriod(TDuration::MilliSeconds(req->stats_period_ms()));
306+
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));
287307

288308
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
289309
NYql::TIssues issues;
@@ -403,6 +423,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
403423
if (NeedReportAst(*Request_->GetProtoRequest())) {
404424
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
405425
}
426+
if (NeedCollectDiagnostics(*Request_->GetProtoRequest())) {
427+
response.mutable_exec_stats()->set_query_meta(kqpResponse.GetQueryDiagnostics());
428+
}
406429
}
407430

408431
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {

ydb/core/grpc_services/rpc_execute_data_query.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,16 @@ using namespace Ydb;
2727
using namespace Ydb::Table;
2828
using namespace NKqp;
2929

30+
bool NeedCollectDiagnostics(const Ydb::Table::ExecuteDataQueryRequest& req) {
31+
switch (req.collect_stats()) {
32+
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
33+
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
34+
return true;
35+
default:
36+
return false;
37+
}
38+
}
39+
3040
using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
3141
Ydb::Table::ExecuteDataQueryResponse>;
3242

@@ -147,6 +157,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
147157
req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr,
148158
req->has_operation_params() ? &req->operation_params() : nullptr);
149159

160+
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));
161+
150162
ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
151163

152164
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId());
@@ -166,6 +178,9 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
166178
if (from.HasQueryStats()) {
167179
FillQueryStats(*to->mutable_query_stats(), from);
168180
to->mutable_query_stats()->set_query_ast(from.GetQueryAst());
181+
if (from.HasQueryDiagnostics()) {
182+
to->mutable_query_stats()->set_query_meta(from.GetQueryDiagnostics());
183+
}
169184
return;
170185
}
171186
}

ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,27 @@ bool NeedReportPlan(const Ydb::Table::ExecuteScanQueryRequest& req) {
9393
}
9494
}
9595

96+
bool NeedCollectDiagnostics(const Ydb::Table::ExecuteScanQueryRequest& req) {
97+
switch (req.mode()) {
98+
case ExecuteScanQueryRequest_Mode_MODE_EXPLAIN:
99+
return true;
100+
101+
case ExecuteScanQueryRequest_Mode_MODE_EXEC:
102+
switch (req.collect_stats()) {
103+
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
104+
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
105+
return true;
106+
default:
107+
break;
108+
}
109+
110+
return false;
111+
112+
default:
113+
return false;
114+
}
115+
}
116+
96117
bool CheckRequest(const Ydb::Table::ExecuteScanQueryRequest& req, TParseRequestError& error)
97118
{
98119
switch (req.mode()) {
@@ -228,7 +249,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
228249
nullptr
229250
);
230251

231-
ev->Record.MutableRequest()->SetCollectDiagnostics(req->Getcollect_full_diagnostics());
252+
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));
232253

233254
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
234255
NYql::TIssues issues;
@@ -291,6 +312,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
291312

292313
bool reportStats = NeedReportStats(*Request_->GetProtoRequest());
293314
bool reportPlan = reportStats && NeedReportPlan(*Request_->GetProtoRequest());
315+
bool collectDiagnostics = NeedCollectDiagnostics(*Request_->GetProtoRequest());
294316

295317
if (reportStats) {
296318
if (kqpResponse.HasQueryStats()) {
@@ -308,7 +330,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
308330
response.mutable_result()->mutable_query_stats()->set_query_ast(kqpResponse.GetQueryAst());
309331
}
310332

311-
response.mutable_result()->set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
333+
if (collectDiagnostics) {
334+
response.mutable_result()->mutable_query_stats()->set_query_meta(kqpResponse.GetQueryDiagnostics());
335+
}
312336

313337
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
314338
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());

ydb/core/kqp/common/compilation/events.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,14 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
126126
};
127127

128128
struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
129-
TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {}, const std::optional<TString>& replayMessage = std::nullopt)
129+
TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {})
130130
: CompileResult(compileResult)
131-
, ReplayMessage(replayMessage)
132131
, Orbit(std::move(orbit)) {
133132
}
134133

135134
TKqpCompileResult::TConstPtr CompileResult;
136135
TKqpStatsCompile Stats;
137136
std::optional<TString> ReplayMessage;
138-
std::optional<TString> ReplayMessageUserView;
139137

140138
NLWTrace::TOrbit Orbit;
141139
};

ydb/core/kqp/common/compilation/result.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,22 @@ struct TKqpCompileResult {
1616

1717
TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues,
1818
ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {}, TMaybe<TQueryAst> queryAst = {},
19-
bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
19+
bool needToSplit = false, const TMaybe<TString>& commandTagName = {}, const TMaybe<TString>& replayMessageUserView = {})
2020
: Status(status)
2121
, Issues(issues)
2222
, Query(std::move(query))
2323
, Uid(uid)
2424
, MaxReadType(maxReadType)
2525
, QueryAst(std::move(queryAst))
2626
, NeedToSplit(needToSplit)
27-
, CommandTagName(commandTagName) {}
27+
, CommandTagName(commandTagName)
28+
, ReplayMessageUserView(replayMessageUserView) {}
2829

2930
static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status,
3031
const NYql::TIssues& issues, ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {},
31-
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
32+
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {}, const TMaybe<TString>& replayMessageUserView = {})
3233
{
33-
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName);
34+
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName, replayMessageUserView);
3435
}
3536

3637
std::shared_ptr<NYql::TAstParseResult> GetAst() const;
@@ -47,6 +48,8 @@ struct TKqpCompileResult {
4748
bool NeedToSplit = false;
4849
TMaybe<TString> CommandTagName = {};
4950

51+
TMaybe<TString> ReplayMessageUserView;
52+
5053
std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
5154
};
5255

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
353353

354354
replayMessage.InsertValue("query_id", Uid);
355355
replayMessage.InsertValue("version", "1.0");
356-
replayMessage.InsertValue("query_text", EscapeC(QueryId.Text));
357356
NJson::TJsonValue queryParameterTypes(NJson::JSON_MAP);
358357
if (QueryId.QueryParameterTypes) {
359358
for (const auto& [paramName, paramType] : *QueryId.QueryParameterTypes) {
@@ -365,7 +364,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
365364
replayMessage.InsertValue("query_syntax", ToString(Config->_KqpYqlSyntaxVersion.Get().GetRef()));
366365
replayMessage.InsertValue("query_database", QueryId.Database);
367366
replayMessage.InsertValue("query_cluster", QueryId.Cluster);
368-
replayMessage.InsertValue("query_plan", queryPlan);
369367
replayMessage.InsertValue("query_type", ToString(QueryId.Settings.QueryType));
370368

371369
if (CollectFullDiagnostics) {
@@ -380,6 +378,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
380378
ReplayMessageUserView = NJson::WriteJson(replayMessage, /*formatOutput*/ false);
381379
}
382380

381+
replayMessage.InsertValue("query_plan", queryPlan);
382+
replayMessage.InsertValue("query_text", EscapeC(QueryId.Text));
383383
replayMessage.InsertValue("table_metadata", TString(NJson::WriteJson(tablesMeta, false)));
384384
replayMessage.InsertValue("table_meta_serialization_type", EMetaSerializationType::EncodedProto);
385385

@@ -401,10 +401,12 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
401401
<< ", issues: " << KqpCompileResult->Issues.ToString()
402402
<< ", uid: " << KqpCompileResult->Uid);
403403

404+
if (ReplayMessageUserView) {
405+
KqpCompileResult->ReplayMessageUserView = std::move(*ReplayMessageUserView);
406+
}
404407
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(KqpCompileResult);
405408

406409
responseEv->ReplayMessage = std::move(ReplayMessage);
407-
responseEv->ReplayMessageUserView = std::move(ReplayMessageUserView);
408410
ReplayMessage = std::nullopt;
409411
ReplayMessageUserView = std::nullopt;
410412
auto& stats = responseEv->Stats;

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
610610

611611
if (compileResult->NeedToSplit) {
612612
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
613-
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
613+
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
614614
ProcessQueue(ctx);
615615
return;
616616
}
@@ -635,7 +635,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
635635
for (auto& request : requests) {
636636
LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString());
637637
Reply(request.Sender, compileResult, compileStats, ctx,
638-
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
638+
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan));
639639
}
640640
} else {
641641
if (!hasTempTablesNameClashes) {
@@ -647,7 +647,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
647647

648648
LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString());
649649
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
650-
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
650+
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
651651
}
652652
catch (const std::exception& e) {
653653
LogException("TEvCompileResponse", ev->Sender, e, ctx);
@@ -809,7 +809,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
809809
if (compileResult->GetAst() && QueryCache->FindByAst(query, *compileResult->GetAst(), keepInCache)) {
810810
return false;
811811
}
812-
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst);
812+
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst,
813+
false, {}, compileResult->ReplayMessageUserView);
813814
newCompileResult->AllowCache = compileResult->AllowCache;
814815
newCompileResult->PreparedQuery = compileResult->PreparedQuery;
815816
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString());
@@ -865,7 +866,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
865866

866867
void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
867868
const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
868-
NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional<TString>& replayMessage = std::nullopt)
869+
NLWTrace::TOrbit orbit, NWilson::TSpan span)
869870
{
870871
const auto& query = compileResult->Query;
871872
LWTRACK(KqpCompileServiceReply,
@@ -878,7 +879,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
878879
<< ", queryUid: " << compileResult->Uid
879880
<< ", status:" << compileResult->Status);
880881

881-
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit), replayMessage);
882+
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit));
882883
responseEv->Stats = compileStats;
883884

884885
if (span) {

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,6 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
143143
return false;
144144
}
145145
Orbit = std::move(ev->Orbit);
146-
if (ev->ReplayMessage) {
147-
ReplayMessage = *ev->ReplayMessage;
148-
}
149146

150147
return true;
151148
}
@@ -160,6 +157,10 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr comp
160157
return false;
161158
}
162159

160+
if (compileResult->ReplayMessageUserView && GetCollectDiagnostics()) {
161+
ReplayMessage = *compileResult->ReplayMessageUserView;
162+
}
163+
163164
YQL_ENSURE(CompileResult->PreparedQuery);
164165
const ui32 compiledVersion = CompileResult->PreparedQuery->GetVersion();
165166
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2698,15 +2698,15 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
26982698
UNIT_ASSERT_C(value.IsMap(), "Incorrect Diagnostics");
26992699
UNIT_ASSERT_C(value.Has("query_id"), "Incorrect Diagnostics");
27002700
UNIT_ASSERT_C(value.Has("version"), "Incorrect Diagnostics");
2701-
UNIT_ASSERT_C(value.Has("query_text"), "Incorrect Diagnostics");
2701+
UNIT_ASSERT_C(!value.Has("query_text"), "Incorrect Diagnostics");
27022702
UNIT_ASSERT_C(value.Has("query_parameter_types"), "Incorrect Diagnostics");
27032703
UNIT_ASSERT_C(value.Has("table_metadata"), "Incorrect Diagnostics");
27042704
UNIT_ASSERT_C(value["table_metadata"].IsArray(), "Incorrect Diagnostics: table_metadata type should be an array");
27052705
UNIT_ASSERT_C(value.Has("created_at"), "Incorrect Diagnostics");
27062706
UNIT_ASSERT_C(value.Has("query_syntax"), "Incorrect Diagnostics");
27072707
UNIT_ASSERT_C(value.Has("query_database"), "Incorrect Diagnostics");
27082708
UNIT_ASSERT_C(value.Has("query_cluster"), "Incorrect Diagnostics");
2709-
UNIT_ASSERT_C(value.Has("query_plan"), "Incorrect Diagnostics");
2709+
UNIT_ASSERT_C(!value.Has("query_plan"), "Incorrect Diagnostics");
27102710
UNIT_ASSERT_C(value.Has("query_type"), "Incorrect Diagnostics");
27112711
}
27122712

ydb/core/kqp/ut/olap/helpers/query_executor.cpp

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55

66
namespace NKikimr::NKqp {
77

8-
TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo /*= nullptr*/, NJson::TJsonValue* diagnostics /*= nullptr*/) {
8+
TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo /*= nullptr*/, NJson::TJsonValue* meta /*= nullptr*/) {
99
TVector<THashMap<TString, NYdb::TValue>> rows;
1010
if (statInfo) {
1111
*statInfo = NJson::JSON_NULL;
1212
}
13-
if (diagnostics) {
14-
*diagnostics = NJson::JSON_NULL;
13+
if (meta) {
14+
*meta = NJson::JSON_NULL;
1515
}
1616
for (;;) {
1717
auto streamPart = it.ReadNext().GetValueSync();
@@ -28,12 +28,10 @@ TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPar
2828
if (plan && statInfo) {
2929
UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo));
3030
}
31-
}
3231

33-
if (streamPart.HasDiagnostics()) {
34-
auto diagnosticsString = TString{streamPart.GetDiagnostics()};
35-
if (!diagnosticsString.empty() && diagnostics) {
36-
UNIT_ASSERT(NJson::ReadJsonFastTree(diagnosticsString, diagnostics));
32+
auto metaString = streamPart.GetQueryStats().GetMeta();
33+
if (metaString && !metaString->empty() && meta) {
34+
UNIT_ASSERT(NJson::ReadJsonFastTree(*metaString, meta));
3735
}
3836
}
3937

@@ -70,4 +68,4 @@ TVector<THashMap<TString, NYdb::TValue>> ExecuteScanQuery(NYdb::NTable::TTableCl
7068
return rows;
7169
}
7270

73-
}
71+
}

0 commit comments

Comments
 (0)