Skip to content

Commit 03395eb

Browse files
The race between TEvProposeTransaction and TEvLockStatus (#8517)
1 parent 8de51e6 commit 03395eb

File tree

6 files changed

+109
-63
lines changed

6 files changed

+109
-63
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2173,7 +2173,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
21732173

21742174
using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction*>;
21752175
using TEvWriteTxs = THashMap<ui64, NKikimrDataEvents::TEvWrite*>;
2176-
using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>;
2176+
using TTopicTabletTxs = NTopic::TTopicOperationTransactions;
21772177

21782178
void ContinueExecute() {
21792179
if (Stats) {
@@ -2430,10 +2430,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24302430
}
24312431
}
24322432

2433-
for (auto& [_, tx] : topicTxs) {
2434-
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
2435-
*tx.MutableSendingShards() = sendingShards;
2436-
*tx.MutableReceivingShards() = receivingShards;
2433+
for (auto& [_, t] : topicTxs) {
2434+
t.tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
2435+
*t.tx.MutableSendingShards() = sendingShards;
2436+
*t.tx.MutableReceivingShards() = receivingShards;
24372437
YQL_ENSURE(!arbiter);
24382438
}
24392439
}
@@ -2595,13 +2595,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
25952595
writeId = Request.TopicOperations.GetWriteId();
25962596
}
25972597

