Skip to content

Commit e9d24b0

Browse files
authored
Merge 8421db6 into 34e4e1e
2 parents 34e4e1e + 8421db6 commit e9d24b0

File tree

14 files changed

+75
-69
lines changed

14 files changed

+75
-69
lines changed

ydb/core/tx/columnshard/counters/portion_index.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) {
2525

2626
{
2727
auto findClass = TotalStats.find(portionClass);
28-
AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId());
28+
AFL_VERIFY(findClass != TotalStats.end())("path_id", portion.GetPathId());
2929
findClass->second.RemovePortion(portion);
3030
if (findClass->second.IsEmpty()) {
3131
TotalStats.erase(findClass);
@@ -34,9 +34,9 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) {
3434

3535
{
3636
auto findPathId = StatsByPathId.find(portion.GetPathId());
37-
AFL_VERIFY(!findPathId.IsEnd())("path_id", portion.GetPathId());
37+
AFL_VERIFY(findPathId != StatsByPathId.end())("path_id", portion.GetPathId());
3838
auto findClass = findPathId->second.find(portionClass);
39-
AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId());
39+
AFL_VERIFY(findClass != findPathId->second.end())("path_id", portion.GetPathId());
4040
findClass->second.RemovePortion(portion);
4141
if (findClass->second.IsEmpty()) {
4242
findPathId->second.erase(findClass);

ydb/core/tx/columnshard/engines/reader/common/description.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ struct TReadDescription {
3131
std::vector<ui32> ColumnIds;
3232

3333
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
34-
AFL_VERIFY(ScanCursor);
3534
return ScanCursor;
3635
}
3736

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ TConclusionStatus TReadMetadata::Init(
4242
return TConclusionStatus::Success();
4343
}
4444

45+
TReadMetadata::TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read)
46+
: TBase(schemaIndex, read.PKRangesFilter->IsReverse() ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC,
47+
read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursor())
48+
, PathId(read.PathId)
49+
, ReadStats(std::make_shared<TReadStats>()) {
50+
}
51+
4552
std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
4653
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
4754
const auto& ids = GetProgram().GetEarlyFilterColumns();

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,7 @@ class TReadMetadata: public TReadMetadataBase {
117117
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
118118
std::shared_ptr<TReadStats> ReadStats;
119119

120-
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
121-
const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
122-
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
123-
, PathId(pathId)
124-
, ReadStats(std::make_shared<TReadStats>()) {
125-
}
120+
TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read);
126121

127122
virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
128123
return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,21 @@ class TColumnsSetIds {
8585
}
8686
return result;
8787
}
88+
89+
TColumnsSetIds& operator+=(const TColumnsSetIds& external) {
90+
return (*this = *this + external);
91+
}
92+
93+
TColumnsSetIds& operator-=(const TColumnsSetIds& external) {
94+
return (*this = *this - external);
95+
}
96+
8897
bool IsEmpty() const {
8998
return ColumnIds.empty();
9099
}
100+
bool Size() const {
101+
return ColumnIds.size();
102+
}
91103

92104
bool operator!() const {
93105
return IsEmpty();

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons
7575

7676
TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
7777
ui64 size = PredefinedSize.value_or(0);
78-
for (auto&& i : Packs) {
79-
ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType());
80-
if (source->GetStageData().GetUseFilter() && i.GetMemType() != EMemType::Blob && source->GetContext()->GetReadMetadata()->HasLimit() &&
81-
(HasAppData() && !AppDataVerified().ColumnShardConfig.GetUseSlicesFilter())) {
78+
for (auto&& [memType, columns] : ColumnsByMemType) {
79+
ui32 sizeLocal = source->GetColumnsVolume(columns.GetColumnIds(), static_cast<EMemType>(memType));
80+
if (source->GetStageData().GetUseFilter() && static_cast<EMemType>(memType) != EMemType::Blob &&
81+
source->GetContext()->GetReadMetadata()->HasLimit() && (HasAppData() && !AppDataVerified().ColumnShardConfig.GetUseSlicesFilter())) {
8282
const ui32 filtered =
8383
source->GetStageData().GetFilteredCount(source->GetRecordsCount(), source->GetContext()->GetReadMetadata()->GetLimitRobust());
8484
if (filtered < source->GetRecordsCount()) {

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,7 @@ namespace NKikimr::NOlap::NReader::NCommon {
88
class TAllocateMemoryStep: public IFetchingStep {
99
private:
1010
using TBase = IFetchingStep;
11-
class TColumnsPack {
12-
private:
13-
YDB_READONLY_DEF(TColumnsSetIds, Columns);
14-
YDB_READONLY(EMemType, MemType, EMemType::Blob);
15-
16-
public:
17-
TColumnsPack(const TColumnsSetIds& columns, const EMemType memType)
18-
: Columns(columns)
19-
, MemType(memType) {
20-
}
21-
};
22-
std::vector<TColumnsPack> Packs;
23-
THashMap<ui32, THashSet<EMemType>> Control;
11+
THashMap<ui32, TColumnsSetIds> ColumnsByMemType;
2412
const EStageFeaturesIndexes StageIndex;
2513
const std::optional<ui64> PredefinedSize;
2614

@@ -51,10 +39,7 @@ class TAllocateMemoryStep: public IFetchingStep {
5139
if (!ids.GetColumnsCount()) {
5240
return;
5341
}
54-
for (auto&& i : ids.GetColumnIds()) {
55-
AFL_VERIFY(Control[i].emplace(memType).second);
56-
}
57-
Packs.emplace_back(ids, memType);
42+
ColumnsByMemType[(ui32)memType] += ids;
5843
}
5944
EStageFeaturesIndexes GetStage() const {
6045
return StageIndex;
@@ -134,6 +119,15 @@ class TColumnBlobsFetchingStep: public IFetchingStep {
134119
return TStringBuilder() << "columns=" << Columns.DebugString() << ";";
135120
}
136121

122+
virtual bool Merge(const std::shared_ptr<const IFetchingStep>& nextStep) override {
123+
const auto step = std::dynamic_pointer_cast<const TColumnBlobsFetchingStep>(nextStep);
124+
if (!step) {
125+
return false;
126+
}
127+
Columns = Columns + step->Columns;
128+
return true;
129+
}
130+
137131
public:
138132
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
139133
TColumnBlobsFetchingStep(const TColumnsSetIds& columns)

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ class IFetchingStep: public TNonCopyable {
9595
Signals.AddBytes(size);
9696
}
9797

98+
virtual bool Merge(const std::shared_ptr<const IFetchingStep>& /*nextStep*/) {
99+
return false;
100+
}
101+
98102
virtual ~IFetchingStep() = default;
99103

100104
[[nodiscard]] TConclusion<bool> ExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
@@ -162,6 +166,9 @@ class TFetchingScript {
162166

163167
void AddStep(const std::shared_ptr<IFetchingStep>& step) {
164168
AFL_VERIFY(step);
169+
if (Steps.size() && Steps.back()->Merge(step)) {
170+
return;
171+
}
165172
Steps.emplace_back(step);
166173
}
167174

@@ -233,8 +240,8 @@ class TFetchingScriptOwner: TNonCopyable {
233240

234241
class TColumnsAccumulator {
235242
private:
236-
TColumnsSetIds FetchingReadyColumns;
237-
TColumnsSetIds AssemblerReadyColumns;
243+
YDB_READONLY_DEF(TColumnsSetIds, FetchingReadyColumns);
244+
YDB_READONLY_DEF(TColumnsSetIds, AssemblerReadyColumns);
238245
ISnapshotSchema::TPtr FullSchema;
239246
std::shared_ptr<TColumnsSetIds> GuaranteeNotOptional;
240247

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ SRCS(
1313

1414
PEERDIR(
1515
ydb/core/tx/columnshard/engines/scheme
16+
yql/essentials/minikql
1617
)
1718

1819
GENERATE_ENUM_SERIALIZATION(columns_set.h)

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3535

3636
TDataStorageAccessor dataAccessor(insertTable, index);
3737
AFL_VERIFY(read.PathId);
38-
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
39-
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr);
38+
auto readMetadata = std::make_shared<TReadMetadata>(index->CopyVersionedIndexPtr(), read);
4039

4140
auto initResult = readMetadata->Init(self, read, dataAccessor);
4241
if (!initResult) {

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -93,32 +93,31 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
9393
bool hasFilterSharding = false;
9494
if (needFilterSharding && !GetShardingColumns()->IsEmpty()) {
9595
hasFilterSharding = true;
96-
TColumnsSetIds columnsFetch = *GetShardingColumns();
96+
acc.AddFetchingStep(*result, *GetShardingColumns(), EStageFeaturesIndexes::Filter);
9797
if (!exclusiveSource) {
98-
columnsFetch = columnsFetch + *GetPKColumns() + *GetSpecColumns();
98+
acc.AddFetchingStep(*result, *GetPKColumns(), EStageFeaturesIndexes::Filter);
99+
acc.AddFetchingStep(*result, *GetSpecColumns(), EStageFeaturesIndexes::Filter);
99100
}
100-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
101-
acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false);
101+
acc.AddAssembleStep(*result, acc.GetFetchingReadyColumns(), "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false);
102102
result->AddStep(std::make_shared<TShardingFilter>());
103103
}
104104
if (!GetEFColumns()->GetColumnsCount() && !partialUsageByPredicate) {
105105
result->SetBranchName("simple");
106-
TColumnsSetIds columnsFetch = *GetFFColumns();
106+
acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching);
107107
if (needFilterDeletion) {
108-
columnsFetch = columnsFetch + *GetDeletionColumns();
108+
acc.AddFetchingStep(*result, *GetDeletionColumns(), EStageFeaturesIndexes::Fetching);
109109
}
110110
if (needSnapshots) {
111-
columnsFetch = columnsFetch + *GetSpecColumns();
111+
acc.AddFetchingStep(*result, *GetSpecColumns(), EStageFeaturesIndexes::Fetching);
112112
}
113113
if (!exclusiveSource) {
114-
columnsFetch = columnsFetch + *GetMergeColumns();
114+
acc.AddFetchingStep(*result, *GetMergeColumns(), EStageFeaturesIndexes::Fetching);
115115
} else {
116-
if (columnsFetch.GetColumnsCount() == 1 && GetSpecColumns()->Contains(columnsFetch) && !hasFilterSharding) {
116+
if (acc.GetFetchingReadyColumns().Size() == 1 && GetSpecColumns()->Contains(acc.GetFetchingReadyColumns()) && !hasFilterSharding) {
117117
return nullptr;
118118
}
119119
}
120-
if (columnsFetch.GetColumnsCount() || hasFilterSharding || needFilterDeletion) {
121-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Fetching);
120+
if (acc.GetFetchingReadyColumns().Size() || hasFilterSharding || needFilterDeletion) {
122121
if (needSnapshots) {
123122
acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false);
124123
}
@@ -132,25 +131,25 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
132131
acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false);
133132
result->AddStep(std::make_shared<TDeletionFilter>());
134133
}
135-
acc.AddAssembleStep(*result, columnsFetch, "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
134+
acc.AddAssembleStep(
135+
*result, acc.GetFetchingReadyColumns().GetColumnIds(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
136136
} else {
137137
return nullptr;
138138
}
139139
} else if (exclusiveSource) {
140140
result->SetBranchName("exclusive");
141-
TColumnsSet columnsFetch = *GetEFColumns();
141+
acc.AddFetchingStep(*result, *GetEFColumns(), EStageFeaturesIndexes::Filter);
142142
if (needFilterDeletion) {
143-
columnsFetch = columnsFetch + *GetDeletionColumns();
143+
acc.AddFetchingStep(*result, *GetDeletionColumns(), EStageFeaturesIndexes::Filter);
144144
}
145145
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
146-
columnsFetch = columnsFetch + *GetSpecColumns();
146+
acc.AddFetchingStep(*result, *GetSpecColumns(), EStageFeaturesIndexes::Filter);
147147
}
148148
if (partialUsageByPredicate) {
149-
columnsFetch = columnsFetch + *GetPredicateColumns();
149+
acc.AddFetchingStep(*result, *GetPredicateColumns(), EStageFeaturesIndexes::Filter);
150150
}
151151

152-
AFL_VERIFY(columnsFetch.GetColumnsCount());
153-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
152+
AFL_VERIFY(acc.GetFetchingReadyColumns().Size());
154153

155154
if (needFilterDeletion) {
156155
acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false);
@@ -168,12 +167,12 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
168167
acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
169168
} else {
170169
result->SetBranchName("merge");
171-
TColumnsSet columnsFetch = *GetMergeColumns() + *GetEFColumns();
170+
acc.AddFetchingStep(*result, *GetMergeColumns(), EStageFeaturesIndexes::Filter);
171+
acc.AddFetchingStep(*result, *GetEFColumns(), EStageFeaturesIndexes::Filter);
172172
if (needFilterDeletion) {
173-
columnsFetch = columnsFetch + *GetDeletionColumns();
173+
acc.AddFetchingStep(*result, *GetDeletionColumns(), EStageFeaturesIndexes::Filter);
174174
}
175-
AFL_VERIFY(columnsFetch.GetColumnsCount());
176-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
175+
AFL_VERIFY(acc.GetFetchingReadyColumns().Size());
177176

178177
acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false);
179178
acc.AddAssembleStep(*result, *GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false);

ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3434

3535
TDataStorageAccessor dataAccessor(insertTable, index);
3636
AFL_VERIFY(read.PathId);
37-
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
38-
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), read.GetScanCursor());
37+
auto readMetadata = std::make_shared<TReadMetadata>(index->CopyVersionedIndexPtr(), read);
3938

4039
auto initResult = readMetadata->Init(self, read, dataAccessor);
4140
if (!initResult) {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,15 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
8080
}
8181
{
8282
result->SetBranchName("exclusive");
83-
TColumnsSet columnsFetch = *GetEFColumns();
83+
acc.AddFetchingStep(*result, *GetEFColumns(), EStageFeaturesIndexes::Filter);
8484
if (needFilterDeletion) {
85-
columnsFetch = columnsFetch + *GetDeletionColumns();
85+
acc.AddFetchingStep(*result, *GetDeletionColumns(), EStageFeaturesIndexes::Filter);
8686
}
8787
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
88-
columnsFetch = columnsFetch + *GetSpecColumns();
88+
acc.AddFetchingStep(*result, *GetSpecColumns(), EStageFeaturesIndexes::Filter);
8989
}
9090
if (partialUsageByPredicate) {
91-
columnsFetch = columnsFetch + *GetPredicateColumns();
92-
}
93-
94-
if (columnsFetch.GetColumnsCount()) {
95-
acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
91+
acc.AddFetchingStep(*result, *GetPredicateColumns(), EStageFeaturesIndexes::Filter);
9692
}
9793

9894
if (needFilterDeletion) {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88

99
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
1010

11-
#include <yql/essentials/minikql/mkql_terminator.h>
12-
1311
namespace NKikimr::NOlap::NReader::NSimple {
1412

1513
TConclusion<bool> TIndexBlobsFetchingStep::DoExecuteInplace(

0 commit comments

Comments
 (0)