Skip to content

Commit fd35849

Browse files
Alek5andr-Kotov authored and kunga committed
At the start of a partition, the order of messages may change. (ydb-platform#15160) (ydb-platform#15194)
1 parent 6decbfb commit fd35849

File tree

3 files changed

+170
-59
lines changed

3 files changed

+170
-59
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1075,6 +1075,8 @@ struct TEvPQ {
10751075
}
10761076

10771077
ui32 Cookie; // InternalPartitionId
1078+
TActorId SupportivePartition;
1079+
10781080
NPQ::TSourceIdMap SrcIdInfo;
10791081
std::deque<NPQ::TDataKey> BodyKeys;
10801082
TVector<NPQ::TClientBlob> BlobsFromHead;
@@ -1091,6 +1093,7 @@ struct TEvPQ {
10911093
struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {
10921094
ui32 Cookie; // InternalPartitionId
10931095
TString Message;
1096+
TActorId SupportivePartition;
10941097

10951098
TEvGetWriteInfoError(ui32 cookie, TString message) :
10961099
Cookie(cookie),

ydb/core/persqueue/partition.cpp

Lines changed: 149 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -577,7 +577,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
577577
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());
578578

579579
FillReadFromTimestamps(ctx);
580-
ResendPendingEvents(ctx);
580+
ProcessPendingEvents(ctx);
581581
ProcessTxsAndUserActs(ctx);
582582

583583
ctx.Send(ctx.SelfID, new TEvents::TEvWakeup());
@@ -951,37 +951,69 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
951951
ProcessTxsAndUserActs(ctx);
952952
}
953953

954+
template <class T>
955+
void TPartition::ProcessPendingEvent(TAutoPtr<TEventHandle<T>>& ev, const TActorContext& ctx)
956+
{
957+
if (PendingEvents.empty()) {
958+
// Optimization: if the queue is empty, you can process the message immediately
959+
ProcessPendingEvent(std::unique_ptr<T>(ev->Release().Release()), ctx);
960+
} else {
961+
// We need to keep the order in which the messages arrived
962+
AddPendingEvent(ev);
963+
ProcessPendingEvents(ctx);
964+
}
965+
}
966+
967+
template <>
968+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvProposePartitionConfig> ev, const TActorContext& ctx)
969+
{
970+
PushBackDistrTx(ev.release());
971+
972+
ProcessTxsAndUserActs(ctx);
973+
}
974+
954975
void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx)
955976
{
956977
PQ_LOG_D("Handle TEvPQ::TEvProposePartitionConfig" <<
957978
" Step " << ev->Get()->Step <<
958979
", TxId " << ev->Get()->TxId);
959980

960-
PushBackDistrTx(ev->Release());
981+
ProcessPendingEvent(ev, ctx);
982+
}
961983

962-
ProcessTxsAndUserActs(ctx);
984+
template <class T>
985+
void TPartition::AddPendingEvent(TAutoPtr<TEventHandle<T>>& ev)
986+
{
987+
std::unique_ptr<T> p(ev->Release().Release());
988+
PendingEvents.emplace_back(std::move(p));
963989
}
964990

965991
void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
966992
{
967993
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");
968994

969-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
995+
AddPendingEvent(ev);
970996
}
971997

972998
void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
973999
{
974-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1000+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit");
1001+
1002+
AddPendingEvent(ev);
9751003
}
9761004

9771005
void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&)
9781006
{
979-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1007+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxRollback");
1008+
1009+
AddPendingEvent(ev);
9801010
}
9811011

9821012
void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
9831013
{
984-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1014+
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig");
1015+
1016+
AddPendingEvent(ev);
9851017
}
9861018

9871019
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext&)
@@ -991,7 +1023,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc
9911023
Y_ABORT_UNLESS(IsSupportive());
9921024

9931025
ev->Get()->OriginalPartition = ev->Sender;
994-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1026+
AddPendingEvent(ev);
9951027
}
9961028

9971029
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext&)
@@ -1000,7 +1032,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TA
10001032

10011033
Y_ABORT_UNLESS(!IsSupportive());
10021034

1003-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1035+
AddPendingEvent(ev);
10041036
}
10051037

10061038
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext&)
@@ -1009,43 +1041,46 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActo
10091041

10101042
Y_ABORT_UNLESS(!IsSupportive());
10111043

1012-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1044+
AddPendingEvent(ev);
10131045
}
10141046

