Skip to content

Commit 7ad55b3

Browse files
gridnevvvit authored and
abyss7 committed
add stats for queries with errors (ydb-platform#7753)
(cherry picked from commit 93998b8)
1 parent e0d1d96 commit 7ad55b3

9 files changed

+234
-182
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

-34
Original file line number | Diff line number | Diff line change
@@ -206,40 +206,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
206206
);
207207
}
208208

209-
bool LogStatsByLongTasks() const {
210-
return Stats->CollectStatsByLongTasks && HasOlapTable;
211-
}
212-
213-
void FillResponseStats(Ydb::StatusIds::StatusCode status) {
214-
auto& response = *ResponseEv->Record.MutableResponse();
215-
216-
response.SetStatus(status);
217-
218-
if (Stats) {
219-
ReportEventElapsedTime();
220-
221-
Stats->FinishTs = TInstant::Now();
222-
Stats->Finish();
223-
224-
if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) {
225-
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
226-
const auto& tx = Request.Transactions[txId].Body;
227-
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
228-
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
229-
}
230-
}
231-
232-
if (LogStatsByLongTasks()) {
233-
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
234-
if (!txPlansWithStats.empty()) {
235-
LOG_N("Full stats: " << txPlansWithStats);
236-
}
237-
}
238-
239-
Stats.reset();
240-
}
241-
}
242-
243209
void Finalize() {
244210
if (LocksBroken) {
245211
TString message = "Transaction locks invalidated.";

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+58-88
Original file line number | Diff line number | Diff line change
@@ -408,86 +408,48 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
408408
}
409409
}
410410

411+
YQL_ENSURE(Planner);
412+
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
413+
411414
switch (state.GetState()) {
412415
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
413416
YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId);
414417
return;
415418
}
416419

417-
case NYql::NDqProto::COMPUTE_STATE_FAILURE: {
418-
ReplyErrorAndDie(NYql::NDq::DqStatusToYdbStatus(state.GetStatusCode()), state.MutableIssues());
419-
return;
420-
}
421-
422420
case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
423-
// initial TEvState event from Compute Actor
424-
// there can be race with RM answer
425-
if (Planner) {
426-
if (Planner->GetPendingComputeTasks().erase(taskId)) {
427-
auto it = Planner->GetPendingComputeActors().emplace(computeActor, TProgressStat());
428-
YQL_ENSURE(it.second);
429-
430-
if (state.HasStats()) {
431-
it.first->second.Set(state.GetStats());
432-
}
433-
434-
auto& task = TasksGraph.GetTask(taskId);
435-
task.ComputeActorId = computeActor;
436-
437-
THashMap<TActorId, THashSet<ui64>> updates;
438-
CollectTaskChannelsUpdates(task, updates);
439-
PropagateChannelsUpdates(updates);
440-
} else {
441-
auto it = Planner->GetPendingComputeActors().find(computeActor);
442-
if (it != Planner->GetPendingComputeActors().end()) {
443-
if (state.HasStats()) {
444-
it->second.Set(state.GetStats());
445-
}
446-
}
447-
}
421+
if (populateChannels) {
422+
auto& task = TasksGraph.GetTask(taskId);
423+
THashMap<TActorId, THashSet<ui64>> updates;
424+
CollectTaskChannelsUpdates(task, updates);
425+
PropagateChannelsUpdates(updates);
448426
}
449427
break;
450428
}
451429

430+
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
452431
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
432+
ExtraData[computeActor].Swap(state.MutableExtraData());
453433
if (Stats) {
454434
Stats->AddComputeActorStats(
455435
computeActor.NodeId(),
456436
std::move(*state.MutableStats()),
457437
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
458438
);
459439
}
460-
ExtraData[computeActor].Swap(state.MutableExtraData());
461440

