Skip to content

Commit 160238c

Browse files
optimizer predicates usage on fetching (#1505)
* don't fetch predicate columns or apply the predicate filter when the whole portion matches the predicate ranges * fix * fix
1 parent 320f520 commit 160238c

File tree

9 files changed

+100
-29
lines changed

9 files changed

+100
-29
lines changed

ydb/core/tx/columnshard/engines/predicate/filter.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ bool TPKRangesFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInf
8484
return SortedRanges.empty();
8585
}
8686

87+
// Asks every element of SortedRanges whether the key interval described by
// `start` and `end` is in "partial usage" for that range, forwarding `indexInfo` unchanged.
// Returns true as soon as one range answers true; returns false when none does,
// which includes the case of an empty SortedRanges.
bool TPKRangesFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const {
88+
for (auto&& i : SortedRanges) {
89+
if (i.IsPortionInPartialUsage(start, end, indexInfo)) {
90+
return true;
91+
}
92+
}
93+
return false;
94+
}
95+
8796
TPKRangesFilter::TPKRangesFilter(const bool reverse)
8897
: ReverseFlag(reverse)
8998
{

ydb/core/tx/columnshard/engines/predicate/filter.h

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class TPKRangesFilter {
3838
}
3939

4040
bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
41+
bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const;
4142

4243
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
4344

ydb/core/tx/columnshard/engines/predicate/range.cpp

+31
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,37 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo
5959
return true;
6060
}
6161

