diff --git a/ydb/library/yql/dq/integration/yql_dq_integration.h b/ydb/library/yql/dq/integration/yql_dq_integration.h index 67f317477181..202486f65556 100644 --- a/ydb/library/yql/dq/integration/yql_dq_integration.h +++ b/ydb/library/yql/dq/integration/yql_dq_integration.h @@ -52,6 +52,7 @@ class IDqIntegration { virtual TMaybe EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector& nodes, TExprContext& ctx) = 0; virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0; virtual TMaybe ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0; + virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0; // Nothing if callable is not for writing, // false if callable is for writing and there are some errors (they are added to ctx), diff --git a/ydb/library/yql/dq/opt/dq_opt_log.cpp b/ydb/library/yql/dq/opt/dq_opt_log.cpp index a0ad02b81860..ac9757c8a2f7 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_log.cpp @@ -349,7 +349,7 @@ NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& c return node; } -IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config) { +IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config) { TOptimizeExprSettings settings{&typesCtx}; auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) { if (auto maybeRead = TMaybeNode(node).Input()) { @@ -364,6 +364,16 @@ IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::T } } } + } else if (node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::World + && !TCoCommit::Match(node.Get()) + && node->ChildrenSize() > 1 + && TCoDataSink::Match(node->Child(1))) { + auto dataSinkName = node->Child(1)->Child(0)->Content(); + auto dataSink = typesCtx.DataSinkMap.FindPtr(dataSinkName); + YQL_ENSURE(dataSink); + if (auto dqIntegration = (*dataSink)->GetDqIntegration()) { + return dqIntegration->RecaptureWrite(node, ctx); + } } return node; diff --git a/ydb/library/yql/dq/opt/dq_opt_log.h b/ydb/library/yql/dq/opt/dq_opt_log.h index e3c7f618d24d..15cd87ce7ebe 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.h +++ b/ydb/library/yql/dq/opt/dq_opt_log.h @@ -59,7 +59,7 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx); NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents); -IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config); +IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config); NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx); diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 04b343055415..630181553d56 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -32,6 +32,11 @@ TMaybe TDqIntegrationBase::ReadStatistics(const TExprNode: return Nothing(); } +TExprNode::TPtr TDqIntegrationBase::RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) { + Y_UNUSED(ctx); + return write; +} + TMaybe TDqIntegrationBase::CanWrite(const TExprNode&, TExprContext&) { return Nothing(); } diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h index facb55a68f64..9372ebcbeba2 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h @@ -13,6 +13,7 @@ class TDqIntegrationBase: public IDqIntegration { TMaybe EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector& nodes, TExprContext& ctx) override; TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override; TMaybe ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) override; + TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override; void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override; TMaybe CanWrite(const TExprNode& write, TExprContext& ctx) override; bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) override; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp index ff73dbf048e5..f257a155e96d 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp @@ -97,7 +97,7 @@ class TDqsRecaptureTransformer : public TSyncTransformerBase { State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default); - IGraphTransformer::TStatus status = NDq::DqWrapRead(input, output, ctx, *State_->TypeCtx, *State_->Settings); + IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, *State_->Settings); if (input != output) { YQL_CLOG(INFO, ProviderDq) << "DqsRecapture"; // TODO: Add before/after recapture transformers