Skip to content

Commit eceb59a

Browse files
authored
Fix opt rules for generic queries. (KIKIMR-16294) (#860)
* Fix opt rules for generic queries. (KIKIMR-16294)
1 parent b7bc4dd commit eceb59a

10 files changed

+53
-65
lines changed

ydb/core/kqp/opt/kqp_opt.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
3434
}
3535

3636
bool IsGenericQuery() const {
37-
return QueryCtx->Type == NYql::EKikimrQueryType::Query;
37+
return QueryCtx->Type == NYql::EKikimrQueryType::Query || QueryCtx->Type == NYql::EKikimrQueryType::Script;
3838
}
3939
};
4040

ydb/core/kqp/opt/kqp_opt_build_txs.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,9 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
745745
YQL_CLOG(TRACE, ProviderKqp) << "[BuildTx] " << KqpExprToPrettyString(*result, ctx)
746746
<< ", isPrecompute: " << isPrecompute;
747747

748-
auto& transformer = KqpCtx->IsDataQuery() ? *DataTxTransformer : *ScanTxTransformer;
748+
auto& transformer = KqpCtx->IsScanQuery() ? *ScanTxTransformer : *DataTxTransformer;
749+
750+
749751
transformer.Rewind();
750752
BuildTxTransformer->Init(KqpCtx->QueryCtx->Type, isPrecompute);
751753
auto expr = result;

ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp

+6-5
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,8 @@ TExprBase KqpRewriteIndexRead(const TExprBase& node, TExprContext& ctx, const TK
346346
}
347347