62+
// Decides whether the key interval [start, end] needs the predicate filter for this range.
// Two flags are computed independently and OR-ed together:
//   startUsage - derived from comparing `start` with the key extracted from PredicateFrom;
//   endUsage   - derived from comparing `end` with the key extracted from PredicateTo.
// When a predicate yields no key, the corresponding flag is set to true.
// NOTE(review): that "no key => true" choice looks like a conservative default
// (always filter); confirm against ExtractKey semantics.
bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const {
63+
bool startUsage = false;
64+
bool endUsage = false;
65+
if (auto from = PredicateFrom.ExtractKey(indexInfo.GetPrimaryKey())) {
66+
// The extracted bound must not have more columns than the portion key it is compared with.
AFL_VERIFY(from->Size() <= start.Size());
67+
// Comparison is limited to from->Size() (presumably the leading key columns - TODO confirm).
// Inclusive bound: flag set only when start is strictly below it.
// Exclusive bound: flag also set when start equals it.
if (PredicateFrom.IsInclude()) {
68+
startUsage = std::is_lt(start.ComparePartNotNull(*from, from->Size()));
69+
} else {
70+
startUsage = std::is_lteq(start.ComparePartNotNull(*from, from->Size()));
71+
}
72+
} else {
73+
startUsage = true;
74+
}
75+
76+
if (auto to = PredicateTo.ExtractKey(indexInfo.GetPrimaryKey())) {
77+
// Same size invariant as for the lower bound, applied to the upper bound and `end`.
AFL_VERIFY(to->Size() <= end.Size());
78+
// Mirror of the lower-bound logic: inclusive bound flags only end strictly above it,
// exclusive bound also flags end equal to it.
if (PredicateTo.IsInclude()) {
79+
endUsage = std::is_gt(end.ComparePartNotNull(*to, to->Size()));
80+
} else {
81+
endUsage = std::is_gteq(end.ComparePartNotNull(*to, to->Size()));
82+
}
83+
} else {
84+
endUsage = true;
85+
}
86+
87+
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", start.DebugString())("end", end.DebugString())("from", PredicateFrom.DebugString())("to", PredicateTo.DebugString())
88+
// ("start_usage", startUsage)("end_usage", endUsage);
89+
90+
// Either flag being set is enough to report partial usage.
return endUsage || startUsage;
91+
}
92+
6293
std::optional<NKikimr::NOlap::TPKRangeFilter> TPKRangeFilter::Build(TPredicateContainer&& from, TPredicateContainer&& to) {
6394
if (!from.CrossRanges(to)) {
6495
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot_build_predicate_range")("error", "predicates from/to not intersected");

ydb/core/tx/columnshard/engines/predicate/range.h

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class TPKRangeFilter {
4141
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
4242

4343
bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
44+
bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const;
4445

4546
std::set<ui32> GetColumnIds(const TIndexInfo& indexInfo) const;
4647
TString DebugString() const;

ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp

+33-12
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,23 @@ ui64 TSpecialReadContext::GetMemoryForSources(const std::map<ui32, std::shared_p
1919

2020
std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const {
2121
const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax();
22-
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0];
22+
const bool partialUsageByPK = ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey(), ReadMetadata->GetIndexInfo());
23+
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0];
2324
if (!result) {
2425
return std::make_shared<TBuildFakeSpec>(source->GetRecordsCount(), "fake");
2526
}
2627
return result;
2728
}
2829

29-
std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const {
30+
std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt) const {
3031
std::shared_ptr<IFetchingStep> result = std::make_shared<TFakeStep>();
3132
std::shared_ptr<IFetchingStep> current = result;
33+
const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount();
3234
if (!!IndexChecker) {
3335
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TIndexesSet>(IndexChecker->GetIndexIds())));
3436
current = current->AttachNext(std::make_shared<TApplyIndexStep>(IndexChecker));
3537
}
36-
if (!EFColumns->GetColumnsCount()) {
38+
if (!EFColumns->GetColumnsCount() && !partialUsageByPredicate) {
3739
TColumnsSet columnsFetch = *FFColumns;
3840
if (needSnapshots) {
3941
columnsFetch = columnsFetch + *SpecColumns;
@@ -52,6 +54,9 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
5254
if (needSnapshots || FFColumns->Contains(SpecColumns)) {
5355
columnsFetch = columnsFetch + *SpecColumns;
5456
}
57+
if (partialUsageByPredicate) {
58+
columnsFetch = columnsFetch + *PredicateColumns;
59+
}
5560
AFL_VERIFY(columnsFetch.GetColumnsCount());
5661
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef"));
5762

@@ -60,17 +65,21 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
6065
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
6166
columnsFetch = columnsFetch - *SpecColumns;
6267
}
63-
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
64-
if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
68+
if (partialUsageByPredicate) {
69+
current = current->AttachNext(std::make_shared<TAssemblerStep>(PredicateColumns));
6570
current = current->AttachNext(std::make_shared<TPredicateFilter>());
71+
columnsFetch = columnsFetch - *PredicateColumns;
72+
}
73+
if (columnsFetch.GetColumnsCount()) {
74+
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
6675
}
6776
for (auto&& i : ReadMetadata->GetProgram().GetSteps()) {
6877
if (!i->IsFilterOnly()) {
6978
break;
7079
}
7180
current = current->AttachNext(std::make_shared<TFilterProgramStep>(i));
7281
}
73-
const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns;
82+
const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PredicateColumns;
7483
if (columnsAdditionalFetch.GetColumnsCount()) {
7584
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
7685
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
@@ -84,7 +93,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
8493
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
8594
}
8695
current = current->AttachNext(std::make_shared<TAssemblerStep>(PKColumns));
87-
if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
96+
if (partialUsageByPredicate) {
8897
current = current->AttachNext(std::make_shared<TPredicateFilter>());
8998
}
9099
const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns;
@@ -95,7 +104,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
95104
}
96105
current = current->AttachNext(std::make_shared<TFilterProgramStep>(i));
97106
}
98-
const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns;
107+
const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns - *PredicateColumns;
99108
if (columnsAdditionalFetch.GetColumnsCount()) {
100109
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
101110
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
@@ -114,6 +123,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
114123
auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot());
115124
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema);
116125
IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
126+
{
127+
auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo());
128+
if (predicateColumns.size()) {
129+
PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, ReadMetadata->GetIndexInfo(), readSchema);
130+
} else {
131+
PredicateColumns = std::make_shared<TColumnsSet>();
132+
}
133+
}
117134
{
118135
auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
119136
if (efColumns.size()) {
@@ -144,10 +161,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
144161
MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);
145162

146163
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
147-
CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false);
148-
CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true);
149-
CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false);
150-
CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true);
164+
CacheFetchingScripts[0][0][0] = BuildColumnsFetchingPlan(false, false, false);
165+
CacheFetchingScripts[0][1][0] = BuildColumnsFetchingPlan(false, true, false);
166+
CacheFetchingScripts[1][0][0] = BuildColumnsFetchingPlan(true, false, false);
167+
CacheFetchingScripts[1][1][0] = BuildColumnsFetchingPlan(true, true, false);
168+
CacheFetchingScripts[0][0][1] = BuildColumnsFetchingPlan(false, false, true);
169+
CacheFetchingScripts[0][1][1] = BuildColumnsFetchingPlan(false, true, true);
170+
CacheFetchingScripts[1][0][1] = BuildColumnsFetchingPlan(true, false, true);
171+
CacheFetchingScripts[1][1][1] = BuildColumnsFetchingPlan(true, true, true);
151172
}
152173

153174
}

