Skip to content

Commit 6dbfa0f

Browse files
correction
1 parent caf92fa commit 6dbfa0f

15 files changed

+70
-50
lines changed

ydb/core/formats/arrow/program/abstract.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ NJson::TJsonValue IResourceProcessor::DebugJson() const {
2020
return result;
2121
}
2222

23-
TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr<TAccessorsCollection>& resources) const {
23+
TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const {
2424
for (auto&& i : Output) {
2525
if (resources->HasColumn(i.GetColumnId())) {
2626
return TConclusionStatus::Fail("column " + ::ToString(i.GetColumnId()) + " has already");
2727
}
2828
}
29-
return DoExecute(resources);
29+
return DoExecute(resources, context);
3030
}
3131

3232
std::optional<TFetchingInfo> IResourceProcessor::BuildFetchTask(

ydb/core/formats/arrow/program/abstract.h

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,38 @@ class TFetchingInfo {
188188
}
189189
};
190190

191+
class TProcessorContext {
192+
protected:
193+
std::vector<TColumnChainInfo> ColumnsToFetch;
194+
std::vector<TColumnChainInfo> OriginalColumnsToUse;
195+
std::vector<TColumnChainInfo> ColumnsToDrop;
196+
197+
public:
198+
const std::vector<TColumnChainInfo>& GetColumnsToFetch() const {
199+
return ColumnsToFetch;
200+
}
201+
const std::vector<TColumnChainInfo>& GetOriginalColumnsToUse() const {
202+
return OriginalColumnsToUse;
203+
}
204+
const std::vector<TColumnChainInfo>& GetColumnsToDrop() const {
205+
return ColumnsToDrop;
206+
}
207+
208+
TProcessorContext(
209+
std::vector<TColumnChainInfo>&& toFetch, std::vector<TColumnChainInfo>&& originalToUse, std::vector<TColumnChainInfo>&& toDrop)
210+
: ColumnsToFetch(std::move(toFetch))
211+
, OriginalColumnsToUse(std::move(originalToUse))
212+
, ColumnsToDrop(std::move(toDrop)) {
213+
}
214+
};
215+
191216
class IResourceProcessor {
192217
private:
193218
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, Input);
194219
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, Output);
195220
YDB_READONLY(EProcessorType, ProcessorType, EProcessorType::Unknown);
196221

197-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const = 0;
222+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const = 0;
198223