348348
TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
349-
if (!kqpCtx.IsDataQuery()) {
349+
if (kqpCtx.IsScanQuery()) {
350+
// TODO: Enable index lookup for scan queries as we now support stream lookups.
350351
return node;
351352
}
352353

@@ -499,7 +500,7 @@ bool KeySelectorAllMembers(const TCoLambda& lambda, const TSet<TString> & applyC
499500
}
500501

501502
// Construct a new lambda with renamed attributes based on the mapping
502-
// If we see a complex expression in the key selector, we just pass it on into
503+
// If we see a complex expression in the key selector, we just pass it on into
503504
// the new lambda
504505
TCoLambda RenameKeySelector(const TCoLambda& lambda, TExprContext& ctx, const THashMap<TString,TString>& map) {
505506
// If its single member lambda body
@@ -533,7 +534,7 @@ TCoLambda RenameKeySelector(const TCoLambda& lambda, TExprContext& ctx, const TH
533534
.Done();
534535
members.push_back(member);
535536
}
536-
537+
537538
return Build<TCoLambda>(ctx, lambda.Pos())
538539
.Args({arg})
539540
.Body<TExprList>()
@@ -544,8 +545,8 @@ TCoLambda RenameKeySelector(const TCoLambda& lambda, TExprContext& ctx, const TH
544545
}
545546

546547

547-
// If we have a top-sort over flatmap, we can push it throught is, so that the
548-
// RewriteTopSortOverIndexRead rule can fire next. If the flatmap renames some of the sort
548+
// If we have a top-sort over flatmap, we can push it through it, so that the
549+
// RewriteTopSortOverIndexRead rule can fire next. If the flatmap renames some of the sort
549550
// attributes, we need to use the original names in the top-sort. When pushing TopSort below
550551
// FlatMap, we change FlatMap to OrderedFlatMap to preserve the order of its input.
551552
TExprBase KqpRewriteTopSortOverFlatMap(const TExprBase& node, TExprContext& ctx) {

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
618618
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
619619
&& supportedStreamJoinKinds.contains(join.JoinType().Value());
620620

621-
bool needPrecomputeLeft = kqpCtx.IsDataQuery()
621+
bool needPrecomputeLeft = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
622622
&& !join.LeftInput().Maybe<TCoParameter>()
623623
&& !IsParameterToListOfStructsRepack(join.LeftInput())
624624
&& !useStreamIndexLookupJoin;

ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
144144
TMaybeNode<TCoAtom> indexName;
145145

146146
//TODO: remove this branch KIKIMR-15255, KIKIMR-15321
147-
if (!readMatch && kqpCtx.IsDataQuery()) {
147+
if (!readMatch && (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())) {
148148
if (auto readRangesMatch = MatchRead<TKqlReadTableRangesBase>(flatmap.Input())) {
149149
auto read = readRangesMatch->Read.Cast<TKqlReadTableRangesBase>();
150150
if (TCoVoid::Match(read.Ranges().Raw())) {
@@ -239,6 +239,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
239239
}
240240

241241
TMaybeNode<TExprBase> readInput;
242+
// TODO: Use single implementation for all kinds of queries.
242243
if (useDataOrGenericQueryLookup) {
243244
auto lookupKeys = BuildEquiRangeLookup(keyRange, tableDesc, read.Pos(), ctx);
244245

ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp

+23-17
Original file line numberDiff line numberDiff line change
@@ -374,34 +374,40 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
374374
{
375375
auto buildLookup = [&] (TExprNode::TPtr keys, TMaybe<TExprBase>& result) {
376376
if (indexName) {
377-
if (kqpCtx.IsDataQuery()) {
377+
if (kqpCtx.IsScanQuery()) {
378+
if (kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
379+
result = Build<TKqlStreamLookupIndex>(ctx, node.Pos())
380+
.Table(read.Table())
381+
.Columns(read.Columns())
382+
.LookupKeys(keys)
383+
.Index(indexName.Cast())
384+
.LookupKeys(keys)
385+
.Done();
386+
}
387+
} else {
378388
result = Build<TKqlLookupIndex>(ctx, node.Pos())
379389
.Table(read.Table())
380390
.Columns(read.Columns())
381391
.LookupKeys(keys)
382392
.Index(indexName.Cast())
383393
.Done();
384-
} else if (kqpCtx.IsScanQuery() && kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
385-
result = Build<TKqlStreamLookupIndex>(ctx, node.Pos())
394+
}
395+
} else {
396+
if (kqpCtx.IsScanQuery()) {
397+
if (kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
398+
result = Build<TKqlStreamLookupTable>(ctx, node.Pos())
399+
.Table(read.Table())
400+
.Columns(read.Columns())
401+
.LookupKeys(keys)
402+
.Done();
403+
}
404+
} else {
405+
result = Build<TKqlLookupTable>(ctx, node.Pos())
386406
.Table(read.Table())
387407
.Columns(read.Columns())
388408
.LookupKeys(keys)
389-
.Index(indexName.Cast())
390-
.LookupKeys(keys)
391409
.Done();
392410
}
393-
} else if (kqpCtx.IsDataQuery()) {
394-
result = Build<TKqlLookupTable>(ctx, node.Pos())
395-
.Table(read.Table())
396-
.Columns(read.Columns())
397-
.LookupKeys(keys)
398-
.Done();
399-
} else if (kqpCtx.IsScanQuery() && kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
400-
result = Build<TKqlStreamLookupTable>(ctx, node.Pos())
401-
.Table(read.Table())
402-
.Columns(read.Columns())
403-
.LookupKeys(keys)
404-
.Done();
405411
}
406412
};
407413

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
410410
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx,
411411
IOptimizationContext& optCtx, const TGetParents& getParents)
412412
{
413+
// TODO: Allow push to left stage for data queries.
414+
// It is now possible as we don't use datashard transactions for reads in data queries.
413415
bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
414416
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
415417
pushLeftStage, KqpCtx.Config->GetHashJoinMode()

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
309309
value Utf8 NOT NULL
310310
)
311311
) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key
312+
ORDER BY key
312313
)"
313314
, "external_source"_a = externalDataSourceName
314315
, "ydb_table"_a = ydbTable)).ExtractValueSync();
@@ -448,6 +449,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
448449
value Utf8 NOT NULL
449450
)
450451
) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key
452+
ORDER BY key
451453
)"
452454
, "external_source"_a = externalDataSourceName
453455
, "ydb_table"_a = ydbTable)).ExtractValueSync();

ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp

+10-38
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
162162
WHERE Key = $key;
163163
)"), params);
164164

165-
if (QueryService) {
166-
// TODO: Fix QS.
167-
return;
168-
}
165+
// Cerr << stats.query_plan() << Endl;
169166

170167
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
171168
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
@@ -208,16 +205,12 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
208205

209206
// Cerr << stats.query_plan() << Endl;
210207

