Skip to content

DataShard: clean readsets in DataCleanup #15438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/datashard__data_cleanup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class TDataShard::TTxDataCleanup : public NTabletFlatExecutor::TTransactionBase<
"DataCleanup of tablet# " << Self->TabletID()
<< ": expired snapshots removed");
}
Self->OutReadSets.Cleanup(db, ctx);

Self->Executor()->CleanupData(Ev->Get()->Record.GetDataCleanupGeneration());
Self->DataCleanupWaiters.insert({Ev->Get()->Record.GetDataCleanupGeneration(), Ev->Sender});
return true;
Expand Down
206 changes: 163 additions & 43 deletions ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,27 @@ using namespace Tests;

Y_UNIT_TEST_SUITE(DataCleanup) {

static const TString DeletedShortValue("Some_value");
static const TString DeletedLongValue(size_t(100 * 1024), 't');
static const TString PresentShortValue("Some_other_value");
static const TString PresentLongValue(size_t(100 * 1024), 'r');
static const TString DeletedSubkey1("Subkey1");
static const TString PresentSubkey2("Subkey2");
static const TString PresentSubkey3("Subkey3");
static const TString DeletedSubkey4("Subkey4");

static const TString DeletedShortValue1("_Some_value_1_");
static const TString PresentLongValue2(size_t(100 * 1024), 'r');
static const TString PresentShortValue3("_Some_value_3_");
static const TString DeletedLongValue4(size_t(100 * 1024), 't');

// Returns how many blobs belonging to tablet `tabletId` contain `substring`
// in their payload, summed over every proxy in `proxyDSs`.
// Blobs flagged DoNotKeep are excluded from the count.
int CountBlobsWithSubstring(ui64 tabletId, const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& substring) {
    int matches = 0;
    for (const auto& group : proxyDSs) {
        for (const auto& [blobId, blobData] : group->AllMyBlobs()) {
            // Cheap filters first; the payload is only converted to a string
            // for blobs that pass both of them.
            if (blobId.TabletID() != tabletId) {
                continue;
            }
            if (blobData.DoNotKeep) {
                continue;
            }
            if (blobData.Buffer.ConvertToString().Contains(substring)) {
                matches += 1;
            }
        }
    }
    return matches;
}

bool BlobStorageContains(const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& value) {
for (const auto& proxyDS : proxyDSs) {
Expand All @@ -22,7 +39,7 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
return false;
}

auto SetupWithTable() {
auto SetupWithTable(bool withCompaction) {
TVector<TServerSettings::TProxyDSPtr> proxyDSs {
MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(0)),
MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(2181038080)),
Expand All @@ -42,39 +59,50 @@ Y_UNIT_TEST_SUITE(DataCleanup) {

auto opts = TShardedTableOptions()
.Columns({
{"key", "Uint32", true, false},
{"value", "Utf8", false, false}
{"key", "Uint32", true, false},
{"subkey", "String", true, false},
{"value", "Utf8", false, false}
});
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);

UploadRows(runtime, "/Root/table-1",
{{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}},
{TCell::Make(ui32(1))}, {TCell(DeletedShortValue)}
{{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}},
{TCell::Make(ui32(1)), TCell(DeletedSubkey1)}, {TCell(DeletedShortValue1)}
);
UploadRows(runtime, "/Root/table-1",
{{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}},
{TCell::Make(ui32(2))}, {TCell(PresentLongValue)}
{{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}},
{TCell::Make(ui32(2)), TCell(PresentSubkey2)}, {TCell(PresentLongValue2)}
);
UploadRows(runtime, "/Root/table-1",
{{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}},
{TCell::Make(ui32(3))}, {TCell(PresentShortValue)}
{{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}},
{TCell::Make(ui32(3)), TCell(PresentSubkey3)}, {TCell(PresentShortValue3)}
);
UploadRows(runtime, "/Root/table-1",
{{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}},
{TCell::Make(ui32(4))}, {TCell(DeletedLongValue)}
{{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}},
{TCell::Make(ui32(4)), TCell(DeletedSubkey4)}, {TCell(DeletedLongValue4)}
);

auto compactionResult = CompactTable(runtime, shards.at(0), tableId, true);
UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::OK);

UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedShortValue));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue));
UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedLongValue));

ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");

