Skip to content

Commit b392a83

Browse files
change limit usage control (#16831)
1 parent: 2c44b87 · commit: b392a83

File tree

6 files changed

+203
-49
lines changed

6 files changed

+203
-49
lines changed

.github/config/muted_ya.txt

+1
Original file line number | Diff line number | Diff line change
@@ -126,6 +126,7 @@ ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[9]
126126
ydb/tests/functional/tpc/large test_tpch_spilling.py.TestTpchSpillingS10.test_tpch[7]
127127
ydb/tests/olap sole chunk chunk
128128
ydb/tests/olap test_quota_exhaustion.py.TestYdbWorkload.test_delete
129+
ydb/tests/olap/data_quotas test_quota_exhaustion.py.TestYdbWorkload.test_duplicates
129130
ydb/tests/olap/column_family/compression alter_compression.py.TestAlterCompression.test_all_supported_compression
130131
ydb/tests/olap/column_family/compression sole chunk chunk
131132
ydb/tests/olap/oom overlapping_portions.py.TestOverlappingPortions.test

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp

+28-13
Original file line number | Diff line number | Diff line change
@@ -11,35 +11,25 @@ std::shared_ptr<IDataSource> TScanWithLimitCollection::DoExtractNext() {
1111
AFL_VERIFY(FetchingInFlightSources.emplace(TCompareKeyForScanSequence::FromFinish(result)).second);
1212
auto predPosition = std::move(HeapSources.back());
1313
HeapSources.pop_back();
14-
if (HeapSources.size()) {
15-
FullIntervalsFetchingCount.Add(GetInFlightIntervalsCount(predPosition.GetStart(), HeapSources.front().GetStart()));
16-
} else {
17-
FullIntervalsFetchingCount = FetchingInFlightSources.size() + FinishedSources.size();
18-
}
1914
FetchingInFlightCount.Inc();
2015
return result;
2116
}
2217

2318
void TScanWithLimitCollection::DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) {
19+
if (!source->GetResultRecordsCount() && InFlightLimit < GetMaxInFlight()) {
20+
InFlightLimit = 2 * InFlightLimit;
21+
}
2422
FetchingInFlightCount.Dec();
2523
AFL_VERIFY(FetchingInFlightSources.erase(TCompareKeyForScanSequence::FromFinish(source)));
26-
AFL_VERIFY(FinishedSources.emplace(TCompareKeyForScanSequence::FromFinish(source), TFinishedDataSource(source)).second);
2724
while (FinishedSources.size() && (HeapSources.empty() || FinishedSources.begin()->first < HeapSources.front().GetStart())) {
2825
auto finishedSource = FinishedSources.begin()->second;
29-
if (!finishedSource.GetRecordsCount() && InFlightLimit < GetMaxInFlight()) {
30-
InFlightLimit = 2 * InFlightLimit;
31-
}
3226
FetchedCount += finishedSource.GetRecordsCount();
3327
FinishedSources.erase(FinishedSources.begin());
34-
if (Context->IsActive()) {
35-
--FullIntervalsFetchingCount;
36-
}
3728
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource.GetSourceId())(
3829
"source_idx", finishedSource.GetSourceIdx())("limit", Limit)("fetched", finishedSource.GetRecordsCount());
3930
if (Limit <= FetchedCount && HeapSources.size()) {
4031
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", Limit)("fetched", FetchedCount);
4132
HeapSources.clear();
42-
FullIntervalsFetchingCount = FinishedSources.size() + FetchingInFlightSources.size();
4333
}
4434
}
4535
}
@@ -85,6 +75,31 @@ TScanWithLimitCollection::TScanWithLimitCollection(
8575
std::make_heap(HeapSources.begin(), HeapSources.end());
8676
}
8777

