Skip to content

Commit 1f95d62

Browse files
authored
[dq] Integrate YT peephole transforms into DQ integration (YQL-17386) (#605)
1 parent 3facfc0 commit 1f95d62

File tree

20 files changed

+223
-180
lines changed

20 files changed

+223
-180
lines changed

ydb/library/yql/providers/dq/planner/execution_planner.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ namespace NYql::NDqs {
590590
BUILD_CONNECTION(TDqCnMerge, BuildMergeChannels);
591591
YQL_ENSURE(false, "Unknown stage connection type: " << input.Cast<NNodes::TCallable>().CallableName());
592592
} else {
593-
YQL_ENSURE(input.Maybe<TDqSource>());
593+
YQL_ENSURE(input.Maybe<TDqSource>(), "Unknown stage input: " << input.Cast<NNodes::TCallable>().CallableName());
594594
}
595595
}
596596
}

ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp

+9-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.h>
88
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
99
#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
10+
#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
1011
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
1112
#include <ydb/library/yql/core/yql_opt_utils.h>
1213
#include <ydb/library/yql/utils/yql_panic.h>
@@ -22,6 +23,7 @@
2223

2324
#include <util/generic/xrange.h>
2425
#include <util/string/cast.h>
26+
#include <util/stream/str.h>
2527

2628
namespace NYql {
2729

@@ -1061,13 +1063,17 @@ void RegisterDqYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler)
10611063

10621064
auto values = NCommon::MkqlBuildExpr(wideWrite.Input().Ref(), ctx);
10631065

1064-
TYtOutTable table{GetSetting(wideWrite.Settings().Ref(), "outTable")->Child(1)};
1065-
auto inputItemType = NCommon::BuildType(wideWrite.Input().Ref(), GetSeqItemType(*table.Ref().GetTypeAnn()), ctx.ProgramBuilder);
1066+
auto tableName = GetSetting(wideWrite.Settings().Ref(), "tableName")->Child(1)->Content();
1067+
auto tableType = GetSetting(wideWrite.Settings().Ref(), "tableType")->Child(1)->Content();
1068+
1069+
TStringStream err;
1070+
auto inputItemType = NCommon::ParseTypeFromYson(tableType, ctx.ProgramBuilder, err);
1071+
YQL_ENSURE(inputItemType, "Parse type error: " << err.Str());
10661072

10671073
auto structType = AS_TYPE(TStructType, inputItemType);
10681074
values = NarrowFlow(values, *structType, ctx);
10691075
values = ctx.ProgramBuilder.Map(values, [&](TRuntimeNode item) {
1070-
return BuildDqWrite(item, table.Name().Value(), ctx);
1076+
return BuildDqWrite(item, tableName, ctx);
10711077
});
10721078
return values;
10731079
});

ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp

+76-26
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
221221
}
222222

223223
bool hasNonDeterministicFunctions = false;
224-
if (const auto status = PeepHoleOptimizeBeforeExec<true>(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
224+
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
225225
return SyncStatus(status);
226226
}
227227

@@ -628,7 +628,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
628628
}
629629

