Skip to content

Commit 0c1fe2f

Browse files
dcherednikadameat
authored andcommitted
Do not skip index update if index has non equatable types as data column. (ydb-platform#787)
YQL has no implementation to check those values are equal.
1 parent a26e0bc commit 0c1fe2f

File tree

2 files changed

+110
-17
lines changed

2 files changed

+110
-17
lines changed

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp

+42-17
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ TExprBase MakeNonexistingRowsFilter(const TDqPhyPrecompute& inputRows, const TDq
177177
TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecompute& inputRows,
178178
const TDqPhyPrecompute& lookupDict, const THashSet<TStringBuf>& inputColumns,
179179
const THashSet<TStringBuf>& indexColumns, const TKikimrTableDescription& table, TPositionHandle pos,
180-
TExprContext& ctx)
180+
TExprContext& ctx, bool opt)
181181
{
182182
// Check if we can update index table from just input data
183183
bool allColumnFromInput = true; // - indicate all data from input
@@ -250,14 +250,18 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput
250250
.Build()
251251
.Done()
252252
);
253+
TExprNode::TPtr member = payload.Ptr();
254+
if (opt) {
255+
member = Build<TCoNth>(ctx, pos)
256+
.Tuple(member)
257+
.Index().Build(0)
258+
.Done().Ptr();
259+
}
253260
presentKeyRow.emplace_back(
254261
Build<TCoNameValueTuple>(ctx, pos)
255262
.Name(columnAtom)
256263
.Value<TCoMember>()
257-
.Struct<TCoNth>()
258-
.Tuple(payload)
259-
.Index().Build(0)
260-
.Build()
264+
.Struct(member)
261265
.Name(columnAtom)
262266
.Build()
263267
.Done()
@@ -269,19 +273,29 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput
269273
.Add(presentKeyRow)
270274
.Done();
271275

276+
TExprNode::TPtr b;
277+
278+
if (opt) {
279+
b = Build<TCoFlatOptionalIf>(ctx, pos)
280+
.Predicate<TCoNth>()
281+
.Tuple(payload)
282+
.Index().Build(1)
283+
.Build()
284+
.Value<TCoJust>()
285+
.Input(presentKeyRowStruct)
286+
.Build()
287+
.Done().Ptr();
288+
} else {
289+
b = Build<TCoJust>(ctx, pos)
290+
.Input(presentKeyRowStruct)
291+
.Done().Ptr();
292+
}
293+
272294
TExprBase flatmapBody = Build<TCoIfPresent>(ctx, pos)
273295
.Optional(lookup)
274296
.PresentHandler<TCoLambda>()
275297
.Args(payload)
276-
.Body<TCoFlatOptionalIf>()
277-
.Predicate<TCoNth>()
278-
.Tuple(payload)
279-
.Index().Build(1)
280-
.Build()
281-
.Value<TCoJust>()
282-
.Input(presentKeyRowStruct)
283-
.Build()
284-
.Build()
298+
.Body(b)
285299
.Build()
286300
.MissingValue<TCoJust>()
287301
.Input<TCoAsStruct>()
@@ -531,12 +545,18 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
531545
auto indexTableColumnsWithoutData = indexTableColumns;
532546

533547
bool indexDataColumnsUpdated = false;
548+
bool optUpsert = true;
534549
for (const auto& column : indexDesc->DataColumns) {
535550
// TODO: Conder not fetching/updating data columns without input value.
536551
YQL_ENSURE(indexTableColumns.emplace(column).second);
537552

538553
if (inputColumnsSet.contains(column)) {
539554
indexDataColumnsUpdated = true;
555+
// 'skip index update' optimization checks given value equal to saved one
556+
// so the type must be equatable to perform it
557+
auto t = table.GetColumnType(column);
558+
YQL_ENSURE(t);
559+
optUpsert &= t->IsEquatable();
540560
}
541561
}
542562

@@ -695,7 +715,9 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
695715

696716
if (indexKeyColumnsUpdated) {
697717
// Have to delete old index value from index table in case when index key columns were updated
698-
auto deleteIndexKeys = MakeRowsFromTupleDict(lookupDictRecomputed, pk, indexTableColumnsWithoutData, pos, ctx);
718+
auto deleteIndexKeys = optUpsert
719+
? MakeRowsFromTupleDict(lookupDictRecomputed, pk, indexTableColumnsWithoutData, pos, ctx)
720+
: MakeRowsFromDict(lookupDict.Cast(), pk, indexTableColumnsWithoutData, pos, ctx);
699721

700722
auto indexDelete = Build<TKqlDeleteRows>(ctx, pos)
701723
.Table(tableNode)
@@ -711,8 +733,11 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
711733
needIndexTableUpdate = needIndexTableUpdate || indexKeyColumnsUpdated || indexDataColumnsUpdated;
712734

713735
if (needIndexTableUpdate) {
714-
auto upsertIndexRows = MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDictRecomputed,
715-
inputColumnsSet, indexTableColumns, table, pos, ctx);
736+
auto upsertIndexRows = optUpsert
737+
? MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDictRecomputed,
738+
inputColumnsSet, indexTableColumns, table, pos, ctx, true)
739+
: MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDict.Cast(),
740+
inputColumnsSet, indexTableColumns, table, pos, ctx, false);
716741