SimulateSleep(runtime, TDuration::Seconds(2));
UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedSubkey1));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey2));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey3));
UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedSubkey4));

// short values inlined in log
UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedShortValue1));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue3));

if (withCompaction) {
auto compactionResult = CompactTable(runtime, shards.at(0), tableId, true);
UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::OK);

// uncompressed long values should be present only after compaction
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue2));
UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedLongValue4));
} else {
// before compaction long values persisted in log only in compressed format
UNIT_ASSERT(!BlobStorageContains(proxyDSs, PresentLongValue2));
UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue4));
}

return std::make_tuple(server, sender, shards, proxyDSs);
}
Expand All @@ -85,22 +113,31 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
UNIT_ASSERT_VALUES_EQUAL(ev.Record.GetDataCleanupGeneration(), generation);
}

void CheckTableData(Tests::TServer::TPtr server, const TVector<TServerSettings::TProxyDSPtr>& proxyDSs) {
auto result = ReadShardedTable(server, "/Root/table-1");
UNIT_ASSERT_EQUAL(result,
"key = 2, value = " + PresentLongValue + "\n"
"key = 3, value = " + PresentShortValue + "\n"
void CheckTableData(Tests::TServer::TPtr server, const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& table) {
auto result = ReadShardedTable(server, table);
UNIT_ASSERT_VALUES_EQUAL(result,
"key = 2, subkey = " + PresentSubkey2 + ", value = " + PresentLongValue2 + "\n"
"key = 3, subkey = " + PresentSubkey3 + ", value = " + PresentShortValue3 + "\n"
);
UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedShortValue));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue));
UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue));

UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedSubkey1));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey2));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey3));
UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedSubkey4));

UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedShortValue1));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue2));
UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue3));
UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue4));
}

Y_UNIT_TEST(ForceDataCleanup) {
auto [server, sender, tableShards, proxyDSs] = SetupWithTable();
auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();

ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
SimulateSleep(runtime, TDuration::Seconds(2));

auto cleanupAndCheck = [&runtime, &sender, &tableShards](ui64 expectedDataCleanupGeneration) {
auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(expectedDataCleanupGeneration);

Expand All @@ -114,13 +151,38 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
cleanupAndCheck(24);
cleanupAndCheck(25);

CheckTableData(server, proxyDSs);
CheckTableData(server, proxyDSs, "/Root/table-1");
}


// Forced data cleanup on a table that was never explicitly compacted:
// SetupWithTable(false) skips the compaction step, rows with keys 1 and 4 are
// then deleted, and a single cleanup request must leave no trace of them.
Y_UNIT_TEST(ForceDataCleanupWithoutCompaction) {
    auto [server, sender, tableShards, proxyDSs] = SetupWithTable(false);
    auto& runtime = *server->GetRuntime();

    ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
    SimulateSleep(runtime, TDuration::Seconds(2));

    // Request cleanup on the first shard and wait for its confirmation.
    const ui64 cleanupGeneration = 24;
    const auto shardId = tableShards.at(0);

    auto cleanupRequest = MakeHolder<TEvDataShard::TEvForceDataCleanup>(cleanupGeneration);
    runtime.SendToPipe(shardId, sender, cleanupRequest.Release(), 0, GetPipeConfigWithRetries());

    auto cleanupReply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
    CheckResultEvent(*cleanupReply->Get(), shardId, cleanupGeneration);

    // Surviving rows must still be readable; deleted ones must be gone from blob storage.
    CheckTableData(server, proxyDSs, "/Root/table-1");
}

Y_UNIT_TEST(MultipleDataCleanups) {
auto [server, sender, tableShards, proxyDSs] = SetupWithTable();
auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();

ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
SimulateSleep(runtime, TDuration::Seconds(2));

ui64 expectedGenFirst = 42;
ui64 expectedGenLast = 43;
auto request1 = MakeHolder<TEvDataShard::TEvForceDataCleanup>(expectedGenFirst);
Expand All @@ -139,13 +201,16 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
CheckResultEvent(*ev->Get(), tableShards.at(0), expectedGenLast);
}

CheckTableData(server, proxyDSs);
CheckTableData(server, proxyDSs, "/Root/table-1");
}

