Skip to content

Commit c9e20ce

Browse files
Add pragma to grace join runtime node with spilling (#6253)
1 parent 7b7a29c commit c9e20ce

File tree

13 files changed

+99
-54
lines changed

13 files changed

+99
-54
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
764764
lambda = NDq::BuildProgram(
765765
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
766766
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
767-
ctx, fakeReads);
767+
ctx, fakeReads, {});
768768

769769
NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
770770
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
778778

779779
auto paramsType = CollectParameters(stage, ctx);
780780
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
781-
ctx, {});
781+
ctx, {}, {});
782782

783783
auto& programProto = *stageProto.MutableProgram();
784784
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);

ydb/library/yql/dq/tasks/dq_task_program.cpp

+36-2
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,41 @@
22

33
#include <ydb/library/yql/core/yql_expr_optimize.h>
44
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
5+
#include <ydb/library/yql/minikql/mkql_runtime_version.h>
56
#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
67

78
namespace NYql::NDq {
89

910
using namespace NKikimr::NMiniKQL;
1011
using namespace NYql::NNodes;
1112

13+
14+
class TSpillingTransformProvider {
15+
public:
16+
17+
TSpillingTransformProvider(const TSpillingSettings& spillingSettings): SpillingSettings(spillingSettings){};
18+
19+
TCallableVisitFunc operator()(TInternName name) {
20+
if (RuntimeVersion >= 50U && SpillingSettings.IsGraceJoinSpillingEnabled() && (name == "GraceJoin" || name == "GraceSelfJoin")) {
21+
return [name](NKikimr::NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
22+
TCallableBuilder callableBuilder(env,
23+
TStringBuilder() << callable.GetType()->GetName() << "WithSpilling",
24+
callable.GetType()->GetReturnType(), false);
25+
for (ui32 i = 0; i < callable.GetInputsCount(); ++i) {
26+
callableBuilder.Add(callable.GetInput(i));
27+
}
28+
return TRuntimeNode(callableBuilder.Build(), false);
29+
};
30+
}
31+
32+
return TCallableVisitFunc();
33+
}
34+
35+
private:
36+
37+
TSpillingSettings SpillingSettings;
38+
};
39+
1240
const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx) {
1341
TVector<const TItemExprType*> memberTypes;
1442

@@ -27,7 +55,7 @@ const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext
2755

2856
TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,
2957
const NCommon::IMkqlCallableCompiler& compiler, const TTypeEnvironment& typeEnv,
30-
const IFunctionRegistry& funcRegistry, TExprContext& exprCtx, const TVector<TExprBase>& reads)
58+
const IFunctionRegistry& funcRegistry, TExprContext& exprCtx, const TVector<TExprBase>& reads, const TSpillingSettings& spillingSettings)
3159
{
3260
TProgramBuilder pgmBuilder(typeEnv, funcRegistry);
3361

@@ -49,6 +77,13 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp
4977

5078
TRuntimeNode rootNode = MkqlBuildExpr(program.Body().Ref(), ctx);
5179

80+
TExploringNodeVisitor explorer;
81+
if (spillingSettings) {
82+
explorer.Walk(rootNode.GetNode(), typeEnv);
83+
bool wereChanges = false;
84+
rootNode = SinglePassVisitCallables(rootNode, explorer, TSpillingTransformProvider(spillingSettings), typeEnv, true, wereChanges);
85+
}
86+
5287
TStructLiteralBuilder structBuilder(typeEnv);
5388
structBuilder.Add("Program", rootNode);
5489
structBuilder.Add("Inputs", pgmBuilder.NewTuple(inputNodes));
@@ -64,7 +99,6 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp
6499

65100
auto programNode = structBuilder.Build();
66101

67-
TExploringNodeVisitor explorer;
68102
explorer.Walk(programNode, typeEnv);
69103
ui32 uniqueId = 0;
70104
for (auto& node : explorer.GetNodes()) {

ydb/library/yql/dq/tasks/dq_task_program.h

+20-1
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,32 @@
77
#include <ydb/library/yql/providers/common/mkql/yql_provider_mkql.h>
88
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
99

10+
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
11+
1012
namespace NYql::NDq {
1113

14+
class TSpillingSettings {
15+
public:
16+
TSpillingSettings() = default;
17+
explicit TSpillingSettings(ui64 mask) : Mask(mask) {};
18+
19+
operator bool() const {
20+
return Mask;
21+
}
22+
23+
bool IsGraceJoinSpillingEnabled() const {
24+
return Mask & ui64(TDqConfiguration::EEnabledSpillingNodes::GraceJoin);
25+
}
26+
27+
private:
28+
const ui64 Mask = 0;
29+
};
30+
1231
const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx);
1332

1433
TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,
1534
const NCommon::IMkqlCallableCompiler& compiler, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
1635
const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, TExprContext& exprCtx,
17-
const TVector<NNodes::TExprBase>& reads);
36+
const TVector<NNodes::TExprBase>& reads, const TSpillingSettings& spillingSettings);
1837

