Skip to content

Commit c7baca7

Browse files
authored
Merge 00a5df8 into c7ef100
2 parents c7ef100 + 00a5df8 commit c7baca7

File tree

7 files changed

+89
-10
lines changed

7 files changed

+89
-10
lines changed

ydb/core/kqp/common/kqp_yql.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,10 @@ TKqpReadTableSettings ParseInternal(const TCoNameValueTupleList& node) {
169169
YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
170170
settings.ForcePrimary = true;
171171
} else if (name == TKqpReadTableSettings::GroupByFieldNames) {
172-
} else {
172+
} else if (name == TKqpReadTableSettings::ShardIdName) {
173+
YQL_ENSURE(tuple.Ref().ChildrenSize() == 2);
174+
settings.ShardId = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value());
175+
}else {
173176
YQL_ENSURE(false, "Unknown KqpReadTable setting name '" << name << "'");
174177
}
175178
}
@@ -256,6 +259,17 @@ NNodes::TCoNameValueTupleList TKqpReadTableSettings::BuildNode(TExprContext& ctx
256259
.Done());
257260
}
258261

262+
if (ShardId) {
263+
settings.emplace_back(
264+
Build<TCoNameValueTuple>(ctx, pos)
265+
.Name()
266+
.Build(ShardIdName)
267+
.Value<TCoAtom>()
268+
.Value(ToString(*ShardId))
269+
.Build()
270+
.Done());
271+
}
272+
259273
return Build<TCoNameValueTupleList>(ctx, pos)
260274
.Add(settings)
261275
.Done();

