Skip to content

24-3: Disable merges for indexImplTables partitions when build is in progress (#10166) #10355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,19 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
subDomainInfo->EffectiveStoragePools(),
shardInfo->BindedChannels);

const auto pathElement = Self->PathsById[pathId];
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxStoreTableStats.PersistSingleStats: main stats from"
<< " datashardId(TabletID)=" << datashardId << " maps to shardIdx: " << shardIdx
<< ", pathId: " << pathId << ", pathId map=" << Self->PathsById[pathId]->Name
<< ", pathId: " << pathId << ", pathId map=" << pathElement->Name
<< ", is column=" << isColumnTable << ", is olap=" << isOlapStore);

const TPartitionStats newStats = PrepareStats(ctx, rec, channelsMapping);

LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Add stats from shard with datashardId(TabletID)=" << datashardId
"Add stats from shard with datashardId(TabletID)=" << datashardId
<< ", pathId " << pathId.LocalPathId
<< ": RowCount " << newStats.RowCount
<< ": RowCount " << newStats.RowCount
<< ", DataSize " << newStats.DataSize
<< (newStats.HasBorrowedData ? ", with borrowed parts" : ""));

Expand Down Expand Up @@ -399,11 +400,14 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].IncrementFor(lag->Seconds());
}

const TTableIndexInfo* index = Self->Indexes.Value(pathElement->ParentPathId, nullptr).Get();
const TTableInfo* mainTableForIndex = Self->GetMainTableForIndex(pathId);

const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
TVector<TShardIdx> shardsToMerge;
if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)) {
if ((!index || index->State == NKikimrSchemeOp::EIndexStateReady)
&& table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)
) {
TTxId txId = Self->GetCachedTxId(ctx);

if (!txId) {
Expand Down
20 changes: 13 additions & 7 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1679,12 +1679,18 @@ namespace NSchemeShardUT_Private {
*index.mutable_data_columns() = {cfg.DataColumns.begin(), cfg.DataColumns.end()};

switch (cfg.IndexType) {
case NKikimrSchemeOp::EIndexTypeGlobal:
*index.mutable_global_index() = Ydb::Table::GlobalIndex();
break;
case NKikimrSchemeOp::EIndexTypeGlobalAsync:
*index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex();
break;
case NKikimrSchemeOp::EIndexTypeGlobal: {
auto& settings = *index.mutable_global_index()->mutable_settings();
if (cfg.GlobalIndexSettings) {
cfg.GlobalIndexSettings[0].SerializeTo(settings);
}
} break;
case NKikimrSchemeOp::EIndexTypeGlobalAsync: {
auto& settings = *index.mutable_global_async_index()->mutable_settings();
if (cfg.GlobalIndexSettings) {
cfg.GlobalIndexSettings[0].SerializeTo(settings);
}
} break;
default:
UNIT_ASSERT_C(false, "Unknown index type: " << static_cast<ui32>(cfg.IndexType));
}
Expand Down Expand Up @@ -1994,7 +2000,7 @@ namespace NSchemeShardUT_Private {
Runtime.SendToPipe(shardData.ShardId, sender, proposal);
TAutoPtr<IEventHandle> handle;
auto event = Runtime.GrabEdgeEventIf<TEvDataShard::TEvProposeTransactionResult>(handle,
[=](const TEvDataShard::TEvProposeTransactionResult& event) {
[this, shardData](const TEvDataShard::TEvProposeTransactionResult& event) {
return event.GetTxId() == TxId && event.GetOrigin() == shardData.ShardId;
});
activeZone = true;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <ydb/library/yql/minikql/mkql_alloc.h>
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

#include <util/stream/null.h>

Expand Down Expand Up @@ -361,6 +362,7 @@ namespace NSchemeShardUT_Private {
NKikimrSchemeOp::EIndexType IndexType = NKikimrSchemeOp::EIndexTypeGlobal;
TVector<TString> IndexColumns;
TVector<TString> DataColumns;
TVector<NYdb::NTable::TGlobalIndexSettings> GlobalIndexSettings = {};
};

std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ PEERDIR(
ydb/public/lib/scheme_types
ydb/library/yql/public/issue
ydb/public/sdk/cpp/client/ydb_driver
ydb/public/sdk/cpp/client/ydb_table
)

SRCS(
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

using namespace NKikimr;
using namespace NSchemeShard;
Expand Down
151 changes: 151 additions & 0 deletions ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/testlib/tablet_helpers.h>

#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/metering/metering.h>

#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

using namespace NKikimr;
using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;
Expand Down Expand Up @@ -759,6 +762,154 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) {

}

// Verifies that SchemeShard defers merges of index implementation table
// partitions while the index build is still in progress (index state is
// not EIndexStateReady), still allows explicit splits during the build,
// and performs the pending merges once the index becomes ready.
Y_UNIT_TEST(MergeIndexTableShardsOnlyWhenReady) {
TTestBasicRuntime runtime;

TTestEnvOptions opts;
// Disable background compaction and stats batching so shard statistics
// arrive promptly and deterministically during the test.
opts.EnableBackgroundCompaction(false);
opts.DisableStatsBatching(true);
TTestEnv env(runtime, opts);

// Make datashards report periodic table stats without delay.
NDataShard::gDbStatsReportInterval = TDuration::Seconds(0);

ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

// Index settings that pre-split the index impl table into 4 partitions
// at keys 10, 20 and 30.
Ydb::Table::GlobalIndexSettings settings;
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"(
partition_at_keys {
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 10 } }
}
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 20 } }
}
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 30 } }
}
}
)", &settings));

