Skip to content

Commit 69259bd

Browse files
committed
add TSpillingSettings
1 parent 870b68e commit 69259bd

File tree

7 files changed

+32
-19
lines changed

7 files changed

+32
-19
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
764764
lambda = NDq::BuildProgram(
765765
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
766766
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
767-
ctx, fakeReads);
767+
ctx, fakeReads, {});
768768

769769
NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
770770
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
778778

779779
auto paramsType = CollectParameters(stage, ctx);
780780
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
781-
ctx, {});
781+
ctx, {}, {});
782782

783783
auto& programProto = *stageProto.MutableProgram();
784784
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);

ydb/library/yql/dq/tasks/dq_task_program.cpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
#include <ydb/library/yql/minikql/mkql_runtime_version.h>
66
#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
77

8-
#include <util/generic/xrange.h>
9-
108
namespace NYql::NDq {
119

1210
using namespace NKikimr::NMiniKQL;
@@ -15,13 +13,17 @@ using namespace NYql::NNodes;
1513

1614
class TSpillingTransformProvider {
1715
public:
16+
17+
TSpillingTransformProvider(const TSpillingSettings& spillingSettings): SpillingSettings(spillingSettings){};
18+
19+
1820
TCallableVisitFunc operator()(TInternName name) {
19-
if (name == "GraceJoin" || name == "GraceSelfJoin") {
21+
if (RuntimeVersion >= 50U && SpillingSettings.EnableSpillingInGraceJoin && (name == "GraceJoin" || name == "GraceSelfJoin")) {
2022
return [name](NKikimr::NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
2123
TCallableBuilder callableBuilder(env,
2224
TStringBuilder() << callable.GetType()->GetName() << "WithSpilling",
2325
callable.GetType()->GetReturnType(), false);
24-
for (ui32 i: xrange(callable.GetInputsCount())) {
26+
for (ui32 i = 0; i < callable.GetInputsCount(); ++i) {
2527
callableBuilder.Add(callable.GetInput(i));
2628
}
2729
return TRuntimeNode(callableBuilder.Build(), false);
@@ -31,6 +33,10 @@ class TSpillingTransformProvider {
3133

3234
return TCallableVisitFunc();
3335
}
36+
37+
private:
38+
39+
TSpillingSettings SpillingSettings;
3440
};
3541

3642
const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx) {
@@ -51,7 +57,7 @@ const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext
5157

5258
TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,
5359
const NCommon::IMkqlCallableCompiler& compiler, const TTypeEnvironment& typeEnv,
54-
const IFunctionRegistry& funcRegistry, TExprContext& exprCtx, const TVector<TExprBase>& reads)
60+
const IFunctionRegistry& funcRegistry, TExprContext& exprCtx, const TVector<TExprBase>& reads, const TSpillingSettings& spillingSettings)
5561
{
5662
TProgramBuilder pgmBuilder(typeEnv, funcRegistry);
5763

@@ -73,14 +79,10 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp
7379

7480
TRuntimeNode rootNode = MkqlBuildExpr(program.Body().Ref(), ctx);
7581

76-
if (RuntimeVersion >= 50U) {
77-
TExploringNodeVisitor explorer;
78-
explorer.Walk(rootNode.GetNode(), typeEnv);
79-
bool wereChanges = false;
80-
rootNode = SinglePassVisitCallables(rootNode, explorer, TSpillingTransformProvider(), typeEnv, true, wereChanges);
81-
82-
std::cerr << "MISHA " << wereChanges << std::endl;
83-
}
82+
TExploringNodeVisitor explorer;
83+
explorer.Walk(rootNode.GetNode(), typeEnv);
84+
bool wereChanges = false;
85+
rootNode = SinglePassVisitCallables(rootNode, explorer, TSpillingTransformProvider(spillingSettings), typeEnv, true, wereChanges);
8486

8587
TStructLiteralBuilder structBuilder(typeEnv);
8688
structBuilder.Add("Program", rootNode);
@@ -97,7 +99,6 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp
9799

98100
auto programNode = structBuilder.Build();
99101

100-
TExploringNodeVisitor explorer;
101102
explorer.Walk(programNode, typeEnv);
102103
ui32 uniqueId = 0;
103104
for (auto& node : explorer.GetNodes()) {

ydb/library/yql/dq/tasks/dq_task_program.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99

1010
namespace NYql::NDq {
1111

12+
struct TSpillingSettings {
13+
bool EnableSpillingInGraceJoin = false;
14+
};
15+
1216
const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx);
1317

1418
TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,
1519
const NCommon::IMkqlCallableCompiler& compiler, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
1620
const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, TExprContext& exprCtx,
17-
const TVector<NNodes::TExprBase>& reads);
21+
const TVector<NNodes::TExprBase>& reads, const TSpillingSettings& spillingSettings);
1822

1923
} // namespace NYql::NDq

ydb/library/yql/providers/dq/common/yql_dq_settings.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,10 @@ struct TDqSettings {
218218
return SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != ESpillingEngine::Disable;
219219
}
220220

221+
bool IsSpillingInGraceJoinEnabled() const {
222+
return IsSpillingEnabled() && EnableSpillingInGraceJoin.Get().GetOrElse(false);
223+
}
224+
221225
bool IsDqReplicateEnabled(const TTypeAnnotationContext& typesCtx) const {
222226
return EnableDqReplicate.Get().GetOrElse(
223227
typesCtx.BlockEngineMode != EBlockEngineMode::Disable || TDqSettings::TDefault::EnableDqReplicate);

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "execution_planner.h"
22

33
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
4+
#include <ydb/library/yql/minikql/mkql_runtime_version.h>
45
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
56
#include <ydb/library/yql/providers/dq/opt/dqs_opt.h>
67
#include <ydb/library/yql/providers/dq/opt/logical_optimize.h>
@@ -676,10 +677,12 @@ namespace NYql::NDqs {
676677
Y_ABORT_UNLESS(false);
677678
}
678679
*/
680+
681+
TSpillingSettings spillingSettings{Settings->IsSpillingInGraceJoinEnabled()};
679682
StagePrograms[stageInfo.first] = std::make_tuple(
680683
NDq::BuildProgram(
681684
stage.Program(), *paramsType, compiler, typeEnv, *FunctionRegistry,
682-
ExprContext, fakeReads),
685+
ExprContext, fakeReads, spillingSettings),
683686
stageId, publicId);
684687
}
685688
}

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,9 +761,10 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
761761

762762
TVector<TExprBase> fakeReads;
763763
auto paramsType = NDq::CollectParameters(programLambda, ctx);
764+
NDq::TSpillingSettings spillingSettings{State->Settings->IsSpillingInGraceJoinEnabled()};
764765
*lambda = NDq::BuildProgram(
765766
programLambda, *paramsType, compiler, typeEnv, *State->FunctionRegistry,
766-
ctx, fakeReads);
767+
ctx, fakeReads, spillingSettings);
767768
}
768769

769770
auto block = MeasureBlock("RuntimeNodeVisitor");

0 commit comments

Comments
 (0)