@@ -771,7 +771,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
771
771
772
772
TNodeOnNodeOwnedMap phaseStagesMap;
773
773
TVector<TKqlQueryResult> phaseResults;
774
- TVector<TDqPhyPrecompute > computedInputs;
774
+ TVector<TExprBase > computedInputs;
775
775
TNodeSet computedInputsSet;
776
776
777
777
// Gather all Precompute stages, that are independent of any other stage and form phase of execution
@@ -785,12 +785,92 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
785
785
phaseStagesMap.emplace (raw, ptr);
786
786
}
787
787
788
+ {
789
+ TNodeOnNodeOwnedMap fullPhaseStagesMap;
790
+ for (auto & [_, stagePtr] : phaseStagesMap) {
791
+ VisitExpr (stagePtr,
792
+ [&](const TExprNode::TPtr& node) {
793
+ if (TExprBase (node).Maybe <TDqStage>()) {
794
+ fullPhaseStagesMap[node.Get ()] = node;
795
+ }
796
+ return true ;
797
+ });
798
+ }
799
+ phaseStagesMap.swap (fullPhaseStagesMap);
800
+
801
+ TNodeOnNodeOwnedMap fullDependantMap;
802
+ VisitExpr (query.Ptr (),
803
+ [&](const TExprNode::TPtr& node) {
804
+ if (phaseStagesMap.contains (node.Get ())) {
805
+ return false ;
806
+ }
807
+ if (TExprBase (node).Maybe <TDqStage>()) {
808
+ fullDependantMap[node.Get ()] = node;
809
+ }
810
+ return true ;
811
+ });
812
+ dependantStagesMap.swap (fullDependantMap);
813
+ }
814
+
788
815
if (phaseStagesMap.empty ()) {
789
816
output = query.Ptr ();
790
817
ctx.AddError (TIssue (ctx.GetPosition (query.Pos ()), " Phase stages is empty" ));
791
818
return TStatus::Error;
792
819
}
793
820
821
+ // so that all outputs to dependent stages are precomputes
822
+ {
823
+ TSet<TExprNode*> buildingTxStages;
824
+
825
+ TNodeOnNodeOwnedMap replaces;
826
+
827
+ for (auto & [_, stagePtr] : dependantStagesMap) {
828
+ TDqStage stage (stagePtr);
829
+ for (size_t i = 0 ; i < stage.Inputs ().Size (); ++i) {
830
+ auto input = stage.Inputs ().Item (i);
831
+ if (auto maybeConn = input.Maybe <TDqConnection>()) {
832
+ auto conn = maybeConn.Cast ();
833
+ if (phaseStagesMap.contains (conn.Output ().Stage ().Raw ())) {
834
+ auto oldArg = stage.Program ().Args ().Arg (i);
835
+ auto newArg = Build<TCoArgument>(ctx, stage.Program ().Args ().Arg (i).Pos ())
836
+ .Name (" _replaced_arg" )
837
+ .Done ();
838
+
839
+ TVector<TCoArgument> newArgs;
840
+ TNodeOnNodeOwnedMap programReplaces;
841
+ for (size_t j = 0 ; j < stage.Program ().Args ().Size (); ++j) {
842
+ auto oldArg = stage.Program ().Args ().Arg (j);
843
+ newArgs.push_back (Build<TCoArgument>(ctx, stage.Program ().Args ().Arg (i).Pos ())
844
+ .Name (" _replaced_arg_" + ToString (j))
845
+ .Done ());
846
+ if (i == j) {
847
+ programReplaces[oldArg.Raw ()] = Build<TCoToFlow>(ctx, oldArg.Pos ()).Input (newArgs.back ()).Done ().Ptr ();
848
+ } else {
849
+ programReplaces[oldArg.Raw ()] = newArgs.back ().Ptr ();
850
+ }
851
+ }
852
+
853
+ replaces[stage.Raw ()] =
854
+ Build<TDqStage>(ctx, stage.Pos ())
855
+ .Inputs (ctx.ReplaceNode (stage.Inputs ().Ptr (), input.Ref (), Build<TDqPhyPrecompute>(ctx, input.Pos ()).Connection (conn).Done ().Ptr ()))
856
+ .Outputs (stage.Outputs ())
857
+ .Settings (stage.Settings ())
858
+ .Program ()
859
+ .Args (newArgs)
860
+ .Body (TExprBase (ctx.ReplaceNodes (stage.Program ().Body ().Ptr (), programReplaces)))
861
+ .Build ()
862
+ .Done ().Ptr ();
863
+ }
864
+ }
865
+ }
866
+ }
867
+
868
+ if (!replaces.empty ()) {
869
+ output = ctx.ReplaceNodes (query.Ptr (), replaces);
870
+ return TStatus (TStatus::Repeat, true );
871
+ }
872
+ }
873
+
794
874
for (auto & [_, stagePtr] : dependantStagesMap) {
795
875
TDqStage stage (stagePtr);
796
876
auto precomputes = PrecomputeInputs (stage);
0 commit comments