1938
} // namespace NYql::NDq

ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
234234
{"JoinDict", &WrapJoinDict},
235235
{"GraceJoin", &WrapGraceJoin},
236236
{"GraceSelfJoin", &WrapGraceSelfJoin},
237-
{"GraceJoinWithSpilling", &WrapGraceJoinWithSpilling},
238-
{"GraceSelfJoinWithSpilling", &WrapGraceSelfJoinWithSpilling},
237+
{"GraceJoinWithSpilling", &WrapGraceJoin},
238+
{"GraceSelfJoinWithSpilling", &WrapGraceSelfJoin},
239239
{"MapJoinCore", &WrapMapJoinCore},
240240
{"CommonJoinCore", &WrapCommonJoinCore},
241241
{"CombineCore", &WrapCombineCore},

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

+3-16
Original file line numberDiff line numberDiff line change
@@ -1096,7 +1096,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
10961096

10971097
}
10981098

1099-
IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin, bool isSpillingAllowed) {
1099+
IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin, bool isSpillingAllowed) {
11001100
const auto leftFlowNodeIndex = 0;
11011101
const auto rightFlowNodeIndex = 1;
11021102
const auto joinKindNodeIndex = isSelfJoin ? 1 : 2;
@@ -1175,28 +1175,15 @@ IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNod
11751175
IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
11761176
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
11771177

1178-
return WrapGraceJoinCommon(callable, ctx, false, false);
1178+
return WrapGraceJoinCommon(callable, ctx, false, HasSpillingFlag(callable));
11791179
}
11801180

11811181
IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
11821182
MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");
11831183

1184-
return WrapGraceJoinCommon(callable, ctx, true, false);
1184+
return WrapGraceJoinCommon(callable, ctx, true, HasSpillingFlag(callable));
11851185
}
11861186

1187-
IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
1188-
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
1189-
1190-
return WrapGraceJoinCommon(callable, ctx, false, true);
1191-
}
1192-
1193-
IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
1194-
MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");
1195-
1196-
return WrapGraceJoinCommon(callable, ctx, true, true);
1197-
}
1198-
1199-
12001187
}
12011188

12021189
}

ydb/library/yql/minikql/mkql_program_builder.cpp

-22
Original file line numberDiff line numberDiff line change
@@ -2160,28 +2160,6 @@ TRuntimeNode TProgramBuilder::GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind jo
21602160
return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
21612161
}
21622162