// Hold back the final ApplyIndexBuild operation so the index stays in a
// building (non-ready) state for the duration of the merge checks below.
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> indexApplicationBlocker(runtime, [](const auto& ev) {
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpApplyIndexBuild;
});

// Capture the txId of the CreateIndexBuild initialization transaction so
// we can wait for the index (and its impl table) to appear in the scheme.
ui64 indexInitializationTx = 0;
using TEvent = TEvSchemeShard::TEvModifySchemeTransaction;
auto indexInitializationObserver = runtime.AddObserver<TEvent>([&indexInitializationTx](const TEvent::TPtr& ev) {
const auto& record = ev->Get()->Record;
if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexBuild) {
indexInitializationTx = record.GetTxId();
}
}
);

const ui64 buildIndexTx = ++txId;
TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", TBuildIndexConfig{
"ByValue", NKikimrSchemeOp::EIndexTypeGlobal, { "value" }, {},
{ NYdb::NTable::TGlobalIndexSettings::FromProto(settings) }
});

runtime.WaitFor("index initialization", [&indexInitializationTx]{
return indexInitializationTx != 0;
});
env.TestWaitNotification(runtime, indexInitializationTx);

// The index must exist but still be in the WriteOnly (building) state,
// because ApplyIndexBuild is being blocked above.
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), {
NLs::PathExist,
NLs::IndexState(NKikimrSchemeOp::EIndexStateWriteOnly)
});

// Collects the datashard tablet ids of the index impl table partitions
// from a describe result into indexShards.
TVector<ui64> indexShards;
auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess);
const auto& partitions = record.GetPathDescription().GetTablePartitions();
indexShards.clear();
indexShards.reserve(partitions.size());
for (const auto& partition : partitions) {
indexShards.emplace_back(partition.GetDatashardId());
}
};
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
NLs::PathExist,
NLs::PartitionCount(4),
shardCollector
});
UNIT_ASSERT_VALUES_EQUAL(indexShards.size(), 4);

