Skip to content

Precompute all replicated connections if one of them is already precomputed #7067

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,115 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
return TStatus::Ok;
}

// Visits the expression rooted at `input` and returns, in visit order, every
// node accepted by TExpr::Match, each wrapped as a TExpr.
template <typename TExpr>
TVector<TExpr> CollectNodes(const TExprNode::TPtr& input) {
    TVector<TExpr> matched;

    const auto collectIfMatches = [&matched](const TExprNode::TPtr& candidate) {
        if (TExpr::Match(candidate.Get())) {
            matched.push_back(TExpr(candidate));
        }
        // The visitor unconditionally reports `true`
        // (presumably "keep traversing" for VisitExpr -- confirm against its contract).
        return true;
    };

    VisitExpr(input, collectIfMatches);
    return matched;
}

// Reports whether `stage` has, three parent hops up in `parentsMap`, a node
// matching TDqPrecompute or TDqPhyPrecompute:
//   stage -> parent matching TDqOutput -> any parent of that output
//         -> parent matching TDqPrecompute / TDqPhyPrecompute.
// The middle hop is not type-checked (presumably it is a connection node).
// Returns false when the stage is absent from the map or no such chain exists.
bool FindPrecomputedOutputs(TDqStageBase stage, const TParentsMap& parentsMap) {
    const auto stageEntry = parentsMap.find(stage.Raw());
    if (stageEntry == parentsMap.end()) {
        return false;
    }

    for (auto& stageUser : stageEntry->second) {
        if (!TDqOutput::Match(stageUser)) {
            continue;
        }

        const auto outputEntry = parentsMap.find(stageUser);
        if (outputEntry == parentsMap.end()) {
            continue;
        }

        for (auto outputUser : outputEntry->second) {
            const auto consumerEntry = parentsMap.find(outputUser);
            if (consumerEntry == parentsMap.end()) {
                continue;
            }

            for (auto& consumer : consumerEntry->second) {
                const bool isPrecompute = TDqPrecompute::Match(consumer) || TDqPhyPrecompute::Match(consumer);
                if (isPrecompute) {
                    return true;
                }
            }
        }
    }

    return false;
}


// Rewrites the first eligible input of `stage` so that it is consumed through
// a TDqPhyPrecompute instead of directly through its connection.
//
// An input is eligible when all of the following hold:
//   * it is a TDqConnection of kind TDqCnValue or TDqCnUnionAll;
//   * the body of the source stage's program is a TDqReplicate;
//   * FindPrecomputedOutputs() reports that the source stage already has a
//     precomputed output.
//
// For the eligible input the function builds a new TDqStage in which
//   * the connection is wrapped into TDqPhyPrecompute,
//   * every program argument is replaced by a fresh TCoArgument, and
//   * uses of the argument bound to the rewritten input go through TCoToFlow.
//
// Returns the rebuilt stage, or `stage` itself when no input qualifies
// (callers detect "no change" by comparing Raw() pointers).
TExprBase ReplicatePrecompute(TDqStageBase stage, TExprContext& ctx, const TParentsMap& parentsMap) {
    for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
        auto input = stage.Inputs().Item(i);
        auto maybeConn = input.Maybe<TDqConnection>();
        if (!maybeConn) {
            continue;
        }

        auto conn = maybeConn.Cast();
        if (!conn.Maybe<TDqCnValue>() && !conn.Maybe<TDqCnUnionAll>()) {
            continue;
        }

        auto sourceStage = conn.Output().Stage();
        if (!sourceStage.Program().Body().Maybe<TDqReplicate>()) {
            continue;
        }

        if (!FindPrecomputedOutputs(sourceStage, parentsMap)) {
            continue;
        }

        TVector<TCoArgument> newArgs;
        newArgs.reserve(stage.Program().Args().Size());
        TNodeOnNodeOwnedMap programReplaces;
        for (size_t j = 0; j < stage.Program().Args().Size(); ++j) {
            auto oldArg = stage.Program().Args().Arg(j);
            // Each replacement argument keeps the position of the argument it replaces.
            newArgs.push_back(Build<TCoArgument>(ctx, oldArg.Pos())
                .Name("_replaced_arg_" + ToString(j))
                .Done());
            if (i == j) {
                // The rewritten input is now fed by TDqPhyPrecompute, so its
                // argument is wrapped in TCoToFlow wherever the body uses it.
                programReplaces[oldArg.Raw()] = Build<TCoToFlow>(ctx, oldArg.Pos()).Input(newArgs.back()).Done().Ptr();
            } else {
                programReplaces[oldArg.Raw()] = newArgs.back().Ptr();
            }
        }

        return
            Build<TDqStage>(ctx, stage.Pos())
                .Inputs(ctx.ReplaceNode(stage.Inputs().Ptr(), input.Ref(), Build<TDqPhyPrecompute>(ctx, input.Pos()).Connection(conn).Done().Ptr()))
                .Outputs(stage.Outputs())
                .Settings(stage.Settings())
                .Program()
                    .Args(newArgs)
                    .Body(TExprBase(ctx.ReplaceNodes(stage.Program().Body().Ptr(), programReplaces)))
                    .Build()
                .Done();
    }
    return stage;
}