630630
bool hasNonDeterministicFunctions = false;
631-
if (const auto status = PeepHoleOptimizeBeforeExec<false>(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
631+
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
632632
return SyncStatus(status);
633633
}
634634

@@ -685,50 +685,100 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
685685
auto clusterStr = TString{cluster.Value()};
686686

687687
delegatedNode = input->ChildPtr(TYtDqProcessWrite::idx_Input);
688-
if (const auto status = SubstTables(delegatedNode, State_, false, ctx); status.Level == TStatus::Error) {
689-
return SyncStatus(status);
688+
689+
auto server = State_->Gateway->GetClusterServer(clusterStr);
690+
YQL_ENSURE(server, "Invalid YT cluster: " << clusterStr);
691+
692+
NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, clusterStr, tmpTable.Name().StringValue(), tmpFolder);
693+
realTable.Append(true);
694+
YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId");
695+
696+
NYT::TNode writerOptions = NYT::TNode::CreateMap();
697+
if (auto maxRowWeight = config->MaxRowWeight.Get(clusterStr)) {
698+
writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight->GetValue());
699+
}
700+
701+
NYT::TNode outSpec;
702+
NYT::TNode type;
703+
{
704+
auto rowSpec = TYqlRowSpecInfo(tmpTable.RowSpec());
705+
NYT::TNode spec;
706+
rowSpec.FillCodecNode(spec[YqlRowSpecAttribute]);
707+
outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec));
708+
type = rowSpec.GetTypeNode();
690709
}
691-
bool hasNonDeterministicFunctions = false;
692-
TYtExtraPeepHoleSettings settings;
693-
settings.CurrentCluster = clusterStr;
694-
settings.TmpTable = &tmpTable;
695-
settings.TmpFolder = tmpFolder;
696-
settings.Config = config;
697-
if (const auto status = PeepHoleOptimizeBeforeExec<false>(delegatedNode, delegatedNode, State_,
698-
hasNonDeterministicFunctions, ctx, settings); status.Level == TStatus::Error) {
699-
return SyncStatus(status);
710+
711+
// These settings will be passed to YT peephole callback from DQ
712+
auto settings = Build<TCoNameValueTupleList>(ctx, delegatedNode->Pos())
713+
.Add()
714+
.Name().Value("yt_cluster", TNodeFlags::Default).Build()
715+
.Value<TCoAtom>().Value(clusterStr).Build()
716+
.Build()
717+
.Add()
718+
.Name().Value("yt_server", TNodeFlags::Default).Build()
719+
.Value<TCoAtom>().Value(server).Build()
720+
.Build()
721+
.Add()
722+
.Name().Value("yt_table", TNodeFlags::Default).Build()
723+
.Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build()
724+
.Build()
725+
.Add()
726+
.Name().Value("yt_tableName", TNodeFlags::Default).Build()
727+
.Value<TCoAtom>().Value(tmpTable.Name().Value()).Build()
728+
.Build()
729+
.Add()
730+
.Name().Value("yt_tableType", TNodeFlags::Default).Build()
731+
.Value<TCoAtom>().Value(NYT::NodeToYsonString(type)).Build()
732+
.Build()
733+
.Add()
734+
.Name().Value("yt_writeOptions", TNodeFlags::Default).Build()
735+
.Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build()
736+
.Build()
737+
.Add()
738+
.Name().Value("yt_outSpec", TNodeFlags::Default).Build()
739+
.Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build()
740+
.Build()
741+
.Add()
742+
.Name().Value("yt_tx", TNodeFlags::Default).Build()
743+
.Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build()
744+
.Build()
745+
.Done().Ptr();
746+
747+
auto atomType = ctx.MakeType<TUnitExprType>();
748+
749+
for (auto child: settings->Children()) {
750+
child->Child(0)->SetTypeAnn(atomType);
751+
child->Child(0)->SetState(TExprNode::EState::ConstrComplete);
752+
child->Child(1)->SetTypeAnn(atomType);
753+
child->Child(1)->SetState(TExprNode::EState::ConstrComplete);
700754
}
701755

702756
delegatedNode = Build<TPull>(ctx, delegatedNode->Pos())
703757
.Input(std::move(delegatedNode))
704758
.BytesLimit()
705759
.Value(TString())
706-
.Build()
760+
.Build()
707761
.RowsLimit()
708762
.Value(0U)
709-
.Build()
763+
.Build()
710764
.FormatDetails()
711765
.Value(ui32(NYson::EYsonFormat::Binary))
712-
.Build()
713-
.Settings()
714-
.Build()
766+
.Build()
767+
.Settings(settings)
715768
.Format()
716769
.Value(0U)
717-
.Build()
770+
.Build()
718771
.PublicId()
719772
.Value(ToString(State_->Types->TranslateOperationId(input->UniqueId())))
720-
.Build()
773+
.Build()
721774
.Discard()
722775
.Value(ToString(true), TNodeFlags::Default)
723-
.Build()
776+
.Build()
724777
.Origin(input)
725-
.Done()
726-
.Ptr();
727-
728-
auto atomType = ctx.MakeType<TUnitExprType>();
778+
.Done().Ptr();
729779

730780
for (auto idx: {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails,
731-
TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard }) {
781+
TResOrPullBase::idx_Settings, TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard }) {
732782
delegatedNode->Child(idx)->SetTypeAnn(atomType);
733783
delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete);
734784
}

ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class TYtDataSourceExecTransformer : public TExecTransformerBase {
135135
}
136136

137137
bool hasNonDeterministicFunctions = false;
138-
if (const auto status = PeepHoleOptimizeBeforeExec<true>(optimizedInput, optimizedInput, State_, hasNonDeterministicFunctions, ctx); status.Level != IGraphTransformer::TStatus::Ok) {
138+
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedInput, optimizedInput, State_, hasNonDeterministicFunctions, ctx); status.Level != IGraphTransformer::TStatus::Ok) {
139139
return SyncStatus(status);
140140
}
141141

ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp

+26-42
Original file line numberDiff line numberDiff line change
@@ -143,25 +143,21 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
143143
.Body<TDqWrite>()
144144
.Input(CloneCompleteFlow(fill.Content().Body().Ptr(), ctx))
145145
.Provider().Value(YtProviderName).Build()
146-
.Settings<TCoNameValueTupleList>()
147-
.Add()
148-
.Name().Value("table").Build()
149-
.Value(fill.Output().Item(0)).Build()
150-
.Build()
151-
.Build()
146+
.Settings<TCoNameValueTupleList>().Build()
152147
.Build()
153-
.Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos()))
154148
.Build()
155-
.Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
149+
.Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos()))
156150
.Build()
157-
.ColumnHints().Build()
151+
.Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
158152
.Build()
159-
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
153+
.ColumnHints().Build()
160154
.Build()
155+
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
156+
.Build()
161157
.Second<TYtFill>()
162158
.InitFrom(fill)
163159
.Settings(NYql::AddSetting(fill.Settings().Ref(), EYtSettingType::NoDq, {}, ctx))
164-
.Build()
160+
.Build()
165161
.Done();
166162
}
167163
}
@@ -248,21 +244,17 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
248244
.Body<TDqWrite>()
249245
.Input(std::move(work))
250246
.Provider().Value(YtProviderName).Build()
251-
.Settings<TCoNameValueTupleList>()
252-
.Add()
253-
.Name().Value("table", TNodeFlags::Default).Build()
254-
.Value(sort.Output().Item(0)).Build()
255-
.Build()
256-
.Build()
247+
.Settings<TCoNameValueTupleList>().Build()
257248
.Build()
258-
.Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, sort.Pos()))
259249
.Build()
260-
.Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
250+
.Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, sort.Pos()))
261251
.Build()
262-
.ColumnHints().Build()
252+
.Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
263253
.Build()
264-
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
254+
.ColumnHints().Build()
265255
.Build()
256+
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
257+
.Build()
266258
.Second(std::move(operation))
267259
.Done();
268260
}
@@ -381,21 +373,17 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
381373
.DataSource<TYtDSource>()
382374
.Category(map.DataSink().Category())
383375
.Cluster(map.DataSink().Cluster())
384-
.Build()
385-
.Input(map.Input())
386376
.Build()
387-
.Settings(std::move(settings))
377+
.Input(map.Input())
388378
.Build()
379+
.Settings(std::move(settings))
389380
.Build()
390381
.Build()
391-
.Provider().Value(YtProviderName).Build()
392-
.Settings<TCoNameValueTupleList>()
393-
.Add()
394-
.Name().Value("table", TNodeFlags::Default).Build()
395-
.Value(map.Output().Item(0)).Build()
396-
.Build()
397382
.Build()
383+
.Provider().Value(YtProviderName).Build()
384+
.Settings<TCoNameValueTupleList>().Build()
398385
.Build()
386+
.Build()
399387
.Settings(TDqStageSettings{.PartitionMode = ordered ? TDqStageSettings::EPartitionMode::Single : TDqStageSettings::EPartitionMode::Default}.BuildNode(ctx, map.Pos()))
400388
.Done();
401389

@@ -599,27 +587,23 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
599587
.SortDirections(std::move(sortDirs))
600588
.SortKeySelectorLambda(std::move(sortKeys))
601589
.ListHandlerLambda(std::move(reducer))
602-
.Build()
603-
.Provider().Value(YtProviderName).Build()
604-
.template Settings<TCoNameValueTupleList>()
605-
.Add()
606-
.Name().Value("table", TNodeFlags::Default).Build()
607-
.Value(reduce.Output().Item(0)).Build()
608-
.Build()
609590
.Build()
591+
.Provider().Value(YtProviderName).Build()
592+
.template Settings<TCoNameValueTupleList>().Build()
610593
.Build()
611-
.Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, reduce.Pos()))
612594
.Build()
613-
.Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
595+
.Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, reduce.Pos()))
614596
.Build()
615-
.ColumnHints().Build()
597+
.Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
616598
.Build()
617-
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
599+
.ColumnHints().Build()
618600
.Build()
601+
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
602+
.Build()
619603
.template Second<TYtOperation>()
620604
.InitFrom(reduce)
621605
.Settings(NYql::AddSetting(reduce.Settings().Ref(), EYtSettingType::NoDq, {}, ctx))
622-
.Build()
606+
.Build()
623607
.Done();
624608
}
625609

0 commit comments

Comments
 (0)