ydb/core/tx/columnshard/engines/reader/plain_reader/context.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class TSpecialReadContext {
1515
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, SpecColumns);
1616
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, MergeColumns);
1717
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, EFColumns);
18+
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PredicateColumns);
1819
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PKColumns);
1920
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns);
2021
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, ProgramInputColumns);
@@ -25,8 +26,8 @@ class TSpecialReadContext {
2526
std::shared_ptr<TColumnsSet> PKFFColumns;
2627
std::shared_ptr<TColumnsSet> EFPKColumns;
2728
std::shared_ptr<TColumnsSet> FFMinusEFColumns;
28-
std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const;
29-
std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2> CacheFetchingScripts;
29+
std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate) const;
30+
std::array<std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2>, 2> CacheFetchingScripts;
3031
public:
3132
ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive);
3233

ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp

+2-7
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,10 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte
3333
} else {
3434
insertedPortionsBytes += (*itPortion)->BlobsBytes();
3535
}
36-
auto start = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyStart());
37-
auto finish = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyEnd());
38-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", start.DebugJson())("finish", finish.DebugJson());
39-
sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, start, finish));
36+
sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, (*itPortion)->IndexKeyStart(), (*itPortion)->IndexKeyEnd()));
4037
++itPortion;
4138
} else {
42-
auto start = GetReadMetadata()->BuildSortedPosition(itCommitted->GetFirstVerified());
43-
auto finish = GetReadMetadata()->BuildSortedPosition(itCommitted->GetLastVerified());
44-
sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, start, finish));
39+
sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, itCommitted->GetFirstVerified(), itCommitted->GetLastVerified()));
4540
committedPortionsBytes += itCommitted->GetSize();
4641
++itCommitted;
4742
}

ydb/core/tx/columnshard/engines/reader/plain_reader/source.h

+19-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ class IDataSource {
2626
YDB_READONLY(ui32, SourceIdx, 0);
2727
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start);
2828
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish);
29+
NArrow::TReplaceKey StartReplaceKey;
30+
NArrow::TReplaceKey FinishReplaceKey;
2931
YDB_READONLY_DEF(std::shared_ptr<TSpecialReadContext>, Context);
3032
YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero());
3133
std::optional<ui32> RecordsCount;
@@ -52,6 +54,13 @@ class IDataSource {
5254
virtual void DoAbort() = 0;
5355
virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0;
5456
public:
57+
// Returns the StartReplaceKey member by const reference.
// NOTE(review): in the visible part of the constructor only Start/Finish are swapped
// for reverse sort; StartReplaceKey keeps the `start` argument as passed - confirm this is intended.
const NArrow::TReplaceKey& GetStartReplaceKey() const {
58+
return StartReplaceKey;
59+
}
60+
// Returns the FinishReplaceKey member by const reference.
// NOTE(review): in the visible part of the constructor only Start/Finish are swapped
// for reverse sort; FinishReplaceKey keeps the `finish` argument as passed - confirm this is intended.
const NArrow::TReplaceKey& GetFinishReplaceKey() const {
61+
return FinishReplaceKey;
62+
}
63+
5564
const TFetchedResult& GetStageResult() const {
5665
AFL_VERIFY(!!StageResult);
5766
return *StageResult;
@@ -147,16 +156,19 @@ class IDataSource {
147156
void RegisterInterval(TFetchingInterval& interval);
148157

149158
IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context,
150-
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
159+
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish,
151160
const TSnapshot& recordSnapshotMax, const std::optional<ui32> recordsCount
152161
)
153162
: SourceIdx(sourceIdx)
154-
, Start(start)
155-
, Finish(finish)
163+
, Start(context->GetReadMetadata()->BuildSortedPosition(start))
164+
, Finish(context->GetReadMetadata()->BuildSortedPosition(finish))
165+
, StartReplaceKey(start)
166+
, FinishReplaceKey(finish)
156167
, Context(context)
157168
, RecordSnapshotMax(recordSnapshotMax)
158169
, RecordsCount(recordsCount)
159170
{
171+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson());
160172
if (Start.IsReverseSort()) {
161173
std::swap(Start, Finish);
162174
}
@@ -210,10 +222,10 @@ class TPortionDataSource: public IDataSource {
210222
}
211223

212224
TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context,
213-
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
225+
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish)
214226
: TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount())
215-
, Portion(portion) {
216-
227+
, Portion(portion)
228+
{
217229
}
218230
};
219231

@@ -256,7 +268,7 @@ class TCommittedDataSource: public IDataSource {
256268
}
257269

258270
TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context,
259-
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
271+
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish)
260272
: TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {})
261273
, CommittedBlob(committed) {
262274

ydb/core/tx/columnshard/engines/reader/read_metadata.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto
4848

4949
std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
5050
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
51-
std::set<ui32> result = GetPKRangesFilter().GetColumnIds(indexInfo);
51+
std::set<ui32> result;
5252
for (auto&& i : GetProgram().GetEarlyFilterColumns()) {
5353
auto id = indexInfo.GetColumnIdOptional(i);
5454
if (id) {

0 commit comments

Comments
 (0)