Skip to content

Add pragma to grace join runtime node with spilling #6253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
lambda = NDq::BuildProgram(
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
ctx, fakeReads);
ctx, fakeReads, {});

NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {

auto paramsType = CollectParameters(stage, ctx);
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
ctx, {});
ctx, {}, {});

auto& programProto = *stageProto.MutableProgram();
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
Expand Down
38 changes: 36 additions & 2 deletions ydb/library/yql/dq/tasks/dq_task_program.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,41 @@

#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
#include <ydb/library/yql/minikql/mkql_runtime_version.h>
#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>

namespace NYql::NDq {

using namespace NKikimr::NMiniKQL;
using namespace NYql::NNodes;


// Callable-visitor provider used when building a DQ task program: for the
// GraceJoin / GraceSelfJoin callables it supplies a rewrite that re-creates the
// callable under the name "<original name>WithSpilling", keeping the original
// return type and inputs. For every other callable name (or when grace-join
// spilling is not enabled in the settings, or RuntimeVersion < 50) it returns an
// empty TCallableVisitFunc.
class TSpillingTransformProvider {
public:
    explicit TSpillingTransformProvider(const TSpillingSettings& spillingSettings)
        : SpillingSettings(spillingSettings)
    {
    }

    // name: name of the callable being visited.
    // Returns the rewrite function for that callable, or an empty function.
    TCallableVisitFunc operator()(TInternName name) {
        if (RuntimeVersion >= 50U && SpillingSettings.IsGraceJoinSpillingEnabled() && (name == "GraceJoin" || name == "GraceSelfJoin")) {
            // The new name is derived from the callable itself, so nothing needs to be captured.
            return [](NKikimr::NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
                TCallableBuilder callableBuilder(env,
                    TStringBuilder() << callable.GetType()->GetName() << "WithSpilling",
                    callable.GetType()->GetReturnType(), false);
                // Forward every input of the original callable unchanged, in order.
                for (ui32 i = 0; i < callable.GetInputsCount(); ++i) {
                    callableBuilder.Add(callable.GetInput(i));
                }
                return TRuntimeNode(callableBuilder.Build(), false);
            };
        }

        return TCallableVisitFunc();
    }

private:
    TSpillingSettings SpillingSettings;
};

const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx) {
TVector<const TItemExprType*> memberTypes;

Expand All @@ -27,7 +55,7 @@ const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext

TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,
const NCommon::IMkqlCallableCompiler& compiler, const TTypeEnvironment& typeEnv,
const IFunctionRegistry& funcRegistry, TExprContext& exprCtx, const TVector<TExprBase>& reads)
const IFunctionRegistry& funcRegistry, TExprContext& exprCtx, const TVector<TExprBase>& reads, const TSpillingSettings& spillingSettings)
{
TProgramBuilder pgmBuilder(typeEnv, funcRegistry);

Expand All @@ -49,6 +77,13 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp

TRuntimeNode rootNode = MkqlBuildExpr(program.Body().Ref(), ctx);

TExploringNodeVisitor explorer;
if (spillingSettings) {
explorer.Walk(rootNode.GetNode(), typeEnv);
bool wereChanges = false;
rootNode = SinglePassVisitCallables(rootNode, explorer, TSpillingTransformProvider(spillingSettings), typeEnv, true, wereChanges);
}

TStructLiteralBuilder structBuilder(typeEnv);
structBuilder.Add("Program", rootNode);
structBuilder.Add("Inputs", pgmBuilder.NewTuple(inputNodes));
Expand All @@ -64,7 +99,6 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp

auto programNode = structBuilder.Build();

TExploringNodeVisitor explorer;
explorer.Walk(programNode, typeEnv);
ui32 uniqueId = 0;
for (auto& node : explorer.GetNodes()) {
Expand Down
21 changes: 20 additions & 1 deletion ydb/library/yql/dq/tasks/dq_task_program.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,32 @@
#include <ydb/library/yql/providers/common/mkql/yql_provider_mkql.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>

#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

namespace NYql::NDq {

// Bit mask of the computation nodes for which spilling is enabled.
// A default-constructed object has an empty mask (nothing enabled).
class TSpillingSettings {
public:
    TSpillingSettings() = default;

    // mask: OR-ed TDqConfiguration::EEnabledSpillingNodes values.
    explicit TSpillingSettings(ui64 mask)
        : Mask(mask)
    {
    }

    // True when at least one bit of the mask is set.
    // Explicit, so the object still works in `if (settings)` but cannot be
    // silently converted to an integer or compared with unrelated values.
    explicit operator bool() const {
        return Mask != 0;
    }

    // True when the GraceJoin bit is set in the mask.
    bool IsGraceJoinSpillingEnabled() const {
        return (Mask & ui64(TDqConfiguration::EEnabledSpillingNodes::GraceJoin)) != 0;
    }

private:
    const ui64 Mask = 0;
};

const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx);

TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,
const NCommon::IMkqlCallableCompiler& compiler, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, TExprContext& exprCtx,
const TVector<NNodes::TExprBase>& reads);
const TVector<NNodes::TExprBase>& reads, const TSpillingSettings& spillingSettings);

} // namespace NYql::NDq
4 changes: 2 additions & 2 deletions ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"JoinDict", &WrapJoinDict},
{"GraceJoin", &WrapGraceJoin},
{"GraceSelfJoin", &WrapGraceSelfJoin},
{"GraceJoinWithSpilling", &WrapGraceJoinWithSpilling},
{"GraceSelfJoinWithSpilling", &WrapGraceSelfJoinWithSpilling},
{"GraceJoinWithSpilling", &WrapGraceJoin},
{"GraceSelfJoinWithSpilling", &WrapGraceSelfJoin},
{"MapJoinCore", &WrapMapJoinCore},
{"CommonJoinCore", &WrapCommonJoinCore},
{"CombineCore", &WrapCombineCore},
Expand Down
19 changes: 3 additions & 16 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr

}

IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin, bool isSpillingAllowed) {
IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin, bool isSpillingAllowed) {
const auto leftFlowNodeIndex = 0;
const auto rightFlowNodeIndex = 1;
const auto joinKindNodeIndex = isSelfJoin ? 1 : 2;
Expand Down Expand Up @@ -1171,28 +1171,15 @@ IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNod
IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");

return WrapGraceJoinCommon(callable, ctx, false, false);
return WrapGraceJoinCommon(callable, ctx, false, HasSpillingFlag(callable));
}

IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");

return WrapGraceJoinCommon(callable, ctx, true, false);
return WrapGraceJoinCommon(callable, ctx, true, HasSpillingFlag(callable));
}

IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");

return WrapGraceJoinCommon(callable, ctx, false, true);
}

IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");

return WrapGraceJoinCommon(callable, ctx, true, true);
}


}

}
Expand Down
22 changes: 0 additions & 22 deletions ydb/library/yql/minikql/mkql_program_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2160,28 +2160,6 @@ TRuntimeNode TProgramBuilder::GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind jo
return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
}

TRuntimeNode TProgramBuilder::GraceJoinWithSpilling(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind,
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns,
const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) {

if constexpr (RuntimeVersion < 50U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}

return GraceJoinCommon(__func__, flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
}

TRuntimeNode TProgramBuilder::GraceSelfJoinWithSpilling(TRuntimeNode flowLeft, EJoinKind joinKind,
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns,
const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) {

if constexpr (RuntimeVersion < 50U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}

return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
}

TRuntimeNode TProgramBuilder::ToSortedDict(TRuntimeNode list, bool all, const TUnaryLambda& keySelector,
const TUnaryLambda& payloadSelector, bool isCompact, ui64 itemsCountHint) {
return ToDict(list, all, keySelector, payloadSelector, __func__, isCompact, itemsCountHint);
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/minikql/mkql_program_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ inline void AddAnyJoinSide(EAnyJoinSettings& combined, EAnyJoinSettings value) {
combined = (EAnyJoinSettings)combinedVal;
}

// Reports whether the name of the callable's type ends with "WithSpilling".
inline bool HasSpillingFlag(const TCallable& callable) {
    const auto spillingSuffix = "WithSpilling"_sb;
    return TStringBuf(callable.GetType()->GetName()).EndsWith(spillingSuffix);
}

#define MKQL_SCRIPT_TYPES(xx) \
xx(Unknown, 0, unknown, false) \
xx(Python, 1, python, false) \
Expand Down
7 changes: 0 additions & 7 deletions ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1704,13 +1704,6 @@ TMkqlCommonCallableCompiler::TShared::TShared() {

const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);

// TODO: use PRAGMA
bool IsSpillingAllowed = false;
if (RuntimeVersion >= 50U && IsSpillingAllowed) {
return selfJoin
? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
: ctx.ProgramBuilder.GraceJoinWithSpilling(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
}
return selfJoin
? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
: ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
Expand Down
15 changes: 15 additions & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "yql_dq_settings.h"
#include <util/string/split.h>

namespace NYql {

Expand Down Expand Up @@ -98,6 +99,20 @@ TDqConfiguration::TDqConfiguration() {

REGISTER_SETTING(*this, _MaxAttachmentsSize);
REGISTER_SETTING(*this, DisableCheckpoints);
REGISTER_SETTING(*this, EnableSpillingNodes)
    .Parser([](const TString& v) {
        // Splits the value on any of ',', ';', '|' or ' ', parses every item as an
        // EEnabledSpillingNodes name and ORs the parsed values into one ui64 mask.
        // An empty item is rejected with an exception.
        TVector<TString> items;
        StringSplitter(v).SplitBySet(",;| ").AddTo(&items);

        ui64 mask = 0;
        for (const auto& item : items) {
            if (item.empty()) {
                throw yexception() << "Empty value item";
            }
            mask |= ui64(FromString<EEnabledSpillingNodes>(item));
        }
        return mask;
    });
}

} // namespace NYql
13 changes: 13 additions & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ struct TDqSettings {
File /* "file" */,
};

// Bit flags naming the computation nodes for which spilling can be enabled.
// Values are combined into a ui64 mask; All has every bit set.
// NOTE(review): the quoted names in the trailing comments below look like the
// string forms used when parsing this enum from text - confirm before editing them.
enum class EEnabledSpillingNodes : ui64 {
GraceJoin = 1ULL /* "GraceJoin" */,
All = ~0ULL /* "All" */,
};

struct TDefault {
static constexpr ui32 MaxTasksPerStage = 20U;
static constexpr ui32 MaxTasksPerOperation = 70U;
Expand Down Expand Up @@ -60,6 +65,7 @@ struct TDqSettings {
static constexpr ui32 MaxDPccpDPTableSize = 16400U;
static constexpr ui64 MaxAttachmentsSize = 2_GB;
static constexpr bool SplitStageOnDqReplicate = true;
static constexpr ui64 EnableSpillingNodes = 0;
};

using TPtr = std::shared_ptr<TDqSettings>;
Expand Down Expand Up @@ -131,6 +137,8 @@ struct TDqSettings {
NCommon::TConfSetting<bool, false> DisableLLVMForBlockStages;
NCommon::TConfSetting<bool, false> SplitStageOnDqReplicate;

NCommon::TConfSetting<ui64, false> EnableSpillingNodes;

NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
NCommon::TConfSetting<bool, false> DisableCheckpoints;

Expand Down Expand Up @@ -215,6 +223,11 @@ struct TDqSettings {
return SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != ESpillingEngine::Disable;
}

// Returns the mask of nodes for which spilling is enabled.
// Yields 0 when spilling is disabled altogether (IsSpillingEnabled() is false);
// otherwise the EnableSpillingNodes setting, falling back to the declared
// TDefault::EnableSpillingNodes when the setting is not specified.
ui64 GetEnabledSpillingNodes() const {
    if (!IsSpillingEnabled()) {
        return 0;
    }
    return EnableSpillingNodes.Get().GetOrElse(TDqSettings::TDefault::EnableSpillingNodes);
}

bool IsDqReplicateEnabled(const TTypeAnnotationContext& typesCtx) const {
return EnableDqReplicate.Get().GetOrElse(
typesCtx.BlockEngineMode != EBlockEngineMode::Disable || TDqSettings::TDefault::EnableDqReplicate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,10 +676,11 @@ namespace NYql::NDqs {
Y_ABORT_UNLESS(false);
}
*/
TSpillingSettings spillingSettings{Settings->GetEnabledSpillingNodes()};
StagePrograms[stageInfo.first] = std::make_tuple(
NDq::BuildProgram(
stage.Program(), *paramsType, compiler, typeEnv, *FunctionRegistry,
ExprContext, fakeReads),
ExprContext, fakeReads, spillingSettings),
stageId, publicId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,9 +761,10 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters

TVector<TExprBase> fakeReads;
auto paramsType = NDq::CollectParameters(programLambda, ctx);
NDq::TSpillingSettings spillingSettings{State->Settings->GetEnabledSpillingNodes()};
*lambda = NDq::BuildProgram(
programLambda, *paramsType, compiler, typeEnv, *State->FunctionRegistry,
ctx, fakeReads);
ctx, fakeReads, spillingSettings);
Comment on lines +764 to +767
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Этот код зовется из HandleResult, который в том числе может исполнять лямбду локально. Тут нужно либо отключать спиллинг для локального исполнения, либо убедиться, что все отработает нормально без spilling service под ногами.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Спиллинг не должен включаться без spilling service под ногами. Нужно добавить проверку в TSpillingTransformProvider, чтобы спиллинг-ноды не включались, если спиллинга нет.

Предлагаю это сделать в PR по добавлению прагмы в widecombine, чтобы не ждать 3 часа тестов

}

auto block = MeasureBlock("RuntimeNodeVisitor");
Expand Down
Loading