Skip to content

Commit b1889da

Browse files
At the start of a partition, the order of messages may change. (#15160) (#15195)
1 parent d650718 commit b1889da

File tree

3 files changed

+171
-60
lines changed

3 files changed

+171
-60
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,8 @@ struct TEvPQ {
10861086
}
10871087

10881088
ui32 Cookie; // InternalPartitionId
1089+
TActorId SupportivePartition;
1090+
10891091
NPQ::TSourceIdMap SrcIdInfo;
10901092
std::deque<NPQ::TDataKey> BodyKeys;
10911093
TVector<NPQ::TClientBlob> BlobsFromHead;
@@ -1102,6 +1104,7 @@ struct TEvPQ {
11021104
struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {
11031105
ui32 Cookie; // InternalPartitionId
11041106
TString Message;
1107+
TActorId SupportivePartition;
11051108

11061109
TEvGetWriteInfoError(ui32 cookie, TString message) :
11071110
Cookie(cookie),

ydb/core/persqueue/partition.cpp

Lines changed: 150 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
581581
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());
582582

583583
FillReadFromTimestamps(ctx);
584-
ResendPendingEvents(ctx);
584+
ProcessPendingEvents(ctx);
585585
ProcessTxsAndUserActs(ctx);
586586

587587
ctx.Send(ctx.SelfID, new TEvents::TEvWakeup());
@@ -969,37 +969,69 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
969969
ProcessTxsAndUserActs(ctx);
970970
}
971971

972+
template <class T>
973+
void TPartition::ProcessPendingEvent(TAutoPtr<TEventHandle<T>>& ev, const TActorContext& ctx)
974+
{
975+
if (PendingEvents.empty()) {
976+
// Optimization: if the queue is empty, you can process the message immediately
977+
ProcessPendingEvent(std::unique_ptr<T>(ev->Release().Release()), ctx);
978+
} else {
979+
// We need to keep the order in which the messages arrived
980+
AddPendingEvent(ev);
981+
ProcessPendingEvents(ctx);
982+
}
983+
}
984+
985+
template <>
986+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvProposePartitionConfig> ev, const TActorContext& ctx)
987+
{
988+
PushBackDistrTx(ev.release());
989+
990+
ProcessTxsAndUserActs(ctx);
991+
}
992+
972993
void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx)
973994
{
974995
PQ_LOG_D("Handle TEvPQ::TEvProposePartitionConfig" <<
975996
" Step " << ev->Get()->Step <<
976997
", TxId " << ev->Get()->TxId);
977998

978-
PushBackDistrTx(ev->Release());
999+
ProcessPendingEvent(ev, ctx);
1000+
}
9791001

980-
ProcessTxsAndUserActs(ctx);
1002+
template <class T>
1003+
void TPartition::AddPendingEvent(TAutoPtr<TEventHandle<T>>& ev)
1004+
{
1005+
std::unique_ptr<T> p(ev->Release().Release());
1006+
PendingEvents.emplace_back(std::move(p));
9811007
}
9821008

9831009
void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
9841010
{
9851011
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");
9861012

987-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1013+
AddPendingEvent(ev);
9881014
}
9891015

9901016
void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
9911017
{
992-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1018+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit");
1019+
1020+
AddPendingEvent(ev);
9931021
}
9941022

9951023
void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&)
9961024
{
997-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1025+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxRollback");
1026+
1027+
AddPendingEvent(ev);
9981028
}
9991029

10001030
void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
10011031
{
1002-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1032+
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig");
1033+
1034+
AddPendingEvent(ev);
10031035
}
10041036

10051037
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& /* ctx */)
@@ -1009,7 +1041,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc
10091041
Y_ABORT_UNLESS(IsSupportive());
10101042

10111043
ev->Get()->OriginalPartition = ev->Sender;
1012-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1044+
AddPendingEvent(ev);
10131045
}
10141046

10151047
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& /* ctx */)
@@ -1018,7 +1050,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TA
10181050

10191051
Y_ABORT_UNLESS(!IsSupportive());
10201052

1021-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1053+
AddPendingEvent(ev);
10221054
}
10231055

10241056
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& /* ctx */)
@@ -1027,43 +1059,46 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActo
10271059

10281060
Y_ABORT_UNLESS(!IsSupportive());
10291061

1030-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1062+
AddPendingEvent(ev);
10311063
}
10321064

