Skip to content

Commit 548cbba

Browse files
committed
KIKIMR-20539: pure pg_catalog selects
1 parent 4366d88 commit 548cbba

File tree

13 files changed

+142
-17
lines changed

13 files changed

+142
-17
lines changed

ydb/core/kqp/host/kqp_host.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
2121
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
2222
#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
23+
#include <ydb/library/yql/providers/pg/provider/yql_pg_provider_impl.h>
2324
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
2425
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
2526

@@ -1501,6 +1502,14 @@ class TKqpHost : public IKqpHost {
15011502
TypesCtx->AddDataSink(NYql::GenericProviderName, NYql::CreateGenericDataSink(state));
15021503
}
15031504

1505+
void InitPgProvider() {
1506+
auto state = MakeIntrusive<NYql::TPgState>();
1507+
state->Types = TypesCtx.Get();
1508+
1509+
TypesCtx->AddDataSource(NYql::PgProviderName, NYql::CreatePgDataSource(state));
1510+
TypesCtx->AddDataSink(NYql::PgProviderName, NYql::CreatePgDataSink(state));
1511+
}
1512+
15041513
void Init(EKikimrQueryType queryType) {
15051514
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
15061515

@@ -1535,6 +1544,8 @@ class TKqpHost : public IKqpHost {
15351544
InitGenericProvider();
15361545
}
15371546

1547+
InitPgProvider();
1548+
15381549
TypesCtx->UdfResolver = CreateSimpleUdfResolver(FuncRegistry);
15391550
TypesCtx->TimeProvider = TAppData::TimeProvider;
15401551
TypesCtx->RandomProvider = TAppData::RandomProvider;

ydb/core/kqp/host/kqp_translate.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryT
9393
settings.V0ForceDisable = false;
9494
settings.WarnOnV0 = false;
9595
settings.DefaultCluster = cluster;
96-
settings.ClusterMapping = {{cluster, TString(NYql::KikimrProviderName)}};
96+
settings.ClusterMapping = {
97+
{cluster, TString(NYql::KikimrProviderName)},
98+
{"pg_catalog", TString(NYql::PgProviderName)}
99+
};
97100
auto tablePathPrefix = kqpTablePathPrefix;
98101
if (!tablePathPrefix.empty()) {
99102
settings.PathPrefix = tablePathPrefix;

ydb/core/kqp/host/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929
ydb/library/yql/providers/generic/provider
3030
ydb/library/yql/providers/result/provider
3131
ydb/library/yql/providers/s3/provider
32+
ydb/library/yql/providers/pg/provider
3233
)
3334

3435
YQL_LAST_ABI_VERSION()

ydb/core/kqp/provider/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ PEERDIR(
4646
ydb/library/yql/providers/common/provider
4747
ydb/library/yql/providers/common/schema/expr
4848
ydb/library/yql/providers/dq/expr_nodes
49+
ydb/library/yql/providers/pg/expr_nodes
4950
ydb/library/yql/providers/result/expr_nodes
5051
ydb/library/yql/providers/result/provider
5152
ydb/library/yql/sql/settings

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ namespace {
5959
.Done();
6060

6161
astNode.Ptr()->SetTypeAnn(ctx.MakeType<TUnitExprType>());
62+
// this node remains in TypeComplete stage, so YQL pipeline fails, as it can not
63+
// retrieve constraints for this empty atom.
6264

65+
// astNode.Ptr()->SetState(TExprNode::EState::ConstrComplete);
66+
67+
// if i set ConstrComplete manually, KiExecDataQuery! passes all optimizers and gets to the mkql compiler.
68+
// this means that there is an optimizer that does not work, when it should.
6369
exec.Ptr()->ChildRef(TKiExecDataQuery::idx_Ast) = astNode.Ptr();
6470
}
6571

ydb/core/kqp/provider/yql_kikimr_opt_build.cpp

+43-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/library/yql/core/yql_opt_utils.h>
77
#include <ydb/library/yql/utils/log/log.h>
88
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
9+
#include <ydb/library/yql/providers/pg/expr_nodes/yql_pg_expr_nodes.h>
910
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
1011
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
1112

@@ -307,6 +308,16 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
307308
return false;
308309
}
309310

311+
bool IsPgRead(const TExprBase& node, TTypeAnnotationContext& types) {
312+
if (auto maybePgRead = node.Maybe<TPgTableContent>()) {
313+
auto dataSourceProviderIt = types.DataSourceMap.find(NYql::PgProviderName);
314+
if (dataSourceProviderIt != types.DataSourceMap.end()) {
315+
return true;
316+
}
317+
}
318+
return false;
319+
}
320+
310321
bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types) {
311322
if (node.Ref().ChildrenSize() <= 1) {
312323
return false;
@@ -383,6 +394,12 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
383394
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
384395
}
385396

397+
if (IsPgRead(node, types)) {
398+
txRes.Ops.insert(node.Raw());
399+
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
400+
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
401+
}
402+
386403
if (auto maybeWrite = node.Maybe<TKiWriteTable>()) {
387404
auto write = maybeWrite.Cast();
388405
if (!checkDataSink(write.DataSink())) {
@@ -930,11 +947,18 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
930947
return ret;
931948
}
932949

933-
TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprContext& ctx) {
950+
TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprContext& ctx) {
951+
if (auto maybeCommit = node.Maybe<TCoCommit>()) {
952+
node = maybeCommit.Cast().World();
953+
}
954+
934955
if (!node.Maybe<TResFill>()) {
935956
return node.Ptr();
936957
}
937958

959+
TKiExecDataQuerySettings execSettings;
960+
execSettings.Mode = KikimrCommitModeFlush(); /*because it is a pure query*/
961+
938962
auto resFill = node.Cast<TResFill>();
939963

940964
if (resFill.DelegatedSource().Value() != KikimrProviderName) {
@@ -957,6 +981,8 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
957981
.Build()
958982
.Operations()
959983
.Build()
984+
.Settings()
985+
.Build()
960986
.Done();
961987

962988
auto exec = Build<TKiExecDataQuery>(ctx, node.Pos())
@@ -968,8 +994,7 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
968994
.QueryBlocks()
969995
.Add({queryBlock})
970996
.Build()
971-
.Settings()
972-
.Build()
997+
.Settings(execSettings.BuildNode(ctx, node.Pos()))
973998
.Ast<TCoVoid>().Build()
974999
.Done();
9751000

@@ -984,7 +1009,21 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
9841009
.Input(exec)
9851010
.Done();
9861011

987-
return ctx.ChangeChild(*ctx.ChangeChild(resFill.Ref(), 0, world.Ptr()), 3, data.Ptr());
1012+
auto newResFill = ctx.ChangeChild(*ctx.ChangeChild(resFill.Ref(), 0, world.Ptr()), 3, data.Ptr());
1013+
auto resCommit = Build<TCoCommit>(ctx, node.Pos())
1014+
.World(newResFill)
1015+
.DataSink<TResultDataSink>()
1016+
.Build()
1017+
.Done();
1018+
1019+
return Build<TCoCommit>(ctx, node.Pos())
1020+
.World(resCommit)
1021+
.DataSink<TKiDataSink>()
1022+
.Category().Build(KikimrProviderName)
1023+
.Cluster().Build(cluster)
1024+
.Build()
1025+
.Settings(execSettings.BuildNode(ctx, node.Pos()))
1026+
.Done().Ptr();
9881027
}
9891028

9901029
TYdbOperation GetTableOp(const TKiWriteTable& write) {

ydb/core/kqp/provider/yql_kikimr_results.cpp

+13
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,19 @@ void WriteValueToYson(const TStringStream& stream, NCommon::TYsonResultWriter& w
115115
return;
116116
}
117117

118+
// I am unsure whether the select from pg_catalog should enter this branch.
119+
case NKikimrMiniKQL::ETypeKind::Pg:
120+
{
121+
if (value.HasBytes()) {
122+
writer.OnStringScalar(value.GetBytes());
123+
}
124+
125+
if (value.HasText()) {
126+
writer.OnStringScalar(value.GetText());
127+
}
128+
return;
129+
}
130+
118131
case NKikimrMiniKQL::ETypeKind::Optional:
119132
if (!value.HasOptional()) {
120133
writer.OnEntity();

ydb/core/kqp/ut/common/kqp_ut_common.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ void TKikimrRunner::CreateSampleTables() {
442442

443443
void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
444444
// Server->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
445-
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_DEBUG);
445+
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_TRACE);
446446
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_INFO);
447447
// Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE);
448448
// Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG);
@@ -451,11 +451,11 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
451451
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_TRACE);
452452
// Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG);
453453
// Server->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG);
454-
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_WORKER, NActors::NLog::PRI_DEBUG);
455-
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NActors::NLog::PRI_DEBUG);
454+
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_WORKER, NActors::NLog::PRI_TRACE);
455+
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NActors::NLog::PRI_TRACE);
456456
// Server->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
457457
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_TRACE);
458-
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_PROXY, NActors::NLog::PRI_DEBUG);
458+
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_PROXY, NActors::NLog::PRI_TRACE);
459459
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NActors::NLog::PRI_DEBUG);
460460
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NActors::NLog::PRI_DEBUG);
461461
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_REQUEST, NActors::NLog::PRI_DEBUG);
@@ -464,7 +464,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
464464
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG);
465465
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_NODE, NActors::NLog::PRI_DEBUG);
466466
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG);
467-
467+
// Server->GetRuntime()->SetLogPriority(NKikimrServices::LOCAL_PGWIRE, NActors::NLog::PRI_DEBUG);
468468
RunCall([this, domain = settings.DomainRoot]{
469469
this->Client->InitRootScheme(domain);
470470
return true;

ydb/core/kqp/ut/pg/pg_catalog_ut.cpp

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
2+
#include <library/cpp/testing/unittest/registar.h>
3+
4+
namespace NKikimr {
5+
namespace NKqp {
6+
7+
using namespace NYdb;
8+
using namespace NYdb::NTable;
9+
10+
Y_UNIT_TEST_SUITE(PgCatalog) {
11+
Y_UNIT_TEST(PgType) {
12+
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
13+
auto db = kikimr.GetQueryClient();
14+
auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
15+
{
16+
auto result = db.ExecuteQuery(R"(
17+
select 1
18+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
19+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
20+
21+
result = db.ExecuteQuery(R"(
22+
select typname from pg_catalog.pg_type
23+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
24+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
25+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
26+
UNIT_ASSERT_STRINGS_UNEQUAL(R"([
27+
["1";"one"];
28+
["2";"two"];
29+
["3";"three"]
30+
])", FormatResultSetYson(result.GetResultSet(0)));
31+
}
32+
}
33+
}
34+
35+
} // namespace NKqp
36+
} // namespace NKikimr

ydb/core/kqp/ut/pg/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SIZE(MEDIUM)
66

77
SRCS(
88
kqp_pg_ut.cpp
9+
pg_catalog_ut.cpp
910
)
1011

1112
PEERDIR(

ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -2683,6 +2683,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
26832683
return ctx.ProgramBuilder.PgClone(input, dependentNodes);
26842684
});
26852685

2686+
26862687
AddCallable("PgTableContent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
26872688
auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
26882689
return ctx.ProgramBuilder.PgTableContent(

ydb/library/yql/providers/pg/provider/yql_pg_datasource.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,4 @@ TIntrusivePtr<IDataProvider> CreatePgDataSource(TPgState::TPtr state) {
125125
return MakeIntrusive<TPgDataSourceImpl>(state);
126126
}
127127

128-
}
128+
}

ydb/library/yql/sql/pg/pg_sql.cpp

+19-6
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,20 @@ class TConverter : public IPGParseEvents {
294294
}
295295

296296
for (const auto& [cluster, provider] : Settings.ClusterMapping) {
297-
Provider = provider;
298-
break;
297+
if (provider == KikimrProviderName) {
298+
Provider = provider;
299+
break;
300+
}
301+
if (provider == YtProviderName) {
302+
Provider = provider;
303+
break;
304+
}
305+
if (provider == S3ProviderName) {
306+
Provider = provider;
307+
break;
308+
}
299309
}
310+
Y_ENSURE(!Provider.Empty());
300311

301312
for (size_t i = 0; i < Settings.PgParameterTypeOids.size(); ++i) {
302313
const auto paramName = PREPARED_PARAM_PREFIX + ToString(i + 1);
@@ -2424,10 +2435,12 @@ class TConverter : public IPGParseEvents {
24242435
return L(isSink ? A("DataSink") : A("DataSource"), QAX(*p), QAX(schemaname.Data()));
24252436
}
24262437

2427-
TAstNode* BuildTableKeyExpression(const TStringBuf relname,
2428-
bool isScheme = false) {
2438+
TAstNode* BuildTableKeyExpression(const TStringBuf relname,
2439+
const TStringBuf cluster, bool isScheme = false
2440+
) {
2441+
TString tableName = (cluster == "pg_catalog") ? TString(relname) : TablePathPrefix + relname;
24292442
return L(A("Key"), QL(QA(isScheme ? "tablescheme" : "table"),
2430-
L(A("String"), QAX(TablePathPrefix + relname))));
2443+
L(A("String"), QAX(std::move(tableName)))));
24312444
}
24322445

24332446
TReadWriteKeyExprs ParseQualifiedRelationName(const TStringBuf catalogname,
@@ -2445,7 +2458,7 @@ class TConverter : public IPGParseEvents {
24452458

24462459
const auto cluster = !schemaname.Empty() && schemaname != "public" ? schemaname : Settings.DefaultCluster;
24472460
const auto sinkOrSource = BuildClusterSinkOrSourceExpression(isSink, cluster);
2448-
const auto key = BuildTableKeyExpression(relname, isScheme);
2461+
const auto key = BuildTableKeyExpression(relname, cluster, isScheme);
24492462
return {sinkOrSource, key};
24502463
}
24512464

0 commit comments

Comments
 (0)