Skip to content

Commit 6bb1a84

Browse files
fix(kqp): return one result row per key from stream join actor for left semi strategy (#6642)
1 parent a22a03f commit 6bb1a84

File tree

8 files changed

+89
-6
lines changed

8 files changed

+89
-6
lines changed

ydb/core/kqp/common/kqp_yql.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName";
4848

4949
static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv;
5050
static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv;
51+
static constexpr std::string_view TKqpStreamLookupSemiJoinStrategyName = "LookupSemiJoinRows"sv;
5152

5253
struct TKqpReadTableSettings {
5354
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";

ydb/core/kqp/host/kqp_type_ann.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,9 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
502502
return TStatus::Error;
503503
}
504504

505-
if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName) {
505+
if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName
506+
|| lookupStrategy->Content() == TKqpStreamLookupSemiJoinStrategyName) {
507+
506508
if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) {
507509
return TStatus::Error;
508510
}
@@ -1682,7 +1684,9 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
16821684

16831685
node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
16841686

1685-
} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName) {
1687+
} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName
1688+
|| lookupStrategy.Value() == TKqpStreamLookupSemiJoinStrategyName) {
1689+
16861690
if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
16871691
return TStatus::Error;
16881692
}

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,11 +395,15 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
395395
}
396396
}
397397

398+
auto strategy = join.JoinType().Value() == "LeftSemi"
399+
? TKqpStreamLookupSemiJoinStrategyName
400+
: TKqpStreamLookupJoinStrategyName;
401+
398402
TExprBase lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
399403
.Table(rightLookup.MainTable)
400404
.LookupKeys(leftInput)
401405
.Columns(lookupColumns.Cast())
402-
.LookupStrategy().Build(TKqpStreamLookupJoinStrategyName)
406+
.LookupStrategy().Build(strategy)
403407
.Done();
404408

405409
// Stream lookup join output: stream<tuple<left_row_struct, optional<right_row_struct>>>

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(const std::string_view
9191
lookupStrategy = NKqpProto::EStreamLookupStrategy::LOOKUP;
9292
} else if (strategy == "LookupJoinRows"sv) {
9393
lookupStrategy = NKqpProto::EStreamLookupStrategy::JOIN;
94+
} else if (strategy == "LookupSemiJoinRows"sv) {
95+
lookupStrategy = NKqpProto::EStreamLookupStrategy::SEMI_JOIN;
9496
}
9597

9698
YQL_ENSURE(lookupStrategy != NKqpProto::EStreamLookupStrategy::UNSPECIFIED,
@@ -1275,7 +1277,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
12751277

12761278
break;
12771279
}
1278-
case NKqpProto::EStreamLookupStrategy::JOIN: {
1280+
case NKqpProto::EStreamLookupStrategy::JOIN:
1281+
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: {
12791282
YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple);
12801283
const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
12811284
YQL_ENSURE(inputTupleType->GetSize() == 2);

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
142142
, HolderFactory(holderFactory)
143143
, InputDesc(inputDesc)
144144
, TablePath(settings.GetTable().GetPath())
145-
, TableId(MakeTableId(settings.GetTable())) {
145+
, TableId(MakeTableId(settings.GetTable()))
146+
, Strategy(settings.GetLookupStrategy()) {
146147

147148
KeyColumns.reserve(settings.GetKeyColumns().size());
148149
i32 keyOrder = 0;
@@ -748,6 +749,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
748749
auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
749750
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
750751

752+
if (Strategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) {
753+
// Semi join should return one result row per key
754+
continue;
755+
}
756+
751757
TReadResultStats rowStats;
752758
i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
753759
auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId);
@@ -962,6 +968,7 @@ std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKq
962968
case NKqpProto::EStreamLookupStrategy::LOOKUP:
963969
return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
964970
case NKqpProto::EStreamLookupStrategy::JOIN:
971+
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN:
965972
return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
966973
default:
967974
return {};

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class TKqpStreamLookupWorker {
7171
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
7272
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
7373
std::vector<TSysTables::TTableColumnInfo> Columns;
74+
const NKqpProto::EStreamLookupStrategy Strategy;
7475
};
7576

7677
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,

ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,7 @@ Y_UNIT_TEST_TWIN(JoinWithComplexCondition, StreamLookupJoin) {
10231023
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(appConfig);;
10241024
serverSettings.SetKqpSettings(settings);
10251025

1026-
TKikimrRunner kikimr(settings);
1026+
TKikimrRunner kikimr(serverSettings);
10271027
auto db = kikimr.GetTableClient();
10281028
auto session = db.CreateSession().GetValueSync().GetSession();
10291029

@@ -1182,6 +1182,68 @@ Y_UNIT_TEST_TWIN(JoinWithComplexCondition, StreamLookupJoin) {
11821182
}
11831183
}
11841184

1185+
Y_UNIT_TEST_TWIN(LeftSemiJoinWithDuplicatesInRightTable, StreamLookupJoin) {
1186+
NKikimrConfig::TAppConfig appConfig;
1187+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin);
1188+
auto settings = TKikimrSettings().SetAppConfig(appConfig);
1189+
TKikimrRunner kikimr(settings);
1190+
auto db = kikimr.GetTableClient();
1191+
auto session = db.CreateSession().GetValueSync().GetSession();
1192+
1193+
{ // create tables
1194+
const TString query = R"(
1195+
CREATE TABLE `/Root/Left` (
1196+
Key1 Int64,
1197+
Key2 Int64,
1198+
Value String,
1199+
PRIMARY KEY (Key1, Key2)
1200+
);
1201+
1202+
CREATE TABLE `/Root/Right` (
1203+
Key1 Int64,
1204+
Key2 Int64,
1205+
Value String,
1206+
PRIMARY KEY (Key1, Key2)
1207+
);
1208+
)";
1209+
UNIT_ASSERT(session.ExecuteSchemeQuery(query).GetValueSync().IsSuccess());
1210+
}
1211+
1212+
{ // fill tables
1213+
const TString query = R"(
1214+
REPLACE INTO `/Root/Left` (Key1, Key2, Value) VALUES
1215+
(1, 10, "value1"),
1216+
(2, 20, "value2"),
1217+
(3, 30, "value3");
1218+
1219+
REPLACE INTO `/Root/Right` (Key1, Key2, Value) VALUES
1220+
(10, 100, "value1"),
1221+
(10, 101, "value1"),
1222+
(10, 102, "value1"),
1223+
(20, 200, "value2"),
1224+
(20, 201, "value2"),
1225+
(30, 300, "value3");
1226+
)";
1227+
UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess());
1228+
}
1229+
1230+
{
1231+
const TString query = R"(
1232+
SELECT l.Key1, l.Key2, l.Value
1233+
FROM `/Root/Left` AS l
1234+
LEFT SEMI JOIN `/Root/Right` AS r
1235+
ON l.Key2 = r.Key1 ORDER BY l.Key1, l.Key2, l.Value
1236+
)";
1237+
1238+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1239+
CompareYson(R"([
1240+
[[1];[10];["value1"]];
1241+
[[2];[20];["value2"]];
1242+
[[3];[30];["value3"]]
1243+
])", FormatResultSetYson(result.GetResultSet(0)));
1244+
}
1245+
}
1246+
11851247
} // suite
11861248

11871249
} // namespace NKqp

ydb/core/protos/kqp_physical.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ enum EStreamLookupStrategy {
263263
UNSPECIFIED = 0;
264264
LOOKUP = 1;
265265
JOIN = 2;
266+
SEMI_JOIN = 3;
266267
};
267268

268269
message TKqpPhyCnStreamLookup {

0 commit comments

Comments
 (0)