Skip to content

Commit 49c6194

Browse files
authored
Merge pull request #876 from abicky/make-custom-assignment-strategy-example-stable
Make "consuming messages with a custom assignment strategy" stable
2 parents 6d80b33 + fe7bc77 commit 49c6194

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

spec/functional/consumer_group_spec.rb

+9
Original file line numberDiff line numberDiff line change
@@ -476,12 +476,21 @@ def call(cluster:, members:, partitions:)
476476
end
477477
end
478478

479+
joined_consumers = []
479480
consumers = 2.times.map do |i|
480481
assignment_strategy = assignment_strategy_class.new(i + 1)
481482

482483
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
483484
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
484485
consumer.subscribe(topic)
486+
487+
allow(consumer).to receive(:trigger_heartbeat).and_wrap_original do |m, *args|
488+
joined_consumers |= [consumer]
489+
# Wait until all the consumers try to join to prevent one consumer from processing all messages
490+
raise Kafka::HeartbeatError if joined_consumers.size < consumers.size
491+
m.call(*args)
492+
end
493+
485494
consumer
486495
end
487496

0 commit comments

Comments
 (0)