Skip to content

Commit 13b1325

Browse files
molotkov-andblinkov
authored andcommitted
Data erasure refactoring (#15007)
1 parent 678a2ca commit 13b1325

14 files changed

+266
-428
lines changed

ydb/core/tx/schemeshard/schemeshard.h

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -698,64 +698,28 @@ namespace TEvSchemeShard {
698698
};
699699

700700
struct TEvTenantDataErasureResponse : TEventPB<TEvTenantDataErasureResponse, NKikimrScheme::TEvTenantDataErasureResponse, EvTenantDataErasureResponse> {
701-
enum class EStatus {
702-
UNSPECIFIED,
703-
COMPLETED,
704-
IN_PROGRESS,
705-
};
706701

707702
TEvTenantDataErasureResponse() = default;
708-
709-
TEvTenantDataErasureResponse(const TPathId& pathId, ui64 generation, const EStatus& status) {
703+
TEvTenantDataErasureResponse(const TPathId& pathId, ui64 generation, const NKikimrScheme::TEvTenantDataErasureResponse::EStatus& status) {
710704
Record.MutablePathId()->SetOwnerId(pathId.OwnerId);
711705
Record.MutablePathId()->SetLocalId(pathId.LocalPathId);
712706
Record.SetGeneration(generation);
713-
Record.SetStatus(ConvertStatus(status));
707+
Record.SetStatus(status);
714708
}
715709

716-
TEvTenantDataErasureResponse(ui64 ownerId, ui64 localPathId, ui64 generation, const EStatus& status)
710+
TEvTenantDataErasureResponse(ui64 ownerId, ui64 localPathId, ui64 generation, const NKikimrScheme::TEvTenantDataErasureResponse::EStatus& status)
717711
: TEvTenantDataErasureResponse(TPathId(ownerId, localPathId), generation, status)
718712
{}
719-
720-
NKikimrScheme::TEvTenantDataErasureResponse::EStatus ConvertStatus(const EStatus& status) {
721-
switch (status) {
722-
case EStatus::UNSPECIFIED:
723-
return NKikimrScheme::TEvTenantDataErasureResponse::UNSPECIFIED;
724-
case EStatus::COMPLETED:
725-
return NKikimrScheme::TEvTenantDataErasureResponse::COMPLETED;
726-
case EStatus::IN_PROGRESS:
727-
return NKikimrScheme::TEvTenantDataErasureResponse::IN_PROGRESS;
728-
}
729-
}
730713
};
731714

732715
struct TEvDataErasureInfoRequest : TEventPB<TEvDataErasureInfoRequest, NKikimrScheme::TEvDataErasureInfoRequest, EvDataErasureInfoRequest> {};
733716

734717
struct TEvDataErasureInfoResponse : TEventPB<TEvDataErasureInfoResponse, NKikimrScheme::TEvDataErasureInfoResponse, EvDataErasureInfoResponse> {
735-
enum class EStatus {
736-
UNSPECIFIED,
737-
COMPLETED,
738-
IN_PROGRESS_TENANT,
739-
IN_PROGRESS_BSC,
740-
};
741718

742719
TEvDataErasureInfoResponse() = default;
743-
TEvDataErasureInfoResponse(ui64 generation, const EStatus& status) {
720+
TEvDataErasureInfoResponse(ui64 generation, const NKikimrScheme::TEvDataErasureInfoResponse::EStatus& status) {
744721
Record.SetGeneration(generation);
745-
Record.SetStatus(ConvertStatus(status));
746-
}
747-
748-
NKikimrScheme::TEvDataErasureInfoResponse::EStatus ConvertStatus(const EStatus& status) {
749-
switch (status) {
750-
case EStatus::UNSPECIFIED:
751-
return NKikimrScheme::TEvDataErasureInfoResponse::UNSPECIFIED;
752-
case EStatus::COMPLETED:
753-
return NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED;
754-
case EStatus::IN_PROGRESS_TENANT:
755-
return NKikimrScheme::TEvDataErasureInfoResponse::IN_PROGRESS_TENANT;
756-
case EStatus::IN_PROGRESS_BSC:
757-
return NKikimrScheme::TEvDataErasureInfoResponse::IN_PROGRESS_BSC;
758-
}
722+
Record.SetStatus(status);
759723
}
760724
};
761725

ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ TDataErasureManager::TDataErasureManager(TSchemeShard* const schemeShard)
66
: SchemeShard(schemeShard)
77
{}
88

9-
TDataErasureManager::EStatus TDataErasureManager::GetStatus() const {
9+
EDataErasureStatus TDataErasureManager::GetStatus() const {
1010
return Status;
1111
}
1212

13-
void TDataErasureManager::SetStatus(const EStatus& status) {
13+
void TDataErasureManager::SetStatus(const EDataErasureStatus& status) {
1414
Status = status;
1515
}
1616

ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,9 @@ namespace NKikimr::NSchemeShard {
2121
class TSchemeShard;
2222

2323
class TDataErasureManager {
24-
public:
25-
enum class EStatus : ui32 {
26-
UNSPECIFIED,
27-
COMPLETED,
28-
IN_PROGRESS,
29-
IN_PROGRESS_BSC,
30-
};
31-
3224
protected:
3325
TSchemeShard* const SchemeShard;
34-
EStatus Status = EStatus::UNSPECIFIED;
26+
EDataErasureStatus Status = EDataErasureStatus::UNSPECIFIED;
3527
ui64 Generation = 0;
3628
bool Running = false;
3729

@@ -62,8 +54,8 @@ class TDataErasureManager {
6254

6355
void Clear();
6456

65-
EStatus GetStatus() const;
66-
void SetStatus(const EStatus& status);
57+
EDataErasureStatus GetStatus() const;
58+
void SetStatus(const EDataErasureStatus& status);
6759

6860
void IncGeneration();
6961
void SetGeneration(ui64 generation);
@@ -97,7 +89,7 @@ using TQueue = NOperationQueue::TOperationQueueWithTimer<
9789
private:
9890
TStarter Starter;
9991
TQueue* Queue = nullptr;
100-
THashMap<TPathId, EStatus> WaitingDataErasureTenants;
92+
THashMap<TPathId, EDataErasureStatus> WaitingDataErasureTenants;
10193
THashMap<TPathId, TActorId> ActivePipes;
10294

10395
TDuration DataErasureInterval;
@@ -169,7 +161,7 @@ using TQueue = NOperationQueue::TOperationQueueWithTimer<
169161
private:
170162
TStarter Starter;
171163
TQueue* Queue = nullptr;
172-
THashMap<TShardIdx, EStatus> WaitingDataErasureShards;
164+
THashMap<TShardIdx, EDataErasureStatus> WaitingDataErasureShards;
173165
THashMap<TShardIdx, TActorId> ActivePipes;
174166

175167
public:

ydb/core/tx/schemeshard/schemeshard__root_data_erasure_manager.cpp

Lines changed: 31 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ void TRootDataErasureManager::Start() {
6666
"[RootDataErasureManager] Start: Status# " << static_cast<ui32>(Status));
6767

6868
Queue->Start();
69-
if (Status == EStatus::UNSPECIFIED) {
69+
if (Status == EDataErasureStatus::UNSPECIFIED) {
7070
SchemeShard->MarkFirstRunRootDataErasureManager();
7171
ScheduleDataErasureWakeup();
72-
} else if (Status == EStatus::COMPLETED) {
72+
} else if (Status == EDataErasureStatus::COMPLETED) {
7373
ScheduleDataErasureWakeup();
7474
} else {
7575
ClearOperationQueue();
@@ -111,7 +111,7 @@ void TRootDataErasureManager::ClearWaitingDataErasureRequests() {
111111
}
112112

113113
void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) {
114-
Status = EStatus::IN_PROGRESS;
114+
Status = EDataErasureStatus::IN_PROGRESS;
115115
StartTime = AppData(SchemeShard->ActorContext())->TimeProvider->Now();
116116
for (auto& [pathId, subdomain] : SchemeShard->SubDomains) {
117117
auto path = TPath::Init(pathId, SchemeShard);
@@ -122,14 +122,14 @@ void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) {
122122
continue;
123123
}
124124
Enqueue(pathId);
125-
WaitingDataErasureTenants[pathId] = EStatus::IN_PROGRESS;
126-
db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(static_cast<ui32>(WaitingDataErasureTenants[pathId]));
125+
WaitingDataErasureTenants[pathId] = EDataErasureStatus::IN_PROGRESS;
126+
db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(WaitingDataErasureTenants[pathId]);
127127
}
128128
if (WaitingDataErasureTenants.empty()) {
129-
Status = EStatus::IN_PROGRESS_BSC;
129+
Status = EDataErasureStatus::IN_PROGRESS_BSC;
130130
}
131131
db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status,
132-
Schema::DataErasureGenerations::StartTime>(static_cast<ui32>(Status), StartTime.MicroSeconds());
132+
Schema::DataErasureGenerations::StartTime>(Status, StartTime.MicroSeconds());
133133

134134
const auto ctx = SchemeShard->ActorContext();
135135
LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
@@ -139,13 +139,13 @@ void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) {
139139
}
140140

141141
void TRootDataErasureManager::Continue() {
142-
if (Status == EStatus::IN_PROGRESS) {
142+
if (Status == EDataErasureStatus::IN_PROGRESS) {
143143
for (const auto& [pathId, status] : WaitingDataErasureTenants) {
144-
if (status == EStatus::IN_PROGRESS) {
144+
if (status == EDataErasureStatus::IN_PROGRESS) {
145145
Enqueue(pathId);
146146
}
147147
}
148-
} else if (Status == EStatus::IN_PROGRESS_BSC) {
148+
} else if (Status == EDataErasureStatus::IN_PROGRESS_BSC) {
149149
SendRequestToBSC();
150150
}
151151

@@ -313,16 +313,16 @@ void TRootDataErasureManager::OnDone(const TPathId& pathId, NIceDb::TNiceDb& db)
313313
ActivePipes.erase(pathId);
314314
auto it = WaitingDataErasureTenants.find(pathId);
315315
if (it != WaitingDataErasureTenants.end()) {
316-
it->second = EStatus::COMPLETED;
317-
db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(static_cast<ui32>(it->second));
316+
it->second = EDataErasureStatus::COMPLETED;
317+
db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(it->second);
318318
}
319319

320320
SchemeShard->TabletCounters->Cumulative()[COUNTER_DATA_ERASURE_OK].Increment(1);
321321
UpdateMetrics();
322322

323323
bool isDataErasureCompleted = true;
324324
for (const auto& [pathId, status] : WaitingDataErasureTenants) {
325-
if (status == EStatus::IN_PROGRESS) {
325+
if (status == EDataErasureStatus::IN_PROGRESS) {
326326
isDataErasureCompleted = false;
327327
break;
328328
}
@@ -331,8 +331,8 @@ void TRootDataErasureManager::OnDone(const TPathId& pathId, NIceDb::TNiceDb& db)
331331
if (isDataErasureCompleted) {
332332
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
333333
"[RootDataErasureManager] Data erasure in tenants is completed. Send request to BS controller");
334-
Status = EStatus::IN_PROGRESS_BSC;
335-
db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status>(static_cast<ui32>(Status));
334+
Status = EDataErasureStatus::IN_PROGRESS_BSC;
335+
db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status>(Status);
336336
}
337337
}
338338

@@ -366,7 +366,7 @@ void TRootDataErasureManager::SendRequestToBSC() {
366366
}
367367

368368
void TRootDataErasureManager::Complete() {
369-
Status = EStatus::COMPLETED;
369+
Status = EDataErasureStatus::COMPLETED;
370370
auto ctx = SchemeShard->ActorContext();
371371
FinishTime = AppData(ctx)->TimeProvider->Now();
372372
TDuration dataErasureDuration = FinishTime - StartTime;
@@ -391,27 +391,23 @@ bool TRootDataErasureManager::Restore(NIceDb::TNiceDb& db) {
391391
return false;
392392
}
393393
if (rowset.EndOfSet()) {
394-
Status = EStatus::UNSPECIFIED;
394+
Status = EDataErasureStatus::UNSPECIFIED;
395395
} else {
396396
Generation = 0;
397-
Status = EStatus::UNSPECIFIED;
397+
Status = EDataErasureStatus::UNSPECIFIED;
398398
while (!rowset.EndOfSet()) {
399399
ui64 generation = rowset.GetValue<Schema::DataErasureGenerations::Generation>();
400400
if (generation >= Generation) {
401401
Generation = generation;
402402
StartTime = TInstant::FromValue(rowset.GetValue<Schema::DataErasureGenerations::StartTime>());
403-
ui32 statusValue = rowset.GetValue<Schema::DataErasureGenerations::Status>();
404-
if (statusValue >= static_cast<ui32>(EStatus::UNSPECIFIED) &&
405-
statusValue <= static_cast<ui32>(EStatus::IN_PROGRESS_BSC)) {
406-
Status = static_cast<EStatus>(statusValue);
407-
}
403+
Status = rowset.GetValue<Schema::DataErasureGenerations::Status>();
408404
}
409405

410406
if (!rowset.Next()) {
411407
return false;
412408
}
413409
}
414-
if (Status == EStatus::UNSPECIFIED || Status == EStatus::COMPLETED) {
410+
if (Status == EDataErasureStatus::UNSPECIFIED || Status == EDataErasureStatus::COMPLETED) {
415411
auto ctx = SchemeShard->ActorContext();
416412
TDuration interval = AppData(ctx)->TimeProvider->Now() - StartTime;
417413
if (interval > DataErasureInterval) {
@@ -439,24 +435,18 @@ bool TRootDataErasureManager::Restore(NIceDb::TNiceDb& db) {
439435

440436
Y_ABORT_UNLESS(SchemeShard->SubDomains.contains(pathId));
441437

442-
ui32 statusValue = rowset.GetValue<Schema::WaitingDataErasureTenants::Status>();
443-
EStatus status = EStatus::COMPLETED;
444-
if (statusValue >= static_cast<ui32>(EStatus::UNSPECIFIED) &&
445-
statusValue <= static_cast<ui32>(EStatus::IN_PROGRESS_BSC)) {
446-
status = static_cast<EStatus>(statusValue);
447-
}
448-
438+
EDataErasureStatus status = rowset.GetValue<Schema::WaitingDataErasureTenants::Status>();
449439
WaitingDataErasureTenants[pathId] = status;
450-
if (status == EStatus::IN_PROGRESS) {
440+
if (status == EDataErasureStatus::IN_PROGRESS) {
451441
numberDataErasureTenantsInRunning++;
452442
}
453443

454444
if (!rowset.Next()) {
455445
return false;
456446
}
457447
}
458-
if (Status == EStatus::IN_PROGRESS && (WaitingDataErasureTenants.empty() || numberDataErasureTenantsInRunning == 0)) {
459-
Status = EStatus::IN_PROGRESS_BSC;
448+
if (Status == EDataErasureStatus::IN_PROGRESS && (WaitingDataErasureTenants.empty() || numberDataErasureTenantsInRunning == 0)) {
449+
Status = EDataErasureStatus::IN_PROGRESS_BSC;
460450
}
461451
}
462452

@@ -475,10 +465,10 @@ bool TRootDataErasureManager::Remove(const TPathId& pathId) {
475465
if (it != WaitingDataErasureTenants.end()) {
476466
Queue->Remove(pathId);
477467
ActivePipes.erase(pathId);
478-
WaitingDataErasureTenants[pathId] = EStatus::COMPLETED;
468+
WaitingDataErasureTenants[pathId] = EDataErasureStatus::COMPLETED;
479469
bool isDataErasureCompleted = true;
480470
for (const auto& [pathId, status] : WaitingDataErasureTenants) {
481-
if (status == EStatus::IN_PROGRESS) {
471+
if (status == EDataErasureStatus::IN_PROGRESS) {
482472
isDataErasureCompleted = false;
483473
break;
484474
}
@@ -529,9 +519,9 @@ struct TSchemeShard::TTxDataErasureManagerInit : public TSchemeShard::TRwTxBase
529519
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
530520
"TTxDataErasureManagerInit Execute at schemeshard: " << Self->TabletID());
531521
NIceDb::TNiceDb db(txc.DB);
532-
Self->DataErasureManager->SetStatus(TDataErasureManager::EStatus::COMPLETED);
522+
Self->DataErasureManager->SetStatus(EDataErasureStatus::COMPLETED);
533523
db.Table<Schema::DataErasureGenerations>().Key(0).Update<Schema::DataErasureGenerations::Status,
534-
Schema::DataErasureGenerations::StartTime>(static_cast<ui32>(Self->DataErasureManager->GetStatus()), AppData(ctx)->TimeProvider->Now().MicroSeconds());
524+
Schema::DataErasureGenerations::StartTime>(Self->DataErasureManager->GetStatus(), AppData(ctx)->TimeProvider->Now().MicroSeconds());
535525
}
536526

537527
void DoComplete(const TActorContext& ctx) override {
@@ -568,7 +558,7 @@ struct TSchemeShard::TTxRunDataErasure : public TSchemeShard::TRwTxBase {
568558
dataErasureManager->IncGeneration();
569559
dataErasureManager->Run(db);
570560
}
571-
if (Self->DataErasureManager->GetStatus() == TDataErasureManager::EStatus::IN_PROGRESS_BSC) {
561+
if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS_BSC) {
572562
NeedSendRequestToBSC = true;
573563
}
574564
}
@@ -617,7 +607,7 @@ struct TSchemeShard::TTxCompleteDataErasureTenant : public TSchemeShard::TRwTxBa
617607
record.GetPathId().GetOwnerId(),
618608
record.GetPathId().GetLocalId());
619609
manager->OnDone(pathId, db);
620-
if (manager->GetStatus() == TDataErasureManager::EStatus::IN_PROGRESS_BSC) {
610+
if (manager->GetStatus() == EDataErasureStatus::IN_PROGRESS_BSC) {
621611
NeedSendRequestToBSC = true;
622612
}
623613
}
@@ -663,7 +653,7 @@ struct TSchemeShard::TTxCompleteDataErasureBSC : public TSchemeShard::TRwTxBase
663653
if (record.GetCompleted()) {
664654
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Data shred in BSC is completed");
665655
manager->Complete();
666-
db.Table<Schema::DataErasureGenerations>().Key(Self->DataErasureManager->GetGeneration()).Update<Schema::DataErasureGenerations::Status>(static_cast<ui32>(Self->DataErasureManager->GetStatus()));
656+
db.Table<Schema::DataErasureGenerations>().Key(Self->DataErasureManager->GetGeneration()).Update<Schema::DataErasureGenerations::Status>(Self->DataErasureManager->GetStatus());
667657
} else {
668658
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Progress data shred in BSC " << record.GetProgress10k());
669659
NeedScheduleRequestToBSC = true;

0 commit comments

Comments
 (0)