Skip to content

Commit cb8e079

Browse files
authored
[Issue 9204][Python/C++]Expose replicateSubscriptionState setting for python/c++ consumer (#10243)
Fixes #9204 ### Motivation Currently, in the python and C++ client, the "replicateSubscriptionState" feature can no be enabled in the consumer creation. ### Modifications Expose the "replicateSubscriptionState" setting in the consumer's construct function.
1 parent 1bcb56c commit cb8e079

9 files changed

+42
-7
lines changed

include/pulsar/ConsumerConfiguration.h

+13
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,19 @@ class PULSAR_PUBLIC ConsumerConfiguration {
352352
*/
353353
InitialPosition getSubscriptionInitialPosition() const;
354354

355+
/**
356+
* Set whether the subscription status should be replicated.
357+
* The default value is `false`.
358+
*
359+
* @param replicateSubscriptionState whether the subscription status should be replicated
360+
*/
361+
void setReplicateSubscriptionStateEnabled(bool enabled);
362+
363+
/**
364+
* @return whether the subscription status should be replicated
365+
*/
366+
bool isReplicateSubscriptionStateEnabled() const;
367+
355368
/**
356369
* Check whether the message has a specific property attached.
357370
*

lib/Commands.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
256256
const std::map<std::string, std::string>& metadata,
257257
const SchemaInfo& schemaInfo,
258258
CommandSubscribe_InitialPosition subscriptionInitialPosition,
259-
KeySharedPolicy keySharedPolicy) {
259+
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy) {
260260
BaseCommand cmd;
261261
cmd.set_type(BaseCommand::SUBSCRIBE);
262262
CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -269,6 +269,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
269269
subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
270270
subscribe->set_read_compacted(readCompacted);
271271
subscribe->set_initialposition(subscriptionInitialPosition);
272+
subscribe->set_replicate_subscription_state(replicateSubscriptionState);
272273

273274
if (isBuiltInSchema(schemaInfo.getSchemaType())) {
274275
subscribe->set_allocated_schema(getSchema(schemaInfo));

lib/Commands.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class Commands {
8989
bool readCompacted, const std::map<std::string, std::string>& metadata,
9090
const SchemaInfo& schemaInfo,
9191
proto::CommandSubscribe_InitialPosition subscriptionInitialPosition,
92-
KeySharedPolicy keySharedPolicy);
92+
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy);
9393

9494
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
9595

lib/ConsumerConfiguration.cc

+8
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) {
163163

164164
int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod; }
165165

166+
void ConsumerConfiguration::setReplicateSubscriptionStateEnabled(bool enabled) {
167+
impl_->replicateSubscriptionStateEnabled = enabled;
168+
}
169+
170+
bool ConsumerConfiguration::isReplicateSubscriptionStateEnabled() const {
171+
return impl_->replicateSubscriptionStateEnabled;
172+
}
173+
166174
bool ConsumerConfiguration::hasProperty(const std::string& name) const {
167175
const std::map<std::string, std::string>& m = impl_->properties;
168176
return m.find(name) != m.end();

lib/ConsumerConfigurationImpl.h

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ struct ConsumerConfigurationImpl {
4444
bool readCompacted;
4545
InitialPosition subscriptionInitialPosition;
4646
int patternAutoDiscoveryPeriod;
47+
bool replicateSubscriptionStateEnabled;
4748
std::map<std::string, std::string> properties;
4849
KeySharedPolicy keySharedPolicy;
4950

lib/ConsumerImpl.cc

+4-4
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,10 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
184184

185185
ClientImplPtr client = client_.lock();
186186
uint64_t requestId = client->newRequestId();
187-
SharedBuffer cmd =
188-
Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_,
189-
subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties(),
190-
config_.getSchema(), getInitialPosition(), config_.getKeySharedPolicy());
187+
SharedBuffer cmd = Commands::newSubscribe(
188+
topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
189+
startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(),
190+
config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy());
191191
cnx->sendRequestWithId(cmd, requestId)
192192
.addListener(
193193
std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));

python/pulsar/__init__.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,8 @@ def subscribe(self, topic, subscription_name,
599599
properties=None,
600600
pattern_auto_discovery_period=60,
601601
initial_position=InitialPosition.Latest,
602-
crypto_key_reader=None
602+
crypto_key_reader=None,
603+
replicate_subscription_state_enabled=False
603604
):
604605
"""
605606
Subscribe to the given topic and subscription combination.
@@ -675,6 +676,9 @@ def my_listener(consumer, message):
675676
* crypto_key_reader:
676677
Symmetric encryption class implementation, configuring public key encryption messages for the producer
677678
and private key decryption messages for the consumer
679+
* replicate_subscription_state_enabled:
680+
Set whether the subscription status should be replicated.
681+
Default: `False`.
678682
"""
679683
_check_type(str, subscription_name, 'subscription_name')
680684
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -716,6 +720,8 @@ def my_listener(consumer, message):
716720
if crypto_key_reader:
717721
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
718722

723+
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
724+
719725
c = Consumer()
720726
if isinstance(topic, str):
721727
# Single topic

python/pulsar_test.py

+4
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ def test_consumer_config(self):
9999
conf.consumer_name("my-name")
100100
self.assertEqual(conf.consumer_name(), "my-name")
101101

102+
self.assertEqual(conf.replicate_subscription_state_enabled(), False)
103+
conf.replicate_subscription_state_enabled(True)
104+
self.assertEqual(conf.replicate_subscription_state_enabled(), True)
105+
102106
def test_client_logger(self):
103107
logger = logging.getLogger("pulsar")
104108
Client(self.serviceUrl, logger=logger)

python/src/config.cc

+2
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,8 @@ void export_config() {
273273
.def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
274274
.def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
275275
.def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>())
276+
.def("replicate_subscription_state_enabled", &ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
277+
.def("replicate_subscription_state_enabled", &ConsumerConfiguration::isReplicateSubscriptionStateEnabled)
276278
;
277279

278280
class_<ReaderConfiguration>("ReaderConfiguration")

0 commit comments

Comments
 (0)