Skip to content

Fix opt rules for generic queries. (KIKIMR-16294) #860

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/kqp_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
}

bool IsGenericQuery() const {
return QueryCtx->Type == NYql::EKikimrQueryType::Query;
return QueryCtx->Type == NYql::EKikimrQueryType::Query || QueryCtx->Type == NYql::EKikimrQueryType::Script;
}
};

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/opt/kqp_opt_build_txs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,9 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
YQL_CLOG(TRACE, ProviderKqp) << "[BuildTx] " << KqpExprToPrettyString(*result, ctx)
<< ", isPrecompute: " << isPrecompute;

auto& transformer = KqpCtx->IsDataQuery() ? *DataTxTransformer : *ScanTxTransformer;
auto& transformer = KqpCtx->IsScanQuery() ? *ScanTxTransformer : *DataTxTransformer;


transformer.Rewind();
BuildTxTransformer->Init(KqpCtx->QueryCtx->Type, isPrecompute);
auto expr = result;
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ TExprBase KqpRewriteIndexRead(const TExprBase& node, TExprContext& ctx, const TK
}

TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
if (!kqpCtx.IsDataQuery()) {
if (kqpCtx.IsScanQuery()) {
// TODO: Enable index lookup for scan queries as we now support stream lookups.
return node;
}

Expand Down Expand Up @@ -499,7 +500,7 @@ bool KeySelectorAllMembers(const TCoLambda& lambda, const TSet<TString> & applyC
}

// Construct a new lambda with renamed attributes based on the mapping
// If we see a complex expression in the key selector, we just pass it on into
// If we see a complex expression in the key selector, we just pass it on into
// the new lambda
TCoLambda RenameKeySelector(const TCoLambda& lambda, TExprContext& ctx, const THashMap<TString,TString>& map) {
// If it's a single-member lambda body
Expand Down Expand Up @@ -533,7 +534,7 @@ TCoLambda RenameKeySelector(const TCoLambda& lambda, TExprContext& ctx, const TH
.Done();
members.push_back(member);
}

return Build<TCoLambda>(ctx, lambda.Pos())
.Args({arg})
.Body<TExprList>()
Expand All @@ -544,8 +545,8 @@ TCoLambda RenameKeySelector(const TCoLambda& lambda, TExprContext& ctx, const TH
}


// If we have a top-sort over flatmap, we can push it through it, so that the
// RewriteTopSortOverIndexRead rule can fire next. If the flatmap renames some of the sort
// If we have a top-sort over flatmap, we can push it through it, so that the
// RewriteTopSortOverIndexRead rule can fire next. If the flatmap renames some of the sort
// attributes, we need to use the original names in the top-sort. When pushing TopSort below
// FlatMap, we change FlatMap to OrderedFlatMap to preserve the order of its input.
TExprBase KqpRewriteTopSortOverFlatMap(const TExprBase& node, TExprContext& ctx) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
&& supportedStreamJoinKinds.contains(join.JoinType().Value());

bool needPrecomputeLeft = kqpCtx.IsDataQuery()
bool needPrecomputeLeft = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
&& !join.LeftInput().Maybe<TCoParameter>()
&& !IsParameterToListOfStructsRepack(join.LeftInput())
&& !useStreamIndexLookupJoin;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
TMaybeNode<TCoAtom> indexName;

//TODO: remove this branch KIKIMR-15255, KIKIMR-15321
if (!readMatch && kqpCtx.IsDataQuery()) {
if (!readMatch && (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())) {
if (auto readRangesMatch = MatchRead<TKqlReadTableRangesBase>(flatmap.Input())) {
auto read = readRangesMatch->Read.Cast<TKqlReadTableRangesBase>();
if (TCoVoid::Match(read.Ranges().Raw())) {
Expand Down Expand Up @@ -239,6 +239,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
}

TMaybeNode<TExprBase> readInput;
// TODO: Use single implementation for all kinds of queries.
if (useDataOrGenericQueryLookup) {
auto lookupKeys = BuildEquiRangeLookup(keyRange, tableDesc, read.Pos(), ctx);

Expand Down
40 changes: 23 additions & 17 deletions ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,34 +374,40 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
{
auto buildLookup = [&] (TExprNode::TPtr keys, TMaybe<TExprBase>& result) {
if (indexName) {
if (kqpCtx.IsDataQuery()) {
if (kqpCtx.IsScanQuery()) {
if (kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
result = Build<TKqlStreamLookupIndex>(ctx, node.Pos())
.Table(read.Table())
.Columns(read.Columns())
.LookupKeys(keys)
.Index(indexName.Cast())
.LookupKeys(keys)
.Done();
}
} else {
result = Build<TKqlLookupIndex>(ctx, node.Pos())
.Table(read.Table())
.Columns(read.Columns())
.LookupKeys(keys)
.Index(indexName.Cast())
.Done();
} else if (kqpCtx.IsScanQuery() && kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
result = Build<TKqlStreamLookupIndex>(ctx, node.Pos())
}
} else {
if (kqpCtx.IsScanQuery()) {
if (kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
result = Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(read.Table())
.Columns(read.Columns())
.LookupKeys(keys)
.Done();
}
} else {
result = Build<TKqlLookupTable>(ctx, node.Pos())
.Table(read.Table())
.Columns(read.Columns())
.LookupKeys(keys)
.Index(indexName.Cast())
.LookupKeys(keys)
.Done();
}
} else if (kqpCtx.IsDataQuery()) {
result = Build<TKqlLookupTable>(ctx, node.Pos())
.Table(read.Table())
.Columns(read.Columns())
.LookupKeys(keys)
.Done();
} else if (kqpCtx.IsScanQuery() && kqpCtx.Config->EnableKqpScanQueryStreamLookup) {
result = Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(read.Table())
.Columns(read.Columns())
.LookupKeys(keys)
.Done();
}
};

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
// TODO: Allow push to left stage for data queries.
// It is now possible as we don't use datashard transactions for reads in data queries.
bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
pushLeftStage, KqpCtx.Config->GetHashJoinMode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
value Utf8 NOT NULL
)
) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key
ORDER BY key
)"
, "external_source"_a = externalDataSourceName
, "ydb_table"_a = ydbTable)).ExtractValueSync();
Expand Down Expand Up @@ -448,6 +449,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
value Utf8 NOT NULL
)
) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key
ORDER BY key
)"
, "external_source"_a = externalDataSourceName
, "ydb_table"_a = ydbTable)).ExtractValueSync();
Expand Down
48 changes: 10 additions & 38 deletions ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
WHERE Key = $key;
)"), params);