2598-
for (auto& tx : topicTxs) {
2599-
auto tabletId = tx.first;
2600-
auto& transaction = tx.second;
2598+
for (auto& [tabletId, t] : topicTxs) {
2599+
auto& transaction = t.tx;
26012600

26022601
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
26032602

2604-
if (writeId.Defined()) {
2603+
if (t.hasWrite && writeId.Defined()) {
26052604
auto* w = transaction.MutableWriteId();
26062605
w->SetNodeId(SelfId().NodeId());
26072606
w->SetKeyId(*writeId);

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,15 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
105105
HasWriteOperations_ = true;
106106
}
107107

108-
void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
108+
void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
109109
{
110110
Y_ABORT_UNLESS(TabletId_.Defined());
111111
Y_ABORT_UNLESS(Partition_.Defined());
112112

113-
auto& tx = txs[*TabletId_];
113+
auto& t = txs[*TabletId_];
114114

115115
for (auto& [consumer, operations] : Operations_) {
116-
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
116+
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
117117
o->SetPartitionId(*Partition_);
118118
auto [begin, end] = operations.GetRange();
119119
o->SetBegin(begin);
@@ -123,12 +123,13 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
123123
}
124124

125125
if (HasWriteOperations_) {
126-
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
126+
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
127127
o->SetPartitionId(*Partition_);
128128
o->SetPath(*Topic_);
129129
if (SupportivePartition_.Defined()) {
130130
o->SetSupportivePartition(*SupportivePartition_);
131131
}
132+
t.hasWrite = true;
132133
}
133134
}
134135

@@ -355,7 +356,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
355356
return true;
356357
}
357358

358-
void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
359+
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
359360
{
360361
for (auto& [_, operations] : Operations_) {
361362
operations.BuildTopicTxs(txs);

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ class TConsumerOperations {
4242
TDisjointIntervalTree<ui64> Offsets_;
4343
};
4444

45+
struct TTopicOperationTransaction {
46+
NKikimrPQ::TDataTransaction tx;
47+
bool hasWrite = false;
48+
};
49+
50+
using TTopicOperationTransactions = THashMap<ui64, TTopicOperationTransaction>;
51+
4552
class TTopicPartitionOperations {
4653
public:
4754
bool IsValid() const;
@@ -52,7 +59,7 @@ class TTopicPartitionOperations {
5259
void AddOperation(const TString& topic, ui32 partition,
5360
TMaybe<ui32> supportivePartition);
5461

55-
void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
62+
void BuildTopicTxs(TTopicOperationTransactions &txs);
5663

5764
void Merge(const TTopicPartitionOperations& rhs);
5865

@@ -109,7 +116,7 @@ class TTopicOperations {
109116
Ydb::StatusIds_StatusCode& status,
110117
TString& message);
111118

112-
void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
119+
void BuildTopicTxs(TTopicOperationTransactions &txs);
113120

114121
void Merge(const TTopicOperations& rhs);
115122

ydb/core/persqueue/pq_impl.cpp

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,16 +1041,21 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,
10411041
for (size_t i = 0; i != info.TxWritesSize(); ++i) {
10421042
auto& txWrite = info.GetTxWrites(i);
10431043
const TWriteId writeId = GetWriteId(txWrite);
1044-
ui32 partitionId = txWrite.GetOriginalPartitionId();
1045-
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());
10461044

1047-
TxWrites[writeId].Partitions.emplace(partitionId, shadowPartitionId);
1045+
TTxWriteInfo& writeInfo = TxWrites[writeId];
1046+
if (txWrite.HasOriginalPartitionId()) {
1047+
ui32 partitionId = txWrite.GetOriginalPartitionId();
1048+
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());
10481049

1049-
AddSupportivePartition(shadowPartitionId);
1050-
CreateSupportivePartitionActor(shadowPartitionId, ctx);
1051-
SubscribeWriteId(writeId, ctx);
1050+
writeInfo.Partitions.emplace(partitionId, shadowPartitionId);
1051+
1052+
AddSupportivePartition(shadowPartitionId);
1053+
CreateSupportivePartitionActor(shadowPartitionId, ctx);
1054+
1055+
NextSupportivePartitionId = Max(NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1);
1056+
}
10521057

1053-
NextSupportivePartitionId = Max(NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1);
1058+
SubscribeWriteId(writeId, ctx);
10541059
}
10551060

10561061
NewSupportivePartitions.clear();
@@ -3281,7 +3286,7 @@ bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& ope
32813286
TPartitionId partitionId(operation.GetPartitionId(),
32823287
writeId,
32833288
operation.GetSupportivePartition());
3284-
PQ_LOG_D("partitionId=" << partitionId);
3289+
PQ_LOG_D("PartitionId " << partitionId << " for WriteId " << writeId);
32853290
return Partitions.contains(partitionId);
32863291
}
32873292

@@ -3292,7 +3297,6 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
32923297
}
32933298

32943299
const TWriteId writeId = GetWriteId(txBody);
3295-
PQ_LOG_D("writeId=" << writeId);
32963300

32973301
for (auto& operation : txBody.GetOperations()) {
32983302
auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
@@ -3318,7 +3322,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33183322
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
33193323

33203324
if (TabletState != NKikimrPQ::ENormal) {
3321-
PQ_LOG_D("invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
3325+
PQ_LOG_D("TxId " << event.GetTxId() << " invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
33223326
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
33233327
event.GetTxId(),
33243328
NKikimrPQ::TError::ERROR,
@@ -3332,7 +3336,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33323336
//
33333337

33343338
if (txBody.OperationsSize() <= 0) {
3335-
PQ_LOG_D("empty list of operations");
3339+
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
33363340
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
33373341
event.GetTxId(),
33383342
NKikimrPQ::TError::BAD_REQUEST,
@@ -3342,7 +3346,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33423346
}
33433347

33443348
if (!CheckTxWriteOperations(txBody)) {
3345-
PQ_LOG_D("invalid WriteId " << txBody.GetWriteId());
3349+
PQ_LOG_D("TxId " << event.GetTxId() << " invalid WriteId " << txBody.GetWriteId());
33463350
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
33473351
event.GetTxId(),
33483352
NKikimrPQ::TError::BAD_REQUEST,
@@ -3351,9 +3355,36 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33513355
return;
33523356
}
33533357

3358+
if (txBody.HasWriteId()) {
3359+
const TWriteId writeId = GetWriteId(txBody);
3360+
if (!TxWrites.contains(writeId)) {
3361+
PQ_LOG_D("TxId " << event.GetTxId() << " unknown WriteId " << writeId);
3362+
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
3363+
event.GetTxId(),
3364+
NKikimrPQ::TError::BAD_REQUEST,
3365+
"unknown WriteId",
3366+
ctx);
3367+
return;
3368+
}
3369+
3370+
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
3371+
if (writeInfo.Deleting) {
3372+
PQ_LOG_W("TxId " << event.GetTxId() << " WriteId " << writeId << " will be deleted");
3373+
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
3374+
event.GetTxId(),
3375+
NKikimrPQ::TError::BAD_REQUEST,
3376+
"WriteId will be deleted",
3377+
ctx);
3378+
return;
3379+
}
3380+
3381+
writeInfo.TxId = event.GetTxId();
3382+
PQ_LOG_D("TxId " << event.GetTxId() << " has WriteId " << writeId);
3383+
}
3384+
33543385
TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
33553386
if (!partitionId.Defined()) {
3356-
PQ_LOG_D("unknown partition for WriteId " << txBody.GetWriteId());
3387+
PQ_LOG_W("TxId " << event.GetTxId() << " unknown partition for WriteId " << txBody.GetWriteId());
33573388
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
33583389
event.GetTxId(),
33593390
NKikimrPQ::TError::INTERNAL,
@@ -3566,13 +3597,15 @@ bool TPersQueue::CanProcessTxWrites() const
35663597
void TPersQueue::SubscribeWriteId(const TWriteId& writeId,
35673598
const TActorContext& ctx)
35683599
{
3600+
PQ_LOG_D("send TEvSubscribeLock for WriteId " << writeId);
35693601
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
35703602
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId.KeyId, writeId.NodeId));
35713603
}
35723604

35733605
void TPersQueue::UnsubscribeWriteId(const TWriteId& writeId,
35743606
const TActorContext& ctx)
35753607
{
3608+
PQ_LOG_D("send TEvUnsubscribeLock for WriteId " << writeId);
35763609
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
35773610
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId.KeyId, writeId.NodeId));
35783611
}
@@ -3874,11 +3907,16 @@ void TPersQueue::SavePlanStep(NKikimrPQ::TTabletTxInfo& info)
38743907
void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info)
38753908
{
38763909
for (auto& [writeId, write] : TxWrites) {
3877-
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
3910+
if (write.Partitions.empty()) {
38783911
auto* txWrite = info.MutableTxWrites()->Add();
38793912
SetWriteId(*txWrite, writeId);
3880-
txWrite->SetOriginalPartitionId(partitionId);
3881-
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
3913+
} else {
3914+
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
3915+
auto* txWrite = info.MutableTxWrites()->Add();
3916+
SetWriteId(*txWrite, writeId);
3917+
txWrite->SetOriginalPartitionId(partitionId);
3918+
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
3919+
}
38823920
}
38833921
}
38843922

