Skip to content

Commit a4817e0

Browse files
Alek5andr-Kotovspuchin
authored andcommitted
The race between TEvProposeTransaction and TEvLockStatus (ydb-platform#8517) (ydb-platform#8670)
1 parent 2d87ec8 commit a4817e0

File tree

6 files changed

+109
-63
lines changed

6 files changed

+109
-63
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2167,7 +2167,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
21672167

21682168
using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction*>;
21692169
using TEvWriteTxs = THashMap<ui64, NKikimrDataEvents::TEvWrite*>;
2170-
using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>;
2170+
using TTopicTabletTxs = NTopic::TTopicOperationTransactions;
21712171

21722172
void ContinueExecute() {
21732173
if (Stats) {
@@ -2424,10 +2424,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24242424
}
24252425
}
24262426

2427-
for (auto& [_, tx] : topicTxs) {
2428-
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
2429-
*tx.MutableSendingShards() = sendingShards;
2430-
*tx.MutableReceivingShards() = receivingShards;
2427+
for (auto& [_, t] : topicTxs) {
2428+
t.tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
2429+
*t.tx.MutableSendingShards() = sendingShards;
2430+
*t.tx.MutableReceivingShards() = receivingShards;
24312431
YQL_ENSURE(!arbiter);
24322432
}
24332433
}
@@ -2589,13 +2589,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
25892589
writeId = Request.TopicOperations.GetWriteId();
25902590
}
25912591

