Skip to content

Commit 91fe302

Browse files
Merge 018a84f into ce94a36
2 parents ce94a36 + 018a84f commit 91fe302

File tree

3 files changed

+171
-60
lines changed

3 files changed

+171
-60
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,8 @@ struct TEvPQ {
10861086
}
10871087

10881088
ui32 Cookie; // InternalPartitionId
1089+
TActorId SupportivePartition;
1090+
10891091
NPQ::TSourceIdMap SrcIdInfo;
10901092
std::deque<NPQ::TDataKey> BodyKeys;
10911093
TVector<NPQ::TClientBlob> BlobsFromHead;
@@ -1102,6 +1104,7 @@ struct TEvPQ {
11021104
struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {
11031105
ui32 Cookie; // InternalPartitionId
11041106
TString Message;
1107+
TActorId SupportivePartition;
11051108

11061109
TEvGetWriteInfoError(ui32 cookie, TString message) :
11071110
Cookie(cookie),

ydb/core/persqueue/partition.cpp

Lines changed: 150 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
581581
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());
582582

583583
FillReadFromTimestamps(ctx);
584-
ResendPendingEvents(ctx);
584+
ProcessPendingEvents(ctx);
585585
ProcessTxsAndUserActs(ctx);
586586

587587
ctx.Send(ctx.SelfID, new TEvents::TEvWakeup());
@@ -969,37 +969,69 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
969969
ProcessTxsAndUserActs(ctx);
970970
}
971971

972+
template <class T>
973+
void TPartition::ProcessPendingEvent(TAutoPtr<TEventHandle<T>>& ev, const TActorContext& ctx)
974+
{
975+
if (PendingEvents.empty()) {
976+
// Optimization: if the queue is empty, you can process the message immediately
977+
ProcessPendingEvent(std::unique_ptr<T>(ev->Release().Release()), ctx);
978+
} else {
979+
// We need to keep the order in which the messages arrived
980+
AddPendingEvent(ev);
981+
ProcessPendingEvents(ctx);
982+
}
983+
}
984+
985+
template <>
986+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvProposePartitionConfig> ev, const TActorContext& ctx)
987+
{
988+
PushBackDistrTx(ev.release());
989+
990+
ProcessTxsAndUserActs(ctx);
991+
}
992+
972993
void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx)
973994
{
974995
PQ_LOG_D("Handle TEvPQ::TEvProposePartitionConfig" <<
975996
" Step " << ev->Get()->Step <<
976997
", TxId " << ev->Get()->TxId);
977998

978-
PushBackDistrTx(ev->Release());
999+
ProcessPendingEvent(ev, ctx);
1000+
}
9791001

980-
ProcessTxsAndUserActs(ctx);
1002+
template <class T>
1003+
void TPartition::AddPendingEvent(TAutoPtr<TEventHandle<T>>& ev)
1004+
{
1005+
std::unique_ptr<T> p(ev->Release().Release());
1006+
PendingEvents.emplace_back(std::move(p));
9811007
}
9821008

9831009
void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
9841010
{
9851011
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");
9861012

987-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1013+
AddPendingEvent(ev);
9881014
}
9891015

9901016
void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
9911017
{
992-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1018+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit");
1019+
1020+
AddPendingEvent(ev);
9931021
}
9941022

9951023
void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&)
9961024
{
997-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1025+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxRollback");
1026+
1027+
AddPendingEvent(ev);
9981028
}
9991029

10001030
void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
10011031
{
1002-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1032+
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig");
1033+
1034+
AddPendingEvent(ev);
10031035
}
10041036

10051037
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& /* ctx */)
@@ -1009,7 +1041,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc
10091041
Y_ABORT_UNLESS(IsSupportive());
10101042

10111043
ev->Get()->OriginalPartition = ev->Sender;
1012-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1044+
AddPendingEvent(ev);
10131045
}
10141046

10151047
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& /* ctx */)
@@ -1018,7 +1050,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TA
10181050

10191051
Y_ABORT_UNLESS(!IsSupportive());
10201052

1021-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1053+
AddPendingEvent(ev);
10221054
}
10231055

10241056
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& /* ctx */)
@@ -1027,43 +1059,46 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActo
10271059

10281060
Y_ABORT_UNLESS(!IsSupportive());
10291061

1030-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1062+
AddPendingEvent(ev);
10311063
}
10321064

