Skip to content

Commit b698251

Browse files
At the start of a partition, the order of messages may change. (ydb-platform#15160)
1 parent bd3c9d7 commit b698251

File tree

3 files changed

+171
-59
lines changed

3 files changed

+171
-59
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,6 +1075,8 @@ struct TEvPQ {
10751075
}
10761076

10771077
ui32 Cookie; // InternalPartitionId
1078+
TActorId SupportivePartition;
1079+
10781080
NPQ::TSourceIdMap SrcIdInfo;
10791081
std::deque<NPQ::TDataKey> BodyKeys;
10801082
TVector<NPQ::TClientBlob> BlobsFromHead;
@@ -1091,6 +1093,7 @@ struct TEvPQ {
10911093
struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {
10921094
ui32 Cookie; // InternalPartitionId
10931095
TString Message;
1096+
TActorId SupportivePartition;
10941097

10951098
TEvGetWriteInfoError(ui32 cookie, TString message) :
10961099
Cookie(cookie),

ydb/core/persqueue/partition.cpp

Lines changed: 150 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
577577
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());
578578

579579
FillReadFromTimestamps(ctx);
580-
ResendPendingEvents(ctx);
580+
ProcessPendingEvents(ctx);
581581
ProcessTxsAndUserActs(ctx);
582582

583583
ctx.Send(ctx.SelfID, new TEvents::TEvWakeup());
@@ -951,37 +951,69 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
951951
ProcessTxsAndUserActs(ctx);
952952
}
953953

954+
template <class T>
955+
void TPartition::ProcessPendingEvent(TAutoPtr<TEventHandle<T>>& ev, const TActorContext& ctx)
956+
{
957+
if (PendingEvents.empty()) {
958+
// Optimization: if the queue is empty, you can process the message immediately
959+
ProcessPendingEvent(std::unique_ptr<T>(ev->Release().Release()), ctx);
960+
} else {
961+
// We need to keep the order in which the messages arrived
962+
AddPendingEvent(ev);
963+
ProcessPendingEvents(ctx);
964+
}
965+
}
966+
967+
template <>
968+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvProposePartitionConfig> ev, const TActorContext& ctx)
969+
{
970+
PushBackDistrTx(ev.release());
971+
972+
ProcessTxsAndUserActs(ctx);
973+
}
974+
954975
void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx)
955976
{
956977
PQ_LOG_D("Handle TEvPQ::TEvProposePartitionConfig" <<
957978
" Step " << ev->Get()->Step <<
958979
", TxId " << ev->Get()->TxId);
959980

960-
PushBackDistrTx(ev->Release());
981+
ProcessPendingEvent(ev, ctx);
982+
}
961983

962-
ProcessTxsAndUserActs(ctx);
984+
template <class T>
985+
void TPartition::AddPendingEvent(TAutoPtr<TEventHandle<T>>& ev)
986+
{
987+
std::unique_ptr<T> p(ev->Release().Release());
988+
PendingEvents.emplace_back(std::move(p));
963989
}
964990

965991
void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
966992
{
967993
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");
968994

969-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
995+
AddPendingEvent(ev);
970996
}
971997

972998
void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
973999
{
974-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1000+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit");
1001+
1002+
AddPendingEvent(ev);
9751003
}
9761004

9771005
void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&)
9781006
{
979-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1007+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxRollback");
1008+
1009+
AddPendingEvent(ev);
9801010
}
9811011

9821012
void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
9831013
{
984-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1014+
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig");
1015+
1016+
AddPendingEvent(ev);
9851017
}
9861018

9871019
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext&)
@@ -991,7 +1023,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc
9911023
Y_ABORT_UNLESS(IsSupportive());
9921024

9931025
ev->Get()->OriginalPartition = ev->Sender;
994-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1026+
AddPendingEvent(ev);
9951027
}
9961028

9971029
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext&)
@@ -1000,7 +1032,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TA
10001032

10011033
Y_ABORT_UNLESS(!IsSupportive());
10021034

1003-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1035+
AddPendingEvent(ev);
10041036
}
10051037

10061038
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext&)
@@ -1009,43 +1041,46 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActo
10091041

10101042
Y_ABORT_UNLESS(!IsSupportive());
10111043

1012-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1044+
AddPendingEvent(ev);
10131045
}
10141046

