Skip to content

Backport everything about blocks and stats #8972

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ __pycache__/
*.pb.h
*.pb.cc

# Other generated
*.fbs.h

# MacOS specific
.DS_Store

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
}

void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType,
bool allowCache) {
bool allowCache, bool success) {
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
preparingQuery.release(), AppData()->FunctionRegistry);
preparingQuery.release(), AppData()->FunctionRegistry, !success);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery()) && allowCache;
Expand Down Expand Up @@ -503,7 +503,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

if (status == Ydb::StatusIds::SUCCESS) {
YQL_ENSURE(kqpResult.PreparingQuery);
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, kqpResult.AllowCache);
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, kqpResult.AllowCache, true);

auto now = TInstant::Now();
auto duration = now - StartTime;
Expand All @@ -514,7 +514,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", duration: " << duration);
} else {
if (kqpResult.PreparingQuery) {
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, kqpResult.AllowCache);
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, kqpResult.AllowCache, false);
}

LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed"
Expand Down
117 changes: 74 additions & 43 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,51 +206,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
);
}

bool LogStatsByLongTasks() const {
return Stats->CollectStatsByLongTasks && HasOlapTable;
}

void FillResponseStats(Ydb::StatusIds::StatusCode status) {
auto& response = *ResponseEv->Record.MutableResponse();

response.SetStatus(status);

if (Stats) {
ReportEventElapsedTime();

Stats->FinishTs = TInstant::Now();
Stats->Finish();

if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) {
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
}
}

if (LogStatsByLongTasks()) {
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
if (!txPlansWithStats.empty()) {
LOG_N("Full stats: " << txPlansWithStats);
}
}

Stats.reset();
}
}

void Finalize() {
YQL_ENSURE(!AlreadyReplied);

if (LocksBroken) {
TString message = "Transaction locks invalidated.";

return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
}

auto& response = *ResponseEv->Record.MutableResponse();

FillResponseStats(Ydb::StatusIds::SUCCESS);
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();

auto addLocks = [this](const auto& data) {
Expand Down Expand Up @@ -287,7 +253,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (LockHandle) {
ResponseEv->LockHandle = std::move(LockHandle);
}
BuildLocks(*response.MutableResult()->MutableLocks(), Locks);
BuildLocks(*ResponseEv->Record.MutableResponse()->MutableResult()->MutableLocks(), Locks);
}

auto resultSize = ResponseEv->GetByteSize();
Expand All @@ -313,9 +279,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

ExecuterSpan.EndOk();

Request.Transactions.crop(0);
LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
Send(Target, ResponseEv.release());
AlreadyReplied = true;
PassAway();
}

Expand Down Expand Up @@ -355,6 +319,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return "WaitSnapshotState";
} else if (func == &TThis::WaitResolveState) {
return "WaitResolveState";
} else if (func == &TThis::WaitShutdownState) {
return "WaitShutdownState";
} else {
return TBase::CurrentStateFuncName();
}
Expand Down Expand Up @@ -562,7 +528,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
CancelProposal(0);
}
HandleComputeStats(ev);
HandleComputeState(ev);
}

void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
Expand Down Expand Up @@ -1041,7 +1007,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
Expand Down Expand Up @@ -2631,6 +2597,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

void Shutdown() override {
if (Planner) {
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
LOG_I("Shutdown immediately - nothing to wait");
PassAway();
} else {
this->Become(&TThis::WaitShutdownState);
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
<< Planner->GetPendingComputeActors().size() << " compute actors");
// TODO(ilezhankin): the CA awaiting timeout should be configurable.
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
}
} else {
PassAway();
}
}

void PassAway() override {
auto totalTime = TInstant::Now() - StartTime;
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
Expand All @@ -2648,6 +2631,54 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TBase::PassAway();
}

STATEFN(WaitShutdownState) {
switch(ev->GetTypeRewrite()) {
hFunc(TEvDqCompute::TEvState, HandleShutdown);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
hFunc(TEvents::TEvPoison, HandleShutdown);
default:
LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
}
}

void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
HandleComputeStats(ev);

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}

void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
const auto nodeId = ev->Get()->NodeId;
LOG_N("Node has disconnected while shutdown: " << nodeId);

YQL_ENSURE(Planner);

for (const auto& task : TasksGraph.GetTasks()) {
if (task.Meta.NodeId == nodeId && !task.Meta.Completed) {
if (task.ComputeActorId) {
Planner->CompletedCA(task.Id, task.ComputeActorId);
} else {
Planner->TaskNotStarted(task.Id);
}
}
}

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}

void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
// Self-poison means timeout - don't wait anymore.
LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown");

if (ev->Sender == SelfId()) {
PassAway();
}
}

private:
void ReplyTxStateUnknown(ui64 shardId) {
auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId;
Expand Down
Loading
Loading