Skip to content

Commit 63d494f

Browse files
Add the Consumer::getConsumerName API (#360)
### Motivation C++ client supports setting the consumer name via `ConsumerConfiguration::setConsumerName` but it does not support getting the consumer name for a given consumer. ### Modifications Add the `getConsumerName` method to `ConsumerImplBase` to get the consumer name from the config and a public method with the same name to `Consumer`. Add `ConsumerTest.testConsumerName` to verify the consumer name is set correctly in the subscribe command.
1 parent 751c807 commit 63d494f

9 files changed

+113
-20
lines changed

include/pulsar/Consumer.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,15 @@ class PULSAR_PUBLIC Consumer {
4848
const std::string& getTopic() const;
4949

5050
/**
51-
* @return the consumer name
51+
* @return the subscription name
5252
*/
5353
const std::string& getSubscriptionName() const;
5454

55+
/**
56+
* @return the consumer name
57+
*/
58+
const std::string& getConsumerName() const;
59+
5560
/**
5661
* Unsubscribe the current consumer from the topic.
5762
*

lib/Consumer.cc

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ const std::string& Consumer::getSubscriptionName() const {
3939
return impl_ != NULL ? impl_->getSubscriptionName() : EMPTY_STRING;
4040
}
4141

42+
const std::string& Consumer::getConsumerName() const {
43+
return impl_ ? impl_->getConsumerName() : EMPTY_STRING;
44+
}
45+
4246
Result Consumer::unsubscribe() {
4347
if (!impl_) {
4448
return ResultConsumerNotInitialized;

lib/ConsumerImpl.cc

+3-4
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
8383
availablePermits_(0),
8484
receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
8585
consumerId_(client->newConsumerId()),
86-
consumerName_(config_.getConsumerName()),
8786
consumerStr_("[" + topic + ", " + subscriptionName + ", " + std::to_string(consumerId_) + "] "),
8887
messageListenerRunning_(true),
8988
negativeAcksTracker_(client, *this, conf),
@@ -249,7 +248,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
249248
ClientImplPtr client = client_.lock();
250249
uint64_t requestId = client->newRequestId();
251250
SharedBuffer cmd = Commands::newSubscribe(
252-
topic(), subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
251+
topic(), subscription_, consumerId_, requestId, getSubType(), getConsumerName(), subscriptionMode_,
253252
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
254253
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
255254
config_.getKeySharedPolicy(), config_.getPriorityLevel());
@@ -1780,7 +1779,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa
17801779
}
17811780
if (result != ResultOk) {
17821781
LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
1783-
<< self->consumerName_ << "} Failed to acknowledge the message {"
1782+
<< self->getConsumerName() << "} Failed to acknowledge the message {"
17841783
<< originMessageId
17851784
<< "} of the original topic but send to the DLQ successfully : "
17861785
<< result);
@@ -1793,7 +1792,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa
17931792
});
17941793
} else {
17951794
LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
1796-
<< self->consumerName_ << "} Failed to send DLQ message to {"
1795+
<< self->getConsumerName() << "} Failed to send DLQ message to {"
17971796
<< self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
17981797
<< "{" << originMessageId << "} : " << res);
17991798
cb(false);

lib/ConsumerImpl.h

-1
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ class ConsumerImpl : public ConsumerImplBase {
216216
std::atomic_int availablePermits_;
217217
const int receiverQueueRefillThreshold_;
218218
uint64_t consumerId_;
219-
std::string consumerName_;
220219
const std::string consumerStr_;
221220
int32_t partitionIndex_ = -1;
222221
Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;

lib/ConsumerImplBase.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi
3333
const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
3434
: HandlerBase(client, topic, backoff),
3535
listenerExecutor_(listenerExecutor),
36-
batchReceivePolicy_(conf.getBatchReceivePolicy()) {
36+
batchReceivePolicy_(conf.getBatchReceivePolicy()),
37+
consumerName_(conf.getConsumerName()) {
3738
auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
3839
if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
3940
batchReceivePolicy_ =

lib/ConsumerImplBase.h

+4
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ class ConsumerImplBase : public HandlerBase {
8282
virtual const std::string& getName() const override = 0;
8383
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback) = 0;
8484

85+
const std::string& getConsumerName() const noexcept { return consumerName_; }
86+
8587
protected:
8688
// overrided methods from HandlerBase
8789
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) override {
@@ -106,6 +108,8 @@ class ConsumerImplBase : public HandlerBase {
106108
virtual bool hasEnoughMessagesForBatchReceive() const = 0;
107109

108110
private:
111+
const std::string consumerName_;
112+
109113
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
110114

111115
friend class MultiTopicsConsumerImpl;

tests/ClientTest.cc

+5-12
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@
2121
#include <pulsar/Version.h>
2222

2323
#include <algorithm>
24-
#include <boost/property_tree/json_parser.hpp>
25-
#include <boost/property_tree/ptree.hpp>
2624
#include <chrono>
2725
#include <future>
2826
#include <sstream>
2927

30-
#include "HttpHelper.h"
28+
#include "PulsarAdminHelper.h"
3129
#include "PulsarFriend.h"
3230
#include "WaitUtils.h"
3331
#include "lib/ClientConnection.h"
@@ -335,18 +333,13 @@ class PulsarWrapper {
335333
// When `subscription` is empty, get client versions of the producers.
336334
// Otherwise, get client versions of the consumers under the subscribe.
337335
static std::vector<std::string> getClientVersions(const std::string &topic, std::string subscription = "") {
338-
const auto url = adminUrl + "admin/v2/persistent/public/default/" + topic + "/stats";
339-
std::string responseData;
340-
int res = makeGetRequest(url, responseData);
341-
if (res != 200) {
342-
LOG_ERROR(url << " failed: " << res);
336+
boost::property_tree::ptree root;
337+
const auto error = getTopicStats(topic, root);
338+
if (!error.empty()) {
339+
LOG_ERROR(error);
343340
return {};
344341
}
345342

346-
std::stringstream stream;
347-
stream << responseData;
348-
boost::property_tree::ptree root;
349-
boost::property_tree::read_json(stream, root);
350343
std::vector<std::string> versions;
351344
if (subscription.empty()) {
352345
for (auto &child : root.get_child("publishers")) {

tests/ConsumerTest.cc

+47-1
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
#include <thread>
3030
#include <vector>
3131

32-
#include "HttpHelper.h"
3332
#include "NoOpsCryptoKeyReader.h"
33+
#include "PulsarAdminHelper.h"
3434
#include "PulsarFriend.h"
3535
#include "SynchronizedQueue.h"
3636
#include "WaitUtils.h"
@@ -1430,4 +1430,50 @@ TEST(ConsumerTest, testCloseAgainBeforeCloseDone) {
14301430
ASSERT_TRUE(*done);
14311431
}
14321432

1433+
inline std::string getConsumerName(const std::string& topic) {
1434+
boost::property_tree::ptree root;
1435+
const auto error = getTopicStats(topic, root);
1436+
if (!error.empty()) {
1437+
LOG_INFO(error);
1438+
return {};
1439+
}
1440+
return root.get_child("subscriptions")
1441+
.get_child("sub")
1442+
.get_child("consumers")
1443+
.front()
1444+
.second.get<std::string>("consumerName");
1445+
}
1446+
1447+
TEST(ConsumerTest, testConsumerName) {
1448+
Client client{lookupUrl};
1449+
Consumer consumer;
1450+
ASSERT_TRUE(consumer.getConsumerName().empty());
1451+
const auto topic1 = "consumer-test-consumer-name-1";
1452+
const auto topic2 = "consumer-test-consumer-name-2";
1453+
1454+
// Default consumer name
1455+
ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub", consumer));
1456+
LOG_INFO("Random consumer name: " << consumer.getConsumerName());
1457+
ASSERT_FALSE(consumer.getConsumerName().empty()); // a random name
1458+
ASSERT_EQ(consumer.getConsumerName(), getConsumerName(topic1));
1459+
consumer.close();
1460+
1461+
// Single-topic consumer
1462+
ConsumerConfiguration conf;
1463+
const std::string consumerName = "custom-consumer";
1464+
conf.setConsumerName(consumerName);
1465+
ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub", conf, consumer));
1466+
ASSERT_EQ(consumerName, consumer.getConsumerName());
1467+
ASSERT_EQ(consumerName, getConsumerName(topic1));
1468+
consumer.close();
1469+
1470+
// Multi-topics consumer
1471+
ASSERT_EQ(ResultOk, client.subscribe(std::vector<std::string>{topic1, topic2}, "sub", conf, consumer));
1472+
ASSERT_EQ(consumerName, consumer.getConsumerName());
1473+
ASSERT_EQ(consumerName, getConsumerName(topic1));
1474+
ASSERT_EQ(consumerName, getConsumerName(topic2));
1475+
1476+
client.close();
1477+
}
1478+
14331479
} // namespace pulsar

tests/PulsarAdminHelper.h

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <boost/property_tree/json_parser.hpp>
22+
#include <boost/property_tree/ptree.hpp>
23+
24+
#include "HttpHelper.h"
25+
26+
namespace pulsar {
27+
28+
inline std::string getTopicStats(const std::string& topic, boost::property_tree::ptree& root) {
29+
const auto url = "http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/stats";
30+
std::string responseData;
31+
int code = makeGetRequest(url, responseData);
32+
if (code != 200) {
33+
return url + " failed: " + std::to_string(code);
34+
}
35+
36+
std::stringstream stream;
37+
stream << responseData;
38+
boost::property_tree::read_json(stream, root);
39+
return "";
40+
}
41+
42+
} // namespace pulsar

0 commit comments

Comments
 (0)