Skip to content

Data erasure refactoring #15007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 5 additions & 41 deletions ydb/core/tx/schemeshard/schemeshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -698,64 +698,28 @@ namespace TEvSchemeShard {
};

struct TEvTenantDataErasureResponse : TEventPB<TEvTenantDataErasureResponse, NKikimrScheme::TEvTenantDataErasureResponse, EvTenantDataErasureResponse> {
enum class EStatus {
UNSPECIFIED,
COMPLETED,
IN_PROGRESS,
};

TEvTenantDataErasureResponse() = default;

TEvTenantDataErasureResponse(const TPathId& pathId, ui64 generation, const EStatus& status) {
TEvTenantDataErasureResponse(const TPathId& pathId, ui64 generation, const NKikimrScheme::TEvTenantDataErasureResponse::EStatus& status) {
Record.MutablePathId()->SetOwnerId(pathId.OwnerId);
Record.MutablePathId()->SetLocalId(pathId.LocalPathId);
Record.SetGeneration(generation);
Record.SetStatus(ConvertStatus(status));
Record.SetStatus(status);
}

TEvTenantDataErasureResponse(ui64 ownerId, ui64 localPathId, ui64 generation, const EStatus& status)
TEvTenantDataErasureResponse(ui64 ownerId, ui64 localPathId, ui64 generation, const NKikimrScheme::TEvTenantDataErasureResponse::EStatus& status)
: TEvTenantDataErasureResponse(TPathId(ownerId, localPathId), generation, status)
{}

NKikimrScheme::TEvTenantDataErasureResponse::EStatus ConvertStatus(const EStatus& status) {
switch (status) {
case EStatus::UNSPECIFIED:
return NKikimrScheme::TEvTenantDataErasureResponse::UNSPECIFIED;
case EStatus::COMPLETED:
return NKikimrScheme::TEvTenantDataErasureResponse::COMPLETED;
case EStatus::IN_PROGRESS:
return NKikimrScheme::TEvTenantDataErasureResponse::IN_PROGRESS;
}
}
};

struct TEvDataErasureInfoRequest : TEventPB<TEvDataErasureInfoRequest, NKikimrScheme::TEvDataErasureInfoRequest, EvDataErasureInfoRequest> {};

struct TEvDataErasureInfoResponse : TEventPB<TEvDataErasureInfoResponse, NKikimrScheme::TEvDataErasureInfoResponse, EvDataErasureInfoResponse> {
enum class EStatus {
UNSPECIFIED,
COMPLETED,
IN_PROGRESS_TENANT,
IN_PROGRESS_BSC,
};

TEvDataErasureInfoResponse() = default;
TEvDataErasureInfoResponse(ui64 generation, const EStatus& status) {
TEvDataErasureInfoResponse(ui64 generation, const NKikimrScheme::TEvDataErasureInfoResponse::EStatus& status) {
Record.SetGeneration(generation);
Record.SetStatus(ConvertStatus(status));
}

NKikimrScheme::TEvDataErasureInfoResponse::EStatus ConvertStatus(const EStatus& status) {
switch (status) {
case EStatus::UNSPECIFIED:
return NKikimrScheme::TEvDataErasureInfoResponse::UNSPECIFIED;
case EStatus::COMPLETED:
return NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED;
case EStatus::IN_PROGRESS_TENANT:
return NKikimrScheme::TEvDataErasureInfoResponse::IN_PROGRESS_TENANT;
case EStatus::IN_PROGRESS_BSC:
return NKikimrScheme::TEvDataErasureInfoResponse::IN_PROGRESS_BSC;
}
Record.SetStatus(status);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ TDataErasureManager::TDataErasureManager(TSchemeShard* const schemeShard)
: SchemeShard(schemeShard)
{}

TDataErasureManager::EStatus TDataErasureManager::GetStatus() const {
EDataErasureStatus TDataErasureManager::GetStatus() const {
return Status;
}

void TDataErasureManager::SetStatus(const EStatus& status) {
void TDataErasureManager::SetStatus(const EDataErasureStatus& status) {
Status = status;
}

Expand Down
18 changes: 5 additions & 13 deletions ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,9 @@ namespace NKikimr::NSchemeShard {
class TSchemeShard;

class TDataErasureManager {
public:
enum class EStatus : ui32 {
UNSPECIFIED,
COMPLETED,
IN_PROGRESS,
IN_PROGRESS_BSC,
};

protected:
TSchemeShard* const SchemeShard;
EStatus Status = EStatus::UNSPECIFIED;
EDataErasureStatus Status = EDataErasureStatus::UNSPECIFIED;
ui64 Generation = 0;
bool Running = false;

Expand Down Expand Up @@ -62,8 +54,8 @@ class TDataErasureManager {

void Clear();

EStatus GetStatus() const;
void SetStatus(const EStatus& status);
EDataErasureStatus GetStatus() const;
void SetStatus(const EDataErasureStatus& status);

void IncGeneration();
void SetGeneration(ui64 generation);
Expand Down Expand Up @@ -97,7 +89,7 @@ using TQueue = NOperationQueue::TOperationQueueWithTimer<
private:
TStarter Starter;
TQueue* Queue = nullptr;
THashMap<TPathId, EStatus> WaitingDataErasureTenants;
THashMap<TPathId, EDataErasureStatus> WaitingDataErasureTenants;
THashMap<TPathId, TActorId> ActivePipes;

TDuration DataErasureInterval;
Expand Down Expand Up @@ -169,7 +161,7 @@ using TQueue = NOperationQueue::TOperationQueueWithTimer<
private:
TStarter Starter;
TQueue* Queue = nullptr;
THashMap<TShardIdx, EStatus> WaitingDataErasureShards;
THashMap<TShardIdx, EDataErasureStatus> WaitingDataErasureShards;
THashMap<TShardIdx, TActorId> ActivePipes;

public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ void TRootDataErasureManager::Start() {
"[RootDataErasureManager] Start: Status# " << static_cast<ui32>(Status));

Queue->Start();
if (Status == EStatus::UNSPECIFIED) {
if (Status == EDataErasureStatus::UNSPECIFIED) {
SchemeShard->MarkFirstRunRootDataErasureManager();
ScheduleDataErasureWakeup();
} else if (Status == EStatus::COMPLETED) {
} else if (Status == EDataErasureStatus::COMPLETED) {
ScheduleDataErasureWakeup();
} else {
ClearOperationQueue();
Expand Down Expand Up @@ -111,7 +111,7 @@ void TRootDataErasureManager::ClearWaitingDataErasureRequests() {
}

void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) {
Status = EStatus::IN_PROGRESS;
Status = EDataErasureStatus::IN_PROGRESS;
StartTime = AppData(SchemeShard->ActorContext())->TimeProvider->Now();
for (auto& [pathId, subdomain] : SchemeShard->SubDomains) {
auto path = TPath::Init(pathId, SchemeShard);
Expand All @@ -122,14 +122,14 @@ void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) {
continue;
}
Enqueue(pathId);
WaitingDataErasureTenants[pathId] = EStatus::IN_PROGRESS;
db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(static_cast<ui32>(WaitingDataErasureTenants[pathId]));
WaitingDataErasureTenants[pathId] = EDataErasureStatus::IN_PROGRESS;
db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(WaitingDataErasureTenants[pathId]);
}
if (WaitingDataErasureTenants.empty()) {
Status = EStatus::IN_PROGRESS_BSC;
Status = EDataErasureStatus::IN_PROGRESS_BSC;
}
db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status,
Schema::DataErasureGenerations::StartTime>(static_cast<ui32>(Status), StartTime.MicroSeconds());
Schema::DataErasureGenerations::StartTime>(Status, StartTime.MicroSeconds());

const auto ctx = SchemeShard->ActorContext();
LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand All @@ -139,13 +139,13 @@ void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) {
}

void TRootDataErasureManager::Continue() {
if (Status == EStatus::IN_PROGRESS) {
if (Status == EDataErasureStatus::IN_PROGRESS) {
for (const auto& [pathId, status] : WaitingDataErasureTenants) {
if (status == EStatus::IN_PROGRESS) {
if (status == EDataErasureStatus::IN_PROGRESS) {
Enqueue(pathId);
}
}
} else if (Status == EStatus::IN_PROGRESS_BSC) {
} else if (Status == EDataErasureStatus::IN_PROGRESS_BSC) {
SendRequestToBSC();
}

Expand Down Expand Up @@ -313,16 +313,16 @@ void TRootDataErasureManager::OnDone(const TPathId& pathId, NIceDb::TNiceDb& db)
ActivePipes.erase(pathId);
auto it = WaitingDataErasureTenants.find(pathId);
if (it != WaitingDataErasureTenants.end()) {
it->second = EStatus::COMPLETED;
db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(static_cast<ui32>(it->second));
it->second = EDataErasureStatus::COMPLETED;
db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(it->second);
}

SchemeShard->TabletCounters->Cumulative()[COUNTER_DATA_ERASURE_OK].Increment(1);
UpdateMetrics();

bool isDataErasureCompleted = true;
for (const auto& [pathId, status] : WaitingDataErasureTenants) {
if (status == EStatus::IN_PROGRESS) {
if (status == EDataErasureStatus::IN_PROGRESS) {
isDataErasureCompleted = false;
break;
}
Expand All @@ -331,8 +331,8 @@ void TRootDataErasureManager::OnDone(const TPathId& pathId, NIceDb::TNiceDb& db)
if (isDataErasureCompleted) {
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"[RootDataErasureManager] Data erasure in tenants is completed. Send request to BS controller");
Status = EStatus::IN_PROGRESS_BSC;
db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status>(static_cast<ui32>(Status));
Status = EDataErasureStatus::IN_PROGRESS_BSC;
db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status>(Status);
}
}

Expand Down Expand Up @@ -366,7 +366,7 @@ void TRootDataErasureManager::SendRequestToBSC() {
}

void TRootDataErasureManager::Complete() {
Status = EStatus::COMPLETED;
Status = EDataErasureStatus::COMPLETED;
auto ctx = SchemeShard->ActorContext();
FinishTime = AppData(ctx)->TimeProvider->Now();
TDuration dataErasureDuration = FinishTime - StartTime;
Expand All @@ -391,27 +391,23 @@ bool TRootDataErasureManager::Restore(NIceDb::TNiceDb& db) {
return false;
}
if (rowset.EndOfSet()) {
Status = EStatus::UNSPECIFIED;
Status = EDataErasureStatus::UNSPECIFIED;
} else {
Generation = 0;
Status = EStatus::UNSPECIFIED;
Status = EDataErasureStatus::UNSPECIFIED;
while (!rowset.EndOfSet()) {
ui64 generation = rowset.GetValue<Schema::DataErasureGenerations::Generation>();
if (generation >= Generation) {
Generation = generation;
StartTime = TInstant::FromValue(rowset.GetValue<Schema::DataErasureGenerations::StartTime>());
ui32 statusValue = rowset.GetValue<Schema::DataErasureGenerations::Status>();
if (statusValue >= static_cast<ui32>(EStatus::UNSPECIFIED) &&
statusValue <= static_cast<ui32>(EStatus::IN_PROGRESS_BSC)) {
Status = static_cast<EStatus>(statusValue);
}
Status = rowset.GetValue<Schema::DataErasureGenerations::Status>();
}

if (!rowset.Next()) {
return false;
}
}
if (Status == EStatus::UNSPECIFIED || Status == EStatus::COMPLETED) {
if (Status == EDataErasureStatus::UNSPECIFIED || Status == EDataErasureStatus::COMPLETED) {
auto ctx = SchemeShard->ActorContext();
TDuration interval = AppData(ctx)->TimeProvider->Now() - StartTime;
if (interval > DataErasureInterval) {
Expand Down Expand Up @@ -439,24 +435,18 @@ bool TRootDataErasureManager::Restore(NIceDb::TNiceDb& db) {

Y_ABORT_UNLESS(SchemeShard->SubDomains.contains(pathId));

ui32 statusValue = rowset.GetValue<Schema::WaitingDataErasureTenants::Status>();
EStatus status = EStatus::COMPLETED;
if (statusValue >= static_cast<ui32>(EStatus::UNSPECIFIED) &&
statusValue <= static_cast<ui32>(EStatus::IN_PROGRESS_BSC)) {
status = static_cast<EStatus>(statusValue);
}

EDataErasureStatus status = rowset.GetValue<Schema::WaitingDataErasureTenants::Status>();
WaitingDataErasureTenants[pathId] = status;
if (status == EStatus::IN_PROGRESS) {
if (status == EDataErasureStatus::IN_PROGRESS) {
numberDataErasureTenantsInRunning++;
}

if (!rowset.Next()) {
return false;
}
}
if (Status == EStatus::IN_PROGRESS && (WaitingDataErasureTenants.empty() || numberDataErasureTenantsInRunning == 0)) {
Status = EStatus::IN_PROGRESS_BSC;
if (Status == EDataErasureStatus::IN_PROGRESS && (WaitingDataErasureTenants.empty() || numberDataErasureTenantsInRunning == 0)) {
Status = EDataErasureStatus::IN_PROGRESS_BSC;
}
}

Expand All @@ -475,10 +465,10 @@ bool TRootDataErasureManager::Remove(const TPathId& pathId) {
if (it != WaitingDataErasureTenants.end()) {
Queue->Remove(pathId);
ActivePipes.erase(pathId);
WaitingDataErasureTenants[pathId] = EStatus::COMPLETED;
WaitingDataErasureTenants[pathId] = EDataErasureStatus::COMPLETED;
bool isDataErasureCompleted = true;
for (const auto& [pathId, status] : WaitingDataErasureTenants) {
if (status == EStatus::IN_PROGRESS) {
if (status == EDataErasureStatus::IN_PROGRESS) {
isDataErasureCompleted = false;
break;
}
Expand Down Expand Up @@ -529,9 +519,9 @@ struct TSchemeShard::TTxDataErasureManagerInit : public TSchemeShard::TRwTxBase
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxDataErasureManagerInit Execute at schemeshard: " << Self->TabletID());
NIceDb::TNiceDb db(txc.DB);
Self->DataErasureManager->SetStatus(TDataErasureManager::EStatus::COMPLETED);
Self->DataErasureManager->SetStatus(EDataErasureStatus::COMPLETED);
db.Table<Schema::DataErasureGenerations>().Key(0).Update<Schema::DataErasureGenerations::Status,
Schema::DataErasureGenerations::StartTime>(static_cast<ui32>(Self->DataErasureManager->GetStatus()), AppData(ctx)->TimeProvider->Now().MicroSeconds());
Schema::DataErasureGenerations::StartTime>(Self->DataErasureManager->GetStatus(), AppData(ctx)->TimeProvider->Now().MicroSeconds());
}

void DoComplete(const TActorContext& ctx) override {
Expand Down Expand Up @@ -568,7 +558,7 @@ struct TSchemeShard::TTxRunDataErasure : public TSchemeShard::TRwTxBase {
dataErasureManager->IncGeneration();
dataErasureManager->Run(db);
}
if (Self->DataErasureManager->GetStatus() == TDataErasureManager::EStatus::IN_PROGRESS_BSC) {
if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS_BSC) {
NeedSendRequestToBSC = true;
}
}
Expand Down Expand Up @@ -617,7 +607,7 @@ struct TSchemeShard::TTxCompleteDataErasureTenant : public TSchemeShard::TRwTxBa
record.GetPathId().GetOwnerId(),
record.GetPathId().GetLocalId());
manager->OnDone(pathId, db);
if (manager->GetStatus() == TDataErasureManager::EStatus::IN_PROGRESS_BSC) {
if (manager->GetStatus() == EDataErasureStatus::IN_PROGRESS_BSC) {
NeedSendRequestToBSC = true;
}
}
Expand Down Expand Up @@ -663,7 +653,7 @@ struct TSchemeShard::TTxCompleteDataErasureBSC : public TSchemeShard::TRwTxBase
if (record.GetCompleted()) {
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Data shred in BSC is completed");
manager->Complete();
db.Table<Schema::DataErasureGenerations>().Key(Self->DataErasureManager->GetGeneration()).Update<Schema::DataErasureGenerations::Status>(static_cast<ui32>(Self->DataErasureManager->GetStatus()));
db.Table<Schema::DataErasureGenerations>().Key(Self->DataErasureManager->GetGeneration()).Update<Schema::DataErasureGenerations::Status>(Self->DataErasureManager->GetStatus());
} else {
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Progress data shred in BSC " << record.GetProgress10k());
NeedScheduleRequestToBSC = true;
Expand Down
Loading
Loading