Skip to content

Commit 92ff6c1

Browse files
authored
Handle Write in DQ recapture (#8212)
1 parent 5b61f0c commit 92ff6c1

File tree

6 files changed

+20
-3
lines changed

6 files changed

+20
-3
lines changed

ydb/library/yql/dq/integration/yql_dq_integration.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class IDqIntegration {
5252
virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0;
5353
virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0;
5454
virtual TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0;
55+
virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
5556

5657
// Nothing if callable is not for writing,
5758
// false if callable is for writing and there are some errors (they are added to ctx),

ydb/library/yql/dq/opt/dq_opt_log.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& c
349349
return node;
350350
}
351351

352-
IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config) {
352+
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config) {
353353
TOptimizeExprSettings settings{&typesCtx};
354354
auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) {
355355
if (auto maybeRead = TMaybeNode<TCoRight>(node).Input()) {
@@ -364,6 +364,16 @@ IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::T
364364
}
365365
}
366366
}
367+
} else if (node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::World
368+
&& !TCoCommit::Match(node.Get())
369+
&& node->ChildrenSize() > 1
370+
&& TCoDataSink::Match(node->Child(1))) {
371+
auto dataSinkName = node->Child(1)->Child(0)->Content();
372+
auto dataSink = typesCtx.DataSinkMap.FindPtr(dataSinkName);
373+
YQL_ENSURE(dataSink);
374+
if (auto dqIntegration = (*dataSink)->GetDqIntegration()) {
375+
return dqIntegration->RecaptureWrite(node, ctx);
376+
}
367377
}
368378

369379
return node;

ydb/library/yql/dq/opt/dq_opt_log.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx);
5959

6060
NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents);
6161

62-
IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config);
62+
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config);
6363

6464
NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);
6565

ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ TMaybe<TOptimizerStatistics> TDqIntegrationBase::ReadStatistics(const TExprNode:
3232
return Nothing();
3333
}
3434

35+
TExprNode::TPtr TDqIntegrationBase::RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) {
36+
Y_UNUSED(ctx);
37+
return write;
38+
}
39+
3540
TMaybe<bool> TDqIntegrationBase::CanWrite(const TExprNode&, TExprContext&) {
3641
return Nothing();
3742
}

ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class TDqIntegrationBase: public IDqIntegration {
1313
TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override;
1414
TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override;
1515
TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) override;
16+
TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
1617
void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override;
1718
TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override;
1819
bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) override;

ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class TDqsRecaptureTransformer : public TSyncTransformerBase {
9797

9898
State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default);
9999

100-
IGraphTransformer::TStatus status = NDq::DqWrapRead(input, output, ctx, *State_->TypeCtx, *State_->Settings);
100+
IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, *State_->Settings);
101101
if (input != output) {
102102
YQL_CLOG(INFO, ProviderDq) << "DqsRecapture";
103103
// TODO: Add before/after recapture transformers

0 commit comments

Comments
 (0)