diff --git a/ydb/core/tx/datashard/datashard__data_cleanup.cpp b/ydb/core/tx/datashard/datashard__data_cleanup.cpp index b1342c339098..9d261614db67 100644 --- a/ydb/core/tx/datashard/datashard__data_cleanup.cpp +++ b/ydb/core/tx/datashard/datashard__data_cleanup.cpp @@ -55,6 +55,8 @@ class TDataShard::TTxDataCleanup : public NTabletFlatExecutor::TTransactionBase< "DataCleanup of tablet# " << Self->TabletID() << ": expired snapshots removed"); } + Self->OutReadSets.Cleanup(db, ctx); + Self->Executor()->CleanupData(Ev->Get()->Record.GetDataCleanupGeneration()); Self->DataCleanupWaiters.insert({Ev->Get()->Record.GetDataCleanupGeneration(), Ev->Sender}); return true; diff --git a/ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp b/ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp index a464c8a3ef19..0a918aa010a5 100644 --- a/ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp @@ -6,10 +6,27 @@ using namespace Tests; Y_UNIT_TEST_SUITE(DataCleanup) { - static const TString DeletedShortValue("Some_value"); - static const TString DeletedLongValue(size_t(100 * 1024), 't'); - static const TString PresentShortValue("Some_other_value"); - static const TString PresentLongValue(size_t(100 * 1024), 'r'); + static const TString DeletedSubkey1("Subkey1"); + static const TString PresentSubkey2("Subkey2"); + static const TString PresentSubkey3("Subkey3"); + static const TString DeletedSubkey4("Subkey4"); + + static const TString DeletedShortValue1("_Some_value_1_"); + static const TString PresentLongValue2(size_t(100 * 1024), 'r'); + static const TString PresentShortValue3("_Some_value_3_"); + static const TString DeletedLongValue4(size_t(100 * 1024), 't'); + + int CountBlobsWithSubstring(ui64 tabletId, const TVector& proxyDSs, const TString& substring) { + int res = 0; + for (const auto& proxyDS : proxyDSs) { + for (const auto& [id, blob] : proxyDS->AllMyBlobs()) { + if (id.TabletID() == tabletId && !blob.DoNotKeep && blob.Buffer.ConvertToString().Contains(substring)) { + ++res; + } + } + } + return res; + } bool BlobStorageContains(const TVector& proxyDSs, const TString& value) { for (const auto& proxyDS : proxyDSs) { @@ -22,7 +39,7 @@ Y_UNIT_TEST_SUITE(DataCleanup) { return false; } - auto SetupWithTable() { + auto SetupWithTable(bool withCompaction) { TVector proxyDSs { MakeIntrusive(TGroupId::FromValue(0)), MakeIntrusive(TGroupId::FromValue(2181038080)), @@ -42,39 +59,50 @@ Y_UNIT_TEST_SUITE(DataCleanup) { auto opts = TShardedTableOptions() .Columns({ - {"key", "Uint32", true, false}, - {"value", "Utf8", false, false} + {"key", "Uint32", true, false}, + {"subkey", "String", true, false}, + {"value", "Utf8", false, false} }); auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); UploadRows(runtime, "/Root/table-1", - {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}}, - {TCell::Make(ui32(1))}, {TCell(DeletedShortValue)} + {{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}}, + {TCell::Make(ui32(1)), TCell(DeletedSubkey1)}, {TCell(DeletedShortValue1)} ); UploadRows(runtime, "/Root/table-1", - {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}}, - {TCell::Make(ui32(2))}, {TCell(PresentLongValue)} + {{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}}, + {TCell::Make(ui32(2)), TCell(PresentSubkey2)}, {TCell(PresentLongValue2)} ); UploadRows(runtime, "/Root/table-1", - {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}}, - {TCell::Make(ui32(3))}, {TCell(PresentShortValue)} + {{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}}, + {TCell::Make(ui32(3)), TCell(PresentSubkey3)}, {TCell(PresentShortValue3)} ); UploadRows(runtime, "/Root/table-1", - {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}}, - {TCell::Make(ui32(4))}, {TCell(DeletedLongValue)} + {{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}}, + {TCell::Make(ui32(4)), TCell(DeletedSubkey4)}, {TCell(DeletedLongValue4)} ); - auto compactionResult = CompactTable(runtime, shards.at(0), tableId, true); - UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::OK); - - UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedShortValue)); - UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue)); - UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue)); - UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedLongValue)); - - ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);"); - - SimulateSleep(runtime, TDuration::Seconds(2)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedSubkey1)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey2)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey3)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedSubkey4)); + + // short values inlined in log + UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedShortValue1)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue3)); + + if (withCompaction) { + auto compactionResult = CompactTable(runtime, shards.at(0), tableId, true); + UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::OK); + + // uncompressed long values should be present only after compaction + UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue2)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedLongValue4)); + } else { + // before compaction long values persisted in log only in compressed format + UNIT_ASSERT(!BlobStorageContains(proxyDSs, PresentLongValue2)); + UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue4)); + } return std::make_tuple(server, sender, shards, proxyDSs); } @@ -85,22 +113,31 @@ Y_UNIT_TEST_SUITE(DataCleanup) { UNIT_ASSERT_VALUES_EQUAL(ev.Record.GetDataCleanupGeneration(), generation); } - void CheckTableData(Tests::TServer::TPtr server, const TVector& proxyDSs) { - auto result = ReadShardedTable(server, "/Root/table-1"); - UNIT_ASSERT_EQUAL(result, - "key = 2, value = " + PresentLongValue + "\n" - "key = 3, value = " + PresentShortValue + "\n" + void CheckTableData(Tests::TServer::TPtr server, const TVector& proxyDSs, const TString& table) { + auto result = ReadShardedTable(server, table); + UNIT_ASSERT_VALUES_EQUAL(result, + "key = 2, subkey = " + PresentSubkey2 + ", value = " + PresentLongValue2 + "\n" + "key = 3, subkey = " + PresentSubkey3 + ", value = " + PresentShortValue3 + "\n" ); - UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedShortValue)); - UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue)); - UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue)); - UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue)); + + UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedSubkey1)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey2)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey3)); + UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedSubkey4)); + + UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedShortValue1)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue2)); + UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue3)); + UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue4)); } Y_UNIT_TEST(ForceDataCleanup) { - auto [server, sender, tableShards, proxyDSs] = SetupWithTable(); + auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true); auto& runtime = *server->GetRuntime(); + ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);"); + SimulateSleep(runtime, TDuration::Seconds(2)); + auto cleanupAndCheck = [&runtime, &sender, &tableShards](ui64 expectedDataCleanupGeneration) { auto request = MakeHolder(expectedDataCleanupGeneration); @@ -114,13 +151,38 @@ Y_UNIT_TEST_SUITE(DataCleanup) { cleanupAndCheck(24); cleanupAndCheck(25); - CheckTableData(server, proxyDSs); + CheckTableData(server, proxyDSs, "/Root/table-1"); + } + + + Y_UNIT_TEST(ForceDataCleanupWithoutCompaction) { + auto [server, sender, tableShards, proxyDSs] = SetupWithTable(false); + auto& runtime = *server->GetRuntime(); + + ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);"); + SimulateSleep(runtime, TDuration::Seconds(2)); + + auto cleanupAndCheck = [&runtime, &sender, &tableShards](ui64 expectedDataCleanupGeneration) { + auto request = MakeHolder(expectedDataCleanupGeneration); + + runtime.SendToPipe(tableShards.at(0), sender, request.Release(), 0, GetPipeConfigWithRetries()); + + auto ev = runtime.GrabEdgeEventRethrow(sender); + CheckResultEvent(*ev->Get(), tableShards.at(0), expectedDataCleanupGeneration); + }; + + cleanupAndCheck(24); + + CheckTableData(server, proxyDSs, "/Root/table-1"); } Y_UNIT_TEST(MultipleDataCleanups) { - auto [server, sender, tableShards, proxyDSs] = SetupWithTable(); + auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true); auto& runtime = *server->GetRuntime(); + ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);"); + SimulateSleep(runtime, TDuration::Seconds(2)); + ui64 expectedGenFirst = 42; ui64 expectedGenLast = 43; auto request1 = MakeHolder(expectedGenFirst); @@ -139,13 +201,16 @@ Y_UNIT_TEST_SUITE(DataCleanup) { CheckResultEvent(*ev->Get(), tableShards.at(0), expectedGenLast); } - CheckTableData(server, proxyDSs); + CheckTableData(server, proxyDSs, "/Root/table-1"); } Y_UNIT_TEST(MultipleDataCleanupsWithOldGenerations) { - auto [server, sender, tableShards, proxyDSs] = SetupWithTable(); + auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true); auto& runtime = *server->GetRuntime(); + ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);"); + SimulateSleep(runtime, TDuration::Seconds(2)); + ui64 expectedGenFirst = 42; ui64 expectedGenOld = 10; auto request1 = MakeHolder(expectedGenFirst); @@ -164,13 +229,16 @@ Y_UNIT_TEST_SUITE(DataCleanup) { CheckResultEvent(*ev->Get(), tableShards.at(0), expectedGenFirst); } - CheckTableData(server, proxyDSs); + CheckTableData(server, proxyDSs, "/Root/table-1"); } Y_UNIT_TEST(ForceDataCleanupWithRestart) { - auto [server, sender, tableShards, proxyDSs] = SetupWithTable(); + auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true); auto& runtime = *server->GetRuntime(); + ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);"); + SimulateSleep(runtime, TDuration::Seconds(2)); + ui64 cleanupGeneration = 33; ui64 oldGeneration = 10; ui64 olderGeneration = 5; @@ -207,7 +275,59 @@ Y_UNIT_TEST_SUITE(DataCleanup) { CheckResultEvent(*ev->Get(), tableShards.at(0), cleanupGeneration); } - CheckTableData(server, proxyDSs); + CheckTableData(server, proxyDSs, "/Root/table-1"); + } + + Y_UNIT_TEST(OutReadSetsCleanedAfterCopyTable) { + auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true); + auto& runtime = *server->GetRuntime(); + + UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 3); // in log + after compaction: part switch in log and sst + + size_t readSetsWithDeletedSubkey1 = 0; + auto prevObserver = runtime.SetObserverFunc([&readSetsWithDeletedSubkey1](TAutoPtr &ev) { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvReadSet::EventType: { + auto* msg = ev->Get(); + if (msg->Record.SerializeAsString().Contains(DeletedSubkey1)) { + ++readSetsWithDeletedSubkey1; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto txIdCopy = AsyncCreateCopyTable(server, sender, "/Root", "table-2", "/Root/table-1"); + WaitTxNotification(server, sender, txIdCopy); + auto table2Shards = GetTableShards(server, sender, "/Root/table-2"); + auto table2Id = ResolveTableId(server, sender, "/Root/table-2"); + + UNIT_ASSERT_VALUES_EQUAL(readSetsWithDeletedSubkey1, 1); + UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 4); // + outreadset + UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(table2Shards.at(0), proxyDSs, DeletedSubkey1), 1); // in log + + ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);"); + ExecSQL(server, sender, "DELETE FROM `/Root/table-2` WHERE key IN (1, 4);"); + SimulateSleep(runtime, TDuration::Seconds(2)); + + UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 5); // + deletion in log + UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(table2Shards.at(0), proxyDSs, DeletedSubkey1), 2); // + deletion in log + + auto cleanupAndCheck = [&runtime, &sender](ui64 tabletId, ui64 expectedDataCleanupGeneration) { + auto request = MakeHolder(expectedDataCleanupGeneration); + + runtime.SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries()); + + auto ev = runtime.GrabEdgeEventRethrow(sender); + CheckResultEvent(*ev->Get(), tabletId, expectedDataCleanupGeneration); + }; + + cleanupAndCheck(table2Shards.at(0), 24); + cleanupAndCheck(tableShards.at(0), 24); + + CheckTableData(server, proxyDSs, "/Root/table-1"); + CheckTableData(server, proxyDSs, "/Root/table-2"); } }