Skip to content

Commit 596609e

Browse files
authored
Add immediate commit & rollback for topics in BufferWriteActor (#12669)
1 parent f217802 commit 596609e

File tree

2 files changed

+70
-20
lines changed

2 files changed

+70
-20
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

+30-20
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,23 @@ namespace {
109109
*protoLocks->AddLocks() = lock;
110110
}
111111
}
112+
113+
void FillTopicsCommit(NKikimrPQ::TDataTransaction& transaction, const NKikimr::NKqp::IKqpTransactionManagerPtr& txManager) {
114+
transaction.SetOp(NKikimrPQ::TDataTransaction::Commit);
115+
const auto prepareSettings = txManager->GetPrepareTransactionInfo();
116+
117+
if (!prepareSettings.ArbiterColumnShard) {
118+
for (const ui64 sendingShardId : prepareSettings.SendingShards) {
119+
transaction.AddSendingShards(sendingShardId);
120+
}
121+
for (const ui64 receivingShardId : prepareSettings.ReceivingShards) {
122+
transaction.AddReceivingShards(receivingShardId);
123+
}
124+
} else {
125+
transaction.AddSendingShards(*prepareSettings.ArbiterColumnShard);
126+
transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard);
127+
}
128+
}
112129
}
113130

114131

@@ -1594,7 +1611,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
15941611
Close();
15951612
Process();
15961613
SendToExternalShards(false);
1597-
SendToTopics();
1614+
SendToTopics(false);
15981615
}
15991616

16001617
void ImmediateCommit() {
@@ -1613,6 +1630,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
16131630
}
16141631
Close();
16151632
Process();
1633+
SendToTopics(true);
16161634
}
16171635

16181636
void DistributedCommit() {
@@ -1640,6 +1658,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
16401658
CA_LOG_D("Start rollback");
16411659
State = EState::ROLLINGBACK;
16421660
SendToExternalShards(true);
1661+
SendToTopics(true);
16431662
}
16441663

16451664
void SendToExternalShards(bool isRollback) {
@@ -1692,7 +1711,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
16921711
}
16931712
}
16941713

1695-
void SendToTopics() {
1714+
void SendToTopics(bool isImmediateCommit) {
16961715
if (!TxManager->HasTopics()) {
16971716
return;
16981717
}
@@ -1707,29 +1726,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
17071726

17081727
for (auto& [tabletId, t] : topicTxs) {
17091728
auto& transaction = t.tx;
1710-
transaction.SetOp(NKikimrPQ::TDataTransaction::Commit);
1711-
1712-
const auto prepareSettings = TxManager->GetPrepareTransactionInfo();
1713-
if (!prepareSettings.ArbiterColumnShard) {
1714-
for (const ui64 sendingShardId : prepareSettings.SendingShards) {
1715-
transaction.AddSendingShards(sendingShardId);
1716-
}
1717-
for (const ui64 receivingShardId : prepareSettings.ReceivingShards) {
1718-
transaction.AddReceivingShards(receivingShardId);
1719-
}
1720-
} else {
1721-
transaction.AddSendingShards(*prepareSettings.ArbiterColumnShard);
1722-
transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard);
1729+
1730+
if (!isImmediateCommit) {
1731+
FillTopicsCommit(transaction, TxManager);
17231732
}
17241733

1725-
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();
1726-
17271734
if (t.hasWrite && writeId.Defined()) {
17281735
auto* w = transaction.MutableWriteId();
17291736
w->SetNodeId(SelfId().NodeId());
17301737
w->SetKeyId(*writeId);
17311738
}
1732-
transaction.SetImmediate(false);
1739+
transaction.SetImmediate(isImmediateCommit);
1740+
1741+
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();
17331742

17341743
ActorIdToProto(SelfId(), ev->Record.MutableSourceActor());
17351744
ev->Record.MutableData()->Swap(&transaction);
@@ -1738,7 +1747,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
17381747
SendTime[tabletId] = TInstant::Now();
17391748
auto traceId = BufferWriteActor.GetTraceId();
17401749

1741-
CA_LOG_D("Preparing KQP transaction on topic tablet: " << tabletId << ", writeId: " << writeId);
1750+
CA_LOG_D("Executing KQP transaction on topic tablet: " << tabletId
1751+
<< ", writeId: " << writeId << ", isImmediateCommit: " << isImmediateCommit);
17421752

17431753
Send(
17441754
MakePipePerNodeCacheID(false),
@@ -1962,7 +1972,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19621972
Rollback();
19631973
State = EState::FINISHED;
19641974
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
1965-
} else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && !WriteInfos.empty() && !TxManager->HasTopics()) {
1975+
} else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && (!WriteInfos.empty() || TxManager->HasTopics())) {
19661976
TxManager->StartExecute();
19671977
ImmediateCommit();
19681978
} else {

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

+40
Original file line numberDiff line numberDiff line change
@@ -2587,6 +2587,24 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopic_4, TFixtureOltpSink)
25872587
TestWriteToTopic9();
25882588
}
25892589

2590+
Y_UNIT_TEST_F(OltpSink_WriteToTopic_5, TFixtureOltpSink)
2591+
{
2592+
CreateTopic("topic_A");
2593+
2594+
NTable::TSession tableSession = CreateTableSession();
2595+
NTable::TTransaction tx = BeginTx(tableSession);
2596+
2597+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
2598+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
2599+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
2600+
2601+
Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
2602+
2603+
RollbackTx(tx, EStatus::SUCCESS);
2604+
2605+
Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
2606+
}
2607+
25902608
Y_UNIT_TEST_F(OltpSink_WriteToTopics_1, TFixtureOltpSink)
25912609
{
25922610
TestWriteToTopic1();
@@ -2720,6 +2738,28 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_4, TFixtureOltpSink)
27202738

27212739
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size());
27222740
}
2741+
2742+
Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_5, TFixtureOltpSink)
2743+
{
2744+
CreateTopic("topic_A");
2745+
CreateTable("/Root/table_A");
2746+
2747+
NTable::TSession tableSession = CreateTableSession();
2748+
NTable::TTransaction tx = BeginTx(tableSession);
2749+
2750+
auto records = MakeTableRecords();
2751+
WriteToTable("table_A", records, &tx);
2752+
2753+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
2754+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
2755+
2756+
RollbackTx(tx, EStatus::SUCCESS);
2757+
2758+
Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
2759+
CheckTabletKeys("topic_A");
2760+
2761+
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0);
2762+
}
27232763
}
27242764

27252765
}

0 commit comments

Comments
 (0)