2592-
for (auto& tx : topicTxs) {
2593-
auto tabletId = tx.first;
2594-
auto& transaction = tx.second;
2592+
for (auto& [tabletId, t] : topicTxs) {
2593+
auto& transaction = t.tx;
25952594

25962595
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
25972596

2598-
if (writeId.Defined()) {
2597+
if (t.hasWrite && writeId.Defined()) {
25992598
auto* w = transaction.MutableWriteId();
26002599
w->SetNodeId(SelfId().NodeId());
26012600
w->SetKeyId(*writeId);

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,15 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
105105
HasWriteOperations_ = true;
106106
}
107107

108-
void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
108+
void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
109109
{
110110
Y_ABORT_UNLESS(TabletId_.Defined());
111111
Y_ABORT_UNLESS(Partition_.Defined());
112112

113-
auto& tx = txs[*TabletId_];
113+
auto& t = txs[*TabletId_];
114114

115115
for (auto& [consumer, operations] : Operations_) {
116-
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
116+
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
117117
o->SetPartitionId(*Partition_);
118118
auto [begin, end] = operations.GetRange();
119119
o->SetBegin(begin);
@@ -123,12 +123,13 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
123123
}
124124

125125
if (HasWriteOperations_) {
126-
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
126+
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
127127
o->SetPartitionId(*Partition_);
128128
o->SetPath(*Topic_);
129129
if (SupportivePartition_.Defined()) {
130130
o->SetSupportivePartition(*SupportivePartition_);
131131
}
132+
t.hasWrite = true;
132133
}
133134
}
134135

@@ -355,7 +356,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
355356
return true;
356357
}
357358

358-
void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
359+
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
359360
{
360361
for (auto& [_, operations] : Operations_) {
361362
operations.BuildTopicTxs(txs);

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ class TConsumerOperations {
4242
TDisjointIntervalTree<ui64> Offsets_;
4343
};
4444

45+
struct TTopicOperationTransaction {
46+
NKikimrPQ::TDataTransaction tx;
47+
bool hasWrite = false;
48+
};
49+
50+
using TTopicOperationTransactions = THashMap<ui64, TTopicOperationTransaction>;
51+
4552
class TTopicPartitionOperations {
4653
public:
4754
bool IsValid() const;
@@ -52,7 +59,7 @@ class TTopicPartitionOperations {
5259
void AddOperation(const TString& topic, ui32 partition,
5360
TMaybe<ui32> supportivePartition);
5461

55-
void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
62+
void BuildTopicTxs(TTopicOperationTransactions &txs);
5663

5764
void Merge(const TTopicPartitionOperations& rhs);
5865

@@ -109,7 +116,7 @@ class TTopicOperations {
109116
Ydb::StatusIds_StatusCode& status,
110117
TString& message);
111118

112-
void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
119+
void BuildTopicTxs(TTopicOperationTransactions &txs);
113120

114121
void Merge(const TTopicOperations& rhs);
115122

ydb/core/persqueue/pq_impl.cpp

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,16 +1041,21 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,
10411041
for (size_t i = 0; i != info.TxWritesSize(); ++i) {
10421042
auto& txWrite = info.GetTxWrites(i);
10431043
const TWriteId writeId = GetWriteId(txWrite);
1044-
ui32 partitionId = txWrite.GetOriginalPartitionId();
1045-
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());
10461044

1047-
TxWrites[writeId].Partitions.emplace(partitionId, shadowPartitionId);
1045+
TTxWriteInfo& writeInfo = TxWrites[writeId];
1046+
if (txWrite.HasOriginalPartitionId()) {
1047+
ui32 partitionId = txWrite.GetOriginalPartitionId();
1048+
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());
10481049

1049-
AddSupportivePartition(shadowPartitionId);
1050-
CreateSupportivePartitionActor(shadowPartitionId, ctx);
1051-
SubscribeWriteId(writeId, ctx);
1050+
writeInfo.Partitions.emplace(partitionId, shadowPartitionId);
1051+
1052+
AddSupportivePartition(shadowPartitionId);
1053+
CreateSupportivePartitionActor(shadowPartitionId, ctx);
1054+
1055+
NextSupportivePartitionId = Max(NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1);
1056+
}
10521057

1053-
NextSupportivePartitionId = Max(NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1);
1058+
SubscribeWriteId(writeId, ctx);
10541059
}
10551060

10561061
NewSupportivePartitions.clear();
@@ -3283,7 +3288,7 @@ bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& ope
32833288
TPartitionId partitionId(operation.GetPartitionId(),
32843289
writeId,
32853290
operation.GetSupportivePartition());
3286-
PQ_LOG_D("partitionId=" << partitionId);
3291+
PQ_LOG_D("PartitionId " << partitionId << " for WriteId " << writeId);
32873292
return Partitions.contains(partitionId);
32883293
}
32893294

@@ -3294,7 +3299,6 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
32943299
}
32953300

32963301
const TWriteId writeId = GetWriteId(txBody);
3297-
PQ_LOG_D("writeId=" << writeId);
32983302

32993303
for (auto& operation : txBody.GetOperations()) {
33003304
auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
@@ -3320,7 +3324,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33203324
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
33213325

33223326
if (TabletState != NKikimrPQ::ENormal) {
3323-
PQ_LOG_D("invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
3327+
PQ_LOG_D("TxId " << event.GetTxId() << " invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
33243328
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
33253329
event.GetTxId(),
33263330
NKikimrPQ::TError::ERROR,
@@ -3334,7 +3338,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33343338
//
33353339

33363340
if (txBody.OperationsSize() <= 0) {
3337-
PQ_LOG_D("empty list of operations");
3341+
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
33383342
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
33393343
event.GetTxId(),
33403344
NKikimrPQ::TError::BAD_REQUEST,
@@ -3344,7 +3348,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33443348
}
33453349

33463350
if (!CheckTxWriteOperations(txBody)) {
3347-
PQ_LOG_D("invalid WriteId " << txBody.GetWriteId());
3351+
PQ_LOG_D("TxId " << event.GetTxId() << " invalid WriteId " << txBody.GetWriteId());
33483352
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
33493353
event.GetTxId(),
33503354
NKikimrPQ::TError::BAD_REQUEST,
@@ -3353,9 +3357,36 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33533357
return;
33543358
}
33553359

3360+
if (txBody.HasWriteId()) {
3361+
const TWriteId writeId = GetWriteId(txBody);
3362+
if (!TxWrites.contains(writeId)) {
3363+
PQ_LOG_D("TxId " << event.GetTxId() << " unknown WriteId " << writeId);
3364+
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
3365+
event.GetTxId(),
3366+
NKikimrPQ::TError::BAD_REQUEST,
3367+
"unknown WriteId",
3368+
ctx);
3369+
return;
3370+
}
3371+
3372+
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
3373+
if (writeInfo.Deleting) {
3374+
PQ_LOG_W("TxId " << event.GetTxId() << " WriteId " << writeId << " will be deleted");
3375+
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
3376+
event.GetTxId(),
3377+
NKikimrPQ::TError::BAD_REQUEST,
3378+
"WriteId will be deleted",
3379+
ctx);
3380+
return;
3381+
}
3382+
3383+
writeInfo.TxId = event.GetTxId();
3384+
PQ_LOG_D("TxId " << event.GetTxId() << " has WriteId " << writeId);
3385+
}
3386+
33563387
TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
33573388
if (!partitionId.Defined()) {
3358-
PQ_LOG_D("unknown partition for WriteId " << txBody.GetWriteId());
3389+
PQ_LOG_W("TxId " << event.GetTxId() << " unknown partition for WriteId " << txBody.GetWriteId());
33593390
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
33603391
event.GetTxId(),
33613392
NKikimrPQ::TError::INTERNAL,
@@ -3568,13 +3599,15 @@ bool TPersQueue::CanProcessTxWrites() const
35683599
void TPersQueue::SubscribeWriteId(const TWriteId& writeId,
35693600
const TActorContext& ctx)
35703601
{
3602+
PQ_LOG_D("send TEvSubscribeLock for WriteId " << writeId);
35713603
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
35723604
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId.KeyId, writeId.NodeId));
35733605
}
35743606

35753607
void TPersQueue::UnsubscribeWriteId(const TWriteId& writeId,
35763608
const TActorContext& ctx)
35773609
{
3610+
PQ_LOG_D("send TEvUnsubscribeLock for WriteId " << writeId);
35783611
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
35793612
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId.KeyId, writeId.NodeId));
35803613
}
@@ -3876,11 +3909,16 @@ void TPersQueue::SavePlanStep(NKikimrPQ::TTabletTxInfo& info)
38763909
void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info)
38773910
{
38783911
for (auto& [writeId, write] : TxWrites) {
3879-
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
3912+
if (write.Partitions.empty()) {
38803913
auto* txWrite = info.MutableTxWrites()->Add();
38813914
SetWriteId(*txWrite, writeId);
3882-
txWrite->SetOriginalPartitionId(partitionId);
3883-
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
3915+
} else {
3916+
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
3917+
auto* txWrite = info.MutableTxWrites()->Add();
3918+
SetWriteId(*txWrite, writeId);
3919+
txWrite->SetOriginalPartitionId(partitionId);
3920+
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
3921+
}
38843922
}
38853923
}
38863924

