Skip to content

Commit a6a6c96

Browse files
The TEvProposeTransactionResult message was sent earlier by TEvTxCommitDone (#6233)
1 parent 3eca5e8 commit a6a6c96

File tree

4 files changed

+34
-18
lines changed

4 files changed

+34
-18
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -836,13 +836,17 @@ void TPersQueue::ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult&
836836
return;
837837
}
838838

839-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " LastStep " << LastStep << " LastTxId " << LastTxId);
839+
PQ_LOG_D("PlanStep " << PlanStep << ", PlanTxId " << PlanTxId <<
840+
", ExecStep " << ExecStep << ", ExecTxId " << ExecTxId);
840841
}
841842

842843
void TPersQueue::InitPlanStep(const NKikimrPQ::TTabletTxInfo& info)
843844
{
844-
LastStep = info.GetLastStep();
845-
LastTxId = info.GetLastTxId();
845+
PlanStep = info.GetPlanStep();
846+
PlanTxId = info.GetPlanTxId();
847+
848+
ExecStep = info.GetExecStep();
849+
ExecTxId = info.GetExecTxId();
846850
}
847851

848852
void TPersQueue::ReadTxWrites(const NKikimrClient::TKeyValueResponse::TReadResult& read,
@@ -3582,7 +3586,7 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
35823586
txAcks[ActorIdFromProto(tx.GetAckTo())].push_back(tx.GetTxId());
35833587
}
35843588

3585-
if (step >= LastStep) {
3589+
if (step >= PlanStep) {
35863590
ui64 lastPlannedTxId = 0;
35873591

35883592
for (ui64 txId : txIds) {
@@ -3617,13 +3621,15 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
36173621
lastPlannedTxId = txId;
36183622
}
36193623

3620-
LastStep = step;
3621-
LastTxId = lastPlannedTxId;
3624+
PlanStep = step;
3625+
PlanTxId = lastPlannedTxId;
3626+
3627+
PQ_LOG_D("PlanStep " << PlanStep << ", PlanTxId " << PlanTxId);
36223628
} else {
36233629
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
36243630
"Tablet " << TabletID() <<
36253631
" Old plan step " << step <<
3626-
", LastStep: " << LastStep);
3632+
", PlanStep: " << PlanStep);
36273633
}
36283634

36293635
SchedulePlanStepAck(step, txAcks);
@@ -3699,8 +3705,11 @@ void TPersQueue::AddCmdWriteTabletTxInfo(NKikimrClient::TKeyValueRequest& reques
36993705

37003706
void TPersQueue::SavePlanStep(NKikimrPQ::TTabletTxInfo& info)
37013707
{
3702-
info.SetLastStep(LastStep);
3703-
info.SetLastTxId(LastTxId);
3708+
info.SetPlanStep(PlanStep);
3709+
info.SetPlanTxId(PlanTxId);
3710+
3711+
info.SetExecStep(ExecStep);
3712+
info.SetExecTxId(ExecTxId);
37043713
}
37053714

37063715
void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info)
@@ -4032,6 +4041,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
40324041
", (!TxQueue.empty())=" << !TxQueue.empty());
40334042

40344043
if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
4044+
std::tie(ExecStep, ExecTxId) = TxQueue.front();
4045+
PQ_LOG_D("ExecStep " << ExecStep << ", ExecTxId " << ExecTxId);
4046+
40354047
switch (tx.Kind) {
40364048
case NKikimrPQ::TTransaction::KIND_DATA:
40374049
SendEvTxCalcPredicateToPartitions(ctx, tx);
@@ -4111,8 +4123,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
41114123
", tx.HaveParticipantsDecision()=" << tx.HaveParticipantsDecision());
41124124

41134125
if (tx.HaveParticipantsDecision()) {
4114-
SendEvProposeTransactionResult(ctx, tx);
4115-
41164126
if (tx.GetDecision() == NKikimrTx::TReadSetData::DECISION_COMMIT) {
41174127
SendEvTxCommitToPartitions(ctx, tx);
41184128
} else {
@@ -4136,6 +4146,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
41364146
Y_ABORT_UNLESS(!TxQueue.empty());
41374147
Y_ABORT_UNLESS(TxQueue.front().second == tx.TxId);
41384148

4149+
SendEvProposeTransactionResult(ctx, tx);
4150+
41394151
switch (tx.Kind) {
41404152
case NKikimrPQ::TTransaction::KIND_DATA:
41414153
SendEvReadSetAckToSenders(ctx, tx);
@@ -4380,7 +4392,7 @@ void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadR
43804392
Txs.emplace(tx.GetTxId(), tx);
43814393

43824394
if (tx.HasStep()) {
4383-
if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(LastStep, LastTxId)) {
4395+
if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
43844396
plannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
43854397
}
43864398
}
@@ -4420,7 +4432,7 @@ void TPersQueue::OnInitComplete(const TActorContext& ctx)
44204432

44214433
ui64 TPersQueue::GetAllowedStep() const
44224434
{
4423-
return Max(LastStep + 1,
4435+
return Max(PlanStep + 1,
44244436
MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : TAppData::TimeProvider->Now().MilliSeconds());
44254437
}
44264438

ydb/core/persqueue/pq_impl.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
272272
//
273273
THashMap<ui64, TDistributedTransaction> Txs;
274274
TQueue<std::pair<ui64, ui64>> TxQueue;
275-
ui64 LastStep = 0;
276-
ui64 LastTxId = 0;
275+
ui64 PlanStep = 0;
276+
ui64 PlanTxId = 0;
277+
ui64 ExecStep = 0;
278+
ui64 ExecTxId = 0;
277279

278280
TDeque<std::unique_ptr<TEvPersQueue::TEvProposeTransaction>> EvProposeTransactionQueue;
279281
TDeque<std::pair<TActorId, std::unique_ptr<TEvTxProcessing::TEvPlanStep>>> EvPlanStepQueue;

ydb/core/persqueue/transaction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque
324324
Y_ABORT_UNLESS(SourceActor != TActorId());
325325
ActorIdToProto(SourceActor, tx.MutableSourceActor());
326326

327-
PQ_LOG_D("save tx " << tx.DebugString());
327+
PQ_LOG_D("save tx " << tx.ShortDebugString());
328328

329329
TString value;
330330
Y_ABORT_UNLESS(tx.SerializeToString(&value));

ydb/core/protos/pqconfig.proto

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,8 +1126,10 @@ message TTabletTxInfo {
11261126
optional uint32 InternalPartitionId = 3;
11271127
};
11281128

1129-
optional uint64 LastStep = 2;
1130-
optional uint64 LastTxId = 3;
1129+
optional uint64 PlanStep = 2;
1130+
optional uint64 PlanTxId = 3;
11311131
repeated TTxWriteInfo TxWrites = 4;
11321132
optional uint64 NextSupportivePartitionId = 5;
1133+
optional uint64 ExecStep = 6;
1134+
optional uint64 ExecTxId = 7;
11331135
}

0 commit comments

Comments
 (0)