From 7d1c38443c2f2fe1f2b95b40d2e593de9926d2f4 Mon Sep 17 00:00:00 2001 From: abicky Date: Sun, 13 Dec 2020 02:36:26 +0900 Subject: [PATCH 1/3] Make "consuming messages with a custom assignment strategy" stable In the example, one consumer sometimes consumers all messages before another consumer joins. As a result, the example is unstable. This commit makes it stable by ensuring all consumers have joined. --- spec/functional/consumer_group_spec.rb | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb index 23cc1525e..4368f63ad 100644 --- a/spec/functional/consumer_group_spec.rb +++ b/spec/functional/consumer_group_spec.rb @@ -476,12 +476,21 @@ def call(cluster:, members:, partitions:) end end + joinined_consumers = [] consumers = 2.times.map do |i| assignment_strategy = assignment_strategy_class.new(i + 1) kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger) consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy) consumer.subscribe(topic) + + allow(consumer).to receive(:trigger_heartbeat).and_wrap_original do |m, *args| + joinined_consumers |= [consumer] + # Wait until all the consumers try to join to prevent one consumer from processing all messages + raise Kafka::HeartbeatError if joinined_consumers.size < consumers.size + m.call(*args) + end + consumer end From cda766766974ff1d97586c6468d7b9182ef02f8f Mon Sep 17 00:00:00 2001 From: abicky Date: Sat, 12 Dec 2020 21:23:35 +0900 Subject: [PATCH 2/3] Install cmake to install snappy --- .circleci/config.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index c607764b1..fa1a8f04d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -7,6 +7,7 @@ jobs: LOG_LEVEL: DEBUG steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec - run: bundle exec rubocop @@ -40,6 +41,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -72,6 +74,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -104,6 +107,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -136,6 +140,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -168,6 +173,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -200,6 +206,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -232,6 +239,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -264,6 +272,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -296,6 +305,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -328,6 +338,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional From fe7bc77212bfdff71cc3705ea68900e328922f9d Mon Sep 17 00:00:00 2001 From: Takeshi Arabiki Date: Thu, 31 Dec 2020 00:15:14 +0900 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Daniel Schierbeck --- spec/functional/consumer_group_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb index 4368f63ad..322ab9e63 100644 --- a/spec/functional/consumer_group_spec.rb +++ b/spec/functional/consumer_group_spec.rb @@ -476,7 +476,7 @@ def call(cluster:, members:, partitions:) end end - joinined_consumers = [] + joined_consumers = [] consumers = 2.times.map do |i| assignment_strategy = assignment_strategy_class.new(i + 1) @@ -485,9 +485,9 @@ def call(cluster:, members:, partitions:) consumer.subscribe(topic) allow(consumer).to receive(:trigger_heartbeat).and_wrap_original do |m, *args| - joinined_consumers |= [consumer] + joined_consumers |= [consumer] # Wait until all the consumers try to join to prevent one consumer from processing all messages - raise Kafka::HeartbeatError if joinined_consumers.size < consumers.size + raise Kafka::HeartbeatError if joined_consumers.size < consumers.size m.call(*args) end