Skip to content

Commit 6c0e5a8

Browse files
[*] deferred message queue
1 parent 5451705 commit 6c0e5a8

File tree

1 file changed

+19
-8
lines changed

1 file changed

+19
-8
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,8 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor
983983
" Step " << ev->Get()->Step <<
984984
", TxId " << ev->Get()->TxId);
985985

986-
ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvProposePartitionConfig>(ev->Release().Release()), ctx);
986+
AddPendingEvent(ev);
987+
ProcessPendingEvents(ctx);
987988
}
988989

989990
template <class T>
@@ -1074,7 +1075,8 @@ void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext
10741075
" Step " << ev->Get()->Step <<
10751076
", TxId " << ev->Get()->TxId);
10761077

1077-
ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCalcPredicate>(ev->Release().Release()), ctx);
1078+
AddPendingEvent(ev);
1079+
ProcessPendingEvents(ctx);
10781080
}
10791081

10801082
template <>
@@ -1120,7 +1122,8 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
11201122
" Step " << ev->Get()->Step <<
11211123
", TxId " << ev->Get()->TxId);
11221124

1123-
ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCommit>(ev->Release().Release()), ctx);
1125+
AddPendingEvent(ev);
1126+
ProcessPendingEvents(ctx);
11241127
}
11251128

11261129
template <>
@@ -1157,7 +1160,8 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback> ev, c
11571160

11581161
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
11591162
{
1160-
ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback>(ev->Release().Release()), ctx);
1163+
AddPendingEvent(ev);
1164+
ProcessPendingEvents(ctx);
11611165
}
11621166

11631167
template <>
@@ -1201,7 +1205,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
12011205
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
12021206

12031207
ev->Get()->OriginalPartition = ev->Sender;
1204-
ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoRequest>(ev->Release().Release()), ctx);
1208+
1209+
AddPendingEvent(ev);
1210+
ProcessPendingEvents(ctx);
12051211
}
12061212

12071213
void TPartition::WriteInfoResponseHandler(
@@ -1303,7 +1309,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
13031309
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse");
13041310

13051311
ev->Get()->SupportivePartition = ev->Sender;
1306-
ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoResponse>(ev->Release().Release()), ctx);
1312+
1313+
AddPendingEvent(ev);
1314+
ProcessPendingEvents(ctx);
13071315
}
13081316

13091317
template <>
@@ -1319,7 +1327,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
13191327
", Message " << ev->Get()->Message);
13201328

13211329
ev->Get()->SupportivePartition = ev->Sender;
1322-
ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoError>(ev->Release().Release()), ctx);
1330+
1331+
AddPendingEvent(ev);
1332+
ProcessPendingEvents(ctx);
13231333
}
13241334

13251335
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
@@ -3633,7 +3643,8 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext
36333643
{
36343644
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
36353645

3636-
ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition>(ev->Release().Release()), ctx);
3646+
AddPendingEvent(ev);
3647+
ProcessPendingEvents(ctx);
36373648
}
36383649

36393650
void TPartition::ScheduleNegativeReplies()

0 commit comments

Comments
 (0)