Skip to content

Commit 3e37e44

Browse files
Support start message id inclusive (#19)
### Motivation Currently, the pulsar client CPP already supports start message id inclusive, We need to add the feature config to the Python client. ### Modifications * Add `start_message_id_inclusive ` to the consumer configuration
1 parent 5f86fdf commit 3e37e44

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

pulsar/__init__.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,8 @@ def subscribe(self, topic, subscription_name,
697697
crypto_key_reader=None,
698698
replicate_subscription_state_enabled=False,
699699
max_pending_chunked_message=10,
700-
auto_ack_oldest_chunked_message_on_queue_full=False
700+
auto_ack_oldest_chunked_message_on_queue_full=False,
701+
start_message_id_inclusive=False
701702
):
702703
"""
703704
Subscribe to the given topic and subscription combination.
@@ -791,6 +792,10 @@ def my_listener(consumer, message):
791792
can be guarded by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage.
792793
Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking
793794
if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.
795+
Default: `False`.
796+
* start_message_id_inclusive:
797+
Set the consumer to include the given position of any reset operation like Consumer::seek.
798+
794799
Default: `False`.
795800
"""
796801
_check_type(str, subscription_name, 'subscription_name')
@@ -810,6 +815,7 @@ def my_listener(consumer, message):
810815
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
811816
_check_type(int, max_pending_chunked_message, 'max_pending_chunked_message')
812817
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
818+
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
813819

814820
conf = _pulsar.ConsumerConfiguration()
815821
conf.consumer_type(consumer_type)
@@ -838,6 +844,7 @@ def my_listener(consumer, message):
838844
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
839845
conf.max_pending_chunked_message(max_pending_chunked_message)
840846
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
847+
conf.start_message_id_inclusive(start_message_id_inclusive)
841848

842849
c = Consumer()
843850
if isinstance(topic, str):

src/config.cc

+4-1
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,10 @@ void export_config() {
294294
.def("auto_ack_oldest_chunked_message_on_queue_full",
295295
&ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull)
296296
.def("auto_ack_oldest_chunked_message_on_queue_full",
297-
&ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, return_self<>());
297+
&ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, return_self<>())
298+
.def("start_message_id_inclusive", &ConsumerConfiguration::isStartMessageIdInclusive)
299+
.def("start_message_id_inclusive",&ConsumerConfiguration::setStartMessageIdInclusive,
300+
return_self<>());
298301

299302
class_<ReaderConfiguration>("ReaderConfiguration")
300303
.def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>())

tests/pulsar_test.py

+31
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,37 @@ def test_seek(self):
920920
reader.close()
921921
client.close()
922922

923+
def test_seek_inclusive(self):
924+
client = Client(self.serviceUrl)
925+
topic = "my-python-topic-seek-inclusive-" + str(time.time())
926+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared, start_message_id_inclusive=True)
927+
producer = client.create_producer(topic)
928+
929+
for i in range(100):
930+
if i > 0:
931+
time.sleep(0.02)
932+
producer.send(b"hello-%d" % i)
933+
934+
ids = []
935+
for i in range(100):
936+
msg = consumer.receive(TM)
937+
self.assertEqual(msg.data(), b"hello-%d" % i)
938+
ids.append(msg.message_id())
939+
consumer.acknowledge(msg)
940+
941+
# seek, and after reconnect, expected receive first message.
942+
consumer.seek(MessageId.earliest)
943+
time.sleep(0.5)
944+
msg = consumer.receive(TM)
945+
self.assertEqual(msg.data(), b"hello-0")
946+
947+
# seek on messageId
948+
consumer.seek(ids[50])
949+
time.sleep(0.5)
950+
msg = consumer.receive(TM)
951+
self.assertEqual(msg.data(), b"hello-50")
952+
client.close()
953+
923954
def test_v2_topics(self):
924955
self._v2_topics(self.serviceUrl)
925956

0 commit comments

Comments
 (0)