211-
if (QueryService) {
212-
// TODO: Fix QS.
213-
return;
214-
}
215-
216208
AssertTableStats(stats, "/Root/Join1", {
217209
.ExpectedReads = 3,
218210
});
219211

220-
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); // Precompute limit Min(1001,$limit),
212+
// For data query, additional precompute for LIMIT: Min(1001,$limit)
213+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), QueryService ? 1 : 2);
221214
for (const auto& phase : stats.query_phases()) {
222215
UNIT_ASSERT(phase.affected_shards() <= 1);
223216
}
@@ -226,15 +219,16 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
226219
NJson::ReadJsonTree(stats.query_plan(), &plan, true);
227220

228221
auto stages = FindPlanStages(plan);
229-
UNIT_ASSERT_VALUES_EQUAL(stages.size(), 4);
222+
// TODO: Should be 2/3 stages?
223+
UNIT_ASSERT_VALUES_EQUAL(stages.size(), QueryService ? 3 : 4);
230224

231225
i64 totalTasks = 0;
232226
for (const auto& stage : stages) {
233227
if (stage.GetMapSafe().contains("Stats")) {
234228
totalTasks += stage.GetMapSafe().at("Stats").GetMapSafe().at("Tasks").GetIntegerSafe();
235229
}
236230
}
237-
UNIT_ASSERT_VALUES_EQUAL(totalTasks, 3);
231+
UNIT_ASSERT_VALUES_EQUAL(totalTasks, QueryService ? 2 : 3);
238232
}
239233

240234
Y_UNIT_TEST_TWIN(RangeRead, QueryService) {
@@ -257,11 +251,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
257251

258252
// Cerr << stats.query_plan() << Endl;
259253

260-
if (QueryService) {
261-
// TODO: Fix QS.
262-
return;
263-
}
264-
265254
AssertTableStats(stats, "/Root/Join1", {
266255
.ExpectedReads = 5,
267256
});
@@ -281,7 +270,10 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
281270
totalTasks += stage.GetMapSafe().at("Stats").GetMapSafe().at("Tasks").GetIntegerSafe();
282271
}
283272
}
284-
UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2);
273+
274+
// No implicit limit (1000 rows) for QueryService means no sequential reads.
275+
// TODO: Consider enabling sequential reads even without rows limit.
276+
UNIT_ASSERT_VALUES_EQUAL(totalTasks, QueryService ? 3 : 2);
285277
}
286278

287279
Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) {
@@ -315,11 +307,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
315307
[[9];[101u];["Two"]]
316308
])", FormatResultSetYson(results[0]));
317309

318-
if (QueryService) {
319-
// TODO: Fix QS.
320-
return;
321-
}
322-
323310
AssertTableStats(stats, "/Root/Join1", {
324311
.ExpectedReads = 9,
325312
});
@@ -473,11 +460,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
473460
WHERE Key = $key;
474461
)"), params);
475462

476-
if (QueryService) {
477-
// TODO: Fix QS.
478-
return;
479-
}
480-
481463
AssertTableStats(stats, "/Root/EightShard", {
482464
.ExpectedReads = 1,
483465
.ExpectedUpdates = 1,
@@ -506,11 +488,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
506488
WHERE Key = $key AND Text = $text;
507489
)"), params);
508490

509-
if (QueryService) {
510-
// TODO: Fix QS.
511-
return;
512-
}
513-
514491
AssertTableStats(stats, "/Root/EightShard", {
515492
.ExpectedReads = 1,
516493
.ExpectedDeletes = 1,
@@ -656,11 +633,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
656633
WHERE t1.Key = $key;
657634
)"), params);
658635

659-
if (QueryService) {
660-
// TODO: Fix QS.
661-
return;
662-
}
663-
664636
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
665637
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
666638
} else {

ydb/core/kqp/ut/pg/kqp_pg_ut.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -2712,7 +2712,9 @@ Y_UNIT_TEST_SUITE(KqpPg) {
27122712

27132713
{
27142714
auto result = db.ExecuteQuery(R"(
2715-
SELECT left_table.*, right_table.val2 FROM left_table, right_table WHERE left_table.id=right_table.id
2715+
SELECT left_table.*, right_table.val2 FROM left_table, right_table
2716+
WHERE left_table.id=right_table.id
2717+
ORDER BY left_table.id
27162718
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
27172719
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
27182720

0 commit comments

Comments
 (0)