ydb/core/kqp/common/kqp_yql.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,14 @@ struct TKqpReadTableSettings {
8080
static constexpr TStringBuf SequentialSettingName = "Sequential";
8181
static constexpr TStringBuf ForcePrimaryName = "ForcePrimary";
8282
static constexpr TStringBuf GroupByFieldNames = "GroupByFieldNames";
83+
static constexpr TStringBuf ShardIdName = "ShardId";
8384

8485
TVector<TString> SkipNullKeys;
8586
TExprNode::TPtr ItemsLimit;
8687
bool Reverse = false;
8788
bool Sorted = false;
8889
TMaybe<ui64> SequentialInFlight;
90+
TMaybe<ui64> ShardId;
8991
bool ForcePrimary = false;
9092

9193
void AddSkipNullKey(const TString& key);

ydb/core/kqp/executer_actor/kqp_partition_helper.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -712,11 +712,13 @@ THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadOlapRan
712712
return shardInfoMap;
713713

714714
for (const auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
715-
auto& shardInfo = shardInfoMap[partition.ShardId];
715+
if (!readRanges.HasShardId() || readRanges.GetShardId() == partition.ShardId) {
716+
auto& shardInfo = shardInfoMap[partition.ShardId];
716717

717-
YQL_ENSURE(!shardInfo.KeyReadRanges);
718-
shardInfo.KeyReadRanges.ConstructInPlace();
719-
shardInfo.KeyReadRanges->CopyFrom(ranges);
718+
YQL_ENSURE(!shardInfo.KeyReadRanges);
719+
shardInfo.KeyReadRanges.ConstructInPlace();
720+
shardInfo.KeyReadRanges->CopyFrom(ranges);
721+
}
720722
}
721723

722724
return shardInfoMap;

ydb/core/kqp/opt/kqp_opt_kql.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,15 @@ bool HasIndexesToWrite(const TKikimrTableDescription& tableData) {
9090
return hasIndexesToWrite;
9191
}
9292

93-
TExprBase BuildReadTable(const TCoAtomList& columns, TPositionHandle pos, const TKikimrTableDescription& tableData, bool forcePrimary,
93+
TExprBase BuildReadTable(const TCoAtomList& columns, TPositionHandle pos, const TKikimrTableDescription& tableData, bool forcePrimary, TMaybe<ui64> shardId,
9494
TExprContext& ctx)
9595
{
9696
TExprNode::TPtr readTable;
9797
const auto& tableMeta = BuildTableMeta(tableData, pos, ctx);
9898

9999
TKqpReadTableSettings settings;
100100
settings.ForcePrimary = forcePrimary;
101+
settings.ShardId = shardId;
101102

102103
readTable = Build<TKqlReadTableRanges>(ctx, pos)
103104
.Table(tableMeta)
@@ -117,8 +118,10 @@ TExprBase BuildReadTable(const TKiReadTable& read, const TKikimrTableDescription
117118
bool withSystemColumns, TExprContext& ctx)
118119
{
119120
const auto& columns = read.GetSelectColumns(ctx, tableData, withSystemColumns);
120-
121-
auto readNode = BuildReadTable(columns, read.Pos(), tableData, forcePrimary, ctx);
121+
const auto shardId = NYql::HasSetting(read.Settings().Ref(), "shardid")
122+
? TMaybe<ui64>{FromString<ui64>(NYql::GetSetting(read.Settings().Ref(), "shardid")->Child(1)->Content())}
123+
: TMaybe<ui64>{};
124+
auto readNode = BuildReadTable(columns, read.Pos(), tableData, forcePrimary, shardId, ctx);
122125

123126
return readNode;
124127
}
@@ -430,7 +433,7 @@ TExprBase BuildRowsToDelete(const TKikimrTableDescription& tableData, bool withS
430433
const auto tableMeta = BuildTableMeta(tableData, pos, ctx);
431434
const auto tableColumns = BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/);
432435

433-
const auto allRows = BuildReadTable(tableColumns, pos, tableData, false, ctx);
436+
const auto allRows = BuildReadTable(tableColumns, pos, tableData, false, {}, ctx);
434437

435438
return Build<TCoFilter>(ctx, pos)
436439
.Input(allRows)
@@ -509,7 +512,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
509512
TExprBase BuildRowsToUpdate(const TKikimrTableDescription& tableData, bool withSystemColumns, const TCoLambda& filter,
510513
const TPositionHandle pos, TExprContext& ctx)
511514
{
512-
auto kqlReadTable = BuildReadTable(BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/), pos, tableData, false, ctx);
515+
auto kqlReadTable = BuildReadTable(BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/), pos, tableData, false, {}, ctx);
513516

514517
return Build<TCoFilter>(ctx, pos)
515518
.Input(kqlReadTable)

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,9 @@ void FillReadRanges(const TReader& read, const TKikimrTableMetadata&, TProto& re
397397

398398
if constexpr (std::is_same_v<TProto, NKqpProto::TKqpPhyOpReadOlapRanges>) {
399399
readProto.SetSorted(settings.Sorted);
400+
if (settings.ShardId) {
401+
readProto.SetShardId(*settings.ShardId);
402+
}
400403
}
401404

402405
readProto.SetReverse(settings.Reverse);

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3215,6 +3215,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
32153215
auto client = kikimr.GetTableClient();
32163216
Tests::NCommon::TLoggerInit(kikimr).Initialize();
32173217

3218+
3219+
3220+
32183221
{
32193222
auto result = kikimr.GetQueryClient()
32203223
.ExecuteQuery(R"(
@@ -3255,5 +3258,55 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
32553258
CompareYson(result, R"([[0u;"\"-inf\""];[1u;"\"inf\""];[2u;"\"inf\""];[3u;"\"-inf\""]])");
32563259
}
32573260
}
3261+
3262+
Y_UNIT_TEST(SingleShardRead) {
3263+
std::unique_ptr<TKikimrRunner> Kikimr;
3264+
auto settings = TKikimrSettings().SetWithSampleTables(false);
3265+
auto kikimr = std::make_unique<TKikimrRunner>(settings);
3266+
Tests::NCommon::TLoggerInit(*kikimr).Initialize();
3267+
auto queryClient = kikimr->GetQueryClient();
3268+
const auto noTx = NQuery::TTxControl::NoTx();
3269+
{
3270+
auto result = queryClient.ExecuteQuery(R"(
3271+
CREATE TABLE Test (
3272+
Id Uint32 not null,
3273+
Name String not null,
3274+
Comment String,
3275+
PRIMARY KEY (Name, Id)
3276+
) WITH (
3277+
STORE = COLUMN,
3278+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3
3279+
);
3280+
3281+
)", noTx).GetValueSync();
3282+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3283+
3284+
result = queryClient.ExecuteQuery(R"(
3285+
UPSERT INTO Test (Id, Name, Comment) VALUES
3286+
(10, "n1", "aa"),
3287+
(20, "n2", "bb"),
3288+
(30, "n3", "cc"),
3289+
(40, "n4", "dd"),
3290+
(50, "n5", "ee")
3291+
)", noTx).GetValueSync();
3292+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3293+
}
3294+
auto tableClient = kikimr->GetTableClient();
3295+
auto tableClientSession = tableClient.GetSession().GetValueSync().GetSession();
3296+
auto result = tableClientSession.DescribeTable("/Root/Test", NYdb::NTable::TDescribeTableSettings{}.WithKeyShardBoundary(true).WithShardNodesInfo(true).WithPartitionStatistics(true).WithTableStatistics(true)).ExtractValueSync();
3297+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3298+
//TODO USE shard ids from the table description. Not avaiable now
3299+
{
3300+
auto result = queryClient.ExecuteQuery("SELECT * FROM Test WITH (ShardId = '72075186224037888')", noTx).GetValueSync();
3301+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3302+
CompareYson("[[[\"bb\"];20u;\"n2\"];[[\"dd\"];40u;\"n4\"]]", FormatResultSetYson(result.GetResultSet(0)));
3303+
result = queryClient.ExecuteQuery("SELECT * FROM Test WITH ShardId = '72075186224037889'", noTx).GetValueSync();
3304+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3305+
CompareYson("[[[\"ee\"];50u;\"n5\"]]", FormatResultSetYson(result.GetResultSet(0)));
3306+
result = queryClient.ExecuteQuery("SELECT * FROM Test WITH (ShardId = '72075186224037890')", noTx).GetValueSync();
3307+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3308+
CompareYson("[[[\"aa\"];10u;\"n1\"];[[\"cc\"];30u;\"n3\"]]", FormatResultSetYson(result.GetResultSet(0)));
3309+
}
3310+
}
32583311
}
32593312
}

ydb/core/protos/kqp_physical.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,8 @@ message TKqpPhyOpReadOlapRanges {
211211
EReadType ReadType = 13;
212212

213213
repeated string GroupByColumnNames = 14;
214+
// If set, only data from the specified shard will be processed
215+
optional uint64 ShardId = 15;
214216
}
215217

216218
message TKqpPhyOpReadRanges {

0 commit comments

Comments
 (0)