1015-
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
1047+
template <>
1048+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCalcPredicate> ev, const TActorContext& ctx)
10161049
{
1017-
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
1018-
" Step " << ev->Get()->Step <<
1019-
", TxId " << ev->Get()->TxId);
1020-
10211050
if (PlanStep.Defined() && TxId.Defined()) {
1022-
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
1051+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10231052
Send(Tablet,
1024-
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
1025-
ev->Get()->TxId,
1053+
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Step,
1054+
ev->TxId,
10261055
Partition,
10271056
Nothing()).Release());
10281057
return;
10291058
}
10301059
}
10311060

1032-
PushBackDistrTx(ev->Release());
1061+
PushBackDistrTx(ev.release());
10331062

10341063
ProcessTxsAndUserActs(ctx);
10351064
}
10361065

1037-
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
1066+
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
10381067
{
1039-
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
1068+
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
10401069
" Step " << ev->Get()->Step <<
10411070
", TxId " << ev->Get()->TxId);
10421071

1072+
ProcessPendingEvent(ev, ctx);
1073+
}
1074+
1075+
template <>
1076+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCommit> ev, const TActorContext& ctx)
1077+
{
10431078
if (PlanStep.Defined() && TxId.Defined()) {
1044-
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
1079+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10451080
PQ_LOG_D("Send TEvTxCommitDone" <<
1046-
" Step " << ev->Get()->Step <<
1047-
", TxId " << ev->Get()->TxId);
1048-
ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release());
1081+
" Step " << ev->Step <<
1082+
", TxId " << ev->TxId);
1083+
ctx.Send(Tablet, MakeCommitDone(ev->Step, ev->TxId).Release());
10491084
return;
10501085
}
10511086
}
@@ -1055,32 +1090,42 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
10551090
Y_ABORT_UNLESS(TransactionsInflight.size() == 1,
10561091
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10571092
TabletID, Partition.OriginalPartitionId,
1058-
ev->Get()->Step, ev->Get()->TxId);
1093+
ev->Step, ev->TxId);
1094+
PendingExplicitMessageGroups = ev->ExplicitMessageGroups;
10591095
} else {
10601096
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
10611097
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10621098
TabletID, Partition.OriginalPartitionId,
1063-
ev->Get()->Step, ev->Get()->TxId);
1064-
txIter = TransactionsInflight.find(ev->Get()->TxId);
1099+
ev->Step, ev->TxId);
1100+
txIter = TransactionsInflight.find(ev->TxId);
10651101
Y_ABORT_UNLESS(!txIter.IsEnd(),
10661102
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10671103
TabletID, Partition.OriginalPartitionId,
1068-
ev->Get()->Step, ev->Get()->TxId);
1104+
ev->Step, ev->TxId);
10691105
}
10701106
Y_ABORT_UNLESS(txIter->second->State == ECommitState::Pending);
10711107

10721108
txIter->second->State = ECommitState::Committed;
10731109
ProcessTxsAndUserActs(ctx);
10741110
}
10751111

1076-
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
1112+
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
1113+
{
1114+
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
1115+
" Step " << ev->Get()->Step <<
1116+
", TxId " << ev->Get()->TxId);
1117+
1118+
ProcessPendingEvent(ev, ctx);
1119+
}
1120+
1121+
template <>
1122+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback> ev, const TActorContext& ctx)
10771123
{
1078-
auto* event = ev->Get();
10791124
if (PlanStep.Defined() && TxId.Defined()) {
1080-
if (GetStepAndTxId(*event) < GetStepAndTxId(*PlanStep, *TxId)) {
1125+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10811126
PQ_LOG_D("Rollback for" <<
1082-
" Step " << ev->Get()->Step <<
1083-
", TxId " << ev->Get()->TxId);
1127+
" Step " << ev->Step <<
1128+
", TxId " << ev->TxId);
10841129
return;
10851130
}
10861131
}
@@ -1094,7 +1139,7 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
10941139
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
10951140
"PQ: %" PRIu64 ", Partition: %" PRIu32,
10961141
TabletID, Partition.OriginalPartitionId);
1097-
txIter = TransactionsInflight.find(ev->Get()->TxId);
1142+
txIter = TransactionsInflight.find(ev->TxId);
10981143
Y_ABORT_UNLESS(!txIter.IsEnd(),
10991144
"PQ: %" PRIu64 ", Partition: %" PRIu32,
11001145
TabletID, Partition.OriginalPartitionId);
@@ -1105,13 +1150,17 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
11051150
ProcessTxsAndUserActs(ctx);
11061151
}
11071152

