Skip to content

YQL-18053: Add block implementation for AsStruct callable #3485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5570,7 +5570,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
std::string_view arrowFunctionName;
const bool rewriteAsIs = node->IsCallable({"AssumeStrict", "AssumeNonStrict", "Likely"});
if (node->IsList() || rewriteAsIs ||
node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "Exists", "If", "Just", "Member", "Nth", "ToPg", "FromPg", "PgResolvedCall", "PgResolvedOp"}))
node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "Exists", "If", "Just", "AsStruct", "Member", "Nth", "ToPg", "FromPg", "PgResolvedCall", "PgResolvedOp"}))
{
if (node->IsCallable() && !IsSupportedAsBlockType(node->Pos(), *node->GetTypeAnn(), ctx, types)) {
return true;
Expand Down Expand Up @@ -5609,6 +5609,29 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
}
}

// <AsStruct> arguments (i.e. members of the resulting structure)
// are literal tuples, that don't propagate their child rewrites.
// Hence, process these rewrites the following way: wrap the
// complete expressions, supported by the block engine, with
// <AsScalar> callable, or apply the rewrite if one is found.
// Otherwise, abort this <AsStruct> rewrite, since one of its
// arguments is neither block nor scalar.
if (node->IsCallable("AsStruct")) {
for (ui32 index = 0; index < node->ChildrenSize(); index++) {
auto member = funcArgs[index];
auto child = member->TailPtr();
TExprNodePtr rewrite;
if (child->IsComplete() && IsSupportedAsBlockType(child->Pos(), *child->GetTypeAnn(), ctx, types)) {
rewrite = ctx.NewCallable(child->Pos(), "AsScalar", { child });
} else if (auto rit = rewrites.find(child.Get()); rit != rewrites.end()) {
rewrite = rit->second;
} else {
return true;
}
funcArgs[index] = ctx.NewList(member->Pos(), {member->HeadPtr(), rewrite});
}
}

const TString blockFuncName = rewriteAsIs ? ToString(node->Content()) :
(TString("Block") + (node->IsList() ? "AsTuple" : node->Content()));
if (node->IsCallable({"And", "Or", "Xor"}) && funcArgs.size() > 2) {
Expand Down
51 changes: 51 additions & 0 deletions ydb/library/yql/core/type_ann/type_ann_blocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,57 @@ IGraphTransformer::TStatus BlockJustWrapper(const TExprNode::TPtr& input, TExprN
return IGraphTransformer::TStatus::Ok;
}

// Type annotation for the BlockAsStruct callable.
//
// Every argument must be a two-element tuple (name, value): the name is an
// atom and the value has to pass EnsureBlockOrScalarType. The item types of
// the values become the members of the resulting struct type. The node is
// annotated as a scalar of that struct when every value is a scalar, and as
// a block of that struct otherwise.
//
// When the arguments are not ordered by member name, a copy of the node with
// sorted children is stored into `output` and Repeat is returned, so that the
// annotation runs again on the reordered node.
IGraphTransformer::TStatus BlockAsStructWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
    if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
        return IGraphTransformer::TStatus::Error;
    }

    TVector<const TItemExprType*> members;
    bool onlyScalars = true;
    for (auto& child : input->Children()) {
        // Validate the member shape before touching its children: without
        // this check a malformed member would be reported as an out-of-range
        // child access rather than as a regular type annotation error.
        if (!EnsureTupleSize(*child, 2, ctx.Expr)) {
            return IGraphTransformer::TStatus::Error;
        }

        auto nameNode = child->Child(0);
        if (!EnsureAtom(*nameNode, ctx.Expr)) {
            return IGraphTransformer::TStatus::Error;
        }
        auto valueNode = child->Child(1);
        if (!EnsureBlockOrScalarType(*valueNode, ctx.Expr)) {
            return IGraphTransformer::TStatus::Error;
        }

        // GetBlockItemType reports through isScalar whether the value is a scalar.
        bool isScalar = false;
        const TTypeAnnotationNode* blockItemType = GetBlockItemType(*valueNode->GetTypeAnn(), isScalar);

        onlyScalars = onlyScalars && isScalar;
        members.push_back(ctx.Expr.MakeType<TItemExprType>(nameNode->Content(), blockItemType));
    }

    auto structType = ctx.Expr.MakeType<TStructExprType>(members);
    if (!structType->Validate(input->Pos(), ctx.Expr)) {
        return IGraphTransformer::TStatus::Error;
    }

    // Members are compared by the content of their name atom.
    auto less = [](const TExprNode::TPtr& left, const TExprNode::TPtr& right) {
        return left->Head().Content() < right->Head().Content();
    };

    if (!IsSorted(input->Children().begin(), input->Children().end(), less)) {
        // Normalize the argument order and ask for another annotation pass.
        auto list = input->ChildrenList();
        Sort(list.begin(), list.end(), less);
        output = ctx.Expr.ChangeChildren(*input, std::move(list));
        return IGraphTransformer::TStatus::Repeat;
    }

    const TTypeAnnotationNode* resultType;
    if (onlyScalars) {
        resultType = ctx.Expr.MakeType<TScalarExprType>(structType);
    } else {
        resultType = ctx.Expr.MakeType<TBlockExprType>(structType);
    }
    input->SetTypeAnn(resultType);
    return IGraphTransformer::TStatus::Ok;
}

