Skip to content

Commit 69e9e53

Browse files
authored
Merge 6f06a85 into 93067e7
2 parents 93067e7 + 6f06a85 commit 69e9e53

File tree

12 files changed

+150
-15
lines changed

12 files changed

+150
-15
lines changed

ydb/core/kqp/host/kqp_host.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
2121
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
2222
#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
23+
#include <ydb/library/yql/providers/pg/provider/yql_pg_provider_impl.h>
2324
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
2425
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
2526

@@ -1501,6 +1502,14 @@ class TKqpHost : public IKqpHost {
15011502
TypesCtx->AddDataSink(NYql::GenericProviderName, NYql::CreateGenericDataSink(state));
15021503
}
15031504

1505+
void InitPgProvider() {
1506+
auto state = MakeIntrusive<NYql::TPgState>();
1507+
state->Types = TypesCtx.Get();
1508+
1509+
TypesCtx->AddDataSource(NYql::PgProviderName, NYql::CreatePgDataSource(state));
1510+
TypesCtx->AddDataSink(NYql::PgProviderName, NYql::CreatePgDataSink(state));
1511+
}
1512+
15041513
void Init(EKikimrQueryType queryType) {
15051514
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
15061515

@@ -1535,6 +1544,8 @@ class TKqpHost : public IKqpHost {
15351544
InitGenericProvider();
15361545
}
15371546

1547+
InitPgProvider();
1548+
15381549
TypesCtx->UdfResolver = CreateSimpleUdfResolver(FuncRegistry);
15391550
TypesCtx->TimeProvider = TAppData::TimeProvider;
15401551
TypesCtx->RandomProvider = TAppData::RandomProvider;

ydb/core/kqp/host/kqp_translate.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryT
9393
settings.V0ForceDisable = false;
9494
settings.WarnOnV0 = false;
9595
settings.DefaultCluster = cluster;
96-
settings.ClusterMapping = {{cluster, TString(NYql::KikimrProviderName)}};
96+
settings.ClusterMapping = {
97+
{cluster, TString(NYql::KikimrProviderName)},
98+
{"pg_catalog", TString(NYql::PgProviderName)}
99+
};
97100
auto tablePathPrefix = kqpTablePathPrefix;
98101
if (!tablePathPrefix.empty()) {
99102
settings.PathPrefix = tablePathPrefix;

ydb/core/kqp/host/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929
ydb/library/yql/providers/generic/provider
3030
ydb/library/yql/providers/result/provider
3131
ydb/library/yql/providers/s3/provider
32+
ydb/library/yql/providers/pg/provider
3233
)
3334

3435
YQL_LAST_ABI_VERSION()

ydb/core/kqp/provider/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ PEERDIR(
4747
ydb/library/yql/providers/common/provider
4848
ydb/library/yql/providers/common/schema/expr
4949
ydb/library/yql/providers/dq/expr_nodes
50+
ydb/library/yql/providers/pg/expr_nodes
5051
ydb/library/yql/providers/result/expr_nodes
5152
ydb/library/yql/providers/result/provider
5253
ydb/library/yql/sql

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+10-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ namespace {
5959
.Done();
6060

6161
astNode.Ptr()->SetTypeAnn(ctx.MakeType<TUnitExprType>());
62-
62+
astNode.Ptr()->SetState(TExprNode::EState::ConstrComplete);
6363
exec.Ptr()->ChildRef(TKiExecDataQuery::idx_Ast) = astNode.Ptr();
6464
}
6565

@@ -603,6 +603,15 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
603603

604604
if (input->Content() == "Result") {
605605
auto result = TMaybeNode<TResult>(input).Cast();
606+
607+
if (auto maybeNth = result.Input().Maybe<TCoNth>()) {
608+
if (auto maybeExecQuery = maybeNth.Tuple().Maybe<TCoRight>().Input().Maybe<TKiExecDataQuery>()) {
609+
input->SetState(TExprNode::EState::ExecutionComplete);
610+
input->SetResult(ctx.NewWorld(input->Pos()));
611+
return SyncOk();
612+
}
613+
}
614+
606615
NKikimrMiniKQL::TType resultType;
607616
TString program;
608617
TStatus status = GetLambdaBody(result.Input().Ptr(), resultType, ctx, program);

ydb/core/kqp/provider/yql_kikimr_opt_build.cpp

+43-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/library/yql/core/yql_opt_utils.h>
77
#include <ydb/library/yql/utils/log/log.h>
88
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
9+
#include <ydb/library/yql/providers/pg/expr_nodes/yql_pg_expr_nodes.h>
910
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
1011
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
1112

@@ -307,6 +308,16 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
307308
return false;
308309
}
309310

311+
bool IsPgRead(const TExprBase& node, TTypeAnnotationContext& types) {
312+
if (auto maybePgRead = node.Maybe<TPgTableContent>()) {
313+
auto dataSourceProviderIt = types.DataSourceMap.find(NYql::PgProviderName);
314+
if (dataSourceProviderIt != types.DataSourceMap.end()) {
315+
return true;
316+
}
317+
}
318+
return false;
319+
}
320+
310321
bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types) {
311322
if (node.Ref().ChildrenSize() <= 1) {
312323
return false;
@@ -383,6 +394,12 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
383394
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
384395
}
385396

397+
if (IsPgRead(node, types)) {
398+
txRes.Ops.insert(node.Raw());
399+
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
400+
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
401+
}
402+
386403
if (auto maybeWrite = node.Maybe<TKiWriteTable>()) {
387404
auto write = maybeWrite.Cast();
388405
if (!checkDataSink(write.DataSink())) {
@@ -930,11 +947,18 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
930947
return ret;
931948
}
932949

933-
TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprContext& ctx) {
950+
TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprContext& ctx) {
951+
if (auto maybeCommit = node.Maybe<TCoCommit>()) {
952+
node = maybeCommit.Cast().World();
953+
}
954+
934955
if (!node.Maybe<TResFill>()) {
935956
return node.Ptr();
936957
}
937958

959+
TKiExecDataQuerySettings execSettings;
960+
execSettings.Mode = KikimrCommitModeFlush(); /*because it is a pure query*/
961+
938962
auto resFill = node.Cast<TResFill>();
939963

940964
if (resFill.DelegatedSource().Value() != KikimrProviderName) {
@@ -957,6 +981,8 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
957981
.Build()
958982
.Operations()
959983
.Build()
984+
.Settings()
985+
.Build()
960986
.Done();
961987

962988
auto exec = Build<TKiExecDataQuery>(ctx, node.Pos())
@@ -968,8 +994,7 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
968994
.QueryBlocks()
969995
.Add({queryBlock})
970996
.Build()
971-
.Settings()
972-
.Build()
997+
.Settings(execSettings.BuildNode(ctx, node.Pos()))
973998
.Ast<TCoVoid>().Build()
974999
.Done();
9751000

@@ -984,7 +1009,21 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
9841009
.Input(exec)
9851010
.Done();
9861011

987-
return ctx.ChangeChild(*ctx.ChangeChild(resFill.Ref(), 0, world.Ptr()), 3, data.Ptr());
1012+
auto newResFill = ctx.ChangeChild(*ctx.ChangeChild(resFill.Ref(), 0, world.Ptr()), 3, data.Ptr());
1013+
auto resCommit = Build<TCoCommit>(ctx, node.Pos())
1014+
.World(newResFill)
1015+
.DataSink<TResultDataSink>()
1016+
.Build()
1017+
.Done();
1018+
1019+
return Build<TCoCommit>(ctx, node.Pos())
1020+
.World(resCommit)
1021+
.DataSink<TKiDataSink>()
1022+
.Category().Build(KikimrProviderName)
1023+
.Cluster().Build(cluster)
1024+
.Build()
1025+
.Settings(execSettings.BuildNode(ctx, node.Pos()))
1026+
.Done().Ptr();
9881027
}
9891028

9901029
TYdbOperation GetTableOp(const TKiWriteTable& write) {

ydb/core/kqp/provider/yql_kikimr_results.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/library/dynumber/dynumber.h>
55
#include <ydb/library/uuid/uuid.h>
66

7+
#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h>
78
#include <ydb/library/yql/providers/common/codec/yql_codec_results.h>
89
#include <ydb/library/yql/public/decimal/yql_decimal.h>
910

@@ -115,6 +116,22 @@ void WriteValueToYson(const TStringStream& stream, NCommon::TYsonResultWriter& w
115116
return;
116117
}
117118

119+
case NKikimrMiniKQL::ETypeKind::Pg:
120+
{
121+
if (value.HasBytes()) {
122+
auto convert = NKikimr::NPg::PgNativeTextFromNativeBinary(
123+
value.GetBytes(), NKikimr::NPg::TypeDescFromPgTypeId(type.GetPg().Getoid())
124+
);
125+
YQL_ENSURE(!convert.Error, "Failed to convert pg value to text: " << *convert.Error);
126+
writer.OnStringScalar(convert.Str);
127+
}
128+
129+
if (value.HasText()) {
130+
writer.OnStringScalar(value.GetText());
131+
}
132+
return;
133+
}
134+
118135
case NKikimrMiniKQL::ETypeKind::Optional:
119136
if (!value.HasOptional()) {
120137
writer.OnEntity();

ydb/core/kqp/ut/common/kqp_ut_common.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
457457
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NActors::NLog::PRI_DEBUG);
458458
// Server->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
459459
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_TRACE);
460-
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_PROXY, NActors::NLog::PRI_DEBUG);
460+
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_PROXY, NActors::NLog::PRI_TRACE);
461461
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NActors::NLog::PRI_DEBUG);
462462
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NActors::NLog::PRI_TRACE);
463463
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_REQUEST, NActors::NLog::PRI_DEBUG);
@@ -467,7 +467,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
467467
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_NODE, NActors::NLog::PRI_DEBUG);
468468
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG);
469469
// Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_INFO);
470-
470+
// Server->GetRuntime()->SetLogPriority(NKikimrServices::LOCAL_PGWIRE, NActors::NLog::PRI_DEBUG);
471471
RunCall([this, domain = settings.DomainRoot]{
472472
this->Client->InitRootScheme(domain);
473473
return true;

ydb/core/kqp/ut/pg/pg_catalog_ut.cpp

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
2+
#include <library/cpp/testing/unittest/registar.h>
3+
4+
namespace NKikimr {
5+
namespace NKqp {
6+
7+
using namespace NYdb;
8+
using namespace NYdb::NTable;
9+
10+
Y_UNIT_TEST_SUITE(PgCatalog) {
11+
Y_UNIT_TEST(PgType) {
12+
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
13+
auto db = kikimr.GetQueryClient();
14+
auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
15+
{
16+
auto result = db.ExecuteQuery(R"(
17+
select typname from pg_catalog.pg_type order by oid
18+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
19+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
20+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
21+
CompareYson(R"([
22+
["bool"];["bytea"];["char"];["name"];["int8"];["int2"];["int2vector"];["int4"];
23+
["regproc"];["text"];["oid"];["tid"];["xid"];["cid"];["oidvector"];["pg_ddl_command"];
24+
["pg_type"];["pg_attribute"];["pg_proc"];["pg_class"];["json"];["xml"];["pg_node_tree"];
25+
["table_am_handler"];["index_am_handler"];["point"];["lseg"];["path"];["box"];["polygon"];
26+
["line"];["cidr"];["float4"];["float8"];["unknown"];["circle"];["macaddr8"];["money"];
27+
["macaddr"];["inet"];["aclitem"];["bpchar"];["varchar"];["date"];["time"];["timestamp"];
28+
["timestamptz"];["interval"];["timetz"];["bit"];["varbit"];["numeric"];["refcursor"];
29+
["regprocedure"];["regoper"];["regoperator"];["regclass"];["regtype"];["record"];["cstring"];
30+
["any"];["anyarray"];["void"];["trigger"];["language_handler"];["internal"];["anyelement"];
31+
["_record"];["anynonarray"];["uuid"];["txid_snapshot"];["fdw_handler"];["pg_lsn"];["tsm_handler"];
32+
["pg_ndistinct"];["pg_dependencies"];["anyenum"];["tsvector"];["tsquery"];["gtsvector"];
33+
["regconfig"];["regdictionary"];["jsonb"];["anyrange"];["event_trigger"];["int4range"];["numrange"];
34+
["tsrange"];["tstzrange"];["daterange"];["int8range"];["jsonpath"];["regnamespace"];["regrole"];
35+
["regcollation"];["int4multirange"];["nummultirange"];["tsmultirange"];["tstzmultirange"];
36+
["datemultirange"];["int8multirange"];["anymultirange"];["anycompatiblemultirange"];
37+
["pg_brin_bloom_summary"];["pg_brin_minmax_multi_summary"];["pg_mcv_list"];["pg_snapshot"];["xid8"];
38+
["anycompatible"];["anycompatiblearray"];["anycompatiblenonarray"];["anycompatiblerange"]
39+
])", FormatResultSetYson(result.GetResultSet(0)));
40+
}
41+
}
42+
}
43+
44+
} // namespace NKqp
45+
} // namespace NKikimr

ydb/core/kqp/ut/pg/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SIZE(MEDIUM)
66

77
SRCS(
88
kqp_pg_ut.cpp
9+
pg_catalog_ut.cpp
910
)
1011

1112
PEERDIR(

ydb/library/yql/providers/pg/provider/yql_pg_datasource.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,4 @@ TIntrusivePtr<IDataProvider> CreatePgDataSource(TPgState::TPtr state) {
125125
return MakeIntrusive<TPgDataSourceImpl>(state);
126126
}
127127

128-
}
128+
}

ydb/library/yql/sql/pg/pg_sql.cpp

+14-6
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,15 @@ class TConverter : public IPGParseEvents {
294294
}
295295

296296
for (const auto& [cluster, provider] : Settings.ClusterMapping) {
297-
Provider = provider;
298-
break;
297+
if (provider != PgProviderName) {
298+
Provider = provider;
299+
break;
300+
}
299301
}
302+
if (!Provider) {
303+
Provider = PgProviderName;
304+
}
305+
Y_ENSURE(!Provider.Empty());
300306

301307
for (size_t i = 0; i < Settings.PgParameterTypeOids.size(); ++i) {
302308
const auto paramName = PREPARED_PARAM_PREFIX + ToString(i + 1);
@@ -1774,7 +1780,7 @@ class TConverter : public IPGParseEvents {
17741780
}
17751781

17761782
if (cinfo.NotNull) {
1777-
constraints.push_back(QL(QA("not_null")));
1783+
constraints.push_back(QL(QA("not_null")));
17781784
}
17791785

17801786
if (cinfo.Default) {
@@ -2547,9 +2553,11 @@ class TConverter : public IPGParseEvents {
25472553
}
25482554

25492555
TAstNode* BuildTableKeyExpression(const TStringBuf relname,
2550-
bool isScheme = false) {
2556+
const TStringBuf cluster, bool isScheme = false
2557+
) {
2558+
TString tableName = (cluster == "pg_catalog") ? TString(relname) : TablePathPrefix + relname;
25512559
return L(A("Key"), QL(QA(isScheme ? "tablescheme" : "table"),
2552-
L(A("String"), QAX(TablePathPrefix + relname))));
2560+
L(A("String"), QAX(std::move(tableName)))));
25532561
}
25542562

25552563
TReadWriteKeyExprs ParseQualifiedRelationName(const TStringBuf catalogname,
@@ -2567,7 +2575,7 @@ class TConverter : public IPGParseEvents {
25672575

25682576
const auto cluster = ResolveCluster(schemaname);
25692577
const auto sinkOrSource = BuildClusterSinkOrSourceExpression(isSink, cluster);
2570-
const auto key = BuildTableKeyExpression(relname, isScheme);
2578+
const auto key = BuildTableKeyExpression(relname, cluster, isScheme);
25712579
return {sinkOrSource, key};
25722580
}
25732581

0 commit comments

Comments
 (0)