Skip to content

Commit 1f88b11

Browse files
correct insert table cleaning (#6379)
1 parent 1d57390 commit 1f88b11

File tree

4 files changed

+48
-20
lines changed

4 files changed

+48
-20
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex
99
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
1010
NIceDb::TNiceDb db(txc.DB);
1111

12+
Self->TryAbortWrites(db, dbTable, std::move(WriteIdsToAbort));
13+
1214
NOlap::TBlobManagerDb blobManagerDb(txc.DB);
1315
auto allAborted = Self->InsertTable->GetAborted();
1416
auto storage = Self->StoragesManager->GetInsertOperator();

ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
namespace NKikimr::NColumnShard {
66
class TTxInsertTableCleanup: public TTransactionBase<TColumnShard> {
77
private:
8+
THashSet<TWriteId> WriteIdsToAbort;
89
std::shared_ptr<NOlap::IBlobsDeclareRemovingAction> BlobsAction;
910
public:
10-
TTxInsertTableCleanup(TColumnShard* self)
11-
: TBase(self) {
12-
Y_ABORT_UNLESS(self->InsertTable->GetAborted().size());
11+
TTxInsertTableCleanup(TColumnShard* self, THashSet<TWriteId>&& writeIdsToAbort)
12+
: TBase(self)
13+
, WriteIdsToAbort(std::move(writeIdsToAbort)) {
14+
Y_ABORT_UNLESS(WriteIdsToAbort.size() || self->InsertTable->GetAborted().size());
1315
}
1416

1517
~TTxInsertTableCleanup() {

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -259,18 +259,38 @@ void TColumnShard::LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, con
259259
}
260260

261261
bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId, const ui64 txId) {
262-
auto* lw = LongTxWrites.FindPtr(writeId);
263-
AFL_VERIFY(lw)("write_id", (ui64)writeId)("tx_id", txId);
264-
const ui64 prepared = lw->PreparedTxId;
265-
AFL_VERIFY(!prepared || txId == prepared)("tx", txId)("prepared", prepared);
266-
Schema::EraseLongTxWrite(db, writeId);
267-
auto& ltxParts = LongTxWritesByUniqueId[lw->LongTxId.UniqueId];
268-
ltxParts.erase(lw->WritePartId);
269-
if (ltxParts.empty()) {
270-
AFL_VERIFY(LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId));
271-
}
272-
LongTxWrites.erase(writeId);
273-
return true;
262+
if (auto* lw = LongTxWrites.FindPtr(writeId)) {
263+
ui64 prepared = lw->PreparedTxId;
264+
if (!prepared || txId == prepared) {
265+
Schema::EraseLongTxWrite(db, writeId);
266+
auto& ltxParts = LongTxWritesByUniqueId[lw->LongTxId.UniqueId];
267+
ltxParts.erase(lw->WritePartId);
268+
if (ltxParts.empty()) {
269+
LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId);
270+
}
271+
LongTxWrites.erase(writeId);
272+
return true;
273+
}
274+
}
275+
return false;
276+
}
277+
278+
void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort) {
279+
std::vector<TWriteId> failedAborts;
280+
for (auto& writeId : writesToAbort) {
281+
if (!RemoveLongTxWrite(db, writeId, 0)) {
282+
failedAborts.push_back(writeId);
283+
}
284+
}
285+
if (failedAborts.size()) {
286+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
287+
}
288+
for (auto& writeId : failedAborts) {
289+
writesToAbort.erase(writeId);
290+
}
291+
if (!writesToAbort.empty()) {
292+
InsertTable->Abort(dbTable, writesToAbort);
293+
}
274294
}
275295

276296
void TColumnShard::UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc) {
@@ -455,7 +475,9 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt
455475
// TODO: Allow to read old snapshots after DROP
456476
TBlobGroupSelector dsGroupSelector(Info());
457477
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
458-
InsertTable->DropPath(dbTable, pathId);
478+
THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);
479+
480+
TryAbortWrites(db, dbTable, std::move(writesToAbort));
459481
}
460482

461483
void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const NOlap::TSnapshot& version,
@@ -827,21 +849,22 @@ void TColumnShard::Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, co
827849
}
828850

829851
void TColumnShard::SetupCleanupInsertTable() {
852+
auto writeIdsToCleanup = InsertTable->OldWritesToAbort(AppData()->TimeProvider->Now());
853+
830854
if (BackgroundController.IsCleanupInsertTableActive()) {
831855
ACFL_DEBUG("background", "cleanup_insert_table")("skip_reason", "in_progress");
832856
return;
833857
}
834858

835-
if (!InsertTable->GetAborted().size()) {
859+
if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) {
836860
return;
837861
}
838-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size());
862+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size());
839863
BackgroundController.StartCleanupInsertTable();
840-
Execute(new TTxInsertTableCleanup(this), TActorContext::AsActorContext());
864+
Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext());
841865
}
842866

843867
void TColumnShard::Die(const TActorContext& ctx) {
844-
// TODO
845868
CleanupActors(ctx);
846869
NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe);
847870
UnregisterMediatorTimeCast();

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ class TColumnShard
535535

536536
void TryRegisterMediatorTimeCast();
537537
void UnregisterMediatorTimeCast();
538+
void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort);
538539

539540
bool WaitPlanStep(ui64 step);
540541
void SendWaitPlanStep(ui64 step);

0 commit comments

Comments
 (0)