Skip to content

Commit 00b4b43

Browse files
authored
Merge 483ff2c into 55bb8ab
2 parents 55bb8ab + 483ff2c commit 00b4b43

File tree

14 files changed

+181
-176
lines changed

14 files changed

+181
-176
lines changed

ydb/core/tx/columnshard/counters/portion_index.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) {
2525

2626
{
2727
auto findClass = TotalStats.find(portionClass);
28-
AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId());
28+
AFL_VERIFY(findClass != TotalStats.end())("path_id", portion.GetPathId());
2929
findClass->second.RemovePortion(portion);
3030
if (findClass->second.IsEmpty()) {
3131
TotalStats.erase(findClass);
@@ -34,9 +34,9 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) {
3434

3535
{
3636
auto findPathId = StatsByPathId.find(portion.GetPathId());
37-
AFL_VERIFY(!findPathId.IsEnd())("path_id", portion.GetPathId());
37+
AFL_VERIFY(findPathId != StatsByPathId.end())("path_id", portion.GetPathId());
3838
auto findClass = findPathId->second.find(portionClass);
39-
AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId());
39+
AFL_VERIFY(findClass != findPathId->second.end())("path_id", portion.GetPathId());
4040
findClass->second.RemovePortion(portion);
4141
if (findClass->second.IsEmpty()) {
4242
findPathId->second.erase(findClass);

ydb/core/tx/columnshard/engines/reader/common/description.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ struct TReadDescription {
3131
std::vector<ui32> ColumnIds;
3232

3333
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
34-
AFL_VERIFY(ScanCursor);
3534
return ScanCursor;
3635
}
3736

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ TConclusionStatus TReadMetadata::Init(
4242
return TConclusionStatus::Success();
4343
}
4444

45+
TReadMetadata::TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read)
46+
: TBase(schemaIndex, read.PKRangesFilter->IsReverse() ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC,
47+
read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursor())
48+
, PathId(read.PathId)
49+
, ReadStats(std::make_shared<TReadStats>()) {
50+
}
51+
4552
std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
4653
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
4754
const auto& ids = GetProgram().GetEarlyFilterColumns();

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,7 @@ class TReadMetadata: public TReadMetadataBase {
117117
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
118118
std::shared_ptr<TReadStats> ReadStats;
119119

120-
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
121-
const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
122-
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
123-
, PathId(pathId)
124-
, ReadStats(std::make_shared<TReadStats>()) {
125-
}
120+
TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read);
126121

127122
virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
128123
return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ class TColumnsSetIds {
8585
}
8686
return result;
8787
}
88+
89+
TColumnsSetIds& operator+=(const TColumnsSetIds& external) {
90+
return (*this = *this + external);
91+
}
92+
93+
TColumnsSetIds& operator-=(const TColumnsSetIds& external) {
94+
return (*this = *this - external);
95+
}
96+
8897
bool IsEmpty() const {
8998
return ColumnIds.empty();
9099
}

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class TOptionalAssemblerStep: public IFetchingStep {
126126
class TColumnBlobsFetchingStep: public IFetchingStep {
127127
private:
128128
using TBase = IFetchingStep;
129-
TColumnsSetIds Columns;
129+
YDB_READONLY_DEF(TColumnsSetIds, Columns);
130130

131131
protected:
132132
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,9 @@ TString TFetchingScript::DebugString() const {
9292
return sb;
9393
}
9494

95-
TFetchingScript::TFetchingScript(const TSpecialReadContext& /*context*/) {
96-
}
97-
98-
void TFetchingScript::Allocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) {
95+
void TFetchingScriptBuilder::AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) {
9996
if (Steps.size() == 0) {
100-
AddStep<TAllocateMemoryStep>(entityIds, mType, stage);
97+
AddStep(std::make_shared<TAllocateMemoryStep>(entityIds, mType, stage));
10198
} else {
10299
std::optional<ui32> addIndex;
103100
for (i32 i = Steps.size() - 1; i >= 0; --i) {
@@ -130,42 +127,52 @@ TString IFetchingStep::DebugString() const {
130127
return sb;
131128
}
132129

133-
bool TColumnsAccumulator::AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) {
134-
auto actualColumns = GetNotFetchedAlready(columns);
135-
FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns;
136-
if (!actualColumns.IsEmpty()) {
137-
script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob);
138-
script.AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualColumns));
139-
return true;
130+
TFetchingScriptBuilder::TFetchingScriptBuilder(const TSpecialReadContext& context)
131+
: GuaranteeNotOptional(context.GetMergeColumns())
132+
, FullSchema(context.GetReadMetadata()->GetResultSchema()) {
133+
}
134+
135+
void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) {
136+
auto actualColumns = columns - AddedFetchingColumns;
137+
AddedFetchingColumns += columns;
138+
if (actualColumns.IsEmpty()) {
139+
return;
140+
}
141+
if (Steps.size() && std::dynamic_pointer_cast<TColumnBlobsFetchingStep>(Steps.back())) {
142+
TColumnsSetIds fetchingColumns = actualColumns + std::dynamic_pointer_cast<TColumnBlobsFetchingStep>(Steps.back())->GetColumns();
143+
Steps.pop_back();
144+
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob);
145+
AddStep(std::make_shared<TColumnBlobsFetchingStep>(fetchingColumns));
146+
} else {
147+
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob);
148+
AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualColumns));
140149
}
141-
return false;
142150
}
143151

