diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e31a47d6..e1db8b5aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ Changes and additions to the library will be listed here. ## Unreleased - Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866). +- Add support for `murmur2` based partitioning. ## 1.3.0 diff --git a/README.md b/README.md index ebe3d8365..1ff5b616f 100644 --- a/README.md +++ b/README.md @@ -382,6 +382,16 @@ partitioner = -> (partition_count, message) { ... } Kafka.new(partitioner: partitioner, ...) ``` +##### Supported partitioning schemes + +In order for semantic partitioning to work a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. There are many different algorithms that can be used to calculate a hash. By default `crc32` is used. `murmur2` is also supported for compatibility with Java based Kafka producers. + +To use `murmur2` hashing pass it as an argument to `Partitioner`. For example: + +```ruby +Kafka.new(partitioner: Kafka::Partitioner.new(hash_function: :murmur2)) +``` + #### Buffering and Error Handling The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer. diff --git a/lib/kafka/crc32_hash.rb b/lib/kafka/crc32_hash.rb new file mode 100644 index 000000000..1849008a6 --- /dev/null +++ b/lib/kafka/crc32_hash.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require "zlib" + +module Kafka + class Crc32Hash + + # crc32 is supported natively + def load; end + + def hash(value) + Zlib.crc32(value) + end + end +end diff --git a/lib/kafka/digest.rb b/lib/kafka/digest.rb new file mode 100644 index 000000000..8ba4cc206 --- /dev/null +++ b/lib/kafka/digest.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +require "kafka/crc32_hash" +require "kafka/murmur2_hash" + +module Kafka + module Digest + FUNCTIONS_BY_NAME = { + :crc32 => Crc32Hash.new, + :murmur2 => Murmur2Hash.new + }.freeze + + def self.find_digest(name) + digest = FUNCTIONS_BY_NAME.fetch(name) do + raise LoadError, "Unknown hash function #{name}" + end + + digest.load + digest + end + end +end diff --git a/lib/kafka/murmur2_hash.rb b/lib/kafka/murmur2_hash.rb new file mode 100644 index 000000000..a6223b0d6 --- /dev/null +++ b/lib/kafka/murmur2_hash.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Kafka + class Murmur2Hash + SEED = [0x9747b28c].pack('L') + + def load + require 'digest/murmurhash' + rescue LoadError + raise LoadError, "using murmur2 hashing requires adding a dependency on the `digest-murmurhash` gem to your Gemfile." + end + + def hash(value) + ::Digest::MurmurHash2.rawdigest(value, SEED) & 0x7fffffff + end + end +end diff --git a/lib/kafka/partitioner.rb b/lib/kafka/partitioner.rb index f4fcd2882..e11052442 100644 --- a/lib/kafka/partitioner.rb +++ b/lib/kafka/partitioner.rb @@ -1,11 +1,16 @@ # frozen_string_literal: true -require "zlib" +require "kafka/digest" module Kafka # Assigns partitions to messages. class Partitioner + # @param hash_function [Symbol, nil] the algorithm used to compute a messages + # destination partition. Default is :crc32 + def initialize(hash_function: nil) + @digest = Digest.find_digest(hash_function || :crc32) + end # Assigns a partition number based on a partition key. If no explicit # partition key is provided, the message key will be used instead. @@ -28,7 +33,7 @@ def call(partition_count, message) if key.nil? rand(partition_count) else - Zlib.crc32(key) % partition_count + @digest.hash(key) % partition_count end end end diff --git a/lib/kafka/protocol/record_batch.rb b/lib/kafka/protocol/record_batch.rb index 4201cc737..b9f6ea2c9 100644 --- a/lib/kafka/protocol/record_batch.rb +++ b/lib/kafka/protocol/record_batch.rb @@ -77,7 +77,7 @@ def encode(encoder) record_batch_encoder.write_int8(MAGIC_BYTE) body = encode_record_batch_body - crc = Digest::CRC32c.checksum(body) + crc = ::Digest::CRC32c.checksum(body) record_batch_encoder.write_int32(crc) record_batch_encoder.write(body) diff --git a/ruby-kafka.gemspec b/ruby-kafka.gemspec index 2589e97d5..d50092dc5 100644 --- a/ruby-kafka.gemspec +++ b/ruby-kafka.gemspec @@ -33,6 +33,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "rake", "~> 10.0" spec.add_development_dependency "rspec" spec.add_development_dependency "pry" + spec.add_development_dependency "digest-murmurhash" spec.add_development_dependency "dotenv" spec.add_development_dependency "docker-api" spec.add_development_dependency "rspec-benchmark" diff --git a/spec/digest_spec.rb b/spec/digest_spec.rb new file mode 100644 index 000000000..c4b98a8bf --- /dev/null +++ b/spec/digest_spec.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +describe Kafka::Digest do + describe "crc32" do + let(:digest) { Kafka::Digest.find_digest(:crc32) } + + it "is supported" do + expect(digest).to be_truthy + end + + it "produces hash for value" do + expect(digest.hash("yolo")).to eq(1623057525) + end + end + + describe "murmur2" do + let(:digest) { Kafka::Digest.find_digest(:murmur2) } + + it "is supported" do + expect(digest).to be_truthy + end + + it "produces hash for value" do + expect(digest.hash("yolo")).to eq(1633766415) + end + end + + describe "unknown hash function" do + it "raises" do + expect { Kafka::Digest.find_digest(:yolo) }.to raise_error + end + end +end diff --git a/spec/partitioner_spec.rb b/spec/partitioner_spec.rb index dafe73557..547dccd13 100644 --- a/spec/partitioner_spec.rb +++ b/spec/partitioner_spec.rb @@ -1,29 +1,81 @@ # frozen_string_literal: true describe Kafka::Partitioner, "#call" do - let(:partitioner) { Kafka::Partitioner.new } let(:message) { double(:message, key: nil, partition_key: "yolo") } - it "deterministically returns a partition number for a partition key and partition count" do - partition = partitioner.call(3, message) - expect(partition).to eq 0 - end + describe "default partitioner" do + let(:partitioner) { Kafka::Partitioner.new } + + it "deterministically returns a partition number for a partition key and partition count" do + partition = partitioner.call(3, message) + expect(partition).to eq 0 + end + + it "falls back to the message key if no partition key is available" do + allow(message).to receive(:partition_key) { nil } + allow(message).to receive(:key) { "hey" } - it "falls back to the message key if no partition key is available" do - allow(message).to receive(:partition_key) { nil } - allow(message).to receive(:key) { "hey" } + partition = partitioner.call(3, message) - partition = partitioner.call(3, message) + expect(partition).to eq 2 + end - expect(partition).to eq 2 + it "randomly picks a partition if the key is nil" do + allow(message).to receive(:key) { nil } + allow(message).to receive(:partition_key) { nil } + + partitions = 30.times.map { partitioner.call(3, message) } + + expect(partitions.uniq).to contain_exactly(0, 1, 2) + end end - it "randomly picks a partition if the key is nil" do - allow(message).to receive(:key) { nil } - allow(message).to receive(:partition_key) { nil } + describe "murmur2 partitioner" do + let(:partitioner) { Kafka::Partitioner.new(hash_function: :murmur2) } + let(:message) { double(:message, key: nil, partition_key: "yolo") } + + it "deterministically returns a partition number for a partition key and partition count" do + partition = partitioner.call(3, message) + expect(partition).to eq 0 + end + + it "falls back to the message key if no partition key is available" do + allow(message).to receive(:partition_key) { nil } + allow(message).to receive(:key) { "hey" } + + partition = partitioner.call(3, message) + + expect(partition).to eq 1 + end + + it "randomly picks a partition if the key is nil" do + allow(message).to receive(:key) { nil } + allow(message).to receive(:partition_key) { nil } + + partitions = 30.times.map { partitioner.call(3, message) } - partitions = 30.times.map { partitioner.call(3, message) } + expect(partitions.uniq).to contain_exactly(0, 1, 2) + end - expect(partitions.uniq).to contain_exactly(0, 1, 2) + it "picks a Java Kafka compatible partition" do + partition_count = 100 + { + # librdkafka test cases taken from tests/0048-partitioner.c + "" => 0x106e08d9 % partition_count, + "this is another string with more length to it perhaps" => 0x4f7703da % partition_count, + "hejsan" => 0x5ec19395 % partition_count, + # Java Kafka test cases taken from UtilsTest.java. + # The Java tests check the result of murmur2 directly, + # so have been ANDd with 0x7fffffff to work here + "21" => (-973932308 & 0x7fffffff) % partition_count, + "foobar" => (-790332482 & 0x7fffffff) % partition_count, + "a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count, + "a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count, + "lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count + }.each do |key, partition| + allow(message).to receive(:partition_key) { key } + expect(partitioner.call(partition_count, message)).to eq partition + end + end end end