1033-
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
1065+
template <>
1066+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCalcPredicate> ev, const TActorContext& ctx)
10341067
{
1035-
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
1036-
" Step " << ev->Get()->Step <<
1037-
", TxId " << ev->Get()->TxId);
1038-
10391068
if (PlanStep.Defined() && TxId.Defined()) {
1040-
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
1069+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10411070
Send(Tablet,
1042-
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
1043-
ev->Get()->TxId,
1071+
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Step,
1072+
ev->TxId,
10441073
Partition,
10451074
Nothing()).Release());
10461075
return;
10471076
}
10481077
}
10491078

1050-
PushBackDistrTx(ev->Release());
1079+
PushBackDistrTx(ev.release());
10511080

10521081
ProcessTxsAndUserActs(ctx);
10531082
}
10541083

1055-
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
1084+
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
10561085
{
1057-
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
1086+
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
10581087
" Step " << ev->Get()->Step <<
10591088
", TxId " << ev->Get()->TxId);
10601089

1090+
ProcessPendingEvent(ev, ctx);
1091+
}
1092+
1093+
template <>
1094+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCommit> ev, const TActorContext& ctx)
1095+
{
10611096
if (PlanStep.Defined() && TxId.Defined()) {
1062-
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
1097+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
10631098
PQ_LOG_D("Send TEvTxCommitDone" <<
1064-
" Step " << ev->Get()->Step <<
1065-
", TxId " << ev->Get()->TxId);
1066-
ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release());
1099+
" Step " << ev->Step <<
1100+
", TxId " << ev->TxId);
1101+
ctx.Send(Tablet, MakeCommitDone(ev->Step, ev->TxId).Release());
10671102
return;
10681103
}
10691104
}
@@ -1073,33 +1108,42 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
10731108
Y_ABORT_UNLESS(TransactionsInflight.size() == 1,
10741109
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10751110
TabletID, Partition.OriginalPartitionId,
1076-
ev->Get()->Step, ev->Get()->TxId);
1077-
PendingExplicitMessageGroups = ev->Get()->ExplicitMessageGroups;
1111+
ev->Step, ev->TxId);
1112+
PendingExplicitMessageGroups = ev->ExplicitMessageGroups;
10781113
} else {
10791114
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
10801115
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10811116
TabletID, Partition.OriginalPartitionId,
1082-
ev->Get()->Step, ev->Get()->TxId);
1083-
txIter = TransactionsInflight.find(ev->Get()->TxId);
1117+
ev->Step, ev->TxId);
1118+
txIter = TransactionsInflight.find(ev->TxId);
10841119
Y_ABORT_UNLESS(!txIter.IsEnd(),
10851120
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
10861121
TabletID, Partition.OriginalPartitionId,
1087-
ev->Get()->Step, ev->Get()->TxId);
1122+
ev->Step, ev->TxId);
10881123
}
10891124
Y_ABORT_UNLESS(txIter->second->State == ECommitState::Pending);
10901125

10911126
txIter->second->State = ECommitState::Committed;
10921127
ProcessTxsAndUserActs(ctx);
10931128
}
10941129

1095-
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
1130+
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
1131+
{
1132+
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
1133+
" Step " << ev->Get()->Step <<
1134+
", TxId " << ev->Get()->TxId);
1135+
1136+
ProcessPendingEvent(ev, ctx);
1137+
}
1138+
1139+
template <>
1140+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback> ev, const TActorContext& ctx)
10961141
{
1097-
auto* event = ev->Get();
10981142
if (PlanStep.Defined() && TxId.Defined()) {
1099-
if (GetStepAndTxId(*event) < GetStepAndTxId(*PlanStep, *TxId)) {
1143+
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
11001144
PQ_LOG_D("Rollback for" <<
1101-
" Step " << ev->Get()->Step <<
1102-
", TxId " << ev->Get()->TxId);
1145+
" Step " << ev->Step <<
1146+
", TxId " << ev->TxId);
11031147
return;
11041148
}
11051149
}
@@ -1113,7 +1157,7 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
11131157
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
11141158
"PQ: %" PRIu64 ", Partition: %" PRIu32,
11151159
TabletID, Partition.OriginalPartitionId);
1116-
txIter = TransactionsInflight.find(ev->Get()->TxId);
1160+
txIter = TransactionsInflight.find(ev->TxId);
11171161
Y_ABORT_UNLESS(!txIter.IsEnd(),
11181162
"PQ: %" PRIu64 ", Partition: %" PRIu32,
11191163
TabletID, Partition.OriginalPartitionId);
@@ -1124,13 +1168,17 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
11241168
ProcessTxsAndUserActs(ctx);
11251169
}
11261170

