Skip to content

Commit 9118643

Browse files
authored
[C++] Reduce redeliverMessages when message listener is enabled (#10726)
### Motivation When consumer's message listener is enabled, application acknowledge processed message in message listener callback, but still tracks the message in `internalListener`, causes sending redundant redeliverMessages ### Modifications trackMessage before invoke message listener callback in `internalListener`
1 parent cb8e079 commit 9118643

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

lib/ConsumerImpl.cc

+6-3
Original file line numberDiff line numberDiff line change
@@ -566,14 +566,15 @@ void ConsumerImpl::internalListener() {
566566
// This will only happen when the connection got reset and we cleared the queue
567567
return;
568568
}
569+
trackMessage(msg);
569570
try {
570571
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
571572
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
572573
messageListener_(Consumer(shared_from_this()), msg);
573574
} catch (const std::exception& e) {
574575
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
575576
}
576-
messageProcessed(msg);
577+
messageProcessed(msg, false);
577578
}
578579

579580
Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
@@ -701,7 +702,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
701702
}
702703
}
703704

704-
void ConsumerImpl::messageProcessed(Message& msg) {
705+
void ConsumerImpl::messageProcessed(Message& msg, bool track) {
705706
Lock lock(mutex_);
706707
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
707708

@@ -712,7 +713,9 @@ void ConsumerImpl::messageProcessed(Message& msg) {
712713
}
713714

714715
increaseAvailablePermits(currentCnx);
715-
trackMessage(msg);
716+
if (track) {
717+
trackMessage(msg);
718+
}
716719
}
717720

718721
/**

lib/ConsumerImpl.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class ConsumerImpl : public ConsumerImplBase,
7878
uint64_t getConsumerId();
7979
void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
8080
bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
81-
void messageProcessed(Message& msg);
81+
void messageProcessed(Message& msg, bool track = true);
8282
inline proto::CommandSubscribe_SubType getSubType();
8383
inline proto::CommandSubscribe_InitialPosition getInitialPosition();
8484
void handleUnsubscribe(Result result, ResultCallback callback);

0 commit comments

Comments
 (0)