Skip to content

Commit 8d0f151

Browse files
disk usage limit for CS compaction (in general slider limit) (#4864)
1 parent f2808e8 commit 8d0f151

24 files changed

+449
-5
lines changed

ydb/core/base/events.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ struct TKikimrEvents : TEvents {
178178
ES_REPLICATION_SERVICE,
179179
ES_BACKUP_SERVICE,
180180
ES_TX_BACKGROUND,
181-
ES_SS_BG_TASKS
181+
ES_SS_BG_TASKS,
182+
ES_LIMITER
182183
};
183184
};
184185

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,12 @@
176176
#include <ydb/services/metadata/ds_table/service.h>
177177
#include <ydb/services/metadata/service.h>
178178

179-
#include <ydb/core/tx/conveyor/usage/config.h>
180179
#include <ydb/core/tx/conveyor/service/service.h>
180+
#include <ydb/core/tx/conveyor/usage/config.h>
181181
#include <ydb/core/tx/conveyor/usage/service.h>
182+
#include <ydb/core/tx/limiter/service/service.h>
183+
#include <ydb/core/tx/limiter/usage/config.h>
184+
#include <ydb/core/tx/limiter/usage/service.h>
182185

183186
#include <ydb/core/backup/controller/tablet.h>
184187

@@ -2169,6 +2172,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
21692172
}
21702173
}
21712174

2175+
// Forwards the run configuration to the common initializer base; the class
// keeps no state of its own.
TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
    : IKikimrServicesInitializer(runConfig)
{
}
2178+
2179+
// Registers the compaction disk limiter as a node-local service.
//
// The limiter settings are read from the CompDiskLimiterConfig section of the
// application config; a config that fails to deserialize aborts the process.
// When the resulting configuration is disabled nothing is registered.
void TCompDiskLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
    NLimiter::TConfig serviceConfig;
    Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto<NLimiter::TCompDiskLimiterPolicy>(Config.GetCompDiskLimiterConfig()));
    if (!serviceConfig.IsEnabled()) {
        return;
    }

    // Counters live under the "tablets" service group, subgroup type=TX_COMP_DISK_LIMITER.
    TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup =
        GetServiceCounters(appData->Counters, "tablets")->GetSubgroup("type", "TX_COMP_DISK_LIMITER");

    auto service = NLimiter::TCompDiskOperator::CreateService(serviceConfig, countersGroup);

    // The actor runs in the user pool with an HTSwap mailbox.
    setup->LocalServices.push_back(std::make_pair(
        NLimiter::TCompDiskOperator::MakeServiceId(NodeId),
        TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
}
2194+
21722195
// Passes the run configuration straight to the IKikimrServicesInitializer base.
TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig)
21732196
: IKikimrServicesInitializer(runConfig) {
21742197
}

ydb/core/driver_lib/run/kikimr_services_initializers.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,12 @@ class TKqpServiceInitializer : public IKikimrServicesInitializer {
391391
IGlobalObjectStorage& GlobalObjects;
392392
};
393393

394+
// Service initializer for the compaction disk limiter.
// It is added to the initializer list in run.cpp when
// serviceMask.EnableCompDiskLimiter is set.
class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
395+
public:
396+
// runConfig is handed to the IKikimrServicesInitializer base.
TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig);
397+
// Adds the limiter actor to setup->LocalServices if the limiter config is enabled.
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
398+
};
399+
394400
class TCompConveyorInitializer: public IKikimrServicesInitializer {
395401
public:
396402
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);

ydb/core/driver_lib/run/run.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,6 +1542,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
15421542
sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig));
15431543
}
15441544

1545+
if (serviceMask.EnableCompDiskLimiter) {
1546+
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
1547+
}
1548+
15451549
if (serviceMask.EnableScanConveyor) {
15461550
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
15471551
}

ydb/core/driver_lib/run/service_mask.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ union TBasicKikimrServicesMask {
7777

7878
bool EnableDatabaseMetadataCache:1;
7979
bool EnableGraphService:1;
80+
bool EnableCompDiskLimiter:1;
8081
};
8182

