Skip to content

Commit 19770f3

Browse files
authored
Merge b76a755 into 1e9aba9
2 parents 1e9aba9 + b76a755 commit 19770f3

File tree

17 files changed

+290
-194
lines changed

17 files changed

+290
-194
lines changed

ydb/core/tx/columnshard/counters/portion_index.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) {
2525

2626
{
2727
auto findClass = TotalStats.find(portionClass);
28-
AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId());
28+
AFL_VERIFY(findClass != TotalStats.end())("path_id", portion.GetPathId());
2929
findClass->second.RemovePortion(portion);
3030
if (findClass->second.IsEmpty()) {
3131
TotalStats.erase(findClass);
@@ -34,9 +34,9 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) {
3434

3535
{
3636
auto findPathId = StatsByPathId.find(portion.GetPathId());
37-
AFL_VERIFY(!findPathId.IsEnd())("path_id", portion.GetPathId());
37+
AFL_VERIFY(findPathId != StatsByPathId.end())("path_id", portion.GetPathId());
3838
auto findClass = findPathId->second.find(portionClass);
39-
AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId());
39+
AFL_VERIFY(findClass != findPathId->second.end())("path_id", portion.GetPathId());
4040
findClass->second.RemovePortion(portion);
4141
if (findClass->second.IsEmpty()) {
4242
findPathId->second.erase(findClass);

ydb/core/tx/columnshard/engines/reader/common/description.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ struct TReadDescription {
3030
// List of columns
3131
std::vector<ui32> ColumnIds;
3232

33-
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
34-
AFL_VERIFY(ScanCursor);
33+
const std::shared_ptr<IScanCursor>& GetScanCursorOptional() const {
3534
return ScanCursor;
3635
}
3736

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ TConclusionStatus TReadMetadata::Init(
4242
return TConclusionStatus::Success();
4343
}
4444

45+
TReadMetadata::TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read)
46+
: TBase(schemaIndex, read.PKRangesFilter->IsReverse() ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC,
47+
read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursorOptional())
48+
, PathId(read.PathId)
49+
, ReadStats(std::make_shared<TReadStats>()) {
50+
}
51+
4552
std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
4653
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
4754
const auto& ids = GetProgram().GetEarlyFilterColumns();

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,7 @@ class TReadMetadata: public TReadMetadataBase {
117117
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
118118
std::shared_ptr<TReadStats> ReadStats;
119119

120-
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
121-
const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
122-
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
123-
, PathId(pathId)
124-
, ReadStats(std::make_shared<TReadStats>()) {
125-
}
120+
TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read);
126121

127122
virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
128123
return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ class TColumnsSetIds {
8585
}
8686
return result;
8787
}
88+
89+
TColumnsSetIds& operator+=(const TColumnsSetIds& external) {
90+
return (*this = *this + external);
91+
}
92+
93+
TColumnsSetIds& operator-=(const TColumnsSetIds& external) {
94+
return (*this = *this - external);
95+
}
96+
8897
bool IsEmpty() const {
8998
return ColumnIds.empty();
9099
}

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,13 @@ class TAllocateMemoryStep: public IFetchingStep {
4343
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
4444
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
4545
virtual TString DoDebugString() const override {
46-
return TStringBuilder() << "stage=" << StageIndex << ";";
46+
std::vector<TString> columns;
47+
for (const auto& pack : Packs) {
48+
for (const ui32 columnId : pack.GetColumns().GetColumnIds()) {
49+
columns.emplace_back(TStringBuilder() << pack.GetMemType() << ':' << columnId);
50+
}
51+
}
52+
return TStringBuilder() << "stage=" << StageIndex << ";column_ids=[" << JoinSeq(',', columns) << "];";
4753
}
4854

4955
public:
@@ -126,7 +132,7 @@ class TOptionalAssemblerStep: public IFetchingStep {
126132
class TColumnBlobsFetchingStep: public IFetchingStep {
127133
private:
128134
using TBase = IFetchingStep;
129-
TColumnsSetIds Columns;
135+
YDB_READONLY_DEF(TColumnsSetIds, Columns);
130136

131137
protected:
132138
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,24 @@ TConclusion<bool> TFetchingScriptCursor::Execute(const std::shared_ptr<IDataSour
7373
}
7474

7575
TString TFetchingScript::DebugString() const {
76+
TStringBuilder sb;
77+
TStringBuilder sbBranch;
78+
for (auto&& i : Steps) {
79+
sbBranch << "{" << i->DebugString() << "};";
80+
}
81+
if (!sbBranch) {
82+
return "";
83+
}
84+
sb << "{branch:" << BranchName << ";steps:[" << sbBranch << "]}";
85+
return sb;
86+
}
87+
88+
TString TFetchingScript::ProfileDebugString() const {
7689
TStringBuilder sb;
7790
TStringBuilder sbBranch;
7891
for (auto&& i : Steps) {
7992
if (i->GetSumDuration() > TDuration::MilliSeconds(10)) {
80-
sbBranch << "{" << i->DebugString() << "};";
93+
sbBranch << "{" << i->DebugString(true) << "};";
8194
}
8295
}
8396
if (!sbBranch) {
@@ -92,12 +105,9 @@ TString TFetchingScript::DebugString() const {
92105
return sb;
93106
}
94107

95-
TFetchingScript::TFetchingScript(const TSpecialReadContext& /*context*/) {
96-
}
97-
98-
void TFetchingScript::Allocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) {
108+
void TFetchingScriptBuilder::AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) {
99109
if (Steps.size() == 0) {
100-
AddStep<TAllocateMemoryStep>(entityIds, mType, stage);
110+
AddStep(std::make_shared<TAllocateMemoryStep>(entityIds, mType, stage));
101111
} else {
102112
std::optional<ui32> addIndex;
103113
for (i32 i = Steps.size() - 1; i >= 0; --i) {
@@ -123,49 +133,62 @@ void TFetchingScript::Allocation(const std::set<ui32>& entityIds, const EStageFe
123133
}
124134
}
125135

126-
TString IFetchingStep::DebugString() const {
136+
TString IFetchingStep::DebugString(const bool stats) const {
127137
TStringBuilder sb;
128-
sb << "name=" << Name << ";duration=" << GetSumDuration() << ";"
129-
<< "size=" << 1e-9 * GetSumSize() << ";details={" << DoDebugString() << "};";
138+
sb << "name=" << Name;
139+
if (stats) {
140+
sb << ";duration=" << GetSumDuration() << ";"
141+
<< "size=" << 1e-9 * GetSumSize();
142+
}
143+
sb << ";details={" << DoDebugString() << "};";
130144
return sb;
131145
}
132146

133-
bool TColumnsAccumulator::AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) {
134-
auto actualColumns = GetNotFetchedAlready(columns);
135-
FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns;
136-
if (!actualColumns.IsEmpty()) {
137-
script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob);
138-
script.AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualColumns));
139-
return true;
147+
TFetchingScriptBuilder::TFetchingScriptBuilder(const TSpecialReadContext& context)
148+
: TFetchingScriptBuilder(context.GetReadMetadata()->GetResultSchema(), context.GetMergeColumns()) {
149+
}
150+
151+
void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) {
152+
auto actualColumns = columns - AddedFetchingColumns;
153+
AddedFetchingColumns += columns;
154+
if (actualColumns.IsEmpty()) {
155+
return;
156+
}
157+
if (Steps.size() && std::dynamic_pointer_cast<TColumnBlobsFetchingStep>(Steps.back())) {
158+
TColumnsSetIds fetchingColumns = actualColumns + std::dynamic_pointer_cast<TColumnBlobsFetchingStep>(Steps.back())->GetColumns();
159+
Steps.pop_back();
160+
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob);
161+
AddStep(std::make_shared<TColumnBlobsFetchingStep>(fetchingColumns));
162+
} else {
163+
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob);
164+
AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualColumns));
140165
}
141-
return false;
142166
}
143167

144-
bool TColumnsAccumulator::AddAssembleStep(
145-
TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) {
146-
auto actualColumns = columns - AssemblerReadyColumns;
147-
AssemblerReadyColumns = AssemblerReadyColumns + columns;
168+
void TFetchingScriptBuilder::AddAssembleStep(
169+
const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) {
170+
auto actualColumns = columns - AddedAssembleColumns;
171+
AddedAssembleColumns += columns;
148172
if (actualColumns.IsEmpty()) {
149-
return false;
173+
return;
150174
}
151175
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
152176
if (sequential) {
153177
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
154178
if (notSequentialColumnIds.size()) {
155-
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
179+
AddAllocation(notSequentialColumnIds, stage, EMemType::Raw);
156180
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
157-
script.AddStep<TAssemblerStep>(cross, purposeId);
181+
AddStep(std::make_shared<TAssemblerStep>(cross, purposeId));
158182
*actualSet = *actualSet - *cross;
159183
}
160184
if (!actualSet->IsEmpty()) {
161-
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
162-
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
185+
AddAllocation(notSequentialColumnIds, stage, EMemType::RawSequential);
186+
AddStep(std::make_shared<TOptionalAssemblerStep>(actualSet, purposeId));
163187
}
164188
} else {
165-
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
166-
script.AddStep<TAssemblerStep>(actualSet, purposeId);
189+
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
190+
AddStep(std::make_shared<TAssemblerStep>(actualSet, purposeId));
167191
}
168-
return true;
169192
}
170193

171194
TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,21 @@ class IFetchingStep: public TNonCopyable {
110110
, Signals(TFetchingStepsSignalsCollection::GetSignals(name)) {
111111
}
112112

113-
TString DebugString() const;
113+
TString DebugString(const bool stats = false) const;
114114
};
115115

116116
class TFetchingScript {
117117
private:
118-
YDB_ACCESSOR(TString, BranchName, "UNDEFINED");
118+
YDB_READONLY_DEF(TString, BranchName);
119119
std::vector<std::shared_ptr<IFetchingStep>> Steps;
120120
TAtomic StartInstant;
121121
TAtomic FinishInstant;
122122

123123
public:
124-
TFetchingScript(const TSpecialReadContext& context);
125-
126-
void Allocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType);
124+
TFetchingScript(const TString& branchName, std::vector<std::shared_ptr<IFetchingStep>>&& steps)
125+
: BranchName(branchName)
126+
, Steps(std::move(steps)) {
127+
}
127128

128129
void AddStepDataSize(const ui32 index, const ui64 size) {
129130
GetStep(index)->AddDataSize(size);
@@ -139,32 +140,13 @@ class TFetchingScript {
139140
}
140141

141142
TString DebugString() const;
143+
TString ProfileDebugString() const;
142144

143145
const std::shared_ptr<IFetchingStep>& GetStep(const ui32 index) const {
144146
AFL_VERIFY(index < Steps.size());
145147
return Steps[index];
146148
}
147149

148-
template <class T, typename... Args>
149-
std::shared_ptr<T> AddStep(Args... args) {
150-
auto result = std::make_shared<T>(args...);
151-
Steps.emplace_back(result);
152-
return result;
153-
}
154-
155-
template <class T, typename... Args>
156-
std::shared_ptr<T> InsertStep(const ui32 index, Args... args) {
157-
AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size());
158-
auto result = std::make_shared<T>(args...);
159-
Steps.insert(Steps.begin() + index, result);
160-
return result;
161-
}
162-
163-
void AddStep(const std::shared_ptr<IFetchingStep>& step) {
164-
AFL_VERIFY(step);
165-
Steps.emplace_back(step);
166-
}
167-
168150
bool IsFinished(const ui32 currentStepIdx) const {
169151
AFL_VERIFY(currentStepIdx <= Steps.size());
170152
return currentStepIdx == Steps.size();
@@ -189,9 +171,9 @@ class TFetchingScriptOwner: TNonCopyable {
189171
return Script;
190172
}
191173

192-
TString DebugString() const {
174+
TString ProfileDebugString() const {
193175
if (Script) {
194-
return TStringBuilder() << Script->DebugString() << Endl;
176+
return TStringBuilder() << Script->ProfileDebugString() << Endl;
195177
} else {
196178
return TStringBuilder() << "NO_SCRIPT" << Endl;
197179
}
@@ -231,26 +213,50 @@ class TFetchingScriptOwner: TNonCopyable {
231213
}
232214
};
233215

234-
class TColumnsAccumulator {
216+
class TFetchingScriptBuilder {
235217
private:
236-
TColumnsSetIds FetchingReadyColumns;
237-
TColumnsSetIds AssemblerReadyColumns;
238-
ISnapshotSchema::TPtr FullSchema;
239218
std::shared_ptr<TColumnsSetIds> GuaranteeNotOptional;
219+
ISnapshotSchema::TPtr FullSchema;
220+
221+
YDB_ACCESSOR(TString, BranchName, "UNDEFINED");
222+
std::vector<std::shared_ptr<IFetchingStep>> Steps;
223+
YDB_READONLY_DEF(TColumnsSetIds, AddedFetchingColumns);
224+
YDB_READONLY_DEF(TColumnsSetIds, AddedAssembleColumns);
225+
226+
TFetchingScriptBuilder(const ISnapshotSchema::TPtr& schema, const std::shared_ptr<TColumnsSetIds>& guaranteeNotOptional)
227+
: GuaranteeNotOptional(guaranteeNotOptional)
228+
, FullSchema(schema) {
229+
}
230+
231+
private:
232+
void AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType);
233+
234+
template <class T, typename... Args>
235+
std::shared_ptr<T> InsertStep(const ui32 index, Args... args) {
236+
AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size());
237+
auto result = std::make_shared<T>(args...);
238+
Steps.insert(Steps.begin() + index, result);
239+
return result;
240+
}
240241

241242
public:
242-
TColumnsAccumulator(const std::shared_ptr<TColumnsSetIds>& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema)
243-
: FullSchema(fullSchema)
244-
, GuaranteeNotOptional(guaranteeNotOptional) {
243+
TFetchingScriptBuilder(const TSpecialReadContext& context);
244+
245+
std::shared_ptr<TFetchingScript> Build()&& {
246+
return std::make_shared<TFetchingScript>(BranchName, std::move(Steps));
245247
}
246248

247-
TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const {
248-
return columns - FetchingReadyColumns;
249+
void AddStep(const std::shared_ptr<IFetchingStep>& step) {
250+
AFL_VERIFY(step);
251+
Steps.emplace_back(step);
249252
}
250253

251-
bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage);
252-
bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage,
253-
const bool sequential);
254+
void AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage);
255+
void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential);
256+
257+
static TFetchingScriptBuilder MakeForTests(ISnapshotSchema::TPtr schema, std::shared_ptr<TColumnsSetIds> guaranteeNotOptional = nullptr) {
258+
return TFetchingScriptBuilder(schema, guaranteeNotOptional ? guaranteeNotOptional : std::make_shared<TColumnsSetIds>());
259+
}
254260
};
255261

256262
class TFetchingScriptCursor {

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ SRCS(
1515

1616
PEERDIR(
1717
ydb/core/tx/columnshard/engines/scheme
18+
yql/essentials/minikql
1819
)
1920

2021
GENERATE_ENUM_SERIALIZATION(columns_set.h)

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3535

3636
TDataStorageAccessor dataAccessor(insertTable, index);
3737
AFL_VERIFY(read.PathId);
38-
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
39-
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr);
38+
auto readMetadata = std::make_shared<TReadMetadata>(index->CopyVersionedIndexPtr(), read);
4039

4140
auto initResult = readMetadata->Init(self, read, dataAccessor);
4241
if (!initResult) {

0 commit comments

Comments
 (0)