Skip to content

YQ-3446 add queued time into query stats #6965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2243,7 +2243,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr<NOpt::TKqpOptim
return planJson.GetStringRobust();
}

TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "") {
TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "", const TString& queryStats = "") {
NJsonWriter::TBuf writer;
writer.SetIndentSpaces(2);

Expand All @@ -2266,6 +2266,15 @@ TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NO
writer.BeginObject();
writer.WriteKey("Node Type").WriteString("Query");
writer.WriteKey("PlanNodeType").WriteString("Query");

if (queryStats) {
NJson::TJsonValue queryStatsJson;
NJson::ReadJsonTree(queryStats, &queryStatsJson, true);

writer.WriteKey("Stats");
writer.WriteJsonValue(&queryStatsJson);
}

writer.WriteKey("Plans");
writer.BeginList();

Expand Down Expand Up @@ -2717,7 +2726,27 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
txPlans.push_back(txPlan);
}
}
return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>());

NJsonWriter::TBuf writer;
writer.BeginObject();

if (queryStats.HasCompilation()) {
const auto& compilation = queryStats.GetCompilation();

writer.WriteKey("Compilation");
writer.BeginObject();
writer.WriteKey("FromCache").WriteBool(compilation.GetFromCache());
writer.WriteKey("DurationUs").WriteLongLong(compilation.GetDurationUs());
writer.WriteKey("CpuTimeUs").WriteLongLong(compilation.GetCpuTimeUs());
writer.EndObject();
}

writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
writer.EndObject();

return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());
}

TString SerializeScriptPlan(const TVector<const TString>& queryPlans) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class TKqpQueryState : public TNonCopyable {
bool IsDocumentApiRestricted_ = false;

TInstant StartTime;
TInstant ContinueTime;
NYql::TKikimrQueryDeadlines QueryDeadlines;
TKqpQueryStats QueryStats;
bool KeepSession = false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) {
NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const {
NKqpProto::TKqpStatsQuery result;
result.SetDurationUs(DurationUs);
result.SetQueuedTimeUs(QueuedTimeUs);

if (Compilation) {
result.MutableCompilation()->SetFromCache(Compilation->FromCache);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NKikimr::NKqp {

struct TKqpQueryStats {
ui64 DurationUs = 0;
ui64 QueuedTimeUs = 0;
std::optional<TKqpStatsCompile> Compilation;

ui64 WorkerCpuTimeUs = 0;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void Handle(NWorkload::TEvContinueRequest::TPtr& ev) {
YQL_ENSURE(QueryState);
QueryState->ContinueTime = TInstant::Now();

if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) {
LOG_T("Failed to place request in resource pool, feature flag is disabled");
Expand Down Expand Up @@ -1551,6 +1552,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds());
stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds());
if (const auto continueTime = QueryState->ContinueTime) {
stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds();
}
if (QueryState->CompileResult) {
stats->Compilation.emplace();
stats->Compilation->FromCache = (QueryState->CompileStats.FromCache);
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
UpdateConfigCounters(poolConfig);
}

void CollectRequestLatency(TInstant continueTime) {
if (continueTime) {
RequestsLatencyMs->Collect((TInstant::Now() - continueTime).MilliSeconds());
}
}

void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) {
InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0));
QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0));
Expand Down Expand Up @@ -106,6 +112,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
const TActorId WorkerActorId;
const TString SessionId;
const TInstant StartTime = TInstant::Now();
TInstant ContinueTime;

EState State = EState::Pending;
bool Started = false; // after TEvContinueRequest success
Expand Down Expand Up @@ -267,6 +274,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
if (status == Ydb::StatusIds::SUCCESS) {
LocalInFlight++;
request->Started = true;
request->ContinueTime = TInstant::Now();
Counters.LocalInFly->Inc();
Counters.ContinueOk->Inc();
Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Expand Down Expand Up @@ -387,7 +395,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {

if (status == Ydb::StatusIds::SUCCESS) {
Counters.CleanupOk->Inc();
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Counters.CollectRequestLatency(request->ContinueTime);
LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
} else {
Counters.CleanupError->Inc();
Expand All @@ -401,7 +409,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release());

Counters.Cancelled->Inc();
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Counters.CollectRequestLatency(request->ContinueTime);
LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ message TKqpExecutionExtraStats {
message TKqpStatsQuery {
// Basic stats
uint64 DurationUs = 1;
uint64 QueuedTimeUs = 9;
TKqpStatsCompile Compilation = 2;

reserved 3; // repeated TKqpStatsExecution Executions = 3;
Expand Down
Loading