Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrading from version 0.5.5 to 0.7 created a pause of ~ 2.5 seconds between the time message is produced and the time it is consumed. #823

Closed
ElenaHenderson opened this issue Apr 21, 2020 · 14 comments · Fixed by #825

Comments

@ElenaHenderson
Copy link
Contributor

ElenaHenderson commented Apr 21, 2020

Upgrading from version 0.5.5 to 0.7 created a pause of ~ 2.5 seconds between the time message is produced and the time it is consumed.

Note that this issue never happened before 0.7 upgrade and when we downgraded back to 0.5.5, the issue went away.

We would greatly appreciate any suggestions on what configuration settings on consumer or producer side we should try to tune.

If this is a bug report, please fill out the following:

  • Version of Ruby: 2.6.5
  • Version of Kafka: 1.1.1
  • Version of ruby-kafka: ruby-kafka 0.7.0

Please verify that the problem you're seeing hasn't been fixed by the current master of ruby-kafka.

I am talking to the team to see if we can actually do this.

Steps to reproduce
Expected outcome

No pause between the time when message is produced and then picked up by consumer.

Here is graph that shows the first block when provider-requests-events consumer finished message processing and produced a new message for state-machine-events consumer and state-machine-events consumer started processing it (second block). Note there is no gap between two blocks (this was before 0.7 upgrade):

Screen Shot 2020-04-21 at 3 44 40 PM

Actual outcome

~1 second pause between the time a message is produced and then picked up by consumer.
Note the gap between two blocks (after 0.7 upgrade):.

Screen Shot 2020-04-21 at 3 42 07 PM

We are using phobos lib so these are our current consumer and producer settings:

producer:
  # number of seconds a broker can wait for replicas to acknowledge
  # a write before responding with a timeout
  ack_timeout: 5
  # number of replicas that must acknowledge a write, or `:all`
  # if all in-sync replicas must acknowledge
  required_acks: :all
  # number of retries that should be attempted before giving up sending
  # messages to the cluster. Does not include the original attempt
  max_retries: 5
  # number of seconds to wait between retries
  retry_backoff: 2
  # number of messages allowed in the buffer before new writes will
  # raise {BufferOverflow} exceptions
  max_buffer_size: 1000
  # maximum size of the buffer in bytes. Attempting to produce messages
  # when the buffer reaches this size will result in {BufferOverflow} being raised
  max_buffer_bytesize: 10000000
  # name of the compression codec to use, or nil if no compression should be performed.
  # Valid codecs: `:snappy` and `:gzip`
  compression_codec:
  # number of messages that needs to be in a message set before it should be compressed.
  # Note that message sets are per-partition rather than per-topic or per-producer
  compression_threshold: 1
  # maximum number of messages allowed in the queue. Only used for async_producer
  max_queue_size: 1000
  # if greater than zero, the number of buffered messages that will automatically
  # trigger a delivery. Only used for async_producer
  delivery_threshold: 100
  # if greater than zero, the number of seconds between automatic message
  # deliveries. Only used for async_producer
  delivery_interval: 3

consumer:
  # number of seconds after which, if a client hasn't contacted the Kafka cluster,
  # it will be kicked out of the group
  session_timeout: 120
  # interval between offset commits, in seconds
  offset_commit_interval: 10
  # number of messages that can be processed before their offsets are committed.
  # If zero, offset commits are not triggered by message processing
  offset_commit_threshold: 1
  # interval between heartbeats; must be less than the session window
  heartbeat_interval: 10
  # How long to retain offsets
  offset_retention_time: 2592000 # 30 days

Thank you so much for looking into this!

@paneq
Copy link

paneq commented Apr 24, 2020

Reproduce

Producer

require "kafka"
require "json"

kafka = Kafka.new(["127.0.0.1:9092"], client_id: "my-application")

loop do
  kafka.deliver_message({time: Time.now.to_f}.to_json, topic: "greetings", key: "Hello")
  puts "delivered"
  sleep(0.5)
end

Consumer 1 (min_bytes: 1, max_wait_time: 1)

require "kafka"
require "logger"
require "json"

kafka = Kafka.new(["127.0.0.1:9092"], logger: Logger.new(STDOUT))

