Skip to content

Commit a71b5d4

Browse files
authored
Merge 6007823 into 1237379
2 parents 1237379 + 6007823 commit a71b5d4

File tree

6 files changed

+102
-7
lines changed

6 files changed

+102
-7
lines changed

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ void TExecutor::Active(const TActorContext &ctx) {
452452

453453
CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(MemTableMemoryConsumersCollection.Get(), Logger.Get(), Broker.Get(), this, loadedState->Comp,
454454
Sprintf("tablet-%" PRIu64, Owner->TabletID())));
455-
DataCleanupLogic = MakeHolder<TDataCleanupLogic>(static_cast<NActors::IActorOps*>(this), this, Owner, Logger.Get(), GcLogic.Get());
455+
DataCleanupLogic = MakeHolder<TDataCleanupLogic>(static_cast<NActors::IActorOps*>(this), this, Owner, Logger.Get(), GcLogic.Get(), BorrowLogic.Get());
456456
LogicRedo->InstallCounters(Counters.Get(), nullptr);
457457

458458
ResourceMetrics = MakeHolder<NMetrics::TResourceMetrics>(Owner->TabletID(), 0, Launcher);
@@ -3103,6 +3103,11 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
31033103
CheckCollectionBarrier(x);
31043104
InFlySnapCollectionBarriers.erase(snapCollectionIt);
31053105
}
3106+
3107+
DataCleanupLogic->OnLogCommited();
3108+
if (DataCleanupLogic->NeedLogSnaphot()) {
3109+
MakeLogSnapshot();
3110+
}
31063111
}
31073112
break;
31083113
case ECommit::Snap:
@@ -3663,7 +3668,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
36633668

36643669
Y_ABORT_UNLESS(InFlyCompactionGcBarriers.emplace(commit->Step, ops->Barrier).second);
36653670

3666-
DataCleanupLogic->OnCompleteCompaction(tableId, CompactionLogic->GetFinishedCompactionInfo(tableId));
3671+
DataCleanupLogic->OnCompleteCompaction(tableId, Generation(), commit->Step, CompactionLogic->GetFinishedCompactionInfo(tableId));
36673672

36683673
AttachLeaseCommit(commit.Get());
36693674
CommitManager->Commit(commit);

ydb/core/tablet_flat/flat_executor_borrowlogic.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,4 +602,17 @@ bool TExecutorBorrowLogic::HasLoanedParts() const {
602602
return false;
603603
}
604604

605+
bool TExecutorBorrowLogic::HasLoanedBlobsBefore(ui32 generation, ui32 step) const {
606+
for (const auto &xpair : BorrowedInfo) {
607+
if (xpair.second.BorrowInfo.FullBorrow) {
608+
for (const auto& blobId : xpair.second.BorrowInfo.Keep) {
609+
if (std::make_tuple(blobId.Generation(), blobId.Step()) < std::tie(generation, step)) {
610+
return true;
611+
}
612+
}
613+
}
614+
}
615+
return false;
616+
}
617+
605618
}}

ydb/core/tablet_flat/flat_executor_borrowlogic.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ class TExecutorBorrowLogic {
171171

172172
// i.e. parts we own, but loaned to others
173173
bool HasLoanedParts() const;
174+
175+
bool HasLoanedBlobsBefore(ui32 generation, ui32 step) const;
174176
};
175177

176178
}}

ydb/core/tablet_flat/flat_executor_data_cleanup_logic.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,19 @@
22

