Commit 4ba2617

cleanup resource manager & node service (#4624)
1 parent 665c03b commit 4ba2617

8 files changed: +235 -435 lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

+35 -57 lines changed
@@ -60,15 +60,8 @@ NKqpNode::TState& GetStateBucketByTx(std::shared_ptr<TBucketArray> buckets, ui64
 }

 void FinishKqpTask(ui64 txId, ui64 taskId, bool success, NKqpNode::TState& bucket, std::shared_ptr<NRm::IKqpResourceManager> ResourceManager) {
-    auto ctx = bucket.RemoveTask(txId, taskId, success);
-    if (ctx) {
-        ResourceManager->FreeExecutionUnits(1);
-        if (ctx->ComputeActorsNumber == 0) {
-            ResourceManager->FreeResources(txId);
-        } else {
-            ResourceManager->FreeResources(txId, taskId);
-        }
-    }
+    bucket.RemoveTask(txId, taskId, success);
+    ResourceManager->FreeResources(txId, taskId);
 }

 struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
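The deleted branch above is the old split bookkeeping: the execution unit was returned through a separate FreeExecutionUnits(1) call, and either the whole transaction or a single task was freed depending on ComputeActorsNumber. After the cleanup, FinishKqpTask only removes the task from the bucket and calls FreeResources(txId, taskId), which implies the resource manager now hands back a task's execution unit together with its memory in that single call. A minimal standalone model of that contract, purely illustrative (the map-based bookkeeping and the capacities below are assumptions, not the real NRm::IKqpResourceManager):

    #include <cstdint>
    #include <map>
    #include <utility>

    // Sketch only: per-task grants are keyed by (txId, taskId); freeing a task
    // returns its memory and its execution unit in one step, so callers such as
    // FinishKqpTask no longer need a separate FreeExecutionUnits() call.
    class TResourceManagerModel {
    public:
        bool AllocateResources(uint64_t txId, uint64_t taskId, uint64_t memory) {
            if (FreeMemory < memory || FreeExecutionUnits == 0) {
                return false; // nothing is partially reserved on failure
            }
            FreeMemory -= memory;
            --FreeExecutionUnits;
            Grants[{txId, taskId}] = memory;
            return true;
        }

        void FreeResources(uint64_t txId, uint64_t taskId) {
            auto it = Grants.find({txId, taskId});
            if (it == Grants.end()) {
                return; // unknown task: nothing to release
            }
            FreeMemory += it->second;
            ++FreeExecutionUnits;
            Grants.erase(it);
        }

    private:
        uint64_t FreeMemory = 1ull << 30;   // illustrative capacity
        uint64_t FreeExecutionUnits = 100;  // illustrative capacity
        std::map<std::pair<uint64_t, uint64_t>, uint64_t> Grants;
    };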
@@ -86,7 +79,8 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
         , Buckets(std::move(buckets))
         , TxId(txId)
         , TaskId(taskId)
-        , InstantAlloc(instantAlloc) {
+        , InstantAlloc(instantAlloc)
+    {
     }

     ~TMemoryQuotaManager() override {
@@ -100,8 +94,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
             return false;
         }

-        if (!ResourceManager->AllocateResources(TxId, TaskId,
-            NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize})) {
+        auto result = ResourceManager->AllocateResources(TxId, TaskId,
+            NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize});
+
+        if (!result) {
             LOG_W("Can not allocate memory. TxId: " << TxId << ", taskId: " << TaskId << ", memory: +" << extraSize);
             return false;
         }
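Both in this quota-manager method and in the per-task loop further down, AllocateResources now returns a result object instead of filling an NRm::TKqpNotEnoughResources out-parameter: the caller tests it with if (!result) and, on failure, reads GetStatus() and GetFailReason(). The concrete type is declared in the resource manager headers, which are not part of this diff; a plausible shape for such a type, stated purely as an assumption, is:

    #include <string>
    #include <utility>

    // Assumed sketch of a result type exposing the interface used in the diff
    // (explicit operator bool, GetStatus, GetFailReason); the real declaration
    // lives in the KQP resource manager headers and may differ.
    class TAllocateResult {
    public:
        static TAllocateResult Success() {
            return TAllocateResult(true, 0, {});
        }
        static TAllocateResult Error(int status, std::string reason) {
            return TAllocateResult(false, status, std::move(reason));
        }

        explicit operator bool() const { return Ok; }
        int GetStatus() const { return Status; }  // e.g. a NOT_ENOUGH_* response code
        const std::string& GetFailReason() const { return FailReason; }

    private:
        TAllocateResult(bool ok, int status, std::string reason)
            : Ok(ok)
            , Status(status)
            , FailReason(std::move(reason))
        {}

        bool Ok;
        int Status;
        std::string FailReason;
    };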
@@ -292,8 +288,14 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
         LOG_D("TxId: " << txId << ", new compute tasks request from " << requester
             << " with " << msg.GetTasks().size() << " tasks: " << TasksIdsStr(msg.GetTasks()));

-        NKqpNode::TTasksRequest request;
-        request.Executer = ActorIdFromProto(msg.GetExecuterActorId());
+        auto now = TAppData::TimeProvider->Now();
+        NKqpNode::TTasksRequest request(txId, ev->Sender, now);
+        auto& msgRtSettings = msg.GetRuntimeSettings();
+        if (msgRtSettings.GetTimeoutMs() > 0) {
+            // compute actor should not arm timer since in case of timeout it will receive TEvAbortExecution from Executer
+            auto timeout = TDuration::MilliSeconds(msgRtSettings.GetTimeoutMs());
+            request.Deadline = now + timeout + /* gap */ TDuration::Seconds(5);
+        }

         auto& bucket = GetStateBucketByTx(Buckets, txId);

@@ -311,16 +313,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
             memoryPool = NRm::EKqpMemoryPool::Unspecified;
         }

-        size_t executionUnits = msg.GetTasks().size();
-        if (!ResourceManager()->AllocateExecutionUnits(executionUnits)) {
-            Counters->RmNotEnoughComputeActors->Inc();
-            TStringBuilder error;
-            error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", not enough compute actors, requested " << msg.GetTasks().size();
-            LOG_N(error);
-            ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_EXECUTION_UNITS, error);
-            return;
-        }
-
         ui32 requestChannels = 0;
         for (auto& dqTask : *msg.MutableTasks()) {
             auto estimation = EstimateTaskResources(dqTask, Config, msg.GetTasks().size());
@@ -348,20 +340,20 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
         for (auto& task : request.InFlyTasks) {
             NRm::TKqpResourcesRequest resourcesRequest;
             resourcesRequest.MemoryPool = memoryPool;
+            resourcesRequest.ExecutionUnits = 1;

             // !!!!!!!!!!!!!!!!!!!!!
             // we have to allocate memory instead of reserve only. currently, this memory will not be used for request processing.
             resourcesRequest.Memory = Min<double>(task.second.Memory, 1 << 19) /* 512kb limit for check that memory exists for processing with minimal requirements */;

-            NRm::TKqpNotEnoughResources resourcesResponse;
-            if (!ResourceManager()->AllocateResources(txId, task.first, resourcesRequest, &resourcesResponse)) {
+            auto result = ResourceManager()->AllocateResources(txId, task.first, resourcesRequest);
+
+            if (!result) {
                 for (ui64 taskId : allocatedTasks) {
                     ResourceManager()->FreeResources(txId, taskId);
                 }

-                ResourceManager()->FreeExecutionUnits(executionUnits);
-
-                ReplyError(txId, request.Executer, msg, resourcesResponse.GetStatus(), resourcesResponse.GetFailReason());
+                ReplyError(txId, request.Executer, msg, result.GetStatus(), result.GetFailReason());
                 return;
             }

@@ -377,14 +369,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
         memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit();

         NYql::NDq::TComputeRuntimeSettings runtimeSettingsBase;
-        auto& msgRtSettings = msg.GetRuntimeSettings();
-        if (msgRtSettings.GetTimeoutMs() > 0) {
-            // compute actor should not arm timer since in case of timeout it will receive TEvAbortExecution from Executer
-            auto timeout = TDuration::MilliSeconds(msgRtSettings.GetTimeoutMs());
-            request.Deadline = TAppData::TimeProvider->Now() + timeout + /* gap */ TDuration::Seconds(5);
-            bucket.InsertExpiringRequest(request.Deadline, txId, requester);
-        }
-
         runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool;
         runtimeSettingsBase.FailOnUndelivery = msgRtSettings.GetExecType() != NYql::NDqProto::TComputeRuntimeSettings::SCAN;
@@ -502,7 +486,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {

         Counters->NodeServiceProcessTime->Collect(NHPTimer::GetTimePassed(&workHandlerStart) * SecToUsec);

-        bucket.NewRequest(txId, requester, std::move(request), memoryPool);
+        bucket.NewRequest(std::move(request));
     }

     // used only for unit tests
@@ -522,36 +506,30 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
         Counters->NodeServiceProcessCancelTime->Collect(timer.Passed() * SecToUsec);
     }

-    void TerminateTx(ui64 txId, const TString& reason) {
+    void TerminateTx(ui64 txId, const TString& reason, NYql::NDqProto::StatusIds_StatusCode status = NYql::NDqProto::StatusIds::UNSPECIFIED) {
         auto& bucket = GetStateBucketByTx(Buckets, txId);
         auto tasksToAbort = bucket.GetTasksByTxId(txId);

         if (!tasksToAbort.empty()) {
+            TStringBuilder finalReason;
+            finalReason << "node service cancelled the task, because it " << reason
+                << ", NodeId: "<< SelfId().NodeId()
+                << ", TxId: " << txId;
+
+            LOG_E(finalReason);
             for (const auto& [taskId, computeActorId]: tasksToAbort) {
-                auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED,
-                    reason);
-                Send(computeActorId, abortEv.Release());
+                auto abortEv = std::make_unique<TEvKqp::TEvAbortExecution>(status, reason);
+                Send(computeActorId, abortEv.release());
             }
         }
     }

     void HandleWork(TEvents::TEvWakeup::TPtr& ev) {
         Schedule(TDuration::Seconds(1), ev->Release().Release());
-        std::vector<ui64> txIdsToFree;
         for (auto& bucket : *Buckets) {
             auto expiredRequests = bucket.ClearExpiredRequests();
             for (auto& cxt : expiredRequests) {
-                LOG_D("txId: " << cxt.RequestId.TxId << ", requester: " << cxt.RequestId.Requester
-                    << ", execution timeout, request: " << cxt.Exists);
-                if (!cxt.Exists) {
-                    // it is ok since in most cases requests is finished by exlicit TEvAbortExecution from their Executer
-                    LOG_I("txId: " << cxt.RequestId.TxId << ", requester: " << cxt.RequestId.Requester
-                        << ", unknown request");
-                    continue;
-                }
-                // don't send to executer and compute actors, they will be destroyed by TEvAbortExecution in that order:
-                // KqpProxy -> SessionActor -> Executer -> ComputeActor
-                ResourceManager()->FreeResources(cxt.RequestId.TxId);
+                TerminateTx(cxt.TxId, "reached execution deadline", NYql::NDqProto::StatusIds::TIMEOUT);
             }
         }
     }
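TerminateTx now takes an NYql::NDqProto status with UNSPECIFIED as the default, so existing two-argument call sites keep compiling, while paths that know why a transaction died forward a concrete status to the compute actors. Roughly how the call sites in this commit use it (the first call is a generic illustration of an unchanged caller, not a line from this diff):

    // Older two-argument callers still work: status defaults to UNSPECIFIED.
    TerminateTx(txId, "cancelled");

    // The wakeup sweep marks expired requests as timed out ...
    TerminateTx(cxt.TxId, "reached execution deadline", NYql::NDqProto::StatusIds::TIMEOUT);

    // ... and an undelivered start-tasks response aborts the transaction.
    TerminateTx(txId, reason, NYql::NDqProto::StatusIds::ABORTED);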
@@ -619,9 +597,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
         switch (ev->Get()->SourceType) {
             case TEvKqpNode::TEvStartKqpTasksResponse::EventType: {
                 ui64 txId = ev->Cookie;
-                LOG_E("TxId: " << txId << ", executer lost: " << (int) ev->Get()->Reason);
-
-                TerminateTx(txId, "executer lost");
+                TStringBuilder reason;
+                reason << "executer lost: " << (int) ev->Get()->Reason;
+                TerminateTx(txId, reason, NYql::NDqProto::StatusIds::ABORTED);
                 break;
             }
627605