if (QueryService) {
// TODO: Fix QS.
return;
}
// Cerr << stats.query_plan() << Endl;

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
Expand Down Expand Up @@ -208,16 +205,12 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {

// Cerr << stats.query_plan() << Endl;

if (QueryService) {
// TODO: Fix QS.
return;
}

AssertTableStats(stats, "/Root/Join1", {
.ExpectedReads = 3,
});

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); // Precompute limit Min(1001,$limit),
// For data query, additional precompute for LIMIT: Min(1001,$limit)
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), QueryService ? 1 : 2);
for (const auto& phase : stats.query_phases()) {
UNIT_ASSERT(phase.affected_shards() <= 1);
}
Expand All @@ -226,15 +219,16 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
NJson::ReadJsonTree(stats.query_plan(), &plan, true);

auto stages = FindPlanStages(plan);
UNIT_ASSERT_VALUES_EQUAL(stages.size(), 4);
// TODO: Should be 2/3 stages?
UNIT_ASSERT_VALUES_EQUAL(stages.size(), QueryService ? 3 : 4);

i64 totalTasks = 0;
for (const auto& stage : stages) {
if (stage.GetMapSafe().contains("Stats")) {
totalTasks += stage.GetMapSafe().at("Stats").GetMapSafe().at("Tasks").GetIntegerSafe();
}
}
UNIT_ASSERT_VALUES_EQUAL(totalTasks, 3);
UNIT_ASSERT_VALUES_EQUAL(totalTasks, QueryService ? 2 : 3);
}

Y_UNIT_TEST_TWIN(RangeRead, QueryService) {
Expand All @@ -257,11 +251,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {

// Cerr << stats.query_plan() << Endl;

if (QueryService) {
// TODO: Fix QS.
return;
}

AssertTableStats(stats, "/Root/Join1", {
.ExpectedReads = 5,
});
Expand All @@ -281,7 +270,10 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
totalTasks += stage.GetMapSafe().at("Stats").GetMapSafe().at("Tasks").GetIntegerSafe();
}
}
UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2);

// No implicit limit (1000 rows) for QueryService means no sequential reads.
// TODO: Consider enabling sequential reads even without rows limit.
UNIT_ASSERT_VALUES_EQUAL(totalTasks, QueryService ? 3 : 2);
}

Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) {
Expand Down Expand Up @@ -315,11 +307,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
[[9];[101u];["Two"]]
])", FormatResultSetYson(results[0]));

if (QueryService) {
// TODO: Fix QS.
return;
}

AssertTableStats(stats, "/Root/Join1", {
.ExpectedReads = 9,
});
Expand Down Expand Up @@ -473,11 +460,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
WHERE Key = $key;
)"), params);

if (QueryService) {
// TODO: Fix QS.
return;
}

AssertTableStats(stats, "/Root/EightShard", {
.ExpectedReads = 1,
.ExpectedUpdates = 1,
Expand Down Expand Up @@ -506,11 +488,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
WHERE Key = $key AND Text = $text;
)"), params);

if (QueryService) {
// TODO: Fix QS.
return;
}

AssertTableStats(stats, "/Root/EightShard", {
.ExpectedReads = 1,
.ExpectedDeletes = 1,
Expand Down Expand Up @@ -656,11 +633,6 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
WHERE t1.Key = $key;
)"), params);

if (QueryService) {
// TODO: Fix QS.
return;
}

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
} else {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2712,7 +2712,9 @@ Y_UNIT_TEST_SUITE(KqpPg) {

{
auto result = db.ExecuteQuery(R"(
SELECT left_table.*, right_table.val2 FROM left_table, right_table WHERE left_table.id=right_table.id
SELECT left_table.*, right_table.val2 FROM left_table, right_table
WHERE left_table.id=right_table.id
ORDER BY left_table.id
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

Expand Down