Skip to content

Commit 35c8dfe

Browse files
authored
Fix stats inside plan for full stats mode (#8553)
1 parent 78242bd commit 35c8dfe

File tree

8 files changed

+79
-76
lines changed

8 files changed

+79
-76
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+2-5
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
215215
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
216216
}
217217

218-
auto& response = *ResponseEv->Record.MutableResponse();
219-
220-
FillResponseStats(Ydb::StatusIds::SUCCESS);
218+
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
221219
Counters->TxProxyMon->ReportStatusOK->Inc();
222220

223221
auto addLocks = [this](const auto& data) {
@@ -257,7 +255,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
257255
if (LockHandle) {
258256
ResponseEv->LockHandle = std::move(LockHandle);
259257
}
260-
BuildLocks(*response.MutableResult()->MutableLocks(), Locks);
258+
BuildLocks(*ResponseEv->Record.MutableResponse()->MutableResult()->MutableLocks(), Locks);
261259
}
262260

263261
auto resultSize = ResponseEv->GetByteSize();
@@ -283,7 +281,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
283281

284282
ExecuterSpan.EndOk();
285283

286-
Request.Transactions.crop(0);
287284
AlreadyReplied = true;
288285
PassAway();
289286
}

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+60-60
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
177177
}
178178

179179
void ReportEventElapsedTime() {
180-
if (Stats) {
181-
ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000;
182-
Stats->ExecuterCpuTime += TDuration::MicroSeconds(elapsedMicros);
183-
}
180+
YQL_ENSURE(Stats);
181+
182+
ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000;
183+
Stats->ExecuterCpuTime += TDuration::MicroSeconds(elapsedMicros);
184184
}
185185

186186
protected:
@@ -330,11 +330,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
330330
}
331331

332332
YQL_ENSURE(channel.DstTask == 0);
333+
YQL_ENSURE(Stats);
333334

334-
if (Stats) {
335-
Stats->ResultBytes += batch.Size();
336-
Stats->ResultRows += batch.RowCount();
337-
}
335+
Stats->ResultBytes += batch.Size();
336+
Stats->ResultRows += batch.RowCount();
338337

339338
LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << task.Meta.ShardId
340339
<< ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
@@ -391,7 +390,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
391390
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
392391
<< ", stats: " << state.GetStats());
393392

