Skip to content

Commit b98e94b

Browse files
committed
Precompute all replicated connections if one is already precomputed (ydb-platform#7067)
1 parent 064fd75 commit b98e94b

File tree

3 files changed

+219
-1
lines changed

3 files changed

+219
-1
lines changed

ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp

+111
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,115 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
222222
return TStatus::Ok;
223223
}
224224

225+
template <typename TExpr>
226+
TVector<TExpr> CollectNodes(const TExprNode::TPtr& input) {
227+
TVector<TExpr> result;
228+
229+
VisitExpr(input, [&result](const TExprNode::TPtr& node) {
230+
if (TExpr::Match(node.Get())) {
231+
result.emplace_back(TExpr(node));
232+
}
233+
return true;
234+
});
235+
236+
return result;
237+
}
238+
239+
// Reports whether `stage` has a consumer chain of the shape
//     stage <- TDqOutput <- (any node) <- TDqPrecompute | TDqPhyPrecompute
// according to `parentsMap` (node -> set of its parents).
//
// Returns false when the stage has no entry in the map or no such chain exists.
bool FindPrecomputedOutputs(TDqStageBase stage, const TParentsMap& parentsMap) {
    const auto stageEntry = parentsMap.find(stage.Raw());
    if (stageEntry == parentsMap.end()) {
        return false;
    }

    const auto isPrecompute = [](const auto& node) {
        return TDqPrecompute::Match(node) || TDqPhyPrecompute::Match(node);
    };

    for (const auto& stageParent : stageEntry->second) {
        // Only outputs of the stage are of interest.
        if (!TDqOutput::Match(stageParent)) {
            continue;
        }

        const auto outputEntry = parentsMap.find(stageParent);
        if (outputEntry == parentsMap.end()) {
            continue;
        }

        // Parents of the output (not type-checked here), then their parents.
        for (const auto& outputParent : outputEntry->second) {
            const auto consumerEntry = parentsMap.find(outputParent);
            if (consumerEntry == parentsMap.end()) {
                continue;
            }

            for (const auto& consumer : consumerEntry->second) {
                if (isPrecompute(consumer)) {
                    return true;
                }
            }
        }
    }

    return false;
}
265+
266+
267+
// Rewrites `stage` so that the first eligible input connection is consumed
// through a TDqPhyPrecompute instead of directly.
//
// An input is eligible when all of the following hold:
//   * it is a TDqCnValue or TDqCnUnionAll connection;
//   * the body of the program of its source stage is a TDqReplicate;
//   * FindPrecomputedOutputs() reports that the source stage already feeds a
//     precompute somewhere else.
//
// For the eligible input #i a new TDqStage is built:
//   * the i-th input is replaced by TDqPhyPrecompute(connection);
//   * every program argument is replaced by a fresh "_replaced_arg_<j>"
//     argument; inside the body the i-th one is additionally wrapped into
//     TCoToFlow, the others are substituted as is.
//
// Params:
//   stage      - stage to inspect.
//   ctx        - expression context used to build the new nodes.
//   parentsMap - node -> parents map, forwarded to FindPrecomputedOutputs().
//
// Returns the rebuilt stage for the first eligible input, or `stage` itself
// (same underlying node) when no input qualifies.
//
// NOTE(review): the result is always built as TDqStage, whatever concrete
// stage type `stage` wraps -- confirm this is intended.
TExprBase ReplicatePrecompute(TDqStageBase stage, TExprContext& ctx, const TParentsMap& parentsMap) {
    for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
        auto input = stage.Inputs().Item(i);

        auto maybeConn = input.Maybe<TDqConnection>();
        if (!maybeConn) {
            continue;
        }

        auto conn = maybeConn.Cast();
        if (!conn.Maybe<TDqCnValue>() && !conn.Maybe<TDqCnUnionAll>()) {
            continue;
        }

        auto sourceStage = conn.Output().Stage();
        if (!sourceStage.Program().Body().Maybe<TDqReplicate>()) {
            continue;
        }

        if (!FindPrecomputedOutputs(sourceStage, parentsMap)) {
            continue;
        }

        const size_t argsCount = stage.Program().Args().Size();

        TVector<TCoArgument> newArgs;
        newArgs.reserve(argsCount);
        TNodeOnNodeOwnedMap programReplaces;
        for (size_t j = 0; j < argsCount; ++j) {
            auto oldArg = stage.Program().Args().Arg(j);
            // Each replacement argument keeps the position of the argument it replaces.
            newArgs.push_back(Build<TCoArgument>(ctx, oldArg.Pos())
                .Name("_replaced_arg_" + ToString(j))
                .Done());

            if (i == j) {
                // The argument bound to the precomputed input is wrapped into ToFlow.
                programReplaces[oldArg.Raw()] = Build<TCoToFlow>(ctx, oldArg.Pos())
                    .Input(newArgs.back())
                    .Done()
                    .Ptr();
            } else {
                programReplaces[oldArg.Raw()] = newArgs.back().Ptr();
            }
        }

        auto precompute = Build<TDqPhyPrecompute>(ctx, input.Pos())
            .Connection(conn)
            .Done();

        return Build<TDqStage>(ctx, stage.Pos())
            .Inputs(ctx.ReplaceNode(stage.Inputs().Ptr(), input.Ref(), precompute.Ptr()))
            .Outputs(stage.Outputs())
            .Settings(stage.Settings())
            .Program()
                .Args(newArgs)
                .Body(TExprBase(ctx.ReplaceNodes(stage.Program().Body().Ptr(), programReplaces)))
                .Build()
            .Done();
    }

    return stage;
}
318+
319+
// Global rule: applies ReplicatePrecompute() to the stages found under `input`.
//
// At most one stage is rewritten per invocation: as soon as a stage yields a
// different node, `output` receives the graph with that stage replaced and
// TStatus::Repeat is returned. When no stage changes, `output` is set to
// `input` and TStatus::Ok is returned.
NYql::IGraphTransformer::TStatus ReplicatePrecomputeRule(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
    TParentsMap parentsOfNode;
    GatherParents(*input, parentsOfNode, true);

    for (const auto& candidate : CollectNodes<TDqStageBase>(input)) {
        const auto rewritten = ReplicatePrecompute(candidate, ctx, parentsOfNode);
        if (rewritten.Raw() == candidate.Raw()) {
            // Stage left untouched, try the next one.
            continue;
        }

        output = ctx.ReplaceNode(input.Get(), candidate.Ref(), rewritten.Ptr());
        return TStatus::Repeat;
    }

    output = input;
    return TStatus::Ok;
}
333+
225334
template <typename TFunctor>
226335
NYql::IGraphTransformer::TStatus PerformGlobalRule(const TString& ruleName, const NYql::TExprNode::TPtr& input,
227336
NYql::TExprNode::TPtr& output, NYql::TExprContext& ctx, TFunctor func)
@@ -251,6 +360,8 @@ TAutoPtr<IGraphTransformer> CreateKqpFinalizingOptTransformer(const TIntrusivePt
251360
[kqpCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
252361
output = input;
253362

363+
PERFORM_GLOBAL_RULE("ReplicatePrecompute", input, output, ctx, ReplicatePrecomputeRule);
364+
254365
PERFORM_GLOBAL_RULE("ReplicateMultiUsedConnection", input, output, ctx,
255366
[](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
256367
YQL_ENSURE(TKqlQuery::Match(input.Get()));

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -3095,7 +3095,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
30953095
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
30963096

30973097
AssertTableStats(result, "/Root/Test", {
3098-
.ExpectedReads = 2,
3098+
.ExpectedReads = 1,
30993099
.ExpectedDeletes = 2,
31003100
});
31013101

ydb/core/kqp/ut/opt/kqp_returning_ut.cpp

+107
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,113 @@ using namespace NYdb::NTable;
1111

1212
Y_UNIT_TEST_SUITE(KqpReturning) {
1313

14+
// Checks that a subquery ($to_move) consumed by three statements -- an UPSERT
// with RETURNING, a second UPSERT and a DELETE ... ON -- results in exactly one
// table-access entry for tasks_eta_002 across all query phases in the
// collected statistics.
Y_UNIT_TEST(ReturningTwice) {
    NKikimrConfig::TAppConfig appConfig;
    appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
    appConfig.MutableTableServiceConfig()->SetEnableColumnsWithDefault(true);
    auto serverSettings = TKikimrSettings().SetAppConfig(appConfig);
    TKikimrRunner kikimr(serverSettings);

    auto client = kikimr.GetTableClient();
    auto session = client.CreateSession().GetValueSync().GetSession();

    const auto queryCreate = Q_(R"(
        CREATE TABLE IF NOT EXISTS tasks (
            hashed_key Uint32,
            queue_name String,
            task_id String,
            worker_id Int32,
            running Bool,
            eta Timestamp,
            lock_timeout Timestamp,
            num_fails Int32,
            num_reschedules Int32,
            body String,
            first_fail Timestamp,
            idempotency_run_id String,
            PRIMARY KEY (hashed_key, queue_name, task_id)
        );

        CREATE TABLE IF NOT EXISTS tasks_eta_002 (
            eta Timestamp,
            hashed_key Uint32,
            queue_name String,
            task_id String,
            PRIMARY KEY (eta, hashed_key, queue_name, task_id)
        ) WITH (
            AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
            AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1
        );

        CREATE TABLE IF NOT EXISTS tasks_processing_002 (
            expiration_ts Timestamp,
            hashed_key Uint32,
            queue_name String,
            task_id String,
            PRIMARY KEY (expiration_ts, hashed_key, queue_name, task_id)
        ) WITH (
            AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
            AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1
        );
    )");

    auto resultCreate = session.ExecuteSchemeQuery(queryCreate).GetValueSync();
    UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());

    {
        const auto query = Q_(R"(
            --!syntax_v1
            DECLARE $eta AS Timestamp;
            DECLARE $expiration_ts AS Timestamp;
            DECLARE $limit AS Int32;

            $to_move = (
                SELECT $expiration_ts AS expiration_ts, eta, hashed_key, queue_name, task_id
                FROM tasks_eta_002
                WHERE eta <= $eta
                ORDER BY eta, hashed_key, queue_name, task_id
                LIMIT $limit
            );

            UPSERT INTO tasks_processing_002 (expiration_ts, hashed_key, queue_name, task_id)
            SELECT expiration_ts, hashed_key, queue_name, task_id FROM $to_move
            RETURNING expiration_ts, hashed_key, queue_name, task_id;

            UPSERT INTO tasks (hashed_key, queue_name, task_id, running, lock_timeout)
            SELECT hashed_key, queue_name, task_id, True as running, $expiration_ts AS lock_timeout FROM $to_move;

            DELETE FROM tasks_eta_002 ON
            SELECT eta, hashed_key, queue_name, task_id FROM $to_move;
        )");

        auto params = TParamsBuilder()
            .AddParam("$eta").Timestamp(TInstant::Zero()).Build()
            .AddParam("$expiration_ts").Timestamp(TInstant::Zero()).Build()
            .AddParam("$limit").Int32(1).Build()
            .Build();

        // Full stats are required to get per-phase table access information.
        NYdb::NTable::TExecDataQuerySettings execSettings;
        execSettings.CollectQueryStats(ECollectQueryStatsMode::Full);

        auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params, execSettings).GetValueSync();
        // Report the issues on failure, as done for the schema query above.
        UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

        const auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

        // Count table-access entries that refer to tasks_eta_002 over all phases.
        size_t etaTableAccess = 0;
        for (const auto& phase : stats.query_phases()) {
            for (const auto& table : phase.table_access()) {
                if (table.name() == "/Root/tasks_eta_002") {
                    ++etaTableAccess;
                }
            }
        }

        UNIT_ASSERT_VALUES_EQUAL_C(etaTableAccess, 1u, "unexpected number of tasks_eta_002 accesses");
    }
}
120+
14121
Y_UNIT_TEST(ReturningSerial) {
15122
NKikimrConfig::TAppConfig appConfig;
16123
appConfig.MutableTableServiceConfig()->SetEnableSequences(true);

0 commit comments

Comments
 (0)