Skip to content

Commit eda727a

Browse files
authored
YQL-9517: RPC Arrow reader YT column converters (#599)
* YQL-9517: RPC Arrow reader YT column converters * .make + bool * wideflow => stream * yson fix * fix * fix * remove unused
1 parent 1daec26 commit eda727a

14 files changed

+856
-165
lines changed

ydb/library/yql/minikql/computation/mkql_block_reader.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ struct TConverterTraits {
184184
using TTuple = TTupleBlockItemConverter<Nullable>;
185185
template <typename T, bool Nullable>
186186
using TFixedSize = TFixedSizeBlockItemConverter<T, Nullable>;
187-
template <typename TStringType, bool Nullable, NUdf::EPgStringType PgString = NUdf::EPgStringType::None>
187+
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String, NUdf::EPgStringType PgString = NUdf::EPgStringType::None>
188188
using TStrings = TStringBlockItemConverter<TStringType, Nullable, PgString>;
189189
using TExtOptional = TExternalOptionalBlockItemConverter;
190190

@@ -193,15 +193,15 @@ struct TConverterTraits {
193193
return std::make_unique<TFixedSize<ui64, true>>();
194194
} else {
195195
if (desc.Typelen == -1) {
196-
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EPgStringType::Text>>();
196+
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EDataSlot::String, NUdf::EPgStringType::Text>>();
197197
ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen);
198198
return ret;
199199
} else if (desc.Typelen == -2) {
200-
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EPgStringType::CString>>();
200+
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EDataSlot::String, NUdf::EPgStringType::CString>>();
201201
ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen);
202202
return ret;
203203
} else {
204-
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EPgStringType::Fixed>>();
204+
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EDataSlot::String, NUdf::EPgStringType::Fixed>>();
205205
ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen);
206206
return ret;
207207
}

ydb/library/yql/minikql/computation/mkql_block_transport.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ struct TSerializerTraits {
500500
using TTuple = TTupleBlockSerializer<Nullable>;
501501
template <typename T, bool Nullable>
502502
using TFixedSize = TFixedSizeBlockSerializer<sizeof(T), Nullable>;
503-
template <typename TStringType, bool Nullable>
503+
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
504504
using TStrings = TStringBlockSerializer<TStringType, Nullable>;
505505
using TExtOptional = TExtOptionalBlockSerializer;
506506

@@ -519,7 +519,7 @@ struct TDeserializerTraits {
519519
using TTuple = TTupleBlockDeserializer<Nullable>;
520520
template <typename T, bool Nullable>
521521
using TFixedSize = TFixedSizeBlockDeserializer<sizeof(T), Nullable>;
522-
template <typename TStringType, bool Nullable>
522+
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
523523
using TStrings = TStringBlockDeserializer<TStringType, Nullable>;
524524
using TExtOptional = TExtOptionalBlockDeserializer;
525525

ydb/library/yql/minikql/mkql_type_builder.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -2425,7 +2425,7 @@ struct TComparatorTraits {
24252425
using TTuple = NUdf::TTupleBlockItemComparator<Nullable>;
24262426
template <typename T, bool Nullable>
24272427
using TFixedSize = NUdf::TFixedSizeBlockItemComparator<T, Nullable>;
2428-
template <typename TStringType, bool Nullable>
2428+
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
24292429
using TStrings = NUdf::TStringBlockItemComparator<TStringType, Nullable>;
24302430
using TExtOptional = NUdf::TExternalOptionalBlockItemComparator;
24312431

@@ -2441,7 +2441,7 @@ struct THasherTraits {
24412441
using TTuple = NUdf::TTupleBlockItemHasher<Nullable>;
24422442
template <typename T, bool Nullable>
24432443
using TFixedSize = NUdf::TFixedSizeBlockItemHasher<T, Nullable>;
2444-
template <typename TStringType, bool Nullable>
2444+
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
24452445
using TStrings = NUdf::TStringBlockItemHasher<TStringType, Nullable>;
24462446
using TExtOptional = NUdf::TExternalOptionalBlockItemHasher;
24472447

ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp

-14
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,6 @@ TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector<cons
2222
return Nothing();
2323
}
2424

25-
bool TDqIntegrationBase::CanBlockReadTypes(const TStructExprType* node) {
26-
for (const auto& e: node->GetItems()) {
27-
// Check type
28-
auto type = e->GetItemType();
29-
while (ETypeAnnotationKind::Optional == type->GetKind()) {
30-
type = type->Cast<TOptionalExprType>()->GetItemType();
31-
}
32-
if (ETypeAnnotationKind::Data != type->GetKind()) {
33-
return false;
34-
}
35-
}
36-
return true;
37-
}
38-
3925
TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) {
4026
return read;
4127
}

ydb/library/yql/providers/dq/opt/dqs_opt.cpp

+7-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <ydb/library/yql/core/type_ann/type_ann_core.h>
1111
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
1212
#include <ydb/library/yql/core/yql_type_annotation.h>
13+
#include <ydb/library/yql/core/yql_opt_utils.h>
1314

1415
#include <ydb/library/yql/dq/opt/dq_opt.h>
1516
#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
@@ -92,12 +93,13 @@ namespace NYql::NDqs {
9293
}
9394

9495
YQL_CLOG(INFO, ProviderDq) << "DqsRewritePhyBlockReadOnDqIntegration";
95-
9696
return Build<TCoWideFromBlocks>(ctx, node->Pos())
97-
.Input(Build<TDqReadBlockWideWrap>(ctx, node->Pos())
98-
.Input(readWideWrap.Input())
99-
.Flags(readWideWrap.Flags())
100-
.Token(readWideWrap.Token())
97+
.Input(Build<TCoToFlow>(ctx, node->Pos())
98+
.Input(Build<TDqReadBlockWideWrap>(ctx, node->Pos())
99+
.Input(readWideWrap.Input())
100+
.Flags(readWideWrap.Flags())
101+
.Token(readWideWrap.Token())
102+
.Done())
101103
.Done())
102104
.Done().Ptr();
103105
}, ctx, optSettings);

ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ class TDqsDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
155155
}
156156

157157
types.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)));
158+
input->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TMultiExprType>(types)));
159+
return TStatus::Ok;
158160
}
159161

160162
input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TMultiExprType>(types)));

0 commit comments

Comments
 (0)