Add murmur2_random support for message partitioning. #884

Merged 10 commits on Feb 3, 2021
Changes from 2 commits

13 changes: 11 additions & 2 deletions lib/kafka/client.rb
@@ -67,6 +67,8 @@ class Client
#
# @param partitioner [Partitioner, nil] the partitioner that should be used by the client.
#
# @param partitioner_klass [String, nil] the partitioner klass that should be used by the client if no partitioner is supplied.
#
# @param sasl_oauth_token_provider [Object, nil] OAuthBearer Token Provider instance that
# implements method token. See {Sasl::OAuth#initialize}
#
@@ -80,7 +82,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
- sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
+ sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, partitioner_klass: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
@logger = TaggedLogger.new(logger)
@instrumenter = Instrumenter.new(client_id: client_id)
@seed_brokers = normalize_seed_brokers(seed_brokers)
@@ -124,7 +126,14 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
)

@cluster = initialize_cluster
- @partitioner = partitioner || Partitioner.new
+ @partitioner =
+   if partitioner
+     partitioner
+   elsif partitioner_klass
+     Object.const_get(partitioner_klass).new
+   else
+     Partitioner.new
+   end

Contributor

I don't think we should introduce a new concept next to Partitioner. Instead, Partitioner should be configured with the hash/digest algorithm, with the default being crc32 but murmur2 being an option. The "random" part would just always be the behavior when no key is present.

So: can we introduce a new parameter to Partitioner#initialize, called e.g. hash_function, that accepts either crc32 (the default) or murmur2? Clients that wish to use murmur2 would need to instantiate Partitioner with that value and pass the instance to the producer.

In Racecar and other high-level frameworks, we can add configuration keys for controlling this declaratively.
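
The proposal in this comment could look roughly like the sketch below. It is an illustration only, not the code that was merged: the class name `SketchPartitioner`, the `HASH_FUNCTIONS` table and the `Message` struct are hypothetical, and the default is assumed to be CRC32 from Ruby's standard Zlib module. The murmur2 entry reuses the `Digest::MurmurHash2.rawdigest` call and seed from this PR and is loaded lazily, so the gem is only needed when that option is selected.

```ruby
require "zlib"

# Hypothetical sketch: one partitioner class, hash function chosen at
# construction time. Not ruby-kafka's actual implementation.
class SketchPartitioner
  # Maps a hash function name to a callable returning a non-negative Integer.
  HASH_FUNCTIONS = {
    # Assumption: CRC32 via the standard library.
    crc32: ->(key) { Zlib.crc32(key) },
    murmur2: lambda { |key|
      # Required lazily so the gem is only needed for this option.
      require "digest/murmurhash"
      seed = [0x9747b28c].pack("L")
      Digest::MurmurHash2.rawdigest(key, seed) & 0x7fffffff
    }
  }.freeze

  def initialize(hash_function: :crc32)
    @hash = HASH_FUNCTIONS.fetch(hash_function) do
      raise ArgumentError, "unknown hash function: #{hash_function.inspect}"
    end
  end

  # Same contract as the partitioner in this PR: random partition when
  # there is no key, otherwise hash(key) modulo the partition count.
  def call(partition_count, message)
    raise ArgumentError, "partition_count must be positive" if partition_count == 0

    key = message.partition_key || message.key
    return rand(partition_count) if key.nil?

    @hash.call(key) % partition_count
  end
end

# Minimal stand-in for Kafka::PendingMessage, for illustration only.
Message = Struct.new(:key, :partition_key)

partitioner = SketchPartitioner.new(hash_function: :crc32)
partitioner.call(3, Message.new("hey", nil))
```

With this shape, the random fallback for keyless messages lives in one place, and a new hash function is a new table entry rather than a new partitioner class.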


Contributor Author

@divo, Feb 1, 2021

That sounds good. I was taking inspiration from librdkafka but your idea will also work well with the opt-in behaviour below.

Just to be clear, you would like the Partitioner instance to be passed in, à la sasl_oauth_token_provider, and not the hash_function as a symbol, à la compression_codec? I'm only asking because the latter seems like a better fit here, but I don't have much experience writing gems.


Contributor

I'd prefer the former, since we're talking about varying the hash algo used by Partitioner, but not changing its fundamental strategy. Also, I prefer a minimal change in API – the high level libraries can easily make this a config key without any extra APIs.


Contributor Author

Great, thanks for breaking it down for me. I've removed the API change and moved it inside the Partitioner.

end

# Delivers a single message to the Kafka cluster.

35 changes: 35 additions & 0 deletions lib/kafka/murmur2_partitioner.rb
@@ -0,0 +1,35 @@
# frozen_string_literal: true

require 'digest/murmurhash'

module Kafka

  # Java producer compatible message partitioner
  class Murmur2Partitioner
    SEED = [0x9747b28c].pack('L')

    # Assigns a partition number based on a partition key. If no explicit
    # partition key is provided, the message key will be used instead.
    #
    # If the key is nil, then a random partition is selected. Otherwise, a hash
    # of the key is used to deterministically find a partition. As long as the
    # number of partitions doesn't change, the same key will always be assigned
    # to the same partition.
    #
    # @param partition_count [Integer] the number of partitions in the topic.
    # @param message [Kafka::PendingMessage] the message that should be assigned
    #   a partition.
    # @return [Integer] the partition number.
    def call(partition_count, message)
      raise ArgumentError if partition_count == 0

      key = message.partition_key || message.key

      if key.nil?
        rand(partition_count)
      else
        (Digest::MurmurHash2.rawdigest(key, SEED) & 0x7fffffff) % partition_count
      end
    end
  end
end
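
To make it easier to see what the hashed branch of `call` computes, here is a pure-Ruby sketch of 32-bit murmur2 with the same seed (0x9747b28c), followed by the mask and modulo. It is an illustrative port of the variant used by Kafka's Java client, not the code in this PR, which delegates to the digest-murmurhash gem. The names `murmur2` and `murmur2_partition` are hypothetical.

```ruby
# Illustrative pure-Ruby port of 32-bit murmur2; not the gem's implementation.
MURMUR2_SEED = 0x9747b28c
MURMUR2_M    = 0x5bd1e995
UINT32_MASK  = 0xffffffff

# Returns the hash of `key` as an unsigned 32-bit Integer.
def murmur2(key)
  data = key.bytes
  length = data.length
  h = (MURMUR2_SEED ^ length) & UINT32_MASK

  # Mix the input four bytes at a time, read as little-endian 32-bit words.
  (length / 4).times do |i|
    k = data[i * 4] | (data[i * 4 + 1] << 8) |
        (data[i * 4 + 2] << 16) | (data[i * 4 + 3] << 24)
    k = (k * MURMUR2_M) & UINT32_MASK
    k ^= k >> 24
    k = (k * MURMUR2_M) & UINT32_MASK
    h = (h * MURMUR2_M) & UINT32_MASK
    h ^= k
  end

  # Fold in the remaining one to three bytes, if any.
  tail = length & ~3
  rest = length % 4
  h ^= data[tail + 2] << 16 if rest == 3
  h ^= data[tail + 1] << 8 if rest >= 2
  if rest >= 1
    h ^= data[tail]
    h = (h * MURMUR2_M) & UINT32_MASK
  end

  # Final avalanche so the last bytes affect every output bit.
  h ^= h >> 13
  h = (h * MURMUR2_M) & UINT32_MASK
  h ^= h >> 15
  h
end

# Clears the sign bit, then maps the hash onto a partition.
def murmur2_partition(key, partition_count)
  (murmur2(key) & 0x7fffffff) % partition_count
end

murmur2_partition("foobar", 100)
```

The explicit `& UINT32_MASK` after each multiplication stands in for the 32-bit overflow that C and Java get for free, since Ruby integers grow without bound.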
1 change: 1 addition & 0 deletions lib/kafka/producer.rb
@@ -2,6 +2,7 @@

require "set"
require "kafka/partitioner"
require "kafka/murmur2_partitioner"
require "kafka/message_buffer"
require "kafka/produce_operation"
require "kafka/pending_message_queue"

1 change: 1 addition & 0 deletions ruby-kafka.gemspec
@@ -28,6 +28,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_dependency 'digest-crc'
spec.add_dependency "digest-murmurhash"

Contributor

What are the dependencies of this gem? The dependency policy for ruby-kafka is pretty strict, so for opt-in behavior the default would be to give an error message when the library cannot be detected and ask the user to install it themselves (would love optional dependencies in gems, but 🤷)

If the gem is really trivial and doesn't include C code then we can add it as a hard dep, but only then.


Contributor Author

Ah I see how the various compression codecs are used. Would a similar setup to that be ok here?

The gem is fairly trivial but it's basically just C code.


Contributor

If it's C code then I think it should be opt-in, so that would be a good change 👍


Contributor Author

Done, thanks!
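
The opt-in approach agreed on in this thread generally means requiring the library only when the feature is used and turning a missing gem into an actionable error. A minimal sketch of that pattern follows. The helper name `require_optional` and the message wording are hypothetical, and this is not necessarily how ruby-kafka loads its compression codecs or the murmur2 digest.

```ruby
# Hypothetical helper: load an optional library on demand, or fail with
# instructions instead of a bare "cannot load such file" error.
def require_optional(feature, gem_name:)
  require feature
rescue LoadError
  raise LoadError,
        "Using this feature requires the `#{gem_name}` gem. " \
        "Add it to your Gemfile or run `gem install #{gem_name}`."
end

# Only users who select murmur2 would hit this call, so the gem (and its
# C extension) never becomes a hard dependency for everyone else:
#
#   require_optional("digest/murmurhash", gem_name: "digest-murmurhash")
```

The trade-off is that a missing gem is discovered at first use rather than at install time, which is why the error message should name the gem explicitly.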


spec.add_development_dependency "bundler", ">= 1.9.5"
spec.add_development_dependency "rake", "~> 10.0"

82 changes: 67 additions & 15 deletions spec/partitioner_spec.rb
@@ -1,29 +1,81 @@
# frozen_string_literal: true

describe Kafka::Partitioner, "#call" do
let(:partitioner) { Kafka::Partitioner.new }
let(:message) { double(:message, key: nil, partition_key: "yolo") }

it "deterministically returns a partition number for a partition key and partition count" do
partition = partitioner.call(3, message)
expect(partition).to eq 0
end
describe "default partitioner" do
let(:partitioner) { Kafka::Partitioner.new }

it "deterministically returns a partition number for a partition key and partition count" do
partition = partitioner.call(3, message)
expect(partition).to eq 0
end

it "falls back to the message key if no partition key is available" do
allow(message).to receive(:partition_key) { nil }
allow(message).to receive(:key) { "hey" }

it "falls back to the message key if no partition key is available" do
allow(message).to receive(:partition_key) { nil }
allow(message).to receive(:key) { "hey" }
partition = partitioner.call(3, message)

partition = partitioner.call(3, message)
expect(partition).to eq 2
end

expect(partition).to eq 2
it "randomly picks a partition if the key is nil" do
allow(message).to receive(:key) { nil }
allow(message).to receive(:partition_key) { nil }

partitions = 30.times.map { partitioner.call(3, message) }

expect(partitions.uniq).to contain_exactly(0, 1, 2)
end
end

it "randomly picks a partition if the key is nil" do
allow(message).to receive(:key) { nil }
allow(message).to receive(:partition_key) { nil }
describe "murmur2 partitioner" do
let(:partitioner) { Kafka::Murmur2Partitioner.new }
let(:message) { double(:message, key: nil, partition_key: "yolo") }

it "deterministically returns a partition number for a partition key and partition count" do
partition = partitioner.call(3, message)
expect(partition).to eq 0
end

it "falls back to the message key if no partition key is available" do
allow(message).to receive(:partition_key) { nil }
allow(message).to receive(:key) { "hey" }

partition = partitioner.call(3, message)

expect(partition).to eq 1
end

it "randomly picks a partition if the key is nil" do
allow(message).to receive(:key) { nil }
allow(message).to receive(:partition_key) { nil }

partitions = 30.times.map { partitioner.call(3, message) }

partitions = 30.times.map { partitioner.call(3, message) }
expect(partitions.uniq).to contain_exactly(0, 1, 2)
end

expect(partitions.uniq).to contain_exactly(0, 1, 2)
it "picks a Java Kafka compatible partition" do
partition_count = 100
{
# librdkafka test cases taken from tests/0048-partitioner.c
"" => 0x106e08d9 % partition_count,
"this is another string with more length to it perhaps" => 0x4f7703da % partition_count,
"hejsan" => 0x5ec19395 % partition_count,
# Java Kafka test cases taken from UtilsTest.java.
# The Java tests check the result of murmur2 directly,
# so have been ANDd with 0x7fffffff to work here
"21" => (-973932308 & 0x7fffffff) % partition_count,
"foobar" => (-790332482 & 0x7fffffff) % partition_count,
"a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count,
"a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count,
"lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count
}.each do |key, partition|
allow(message).to receive(:partition_key) { key }
expect(partitioner.call(partition_count, message)).to eq partition
end
end
end
end
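
The Java test vectors in the spec above are signed 32-bit integers, which is why they are ANDed with 0x7fffffff before the modulo. The arithmetic for the key "21" can be worked through as a small sketch. The helper name `to_positive` is hypothetical.

```ruby
# Clears the sign bit of a 32-bit value, yielding a non-negative Integer.
def to_positive(number)
  number & 0x7fffffff
end

# Java's murmur2 returns a signed 32-bit int; for "21" the spec lists:
java_result = -973_932_308

# The same 32 bits read as an unsigned value, as a C library would report it.
unsigned = java_result & 0xffffffff # => 3_321_034_988

# Both views agree once the sign bit is cleared.
positive = to_positive(java_result) # => 1_173_551_340
to_positive(unsigned) == positive   # => true

partition = positive % 100          # => 40

# Taking the absolute value instead would pick a different partition,
# which is why the mask is needed to stay compatible.
java_result.abs % 100               # => 8
```

Because the mask gives the same answer for the signed and unsigned views, the librdkafka vectors (unsigned hex) and the Java vectors (signed decimal) can share one expectation table.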