@@ -4323,6 +4361,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43234361

43244362
WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED);
43254363

4364+
PQ_LOG_D("delete partitions for TxId " << tx.TxId);
4365+
BeginDeletePartitions(tx);
4366+
43264367
tx.State = NKikimrPQ::TTransaction::EXECUTED;
43274368
PQ_LOG_D("TxId " << tx.TxId <<
43284369
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
@@ -4341,8 +4382,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43414382

43424383
case NKikimrPQ::TTransaction::WAIT_RS_ACKS:
43434384
PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive() <<
4344-
", WriteIdIsDisabled " << WriteIdIsDisabled(tx.WriteId));
4345-
if (tx.HaveAllRecipientsReceive() && WriteIdIsDisabled(tx.WriteId)) {
4385+
", AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted(tx.WriteId));
4386+
if (tx.HaveAllRecipientsReceive() && AllSupportivePartitionsHaveBeenDeleted(tx.WriteId)) {
43464387
DeleteTx(tx);
43474388
// implicitly switch to the state DELETING
43484389
}
@@ -4367,7 +4408,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43674408
}
43684409
}
43694410

4370-
bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
4411+
bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const
43714412
{
43724413
if (!writeId.Defined()) {
43734414
return true;
@@ -4378,26 +4419,21 @@ bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
43784419
TabletID(), writeId->NodeId, writeId->KeyId);
43794420
const TTxWriteInfo& writeInfo = TxWrites.at(*writeId);
43804421

4381-
bool disabled =
4382-
(writeInfo.LongTxSubscriptionStatus != NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) &&
4422+
PQ_LOG_D("WriteId " << *writeId <<
4423+
" Partitions.size=" << writeInfo.Partitions.size());
4424+
bool deleted =
43834425
writeInfo.Partitions.empty()
43844426
;
43854427

4386-
PQ_LOG_D("WriteId " << *writeId << " is " << (disabled ? "disabled" : "enabled"));
4387-
4388-
return disabled;
4428+
return deleted;
43894429
}
43904430

