Skip to content

Commit 6a1069f

Browse files
authored
[dq] Peephole integration + some transform helpers (YQL-17386) (#572)
* [dq] Peephole integration + some transform helpers * Integrate providers ph into dq exec * Change interface
1 parent 8a076ec commit 6a1069f

File tree

5 files changed

+102
-23
lines changed

5 files changed

+102
-23
lines changed

ydb/library/yql/core/yql_graph_transformer.h

+36-3
Original file line numberDiff line numberDiff line change
@@ -264,30 +264,63 @@ class TNullTransformer final: public TSyncTransformerBase {
264264
};
265265

266266
template <typename TFunctor>
267-
class TFunctorTransformer final: public TSyncTransformerBase {
267+
class TFunctorTransformer: public TSyncTransformerBase {
268268
public:
269269
TFunctorTransformer(TFunctor functor)
270270
: Functor_(std::move(functor)) {}
271271

272-
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
272+
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
273273
TStatus status = Functor_(input, output, ctx);
274274
YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Async);
275275

276276
return status;
277277
}
278278

279-
void Rewind() final {
279+
void Rewind() override {
280280
}
281281

282282
private:
283283
TFunctor Functor_;
284284
};
285285

286+
// Functor-based transformer that stops invoking its functor once a call has
// finished with status level Ok. Until Rewind() is called, every later
// DoTransform() just forwards input to output and reports Ok.
template <typename TFunctor>
287+
class TSinglePassFunctorTransformer final: public TFunctorTransformer<TFunctor> {
288+
using TBase = TFunctorTransformer<TFunctor>;
289+
public:
290+
// Takes the functor by value and moves it into the base functor transformer.
TSinglePassFunctorTransformer(TFunctor functor)
291+
: TFunctorTransformer<TFunctor>(std::move(functor))
292+
{}
293+
294+
// Runs the functor through TBase::DoTransform() unless a previous call has
// already completed with Ok; in that case the input is passed through as is.
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
295+
// Already passed: do not call the functor again.
if (Pass_) {
296+
output = input;
297+
return IGraphTransformer::TStatus::Ok;
298+
}
299+
IGraphTransformer::TStatus status = TBase::DoTransform(input, output, ctx);
300+
// Only an Ok level marks the pass as done; any other level leaves Pass_
// unset, so the functor runs again on the next call.
if (IGraphTransformer::TStatus::Ok == status.Level) {
301+
Pass_ = true;
302+
}
303+
return status;
304+
}
305+
306+
// Clears the "already passed" flag so the next DoTransform() invokes the functor again.
void Rewind() final {
307+
Pass_ = false;
308+
}
309+
310+
private:
311+
// True once DoTransform() has returned a status with level Ok since construction or the last Rewind().
bool Pass_ = false;
312+
};
313+
286314
// Wraps the given functor into a heap-allocated TFunctorTransformer and
// returns it through the IGraphTransformer interface.
template <typename TFunctor>
287315
THolder<IGraphTransformer> CreateFunctorTransformer(TFunctor functor) {
288316
return MakeHolder<TFunctorTransformer<TFunctor>>(std::move(functor));
289317
}
290318

319+
// Wraps the given functor into a heap-allocated TSinglePassFunctorTransformer
// (the functor is no longer invoked after a call completes with Ok, until
// Rewind()) and returns it through the IGraphTransformer interface.
template <typename TFunctor>
320+
THolder<IGraphTransformer> CreateSinglePassFunctorTransformer(TFunctor functor) {
321+
return MakeHolder<TSinglePassFunctorTransformer<TFunctor>>(std::move(functor));
322+
}
323+
291324
typedef std::function<IGraphTransformer::TStatus(const TExprNode::TPtr&, TExprNode::TPtr&, TExprContext&)> TAsyncTransformCallback;
292325
typedef NThreading::TFuture<TAsyncTransformCallback> TAsyncTransformCallbackFuture;
293326

ydb/library/yql/dq/integration/yql_dq_integration.h

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TJsonValue;
2222
namespace NYql {
2323

2424
struct TDqSettings;
25+
class TTransformationPipeline;
2526

2627
namespace NCommon {
2728
class TMkqlCallableCompilerBase;
@@ -72,6 +73,8 @@ class IDqIntegration {
7273
// Return true if node was handled
7374
virtual bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
7475
virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
76+
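// Called to configure the DQ peephole pipeline. The DQ executor calls it with beforeDqTransforms = true while setting up the after-type-annotation stage (right after precomputes are replaced) and with beforeDqTransforms = false while setting up the after-optimize stage.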
77+
virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0;
7578
};
7679

7780
} // namespace NYql

ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,7 @@ bool TDqIntegrationBase::FillSinkPlanProperties(const NNodes::TExprBase&, TMap<T
9292
return false;
9393
}
9494

95+
// Default implementation: ignores all arguments and adds nothing to the pipeline.
// NOTE(review): presumably concrete provider integrations override this to register their own peephole transformers — confirm against the providers.
void TDqIntegrationBase::ConfigurePeepholePipeline(bool, const THashMap<TString, TString>&, TTransformationPipeline*) {
96+
}
97+
9598
} // namespace NYql

ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ class TDqIntegrationBase: public IDqIntegration {
2626
void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) override;
2727
bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
2828
bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
29+
void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) override;
30+
2931
protected:
3032
bool CanBlockReadTypes(const TStructExprType* node);
3133
};

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

+58-20
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,33 @@ struct TPublicIds {
224224

225225
struct TDqsPipelineConfigurator : public IPipelineConfigurator {
226226
public:
227-
TDqsPipelineConfigurator(const TDqStatePtr& state)
227+
TDqsPipelineConfigurator(const TDqStatePtr& state, const THashMap<TString, TString>& providerParams)
228228
: State_(state)
229+
, ProviderParams_(providerParams)
229230
{
231+
for (const auto& ds: State_->TypeCtx->DataSources) {
232+
if (const auto dq = ds->GetDqIntegration()) {
233+
UniqIntegrations_.emplace(dq);
234+
}
235+
}
236+
for (const auto& ds: State_->TypeCtx->DataSinks) {
237+
if (const auto dq = ds->GetDqIntegration()) {
238+
UniqIntegrations_.emplace(dq);
239+
}
240+
}
230241
}
231242
private:
232243
void AfterCreate(TTransformationPipeline*) const final {}
233244

234245
void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
246+
// First truncate graph by calculated precomputes
235247
pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(*pipeline->GetTypeAnnotationContext(), State_->FunctionRegistry), "ReplacePrecomputes");
248+
249+
// Then apply provider specific transformers on truncated graph
250+
std::for_each(UniqIntegrations_.cbegin(), UniqIntegrations_.cend(), [&](const auto dqInt) {
251+
dqInt->ConfigurePeepholePipeline(true, ProviderParams_, pipeline);
252+
});
253+
236254
if (State_->Settings->UseBlockReader.Get().GetOrElse(false)) {
237255
pipeline->Add(NDqs::CreateDqsRewritePhyBlockReadOnDqIntegrationTransformer(*pipeline->GetTypeAnnotationContext()), "ReplaceWideReadsWithBlock");
238256
}
@@ -255,10 +273,16 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator {
255273
pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
256274
}
257275

258-
void AfterOptimize(TTransformationPipeline*) const final {}
276+
void AfterOptimize(TTransformationPipeline* pipeline) const final {
277+
std::for_each(UniqIntegrations_.cbegin(), UniqIntegrations_.cend(), [&](const auto dqInt) {
278+
dqInt->ConfigurePeepholePipeline(false, ProviderParams_, pipeline);
279+
});
280+
}
259281

260282
private:
261283
TDqStatePtr State_;
284+
THashMap<TString, TString> ProviderParams_;
285+
std::unordered_set<IDqIntegration*> UniqIntegrations_;
262286
};
263287

264288
TExprNode::TPtr DqMarkBlockStage(const TDqPhyStage& stage, TExprContext& ctx) {
@@ -792,9 +816,16 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
792816
try {
793817
auto result = TMaybeNode<TResult>(input).Cast();
794818

819+
THashMap<TString, TString> resSettings;
820+
for (auto s: result.Settings()) {
821+
if (auto val = s.Value().Maybe<TCoAtom>()) {
822+
resSettings.emplace(s.Name().Value(), val.Cast().Value());
823+
}
824+
}
825+
795826
auto precomputes = FindIndependentPrecomputes(result.Input().Ptr());
796827
if (!precomputes.empty()) {
797-
auto status = HandlePrecomputes(precomputes, ctx);
828+
auto status = HandlePrecomputes(precomputes, ctx, resSettings);
798829
if (status.Level != TStatus::Ok) {
799830
if (status == TStatus::Async) {
800831
return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
@@ -813,10 +844,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
813844
settings->_AllResultsBytesLimit = 64_MB;
814845
}
815846

847+
int level;
848+
TExprNode::TPtr resInput = WrapLambdaBody(level, result.Input().Ptr(), ctx);
849+
{
850+
auto block = MeasureBlock("PeepHole");
851+
if (const auto status = PeepHole(resInput, resInput, ctx, resSettings); status.Level != TStatus::Ok) {
852+
return SyncStatus(status);
853+
}
854+
}
855+
816856
THashMap<TString, TString> secureParams;
817-
NCommon::FillSecureParams(result.Input().Ptr(), *State->TypeCtx, secureParams);
857+
NCommon::FillSecureParams(resInput, *State->TypeCtx, secureParams);
818858

819-
auto graphParams = GatherGraphParams(result.Input().Ptr());
859+
auto graphParams = GatherGraphParams(resInput);
820860
bool hasGraphParams = !graphParams.empty();
821861

822862
TString type;
@@ -847,15 +887,6 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
847887
settings->EnableFullResultWrite = enableFullResultWrite;
848888
}
849889

850-
int level;
851-
TExprNode::TPtr resInput = WrapLambdaBody(level, result.Input().Ptr(), ctx);
852-
{
853-
auto block = MeasureBlock("PeepHole");
854-
if (const auto status = PeepHole(resInput, resInput, ctx); status.Level != TStatus::Ok) {
855-
return SyncStatus(status);
856-
}
857-
}
858-
859890
TString lambda;
860891
bool untrustedUdfFlag;
861892
TUploadList uploadList;
@@ -1111,6 +1142,13 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
11111142
TInstant startTime = TInstant::Now();
11121143
auto pull = TPull(input);
11131144

1145+
THashMap<TString, TString> pullSettings;
1146+
for (auto s: pull.Settings()) {
1147+
if (auto val = s.Value().Maybe<TCoAtom>()) {
1148+
pullSettings.emplace(s.Name().Value(), val.Cast().Value());
1149+
}
1150+
}
1151+
11141152
YQL_ENSURE(!TMaybeNode<TDqQuery>(pull.Input().Ptr()) || State->Settings->EnableComputeActor.Get().GetOrElse(false),
11151153
"DqQuery is not supported with worker actor");
11161154

@@ -1120,7 +1158,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
11201158

11211159
auto precomputes = FindIndependentPrecomputes(pull.Input().Ptr());
11221160
if (!precomputes.empty()) {
1123-
auto status = HandlePrecomputes(precomputes, ctx);
1161+
auto status = HandlePrecomputes(precomputes, ctx, pullSettings);
11241162
if (status.Level != TStatus::Ok) {
11251163
if (status == TStatus::Async) {
11261164
return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
@@ -1145,7 +1183,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
11451183
optimizedInput->SetTypeAnn(pull.Input().Ref().GetTypeAnn());
11461184
optimizedInput->CopyConstraints(pull.Input().Ref());
11471185

1148-
auto status = PeepHole(optimizedInput, optimizedInput, ctx);
1186+
auto status = PeepHole(optimizedInput, optimizedInput, ctx, pullSettings);
11491187
if (status.Level != TStatus::Ok) {
11501188
return SyncStatus(status);
11511189
}
@@ -1593,7 +1631,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
15931631
});
15941632
}
15951633

1596-
IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx) {
1634+
IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx, const THashMap<TString, TString>& providerParams) {
15971635

15981636
IDataProvider::TFillSettings fillSettings;
15991637
fillSettings.AllResultsBytesLimit.Clear();
@@ -1620,7 +1658,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
16201658

16211659
auto optimizedInput = input;
16221660
optimizedInput->SetState(TExprNode::EState::ConstrComplete);
1623-
auto status = PeepHole(optimizedInput, optimizedInput, ctx);
1661+
auto status = PeepHole(optimizedInput, optimizedInput, ctx, providerParams);
16241662
if (status.Level != TStatus::Ok) {
16251663
return combinedStatus.Combine(status);
16261664
}
@@ -1840,8 +1878,8 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
18401878
return combinedStatus;
18411879
}
18421880

1843-
IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) const {
1844-
TDqsPipelineConfigurator peepholeConfig(State);
1881+
IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx, const THashMap<TString, TString>& providerParams) const {
1882+
TDqsPipelineConfigurator peepholeConfig(State, providerParams);
18451883
TDqsFinalPipelineConfigurator finalPeepholeConfg;
18461884
TPeepholeSettings peepholeSettings;
18471885
peepholeSettings.CommonConfig = &peepholeConfig;

0 commit comments

Comments
 (0)