Skip to content

Commit 7173846

Browse files
authored
Remove change senders upon DROP TABLE KIKIMR-20678 (#901)
1 parent d95e3ab commit 7173846

File tree

4 files changed

+99
-5
lines changed

4 files changed

+99
-5
lines changed

ydb/core/tx/datashard/change_sender_async_index.cpp

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,14 @@ class TAsyncIndexChangeSenderMain
537537
return;
538538
}
539539

540+
if (entry.Self && entry.Self->Info.GetPathState() == NKikimrSchemeOp::EPathStateDrop) {
541+
LOG_D("Index is planned to drop, waiting for the EvRemoveSender command");
542+
543+
RemoveRecords();
544+
KillSenders();
545+
return Become(&TThis::StatePendingRemove);
546+
}
547+
540548
Y_ABORT_UNLESS(entry.ListNodeEntry->Children.size() == 1);
541549
const auto& indexTable = entry.ListNodeEntry->Children.at(0);
542550

@@ -559,7 +567,7 @@ class TAsyncIndexChangeSenderMain
559567
STATEFN(StateResolveIndexTable) {
560568
switch (ev->GetTypeRewrite()) {
561569
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleIndexTable);
562-
sFunc(TEvents::TEvWakeup, ResolveIndexTable);
570+
sFunc(TEvents::TEvWakeup, ResolveIndex);
563571
default:
564572
return StateBase(ev);
565573
}
@@ -638,7 +646,7 @@ class TAsyncIndexChangeSenderMain
638646
STATEFN(StateResolveKeys) {
639647
switch (ev->GetTypeRewrite()) {
640648
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleKeys);
641-
sFunc(TEvents::TEvWakeup, ResolveIndexTable);
649+
sFunc(TEvents::TEvWakeup, ResolveIndex);
642650
default:
643651
return StateBase(ev);
644652
}
@@ -690,7 +698,7 @@ class TAsyncIndexChangeSenderMain
690698
}
691699

692700
void Resolve() override {
693-
ResolveIndexTable();
701+
ResolveIndex();
694702
}
695703

696704
bool IsResolved() const override {
@@ -758,6 +766,11 @@ class TAsyncIndexChangeSenderMain
758766
PassAway();
759767
}
760768

769+
void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
770+
LOG_D("Handle " << ev->Get()->ToString());
771+
RemoveRecords(std::move(ev->Get()->Records));
772+
}
773+
761774
void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
762775
RenderHtmlPage(ESenderType::AsyncIndex, ev, ctx);
763776
}
@@ -797,6 +810,15 @@ class TAsyncIndexChangeSenderMain
797810
}
798811
}
799812

813+
STFUNC(StatePendingRemove) {
814+
switch (ev->GetTypeRewrite()) {
815+
hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove);
816+
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
817+
HFunc(NMon::TEvRemoteHttpInfo, Handle);
818+
sFunc(TEvents::TEvPoison, PassAway);
819+
}
820+
}
821+
800822
private:
801823
const TTableId UserTableId;
802824
mutable TMaybe<TString> LogPrefix;

ydb/core/tx/datashard/datashard_change_sending.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
334334
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
335335
}
336336
}
337+
338+
Self->CheckStateChange(ctx);
337339
}
338340

339341
private:

ydb/core/tx/datashard/drop_table_unit.cpp

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class TDropTableUnit : public TExecutionUnit {
1919
const TActorContext &ctx) override;
2020

2121
private:
22+
TVector<THolder<TEvChangeExchange::TEvRemoveSender>> RemoveSenders;
2223
};
2324

2425
TDropTableUnit::TDropTableUnit(TDataShard &dataShard,
@@ -75,6 +76,20 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op,
7576
Y_ABORT_UNLESS(DataShard.GetPathOwnerId() == schemeTx.GetDropTable().GetPathId().GetOwnerId());
7677
tableId = schemeTx.GetDropTable().GetPathId().GetLocalId();
7778
}
79+
80+
auto it = DataShard.GetUserTables().find(tableId);
81+
Y_ABORT_UNLESS(it != DataShard.GetUserTables().end());
82+
{
83+
for (const auto& [indexPathId, indexInfo] : it->second->Indexes) {
84+
if (indexInfo.Type == TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) {
85+
RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(indexPathId));
86+
}
87+
}
88+
for (const auto& [streamPathId, _] : it->second->CdcStreams) {
89+
RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(streamPathId));
90+
}
91+
}
92+
7893
DataShard.DropUserTable(txc, tableId);
7994

8095
// FIXME: transactions need to specify ownerId
@@ -96,12 +111,15 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op,
96111
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
97112
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
98113

99-
return EExecutionStatus::ExecutedNoMoreRestarts;
114+
return EExecutionStatus::DelayCompleteNoMoreRestarts;
100115
}
101116

102117
void TDropTableUnit::Complete(TOperation::TPtr,
103-
const TActorContext &)
118+
const TActorContext &ctx)
104119
{
120+
for (auto& ev : RemoveSenders) {
121+
ctx.Send(DataShard.GetChangeSender(), ev.Release());
122+
}
105123
}
106124

107125
THolder<TExecutionUnit> CreateDropTableUnit(TDataShard &dataShard,

ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <ydb/core/base/path.h>
22
#include <ydb/core/scheme/scheme_tablecell.h>
3+
#include <ydb/core/tx/datashard/change_exchange.h>
34
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
45
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
56
#include <ydb/core/testlib/tablet_helpers.h>
@@ -360,4 +361,55 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) {
360361
t.TestEnv->TestWaitNotification(runtime, t.TxId);
361362
});
362363
}
364+
365+
Y_UNIT_TEST_WITH_REBOOTS(DropTableWithInflightChanges) {
366+
T t;
367+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
368+
auto origObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
369+
return TTestActorRuntime::DefaultObserverFunc(ev);
370+
});
371+
372+
TVector<THolder<IEventHandle>> enqueued;
373+
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
374+
if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) {
375+
enqueued.emplace_back(ev.Release());
376+
return TTestActorRuntime::EEventAction::DROP;
377+
}
378+
return origObserver(ev);
379+
});
380+
381+
{
382+
TInactiveZone inactive(activeZone);
383+
TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"(
384+
TableDescription {
385+
Name: "Table"
386+
Columns { Name: "key" Type: "Uint32" }
387+
Columns { Name: "indexed" Type: "Uint32" }
388+
KeyColumnNames: ["key"]
389+
}
390+
IndexDescription {
391+
Name: "UserDefinedIndex"
392+
KeyColumnNames: ["indexed"]
393+
Type: EIndexTypeGlobalAsync
394+
}
395+
)");
396+
t.TestEnv->TestWaitNotification(runtime, t.TxId);
397+
398+
Prepare(runtime, "/MyRoot/Table", {1, 10, 100}, true);
399+
}
400+
401+
TestDropTable(runtime, ++t.TxId, "/MyRoot", "Table");
402+
403+
runtime.SetObserverFunc(origObserver);
404+
for (auto& ev : std::exchange(enqueued, {})) {
405+
runtime.Send(ev.Release(), 0, true);
406+
}
407+
408+
t.TestEnv->TestWaitNotification(runtime, t.TxId);
409+
t.TestEnv->TestWaitTabletDeletion(runtime, {
410+
TTestTxConfig::FakeHiveTablets,
411+
TTestTxConfig::FakeHiveTablets + 1,
412+
});
413+
});
414+
}
363415
}

0 commit comments

Comments
 (0)