462441
LastTaskId = taskId;
463442
LastComputeActorId = computeActor.ToString();
464-
465-
if (Planner) {
466-
auto it = Planner->GetPendingComputeActors().find(computeActor);
467-
if (it == Planner->GetPendingComputeActors().end()) {
468-
LOG_W("Got execution state for compute actor: " << computeActor
469-
<< ", task: " << taskId
470-
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
471-
<< ", too early (waiting reply from RM)");
472-
473-
if (Planner && Planner->GetPendingComputeTasks().erase(taskId)) {
474-
LOG_E("Got execution state for compute actor: " << computeActor
475-
<< ", for unknown task: " << state.GetTaskId()
476-
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()));
477-
return;
478-
}
479-
} else {
480-
if (state.HasStats()) {
481-
it->second.Set(state.GetStats());
482-
}
483-
LastStats.emplace_back(std::move(it->second));
484-
Planner->GetPendingComputeActors().erase(it);
485-
YQL_ENSURE(Planner->GetPendingComputeTasks().find(taskId) == Planner->GetPendingComputeTasks().end());
486-
}
487-
}
443+
YQL_ENSURE(Planner);
444+
Planner->CompletedCA(taskId, computeActor);
488445
}
489446
}
490447

448+
if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
449+
ReplyErrorAndDie(NYql::NDq::DqStatusToYdbStatus(state.GetStatusCode()), state.MutableIssues());
450+
return;
451+
}
452+
491453
static_cast<TDerived*>(this)->CheckExecutionComplete();
492454
}
493455

@@ -683,20 +645,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
683645
auto taskId = startedTask.GetTaskId();
684646
auto& task = TasksGraph.GetTask(taskId);
685647

686-
task.ComputeActorId = ActorIdFromProto(startedTask.GetActorId());
687-
688-
LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);
689-
690-
if (Planner) {
691-
if (Planner->GetPendingComputeTasks().erase(taskId) == 0) {
692-
LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished");
693-
} else {
694-
auto result = Planner->GetPendingComputeActors().emplace(std::make_pair(task.ComputeActorId, TProgressStat()));
695-
YQL_ENSURE(result.second);
696-
697-
CollectTaskChannelsUpdates(task, channelsUpdates);
698-
}
648+
TActorId computeActorId = ActorIdFromProto(startedTask.GetActorId());
649+
LOG_D("Executing task: " << taskId << " on compute actor: " << computeActorId);
650+
YQL_ENSURE(Planner);
651+
bool channelUpdates = Planner->AcknowledgeCA(taskId, computeActorId, nullptr);
652+
if (channelUpdates) {
653+
CollectTaskChannelsUpdates(task, channelsUpdates);
699654
}
655+
700656
}
701657

702658
PropagateChannelsUpdates(channelsUpdates);
@@ -789,16 +745,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
789745
LastResourceUsageUpdate = now;
790746

791747
TProgressStat::TEntry consumption;
792-
if (Planner) {
793-
for (const auto& p : Planner->GetPendingComputeActors()) {
794-
const auto& t = p.second.GetLastUsage();
795-
consumption += t;
796-
}
797-
}
798748

799-
for (const auto& p : LastStats) {
800-
const auto& t = p.GetLastUsage();
801-
consumption += t;
749+
if (Planner) {
750+
consumption += Planner->CalculateConsumptionUpdate();
802751
}
803752

804753
auto ru = NRuCalc::CalcRequestUnit(consumption);
@@ -811,13 +760,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
811760
return;
812761

813762
if (Planner) {
814-
for (auto& p : Planner->GetPendingComputeActors()) {
815-
p.second.Update();
816-
}
817-
}
818-
819-
for (auto& p : LastStats) {
820-
p.Update();
763+
Planner->ShiftConsumption();
821764
}
822765

823766
if (Request.RlPath) {
@@ -1731,7 +1674,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17311674
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
17321675
}
17331676

1734-
static_cast<TDerived*>(this)->FillResponseStats(Ydb::StatusIds::TIMEOUT);
1677+
FillResponseStats(Ydb::StatusIds::TIMEOUT);
17351678

17361679
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17371680
if (abortSender != Target) {
@@ -1748,6 +1691,34 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17481691
this->PassAway();
17491692
}
17501693

1694+
void FillResponseStats(Ydb::StatusIds::StatusCode status) {
1695+
auto& response = *ResponseEv->Record.MutableResponse();
1696+
1697+
response.SetStatus(status);
1698+
1699+
if (Stats) {
1700+
ReportEventElapsedTime();
1701+
1702+
Stats->FinishTs = TInstant::Now();
1703+
Stats->Finish();
1704+
1705+
if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
1706+
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
1707+
const auto& tx = Request.Transactions[txId].Body;
1708+
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
1709+
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
1710+
}
1711+
}
1712+
1713+
if (Stats->CollectStatsByLongTasks) {
1714+
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
1715+
if (!txPlansWithStats.empty()) {
1716+
LOG_N("Full stats: " << txPlansWithStats);
1717+
}
1718+
}
1719+
}
1720+
}
1721+
17511722
virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
17521723
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
17531724
{
@@ -1767,7 +1738,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17671738
AlreadyReplied = true;
17681739
auto& response = *ResponseEv->Record.MutableResponse();
17691740

1770-
response.SetStatus(status);
1741+
FillResponseStats(status);
1742+
17711743
response.MutableIssues()->Swap(issues);
17721744

17731745
LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
@@ -1945,8 +1917,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
19451917
TActorId KqpShardsResolverId;
19461918
THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;
19471919

1948-
TVector<TProgressStat> LastStats;
1949-
19501920
TInstant StartResolveTime;
19511921
TInstant LastResourceUsageUpdate;
19521922

ydb/core/kqp/executer_actor/kqp_planner.cpp

+71-10
Original file line number | Diff line number | Diff line change
@@ -443,12 +443,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
443443

444444
TActorId* actorId = std::get_if<TActorId>(&startResult);
445445
Y_ABORT_UNLESS(actorId);
446-
task.ComputeActorId = *actorId;
447-
448-
LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);
449-
450-
auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
451-
YQL_ENSURE(result.second);
446+
AcknowledgeCA(taskId, *actorId, nullptr);
452447
return TString();
453448
}
454449

