Skip to content

Commit 106817f

Browse files
authored
Merge 1e9860a into 555fbb8
2 parents 555fbb8 + 1e9860a commit 106817f

File tree

11 files changed

+57
-48
lines changed

11 files changed

+57
-48
lines changed

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ TConclusionStatus TReadMetadata::Init(
4242
return TConclusionStatus::Success();
4343
}
4444

45+
TReadMetadata::TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read)
46+
: TBase(schemaIndex, read.PKRangesFilter->IsReverse() ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC,
47+
read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursor())
48+
, PathId(read.PathId)
49+
, ReadStats(std::make_shared<TReadStats>()) {
50+
}
51+
4552
std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
4653
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
4754
const auto& ids = GetProgram().GetEarlyFilterColumns();

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,7 @@ class TReadMetadata: public TReadMetadataBase {
117117
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
118118
std::shared_ptr<TReadStats> ReadStats;
119119

120-
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
121-
const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
122-
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
123-
, PathId(pathId)
124-
, ReadStats(std::make_shared<TReadStats>()) {
125-
}
120+
TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read);
126121

127122
virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
128123
return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ class TColumnsSetIds {
8888
bool IsEmpty() const {
8989
return ColumnIds.empty();
9090
}
91+
bool Size() const {
92+
return ColumnIds.size();
93+
}
9194

9295
bool operator!() const {
9396
return IsEmpty();

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ class TAllocateMemoryStep: public IFetchingStep {
2020
}
2121
};
2222
std::vector<TColumnsPack> Packs;
23-
THashMap<ui32, THashSet<EMemType>> Control;
2423
const EStageFeaturesIndexes StageIndex;
2524
const std::optional<ui64> PredefinedSize;
2625

@@ -51,9 +50,6 @@ class TAllocateMemoryStep: public IFetchingStep {
5150
if (!ids.GetColumnsCount()) {
5251
return;
5352
}
54-
for (auto&& i : ids.GetColumnIds()) {
55-
AFL_VERIFY(Control[i].emplace(memType).second);
56-
}
5753
Packs.emplace_back(ids, memType);
5854
}
5955
EStageFeaturesIndexes GetStage() const {
@@ -134,6 +130,15 @@ class TColumnBlobsFetchingStep: public IFetchingStep {
134130
return TStringBuilder() << "columns=" << Columns.DebugString() << ";";
135131
}
136132

133+
// Tries to absorb `nextStep` into this step.
// Succeeds only when `nextStep` is also a TColumnBlobsFetchingStep: its columns
// are then added to this step's column set and true is returned.
// For any other step type nothing is changed and false is returned.
virtual bool Merge(const std::shared_ptr<const IFetchingStep>& nextStep) override {
    if (const auto blobsStep = std::dynamic_pointer_cast<const TColumnBlobsFetchingStep>(nextStep)) {
        Columns = Columns + blobsStep->Columns;
        return true;
    }
    return false;
}
141+
137142
public:
138143
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
139144
TColumnBlobsFetchingStep(const TColumnsSetIds& columns)

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ class IFetchingStep: public TNonCopyable {
9595
Signals.AddBytes(size);
9696
}
9797

98+
// Hook that lets a step absorb the step that follows it.
// Returns true when the next step has been merged into this one.
// The default implementation never merges and always returns false.
virtual bool Merge(const std::shared_ptr<const IFetchingStep>&) {
    return false;
}
101+
98102
virtual ~IFetchingStep() = default;
99103

100104
[[nodiscard]] TConclusion<bool> ExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
@@ -162,6 +166,9 @@ class TFetchingScript {
162166

163167
void AddStep(const std::shared_ptr<IFetchingStep>& step) {
164168
AFL_VERIFY(step);
169+
if (Steps.size() && Steps.back()->Merge(step)) {
170+
return;
171+
}
165172
Steps.emplace_back(step);
166173
}
167174

@@ -233,8 +240,8 @@ class TFetchingScriptOwner: TNonCopyable {
233240

234241
class TColumnsAccumulator {
235242
private:
236-
TColumnsSetIds FetchingReadyColumns;
237-
TColumnsSetIds AssemblerReadyColumns;
243+
YDB_READONLY_DEF(TColumnsSetIds, FetchingReadyColumns);
244+
YDB_READONLY_DEF(TColumnsSetIds, AssemblerReadyColumns);
238245
ISnapshotSchema::TPtr FullSchema;
239246
std::shared_ptr<TColumnsSetIds> GuaranteeNotOptional;
240247

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ SRCS(
1313

1414
PEERDIR(
1515
ydb/core/tx/columnshard/engines/scheme
16+
yql/essentials/minikql
1617
)
1718

1819
GENERATE_ENUM_SERIALIZATION(columns_set.h)

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3535

3636
TDataStorageAccessor dataAccessor(insertTable, index);
3737
AFL_VERIFY(read.PathId);
38-
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
39-
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr);
38+
auto readMetadata = std::make_shared<TReadMetadata>(index->CopyVersionedIndexPtr(), read);
4039

4140
auto initResult = readMetadata->Init(self, read, dataAccessor);
4241
if (!initResult) {

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -93,32 +93,31 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
9393
bool hasFilterSharding = false;
9494
if (needFilterSharding && !GetShardingColumns()->IsEmpty()) {
9595
hasFilterSharding = true;
96-
TColumnsSetIds columnsFetch = *GetShardingColumns();
96+
acc.AddFetchingStep(*result, *GetShardingColumns(), EStageFeaturesIndexes::Filter);
9797
if (!exclusiveSource) {
98-
columnsFetch = columnsFetch + *GetPKColumns() + *GetSpecColumns();
98+
acc.AddFetchingStep(*result, *GetPKColumns(), EStageFeaturesIndexes::Filter);
99+
acc.AddFetchingStep(*result, *GetSpecColumns(), EStageFeaturesIndexes::Filter);
99100
}
100-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
101-
acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false);
101+
acc.AddAssembleStep(*result, acc.GetFetchingReadyColumns(), "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false);
102102
result->AddStep(std::make_shared<TShardingFilter>());
103103
}
104104
if (!GetEFColumns()->GetColumnsCount() && !partialUsageByPredicate) {
105105
result->SetBranchName("simple");
106-
TColumnsSetIds columnsFetch = *GetFFColumns();
106+
acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching);
107107
if (needFilterDeletion) {
108-
columnsFetch = columnsFetch + *GetDeletionColumns();
108+
acc.AddFetchingStep(*result, *GetDeletionColumns(), EStageFeaturesIndexes::Fetching);
109109
}
110110
if (needSnapshots) {
111-
columnsFetch = columnsFetch + *GetSpecColumns();
111+
acc.AddFetchingStep(*result, *GetSpecColumns(), EStageFeaturesIndexes::Fetching);
112112
}
113113
if (!exclusiveSource) {
114-
columnsFetch = columnsFetch + *GetMergeColumns();
114+
acc.AddFetchingStep(*result, *GetMergeColumns(), EStageFeaturesIndexes::Fetching);
115115
} else {
116-
if (columnsFetch.GetColumnsCount() == 1 && GetSpecColumns()->Contains(columnsFetch) && !hasFilterSharding) {
116+
if (acc.GetFetchingReadyColumns().Size() == 1 && GetSpecColumns()->Contains(acc.GetFetchingReadyColumns()) && !hasFilterSharding) {
117117
return nullptr;
118118
}
119119
}
120-
if (columnsFetch.GetColumnsCount() || hasFilterSharding || needFilterDeletion) {
121-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Fetching);
120+
if (acc.GetFetchingReadyColumns().Size() || hasFilterSharding || needFilterDeletion) {
122121
if (needSnapshots) {
123122
acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false);
124123
}
@@ -132,25 +131,25 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
132131
acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false);
133132
result->AddStep(std::make_shared<TDeletionFilter>());
134133
}
135-
acc.AddAssembleStep(*result, columnsFetch, "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
134+
acc.AddAssembleStep(
135+
*result, acc.GetFetchingReadyColumns().GetColumnIds(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
136136
} else {
137137
return nullptr;
138138
}
139139
} else if (exclusiveSource) {
140140
result->SetBranchName("exclusive");
141-
TColumnsSet columnsFetch = *GetEFColumns();
141+
acc.AddFetchingStep(*result, *GetEFColumns(), EStageFeaturesIndexes::Filter);
142142
if (needFilterDeletion) {
143-
columnsFetch = columnsFetch + *GetDeletionColumns();
143+
acc.AddFetchingStep(*result, *GetDeletionColumns(), EStageFeaturesIndexes::Filter);
144144
}
145145
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
146-
columnsFetch = columnsFetch + *GetSpecColumns();
146+
acc.AddFetchingStep(*result, *GetSpecColumns(), EStageFeaturesIndexes::Filter);
147147
}
148148
if (partialUsageByPredicate) {
149-
columnsFetch = columnsFetch + *GetPredicateColumns();
149+
acc.AddFetchingStep(*result, *GetPredicateColumns(), EStageFeaturesIndexes::Filter);
150150
}
151151

152-
AFL_VERIFY(columnsFetch.GetColumnsCount());
153-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
152+
AFL_VERIFY(acc.GetFetchingReadyColumns().Size());
154153

155154
if (needFilterDeletion) {
156155
acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false);
@@ -168,12 +167,12 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
168167
acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
169168
} else {
170169
result->SetBranchName("merge");
171-
TColumnsSet columnsFetch = *GetMergeColumns() + *GetEFColumns();
170+
acc.AddFetchingStep(*result, *GetMergeColumns(), EStageFeaturesIndexes::Filter);
171+
acc.AddFetchingStep(*result, *GetEFColumns(), EStageFeaturesIndexes::Filter);
172172
if (needFilterDeletion) {
173-
columnsFetch = columnsFetch + *GetDeletionColumns();
173+
acc.AddFetchingStep(*result, *GetDeletionColumns(), EStageFeaturesIndexes::Filter);
174174
}
175-
AFL_VERIFY(columnsFetch.GetColumnsCount());
176-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
175+
AFL_VERIFY(acc.GetFetchingReadyColumns().Size());
177176

178177
acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false);
179178
acc.AddAssembleStep(*result, *GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false);

ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3434

3535
TDataStorageAccessor dataAccessor(insertTable, index);
3636
AFL_VERIFY(read.PathId);
37-
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
38-
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), read.GetScanCursor());
37+
auto readMetadata = std::make_shared<TReadMetadata>(index->CopyVersionedIndexPtr(), read);
3938

4039
auto initResult = readMetadata->Init(self, read, dataAccessor);
4140
if (!initResult) {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,15 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
8080
}
8181
{
8282
result->SetBranchName("exclusive");
83-
TColumnsSet columnsFetch = *GetEFColumns();
83+
acc.AddFetchingStep(*result, *GetEFColumns(), EStageFeaturesIndexes::Filter);
8484
if (needFilterDeletion) {
85-
columnsFetch = columnsFetch + *GetDeletionColumns();
85+
acc.AddFetchingStep(*result, *GetDeletionColumns(), EStageFeaturesIndexes::Filter);
8686
}
8787
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
88-
columnsFetch = columnsFetch + *GetSpecColumns();
88+
acc.AddFetchingStep(*result, *GetSpecColumns(), EStageFeaturesIndexes::Filter);
8989
}
9090
if (partialUsageByPredicate) {
91-
columnsFetch = columnsFetch + *GetPredicateColumns();
92-
}
93-
94-
if (columnsFetch.GetColumnsCount()) {
95-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
91+
acc.AddFetchingStep(*result, *GetPredicateColumns(), EStageFeaturesIndexes::Filter);
9692
}
9793

9894
if (needFilterDeletion) {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88

99
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
1010

11-
#include <yql/essentials/minikql/mkql_terminator.h>
12-
1311
namespace NKikimr::NOlap::NReader::NSimple {
1412

1513
TConclusion<bool> TIndexBlobsFetchingStep::DoExecuteInplace(

0 commit comments

Comments
 (0)