Skip to content

Commit 26811c9

Browse files
authored
Merge 873b9d5 into ba12d29
2 parents ba12d29 + 873b9d5 commit 26811c9

File tree

4 files changed

+74
-75
lines changed

4 files changed

+74
-75
lines changed

ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@
7171
{"Index": 1, "Name": "Columns", "Type": "TExprBase"},
7272
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
7373
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
74-
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"},
75-
{"Index": 5, "Name": "ColumnTypes", "Type": "TExprBase"}
74+
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
7675
]
7776
},
7877
{

ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
132132
}
133133

134134
TStatus HandleDqTopicSource(TExprBase input, TExprContext& ctx) {
135-
if (!EnsureArgsCount(input.Ref(), 6, ctx)) {
135+
if (!EnsureArgsCount(input.Ref(), 5, ctx)) {
136136
return TStatus::Error;
137137
}
138138

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -124,25 +124,17 @@ class TPqDqIntegration: public TDqIntegrationBase {
124124

125125
const auto token = "cluster:default_" + clusterName;
126126

127-
auto rowSchema = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
128-
TExprNode::TListType colTypes;
129-
const auto& typeItems = rowSchema->GetItems();
130-
colTypes.reserve(typeItems.size());
131-
const auto pos = read->Pos(); // TODO
132-
std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colTypes),
133-
[&](const TItemExprType* item) {
134-
return ctx.NewAtom(pos, FormatType(item->GetItemType()));
135-
});
136-
auto columnTypes = ctx.NewList(pos, std::move(colTypes));
137-
127+
const auto& typeItems = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetItems();
128+
const auto pos = read->Pos();
129+
138130
TExprNode::TListType colNames;
139131
colNames.reserve(typeItems.size());
140132
std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colNames),
141133
[&](const TItemExprType* item) {
142134
return ctx.NewAtom(pos, item->GetName());
143135
});
144136
auto columnNames = ctx.NewList(pos, std::move(colNames));
145-
137+
146138
auto row = Build<TCoArgument>(ctx, read->Pos())
147139
.Name("row")
148140
.Done();
@@ -153,7 +145,6 @@ class TPqDqIntegration: public TDqIntegrationBase {
153145
.Build()
154146
.Done().Ptr();
155147

156-
157148
return Build<TDqSourceWrap>(ctx, read->Pos())
158149
.Input<TDqPqTopicSource>()
159150
.Topic(pqReadTopic.Topic())
@@ -163,7 +154,6 @@ class TPqDqIntegration: public TDqIntegrationBase {
163154
.Name().Build(token)
164155
.Build()
165156
.FilterPredicate(emptyPredicate)
166-
.ColumnTypes(std::move(columnTypes))
167157
.Build()
168158
.RowType(ExpandType(pqReadTopic.Pos(), *rowType, ctx))
169159
.DataSource(pqReadTopic.DataSource().Cast<TCoDataSource>())
@@ -263,14 +253,13 @@ class TPqDqIntegration: public TDqIntegrationBase {
263253
srcDesc.AddMetadataFields(metadata.Value().Maybe<TCoAtom>().Cast().StringValue());
264254
}
265255

// Removed code: column names were taken from the atom list in topicSource.Columns().
266-
for (const auto& column : topicSource.Columns().Cast<TCoAtomList>()) {
267-
srcDesc.AddColumns(column.StringValue());
// Added code: the struct type behind the topic's RowSpec annotation is walked once, and for
// every item both its name (AddColumns) and its formatted type (AddColumnTypes) are appended.
256+
const auto rowSchema = topic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
257+
for (const auto& item : rowSchema->GetItems()) {
258+
srcDesc.AddColumns(TString(item->GetName()));
259+
srcDesc.AddColumnTypes(FormatType(item->GetItemType()));
// NOTE(review): the Cerr statement below writes one line per column to stderr and looks like
// leftover debug output — confirm it should be removed (or routed through the project's
// logging) before this is merged.
260+
Cerr << "----------------------------- Added column " << TString(item->GetName()) << " with type " << FormatType(item->GetItemType()) << "\n";
268261
}
269262

270-
for (const auto& columnTypes : topicSource.ColumnTypes().Cast<TCoAtomList>()) {
271-
srcDesc.AddColumnTypes(columnTypes.StringValue());
272-
}
273-
274263
NYql::NConnector::NApi::TPredicate predicateProto;
275264
if (auto predicate = topicSource.FilterPredicate(); !NYql::IsEmptyFilterPredicate(predicate)) {
276265
TStringBuilder err;

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 63 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
#include "yql_pq_provider_impl.h"
22

33
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
4+
#include <ydb/library/yql/core/yql_opt_utils.h>
45
#include <ydb/library/yql/core/yql_type_helpers.h>
56
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
67
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
78
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
89
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
910
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
1011
#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h>
12+
#include <ydb/library/yql/providers/pq/common/yql_names.h>
1113
#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h>
1214
#include <ydb/library/yql/utils/log/log.h>
1315
#include <ydb/library/yql/utils/plan/plan_utils.h>
@@ -30,22 +32,20 @@ namespace {
3032
}
3133
};
3234

// Diff of the helper that collects names from the members of an ExtractMembers node.
// Removed version (GetUsedMetadataFields): a member name was kept only when
// FindPqMetaFieldDescriptorBySysColumn() found a descriptor for it.
33-
std::unordered_set<TString> GetUsedMetadataFields(const TCoExtractMembers& extract) {
34-
std::unordered_set<TString> usedMetadataFields;
35-
for (const auto extractMember : extract.Members()) {
36-
if (FindPqMetaFieldDescriptorBySysColumn(extractMember.StringValue())) {
37-
usedMetadataFields.emplace(extractMember.StringValue());
38-
}
// Added version (GetUsedColumnNames): every member name is collected, with no metadata
// filter applied here; members are iterated by const reference.
35+
std::unordered_set<TString> GetUsedColumnNames(const TCoExtractMembers& extractMembers) {
36+
std::unordered_set<TString> usedColumnNames;
37+
for (const auto& member : extractMembers.Members()) {
38+
usedColumnNames.emplace(member.StringValue());
3939
}
4040

41-
return usedMetadataFields;
41+
return usedColumnNames;
4242
}
4343

44-
TVector<TCoNameValueTuple> DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set<TString>& usedMetadataFields) {
44+
TVector<TCoNameValueTuple> DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set<TString>& usedColumnNames) {
4545
TVector<TCoNameValueTuple> newSourceMetadata;
4646
for (auto metadataItem : pqTopic.Metadata()) {
4747
auto metadataName = metadataItem.Cast<TCoNameValueTuple>().Value().Maybe<TCoAtom>().Cast().StringValue();
48-
if (usedMetadataFields.contains(metadataName)) {
48+
if (FindPqMetaFieldDescriptorBySysColumn(metadataName) && usedColumnNames.contains(metadataName)) {
4949
newSourceMetadata.push_back(metadataItem);
5050
}
5151
}
@@ -88,18 +88,18 @@ TCoNameValueTupleList DropUnusedMetadataFromDqWrapSettings(
8888
.Done();
8989
}
9090

91-
TExprNode::TPtr DropUnusedMetadataFieldsFromRowType(
91+
TExprNode::TPtr DropUnusedRowItems(
9292
TPositionHandle position,
9393
const TStructExprType* oldRowType,
94-
const std::unordered_set<TString>& usedMetadataFields,
94+
const std::unordered_set<TString>& usedColumnNames,
9595
TExprContext& ctx)
9696
{
9797
TVector<const TItemExprType*> newFields;
9898
newFields.reserve(oldRowType->GetSize());
9999

100100
for (auto itemExprType : oldRowType->GetItems()) {
101101
const auto columnName = TString(itemExprType->GetName());
102-
if (FindPqMetaFieldDescriptorBySysColumn(columnName) && !usedMetadataFields.contains(columnName)) {
102+
if (!usedColumnNames.contains(columnName)) {
103103
continue;
104104
}
105105

@@ -109,14 +109,14 @@ TExprNode::TPtr DropUnusedMetadataFieldsFromRowType(
109109
return ExpandType(position, *ctx.MakeType<TStructExprType>(newFields), ctx);
110110
}
111111

112-
TExprNode::TPtr DropUnusedMetadataFieldsFromColumns(
112+
TExprNode::TPtr DropUnusedColumns(
113113
TExprBase oldColumns,
114-
const std::unordered_set<TString>& usedMetadataFields,
114+
const std::unordered_set<TString>& usedColumnNames,
115115
TExprContext& ctx)
116116
{
117117
TExprNode::TListType res;
118118
for (const auto& column : oldColumns.Cast<TCoAtomList>()) {
119-
if (FindPqMetaFieldDescriptorBySysColumn(column.StringValue()) && !usedMetadataFields.contains(column.StringValue())) {
119+
if (!usedColumnNames.contains(column.StringValue())) {
120120
continue;
121121
}
122122

@@ -160,57 +160,68 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
160160
}*/
161161

// Rule for an ExtractMembers node placed over a TDqSourceWrap whose input is a TDqPqTopicSource.
// Diff summary: the removed lines pruned only unused metadata fields; the added lines prune every
// column whose name is not in the ExtractMembers member list (from the topic RowSpec, the source
// Columns list and the wrap RowType). The input node is returned unchanged whenever one of the
// guards below does not match or when there is nothing to drop.
162162
TMaybeNode<TExprBase> ExtractMembersOverDqWrap(TExprBase node, TExprContext& ctx) const {
163-
const auto& extract = node.Cast<TCoExtractMembers>();
164-
const auto& input = extract.Input();
165-
const auto dqSourceWrap = input.Maybe<TDqSourceWrap>();
166-
const auto dqPqTopicSource = dqSourceWrap.Input().Maybe<TDqPqTopicSource>();
167-
const auto pqTopic = dqPqTopicSource.Topic().Maybe<TPqTopic>();
168-
if (!pqTopic) {
// Added guards: each Maybe<> is checked separately and the node is returned as is on a mismatch.
163+
const auto& extractMembers = node.Cast<TCoExtractMembers>();
164+
const auto& extractMembersInput = extractMembers.Input();
165+
const auto& maybeDqSourceWrap = extractMembersInput.Maybe<TDqSourceWrap>();
166+
if (!maybeDqSourceWrap) {
167+
return node;
168+
}
169+
// Leave the node alone unless the wrap's data source category equals PqProviderName.
170+
const auto& dqSourceWrap = maybeDqSourceWrap.Cast();
171+
if (dqSourceWrap.DataSource().Category() != PqProviderName) {
172+
return node;
173+
}
174+
175+
const auto& maybeDqPqTopicSource = dqSourceWrap.Input().Maybe<TDqPqTopicSource>();
176+
if (!maybeDqPqTopicSource) {
169177
return node;
170178
}
171179

172-
const auto usedMetadataFields = GetUsedMetadataFields(extract);
173-
const auto newSourceMetadata = DropUnusedMetadata(pqTopic.Cast(), usedMetadataFields);
174-
if (newSourceMetadata.size() == pqTopic.Metadata().Cast().Size()) {
180+
const auto& dqPqTopicSource = maybeDqPqTopicSource.Cast();
181+
const auto& pqTopic = dqPqTopicSource.Topic();
182+
// inputRowType comes from the topic's RowSpec annotation; outputRowType is the item type of
// this ExtractMembers node's own list annotation.
183+
auto usedColumnNames = GetUsedColumnNames(extractMembers);
184+
const TStructExprType* inputRowType = pqTopic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
185+
const TStructExprType* outputRowType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
// When ExtractMembers yields an empty struct but the topic row is not empty, one column chosen
// by GetLightColumn() is added to the used set; the second YQL_ENSURE requires that the name
// was newly inserted. Presumably this keeps the source from ending up with no columns at all —
// TODO confirm.
186+
if (outputRowType->GetSize() == 0 && inputRowType->GetSize() > 0) {
187+
auto item = GetLightColumn(*inputRowType);
188+
YQL_ENSURE(item);
189+
YQL_ENSURE(usedColumnNames.insert(TString(item->GetName())).second);
190+
}
191+
// Nothing to prune when the topic's item struct has as many members as there are used names.
// NOTE(review): only the counts are compared, not the names themselves — confirm that every
// used name is guaranteed to be a member of oldRowType.
192+
const auto oldRowType = pqTopic.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
193+
if (oldRowType->GetSize() == usedColumnNames.size()) {
175194
return node;
176195
}
177196

178-
const auto oldRowType = pqTopic.Ref().GetTypeAnn()
179-
->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
197+
const auto& newSourceMetadata = DropUnusedMetadata(pqTopic, usedColumnNames);
180198

// Rebuild the topic source: the added lines replace the topic's Metadata and RowSpec and the
// source's Columns with their filtered versions, and use the source's own position instead of
// node.Pos(). The removed lines only replaced Metadata and, when present, Columns.
181-
auto newPqTopicSource = Build<TDqPqTopicSource>(ctx, node.Pos())
182-
.InitFrom(dqPqTopicSource.Cast())
199+
const TExprNode::TPtr newPqTopicSource = Build<TDqPqTopicSource>(ctx, dqPqTopicSource.Pos())
200+
.InitFrom(dqPqTopicSource)
183201
.Topic<TPqTopic>()
184-
.InitFrom(pqTopic.Cast())
202+
.InitFrom(pqTopic)
185203
.Metadata().Add(newSourceMetadata).Build()
186-
.Build();
187-
188-
if (dqPqTopicSource.Columns()) {
189-
auto newColumns = DropUnusedMetadataFieldsFromColumns(
190-
dqPqTopicSource.Columns().Cast(),
191-
usedMetadataFields,
192-
ctx);
193-
newPqTopicSource.Columns(newColumns);
194-
}
204+
.RowSpec(DropUnusedRowItems(pqTopic.RowSpec().Pos(), inputRowType, usedColumnNames, ctx))
205+
.Build()
206+
.Columns(DropUnusedColumns(dqPqTopicSource.Columns(), usedColumnNames, ctx))
207+
.Done()
208+
.Ptr();
195209

// Rebuild the wrap around the new source, with Settings derived from the filtered metadata and
// RowType reduced to the used columns.
196-
const auto newDqSourceWrap = Build<TDqSourceWrap>(ctx, node.Pos())
197-
.InitFrom(dqSourceWrap.Cast())
198-
.Input(newPqTopicSource.Done())
199-
.Settings(DropUnusedMetadataFromDqWrapSettings(
200-
dqSourceWrap.Cast(),
201-
newSourceMetadata,
202-
ctx))
203-
.RowType(DropUnusedMetadataFieldsFromRowType(
204-
node.Pos(),
205-
oldRowType,
206-
usedMetadataFields,
207-
ctx))
210+
const TExprNode::TPtr newDqSourceWrap = Build<TDqSourceWrap>(ctx, dqSourceWrap.Pos())
211+
.InitFrom(dqSourceWrap)
212+
.Input(newPqTopicSource)
213+
.Settings(DropUnusedMetadataFromDqWrapSettings(dqSourceWrap, newSourceMetadata, ctx))
214+
.RowType(DropUnusedRowItems(dqSourceWrap.RowType().Pos(), oldRowType, usedColumnNames, ctx))
208215
.Done()
209216
.Ptr();
210217

// If the ExtractMembers output has exactly as many members as the used set, the rewritten wrap
// is returned directly and ExtractMembers is dropped.
218+
if (outputRowType->GetSize() == usedColumnNames.size()) {
219+
return newDqSourceWrap;
220+
}
221+
// Otherwise ExtractMembers is kept on top of the input with the wrap replaced.
211222
return Build<TCoExtractMembers>(ctx, node.Pos())
212-
.InitFrom(extract)
213-
.Input(ctx.ReplaceNode(input.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap))
223+
.InitFrom(extractMembers)
224+
.Input(ctx.ReplaceNode(extractMembersInput.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap))
214225
.Done();
215226
}
216227

0 commit comments

Comments
 (0)