Skip to content

Commit c30a288

Browse files
filter chains optimization (#15237)
1 parent e2c166d commit c30a288

21 files changed

+657
-54
lines changed

ydb/core/formats/arrow/program/abstract.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ NJson::TJsonValue IResourceProcessor::DebugJson() const {
2020
return result;
2121
}
2222

23-
TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr<TAccessorsCollection>& resources) const {
23+
TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const {
2424
for (auto&& i : Output) {
2525
if (resources->HasColumn(i.GetColumnId())) {
2626
return TConclusionStatus::Fail("column " + ::ToString(i.GetColumnId()) + " has already");
2727
}
2828
}
29-
return DoExecute(resources);
29+
return DoExecute(resources, context);
3030
}
3131

3232
std::optional<TFetchingInfo> IResourceProcessor::BuildFetchTask(

ydb/core/formats/arrow/program/abstract.h

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ enum class EProcessorType {
164164
Calculation,
165165
Projection,
166166
Filter,
167-
Aggregation
167+
Aggregation,
168+
Original
168169
};
169170

170171
class TFetchingInfo {
@@ -187,20 +188,53 @@ class TFetchingInfo {
187188
}
188189
};
189190

191+
class TProcessorContext {
192+
protected:
193+
std::vector<TColumnChainInfo> ColumnsToFetch;
194+
std::vector<TColumnChainInfo> OriginalColumnsToUse;
195+
std::vector<TColumnChainInfo> ColumnsToDrop;
196+
197+
public:
198+
const std::vector<TColumnChainInfo>& GetColumnsToFetch() const {
199+
return ColumnsToFetch;
200+
}
201+
const std::vector<TColumnChainInfo>& GetOriginalColumnsToUse() const {
202+
return OriginalColumnsToUse;
203+
}
204+
const std::vector<TColumnChainInfo>& GetColumnsToDrop() const {
205+
return ColumnsToDrop;
206+
}
207+
208+
TProcessorContext(
209+
std::vector<TColumnChainInfo>&& toFetch, std::vector<TColumnChainInfo>&& originalToUse, std::vector<TColumnChainInfo>&& toDrop)
210+
: ColumnsToFetch(std::move(toFetch))
211+
, OriginalColumnsToUse(std::move(originalToUse))
212+
, ColumnsToDrop(std::move(toDrop)) {
213+
}
214+
};
215+
190216
class IResourceProcessor {
191217
private:
192218
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, Input);
193219
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, Output);
194220
YDB_READONLY(EProcessorType, ProcessorType, EProcessorType::Unknown);
195221

196-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const = 0;
222+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const = 0;
197223

198224
virtual NJson::TJsonValue DoDebugJson() const {
199225
return NJson::JSON_MAP;
200226
}
227+
virtual ui64 DoGetWeight() const {
228+
return 0;
229+
}
201230

202231
public:
203-
virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;
232+
ui64 GetWeight() const {
233+
return DoGetWeight();
234+
}
235+
236+
virtual std::optional<TFetchingInfo> BuildFetchTask(
237+
const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;
204238

205239
virtual bool IsAggregation() const = 0;
206240

@@ -224,25 +258,21 @@ class IResourceProcessor {
224258
, ProcessorType(type) {
225259
}
226260

227-
[[nodiscard]] TConclusionStatus Execute(const std::shared_ptr<TAccessorsCollection>& resources) const;
261+
[[nodiscard]] TConclusionStatus Execute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const;
228262
};
229263

230-
class TResourceProcessorStep {
264+
class TResourceProcessorStep: public TProcessorContext {
231265
private:
232-
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, ColumnsToFetch);
233-
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, OriginalColumnsToUse);
266+
using TBase = TProcessorContext;
234267
YDB_READONLY_DEF(std::shared_ptr<IResourceProcessor>, Processor);
235-
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, ColumnsToDrop);
236268

237269
public:
238270
NJson::TJsonValue DebugJson() const;
239271

240272
TResourceProcessorStep(std::vector<TColumnChainInfo>&& toFetch, std::vector<TColumnChainInfo>&& originalToUse,
241273
std::shared_ptr<IResourceProcessor>&& processor, std::vector<TColumnChainInfo>&& toDrop)
242-
: ColumnsToFetch(std::move(toFetch))
243-
, OriginalColumnsToUse(std::move(originalToUse))
244-
, Processor(std::move(processor))
245-
, ColumnsToDrop(std::move(toDrop)) {
274+
: TBase(std::move(toFetch), std::move(originalToUse), std::move(toDrop))
275+
, Processor(std::move(processor)) {
246276
AFL_VERIFY(Processor);
247277
}
248278

ydb/core/formats/arrow/program/aggr_keys.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ CH::AggFunctionId TWithKeysAggregationOption::GetHouseFunction(const EAggregate
6666
return CH::AggFunctionId::AGG_UNSPECIFIED;
6767
}
6868

69-
TConclusionStatus TWithKeysAggregationProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
69+
TConclusionStatus TWithKeysAggregationProcessor::DoExecute(
70+
const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
7071
CH::GroupByOptions funcOpts;
7172
funcOpts.assigns.reserve(AggregationKeys.size() + Aggregations.size());
7273
funcOpts.has_nullable_key = false;

ydb/core/formats/arrow/program/aggr_keys.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class TAggregateFunction: public TInternalFunction {
2020
}
2121
virtual TConclusion<arrow::Datum> Call(
2222
const TExecFunctionContext& context, const std::shared_ptr<TAccessorsCollection>& resources) const override;
23-
23+
2424
TConclusion<arrow::Datum> PrepareResult(arrow::Datum&& datum) const override {
2525
if (!datum.is_scalar()) {
2626
return TConclusionStatus::Fail("Aggregate result is not a scalar.");
@@ -103,13 +103,14 @@ class TAggregateFunction: public TInternalFunction {
103103
return "";
104104
}
105105

106-
virtual TConclusionStatus CheckIO(const std::vector<TColumnChainInfo>& /*input*/, const std::vector<TColumnChainInfo>& output) const override {
106+
virtual TConclusionStatus CheckIO(
107+
const std::vector<TColumnChainInfo>& /*input*/, const std::vector<TColumnChainInfo>& output) const override {
107108
if (output.size() != 1) {
108109
return TConclusionStatus::Fail("output size != 1 (" + ::ToString(output.size()) + ")");
109110
}
110-
// if (input.size() != 1) {
111-
// return TConclusionStatus::Fail("input size != 1 (" + ::ToString(input.size()) + ")");
112-
// }
111+
// if (input.size() != 1) {
112+
// return TConclusionStatus::Fail("input size != 1 (" + ::ToString(input.size()) + ")");
113+
// }
113114
return TConclusionStatus::Success();
114115
}
115116
};
@@ -149,7 +150,7 @@ class TWithKeysAggregationProcessor: public IResourceProcessor {
149150
std::vector<TColumnChainInfo> AggregationKeys;
150151
std::vector<TWithKeysAggregationOption> Aggregations;
151152

152-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
153+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
153154

154155
TWithKeysAggregationProcessor(std::vector<TColumnChainInfo>&& input, std::vector<TColumnChainInfo>&& output,
155156
std::vector<TColumnChainInfo>&& aggregationKeys, std::vector<TWithKeysAggregationOption>&& aggregations)

ydb/core/formats/arrow/program/assign_const.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
namespace NKikimr::NArrow::NSSA {
1212

13-
TConclusionStatus TConstProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
13+
TConclusionStatus TConstProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
1414
AFL_VERIFY(GetInput().empty());
1515
resources->AddConstantVerified(GetOutputColumnIdOnce(), ScalarConstant);
1616
return TConclusionStatus::Success();

ydb/core/formats/arrow/program/assign_const.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class TConstProcessor: public IResourceProcessor {
88
using TBase = IResourceProcessor;
99
YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, ScalarConstant);
1010

11-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
11+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
1212

1313
virtual bool IsAggregation() const override {
1414
return false;

ydb/core/formats/arrow/program/assign_internal.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
namespace NKikimr::NArrow::NSSA {
66

7-
TConclusionStatus TCalculationProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
7+
TConclusionStatus TCalculationProcessor::DoExecute(
8+
const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
89
if (KernelLogic) {
910
auto resultKernel = KernelLogic->Execute(GetInput(), GetOutput(), resources);
1011
if (resultKernel.IsFail()) {

ydb/core/formats/arrow/program/assign_internal.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include "functions.h"
44
#include "kernel_logic.h"
55

6+
#include <yql/essentials/core/arrow_kernels/request/request.h>
7+
68
namespace NKikimr::NArrow::NSSA {
79

810
class TCalculationProcessor: public IResourceProcessor {
@@ -14,7 +16,7 @@ class TCalculationProcessor: public IResourceProcessor {
1416

1517
std::shared_ptr<IStepFunction> Function;
1618

17-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
19+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
1820

1921
TCalculationProcessor(std::vector<TColumnChainInfo>&& input, std::vector<TColumnChainInfo>&& output,
2022
const std::shared_ptr<IStepFunction>& function, const std::shared_ptr<IKernelLogic>& kernelLogic)
@@ -27,6 +29,23 @@ class TCalculationProcessor: public IResourceProcessor {
2729
return Function->IsAggregation();
2830
}
2931

32+
virtual ui64 DoGetWeight() const override {
33+
if (KernelLogic) {
34+
return 0;
35+
}
36+
if (!YqlOperationId) {
37+
return 10;
38+
} else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith ||
39+
(NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) {
40+
return 7;
41+
} else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) {
42+
return 10;
43+
} else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::Equals) {
44+
return 5;
45+
}
46+
return 0;
47+
}
48+
3049
public:
3150
virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType,
3251
const std::shared_ptr<TAccessorsCollection>& resources) const override {

ydb/core/formats/arrow/program/chain.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "chain.h"
22
#include "collection.h"
3+
#include "graph.h"
34

45
namespace NKikimr::NArrow::NSSA {
56

@@ -39,7 +40,17 @@ class TColumnUsage {
3940
};
4041
} // namespace
4142

42-
TConclusion<TProgramChain> TProgramChain::Build(std::vector<std::shared_ptr<IResourceProcessor>>&& processors, const IColumnResolver& resolver) {
43+
TConclusion<TProgramChain> TProgramChain::Build(std::vector<std::shared_ptr<IResourceProcessor>>&& processorsExt, const IColumnResolver& resolver) {
44+
NOptimization::TGraph graph(std::move(processorsExt), resolver);
45+
auto conclusion = graph.Collapse();
46+
if (conclusion.IsFail()) {
47+
return conclusion;
48+
}
49+
auto processorsConclusion = graph.BuildChain();
50+
if (processorsConclusion.IsFail()) {
51+
return processorsConclusion;
52+
}
53+
auto processors = processorsConclusion.DetachResult();
4354
THashMap<ui32, TColumnUsage> contextUsage;
4455
ui32 stepIdx = 0;
4556
THashSet<ui32> sourceColumns;
@@ -149,7 +160,7 @@ TConclusionStatus TProgramChain::Initialize() {
149160

150161
TConclusionStatus TProgramChain::Apply(const std::shared_ptr<TAccessorsCollection>& resources) const {
151162
for (auto&& i : Processors) {
152-
auto status = i->Execute(resources);
163+
auto status = i->Execute(resources, i);
153164
if (status.IsFail()) {
154165
return status;
155166
}

ydb/core/formats/arrow/program/filter.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,21 @@ class TFilterVisitor: public arrow::ArrayVisitor {
6060
arrow::Status VisitImpl(const TArray& array) {
6161
AFL_VERIFY(Started);
6262
for (ui32 i = 0; i < array.length(); ++i) {
63-
const bool columnValue = (bool)array.Value(i);
6463
const ui32 currentIdx = CursorIdx++;
65-
FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && columnValue;
64+
FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && !array.IsNull(i) && (bool)array.Value(i);
6665
}
6766
AFL_VERIFY(CursorIdx <= FiltersMerged.size());
6867
return arrow::Status::OK();
6968
}
7069
};
7170

72-
TConclusionStatus TFilterProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
71+
TConclusionStatus TFilterProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const {
7372
std::vector<std::shared_ptr<IChunkedArray>> inputColumns;
74-
if (ReuseColumns) {
75-
inputColumns = resources->GetAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
76-
} else {
73+
AFL_VERIFY(context.GetColumnsToDrop().size() <= 1)("size", context.GetColumnsToDrop().size());
74+
if (context.GetColumnsToDrop().size() && GetInputColumnIdOnce() == context.GetColumnsToDrop().front()) {
7775
inputColumns = resources->ExtractAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
76+
} else {
77+
inputColumns = resources->GetAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
7878
}
7979
TFilterVisitor filterVisitor(inputColumns.front()->GetRecordsCount());
8080
for (auto& arr : inputColumns) {

ydb/core/formats/arrow/program/filter.h

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,20 @@ namespace NKikimr::NArrow::NSSA {
66
class TFilterProcessor: public IResourceProcessor {
77
private:
88
using TBase = IResourceProcessor;
9-
const bool ReuseColumns;
10-
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
9+
virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
1110

1211
virtual bool IsAggregation() const override {
1312
return false;
1413
}
1514

1615
public:
17-
TFilterProcessor(std::vector<TColumnChainInfo>&& input, const bool reuseColumns = false)
18-
: TBase(std::move(input), {}, EProcessorType::Filter)
19-
, ReuseColumns(reuseColumns)
20-
{
21-
AFL_VERIFY(GetInput().size());
16+
TFilterProcessor(std::vector<TColumnChainInfo>&& input)
17+
: TBase(std::move(input), {}, EProcessorType::Filter) {
18+
AFL_VERIFY(GetInput().size() == 1)("size", GetInput().size());
2219
}
2320

24-
TFilterProcessor(const TColumnChainInfo& input, const bool reuseColumns = false)
25-
: TBase({ input }, {}, EProcessorType::Filter)
26-
, ReuseColumns(reuseColumns)
27-
{
21+
TFilterProcessor(const TColumnChainInfo& input)
22+
: TBase({ input }, {}, EProcessorType::Filter) {
2823
}
2924
};
3025

0 commit comments

Comments
 (0)