1127-
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1128-
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
1129-
TActorId originalPartition = ev->Get()->OriginalPartition;
1130-
if (!originalPartition) {
1131-
// original message
1132-
originalPartition = ev->Sender;
1133-
}
1171+
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
1172+
{
1173+
ProcessPendingEvent(ev, ctx);
1174+
}
1175+
1176+
template <>
1177+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoRequest> ev, const TActorContext& ctx)
1178+
{
1179+
TActorId originalPartition = ev->OriginalPartition;
1180+
Y_ABORT_UNLESS(originalPartition != TActorId());
1181+
11341182
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
11351183
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
11361184
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
@@ -1162,6 +1210,14 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
11621210
ctx.Send(originalPartition, response);
11631211
}
11641212

1213+
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1214+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
1215+
1216+
ev->Get()->OriginalPartition = ev->Sender;
1217+
1218+
ProcessPendingEvent(ev, ctx);
1219+
}
1220+
11651221
void TPartition::WriteInfoResponseHandler(
11661222
const TActorId& sender,
11671223
TGetWriteInfoResp&& ev,
@@ -1250,17 +1306,36 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
12501306
return ret;
12511307
}
12521308

1309+
template <>
1310+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoResponse> ev, const TActorContext& ctx)
1311+
{
1312+
const auto sender = ev->SupportivePartition;
1313+
WriteInfoResponseHandler(sender, ev.release(), ctx);
1314+
}
1315+
12531316
void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx) {
12541317
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse");
1255-
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
1318+
1319+
ev->Get()->SupportivePartition = ev->Sender;
1320+
1321+
ProcessPendingEvent(ev, ctx);
12561322
}
12571323

1324+
template <>
1325+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoError> ev, const TActorContext& ctx)
1326+
{
1327+
const auto sender = ev->SupportivePartition;
1328+
WriteInfoResponseHandler(sender, ev.release(), ctx);
1329+
}
12581330

12591331
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
12601332
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
12611333
"Cookie " << ev->Get()->Cookie <<
12621334
", Message " << ev->Get()->Message);
1263-
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
1335+
1336+
ev->Get()->SupportivePartition = ev->Sender;
1337+
1338+
ProcessPendingEvent(ev, ctx);
12641339
}
12651340

12661341
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
@@ -2698,16 +2773,6 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
26982773
TxIdHasChanged = true;
26992774
}
27002775

2701-
void TPartition::ResendPendingEvents(const TActorContext& ctx)
2702-
{
2703-
PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());
2704-
2705-
while (!PendingEvents.empty()) {
2706-
ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
2707-
PendingEvents.pop_front();
2708-
}
2709-
}
2710-
27112776
TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx)
27122777
{
27132778
if (AffectedUsers.size() >= MAX_USERS) {
@@ -3560,14 +3625,17 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
35603625

35613626
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
35623627
{
3628+
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");
3629+
35633630
Y_ABORT_UNLESS(IsSupportive());
35643631

3565-
PendingEvents.emplace_back(ev->ReleaseBase().Release());
3632+
AddPendingEvent(ev);
35663633
}
35673634

3568-
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
3635+
template <>
3636+
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition> ev, const TActorContext& ctx)
35693637
{
3570-
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3638+
Y_UNUSED(ev);
35713639

35723640
Y_ABORT_UNLESS(IsSupportive());
35733641
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
@@ -3577,6 +3645,13 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& c
35773645
ProcessTxsAndUserActs(ctx);
35783646
}
35793647

3648+
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
3649+
{
3650+
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3651+
3652+
ProcessPendingEvent(ev, ctx);
3653+
}
3654+
35803655
void TPartition::ScheduleNegativeReplies()
35813656
{
35823657
auto processQueue = [&](std::deque<TUserActionAndTransactionEvent>& queue) {
@@ -3647,6 +3722,23 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac
36473722
MakeHolder<TEvPQ::TEvTransactionCompleted>(writeId).Release());
36483723
}
36493724

3725+
void TPartition::ProcessPendingEvents(const TActorContext& ctx)
3726+
{
3727+
PQ_LOG_D("Process pending events. Count " << PendingEvents.size());
3728+
3729+
while (!PendingEvents.empty()) {
3730+
auto ev = std::move(PendingEvents.front());
3731+
PendingEvents.pop_front();
3732+
3733+
auto visitor = [this, &ctx](auto&& v) {
3734+
using T = std::decay_t<decltype(v)>;
3735+
ProcessPendingEvent(std::forward<T>(v), ctx);
3736+
};
3737+
3738+
std::visit(visitor, std::move(ev));
3739+
}
3740+
}
3741+
36503742
const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
36513743
{
36523744
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);

0 commit comments

Comments
 (0)