Skip to content

Commit 3332d6e

Browse files
[+] the WriteId field in the TEvProposeTransaction message
1 parent 853e486 commit 3332d6e

File tree

3 files changed

+23
-16
lines changed

3 files changed

+23
-16
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2173,7 +2173,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
21732173

21742174
using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction*>;
21752175
using TEvWriteTxs = THashMap<ui64, NKikimrDataEvents::TEvWrite*>;
2176-
using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>;
2176+
using TTopicTabletTxs = NTopic::TTopicOperationTransactions;
21772177

21782178
void ContinueExecute() {
21792179
if (Stats) {
@@ -2430,10 +2430,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24302430
}
24312431
}
24322432

2433-
for (auto& [_, tx] : topicTxs) {
2434-
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
2435-
*tx.MutableSendingShards() = sendingShards;
2436-
*tx.MutableReceivingShards() = receivingShards;
2433+
for (auto& [_, t] : topicTxs) {
2434+
t.tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
2435+
*t.tx.MutableSendingShards() = sendingShards;
2436+
*t.tx.MutableReceivingShards() = receivingShards;
24372437
YQL_ENSURE(!arbiter);
24382438
}
24392439
}
@@ -2595,13 +2595,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
25952595
writeId = Request.TopicOperations.GetWriteId();
25962596
}
25972597

2598-
for (auto& tx : topicTxs) {
2599-
auto tabletId = tx.first;
2600-
auto& transaction = tx.second;
2598+
for (auto& [tabletId, t] : topicTxs) {
2599+
auto& transaction = t.tx;
26012600

26022601
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
26032602

2604-
if (writeId.Defined()) {
2603+
if (t.hasWrite && writeId.Defined()) {
26052604
auto* w = transaction.MutableWriteId();
26062605
w->SetNodeId(SelfId().NodeId());
26072606
w->SetKeyId(*writeId);

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,15 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
105105
HasWriteOperations_ = true;
106106
}
107107

108-
void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
108+
void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
109109
{
110110
Y_ABORT_UNLESS(TabletId_.Defined());
111111
Y_ABORT_UNLESS(Partition_.Defined());
112112

113-
auto& tx = txs[*TabletId_];
113+
auto& t = txs[*TabletId_];
114114

115115
for (auto& [consumer, operations] : Operations_) {
116-
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
116+
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
117117
o->SetPartitionId(*Partition_);
118118
auto [begin, end] = operations.GetRange();
119119
o->SetBegin(begin);
@@ -123,12 +123,13 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
123123
}
124124

125125
if (HasWriteOperations_) {
126-
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
126+
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
127127
o->SetPartitionId(*Partition_);
128128
o->SetPath(*Topic_);
129129
if (SupportivePartition_.Defined()) {
130130
o->SetSupportivePartition(*SupportivePartition_);
131131
}
132+
t.hasWrite = true;
132133
}
133134
}
134135

@@ -355,7 +356,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
355356
return true;
356357
}
357358

358-
void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
359+
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
359360
{
360361
for (auto& [_, operations] : Operations_) {
361362
operations.BuildTopicTxs(txs);

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ class TConsumerOperations {
4242
TDisjointIntervalTree<ui64> Offsets_;
4343
};
4444

45+
struct TTopicOperationTransaction {
46+
NKikimrPQ::TDataTransaction tx;
47+
bool hasWrite = false;
48+
};
49+
50+
using TTopicOperationTransactions = THashMap<ui64, TTopicOperationTransaction>;
51+
4552
class TTopicPartitionOperations {
4653
public:
4754
bool IsValid() const;
@@ -52,7 +59,7 @@ class TTopicPartitionOperations {
5259
void AddOperation(const TString& topic, ui32 partition,
5360
TMaybe<ui32> supportivePartition);
5461

55-
void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
62+
void BuildTopicTxs(TTopicOperationTransactions &txs);
5663

5764
void Merge(const TTopicPartitionOperations& rhs);
5865

@@ -109,7 +116,7 @@ class TTopicOperations {
109116
Ydb::StatusIds_StatusCode& status,
110117
TString& message);
111118

112-
void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
119+
void BuildTopicTxs(TTopicOperationTransactions &txs);
113120

114121
void Merge(const TTopicOperations& rhs);
115122

0 commit comments

Comments
 (0)