8283
struct {

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ PEERDIR(
112112
ydb/core/tx/columnshard
113113
ydb/core/tx/coordinator
114114
ydb/core/tx/conveyor/service
115+
ydb/core/tx/limiter/service
115116
ydb/core/tx/datashard
116117
ydb/core/tx/long_tx_service
117118
ydb/core/tx/long_tx_service/public

ydb/core/protos/config.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,12 @@ message TConveyorConfig {
595595
optional double WorkersCountDouble = 5;
596596
}
597597

598+
// Settings of a limiter service; referenced from TAppConfig as CompDiskLimiterConfig.
// NOTE(review): the mapping of these fields onto NLimiter::TConfig happens in
// TConfig::DeserializeFromProto<TPolicy>(), which is not visible here - confirm
// the exact semantics there.
message TLimiterConfig {
599+
// Turns the limiter on or off; on when the field is not set.
optional bool Enabled = 1 [default = true];
600+
// Volume limit that the limiter actor compares the in-flight volume against.
// No default is declared, so an unset field reads as 0.
// NOTE(review): presumably the policy supplies a fallback when the field is absent - verify.
optional uint64 Limit = 2;
601+
// Length of the accounting period in milliseconds; 1000 when not set.
optional uint64 PeriodMilliSeconds = 3 [default = 1000];
602+
}
603+
598604
message TExternalIndexConfig {
599605
optional bool Enabled = 1 [default = true];
600606
optional TInternalRequestConfig RequestConfig = 2;
@@ -1841,6 +1847,7 @@ message TAppConfig {
18411847
optional TS3ProxyResolverConfig S3ProxyResolverConfig = 76;
18421848
optional TBackgroundCleaningConfig BackgroundCleaningConfig = 77;
18431849
optional TBlobCacheConfig BlobCacheConfig = 78;
1850+
optional TLimiterConfig CompDiskLimiterConfig = 79;
18441851

18451852
repeated TNamedConfig NamedConfigs = 100;
18461853
optional string ClusterYamlConfig = 101;

ydb/core/tx/columnshard/columnshard__write_index.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,34 @@
66
#include "engines/changes/abstract/abstract.h"
77
#include "engines/writer/compacted_blob_constructor.h"
88

9+
#include <ydb/core/tx/limiter/usage/abstract.h>
10+
#include <ydb/core/tx/limiter/usage/service.h>
11+
912
#include <ydb/library/actors/core/log.h>
1013

1114
namespace NKikimr::NColumnShard {
1215

16+
class TDiskResourcesRequest: public NLimiter::IResourceRequest {
17+
private:
18+
using TBase = NLimiter::IResourceRequest;
19+
std::shared_ptr<NOlap::TCompactedWriteController> WriteController;
20+
const ui64 TabletId;
21+
22+
private:
23+
virtual void DoOnResourceAllocated() override {
24+
NActors::TActivationContext::AsActorContext().Register(CreateWriteActor(TabletId, WriteController, TInstant::Max()));
25+
}
26+
27+
public:
28+
TDiskResourcesRequest(const std::shared_ptr<NOlap::TCompactedWriteController>& writeController, const ui64 tabletId)
29+
: TBase(writeController->GetWriteVolume())
30+
, WriteController(writeController)
31+
, TabletId(tabletId)
32+
{
33+
34+
}
35+
};
36+
1337
void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) {
1438
auto putStatus = ev->Get()->GetPutStatus();
1539

@@ -32,7 +56,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
3256
if (*needDraftTransaction) {
3357
Execute(new TTxWriteDraft(this, writeController));
3458
} else {
35-
ctx.Register(CreateWriteActor(TabletID(), writeController, TInstant::Max()));
59+
NLimiter::TCompDiskOperator::AskResource(std::make_shared<TDiskResourcesRequest>(writeController, TabletID()));
3660
}
3761
}
3862
} else {

ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, T
2323
for (auto&& b : portionWithBlobs.GetBlobs()) {
2424
auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId())));
2525
b.RegisterBlobId(portionWithBlobs, task.GetBlobId());
26+
WriteVolume += b.GetSize();
2627
}
2728
}
2829
}

ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,17 @@ class TCompactedWriteController : public NColumnShard::IWriteController {
1313
private:
1414
TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> WriteIndexEv;
1515
TActorId DstActor;
16+
ui64 WriteVolume = 0;
17+
1618
protected:
1719
void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
1820
virtual void DoAbort(const TString& reason) override;
21+
1922
public:
2023
const TBlobsAction& GetBlobsAction();
24+
// Total of GetSize() over the blobs for which write tasks were added in the
// constructor (presumably bytes - TODO confirm the unit of GetSize()).
ui64 GetWriteVolume() const {
25+
return WriteVolume;
26+
}
2127

2228
TCompactedWriteController(const TActorId& dstActor, TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeEv);
2329
~TCompactedWriteController();

ydb/core/tx/conveyor/service/service.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ void TDistributor::Bootstrap() {
3030
}
3131

3232
void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
33+
const auto now = TMonotonic::Now();
34+
const TDuration dExecution = now - ev->Get()->GetStartInstant();
3335
Counters.SolutionsRate->Inc();
34-
Counters.ExecuteHistogram->Collect((TMonotonic::Now() - ev->Get()->GetStartInstant()).MilliSeconds());
36+
Counters.ExecuteHistogram->Collect(dExecution.MilliSeconds());
3537
if (Waiting.size()) {
3638
auto task = Waiting.pop();
37-
Counters.WaitingHistogram->Collect((TMonotonic::Now() - task.GetCreateInstant()).MilliSeconds());
39+
Counters.WaitingHistogram->Collect((ev->Get()->GetStartInstant() - task.GetCreateInstant()).MilliSeconds());
3840
task.OnBeforeStart();
3941
Send(ev->Sender, new TEvInternal::TEvNewTask(task));
4042
} else {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include "service.h"
2+
3+
namespace NKikimr::NLimiter {
4+
5+
// Stores the limiter name and configuration, creates the counters for this
// limiter under baseCounters and publishes the configured limit as a counter.
TLimiterActor::TLimiterActor(const TConfig& config, const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseCounters)
    : LimiterName(limiterName)
    , Config(config)
    , Counters(limiterName, baseCounters)
{
    const auto configuredLimit = Config.GetLimit();
    Counters.InProgressLimit->Set(configuredLimit);
}
12+
13+
// Handles a request for a volume of the limited resource.
//
// The request is admitted immediately when nothing is waiting in the queue and
// either nothing is in flight (so a request larger than the whole limit can
// still make progress) or its volume fits under Config.GetLimit() together
// with the volume already in flight. Otherwise it is appended to RequestsQueue
// and will be admitted by the wake-up handler.
//
// Requests never bypass the queue: the wake-up handler drains RequestsQueue
// strictly from the head, so letting a fresh small request slip in front of a
// waiting large one could starve the head (and everything behind it) for as
// long as small requests keep arriving.
//
// Invariant relied upon: a non-empty RequestsQueue implies a non-empty
// RequestsInFlight, hence a wake-up is already scheduled and queued requests
// cannot get stuck.
void TLimiterActor::HandleMain(TEvExternal::TEvAskResource::TPtr& ev) {
    const auto now = TMonotonic::Now();
    const std::shared_ptr<IResourceRequest> request = ev->Get()->GetRequest();
    const ui64 volume = request->GetVolume();

    const bool fitsNow = RequestsInFlight.empty() || VolumeInFlight + volume <= Config.GetLimit();
    if (RequestsQueue.empty() && fitsNow) {
        // The first in-flight entry arms the periodic wake-up; later ones reuse it.
        const bool wasIdle = RequestsInFlight.empty();
        VolumeInFlight += volume;
        RequestsInFlight.emplace_back(now, volume);
        if (wasIdle) {
            Schedule(now + Config.GetPeriod(), new NActors::TEvents::TEvWakeup());
        }
        request->OnResourceAllocated();
        Counters.InProgressStart->Inc();
    } else {
        RequestsQueue.emplace_back(now, request);
        VolumeInWaiting += volume;
    }

    Counters.InProgressCount->Set(RequestsInFlight.size());
    Counters.InProgressVolume->Set(VolumeInFlight);
    Counters.WaitingQueueCount->Set(RequestsQueue.size());
    Counters.WaitingQueueVolume->Set(VolumeInWaiting);
}
32+
33+
// Periodic bookkeeping step of the limiter.
//
// 1. Drops in-flight entries admitted at least Config.GetPeriod() ago and
//    returns their volume to the budget.
// 2. Admits queued requests from the head while the head fits under the limit
//    (or while nothing is in flight at all).
// 3. Re-arms the wake-up for the moment the oldest remaining in-flight entry
//    expires.
void TLimiterActor::HandleMain(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) {
    const auto now = TMonotonic::Now();
    AFL_VERIFY(RequestsInFlight.size());

    // Step 1: expire old admissions.
    while (!RequestsInFlight.empty()) {
        if (RequestsInFlight.front().GetInstant() + Config.GetPeriod() > now) {
            break;
        }
        AFL_VERIFY(RequestsInFlight.front().GetVolume() <= VolumeInFlight);
        VolumeInFlight -= RequestsInFlight.front().GetVolume();
        RequestsInFlight.pop_front();
    }
    if (RequestsInFlight.empty()) {
        AFL_VERIFY(!VolumeInFlight);
    }

    // Step 2: admit waiting requests in arrival order.
    while (!RequestsQueue.empty()) {
        const ui64 headVolume = RequestsQueue.front().GetRequest()->GetVolume();
        if (!RequestsInFlight.empty() && VolumeInFlight + headVolume > Config.GetLimit()) {
            break;
        }
        const TDuration waited = now - RequestsQueue.front().GetInstant();
        Counters.WaitingHistogram->Collect(static_cast<i64>(waited.MilliSeconds()), 1);
        VolumeInFlight += headVolume;
        RequestsInFlight.emplace_back(now, headVolume);
        RequestsQueue.front().GetRequest()->OnResourceAllocated();
        AFL_VERIFY(VolumeInWaiting >= RequestsQueue.front().GetRequest()->GetVolume());
        VolumeInWaiting -= headVolume;
        RequestsQueue.pop_front();
        Counters.InProgressStart->Inc();
    }

    // Step 3: keep exactly one wake-up pending while anything is in flight.
    if (!RequestsInFlight.empty()) {
        Schedule(RequestsInFlight.front().GetInstant() + Config.GetPeriod(), new NActors::TEvents::TEvWakeup());
    }

    Counters.InProgressCount->Set(RequestsInFlight.size());
    Counters.InProgressVolume->Set(VolumeInFlight);
    Counters.WaitingQueueCount->Set(RequestsQueue.size());
    Counters.WaitingQueueVolume->Set(VolumeInWaiting);
}
62+
63+
}

ydb/core/tx/limiter/service/service.h

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#pragma once
2+
#include <ydb/core/tx/columnshard/counters/common/owner.h>
3+
#include <ydb/core/tx/limiter/usage/abstract.h>
4+
#include <ydb/core/tx/limiter/usage/config.h>
5+
#include <ydb/core/tx/limiter/usage/events.h>
6+
7+
#include <ydb/library/actors/core/actor_bootstrapped.h>
8+
#include <ydb/library/actors/core/log.h>
9+
#include <ydb/library/accessor/accessor.h>
10+
11+
#include <library/cpp/monlib/dynamic_counters/counters.h>
12+
13+
#include <queue>
14+
15+
namespace NKikimr::NLimiter {
16+
17+
// Monitoring signals of one limiter instance, registered under
// "Limiter/<limiterName>" in the supplied counters group.
// All values are written by TLimiterActor.
class TCounters: public NColumnShard::TCommonCountersOwner {
18+
private:
19+
using TBase = NColumnShard::TCommonCountersOwner;
20+
public:
21+
// Number of requests waiting in the queue.
const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueCount;
22+
// Summary volume of the requests waiting in the queue.
const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueVolume;
23+
// Configured limit; set once in the TLimiterActor constructor.
const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressLimit;
24+
25+
// Number of admitted requests that are still accounted as in flight.
const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressCount;
26+
// Summary volume of the requests accounted as in flight.
const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressVolume;
27+
28+
// Incremented once per admitted request.
const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressStart;
29+
30+
// Milliseconds spent in the queue, collected when a queued request is admitted.
const ::NMonitoring::THistogramPtr WaitingHistogram;
31+
32+
// baseSignals is the parent counters group; limiterName becomes part of the
// owner name passed to the base class.
TCounters(const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals)
33+
: TBase("Limiter/" + limiterName, baseSignals)
34+
, WaitingQueueCount(TBase::GetValue("WaitingQueue/Count"))
35+
, WaitingQueueVolume(TBase::GetValue("WaitingQueue/Volume"))
36+
, InProgressLimit(TBase::GetValue("InProgress/Limit/Volume"))
37+
, InProgressCount(TBase::GetValue("InProgress/Count"))
38+
, InProgressVolume(TBase::GetValue("InProgress/Volume"))
39+
, InProgressStart(TBase::GetDeriviative("InProgress"))
40+
// NOTE(review): presumably 20 buckets with base 2 - confirm against the monlib signature.
, WaitingHistogram(TBase::GetHistogram("Waiting", NMonitoring::ExponentialHistogram(20, 2))) {
41+
}
42+
};
43+
44+
// Actor that limits the volume admitted per time period.
//
// A request is "in flight" from the moment it is admitted until
// Config.GetPeriod() has passed; the actor handles no completion event, so the
// accounting is purely time based. Requests that are not admitted right away
// wait in RequestsQueue until a wake-up frees enough budget.
class TLimiterActor: public NActors::TActorBootstrapped<TLimiterActor> {
45+
private:
46+
// Name used for the counters subtree and in log records.
const TString LimiterName;
47+
const TConfig Config;
48+
TCounters Counters;
49+
// A request waiting in the queue together with its arrival time.
class TResourceRequest {
50+
private:
51+
YDB_READONLY(TMonotonic, Instant, TMonotonic::Zero());
52+
YDB_READONLY_DEF(std::shared_ptr<IResourceRequest>, Request);
53+
public:
54+
TResourceRequest(const TMonotonic instant, const std::shared_ptr<IResourceRequest>& req)
55+
: Instant(instant)
56+
, Request(req) {
57+
58+
}
59+
};
60+
61+
// Accounting record of an admitted request: admission time and volume only.
class TResourceRequestInFlight {
62+
private:
63+
YDB_READONLY(TMonotonic, Instant, TMonotonic::Zero());
64+
YDB_READONLY(ui64, Volume, 0);
65+
public:
66+
TResourceRequestInFlight(const TMonotonic instant, const ui64 volume)
67+
: Instant(instant)
68+
, Volume(volume) {
69+
70+
}
71+
};
72+
73+
// Sum of Volume over RequestsInFlight.
ui64 VolumeInFlight = 0;
74+
// Sum of the requested volumes over RequestsQueue.
ui64 VolumeInWaiting = 0;
75+
// Waiting requests in arrival order.
std::deque<TResourceRequest> RequestsQueue;
76+
// Admitted requests in admission order; the front entry expires first.
std::deque<TResourceRequestInFlight> RequestsInFlight;
77+
78+
// Admits the request or puts it into the queue.
void HandleMain(TEvExternal::TEvAskResource::TPtr& ev);
79+
// Expires old in-flight records, admits queued requests, re-arms the wake-up.
void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev);
80+
81+
public:
82+
83+
// The only state of the actor: dispatches resource requests and wake-ups;
// any other event type is logged as an error and trips a debug-only verify.
STATEFN(StateMain) {
84+
switch (ev->GetTypeRewrite()) {
85+
hFunc(TEvExternal::TEvAskResource, HandleMain);
86+
hFunc(NActors::TEvents::TEvWakeup, HandleMain);
87+
default:
88+
AFL_ERROR(NKikimrServices::TX_LIMITER)("limiter", LimiterName)("problem", "unexpected event")("type", ev->GetTypeRewrite());
89+
AFL_VERIFY_DEBUG(false)("type", ev->GetTypeRewrite());
90+
break;
91+
}
92+
}
93+
94+
TLimiterActor(const TConfig& config, const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseCounters);
95+
96+
// Switches the actor into StateMain; no other start-up work is done.
void Bootstrap() {
97+
Become(&TLimiterActor::StateMain);
98+
}
99+
};
100+
101+
}

ydb/core/tx/limiter/service/ya.make

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Build description of the limiter service library (service.cpp only).
LIBRARY()
2+
3+
SRCS(
4+
service.cpp
5+
)
6+
7+
# NOTE(review): service.h also includes headers from
# ydb/core/tx/columnshard/counters/common, ydb/library/actors/core and
# library/cpp/monlib/dynamic_counters; presumably they arrive transitively
# through ydb/core/tx/limiter/usage - confirm, or list them here explicitly.
PEERDIR(
8+
ydb/core/tx/limiter/usage
9+
ydb/core/protos
10+
)
11+
12+
END()
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "abstract.h"
2+
3+
// No out-of-line definitions here; this translation unit only includes abstract.h.
namespace NKikimr::NLimiter {
4+
5+
}

0 commit comments

Comments
 (0)