Skip to content

Commit 24479f4

Browse files
authored
Moved optimizers from providers/dq to dq (#1758)
1 parent 46c8dba commit 24479f4

13 files changed

+420
-359
lines changed

ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json

+26
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,32 @@
257257
{"Index": 0, "Name": "Input", "Type": "TExprBase"},
258258
{"Index": 1, "Name": "Name", "Type": "TCoAtom"}
259259
]
260+
},
261+
{
262+
"Name": "TDqReadWrapBase",
263+
"Base": "TExprBase",
264+
"Match": {"Type": "CallableBase"},
265+
"Builder": {"Generate": "None"},
266+
"Children": [
267+
{"Index": 0, "Name": "Input", "Type": "TExprBase"},
268+
{"Index": 1, "Name": "Flags", "Type": "TCoAtomList"},
269+
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam", "Optional": true}
270+
]
271+
},
272+
{
273+
"Name": "TDqReadWrap",
274+
"Base": "TDqReadWrapBase",
275+
"Match": {"Type": "Callable", "Name": "DqReadWrap"}
276+
},
277+
{
278+
"Name": "TDqReadWideWrap",
279+
"Base": "TDqReadWrapBase",
280+
"Match": {"Type": "Callable", "Name": "DqReadWideWrap"}
281+
},
282+
{
283+
"Name": "TDqReadBlockWideWrap",
284+
"Base": "TDqReadWrapBase",
285+
"Match": {"Type": "Callable", "Name": "DqReadBlockWideWrap"}
260286
}
261287
]
262288
}

ydb/library/yql/dq/opt/dq_opt_log.cpp

+318
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <ydb/library/yql/core/yql_opt_utils.h>
1111
#include <ydb/library/yql/core/yql_type_annotation.h>
1212
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
13+
#include <ydb/library/yql/dq/integration/yql_dq_optimization.h>
1314

1415
using namespace NYql::NNodes;
1516

@@ -374,4 +375,321 @@ TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx, TTypeAnnotat
374375
return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx, typeAnnCtx));
375376
}
376377

