Skip to content

Commit 25a5416

Browse files
the TEvTxCalcPredicate message for the completed transaction (#8809) (#9225)
1 parent 3033104 commit 25a5416

File tree

5 files changed

+36
-18
lines changed

5 files changed

+36
-18
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -822,7 +822,7 @@ struct TEvPQ {
822822
};
823823

824824
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
825-
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) :
825+
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
826826
Step(step),
827827
TxId(txId),
828828
Partition(partition),
@@ -833,7 +833,7 @@ struct TEvPQ {
833833
ui64 Step;
834834
ui64 TxId;
835835
NPQ::TPartitionId Partition;
836-
bool Predicate = false;
836+
TMaybe<bool> Predicate;
837837
};
838838

839839
struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {

ydb/core/persqueue/partition.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,21 @@ void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const
966966

967967
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
968968
{
969+
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
970+
" Step " << ev->Get()->Step <<
971+
", TxId " << ev->Get()->TxId);
972+
973+
if (PlanStep.Defined() && TxId.Defined()) {
974+
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
975+
Send(Tablet,
976+
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
977+
ev->Get()->TxId,
978+
Partition,
979+
Nothing()).Release());
980+
return;
981+
}
982+
}
983+
969984
PushBackDistrTx(ev->Release());
970985

971986
ProcessTxsAndUserActs(ctx);

ydb/core/persqueue/pq_impl.cpp

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3506,7 +3506,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
35063506
" Step " << event.Step <<
35073507
", TxId " << event.TxId <<
35083508
", Partition " << event.Partition <<
3509-
", Predicate " << (event.Predicate ? "true" : "false"));
3509+
", Predicate " << event.Predicate);
35103510

35113511
auto tx = GetTransaction(ctx, event.TxId);
35123512
if (!tx) {
@@ -4212,9 +4212,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
42124212

42134213
tx.WriteInProgress = false;
42144214

4215-
//
4216-
// запланированные события будут отправлены в EndWriteTxs
4217-
//
4215+
// scheduled events will be sent to EndWriteTxs
42184216

42194217
tx.State = NKikimrPQ::TTransaction::PREPARED;
42204218
PQ_LOG_D("TxId " << tx.TxId <<
@@ -4242,9 +4240,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
42424240

42434241
tx.WriteInProgress = false;
42444242

4245-
//
4246-
// запланированные события будут отправлены в EndWriteTxs
4247-
//
4243+
// scheduled events will be sent to EndWriteTxs
42484244

42494245
tx.State = NKikimrPQ::TTransaction::PLANNED;
42504246
PQ_LOG_D("TxId " << tx.TxId <<
@@ -4274,6 +4270,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
42744270
switch (tx.Kind) {
42754271
case NKikimrPQ::TTransaction::KIND_DATA:
42764272
case NKikimrPQ::TTransaction::KIND_CONFIG:
4273+
WriteTx(tx, NKikimrPQ::TTransaction::CALCULATED);
4274+
42774275
tx.State = NKikimrPQ::TTransaction::CALCULATED;
42784276
PQ_LOG_D("TxId " << tx.TxId <<
42794277
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
@@ -4283,14 +4281,12 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
42834281
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
42844282
Y_ABORT_UNLESS(false);
42854283
}
4286-
} else {
4287-
break;
42884284
}
42894285

4290-
[[fallthrough]];
4286+
break;
42914287

42924288
case NKikimrPQ::TTransaction::CALCULATED:
4293-
Y_ABORT_UNLESS(!tx.WriteInProgress,
4289+
Y_ABORT_UNLESS(tx.WriteInProgress,
42944290
"PQ %" PRIu64 ", TxId %" PRIu64,
42954291
TabletID(), tx.TxId);
42964292

ydb/core/persqueue/transaction.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,13 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred
215215
{
216216
PQ_LOG_D("Handle TEvTxCalcPredicateResult");
217217

218-
OnPartitionResult(event,
219-
event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT);
218+
TMaybe<EDecision> decision;
219+
220+
if (event.Predicate.Defined()) {
221+
decision = *event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT;
222+
}
223+
224+
OnPartitionResult(event, decision);
220225
}
221226

222227
void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event)
@@ -228,14 +233,16 @@ void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvPro
228233
}
229234

230235
template<class E>
231-
void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decision)
236+
void TDistributedTransaction::OnPartitionResult(const E& event, TMaybe<EDecision> decision)
232237
{
233238
Y_ABORT_UNLESS(Step == event.Step);
234239
Y_ABORT_UNLESS(TxId == event.TxId);
235240

236241
Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId));
237242

238-
SetDecision(SelfDecision, decision);
243+
if (decision.Defined()) {
244+
SetDecision(SelfDecision, *decision);
245+
}
239246

240247
++PartitionRepliesCount;
241248

ydb/core/persqueue/transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ struct TDistributedTransaction {
9090
void InitPartitions();
9191

9292
template<class E>
93-
void OnPartitionResult(const E& event, EDecision decision);
93+
void OnPartitionResult(const E& event, TMaybe<EDecision> decision);
9494

9595
TString LogPrefix() const;
9696

0 commit comments

Comments
 (0)