Skip to content

Commit f74b14b

Browse files
authored
Read from single column shard (#14614) (#16044)
1 parent 222f64f commit f74b14b

File tree

7 files changed

+88
-10
lines changed

7 files changed

+88
-10
lines changed

ydb/core/kqp/common/kqp_yql.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,10 @@ TKqpReadTableSettings ParseInternal(const TCoNameValueTupleList& node) {
169169
YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
170170
settings.ForcePrimary = true;
171171
} else if (name == TKqpReadTableSettings::GroupByFieldNames) {
172-
} else {
172+
} else if (name == TKqpReadTableSettings::TabletIdName) {
173+
YQL_ENSURE(tuple.Ref().ChildrenSize() == 2);
174+
settings.TabletId = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value());
175+
}else {
173176
YQL_ENSURE(false, "Unknown KqpReadTable setting name '" << name << "'");
174177
}
175178
}
@@ -256,6 +259,17 @@ NNodes::TCoNameValueTupleList TKqpReadTableSettings::BuildNode(TExprContext& ctx
256259
.Done());
257260
}
258261

262+
if (TabletId) {
263+
settings.emplace_back(
264+
Build<TCoNameValueTuple>(ctx, pos)
265+
.Name()
266+
.Build(TabletIdName)
267+
.Value<TCoAtom>()
268+
.Value(ToString(*TabletId))
269+
.Build()
270+
.Done());
271+
}
272+
259273
return Build<TCoNameValueTupleList>(ctx, pos)
260274
.Add(settings)
261275
.Done();

