Skip to content

add stats for queries with errors #7753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 0 additions & 34 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,40 +204,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
);
}

// Reports whether per-task plan statistics should be produced: true only when
// Stats->CollectStatsByLongTasks is set and the HasOlapTable flag is set too.
// Stats is dereferenced unconditionally, so callers must make sure it is
// non-null before calling.
bool LogStatsByLongTasks() const {
    if (!Stats->CollectStatsByLongTasks) {
        return false;
    }
    return HasOlapTable;
}

// Stores `status` in the response and, if Stats is still present, finalizes
// the execution statistics:
//   * stamps the finish time and calls Stats->Finish();
//   * when long-task logging or full stats collection is requested, appends
//     one stats-annotated plan per transaction of the request to the response;
//   * when long-task logging is requested, writes the collected plans to the log.
// Stats is reset at the end, so a later call only updates the status.
void FillResponseStats(Ydb::StatusIds::StatusCode status) {
    auto& response = *ResponseEv->Record.MutableResponse();
    response.SetStatus(status);

    if (!Stats) {
        return;
    }

    ReportEventElapsedTime();

    Stats->FinishTs = TInstant::Now();
    Stats->Finish();

    const bool wantPlans = LogStatsByLongTasks() || CollectFullStats(Request.StatsMode);
    if (wantPlans) {
        for (const auto& transaction : Request.Transactions) {
            const auto& body = transaction.Body;
            // The plan is annotated from the stats currently in the response
            // before the result is appended to those same stats.
            auto annotatedPlan = AddExecStatsToTxPlan(body->GetPlan(), response.GetResult().GetStats());
            response.MutableResult()->MutableStats()->AddTxPlansWithStats(annotatedPlan);
        }
    }

    if (LogStatsByLongTasks()) {
        const auto& collectedPlans = response.GetResult().GetStats().GetTxPlansWithStats();
        if (!collectedPlans.empty()) {
            LOG_N("Full stats: " << collectedPlans);
        }
    }

    Stats.reset();
}

void Finalize() {
if (LocksBroken) {
TString message = "Transaction locks invalidated.";
Expand Down
146 changes: 58 additions & 88 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,86 +408,48 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}
}

YQL_ENSURE(Planner);
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);

switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId);
return;
}

case NYql::NDqProto::COMPUTE_STATE_FAILURE: {
ReplyErrorAndDie(NYql::NDq::DqStatusToYdbStatus(state.GetStatusCode()), state.MutableIssues());
return;
}

case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
// initial TEvState event from Compute Actor
// there can be race with RM answer
if (Planner) {
if (Planner->GetPendingComputeTasks().erase(taskId)) {
auto it = Planner->GetPendingComputeActors().emplace(computeActor, TProgressStat());
YQL_ENSURE(it.second);

if (state.HasStats()) {
it.first->second.Set(state.GetStats());
}

auto& task = TasksGraph.GetTask(taskId);
task.ComputeActorId = computeActor;

THashMap<TActorId, THashSet<ui64>> updates;
CollectTaskChannelsUpdates(task, updates);
PropagateChannelsUpdates(updates);
} else {
auto it = Planner->GetPendingComputeActors().find(computeActor);
if (it != Planner->GetPendingComputeActors().end()) {
if (state.HasStats()) {
it->second.Set(state.GetStats());
}
}
}
if (populateChannels) {
auto& task = TasksGraph.GetTask(taskId);
THashMap<TActorId, THashSet<ui64>> updates;
CollectTaskChannelsUpdates(task, updates);
PropagateChannelsUpdates(updates);
}
break;
}

case NYql::NDqProto::COMPUTE_STATE_FAILURE:
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
ExtraData[computeActor].Swap(state.MutableExtraData());
if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}
ExtraData[computeActor].Swap(state.MutableExtraData());

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();

if (Planner) {
auto it = Planner->GetPendingComputeActors().find(computeActor);
if (it == Planner->GetPendingComputeActors().end()) {
LOG_W("Got execution state for compute actor: " << computeActor
<< ", task: " << taskId
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
<< ", too early (waiting reply from RM)");

if (Planner && Planner->GetPendingComputeTasks().erase(taskId)) {
LOG_E("Got execution state for compute actor: " << computeActor
<< ", for unknown task: " << state.GetTaskId()
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()));
return;
}
} else {
if (state.HasStats()) {
it->second.Set(state.GetStats());
}
LastStats.emplace_back(std::move(it->second));
Planner->GetPendingComputeActors().erase(it);
YQL_ENSURE(Planner->GetPendingComputeTasks().find(taskId) == Planner->GetPendingComputeTasks().end());
}
}
YQL_ENSURE(Planner);
Planner->CompletedCA(taskId, computeActor);
}
}

