Skip to content

Commit 4e623e3

Browse files
authored
Merge 6c6f396 into f6729a6
2 parents f6729a6 + 6c6f396 commit 4e623e3

File tree

7 files changed

+240
-11
lines changed

7 files changed

+240
-11
lines changed

ydb/core/tx/schemeshard/schemeshard__table_stats.cpp

+8-4
Original file line numberDiff line numberDiff line change
@@ -259,18 +259,19 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
259259
subDomainInfo->EffectiveStoragePools(),
260260
shardInfo->BindedChannels);
261261

262+
const auto pathElement = Self->PathsById[pathId];
262263
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
263264
"TTxStoreTableStats.PersistSingleStats: main stats from"
264265
<< " datashardId(TabletID)=" << datashardId << " maps to shardIdx: " << shardIdx
265-
<< ", pathId: " << pathId << ", pathId map=" << Self->PathsById[pathId]->Name
266+
<< ", pathId: " << pathId << ", pathId map=" << pathElement->Name
266267
<< ", is column=" << isColumnTable << ", is olap=" << isOlapStore);
267268

268269
const TPartitionStats newStats = PrepareStats(ctx, rec, channelsMapping);
269270

270271
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
271-
"Add stats from shard with datashardId(TabletID)=" << datashardId
272+
"Add stats from shard with datashardId(TabletID)=" << datashardId
272273
<< ", pathId " << pathId.LocalPathId
273-
<< ": RowCount " << newStats.RowCount
274+
<< ": RowCount " << newStats.RowCount
274275
<< ", DataSize " << newStats.DataSize
275276
<< (newStats.HasBorrowedData ? ", with borrowed parts" : ""));
276277

@@ -399,11 +400,14 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
399400
Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].IncrementFor(lag->Seconds());
400401
}
401402

403+
const TTableIndexInfo* index = Self->Indexes.Value(pathElement->ParentPathId, nullptr).Get();
402404
const TTableInfo* mainTableForIndex = Self->GetMainTableForIndex(pathId);
403405