2163-
TRuntimeNode TProgramBuilder::GraceJoinWithSpilling(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind,
2164-
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns,
2165-
const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) {
2166-
2167-
if constexpr (RuntimeVersion < 50U) {
2168-
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
2169-
}
2170-
2171-
return GraceJoinCommon(__func__, flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
2172-
}
2173-
2174-
TRuntimeNode TProgramBuilder::GraceSelfJoinWithSpilling(TRuntimeNode flowLeft, EJoinKind joinKind,
2175-
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns,
2176-
const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) {
2177-
2178-
if constexpr (RuntimeVersion < 50U) {
2179-
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
2180-
}
2181-
2182-
return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
2183-
}
2184-
21852163
TRuntimeNode TProgramBuilder::ToSortedDict(TRuntimeNode list, bool all, const TUnaryLambda& keySelector,
21862164
const TUnaryLambda& payloadSelector, bool isCompact, ui64 itemsCountHint) {
21872165
return ToDict(list, all, keySelector, payloadSelector, __func__, isCompact, itemsCountHint);

ydb/library/yql/minikql/mkql_program_builder.h

+4
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ inline void AddAnyJoinSide(EAnyJoinSettings& combined, EAnyJoinSettings value) {
8686
combined = (EAnyJoinSettings)combinedVal;
8787
}
8888

89+
inline bool HasSpillingFlag(const TCallable& callable) {
90+
return TStringBuf(callable.GetType()->GetName()).EndsWith("WithSpilling"_sb);
91+
}
92+
8993
#define MKQL_SCRIPT_TYPES(xx) \
9094
xx(Unknown, 0, unknown, false) \
9195
xx(Python, 1, python, false) \

ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp

-7
Original file line numberDiff line numberDiff line change
@@ -1704,13 +1704,6 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
17041704

17051705
const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
17061706

1707-
// TODO: use PRAGMA
1708-
bool IsSpillingAllowed = false;
1709-
if (RuntimeVersion >= 50U && IsSpillingAllowed) {
1710-
return selfJoin
1711-
? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
1712-
: ctx.ProgramBuilder.GraceJoinWithSpilling(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
1713-
}
17141707
return selfJoin
17151708
? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
17161709
: ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);

ydb/library/yql/providers/dq/common/yql_dq_settings.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "yql_dq_settings.h"
2+
#include <util/string/split.h>
23

34
namespace NYql {
45

@@ -98,6 +99,20 @@ TDqConfiguration::TDqConfiguration() {
9899

99100
REGISTER_SETTING(*this, _MaxAttachmentsSize);
100101
REGISTER_SETTING(*this, DisableCheckpoints);
102+
REGISTER_SETTING(*this, EnableSpillingNodes)
103+
.Parser([](const TString& v) {
104+
ui64 res = 0;
105+
TVector<TString> vec;
106+
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
107+
for (auto& s: vec) {
108+
if (s.empty()) {
109+
throw yexception() << "Empty value item";
110+
}
111+
auto value = FromString<EEnabledSpillingNodes>(s);
112+
res |= ui64(value);
113+
}
114+
return res;
115+
});
101116
}
102117

103118
} // namespace NYql

ydb/library/yql/providers/dq/common/yql_dq_settings.h

+13
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ struct TDqSettings {
2828
File /* "file" */,
2929
};
3030

31+
enum class EEnabledSpillingNodes : ui64 {
32+
GraceJoin = 1ULL /* "GraceJoin" */,
33+
All = ~0ULL /* "All" */,
34+
};
35+
3136
struct TDefault {
3237
static constexpr ui32 MaxTasksPerStage = 20U;
3338
static constexpr ui32 MaxTasksPerOperation = 70U;
@@ -60,6 +65,7 @@ struct TDqSettings {
6065
static constexpr ui32 MaxDPccpDPTableSize = 16400U;
6166
static constexpr ui64 MaxAttachmentsSize = 2_GB;
6267
static constexpr bool SplitStageOnDqReplicate = true;
68+
static constexpr ui64 EnableSpillingNodes = 0;
6369
};
6470

6571
using TPtr = std::shared_ptr<TDqSettings>;
@@ -131,6 +137,8 @@ struct TDqSettings {
131137
NCommon::TConfSetting<bool, false> DisableLLVMForBlockStages;
132138
NCommon::TConfSetting<bool, false> SplitStageOnDqReplicate;
133139

140+
NCommon::TConfSetting<ui64, false> EnableSpillingNodes;
141+
134142
NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
135143
NCommon::TConfSetting<bool, false> DisableCheckpoints;
136144

@@ -215,6 +223,11 @@ struct TDqSettings {
215223
return SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != ESpillingEngine::Disable;
216224
}
217225

226+
ui64 GetEnabledSpillingNodes() const {
227+
if (!IsSpillingEnabled()) return 0;
228+
return EnableSpillingNodes.Get().GetOrElse(0);
229+
}
230+
218231
bool IsDqReplicateEnabled(const TTypeAnnotationContext& typesCtx) const {
219232
return EnableDqReplicate.Get().GetOrElse(
220233
typesCtx.BlockEngineMode != EBlockEngineMode::Disable || TDqSettings::TDefault::EnableDqReplicate);

ydb/library/yql/providers/dq/planner/execution_planner.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -676,10 +676,11 @@ namespace NYql::NDqs {
676676
Y_ABORT_UNLESS(false);
677677
}
678678
*/
679+
TSpillingSettings spillingSettings{Settings->GetEnabledSpillingNodes()};
679680
StagePrograms[stageInfo.first] = std::make_tuple(
680681
NDq::BuildProgram(
681682
stage.Program(), *paramsType, compiler, typeEnv, *FunctionRegistry,
682-
ExprContext, fakeReads),
683+
ExprContext, fakeReads, spillingSettings),
683684
stageId, publicId);
684685
}
685686
}

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -760,9 +760,10 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
760760

761761
TVector<TExprBase> fakeReads;
762762
auto paramsType = NDq::CollectParameters(programLambda, ctx);
763+
NDq::TSpillingSettings spillingSettings{State->Settings->GetEnabledSpillingNodes()};
763764
*lambda = NDq::BuildProgram(
764765
programLambda, *paramsType, compiler, typeEnv, *State->FunctionRegistry,
765-
ctx, fakeReads);
766+
ctx, fakeReads, spillingSettings);
766767
}
767768

768769
auto block = MeasureBlock("RuntimeNodeVisitor");

0 commit comments

Comments
 (0)