@@ -4325,6 +4363,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43254363

43264364
WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED);
43274365

4366+
PQ_LOG_D("delete partitions for TxId " << tx.TxId);
4367+
BeginDeletePartitions(tx);
4368+
43284369
tx.State = NKikimrPQ::TTransaction::EXECUTED;
43294370
PQ_LOG_D("TxId " << tx.TxId <<
43304371
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
@@ -4343,8 +4384,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43434384

43444385
case NKikimrPQ::TTransaction::WAIT_RS_ACKS:
43454386
PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive() <<
4346-
", WriteIdIsDisabled " << WriteIdIsDisabled(tx.WriteId));
4347-
if (tx.HaveAllRecipientsReceive() && WriteIdIsDisabled(tx.WriteId)) {
4387+
", AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted(tx.WriteId));
4388+
if (tx.HaveAllRecipientsReceive() && AllSupportivePartitionsHaveBeenDeleted(tx.WriteId)) {
43484389
DeleteTx(tx);
43494390
// implicitly switch to the state DELETING
43504391
}
@@ -4369,7 +4410,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43694410
}
43704411
}
43714412

4372-
bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
4413+
bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const
43734414
{
43744415
if (!writeId.Defined()) {
43754416
return true;
@@ -4380,26 +4421,21 @@ bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
43804421
TabletID(), writeId->NodeId, writeId->KeyId);
43814422
const TTxWriteInfo& writeInfo = TxWrites.at(*writeId);
43824423

4383-
bool disabled =
4384-
(writeInfo.LongTxSubscriptionStatus != NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) &&
4424+
PQ_LOG_D("WriteId " << *writeId <<
4425+
" Partitions.size=" << writeInfo.Partitions.size());
4426+
bool deleted =
43854427
writeInfo.Partitions.empty()
43864428
;
43874429

4388-
PQ_LOG_D("WriteId " << *writeId << " is " << (disabled ? "disabled" : "enabled"));
4389-
4390-
return disabled;
4430+
return deleted;
43914431
}
43924432

