Skip to content

Commit 0631e36

Browse files
authored
[fix] Avoid resource leakage of AckGroupingTracker (#185)
1 parent 70ccd83 commit 0631e36

File tree

3 files changed

+13
-0
lines changed

3 files changed

+13
-0
lines changed

lib/AckGroupingTrackerEnabled.cc

+6
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
5151
const HandlerBasePtr& handlerPtr, uint64_t consumerId,
5252
long ackGroupingTimeMs, long ackGroupingMaxSize)
5353
: AckGroupingTracker(),
54+
isClosed_(false),
5455
handlerWeakPtr_(handlerPtr),
5556
consumerId_(consumerId),
5657
nextCumulativeAckMsgId_(MessageId::earliest()),
@@ -110,6 +111,7 @@ void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& msgId)
110111
}
111112

112113
void AckGroupingTrackerEnabled::close() {
114+
isClosed_ = true;
113115
this->flush();
114116
std::lock_guard<std::mutex> lock(this->mutexTimer_);
115117
if (this->timer_) {
@@ -164,6 +166,10 @@ void AckGroupingTrackerEnabled::flushAndClean() {
164166
}
165167

166168
void AckGroupingTrackerEnabled::scheduleTimer() {
169+
if (isClosed_) {
170+
return;
171+
}
172+
167173
std::lock_guard<std::mutex> lock(this->mutexTimer_);
168174
this->timer_ = this->executor_->createDeadlineTimer();
169175
this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));

lib/AckGroupingTrackerEnabled.h

+4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <pulsar/MessageId.h>
2323

24+
#include <atomic>
2425
#include <boost/asio/deadline_timer.hpp>
2526
#include <cstdint>
2627
#include <mutex>
@@ -71,6 +72,9 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
7172
//! Method for scheduling grouping timer.
7273
void scheduleTimer();
7374

75+
//! State
76+
std::atomic_bool isClosed_;
77+
7478
//! The connection handler.
7579
HandlerBaseWeakPtr handlerWeakPtr_;
7680

lib/ConsumerImpl.cc

+3
Original file line numberDiff line numberDiff line change
@@ -1212,6 +1212,9 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
12121212
const std::string& ConsumerImpl::getName() const { return consumerStr_; }
12131213

12141214
void ConsumerImpl::shutdown() {
1215+
if (ackGroupingTrackerPtr_) {
1216+
ackGroupingTrackerPtr_->close();
1217+
}
12151218
incomingMessages_.clear();
12161219
possibleSendToDeadLetterTopicMessages_.clear();
12171220
resetCnx();

0 commit comments

Comments
 (0)