Skip to content

Fix stats inside plan for full stats mode #8553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
}

auto& response = *ResponseEv->Record.MutableResponse();

FillResponseStats(Ydb::StatusIds::SUCCESS);
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();

auto addLocks = [this](const auto& data) {
Expand Down Expand Up @@ -257,7 +255,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (LockHandle) {
ResponseEv->LockHandle = std::move(LockHandle);
}
BuildLocks(*response.MutableResult()->MutableLocks(), Locks);
BuildLocks(*ResponseEv->Record.MutableResponse()->MutableResult()->MutableLocks(), Locks);
}

auto resultSize = ResponseEv->GetByteSize();
Expand All @@ -283,7 +281,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

ExecuterSpan.EndOk();

Request.Transactions.crop(0);
AlreadyReplied = true;
PassAway();
}
Expand Down
120 changes: 60 additions & 60 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

void ReportEventElapsedTime() {
if (Stats) {
ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000;
Stats->ExecuterCpuTime += TDuration::MicroSeconds(elapsedMicros);
}
YQL_ENSURE(Stats);

ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000;
Stats->ExecuterCpuTime += TDuration::MicroSeconds(elapsedMicros);
}

protected:
Expand Down Expand Up @@ -330,11 +330,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

YQL_ENSURE(channel.DstTask == 0);
YQL_ENSURE(Stats);

if (Stats) {
Stats->ResultBytes += batch.Size();
Stats->ResultRows += batch.RowCount();
}
Stats->ResultBytes += batch.Size();
Stats->ResultRows += batch.RowCount();

LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << task.Meta.ShardId
<< ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
Expand Down Expand Up @@ -391,7 +390,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
<< ", stats: " << state.GetStats());

if (Stats && state.HasStats() && Request.ProgressStatsPeriod) {
YQL_ENSURE(Stats);

if (state.HasStats() && Request.ProgressStatsPeriod) {
Stats->UpdateTaskStats(taskId, state.GetStats());
auto now = TInstant::Now();
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
Expand All @@ -418,13 +419,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
if (Planner->CompletedCA(taskId, computeActor)) {
ExtraData[computeActor].Swap(state.MutableExtraData());

if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
Expand Down Expand Up @@ -512,9 +511,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
auto now = TAppData::TimeProvider->Now();
StartResolveTime = now;

if (Stats) {
Stats->StartTs = now;
}
YQL_ENSURE(Stats);

Stats->StartTs = now;
}

TMaybe<size_t> FindReadRangesSource(const NKqpProto::TKqpPhyStage& stage) {
Expand Down Expand Up @@ -1167,8 +1166,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
: Nothing();

YQL_ENSURE(!shardsResolved || nodeId);
YQL_ENSURE(Stats);

if (shardId && Stats) {
if (shardId) {
Stats->AffectedShards.insert(*shardId);
}

Expand Down Expand Up @@ -1236,11 +1236,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

if (partitions.size() > 0 && source.GetSequentialInFlightShards() > 0 && partitions.size() > source.GetSequentialInFlightShards()) {
auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv());
if (Stats) {
for (auto& [shardId, _] : partitions) {
Stats->AffectedShards.insert(shardId);
}

YQL_ENSURE(Stats);

for (auto& [shardId, _] : partitions) {
Stats->AffectedShards.insert(shardId);
}

if (shardInfo.KeyReadRanges) {
addPartiton(startShard, {}, shardInfo, source.GetSequentialInFlightShards());
fillRangesForTasks();
Expand Down Expand Up @@ -1507,6 +1509,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
THashMap<ui64, ui64> assignedShardsCount;
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

YQL_ENSURE(Stats);

const auto& tableInfo = stageInfo.Meta.TableConstInfo;
const auto& keyTypes = tableInfo->KeyColumnTypes;
ui32 metaId = 0;
Expand Down Expand Up @@ -1535,7 +1539,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second)));
}

if (Stats && CollectProfileStats(Request.StatsMode)) {
if (CollectProfileStats(Request.StatsMode)) {
for (auto&& i : nodeShards) {
Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size());
}
Expand Down Expand Up @@ -1720,7 +1724,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
}

FillResponseStats(Ydb::StatusIds::TIMEOUT);
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::TIMEOUT);

// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
if (abortSender != Target) {
Expand All @@ -1730,38 +1734,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

LOG_E("Sending timeout response to: " << Target);

Request.Transactions.crop(0);
this->Shutdown();
}

void FillResponseStats(Ydb::StatusIds::StatusCode status) {
auto& response = *ResponseEv->Record.MutableResponse();

response.SetStatus(status);

if (Stats) {
ReportEventElapsedTime();

Stats->FinishTs = TInstant::Now();
Stats->Finish();

if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
}
}

if (Stats->CollectStatsByLongTasks) {
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
if (!txPlansWithStats.empty()) {
LOG_N("Full stats: " << txPlansWithStats);
}
}
}
}

virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
{
Expand All @@ -1775,8 +1750,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
AlreadyReplied = true;
auto& response = *ResponseEv->Record.MutableResponse();

FillResponseStats(status);

response.SetStatus(status);
response.MutableIssues()->Swap(issues);

LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
Expand All @@ -1795,7 +1769,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
ExecuterSpan.EndError(response.DebugString());
ExecuterStateSpan.EndError(response.DebugString());

Request.Transactions.crop(0);
this->Shutdown();
}

Expand Down Expand Up @@ -1873,8 +1846,35 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
void PassAway() override {
YQL_ENSURE(AlreadyReplied && ResponseEv);

// Actualize stats with the last stats from terminated CAs, but keep the status.
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
// Fill response stats
{
auto& response = *ResponseEv->Record.MutableResponse();

YQL_ENSURE(Stats);

ReportEventElapsedTime();

Stats->FinishTs = TInstant::Now();
Stats->Finish();

if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
response.MutableResult()->MutableStats()->ClearTxPlansWithStats();
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
}
}

if (Stats->CollectStatsByLongTasks) {
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
if (!txPlansWithStats.empty()) {
LOG_N("Full stats: " << response.GetResult().GetStats());
}
}
}

Request.Transactions.crop(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сначала crop, а потом Send. В request transaction, AFAIK, есть unboxed values и аллокатор, поэтому сначала нужно его зачистить, а потом делать Send, чтобы избежать обработки двумя акторами.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если в transaction есть UV, то почему нет биндинга аллокатора?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если в transaction есть UV, то почему нет биндинга аллокатора?

Потому что биндинг есть в деструкторе.

this->Send(Target, ResponseEv.release());

for (auto channelPair: ResultChannelProxies) {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,10 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt
}

// checking whether the task is long

// TODO(ilezhankin): investigate - for some reason `task.FinishTimeMs` may be large (or small?)
// enough to result in an enormous duration - triggering the "long tasks" mode.

auto taskDuration = TDuration::MilliSeconds(task.GetFinishTimeMs() - task.GetStartTimeMs());
bool longTask = taskDuration > collectLongTaskStatsTimeout;
if (longTask) {
Expand Down Expand Up @@ -1126,7 +1130,7 @@ void TQueryExecutionStats::Finish() {
Result->SetCpuTimeUs(Result->GetCpuTimeUs() + ExecuterCpuTime.MicroSeconds());
Result->SetDurationUs(FinishTs.MicroSeconds() - StartTs.MicroSeconds());

// Result->Result* feilds are (temporary?) commented out in proto due to lack of use
// Result->Result* fields are (temporary?) commented out in proto due to lack of use
//
// Result->SetResultBytes(ResultBytes);
// Result->SetResultRows(ResultRows);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
YQL_ENSURE(!AlreadyReplied);
AlreadyReplied = true;

FillResponseStats(Ydb::StatusIds::SUCCESS);
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);

LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2033,9 +2033,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,

newPlans.AppendValue(ReconstructQueryPlanRec(
plan.GetMapSafe().at("Plans").GetArraySafe()[0],
0,
planIndex,
precomputes,
0,
planIndex,
precomputes,
nodeCounter));

newPlans.AppendValue(lookupPlan);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,17 @@ class TTxState : public TAtomicRefCount<TTxState> {
}

TString ToString() const {
auto res = TStringBuilder() << "TxResourcesInfo{ "
auto res = TStringBuilder() << "TxResourcesInfo { "
<< "TxId: " << TxId
<< "Database: " << Database;
<< ", Database: " << Database;

if (!PoolId.empty()) {
res << ", PoolId: " << PoolId
<< ", MemoryPoolPercent: " << Sprintf("%.2f", MemoryPoolPercent);
}

res << ", memory initially granted resources: " << TxExternalDataQueryMemory.load()
<< ", extra allocations " << TxScanQueryMemory.load()
<< ", tx total allocations " << TxScanQueryMemory.load()
<< ", execution units: " << TxExecutionUnits.load()
<< ", started at: " << CreatedAt
<< " }";
Expand Down
6 changes: 4 additions & 2 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
void OnMemoryLimitExceptionHandler() {
TString memoryConsumptionDetails = MemoryLimits.MemoryQuotaManager->MemoryConsumptionDetails();
TStringBuilder failureReason = TStringBuilder()
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
<< "Mkql memory limit exceeded, allocated by task " << Task.GetId() << ": " << GetMkqlMemoryLimit()
<< ", host: " << HostName()
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory;

if (!memoryConsumptionDetails.empty()) {
failureReason << ", memory manager details: " << memoryConsumptionDetails;
failureReason << ", memory manager details for current node: " << memoryConsumptionDetails;
}

InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, failureReason);
Expand Down Expand Up @@ -1825,6 +1825,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}
}
}
} else {
// TODO: what should happen in this case?
}

static_cast<TDerived*>(this)->FillExtraStats(dst, last);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/protos/dq_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ message TDqExecutionStats {
uint64 CpuTimeUs = 1; // total cpu time, executer + compute actors + ...
uint64 DurationUs = 2; // execution wall time

// these feilds are never used, needs to be reviewed
// these fields are never used, needs to be reviewed
reserved 3; // uint64 ResultRows = 3;
reserved 4; // uint64 ResultBytes = 4;

Expand Down
Loading