Skip to content

DataShard and SchemeShard: handle borrowed parts in data erasure #15451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2328,8 +2328,10 @@ message TEvForceDataCleanup {
// Intermediate requests and corresponding TEvForceDataCleanupResult's may be skipped.
message TEvForceDataCleanupResult {
enum EStatus {
OK = 0;
FAILED = 1;
UNKNOWN = 0;
OK = 1;
WRONG_SHARD_STATE = 2;
BORROWED = 3;
};
optional uint64 DataCleanupGeneration = 1; // from corresponding request (or greater)
optional uint64 TabletId = 2;
Expand Down
28 changes: 28 additions & 0 deletions ydb/core/testlib/storage_helpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "storage_helpers.h"

#include <ydb/core/blobstorage/dsproxy/mock/model.h>

namespace NKikimr {
int CountBlobsWithSubstring(ui64 tabletId, const TVector<TIntrusivePtr<NFake::TProxyDS>>& proxyDSs, const TString& substring) {
    // Walk every mock DS proxy group and tally the live blobs (those not
    // marked DoNotKeep) that belong to `tabletId` and whose payload
    // contains `substring`.
    int matchCount = 0;
    for (const auto& group : proxyDSs) {
        for (const auto& [blobId, blobState] : group->AllMyBlobs()) {
            if (blobId.TabletID() != tabletId) {
                continue;
            }
            if (blobState.DoNotKeep) {
                continue; // blob is already scheduled for collection
            }
            if (blobState.Buffer.ConvertToString().Contains(substring)) {
                ++matchCount;
            }
        }
    }
    return matchCount;
}

bool BlobStorageContains(const TVector<TIntrusivePtr<NFake::TProxyDS>>& proxyDSs, const TString& value) {
for (const auto& proxyDS : proxyDSs) {
for (const auto& [id, blob] : proxyDS->AllMyBlobs()) {
if (!blob.DoNotKeep && blob.Buffer.ConvertToString().Contains(value)) {
return true;
}
}
}
return false;
}
} // namespace NKikimr
8 changes: 8 additions & 0 deletions ydb/core/testlib/storage_helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

#include <ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.h>

namespace NKikimr {
// Counts blobs owned by `tabletId` across the given mock DS proxies whose
// payload contains `substring`; blobs flagged DoNotKeep are skipped.
int CountBlobsWithSubstring(ui64 tabletId, const TVector<TIntrusivePtr<NFake::TProxyDS>>& proxyDSs, const TString& substring);
// Returns true if any live (not DoNotKeep) blob in any of the mock DS
// proxies contains `value` in its payload, regardless of the owning tablet.
bool BlobStorageContains(const TVector<TIntrusivePtr<NFake::TProxyDS>>& proxyDSs, const TString& value);
} // namespace NKikimr
5 changes: 2 additions & 3 deletions ydb/core/testlib/test_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ namespace Tests {
using TControls = NKikimrConfig::TImmediateControlsConfig;
using TLoggerInitializer = std::function<void (TTestActorRuntime&)>;
using TStoragePoolKinds = TDomainsInfo::TDomain::TStoragePoolKinds;
using TProxyDSPtr = TIntrusivePtr<NFake::TProxyDS>;

ui16 Port;
ui16 GrpcPort = 0;
Expand Down Expand Up @@ -171,7 +170,7 @@ namespace Tests {
TString ServerCertFilePath;
bool Verbose = true;
bool UseSectorMap = false;
TVector<TProxyDSPtr> ProxyDSMocks;
TVector<TIntrusivePtr<NFake::TProxyDS>> ProxyDSMocks;

std::function<IActor*(const TTicketParserSettings&)> CreateTicketParser = NKikimr::CreateTicketParser;
std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory;
Expand Down Expand Up @@ -263,7 +262,7 @@ namespace Tests {
return *this;
}

TServerSettings& SetProxyDSMocks(const TVector<TProxyDSPtr>& proxyDSMocks) {
TServerSettings& SetProxyDSMocks(const TVector<TIntrusivePtr<NFake::TProxyDS>>& proxyDSMocks) {
ProxyDSMocks = proxyDSMocks;
return *this;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SRCS(
fake_scheme_shard.h
minikql_compile.h
mock_pq_metacache.h
storage_helpers.cpp
tablet_flat_dummy.cpp
tablet_helpers.cpp
tablet_helpers.h
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/tx/datashard/datashard__data_cleanup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,19 @@ class TDataShard::TTxDataCleanup : public NTabletFlatExecutor::TTransactionBase<
Response = std::make_unique<TEvDataShard::TEvForceDataCleanupResult>(
record.GetDataCleanupGeneration(),
Self->TabletID(),
NKikimrTxDataShard::TEvForceDataCleanupResult::FAILED);
NKikimrTxDataShard::TEvForceDataCleanupResult::WRONG_SHARD_STATE);
return true;
}

if (Self->Executor()->HasLoanedParts()) {
LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
"DataCleanup of tablet# " << Self->TabletID()
<< ": has borrowed parts"
<< ", requested from " << Ev->Sender);
Response = std::make_unique<TEvDataShard::TEvForceDataCleanupResult>(
record.GetDataCleanupGeneration(),
Self->TabletID(),
NKikimrTxDataShard::TEvForceDataCleanupResult::BORROWED);
return true;
}

Expand Down
74 changes: 49 additions & 25 deletions ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include <ydb/core/testlib/storage_helpers.h>

namespace NKikimr {

Expand All @@ -16,31 +17,8 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
static const TString PresentShortValue3("_Some_value_3_");
static const TString DeletedLongValue4(size_t(100 * 1024), 't');

int CountBlobsWithSubstring(ui64 tabletId, const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& substring) {
int res = 0;
for (const auto& proxyDS : proxyDSs) {
for (const auto& [id, blob] : proxyDS->AllMyBlobs()) {
if (id.TabletID() == tabletId && !blob.DoNotKeep && blob.Buffer.ConvertToString().Contains(substring)) {
++res;
}
}
}
return res;
}

bool BlobStorageContains(const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& value) {
for (const auto& proxyDS : proxyDSs) {
for (const auto& [id, blob] : proxyDS->AllMyBlobs()) {
if (!blob.DoNotKeep && blob.Buffer.ConvertToString().Contains(value)) {
return true;
}
}
}
return false;
}

auto SetupWithTable(bool withCompaction) {
TVector<TServerSettings::TProxyDSPtr> proxyDSs {
TVector<TIntrusivePtr<NFake::TProxyDS>> proxyDSs {
MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(0)),
MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(2181038080)),
};
Expand Down Expand Up @@ -113,7 +91,7 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
UNIT_ASSERT_VALUES_EQUAL(ev.Record.GetDataCleanupGeneration(), generation);
}

void CheckTableData(Tests::TServer::TPtr server, const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& table) {
void CheckTableData(Tests::TServer::TPtr server, const TVector<TIntrusivePtr<NFake::TProxyDS>>& proxyDSs, const TString& table) {
auto result = ReadShardedTable(server, table);
UNIT_ASSERT_VALUES_EQUAL(result,
"key = 2, subkey = " + PresentSubkey2 + ", value = " + PresentLongValue2 + "\n"
Expand Down Expand Up @@ -329,6 +307,52 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
CheckTableData(server, proxyDSs, "/Root/table-1");
CheckTableData(server, proxyDSs, "/Root/table-2");
}

Y_UNIT_TEST(BorrowerDataCleanedAfterCopyTable) {
// Verifies the borrowed-parts handling in forced data cleanup:
// after a copy-table, the source shard still has loaned-out parts and must
// answer BORROWED; once the borrower (the copy) is cleaned up/compacted,
// a retried cleanup on the source shard succeeds.
auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();

// Copy table-1 into table-2; the copy borrows parts from the source shard.
auto txIdCopy = AsyncCreateCopyTable(server, sender, "/Root", "table-2", "/Root/table-1");
WaitTxNotification(server, sender, txIdCopy);
auto table2Shards = GetTableShards(server, sender, "/Root/table-2");
auto table2Id = ResolveTableId(server, sender, "/Root/table-2");

// Delete some rows in both tables so there is actual data to erase.
ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
ExecSQL(server, sender, "DELETE FROM `/Root/table-2` WHERE key IN (1, 4);");
SimulateSleep(runtime, TDuration::Seconds(2));

// Arbitrary non-zero generation; results must echo it back.
ui64 dataCleanupGeneration = 24;
{
    // cleanup for the first table should fail due to borrowed parts
    auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(dataCleanupGeneration);
    runtime.SendToPipe(tableShards.at(0), sender, request.Release(), 0, GetPipeConfigWithRetries());

    auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
    UNIT_ASSERT_EQUAL(ev->Get()->Record.GetStatus(), NKikimrTxDataShard::TEvForceDataCleanupResult::BORROWED);
    UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetTabletId(), tableShards.at(0));
    UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetDataCleanupGeneration(), dataCleanupGeneration);
}
{
    // cleanup for the second table (the borrower) is expected to succeed,
    // returning the loaned parts to the source shard
    auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(dataCleanupGeneration);
    runtime.SendToPipe(table2Shards.at(0), sender, request.Release(), 0, GetPipeConfigWithRetries());

    auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
    CheckResultEvent(*ev->Get(), table2Shards.at(0), dataCleanupGeneration);
}
{
    // next cleanup for the first table should succeed after compaction of the second table
    ++dataCleanupGeneration;
    auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(dataCleanupGeneration);
    runtime.SendToPipe(tableShards.at(0), sender, request.Release(), 0, GetPipeConfigWithRetries());

    auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
    CheckResultEvent(*ev->Get(), tableShards.at(0), dataCleanupGeneration);
}

// Cleanup must not have destroyed the rows that are still visible.
CheckTableData(server, proxyDSs, "/Root/table-1");
CheckTableData(server, proxyDSs, "/Root/table-2");
}
}

} // namespace NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/tablet/tablet_exception.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h>

namespace NKikimr {
namespace NSchemeShard {
Expand Down Expand Up @@ -179,6 +180,9 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase {
"Close pipe to deleted shardIdx " << ShardIdx << " tabletId " << TabletId);
Self->PipeClientCache->ForceClose(ctx, ui64(TabletId));
}
if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
Self->Execute(Self->CreateTxCancelDataErasureShards({ShardIdx}));
}
Comment on lines +183 to +185
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему потребовалось переносить запуск CancelDataErasureShards сюда?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В прошлом месте таблетка ещё не была удалена, и очистка могла успешно завершиться до удаления этой таблетки, что плохо и нарушает гарантии удаления. Сюда же попадаем уже после того, как хайв ответил, что таблетка удалена.
(Да, ещё остаётся проблема, что хайв на самом деле отвечает до того, как удалил данные в блобсторадже, но это собираемся отдельно доделывать).

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ void TRootDataErasureManager::ScheduleDataErasureWakeup() {
IsDataErasureWakeupScheduled = true;

LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"[RootDataErasureManager] ScheduleDataErasureWakeup: Interval# " << CurrentWakeupInterval);
"[RootDataErasureManager] ScheduleDataErasureWakeup: Interval# " << CurrentWakeupInterval << ", Timestamp# " << AppData(ctx)->TimeProvider->Now());
}

