Skip to content

Commit b80d973

Browse files
authored
Merge e9a91df into 20355c6
2 parents 20355c6 + e9a91df commit b80d973

File tree

2 files changed

+192
-1
lines changed

2 files changed

+192
-1
lines changed

ydb/core/kqp/opt/kqp_opt_build_txs.cpp

+85-1
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
771771

772772
TNodeOnNodeOwnedMap phaseStagesMap;
773773
TVector<TKqlQueryResult> phaseResults;
774-
TVector<TDqPhyPrecompute> computedInputs;
774+
TVector<TExprBase> computedInputs;
775775
TNodeSet computedInputsSet;
776776

777777
// Gather all Precompute stages, that are independent of any other stage and form phase of execution
@@ -785,12 +785,96 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
785785
phaseStagesMap.emplace(raw, ptr);
786786
}
787787

788+
{
789+
TNodeOnNodeOwnedMap fullPhaseStagesMap;
790+
for (auto& [_, stagePtr] : phaseStagesMap) {
791+
VisitExpr(stagePtr,
792+
[&](const TExprNode::TPtr& node) {
793+
if (TExprBase(node).Maybe<TDqStage>()) {
794+
fullPhaseStagesMap[node.Get()] = node;
795+
}
796+
return true;
797+
});
798+
}
799+
phaseStagesMap.swap(fullPhaseStagesMap);
800+
801+
TNodeOnNodeOwnedMap fullDependantMap;
802+
VisitExpr(query.Ptr(),
803+
[&](const TExprNode::TPtr& node) {
804+
if (phaseStagesMap.contains(node.Get())) {
805+
return false;
806+
}
807+
if (TExprBase(node).Maybe<TDqStage>()) {
808+
fullDependantMap[node.Get()] = node;
809+
}
810+
return true;
811+
});
812+
dependantStagesMap.swap(fullDependantMap);
813+
}
814+
788815
if (phaseStagesMap.empty()) {
789816
output = query.Ptr();
790817
ctx.AddError(TIssue(ctx.GetPosition(query.Pos()), "Phase stages is empty"));
791818
return TStatus::Error;
792819
}
793820

