Skip to content

Commit 6f23431

Browse files
Merge de229f8 into 264201f
2 parents 264201f + de229f8 commit 6f23431

File tree

5 files changed

+30
-5
lines changed

5 files changed

+30
-5
lines changed

ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,16 @@ class TFetchedData {
109109

110110
};
111111

112+
class TFetchedResult {
113+
private:
114+
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
115+
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedFilter);
116+
public:
117+
TFetchedResult(std::unique_ptr<TFetchedData>&& data)
118+
: Batch(data->GetBatch())
119+
, NotAppliedFilter(data->GetNotAppliedFilter()) {
120+
121+
}
122+
};
123+
112124
}

ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const {
1616
bool TStepAction::DoExecute() {
1717
while (Step) {
1818
if (Source->IsEmptyData()) {
19+
Source->Finalize();
1920
FinishedFlag = true;
2021
return true;
2122
}
2223
if (!Step->ExecuteInplace(Source, Step)) {
2324
return true;
2425
}
2526
if (Source->IsEmptyData()) {
27+
Source->Finalize();
2628
FinishedFlag = true;
2729
return true;
2830
}
2931
Step = Step->GetNextStep();
3032
}
33+
Source->Finalize();
3134
FinishedFlag = true;
3235
return true;
3336
}

ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask {
6060
}
6161
virtual bool DoExecute() override {
6262
if (MergingContext->IsExclusiveInterval()) {
63-
ResultBatch = Sources.begin()->second->GetStageData().GetBatch();
63+
ResultBatch = Sources.begin()->second->GetStageResult().GetBatch();
6464
if (ResultBatch && ResultBatch->num_rows()) {
6565
LastPK = Sources.begin()->second->GetLastPK();
6666
ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector());
@@ -81,8 +81,8 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask {
8181
}
8282
std::shared_ptr<NIndexedReader::TMergePartialStream> merger = Context->BuildMerger();
8383
for (auto&& [_, i] : Sources) {
84-
if (auto rb = i->GetStageData().GetBatch()) {
85-
merger->AddSource(rb, i->GetStageData().GetNotAppliedFilter());
84+
if (auto rb = i->GetStageResult().GetBatch()) {
85+
merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter());
8686
}
8787
}
8888
AFL_VERIFY(merger->GetSourcesCount() <= Sources.size());

ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace NKikimr::NOlap::NPlainReader {
1515
void IDataSource::InitFetchingPlan(const std::shared_ptr<IFetchingStep>& fetchingFirstStep, const std::shared_ptr<IDataSource>& sourcePtr, const bool isExclusive) {
1616
AFL_VERIFY(fetchingFirstStep);
1717
if (AtomicCas(&FilterStageFlag, 1, 0)) {
18-
StageData = std::make_shared<TFetchedData>(isExclusive);
18+
StageData = std::make_unique<TFetchedData>(isExclusive);
1919
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingFirstStep->DebugString())("source_idx", SourceIdx);
2020
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
2121
if (IsAborted()) {

ydb/core/tx/columnshard/engines/reader/plain_reader/source.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class IDataSource {
3636
protected:
3737
THashMap<ui32, TFetchingInterval*> Intervals;
3838

39-
std::shared_ptr<TFetchedData> StageData;
39+
std::unique_ptr<TFetchedData> StageData;
40+
std::unique_ptr<TFetchedResult> StageResult;
4041

4142
TAtomic FilterStageFlag = 0;
4243
bool IsReadyFlag = false;
@@ -51,8 +52,17 @@ class IDataSource {
5152
virtual void DoAbort() = 0;
5253
virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0;
5354
public:
55+
const TFetchedResult& GetStageResult() const {
56+
AFL_VERIFY(!!StageResult);
57+
return *StageResult;
58+
}
59+
5460
void SetIsReady();
5561

62+
void Finalize() {
63+
StageResult = std::make_unique<TFetchedResult>(std::move(StageData));
64+
}
65+
5666
bool IsEmptyData() const {
5767
return GetStageData().IsEmpty();
5868
}

0 commit comments

Comments
 (0)