Skip to content

Commit f1be286

Browse files
authored
pq sdk deadlock has been fixed (between Close and OnReadDone) (#3115)
1 parent b38dea6 commit f1be286

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,10 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
739739
EventsQueue.clear();
740740
}
741741

742+
TRawPartitionStreamEventQueue<UseMigrationProtocol> ExtractQueue() noexcept {
743+
return std::move(EventsQueue);
744+
}
745+
742746
static void GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
743747
size_t& maxEventsCount,
744748
size_t& maxByteSize,
@@ -796,14 +800,16 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
796800

797801
bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
798802
TWaiter waiter;
803+
TVector<TRawPartitionStreamEventQueue<UseMigrationProtocol>> defferedDelete;
799804
with_lock (TParent::Mutex) {
800805
if (TParent::Closed) {
801806
return false;
802807
}
808+
defferedDelete.reserve(TParent::Events.size());
803809
while (!TParent::Events.empty()) {
804810
auto& event = TParent::Events.front();
805811
if (!event.IsEmpty()) {
806-
event.PartitionStream->ClearQueue();
812+
defferedDelete.push_back(event.PartitionStream->ExtractQueue());
807813
}
808814
TParent::Events.pop();
809815
}
@@ -812,6 +818,9 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
812818
waiter = TWaiter(TParent::Waiter.ExtractPromise(), this);
813819
}
814820

821+
// Delayed deletion is necessary to avoid deadlock with PushEvent
822+
defferedDelete.clear();
823+
815824
TReadSessionEventInfo<UseMigrationProtocol> info(event);
816825
ApplyHandler(info, deferred);
817826
deferred.DeferSignalWaiter(std::move(waiter));

0 commit comments

Comments
 (0)