Skip to content

Commit 8343cfd

Browse files
Limit requested memory (#6698)
1 parent 89c16ee commit 8343cfd

File tree

8 files changed

+106
-48
lines changed

8 files changed

+106
-48
lines changed

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,6 +1518,7 @@ message TColumnShardConfig {
15181518
repeated TRepairInfo Repairs = 15;
15191519

15201520
optional uint32 MaxInFlightIntervalsOnRequest = 16;
1521+
optional uint32 MaxInFlightMemoryOnRequest = 17;
15211522
}
15221523

15231524
message TSchemeShardConfig {

ydb/core/tx/columnshard/counters/scan.h

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,38 @@
33
#include "common/histogram.h"
44
#include <ydb/core/tx/columnshard/resources/memory.h>
55
#include <ydb/core/tx/columnshard/resource_subscriber/counters.h>
6+
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
67
#include <library/cpp/monlib/dynamic_counters/counters.h>
78

89
namespace NKikimr::NColumnShard {
910

1011
class TScanAggregations: public TCommonCountersOwner {
1112
private:
1213
using TBase = TCommonCountersOwner;
13-
std::shared_ptr<NOlap::TMemoryAggregation> ReadBlobs;
14-
std::shared_ptr<NOlap::TMemoryAggregation> GranulesProcessing;
15-
std::shared_ptr<NOlap::TMemoryAggregation> GranulesReady;
1614
std::shared_ptr<NOlap::TMemoryAggregation> ResultsReady;
15+
std::shared_ptr<NOlap::TMemoryAggregation> RequestedResourcesMemory;
1716
std::shared_ptr<TValueAggregationClient> ScanDuration;
1817
std::shared_ptr<TValueAggregationClient> BlobsWaitingDuration;
1918
public:
2019
TScanAggregations(const TString& moduleId)
2120
: TBase(moduleId)
22-
, GranulesProcessing(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Granules/Processing"))
2321
, ResultsReady(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Results/Ready"))
22+
, RequestedResourcesMemory(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Resources/Requested"))
2423
, ScanDuration(TBase::GetValueAutoAggregationsClient("ScanDuration"))
2524
, BlobsWaitingDuration(TBase::GetValueAutoAggregationsClient("BlobsWaitingDuration"))
2625
{
2726

2827
}
2928

29+
std::shared_ptr<NOlap::TMemoryAggregation> GetRequestedResourcesMemory() const {
30+
return RequestedResourcesMemory;
31+
}
32+
3033
void OnBlobWaitingDuration(const TDuration d, const TDuration fullScanDuration) const {
3134
BlobsWaitingDuration->Add(d.MicroSeconds());
3235
ScanDuration->SetValue(fullScanDuration.MicroSeconds());
3336
}
3437

35-
const std::shared_ptr<NOlap::TMemoryAggregation>& GetGranulesProcessing() const {
36-
return GranulesProcessing;
37-
}
3838
const std::shared_ptr<NOlap::TMemoryAggregation>& GetResultsReady() const {
3939
return ResultsReady;
4040
}
@@ -282,16 +282,55 @@ class TCounterGuard: TNonCopyable {
282282

283283
};
284284

285+
class TReaderResourcesGuard {
286+
private:
287+
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> Allocated;
288+
std::shared_ptr<TAtomicCounter> Requested;
289+
const std::shared_ptr<NOlap::TMemoryAggregation> SignalCounter;
290+
const ui64 Volume;
291+
292+
public:
293+
TReaderResourcesGuard(const ui64 volume, const std::shared_ptr<TAtomicCounter>& requested, const std::shared_ptr<NOlap::TMemoryAggregation>& signalWatcher)
294+
: Requested(requested)
295+
, SignalCounter(signalWatcher)
296+
, Volume(volume)
297+
{
298+
AFL_VERIFY(Requested);
299+
Requested->Add(Volume);
300+
SignalCounter->AddBytes(volume);
301+
}
302+
303+
void InitResources(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& g) {
304+
AFL_VERIFY(!Allocated);
305+
AFL_VERIFY(g->GetMemory() == Volume)("volume", Volume)("allocated", g->GetMemory());
306+
Allocated = g;
307+
}
308+
309+
~TReaderResourcesGuard() {
310+
SignalCounter->RemoveBytes(Volume);
311+
AFL_VERIFY(Requested->Sub(Volume) >= 0);
312+
}
313+
};
314+
285315
class TConcreteScanCounters: public TScanCounters {
286316
private:
287317
using TBase = TScanCounters;
318+
std::shared_ptr<TAtomicCounter> RequestedResourcesBytes;
288319
std::shared_ptr<TAtomicCounter> MergeTasksCount;
289320
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
290321
std::shared_ptr<TAtomicCounter> ReadTasksCount;
291322
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
292323
public:
293324
TScanAggregations Aggregations;
294325

326+
ui64 GetRequestedMemoryBytes() const {
327+
return RequestedResourcesBytes->Val();
328+
}
329+
330+
std::shared_ptr<TReaderResourcesGuard> BuildRequestedResourcesGuard(const ui64 volume) const {
331+
return std::make_shared<TReaderResourcesGuard>(volume, RequestedResourcesBytes, Aggregations.GetRequestedResourcesMemory());
332+
}
333+
295334
TCounterGuard GetMergeTasksGuard() const {
296335
return TCounterGuard(MergeTasksCount);
297336
}
@@ -319,6 +358,7 @@ class TConcreteScanCounters: public TScanCounters {
319358

320359
TConcreteScanCounters(const TScanCounters& counters)
321360
: TBase(counters)
361+
, RequestedResourcesBytes(std::make_shared<TAtomicCounter>())
322362
, MergeTasksCount(std::make_shared<TAtomicCounter>())
323363
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
324364
, ReadTasksCount(std::make_shared<TAtomicCounter>())

ydb/core/tx/columnshard/engines/reader/common/result.h

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22
#include <ydb/core/tx/columnshard/common/snapshot.h>
3+
#include <ydb/core/tx/columnshard/counters/scan.h>
34
#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
45
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
56
#include <ydb/core/tx/program/program.h>
@@ -10,7 +11,7 @@ namespace NKikimr::NOlap::NReader {
1011
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
1112
class TPartialReadResult {
1213
private:
13-
YDB_READONLY_DEF(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>, ResourcesGuards);
14+
YDB_READONLY_DEF(std::vector<std::shared_ptr<NColumnShard::TReaderResourcesGuard>>, ResourcesGuards);
1415
NArrow::TShardedRecordBatch ResultBatch;
1516

1617
// This 1-row batch contains the last key that was read while producing the ResultBatch.
@@ -32,7 +33,7 @@ class TPartialReadResult {
3233
return ResultBatch.GetRecordBatch();
3334
}
3435

35-
const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& GetResourcesGuardOnly() const {
36+
const std::shared_ptr<NColumnShard::TReaderResourcesGuard>& GetResourcesGuardOnly() const {
3637
AFL_VERIFY(ResourcesGuards.size() == 1);
3738
AFL_VERIFY(!!ResourcesGuards.front());
3839
return ResourcesGuards.front();
@@ -56,14 +57,12 @@ class TPartialReadResult {
5657
return LastReadKey;
5758
}
5859

59-
explicit TPartialReadResult(
60-
const std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>& resourcesGuards,
60+
explicit TPartialReadResult(const std::vector<std::shared_ptr<NColumnShard::TReaderResourcesGuard>>& resourcesGuards,
6161
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
6262
: ResourcesGuards(resourcesGuards)
6363
, ResultBatch(batch)
6464
, LastReadKey(lastKey)
65-
, NotFinishedIntervalIdx(notFinishedIntervalIdx)
66-
{
65+
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
6766
for (auto&& i : ResourcesGuards) {
6867
AFL_VERIFY(i);
6968
}
@@ -72,16 +71,17 @@ class TPartialReadResult {
7271
Y_ABORT_UNLESS(LastReadKey->num_rows() == 1);
7372
}
7473

75-
explicit TPartialReadResult(
76-
const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuards,
74+
explicit TPartialReadResult(const std::shared_ptr<NColumnShard::TReaderResourcesGuard>& resourcesGuards,
7775
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
78-
: TPartialReadResult(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>({resourcesGuards}), batch, lastKey, notFinishedIntervalIdx) {
76+
: TPartialReadResult(
77+
std::vector<std::shared_ptr<NColumnShard::TReaderResourcesGuard>>({ resourcesGuards }), batch, lastKey, notFinishedIntervalIdx) {
7978
AFL_VERIFY(resourcesGuards);
8079
}
8180

82-
explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
83-
: TPartialReadResult(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>(), batch, lastKey, notFinishedIntervalIdx) {
81+
explicit TPartialReadResult(
82+
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
83+
: TPartialReadResult(std::vector<std::shared_ptr<NColumnShard::TReaderResourcesGuard>>(), batch, lastKey, notFinishedIntervalIdx) {
8484
}
8585
};
8686

87-
}
87+
} // namespace NKikimr::NOlap::NReader

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,6 @@ void TFetchingInterval::ConstructResult() {
1818
}
1919
}
2020

21-
void TFetchingInterval::OnInitResourcesGuard(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
22-
IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitSources);
23-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocated")("interval_idx", IntervalIdx);
24-
AFL_VERIFY(guard);
25-
AFL_VERIFY(!ResourcesGuard);
26-
ResourcesGuard = guard;
27-
for (auto&& i : Sources) {
28-
i.second->OnInitResourcesGuard(i.second);
29-
}
30-
AFL_VERIFY(ReadyGuards.Inc() <= 1);
31-
ConstructResult();
32-
}
33-
3421
void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) {
3522
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "fetched")("interval_idx", IntervalIdx);
3623
AFL_VERIFY(ReadySourcesCount.Inc() <= WaitSourcesCount);
@@ -45,6 +32,7 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi
4532
, Context(context)
4633
, TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
4734
, Sources(sources)
35+
, ResourcesGuard(Context->GetCommonContext()->GetCounters().BuildRequestedResourcesGuard(GetMemoryAllocation()))
4836
, IntervalIdx(intervalIdx)
4937
, IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard())
5038
{
@@ -62,7 +50,13 @@ void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptr<NResourceBro
6250
AFL_VERIFY(guard);
6351
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("interval_idx", IntervalIdx)("event", "resources_allocated")
6452
("resources", guard->DebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size());
65-
OnInitResourcesGuard(guard);
53+
IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitSources);
54+
ResourcesGuard->InitResources(guard);
55+
for (auto&& i : Sources) {
56+
i.second->OnInitResourcesGuard(i.second);
57+
}
58+
AFL_VERIFY(ReadyGuards.Inc() <= 1);
59+
ConstructResult();
6660
}
6761

6862
void TFetchingInterval::SetMerger(std::unique_ptr<NArrow::NMerger::TMergePartialStream>&& merger) {

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@ class TFetchingInterval: public TNonCopyable, public NResourceBroker::NSubscribe
2020

2121
void ConstructResult();
2222

23-
std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
23+
std::shared_ptr<NColumnShard::TReaderResourcesGuard> ResourcesGuard;
2424
const ui32 IntervalIdx;
2525
TAtomicCounter ReadySourcesCount = 0;
2626
TAtomicCounter ReadyGuards = 0;
2727
ui32 WaitSourcesCount = 0;
2828
NColumnShard::TConcreteScanCounters::TScanIntervalStateGuard IntervalStateGuard;
29-
void OnInitResourcesGuard(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard);
3029
protected:
3130
virtual void DoOnAllocationSuccess(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) override;
3231

@@ -47,7 +46,7 @@ class TFetchingInterval: public TNonCopyable, public NResourceBroker::NSubscribe
4746
return Sources;
4847
}
4948

50-
const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& GetResourcesGuard() const {
49+
const std::shared_ptr<NColumnShard::TReaderResourcesGuard>& GetResourcesGuard() const {
5150
return ResourcesGuard;
5251
}
5352

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,15 @@ TConclusionStatus TScanHead::Start() {
9797
TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context)
9898
: Context(context)
9999
{
100-
if (!HasAppData() || !AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
101-
MaxInFlight = 256;
102-
} else {
103-
MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
100+
101+
if (HasAppData()) {
102+
if (AppDataVerified().ColumnShardConfig.HasMaxInFlightMemoryOnRequest()) {
103+
MaxInFlightMemory = AppDataVerified().ColumnShardConfig.GetMaxInFlightMemoryOnRequest();
104+
}
105+
106+
if (AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
107+
MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
108+
}
104109
}
105110

106111
if (Context->GetReadMetadata()->Limit) {
@@ -239,19 +244,31 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
239244
if (AbortFlag) {
240245
return false;
241246
}
242-
while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) {
247+
while (BorderPoints.size()) {
248+
if (BorderPoints.begin()->second.GetStartSources().size()) {
249+
if (FetchingIntervals.size() >= InFlightLimit) {
250+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "too many intervals in flight")(
251+
"count", FetchingIntervals.size())("limit", InFlightLimit);
252+
return false;
253+
}
254+
if (Context->GetCommonContext()->GetCounters().GetRequestedMemoryBytes() >= MaxInFlightMemory) {
255+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "a lot of memory in usage")(
256+
"volume", Context->GetCommonContext()->GetCounters().GetRequestedMemoryBytes())("limit", MaxInFlightMemory);
257+
return false;
258+
}
259+
}
243260
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
244261
CurrentState.OnStartPoint(firstBorderPointInfo);
245262

246263
if (CurrentState.GetIsSpecialPoint()) {
247264
const ui32 intervalIdx = SegmentIdxCounter++;
248-
auto interval = std::make_shared<TFetchingInterval>(
249-
BorderPoints.begin()->first, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(),
250-
Context, true, true, false);
265+
auto interval = std::make_shared<TFetchingInterval>(BorderPoints.begin()->first, BorderPoints.begin()->first, intervalIdx,
266+
CurrentState.GetCurrentSources(), Context, true, true, false);
251267
FetchingIntervals.emplace(intervalIdx, interval);
252268
IntervalStats.emplace_back(CurrentState.GetCurrentSources().size(), true);
253269
NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), interval);
254-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", interval->DebugJson());
270+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)(
271+
"interval", interval->DebugJson());
255272
}
256273

257274
CurrentState.OnFinishPoint(firstBorderPointInfo);
@@ -262,11 +279,13 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
262279
Y_ABORT_UNLESS(BorderPoints.size());
263280
CurrentState.OnNextPointInfo(BorderPoints.begin()->second);
264281
const ui32 intervalIdx = SegmentIdxCounter++;
265-
auto interval = std::make_shared<TFetchingInterval>(*CurrentStart, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(), Context,
266-
CurrentState.GetIncludeFinish(), CurrentState.GetIncludeStart(), CurrentState.GetIsExclusiveInterval());
282+
auto interval =
283+
std::make_shared<TFetchingInterval>(*CurrentStart, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(),
284+
Context, CurrentState.GetIncludeFinish(), CurrentState.GetIncludeStart(), CurrentState.GetIsExclusiveInterval());
267285
FetchingIntervals.emplace(intervalIdx, interval);
268286
IntervalStats.emplace_back(CurrentState.GetCurrentSources().size(), false);
269-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", interval->DebugJson());
287+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)(
288+
"interval", interval->DebugJson());
270289
NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), interval);
271290
return true;
272291
} else {

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class TScanHead {
7979
std::vector<TIntervalStat> IntervalStats;
8080
ui64 InFlightLimit = 1;
8181
ui64 MaxInFlight = 256;
82+
ui64 MaxInFlightMemory = ((ui64)256) << 20;
8283
ui64 ZeroCount = 0;
8384
bool AbortFlag = false;
8485
void DrainSources();

ydb/core/tx/columnshard/resource_subscriber/task.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ class TResourcesGuard: public NColumnShard::TMonitoringObjectsCounter<TResources
2828
const TTaskContext Context;
2929
const ui64 Priority;
3030
public:
31+
ui64 GetMemory() const {
32+
return Memory;
33+
}
34+
3135
TString DebugString() const {
3236
return TStringBuilder() << "(mem=" << Memory << ";cpu=" << Cpu << ";)";
3337
}

0 commit comments

Comments
 (0)