From fb860655b433969799088b6e0160bf4d527991c8 Mon Sep 17 00:00:00 2001 From: Anton Romanov Date: Fri, 22 Dec 2023 16:29:17 +0100 Subject: [PATCH 1/7] Pushdown coalesce as YQL kernel. [Draft] --- ydb/core/formats/arrow/program.h | 28 ++ ydb/core/kqp/host/kqp_type_ann.cpp | 2 +- .../opt/physical/kqp_opt_phy_olap_filter.cpp | 80 ++-- .../kqp/query_compiler/kqp_olap_compiler.cpp | 356 ++++++++---------- ydb/core/protos/ssa.proto | 4 + ydb/core/tx/program/program.cpp | 8 + 6 files changed, 263 insertions(+), 215 deletions(-) diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h index b599a0c92a35..dbfa46f0f816 100644 --- a/ydb/core/formats/arrow/program.h +++ b/ydb/core/formats/arrow/program.h @@ -128,6 +128,34 @@ class TAssign { , FuncOpts(nullptr) {} + explicit TAssign(const TColumnInfo& column, i8 value) + : Column(column) + , Operation(EOperation::Constant) + , Constant(std::make_shared(value)) + , FuncOpts(nullptr) + {} + + explicit TAssign(const TColumnInfo& column, ui8 value) + : Column(column) + , Operation(EOperation::Constant) + , Constant(std::make_shared(value)) + , FuncOpts(nullptr) + {} + + explicit TAssign(const TColumnInfo& column, i16 value) + : Column(column) + , Operation(EOperation::Constant) + , Constant(std::make_shared(value)) + , FuncOpts(nullptr) + {} + + explicit TAssign(const TColumnInfo& column, ui16 value) + : Column(column) + , Operation(EOperation::Constant) + , Constant(std::make_shared(value)) + , FuncOpts(nullptr) + {} + explicit TAssign(const TColumnInfo& column, i32 value) : Column(column) , Operation(EOperation::Constant) diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 82336041db8a..7c33b9e39d68 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -844,7 +844,7 @@ bool ValidateOlapFilterConditions(const TExprNode* node, const TStructExprType* if (!EnsureAtom(*op, ctx)) { return false; } - if (!op->IsAtom({"eq", "neq", "lt", "lte", "gt", "gte", "string_contains", "starts_with", "ends_with", "+", "-", "*", "/", "%"})) { + if (!op->IsAtom({"eq", "neq", "lt", "lte", "gt", "gte", "string_contains", "starts_with", "ends_with", "+", "-", "*", "/", "%", "??"})) { ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unexpected OLAP binary operation: " << op->Content() )); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp index 3fa704508ba4..98cf26b8d456 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp @@ -99,12 +99,27 @@ bool IsFalseLiteral(TExprBase node) { return node.Maybe() && !FromString(node.Cast().Literal().Value()); } -std::optional> ExtractArithmeticParameters(const TCoBinaryArithmetic& op, TExprContext& ctx, TPositionHandle pos); +std::optional> ExtractBinaryFunctionParameters(const TExprBase& op, TExprContext& ctx, TPositionHandle pos); +std::vector> ExtractComparisonParameters(const TCoCompare& predicate, TExprContext& ctx, TPositionHandle pos); + +TMaybeNode ComparisonPushdown(const std::vector>& parameters, const TCoCompare& predicate, TExprContext& ctx, TPositionHandle pos); + +TMaybeNode YqlCoalescePushdown(const TCoCoalesce& coalesce, TExprContext& ctx) { + if (const auto params = ExtractBinaryFunctionParameters(coalesce, ctx, coalesce.Pos())) { + return Build(ctx, coalesce.Pos()) + .Operator().Value("??", TNodeFlags::Default).Build() + .Left(params->first) + .Right(params->second) + .Done(); + } + + return NullNode; +} TVector ConvertComparisonNode(const TExprBase& nodeIn, TExprContext& ctx, TPositionHandle pos) { TVector out; - auto convertNode = [&ctx, &pos](const TExprBase& node) -> TMaybeNode { + const auto convertNode = [&ctx, &pos](const TExprBase& node) -> TMaybeNode { if (node.Maybe()) { return node; } @@ -140,7 +155,7 @@ TVector ConvertComparisonNode(const TExprBase& nodeIn, TExprContext& builder.ReturningType(maybeReturningType.Cast()); } else { builder.ReturningType() - .Type(ctx.NewAtom(node.Pos(), "Utf8")) + .Type().Value("Utf8", TNodeFlags::Default).Build() .Build(); } return builder.Done(); @@ -148,15 +163,26 @@ TVector ConvertComparisonNode(const TExprBase& nodeIn, TExprContext& if (const auto maybeArithmetic = node.Maybe()) { if (const auto arithmetic = maybeArithmetic.Cast(); !arithmetic.Maybe()) { - if (const auto params = ExtractArithmeticParameters(arithmetic, ctx, pos)) { + if (const auto params = ExtractBinaryFunctionParameters(arithmetic, ctx, pos)) { return Build(ctx, pos) - .Operator(ctx.NewAtom(pos, arithmetic.Ref().Content(), TNodeFlags::Default)) + .Operator().Value(arithmetic.Ref().Content(), TNodeFlags::Default).Build() .Left(params->first) .Right(params->second) .Done(); } } } + + if (const auto maybeCoalesce = node.Maybe()) { + return YqlCoalescePushdown(maybeCoalesce.Cast(), ctx); + } + + if (const auto maybeCompare = node.Maybe()) { + if (const auto params = ExtractComparisonParameters(maybeCompare.Cast(), ctx, pos); !params.empty()) { + return ComparisonPushdown(params, maybeCompare.Cast(), ctx, pos); + } + } + return NullNode; }; @@ -192,14 +218,14 @@ TVector ConvertComparisonNode(const TExprBase& nodeIn, TExprContext& return out; } -std::optional> ExtractArithmeticParameters(const TCoBinaryArithmetic& op, TExprContext& ctx, TPositionHandle pos) +std::optional> ExtractBinaryFunctionParameters(const TExprBase& op, TExprContext& ctx, TPositionHandle pos) { - const auto left = ConvertComparisonNode(op.Left(), ctx, pos); + const auto left = ConvertComparisonNode(TExprBase(op.Ref().HeadPtr()), ctx, pos); if (left.size() != 1U) { return std::nullopt; } - const auto right = ConvertComparisonNode(op.Right(), ctx, pos); + const auto right = ConvertComparisonNode(TExprBase(op.Ref().TailPtr()), ctx, pos); if (right.size() != 1U) { return std::nullopt; } @@ -207,9 +233,9 @@ std::optional> ExtractArithmeticParameters(const return std::make_pair(left.front(), right.front()); } -TVector> ExtractComparisonParameters(const TCoCompare& predicate, TExprContext& ctx, TPositionHandle pos) +std::vector> ExtractComparisonParameters(const TCoCompare& predicate, TExprContext& ctx, TPositionHandle pos) { - TVector> out; + std::vector> out; auto left = ConvertComparisonNode(predicate.Left(), ctx, pos); if (left.empty()) { @@ -280,13 +306,13 @@ TExprBase BuildOneElementComparison(const std::pair& param YQL_ENSURE(!compareOperator.empty(), "Unsupported comparison node: " << predicate.Ptr()->Content()); return Build(ctx, pos) - .Operator(ctx.NewAtom(pos, compareOperator, TNodeFlags::Default)) + .Operator().Value(compareOperator, TNodeFlags::Default).Build() .Left(parameter.first) .Right(parameter.second) .Done(); } -TMaybeNode ComparisonPushdown(const TVector>& parameters, const TCoCompare& predicate, +TMaybeNode ComparisonPushdown(const std::vector>& parameters, const TCoCompare& predicate, TExprContext& ctx, TPositionHandle pos) { ui32 conditionsCount = parameters.size(); @@ -342,7 +368,7 @@ TMaybeNode ComparisonPushdown(const TVector(ctx, pos) - .Operator(ctx.NewAtom(pos, "eq")) + .Operator().Value("eq", TNodeFlags::Default).Build() .Left(parameters[j].first) .Right(parameters[j].second) .Done()); @@ -360,6 +386,16 @@ TMaybeNode ComparisonPushdown(const TVector SimplePredicatePushdown(const TCoCompare& predicate, TExprContext& ctx, TPositionHandle pos) +{ + auto parameters = ExtractComparisonParameters(predicate, ctx, pos); + if (parameters.empty()) { + return NullNode; + } + + return ComparisonPushdown(parameters, predicate, ctx, pos); +} + // TODO: Check how to reduce columns if they are not needed. Unfortunately columnshard need columns list // for every column present in program even if it is not used in result set. //#define ENABLE_COLUMNS_PRUNING @@ -419,8 +455,6 @@ TMaybeNode SafeCastPredicatePushdown(const TCoFlatMap& inputFlatmap, * FlatMap (Member(), FlatMap(SafeCast(), Just(Comparison)) * FlatMap (SafeCast(), FlatMap(SafeCast(), Just(Comparison)) */ - TVector> out; - auto left = ConvertComparisonNode(inputFlatmap.Input(), ctx, pos); if (left.empty()) { return NullNode; @@ -434,7 +468,7 @@ TMaybeNode SafeCastPredicatePushdown(const TCoFlatMap& inputFlatmap, auto predicate = flatmap.Lambda().Body().Cast().Input().Cast(); - TVector> parameters; + std::vector> parameters; if (left.size() != right.size()) { return NullNode; } @@ -446,18 +480,14 @@ TMaybeNode SafeCastPredicatePushdown(const TCoFlatMap& inputFlatmap, return ComparisonPushdown(parameters, predicate, ctx, pos); } -TMaybeNode SimplePredicatePushdown(const TCoCompare& predicate, TExprContext& ctx, TPositionHandle pos) +TMaybeNode CoalescePushdown(const TCoCoalesce& coalesce, TExprContext& ctx, TPositionHandle pos) { - auto parameters = ExtractComparisonParameters(predicate, ctx, pos); - if (parameters.empty()) { - return NullNode; + if constexpr (NSsa::RuntimeVersion >= 4U) { + if (const auto node = YqlCoalescePushdown(coalesce, ctx)) { + return node; + } } - return ComparisonPushdown(parameters, predicate, ctx, pos); -} - -TMaybeNode CoalescePushdown(const TCoCoalesce& coalesce, TExprContext& ctx, TPositionHandle pos) -{ auto predicate = coalesce.Predicate(); if (auto maybeFlatmap = predicate.Maybe()) { return SafeCastPredicatePushdown(maybeFlatmap.Cast(), ctx, pos); diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp index 3eb99152c4e8..044cccf14b3c 100644 --- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp @@ -182,29 +182,11 @@ class TKqpOlapCompileContext { } const TTypeAnnotationNode* GetArgType(const TExprBase& arg) { - auto argType = arg.Ptr()->GetTypeAnn(); + const auto argType = arg.Ptr()->GetTypeAnn(); if (arg.Maybe() && argType->GetKind() == ETypeAnnotationKind::Unit) { // Column name return ConvertToBlockType(GetColumnTypeByName(arg.Cast().Value())); } - if (ETypeAnnotationKind::Data == argType->GetKind()) { - switch (argType->Cast()->GetSlot()) { - case EDataSlot::Int8: - argType = ExprContext.MakeType(EDataSlot::Int32); - break; - case EDataSlot::Int16: - argType = ExprContext.MakeType(EDataSlot::Int32); - break; - case EDataSlot::Uint8: - argType = ExprContext.MakeType(EDataSlot::Uint32); - break; - case EDataSlot::Uint16: - argType = ExprContext.MakeType(EDataSlot::Uint32); - break; - default: - break; - } - } return ExprContext.MakeType(argType); } private: @@ -252,77 +234,77 @@ std::unordered_set YqlKernelCmpFuncs = { "string_contains", "starts_with", "ends_with" }; -const TProgram::TAssignment* CompileCondition(const TExprBase& condition, TKqpOlapCompileContext& ctx); +ui64 CompileCondition(const TExprBase& condition, TKqpOlapCompileContext& ctx); ui64 GetOrCreateColumnId(const TExprBase& node, TKqpOlapCompileContext& ctx); -ui32 ConvertValueToColumn(const TCoDataCtor& value, TKqpOlapCompileContext& ctx) +ui64 ConvertValueToColumn(const TCoDataCtor& value, TKqpOlapCompileContext& ctx) { - TProgram::TAssignment* ssaValue = ctx.CreateAssignCmd(); - + constexpr bool yqlTypes = NKikimr::NSsa::RuntimeVersion >= 4U; + auto *const ssaValue = ctx.CreateAssignCmd(); + const auto& nodeValue = value.Cast().Literal().Value(); if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); ssaValue->MutableConstant()->SetText(TString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); ssaValue->MutableConstant()->SetBytes(TString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); - ssaValue->MutableConstant()->SetBool(FromString(nodeValue)); + if constexpr (yqlTypes) + ssaValue->MutableConstant()->SetUint8(FromString(nodeValue) ? 1U : 0U); + else + ssaValue->MutableConstant()->SetBool(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); ssaValue->MutableConstant()->SetFloat(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); ssaValue->MutableConstant()->SetDouble(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); - ssaValue->MutableConstant()->SetInt32(FromString(nodeValue)); + if constexpr (yqlTypes) + ssaValue->MutableConstant()->SetInt8(FromString(nodeValue)); + else + ssaValue->MutableConstant()->SetInt32(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); - ssaValue->MutableConstant()->SetInt32(FromString(nodeValue)); + if constexpr (yqlTypes) + ssaValue->MutableConstant()->SetInt16(FromString(nodeValue)); + else + ssaValue->MutableConstant()->SetInt32(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); ssaValue->MutableConstant()->SetInt32(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); ssaValue->MutableConstant()->SetInt64(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); - ssaValue->MutableConstant()->SetUint32(FromString(nodeValue)); + if constexpr (yqlTypes) + ssaValue->MutableConstant()->SetUint8(FromString(nodeValue)); + else + ssaValue->MutableConstant()->SetUint32(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); - ssaValue->MutableConstant()->SetUint32(FromString(nodeValue)); + if constexpr (yqlTypes) + ssaValue->MutableConstant()->SetUint16(FromString(nodeValue)); + else + ssaValue->MutableConstant()->SetUint32(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); ssaValue->MutableConstant()->SetUint32(FromString(nodeValue)); } else if (value.Maybe()) { - auto nodeValue = value.Cast().Literal().Value(); ssaValue->MutableConstant()->SetUint64(FromString(nodeValue)); } else { - YQL_ENSURE(false, "Unsupported content: " << value.Ptr()->Content()); + YQL_ENSURE(false, "Unsupported content: " << value.Ref().Content()); } return ssaValue->GetColumn().GetId(); } -ui32 ConvertParameterToColumn(const TCoParameter& parameter, TKqpOlapCompileContext& ctx) +ui64 ConvertParameterToColumn(const TCoParameter& parameter, TKqpOlapCompileContext& ctx) { - TProgram::TAssignment* ssaValue = ctx.CreateAssignCmd(); - - auto name = TString(parameter.Name().Value()); - auto maybeType = parameter.Type().Maybe(); + auto *const ssaValue = ctx.CreateAssignCmd(); - YQL_ENSURE(maybeType.IsValid(), "Unknown type content in conversion: " << parameter.Type().Ptr()->Content()); + const auto& name = parameter.Name().StringValue(); + const auto maybeType = parameter.Type().Maybe(); auto newParameter = ssaValue->MutableParameter(); newParameter->SetName(name); ctx.AddParameterName(name); - return ssaValue->GetColumn().GetId(); } -TProgram::TAssignment* ConvertSafeCastToColumn(const ui64 &columnId, const std::string& targetType, TKqpOlapCompileContext& ctx) { +ui64 ConvertSafeCastToColumn(const ui64 &columnId, const std::string& targetType, TKqpOlapCompileContext& ctx) { TProgram::TAssignment* assignCmd = ctx.CreateAssignCmd(); ui32 castFunction = TProgram::TAssignment::FUNC_UNSPECIFIED; @@ -357,13 +339,12 @@ TProgram::TAssignment* ConvertSafeCastToColumn(const ui64 &columnId, const std:: auto newCast = assignCmd->MutableFunction(); newCast->SetId(castFunction); newCast->AddArguments()->SetId(columnId); - return assignCmd; + return assignCmd->GetColumn().GetId(); } -TProgram::TAssignment* ConvertSafeCastToColumn(const TExprBase& colName, const std::string& targetType, TKqpOlapCompileContext& ctx) +ui64 ConvertSafeCastToColumn(const TExprBase& colName, const std::string& targetType, TKqpOlapCompileContext& ctx) { - auto columnId = GetOrCreateColumnId(colName, ctx); - return ConvertSafeCastToColumn(columnId, targetType, ctx); + return ConvertSafeCastToColumn(GetOrCreateColumnId(colName, ctx), targetType, ctx); } ui64 ConvertSafeCastToColumn(const TCoSafeCast& cast, TKqpOlapCompileContext& ctx) @@ -374,19 +355,17 @@ ui64 ConvertSafeCastToColumn(const TCoSafeCast& cast, TKqpOlapCompileContext& ct maybeDataType = maybeOptionalType.Cast().ItemType().Maybe(); } } - YQL_ENSURE(maybeDataType.IsValid()); - auto assignCmd = ConvertSafeCastToColumn(cast.Value(), maybeDataType.Cast().Type().StringValue(), ctx); - return assignCmd->GetColumn().GetId(); + return ConvertSafeCastToColumn(cast.Value(), maybeDataType.Cast().Type().StringValue(), ctx); } ui64 ConvertJsonValueToColumn(const TKqpOlapJsonValue& jsonValueCallable, TKqpOlapCompileContext& ctx) { Y_ABORT_UNLESS(NKikimr::NSsa::RuntimeVersion >= 3, "JSON_VALUE pushdown is supported starting from the v3 of SSA runtime."); - ui32 columnId = GetOrCreateColumnId(jsonValueCallable.Column(), ctx); - ui32 pathId = GetOrCreateColumnId(jsonValueCallable.Path(), ctx); + const auto columnId = GetOrCreateColumnId(jsonValueCallable.Column(), ctx); + const auto pathId = GetOrCreateColumnId(jsonValueCallable.Path(), ctx); - TProgram::TAssignment* command = ctx.CreateAssignCmd(); - auto* jsonValueFunc = command->MutableFunction(); + auto *const command = ctx.CreateAssignCmd(); + auto *const jsonValueFunc = command->MutableFunction(); jsonValueFunc->AddArguments()->SetId(columnId); jsonValueFunc->AddArguments()->SetId(pathId); @@ -426,15 +405,14 @@ ui64 GetOrCreateColumnId(const TExprBase& node, TKqpOlapCompileContext& ctx) { YQL_ENSURE(false, "Unknown node in OLAP comparison compiler: " << node.Ptr()->Content()); } -const TProgram::TAssignment* CompileSimpleArrowComparison(const TKqpOlapFilterBinaryOp& comparison, - TKqpOlapCompileContext& ctx) +ui64 CompileSimpleArrowComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOlapCompileContext& ctx) { // Columns should be created before comparison, otherwise comparison fail to find columns - ui32 leftColumnId = GetOrCreateColumnId(comparison.Left(), ctx); - ui32 rightColumnId = GetOrCreateColumnId(comparison.Right(), ctx); + const auto leftColumnId = GetOrCreateColumnId(comparison.Left(), ctx); + const auto rightColumnId = GetOrCreateColumnId(comparison.Right(), ctx); - TProgram::TAssignment* command = ctx.CreateAssignCmd(); - auto* cmpFunc = command->MutableFunction(); + auto *const command = ctx.CreateAssignCmd(); + auto *const cmpFunc = command->MutableFunction(); ui32 function = TProgram::TAssignment::FUNC_UNSPECIFIED; if (comparison.Operator() == "eq") { @@ -455,18 +433,17 @@ const TProgram::TAssignment* CompileSimpleArrowComparison(const TKqpOlapFilterBi cmpFunc->AddArguments()->SetId(leftColumnId); cmpFunc->AddArguments()->SetId(rightColumnId); - return command; + return command->GetColumn().GetId(); } -const TProgram::TAssignment* CompileYqlKernelComparison(const TKqpOlapFilterBinaryOp& comparison, - TKqpOlapCompileContext& ctx) +ui64 CompileYqlKernelComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOlapCompileContext& ctx) { // Columns should be created before comparison, otherwise comparison fail to find columns - ui32 leftColumnId = GetOrCreateColumnId(comparison.Left(), ctx); - ui32 rightColumnId = GetOrCreateColumnId(comparison.Right(), ctx); + const auto leftColumnId = GetOrCreateColumnId(comparison.Left(), ctx); + const auto rightColumnId = GetOrCreateColumnId(comparison.Right(), ctx); - TProgram::TAssignment* command = ctx.CreateAssignCmd(); - auto* cmpFunc = command->MutableFunction(); + auto *const command = ctx.CreateAssignCmd(); + auto *const cmpFunc = command->MutableFunction(); ui32 function = TProgram::TAssignment::FUNC_UNSPECIFIED; bool isYqlKernelsSupported = (NKikimr::NSsa::RuntimeVersion >= 3); @@ -515,22 +492,17 @@ const TProgram::TAssignment* CompileYqlKernelComparison(const TKqpOlapFilterBina return ConvertSafeCastToColumn(command->GetColumn().GetId(), "Boolean", ctx); } - return command; + return command->GetColumn().GetId(); } struct TTypedColumn { - const ui64 Id; - const TTypeAnnotationNode *const Type; + const ui64 Id = 0ULL; + const TTypeAnnotationNode *const Type = nullptr; }; TTypedColumn GetOrCreateColumnIdAndType(const TExprBase& node, TKqpOlapCompileContext& ctx); -struct TTypedAssigment { - const TProgram::TAssignment *const Assigment; - const TTypeAnnotationNode *const Type; -}; - -TTypedAssigment CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& operation, TKqpOlapCompileContext& ctx) +TTypedColumn CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& operation, TKqpOlapCompileContext& ctx) { // Columns should be created before operation, otherwise operation fail to find columns const auto leftColumn = GetOrCreateColumnIdAndType(operation.Left(), ctx); @@ -574,6 +546,9 @@ TTypedAssigment CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& op } else if (oper == "%"sv) { op = TKernelRequestBuilder::EBinaryOp::Mod; compare = false; + } else if (oper == "??"sv) { + op = TKernelRequestBuilder::EBinaryOp::Coalesce; + compare = false; } else { YQL_ENSURE(false, "Unknown binary OLAP operation: " << oper); } @@ -584,41 +559,39 @@ TTypedAssigment CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& op cmpFunc->SetKernelIdx(kernel.first); cmpFunc->AddArguments()->SetId(leftColumn.Id); cmpFunc->AddArguments()->SetId(rightColumn.Id); - return {command, kernel.second}; + return {command->GetColumn().GetId(), kernel.second}; } -const TProgram::TAssignment* CompileYqlKernelAnyComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOlapCompileContext& ctx) +template +const TTypedColumn BuildLogicalProgram(const TExprNode::TChildrenType& args, const TFunc function, TKqpOlapCompileContext& ctx) { - return CompileYqlKernelBinaryOperation(comparison, ctx).Assigment; -} - -TTypedColumn GetOrCreateColumnIdAndType(const TExprBase& node, TKqpOlapCompileContext& ctx) { - if (const auto& maybeBinaryOp = node.Maybe()) { - const auto& pair = CompileYqlKernelBinaryOperation(maybeBinaryOp.Cast(), ctx); - return {pair.Assigment->GetColumn().GetId(), pair.Type}; + const auto childrenCount = args.size(); + if (childrenCount == 1) { + // NOT operation is handled separately, thus only one available situation here: + // this is binary operation with only one node, just build this node and return. + return GetOrCreateColumnIdAndType(TExprBase(args.front()), ctx); } - return {GetOrCreateColumnId(node, ctx), ctx.GetArgType(node)}; -} + const bool twoArgs = 2U == childrenCount; + const auto half = childrenCount >> 1U; + const auto left = twoArgs ? GetOrCreateColumnIdAndType(TExprBase(args.front()), ctx) : BuildLogicalProgram(args.subspan(0U, half), function, ctx); + const auto right = twoArgs ? GetOrCreateColumnIdAndType(TExprBase(args.back()), ctx) : BuildLogicalProgram(args.subspan(half), function, ctx); -const TProgram::TAssignment* CompileComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOlapCompileContext& ctx) -{ - if constexpr (NKikimr::NSsa::RuntimeVersion >= 4U) { - if (ctx.CheckYqlCompatibleArgsTypes(comparison)) { - return CompileYqlKernelAnyComparison(comparison, ctx); - } else { - return ConvertSafeCastToColumn(CompileSimpleArrowComparison(comparison, ctx)->GetColumn().GetId(), "Uint8", ctx); - } - } + auto *const logicalOp = ctx.CreateAssignCmd(); + auto *const logicalFunc = logicalOp->MutableFunction(); + logicalFunc->AddArguments()->SetId(left.Id); + logicalFunc->AddArguments()->SetId(right.Id); - std::string op = comparison.Operator().StringValue().c_str(); - if (SimpleArrowCmpFuncs.contains(op)) { - return CompileSimpleArrowComparison(comparison, ctx); - } else if (YqlKernelCmpFuncs.contains(op)) { - return CompileYqlKernelComparison(comparison, ctx); + const auto block = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); + if constexpr (std::is_same()) { + const auto idx = ctx.GetKernelRequestBuilder().AddBinaryOp(function, block, block, block); + logicalFunc->SetKernelIdx(idx); + logicalFunc->SetFunctionType(TProgram::YQL_KERNEL); } else { - YQL_ENSURE(false, "Unknown comparison operator: " << op); + logicalFunc->SetFunctionType(function); } + + return {logicalOp->GetColumn().GetId(), block}; } const TProgram::TAssignment* InvertResult(TProgram::TAssignment* command, TKqpOlapCompileContext& ctx) @@ -632,9 +605,10 @@ const TProgram::TAssignment* InvertResult(TProgram::TAssignment* command, TKqpOl } template -const TProgram::TAssignment* CompileExists(const TKqpOlapFilterExists& exists, TKqpOlapCompileContext& ctx) +const TTypedColumn CompileExists(const TKqpOlapFilterExists& exists, TKqpOlapCompileContext& ctx) { - const ui32 columnId = GetOrCreateColumnId(exists.Column(), ctx); + const auto type = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(NSsa::RuntimeVersion >= 4U ? EDataSlot::Uint8 : EDataSlot::Bool)); + const auto columnId = GetOrCreateColumnId(exists.Column(), ctx); auto *const command = ctx.CreateAssignCmd(); auto *const isNullFunc = command->MutableFunction(); @@ -643,92 +617,115 @@ const TProgram::TAssignment* CompileExists(const TKqpOlapFilterExists& exists, T if constexpr (Empty) { if constexpr (NSsa::RuntimeVersion >= 4U) { - return ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx); + return {ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx), type}; } else { - return command; + return {command->GetColumn().GetId(), type}; } } auto *const notCommand = InvertResult(command, ctx); if constexpr (NSsa::RuntimeVersion >= 4U) { - return ConvertSafeCastToColumn(notCommand->GetColumn().GetId(), "Uint8", ctx); + return {ConvertSafeCastToColumn(notCommand->GetColumn().GetId(), "Uint8", ctx), type}; } else { - return notCommand; + return {notCommand->GetColumn().GetId(), type}; } } -const TProgram::TAssignment* CompileJsonExists(const TKqpOlapJsonExists& jsonExistsCallable, TKqpOlapCompileContext& ctx) { - Y_ABORT_UNLESS(NKikimr::NSsa::RuntimeVersion >= 3, "JSON_EXISTS pushdown is supported starting from the v3 of SSA runtime."); +const TTypedColumn BuildLogicalNot(const TExprBase& arg, TKqpOlapCompileContext& ctx) { + if (const auto maybeExists = arg.Maybe()) { + return CompileExists(maybeExists.Cast(), ctx); + } - ui32 columnId = GetOrCreateColumnId(jsonExistsCallable.Column(), ctx); - ui32 pathId = GetOrCreateColumnId(jsonExistsCallable.Path(), ctx); + // Not is a special way in case it has only one child + const auto value = GetOrCreateColumnIdAndType(arg, ctx); + auto *const notOp = ctx.CreateAssignCmd(); + auto *const notFunc = notOp->MutableFunction(); - TProgram::TAssignment* command = ctx.CreateAssignCmd(); - auto* jsonExistsFunc = command->MutableFunction(); + notFunc->AddArguments()->SetId(value.Id); - jsonExistsFunc->AddArguments()->SetId(columnId); - jsonExistsFunc->AddArguments()->SetId(pathId); + if constexpr (NSsa::RuntimeVersion >= 4U) { + const auto block = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool)); + const auto idx = ctx.GetKernelRequestBuilder().AddUnaryOp(TKernelRequestBuilder::EUnaryOp::Not, block, block); + notFunc->SetKernelIdx(idx); + notFunc->SetFunctionType(TProgram::YQL_KERNEL); + } else + notFunc->SetId(TProgram::TAssignment::FUNC_BINARY_NOT); - jsonExistsFunc->SetFunctionType(TProgram::YQL_KERNEL); - auto idx = ctx.AddYqlKernelJsonExists( - jsonExistsCallable.Column(), - jsonExistsCallable.Path(), - ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); - jsonExistsFunc->SetKernelIdx(idx); + return {notOp->GetColumn().GetId(), value.Type}; +} - return command; +TTypedColumn GetOrCreateColumnIdAndType(const TExprBase& node, TKqpOlapCompileContext& ctx) { + if (const auto& maybeBinaryOp = node.Maybe()) { + if (const auto& binaryOp = maybeBinaryOp.Cast(); ctx.CheckYqlCompatibleArgsTypes(binaryOp)) { + return CompileYqlKernelBinaryOperation(binaryOp, ctx); + } else { + return { + ConvertSafeCastToColumn(CompileSimpleArrowComparison(binaryOp, ctx), "Uint8", ctx), + ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Uint8)) + }; + } + } else if (const auto& maybeAnd = node.Maybe()) { + return BuildLogicalProgram(maybeAnd.Ref().Children(), TKernelRequestBuilder::EBinaryOp::And, ctx); + } else if (const auto& maybeOr = node.Maybe()) { + return BuildLogicalProgram(maybeOr.Ref().Children(), TKernelRequestBuilder::EBinaryOp::Or, ctx); + } else if (const auto& maybeXor = node.Maybe()) { + return BuildLogicalProgram(maybeXor.Ref().Children(), TKernelRequestBuilder::EBinaryOp::Xor, ctx); + } else if (const auto& maybeNot = node.Maybe()) { + return BuildLogicalNot(maybeNot.Cast().Value(), ctx); + } + + return {GetOrCreateColumnId(node, ctx), ctx.GetArgType(node)}; } -template -const TProgram::TAssignment* BuildLogicalProgram(const TExprNode::TChildrenType& args, const TFunc function, TKqpOlapCompileContext& ctx) +ui64 CompileComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOlapCompileContext& ctx) { - const auto childrenCount = args.size(); - if (childrenCount == 1) { - // NOT operation is handled separately, thus only one available situation here: - // this is binary operation with only one node, just build this node and return. - return CompileCondition(TExprBase(args.front()), ctx); + if constexpr (NKikimr::NSsa::RuntimeVersion >= 4U) { + if (ctx.CheckYqlCompatibleArgsTypes(comparison)) { + return CompileYqlKernelBinaryOperation(comparison, ctx).Id; + } else { + return ConvertSafeCastToColumn(CompileSimpleArrowComparison(comparison, ctx), "Uint8", ctx); + } } - const TProgram::TAssignment* left = nullptr; - const TProgram::TAssignment* right = nullptr; - - if (childrenCount == 2) { - // Nice, we can build logical operation with two child as expected - left = CompileCondition(TExprBase(args[0]), ctx); - right = CompileCondition(TExprBase(args[1]), ctx); + std::string op = comparison.Operator().StringValue().c_str(); + if (SimpleArrowCmpFuncs.contains(op)) { + return CompileSimpleArrowComparison(comparison, ctx); + } else if (YqlKernelCmpFuncs.contains(op)) { + return CompileYqlKernelComparison(comparison, ctx); } else { - // >2 children - split incoming vector in the middle call this function recursively. - auto leftArgs = args.Slice(0, childrenCount / 2); - auto rightArgs = args.Slice(childrenCount / 2); - - left = BuildLogicalProgram(leftArgs, function, ctx); - right = BuildLogicalProgram(rightArgs, function, ctx); + YQL_ENSURE(false, "Unknown comparison operator: " << op); } +} - auto *const logicalOp = ctx.CreateAssignCmd(); - auto *const logicalFunc = logicalOp->MutableFunction(); - logicalFunc->AddArguments()->SetId(left->GetColumn().GetId()); - logicalFunc->AddArguments()->SetId(right->GetColumn().GetId()); +ui64 CompileJsonExists(const TKqpOlapJsonExists& jsonExistsCallable, TKqpOlapCompileContext& ctx) { + Y_ABORT_UNLESS(NKikimr::NSsa::RuntimeVersion >= 3, "JSON_EXISTS pushdown is supported starting from the v3 of SSA runtime."); - if constexpr (std::is_same()) { - const auto block = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool)); - const auto idx = ctx.GetKernelRequestBuilder().AddBinaryOp(function, block, block, block); - logicalFunc->SetKernelIdx(idx); - logicalFunc->SetFunctionType(TProgram::YQL_KERNEL); - } else { - logicalFunc->SetFunctionType(function); - } + const auto columnId = GetOrCreateColumnId(jsonExistsCallable.Column(), ctx); + const auto pathId = GetOrCreateColumnId(jsonExistsCallable.Path(), ctx); + + auto *const command = ctx.CreateAssignCmd(); + auto *const jsonExistsFunc = command->MutableFunction(); + + jsonExistsFunc->AddArguments()->SetId(columnId); + jsonExistsFunc->AddArguments()->SetId(pathId); + + jsonExistsFunc->SetFunctionType(TProgram::YQL_KERNEL); + auto idx = ctx.AddYqlKernelJsonExists( + jsonExistsCallable.Column(), + jsonExistsCallable.Path(), + ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); + jsonExistsFunc->SetKernelIdx(idx); - return logicalOp; + return command->GetColumn().GetId(); } -const TProgram::TAssignment* CompileCondition(const TExprBase& condition, TKqpOlapCompileContext& ctx) { +ui64 CompileCondition(const TExprBase& condition, TKqpOlapCompileContext& ctx) { if (const auto maybeCompare = condition.Maybe()) { return CompileComparison(maybeCompare.Cast(), ctx); } if (const auto maybeExists = condition.Maybe()) { - return CompileExists(maybeExists.Cast(), ctx); + return CompileExists(maybeExists.Cast(), ctx).Id; } if (const auto maybeJsonExists = condition.Maybe()) { @@ -736,26 +733,7 @@ const TProgram::TAssignment* CompileCondition(const TExprBase& condition, TKqpOl } if (const auto maybeNot = condition.Maybe()) { - if (const auto maybeExists = maybeNot.Cast().Value().Maybe()) { - return CompileExists(maybeExists.Cast(), ctx); - } - - // Not is a special way in case it has only one child - auto *const value = CompileCondition(maybeNot.Cast().Value(), ctx); - auto *const notOp = ctx.CreateAssignCmd(); - auto *const notFunc = notOp->MutableFunction(); - - notFunc->AddArguments()->SetId(value->GetColumn().GetId()); - - if constexpr (NSsa::RuntimeVersion >= 4U) { - const auto block = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool)); - const auto idx = ctx.GetKernelRequestBuilder().AddUnaryOp(TKernelRequestBuilder::EUnaryOp::Not, block, block); - notFunc->SetKernelIdx(idx); - notFunc->SetFunctionType(TProgram::YQL_KERNEL); - } else - notFunc->SetId(TProgram::TAssignment::FUNC_BINARY_NOT); - - return notOp; + return BuildLogicalNot(maybeNot.Cast().Value(), ctx).Id; } ui32 function = TProgram::TAssignment::FUNC_UNSPECIFIED; @@ -775,15 +753,15 @@ const TProgram::TAssignment* CompileCondition(const TExprBase& condition, TKqpOl } if constexpr (NSsa::RuntimeVersion >= 4U) - return BuildLogicalProgram(condition.Ref().Children(), op, ctx); + return BuildLogicalProgram(condition.Ref().Children(), op, ctx).Id; else - return BuildLogicalProgram(condition.Ref().Children(), function, ctx); + return BuildLogicalProgram(condition.Ref().Children(), function, ctx).Id; } void CompileFilter(const TKqpOlapFilter& filterNode, TKqpOlapCompileContext& ctx) { - auto* condition = CompileCondition(filterNode.Condition(), ctx); + const auto condition = CompileCondition(filterNode.Condition(), ctx); auto* filter = ctx.CreateFilter(); - filter->MutablePredicate()->SetId(condition->GetColumn().GetId()); + filter->MutablePredicate()->SetId(condition); } std::vector CollectAggregationInfos(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx) { diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto index fc2213393981..69db3462922b 100644 --- a/ydb/core/protos/ssa.proto +++ b/ydb/core/protos/ssa.proto @@ -33,6 +33,10 @@ message TProgram { double Double = 7; bytes Bytes = 8; string Text = 9; + int32 Int8 = 10; + uint32 Uint8 = 11; + int32 Int16 = 12; + uint32 Uint16 = 13; } } diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp index 8537990e375e..49967d75d4a5 100644 --- a/ydb/core/tx/program/program.cpp +++ b/ydb/core/tx/program/program.cpp @@ -250,6 +250,14 @@ NSsa::TAssign TProgramBuilder::MakeConstant(const NSsa::TColumnInfo& name, const switch (constant.GetValueCase()) { case TId::kBool: return TAssign(name, constant.GetBool()); + case TId::kInt8: + return TAssign(name, i8(constant.GetInt8())); + case TId::kUint8: + return TAssign(name, ui8(constant.GetUint8())); + case TId::kInt16: + return TAssign(name, i16(constant.GetInt16())); + case TId::kUint16: + return TAssign(name, ui16(constant.GetUint16())); case TId::kInt32: return TAssign(name, constant.GetInt32()); case TId::kUint32: From 97fb075d9c58236cf1f323eed234cca7cadff35f Mon Sep 17 00:00:00 2001 From: Anton Romanov Date: Sun, 24 Dec 2023 15:14:25 +0100 Subject: [PATCH 2/7] Add some nulls into test source table. --- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 10 ++++++++-- ydb/core/testlib/cs_helper.cpp | 15 +++++++++++++-- ydb/core/testlib/cs_helper.h | 10 +++++----- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 83373dbbca8e..fbed97714aeb 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -517,10 +517,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } }; - void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false) { UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead TLocalHelper lHelper(kikimr); + if (withSomeNulls) + lHelper.WithSomeNulls(); NYdb::NLongTx::TClient client(kikimr.GetDriver()); NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); @@ -1547,7 +1549,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { scanSettings.Explain(true); TLocalHelper(kikimr).CreateTestOlapTable(); - WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 5); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 5, true); Tests::NCommon::TLoggerInit(kikimr).Initialize(); auto tableClient = kikimr.GetTableClient(); @@ -1555,7 +1557,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) { // TODO: Add support for DqPhyPrecompute push-down: Cast((2+2) as Uint64) std::vector testData = { R"(`resource_id` = `uid`)", + R"(`resource_id` != `uid`)", R"(`resource_id` = "10001")", + R"(`resource_id` != "10001")", R"(`level` = 1)", R"(`level` = Int8("1"))", R"(`level` = Int16("1"))", @@ -1596,6 +1600,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { R"(`uid` > `resource_id`)", R"(`level` IS NULL)", R"(`level` IS NOT NULL)", + R"(`message` IS NULL)", + R"(`message` IS NOT NULL)", R"((`level`, `uid`) > (Int32("1"), NULL))", R"((`level`, `uid`) != (Int32("1"), NULL))", R"(`level` >= CAST("2" As Int32))", diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index a2cb1668991b..557d0cfff98f 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -133,14 +133,25 @@ std::shared_ptr THelper::TestArrowBatch(ui64 pathIdBegin, ui jsonInfo["a"]["c"] = "asds"; jsonInfo["b"] = "asd"; + size_t index = 1ULL; + const auto magic = WithSomeNulls_ ? 3ULL : 0ULL; for (size_t i = 0; i < rowCount; ++i) { std::string uid("uid_" + std::to_string(tsBegin + i)); std::string message("some prefix " + std::string(1024 + i % 200, 'x')); Y_ABORT_UNLESS(b1.Append(tsBegin + i * tsStepUs).ok()); Y_ABORT_UNLESS(b2.Append(std::to_string(pathIdBegin + i)).ok()); Y_ABORT_UNLESS(b3.Append(uid).ok()); - Y_ABORT_UNLESS(b4.Append(i % 5).ok()); - Y_ABORT_UNLESS(b5.Append(message).ok()); + + if (magic && !(++index % magic)) + Y_ABORT_UNLESS(b4.AppendNull().ok()); + else + Y_ABORT_UNLESS(b4.Append(i % 5).ok()); + + if (magic && !(++index % magic)) + Y_ABORT_UNLESS(b5.AppendNull().ok()); + else + Y_ABORT_UNLESS(b5.Append(message).ok()); + jsonInfo["a"]["b"] = i; auto jsonStringBase = jsonInfo.GetStringRobust(); Y_ABORT_UNLESS(b6.Append(jsonStringBase.data(), jsonStringBase.size()).ok()); diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index d013ca3d1afe..8714eb0727d4 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -27,6 +27,7 @@ class THelper: public THelperSchemaless { std::shared_ptr GetArrowSchema() const; YDB_FLAG_ACCESSOR(WithJsonDocument, false); TString ShardingMethod = "HASH_FUNCTION_CONSISTENCY_64"; + bool WithSomeNulls_ = false; protected: void CreateOlapTableWithStore(TString tableName = "olapTable", TString storeName = "olapStore", ui32 storeShardsCount = 4, ui32 tableShardsCount = 3); @@ -41,19 +42,18 @@ class THelper: public THelperSchemaless { static constexpr const char * PROTO_SCHEMA = R"( Columns { Name: "timestamp" Type: "Timestamp" NotNull: true } - #Columns { Name: "resource_type" Type: "Utf8" } Columns { Name: "resource_id" Type: "Utf8" } Columns { Name: "uid" Type: "Utf8" } Columns { Name: "level" Type: "Int32" } Columns { Name: "message" Type: "Utf8" } - #Columns { Name: "json_payload" Type: "Json" } - #Columns { Name: "ingested_at" Type: "Timestamp" } - #Columns { Name: "saved_at" Type: "Timestamp" } - #Columns { Name: "request_id" Type: "Utf8" } KeyColumnNames: "timestamp" Engine: COLUMN_ENGINE_REPLACING_TIMESERIES )"; + void WithSomeNulls() { + WithSomeNulls_ = true; + }; + virtual std::vector GetShardingColumns() const { return {"timestamp", "uid"}; } From 045db3d7dfdc8501ab1dfa95e7383939929996cf Mon Sep 17 00:00:00 2001 From: Anton Romanov Date: Wed, 27 Dec 2023 11:07:11 +0100 Subject: [PATCH 3/7] JsonExist a little fix. --- .../kqp/query_compiler/kqp_olap_compiler.cpp | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp index 044cccf14b3c..e5c54bd41122 100644 --- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp @@ -402,7 +402,7 @@ ui64 GetOrCreateColumnId(const TExprBase& node, TKqpOlapCompileContext& ctx) { return ConvertJsonValueToColumn(maybeJsonValue.Cast(), ctx); } - YQL_ENSURE(false, "Unknown node in OLAP comparison compiler: " << node.Ptr()->Content()); + YQL_ENSURE(false, "Unknown node in OLAP comparison compiler: " << node.Ref().Content()); } ui64 CompileSimpleArrowComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOlapCompileContext& ctx) @@ -631,6 +631,33 @@ const TTypedColumn CompileExists(const TKqpOlapFilterExists& exists, TKqpOlapCom } } +const TTypedColumn CompileJsonExists(const TKqpOlapJsonExists& jsonExistsCallable, TKqpOlapCompileContext& ctx) { + Y_ABORT_UNLESS(NKikimr::NSsa::RuntimeVersion >= 3, "JSON_EXISTS pushdown is supported starting from the v3 of SSA runtime."); + + const auto columnId = GetOrCreateColumnId(jsonExistsCallable.Column(), ctx); + const auto pathId = GetOrCreateColumnId(jsonExistsCallable.Path(), ctx); + + auto *const command = ctx.CreateAssignCmd(); + auto *const jsonExistsFunc = command->MutableFunction(); + + jsonExistsFunc->AddArguments()->SetId(columnId); + jsonExistsFunc->AddArguments()->SetId(pathId); + + jsonExistsFunc->SetFunctionType(TProgram::YQL_KERNEL); + const auto idx = ctx.AddYqlKernelJsonExists( + jsonExistsCallable.Column(), + jsonExistsCallable.Path(), + ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); + jsonExistsFunc->SetKernelIdx(idx); + + const auto type = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(NSsa::RuntimeVersion >= 4U ? EDataSlot::Uint8 : EDataSlot::Bool)); + if constexpr (NSsa::RuntimeVersion >= 4U) { + return {ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx), type}; + } else { + return {command->GetColumn().GetId(), type}; + } +} + const TTypedColumn BuildLogicalNot(const TExprBase& arg, TKqpOlapCompileContext& ctx) { if (const auto maybeExists = arg.Maybe()) { return CompileExists(maybeExists.Cast(), ctx); @@ -697,28 +724,6 @@ ui64 CompileComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOlapCompile } } -ui64 CompileJsonExists(const TKqpOlapJsonExists& jsonExistsCallable, TKqpOlapCompileContext& ctx) { - Y_ABORT_UNLESS(NKikimr::NSsa::RuntimeVersion >= 3, "JSON_EXISTS pushdown is supported starting from the v3 of SSA runtime."); - - const auto columnId = GetOrCreateColumnId(jsonExistsCallable.Column(), ctx); - const auto pathId = GetOrCreateColumnId(jsonExistsCallable.Path(), ctx); - - auto *const command = ctx.CreateAssignCmd(); - auto *const jsonExistsFunc = command->MutableFunction(); - - jsonExistsFunc->AddArguments()->SetId(columnId); - jsonExistsFunc->AddArguments()->SetId(pathId); - - jsonExistsFunc->SetFunctionType(TProgram::YQL_KERNEL); - auto idx = ctx.AddYqlKernelJsonExists( - jsonExistsCallable.Column(), - jsonExistsCallable.Path(), - ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); - jsonExistsFunc->SetKernelIdx(idx); - - return command->GetColumn().GetId(); -} - ui64 CompileCondition(const TExprBase& condition, TKqpOlapCompileContext& ctx) { if (const auto maybeCompare = condition.Maybe()) { return CompileComparison(maybeCompare.Cast(), ctx); @@ -729,7 +734,7 @@ ui64 CompileCondition(const TExprBase& condition, TKqpOlapCompileContext& ctx) { } if (const auto maybeJsonExists = condition.Maybe()) { - return CompileJsonExists(maybeJsonExists.Cast(), ctx); + return CompileJsonExists(maybeJsonExists.Cast(), ctx).Id; } if (const auto maybeNot = condition.Maybe()) { From de9de3bb82e15c4dfeb0681a30e7bd12b21437b2 Mon Sep 17 00:00:00 2001 From: Anton Romanov Date: Fri, 29 Dec 2023 17:10:33 +0100 Subject: [PATCH 4/7] Fix test --- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index fbed97714aeb..221aa8e3ab95 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1825,15 +1825,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { for (auto& op : operators) { if (op.GetMapSafe().at("Name") == "TableFullScan") { UNIT_ASSERT(op.GetMapSafe().at("SsaProgram").IsDefined()); - auto ssa = op.GetMapSafe().at("SsaProgram").GetStringRobust(); - int filterCmdCount = 0; - std::string::size_type pos = 0; - std::string filterCmd = R"("Filter":{)"; - while ((pos = ssa.find(filterCmd, pos)) != std::string::npos) { - ++filterCmdCount; - pos += filterCmd.size(); - } - UNIT_ASSERT_EQUAL(filterCmdCount, 2); + const auto ssa = op.GetMapSafe().at("SsaProgram").GetStringRobust(); + UNIT_ASSERT(ssa.Contains(R"("Filter":{)")); } } } From 9e2de83dd2f956b9a6392cd595524aa96f1e55da Mon Sep 17 00:00:00 2001 From: Anton Romanov Date: Fri, 29 Dec 2023 18:07:53 +0100 Subject: [PATCH 5/7] Fix make return type for binary operations. --- .../kqp/query_compiler/kqp_olap_compiler.cpp | 130 ++++++++++-------- 1 file changed, 74 insertions(+), 56 deletions(-) diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp index e5c54bd41122..0adb34cbffbd 100644 --- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -93,40 +94,54 @@ class TKqpOlapCompileContext { return Program.AddCommand()->MutableProjection(); } - const TTypeAnnotationNode& GetCommonType(const TTypeAnnotationNode& left, const TTypeAnnotationNode& right) { - if (right.IsBlock()) - return right; - if (left.IsBlock()) - return left; - return right; + const TTypeAnnotationNode* GetReturnType(const TTypeAnnotationNode& left, const TTypeAnnotationNode& right, const TTypeAnnotationNode* resultItemType) const { + bool isScalarLeft, isScalarRight; + const auto leftItemType = GetBlockItemType(left, isScalarLeft); + const auto rightItemType = GetBlockItemType(right, isScalarRight); + + if (!resultItemType) { + const auto& leftCleanType = RemoveOptionality(*leftItemType); + const auto& rightCleanType = RemoveOptionality(*rightItemType); + YQL_ENSURE(IsSameAnnotation(leftCleanType, rightCleanType), "Expected same type kind."); + resultItemType = &rightCleanType; + } + + if (ETypeAnnotationKind::Optional == leftItemType->GetKind() || ETypeAnnotationKind::Optional == rightItemType->GetKind()) { + resultItemType = ExprContext.MakeType(resultItemType); + } + + if (isScalarLeft && isScalarRight) + return ExprContext.MakeType(resultItemType); + else + return ExprContext.MakeType(resultItemType); } - std::pair AddYqlKernelBinaryFunc(TKernelRequestBuilder::EBinaryOp op, const TTypeAnnotationNode& argTypeOne, const TTypeAnnotationNode& argTypeTwo, const TTypeAnnotationNode* retType) { - const auto retBlockType = retType ? ConvertToBlockType(retType) : &GetCommonType(argTypeOne, argTypeTwo) ; + std::pair AddYqlKernelBinaryFunc(TKernelRequestBuilder::EBinaryOp op, const TTypeAnnotationNode& argTypeOne, const TTypeAnnotationNode& argTypeTwo, const TTypeAnnotationNode* retType) const { + const auto retBlockType = GetReturnType(argTypeOne, argTypeTwo, retType); return std::make_pair(YqlKernelRequestBuilder->AddBinaryOp(op, &argTypeOne, &argTypeTwo, retBlockType), retBlockType); } - ui32 AddYqlKernelBinaryFunc(TKernelRequestBuilder::EBinaryOp op, const TExprBase& arg1, const TExprBase& arg2, const TTypeAnnotationNode* retType) { + ui32 AddYqlKernelBinaryFunc(TKernelRequestBuilder::EBinaryOp op, const TExprBase& arg1, const TExprBase& arg2, const TTypeAnnotationNode* retType) const { const auto arg1Type = GetArgType(arg1); const auto arg2Type = GetArgType(arg2); return AddYqlKernelBinaryFunc(op, *arg1Type, *arg2Type, retType).first; } - ui32 AddYqlKernelJsonExists(const TExprBase& arg1, const TExprBase& arg2, const TTypeAnnotationNode* retType) { - auto arg1Type = GetArgType(arg1); - auto arg2Type = GetArgType(arg2); - auto retBlockType = ConvertToBlockType(retType); + ui32 AddYqlKernelJsonExists(const TExprBase& arg1, const TExprBase& arg2, const TTypeAnnotationNode* retType) const { + const auto arg1Type = GetArgType(arg1); + const auto arg2Type = GetArgType(arg2); + const auto retBlockType = ConvertToBlockType(retType); return YqlKernelRequestBuilder->JsonExists(arg1Type, arg2Type, retBlockType); } - ui32 AddYqlKernelJsonValue(const TExprBase& arg1, const TExprBase& arg2, const TTypeAnnotationNode* retType) { - auto arg1Type = GetArgType(arg1); - auto arg2Type = GetArgType(arg2); - auto retBlockType = ConvertToBlockType(retType); + ui32 AddYqlKernelJsonValue(const TExprBase& arg1, const TExprBase& arg2, const TTypeAnnotationNode* retType) const { + const auto arg1Type = GetArgType(arg1); + const auto arg2Type = GetArgType(arg2); + const auto retBlockType = ConvertToBlockType(retType); return YqlKernelRequestBuilder->JsonValue(arg1Type, arg2Type, retBlockType); } - void AddParameterName(const TString& name) { + void AddParameterName(const TString& name) const { ReadProto.AddOlapProgramParameterNames(name); } @@ -147,11 +162,11 @@ class TKqpOlapCompileContext { KqpAggColNameToId.emplace(colName, id); } - std::vector GetResultColNames() { + std::vector GetResultColNames() const { return ResultColNames; } - bool IsEmptyProgram() { + bool IsEmptyProgram() const { return Program.GetCommand().empty(); } @@ -159,7 +174,7 @@ class TKqpOlapCompileContext { return ExprContext; } - bool CheckYqlCompatibleArgsTypes(const TKqpOlapFilterBinaryOp& operation) { + bool CheckYqlCompatibleArgsTypes(const TKqpOlapFilterBinaryOp& operation) const { if (const auto maybe = operation.Left().Maybe()) { if (const auto type = GetColumnTypeByName(maybe.Cast().Value()); type->GetKind() == ETypeAnnotationKind::Data) { if (const auto info = GetDataTypeInfo(type->Cast()->GetSlot()); !(info.Features & (NUdf::EDataTypeFeatures::StringType | NUdf::EDataTypeFeatures::NumericType))) { @@ -181,7 +196,7 @@ class TKqpOlapCompileContext { return *YqlKernelRequestBuilder; } - const TTypeAnnotationNode* GetArgType(const TExprBase& arg) { + const TTypeAnnotationNode* GetArgType(const TExprBase& arg) const { const auto argType = arg.Ptr()->GetTypeAnn(); if (arg.Maybe() && argType->GetKind() == ETypeAnnotationKind::Unit) { // Column name @@ -189,15 +204,15 @@ class TKqpOlapCompileContext { } return ExprContext.MakeType(argType); } -private: - const TTypeAnnotationNode* ConvertToBlockType(const TTypeAnnotationNode* type) { + + const TTypeAnnotationNode* ConvertToBlockType(const TTypeAnnotationNode* type) const { if (!type->IsBlock()) { return ExprContext.MakeType(type); } return type; } - - const TTypeAnnotationNode* GetColumnTypeByName(const std::string_view &name) { +private: + const TTypeAnnotationNode* GetColumnTypeByName(const std::string_view &name) const { auto *rowItemType = GetSeqItemType(Row.Ptr()->GetTypeAnn()); YQL_ENSURE(rowItemType->GetKind() == ETypeAnnotationKind::Struct, "Input for OLAP lambda must contain Struct inside."); auto structType = rowItemType->Cast(); @@ -358,7 +373,12 @@ ui64 ConvertSafeCastToColumn(const TCoSafeCast& cast, TKqpOlapCompileContext& ct return ConvertSafeCastToColumn(cast.Value(), maybeDataType.Cast().Type().StringValue(), ctx); } -ui64 ConvertJsonValueToColumn(const TKqpOlapJsonValue& jsonValueCallable, TKqpOlapCompileContext& ctx) { +struct TTypedColumn { + const ui64 Id = 0ULL; + const TTypeAnnotationNode *const Type = nullptr; +}; + +const TTypedColumn ConvertJsonValueToColumn(const TKqpOlapJsonValue& jsonValueCallable, TKqpOlapCompileContext& ctx) { Y_ABORT_UNLESS(NKikimr::NSsa::RuntimeVersion >= 3, "JSON_VALUE pushdown is supported starting from the v3 of SSA runtime."); const auto columnId = GetOrCreateColumnId(jsonValueCallable.Column(), ctx); @@ -371,14 +391,15 @@ ui64 ConvertJsonValueToColumn(const TKqpOlapJsonValue& jsonValueCallable, TKqpOl jsonValueFunc->AddArguments()->SetId(pathId); jsonValueFunc->SetFunctionType(TProgram::YQL_KERNEL); - auto returningTypeArg = jsonValueCallable.ReturningType(); - auto idx = ctx.AddYqlKernelJsonValue( + const auto returningTypeArg = jsonValueCallable.ReturningType(); + const auto type = ctx.ExprCtx().MakeType(returningTypeArg.Ref().GetTypeAnn()->Cast()->GetType()); + const auto idx = ctx.AddYqlKernelJsonValue( jsonValueCallable.Column(), jsonValueCallable.Path(), - ctx.ExprCtx().MakeType(returningTypeArg.Ref().GetTypeAnn()->Cast()->GetType())); + type); jsonValueFunc->SetKernelIdx(idx); - return command->GetColumn().GetId(); + return {command->GetColumn().GetId(), ctx.ConvertToBlockType(type)}; } ui64 GetOrCreateColumnId(const TExprBase& node, TKqpOlapCompileContext& ctx) { @@ -399,7 +420,7 @@ ui64 GetOrCreateColumnId(const TExprBase& node, TKqpOlapCompileContext& ctx) { } if (auto maybeJsonValue = node.Maybe()) { - return ConvertJsonValueToColumn(maybeJsonValue.Cast(), ctx); + return ConvertJsonValueToColumn(maybeJsonValue.Cast(), ctx).Id; } YQL_ENSURE(false, "Unknown node in OLAP comparison compiler: " << node.Ref().Content()); @@ -456,7 +477,7 @@ ui64 CompileYqlKernelComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOl auto idx = ctx.AddYqlKernelBinaryFunc(TKernelRequestBuilder::EBinaryOp::StringContains, comparison.Left(), comparison.Right(), - ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); + ctx.ExprCtx().MakeType(EDataSlot::Bool)); cmpFunc->SetKernelIdx(idx); needCastToBool = true; } @@ -467,7 +488,7 @@ ui64 CompileYqlKernelComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOl auto idx = ctx.AddYqlKernelBinaryFunc(TKernelRequestBuilder::EBinaryOp::StartsWith, comparison.Left(), comparison.Right(), - ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); + ctx.ExprCtx().MakeType(EDataSlot::Bool)); cmpFunc->SetKernelIdx(idx); needCastToBool = true; } @@ -478,7 +499,7 @@ ui64 CompileYqlKernelComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOl auto idx = ctx.AddYqlKernelBinaryFunc(TKernelRequestBuilder::EBinaryOp::EndsWith, comparison.Left(), comparison.Right(), - ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); + ctx.ExprCtx().MakeType(EDataSlot::Bool)); cmpFunc->SetKernelIdx(idx); needCastToBool = true; } @@ -495,11 +516,6 @@ ui64 CompileYqlKernelComparison(const TKqpOlapFilterBinaryOp& comparison, TKqpOl return command->GetColumn().GetId(); } -struct TTypedColumn { - const ui64 Id = 0ULL; - const TTypeAnnotationNode *const Type = nullptr; -}; - TTypedColumn GetOrCreateColumnIdAndType(const TExprBase& node, TKqpOlapCompileContext& ctx); TTypedColumn CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& operation, TKqpOlapCompileContext& ctx) @@ -512,7 +528,7 @@ TTypedColumn CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& opera auto *const cmpFunc = command->MutableFunction(); TKernelRequestBuilder::EBinaryOp op; - bool compare = true; + const TTypeAnnotationNode* type = ctx.ExprCtx().MakeType(EDataSlot::Bool); if (const std::string_view& oper = operation.Operator().Value(); oper == "string_contains"sv) { op = TKernelRequestBuilder::EBinaryOp::StringContains; } else if (oper == "starts_with"sv) { @@ -533,28 +549,28 @@ TTypedColumn CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& opera op = TKernelRequestBuilder::EBinaryOp::GreaterOrEqual; } else if (oper == "+"sv) { op = TKernelRequestBuilder::EBinaryOp::Add; - compare = false; + type = nullptr; } else if (oper == "-"sv) { op = TKernelRequestBuilder::EBinaryOp::Sub; - compare = false; + type = nullptr; } else if (oper == "*"sv) { op = TKernelRequestBuilder::EBinaryOp::Mul; - compare = false; + type = nullptr; } else if (oper == "/"sv) { op = TKernelRequestBuilder::EBinaryOp::Div; - compare = false; + type = nullptr; } else if (oper == "%"sv) { op = TKernelRequestBuilder::EBinaryOp::Mod; - compare = false; + type = nullptr; } else if (oper == "??"sv) { op = TKernelRequestBuilder::EBinaryOp::Coalesce; - compare = false; + bool stub; + type = GetBlockItemType(*rightColumn.Type, stub); } else { YQL_ENSURE(false, "Unknown binary OLAP operation: " << oper); } - const auto kernel = ctx.AddYqlKernelBinaryFunc(op, *leftColumn.Type, *rightColumn.Type, - compare ? ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool)) : nullptr); + const auto kernel = ctx.AddYqlKernelBinaryFunc(op, *leftColumn.Type, *rightColumn.Type, type); cmpFunc->SetFunctionType(TProgram::YQL_KERNEL); cmpFunc->SetKernelIdx(kernel.first); cmpFunc->AddArguments()->SetId(leftColumn.Id); @@ -607,7 +623,7 @@ const TProgram::TAssignment* InvertResult(TProgram::TAssignment* command, TKqpOl template const TTypedColumn CompileExists(const TKqpOlapFilterExists& exists, TKqpOlapCompileContext& ctx) { - const auto type = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(NSsa::RuntimeVersion >= 4U ? EDataSlot::Uint8 : EDataSlot::Bool)); + const auto type = ctx.ExprCtx().MakeType(EDataSlot::Bool); const auto columnId = GetOrCreateColumnId(exists.Column(), ctx); auto *const command = ctx.CreateAssignCmd(); auto *const isNullFunc = command->MutableFunction(); @@ -617,7 +633,7 @@ const TTypedColumn CompileExists(const TKqpOlapFilterExists& exists, TKqpOlapCom if constexpr (Empty) { if constexpr (NSsa::RuntimeVersion >= 4U) { - return {ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx), type}; + return {ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx), ctx.ConvertToBlockType(type)}; } else { return {command->GetColumn().GetId(), type}; } @@ -625,7 +641,7 @@ const TTypedColumn CompileExists(const TKqpOlapFilterExists& exists, TKqpOlapCom auto *const notCommand = InvertResult(command, ctx); if constexpr (NSsa::RuntimeVersion >= 4U) { - return {ConvertSafeCastToColumn(notCommand->GetColumn().GetId(), "Uint8", ctx), type}; + return {ConvertSafeCastToColumn(notCommand->GetColumn().GetId(), "Uint8", ctx), ctx.ConvertToBlockType(type)}; } else { return {notCommand->GetColumn().GetId(), type}; } @@ -644,17 +660,17 @@ const TTypedColumn CompileJsonExists(const TKqpOlapJsonExists& jsonExistsCallabl jsonExistsFunc->AddArguments()->SetId(pathId); jsonExistsFunc->SetFunctionType(TProgram::YQL_KERNEL); + const auto type = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool)); const auto idx = ctx.AddYqlKernelJsonExists( jsonExistsCallable.Column(), jsonExistsCallable.Path(), - ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool))); + type); jsonExistsFunc->SetKernelIdx(idx); - const auto type = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(NSsa::RuntimeVersion >= 4U ? EDataSlot::Uint8 : EDataSlot::Bool)); if constexpr (NSsa::RuntimeVersion >= 4U) { - return {ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx), type}; + return {ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx), ctx.ConvertToBlockType(type)}; } else { - return {command->GetColumn().GetId(), type}; + return {command->GetColumn().GetId(), ctx.ConvertToBlockType(type)}; } } @@ -699,6 +715,8 @@ TTypedColumn GetOrCreateColumnIdAndType(const TExprBase& node, TKqpOlapCompileCo return BuildLogicalProgram(maybeXor.Ref().Children(), TKernelRequestBuilder::EBinaryOp::Xor, ctx); } else if (const auto& maybeNot = node.Maybe()) { return BuildLogicalNot(maybeNot.Cast().Value(), ctx); + } else if (const auto& maybeJsonValue = node.Maybe()) { + return ConvertJsonValueToColumn(maybeJsonValue.Cast(), ctx); } return {GetOrCreateColumnId(node, ctx), ctx.GetArgType(node)}; From a9ddf8dce361ea3ec051e07d93e21ab3c2741930 Mon Sep 17 00:00:00 2001 From: Anton Romanov Date: Tue, 2 Jan 2024 23:10:47 +0100 Subject: [PATCH 6/7] Some fixes. --- .../opt/physical/kqp_opt_phy_olap_filter.cpp | 7 ++- .../kqp/query_compiler/kqp_olap_compiler.cpp | 58 ++++++++++--------- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp index 98cf26b8d456..1ce97d75bba6 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -483,8 +484,10 @@ TMaybeNode SafeCastPredicatePushdown(const TCoFlatMap& inputFlatmap, TMaybeNode CoalescePushdown(const TCoCoalesce& coalesce, TExprContext& ctx, TPositionHandle pos) { if constexpr (NSsa::RuntimeVersion >= 4U) { - if (const auto node = YqlCoalescePushdown(coalesce, ctx)) { - return node; + if (!FindNode(coalesce.Ptr(), [](const TExprNode::TPtr& node) { return TCoJsonValue::Match(node.Get()); })) { + if (const auto node = YqlCoalescePushdown(coalesce, ctx)) { + return node; + } } } diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp index 0adb34cbffbd..51b75b7349ed 100644 --- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp @@ -402,6 +402,33 @@ const TTypedColumn ConvertJsonValueToColumn(const TKqpOlapJsonValue& jsonValueCa return {command->GetColumn().GetId(), ctx.ConvertToBlockType(type)}; } +const TTypedColumn CompileJsonExists(const TKqpOlapJsonExists& jsonExistsCallable, TKqpOlapCompileContext& ctx) { + Y_ABORT_UNLESS(NKikimr::NSsa::RuntimeVersion >= 3, "JSON_EXISTS pushdown is supported starting from the v3 of SSA runtime."); + + const auto columnId = GetOrCreateColumnId(jsonExistsCallable.Column(), ctx); + const auto pathId = GetOrCreateColumnId(jsonExistsCallable.Path(), ctx); + + auto *const command = ctx.CreateAssignCmd(); + auto *const jsonExistsFunc = command->MutableFunction(); + + jsonExistsFunc->AddArguments()->SetId(columnId); + jsonExistsFunc->AddArguments()->SetId(pathId); + + jsonExistsFunc->SetFunctionType(TProgram::YQL_KERNEL); + const auto type = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool)); + const auto idx = ctx.AddYqlKernelJsonExists( + jsonExistsCallable.Column(), + jsonExistsCallable.Path(), + type); + jsonExistsFunc->SetKernelIdx(idx); + + if constexpr (NSsa::RuntimeVersion >= 4U) { + return {ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx), ctx.ConvertToBlockType(type)}; + } else { + return {command->GetColumn().GetId(), ctx.ConvertToBlockType(type)}; + } +} + ui64 GetOrCreateColumnId(const TExprBase& node, TKqpOlapCompileContext& ctx) { if (auto maybeData = node.Maybe()) { return ConvertValueToColumn(maybeData.Cast(), ctx); @@ -423,6 +450,10 @@ ui64 GetOrCreateColumnId(const TExprBase& node, TKqpOlapCompileContext& ctx) { return ConvertJsonValueToColumn(maybeJsonValue.Cast(), ctx).Id; } + if (const auto maybeJsonExists = node.Maybe()) { + return CompileJsonExists(maybeJsonExists.Cast(), ctx).Id; + } + YQL_ENSURE(false, "Unknown node in OLAP comparison compiler: " << node.Ref().Content()); } @@ -647,33 +678,6 @@ const TTypedColumn CompileExists(const TKqpOlapFilterExists& exists, TKqpOlapCom } } -const TTypedColumn CompileJsonExists(const TKqpOlapJsonExists& jsonExistsCallable, TKqpOlapCompileContext& ctx) { - Y_ABORT_UNLESS(NKikimr::NSsa::RuntimeVersion >= 3, "JSON_EXISTS pushdown is supported starting from the v3 of SSA runtime."); - - const auto columnId = GetOrCreateColumnId(jsonExistsCallable.Column(), ctx); - const auto pathId = GetOrCreateColumnId(jsonExistsCallable.Path(), ctx); - - auto *const command = ctx.CreateAssignCmd(); - auto *const jsonExistsFunc = command->MutableFunction(); - - jsonExistsFunc->AddArguments()->SetId(columnId); - jsonExistsFunc->AddArguments()->SetId(pathId); - - jsonExistsFunc->SetFunctionType(TProgram::YQL_KERNEL); - const auto type = ctx.ExprCtx().MakeType(ctx.ExprCtx().MakeType(EDataSlot::Bool)); - const auto idx = ctx.AddYqlKernelJsonExists( - jsonExistsCallable.Column(), - jsonExistsCallable.Path(), - type); - jsonExistsFunc->SetKernelIdx(idx); - - if constexpr (NSsa::RuntimeVersion >= 4U) { - return {ConvertSafeCastToColumn(command->GetColumn().GetId(), "Uint8", ctx), ctx.ConvertToBlockType(type)}; - } else { - return {command->GetColumn().GetId(), ctx.ConvertToBlockType(type)}; - } -} - const TTypedColumn BuildLogicalNot(const TExprBase& arg, TKqpOlapCompileContext& ctx) { if (const auto maybeExists = arg.Maybe()) { return CompileExists(maybeExists.Cast(), ctx); From ab1e5ffe9b6757f270bfa5b70e0358416569da5a Mon Sep 17 00:00:00 2001 From: Anton Romanov Date: Tue, 2 Jan 2024 23:12:28 +0100 Subject: [PATCH 7/7] Enable tests. --- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index c2249b260d39..811b4d03d72b 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -4922,9 +4922,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(Json_GetValue) { - // Should be fixed after Arrow kernel implementation for JSON_VALUE - // https://st.yandex-team.ru/KIKIMR-17903 - return; TAggregationTestCase testCase; testCase.SetQuery(R"( SELECT id, JSON_VALUE(jsonval, "$.col1"), JSON_VALUE(jsondoc, "$.col1") FROM `/Root/tableWithNulls` @@ -4941,9 +4938,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(Json_GetValue_ToString) { - // Should be fixed after Arrow kernel implementation for JSON_VALUE - // https://st.yandex-team.ru/KIKIMR-17903 - return; TAggregationTestCase testCase; testCase.SetQuery(R"( SELECT id, JSON_VALUE(jsonval, "$.col1" RETURNING String), JSON_VALUE(jsondoc, "$.col1") FROM `/Root/tableWithNulls` @@ -4960,9 +4954,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(Json_GetValue_ToInt) { - // Should be fixed after Arrow kernel implementation for JSON_VALUE - // https://st.yandex-team.ru/KIKIMR-17903 - return; TAggregationTestCase testCase; testCase.SetQuery(R"( SELECT id, JSON_VALUE(jsonval, "$.obj.obj_col2_int" RETURNING Int), JSON_VALUE(jsondoc, "$.obj.obj_col2_int" RETURNING Int) FROM `/Root/tableWithNulls` @@ -5027,9 +5018,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(Json_Exists) { - // Should be fixed after Arrow kernel implementation for JSON_EXISTS - // https://st.yandex-team.ru/KIKIMR-17903 - return; TAggregationTestCase testCase; testCase.SetQuery(R"( SELECT id, JSON_EXISTS(jsonval, "$.col1"), JSON_EXISTS(jsondoc, "$.col1") FROM `/Root/tableWithNulls`