Skip to content

Commit 91d556c

Browse files
non sorted collection usage (#16804)
1 parent 7db9a40 commit 91d556c

File tree

4 files changed

+36
-16
lines changed

4 files changed

+36
-16
lines changed

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ ISourcesCollection::ISourcesCollection(const std::shared_ptr<TSpecialReadContext
9292
}
9393
}
9494

95-
std::shared_ptr<NKikimr::NOlap::IScanCursor> TNotSortedFullScanCollection::DoBuildCursor(
95+
std::shared_ptr<NKikimr::NOlap::IScanCursor> TNotSortedCollection::DoBuildCursor(
9696
const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const {
9797
return std::make_shared<TNotSortedSimpleScanCursor>(source->GetSourceId(), readyRecords);
9898
}

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h

+25-6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class ISourcesCollection {
4646
}
4747

4848
void OnSourceFinished(const std::shared_ptr<IDataSource>& source) {
49+
AFL_VERIFY(source);
4950
SourcesInFlightCount.Dec();
5051
DoOnSourceFinished(source);
5152
}
@@ -61,11 +62,14 @@ class ISourcesCollection {
6162
ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context);
6263
};
6364

64-
class TNotSortedFullScanCollection: public ISourcesCollection {
65+
class TNotSortedCollection: public ISourcesCollection {
6566
private:
6667
using TBase = ISourcesCollection;
68+
std::optional<ui32> Limit;
69+
ui32 InFlightLimit = 1;
6770
std::deque<TSourceConstructor> Sources;
6871
TPositiveControlInteger InFlightCount;
72+
ui32 FetchedCount = 0;
6973
virtual void DoClear() override {
7074
Sources.clear();
7175
}
@@ -81,16 +85,31 @@ class TNotSortedFullScanCollection: public ISourcesCollection {
8185
return result;
8286
}
8387
virtual bool DoCheckInFlightLimits() const override {
84-
return InFlightCount < GetMaxInFlight();
88+
return InFlightCount < InFlightLimit;
8589
}
86-
virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& /*source*/) override {
90+
virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override {
91+
if (!source->GetResultRecordsCount() && InFlightLimit * 2 < GetMaxInFlight()) {
92+
InFlightLimit *= 2;
93+
}
94+
FetchedCount += source->GetResultRecordsCount();
95+
if (Limit && *Limit <= FetchedCount && Sources.size()) {
96+
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", Limit)("fetched", FetchedCount);
97+
Sources.clear();
98+
}
8799
InFlightCount.Dec();
88100
}
89101

90102
public:
91-
TNotSortedFullScanCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
92-
const std::shared_ptr<IScanCursor>& cursor)
93-
: TBase(context) {
103+
TNotSortedCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
104+
const std::shared_ptr<IScanCursor>& cursor, const std::optional<ui32> limit)
105+
: TBase(context)
106+
, Limit(limit)
107+
{
108+
if (Limit) {
109+
InFlightLimit = 1;
110+
} else {
111+
InFlightLimit = GetMaxInFlight();
112+
}
94113
if (cursor && cursor->IsInitialized()) {
95114
while (sources.size()) {
96115
bool usage = false;

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
2121

2222
sources.emplace_back(TSourceConstructor(sourceIdx++, i, context));
2323
}
24-
std::make_heap(sources.begin(), sources.end());
2524
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);
2625

2726
auto& stats = GetReadMetadata()->ReadStats;

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp

+10-8
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,17 @@ TConclusionStatus TScanHead::Start() {
6666

6767
TScanHead::TScanHead(std::deque<TSourceConstructor>&& sources, const std::shared_ptr<TSpecialReadContext>& context)
6868
: Context(context) {
69-
if (Context->GetReadMetadata()->HasLimit()) {
70-
SourcesCollection =
71-
std::make_unique<TScanWithLimitCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
72-
} else if (Context->GetReadMetadata()->IsSorted()) {
73-
SourcesCollection =
74-
std::make_unique<TSortedFullScanCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
69+
if (Context->GetReadMetadata()->IsSorted()) {
70+
if (Context->GetReadMetadata()->HasLimit()) {
71+
SourcesCollection =
72+
std::make_unique<TScanWithLimitCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
73+
} else {
74+
SourcesCollection =
75+
std::make_unique<TSortedFullScanCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
76+
}
7577
} else {
76-
SourcesCollection =
77-
std::make_unique<TNotSortedFullScanCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
78+
SourcesCollection = std::make_unique<TNotSortedCollection>(
79+
Context, std::move(sources), context->GetCommonContext()->GetScanCursor(), Context->GetReadMetadata()->GetLimitRobustOptional());
7880
}
7981
}
8082

0 commit comments

Comments
 (0)