void TRootDataErasureManager::WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,14 @@ struct TSchemeShard::TTxCompleteDataErasureShard : public TSchemeShard::TRwTxBas
"TTxCompleteDataErasureShard Execute at schemestard: " << Self->TabletID());
const auto& record = Ev->Get()->Record;

if (record.GetStatus() != NKikimrTxDataShard::TEvForceDataCleanupResult::OK) {
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxCompleteDataErasureShard: data erasure failed at DataShard #" << record.GetTabletId()
<< " with status: " << NKikimrTxDataShard::TEvForceDataCleanupResult::EStatus_Name(record.GetStatus())
<< ", schemestard: " << Self->TabletID());
return; // will be retried after timeout in the queue in TTenantDataErasureManager::OnTimeout()
}

auto& manager = Self->DataErasureManager;
const ui64 cleanupGeneration = record.GetDataCleanupGeneration();
if (cleanupGeneration != manager->GetGeneration()) {
Expand Down
13 changes: 4 additions & 9 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7106,32 +7106,27 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T
newPartitioningSet.reserve(newPartitioning.size());
const auto& oldPartitioning = tableInfo->GetPartitions();

std::vector<TShardIdx> dataErasureShards;
for (const auto& p: newPartitioning) {
if (!oldPartitioning.empty())
newPartitioningSet.insert(p.ShardIdx);

const auto& partitionStats = tableInfo->GetStats().PartitionStats;
auto it = partitionStats.find(p.ShardIdx);
std::vector<TShardIdx> dataErasureShards;
if (it != partitionStats.end()) {
EnqueueBackgroundCompaction(p.ShardIdx, it->second);
UpdateShardMetrics(p.ShardIdx, it->second);
dataErasureShards.push_back(p.ShardIdx);
}
if (DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
Execute(CreateTxAddEntryToDataErasure(dataErasureShards), this->ActorContext());
}
}
if (DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
Execute(CreateTxAddEntryToDataErasure(dataErasureShards), this->ActorContext());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А насколько это вообще безопасно тут делать? Ведь SetPartitioning вызывается из операции split/merge в транзакции, а здесь шедулится какая-то другая транзакция. И завершение split/merge может успешно закоммититься, а до этой транзакции даже очередь не дойдёт. В итоге шарды окажутся просто потерянными? Ну и ещё смущает, что SetPartitioning вызывается в рамках загрузки schemeshard'а, вообще тут кажется никогда не предполагалось какой-то такой сложной логики/действий.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, главное, что смущает -- что SetPartitioning() перегружается несвойственными ей делами. Я уже предлагал посмотреть, как можно переделать.

От вызова SetPartitioning() во время TxInit защищаться и не надо -- это ключевая вещь для продолжения процесса data erasure, если он уже работал до рестарта. Хотя конечно не хватает комментариев c описанием зависимостей в порядке загрузки состояния DataErasureManager и шардов таблиц.

до этой транзакции даже очередь не дойдёт

Это если schemeshard перезапустится?
Тогда статус data erasure будет IN_PROGRESS и как раз SetPartitioning() во время TxInit отработают и обновят актуальный список шардов для чистки. Логически верно, но кажется напряжно. Было бы лучше на рестарте выполнять один проход по общему списку шардов вместо отдельной транзакции на каждую таблицу.

}

std::vector<TShardIdx> cancelDataErasureShards;
for (const auto& p: oldPartitioning) {
if (!newPartitioningSet.contains(p.ShardIdx)) {
// note that queues might not contain the shard
OnShardRemoved(p.ShardIdx);
cancelDataErasureShards.push_back(p.ShardIdx);
}
if (DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) {
Execute(CreateTxCancelDataErasureShards(cancelDataErasureShards), this->ActorContext());
}
}
}
Expand Down
Loading
Loading