394-
if (Stats && state.HasStats() && Request.ProgressStatsPeriod) {
393+
YQL_ENSURE(Stats);
394+
395+
if (state.HasStats() && Request.ProgressStatsPeriod) {
395396
Stats->UpdateTaskStats(taskId, state.GetStats());
396397
auto now = TInstant::Now();
397398
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
@@ -418,13 +419,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
418419
if (Planner->CompletedCA(taskId, computeActor)) {
419420
ExtraData[computeActor].Swap(state.MutableExtraData());
420421

421-
if (Stats) {
422-
Stats->AddComputeActorStats(
423-
computeActor.NodeId(),
424-
std::move(*state.MutableStats()),
425-
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
426-
);
427-
}
422+
Stats->AddComputeActorStats(
423+
computeActor.NodeId(),
424+
std::move(*state.MutableStats()),
425+
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
426+
);
428427

429428
LastTaskId = taskId;
430429
LastComputeActorId = computeActor.ToString();
@@ -512,9 +511,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
512511
auto now = TAppData::TimeProvider->Now();
513512
StartResolveTime = now;
514513

515-
if (Stats) {
516-
Stats->StartTs = now;
517-
}
514+
YQL_ENSURE(Stats);
515+
516+
Stats->StartTs = now;
518517
}
519518

520519
TMaybe<size_t> FindReadRangesSource(const NKqpProto::TKqpPhyStage& stage) {
@@ -1167,8 +1166,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
11671166
: Nothing();
11681167

11691168
YQL_ENSURE(!shardsResolved || nodeId);
1169+
YQL_ENSURE(Stats);
11701170

1171-
if (shardId && Stats) {
1171+
if (shardId) {
11721172
Stats->AffectedShards.insert(*shardId);
11731173
}
11741174

@@ -1236,11 +1236,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
12361236

12371237
if (partitions.size() > 0 && source.GetSequentialInFlightShards() > 0 && partitions.size() > source.GetSequentialInFlightShards()) {
12381238
auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv());
1239-
if (Stats) {
1240-
for (auto& [shardId, _] : partitions) {
1241-
Stats->AffectedShards.insert(shardId);
1242-
}
1239+
1240+
YQL_ENSURE(Stats);
1241+
1242+
for (auto& [shardId, _] : partitions) {
1243+
Stats->AffectedShards.insert(shardId);
12431244
}
1245+
12441246
if (shardInfo.KeyReadRanges) {
12451247
addPartiton(startShard, {}, shardInfo, source.GetSequentialInFlightShards());
12461248
fillRangesForTasks();
@@ -1507,6 +1509,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
15071509
THashMap<ui64, ui64> assignedShardsCount;
15081510
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
15091511

1512+
YQL_ENSURE(Stats);
1513+
15101514
const auto& tableInfo = stageInfo.Meta.TableConstInfo;
15111515
const auto& keyTypes = tableInfo->KeyColumnTypes;
15121516
ui32 metaId = 0;
@@ -1535,7 +1539,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
15351539
nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second)));
15361540
}
15371541

1538-
if (Stats && CollectProfileStats(Request.StatsMode)) {
1542+
if (CollectProfileStats(Request.StatsMode)) {
15391543
for (auto&& i : nodeShards) {
15401544
Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size());
15411545
}
@@ -1720,7 +1724,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17201724
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
17211725
}
17221726

1723-
FillResponseStats(Ydb::StatusIds::TIMEOUT);
1727+
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::TIMEOUT);
17241728

17251729
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17261730
if (abortSender != Target) {
@@ -1730,38 +1734,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17301734

17311735
LOG_E("Sending timeout response to: " << Target);
17321736

1733-
Request.Transactions.crop(0);
17341737
this->Shutdown();
17351738
}
17361739

1737-
void FillResponseStats(Ydb::StatusIds::StatusCode status) {
1738-
auto& response = *ResponseEv->Record.MutableResponse();
1739-
1740-
response.SetStatus(status);
1741-
1742-
if (Stats) {
1743-
ReportEventElapsedTime();
1744-
1745-
Stats->FinishTs = TInstant::Now();
1746-
Stats->Finish();
1747-
1748-
if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
1749-
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
1750-
const auto& tx = Request.Transactions[txId].Body;
1751-
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
1752-
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
1753-
}
1754-
}
1755-
1756-
if (Stats->CollectStatsByLongTasks) {
1757-
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
1758-
if (!txPlansWithStats.empty()) {
1759-
LOG_N("Full stats: " << txPlansWithStats);
1760-
}
1761-
}
1762-
}
1763-
}
1764-
17651740
virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
17661741
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
17671742
{
@@ -1775,8 +1750,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17751750
AlreadyReplied = true;
17761751
auto& response = *ResponseEv->Record.MutableResponse();
17771752

1778-
FillResponseStats(status);
1779-
1753+
response.SetStatus(status);
17801754
response.MutableIssues()->Swap(issues);
17811755

17821756
LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
@@ -1795,7 +1769,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17951769
ExecuterSpan.EndError(response.DebugString());
17961770
ExecuterStateSpan.EndError(response.DebugString());
17971771

1798-
Request.Transactions.crop(0);
17991772
this->Shutdown();
18001773
}
18011774

@@ -1873,8 +1846,35 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18731846
void PassAway() override {
18741847
YQL_ENSURE(AlreadyReplied && ResponseEv);
18751848

1876-
// Actualize stats with the last stats from terminated CAs, but keep the status.
1877-
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
1849+
// Fill response stats
1850+
{
1851+
auto& response = *ResponseEv->Record.MutableResponse();
1852+
1853+
YQL_ENSURE(Stats);
1854+
1855+
ReportEventElapsedTime();
1856+
1857+
Stats->FinishTs = TInstant::Now();
1858+
Stats->Finish();
1859+
1860+
if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
1861+
response.MutableResult()->MutableStats()->ClearTxPlansWithStats();
1862+
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
1863+
const auto& tx = Request.Transactions[txId].Body;
1864+
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
1865+
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
1866+
}
1867+
}
1868+
1869+
if (Stats->CollectStatsByLongTasks) {
1870+
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
1871+
if (!txPlansWithStats.empty()) {
1872+
LOG_N("Full stats: " << response.GetResult().GetStats());
1873+
}
1874+
}
1875+
}
1876+
1877+
Request.Transactions.crop(0);
18781878
this->Send(Target, ResponseEv.release());
18791879

18801880
for (auto channelPair: ResultChannelProxies) {

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,10 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt
701701
}
702702