199224
virtual NJson::TJsonValue DoDebugJson() const {
200225
return NJson::JSON_MAP;
@@ -208,7 +233,8 @@ class IResourceProcessor {
208233
return DoGetWeight();
209234
}
210235

211-
virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;
236+
virtual std::optional<TFetchingInfo> BuildFetchTask(
237+
const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;
212238

213239
virtual bool IsAggregation() const = 0;
214240

@@ -232,25 +258,21 @@ class IResourceProcessor {
232258
, ProcessorType(type) {
233259
}
234260

235-
[[nodiscard]] TConclusionStatus Execute(const std::shared_ptr<TAccessorsCollection>& resources) const;
261+
[[nodiscard]] TConclusionStatus Execute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const;
236262
};
237263

238-
class TResourceProcessorStep {
264+
class TResourceProcessorStep: public TProcessorContext {
239265
private:
240-
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, ColumnsToFetch);
241-
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, OriginalColumnsToUse);
266+
using TBase = TProcessorContext;
242267
YDB_READONLY_DEF(std::shared_ptr<IResourceProcessor>, Processor);
243-
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, ColumnsToDrop);
244268

245269
public:
246270
NJson::TJsonValue DebugJson() const;
247271

248272
TResourceProcessorStep(std::vector<TColumnChainInfo>&& toFetch, std::vector<TColumnChainInfo>&& originalToUse,
249273
std::shared_ptr<IResourceProcessor>&& processor, std::vector<TColumnChainInfo>&& toDrop)
250-
: ColumnsToFetch(std::move(toFetch))
251-
, OriginalColumnsToUse(std::move(originalToUse))
252-
, Processor(std::move(processor))
253-
, ColumnsToDrop(std::move(toDrop)) {
274+
: TBase(std::move(toFetch), std::move(originalToUse), std::move(toDrop))
275+
, Processor(std::move(processor)) {
254276
AFL_VERIFY(Processor);
255277
}
256278

ydb/core/formats/arrow/program/aggr_keys.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ CH::AggFunctionId TWithKeysAggregationOption::GetHouseFunction(const EAggregate
6666
return CH::AggFunctionId::AGG_UNSPECIFIED;
6767
}
6868

69-
TConclusionStatus TWithKeysAggregationProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
69+
TConclusionStatus TWithKeysAggregationProcessor::DoExecute(
70+
const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
7071
CH::GroupByOptions funcOpts;
7172
funcOpts.assigns.reserve(AggregationKeys.size() + Aggregations.size());
7273
funcOpts.has_nullable_key = false;

ydb/core/formats/arrow/program/aggr_keys.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class TAggregateFunction: public TInternalFunction {
2020
}
2121
virtual TConclusion<arrow::Datum> Call(
2222
const TExecFunctionContext& context, const std::shared_ptr<TAccessorsCollection>& resources) const override;
23-
23+
2424
TConclusion<arrow::Datum> PrepareResult(arrow::Datum&& datum) const override {
2525
if (!datum.is_scalar()) {
2626
return TConclusionStatus::Fail("Aggregate result is not a scalar.");
@@ -103,13 +103,14 @@ class TAggregateFunction: public TInternalFunction {
103103
return "";
104104
}
105105

106-
virtual TConclusionStatus CheckIO(const std::vector<TColumnChainInfo>& /*input*/, const std::vector<TColumnChainInfo>& output) const override {
106+
virtual TConclusionStatus CheckIO(
107+
const std::vector<TColumnChainInfo>& /*input*/, const std::vector<TColumnChainInfo>& output) const override {
107108
if (output.size() != 1) {
108109
return TConclusionStatus::Fail("output size != 1 (" + ::ToString(output.size()) + ")");
109110
}
110-
// if (input.size() != 1) {
111-
// return TConclusionStatus::Fail("input size != 1 (" + ::ToString(input.size()) + ")");
112-
// }
111+
// if (input.size() != 1) {
112+
// return TConclusionStatus::Fail("input size != 1 (" + ::ToString(input.size()) + ")");
113+
// }
113114
return TConclusionStatus::Success();
114115
}
115116
};
@@ -149,7 +150,7 @@ class TWithKeysAggregationProcessor: public IResourceProcessor {
149150
std::vector<TColumnChainInfo> AggregationKeys;
150151
std::vector<TWithKeysAggregationOption> Aggregations;
151152

152-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
153+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
153154

154155
TWithKeysAggregationProcessor(std::vector<TColumnChainInfo>&& input, std::vector<TColumnChainInfo>&& output,
155156
std::vector<TColumnChainInfo>&& aggregationKeys, std::vector<TWithKeysAggregationOption>&& aggregations)

ydb/core/formats/arrow/program/assign_const.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
namespace NKikimr::NArrow::NSSA {
1212

13-
TConclusionStatus TConstProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
13+
TConclusionStatus TConstProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
1414
AFL_VERIFY(GetInput().empty());
1515
resources->AddConstantVerified(GetOutputColumnIdOnce(), ScalarConstant);
1616
return TConclusionStatus::Success();

ydb/core/formats/arrow/program/assign_const.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class TConstProcessor: public IResourceProcessor {
88
using TBase = IResourceProcessor;
99
YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, ScalarConstant);
1010

11-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
11+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
1212

1313
virtual bool IsAggregation() const override {
1414
return false;

ydb/core/formats/arrow/program/assign_internal.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
namespace NKikimr::NArrow::NSSA {
66

7-
TConclusionStatus TCalculationProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
7+
TConclusionStatus TCalculationProcessor::DoExecute(
8+
const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
89
if (KernelLogic) {
910
auto resultKernel = KernelLogic->Execute(GetInput(), GetOutput(), resources);
1011
if (resultKernel.IsFail()) {

ydb/core/formats/arrow/program/assign_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class TCalculationProcessor: public IResourceProcessor {
1616

1717
std::shared_ptr<IStepFunction> Function;
1818

19-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
19+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
2020

2121
TCalculationProcessor(std::vector<TColumnChainInfo>&& input, std::vector<TColumnChainInfo>&& output,
2222
const std::shared_ptr<IStepFunction>& function, const std::shared_ptr<IKernelLogic>& kernelLogic)

ydb/core/formats/arrow/program/chain.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ TConclusionStatus TProgramChain::Initialize() {
160160

161161
TConclusionStatus TProgramChain::Apply(const std::shared_ptr<TAccessorsCollection>& resources) const {
162162
for (auto&& i : Processors) {
163-
auto status = i->Execute(resources);
163+
auto status = i->Execute(resources, i);
164164
if (status.IsFail()) {
165165
return status;
166166
}

ydb/core/formats/arrow/program/filter.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,13 @@ class TFilterVisitor: public arrow::ArrayVisitor {
6969
}
7070
};
7171

72-
TConclusionStatus TFilterProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
72+
TConclusionStatus TFilterProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const {
7373
std::vector<std::shared_ptr<IChunkedArray>> inputColumns;
74-
if (ReuseColumns) {
75-
inputColumns = resources->GetAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
76-
} else {
74+
AFL_VERIFY(context.GetColumnsToDrop().size() <= 1)("size", context.GetColumnsToDrop().size());
75+
if (context.GetColumnsToDrop().size() && GetInputColumnIdOnce() == context.GetColumnsToDrop().front()) {
7776
inputColumns = resources->ExtractAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
77+
} else {
78+
inputColumns = resources->GetAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
7879
}
7980
TFilterVisitor filterVisitor(inputColumns.front()->GetRecordsCount());
8081
for (auto& arr : inputColumns) {

ydb/core/formats/arrow/program/filter.h

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,20 @@ namespace NKikimr::NArrow::NSSA {
66
class TFilterProcessor: public IResourceProcessor {
77
private:
88
using TBase = IResourceProcessor;
9-
const bool ReuseColumns;
10-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
9+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
1110

1211
virtual bool IsAggregation() const override {
1312
return false;
1413
}
1514

1615
public:
17-
TFilterProcessor(std::vector<TColumnChainInfo>&& input, const bool reuseColumns = false)
18-
: TBase(std::move(input), {}, EProcessorType::Filter)
19-
, ReuseColumns(reuseColumns)
20-
{
21-
AFL_VERIFY(GetInput().size());
16+
TFilterProcessor(std::vector<TColumnChainInfo>&& input)
17+
: TBase(std::move(input), {}, EProcessorType::Filter) {
18+
AFL_VERIFY(GetInput().size() == 1)("size", GetInput().size());
2219
}
2320

24-
TFilterProcessor(const TColumnChainInfo& input, const bool reuseColumns = false)
25-
: TBase({ input }, {}, EProcessorType::Filter)
26-
, ReuseColumns(reuseColumns)
27-
{
21+
TFilterProcessor(const TColumnChainInfo& input)
22+
: TBase({ input }, {}, EProcessorType::Filter) {
2823
}
2924
};
3025

ydb/core/formats/arrow/program/graph.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,10 @@ TConclusion<std::vector<std::shared_ptr<IResourceProcessor>>> TGraph::BuildChain
193193
}
194194
}
195195
std::sort(nodeChains.begin(), nodeChains.end());
196-
bool found = false;
196+
ui32 foundCount = 0;
197197
for (auto&& [_, i] : Nodes) {
198-
if (i->GetProcessor()->GetProcessorType() == EProcessorType::Projection) {
199-
if (found) {
200-
return TConclusionStatus::Fail("detected projections duplication");
201-
}
202-
found = true;
198+
if (i->GetProcessor()->GetProcessorType() != EProcessorType::Filter && i->GetProcessor()->GetOutput().empty()) {
199+
++foundCount;
203200
std::vector<const TGraphNode*> chain = i->GetFetchingChain();
204201
std::vector<const TGraphNode*> actualChain;
205202
for (auto&& c : chain) {
@@ -211,7 +208,7 @@ TConclusion<std::vector<std::shared_ptr<IResourceProcessor>>> TGraph::BuildChain
211208
nodeChains.emplace_back(std::move(actualChain));
212209
}
213210
}
214-
if (!found) {
211+
if (!foundCount) {
215212
return TConclusionStatus::Fail("not found projection node");
216213
}
217214
std::vector<std::shared_ptr<IResourceProcessor>> result;

ydb/core/formats/arrow/program/original.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ class TOriginalColumnProcessor: public IResourceProcessor {
1212
YDB_ACCESSOR(ui32, ColumnId, 0);
1313
YDB_ACCESSOR_DEF(TString, ColumnName);
1414

15-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& /*resources*/) const override {
15+
virtual TConclusionStatus DoExecute(
16+
const std::shared_ptr<TAccessorsCollection>& /*resources*/, const TProcessorContext& /*context*/) const override {
1617
AFL_VERIFY(false);
1718
return TConclusionStatus::Success();
1819
}

ydb/core/formats/arrow/program/projection.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
namespace NKikimr::NArrow::NSSA {
55

6-
TConclusionStatus TProjectionProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
6+
TConclusionStatus TProjectionProcessor::DoExecute(
7+
const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
78
resources->RemainOnly(TColumnChainInfo::ExtractColumnIds(GetInput()), true);
89
return TConclusionStatus::Success();
910
}

ydb/core/formats/arrow/program/projection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ class TProjectionProcessor: public IResourceProcessor {
77
private:
88
using TBase = IResourceProcessor;
99

10-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
10+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
1111

1212
virtual bool IsAggregation() const override {
1313
return false;

0 commit comments

Comments
 (0)