Skip to content

Commit 018a84f

Browse files
[*] optimization
1 parent 6c0e5a8 commit 018a84f

File tree

2 files changed

+22
-16
lines changed

2 files changed

+22
-16
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,19 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
969969
ProcessTxsAndUserActs(ctx);
970970
}
971971

972+
template <class T>
973+
void TPartition::ProcessPendingEvent(TAutoPtr<TEventHandle<T>>& ev, const TActorContext& ctx)
974+
{
975+
if (PendingEvents.empty()) {
976+
// Optimization: if the queue is empty, you can process the message immediately
977+
ProcessPendingEvent(std::unique_ptr<T>(ev->Release().Release()), ctx);
978+
} else {
979+
// We need to keep the order in which the messages arrived
980+
AddPendingEvent(ev);
981+
ProcessPendingEvents(ctx);
982+
}
983+
}
984+
972985
template <>
973986
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvProposePartitionConfig> ev, const TActorContext& ctx)
974987
{
@@ -983,8 +996,7 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor
983996
" Step " << ev->Get()->Step <<
984997
", TxId " << ev->Get()->TxId);
985998

986-
AddPendingEvent(ev);
987-
ProcessPendingEvents(ctx);
999+
ProcessPendingEvent(ev, ctx);
9881000
}
9891001

9901002
template <class T>
@@ -1075,8 +1087,7 @@ void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext
10751087
" Step " << ev->Get()->Step <<
10761088
", TxId " << ev->Get()->TxId);
10771089

1078-
AddPendingEvent(ev);
1079-
ProcessPendingEvents(ctx);
1090+
ProcessPendingEvent(ev, ctx);
10801091
}
10811092

10821093
template <>
@@ -1122,8 +1133,7 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
11221133
" Step " << ev->Get()->Step <<
11231134
", TxId " << ev->Get()->TxId);
11241135

1125-
AddPendingEvent(ev);
1126-
ProcessPendingEvents(ctx);
1136+
ProcessPendingEvent(ev, ctx);
11271137
}
11281138

11291139
template <>
@@ -1160,8 +1170,7 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback> ev, c
11601170

11611171
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
11621172
{
1163-
AddPendingEvent(ev);
1164-
ProcessPendingEvents(ctx);
1173+
ProcessPendingEvent(ev, ctx);
11651174
}
11661175

11671176
template <>
@@ -1206,8 +1215,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
12061215

12071216
ev->Get()->OriginalPartition = ev->Sender;
12081217

1209-
AddPendingEvent(ev);
1210-
ProcessPendingEvents(ctx);
1218+
ProcessPendingEvent(ev, ctx);
12111219
}
12121220

12131221
void TPartition::WriteInfoResponseHandler(
@@ -1310,8 +1318,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
13101318

13111319
ev->Get()->SupportivePartition = ev->Sender;
13121320

1313-
AddPendingEvent(ev);
1314-
ProcessPendingEvents(ctx);
1321+
ProcessPendingEvent(ev, ctx);
13151322
}
13161323

13171324
template <>
@@ -1328,8 +1335,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
13281335

13291336
ev->Get()->SupportivePartition = ev->Sender;
13301337

1331-
AddPendingEvent(ev);
1332-
ProcessPendingEvents(ctx);
1338+
ProcessPendingEvent(ev, ctx);
13331339
}
13341340

13351341
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
@@ -3643,8 +3649,7 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext
36433649
{
36443650
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
36453651

3646-
AddPendingEvent(ev);
3647-
ProcessPendingEvents(ctx);
3652+
ProcessPendingEvent(ev, ctx);
36483653
}
36493654

36503655
void TPartition::ScheduleNegativeReplies()

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
952952

953953
template <class T> void AddPendingEvent(TAutoPtr<TEventHandle<T>>& ev);
954954
template <class T> void ProcessPendingEvent(std::unique_ptr<T> ev, const TActorContext& ctx);
955+
template <class T> void ProcessPendingEvent(TAutoPtr<TEventHandle<T>>& ev, const TActorContext& ctx);
955956
void ProcessPendingEvents(const TActorContext& ctx);
956957

957958
TRowVersion LastEmittedHeartbeat;

0 commit comments

Comments
 (0)