717742
auto indexUpsert = Build<TKqlUpsertRows>(ctx, pos)
718743
.Table(tableNode)

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

+68
Original file line numberDiff line numberDiff line change
@@ -2396,6 +2396,74 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
23962396
}
23972397
}
23982398

2399+
void CheckUpsertNonEquatableType(bool notNull) {
2400+
auto kqpSetting = NKikimrKqp::TKqpSetting();
2401+
kqpSetting.SetName("_KqpYqlSyntaxVersion");
2402+
kqpSetting.SetValue("1");
2403+
2404+
auto settings = TKikimrSettings()
2405+
.SetKqpSettings({kqpSetting});
2406+
TKikimrRunner kikimr(settings);
2407+
auto db = kikimr.GetTableClient();
2408+
auto session = db.CreateSession().GetValueSync().GetSession();
2409+
2410+
TString createTableSql = R"(
2411+
CREATE TABLE `TableWithJson` (
2412+
id Int64,
2413+
name Utf8,
2414+
slug Json %s,
2415+
parent_id Int64,
2416+
PRIMARY KEY (id),
2417+
INDEX json_parent_id_index GLOBAL ON (parent_id, id) COVER (name, slug)
2418+
);
2419+
)";
2420+
2421+
createTableSql = Sprintf(createTableSql.data(), notNull ? "NOT NULL" : "");
2422+
2423+
{
2424+
auto result = session.ExecuteSchemeQuery(createTableSql).GetValueSync();
2425+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2426+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
2427+
}
2428+
2429+
const TString query = R"(
2430+
UPSERT INTO `TableWithJson` (
2431+
id,
2432+
name,
2433+
slug,
2434+
parent_id
2435+
)
2436+
VALUES (
2437+
1,
2438+
'Q',
2439+
JSON(@@"dispenser"@@),
2440+
666
2441+
);
2442+
)";
2443+
2444+
auto result = session.ExecuteDataQuery(
2445+
query,
2446+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
2447+
.ExtractValueSync();
2448+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
2449+
2450+
{
2451+
const auto& yson = ReadTablePartToYson(session, "/Root/TableWithJson");
2452+
const TString expected = R"([[[1];["Q"];["\"dispenser\""];[666]]])";
2453+
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
2454+
}
2455+
2456+
{
2457+
const auto& yson = ReadTablePartToYson(session, "/Root/TableWithJson/json_parent_id_index/indexImplTable");
2458+
const TString expected = R"([[[666];[1];["Q"];["\"dispenser\""]]])";
2459+
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
2460+
}
2461+
}
2462+
2463+
Y_UNIT_TEST_TWIN(CheckUpsertNonEquatableType, NotNull) {
2464+
CheckUpsertNonEquatableType(NotNull);
2465+
}
2466+
23992467
Y_UNIT_TEST(UniqAndNoUniqSecondaryIndexWithCover) {
24002468
auto setting = NKikimrKqp::TKqpSetting();
24012469
auto serverSettings = TKikimrSettings()

0 commit comments

Comments
 (0)