378+
IDqOptimization* GetDqOptCallback(const TExprBase& providerRead, TTypeAnnotationContext& typeAnnCtx) {
379+
if (providerRead.Ref().ChildrenSize() > 1 && TCoDataSource::Match(providerRead.Ref().Child(1))) {
380+
auto dataSourceName = providerRead.Ref().Child(1)->Child(0)->Content();
381+
auto datasource = typeAnnCtx.DataSourceMap.FindPtr(dataSourceName);
382+
YQL_ENSURE(datasource);
383+
return (*datasource)->GetDqOptimization();
384+
}
385+
return nullptr;
386+
}
387+
388+
TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx) {
389+
const auto unordered = node.Cast<TCoUnorderedBase>();
390+
if (const auto maybeRead = unordered.Input().Maybe<TDqReadWrapBase>().Input()) {
391+
if (enableDqReplicate) {
392+
const TParentsMap* parentsMap = getParents();
393+
auto parentsIt = parentsMap->find(unordered.Input().Raw());
394+
YQL_ENSURE(parentsIt != parentsMap->cend());
395+
if (parentsIt->second.size() > 1) {
396+
return node;
397+
}
398+
}
399+
auto providerRead = maybeRead.Cast();
400+
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
401+
auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx);
402+
if (!updatedRead) {
403+
return {};
404+
}
405+
if (updatedRead != providerRead.Ptr()) {
406+
return TExprBase(ctx.ChangeChild(unordered.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
407+
}
408+
}
409+
}
410+
411+
return node;
412+
}
413+
414+
TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx) {
415+
auto extract = node.Cast<TCoExtractMembers>();
416+
if (const auto maybeRead = extract.Input().Maybe<TDqReadWrap>().Input()) {
417+
if (enableDqReplicate) {
418+
const TParentsMap* parentsMap = getParents();
419+
auto parentsIt = parentsMap->find(extract.Input().Raw());
420+
YQL_ENSURE(parentsIt != parentsMap->cend());
421+
if (parentsIt->second.size() > 1) {
422+
return node;
423+
}
424+
}
425+
auto providerRead = maybeRead.Cast();
426+
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
427+
auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), extract.Members().Ptr(), ctx);
428+
if (!updatedRead) {
429+
return {};
430+
}
431+
if (updatedRead != providerRead.Ptr()) {
432+
return TExprBase(ctx.ChangeChild(extract.Input().Ref(), TDqReadWrap::idx_Input, std::move(updatedRead)));
433+
}
434+
}
435+
}
436+
437+
return node;
438+
}
439+
440+
TMaybeNode<TExprBase> TakeOrSkipOverDqReadWrap(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
441+
auto countBase = node.Cast<TCoCountBase>();
442+
443+
// TODO: support via precomputes
444+
if (!TCoIntegralCtor::Match(countBase.Count().Raw())) {
445+
return node;
446+
}
447+
448+
if (const auto maybeRead = countBase.Input().Maybe<TDqReadWrapBase>().Input()) {
449+
auto providerRead = maybeRead.Cast();
450+
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
451+
auto updatedRead = dqOpt->ApplyTakeOrSkip(providerRead.Ptr(), countBase.Ptr(), ctx);
452+
if (!updatedRead) {
453+
return {};
454+
}
455+
if (updatedRead != providerRead.Ptr()) {
456+
return TExprBase(ctx.ChangeChild(countBase.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
457+
}
458+
}
459+
}
460+
461+
return node;
462+
}
463+
464+
TMaybeNode<TExprBase> ExtendOverDqReadWrap(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
465+
auto extend = node.Cast<TCoExtendBase>();
466+
const bool ordered = node.Maybe<TCoOrderedExtend>().IsValid();
467+
const TExprNode* flags = nullptr;
468+
const TExprNode* token = nullptr;
469+
bool first = true;
470+
std::unordered_map<IDqOptimization*, std::vector<std::pair<size_t, TExprNode::TPtr>>> readers;
471+
IDqOptimization* prevDqOpt = nullptr;
472+
for (size_t i = 0; i < extend.ArgCount(); ++i) {
473+
const auto child = extend.Arg(i);
474+
if (!TDqReadWrapBase::Match(child.Raw())) {
475+
prevDqOpt = nullptr;
476+
continue;
477+
}
478+
auto dqReadWrap = child.Cast<TDqReadWrapBase>();
479+
480+
if (first) {
481+
flags = dqReadWrap.Flags().Raw();
482+
token = dqReadWrap.Token().Raw();
483+
first = false;
484+
} else if (flags != dqReadWrap.Flags().Raw() || token != dqReadWrap.Token().Raw()) {
485+
prevDqOpt = nullptr;
486+
continue;
487+
}
488+
IDqOptimization* dqOpt = GetDqOptCallback(dqReadWrap.Input(), typeAnnCtx);
489+
if (!dqOpt) {
490+
prevDqOpt = nullptr;
491+
continue;
492+
}
493+
if (ordered && prevDqOpt != dqOpt) {
494+
readers[dqOpt].assign(1, std::make_pair(i, dqReadWrap.Input().Ptr()));
495+
} else {
496+
readers[dqOpt].emplace_back(i, dqReadWrap.Input().Ptr());
497+
}
498+
prevDqOpt = dqOpt;
499+
}
500+
501+
if (readers.empty() || AllOf(readers, [](const auto& item) { return item.second.size() < 2; })) {
502+
return node;
503+
}
504+
505+
TExprNode::TListType newChildren = extend.Ref().ChildrenList();
506+
for (auto& [dqOpt, list]: readers) {
507+
if (list.size() > 1) {
508+
TExprNode::TListType inReaders;
509+
std::transform(list.begin(), list.end(), std::back_inserter(inReaders), [](const auto& item) { return item.second; });
510+
TExprNode::TListType outReaders = dqOpt->ApplyExtend(inReaders, ordered, ctx);
511+
if (outReaders.empty()) {
512+
return {};
513+
}
514+
if (inReaders == outReaders) {
515+
return node;
516+
}
517+
YQL_ENSURE(outReaders.size() <= inReaders.size());
518+
size_t i = 0;
519+
for (; i < outReaders.size(); ++i) {
520+
newChildren[list[i].first] = ctx.ChangeChild(*newChildren[list[i].first], TDqReadWrapBase::idx_Input, std::move(outReaders[i]));
521+
}
522+
for (; i < list.size(); ++i) {
523+
newChildren[list[i].first] = nullptr;
524+
}
525+
}
526+
}
527+
newChildren.erase(std::remove(newChildren.begin(), newChildren.end(), TExprNode::TPtr{}), newChildren.end());
528+
YQL_ENSURE(!newChildren.empty());
529+
if (newChildren.size() > 1) {
530+
return TExprBase(ctx.ChangeChildren(extend.Ref(), std::move(newChildren)));
531+
} else {
532+
return TExprBase(newChildren.front());
533+
}
534+
}
535+
536+
TMaybeNode<TExprBase> DqReadWideWrapFieldSubset(TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx) {
537+
auto map = node.Cast<TCoMapBase>();
538+
539+
if (const auto maybeRead = map.Input().Maybe<TDqReadWideWrap>().Input()) {
540+
const TParentsMap* parentsMap = getParents();
541+
auto parentsIt = parentsMap->find(map.Input().Raw());
542+
YQL_ENSURE(parentsIt != parentsMap->cend());
543+
if (parentsIt->second.size() > 1) {
544+
return node;
545+
}
546+
547+
TDynBitMap unusedArgs;
548+
for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) {
549+
if (auto parentsIt = parentsMap->find(map.Lambda().Args().Arg(i).Raw()); parentsIt == parentsMap->cend() || parentsIt->second.empty()) {
550+
unusedArgs.Set(i);
551+
}
552+
}
553+
if (unusedArgs.Empty()) {
554+
return node;
555+
}
556+
557+
auto providerRead = maybeRead.Cast();
558+
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
559+
560+
auto structType = GetSeqItemType(*providerRead.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1]).Cast<TStructExprType>();
561+
TExprNode::TListType newMembers;
562+
for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) {
563+
if (!unusedArgs.Get(i)) {
564+
newMembers.push_back(ctx.NewAtom(providerRead.Pos(), structType->GetItems().at(i)->GetName()));
565+
}
566+
}
567+
568+
auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), ctx.NewList(providerRead.Pos(), std::move(newMembers)), ctx);
569+
if (!updatedRead) {
570+
return {};
571+
}
572+
if (updatedRead == providerRead.Ptr()) {
573+
return node;
574+
}
575+
576+
TExprNode::TListType newArgs;
577+
TNodeOnNodeOwnedMap replaces;
578+
for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) {
579+
if (!unusedArgs.Get(i)) {
580+
auto newArg = ctx.NewArgument(map.Lambda().Args().Arg(i).Pos(), map.Lambda().Args().Arg(i).Name());
581+
newArgs.push_back(newArg);
582+
replaces.emplace(map.Lambda().Args().Arg(i).Raw(), std::move(newArg));
583+
}
584+
}
585+
586+
auto newLambda = ctx.NewLambda(
587+
map.Lambda().Pos(),
588+
ctx.NewArguments(map.Lambda().Args().Pos(), std::move(newArgs)),
589+
ctx.ReplaceNodes(GetLambdaBody(map.Lambda().Ref()), replaces));
590+
591+
return Build<TCoMapBase>(ctx, map.Pos())
592+
.CallableName(map.CallableName())
593+
.Input<TDqReadWideWrap>()
594+
.InitFrom(map.Input().Cast<TDqReadWideWrap>())
595+
.Input(updatedRead)
596+
.Build()
597+
.Lambda(newLambda)
598+
.Done();
599+
}
600+
}
601+
return node;
602+
}
603+
604+
TMaybeNode<TExprBase> DqReadWrapByProvider(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
605+
auto providerRead = node.Cast<TDqReadWrapBase>().Input();
606+
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
607+
auto updatedRead = dqOpt->RewriteRead(providerRead.Ptr(), ctx);
608+
if (!updatedRead) {
609+
return {};
610+
}
611+
if (updatedRead != providerRead.Ptr()) {
612+
return TExprBase(ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
613+
}
614+
}
615+
return node;
616+
}
617+
618+
TMaybeNode<TExprBase> ExtractMembersOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx) {
619+
auto providerRead = node.Cast<TDqReadWrap>().Input();
620+
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
621+
TNodeOnNodeOwnedMap toOptimize;
622+
TExprNode::TPtr res;
623+
bool error = false;
624+
OptimizeSubsetFieldsForNodeWithMultiUsage(node.Ptr(), *getParents(), toOptimize, ctx,
625+
[&] (const TExprNode::TPtr& input, const TExprNode::TPtr& members, const TParentsMap&, TExprContext& ctx) -> TExprNode::TPtr {
626+
auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), members, ctx);
627+
if (!updatedRead) {
628+
error = true;
629+
return {};
630+
}
631+
if (updatedRead != providerRead.Ptr()) {
632+
res = ctx.ChangeChild(node.Ref(), TDqReadWrap::idx_Input, std::move(updatedRead));
633+
return res;
634+
}
635+
636+
return input;
637+
}
638+
);
639+
if (error) {
640+
return {};
641+
}
642+
if (!toOptimize.empty()) {
643+
for (auto& [s, d]: toOptimize) {
644+
optCtx.RemapNode(*s, d);
645+
}
646+
return TExprBase(res);
647+
}
648+
}
649+
650+
return node;
651+
}
652+
653+
TMaybeNode<TExprBase> UnorderedOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx) {
654+
auto providerRead = node.Cast<TDqReadWrapBase>().Input();
655+
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
656+
auto parentsMap = getParents();
657+
auto it = parentsMap->find(node.Raw());
658+
if (it == parentsMap->cend() || it->second.size() <= 1) {
659+
return node;
660+
}
661+
662+
bool hasUnordered = false;
663+
for (auto parent: it->second) {
664+
if (TCoUnorderedBase::Match(parent)) {
665+
hasUnordered = true;
666+
} else if (!TCoAggregateBase::Match(parent) && !TCoFlatMap::Match(parent)) {
667+
return node;
668+
}
669+
}
670+
671+
if (!hasUnordered) {
672+
return node;
673+
}
674+
675+
auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx);
676+
if (!updatedRead) {
677+
return {};
678+
}
679+
if (updatedRead != providerRead.Ptr()) {
680+
auto newDqReadWrap = ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead));
681+
for (auto parent: it->second) {
682+
if (TCoUnorderedBase::Match(parent)) {
683+
optCtx.RemapNode(*parent, newDqReadWrap);
684+
} else if (TCoAggregateBase::Match(parent) || TCoFlatMap::Match(parent)) {
685+
optCtx.RemapNode(*parent, ctx.ChangeChild(*parent, 0, TExprNode::TPtr(newDqReadWrap)));
686+
}
687+
}
688+
689+
return TExprBase(newDqReadWrap);
690+
}
691+
}
692+
return node;
693+
}
694+
377695
}