{
// make sure no shards are merged
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> mergeBlocker(runtime, [](const auto& ev) {
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions;
});

{
// wait for all index shards to send statistics
THashSet<ui64> shardsWithStats;
using TEvType = TEvDataShard::TEvPeriodicTableStats;
auto statsObserver = runtime.AddObserver<TEvType>([&shardsWithStats](const TEvType::TPtr& ev) {
shardsWithStats.emplace(ev->Get()->Record.GetDatashardId());
});

runtime.WaitFor("all index shards to send statistics", [&]{
return AllOf(indexShards, [&shardsWithStats](ui64 indexShard) {
return shardsWithStats.contains(indexShard);
});
});
}

// we expect to not have observed any attempts to merge
UNIT_ASSERT(mergeBlocker.empty());

// wait for 1 minute to ensure that no merges have been started by SchemeShard
env.SimulateSleep(runtime, TDuration::Minutes(1));
UNIT_ASSERT(mergeBlocker.empty());
}

// splits are allowed even if the index is not ready
TestSplitTable(runtime, ++txId, "/MyRoot/Table/ByValue/indexImplTable", Sprintf(R"(
SourceTabletId: %lu
SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5 } } } }
)",
indexShards.front()
)
);
env.TestWaitNotification(runtime, txId);

// Unblock ApplyIndexBuild and let the build finish; the index becomes
// Ready and the deferred merges should now be possible.
indexApplicationBlocker.Stop().Unblock();
env.TestWaitNotification(runtime, buildIndexTx);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), {
NLs::IndexState(NKikimrSchemeOp::EIndexStateReady)
});

// wait until all index impl table shards are merged into one
while (true) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
shardCollector
});
if (indexShards.size() > 1) {
// If a merge happens, old shards are deleted and replaced with a new one.
// That is why we need to wait for * all * the shards to be deleted.
env.TestWaitTabletDeletion(runtime, indexShards);
} else {
break;
}
}
}

Y_UNIT_TEST(DropIndex) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
Expand Down
64 changes: 64 additions & 0 deletions ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/tablet_flat/util_fmt_cell.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_utils.h>

Expand Down Expand Up @@ -277,6 +278,69 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
// test requires more txids than cached at start
}

// Checks that the pre-split partitions of an index implementation table
// are automatically merged back down to a single shard by SchemeShard
// once shard statistics start flowing.
Y_UNIT_TEST(MergeIndexTableShards) {
TTestBasicRuntime runtime;

TTestEnvOptions envOptions;
envOptions.EnableBackgroundCompaction(false);
TTestEnv env(runtime, envOptions);

ui64 txId = 100;

// Hold periodic table stats until the table and its index exist,
// so merge decisions only start after the initial describe below.
TBlockEvents<TEvDataShard::TEvPeriodicTableStats> periodicStatsBlocker(runtime);

TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
}
IndexDescription {
Name: "ByValue"
KeyColumnNames: ["value"]
IndexImplTableDescription {
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "A" } } } }
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "B" } } } }
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "C" } } } }
}
}
)"
);
env.TestWaitNotification(runtime, txId);

// The index impl table starts out split into 4 partitions.
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true),
{ NLs::PartitionCount(4) }
);

// Release the stats; SchemeShard may now start merging.
periodicStatsBlocker.Stop().Unblock();

// Gathers the datashard tablet ids of the impl table partitions
// from a describe result into shardIds.
TVector<ui64> shardIds;
auto collectShardIds = [&shardIds](const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess);
const auto& tablePartitions = record.GetPathDescription().GetTablePartitions();
shardIds.clear();
shardIds.reserve(tablePartitions.size());
for (const auto& tablePartition : tablePartitions) {
shardIds.emplace_back(tablePartition.GetDatashardId());
}
};

// Keep re-describing the impl table until a single shard remains.
for (;;) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
collectShardIds
});
if (shardIds.size() <= 1) {
break;
}
// A merge replaces the old shards with a new one; wait for *all*
// of the old shards to be deleted before describing the path again.
env.TestWaitTabletDeletion(runtime, shardIds);
}
}

Y_UNIT_TEST(AutoMergeInOne) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
Expand Down
Loading