Y_UNIT_TEST(MultipleDataCleanupsWithOldGenerations) {
auto [server, sender, tableShards, proxyDSs] = SetupWithTable();
auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();

ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
SimulateSleep(runtime, TDuration::Seconds(2));

ui64 expectedGenFirst = 42;
ui64 expectedGenOld = 10;
auto request1 = MakeHolder<TEvDataShard::TEvForceDataCleanup>(expectedGenFirst);
Expand All @@ -164,13 +229,16 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
CheckResultEvent(*ev->Get(), tableShards.at(0), expectedGenFirst);
}

CheckTableData(server, proxyDSs);
CheckTableData(server, proxyDSs, "/Root/table-1");
}

Y_UNIT_TEST(ForceDataCleanupWithRestart) {
auto [server, sender, tableShards, proxyDSs] = SetupWithTable();
auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();

ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
SimulateSleep(runtime, TDuration::Seconds(2));

ui64 cleanupGeneration = 33;
ui64 oldGeneration = 10;
ui64 olderGeneration = 5;
Expand Down Expand Up @@ -207,7 +275,59 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
CheckResultEvent(*ev->Get(), tableShards.at(0), cleanupGeneration);
}

CheckTableData(server, proxyDSs);
CheckTableData(server, proxyDSs, "/Root/table-1");
}

// Copies table-1 into table-2, deletes keys 1 and 4 from both tables and then
// forces data cleanup on both shards. The blob counts asserted along the way
// track where the soon-to-be-deleted subkey is stored, including the extra
// blob that appears on the source shard after the copy (its out-readset).
Y_UNIT_TEST(OutReadSetsCleanedAfterCopyTable) {
    auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
    auto& runtime = *server->GetRuntime();

    // in log + after compaction: part switch in log and sst
    UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 3);

    // Count every readset event whose serialized record carries the subkey.
    size_t readSetsCarryingSubkey = 0;
    auto prevObserver = runtime.SetObserverFunc([&readSetsCarryingSubkey](TAutoPtr<IEventHandle> &ev) {
        if (ev->GetTypeRewrite() == TEvTxProcessing::TEvReadSet::EventType) {
            auto* readSet = ev->Get<TEvTxProcessing::TEvReadSet>();
            if (readSet->Record.SerializeAsString().Contains(DeletedSubkey1)) {
                ++readSetsCarryingSubkey;
            }
        }
        return TTestActorRuntime::EEventAction::PROCESS;
    });

    auto copyTxId = AsyncCreateCopyTable(server, sender, "/Root", "table-2", "/Root/table-1");
    WaitTxNotification(server, sender, copyTxId);
    auto table2Shards = GetTableShards(server, sender, "/Root/table-2");
    auto table2Id = ResolveTableId(server, sender, "/Root/table-2");

    UNIT_ASSERT_VALUES_EQUAL(readSetsCarryingSubkey, 1);
    // + outreadset
    UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 4);
    // in log
    UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(table2Shards.at(0), proxyDSs, DeletedSubkey1), 1);

    ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
    ExecSQL(server, sender, "DELETE FROM `/Root/table-2` WHERE key IN (1, 4);");
    SimulateSleep(runtime, TDuration::Seconds(2));

    // + deletion in log (one more blob on each shard)
    UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 5);
    UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(table2Shards.at(0), proxyDSs, DeletedSubkey1), 2);

    // Sends a forced cleanup request to the given tablet and validates the reply.
    auto forceCleanup = [&runtime, &sender](ui64 shardId, ui64 generation) {
        auto cleanupRequest = MakeHolder<TEvDataShard::TEvForceDataCleanup>(generation);

        runtime.SendToPipe(shardId, sender, cleanupRequest.Release(), 0, GetPipeConfigWithRetries());

        auto cleanupReply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
        CheckResultEvent(*cleanupReply->Get(), shardId, generation);
    };

    // The copy's shard is cleaned first, then the source shard.
    forceCleanup(table2Shards.at(0), 24);
    forceCleanup(tableShards.at(0), 24);

    CheckTableData(server, proxyDSs, "/Root/table-1");
    CheckTableData(server, proxyDSs, "/Root/table-2");
}
}

Expand Down
Loading