diff --git a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
index 8255e13914e3..bfd9839cdf62 100644
--- a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
@@ -222,6 +222,115 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
     return TStatus::Ok;
 }
 
+template // NOTE(review): the template parameter list (and TVector's element type below) appears stripped in this dump; presumably it declares TExpr - confirm against the repository
+TVector CollectNodes(const TExprNode::TPtr& input) { // Visits the expression tree under `input` and returns every node accepted by TExpr::Match, wrapped as TExpr
+    TVector result;
+
+    VisitExpr(input, [&result](const TExprNode::TPtr& node) {
+        if (TExpr::Match(node.Get())) {
+            result.emplace_back(TExpr(node));
+        }
+        return true; // the callback always returns true; presumably this tells VisitExpr to keep descending - confirm
+    });
+
+    return result;
+}
+
+bool FindPrecomputedOutputs(TDqStageBase stage, const TParentsMap& parentsMap) { // Returns true if walking parentsMap upwards (stage -> TDqOutput parent -> its parents -> their parents) reaches a TDqPrecompute or TDqPhyPrecompute node
+    auto outIt = parentsMap.find(stage.Raw());
+    if (outIt == parentsMap.end()) {
+        return false; // the stage has no recorded parents
+    }
+
+    for (auto& output : outIt->second) {
+        if (TDqOutput::Match(output)) { // only parents that are TDqOutput nodes are followed
+            auto connIt = parentsMap.find(output);
+            if (connIt != parentsMap.end()) {
+                for (auto maybeConn : connIt->second) { // NOTE(review): every parent of the output is followed; nothing here checks that it is actually a connection node
+                    auto parentIt = parentsMap.find(maybeConn);
+                    if (parentIt != parentsMap.end()) {
+                        for (auto& parent : parentIt->second) {
+                            if (TDqPrecompute::Match(parent) || TDqPhyPrecompute::Match(parent)) {
+                                return true; // first precompute consumer found; no need to look further
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    return false;
+}
+
+
+TExprBase ReplicatePrecompute(TDqStageBase stage, TExprContext& ctx, const TParentsMap& parentsMap) { // For the first stage input that passes the checks below, returns a rebuilt stage with that input wrapped and fresh program arguments; returns `stage` itself when no input qualifies
+    for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+        auto input = stage.Inputs().Item(i);
+        if (auto maybeConn = stage.Inputs().Item(i).Maybe()) { // NOTE(review): the Maybe()/Build() calls in this function carry no template arguments in this dump - presumably stripped during extraction; confirm the node types against the repository
+            auto conn = maybeConn.Cast();
+            if (conn.Maybe() || conn.Maybe()) {
+                { // scoped checks on the producing stage; each `continue` moves on to the next input
+                    auto sourceStage = conn.Output().Stage();
+                    if (!sourceStage.Program().Body().Maybe()) {
+                        continue;
+                    }
+
+                    if (!FindPrecomputedOutputs(sourceStage, parentsMap)) { // skip unless the producing stage also reaches a precompute node through parentsMap
+                        continue;
+                    }
+                }
+
+                auto arg = stage.Program().Args().Arg(i); // NOTE(review): `arg` is not referenced again in this function
+                auto newArg = Build(ctx, stage.Program().Args().Arg(i).Pos()) // NOTE(review): `newArg` is likewise never used below; the loop uses newArgs.back() instead
+                    .Name("_replaced_arg")
+                    .Done();
+
+                TVector newArgs;
+                TNodeOnNodeOwnedMap programReplaces; // old program argument -> replacement node
+                for (size_t j = 0; j < stage.Program().Args().Size(); ++j) {
+                    auto oldArg = stage.Program().Args().Arg(j);
+                    newArgs.push_back(Build(ctx, stage.Program().Args().Arg(i).Pos()) // NOTE(review): the position comes from Arg(i) for every j - possibly oldArg.Pos() was intended; confirm
+                        .Name("_replaced_arg_" + ToString(j))
+                        .Done());
+                    if (i == j) { // the argument fed by the selected input is replaced by a node built around the new argument (.Input(...))
+                        programReplaces[oldArg.Raw()] = Build(ctx, oldArg.Pos()).Input(newArgs.back()).Done().Ptr();
+                    } else { // every other argument maps one-to-one to its fresh counterpart
+                        programReplaces[oldArg.Raw()] = newArgs.back().Ptr();
+                    }
+                }
+
+                return // only the first qualifying input is rewritten per call
+                    Build(ctx, stage.Pos())
+                        .Inputs(ctx.ReplaceNode(stage.Inputs().Ptr(), input.Ref(), Build(ctx, input.Pos()).Connection(conn).Done().Ptr())) // swap input i for a node built with .Connection(conn)
+                        .Outputs(stage.Outputs())
+                        .Settings(stage.Settings())
+                        .Program()
+                            .Args(newArgs)
+                            .Body(TExprBase(ctx.ReplaceNodes(stage.Program().Body().Ptr(), programReplaces))) // body with old arguments substituted per programReplaces
+                            .Build()
+                        .Done();
+            }
+        }
+    }
+    return stage; // no qualifying input: hand back the original stage unchanged
+}
+
+NYql::IGraphTransformer::TStatus ReplicatePrecomputeRule(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { // Applies ReplicatePrecompute to the collected stages; rewrites the graph for the first stage that changes and returns TStatus::Repeat, otherwise sets output = input and returns TStatus::Ok
+    TParentsMap parents;
+    GatherParents(*input, parents, true); // NOTE(review): the meaning of the third argument (true) is not visible in this diff
+    auto stages = CollectNodes(input); // NOTE(review): the explicit template argument appears stripped here too; each element is passed to ReplicatePrecompute, which takes TDqStageBase
+    for (auto& stage : stages) {
+        auto applied = ReplicatePrecompute(stage, ctx, parents);
+        if (applied.Raw() != stage.Raw()) { // a different raw node pointer means ReplicatePrecompute rebuilt the stage
+            output = ctx.ReplaceNode(input.Get(), stage.Ref(), applied.Ptr());
+            return TStatus::Repeat; // presumably asks the caller to run the rule again on the updated graph - confirm
+        }
+    }
+    output = input;
+    return TStatus::Ok;
+}
+
 template
 NYql::IGraphTransformer::TStatus PerformGlobalRule(const TString& ruleName, const NYql::TExprNode::TPtr& input,
     NYql::TExprNode::TPtr& output, NYql::TExprContext& ctx, TFunctor func)
@@ -251,6 +360,8 @@ TAutoPtr CreateKqpFinalizingOptTransformer(const TIntrusivePt
         [kqpCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
             output = input;
 
+            PERFORM_GLOBAL_RULE("ReplicatePrecompute", input, output, ctx, ReplicatePrecomputeRule); // the new rule is placed ahead of ReplicateMultiUsedConnection
+
             PERFORM_GLOBAL_RULE("ReplicateMultiUsedConnection", input, output, ctx,
                 [](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
                     YQL_ENSURE(TKqlQuery::Match(input.Get()));
diff --git
a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 5f633c1d2d33..42eed20d4d26 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -2984,7 +2984,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
         UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
 
         AssertTableStats(result, "/Root/Test", {
-            .ExpectedReads = 2,
+            .ExpectedReads = 1, // expected read count for /Root/Test is lowered from 2 to 1 by this change
             .ExpectedDeletes = 2,
         });
 
diff --git a/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp b/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp
index 837d81d654de..3d562af77241 100644
--- a/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp
@@ -11,6 +11,113 @@ using namespace NYdb::NTable;
 
 Y_UNIT_TEST_SUITE(KqpReturning) {
 
+Y_UNIT_TEST(ReturningTwice) { // Runs one data query that reuses $to_move in three statements and checks that the collected stats contain exactly one table_access entry named /Root/tasks_eta_002
+    NKikimrConfig::TAppConfig appConfig;
+    appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
+    appConfig.MutableTableServiceConfig()->SetEnableColumnsWithDefault(true);
+    auto serverSettings = TKikimrSettings().SetAppConfig(appConfig);
+    TKikimrRunner kikimr(serverSettings);
+
+    auto client = kikimr.GetTableClient();
+    auto session = client.CreateSession().GetValueSync().GetSession();
+
+    const auto queryCreate = Q_(R"(
+        CREATE TABLE IF NOT EXISTS tasks (
+            hashed_key Uint32,
+            queue_name String,
+            task_id String,
+            worker_id Int32,
+            running Bool,
+            eta Timestamp,
+            lock_timeout Timestamp,
+            num_fails Int32,
+            num_reschedules Int32,
+            body String,
+            first_fail Timestamp,
+            idempotency_run_id String,
+            PRIMARY KEY (hashed_key, queue_name, task_id)
+        );
+
+        CREATE TABLE IF NOT EXISTS tasks_eta_002 (
+            eta Timestamp,
+            hashed_key Uint32,
+            queue_name String,
+            task_id String,
+            PRIMARY KEY (eta, hashed_key, queue_name, task_id)
+        ) WITH (
+            AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
+            AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1
+        );
+
+        CREATE TABLE IF NOT EXISTS tasks_processing_002 (
+            expiration_ts Timestamp,
+            hashed_key Uint32,
+            queue_name String,
+            task_id String,
+            PRIMARY KEY (expiration_ts, hashed_key, queue_name, task_id)
+        ) WITH (
+            AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
+            AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1
+        );
+    )");
+
+    auto resultCreate = session.ExecuteSchemeQuery(queryCreate).GetValueSync(); // creates the three tables used by the data query below
+    UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());
+
+    {
+        const auto query = Q_(R"(
+            --!syntax_v1
+            DECLARE $eta AS Timestamp;
+            DECLARE $expiration_ts AS Timestamp;
+            DECLARE $limit AS Int32;
+
+            $to_move = (
+                SELECT $expiration_ts AS expiration_ts, eta, hashed_key, queue_name, task_id
+                FROM tasks_eta_002
+                WHERE eta <= $eta
+                ORDER BY eta, hashed_key, queue_name, task_id
+                LIMIT $limit
+            );
+
+            UPSERT INTO tasks_processing_002 (expiration_ts, hashed_key, queue_name, task_id)
+            SELECT expiration_ts, hashed_key, queue_name, task_id FROM $to_move
+            RETURNING expiration_ts, hashed_key, queue_name, task_id;
+
+            UPSERT INTO tasks (hashed_key, queue_name, task_id, running, lock_timeout)
+            SELECT hashed_key, queue_name, task_id, True as running, $expiration_ts AS lock_timeout FROM $to_move;
+
+            DELETE FROM tasks_eta_002 ON
+            SELECT eta, hashed_key, queue_name, task_id FROM $to_move;
+        )");
+
+        auto params = TParamsBuilder() // binds $eta and $expiration_ts to TInstant::Zero() and $limit to 1
+            .AddParam("$eta").Timestamp(TInstant::Zero()).Build()
+            .AddParam("$expiration_ts").Timestamp(TInstant::Zero()).Build()
+            .AddParam("$limit").Int32(1).Build()
+            .Build();
+
+        NYdb::NTable::TExecDataQuerySettings execSettings;
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Full); // stats are read back below via result.GetStats()
+
+        auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params, execSettings).GetValueSync();
+        UNIT_ASSERT(result.IsSuccess()); // NOTE(review): unlike the scheme-query check above, a failure here would not print result.GetIssues(); consider UNIT_ASSERT_C
+
+        size_t eta_table_access = 0; // number of table_access entries named /Root/tasks_eta_002, summed over all query phases
+        auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+
+        for (auto phase : stats.query_phases()) { // NOTE(review): `auto` takes each phase by value; const auto& would presumably avoid a copy - confirm
+            for (auto table : phase.table_access()) { // NOTE(review): same by-value iteration for each table entry
+                if (table.name() == "/Root/tasks_eta_002") {
+                    eta_table_access++;
+                }
+            }
+        }
+        Cerr << "access count " << eta_table_access << Endl;
+        UNIT_ASSERT_EQUAL(eta_table_access, 1); // exactly one matching table_access entry is expected even though $to_move is used three times
+        //Cerr << stats.Utf8DebugString() << Endl;
+    }
+}
+
 Y_UNIT_TEST(ReturningSerial) {
     NKikimrConfig::TAppConfig appConfig;
     appConfig.MutableTableServiceConfig()->SetEnableSequences(true);