// Global rule: applies ReplicatePrecompute() to the stages found under `input`.
// On the first stage that gets rewritten, `output` receives the expression with
// that stage substituted and the rule returns TStatus::Repeat (presumably so the
// rule is re-run on the updated graph -- confirm against the transformer
// contract). When nothing changes, `output` is `input` and the status is Ok.
NYql::IGraphTransformer::TStatus ReplicatePrecomputeRule(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
    TParentsMap parentsOf;
    GatherParents(*input, parentsOf, true);

    auto allStages = CollectNodes<TDqStageBase>(input);
    for (auto& candidate : allStages) {
        auto rewritten = ReplicatePrecompute(candidate, ctx, parentsOf);
        if (rewritten.Raw() == candidate.Raw()) {
            // Same node returned: this stage needed no rewrite.
            continue;
        }

        output = ctx.ReplaceNode(input.Get(), candidate.Ref(), rewritten.Ptr());
        return TStatus::Repeat;
    }

    output = input;
    return TStatus::Ok;
}

template <typename TFunctor>
NYql::IGraphTransformer::TStatus PerformGlobalRule(const TString& ruleName, const NYql::TExprNode::TPtr& input,
NYql::TExprNode::TPtr& output, NYql::TExprContext& ctx, TFunctor func)
Expand Down Expand Up @@ -251,6 +360,8 @@ TAutoPtr<IGraphTransformer> CreateKqpFinalizingOptTransformer(const TIntrusivePt
[kqpCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
output = input;

PERFORM_GLOBAL_RULE("ReplicatePrecompute", input, output, ctx, ReplicatePrecomputeRule);

PERFORM_GLOBAL_RULE("ReplicateMultiUsedConnection", input, output, ctx,
[](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
YQL_ENSURE(TKqlQuery::Match(input.Get()));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2984,7 +2984,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

AssertTableStats(result, "/Root/Test", {
.ExpectedReads = 2,
.ExpectedReads = 1,
.ExpectedDeletes = 2,
});

Expand Down
107 changes: 107 additions & 0 deletions ydb/core/kqp/ut/opt/kqp_returning_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,113 @@ using namespace NYdb::NTable;

Y_UNIT_TEST_SUITE(KqpReturning) {

// Runs a data query in which the subquery $to_move (a read of tasks_eta_002)
// feeds three statements: an UPSERT ... RETURNING, a second UPSERT and a
// DELETE ... ON. The test asserts that the collected query statistics contain
// exactly one table-access entry for /Root/tasks_eta_002 across all phases.
Y_UNIT_TEST(ReturningTwice) {
    NKikimrConfig::TAppConfig appConfig;
    appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
    appConfig.MutableTableServiceConfig()->SetEnableColumnsWithDefault(true);
    auto serverSettings = TKikimrSettings().SetAppConfig(appConfig);
    TKikimrRunner kikimr(serverSettings);

    auto client = kikimr.GetTableClient();
    auto session = client.CreateSession().GetValueSync().GetSession();

    const auto queryCreate = Q_(R"(
CREATE TABLE IF NOT EXISTS tasks (
hashed_key Uint32,
queue_name String,
task_id String,
worker_id Int32,
running Bool,
eta Timestamp,
lock_timeout Timestamp,
num_fails Int32,
num_reschedules Int32,
body String,
first_fail Timestamp,
idempotency_run_id String,
PRIMARY KEY (hashed_key, queue_name, task_id)
);

CREATE TABLE IF NOT EXISTS tasks_eta_002 (
eta Timestamp,
hashed_key Uint32,
queue_name String,
task_id String,
PRIMARY KEY (eta, hashed_key, queue_name, task_id)
) WITH (
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1
);

CREATE TABLE IF NOT EXISTS tasks_processing_002 (
expiration_ts Timestamp,
hashed_key Uint32,
queue_name String,
task_id String,
PRIMARY KEY (expiration_ts, hashed_key, queue_name, task_id)
) WITH (
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1
);
)");

    auto resultCreate = session.ExecuteSchemeQuery(queryCreate).GetValueSync();
    UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());

    {
        const auto query = Q_(R"(
--!syntax_v1
DECLARE $eta AS Timestamp;
DECLARE $expiration_ts AS Timestamp;
DECLARE $limit AS Int32;

$to_move = (
SELECT $expiration_ts AS expiration_ts, eta, hashed_key, queue_name, task_id
FROM tasks_eta_002
WHERE eta <= $eta
ORDER BY eta, hashed_key, queue_name, task_id
LIMIT $limit
);

UPSERT INTO tasks_processing_002 (expiration_ts, hashed_key, queue_name, task_id)
SELECT expiration_ts, hashed_key, queue_name, task_id FROM $to_move
RETURNING expiration_ts, hashed_key, queue_name, task_id;

UPSERT INTO tasks (hashed_key, queue_name, task_id, running, lock_timeout)
SELECT hashed_key, queue_name, task_id, True as running, $expiration_ts AS lock_timeout FROM $to_move;

DELETE FROM tasks_eta_002 ON
SELECT eta, hashed_key, queue_name, task_id FROM $to_move;
)");

        auto params = TParamsBuilder()
            .AddParam("$eta").Timestamp(TInstant::Zero()).Build()
            .AddParam("$expiration_ts").Timestamp(TInstant::Zero()).Build()
            .AddParam("$limit").Int32(1).Build()
            .Build();

        // Full stats are required: the per-phase table_access entries are inspected below.
        NYdb::NTable::TExecDataQuerySettings execSettings;
        execSettings.CollectQueryStats(ECollectQueryStatsMode::Full);

        auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params, execSettings).GetValueSync();
        // Report the issues on failure, same as for the scheme query above.
        UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

        size_t eta_table_access = 0;
        auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

        // Iterate by const reference so no protobuf message is copied per iteration.
        for (const auto& phase : stats.query_phases()) {
            for (const auto& table : phase.table_access()) {
                if (table.name() == "/Root/tasks_eta_002") {
                    eta_table_access++;
                }
            }
        }
        Cerr << "access count " << eta_table_access << Endl;
        // VALUES_EQUAL prints both operands when the check fails.
        UNIT_ASSERT_VALUES_EQUAL(eta_table_access, 1u);
        //Cerr << stats.Utf8DebugString() << Endl;
    }
}

Y_UNIT_TEST(ReturningSerial) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
Expand Down
Loading