144-
bool TColumnsAccumulator::AddAssembleStep(
145-
TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) {
146-
auto actualColumns = columns - AssemblerReadyColumns;
147-
AssemblerReadyColumns = AssemblerReadyColumns + columns;
152+
void TFetchingScriptBuilder::AddAssembleStep(
153+
const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) {
154+
auto actualColumns = columns - AddedAssembleColumns;
155+
AddedAssembleColumns += columns;
148156
if (actualColumns.IsEmpty()) {
149-
return false;
157+
return;
150158
}
151159
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
152160
if (sequential) {
153161
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
154162
if (notSequentialColumnIds.size()) {
155-
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
163+
AddAllocation(notSequentialColumnIds, stage, EMemType::Raw);
156164
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
157-
script.AddStep<TAssemblerStep>(cross, purposeId);
165+
AddStep(std::make_shared<TAssemblerStep>(cross, purposeId));
158166
*actualSet = *actualSet - *cross;
159167
}
160168
if (!actualSet->IsEmpty()) {
161-
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
162-
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
169+
AddAllocation(notSequentialColumnIds, stage, EMemType::RawSequential);
170+
AddStep(std::make_shared<TOptionalAssemblerStep>(actualSet, purposeId));
163171
}
164172
} else {
165-
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
166-
script.AddStep<TAssemblerStep>(actualSet, purposeId);
173+
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
174+
AddStep(std::make_shared<TAssemblerStep>(actualSet, purposeId));
167175
}
168-
return true;
169176
}
170177

171178
TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,16 @@ class IFetchingStep: public TNonCopyable {
115115

116116
class TFetchingScript {
117117
private:
118-
YDB_ACCESSOR(TString, BranchName, "UNDEFINED");
118+
YDB_READONLY_DEF(TString, BranchName);
119119
std::vector<std::shared_ptr<IFetchingStep>> Steps;
120120
TAtomic StartInstant;
121121
TAtomic FinishInstant;
122122

123123
public:
124-
TFetchingScript(const TSpecialReadContext& context);
125-
126-
void Allocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType);
124+
TFetchingScript(const TString& branchName, std::vector<std::shared_ptr<IFetchingStep>>&& steps)
125+
: BranchName(branchName)
126+
, Steps(std::move(steps)) {
127+
}
127128

128129
void AddStepDataSize(const ui32 index, const ui64 size) {
129130
GetStep(index)->AddDataSize(size);
@@ -145,26 +146,6 @@ class TFetchingScript {
145146
return Steps[index];
146147
}
147148

148-
template <class T, typename... Args>
149-
std::shared_ptr<T> AddStep(Args... args) {
150-
auto result = std::make_shared<T>(args...);
151-
Steps.emplace_back(result);
152-
return result;
153-
}
154-
155-
template <class T, typename... Args>
156-
std::shared_ptr<T> InsertStep(const ui32 index, Args... args) {
157-
AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size());
158-
auto result = std::make_shared<T>(args...);
159-
Steps.insert(Steps.begin() + index, result);
160-
return result;
161-
}
162-
163-
void AddStep(const std::shared_ptr<IFetchingStep>& step) {
164-
AFL_VERIFY(step);
165-
Steps.emplace_back(step);
166-
}
167-
168149
bool IsFinished(const ui32 currentStepIdx) const {
169150
AFL_VERIFY(currentStepIdx <= Steps.size());
170151
return currentStepIdx == Steps.size();
@@ -231,26 +212,41 @@ class TFetchingScriptOwner: TNonCopyable {
231212
}
232213
};
233214

234-
class TColumnsAccumulator {
215+
class TFetchingScriptBuilder {
235216
private:
236-
TColumnsSetIds FetchingReadyColumns;
237-
TColumnsSetIds AssemblerReadyColumns;
238-
ISnapshotSchema::TPtr FullSchema;
239217
std::shared_ptr<TColumnsSetIds> GuaranteeNotOptional;
218+
ISnapshotSchema::TPtr FullSchema;
219+
220+
YDB_ACCESSOR(TString, BranchName, "UNDEFINED");
221+
std::vector<std::shared_ptr<IFetchingStep>> Steps;
222+
YDB_READONLY_DEF(TColumnsSetIds, AddedFetchingColumns);
223+
YDB_READONLY_DEF(TColumnsSetIds, AddedAssembleColumns);
224+
225+
private:
226+
void AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType);
227+
228+
template <class T, typename... Args>
229+
std::shared_ptr<T> InsertStep(const ui32 index, Args... args) {
230+
AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size());
231+
auto result = std::make_shared<T>(args...);
232+
Steps.insert(Steps.begin() + index, result);
233+
return result;
234+
}
240235

241236
public:
242-
TColumnsAccumulator(const std::shared_ptr<TColumnsSetIds>& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema)
243-
: FullSchema(fullSchema)
244-
, GuaranteeNotOptional(guaranteeNotOptional) {
237+
TFetchingScriptBuilder(const TSpecialReadContext& context);
238+
239+
std::shared_ptr<TFetchingScript> Build()&& {
240+
return std::make_shared<TFetchingScript>(BranchName, std::move(Steps));
245241
}
246242

247-
TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const {
248-
return columns - FetchingReadyColumns;
243+
void AddStep(const std::shared_ptr<IFetchingStep>& step) {
244+
AFL_VERIFY(step);
245+
Steps.emplace_back(step);
249246
}
250247

251-
bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage);
252-
bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage,
253-
const bool sequential);
248+
void AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage);
249+
void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential);
254250
};
255251

256252
class TFetchingScriptCursor {

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ SRCS(
1515

1616
PEERDIR(
1717
ydb/core/tx/columnshard/engines/scheme
18+
yql/essentials/minikql
1819
)
1920

2021
GENERATE_ENUM_SERIALIZATION(columns_set.h)

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3535

3636
TDataStorageAccessor dataAccessor(insertTable, index);
3737
AFL_VERIFY(read.PathId);
38-
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
39-
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr);
38+
auto readMetadata = std::make_shared<TReadMetadata>(index->CopyVersionedIndexPtr(), read);
4039

4140
auto initResult = readMetadata->Init(self, read, dataAccessor);
4241
if (!initResult) {

0 commit comments

Comments
 (0)