1033-
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
1065+
template <>
1066+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCalcPredicate> ev, const TActorContext& ctx)
10341067
{
1035-
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
1036-
" Step " << ev->Get()->Step <<
1037-
", TxId " << ev->Get()->TxId);
1038-
10391068
if (PlanStep.Defined() && TxId.Defined()) {
1040-
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
1069+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10411070
Send(Tablet,
1042-
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
1043-
ev->Get()->TxId,
1071+
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Step,
1072+
ev->TxId,
10441073
Partition,
10451074
Nothing()).Release());
10461075
return;
10471076
}
10481077
}
10491078

1050-
PushBackDistrTx(ev->Release());
1079+
PushBackDistrTx(ev.release());
10511080

10521081
ProcessTxsAndUserActs(ctx);
10531082
}
10541083

1055-
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
1084+
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
10561085
{
1057-
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
1086+
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
10581087
" Step " << ev->Get()->Step <<
10591088
", TxId " << ev->Get()->TxId);
10601089

1090+
ProcessPendingEvent(ev, ctx);
1091+
}
1092+
1093+
template <>
1094+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCommit> ev, const TActorContext& ctx)
1095+
{
10611096
if (PlanStep.Defined() && TxId.Defined()) {
1062-
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
1097+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10631098
PQ_LOG_D("Send TEvTxCommitDone" <<
1064-
" Step " << ev->Get()->Step <<
1065-
", TxId " << ev->Get()->TxId);
1066-
ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release());
1099+
" Step " << ev->Step <<
1100+
", TxId " << ev->TxId);
1101+
ctx.Send(Tablet, MakeCommitDone(ev->Step, ev->TxId).Release());
10671102
return;
10681103
}
10691104
}
@@ -1073,33 +1108,42 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
10731108
Y_ABORT_UNLESS(TransactionsInflight.size() == 1,
10741109
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10751110
TabletID, Partition.OriginalPartitionId,
1076-
ev->Get()->Step, ev->Get()->TxId);
1077-
PendingExplicitMessageGroups = ev->Get()->ExplicitMessageGroups;
1111+
ev->Step, ev->TxId);
1112+
PendingExplicitMessageGroups = ev->ExplicitMessageGroups;
10781113
} else {
10791114
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
10801115
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10811116
TabletID, Partition.OriginalPartitionId,
1082-
ev->Get()->Step, ev->Get()->TxId);
1083-
txIter = TransactionsInflight.find(ev->Get()->TxId);
1117+
ev->Step, ev->TxId);
1118+
txIter = TransactionsInflight.find(ev->TxId);
10841119
Y_ABORT_UNLESS(!txIter.IsEnd(),
10851120
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10861121
TabletID, Partition.OriginalPartitionId,
1087-
ev->Get()->Step, ev->Get()->TxId);
1122+
ev->Step, ev->TxId);
10881123
}
10891124
Y_ABORT_UNLESS(txIter->second->State == ECommitState::Pending);
10901125

10911126
txIter->second->State = ECommitState::Committed;
10921127
ProcessTxsAndUserActs(ctx);
10931128
}
10941129

1095-
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
1130+
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
1131+
{
1132+
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
1133+
" Step " << ev->Get()->Step <<
1134+
", TxId " << ev->Get()->TxId);
1135+
1136+
ProcessPendingEvent(ev, ctx);
1137+
}
1138+
1139+
template <>
1140+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback> ev, const TActorContext& ctx)
10961141
{
1097-
auto* event = ev->Get();
10981142
if (PlanStep.Defined() && TxId.Defined()) {
1099-
if (GetStepAndTxId(*event) < GetStepAndTxId(*PlanStep, *TxId)) {
1143+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
11001144
PQ_LOG_D("Rollback for" <<
1101-
" Step " << ev->Get()->Step <<
1102-
", TxId " << ev->Get()->TxId);
1145+
" Step " << ev->Step <<
1146+
", TxId " << ev->TxId);
11031147
return;
11041148
}
11051149
}
@@ -1113,7 +1157,7 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
11131157
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
11141158
"PQ: %" PRIu64 ", Partition: %" PRIu32,
11151159
TabletID, Partition.OriginalPartitionId);
1116-
txIter = TransactionsInflight.find(ev->Get()->TxId);
1160+
txIter = TransactionsInflight.find(ev->TxId);
11171161
Y_ABORT_UNLESS(!txIter.IsEnd(),
11181162
"PQ: %" PRIu64 ", Partition: %" PRIu32,
11191163
TabletID, Partition.OriginalPartitionId);
@@ -1124,13 +1168,17 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
11241168
ProcessTxsAndUserActs(ctx);
11251169
}
11261170

