Skip to content

[dq] Peephole integration + some transform helpers (YQL-17386) #572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions ydb/library/yql/core/yql_graph_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,30 +264,63 @@ class TNullTransformer final: public TSyncTransformerBase {
};

template <typename TFunctor>
class TFunctorTransformer final: public TSyncTransformerBase {
class TFunctorTransformer: public TSyncTransformerBase {
public:
TFunctorTransformer(TFunctor functor)
: Functor_(std::move(functor)) {}

TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
TStatus status = Functor_(input, output, ctx);
YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Async);

return status;
}

void Rewind() final {
void Rewind() override {
}

private:
TFunctor Functor_;
};

template <typename TFunctor>
class TSinglePassFunctorTransformer final: public TFunctorTransformer<TFunctor> {
using TBase = TFunctorTransformer<TFunctor>;
public:
TSinglePassFunctorTransformer(TFunctor functor)
: TFunctorTransformer<TFunctor>(std::move(functor))
{}

IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
if (Pass_) {
output = input;
return IGraphTransformer::TStatus::Ok;
}
IGraphTransformer::TStatus status = TBase::DoTransform(input, output, ctx);
if (IGraphTransformer::TStatus::Ok == status.Level) {
Pass_ = true;
}
return status;
}

void Rewind() final {
Pass_ = false;
}

private:
bool Pass_ = false;
};

template <typename TFunctor>
THolder<IGraphTransformer> CreateFunctorTransformer(TFunctor functor) {
return MakeHolder<TFunctorTransformer<TFunctor>>(std::move(functor));
}

template <typename TFunctor>
THolder<IGraphTransformer> CreateSinglePassFunctorTransformer(TFunctor functor) {
return MakeHolder<TSinglePassFunctorTransformer<TFunctor>>(std::move(functor));
}

typedef std::function<IGraphTransformer::TStatus(const TExprNode::TPtr&, TExprNode::TPtr&, TExprContext&)> TAsyncTransformCallback;
typedef NThreading::TFuture<TAsyncTransformCallback> TAsyncTransformCallbackFuture;

Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/dq/integration/yql_dq_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class TJsonValue;
namespace NYql {

struct TDqSettings;
class TTransformationPipeline;

namespace NCommon {
class TMkqlCallableCompilerBase;
Expand Down Expand Up @@ -72,6 +73,8 @@ class IDqIntegration {
// Return true if node was handled
virtual bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
// Called to configure DQ peephole
virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0;
};

} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,7 @@ bool TDqIntegrationBase::FillSinkPlanProperties(const NNodes::TExprBase&, TMap<T
return false;
}

void TDqIntegrationBase::ConfigurePeepholePipeline(bool, const THashMap<TString, TString>&, TTransformationPipeline*) {
}

} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class TDqIntegrationBase: public IDqIntegration {
void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) override;
bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) override;

protected:
bool CanBlockReadTypes(const TStructExprType* node);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,33 @@ struct TPublicIds {

struct TDqsPipelineConfigurator : public IPipelineConfigurator {
public:
TDqsPipelineConfigurator(const TDqStatePtr& state)
TDqsPipelineConfigurator(const TDqStatePtr& state, const THashMap<TString, TString>& providerParams)
: State_(state)
, ProviderParams_(providerParams)
{
for (const auto& ds: State_->TypeCtx->DataSources) {
if (const auto dq = ds->GetDqIntegration()) {
UniqIntegrations_.emplace(dq);
}
}
for (const auto& ds: State_->TypeCtx->DataSinks) {
if (const auto dq = ds->GetDqIntegration()) {
UniqIntegrations_.emplace(dq);
}
}
}
private:
void AfterCreate(TTransformationPipeline*) const final {}

void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
// First truncate graph by calculated precomputes
pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(*pipeline->GetTypeAnnotationContext(), State_->FunctionRegistry), "ReplacePrecomputes");

// Then apply provider specific transformers on truncated graph
std::for_each(UniqIntegrations_.cbegin(), UniqIntegrations_.cend(), [&](const auto dqInt) {
dqInt->ConfigurePeepholePipeline(true, ProviderParams_, pipeline);
});

if (State_->Settings->UseBlockReader.Get().GetOrElse(false)) {
pipeline->Add(NDqs::CreateDqsRewritePhyBlockReadOnDqIntegrationTransformer(*pipeline->GetTypeAnnotationContext()), "ReplaceWideReadsWithBlock");
}
Expand All @@ -255,10 +273,16 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator {
pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
}

void AfterOptimize(TTransformationPipeline*) const final {}
void AfterOptimize(TTransformationPipeline* pipeline) const final {
std::for_each(UniqIntegrations_.cbegin(), UniqIntegrations_.cend(), [&](const auto dqInt) {
dqInt->ConfigurePeepholePipeline(false, ProviderParams_, pipeline);
});
}

private:
TDqStatePtr State_;
THashMap<TString, TString> ProviderParams_;
std::unordered_set<IDqIntegration*> UniqIntegrations_;
};

