Skip to content

Commit 9c8982c

Browse files
committed
add duntime nodes
1 parent 629070c commit 9c8982c

File tree

6 files changed

+46
-48
lines changed

6 files changed

+46
-48
lines changed

ydb/library/yql/dq/tasks/dq_task_program.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,37 @@
22

33
#include <ydb/library/yql/core/yql_expr_optimize.h>
44
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
5+
#include <ydb/library/yql/minikql/mkql_runtime_version.h>
56
#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
67

8+
#include <util/generic/xrange.h>
9+
710
namespace NYql::NDq {
811

912
using namespace NKikimr::NMiniKQL;
1013
using namespace NYql::NNodes;
1114

15+
16+
class TSpillingTransformProvider {
17+
public:
18+
TCallableVisitFunc operator()(TInternName name) {
19+
if (name == "GraceJoinCore" || name == "GraceSelfJoinCore") {
20+
return [name](NKikimr::NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
21+
TCallableBuilder callableBuilder(env,
22+
TStringBuilder() << callable.GetType()->GetName() << "WithSpilling",
23+
callable.GetType()->GetReturnType(), false);
24+
for (ui32 i: xrange(callable.GetInputsCount())) {
25+
callableBuilder.Add(callable.GetInput(i));
26+
}
27+
return TRuntimeNode(callableBuilder.Build(), false);
28+
};
29+
}
30+
31+
32+
return TCallableVisitFunc();
33+
}
34+
};
35+
1236
const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx) {
1337
TVector<const TItemExprType*> memberTypes;
1438

@@ -49,6 +73,15 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp
4973

5074
TRuntimeNode rootNode = MkqlBuildExpr(program.Body().Ref(), ctx);
5175

76+
if (RuntimeVersion >= 50U) {
77+
TExploringNodeVisitor explorer;
78+
explorer.Walk(rootNode.GetNode(), typeEnv);
79+
bool wereChanges = false;
80+
rootNode = SinglePassVisitCallables(rootNode, explorer, TSpillingTransformProvider(), typeEnv, true, wereChanges);
81+
82+
std::cerr << "MISHA " << wereChanges << std::endl;
83+
}
84+
5285
TStructLiteralBuilder structBuilder(typeEnv);
5386
structBuilder.Add("Program", rootNode);
5487
structBuilder.Add("Inputs", pgmBuilder.NewTuple(inputNodes));

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,29 +1170,25 @@ IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNod
11701170

11711171
IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
11721172
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
1173+
bool isSpillingAllowed = false;
1174+
if (callable.GetType()->GetName() == "GraceJoinCoreWithSpilling") {
1175+
isSpillingAllowed = true;
1176+
}
11731177

1174-
return WrapGraceJoinCommon(callable, ctx, false, false);
1178+
return WrapGraceJoinCommon(callable, ctx, false, isSpillingAllowed);
11751179
}
11761180

11771181
IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
11781182
MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");
1179-
1180-
return WrapGraceJoinCommon(callable, ctx, true, false);
1181-
}
1182-
1183-
IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
1184-
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
11851183

1186-
return WrapGraceJoinCommon(callable, ctx, false, true);
1187-
}
1188-
1189-
IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
1190-
MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");
1184+
bool isSpillingAllowed = false;
1185+
if (callable.GetType()->GetName() == "GraceSelfJoinCoreWithSpilling") {
1186+
isSpillingAllowed = true;
1187+
}
11911188

1192-
return WrapGraceJoinCommon(callable, ctx, true, true);
1189+
return WrapGraceJoinCommon(callable, ctx, true, isSpillingAllowed);
11931190
}
11941191

1195-
11961192
}
11971193

11981194
}

ydb/library/yql/minikql/mkql_program_builder.cpp

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2160,28 +2160,6 @@ TRuntimeNode TProgramBuilder::GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind jo
21602160
return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
21612161
}
21622162

2163-
TRuntimeNode TProgramBuilder::GraceJoinWithSpilling(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind,
2164-
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns,
2165-
const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) {
2166-
2167-
if constexpr (RuntimeVersion < 50U) {
2168-
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
2169-
}
2170-
2171-
return GraceJoinCommon(__func__, flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
2172-
}
2173-
2174-
TRuntimeNode TProgramBuilder::GraceSelfJoinWithSpilling(TRuntimeNode flowLeft, EJoinKind joinKind,
2175-
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns,
2176-
const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) {
2177-
2178-
if constexpr (RuntimeVersion < 50U) {
2179-
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
2180-
}
2181-
2182-
return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
2183-
}
2184-
21852163
TRuntimeNode TProgramBuilder::ToSortedDict(TRuntimeNode list, bool all, const TUnaryLambda& keySelector,
21862164
const TUnaryLambda& payloadSelector, bool isCompact, ui64 itemsCountHint) {
21872165
return ToDict(list, all, keySelector, payloadSelector, __func__, isCompact, itemsCountHint);

ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1704,13 +1704,6 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
17041704

17051705
const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
17061706

1707-
// TODO: use PRAGMA
1708-
bool IsSpillingAllowed = false;
1709-
if (RuntimeVersion >= 50U && IsSpillingAllowed) {
1710-
return selfJoin
1711-
? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
1712-
: ctx.ProgramBuilder.GraceJoinWithSpilling(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
1713-
}
17141707
return selfJoin
17151708
? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
17161709
: ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);

ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,6 @@ void RegisterDqsMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, cons
1616
return TRuntimeNode();
1717
});
1818

19-
bool isSpillingEnabled = GetParamFromPragma();
20-
if (isSpillingEnabled) {
21-
compiler.OverrideCallable("GraceJoinCore", MakeGraceJoinCoreWithSpillingCallableCompiler());
22-
compiler.OverrideCallable("GraceSelfJoinCore", MakeGraceSelfJoinCoreWithSpillingCallableCompiler());
23-
}
2419
auto integrations = GetUniqueIntegrations(ctx);
2520
std::for_each(integrations.cbegin(), integrations.cend(), std::bind(&IDqIntegration::RegisterMkqlCompiler, std::placeholders::_1, std::ref(compiler)));
2621
}

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "execution_planner.h"
22

33
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
4+
#include <ydb/library/yql/minikql/mkql_runtime_version.h>
45
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
56
#include <ydb/library/yql/providers/dq/opt/dqs_opt.h>
67
#include <ydb/library/yql/providers/dq/opt/logical_optimize.h>
@@ -28,6 +29,8 @@
2829
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
2930
#include <ydb/library/actors/core/event_pb.h>
3031

32+
#include <util/generic/xrange.h>
33+
3134
#include <stack>
3235

3336
using namespace NYql;

0 commit comments

Comments
 (0)