Skip to content

KIKIMR-20539: pure pg_catalog selects #950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
#include <ydb/library/yql/providers/pg/provider/yql_pg_provider_impl.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>

Expand Down Expand Up @@ -1501,6 +1502,14 @@ class TKqpHost : public IKqpHost {
TypesCtx->AddDataSink(NYql::GenericProviderName, NYql::CreateGenericDataSink(state));
}

// Registers the pg provider with the type annotation context: builds a fresh
// TPgState pointing back at TypesCtx, then adds the pg data source and the pg
// data sink under NYql::PgProviderName, both sharing that state.
void InitPgProvider() {
    auto pgState = MakeIntrusive<NYql::TPgState>();
    pgState->Types = TypesCtx.Get();

    // Source first, then sink, both keyed by the same provider name.
    TypesCtx->AddDataSource(NYql::PgProviderName, NYql::CreatePgDataSource(pgState));
    TypesCtx->AddDataSink(NYql::PgProviderName, NYql::CreatePgDataSink(pgState));
}

void Init(EKikimrQueryType queryType) {
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);

Expand Down Expand Up @@ -1535,6 +1544,8 @@ class TKqpHost : public IKqpHost {
InitGenericProvider();
}

InitPgProvider();

TypesCtx->UdfResolver = CreateSimpleUdfResolver(FuncRegistry);
TypesCtx->TimeProvider = TAppData::TimeProvider;
TypesCtx->RandomProvider = TAppData::RandomProvider;
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/host/kqp_translate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryT
settings.V0ForceDisable = false;
settings.WarnOnV0 = false;
settings.DefaultCluster = cluster;
settings.ClusterMapping = {{cluster, TString(NYql::KikimrProviderName)}};
settings.ClusterMapping = {
{cluster, TString(NYql::KikimrProviderName)},
{"pg_catalog", TString(NYql::PgProviderName)}
};
auto tablePathPrefix = kqpTablePathPrefix;
if (!tablePathPrefix.empty()) {
settings.PathPrefix = tablePathPrefix;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ PEERDIR(
ydb/library/yql/providers/generic/provider
ydb/library/yql/providers/result/provider
ydb/library/yql/providers/s3/provider
ydb/library/yql/providers/pg/provider
)

YQL_LAST_ABI_VERSION()
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ PEERDIR(
ydb/library/yql/providers/common/provider
ydb/library/yql/providers/common/schema/expr
ydb/library/yql/providers/dq/expr_nodes
ydb/library/yql/providers/pg/expr_nodes
ydb/library/yql/providers/result/expr_nodes
ydb/library/yql/providers/result/provider
ydb/library/yql/sql
Expand Down
11 changes: 10 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace {
.Done();

astNode.Ptr()->SetTypeAnn(ctx.MakeType<TUnitExprType>());

astNode.Ptr()->SetState(TExprNode::EState::ConstrComplete);
exec.Ptr()->ChildRef(TKiExecDataQuery::idx_Ast) = astNode.Ptr();
}

Expand Down Expand Up @@ -605,6 +605,15 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T

if (input->Content() == "Result") {
auto result = TMaybeNode<TResult>(input).Cast();

if (auto maybeNth = result.Input().Maybe<TCoNth>()) {
if (auto maybeExecQuery = maybeNth.Tuple().Maybe<TCoRight>().Input().Maybe<TKiExecDataQuery>()) {
input->SetState(TExprNode::EState::ExecutionComplete);
input->SetResult(ctx.NewWorld(input->Pos()));
return SyncOk();
}
}

NKikimrMiniKQL::TType resultType;
TString program;
TStatus status = GetLambdaBody(result.Input().Ptr(), resultType, ctx, program);
Expand Down
52 changes: 47 additions & 5 deletions ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
#include <ydb/library/yql/providers/pg/expr_nodes/yql_pg_expr_nodes.h>
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

Expand Down Expand Up @@ -307,6 +308,16 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
return false;
}

// Returns true when `node` is a TPgTableContent callable and a data source is
// registered under NYql::PgProviderName in `types.DataSourceMap`; false otherwise.
bool IsPgRead(const TExprBase& node, TTypeAnnotationContext& types) {
    if (!node.Maybe<TPgTableContent>()) {
        return false;
    }

    // The node only counts as a pg read if the pg provider is actually registered.
    return types.DataSourceMap.find(NYql::PgProviderName) != types.DataSourceMap.end();
}

bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types) {
if (node.Ref().ChildrenSize() <= 1) {
return false;
Expand Down Expand Up @@ -383,6 +394,12 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
}

if (IsPgRead(node, types)) {
txRes.Ops.insert(node.Raw());
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
}

if (auto maybeWrite = node.Maybe<TKiWriteTable>()) {
auto write = maybeWrite.Cast();
if (!checkDataSink(write.DataSink())) {
Expand Down Expand Up @@ -930,11 +947,21 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
return ret;
}

TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprContext& ctx) {
if (!node.Maybe<TResFill>()) {
TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprContext& ctx) {
if (auto maybeCommit = node.Maybe<TCoCommit>()) {
auto world = maybeCommit.Cast().World();
if (!world.Maybe<TResFill>()) {
return node.Ptr();
} else {
node = world;
}
} else {
return node.Ptr();
}

TKiExecDataQuerySettings execSettings;
execSettings.Mode = KikimrCommitModeFlush(); /*because it is a pure query*/

auto resFill = node.Cast<TResFill>();

if (resFill.DelegatedSource().Value() != KikimrProviderName) {
Expand All @@ -957,6 +984,8 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
.Build()
.Operations()
.Build()
.Settings()
.Build()
.Done();

auto exec = Build<TKiExecDataQuery>(ctx, node.Pos())
Expand All @@ -968,8 +997,7 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
.QueryBlocks()
.Add({queryBlock})
.Build()
.Settings()
.Build()
.Settings(execSettings.BuildNode(ctx, node.Pos()))
.Ast<TCoVoid>().Build()
.Done();

Expand All @@ -984,7 +1012,21 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
.Input(exec)
.Done();

return ctx.ChangeChild(*ctx.ChangeChild(resFill.Ref(), 0, world.Ptr()), 3, data.Ptr());
auto newResFill = ctx.ChangeChild(*ctx.ChangeChild(resFill.Ref(), 0, world.Ptr()), 3, data.Ptr());
auto resCommit = Build<TCoCommit>(ctx, node.Pos())
.World(newResFill)
.DataSink<TResultDataSink>()
.Build()
.Done();

return Build<TCoCommit>(ctx, node.Pos())
.World(resCommit)
.DataSink<TKiDataSink>()
.Category().Build(KikimrProviderName)
.Cluster().Build(cluster)
.Build()
.Settings(execSettings.BuildNode(ctx, node.Pos()))
.Done().Ptr();
}

TYdbOperation GetTableOp(const TKiWriteTable& write) {
Expand Down
19 changes: 19 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/library/dynumber/dynumber.h>
#include <ydb/library/uuid/uuid.h>

#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h>
#include <ydb/library/yql/providers/common/codec/yql_codec_results.h>
#include <ydb/library/yql/public/decimal/yql_decimal.h>

Expand Down Expand Up @@ -115,6 +116,24 @@ void WriteValueToYson(const TStringStream& stream, NCommon::TYsonResultWriter& w
return;
}

case NKikimrMiniKQL::ETypeKind::Pg:
{
if (value.GetValueValueCase() == NKikimrMiniKQL::TValue::kNullFlagValue) {
writer.OnEntity();
} else if (value.HasBytes()) {
auto convert = NKikimr::NPg::PgNativeTextFromNativeBinary(
value.GetBytes(), NKikimr::NPg::TypeDescFromPgTypeId(type.GetPg().Getoid())
);
YQL_ENSURE(!convert.Error, "Failed to convert pg value to text: " << *convert.Error);
writer.OnStringScalar(convert.Str);
} else if (value.HasText()) {
writer.OnStringScalar(value.GetText());
} else {
YQL_ENSURE(false, "malformed pg value");
}
return;
}

case NKikimrMiniKQL::ETypeKind::Optional:
if (!value.HasOptional()) {
writer.OnEntity();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
SetupLogLevelFromTestParam(NKikimrServices::KQP_NODE);
SetupLogLevelFromTestParam(NKikimrServices::KQP_BLOBS_STORAGE);
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD);
SetupLogLevelFromTestParam(NKikimrServices::LOCAL_PGWIRE);

RunCall([this, domain = settings.DomainRoot]{
this->Client->InitRootScheme(domain);
Expand Down
45 changes: 45 additions & 0 deletions ydb/core/kqp/ut/pg/pg_catalog_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr {
namespace NKqp {

using namespace NYdb;
using namespace NYdb::NTable;

// Tests for selects that read only from pg_catalog tables.
Y_UNIT_TEST_SUITE(PgCatalog) {
// Selects typname from pg_catalog.pg_type (ordered by oid) through the query
// client using Pg syntax and compares the rows with the expected list of type names.
Y_UNIT_TEST(PgType) {
// Sample tables are disabled: the query below touches pg_catalog only.
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
auto db = kikimr.GetQueryClient();
// Ask the query service to parse the text with the Pg syntax.
auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
{
auto result = db.ExecuteQuery(R"(
select typname from pg_catalog.pg_type order by oid
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
// Guard before indexing result set 0 below.
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
// Expected rows, one single-column row per type name, in the order returned by the query.
CompareYson(R"([
["bool"];["bytea"];["char"];["name"];["int8"];["int2"];["int2vector"];["int4"];
["regproc"];["text"];["oid"];["tid"];["xid"];["cid"];["oidvector"];["pg_ddl_command"];
["pg_type"];["pg_attribute"];["pg_proc"];["pg_class"];["json"];["xml"];["pg_node_tree"];
["table_am_handler"];["index_am_handler"];["point"];["lseg"];["path"];["box"];["polygon"];
["line"];["cidr"];["float4"];["float8"];["unknown"];["circle"];["macaddr8"];["money"];
["macaddr"];["inet"];["aclitem"];["bpchar"];["varchar"];["date"];["time"];["timestamp"];
["timestamptz"];["interval"];["timetz"];["bit"];["varbit"];["numeric"];["refcursor"];
["regprocedure"];["regoper"];["regoperator"];["regclass"];["regtype"];["record"];["cstring"];
["any"];["anyarray"];["void"];["trigger"];["language_handler"];["internal"];["anyelement"];
["_record"];["anynonarray"];["uuid"];["txid_snapshot"];["fdw_handler"];["pg_lsn"];["tsm_handler"];
["pg_ndistinct"];["pg_dependencies"];["anyenum"];["tsvector"];["tsquery"];["gtsvector"];
["regconfig"];["regdictionary"];["jsonb"];["anyrange"];["event_trigger"];["int4range"];["numrange"];
["tsrange"];["tstzrange"];["daterange"];["int8range"];["jsonpath"];["regnamespace"];["regrole"];
["regcollation"];["int4multirange"];["nummultirange"];["tsmultirange"];["tstzmultirange"];
["datemultirange"];["int8multirange"];["anymultirange"];["anycompatiblemultirange"];
["pg_brin_bloom_summary"];["pg_brin_minmax_multi_summary"];["pg_mcv_list"];["pg_snapshot"];["xid8"];
["anycompatible"];["anycompatiblearray"];["anycompatiblenonarray"];["anycompatiblerange"]
])", FormatResultSetYson(result.GetResultSet(0)));
}
}
}

} // namespace NKqp
} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/pg/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SIZE(MEDIUM)

SRCS(
kqp_pg_ut.cpp
pg_catalog_ut.cpp
)

PEERDIR(
Expand Down
17 changes: 15 additions & 2 deletions ydb/core/kqp/ut/yql/kqp_yql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,19 @@ Y_UNIT_TEST_SUITE(KqpYql) {
])", FormatResultSetYson(result.GetResultSet(0)));
}

// Runs a data query that applies EvaluateExpr to CAST(NULL AS PgInt8) and checks
// that it succeeds and yields a single row whose only column is `#` in YSON.
Y_UNIT_TEST(EvaluateExprPgNull) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteDataQuery(Q1_(R"(
SELECT EvaluateExpr( CAST(NULL AS PgInt8) );
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
// Report the issues in the failure message if the query did not succeed.
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

// NOTE(review): `#` is presumably the YSON entity standing for the null pg value — confirm.
CompareYson(R"([[#]])", FormatResultSetYson(result.GetResultSet(0)));
}

Y_UNIT_TEST(Discard) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
Expand Down Expand Up @@ -720,13 +733,13 @@ Y_UNIT_TEST_SUITE(KqpYql) {
.EndList()
.Build()
.Build();

const auto query = Q_(R"(
DECLARE $rows AS
List<Struct<
Key: Uuid,
Value: Int32?>>;

SELECT * FROM AS_TABLE($rows) ORDER BY Key;
)");
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params).ExtractValueSync();
Expand Down
13 changes: 11 additions & 2 deletions ydb/core/ydb_convert/ydb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,17 @@ void ConvertMiniKQLValueToYdbValue(const NKikimrMiniKQL::TType& inputType,
break;
}
case NKikimrMiniKQL::ETypeKind::Pg: {
const auto& stringRef = inputValue.GetText();
output.set_text_value(stringRef.data(), stringRef.size());
if (inputValue.GetValueValueCase() == NKikimrMiniKQL::TValue::kNullFlagValue) {
output.Setnull_flag_value(::google::protobuf::NULL_VALUE);
} else if (inputValue.HasBytes()) {
const auto& stringRef = inputValue.GetBytes();
output.set_bytes_value(stringRef.data(), stringRef.size());
} else if (inputValue.HasText()) {
const auto& stringRef = inputValue.GetText();
output.set_text_value(stringRef.data(), stringRef.size());
} else {
Y_ENSURE(false, "malformed pg value");
}
break;
}
default: {
Expand Down
23 changes: 16 additions & 7 deletions ydb/library/mkql_proto/mkql_proto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,8 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, NK

case TType::EKind::Pg: {
if (!value) {
// do not set Text field
return;
res.SetNullFlagValue(::google::protobuf::NULL_VALUE);
break;
}
auto pgType = static_cast<TPgType*>(type);
auto textValue = NYql::NCommon::PgValueToNativeText(value, pgType->GetTypeId());
Expand Down Expand Up @@ -599,8 +599,8 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, Yd

case TType::EKind::Pg: {
if (!value) {
// do not set Text field
return;
res.set_null_flag_value(::google::protobuf::NULL_VALUE);
break;
}
auto pgType = static_cast<TPgType*>(type);
auto textValue = NYql::NCommon::PgValueToNativeText(value, pgType->GetTypeId());
Expand Down Expand Up @@ -1698,14 +1698,17 @@ NUdf::TUnboxedValue TProtoImporter::ImportValueFromProto(const TType* type, cons
}

case TType::EKind::Pg: {
if (value.GetValueCase() == Ydb::Value::kNullFlagValue) {
return NYql::NUdf::TUnboxedValue();
}
const TPgType* pgType = static_cast<const TPgType*>(type);
NYql::NUdf::TUnboxedValue unboxedValue;
if (value.Hastext_value()) {
unboxedValue = NYql::NCommon::PgValueFromNativeText(value.Gettext_value(), pgType->GetTypeId());
} else if (value.Hasbytes_value()) {
unboxedValue = NYql::NCommon::PgValueFromNativeBinary(value.Getbytes_value(), pgType->GetTypeId());
} else {
MKQL_ENSURE(false, "empty pg value proto");
MKQL_ENSURE(false, "malformed pg value");
}
return unboxedValue;
}
Expand All @@ -1728,10 +1731,16 @@ NUdf::TUnboxedValue TProtoImporter::ImportValueFromProto(const TType* type, cons

case TType::EKind::Pg: {
auto pgType = static_cast<const TPgType*>(type);
if (!value.HasBytes()) {
if (value.GetValueValueCase() == NKikimrMiniKQL::TValue::kNullFlagValue) {
return NUdf::TUnboxedValue();
}
return NYql::NCommon::PgValueFromNativeBinary(value.GetBytes(), pgType->GetTypeId());
if (value.HasBytes()) {
return NYql::NCommon::PgValueFromNativeBinary(value.GetBytes(), pgType->GetTypeId());
}
// Text-encoded pg value: parse the Text field that was just checked.
// (Previously this branch read value.GetBytes(), i.e. a different field from the
// one tested by HasText(), so the textual payload was never passed to the parser.)
if (value.HasText()) {
    return NYql::NCommon::PgValueFromNativeText(value.GetText(), pgType->GetTypeId());
}
MKQL_ENSURE(false, "malformed pg value");
}

case TType::EKind::Optional: {
Expand Down
Loading