703703
// checking whether the task is long
704+
705+
// TODO(ilezhankin): investigate - for some reason `task.FinishTimeMs` may be large (or small?)
706+
// enough to result in an enormous duration - triggering the "long tasks" mode.
707+
704708
auto taskDuration = TDuration::MilliSeconds(task.GetFinishTimeMs() - task.GetStartTimeMs());
705709
bool longTask = taskDuration > collectLongTaskStatsTimeout;
706710
if (longTask) {
@@ -1126,7 +1130,7 @@ void TQueryExecutionStats::Finish() {
11261130
Result->SetCpuTimeUs(Result->GetCpuTimeUs() + ExecuterCpuTime.MicroSeconds());
11271131
Result->SetDurationUs(FinishTs.MicroSeconds() - StartTs.MicroSeconds());
11281132

1129-
// Result->Result* feilds are (temporary?) commented out in proto due to lack of use
1133+
// Result->Result* fields are (temporary?) commented out in proto due to lack of use
11301134
//
11311135
// Result->SetResultBytes(ResultBytes);
11321136
// Result->SetResultRows(ResultRows);

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
276276
YQL_ENSURE(!AlreadyReplied);
277277
AlreadyReplied = true;
278278

279-
FillResponseStats(Ydb::StatusIds::SUCCESS);
279+
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
280280

281281
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
282282

ydb/core/kqp/opt/kqp_query_plan.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -2101,9 +2101,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
21012101

21022102
newPlans.AppendValue(ReconstructQueryPlanRec(
21032103
plan.GetMapSafe().at("Plans").GetArraySafe()[0],
2104-
0,
2105-
planIndex,
2106-
precomputes,
2104+
0,
2105+
planIndex,
2106+
precomputes,
21072107
nodeCounter));
21082108

21092109
newPlans.AppendValue(lookupPlan);

ydb/core/kqp/rm_service/kqp_rm_service.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,17 @@ class TTxState : public TAtomicRefCount<TTxState> {
139139
}
140140

141141
TString ToString() const {
142-
auto res = TStringBuilder() << "TxResourcesInfo{ "
142+
auto res = TStringBuilder() << "TxResourcesInfo { "
143143
<< "TxId: " << TxId
144-
<< "Database: " << Database;
144+
<< ", Database: " << Database;
145145

146146
if (!PoolId.empty()) {
147147
res << ", PoolId: " << PoolId
148148
<< ", MemoryPoolPercent: " << Sprintf("%.2f", MemoryPoolPercent);
149149
}
150150

151151
res << ", memory initially granted resources: " << TxExternalDataQueryMemory.load()
152-
<< ", extra allocations " << TxScanQueryMemory.load()
152+
<< ", tx total allocations " << TxScanQueryMemory.load()
153153
<< ", execution units: " << TxExecutionUnits.load()
154154
<< ", started at: " << CreatedAt
155155
<< " }";

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
313313
void OnMemoryLimitExceptionHandler() {
314314
TString memoryConsumptionDetails = MemoryLimits.MemoryQuotaManager->MemoryConsumptionDetails();
315315
TStringBuilder failureReason = TStringBuilder()
316-
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
316+
<< "Mkql memory limit exceeded, allocated by task " << Task.GetId() << ": " << GetMkqlMemoryLimit()
317317
<< ", host: " << HostName()
318318
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory;
319319

320320
if (!memoryConsumptionDetails.empty()) {
321-
failureReason << ", memory manager details: " << memoryConsumptionDetails;
321+
failureReason << ", memory manager details for current node: " << memoryConsumptionDetails;
322322
}
323323

324324
InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, failureReason);
@@ -1825,6 +1825,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
18251825
}
18261826
}
18271827
}
1828+
} else {
1829+
// TODO: what should happen in this case?
18281830
}
18291831

18301832
static_cast<TDerived*>(this)->FillExtraStats(dst, last);

ydb/library/yql/dq/actors/protos/dq_stats.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ message TDqExecutionStats {
377377
uint64 CpuTimeUs = 1; // total cpu time, executer + compute actors + ...
378378
uint64 DurationUs = 2; // execution wall time
379379

380-
// these feilds are never used, needs to be reviewed
380+
// these fields are never used, needs to be reviewed
381381
reserved 3; // uint64 ResultRows = 3;
382382
reserved 4; // uint64 ResultBytes = 4;
383383

0 commit comments

Comments
 (0)