@@ -560,16 +560,20 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
560
560
}
561
561
562
562
if (!query.Effects ().Empty ()) {
563
- auto tx = BuildTx (query.Effects ().Ptr (), ctx, /* isPrecompute */ false );
564
- if (!tx) {
565
- return TStatus::Error;
566
- }
563
+ auto collectedEffects = CollectEffects (query.Effects (), ctx);
567
564
568
- if (!CheckEffectsTx (tx.Cast (), query, ctx)) {
569
- return TStatus::Error;
570
- }
565
+ for (auto & effects : collectedEffects) {
566
+ auto tx = BuildTx (effects.Ptr (), ctx, /* isPrecompute */ false );
567
+ if (!tx) {
568
+ return TStatus::Error;
569
+ }
571
570
572
- BuildCtx->PhysicalTxs .emplace_back (tx.Cast ());
571
+ if (!CheckEffectsTx (tx.Cast (), effects, ctx)) {
572
+ return TStatus::Error;
573
+ }
574
+
575
+ BuildCtx->PhysicalTxs .emplace_back (tx.Cast ());
576
+ }
573
577
}
574
578
575
579
return TStatus::Ok;
@@ -581,8 +585,86 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
581
585
}
582
586
583
587
private:
584
- bool HasTableEffects (const TKqlQuery& query) const {
585
- for (const TExprBase& effect : query.Effects ()) {
588
+ TVector<TExprList> CollectEffects (const TExprList& list, TExprContext& ctx) {
589
+ struct TEffectsInfo {
590
+ enum class EType {
591
+ KQP_EFFECT,
592
+ KQP_SINK,
593
+ EXTERNAL_SINK,
594
+ };
595
+
596
+ EType Type;
597
+ THashSet<TStringBuf> TablesPathIds;
598
+ TVector<TExprNode::TPtr> Exprs;
599
+ };
600
+ TVector<TEffectsInfo> effectsInfos;
601
+
602
+ for (const auto & expr : list) {
603
+ if (auto sinkEffect = expr.Maybe <TKqpSinkEffect>()) {
604
+ const size_t sinkIndex = FromString (TStringBuf (sinkEffect.Cast ().SinkIndex ()));
605
+ const auto stage = sinkEffect.Cast ().Stage ().Maybe <TDqStageBase>();
606
+ YQL_ENSURE (stage);
607
+ YQL_ENSURE (stage.Cast ().Outputs ());
608
+ const auto outputs = stage.Cast ().Outputs ().Cast ();
609
+ YQL_ENSURE (sinkIndex < outputs.Size ());
610
+ const auto sink = outputs.Item (sinkIndex).Maybe <TDqSink>();
611
+ YQL_ENSURE (sink);
612
+
613
+ const auto sinkSettings = sink.Cast ().Settings ().Maybe <TKqpTableSinkSettings>();
614
+ if (!sinkSettings) {
615
+ // External writes always use their own physical transaction.
616
+ effectsInfos.emplace_back ();
617
+ effectsInfos.back ().Type = TEffectsInfo::EType::EXTERNAL_SINK;
618
+ effectsInfos.back ().Exprs .push_back (expr.Ptr ());
619
+ } else {
620
+ // Two table sinks can't be executed in one physical transaction if they write into one table.
621
+ const TStringBuf tablePathId = sinkSettings.Cast ().Table ().PathId ().Value ();
622
+
623
+ auto it = std::find_if (
624
+ std::begin (effectsInfos),
625
+ std::end (effectsInfos),
626
+ [&tablePathId](const auto & effectsInfo) {
627
+ return effectsInfo.Type == TEffectsInfo::EType::KQP_SINK
628
+ && !effectsInfo.TablesPathIds .contains (tablePathId);
629
+ });
630
+ if (it == std::end (effectsInfos)) {
631
+ effectsInfos.emplace_back ();
632
+ it = std::prev (std::end (effectsInfos));
633
+ it->Type = TEffectsInfo::EType::KQP_SINK;
634
+ }
635
+ it->TablesPathIds .insert (tablePathId);
636
+ it->Exprs .push_back (expr.Ptr ());
637
+ }
638
+ } else {
639
+ // Table effects are executed all in one physical transaction.
640
+ auto it = std::find_if (
641
+ std::begin (effectsInfos),
642
+ std::end (effectsInfos),
643
+ [](const auto & effectsInfo) { return effectsInfo.Type == TEffectsInfo::EType::KQP_EFFECT; });
644
+ if (it == std::end (effectsInfos)) {
645
+ effectsInfos.emplace_back ();
646
+ it = std::prev (std::end (effectsInfos));
647
+ it->Type = TEffectsInfo::EType::KQP_EFFECT;
648
+ }
649
+ it->Exprs .push_back (expr.Ptr ());
650
+ }
651
+ }
652
+
653
+ TVector<TExprList> results;
654
+
655
+ for (const auto & effects : effectsInfos) {
656
+ auto builder = Build<TExprList>(ctx, list.Pos ());
657
+ for (const auto & expr : effects.Exprs ) {
658
+ builder.Add (expr);
659
+ }
660
+ results.push_back (builder.Done ());
661
+ }
662
+
663
+ return results;
664
+ }
665
+
666
+ bool HasTableEffects (const TExprList& effectsList) const {
667
+ for (const TExprBase& effect : effectsList) {
586
668
if (auto maybeSinkEffect = effect.Maybe <TKqpSinkEffect>()) {
587
669
// (KqpSinkEffect (DqStage (... ((DqSink '0 (DataSink '"kikimr") ...)))) '0)
588
670
auto sinkEffect = maybeSinkEffect.Cast ();
@@ -608,7 +690,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
608
690
return false ;
609
691
}
610
692
611
- bool CheckEffectsTx (TKqpPhysicalTx tx, const TKqlQuery& query , TExprContext& ctx) const {
693
+ bool CheckEffectsTx (TKqpPhysicalTx tx, const TExprList& effectsList , TExprContext& ctx) const {
612
694
TMaybeNode<TExprBase> blackistedNode;
613
695
VisitExpr (tx.Ptr (), [&blackistedNode](const TExprNode::TPtr& exprNode) {
614
696
if (blackistedNode) {
@@ -630,7 +712,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
630
712
return true ;
631
713
});
632
714
633
- if (blackistedNode && HasTableEffects (query )) {
715
+ if (blackistedNode && HasTableEffects (effectsList )) {
634
716
ctx.AddError (TIssue (ctx.GetPosition (blackistedNode.Cast ().Pos ()), TStringBuilder ()
635
717
<< " Callable not expected in effects tx: " << blackistedNode.Cast <TCallable>().CallableName ()));
636
718
return false ;
0 commit comments