1108-
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1109-
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
1110-
TActorId originalPartition = ev->Get()->OriginalPartition;
1111-
if (!originalPartition) {
1112-
// original message
1113-
originalPartition = ev->Sender;
1114-
}
1153+
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
1154+
{
1155+
ProcessPendingEvent(ev, ctx);
1156+
}
1157+
1158+
template <>
1159+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoRequest> ev, const TActorContext& ctx)
1160+
{
1161+
TActorId originalPartition = ev->OriginalPartition;
1162+
Y_ABORT_UNLESS(originalPartition != TActorId());
1163+
11151164
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
11161165
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
11171166
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
@@ -1143,6 +1192,14 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
11431192
ctx.Send(originalPartition, response);
11441193
}
11451194

1195+
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1196+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
1197+
1198+
ev->Get()->OriginalPartition = ev->Sender;
1199+
1200+
ProcessPendingEvent(ev, ctx);
1201+
}
1202+
11461203
void TPartition::WriteInfoResponseHandler(
11471204
const TActorId& sender,
11481205
TGetWriteInfoResp&& ev,
@@ -1231,17 +1288,36 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
12311288
return ret;
12321289
}
12331290

1291+
template <>
1292+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoResponse> ev, const TActorContext& ctx)
1293+
{
1294+
const auto sender = ev->SupportivePartition;
1295+
WriteInfoResponseHandler(sender, ev.release(), ctx);
1296+
}
1297+
12341298
void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx) {
12351299
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse");
1236-
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
1300+
1301+
ev->Get()->SupportivePartition = ev->Sender;
1302+
1303+
ProcessPendingEvent(ev, ctx);
12371304
}
12381305

1306+
template <>
1307+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoError> ev, const TActorContext& ctx)
1308+
{
1309+
const auto sender = ev->SupportivePartition;
1310+
WriteInfoResponseHandler(sender, ev.release(), ctx);
1311+
}
12391312

12401313
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
12411314
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
12421315
"Cookie " << ev->Get()->Cookie <<
12431316
", Message " << ev->Get()->Message);
1244-
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
1317+
1318+
ev->Get()->SupportivePartition = ev->Sender;
1319+
1320+
ProcessPendingEvent(ev, ctx);
12451321
}
12461322

12471323
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
@@ -2619,16 +2695,6 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
26192695
TxIdHasChanged = true;
26202696
}
26212697

2622-
void TPartition::ResendPendingEvents(const TActorContext& ctx)
2623-
{
2624-
PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());
2625-
2626-
while (!PendingEvents.empty()) {
2627-
ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
2628-
PendingEvents.pop_front();
2629-
}
2630-
}
2631-
26322698
TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx)
26332699
{
26342700
if (AffectedUsers.size() >= MAX_USERS) {
@@ -3495,14 +3561,17 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
34953561

34963562
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
34973563
{
3564+
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");
3565+
34983566
Y_ABORT_UNLESS(IsSupportive());
34993567

3500-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
3568+
AddPendingEvent(ev);
35013569
}
35023570

3503-
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
3571+
template <>
3572+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition> ev, const TActorContext& ctx)
35043573
{
3505-
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3574+
Y_UNUSED(ev);
35063575

35073576
Y_ABORT_UNLESS(IsSupportive());
35083577
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
@@ -3512,6 +3581,13 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& c
35123581
ProcessTxsAndUserActs(ctx);
35133582
}
35143583

3584+
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
3585+
{
3586+
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3587+
3588+
ProcessPendingEvent(ev, ctx);
3589+
}
3590+
35153591
void TPartition::ScheduleNegativeReplies()
35163592
{
35173593
auto processQueue = [&](std::deque<TUserActionAndTransactionEvent>& queue) {
@@ -3582,6 +3658,23 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac
35823658
MakeHolder<TEvPQ::TEvTransactionCompleted>(writeId).Release());
35833659
}
35843660

3661+
void TPartition::ProcessPendingEvents(const TActorContext& ctx)
3662+
{
3663+
PQ_LOG_D("Process pending events. Count " << PendingEvents.size());
3664+
3665+
while (!PendingEvents.empty()) {
3666+
auto ev = std::move(PendingEvents.front());
3667+
PendingEvents.pop_front();
3668+
3669+
auto visitor = [this, &ctx](auto&& v) {
3670+
using T = std::decay_t<decltype(v)>;
3671+
ProcessPendingEvent(std::forward<T>(v), ctx);
3672+
};
3673+
3674+
std::visit(visitor, std::move(ev));
3675+
}
3676+
}
3677+
35853678
const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
35863679
{
35873680
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);

0 commit comments

Comments
 (0)