# Consumers with the same group id will form a Consumer Group together.
consumer = kafka.consumer(group_id: "my-consumer")
#
# # It's possible to subscribe to multiple topics by calling `subscribe`
# # repeatedly.
consumer.subscribe("greetings")
#
# # Stop the consumer when the SIGTERM signal is sent to the process.
# # It's better to shut down gracefully than to kill the process.
trap("TERM") { consumer.stop }
#
# # This will loop indefinitely, yielding each message in turn.
consumer.each_message(min_bytes: 1, max_wait_time: 1) do |message|
  now = Time.now.to_f
  sent = JSON.parse(message.value)['time']
  diff = now - sent
  puts("diff: #{diff.round(2)}")
end

Effect:

Notice the diff in the output which is the latency in processing particular message.

D, [2020-04-24T11:03:22.069025 #43278] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:03:22.241911 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 6 from 127.0.0.1:9092
D, [2020-04-24T11:03:22.242069 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:22.242214 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 7 to 127.0.0.1:9092
D, [2020-04-24T11:03:22.242391 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 7 from 127.0.0.1:9092
D, [2020-04-24T11:03:22.748550 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 7 from 127.0.0.1:9092
D, [2020-04-24T11:03:22.748691 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:22.748882 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 8 to 127.0.0.1:9092
D, [2020-04-24T11:03:22.749067 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 8 from 127.0.0.1:9092
D, [2020-04-24T11:03:23.261769 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 8 from 127.0.0.1:9092
D, [2020-04-24T11:03:23.261879 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:23.262027 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 9 to 127.0.0.1:9092
D, [2020-04-24T11:03:23.262198 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 9 from 127.0.0.1:9092
D, [2020-04-24T11:03:23.776049 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 9 from 127.0.0.1:9092
D, [2020-04-24T11:03:23.776163 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:23.776278 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 10 to 127.0.0.1:9092
D, [2020-04-24T11:03:23.776469 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 10 from 127.0.0.1:9092
diff: 1.84
D, [2020-04-24T11:03:24.071774 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7659 as processed
diff: 1.33
D, [2020-04-24T11:03:24.072448 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7660 as processed
diff: 0.82
D, [2020-04-24T11:03:24.072714 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7661 as processed
diff: 0.31
D, [2020-04-24T11:03:24.072871 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7662 as processed
D, [2020-04-24T11:03:24.073009 #43278] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:03:24.283852 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 10 from 127.0.0.1:9092
D, [2020-04-24T11:03:24.283967 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:24.284099 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 11 to 127.0.0.1:9092
D, [2020-04-24T11:03:24.284254 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 11 from 127.0.0.1:9092
D, [2020-04-24T11:03:24.796466 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 11 from 127.0.0.1:9092
D, [2020-04-24T11:03:24.796654 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:24.796867 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 12 to 127.0.0.1:9092
D, [2020-04-24T11:03:24.797072 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 12 from 127.0.0.1:9092
D, [2020-04-24T11:03:25.308333 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 12 from 127.0.0.1:9092
D, [2020-04-24T11:03:25.308524 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:25.308723 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 13 to 127.0.0.1:9092
D, [2020-04-24T11:03:25.309017 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 13 from 127.0.0.1:9092
D, [2020-04-24T11:03:25.821799 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 13 from 127.0.0.1:9092
D, [2020-04-24T11:03:25.822161 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:25.822506 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 14 to 127.0.0.1:9092
D, [2020-04-24T11:03:25.822743 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 14 from 127.0.0.1:9092
diff: 1.8
D, [2020-04-24T11:03:26.075831 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7663 as processed
diff: 1.29
D, [2020-04-24T11:03:26.075963 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7664 as processed
diff: 0.78
D, [2020-04-24T11:03:26.076135 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7665 as processed
diff: 0.27
D, [2020-04-24T11:03:26.076183 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7666 as processed
D, [2020-04-24T11:03:26.076211 #43278] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:03:26.327101 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 14 from 127.0.0.1:9092
D, [2020-04-24T11:03:26.327227 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:26.327360 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 15 to 127.0.0.1:9092
D, [2020-04-24T11:03:26.327467 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 15 from 127.0.0.1:9092
D, [2020-04-24T11:03:26.841006 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 15 from 127.0.0.1:9092
D, [2020-04-24T11:03:26.841135 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:26.841247 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 16 to 127.0.0.1:9092
D, [2020-04-24T11:03:26.841399 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 16 from 127.0.0.1:9092
D, [2020-04-24T11:03:27.350748 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 16 from 127.0.0.1:9092
D, [2020-04-24T11:03:27.350912 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:27.351028 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 17 to 127.0.0.1:9092
D, [2020-04-24T11:03:27.351193 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 17 from 127.0.0.1:9092
D, [2020-04-24T11:03:27.861489 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 17 from 127.0.0.1:9092
D, [2020-04-24T11:03:27.861617 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:03:27.861728 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 18 to 127.0.0.1:9092
D, [2020-04-24T11:03:27.861897 #43278] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 18 from 127.0.0.1:9092
D, [2020-04-24T11:03:28.077897 #43278] DEBUG -- : [[my-consumer] {}:] Sending heartbeat...
D, [2020-04-24T11:03:28.078084 #43278] DEBUG -- : [[my-consumer] {}:] [heartbeat] Sending heartbeat API request 6 to 127.0.0.1:9092
D, [2020-04-24T11:03:28.078386 #43278] DEBUG -- : [[my-consumer] {}:] [heartbeat] Waiting for response 6 from 127.0.0.1:9092
D, [2020-04-24T11:03:28.082045 #43278] DEBUG -- : [[my-consumer] {}:] [heartbeat] Received response 6 from 127.0.0.1:9092
diff: 1.76
D, [2020-04-24T11:03:28.082371 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7667 as processed
diff: 1.25
D, [2020-04-24T11:03:28.082714 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7668 as processed
diff: 0.74
D, [2020-04-24T11:03:28.083037 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7669 as processed
diff: 0.23
D, [2020-04-24T11:03:28.083230 #43278] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7670 as processed
D, [2020-04-24T11:03:28.083353 #43278] DEBUG -- : [[my-consumer] {}:] No batches to process

Consumer 1 (min_bytes: 1, max_wait_time: 0.1)

consumer.each_message(min_bytes: 1, max_wait_time: 0.1) do |message|
  now = Time.now.to_f
  sent = JSON.parse(message.value)['time']
  diff = now - sent
  puts("diff: #{diff.round(2)}")
end

Effect

D, [2020-04-24T11:04:51.990653 #43304] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:04:52.004563 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 21 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.004865 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.005227 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 22 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.005541 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 22 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.110848 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 22 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.111239 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.111669 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 23 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.112047 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 23 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.216537 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 23 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.216676 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.216779 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 24 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.216867 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 24 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.319223 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 24 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.319334 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.319431 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 25 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.319541 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 25 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.414299 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 25 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.414458 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.414589 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 26 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.414790 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 26 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.519345 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 26 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.519472 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.519614 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 27 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.519925 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 27 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.623066 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 27 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.623185 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.623319 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 28 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.623518 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 28 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.729121 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 28 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.729238 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.729401 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 29 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.729603 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 29 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.833999 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 29 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.834182 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.834404 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 30 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.834706 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 30 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.927059 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 30 from 127.0.0.1:9092
D, [2020-04-24T11:04:52.927275 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:52.927419 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 31 to 127.0.0.1:9092
D, [2020-04-24T11:04:52.927633 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 31 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.031440 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 31 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.031561 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.031736 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 32 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.031895 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 32 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.136360 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 32 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.136589 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.136799 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 33 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.137102 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 33 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.241058 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 33 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.241374 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.241661 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 34 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.242014 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 34 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.345769 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 34 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.346032 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.346357 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 35 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.346811 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 35 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.438731 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 35 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.438885 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.439024 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 36 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.439205 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 36 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.542337 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 36 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.542538 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.542807 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 37 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.543177 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 37 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.648742 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 37 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.648918 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.649106 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 38 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.649574 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 38 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.754181 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 38 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.754375 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.754624 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 39 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.755264 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 39 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.858814 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 39 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.859018 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.859300 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 40 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.859666 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 40 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.952155 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 40 from 127.0.0.1:9092
D, [2020-04-24T11:04:53.952330 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:53.952451 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 41 to 127.0.0.1:9092
D, [2020-04-24T11:04:53.952593 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 41 from 127.0.0.1:9092
diff: 1.59
D, [2020-04-24T11:04:53.993239 #43304] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7836 as processed
diff: 1.07
D, [2020-04-24T11:04:53.993424 #43304] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7837 as processed
diff: 0.56
D, [2020-04-24T11:04:53.993596 #43304] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7838 as processed
diff: 0.05
D, [2020-04-24T11:04:53.993724 #43304] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7839 as processed
D, [2020-04-24T11:04:53.993764 #43304] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:04:54.056313 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 41 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.056516 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.056747 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 42 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.057101 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 42 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.164349 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 42 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.164567 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.164800 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 43 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.165107 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 43 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.273079 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 43 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.273381 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.273627 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 44 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.273909 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 44 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.377242 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 44 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.377533 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.377820 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 45 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.378089 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 45 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.464869 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 45 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.465056 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.465243 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 46 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.465517 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 46 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.571232 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 46 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.571437 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.571745 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 47 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.571987 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 47 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.679368 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 47 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.679749 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.680077 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 48 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.680728 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 48 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.784658 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 48 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.784820 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.785001 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 49 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.785204 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 49 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.888447 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 49 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.888621 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.888970 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 50 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.889336 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 50 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.975498 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 50 from 127.0.0.1:9092
D, [2020-04-24T11:04:54.975852 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:54.976130 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 51 to 127.0.0.1:9092
D, [2020-04-24T11:04:54.976441 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 51 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.083224 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 51 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.083414 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.083774 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 52 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.084085 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 52 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.189697 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 52 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.189906 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.190422 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 53 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.190813 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 53 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.298475 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 53 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.298691 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.298929 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 54 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.299123 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 54 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.406402 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 54 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.406686 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.406918 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 55 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.407183 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 55 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.488292 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 55 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.488692 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.489105 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 56 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.489721 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 56 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.596085 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 56 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.596250 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.596404 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 57 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.596535 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 57 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.700191 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 57 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.700441 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.700703 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 58 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.701001 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 58 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.806277 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 58 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.806528 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.806747 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 59 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.807138 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 59 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.913880 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 59 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.914140 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.914458 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 60 to 127.0.0.1:9092
D, [2020-04-24T11:04:55.914856 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 60 from 127.0.0.1:9092
diff: 1.54
D, [2020-04-24T11:04:55.996779 #43304] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7840 as processed
diff: 1.03
D, [2020-04-24T11:04:55.997207 #43304] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7841 as processed
D, [2020-04-24T11:04:55.997709 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 60 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.997831 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:04:55.998108 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 61 to 127.0.0.1:9092
diff: 0.52
D, [2020-04-24T11:04:55.998342 #43304] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7842 as processed
diff: 0.01
D, [2020-04-24T11:04:55.998766 #43304] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 61 from 127.0.0.1:9092
D, [2020-04-24T11:04:55.998888 #43304] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:7843 as processed
D, [2020-04-24T11:04:55.999119 #43304] DEBUG -- : [[my-consumer] {}:] No batches to process

Consumer 3 (min_bytes: 1, max_wait_time: 1), ruby-kafka code changed from sleep 2 to sleep 0.1

consumer.each_message(min_bytes: 1, max_wait_time: 1) do |message|
  now = Time.now.to_f
  sent = JSON.parse(message.value)['time']
  diff = now - sent
  puts("diff: #{diff.round(2)}")
end

Hardcoded sleep changed here:

Effect:

D, [2020-04-24T11:08:09.218740 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:09.222593 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 8 from 127.0.0.1:9092
D, [2020-04-24T11:08:09.222770 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:08:09.222873 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 9 to 127.0.0.1:9092
D, [2020-04-24T11:08:09.222999 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 9 from 127.0.0.1:9092
diff: 0.1
D, [2020-04-24T11:08:09.320826 #43343] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:8222 as processed
D, [2020-04-24T11:08:09.320902 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:09.424535 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:09.527207 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:09.630089 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:09.734363 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:09.738358 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 9 from 127.0.0.1:9092
D, [2020-04-24T11:08:09.738516 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:08:09.738658 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 10 to 127.0.0.1:9092
D, [2020-04-24T11:08:09.738808 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 10 from 127.0.0.1:9092
diff: 0.11
D, [2020-04-24T11:08:09.837722 #43343] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:8223 as processed
D, [2020-04-24T11:08:09.838035 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:09.941652 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:10.042086 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:10.143646 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:10.242994 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 10 from 127.0.0.1:9092
D, [2020-04-24T11:08:10.243093 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:08:10.243203 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 11 to 127.0.0.1:9092
D, [2020-04-24T11:08:10.243425 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 11 from 127.0.0.1:9092
diff: 0.01
D, [2020-04-24T11:08:10.248061 #43343] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:8224 as processed
D, [2020-04-24T11:08:10.248143 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:10.352809 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:10.457203 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:10.561735 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:10.666170 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process
D, [2020-04-24T11:08:10.755365 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Received response 11 from 127.0.0.1:9092
D, [2020-04-24T11:08:10.755508 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] Fetching batches
D, [2020-04-24T11:08:10.755690 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Sending fetch API request 12 to 127.0.0.1:9092
D, [2020-04-24T11:08:10.755849 #43343] DEBUG -- : [[my-consumer] {greetings: 0}:] [fetch] Waiting for response 12 from 127.0.0.1:9092
diff: 0.02
D, [2020-04-24T11:08:10.770807 #43343] DEBUG -- : [[my-consumer] {}:] Marking greetings/0:8225 as processed
D, [2020-04-24T11:08:10.770930 #43343] DEBUG -- : [[my-consumer] {}:] No batches to process

Summary

Changing the hardcoded sleep value manually is the only way to achieve low-latency for topics with low amount of writes.

Comment

It's very hard for me to say why this sleep is there and what the consequences are of making it smaller. Please at least make it configurable.

I spent plenty of time playing with max_wait_time only to discover that it does not change anything and in the worst case scenario latency was always much higher than I expected (~1.5s), only to realize that this hardcoded sleep is what causes all the issues.

@ElenaHenderson
Copy link
Contributor Author

@paneq Thank you for adding great details!

We also modified max_wait_time to 0.1 while trying to make the pause smaller and it did not make any difference.

@dasch
Copy link
Contributor

dasch commented Apr 27, 2020

I’d be open to a PR that changes the sleep to use the max_wait_time value instead of a hardcoded value.

@ElenaHenderson
Copy link
Contributor Author

@dasch Sounds good!

May I assume that Ruby can do sleep of 100 ms or 10 ms?

@dasch
Copy link
Contributor

dasch commented Apr 28, 2020

Yup, but you need to use a Float, e.g. 0.1 is 100ms.

@aaronpkahn
Copy link

@dasch we (Tesla vehicle backend services) are also experiencing this significant consumer lag and can help validate the fix works well. This is currently blocking a transition from rabbit to kafka, so please let me know if anything can be done to accelerate the upgrade.

@dasch
Copy link
Contributor

dasch commented Apr 29, 2020

@renobeguh if you can validate that #825 works it would be great!

@aaronpkahn
Copy link

@dasch we just validated the change. Using a max_wait_time of 0.01 we saw event processing lag drop from about 1500ms to 150ms.

@dasch
Copy link
Contributor

dasch commented Apr 30, 2020

Awesome, thanks!

@aaronpkahn
Copy link

@dasch do you have an estimate on when this change will be merged and published? We'd like to reference the new version, and are weighing hosting our own forked version temporarily.

@dasch
Copy link
Contributor

dasch commented May 5, 2020

I think the PR can be merged and a new release cut – if there's a regression as mentioned we can get that fixed once we get a reproducible report.

@dasch dasch closed this as completed May 5, 2020
@dasch
Copy link
Contributor

dasch commented May 5, 2020

v1.1.0.beta1 has been released. @renobeguh can you upgrade to that in production and verify that there are no regressions?

@ElenaHenderson
Copy link
Contributor Author

@dasch @paneq @renobeguh Thank you so much for debugging, testing and resolving this issue!

@aaronpkahn
Copy link

@dasch thank you we're upgrading and testing today

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants