Skip to content

Commit 7a49804

Browse files
improve resharding tests (#5497)
1 parent 14233a8 commit 7a49804

File tree

13 files changed

+44
-16
lines changed

13 files changed

+44
-16
lines changed

ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,10 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
269269
private:
270270
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");
271271

272-
void WaitResharding() {
272+
void WaitResharding(const TString& hint = "") {
273273
const TInstant start = TInstant::Now();
274274
bool clean = false;
275-
while (TInstant::Now() - start < TDuration::Seconds(200)) {
275+
while (TInstant::Now() - start < TDuration::Seconds(20)) {
276276
NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver());
277277
auto result = operationClient.List<NYdb::NSchemeShard::TBackgroundProcessesResponse>().GetValueSync();
278278
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
@@ -283,7 +283,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
283283
}
284284
UNIT_ASSERT_VALUES_EQUAL(result.GetList().size(), 1);
285285
Sleep(TDuration::Seconds(1));
286-
Cerr << "WAIT_FINISHED..." << Endl;
286+
Cerr << "RESHARDING_WAIT_FINISHED... (" << hint << ")" << Endl;
287287
}
288288
AFL_VERIFY(clean);
289289
}
@@ -317,7 +317,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
317317
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
318318
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
319319

320-
TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 16, 4);
320+
TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 24, 4);
321321
auto tableClient = Kikimr.GetTableClient();
322322

323323
Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();
@@ -354,19 +354,23 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
354354
}
355355

356356
CheckCount(230000);
357-
{
357+
for (ui32 i = 0; i < 2; ++i) {
358358
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
359359
auto session = tableClient.CreateSession().GetValueSync().GetSession();
360360
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
361361
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
362+
WaitResharding("SPLIT:" + ::ToString(i));
363+
}
364+
{
365+
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
366+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
367+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
368+
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
362369
}
363-
364-
WaitResharding();
365370
AFL_VERIFY(csController->GetShardingFiltersCount().Val() == 0);
366371
CheckCount(230000);
367-
368372
i64 count = csController->GetShardingFiltersCount().Val();
369-
AFL_VERIFY(count == 16)("count", count);
373+
AFL_VERIFY(count >= 16)("count", count);
370374
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
371375
csController->WaitIndexation(TDuration::Seconds(5));
372376
csController->WaitCompactions(TDuration::Seconds(5));
@@ -376,15 +380,15 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
376380
CheckCount(230000);
377381

378382
AFL_VERIFY(count == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
379-
const ui32 portionsCount = 8;
380-
for (ui32 i = 0; i < 3; ++i) {
383+
const ui32 portionsCount = 16;
384+
for (ui32 i = 0; i < 4; ++i) {
381385
{
382386
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
383387
auto session = tableClient.CreateSession().GetValueSync().GetSession();
384388
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
385389
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
386390
}
387-
WaitResharding();
391+
WaitResharding("MERGE:" + ::ToString(i));
388392
// csController->WaitCleaning(TDuration::Seconds(5));
389393

390394
CheckCount(230000);

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,7 @@ message TShardingTransfer {
16491649
optional uint64 DestinationTabletId = 1;
16501650
repeated uint64 SourceTabletIds = 2;
16511651
optional bool Moving = 3 [default = false];
1652+
optional string SessionId = 4;
16521653
}
16531654

16541655
message TShardingTransfers {

ydb/core/tx/columnshard/data_sharing/common/session/common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
namespace NKikimr::NOlap::NDataSharing {
1010

1111
TString TCommonSession::DebugString() const {
12-
return TStringBuilder() << "{id=" << SessionId << ";context=" << TransferContext.DebugString() << ";}";
12+
return TStringBuilder() << "{id=" << SessionId << ";context=" << TransferContext.DebugString() << ";state=" << State << ";}";
1313
}
1414

1515
bool TCommonSession::TryStart(const NColumnShard::TColumnShard& shard) {

ydb/core/tx/columnshard/data_sharing/common/session/common.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ class TManager;
2121
namespace NKikimr::NOlap::NDataSharing {
2222

2323
class TCommonSession {
24-
private:
24+
public:
2525
enum class EState {
2626
Created,
2727
Prepared,
2828
InProgress,
2929
Finished
3030
};
3131

32+
private:
3233
static ui64 GetNextRuntimeId() {
3334
static TAtomicCounter Counter = 0;
3435
return (ui64)Counter.Inc();
@@ -56,6 +57,7 @@ class TCommonSession {
5657
: SessionId(sessionId)
5758
, Info(info)
5859
, TransferContext(transferContext) {
60+
AFL_VERIFY(!!SessionId);
5961
}
6062

6163
const TTransferContext& GetTransferContext() const {
@@ -70,6 +72,10 @@ class TCommonSession {
7072
return State == EState::Prepared;
7173
}
7274

75+
// Reports whether this session's State is EState::Finished.
bool IsFinished() const {
76+
return State == EState::Finished;
77+
}
78+
7379
bool IsInProgress() const {
7480
return State == EState::InProgress;
7581
}

ydb/core/tx/columnshard/data_sharing/common/session/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ PEERDIR(
1111
ydb/core/tablet_flat
1212
)
1313

14+
GENERATE_ENUM_SERIALIZATION(common.h)
15+
1416
END()

ydb/core/tx/columnshard/transactions/operators/sharing.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ bool TSharingTransactionOperator::DoParse(TColumnShard& owner, const TString& da
2525
if (currentSession) {
2626
SessionExistsFlag = true;
2727
SharingTask = currentSession;
28+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "session_exists")("session_id", SharingTask->GetSessionId())("info", SharingTask->DebugString());
2829
} else {
2930
SharingTask->Confirm();
3031
}

ydb/core/tx/columnshard/transactions/operators/sharing.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ class TSharingTransactionOperator: public IProposeTxOperator {
2626
virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
2727
}
2828
virtual bool DoIsAsync() const override {
29-
return true;
29+
AFL_VERIFY(SharingTask);
30+
return !SharingTask->IsFinished();
3031
}
3132
virtual bool DoParse(TColumnShard& owner, const TString& data) override;
3233
virtual TString DoDebugString() const override {

ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class TTxChainActor: public NKikimr::NOlap::NBackground::TSessionActor {
1717
NActors::TActorId TxAllocatorClient;
1818

1919
void SendCurrentTxToSS() {
20+
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("chain_tx", SessionLogic->GetTxData().GetTransactions()[SessionLogic->GetStepForExecute()].DebugString());
2021
auto evModification = std::make_unique<TEvSchemeShard::TEvModifySchemeTransaction>(SessionLogic->GetCurrentTxIdVerified(), (ui64)TabletId);
2122
*evModification->Record.AddTransaction() = SessionLogic->GetTxData().GetTransactions()[SessionLogic->GetStepForExecute()];
2223
NActors::TActivationContext::AsActorContext().Send(TabletActorId, evModification.release());

ydb/core/tx/schemeshard/olap/layout/layout.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ class TColumnTablesLayout {
129129
public:
130130
TColumnTablesLayout(std::vector<TTablesGroup>&& groups);
131131

132+
// Removes from Groups every TTablesGroup whose table-id set contains pathId.
// Groups that do not contain pathId are kept.
void RemoveGroupsWithPathId(const TPathId& pathId) {
133+
// Predicate: true for a group that lists pathId among its table ids.
const auto pred = [&](const TTablesGroup& item) {
134+
return item.GetTableIds().GetIds().contains(pathId);
135+
};
136+
// remove_if moves the kept groups to the front; erase drops the tail it returns.
Groups.erase(std::remove_if(Groups.begin(), Groups.end(), pred), Groups.end());
137+
}
138+
132139
static std::vector<ui64> ShardIdxToTabletId(const std::vector<TShardIdx>& shards, const TSchemeShard& ss);
133140

134141
static TColumnTablesLayout BuildTrivial(const std::vector<ui64>& tabletIds);

ydb/core/tx/schemeshard/olap/operations/alter/in_store/resharding/update.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ TConclusionStatus TInStoreShardingUpdate::DoInitialize(const TUpdateInitializati
2626
auto layoutPolicy = storeInfo.GetTablesLayoutPolicy();
2727
auto currentLayout = context.GetSSOperationContext()->SS->ColumnTables.GetTablesLayout(TColumnTablesLayout::ShardIdxToTabletId(
2828
storeInfo.GetColumnShards(), *context.GetSSOperationContext()->SS));
29+
currentLayout.RemoveGroupsWithPathId(context.GetOriginalEntity().GetPathId());
2930
auto tablePtr = context.GetSSOperationContext()->SS->ColumnTables.GetVerifiedPtr(context.GetOriginalEntity().GetPathId());
3031
if (context.GetModification()->GetAlterColumnTable().GetReshardColumnTable().GetIncrease()) {
3132
const ui32 shardsCount = inStoreTable.GetTableInfoPtrVerified()->GetColumnShards().size();

ydb/core/tx/schemeshard/olap/operations/alter/in_store/transfer/update.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ NKikimr::TConclusionStatus TInStoreShardsTransfer::DoInitializeImpl(const TUpdat
1313
auto sharding = table.GetTableInfo()->GetShardingVerified(table.GetTableSchemaVerified());
1414
for (auto&& alter : context.GetModification()->GetAlterColumnTable().GetAlterShards().GetTransfer().GetTransfers()) {
1515
NKikimrColumnShardDataSharingProto::TDestinationSession destinationSession;
16-
destinationSession.SetSessionId("SHARE_TO_SHARD::" + ::ToString(alter.GetDestinationTabletId()));
16+
destinationSession.SetSessionId(alter.GetSessionId());
1717
*destinationSession.MutableInitiatorController() = NKikimr::NOlap::NDataSharing::TInitiatorControllerContainer(
1818
std::make_shared<NKikimr::NOlap::NDataSharing::TSSInitiatorController>(context.GetSSOperationContext()->SS->TabletID(), 0)).SerializeToProto();
1919
{

ydb/core/tx/sharding/hash_intervals.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ NKikimr::TConclusion<std::vector<NKikimrSchemeOp::TAlterShards>> TConsistencySha
2828
auto& transfer = *alter.MutableTransfer()->AddTransfers();
2929
transfer.SetDestinationTabletId(newTabletIds[idx]);
3030
transfer.AddSourceTabletIds(i);
31+
transfer.SetSessionId("SPLIT_TO::" + ::ToString(::ToString(newTabletIds[idx])) + "::" + TGUID::CreateTimebased().AsGuidString());
3132
result.emplace_back(alter);
3233
}
3334
{
@@ -74,6 +75,7 @@ NKikimr::TConclusion<std::vector<NKikimrSchemeOp::TAlterShards>> TConsistencySha
7475
transfer.SetDestinationTabletId(newTabletIds[idx]);
7576
transfer.AddSourceTabletIds(from1);
7677
transfer.AddSourceTabletIds(from2);
78+
transfer.SetSessionId("MERGE_TO::" + ::ToString(::ToString(newTabletIds[idx])) + "::" + TGUID::CreateTimebased().AsGuidString());
7779
transfer.SetMoving(true);
7880
result.emplace_back(alter);
7981
}

ydb/core/tx/sharding/hash_modulo.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ NKikimr::TConclusion<std::vector<NKikimrSchemeOp::TAlterShards>> THashShardingMo
6464
auto& transfer = *alter.MutableTransfer()->AddTransfers();
6565
transfer.SetDestinationTabletId(newTabletIds[idx]);
6666
transfer.AddSourceTabletIds(i);
67+
transfer.SetSessionId("SPLIT_TO::" + ::ToString(::ToString(newTabletIds[idx])) + "::" + TGUID::CreateTimebased().AsGuidString());
6768
result.emplace_back(alter);
6869
}
6970
{
@@ -194,6 +195,7 @@ NKikimr::TConclusion<std::vector<NKikimrSchemeOp::TAlterShards>> THashShardingMo
194195
transfer.SetDestinationTabletId(newTabletIds[idx]);
195196
transfer.AddSourceTabletIds(from1);
196197
transfer.AddSourceTabletIds(from2);
198+
transfer.SetSessionId("MERGE_TO::" + ::ToString(::ToString(newTabletIds[idx])) + "::" + TGUID::CreateTimebased().AsGuidString());
197199
transfer.SetMoving(true);
198200
result.emplace_back(alter);
199201
}

0 commit comments

Comments
 (0)