404406
const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
405407
TVector<TShardIdx> shardsToMerge;
406-
if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)) {
408+
if ((!index || index->State == NKikimrSchemeOp::EIndexStateReady)
409+
&& table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)
410+
) {
407411
TTxId txId = Self->GetCachedTxId(ctx);
408412

409413
if (!txId) {

ydb/core/tx/schemeshard/ut_helpers/helpers.cpp

+13-7
Original file line numberDiff line numberDiff line change
@@ -1679,12 +1679,18 @@ namespace NSchemeShardUT_Private {
16791679
*index.mutable_data_columns() = {cfg.DataColumns.begin(), cfg.DataColumns.end()};
16801680

16811681
switch (cfg.IndexType) {
1682-
case NKikimrSchemeOp::EIndexTypeGlobal:
1683-
*index.mutable_global_index() = Ydb::Table::GlobalIndex();
1684-
break;
1685-
case NKikimrSchemeOp::EIndexTypeGlobalAsync:
1686-
*index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex();
1687-
break;
1682+
case NKikimrSchemeOp::EIndexTypeGlobal: {
1683+
auto& settings = *index.mutable_global_index()->mutable_settings();
1684+
if (cfg.GlobalIndexSettings) {
1685+
cfg.GlobalIndexSettings[0].SerializeTo(settings);
1686+
}
1687+
} break;
1688+
case NKikimrSchemeOp::EIndexTypeGlobalAsync: {
1689+
auto& settings = *index.mutable_global_async_index()->mutable_settings();
1690+
if (cfg.GlobalIndexSettings) {
1691+
cfg.GlobalIndexSettings[0].SerializeTo(settings);
1692+
}
1693+
} break;
16881694
default:
16891695
UNIT_ASSERT_C(false, "Unknown index type: " << static_cast<ui32>(cfg.IndexType));
16901696
}
@@ -1994,7 +2000,7 @@ namespace NSchemeShardUT_Private {
19942000
Runtime.SendToPipe(shardData.ShardId, sender, proposal);
19952001
TAutoPtr<IEventHandle> handle;
19962002
auto event = Runtime.GrabEdgeEventIf<TEvDataShard::TEvProposeTransactionResult>(handle,
1997-
[=](const TEvDataShard::TEvProposeTransactionResult& event) {
2003+
[this, shardData](const TEvDataShard::TEvProposeTransactionResult& event) {
19982004
return event.GetTxId() == TxId && event.GetOrigin() == shardData.ShardId;
19992005
});
20002006
activeZone = true;

ydb/core/tx/schemeshard/ut_helpers/helpers.h

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <ydb/library/yql/minikql/mkql_alloc.h>
2222
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
23+
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
2324

2425
#include <util/stream/null.h>
2526

@@ -361,6 +362,7 @@ namespace NSchemeShardUT_Private {
361362
NKikimrSchemeOp::EIndexType IndexType = NKikimrSchemeOp::EIndexTypeGlobal;
362363
TVector<TString> IndexColumns;
363364
TVector<TString> DataColumns;
365+
TVector<NYdb::NTable::TGlobalIndexSettings> GlobalIndexSettings = {};
364366
};
365367

366368
std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal);

ydb/core/tx/schemeshard/ut_helpers/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ PEERDIR(
2222
ydb/public/lib/scheme_types
2323
ydb/library/yql/public/issue
2424
ydb/public/sdk/cpp/client/ydb_driver
25+
ydb/public/sdk/cpp/client/ydb_table
2526
)
2627

2728
SRCS(

ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
66
#include <ydb/core/testlib/tablet_helpers.h>
77
#include <ydb/public/lib/deprecated/kicli/kicli.h>
8+
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
89

910
using namespace NKikimr;
1011
using namespace NSchemeShard;

ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp

+151
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
22
#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
3+
#include <ydb/core/testlib/actors/block_events.h>
34
#include <ydb/core/testlib/tablet_helpers.h>
45

56
#include <ydb/core/tx/datashard/datashard.h>
67
#include <ydb/core/metering/metering.h>
78

9+
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
10+
811
using namespace NKikimr;
912
using namespace NSchemeShard;
1013
using namespace NSchemeShardUT_Private;
@@ -759,6 +762,154 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) {
759762

760763
}
761764

765+
Y_UNIT_TEST(MergeIndexTableShardsOnlyWhenReady) {
766+
TTestBasicRuntime runtime;
767+
768+
TTestEnvOptions opts;
769+
opts.EnableBackgroundCompaction(false);
770+
opts.DisableStatsBatching(true);
771+
TTestEnv env(runtime, opts);
772+
773+
NDataShard::gDbStatsReportInterval = TDuration::Seconds(0);
774+
775+
ui64 txId = 100;
776+
777+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
778+
Name: "Table"
779+
Columns { Name: "key" Type: "Uint64" }
780+
Columns { Name: "value" Type: "Uint64" }
781+
KeyColumnNames: ["key"]
782+
)");
783+
env.TestWaitNotification(runtime, txId);
784+
785+
Ydb::Table::GlobalIndexSettings settings;
786+
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"(
787+
partition_at_keys {
788+
split_points {
789+
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
790+
value { items { uint64_value: 10 } }
791+
}
792+
split_points {
793+
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
794+
value { items { uint64_value: 20 } }
795+
}
796+
split_points {
797+
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
798+
value { items { uint64_value: 30 } }
799+
}
800+
}
801+
)", &settings));
802+
803+
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> indexApplicationBlocker(runtime, [](const auto& ev) {
804+
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
805+
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpApplyIndexBuild;
806+
});
807+
808+
ui64 indexInitializationTx = 0;
809+
using TEvent = TEvSchemeShard::TEvModifySchemeTransaction;
810+
auto indexInitializationObserver = runtime.AddObserver<TEvent>([&indexInitializationTx](const TEvent::TPtr& ev) {
811+
const auto& record = ev->Get()->Record;
812+
if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexBuild) {
813+
indexInitializationTx = record.GetTxId();
814+
}
815+
}
816+
);
817+
818+
const ui64 buildIndexTx = ++txId;
819+
TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", TBuildIndexConfig{
820+
"ByValue", NKikimrSchemeOp::EIndexTypeGlobal, { "value" }, {},
821+
{ NYdb::NTable::TGlobalIndexSettings::FromProto(settings) }
822+
});
823+
824+
runtime.WaitFor("index initialization", [&indexInitializationTx]{
825+
return indexInitializationTx != 0;
826+
});
827+
env.TestWaitNotification(runtime, indexInitializationTx);
828+
829+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), {
830+
NLs::PathExist,
831+
NLs::IndexState(NKikimrSchemeOp::EIndexStateWriteOnly)
832+
});
833+
834+
TVector<ui64> indexShards;
835+
auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) {
836+
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess);
837+
const auto& partitions = record.GetPathDescription().GetTablePartitions();
838+
indexShards.clear();
839+
indexShards.reserve(partitions.size());
840+
for (const auto& partition : partitions) {
841+
indexShards.emplace_back(partition.GetDatashardId());
842+
}
843+
};
844+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
845+
NLs::PathExist,
846+
NLs::PartitionCount(4),
847+
shardCollector
848+
});
849+
UNIT_ASSERT_VALUES_EQUAL(indexShards.size(), 4);
850+
851+
{
852+
// make sure no shards are merged
853+
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> mergeBlocker(runtime, [](const auto& ev) {
854+
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
855+
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions;
856+
});
857+
858+
{
859+
// wait for all index shards to send statistics
860+
THashSet<ui64> shardsWithStats;
861+
using TEvType = TEvDataShard::TEvPeriodicTableStats;
862+
auto statsObserver = runtime.AddObserver<TEvType>([&shardsWithStats](const TEvType::TPtr& ev) {
863+
shardsWithStats.emplace(ev->Get()->Record.GetDatashardId());
864+
});
865+
866+
runtime.WaitFor("all index shards to send statistics", [&]{
867+
return AllOf(indexShards, [&shardsWithStats](ui64 indexShard) {
868+
return shardsWithStats.contains(indexShard);
869+
});
870+
});
871+
}
872+
873+
// we expect to not have observed any attempts to merge
874+
UNIT_ASSERT(mergeBlocker.empty());
875+
876+
// wait for 1 minute to ensure that no merges have been started by SchemeShard
877+
env.SimulateSleep(runtime, TDuration::Minutes(1));
878+
UNIT_ASSERT(mergeBlocker.empty());
879+
}
880+
881+
// splits are allowed even if the index is not ready
882+
TestSplitTable(runtime, ++txId, "/MyRoot/Table/ByValue/indexImplTable", Sprintf(R"(
883+
SourceTabletId: %lu
884+
SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5 } } } }
885+
)",
886+
indexShards.front()
887+
)
888+
);
889+
env.TestWaitNotification(runtime, txId);
890+
891+
indexApplicationBlocker.Stop().Unblock();
892+
env.TestWaitNotification(runtime, buildIndexTx);
893+
894+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), {
895+
NLs::IndexState(NKikimrSchemeOp::EIndexStateReady)
896+
});
897+
898+
// wait until all index impl table shards are merged into one
899+
while (true) {
900+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
901+
shardCollector
902+
});
903+
if (indexShards.size() > 1) {
904+
// If a merge happens, old shards are deleted and replaced with a new one.
905+
// That is why we need to wait for * all * the shards to be deleted.
906+
env.TestWaitTabletDeletion(runtime, indexShards);
907+
} else {
908+
break;
909+
}
910+
}
911+
}
912+
762913
Y_UNIT_TEST(DropIndex) {
763914
TTestBasicRuntime runtime;
764915
TTestEnv env(runtime);

ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp

+64
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <ydb/core/tablet_flat/util_fmt_cell.h>
2+
#include <ydb/core/testlib/actors/block_events.h>
23
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
34
#include <ydb/core/tx/schemeshard/schemeshard_utils.h>
45

@@ -277,6 +278,69 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
277278
// test requires more txids than cached at start
278279
}
279280

281+
Y_UNIT_TEST(MergeIndexTableShards) {
282+
TTestBasicRuntime runtime;
283+
284+
TTestEnvOptions opts;
285+
opts.EnableBackgroundCompaction(false);
286+
TTestEnv env(runtime, opts);
287+
288+
ui64 txId = 100;
289+
290+
TBlockEvents<TEvDataShard::TEvPeriodicTableStats> statsBlocker(runtime);
291+
292+
TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
293+
TableDescription {
294+
Name: "Table"
295+
Columns { Name: "key" Type: "Uint64" }
296+
Columns { Name: "value" Type: "Utf8" }
297+
KeyColumnNames: ["key"]
298+
}
299+
IndexDescription {
300+
Name: "ByValue"
301+
KeyColumnNames: ["value"]
302+
IndexImplTableDescription {
303+
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "A" } } } }
304+
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "B" } } } }
305+
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "C" } } } }
306+
}
307+
}
308+
)"
309+
);
310+
env.TestWaitNotification(runtime, txId);
311+
312+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true),
313+
{ NLs::PartitionCount(4) }
314+
);
315+
316+
statsBlocker.Stop().Unblock();
317+
318+
TVector<ui64> indexShards;
319+
auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) {
320+
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess);
321+
const auto& partitions = record.GetPathDescription().GetTablePartitions();
322+
indexShards.clear();
323+
indexShards.reserve(partitions.size());
324+
for (const auto& partition : partitions) {
325+
indexShards.emplace_back(partition.GetDatashardId());
326+
}
327+
};
328+
329+
// wait until all index impl table shards are merged into one
330+
while (true) {
331+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
332+
shardCollector
333+
});
334+
if (indexShards.size() > 1) {
335+
// If a merge happens, old shards are deleted and replaced with a new one.
336+
// That is why we need to wait for * all * the shards to be deleted.
337+
env.TestWaitTabletDeletion(runtime, indexShards);
338+
} else {
339+
break;
340+
}
341+
}
342+
}
343+
280344
Y_UNIT_TEST(AutoMergeInOne) {
281345
TTestWithReboots t;
282346
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {

0 commit comments

Comments
 (0)