Skip to content

Support of wide dates/interval/decimal/dynumber in ToPg #3511

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/library/yql/core/type_ann/type_ann_pg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ const TTypeAnnotationNode* ToPgImpl(TPositionHandle pos, const TTypeAnnotationNo
pgType = "int8";
break;
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Decimal:
case NUdf::EDataSlot::DyNumber:
pgType = "numeric";
break;
case NUdf::EDataSlot::Float:
Expand All @@ -468,13 +470,17 @@ const TTypeAnnotationNode* ToPgImpl(TPositionHandle pos, const TTypeAnnotationNo
pgType = "text";
break;
case NUdf::EDataSlot::Date:
case NUdf::EDataSlot::Date32:
pgType = "date";
break;
case NUdf::EDataSlot::Datetime:
case NUdf::EDataSlot::Datetime64:
case NUdf::EDataSlot::Timestamp:
case NUdf::EDataSlot::Timestamp64:
pgType = "timestamp";
break;
case NUdf::EDataSlot::Interval:
case NUdf::EDataSlot::Interval64:
pgType = "interval";
break;
case NUdf::EDataSlot::Json:
Expand Down
17 changes: 17 additions & 0 deletions ydb/library/yql/parser/pg_wrapper/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <ydb/library/yql/parser/pg_wrapper/interface/utils.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
#include <ydb/library/dynumber/dynumber.h>
#include <ydb/library/yql/public/decimal/yql_decimal.h>
#include <util/generic/singleton.h>

#include <arrow/compute/cast.h>
Expand All @@ -15,6 +17,7 @@
extern "C" {
#include "utils/date.h"
#include "utils/timestamp.h"
#include "utils/fmgrprotos.h"
}

namespace NYql {
Expand Down Expand Up @@ -171,6 +174,20 @@ Numeric Uint64ToPgNumeric(ui64 value) {
return ret2;
}

// Builds a PostgreSQL Numeric from a YQL Decimal value.
//
// The 128-bit decimal payload is first rendered as text using the supplied
// precision and scale, and that text is then parsed by PostgreSQL's own
// numeric_in input function.
//
// value     - unboxed value holding the decimal as an Int128.
// precision - decimal precision passed to the text formatter.
// scale     - decimal scale passed to the text formatter.
Numeric DecimalToPgNumeric(const NUdf::TUnboxedValuePod& value, ui8 precision, ui8 scale) {
    const auto text = NYql::NDecimal::ToString(value.GetInt128(), precision, scale);
    // The formatter result must be non-empty before it is handed to the parser.
    Y_ENSURE(text);

    // NOTE(review): the trailing 0 and -1 are presumably numeric_in's type I/O
    // parameter and typmod ("no constraint") — confirm against numeric_in.
    const auto parsed = DirectFunctionCall3Coll(
        numeric_in,
        DEFAULT_COLLATION_OID,
        PointerGetDatum(text),
        Int32GetDatum(0),
        Int32GetDatum(-1));
    return reinterpret_cast<Numeric>(parsed);
}

// Builds a PostgreSQL Numeric from a YQL DyNumber value.
//
// The serialized DyNumber (read through the value's string ref) is converted
// to its textual form, and that text is parsed by PostgreSQL's numeric_in
// input function.
//
// value - unboxed value whose string ref holds the DyNumber.
Numeric DyNumberToPgNumeric(const NUdf::TUnboxedValuePod& value) {
    auto text = NKikimr::NDyNumber::DyNumberToString(value.AsStringRef());
    // The conversion result must hold a string before it is dereferenced below.
    Y_ENSURE(text);

    // NOTE(review): the trailing 0 and -1 are presumably numeric_in's type I/O
    // parameter and typmod ("no constraint") — confirm against numeric_in.
    const auto parsed = DirectFunctionCall3Coll(
        numeric_in,
        DEFAULT_COLLATION_OID,
        PointerGetDatum(text->c_str()),
        Int32GetDatum(0),
        Int32GetDatum(-1));
    return reinterpret_cast<Numeric>(parsed);
}

Numeric PgFloatToNumeric(double item, ui64 scale, int digits) {
double intPart, fracPart;
bool error;
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/parser/pg_wrapper/arrow_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ extern "C" {
namespace NYql {

Numeric Uint64ToPgNumeric(ui64 value);
Numeric DecimalToPgNumeric(const NUdf::TUnboxedValuePod& value, ui8 precision, ui8 scale);
Numeric DyNumberToPgNumeric(const NUdf::TUnboxedValuePod& value);
Numeric PgFloatToNumeric(double item, ui64 scale, int digits);
Numeric PgDecimal128ToNumeric(arrow::Decimal128 val, int32_t precision, int32_t scale, Numeric high_bits_mul);
TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType);
Expand Down
135 changes: 100 additions & 35 deletions ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1908,15 +1908,15 @@ class TPgCast : public TMutableComputationNode<TPgCast> {
bool ConvertLength = false;
};

const i32 PgDateShift = 10957;
const i64 PgTimestampShift = 946684800000000ll;
const i32 PgDateShift = UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE;
const i64 PgTimestampShift = USECS_PER_DAY * (UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE);

inline i32 Date2Pg(ui16 value) {
return i32(value) - PgDateShift;
inline i32 Date2Pg(i32 value) {
return value + PgDateShift;
}

inline i64 Timestamp2Pg(ui64 value) {
return i64(value) - PgTimestampShift;
inline i64 Timestamp2Pg(i64 value) {
return value + PgTimestampShift;
}

inline Interval* Interval2Pg(i64 value) {
Expand Down Expand Up @@ -1957,6 +1957,8 @@ NUdf::TUnboxedValuePod ConvertToPgValue(NUdf::TUnboxedValuePod value, TMaybe<NUd
return ScalarDatumToPod(Int64GetDatum(value.Get<i64>()));
case NUdf::EDataSlot::Uint64:
return PointerDatumToPod(NumericGetDatum(Uint64ToPgNumeric(value.Get<ui64>())));
case NUdf::EDataSlot::DyNumber:
return PointerDatumToPod(NumericGetDatum(DyNumberToPgNumeric(value)));
case NUdf::EDataSlot::Float:
return ScalarDatumToPod(Float4GetDatum(value.Get<float>()));
case NUdf::EDataSlot::Double:
Expand All @@ -1979,10 +1981,23 @@ NUdf::TUnboxedValuePod ConvertToPgValue(NUdf::TUnboxedValuePod value, TMaybe<NUd
auto res = Timestamp2Pg(value.Get<ui64>());
return ScalarDatumToPod(res);
}
case NUdf::EDataSlot::Interval: {
case NUdf::EDataSlot::Interval:
case NUdf::EDataSlot::Interval64: {
auto res = Interval2Pg(value.Get<i64>());
return PointerDatumToPod(PointerGetDatum(res));
}
case NUdf::EDataSlot::Date32: {
auto res = Date2Pg(value.Get<i32>());
return ScalarDatumToPod(res);
}
case NUdf::EDataSlot::Datetime64: {
auto res = Timestamp2Pg(value.Get<i64>() * 1000000ull);
return ScalarDatumToPod(res);
}
case NUdf::EDataSlot::Timestamp64: {
auto res = Timestamp2Pg(value.Get<i64>());
return ScalarDatumToPod(res);
}
case NUdf::EDataSlot::Json: {
auto input = MakeCString(value.AsStringRef());
auto res = DirectFunctionCall1Coll(json_in, DEFAULT_COLLATION_OID, PointerGetDatum(input));
Expand Down Expand Up @@ -2169,9 +2184,10 @@ template <NUdf::EDataSlot Slot>
class TToPg : public TMutableComputationNode<TToPg<Slot>> {
typedef TMutableComputationNode<TToPg<Slot>> TBaseComputation;
public:
TToPg(TComputationMutables& mutables, IComputationNode* arg)
TToPg(TComputationMutables& mutables, IComputationNode* arg, TDataType* argType)
: TBaseComputation(mutables)
, Arg(arg)
, ArgType(argType)
{
}

Expand All @@ -2181,7 +2197,13 @@ class TToPg : public TMutableComputationNode<TToPg<Slot>> {
return value.Release();
}

return ConvertToPgValue<Slot>(value);
if constexpr (Slot == NUdf::EDataSlot::Decimal) {
auto decimalType = static_cast<TDataDecimalType*>(ArgType);
return PointerDatumToPod(NumericGetDatum(DecimalToPgNumeric(value,
decimalType->GetParams().first, decimalType->GetParams().second)));
} else {
return ConvertToPgValue<Slot>(value);
}
}

private:
Expand All @@ -2190,6 +2212,7 @@ class TToPg : public TMutableComputationNode<TToPg<Slot>> {
}

IComputationNode* const Arg;
TDataType* ArgType;
};

class TPgArray : public TMutableComputationNode<TPgArray> {
Expand Down Expand Up @@ -2781,7 +2804,32 @@ struct TToPgExec {
}
break;
}
case NUdf::EDataSlot::Interval: {
case NUdf::EDataSlot::Date32: {
auto inputPtr = array.GetValues<i32>(1);
auto outputPtr = res->array()->GetMutableValues<ui64>(1);
for (size_t i = 0; i < length; ++i) {
outputPtr[i] = Int32GetDatum(Date2Pg(inputPtr[i]));
}
break;
}
case NUdf::EDataSlot::Datetime64: {
auto inputPtr = array.GetValues<i64>(1);
auto outputPtr = res->array()->GetMutableValues<ui64>(1);
for (size_t i = 0; i < length; ++i) {
outputPtr[i] = Int64GetDatum(Timestamp2Pg(inputPtr[i] * 1000000ull));
}
break;
}
case NUdf::EDataSlot::Timestamp64: {
auto inputPtr = array.GetValues<i64>(1);
auto outputPtr = res->array()->GetMutableValues<ui64>(1);
for (size_t i = 0; i < length; ++i) {
outputPtr[i] = Int64GetDatum(Timestamp2Pg(inputPtr[i]));
}
break;
}
case NUdf::EDataSlot::Interval:
case NUdf::EDataSlot::Interval64: {
NUdf::TFixedSizeBlockReader<i64, true> reader;
NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length);
for (size_t i = 0; i < length; ++i) {
Expand Down Expand Up @@ -2881,10 +2929,14 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeToPgKernel(TType* inputType, T
case NUdf::EDataSlot::Date:
case NUdf::EDataSlot::Datetime:
case NUdf::EDataSlot::Timestamp:
case NUdf::EDataSlot::Date32:
case NUdf::EDataSlot::Datetime64:
case NUdf::EDataSlot::Timestamp64:
break;
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Utf8:
case NUdf::EDataSlot::Interval:
case NUdf::EDataSlot::Interval64:
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Yson:
case NUdf::EDataSlot::Json:
Expand Down Expand Up @@ -3117,56 +3169,69 @@ TComputationNodeFactory GetPgFactory() {
argType = AS_TYPE(TOptionalType, argType)->GetItemType();
}

auto sourceDataSlot = AS_TYPE(TDataType, argType)->GetDataSlot();
auto dataType = AS_TYPE(TDataType, argType);
auto sourceDataSlot = dataType->GetDataSlot();
switch (*sourceDataSlot) {
case NUdf::EDataSlot::Bool:
return new TToPg<NUdf::EDataSlot::Bool>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Bool>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Int8:
return new TToPg<NUdf::EDataSlot::Int8>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Int8>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Uint8:
return new TToPg<NUdf::EDataSlot::Uint8>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Uint8>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Int16:
return new TToPg<NUdf::EDataSlot::Int16>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Int16>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Uint16:
return new TToPg<NUdf::EDataSlot::Uint16>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Uint16>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Int32:
return new TToPg<NUdf::EDataSlot::Int32>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Int32>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Uint32:
return new TToPg<NUdf::EDataSlot::Uint32>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Uint32>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Int64:
return new TToPg<NUdf::EDataSlot::Int64>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Int64>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Uint64:
return new TToPg<NUdf::EDataSlot::Uint64>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Uint64>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Float:
return new TToPg<NUdf::EDataSlot::Float>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Float>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Double:
return new TToPg<NUdf::EDataSlot::Double>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Double>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Utf8:
return new TToPg<NUdf::EDataSlot::Utf8>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Utf8>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::String:
return new TToPg<NUdf::EDataSlot::String>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::String>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Date:
return new TToPg<NUdf::EDataSlot::Date>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Date>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Datetime:
return new TToPg<NUdf::EDataSlot::Datetime>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Datetime>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Timestamp:
return new TToPg<NUdf::EDataSlot::Timestamp>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Timestamp>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Interval:
return new TToPg<NUdf::EDataSlot::Interval>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Interval>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::TzDate:
return new TToPg<NUdf::EDataSlot::TzDate>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::TzDate>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::TzDatetime:
return new TToPg<NUdf::EDataSlot::TzDatetime>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::TzDatetime>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::TzTimestamp:
return new TToPg<NUdf::EDataSlot::TzTimestamp>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::TzTimestamp>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Date32:
return new TToPg<NUdf::EDataSlot::Date32>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Datetime64:
return new TToPg<NUdf::EDataSlot::Datetime64>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Timestamp64:
return new TToPg<NUdf::EDataSlot::Timestamp64>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Interval64:
return new TToPg<NUdf::EDataSlot::Interval64>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Uuid:
return new TToPg<NUdf::EDataSlot::Uuid>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Uuid>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Yson:
return new TToPg<NUdf::EDataSlot::Yson>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Yson>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Json:
return new TToPg<NUdf::EDataSlot::Json>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::Json>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::JsonDocument:
return new TToPg<NUdf::EDataSlot::JsonDocument>(ctx.Mutables, arg);
return new TToPg<NUdf::EDataSlot::JsonDocument>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::Decimal:
return new TToPg<NUdf::EDataSlot::Decimal>(ctx.Mutables, arg, dataType);
case NUdf::EDataSlot::DyNumber:
return new TToPg<NUdf::EDataSlot::DyNumber>(ctx.Mutables, arg, dataType);
default:
ythrow yexception() << "Unsupported type: " << NUdf::GetDataTypeInfo(*sourceDataSlot).Name;
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/parser/pg_wrapper/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ PEERDIR(
ydb/library/yql/public/issue
ydb/library/yql/public/udf
ydb/library/yql/utils
ydb/library/yql/public/decimal
ydb/library/binary_json
ydb/library/dynumber
ydb/library/uuid

contrib/libs/icu
Expand Down
18 changes: 9 additions & 9 deletions ydb/library/yql/tests/sql/dq_file/part11/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -535,23 +535,23 @@
"test.test[blocks-filter_direct_col--Results]": [],
"test.test[blocks-pg_to_dates--Analyze]": [
{
"checksum": "b1cbcf335f36d3b6d3235831623e96da",
"size": 3724,
"uri": "https://{canondata_backend}/1937429/4d28be10a189df952d19e45ac95af76010238866/resource.tar.gz#test.test_blocks-pg_to_dates--Analyze_/plan.txt"
"checksum": "2a21c8668a5f5f9b4304777c6aa3c3ca",
"size": 3799,
"uri": "https://{canondata_backend}/1775059/79419adaad52cfab3aa7900a1f1cb4a6b393feb3/resource.tar.gz#test.test_blocks-pg_to_dates--Analyze_/plan.txt"
}
],
"test.test[blocks-pg_to_dates--Debug]": [
{
"checksum": "bc8d4bb89d0111c2f476394873f813d8",
"size": 1557,
"uri": "https://{canondata_backend}/1937429/4d28be10a189df952d19e45ac95af76010238866/resource.tar.gz#test.test_blocks-pg_to_dates--Debug_/opt.yql_patched"
"checksum": "d2a83a82851f3aa631091c7803038c11",
"size": 1864,
"uri": "https://{canondata_backend}/1775059/79419adaad52cfab3aa7900a1f1cb4a6b393feb3/resource.tar.gz#test.test_blocks-pg_to_dates--Debug_/opt.yql_patched"
}
],
"test.test[blocks-pg_to_dates--Plan]": [
{
"checksum": "b1cbcf335f36d3b6d3235831623e96da",
"size": 3724,
"uri": "https://{canondata_backend}/1937429/4d28be10a189df952d19e45ac95af76010238866/resource.tar.gz#test.test_blocks-pg_to_dates--Plan_/plan.txt"
"checksum": "2a21c8668a5f5f9b4304777c6aa3c3ca",
"size": 3799,
"uri": "https://{canondata_backend}/1775059/79419adaad52cfab3aa7900a1f1cb4a6b393feb3/resource.tar.gz#test.test_blocks-pg_to_dates--Plan_/plan.txt"
}
],
"test.test[blocks-pg_to_dates--Results]": [],
Expand Down
22 changes: 22 additions & 0 deletions ydb/library/yql/tests/sql/dq_file/part14/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,28 @@
}
],
"test.test[pg-nullif-default.txt-Results]": [],
"test.test[pg-numeric_to_pg-default.txt-Analyze]": [
{
"checksum": "b4dd508a329723c74293d80f0278c705",
"size": 505,
"uri": "https://{canondata_backend}/1847551/c04b6845f7d6b8061d0f3bb18348cc2396fe3c4b/resource.tar.gz#test.test_pg-numeric_to_pg-default.txt-Analyze_/plan.txt"
}
],
"test.test[pg-numeric_to_pg-default.txt-Debug]": [
{
"checksum": "9b4c62254b0fdbd9ad13924e5f3f4ea7",
"size": 544,
"uri": "https://{canondata_backend}/1847551/c04b6845f7d6b8061d0f3bb18348cc2396fe3c4b/resource.tar.gz#test.test_pg-numeric_to_pg-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[pg-numeric_to_pg-default.txt-Plan]": [
{
"checksum": "b4dd508a329723c74293d80f0278c705",
"size": 505,
"uri": "https://{canondata_backend}/1847551/c04b6845f7d6b8061d0f3bb18348cc2396fe3c4b/resource.tar.gz#test.test_pg-numeric_to_pg-default.txt-Plan_/plan.txt"
}
],
"test.test[pg-numeric_to_pg-default.txt-Results]": [],
"test.test[pg-range_function_multi-default.txt-Analyze]": [
{
"checksum": "b2a2eb5d6b0a138ee924c128fc7738ef",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1848,9 +1848,9 @@
],
"test.test[pg-dates_to_pg-default.txt-Debug]": [
{
"checksum": "622dfc6e57dd85854252f5a29bf8d1bf",
"size": 559,
"uri": "https://{canondata_backend}/1777230/92358f07848628e912a541ea35cf562f3ca2e131/resource.tar.gz#test.test_pg-dates_to_pg-default.txt-Debug_/opt.yql_patched"
"checksum": "578a7fb60483ffab4f7538c4393df173",
"size": 931,
"uri": "https://{canondata_backend}/995452/c7d73273c2fd9304580eab1dcf56a57ae2b37fa7/resource.tar.gz#test.test_pg-dates_to_pg-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[pg-dates_to_pg-default.txt-Plan]": [
Expand Down
Loading