1015-
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
1047+
template <>
1048+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCalcPredicate> ev, const TActorContext& ctx)
10161049
{
1017-
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
1018-
" Step " << ev->Get()->Step <<
1019-
", TxId " << ev->Get()->TxId);
1020-
10211050
if (PlanStep.Defined() && TxId.Defined()) {
1022-
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
1051+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10231052
Send(Tablet,
1024-
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
1025-
ev->Get()->TxId,
1053+
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Step,
1054+
ev->TxId,
10261055
Partition,
10271056
Nothing()).Release());
10281057
return;
10291058
}
10301059
}
10311060

1032-
PushBackDistrTx(ev->Release());
1061+
PushBackDistrTx(ev.release());
10331062

10341063
ProcessTxsAndUserActs(ctx);
10351064
}
10361065

1037-
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
1066+
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
10381067
{
1039-
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
1068+
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
10401069
" Step " << ev->Get()->Step <<
10411070
", TxId " << ev->Get()->TxId);
10421071

1072+
ProcessPendingEvent(ev, ctx);
1073+
}
1074+
1075+
template <>
1076+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCommit> ev, const TActorContext& ctx)
1077+
{
10431078
if (PlanStep.Defined() && TxId.Defined()) {
1044-
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
1079+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10451080
PQ_LOG_D("Send TEvTxCommitDone" <<
1046-
" Step " << ev->Get()->Step <<
1047-
", TxId " << ev->Get()->TxId);
1048-
ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release());
1081+
" Step " << ev->Step <<
1082+
", TxId " << ev->TxId);
1083+
ctx.Send(Tablet, MakeCommitDone(ev->Step, ev->TxId).Release());
10491084
return;
10501085
}
10511086
}
@@ -1055,32 +1090,41 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
10551090
Y_ABORT_UNLESS(TransactionsInflight.size() == 1,
10561091
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10571092
TabletID, Partition.OriginalPartitionId,
1058-
ev->Get()->Step, ev->Get()->TxId);
1093+
ev->Step, ev->TxId);
10591094
} else {
10601095
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
10611096
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10621097
TabletID, Partition.OriginalPartitionId,
1063-
ev->Get()->Step, ev->Get()->TxId);
1064-
txIter = TransactionsInflight.find(ev->Get()->TxId);
1098+
ev->Step, ev->TxId);
1099+
txIter = TransactionsInflight.find(ev->TxId);
10651100
Y_ABORT_UNLESS(!txIter.IsEnd(),
10661101
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10671102
TabletID, Partition.OriginalPartitionId,
1068-
ev->Get()->Step, ev->Get()->TxId);
1103+
ev->Step, ev->TxId);
10691104
}
10701105
Y_ABORT_UNLESS(txIter->second->State == ECommitState::Pending);
10711106

10721107
txIter->second->State = ECommitState::Committed;
10731108
ProcessTxsAndUserActs(ctx);
10741109
}
10751110

1076-
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
1111+
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
1112+
{
1113+
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
1114+
" Step " << ev->Get()->Step <<
1115+
", TxId " << ev->Get()->TxId);
1116+
1117+
ProcessPendingEvent(ev, ctx);
1118+
}
1119+
1120+
template <>
1121+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback> ev, const TActorContext& ctx)
10771122
{
1078-
auto* event = ev->Get();
10791123
if (PlanStep.Defined() && TxId.Defined()) {
1080-
if (GetStepAndTxId(*event) < GetStepAndTxId(*PlanStep, *TxId)) {
1124+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10811125
PQ_LOG_D("Rollback for" <<
1082-
" Step " << ev->Get()->Step <<
1083-
", TxId " << ev->Get()->TxId);
1126+
" Step " << ev->Step <<
1127+
", TxId " << ev->TxId);
10841128
return;
10851129
}
10861130
}
@@ -1094,7 +1138,7 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
10941138
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
10951139
"PQ: %" PRIu64 ", Partition: %" PRIu32,
10961140
TabletID, Partition.OriginalPartitionId);
1097-
txIter = TransactionsInflight.find(ev->Get()->TxId);
1141+
txIter = TransactionsInflight.find(ev->TxId);
10981142
Y_ABORT_UNLESS(!txIter.IsEnd(),
10991143
"PQ: %" PRIu64 ", Partition: %" PRIu32,
11001144
TabletID, Partition.OriginalPartitionId);
@@ -1105,13 +1149,17 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
11051149
ProcessTxsAndUserActs(ctx);
11061150
}
11071151

