6
6
#include < ydb/library/yql/core/yql_opt_utils.h>
7
7
#include < ydb/library/yql/utils/log/log.h>
8
8
#include < ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
9
+ #include < ydb/library/yql/providers/pg/expr_nodes/yql_pg_expr_nodes.h>
9
10
#include < ydb/library/yql/dq/integration/yql_dq_integration.h>
10
11
#include < ydb/library/yql/providers/dq/common/yql_dq_settings.h>
11
12
@@ -307,6 +308,16 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
307
308
return false ;
308
309
}
309
310
311
+ bool IsPgRead (const TExprBase& node, TTypeAnnotationContext& types) {
312
+ if (auto maybePgRead = node.Maybe <TPgTableContent>()) {
313
+ auto dataSourceProviderIt = types.DataSourceMap .find (NYql::PgProviderName);
314
+ if (dataSourceProviderIt != types.DataSourceMap .end ()) {
315
+ return true ;
316
+ }
317
+ }
318
+ return false ;
319
+ }
320
+
310
321
bool IsDqWrite (const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types) {
311
322
if (node.Ref ().ChildrenSize () <= 1 ) {
312
323
return false ;
@@ -383,6 +394,12 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
383
394
return ExploreTx (TExprBase (worldChild), ctx, dataSink, txRes, tablesData, types);
384
395
}
385
396
397
+ if (IsPgRead (node, types)) {
398
+ txRes.Ops .insert (node.Raw ());
399
+ TExprNode::TPtr worldChild = node.Raw ()->ChildPtr (0 );
400
+ return ExploreTx (TExprBase (worldChild), ctx, dataSink, txRes, tablesData, types);
401
+ }
402
+
386
403
if (auto maybeWrite = node.Maybe <TKiWriteTable>()) {
387
404
auto write = maybeWrite.Cast ();
388
405
if (!checkDataSink (write .DataSink ())) {
@@ -930,11 +947,18 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
930
947
return ret;
931
948
}
932
949
933
- TExprNode::TPtr KiBuildResult (TExprBase node, const TString& cluster, TExprContext& ctx) {
950
+ TExprNode::TPtr KiBuildResult (TExprBase node, const TString& cluster, TExprContext& ctx) {
951
+ if (auto maybeCommit = node.Maybe <TCoCommit>()) {
952
+ node = maybeCommit.Cast ().World ();
953
+ }
954
+
934
955
if (!node.Maybe <TResFill>()) {
935
956
return node.Ptr ();
936
957
}
937
958
959
+ TKiExecDataQuerySettings execSettings;
960
+ execSettings.Mode = KikimrCommitModeFlush (); /* because it is a pure query*/
961
+
938
962
auto resFill = node.Cast <TResFill>();
939
963
940
964
if (resFill.DelegatedSource ().Value () != KikimrProviderName) {
@@ -957,6 +981,8 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
957
981
.Build ()
958
982
.Operations ()
959
983
.Build ()
984
+ .Settings ()
985
+ .Build ()
960
986
.Done ();
961
987
962
988
auto exec = Build<TKiExecDataQuery>(ctx, node.Pos ())
@@ -968,8 +994,7 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
968
994
.QueryBlocks ()
969
995
.Add ({queryBlock})
970
996
.Build ()
971
- .Settings ()
972
- .Build ()
997
+ .Settings (execSettings.BuildNode (ctx, node.Pos ()))
973
998
.Ast <TCoVoid>().Build ()
974
999
.Done ();
975
1000
@@ -984,7 +1009,21 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
984
1009
.Input (exec)
985
1010
.Done ();
986
1011
987
- return ctx.ChangeChild (*ctx.ChangeChild (resFill.Ref (), 0 , world.Ptr ()), 3 , data.Ptr ());
1012
+ auto newResFill = ctx.ChangeChild (*ctx.ChangeChild (resFill.Ref (), 0 , world.Ptr ()), 3 , data.Ptr ());
1013
+ auto resCommit = Build<TCoCommit>(ctx, node.Pos ())
1014
+ .World (newResFill)
1015
+ .DataSink <TResultDataSink>()
1016
+ .Build ()
1017
+ .Done ();
1018
+
1019
+ return Build<TCoCommit>(ctx, node.Pos ())
1020
+ .World (resCommit)
1021
+ .DataSink <TKiDataSink>()
1022
+ .Category ().Build (KikimrProviderName)
1023
+ .Cluster ().Build (cluster)
1024
+ .Build ()
1025
+ .Settings (execSettings.BuildNode (ctx, node.Pos ()))
1026
+ .Done ().Ptr ();
988
1027
}
989
1028
990
1029
TYdbOperation GetTableOp (const TKiWriteTable& write) {
0 commit comments