43934433
void TPersQueue::DeleteWriteId(const TMaybe<TWriteId>& writeId)
43944434
{
4395-
if (!writeId.Defined()) {
4435+
if (!writeId.Defined() || !TxWrites.contains(*writeId)) {
43964436
return;
43974437
}
43984438

4399-
Y_ABORT_UNLESS(TxWrites.contains(*writeId),
4400-
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
4401-
TabletID(), writeId->NodeId, writeId->KeyId);
4402-
44034439
PQ_LOG_D("delete WriteId " << *writeId);
44044440
TxWrites.erase(*writeId);
44054441
}
@@ -4729,7 +4765,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
47294765
}
47304766
}
47314767

4732-
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx)
4768+
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev)
47334769
{
47344770
PQ_LOG_D("Handle TEvLongTxService::TEvLockStatus " << ev->Get()->Record.ShortDebugString());
47354771

@@ -4750,22 +4786,14 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
47504786
return;
47514787
}
47524788

4753-
if (!writeInfo.TxId.Defined()) {
4754-
PQ_LOG_D("delete write info for WriteId " << writeId);
4755-
// the message TEvProposeTransaction will not come anymore
4756-
BeginDeletePartitions(writeInfo);
4789+
if (writeInfo.TxId.Defined()) {
4790+
// the message `TEvProposeTransaction` has already arrived
4791+
PQ_LOG_D("there is already a transaction TxId " << writeInfo.TxId << " for WriteId " << writeId);
47574792
return;
47584793
}
47594794

4760-
ui64 txId = *writeInfo.TxId;
4761-
PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << txId);
4762-
4763-
auto* tx = GetTransaction(ctx, txId);
4764-
if (!tx ||
4765-
(tx->State == NKikimrPQ::TTransaction::EXECUTED) ||
4766-
(tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS)) {
4767-
BeginDeletePartitions(writeInfo);
4768-
}
4795+
PQ_LOG_D("delete partitions for WriteId " << writeId);
4796+
BeginDeletePartitions(writeInfo);
47694797
}
47704798

47714799
void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
@@ -4865,6 +4893,16 @@ void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
48654893
writeInfo.Deleting = true;
48664894
}
48674895

4896+
void TPersQueue::BeginDeletePartitions(const TDistributedTransaction& tx)
4897+
{
4898+
if (!tx.WriteId.Defined() || !TxWrites.contains(*tx.WriteId)) {
4899+
return;
4900+
}
4901+
4902+
TTxWriteInfo& writeInfo = TxWrites.at(*tx.WriteId);
4903+
BeginDeletePartitions(writeInfo);
4904+
}
4905+
48684906
TString TPersQueue::LogPrefix() const {
48694907
return TStringBuilder() << "[PQ: " << TabletID() << "] ";
48704908
}
@@ -4919,7 +4957,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
49194957
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
49204958
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
49214959
HFuncTraced(TEvPQ::TEvPartitionScaleStatusChanged, Handle);
4922-
HFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
4960+
hFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
49234961
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
49244962
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
49254963
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);

0 commit comments

Comments
 (0)