Skip to content

Commit 5c18e12

Browse files
authored
start using spilling in YDB (#5496)
1 parent cc73f2c commit 5c18e12

File tree

9 files changed

+33
-4
lines changed

9 files changed

+33
-4
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

+13-2
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
1313
, std::shared_ptr<IKqpNodeState> state
1414
, ui64 txId
1515
, ui64 taskId
16-
, ui64 limit)
16+
, ui64 limit
17+
, ui64 reasonableSpillingTreshold)
1718
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
1819
, ResourceManager(std::move(resourceManager))
1920
, MemoryPool(memoryPool)
2021
, State(std::move(state))
2122
, TxId(txId)
2223
, TaskId(taskId)
24+
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
2325
{
2426
}
2527

@@ -42,6 +44,8 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
4244
return false;
4345
}
4446

47+
TotalQueryAllocationsSize = result.TotalAllocatedQueryMemory;
48+
4549
return true;
4650
}
4751

@@ -51,6 +55,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
5155
);
5256
}
5357

58+
bool IsReasonableToUseSpilling() const override {
59+
return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
60+
}
61+
5462
void TerminateHandler(bool success, const NYql::TIssues& issues) {
5563
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
5664
("problem", "finish_compute_actor")
@@ -64,6 +72,8 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
6472
ui64 TxId;
6573
ui64 TaskId;
6674
bool Success = true;
75+
ui64 TotalQueryAllocationsSize = 0;
76+
ui64 ReasonableSpillingTreshold = 0;
6777
};
6878

6979
class TKqpCaFactory : public IKqpNodeComputeActorFactory {
@@ -123,7 +133,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
123133
std::move(state),
124134
txId,
125135
dqTask->GetId(),
126-
limit);
136+
limit,
137+
Config.GetReasonableSpillingTreshold());
127138

128139
auto runtimeSettings = settings;
129140
NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager;

ydb/core/kqp/counters/kqp_counters.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ void TKqpCounters::UpdateTxCounters(const TKqpTransactionInfo& txInfo,
726726
}
727727

728728
TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, const TActorContext* ctx)
729-
: NYql::NDq::TSpillingCounters(counters)
729+
: NYql::NDq::TSpillingCounters(GetServiceCounters(counters, "kqp"))
730730
, AllocCounters(counters, "kqp")
731731
{
732732
Counters = counters;

ydb/core/kqp/rm_service/kqp_rm_service.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ class TKqpResourceManager : public IKqpResourceManager {
308308

309309
auto& tx = txBucket.Txs[txId];
310310
ui64 txTotalRequestedMemory = tx.TxScanQueryMemory + resources.Memory;
311+
result.TotalAllocatedQueryMemory = txTotalRequestedMemory;
311312
if (txTotalRequestedMemory > queryMemoryLimit) {
312313
TStringBuilder reason;
313314
reason << "TxId: " << txId << ", taskId: " << taskId << ". Query memory limit exceeded: "

ydb/core/kqp/rm_service/kqp_rm_service.h

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ struct TKqpRMAllocateResult {
5151
bool Success = true;
5252
NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason Status = NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR;
5353
TString FailReason;
54+
ui64 TotalAllocatedQueryMemory = 0;
5455

5556
NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason GetStatus() const {
5657
return Status;

ydb/core/protos/table_service_config.proto

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ message TTableServiceConfig {
4242
optional TInfoExchangerSettings InfoExchangerSettings = 19;
4343
optional uint64 KqpPatternCachePatternAccessTimesBeforeTryToCompile = 20 [default = 5];
4444
optional uint64 KqpPatternCacheCompiledCapacityBytes = 21 [default = 104857600]; // 100 MiB
45+
optional uint64 ReasonableSpillingTreshold = 22 [default = 104857600]; // 100 MiB
4546
}
4647

4748
message TSpillingServiceConfig {

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

+4
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,10 @@ struct TGuaranteeQuotaManager : public IMemoryQuotaManager {
307307
return true;
308308
}
309309

310+
bool IsReasonableToUseSpilling() const override {
311+
return false;
312+
}
313+
310314
void FreeQuota(ui64 memorySize) override {
311315
Y_ABORT_UNLESS(Quota >= memorySize);
312316
Quota -= memorySize;

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ struct IMemoryQuotaManager {
6767
virtual void FreeQuota(ui64 memorySize) = 0;
6868
virtual ui64 GetCurrentQuota() const = 0;
6969
virtual ui64 GetMaxMemorySize() const = 0;
70+
virtual bool IsReasonableToUseSpilling() const = 0;
7071
};
7172

7273
// Source/transform.

ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ namespace NYql::NDq {
6565

6666
alloc->Ref().SetIncreaseMemoryLimitCallback([this, alloc](ui64 limit, ui64 required) {
6767
RequestExtraMemory(required - limit, alloc);
68-
68+
6969
ui64 currentRSS = NMemInfo::GetMemInfo().RSS;
7070
if (currentRSS > criticalRSSValue) {
7171
alloc->SetMaximumLimitValueReached(true);
@@ -150,6 +150,12 @@ namespace NYql::NDq {
150150
// << ", requested: " << memory << ", host: " << HostName();
151151
}
152152

153+
if (MemoryLimits.MemoryQuotaManager->IsReasonableToUseSpilling()) {
154+
alloc->SetMaximumLimitValueReached(true);
155+
} else {
156+
alloc->SetMaximumLimitValueReached(false);
157+
}
158+
153159
if (Y_UNLIKELY(ProfileStats)) {
154160
ProfileStats->MkqlExtraMemoryBytes += memory;
155161
ProfileStats->MkqlExtraMemoryRequests++;

ydb/library/yql/providers/dq/actors/worker_actor.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ class TDummyMemoryQuotaManager: public IMemoryQuotaManager {
8181
ui64 GetMaxMemorySize() const override {
8282
return std::numeric_limits<ui64>::max();
8383
}
84+
85+
bool IsReasonableToUseSpilling() const override {
86+
return false;
87+
}
8488
};
8589

8690
class TDqWorker: public TRichActor<TDqWorker>

0 commit comments

Comments
 (0)