TExprNode::TPtr DqMarkBlockStage(const TDqPhyStage& stage, TExprContext& ctx) {
Expand Down Expand Up @@ -792,9 +816,16 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
try {
auto result = TMaybeNode<TResult>(input).Cast();

THashMap<TString, TString> resSettings;
for (auto s: result.Settings()) {
if (auto val = s.Value().Maybe<TCoAtom>()) {
resSettings.emplace(s.Name().Value(), val.Cast().Value());
}
}

auto precomputes = FindIndependentPrecomputes(result.Input().Ptr());
if (!precomputes.empty()) {
auto status = HandlePrecomputes(precomputes, ctx);
auto status = HandlePrecomputes(precomputes, ctx, resSettings);
if (status.Level != TStatus::Ok) {
if (status == TStatus::Async) {
return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
Expand All @@ -813,10 +844,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
settings->_AllResultsBytesLimit = 64_MB;
}

int level;
TExprNode::TPtr resInput = WrapLambdaBody(level, result.Input().Ptr(), ctx);
{
auto block = MeasureBlock("PeepHole");
if (const auto status = PeepHole(resInput, resInput, ctx, resSettings); status.Level != TStatus::Ok) {
return SyncStatus(status);
}
}

THashMap<TString, TString> secureParams;
NCommon::FillSecureParams(result.Input().Ptr(), *State->TypeCtx, secureParams);
NCommon::FillSecureParams(resInput, *State->TypeCtx, secureParams);

auto graphParams = GatherGraphParams(result.Input().Ptr());
auto graphParams = GatherGraphParams(resInput);
bool hasGraphParams = !graphParams.empty();

TString type;
Expand Down Expand Up @@ -847,15 +887,6 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
settings->EnableFullResultWrite = enableFullResultWrite;
}

int level;
TExprNode::TPtr resInput = WrapLambdaBody(level, result.Input().Ptr(), ctx);
{
auto block = MeasureBlock("PeepHole");
if (const auto status = PeepHole(resInput, resInput, ctx); status.Level != TStatus::Ok) {
return SyncStatus(status);
}
}

TString lambda;
bool untrustedUdfFlag;
TUploadList uploadList;
Expand Down Expand Up @@ -1111,6 +1142,13 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
TInstant startTime = TInstant::Now();
auto pull = TPull(input);

THashMap<TString, TString> pullSettings;
for (auto s: pull.Settings()) {
if (auto val = s.Value().Maybe<TCoAtom>()) {
pullSettings.emplace(s.Name().Value(), val.Cast().Value());
}
}

YQL_ENSURE(!TMaybeNode<TDqQuery>(pull.Input().Ptr()) || State->Settings->EnableComputeActor.Get().GetOrElse(false),
"DqQuery is not supported with worker actor");

Expand All @@ -1120,7 +1158,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters

auto precomputes = FindIndependentPrecomputes(pull.Input().Ptr());
if (!precomputes.empty()) {
auto status = HandlePrecomputes(precomputes, ctx);
auto status = HandlePrecomputes(precomputes, ctx, pullSettings);
if (status.Level != TStatus::Ok) {
if (status == TStatus::Async) {
return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
Expand All @@ -1145,7 +1183,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
optimizedInput->SetTypeAnn(pull.Input().Ref().GetTypeAnn());
optimizedInput->CopyConstraints(pull.Input().Ref());

auto status = PeepHole(optimizedInput, optimizedInput, ctx);
auto status = PeepHole(optimizedInput, optimizedInput, ctx, pullSettings);
if (status.Level != TStatus::Ok) {
return SyncStatus(status);
}
Expand Down Expand Up @@ -1593,7 +1631,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
});
}

IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx) {
IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx, const THashMap<TString, TString>& providerParams) {

IDataProvider::TFillSettings fillSettings;
fillSettings.AllResultsBytesLimit.Clear();
Expand All @@ -1620,7 +1658,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters

auto optimizedInput = input;
optimizedInput->SetState(TExprNode::EState::ConstrComplete);
auto status = PeepHole(optimizedInput, optimizedInput, ctx);
auto status = PeepHole(optimizedInput, optimizedInput, ctx, providerParams);
if (status.Level != TStatus::Ok) {
return combinedStatus.Combine(status);
}
Expand Down Expand Up @@ -1840,8 +1878,8 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
return combinedStatus;
}

IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) const {
TDqsPipelineConfigurator peepholeConfig(State);
IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx, const THashMap<TString, TString>& providerParams) const {
TDqsPipelineConfigurator peepholeConfig(State, providerParams);
TDqsFinalPipelineConfigurator finalPeepholeConfg;
TPeepholeSettings peepholeSettings;
peepholeSettings.CommonConfig = &peepholeConfig;
Expand Down