1127-
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1128-
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
1129-
TActorId originalPartition = ev->Get()->OriginalPartition;
1130-
if (!originalPartition) {
1131-
// original message
1132-
originalPartition = ev->Sender;
1133-
}
1171+
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
1172+
{
1173+
ProcessPendingEvent(ev, ctx);
1174+
}
1175+
1176+
template <>
1177+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoRequest> ev, const TActorContext& ctx)
1178+
{
1179+
TActorId originalPartition = ev->OriginalPartition;
1180+
Y_ABORT_UNLESS(originalPartition != TActorId());
1181+
11341182
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
11351183
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
11361184
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
@@ -1162,6 +1210,14 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
11621210
ctx.Send(originalPartition, response);
11631211
}
11641212

1213+
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1214+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
1215+
1216+
ev->Get()->OriginalPartition = ev->Sender;
1217+
1218+
ProcessPendingEvent(ev, ctx);
1219+
}
1220+
11651221
void TPartition::WriteInfoResponseHandler(
11661222
const TActorId& sender,
11671223
TGetWriteInfoResp&& ev,
@@ -1246,17 +1302,36 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
12461302
return ret;
12471303
}
12481304

1305+
template <>
1306+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoResponse> ev, const TActorContext& ctx)
1307+
{
1308+
const auto sender = ev->SupportivePartition;
1309+
WriteInfoResponseHandler(sender, ev.release(), ctx);
1310+
}
1311+
12491312
void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx) {
12501313
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse");
1251-
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
1314+
1315+
ev->Get()->SupportivePartition = ev->Sender;
1316+
1317+
ProcessPendingEvent(ev, ctx);
12521318
}
12531319

1320+
template <>
1321+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoError> ev, const TActorContext& ctx)
1322+
{
1323+
const auto sender = ev->SupportivePartition;
1324+
WriteInfoResponseHandler(sender, ev.release(), ctx);
1325+
}
12541326

12551327
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
12561328
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
12571329
"Cookie " << ev->Get()->Cookie <<
12581330
", Message " << ev->Get()->Message);
1259-
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
1331+
1332+
ev->Get()->SupportivePartition = ev->Sender;
1333+
1334+
ProcessPendingEvent(ev, ctx);
12601335
}
12611336

12621337
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
@@ -2692,16 +2767,6 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
26922767
TxIdHasChanged = true;
26932768
}
26942769

2695-
void TPartition::ResendPendingEvents(const TActorContext& ctx)
2696-
{
2697-
PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());
2698-
2699-
while (!PendingEvents.empty()) {
2700-
ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
2701-
PendingEvents.pop_front();
2702-
}
2703-
}
2704-
27052770
TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx)
27062771
{
27072772
if (AffectedUsers.size() >= MAX_USERS) {
@@ -3554,14 +3619,17 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
35543619

35553620
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
35563621
{
3622+
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");
3623+
35573624
Y_ABORT_UNLESS(IsSupportive());
35583625

3559-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
3626+
AddPendingEvent(ev);
35603627
}
35613628

3562-
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
3629+
template <>
3630+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition> ev, const TActorContext& ctx)
35633631
{
3564-
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3632+
Y_UNUSED(ev);
35653633

35663634
Y_ABORT_UNLESS(IsSupportive());
35673635
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
@@ -3571,6 +3639,13 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& c
35713639
ProcessTxsAndUserActs(ctx);
35723640
}
35733641

3642+
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
3643+
{
3644+
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3645+
3646+
ProcessPendingEvent(ev, ctx);
3647+
}
3648+
35743649
void TPartition::ScheduleNegativeReplies()
35753650
{
35763651
auto processQueue = [&](std::deque<TUserActionAndTransactionEvent>& queue) {
@@ -3641,6 +3716,23 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac
36413716
MakeHolder<TEvPQ::TEvTransactionCompleted>(writeId).Release());
36423717
}
36433718

3719+
void TPartition::ProcessPendingEvents(const TActorContext& ctx)
3720+
{
3721+
PQ_LOG_D("Process pending events. Count " << PendingEvents.size());
3722+
3723+
while (!PendingEvents.empty()) {
3724+
auto ev = std::move(PendingEvents.front());
3725+
PendingEvents.pop_front();
3726+
3727+
auto visitor = [this, &ctx](auto&& v) {
3728+
using T = std::decay_t<decltype(v)>;
3729+
ProcessPendingEvent(std::forward<T>(v), ctx);
3730+
};
3731+
3732+
std::visit(visitor, std::move(ev));
3733+
}
3734+
}
3735+
36443736
const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
36453737
{
36463738
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);

0 commit comments

Comments
 (0)