Skip to content

Commit bbb347b

Browse files
doroolegildar-khisambeev
authored andcommitted
pq sdk deadlock has been fixed (between Close and OnReadDone) (#3115)
1 parent 6b379df commit bbb347b

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,10 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
729729
EventsQueue.clear();
730730
}
731731

732+
TRawPartitionStreamEventQueue<UseMigrationProtocol> ExtractQueue() noexcept {
733+
return std::move(EventsQueue);
734+
}
735+
732736
static void GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
733737
size_t& maxEventsCount,
734738
size_t& maxByteSize,
@@ -786,14 +790,16 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
786790

787791
bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
788792
TWaiter waiter;
793+
TVector<TRawPartitionStreamEventQueue<UseMigrationProtocol>> defferedDelete;
789794
with_lock (TParent::Mutex) {
790795
if (TParent::Closed) {
791796
return false;
792797
}
798+
defferedDelete.reserve(TParent::Events.size());
793799
while (!TParent::Events.empty()) {
794800
auto& event = TParent::Events.front();
795801
if (!event.IsEmpty()) {
796-
event.PartitionStream->ClearQueue();
802+
defferedDelete.push_back(event.PartitionStream->ExtractQueue());
797803
}
798804
TParent::Events.pop();
799805
}
@@ -802,6 +808,9 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
802808
waiter = TWaiter(TParent::Waiter.ExtractPromise(), this);
803809
}
804810

811+
// Delayed deletion is necessary to avoid deadlock with PushEvent
812+
defferedDelete.clear();
813+
805814
TReadSessionEventInfo<UseMigrationProtocol> info(event);
806815
ApplyHandler(info, deferred);
807816
deferred.DeferSignalWaiter(std::move(waiter));

0 commit comments

Comments
 (0)