diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h index 6596894d..63defdb8 100644 --- a/include/pulsar/Consumer.h +++ b/include/pulsar/Consumer.h @@ -48,10 +48,15 @@ class PULSAR_PUBLIC Consumer { const std::string& getTopic() const; /** - * @return the consumer name + * @return the subscription name */ const std::string& getSubscriptionName() const; + /** + * @return the consumer name + */ + const std::string& getConsumerName() const; + /** * Unsubscribe the current consumer from the topic. * diff --git a/lib/Consumer.cc b/lib/Consumer.cc index bda7f862..031cdff9 100644 --- a/lib/Consumer.cc +++ b/lib/Consumer.cc @@ -39,6 +39,10 @@ const std::string& Consumer::getSubscriptionName() const { return impl_ != NULL ? impl_->getSubscriptionName() : EMPTY_STRING; } +const std::string& Consumer::getConsumerName() const { + return impl_ ? impl_->getConsumerName() : EMPTY_STRING; +} + Result Consumer::unsubscribe() { if (!impl_) { return ResultConsumerNotInitialized; diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index b4666836..955fb325 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -83,7 +83,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, availablePermits_(0), receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2), consumerId_(client->newConsumerId()), - consumerName_(config_.getConsumerName()), consumerStr_("[" + topic + ", " + subscriptionName + ", " + std::to_string(consumerId_) + "] "), messageListenerRunning_(true), negativeAcksTracker_(client, *this, conf), @@ -249,7 +248,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c ClientImplPtr client = client_.lock(); uint64_t requestId = client->newRequestId(); SharedBuffer cmd = Commands::newSubscribe( - topic(), subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_, + topic(), subscription_, consumerId_, requestId, getSubType(), getConsumerName(), subscriptionMode_, subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(), config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy(), config_.getPriorityLevel()); @@ -1780,7 +1779,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa } if (result != ResultOk) { LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {" - << self->consumerName_ << "} Failed to acknowledge the message {" + << self->getConsumerName() << "} Failed to acknowledge the message {" << originMessageId << "} of the original topic but send to the DLQ successfully : " << result); @@ -1793,7 +1792,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa }); } else { LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {" - << self->consumerName_ << "} Failed to send DLQ message to {" + << self->getConsumerName() << "} Failed to send DLQ message to {" << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id " << "{" << originMessageId << "} : " << res); cb(false); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 61d96b1c..5e9ef04e 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -216,7 +216,6 @@ class ConsumerImpl : public ConsumerImplBase { std::atomic_int availablePermits_; const int receiverQueueRefillThreshold_; uint64_t consumerId_; - std::string consumerName_; const std::string consumerStr_; int32_t partitionIndex_ = -1; Promise consumerCreatedPromise_; diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc index 6c86aa44..39212371 100644 --- a/lib/ConsumerImplBase.cc +++ b/lib/ConsumerImplBase.cc @@ -33,7 +33,8 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor) : HandlerBase(client, topic, backoff), listenerExecutor_(listenerExecutor), - batchReceivePolicy_(conf.getBatchReceivePolicy()) { + batchReceivePolicy_(conf.getBatchReceivePolicy()), + consumerName_(conf.getConsumerName()) { auto userBatchReceivePolicy = conf.getBatchReceivePolicy(); if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) { batchReceivePolicy_ = diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index 2f3420c6..1b7e86e1 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -82,6 +82,8 @@ class ConsumerImplBase : public HandlerBase { virtual const std::string& getName() const override = 0; virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback) = 0; + const std::string& getConsumerName() const noexcept { return consumerName_; } + protected: // overrided methods from HandlerBase Future connectionOpened(const ClientConnectionPtr& cnx) override { @@ -106,6 +108,8 @@ class ConsumerImplBase : public HandlerBase { virtual bool hasEnoughMessagesForBatchReceive() const = 0; private: + const std::string consumerName_; + virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0; friend class MultiTopicsConsumerImpl; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 5b4dd2e8..8c5a32ab 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -21,13 +21,11 @@ #include #include -#include -#include #include #include #include -#include "HttpHelper.h" +#include "PulsarAdminHelper.h" #include "PulsarFriend.h" #include "WaitUtils.h" #include "lib/ClientConnection.h" @@ -335,18 +333,13 @@ class PulsarWrapper { // When `subscription` is empty, get client versions of the producers. // Otherwise, get client versions of the consumers under the subscribe. static std::vector getClientVersions(const std::string &topic, std::string subscription = "") { - const auto url = adminUrl + "admin/v2/persistent/public/default/" + topic + "/stats"; - std::string responseData; - int res = makeGetRequest(url, responseData); - if (res != 200) { - LOG_ERROR(url << " failed: " << res); + boost::property_tree::ptree root; + const auto error = getTopicStats(topic, root); + if (!error.empty()) { + LOG_ERROR(error); return {}; } - std::stringstream stream; - stream << responseData; - boost::property_tree::ptree root; - boost::property_tree::read_json(stream, root); std::vector versions; if (subscription.empty()) { for (auto &child : root.get_child("publishers")) { diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 0836fbf4..3ca48cea 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -29,8 +29,8 @@ #include #include -#include "HttpHelper.h" #include "NoOpsCryptoKeyReader.h" +#include "PulsarAdminHelper.h" #include "PulsarFriend.h" #include "SynchronizedQueue.h" #include "WaitUtils.h" @@ -1430,4 +1430,50 @@ TEST(ConsumerTest, testCloseAgainBeforeCloseDone) { ASSERT_TRUE(*done); } +inline std::string getConsumerName(const std::string& topic) { + boost::property_tree::ptree root; + const auto error = getTopicStats(topic, root); + if (!error.empty()) { + LOG_INFO(error); + return {}; + } + return root.get_child("subscriptions") + .get_child("sub") + .get_child("consumers") + .front() + .second.get("consumerName"); +} + +TEST(ConsumerTest, testConsumerName) { + Client client{lookupUrl}; + Consumer consumer; + ASSERT_TRUE(consumer.getConsumerName().empty()); + const auto topic1 = "consumer-test-consumer-name-1"; + const auto topic2 = "consumer-test-consumer-name-2"; + + // Default consumer name + ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub", consumer)); + LOG_INFO("Random consumer name: " << consumer.getConsumerName()); + ASSERT_FALSE(consumer.getConsumerName().empty()); // a random name + ASSERT_EQ(consumer.getConsumerName(), getConsumerName(topic1)); + consumer.close(); + + // Single-topic consumer + ConsumerConfiguration conf; + const std::string consumerName = "custom-consumer"; + conf.setConsumerName(consumerName); + ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub", conf, consumer)); + ASSERT_EQ(consumerName, consumer.getConsumerName()); + ASSERT_EQ(consumerName, getConsumerName(topic1)); + consumer.close(); + + // Multi-topics consumer + ASSERT_EQ(ResultOk, client.subscribe(std::vector{topic1, topic2}, "sub", conf, consumer)); + ASSERT_EQ(consumerName, consumer.getConsumerName()); + ASSERT_EQ(consumerName, getConsumerName(topic1)); + ASSERT_EQ(consumerName, getConsumerName(topic2)); + + client.close(); +} + } // namespace pulsar diff --git a/tests/PulsarAdminHelper.h b/tests/PulsarAdminHelper.h new file mode 100644 index 00000000..944c7b8d --- /dev/null +++ b/tests/PulsarAdminHelper.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +#include "HttpHelper.h" + +namespace pulsar { + +inline std::string getTopicStats(const std::string& topic, boost::property_tree::ptree& root) { + const auto url = "http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/stats"; + std::string responseData; + int code = makeGetRequest(url, responseData); + if (code != 200) { + return url + " failed: " + std::to_string(code); + } + + std::stringstream stream; + stream << responseData; + boost::property_tree::read_json(stream, root); + return ""; +} + +} // namespace pulsar