Skip to content

change limit usage control #16831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[9]
ydb/tests/functional/tpc/large test_tpch_spilling.py.TestTpchSpillingS10.test_tpch[7]
ydb/tests/olap sole chunk chunk
ydb/tests/olap test_quota_exhaustion.py.TestYdbWorkload.test_delete
ydb/tests/olap/data_quotas test_quota_exhaustion.py.TestYdbWorkload.test_duplicates
ydb/tests/olap/column_family/compression alter_compression.py.TestAlterCompression.test_all_supported_compression
ydb/tests/olap/column_family/compression sole chunk chunk
ydb/tests/olap/oom overlapping_portions.py.TestOverlappingPortions.test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,25 @@ std::shared_ptr<IDataSource> TScanWithLimitCollection::DoExtractNext() {
AFL_VERIFY(FetchingInFlightSources.emplace(TCompareKeyForScanSequence::FromFinish(result)).second);
auto predPosition = std::move(HeapSources.back());
HeapSources.pop_back();
if (HeapSources.size()) {
FullIntervalsFetchingCount.Add(GetInFlightIntervalsCount(predPosition.GetStart(), HeapSources.front().GetStart()));
} else {
FullIntervalsFetchingCount = FetchingInFlightSources.size() + FinishedSources.size();
}
FetchingInFlightCount.Inc();
return result;
}

void TScanWithLimitCollection::DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) {
if (!source->GetResultRecordsCount() && InFlightLimit < GetMaxInFlight()) {
InFlightLimit = 2 * InFlightLimit;
}
FetchingInFlightCount.Dec();
AFL_VERIFY(FetchingInFlightSources.erase(TCompareKeyForScanSequence::FromFinish(source)));
AFL_VERIFY(FinishedSources.emplace(TCompareKeyForScanSequence::FromFinish(source), TFinishedDataSource(source)).second);
while (FinishedSources.size() && (HeapSources.empty() || FinishedSources.begin()->first < HeapSources.front().GetStart())) {
auto finishedSource = FinishedSources.begin()->second;
if (!finishedSource.GetRecordsCount() && InFlightLimit < GetMaxInFlight()) {
InFlightLimit = 2 * InFlightLimit;
}
FetchedCount += finishedSource.GetRecordsCount();
FinishedSources.erase(FinishedSources.begin());
if (Context->IsActive()) {
--FullIntervalsFetchingCount;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource.GetSourceId())(
"source_idx", finishedSource.GetSourceIdx())("limit", Limit)("fetched", finishedSource.GetRecordsCount());
if (Limit <= FetchedCount && HeapSources.size()) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", Limit)("fetched", FetchedCount);
HeapSources.clear();
FullIntervalsFetchingCount = FinishedSources.size() + FetchingInFlightSources.size();
}
}
}
Expand Down Expand Up @@ -85,6 +75,31 @@ TScanWithLimitCollection::TScanWithLimitCollection(
std::make_heap(HeapSources.begin(), HeapSources.end());
}

void TScanWithLimitCollection::DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) {
std::vector<std::shared_ptr<arrow::ChunkedArray>> pkArrays;
for (auto&& f : Context->GetReadMetadata()->GetResultSchema()->GetIndexInfo().GetReplaceKey()->fields()) {
pkArrays.emplace_back(table->GetColumnByName(f->name()));
if (!pkArrays.back()) {
pkArrays.pop_back();
break;
}
}
AFL_VERIFY(pkArrays.size());
const ui32 partsCount = std::min<ui32>(10, table->num_rows());
std::optional<i32> lastPosition;
for (ui32 i = 0; i < partsCount; ++i) {
const i32 currentPosition = (i + 1) * (table->num_rows() - 1) / partsCount;
if (lastPosition) {
AFL_VERIFY(*lastPosition < currentPosition);
}
const i64 size = lastPosition ? (currentPosition - *lastPosition) : currentPosition;
lastPosition = currentPosition;
TReplaceKeyAdapter key(NArrow::TComparablePosition(pkArrays, currentPosition), Context->GetReadMetadata()->IsDescSorted());
TCompareKeyForScanSequence finishPos(key, source->GetSourceId());
AFL_VERIFY(FinishedSources.emplace(finishPos, TFinishedDataSource(source, size)).second);
}
}