33
namespace NKikimr::NTabletFlatExecutor {
44

5-
TDataCleanupLogic::TDataCleanupLogic(IOps* ops, IExecutor* executor, ITablet* owner, NUtil::ILogger* logger, TExecutorGCLogic* gcLogic)
5+
TDataCleanupLogic::TDataCleanupLogic(
6+
IOps* ops,
7+
IExecutor* executor,
8+
ITablet* owner,
9+
NUtil::ILogger* logger,
10+
TExecutorGCLogic* gcLogic,
11+
TExecutorBorrowLogic* borrowLogic)
612
: Ops(ops)
713
, Executor(executor)
814
, Owner(owner)
915
, Logger(logger)
1016
, GcLogic(gcLogic)
17+
, BorrowLogic(borrowLogic)
1118
{}
1219

1320
bool TDataCleanupLogic::TryStartCleanup(ui64 dataCleanupGeneration, const TActorContext& ctx) {
@@ -66,6 +73,8 @@ void TDataCleanupLogic::WaitCompaction() {
6673

6774
void TDataCleanupLogic::OnCompleteCompaction(
6875
ui32 tableId,
76+
ui32 generation,
77+
ui32 step,
6978
const TFinishedCompactionInfo& finishedCompactionInfo)
7079
{
7180
if (State != EDataCleanupState::WaitCompaction) {
@@ -75,10 +84,16 @@ void TDataCleanupLogic::OnCompleteCompaction(
7584
if (auto it = CompactingTables.find(tableId); it != CompactingTables.end()) {
7685
if (finishedCompactionInfo.Edge >= it->second.CompactionId) {
7786
CompactingTables.erase(it);
87+
auto compactionTime = TGCTime(generation, step);
88+
LastCompactionTime = Max(LastCompactionTime, compactionTime);
7889
}
7990
}
8091
if (CompactingTables.empty()) {
81-
State = EDataCleanupState::PendingFirstSnapshot;
92+
if (BorrowLogic->HasLoanedBlobsBefore(LastCompactionTime.Generation, LastCompactionTime.Step)) {
93+
State = EDataCleanupState::WaitBorrowed;
94+
} else {
95+
State = EDataCleanupState::PendingFirstSnapshot;
96+
}
8297
}
8398
}
8499

@@ -110,6 +125,15 @@ void TDataCleanupLogic::OnMakeLogSnapshot(ui32 generation, ui32 step) {
110125
}
111126
}
112127

128+
void TDataCleanupLogic::OnLogCommited() {
129+
if (State != EDataCleanupState::WaitBorrowed) {
130+
return;
131+
}
132+
if (!BorrowLogic->HasLoanedBlobsBefore(LastCompactionTime.Generation, LastCompactionTime.Step)) {
133+
State = EDataCleanupState::PendingFirstSnapshot;
134+
}
135+
}
136+
113137
void TDataCleanupLogic::OnSnapshotCommited(ui32 generation, ui32 step) {
114138
switch (State) {
115139
case EDataCleanupState::WaitFirstSnapshot: {

ydb/core/tablet_flat/flat_executor_data_cleanup_logic.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#pragma once
22

3-
#include "tablet_flat_executor.h"
3+
#include "flat_executor_borrowlogic.h"
44
#include "flat_executor_gclogic.h"
5+
#include "tablet_flat_executor.h"
56
#include "util_fmt_logger.h"
67

78
namespace NKikimr::NTabletFlatExecutor {
@@ -11,6 +12,7 @@ class TDataCleanupLogic {
1112
Idle,
1213
PendingCompaction,
1314
WaitCompaction,
15+
WaitBorrowed,
1416
PendingFirstSnapshot,
1517
WaitFirstSnapshot,
1618
PendingSecondSnapshot,
@@ -31,14 +33,21 @@ class TDataCleanupLogic {
3133
using ITablet = NFlatExecutorSetup::ITablet;
3234
using ELnLev = NUtil::ELnLev;
3335

34-
TDataCleanupLogic(IOps* ops, IExecutor* executor, ITablet* owner, NUtil::ILogger* logger, TExecutorGCLogic* gcLogic);
36+
TDataCleanupLogic(
37+
IOps* ops,
38+
IExecutor* executor,
39+
ITablet* owner,
40+
NUtil::ILogger* logger,
41+
TExecutorGCLogic* gcLogic,
42+
TExecutorBorrowLogic* borrowLogic);
3543

3644
bool TryStartCleanup(ui64 dataCleanupGeneration, const TActorContext& ctx);
3745
void OnCompactionPrepared(ui32 tableId, ui64 compactionId);
3846
void WaitCompaction();
39-
void OnCompleteCompaction(ui32 tableId, const TFinishedCompactionInfo& finishedCompactionInfo);
47+
void OnCompleteCompaction(ui32 tableId, ui32 generation, ui32 step, const TFinishedCompactionInfo& finishedCompactionInfo);
4048
bool NeedLogSnaphot();
4149
void OnMakeLogSnapshot(ui32 generation, ui32 step);
50+
void OnLogCommited();
4251
void OnSnapshotCommited(ui32 generation, ui32 step);
4352
void OnCollectedGarbage(const TActorContext& ctx);
4453
void OnGcForStepAckResponse(ui32 generation, ui32 step, const TActorContext& ctx);
@@ -53,12 +62,14 @@ class TDataCleanupLogic {
5362
ITablet* Owner;
5463
NUtil::ILogger* const Logger;
5564
TExecutorGCLogic* const GcLogic;
65+
TExecutorBorrowLogic* const BorrowLogic;
5666

5767
ui64 CurrentDataCleanupGeneration = 0;
5868
ui64 NextDataCleanupGeneration = 0;
5969
EDataCleanupState State = EDataCleanupState::Idle;
6070
THashMap<ui32, TCleanupTableInfo> CompactingTables; // tracks statuses of compaction
6171

72+
TGCTime LastCompactionTime;
6273
// two subsequent are snapshots required to force GC
6374
TGCTime FirstLogSnaphotStep;
6475
TGCTime SecondLogSnaphotStep;

ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,46 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
329329
CheckTableData(server, proxyDSs, "/Root/table-1");
330330
CheckTableData(server, proxyDSs, "/Root/table-2");
331331
}
332+
333+
Y_UNIT_TEST(BorrowerDataCleanedAfterCopyTable) {
334+
auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
335+
auto& runtime = *server->GetRuntime();
336+
337+
auto txIdCopy = AsyncCreateCopyTable(server, sender, "/Root", "table-2", "/Root/table-1");
338+
WaitTxNotification(server, sender, txIdCopy);
339+
auto table2Shards = GetTableShards(server, sender, "/Root/table-2");
340+
auto table2Id = ResolveTableId(server, sender, "/Root/table-2");
341+
342+
ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
343+
ExecSQL(server, sender, "DELETE FROM `/Root/table-2` WHERE key IN (1, 4);");
344+
SimulateSleep(runtime, TDuration::Seconds(2));
345+
346+
ui64 dataCleanupGeneration = 24;
347+
{
348+
// cleanup for the first table should not be completed due to borrowed parts
349+
auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(dataCleanupGeneration);
350+
runtime.SendToPipe(tableShards.at(0), sender, request.Release(), 0, GetPipeConfigWithRetries());
351+
352+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender, TDuration::Seconds(10));
353+
UNIT_ASSERT(!ev);
354+
}
355+
{
356+
// cleanup for the second table
357+
auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(dataCleanupGeneration);
358+
runtime.SendToPipe(table2Shards.at(0), sender, request.Release(), 0, GetPipeConfigWithRetries());
359+
360+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
361+
CheckResultEvent(*ev->Get(), table2Shards.at(0), dataCleanupGeneration);
362+
}
363+
{
364+
// cleanup should be continued after compaction of the second table
365+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
366+
CheckResultEvent(*ev->Get(), tableShards.at(0), dataCleanupGeneration);
367+
}
368+
369+
CheckTableData(server, proxyDSs, "/Root/table-1");
370+
CheckTableData(server, proxyDSs, "/Root/table-2");
371+
}
332372
}
333373

334374
} // namespace NKikimr

0 commit comments

Comments
 (0)