821+
// so that all outputs to dependent stages are precomputes
822+
{
823+
TSet<TExprNode*> buildingTxStages;
824+
825+
TNodeOnNodeOwnedMap replaces;
826+
827+
for (auto& [_, stagePtr] : dependantStagesMap) {
828+
TDqStage stage(stagePtr);
829+
for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
830+
auto input = stage.Inputs().Item(i);
831+
if (auto maybeConn = input.Maybe<TDqConnection>()) {
832+
auto conn = maybeConn.Cast();
833+
if (!conn.Maybe<TDqCnValue>() && !conn.Maybe<TDqCnUnionAll>()) {
834+
continue;
835+
}
836+
837+
if (phaseStagesMap.contains(conn.Output().Stage().Raw())) {
838+
auto oldArg = stage.Program().Args().Arg(i);
839+
auto newArg = Build<TCoArgument>(ctx, stage.Program().Args().Arg(i).Pos())
840+
.Name("_replaced_arg")
841+
.Done();
842+
843+
TVector<TCoArgument> newArgs;
844+
TNodeOnNodeOwnedMap programReplaces;
845+
for (size_t j = 0; j < stage.Program().Args().Size(); ++j) {
846+
auto oldArg = stage.Program().Args().Arg(j);
847+
newArgs.push_back(Build<TCoArgument>(ctx, stage.Program().Args().Arg(i).Pos())
848+
.Name("_replaced_arg_" + ToString(j))
849+
.Done());
850+
if (i == j) {
851+
programReplaces[oldArg.Raw()] = Build<TCoToFlow>(ctx, oldArg.Pos()).Input(newArgs.back()).Done().Ptr();
852+
} else {
853+
programReplaces[oldArg.Raw()] = newArgs.back().Ptr();
854+
}
855+
}
856+
857+
replaces[stage.Raw()] =
858+
Build<TDqStage>(ctx, stage.Pos())
859+
.Inputs(ctx.ReplaceNode(stage.Inputs().Ptr(), input.Ref(), Build<TDqPhyPrecompute>(ctx, input.Pos()).Connection(conn).Done().Ptr()))
860+
.Outputs(stage.Outputs())
861+
.Settings(stage.Settings())
862+
.Program()
863+
.Args(newArgs)
864+
.Body(TExprBase(ctx.ReplaceNodes(stage.Program().Body().Ptr(), programReplaces)))
865+
.Build()
866+
.Done().Ptr();
867+
}
868+
}
869+
}
870+
}
871+
872+
if (!replaces.empty()) {
873+
output = ctx.ReplaceNodes(query.Ptr(), replaces);
874+
return TStatus(TStatus::Repeat, true);
875+
}
876+
}
877+
794878
for (auto& [_, stagePtr] : dependantStagesMap) {
795879
TDqStage stage(stagePtr);
796880
auto precomputes = PrecomputeInputs(stage);

ydb/core/kqp/ut/opt/kqp_returning_ut.cpp

+107
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,113 @@ using namespace NYdb::NTable;
1111

1212
Y_UNIT_TEST_SUITE(KqpReturning) {
1313

14+
Y_UNIT_TEST(ReturningTwice) {
15+
NKikimrConfig::TAppConfig appConfig;
16+
appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
17+
appConfig.MutableTableServiceConfig()->SetEnableColumnsWithDefault(true);
18+
auto serverSettings = TKikimrSettings().SetAppConfig(appConfig);
19+
TKikimrRunner kikimr(serverSettings);
20+
21+
auto client = kikimr.GetTableClient();
22+
auto session = client.CreateSession().GetValueSync().GetSession();
23+
24+
const auto queryCreate = Q_(R"(
25+
CREATE TABLE IF NOT EXISTS tasks (
26+
hashed_key Uint32,
27+
queue_name String,
28+
task_id String,
29+
worker_id Int32,
30+
running Bool,
31+
eta Timestamp,
32+
lock_timeout Timestamp,
33+
num_fails Int32,
34+
num_reschedules Int32,
35+
body String,
36+
first_fail Timestamp,
37+
idempotency_run_id String,
38+
PRIMARY KEY (hashed_key, queue_name, task_id)
39+
);
40+
41+
CREATE TABLE IF NOT EXISTS tasks_eta_002 (
42+
eta Timestamp,
43+
hashed_key Uint32,
44+
queue_name String,
45+
task_id String,
46+
PRIMARY KEY (eta, hashed_key, queue_name, task_id)
47+
) WITH (
48+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
49+
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1
50+
);
51+
52+
CREATE TABLE IF NOT EXISTS tasks_processing_002 (
53+
expiration_ts Timestamp,
54+
hashed_key Uint32,
55+
queue_name String,
56+
task_id String,
57+
PRIMARY KEY (expiration_ts, hashed_key, queue_name, task_id)
58+
) WITH (
59+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
60+
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1
61+
);
62+
)");
63+
64+
auto resultCreate = session.ExecuteSchemeQuery(queryCreate).GetValueSync();
65+
UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());
66+
67+
{
68+
const auto query = Q_(R"(
69+
--!syntax_v1
70+
DECLARE $eta AS Timestamp;
71+
DECLARE $expiration_ts AS Timestamp;
72+
DECLARE $limit AS Int32;
73+
74+
$to_move = (
75+
SELECT $expiration_ts AS expiration_ts, eta, hashed_key, queue_name, task_id
76+
FROM tasks_eta_002
77+
WHERE eta <= $eta
78+
ORDER BY eta, hashed_key, queue_name, task_id
79+
LIMIT $limit
80+
);
81+
82+
UPSERT INTO tasks_processing_002 (expiration_ts, hashed_key, queue_name, task_id)
83+
SELECT expiration_ts, hashed_key, queue_name, task_id FROM $to_move
84+
RETURNING expiration_ts, hashed_key, queue_name, task_id;
85+
86+
UPSERT INTO tasks (hashed_key, queue_name, task_id, running, lock_timeout)
87+
SELECT hashed_key, queue_name, task_id, True as running, $expiration_ts AS lock_timeout FROM $to_move;
88+
89+
DELETE FROM tasks_eta_002 ON
90+
SELECT eta, hashed_key, queue_name, task_id FROM $to_move;
91+
)");
92+
93+
auto params = TParamsBuilder()
94+
.AddParam("$eta").Timestamp(TInstant::Zero()).Build()
95+
.AddParam("$expiration_ts").Timestamp(TInstant::Zero()).Build()
96+
.AddParam("$limit").Int32(1).Build()
97+
.Build();
98+
99+
NYdb::NTable::TExecDataQuerySettings execSettings;
100+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Full);
101+
102+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params, execSettings).GetValueSync();
103+
UNIT_ASSERT(result.IsSuccess());
104+
105+
size_t eta_table_access = 0;
106+
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
107+
108+
for (auto phase : stats.query_phases()) {
109+
for (auto table : phase.table_access()) {
110+
if (table.name() == "/Root/tasks_eta_002") {
111+
eta_table_access++;
112+
}
113+
}
114+
}
115+
Cerr << "access count " << eta_table_access << Endl;
116+
UNIT_ASSERT_EQUAL(eta_table_access, 1);
117+
//Cerr << stats.Utf8DebugString() << Endl;
118+
}
119+
}
120+
14121
Y_UNIT_TEST(ReturningSerial) {
15122
NKikimrConfig::TAppConfig appConfig;
16123
appConfig.MutableTableServiceConfig()->SetEnableSequences(true);

0 commit comments

Comments
 (0)