ydb/library/yql/dq/opt/dq_opt_log.h

+17
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <functional>
1010

1111
namespace NYql {
12+
class IOptimizationContext;
1213
struct TTypeAnnotationContext;
1314
struct TDqSettings;
1415
struct IProviderContext;
@@ -58,4 +59,20 @@ NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& c
5859

5960
IOptimizer* MakeNativeOptimizer(const IOptimizer::TInput& input, const std::function<void(const TString&)>& log);
6061

62+
NNodes::TMaybeNode<NNodes::TExprBase> UnorderedOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx);
63+
64+
NNodes::TMaybeNode<NNodes::TExprBase> ExtractMembersOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx);
65+
66+
NNodes::TMaybeNode<NNodes::TExprBase> TakeOrSkipOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);
67+
68+
NNodes::TMaybeNode<NNodes::TExprBase> ExtendOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);
69+
70+
NNodes::TMaybeNode<NNodes::TExprBase> DqReadWideWrapFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx);
71+
72+
NNodes::TMaybeNode<NNodes::TExprBase> DqReadWrapByProvider(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);
73+
74+
NNodes::TMaybeNode<NNodes::TExprBase> ExtractMembersOverDqReadWrapMultiUsage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx);
75+
76+
NNodes::TMaybeNode<NNodes::TExprBase> UnorderedOverDqReadWrapMultiUsage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx);
77+
6178
} // namespace NYql::NDq

0 commit comments

Comments
 (0)