Skip to content

Commit d6345c0

Browse files
committed
disabled block engine for spilling (#7778)
1 parent 58f3123 commit d6345c0

18 files changed

+149
-41
lines changed

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
141141
true, // defaultWatermarksMode
142142
true); // syncActor
143143
} else {
144-
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
144+
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
145+
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey(), spillingSettings.IsAggregationSpillingEnabled());
145146
}
146147
if (output) {
147148
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
252252
}
253253

254254
TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
255-
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false);
255+
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
256+
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false, spillingSettings.IsAggregationSpillingEnabled());
256257
DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx);
257258
return TExprBase(output);
258259
}

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ ui64 ParseEnableSpillingNodes(const TString &v) {
3434
if (s.empty()) {
3535
throw yexception() << "Empty value item";
3636
}
37-
auto value = FromStringWithDefault<NYql::TDqSettings::EEnabledSpillingNodes>(
38-
s, NYql::TDqSettings::EEnabledSpillingNodes::None);
37+
38+
auto value = FromString<NDq::EEnabledSpillingNodes>(s);
3939
res |= ui64(value);
4040
}
4141
return res;

ydb/core/kqp/ut/olap/aggregations_ut.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,49 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
171171
}
172172
}
173173

174+
Y_UNIT_TEST_TWIN(DisableBlockEngineInAggregationWithSpilling, AllowSpilling) {
175+
auto settings = TKikimrSettings()
176+
.SetWithSampleTables(false);
177+
settings.AppConfig.MutableTableServiceConfig()->SetBlockChannelsMode(NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_FORCE);
178+
if (AllowSpilling) {
179+
settings.AppConfig.MutableTableServiceConfig()->SetEnableSpillingNodes("Aggregation");
180+
} else {
181+
settings.AppConfig.MutableTableServiceConfig()->SetEnableSpillingNodes("None");
182+
}
183+
TKikimrRunner kikimr(settings);
184+
185+
TLocalHelper(kikimr).CreateTestOlapTable();
186+
auto client = kikimr.GetQueryClient();
187+
188+
{
189+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
190+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
191+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000);
192+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000);
193+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000);
194+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
195+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
196+
}
197+
198+
{
199+
TString query = R"(
200+
--!syntax_v1
201+
SELECT
202+
COUNT(*)
203+
FROM `/Root/olapStore/olapTable`
204+
GROUP BY level
205+
)";
206+
207+
auto res = StreamExplainQuery(query, client);
208+
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
209+
210+
auto plan = CollectStreamResult(res);
211+
212+
bool hasWideCombiner = plan.QueryStats->Getquery_ast().Contains("WideCombiner");
213+
UNIT_ASSERT_C(hasWideCombiner == AllowSpilling, plan.QueryStats->Getquery_ast());
214+
}
215+
}
216+
174217
Y_UNIT_TEST_TWIN(CountAllPushdown, UseLlvm) {
175218
auto settings = TKikimrSettings()
176219
.SetWithSampleTables(false);

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2491,6 +2491,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
24912491
NKikimrConfig::TAppConfig appConfig;
24922492
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
24932493
appConfig.MutableTableServiceConfig()->SetBlockChannelsMode(blockChannelsMode);
2494+
appConfig.MutableTableServiceConfig()->SetEnableSpillingNodes("None");
24942495
auto settings = TKikimrSettings()
24952496
.SetAppConfig(appConfig)
24962497
.SetWithSampleTables(true);

ydb/library/yql/core/yql_aggregate_expander.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregateWithFullOutput()
3636

3737
HaveDistinct = AnyOf(AggregatedColumns->ChildrenList(),
3838
[](const auto& child) { return child->ChildrenSize() == 3; });
39-
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.IsBlockEngineEnabled()) || ForceCompact || HasSetting(*settings, "compact");
39+
EffectiveCompact = (HaveDistinct && CompactForDistinct && !UseBlocks) || ForceCompact || HasSetting(*settings, "compact");
4040
for (const auto& trait : Traits) {
4141
auto mergeLambda = trait->Child(5);
4242
if (mergeLambda->Tail().IsCallable("Void")) {
@@ -67,7 +67,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregateWithFullOutput()
6767
return GeneratePhases();
6868
}
6969

70-
if (TypesCtx.IsBlockEngineEnabled()) {
70+
if (UseBlocks) {
7171
if (Suffix == "Combine") {
7272
auto ret = TryGenerateBlockCombine();
7373
if (ret) {
@@ -2785,7 +2785,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
27852785
streams.push_back(SerializeIdxSet(indicies));
27862786
}
27872787

2788-
if (TypesCtx.IsBlockEngineEnabled()) {
2788+
if (UseBlocks) {
27892789
for (ui32 i = 0; i < unionAllInputs.size(); ++i) {
27902790
unionAllInputs[i] = Ctx.Builder(Node->Pos())
27912791
.Callable("Map")
@@ -2806,7 +2806,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
28062806
}
28072807

28082808
auto settings = cleanOutputSettings;
2809-
if (TypesCtx.IsBlockEngineEnabled()) {
2809+
if (UseBlocks) {
28102810
settings = AddSetting(*settings, Node->Pos(), "many_streams", Ctx.NewList(Node->Pos(), std::move(streams)), Ctx);
28112811
}
28122812

@@ -2839,7 +2839,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() {
28392839
}
28402840

28412841
TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalize() {
2842-
if (UsePartitionsByKeys || !TypesCtx.IsBlockEngineEnabled()) {
2842+
if (UsePartitionsByKeys || !UseBlocks) {
28432843
return nullptr;
28442844
}
28452845

@@ -2934,7 +2934,7 @@ TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContex
29342934
return ret;
29352935
}
29362936
}
2937-
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled());
2937+
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled(), false);
29382938
}
29392939

29402940
} // namespace NYql

ydb/library/yql/core/yql_aggregate_expander.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace NYql {
99
class TAggregateExpander {
1010
public:
1111
TAggregateExpander(bool usePartitionsByKeys, const bool useFinalizeByKeys, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
12-
bool forceCompact = false, bool compactForDistinct = false, bool usePhases = false)
12+
bool forceCompact = false, bool compactForDistinct = false, bool usePhases = false, bool allowSpilling = false)
1313
: Node(node)
1414
, Ctx(ctx)
1515
, TypesCtx(typesCtx)
@@ -25,6 +25,7 @@ class TAggregateExpander {
2525
, HaveSessionSetting(false)
2626
, OriginalRowType(nullptr)
2727
, RowType(nullptr)
28+
, UseBlocks(typesCtx.IsBlockEngineEnabled() && !allowSpilling)
2829
{
2930
PreMap = Ctx.Builder(node->Pos())
3031
.Lambda()
@@ -115,6 +116,7 @@ class TAggregateExpander {
115116
const TStructExprType* RowType;
116117
TVector<const TItemExprType*> RowItems;
117118
TExprNode::TPtr PreMap;
119+
bool UseBlocks;
118120

119121
TExprNode::TListType InitialColumnNames;
120122
TExprNode::TListType FinalColumnNames;
@@ -130,8 +132,10 @@ class TAggregateExpander {
130132
std::unordered_map<std::string_view, TExprNode::TPtr> UdfWasChanged;
131133
};
132134

133-
inline TExprNode::TPtr ExpandAggregatePeepholeImpl(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const bool useFinalizeByKey, const bool useBlocks) {
134-
TAggregateExpander aggExpander(!useFinalizeByKey && !useBlocks, useFinalizeByKey, node, ctx, typesCtx, true);
135+
inline TExprNode::TPtr ExpandAggregatePeepholeImpl(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
136+
const bool useFinalizeByKey, const bool useBlocks, const bool allowSpilling) {
137+
TAggregateExpander aggExpander(!useFinalizeByKey && !useBlocks, useFinalizeByKey, node, ctx, typesCtx,
138+
true, false, false, allowSpilling);
135139
return aggExpander.ExpandAggregate();
136140
}
137141

ydb/library/yql/dq/common/dq_common.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,34 @@ enum class EHashJoinMode {
9292
GraceAndSelf /* "graceandself" */,
9393
};
9494

95+
enum class EEnabledSpillingNodes : ui64 {
96+
None = 0ULL /* "None" */,
97+
GraceJoin = 1ULL /* "GraceJoin" */,
98+
Aggregation = 2ULL /* "Aggregation" */,
99+
All = ~0ULL /* "All" */,
100+
};
101+
102+
class TSpillingSettings {
103+
public:
104+
TSpillingSettings() = default;
105+
explicit TSpillingSettings(ui64 mask) : Mask(mask) {};
106+
107+
operator bool() const {
108+
return Mask;
109+
}
110+
111+
bool IsGraceJoinSpillingEnabled() const {
112+
return Mask & ui64(EEnabledSpillingNodes::GraceJoin);
113+
}
114+
115+
bool IsAggregationSpillingEnabled() const {
116+
return Mask & ui64(EEnabledSpillingNodes::Aggregation);
117+
}
118+
119+
private:
120+
const ui64 Mask = 0;
121+
};
122+
95123
} // namespace NYql::NDq
96124

97125
IOutputStream& operator<<(IOutputStream& stream, const NYql::NDq::TTxId& txId);

ydb/library/yql/dq/opt/dq_opt_log.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ using namespace NYql::NNodes;
1717
namespace NYql::NDq {
1818

1919
TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool compactForDistinct,
20-
bool usePhases, const bool useFinalizeByKey)
20+
bool usePhases, const bool useFinalizeByKey, const bool allowSpilling)
2121
{
2222
if (!node.Maybe<TCoAggregateBase>()) {
2323
return node;
2424
}
25-
TAggregateExpander aggExpander(!typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
25+
TAggregateExpander aggExpander(!typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey,
26+
useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases, allowSpilling);
2627
auto result = aggExpander.ExpandAggregate();
2728
YQL_ENSURE(result);
2829

ydb/library/yql/dq/opt/dq_opt_log.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ namespace NYql {
1919

2020
namespace NYql::NDq {
2121

22-
NNodes::TExprBase DqRewriteAggregate(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool compactForDistinct, bool usePhases, const bool useFinalizeByKey);
22+
NNodes::TExprBase DqRewriteAggregate(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
23+
bool compactForDistinct, bool usePhases, const bool useFinalizeByKey, const bool allowSpilling);
2324

2425
NNodes::TExprBase DqRewriteTakeSortToTopSort(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents);
2526

ydb/library/yql/dq/tasks/dq_task_program.h

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,6 @@
1111

1212
namespace NYql::NDq {
1313

14-
class TSpillingSettings {
15-
public:
16-
TSpillingSettings() = default;
17-
explicit TSpillingSettings(ui64 mask) : Mask(mask) {};
18-
19-
operator bool() const {
20-
return Mask;
21-
}
22-
23-
bool IsGraceJoinSpillingEnabled() const {
24-
return Mask & ui64(TDqConfiguration::EEnabledSpillingNodes::GraceJoin);
25-
}
26-
27-
private:
28-
const ui64 Mask = 0;
29-
};
30-
3114
const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx);
3215

3316
TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,

ydb/library/yql/providers/dq/common/yql_dq_settings.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ TDqConfiguration::TDqConfiguration() {
110110
if (s.empty()) {
111111
throw yexception() << "Empty value item";
112112
}
113-
auto value = FromStringWithDefault<EEnabledSpillingNodes>(s, EEnabledSpillingNodes::None);
113+
114+
auto value = FromString<NDq::EEnabledSpillingNodes>(s);
114115
res |= ui64(value);
115116
}
116117
return res;

ydb/library/yql/providers/dq/common/yql_dq_settings.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,6 @@ struct TDqSettings {
2828
File /* "file" */,
2929
};
3030

31-
enum class EEnabledSpillingNodes : ui64 {
32-
None = 0ULL /* None */,
33-
GraceJoin = 1ULL /* "GraceJoin" */,
34-
All = ~0ULL /* "All" */,
35-
};
36-
3731
struct TDefault {
3832
static constexpr ui32 MaxTasksPerStage = 20U;
3933
static constexpr ui32 MaxTasksPerOperation = 70U;

ydb/library/yql/providers/dq/opt/logical_optimize.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
137137
bool syncActor = Config->ComputeActorType.Get() != "async";
138138
return NHopping::RewriteAsHoppingWindow(node, ctx, input.Cast(), analyticsHopping, lateArrivalDelay, defaultWatermarksMode, syncActor);
139139
} else {
140-
return DqRewriteAggregate(node, ctx, TypesCtx, true, Config->UseAggPhases.Get().GetOrElse(false), Config->UseFinalizeByKey.Get().GetOrElse(false));
140+
NDq::TSpillingSettings spillingSettings(Config->GetEnabledSpillingNodes());
141+
return DqRewriteAggregate(node, ctx, TypesCtx, true, Config->UseAggPhases.Get().GetOrElse(false), Config->UseFinalizeByKey.Get().GetOrElse(false), spillingSettings.IsAggregationSpillingEnabled());
141142
}
142143
}
143144
return node;

ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,34 @@
244244
}
245245
],
246246
"test.test[aggregate-aggregate_with_deep_aggregated_column--Results]": [],
247+
"test.test[aggregate-disable_blocks_with_spilling--Analyze]": [
248+
{
249+
"checksum": "7887cfe87307d36449cd6afe65636dd1",
250+
"size": 4615,
251+
"uri": "https://{canondata_backend}/1775059/60aa9c77d2376aa1beb6e616fcbdc82d0b2724be/resource.tar.gz#test.test_aggregate-disable_blocks_with_spilling--Analyze_/plan.txt"
252+
}
253+
],
254+
"test.test[aggregate-disable_blocks_with_spilling--Debug]": [
255+
{
256+
"checksum": "c3bb2f21048ee6f5a0e7846fd28f481d",
257+
"size": 2423,
258+
"uri": "https://{canondata_backend}/1775059/60aa9c77d2376aa1beb6e616fcbdc82d0b2724be/resource.tar.gz#test.test_aggregate-disable_blocks_with_spilling--Debug_/opt.yql_patched"
259+
}
260+
],
261+
"test.test[aggregate-disable_blocks_with_spilling--Plan]": [
262+
{
263+
"checksum": "7887cfe87307d36449cd6afe65636dd1",
264+
"size": 4615,
265+
"uri": "https://{canondata_backend}/1775059/60aa9c77d2376aa1beb6e616fcbdc82d0b2724be/resource.tar.gz#test.test_aggregate-disable_blocks_with_spilling--Plan_/plan.txt"
266+
}
267+
],
268+
"test.test[aggregate-disable_blocks_with_spilling--Results]": [
269+
{
270+
"checksum": "42f51df2ad014764141d891357b0b6b6",
271+
"size": 915,
272+
"uri": "https://{canondata_backend}/1775059/60aa9c77d2376aa1beb6e616fcbdc82d0b2724be/resource.tar.gz#test.test_aggregate-disable_blocks_with_spilling--Results_/results.txt"
273+
}
274+
],
247275
"test.test[aggregate-group_by_column_alias_reuse-default.txt-Analyze]": [
248276
{
249277
"checksum": "bf546487bcd475b8555f2a7883d1f6a0",

ydb/library/yql/tests/sql/sql2yql/canondata/result.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1868,6 +1868,13 @@
18681868
"uri": "https://{canondata_backend}/1942415/e6af6d354a98ef890e03fc9f0ff5926afc11a26b/resource.tar.gz#test_sql2yql.test_aggregate-dedup_state_keys_/sql.yql"
18691869
}
18701870
],
1871+
"test_sql2yql.test[aggregate-disable_blocks_with_spilling]": [
1872+
{
1873+
"checksum": "e1c9df055ae7de78e0d0364ec949dec4",
1874+
"size": 1398,
1875+
"uri": "https://{canondata_backend}/1936947/cdbc6e86b3a08f513dc20af9f537f10f6b930f5d/resource.tar.gz#test_sql2yql.test_aggregate-disable_blocks_with_spilling_/sql.yql"
1876+
}
1877+
],
18711878
"test_sql2yql.test[aggregate-ensure_count]": [
18721879
{
18731880
"checksum": "680e664bf810c0f13951de38d3cf94f7",
@@ -21132,6 +21139,13 @@
2113221139
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_aggregate-dedup_state_keys_/formatted.sql"
2113321140
}
2113421141
],
21142+
"test_sql_format.test[aggregate-disable_blocks_with_spilling]": [
21143+
{
21144+
"checksum": "ed1c0334420d2ec08b8ccc4020e4fb6b",
21145+
"size": 88,
21146+
"uri": "https://{canondata_backend}/1920236/3d99d8b2ede4d290229a75d3c17d5a932a859473/resource.tar.gz#test_sql_format.test_aggregate-disable_blocks_with_spilling_/formatted.sql"
21147+
}
21148+
],
2113521149
"test_sql_format.test[aggregate-ensure_count]": [
2113621150
{
2113721151
"checksum": "7a2ea2eeaf67cc395330f6718ce49635",
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
in Input input.txt
2+
3+
providers dq
4+
pragma dq.SpillingEngine="file";
5+
pragma dq.EnableSpillingNodes="Aggregation";
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pragma BlockEngine='force';
2+
select count(key) from plato.Input group by key;

0 commit comments

Comments
 (0)