Skip to content

Commit 05f6089

Browse files
Merge c94c049 into cc448a1
2 parents cc448a1 + c94c049 commit 05f6089

File tree

5 files changed

+36
-18
lines changed

5 files changed

+36
-18
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -822,7 +822,7 @@ struct TEvPQ {
822822
};
823823

824824
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
825-
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) :
825+
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
826826
Step(step),
827827
TxId(txId),
828828
Partition(partition),
@@ -833,7 +833,7 @@ struct TEvPQ {
833833
ui64 Step;
834834
ui64 TxId;
835835
NPQ::TPartitionId Partition;
836-
bool Predicate = false;
836+
TMaybe<bool> Predicate;
837837
};
838838

839839
struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {

ydb/core/persqueue/partition.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,21 @@ void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const
966966

967967
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
968968
{
969+
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
970+
" Step " << ev->Get()->Step <<
971+
", TxId " << ev->Get()->TxId);
972+
973+
if (PlanStep.Defined() && TxId.Defined()) {
974+
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
975+
Send(Tablet,
976+
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
977+
ev->Get()->TxId,
978+
Partition,
979+
Nothing()).Release());
980+
return;
981+
}
982+
}
983+
969984
PushBackDistrTx(ev->Release());
970985

971986
ProcessTxsAndUserActs(ctx);

ydb/core/persqueue/pq_impl.cpp

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3504,7 +3504,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
35043504
" Step " << event.Step <<
35053505
", TxId " << event.TxId <<
35063506
", Partition " << event.Partition <<
3507-
", Predicate " << (event.Predicate ? "true" : "false"));
3507+
", Predicate " << event.Predicate);
35083508

35093509
auto tx = GetTransaction(ctx, event.TxId);
35103510
if (!tx) {
@@ -4210,9 +4210,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
42104210

42114211
tx.WriteInProgress = false;
42124212

4213-
//
4214-
// запланированные события будут отправлены в EndWriteTxs
4215-
//
4213+
// scheduled events will be sent to EndWriteTxs
42164214

42174215
tx.State = NKikimrPQ::TTransaction::PREPARED;
42184216
PQ_LOG_D("TxId " << tx.TxId <<
@@ -4240,9 +4238,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
42404238

42414239
tx.WriteInProgress = false;
42424240

4243-
//
4244-
// запланированные события будут отправлены в EndWriteTxs
4245-
//
4241+
// scheduled events will be sent to EndWriteTxs
42464242

42474243
tx.State = NKikimrPQ::TTransaction::PLANNED;
42484244
PQ_LOG_D("TxId " << tx.TxId <<
@@ -4272,6 +4268,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
42724268
switch (tx.Kind) {
42734269
case NKikimrPQ::TTransaction::KIND_DATA:
42744270
case NKikimrPQ::TTransaction::KIND_CONFIG:
4271+
WriteTx(tx, NKikimrPQ::TTransaction::CALCULATED);
4272+
42754273
tx.State = NKikimrPQ::TTransaction::CALCULATED;
42764274
PQ_LOG_D("TxId " << tx.TxId <<
42774275
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
@@ -4281,14 +4279,12 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
42814279
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
42824280
Y_ABORT_UNLESS(false);
42834281
}
4284-
} else {
4285-
break;
42864282
}
42874283

4288-
[[fallthrough]];
4284+
break;
42894285

42904286
case NKikimrPQ::TTransaction::CALCULATED:
4291-
Y_ABORT_UNLESS(!tx.WriteInProgress,
4287+
Y_ABORT_UNLESS(tx.WriteInProgress,
42924288
"PQ %" PRIu64 ", TxId %" PRIu64,
42934289
TabletID(), tx.TxId);
42944290

ydb/core/persqueue/transaction.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,13 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred
215215
{
216216
PQ_LOG_D("Handle TEvTxCalcPredicateResult");
217217

218-
OnPartitionResult(event,
219-
event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT);
218+
TMaybe<EDecision> decision;
219+
220+
if (event.Predicate.Defined()) {
221+
decision = *event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT;
222+
}
223+
224+
OnPartitionResult(event, decision);
220225
}
221226

222227
void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event)
@@ -228,14 +233,16 @@ void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvPro
228233
}
229234

230235
template<class E>
231-
void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decision)
236+
void TDistributedTransaction::OnPartitionResult(const E& event, TMaybe<EDecision> decision)
232237
{
233238
Y_ABORT_UNLESS(Step == event.Step);
234239
Y_ABORT_UNLESS(TxId == event.TxId);
235240

236241
Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId));
237242

238-
SetDecision(SelfDecision, decision);
243+
if (decision.Defined()) {
244+
SetDecision(SelfDecision, *decision);
245+
}
239246

240247
++PartitionRepliesCount;
241248

ydb/core/persqueue/transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ struct TDistributedTransaction {
9090
void InitPartitions();
9191

9292
template<class E>
93-
void OnPartitionResult(const E& event, EDecision decision);
93+
void OnPartitionResult(const E& event, TMaybe<EDecision> decision);
9494

9595
TString LogPrefix() const;
9696

0 commit comments

Comments
 (0)