43914431
void TPersQueue::DeleteWriteId(const TMaybe<TWriteId>& writeId)
43924432
{
4393-
if (!writeId.Defined()) {
4433+
if (!writeId.Defined() || !TxWrites.contains(*writeId)) {
43944434
return;
43954435
}
43964436

4397-
Y_ABORT_UNLESS(TxWrites.contains(*writeId),
4398-
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
4399-
TabletID(), writeId->NodeId, writeId->KeyId);
4400-
44014437
PQ_LOG_D("delete WriteId " << *writeId);
44024438
TxWrites.erase(*writeId);
44034439
}
@@ -4727,7 +4763,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
47274763
}
47284764
}
47294765

4730-
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx)
4766+
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev)
47314767
{
47324768
PQ_LOG_D("Handle TEvLongTxService::TEvLockStatus " << ev->Get()->Record.ShortDebugString());
47334769

@@ -4748,22 +4784,14 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
47484784
return;
47494785
}
47504786

4751-
if (!writeInfo.TxId.Defined()) {
4752-
PQ_LOG_D("delete write info for WriteId " << writeId);
4753-
// the message TEvProposeTransaction will not come anymore
4754-
BeginDeletePartitions(writeInfo);
4787+
if (writeInfo.TxId.Defined()) {
4788+
// the message `TEvProposeTransaction` has already arrived
4789+
PQ_LOG_D("there is already a transaction TxId " << writeInfo.TxId << " for WriteId " << writeId);
47554790
return;
47564791
}
47574792

4758-
ui64 txId = *writeInfo.TxId;
4759-
PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << txId);
4760-
4761-
auto* tx = GetTransaction(ctx, txId);
4762-
if (!tx ||
4763-
(tx->State == NKikimrPQ::TTransaction::EXECUTED) ||
4764-
(tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS)) {
4765-
BeginDeletePartitions(writeInfo);
4766-
}
4793+
PQ_LOG_D("delete partitions for WriteId " << writeId);
4794+
BeginDeletePartitions(writeInfo);
47674795
}
47684796

47694797
void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
@@ -4863,6 +4891,16 @@ void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
48634891
writeInfo.Deleting = true;
48644892
}
48654893

4894+
void TPersQueue::BeginDeletePartitions(const TDistributedTransaction& tx)
4895+
{
4896+
if (!tx.WriteId.Defined() || !TxWrites.contains(*tx.WriteId)) {
4897+
return;
4898+
}
4899+
4900+
TTxWriteInfo& writeInfo = TxWrites.at(*tx.WriteId);
4901+
BeginDeletePartitions(writeInfo);
4902+
}
4903+
48664904
TString TPersQueue::LogPrefix() const {
48674905
return TStringBuilder() << "[PQ: " << TabletID() << "] ";
48684906
}
@@ -4917,7 +4955,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
49174955
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
49184956
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
49194957
HFuncTraced(TEvPQ::TEvPartitionScaleStatusChanged, Handle);
4920-
HFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
4958+
hFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
49214959
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
49224960
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
49234961
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);

0 commit comments

Comments
 (0)