ISourcesCollection::ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context)
: Context(context) {
if (HasAppData() && AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ISourcesCollection {
virtual std::shared_ptr<IDataSource> DoExtractNext() = 0;
virtual bool DoCheckInFlightLimits() const = 0;
virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) = 0;
virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) = 0;
virtual void DoClear() = 0;

TPositiveControlInteger SourcesInFlightCount;
Expand All @@ -30,6 +31,10 @@ class ISourcesCollection {
return DoBuildCursor(source, readyRecords);
}

void OnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) {
return DoOnIntervalResult(table, source);
}

TString DebugString() const {
return DoDebugString();
}
Expand Down Expand Up @@ -87,6 +92,8 @@ class TNotSortedCollection: public ISourcesCollection {
virtual bool DoCheckInFlightLimits() const override {
return InFlightCount < InFlightLimit;
}
virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& /*table*/, const std::shared_ptr<IDataSource>& /*source*/) override {
}
virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override {
if (!source->GetResultRecordsCount() && InFlightLimit * 2 < GetMaxInFlight()) {
InFlightLimit *= 2;
Expand All @@ -103,8 +110,7 @@ class TNotSortedCollection: public ISourcesCollection {
TNotSortedCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
const std::shared_ptr<IScanCursor>& cursor, const std::optional<ui32> limit)
: TBase(context)
, Limit(limit)
{
, Limit(limit) {
if (Limit) {
InFlightLimit = 1;
} else {
Expand Down Expand Up @@ -141,6 +147,8 @@ class TSortedFullScanCollection: public ISourcesCollection {
virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords);
}
virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& /*table*/, const std::shared_ptr<IDataSource>& /*source*/) override {
}
virtual std::shared_ptr<IDataSource> DoExtractNext() override {
AFL_VERIFY(HeapSources.size());
auto result = HeapSources.front().Construct(Context);
Expand Down Expand Up @@ -192,6 +200,13 @@ class TScanWithLimitCollection: public ISourcesCollection {
, SourceId(source->GetSourceId())
, SourceIdx(source->GetSourceIdx()) {
}

TFinishedDataSource(const std::shared_ptr<IDataSource>& source, const ui32 partSize)
: RecordsCount(partSize)
, SourceId(source->GetSourceId())
, SourceIdx(source->GetSourceIdx()) {
AFL_VERIFY(partSize < source->GetResultRecordsCount());
}
};

std::deque<TSourceConstructor> HeapSources;
Expand All @@ -203,6 +218,7 @@ class TScanWithLimitCollection: public ISourcesCollection {
std::map<TCompareKeyForScanSequence, TFinishedDataSource> FinishedSources;
std::set<TCompareKeyForScanSequence> FetchingInFlightSources;

virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) override;
virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords);
}
Expand All @@ -214,7 +230,8 @@ class TScanWithLimitCollection: public ISourcesCollection {
}
virtual std::shared_ptr<IDataSource> DoExtractNext() override;
virtual bool DoCheckInFlightLimits() const override {
return (FetchingInFlightCount < GetMaxInFlight()) && (FullIntervalsFetchingCount < InFlightLimit);
return (FetchingInFlightCount < InFlightLimit);
//&&(FullIntervalsFetchingCount < InFlightLimit);
}
virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override;
ui32 GetInFlightIntervalsCount(const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
auto cursor = SourcesCollection->BuildCursor(frontSource, startIndex + recordsCount);
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table,
cursor, Context->GetCommonContext(), sourceIdxToContinue));
SourcesCollection->OnIntervalResult(table, frontSource);
} else if (sourceIdxToContinue) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,22 @@ class TPortionPage {
class TReplaceKeyAdapter {
private:
bool Reverse = false;
NArrow::TReplaceKey Value;
NArrow::TComparablePosition Value;

public:
TReplaceKeyAdapter(const NArrow::TReplaceKey& rk, const bool reverse)
: Reverse(reverse)
, Value(rk) {
}

TReplaceKeyAdapter(const NArrow::TComparablePosition& pos, const bool reverse)
: Reverse(reverse)
, Value(pos) {
}

std::partial_ordering Compare(const TReplaceKeyAdapter& item) const {
AFL_VERIFY(Reverse == item.Reverse);
const std::partial_ordering result = Value.CompareNotNull(item.Value);
const std::partial_ordering result = Value.Compare(item.Value);
if (result == std::partial_ordering::equivalent) {
return std::partial_ordering::equivalent;
} else if (result == std::partial_ordering::less) {
Expand Down
Loading
Loading