Skip to content

Commit 18604c2

Browse files
authored
rework solomon sink type ann (#6636)
1 parent 220d066 commit 18604c2

File tree

4 files changed

+24
-42
lines changed

4 files changed

+24
-42
lines changed

ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@
105105
{"Index": 1, "Name": "Project", "Type": "TCoAtom"},
106106
{"Index": 2, "Name": "Cluster", "Type": "TCoAtom"},
107107
{"Index": 3, "Name": "Service", "Type": "TCoAtom"},
108-
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam", "Optional": true}
108+
{"Index": 4, "Name": "RowType", "Type": "TExprBase"},
109+
{"Index": 5, "Name": "Token", "Type": "TCoSecureParam", "Optional": true}
109110
]
110111
}
111112
]

ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class TSolomonDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase
115115
TStatus HandleSoShard(TExprBase input, TExprContext& ctx) {
116116
YQL_ENSURE(!State_->IsRtmrMode(), "SoShard can't be used in rtmr mode");
117117

118-
if (!EnsureMinArgsCount(input.Ref(), 4, ctx) || !EnsureMaxArgsCount(input.Ref(), 5, ctx)) {
118+
if (!EnsureMinMaxArgsCount(input.Ref(), 5, 6, ctx)) {
119119
return TStatus::Error;
120120
}
121121

@@ -137,6 +137,10 @@ class TSolomonDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase
137137
return TStatus::Error;
138138
}
139139

140+
if (!EnsureType(shard.RowType().Ref(), ctx)) {
141+
return TStatus::Error;
142+
}
143+
140144
if (shard.Token() && !EnsureCallable(shard.Token().Ref(), ctx)) {
141145
return TStatus::Error;
142146
}

ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp

+1-14
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,6 @@ NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonC
4444
}
4545
}
4646

47-
const TTypeAnnotationNode* GetItemType(const TExprNode& node) {
48-
const TTypeAnnotationNode* typeAnn = node.GetTypeAnn();
49-
switch (typeAnn->GetKind()) {
50-
case ETypeAnnotationKind::Flow:
51-
return typeAnn->Cast<TFlowExprType>()->GetItemType();
52-
case ETypeAnnotationKind::Stream:
53-
return typeAnn->Cast<TStreamExprType>()->GetItemType();
54-
default: break;
55-
}
56-
YQL_ENSURE(false, "Invalid solomon sink type " << typeAnn->GetKind());
57-
return nullptr;
58-
}
59-
6047
void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShardScheme& scheme) {
6148
int index = 0;
6249
for (const TItemExprType* structItem : itemType.Cast<TStructExprType>()->GetItems()) {
@@ -316,7 +303,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
316303
shardDesc.SetClusterType(MapClusterType(clusterDesc->GetClusterType()));
317304
shardDesc.SetUseSsl(clusterDesc->GetUseSsl());
318305

319-
const TTypeAnnotationNode* itemType = GetItemType(node);
306+
const TTypeAnnotationNode* itemType = shard.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
320307
FillScheme(*itemType, *shardDesc.MutableScheme());
321308

322309
if (auto maybeToken = shard.Token()) {

ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp

+16-26
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,14 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
100100
YQL_CLOG(INFO, ProviderSolomon) << "Optimize SoWriteToShard";
101101

102102
const auto solomonCluster = TString(write.DataSink().Cluster().Value());
103-
auto shard = BuildSolomonShard(write.Shard().Cast<TCoAtom>(), ctx, solomonCluster);
103+
auto* typeAnn = write.Input().Ref().GetTypeAnn();
104+
const TTypeAnnotationNode* inputItemType = nullptr;
105+
if (!EnsureNewSeqType<false, true, false>(write.Input().Pos(), *typeAnn, ctx, &inputItemType)) {
106+
return {};
107+
}
108+
109+
auto rowTypeNode = ExpandType(write.Pos(), *inputItemType, ctx);
110+
auto shard = BuildSolomonShard(write.Shard().Cast<TCoAtom>(), TExprBase(rowTypeNode), ctx, solomonCluster);
104111

105112
auto dqSink = Build<TDqSink>(ctx, write.Pos())
106113
.DataSink(write.DataSink())
@@ -131,7 +138,7 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
131138
}
132139

133140
private:
134-
TCallable BuildSolomonShard(TCoAtom shardNode, TExprContext& ctx, TString solomonCluster) const {
141+
TCallable BuildSolomonShard(TCoAtom shardNode, TExprBase rowType, TExprContext& ctx, TString solomonCluster) const {
135142
const auto* clusterDesc = State_->Configuration->ClusterConfigs.FindPtr(solomonCluster);
136143
YQL_ENSURE(clusterDesc, "Unknown cluster " << solomonCluster);
137144

@@ -148,31 +155,14 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
148155
}
149156
YQL_ENSURE(!cluster.empty(), "Cluster is not defined. You can define it inside connection, or inside query.");
150157

151-
auto solomonClusterAtom = Build<TCoAtom>(ctx, shardNode.Pos())
152-
.Value(solomonCluster)
153-
.Done();
154-
155-
auto projectAtom = Build<TCoAtom>(ctx, shardNode.Pos())
156-
.Value(project)
157-
.Done();
158-
159-
auto clusterAtom = Build<TCoAtom>(ctx, shardNode.Pos())
160-
.Value(cluster)
158+
return Build<TSoShard>(ctx, shardNode.Pos())
159+
.SolomonCluster<TCoAtom>().Value(solomonCluster).Build()
160+
.Project<TCoAtom>().Value(project).Build()
161+
.Cluster<TCoAtom>().Value(cluster).Build()
162+
.Service<TCoAtom>().Value(service).Build()
163+
.RowType(rowType)
164+
.Token<TCoSecureParam>().Name().Build("cluster:default_" + solomonCluster).Build()
161165
.Done();
162-
163-
auto serviceAtom = Build<TCoAtom>(ctx, shardNode.Pos())
164-
.Value(service)
165-
.Done();
166-
167-
auto dqSoShardBuilder = Build<TSoShard>(ctx, shardNode.Pos());
168-
dqSoShardBuilder.SolomonCluster(solomonClusterAtom);
169-
dqSoShardBuilder.Project(projectAtom);
170-
dqSoShardBuilder.Cluster(clusterAtom);
171-
dqSoShardBuilder.Service(serviceAtom);
172-
173-
dqSoShardBuilder.Token<TCoSecureParam>().Name().Build("cluster:default_" + solomonCluster).Build();
174-
175-
return dqSoShardBuilder.Done();
176166
}
177167

178168
private:

0 commit comments

Comments
 (0)