ydb/core/kqp/common/kqp_yql.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@ struct TKqpReadTableSettings {
5858
static constexpr TStringBuf SequentialSettingName = "Sequential";
5959
static constexpr TStringBuf ForcePrimaryName = "ForcePrimary";
6060
static constexpr TStringBuf GroupByFieldNames = "GroupByFieldNames";
61+
static constexpr TStringBuf TabletIdName = "TabletId";
6162

6263
TVector<TString> SkipNullKeys;
6364
TExprNode::TPtr ItemsLimit;
6465
bool Reverse = false;
6566
bool Sorted = false;
6667
TMaybe<ui64> SequentialInFlight;
68+
TMaybe<ui64> TabletId;
6769
bool ForcePrimary = false;
6870

6971
void AddSkipNullKey(const TString& key);

ydb/core/kqp/executer_actor/kqp_partition_helper.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -712,11 +712,13 @@ THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadOlapRan
712712
return shardInfoMap;
713713

714714
for (const auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
715-
auto& shardInfo = shardInfoMap[partition.ShardId];
715+
if (!readRanges.HasTabletId() || readRanges.GetTabletId() == partition.ShardId) {
716+
auto& shardInfo = shardInfoMap[partition.ShardId];
716717

717-
YQL_ENSURE(!shardInfo.KeyReadRanges);
718-
shardInfo.KeyReadRanges.ConstructInPlace();
719-
shardInfo.KeyReadRanges->CopyFrom(ranges);
718+
YQL_ENSURE(!shardInfo.KeyReadRanges);
719+
shardInfo.KeyReadRanges.ConstructInPlace();
720+
shardInfo.KeyReadRanges->CopyFrom(ranges);
721+
}
720722
}
721723

722724
return shardInfoMap;

ydb/core/kqp/opt/kqp_opt_kql.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,15 @@ bool HasIndexesToWrite(const TKikimrTableDescription& tableData) {
9090
return hasIndexesToWrite;
9191
}
9292

93-
TExprBase BuildReadTable(const TCoAtomList& columns, TPositionHandle pos, const TKikimrTableDescription& tableData, bool forcePrimary,
93+
TExprBase BuildReadTable(const TCoAtomList& columns, TPositionHandle pos, const TKikimrTableDescription& tableData, bool forcePrimary, TMaybe<ui64> tabletId,
9494
TExprContext& ctx)
9595
{
9696
TExprNode::TPtr readTable;
9797
const auto& tableMeta = BuildTableMeta(tableData, pos, ctx);
9898

9999
TKqpReadTableSettings settings;
100100
settings.ForcePrimary = forcePrimary;
101+
settings.TabletId = tabletId;
101102

102103
readTable = Build<TKqlReadTableRanges>(ctx, pos)
103104
.Table(tableMeta)
@@ -117,8 +118,10 @@ TExprBase BuildReadTable(const TKiReadTable& read, const TKikimrTableDescription
117118
bool withSystemColumns, TExprContext& ctx)
118119
{
119120
const auto& columns = read.GetSelectColumns(ctx, tableData, withSystemColumns);
120-
121-
auto readNode = BuildReadTable(columns, read.Pos(), tableData, forcePrimary, ctx);
121+
const auto tabletId = NYql::HasSetting(read.Settings().Ref(), "tabletid")
122+
? TMaybe<ui64>{FromString<ui64>(NYql::GetSetting(read.Settings().Ref(), "tabletid")->Child(1)->Content())}
123+
: TMaybe<ui64>{};
124+
auto readNode = BuildReadTable(columns, read.Pos(), tableData, forcePrimary, tabletId, ctx);
122125

123126
return readNode;
124127
}
@@ -407,7 +410,7 @@ TExprBase BuildRowsToDelete(const TKikimrTableDescription& tableData, bool withS
407410
const auto tableMeta = BuildTableMeta(tableData, pos, ctx);
408411
const auto tableColumns = BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/);
409412

410-
const auto allRows = BuildReadTable(tableColumns, pos, tableData, false, ctx);
413+
const auto allRows = BuildReadTable(tableColumns, pos, tableData, false, {}, ctx);
411414

412415
return Build<TCoFilter>(ctx, pos)
413416
.Input(allRows)
@@ -480,7 +483,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
480483
TExprBase BuildRowsToUpdate(const TKikimrTableDescription& tableData, bool withSystemColumns, const TCoLambda& filter,
481484
const TPositionHandle pos, TExprContext& ctx)
482485
{
483-
auto kqlReadTable = BuildReadTable(BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/), pos, tableData, false, ctx);
486+
auto kqlReadTable = BuildReadTable(BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/), pos, tableData, false, {}, ctx);
484487

485488
return Build<TCoFilter>(ctx, pos)
486489
.Input(kqlReadTable)

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,9 @@ void FillReadRanges(const TReader& read, const TKikimrTableMetadata&, TProto& re
367367

368368
if constexpr (std::is_same_v<TProto, NKqpProto::TKqpPhyOpReadOlapRanges>) {
369369
readProto.SetSorted(settings.Sorted);
370+
if (settings.TabletId) {
371+
readProto.SetTabletId(*settings.TabletId);
372+
}
370373
}
371374

372375
readProto.SetReverse(settings.Reverse);

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3146,6 +3146,58 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
31463146
}
31473147
}
31483148

3149+
Y_UNIT_TEST(SingleShardRead) {
3150+
NKikimrConfig::TAppConfig appConfig;
3151+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
3152+
auto settings = TKikimrSettings()
3153+
.SetAppConfig(appConfig)
3154+
.SetWithSampleTables(false);
3155+
auto kikimr = std::make_unique<TKikimrRunner>(settings);
3156+
Tests::NCommon::TLoggerInit(*kikimr).Initialize();
3157+
auto queryClient = kikimr->GetQueryClient();
3158+
const auto noTx = NQuery::TTxControl::NoTx();
3159+
{
3160+
auto result = queryClient.ExecuteQuery(R"(
3161+
CREATE TABLE Test (
3162+
Id Uint32 not null,
3163+
Name String not null,
3164+
Comment String,
3165+
PRIMARY KEY (Name, Id)
3166+
) WITH (
3167+
STORE = COLUMN,
3168+
PARTITION_COUNT = 3
3169+
);
3170+
3171+
)", noTx).GetValueSync();
3172+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3173+
3174+
result = queryClient.ExecuteQuery(R"(
3175+
UPSERT INTO Test (Id, Name, Comment) VALUES
3176+
(10, "n1", "aa"),
3177+
(20, "n2", "bb"),
3178+
(30, "n3", "cc"),
3179+
(40, "n4", "dd"),
3180+
(50, "n5", "ee")
3181+
)", noTx).GetValueSync();
3182+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3183+
}
3184+
auto tableClient = kikimr->GetTableClient();
3185+
auto tableClientSession = tableClient.GetSession().GetValueSync().GetSession();
3186+
auto result = tableClientSession.DescribeTable("/Root/Test", NYdb::NTable::TDescribeTableSettings{}.WithKeyShardBoundary(true).WithPartitionStatistics(true).WithTableStatistics(true)).ExtractValueSync();
3187+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3188+
//TODO USE shard ids from the table description. Not avaiable now
3189+
{
3190+
auto result = queryClient.ExecuteQuery("SELECT * FROM Test WITH TabletId = '72075186224037888'", noTx).GetValueSync();
3191+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3192+
CompareYson("[[[\"bb\"];20u;\"n2\"];[[\"dd\"];40u;\"n4\"]]", FormatResultSetYson(result.GetResultSet(0)));
3193+
result = queryClient.ExecuteQuery("SELECT * FROM Test WITH TabletId = '72075186224037889'", noTx).GetValueSync();
3194+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3195+
CompareYson("[[[\"ee\"];50u;\"n5\"]]", FormatResultSetYson(result.GetResultSet(0)));
3196+
result = queryClient.ExecuteQuery("SELECT * FROM Test WITH TabletId = '72075186224037890'", noTx).GetValueSync();
3197+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3198+
CompareYson("[[[\"aa\"];10u;\"n1\"];[[\"cc\"];30u;\"n3\"]]", FormatResultSetYson(result.GetResultSet(0)));
3199+
}
3200+
}
31493201
}
31503202

31513203
}

ydb/core/protos/kqp_physical.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ message TKqpPhyOpReadOlapRanges {
205205
EReadType ReadType = 13;
206206

207207
repeated string GroupByColumnNames = 14;
208+
// If set, only data from the specified tablet will be processed
209+
optional uint64 TabletId = 15;
208210
}
209211

210212
message TKqpPhyOpReadRanges {

0 commit comments

Comments
 (0)