1108-
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1109-
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
1110-
TActorId originalPartition = ev->Get()->OriginalPartition;
1111-
if (!originalPartition) {
1112-
// original message
1113-
originalPartition = ev->Sender;
1114-
}
1152+
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
1153+
{
1154+
ProcessPendingEvent(ev, ctx);
1155+
}
1156+
1157+
template <>
1158+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoRequest> ev, const TActorContext& ctx)
1159+
{
1160+
TActorId originalPartition = ev->OriginalPartition;
1161+
Y_ABORT_UNLESS(originalPartition != TActorId());
1162+
11151163
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
11161164
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
11171165
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
@@ -1143,6 +1191,14 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
11431191
ctx.Send(originalPartition, response);
11441192
}
11451193

1194+
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1195+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
1196+
1197+
ev->Get()->OriginalPartition = ev->Sender;
1198+
1199+
ProcessPendingEvent(ev, ctx);
1200+
}
1201+
11461202
void TPartition::WriteInfoResponseHandler(
11471203
const TActorId& sender,
11481204
TGetWriteInfoResp&& ev,
@@ -1231,17 +1287,36 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
12311287
return ret;
12321288
}
12331289

1290+
template <>
1291+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoResponse> ev, const TActorContext& ctx)
1292+
{
1293+
const auto sender = ev->SupportivePartition;
1294+
WriteInfoResponseHandler(sender, ev.release(), ctx);
1295+
}
1296+
12341297
void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx) {
12351298
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse");
1236-
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
1299+
1300+
ev->Get()->SupportivePartition = ev->Sender;
1301+
1302+
ProcessPendingEvent(ev, ctx);
12371303
}
12381304

1305+
template <>
1306+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoError> ev, const TActorContext& ctx)
1307+
{
1308+
const auto sender = ev->SupportivePartition;
1309+
WriteInfoResponseHandler(sender, ev.release(), ctx);
1310+
}
12391311

12401312
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
12411313
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
12421314
"Cookie " << ev->Get()->Cookie <<
12431315
", Message " << ev->Get()->Message);
1244-
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
1316+
1317+
ev->Get()->SupportivePartition = ev->Sender;
1318+
1319+
ProcessPendingEvent(ev, ctx);
12451320
}
12461321

12471322
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
@@ -2619,16 +2694,6 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
26192694
TxIdHasChanged = true;
26202695
}
26212696

2622-
void TPartition::ResendPendingEvents(const TActorContext& ctx)
2623-
{
2624-
PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());
2625-
2626-
while (!PendingEvents.empty()) {
2627-
ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
2628-
PendingEvents.pop_front();
2629-
}
2630-
}
2631-
26322697
TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx)
26332698
{
26342699
if (AffectedUsers.size() >= MAX_USERS) {
@@ -3495,14 +3560,17 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
34953560

34963561
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
34973562
{
3563+
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");
3564+
34983565
Y_ABORT_UNLESS(IsSupportive());
34993566

3500-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
3567+
AddPendingEvent(ev);
35013568
}
35023569

3503-
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
3570+
template <>
3571+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition> ev, const TActorContext& ctx)
35043572
{
3505-
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3573+
Y_UNUSED(ev);
35063574

35073575
Y_ABORT_UNLESS(IsSupportive());
35083576
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
@@ -3512,6 +3580,13 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& c
35123580
ProcessTxsAndUserActs(ctx);
35133581
}
35143582

3583+
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
3584+
{
3585+
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3586+
3587+
ProcessPendingEvent(ev, ctx);
3588+
}
3589+
35153590
void TPartition::ScheduleNegativeReplies()
35163591
{
35173592
auto processQueue = [&](std::deque<TUserActionAndTransactionEvent>& queue) {
@@ -3582,6 +3657,23 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac
35823657
MakeHolder<TEvPQ::TEvTransactionCompleted>(writeId).Release());
35833658
}
35843659

3660+
void TPartition::ProcessPendingEvents(const TActorContext& ctx)
3661+
{
3662+
PQ_LOG_D("Process pending events. Count " << PendingEvents.size());
3663+
3664+
while (!PendingEvents.empty()) {
3665+
auto ev = std::move(PendingEvents.front());
3666+
PendingEvents.pop_front();
3667+
3668+
auto visitor = [this, &ctx](auto&& v) {
3669+
using T = std::decay_t<decltype(v)>;
3670+
ProcessPendingEvent(std::forward<T>(v), ctx);
3671+
};
3672+
3673+
std::visit(visitor, std::move(ev));
3674+
}
3675+
}
3676+
35853677
const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
35863678
{
35873679
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);

0 commit comments

Comments
 (0)