if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
ReplyErrorAndDie(NYql::NDq::DqStatusToYdbStatus(state.GetStatusCode()), state.MutableIssues());
return;
}

static_cast<TDerived*>(this)->CheckExecutionComplete();
}

Expand Down Expand Up @@ -683,20 +645,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
auto taskId = startedTask.GetTaskId();
auto& task = TasksGraph.GetTask(taskId);

task.ComputeActorId = ActorIdFromProto(startedTask.GetActorId());

LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);

if (Planner) {
if (Planner->GetPendingComputeTasks().erase(taskId) == 0) {
LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished");
} else {
auto result = Planner->GetPendingComputeActors().emplace(std::make_pair(task.ComputeActorId, TProgressStat()));
YQL_ENSURE(result.second);

CollectTaskChannelsUpdates(task, channelsUpdates);
}
TActorId computeActorId = ActorIdFromProto(startedTask.GetActorId());
LOG_D("Executing task: " << taskId << " on compute actor: " << computeActorId);
YQL_ENSURE(Planner);
bool channelUpdates = Planner->AcknowledgeCA(taskId, computeActorId, nullptr);
if (channelUpdates) {
CollectTaskChannelsUpdates(task, channelsUpdates);
}

}

PropagateChannelsUpdates(channelsUpdates);
Expand Down Expand Up @@ -789,16 +745,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
LastResourceUsageUpdate = now;

TProgressStat::TEntry consumption;
if (Planner) {
for (const auto& p : Planner->GetPendingComputeActors()) {
const auto& t = p.second.GetLastUsage();
consumption += t;
}
}

for (const auto& p : LastStats) {
const auto& t = p.GetLastUsage();
consumption += t;
if (Planner) {
consumption += Planner->CalculateConsumptionUpdate();
}

auto ru = NRuCalc::CalcRequestUnit(consumption);
Expand All @@ -811,13 +760,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
return;

if (Planner) {
for (auto& p : Planner->GetPendingComputeActors()) {
p.second.Update();
}
}

for (auto& p : LastStats) {
p.Update();
Planner->ShiftConsumption();
}

if (Request.RlPath) {
Expand Down Expand Up @@ -1758,7 +1701,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
}

static_cast<TDerived*>(this)->FillResponseStats(Ydb::StatusIds::TIMEOUT);
FillResponseStats(Ydb::StatusIds::TIMEOUT);

// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
if (abortSender != Target) {
Expand All @@ -1775,6 +1718,34 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
this->PassAway();
}

// Stores `status` in the response and, if Stats is present, finalizes the
// execution statistics: stamps the finish time, calls Stats->Finish(), and
// - when Stats->CollectStatsByLongTasks is set or full stats are requested -
// appends one stats-annotated plan per transaction to the response. With
// CollectStatsByLongTasks set, the collected plans are also logged.
// Stats itself is left in place by this function.
void FillResponseStats(Ydb::StatusIds::StatusCode status) {
    auto& response = *ResponseEv->Record.MutableResponse();
    response.SetStatus(status);

    if (!Stats) {
        return;
    }

    ReportEventElapsedTime();

    Stats->FinishTs = TInstant::Now();
    Stats->Finish();

    const bool attachPlans = Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode);
    if (attachPlans) {
        for (const auto& transaction : Request.Transactions) {
            const auto& body = transaction.Body;
            // Annotate the plan from the stats already in the response, then
            // append the annotated plan to those stats.
            auto annotatedPlan = AddExecStatsToTxPlan(body->GetPlan(), response.GetResult().GetStats());
            response.MutableResult()->MutableStats()->AddTxPlansWithStats(annotatedPlan);
        }
    }

    if (!Stats->CollectStatsByLongTasks) {
        return;
    }

    const auto& collectedPlans = response.GetResult().GetStats().GetTxPlansWithStats();
    if (!collectedPlans.empty()) {
        LOG_N("Full stats: " << collectedPlans);
    }
}

virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
{
Expand All @@ -1794,7 +1765,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
AlreadyReplied = true;
auto& response = *ResponseEv->Record.MutableResponse();

response.SetStatus(status);
FillResponseStats(status);

response.MutableIssues()->Swap(issues);

LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
Expand Down Expand Up @@ -1972,8 +1944,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
TActorId KqpShardsResolverId;
THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;

TVector<TProgressStat> LastStats;

TInstant StartResolveTime;
TInstant LastResourceUsageUpdate;

Expand Down
81 changes: 71 additions & 10 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)

TActorId* actorId = std::get_if<TActorId>(&startResult);
Y_ABORT_UNLESS(actorId);
task.ComputeActorId = *actorId;

LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);

auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
YQL_ENSURE(result.second);
AcknowledgeCA(taskId, *actorId, nullptr);
return TString();
}

Expand Down Expand Up @@ -479,8 +474,6 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
if (!result.empty()) {
return MakeActorStartFailureError(ExecuterId, result);
}

PendingComputeTasks.erase(taskId);
}
}
}
Expand Down Expand Up @@ -522,11 +515,79 @@ void TKqpPlanner::Unsubscribe() {
}
}

THashMap<TActorId, TProgressStat>& TKqpPlanner::GetPendingComputeActors() {
// Registers `computeActor` as the compute actor executing task `taskId` and,
// when `state` is non-null and carries stats, stores those stats as the
// actor's progress record. `state` may be nullptr.
//
// Returns true on the first acknowledgement of the task (the task had no
// compute actor assigned yet) and false on every subsequent one.
bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state) {
auto& task = TasksGraph.GetTask(taskId);
// First acknowledgement: bind the actor to the task, drop the task from the
// pending-task set and start tracking the actor in PendingComputeActors.
if (!task.ComputeActorId) {
task.ComputeActorId = computeActor;
PendingComputeTasks.erase(taskId);
auto [it, success] = PendingComputeActors.try_emplace(computeActor);
// The actor must not have been tracked before.
YQL_ENSURE(success);
if (state && state->HasStats()) {
it->second.Set(state->GetStats());
}

return true;
}

// Repeated acknowledgement: it has to come from the actor already bound to the task.
YQL_ENSURE(task.ComputeActorId == computeActor);
auto it = PendingComputeActors.find(computeActor);
// A task that is not completed yet must still have its actor tracked;
// CompletedCA removes the entry once the task is marked completed.
if (!task.Meta.Completed) {
YQL_ENSURE(it != PendingComputeActors.end());
}

// Refresh the progress record only while the actor is still tracked.
if (it != PendingComputeActors.end() && state && state->HasStats()) {
it->second.Set(state->GetStats());
}

return false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unreachable?

}

// Marks task `taskId` as completed and stops tracking `computeActor` as
// pending: its progress record is moved from PendingComputeActors into
// LastStats. For a task that is already completed the call only checks that
// the actor is no longer tracked and returns.
void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
auto& task = TasksGraph.GetTask(taskId);
if (task.Meta.Completed) {
// Invariant: a completed task's actor has already been removed from the pending map.
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Предлагаю проверить здесь инвариант YQL_ENSURE(!PendingComputeActors.contains(computeActor))

}

task.Meta.Completed = true;
auto it = PendingComputeActors.find(computeActor);
// The actor must have been acknowledged (see AcknowledgeCA) before it can complete.
YQL_ENSURE(it != PendingComputeActors.end());
// Keep the final progress record so it is still included by CalculateConsumptionUpdate.
LastStats.emplace_back(std::move(it->second));
PendingComputeActors.erase(it);
return;
}

// Adds up GetLastUsage() of every progress record the planner holds: the
// records of compute actors that are still pending and the records kept in
// LastStats for completed ones. Returns the accumulated entry.
TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {
    TProgressStat::TEntry total;

    // Compute actors that are still pending.
    for (const auto& pending : PendingComputeActors) {
        total += pending.second.GetLastUsage();
    }

    // Records retained for completed compute actors.
    for (const auto& finished : LastStats) {
        total += finished.GetLastUsage();
    }

    return total;
}

// Calls Update() on every progress record the planner holds, both for pending
// compute actors and for the completed ones kept in LastStats.
// NOTE(review): presumably Update() rolls the usage snapshot forward so the
// next CalculateConsumptionUpdate() reports only new consumption - confirm
// against TProgressStat.
void TKqpPlanner::ShiftConsumption() {
    for (auto it = PendingComputeActors.begin(); it != PendingComputeActors.end(); ++it) {
        it->second.Update();
    }

    for (auto& finished : LastStats) {
        finished.Update();
    }
}

// Read-only access to the map of pending compute actors and their progress
// records. Entries are added by AcknowledgeCA and removed by CompletedCA.
const THashMap<TActorId, TProgressStat>& TKqpPlanner::GetPendingComputeActors() {
return PendingComputeActors;
}

THashSet<ui64>& TKqpPlanner::GetPendingComputeTasks() {
// Read-only access to the set of pending task ids. A task id is erased from
// the set when the task is first bound to a compute actor in AcknowledgeCA.
const THashSet<ui64>& TKqpPlanner::GetPendingComputeTasks() {
return PendingComputeTasks;
}

Expand Down
Loading
Loading