IGraphTransformer::TStatus BlockAsTupleWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/type_ann/type_ann_blocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus BlockLogicalWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockIfWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockJustWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockAsStructWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockAsTupleWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockNthWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockMemberWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/type_ann/type_ann_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12286,6 +12286,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["BlockNot"] = &BlockLogicalWrapper;
Functions["BlockIf"] = &BlockIfWrapper;
Functions["BlockJust"] = &BlockJustWrapper;
Functions["BlockAsStruct"] = &BlockAsStructWrapper;
Functions["BlockAsTuple"] = &BlockAsTupleWrapper;
Functions["BlockMember"] = &BlockMemberWrapper;
Functions["BlockNth"] = &BlockNthWrapper;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/yql_expr_constraint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
Functions["Limit"] = &TCallableConstraintTransformer::TakeWrap;
Functions["Member"] = &TCallableConstraintTransformer::MemberWrap;
Functions["AsStruct"] = &TCallableConstraintTransformer::AsStructWrap;
Functions["BlockAsStruct"] = &TCallableConstraintTransformer::AsStructWrap;
Functions["Just"] = &TCallableConstraintTransformer::FromFirst<TPassthroughConstraintNode, TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode, TPartOfSortedConstraintNode, TPartOfChoppedConstraintNode, TVarIndexConstraintNode, TMultiConstraintNode>;
Functions["Unwrap"] = &TCallableConstraintTransformer::FromFirst<TPassthroughConstraintNode, TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode, TPartOfSortedConstraintNode, TPartOfChoppedConstraintNode, TVarIndexConstraintNode, TMultiConstraintNode>;
Functions["Ensure"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "mkql_block_tuple.h"
#include "mkql_block_container.h"

#include <ydb/library/yql/minikql/computation/mkql_block_impl.h>

Expand All @@ -15,9 +15,9 @@ namespace NMiniKQL {

namespace {

class TBlockAsTupleExec {
class TBlockAsContainerExec {
public:
TBlockAsTupleExec(const TVector<TType*>& argTypes, const std::shared_ptr<arrow::DataType>& returnArrowType)
TBlockAsContainerExec(const TVector<TType*>& argTypes, const std::shared_ptr<arrow::DataType>& returnArrowType)
: ArgTypes(argTypes)
, ReturnArrowType(returnArrowType)
{}
Expand Down Expand Up @@ -66,10 +66,10 @@ class TBlockAsTupleExec {
const std::shared_ptr<arrow::DataType> ReturnArrowType;
};

std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockAsTupleKernel(const TVector<TType*>& argTypes, TType* resultType) {
std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockAsContainerKernel(const TVector<TType*>& argTypes, TType* resultType) {
std::shared_ptr<arrow::DataType> returnArrowType;
MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type");
auto exec = std::make_shared<TBlockAsTupleExec>(argTypes, returnArrowType);
auto exec = std::make_shared<TBlockAsContainerExec>(argTypes, returnArrowType);
auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType),
[exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
return exec->Exec(ctx, batch, res);
Expand All @@ -81,17 +81,17 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockAsTupleKernel(const TVect

} // namespace

IComputationNode* WrapBlockAsTuple(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
IComputationNode* WrapBlockAsContainer(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
TComputationNodePtrVector argsNodes;
TVector<TType*> argsTypes;
for (ui32 i = 0; i < callable.GetInputsCount(); ++i) {
argsNodes.push_back(LocateNode(ctx.NodeLocator, callable, i));
argsTypes.push_back(callable.GetInput(i).GetStaticType());
}

auto kernel = MakeBlockAsTupleKernel(argsTypes, callable.GetType()->GetReturnType());
auto kernel = MakeBlockAsContainerKernel(argsTypes, callable.GetType()->GetReturnType());
return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel);
}

}
}
} // namespace NMiniKQL
} // namespace NKikimr
10 changes: 10 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_container.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>

namespace NKikimr {
namespace NMiniKQL {

// Creates the computation node for a block "container" callable.
// The computation node factory registers this single wrapper for both the
// "BlockAsTuple" and the "BlockAsStruct" callables.
IComputationNode* WrapBlockAsContainer(TCallable& callable, const TComputationNodeFactoryContext& ctx);

} // namespace NMiniKQL
} // namespace NKikimr
10 changes: 0 additions & 10 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.h

This file was deleted.

5 changes: 3 additions & 2 deletions ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "mkql_blocks.h"
#include "mkql_block_agg.h"
#include "mkql_block_coalesce.h"
#include "mkql_block_container.h"
#include "mkql_block_exists.h"
#include "mkql_block_getelem.h"
#include "mkql_block_if.h"
Expand All @@ -16,7 +17,6 @@
#include "mkql_block_compress.h"
#include "mkql_block_skiptake.h"
#include "mkql_block_top.h"
#include "mkql_block_tuple.h"
#include "mkql_callable.h"
#include "mkql_chain_map.h"
#include "mkql_chain1_map.h"
Expand Down Expand Up @@ -297,7 +297,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"BlockNot", &WrapBlockNot},
{"BlockJust", &WrapBlockJust},
{"BlockCompress", &WrapBlockCompress},
{"BlockAsTuple", &WrapBlockAsTuple},
{"BlockAsTuple", &WrapBlockAsContainer},
{"BlockAsStruct", &WrapBlockAsContainer},
{"BlockMember", &WrapBlockMember},
{"BlockNth", &WrapBlockNth},
{"BlockExpandChunked", &WrapBlockExpandChunked},
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/minikql/comp_nodes/ya.make.inc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ SET(ORIG_SOURCES
mkql_block_agg_some.cpp
mkql_block_agg_sum.cpp
mkql_block_coalesce.cpp
mkql_block_container.cpp
mkql_block_exists.cpp
mkql_block_getelem.cpp
mkql_block_if.cpp
Expand All @@ -24,7 +25,6 @@ SET(ORIG_SOURCES
mkql_block_func.cpp
mkql_block_skiptake.cpp
mkql_block_top.cpp
mkql_block_tuple.cpp
mkql_blocks.cpp
mkql_callable.cpp
mkql_chain_map.cpp
Expand Down
22 changes: 22 additions & 0 deletions ydb/library/yql/minikql/mkql_program_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,28 @@ TRuntimeNode TProgramBuilder::BlockNth(TRuntimeNode tuple, ui32 index) {
return TRuntimeNode(callableBuilder.Build(), false);
}

// Builds the BlockAsStruct callable out of (member name, member value) pairs.
// Each value is treated as a block-typed node; its item type becomes the type
// of the corresponding struct member. The result has the Many shape as soon
// as at least one value has it, and the Scalar shape otherwise.
TRuntimeNode TProgramBuilder::BlockAsStruct(const TArrayRef<std::pair<std::string_view, TRuntimeNode>>& args) {
    MKQL_ENSURE(!args.empty(), "Expected at least one argument");

    bool hasManyShape = false;
    TVector<std::pair<std::string_view, TType*>> structMembers;
    for (const auto& [memberName, memberNode] : args) {
        const auto memberBlockType = AS_TYPE(TBlockType, memberNode.GetStaticType());
        structMembers.emplace_back(memberName, memberBlockType->GetItemType());
        hasManyShape = hasManyShape || memberBlockType->GetShape() == TBlockType::EShape::Many;
    }

    const TBlockType::EShape shape = hasManyShape
        ? TBlockType::EShape::Many
        : TBlockType::EShape::Scalar;

    auto returnType = NewBlockType(NewStructType(structMembers), shape);

    // The callable is named after this method, and its inputs follow the
    // order of the incoming pairs.
    TCallableBuilder callableBuilder(Env, __func__, returnType);
    for (const auto& [memberName, memberNode] : args) {
        callableBuilder.Add(memberNode);
    }

    return TRuntimeNode(callableBuilder.Build(), false);
}

TRuntimeNode TProgramBuilder::BlockAsTuple(const TArrayRef<const TRuntimeNode>& args) {
MKQL_ENSURE(!args.empty(), "Expected at least one argument");

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/minikql/mkql_program_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ class TProgramBuilder : public TTypeBuilder {
TRuntimeNode BlockExists(TRuntimeNode data);
TRuntimeNode BlockMember(TRuntimeNode structure, const std::string_view& memberName);
TRuntimeNode BlockNth(TRuntimeNode tuple, ui32 index);
TRuntimeNode BlockAsStruct(const TArrayRef<std::pair<std::string_view, TRuntimeNode>>& args);
TRuntimeNode BlockAsTuple(const TArrayRef<const TRuntimeNode>& args);
TRuntimeNode BlockToPg(TRuntimeNode input, TType* returnType);
TRuntimeNode BlockFromPg(TRuntimeNode input, TType* returnType);
Expand Down
8 changes: 8 additions & 0 deletions ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2737,6 +2737,14 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.BlockNth(tupleObj, index);
});

// BlockAsStruct: every child is a (name, value) pair. The name is taken
// from the head of the pair, the value expression is compiled from its tail.
AddCallable("BlockAsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
    std::vector<std::pair<std::string_view, TRuntimeNode>> structMembers;
    for (const auto& pairNode : node.Children()) {
        const auto memberName = pairNode->Head().Content();
        const auto memberValue = MkqlBuildExpr(pairNode->Tail(), ctx);
        structMembers.emplace_back(memberName, memberValue);
    }
    return ctx.ProgramBuilder.BlockAsStruct(structMembers);
});

AddCallable("BlockAsTuple", [](const TExprNode& node, TMkqlBuildContext& ctx) {
TVector<TRuntimeNode> args;
for (const auto& x : node.Children()) {
Expand Down
24 changes: 12 additions & 12 deletions ydb/library/yql/tests/sql/dq_file/part11/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -535,23 +535,23 @@
"test.test[blocks-filter_direct_col--Results]": [],
"test.test[blocks-member--Analyze]": [
{
"checksum": "b08274fd137c1878d90520c832f06fd3",
"size": 3676,
"uri": "https://{canondata_backend}/1889210/75a1d72834c0a9de8b328ec130be934f6cc6cea0/resource.tar.gz#test.test_blocks-member--Analyze_/plan.txt"
"checksum": "4d80733bb5655340645b981057ba9910",
"size": 3703,
"uri": "https://{canondata_backend}/1942525/5635585a917e888e1628404d5ff137a2e18f23ca/resource.tar.gz#test.test_blocks-member--Analyze_/plan.txt"
}
],
"test.test[blocks-member--Debug]": [
{
"checksum": "a30b76ba380ee4f694dc731aa2771fed",
"size": 1467,
"uri": "https://{canondata_backend}/1937027/96028d31f8e29253c9276a86f02284e2a71add76/resource.tar.gz#test.test_blocks-member--Debug_/opt.yql_patched"
"checksum": "be5b35d7624905acc850940a1ff74780",
"size": 1837,
"uri": "https://{canondata_backend}/1775319/6624c18402d2e5473f3dcf5d9248a5e624496fd5/resource.tar.gz#test.test_blocks-member--Debug_/opt.yql_patched"
}
],
"test.test[blocks-member--Plan]": [
{
"checksum": "b08274fd137c1878d90520c832f06fd3",
"size": 3676,
"uri": "https://{canondata_backend}/1889210/75a1d72834c0a9de8b328ec130be934f6cc6cea0/resource.tar.gz#test.test_blocks-member--Plan_/plan.txt"
"checksum": "4d80733bb5655340645b981057ba9910",
"size": 3703,
"uri": "https://{canondata_backend}/1942525/5635585a917e888e1628404d5ff137a2e18f23ca/resource.tar.gz#test.test_blocks-member--Plan_/plan.txt"
}
],
"test.test[blocks-member--Results]": [],
Expand Down Expand Up @@ -586,9 +586,9 @@
],
"test.test[blocks-sort_two_mix--Debug]": [
{
"checksum": "60076d20175ed4eb32b22f7be43cb490",
"size": 1915,
"uri": "https://{canondata_backend}/1936947/a99026e839b7e22714c2a9a81971a3b5e3ed1eb4/resource.tar.gz#test.test_blocks-sort_two_mix--Debug_/opt.yql_patched"
"checksum": "8324668b9f66ec5f9fc3e70217a057e9",
"size": 2006,
"uri": "https://{canondata_backend}/1937429/5efa179cb9a9173602a23e7c0e313970073e2969/resource.tar.gz#test.test_blocks-sort_two_mix--Debug_/opt.yql_patched"
}
],
"test.test[blocks-sort_two_mix--Plan]": [
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/yql/tests/sql/dq_file/part13/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,9 @@
],
"test.test[blocks-sort_one_desc--Debug]": [
{
"checksum": "3ff65a165f6fd32950d8b56ba98d95b4",
"size": 1767,
"uri": "https://{canondata_backend}/1936997/93899b3de50fae3f9677baacc98094a7a629590a/resource.tar.gz#test.test_blocks-sort_one_desc--Debug_/opt.yql_patched"
"checksum": "7d4eab41033eb90eb99af4870531b0d6",
"size": 1827,
"uri": "https://{canondata_backend}/1937429/1a0fd6a532256a80615a0e2f24e1f1ec999cb7ef/resource.tar.gz#test.test_blocks-sort_one_desc--Debug_/opt.yql_patched"
}
],
"test.test[blocks-sort_one_desc--Plan]": [
Expand All @@ -552,9 +552,9 @@
],
"test.test[blocks-top_sort_one_desc--Debug]": [
{
"checksum": "be5f2a8fbecfbe65f97c1ba40a556497",
"size": 1806,
"uri": "https://{canondata_backend}/1936997/93899b3de50fae3f9677baacc98094a7a629590a/resource.tar.gz#test.test_blocks-top_sort_one_desc--Debug_/opt.yql_patched"
"checksum": "c8bfd2fd80dc1bf818f5f61d99ff8ba9",
"size": 1866,
"uri": "https://{canondata_backend}/1937429/1a0fd6a532256a80615a0e2f24e1f1ec999cb7ef/resource.tar.gz#test.test_blocks-top_sort_one_desc--Debug_/opt.yql_patched"
}
],
"test.test[blocks-top_sort_one_desc--Plan]": [
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -680,9 +680,9 @@
],
"test.test[blocks-sort_two_desc--Debug]": [
{
"checksum": "46f3a87a89918a4c8e91293f82f6f639",
"size": 1923,
"uri": "https://{canondata_backend}/1600758/aad142702907f13e911494c1a7b312bad34f692a/resource.tar.gz#test.test_blocks-sort_two_desc--Debug_/opt.yql_patched"
"checksum": "fdc217d5f0a13a7f060e53c0b26bf955",
"size": 2007,
"uri": "https://{canondata_backend}/1936273/dc087a913c2a638b764a20e1066eb33c1b05f57b/resource.tar.gz#test.test_blocks-sort_two_desc--Debug_/opt.yql_patched"
}
],
"test.test[blocks-sort_two_desc--Plan]": [
Expand All @@ -702,9 +702,9 @@
],
"test.test[blocks-top_sort_two_desc--Debug]": [
{
"checksum": "0e61c46d05ae7985462f918f60bab5d5",
"size": 1962,
"uri": "https://{canondata_backend}/1600758/aad142702907f13e911494c1a7b312bad34f692a/resource.tar.gz#test.test_blocks-top_sort_two_desc--Debug_/opt.yql_patched"
"checksum": "00c63032e96f06a8b468d46263b7087c",
"size": 2046,
"uri": "https://{canondata_backend}/1936273/dc087a913c2a638b764a20e1066eb33c1b05f57b/resource.tar.gz#test.test_blocks-top_sort_two_desc--Debug_/opt.yql_patched"
}
],
"test.test[blocks-top_sort_two_desc--Plan]": [
Expand Down
Loading
Loading