@@ -533,8 +528,6 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
533528
if (!result.empty()) {
534529
return MakeActorStartFailureError(ExecuterId, result);
535530
}
536-
537-
PendingComputeTasks.erase(taskId);
538531
}
539532
}
540533
}
@@ -576,11 +569,79 @@ void TKqpPlanner::Unsubscribe() {
576569
}
577570
}
578571

579-
THashMap<TActorId, TProgressStat>& TKqpPlanner::GetPendingComputeActors() {
572+
bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state) {
573+
auto& task = TasksGraph.GetTask(taskId);
574+
if (!task.ComputeActorId) {
575+
task.ComputeActorId = computeActor;
576+
PendingComputeTasks.erase(taskId);
577+
auto [it, success] = PendingComputeActors.try_emplace(computeActor);
578+
YQL_ENSURE(success);
579+
if (state && state->HasStats()) {
580+
it->second.Set(state->GetStats());
581+
}
582+
583+
return true;
584+
}
585+
586+
YQL_ENSURE(task.ComputeActorId == computeActor);
587+
auto it = PendingComputeActors.find(computeActor);
588+
if (!task.Meta.Completed) {
589+
YQL_ENSURE(it != PendingComputeActors.end());
590+
}
591+
592+
if (it != PendingComputeActors.end() && state && state->HasStats()) {
593+
it->second.Set(state->GetStats());
594+
}
595+
596+
return false;
597+
}
598+
599+
void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
600+
auto& task = TasksGraph.GetTask(taskId);
601+
if (task.Meta.Completed) {
602+
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
603+
return;
604+
}
605+
606+
task.Meta.Completed = true;
607+
auto it = PendingComputeActors.find(computeActor);
608+
YQL_ENSURE(it != PendingComputeActors.end());
609+
LastStats.emplace_back(std::move(it->second));
610+
PendingComputeActors.erase(it);
611+
return;
612+
}
613+
614+
TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {
615+
TProgressStat::TEntry consumption;
616+
617+
for (const auto& p : PendingComputeActors) {
618+
const auto& t = p.second.GetLastUsage();
619+
consumption += t;
620+
}
621+
622+
for (const auto& p : LastStats) {
623+
const auto& t = p.GetLastUsage();
624+
consumption += t;
625+
}
626+
627+
return consumption;
628+
}
629+
630+
void TKqpPlanner::ShiftConsumption() {
631+
for (auto& p : PendingComputeActors) {
632+
p.second.Update();
633+
}
634+
635+
for (auto& p : LastStats) {
636+
p.Update();
637+
}
638+
}
639+
640+
const THashMap<TActorId, TProgressStat>& TKqpPlanner::GetPendingComputeActors() {
580641
return PendingComputeActors;
581642
}
582643

583-
THashSet<ui64>& TKqpPlanner::GetPendingComputeTasks() {
644+
const THashSet<ui64>& TKqpPlanner::GetPendingComputeTasks() {
584645
return PendingComputeTasks;
585646
}
586647

0 commit comments

Comments
 (0)