78+
void TScanWithLimitCollection::DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) {
79+
std::vector<std::shared_ptr<arrow::ChunkedArray>> pkArrays;
80+
for (auto&& f : Context->GetReadMetadata()->GetResultSchema()->GetIndexInfo().GetReplaceKey()->fields()) {
81+
pkArrays.emplace_back(table->GetColumnByName(f->name()));
82+
if (!pkArrays.back()) {
83+
pkArrays.pop_back();
84+
break;
85+
}
86+
}
87+
AFL_VERIFY(pkArrays.size());
88+
const ui32 partsCount = std::min<ui32>(10, table->num_rows());
89+
std::optional<i32> lastPosition;
90+
for (ui32 i = 0; i < partsCount; ++i) {
91+
const i32 currentPosition = (i + 1) * (table->num_rows() - 1) / partsCount;
92+
if (lastPosition) {
93+
AFL_VERIFY(*lastPosition < currentPosition);
94+
}
95+
const i64 size = lastPosition ? (currentPosition - *lastPosition) : currentPosition;
96+
lastPosition = currentPosition;
97+
TReplaceKeyAdapter key(NArrow::TComparablePosition(pkArrays, currentPosition), Context->GetReadMetadata()->IsDescSorted());
98+
TCompareKeyForScanSequence finishPos(key, source->GetSourceId());
99+
AFL_VERIFY(FinishedSources.emplace(finishPos, TFinishedDataSource(source, size)).second);
100+
}
101+
}
102+
88103
ISourcesCollection::ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context)
89104
: Context(context) {
90105
if (HasAppData() && AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h

+20-3
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ class ISourcesCollection {
1212
virtual std::shared_ptr<IDataSource> DoExtractNext() = 0;
1313
virtual bool DoCheckInFlightLimits() const = 0;
1414
virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) = 0;
15+
virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) = 0;
1516
virtual void DoClear() = 0;
1617

1718
TPositiveControlInteger SourcesInFlightCount;
@@ -30,6 +31,10 @@ class ISourcesCollection {
3031
return DoBuildCursor(source, readyRecords);
3132
}
3233

34+
void OnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) {
35+
return DoOnIntervalResult(table, source);
36+
}
37+
3338
TString DebugString() const {
3439
return DoDebugString();
3540
}
@@ -87,6 +92,8 @@ class TNotSortedCollection: public ISourcesCollection {
8792
virtual bool DoCheckInFlightLimits() const override {
8893
return InFlightCount < InFlightLimit;
8994
}
95+
virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& /*table*/, const std::shared_ptr<IDataSource>& /*source*/) override {
96+
}
9097
virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override {
9198
if (!source->GetResultRecordsCount() && InFlightLimit * 2 < GetMaxInFlight()) {
9299
InFlightLimit *= 2;
@@ -103,8 +110,7 @@ class TNotSortedCollection: public ISourcesCollection {
103110
TNotSortedCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
104111
const std::shared_ptr<IScanCursor>& cursor, const std::optional<ui32> limit)
105112
: TBase(context)
106-
, Limit(limit)
107-
{
113+
, Limit(limit) {
108114
if (Limit) {
109115
InFlightLimit = 1;
110116
} else {
@@ -141,6 +147,8 @@ class TSortedFullScanCollection: public ISourcesCollection {
141147
virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
142148
return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords);
143149
}
150+
virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& /*table*/, const std::shared_ptr<IDataSource>& /*source*/) override {
151+
}
144152
virtual std::shared_ptr<IDataSource> DoExtractNext() override {
145153
AFL_VERIFY(HeapSources.size());
146154
auto result = HeapSources.front().Construct(Context);
@@ -192,6 +200,13 @@ class TScanWithLimitCollection: public ISourcesCollection {
192200
, SourceId(source->GetSourceId())
193201
, SourceIdx(source->GetSourceIdx()) {
194202
}
203+
204+
TFinishedDataSource(const std::shared_ptr<IDataSource>& source, const ui32 partSize)
205+
: RecordsCount(partSize)
206+
, SourceId(source->GetSourceId())
207+
, SourceIdx(source->GetSourceIdx()) {
208+
AFL_VERIFY(partSize < source->GetResultRecordsCount());
209+
}
195210
};
196211

197212
std::deque<TSourceConstructor> HeapSources;
@@ -203,6 +218,7 @@ class TScanWithLimitCollection: public ISourcesCollection {
203218
std::map<TCompareKeyForScanSequence, TFinishedDataSource> FinishedSources;
204219
std::set<TCompareKeyForScanSequence> FetchingInFlightSources;
205220

221+
virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) override;
206222
virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
207223
return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords);
208224
}
@@ -214,7 +230,8 @@ class TScanWithLimitCollection: public ISourcesCollection {
214230
}
215231
virtual std::shared_ptr<IDataSource> DoExtractNext() override;
216232
virtual bool DoCheckInFlightLimits() const override {
217-
return (FetchingInFlightCount < GetMaxInFlight()) && (FullIntervalsFetchingCount < InFlightLimit);
233+
return (FetchingInFlightCount < InFlightLimit);
234+
//&&(FullIntervalsFetchingCount < InFlightLimit);
218235
}
219236
virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override;
220237
ui32 GetInFlightIntervalsCount(const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const;

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp

+1
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
4444
auto cursor = SourcesCollection->BuildCursor(frontSource, startIndex + recordsCount);
4545
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table,
4646
cursor, Context->GetCommonContext(), sourceIdxToContinue));
47+
SourcesCollection->OnIntervalResult(table, frontSource);
4748
} else if (sourceIdxToContinue) {
4849
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
4950
"source_idx", frontSource->GetSourceIdx());

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h

+7-2
Original file line number | Diff line number | Diff line change
@@ -48,17 +48,22 @@ class TPortionPage {
4848
class TReplaceKeyAdapter {
4949
private:
5050
bool Reverse = false;
51-
NArrow::TReplaceKey Value;
51+
NArrow::TComparablePosition Value;
5252

5353
public:
5454
TReplaceKeyAdapter(const NArrow::TReplaceKey& rk, const bool reverse)
5555
: Reverse(reverse)
5656
, Value(rk) {
5757
}
5858

59+
TReplaceKeyAdapter(const NArrow::TComparablePosition& pos, const bool reverse)
60+
: Reverse(reverse)
61+
, Value(pos) {
62+
}
63+
5964
std::partial_ordering Compare(const TReplaceKeyAdapter& item) const {
6065
AFL_VERIFY(Reverse == item.Reverse);
61-
const std::partial_ordering result = Value.CompareNotNull(item.Value);
66+
const std::partial_ordering result = Value.Compare(item.Value);
6267
if (result == std::partial_ordering::equivalent) {
6368
return std::partial_ordering::equivalent;
6469
} else if (result == std::partial_ordering::less) {

0 commit comments

Comments
 (0)