Skip to content

Commit a56d16b

Browse files
authored
Merge pull request #884 from zendesk/divo/murmur2
Add `murmur2_random` support for message partitioning.
2 parents 0d6c451 + fd89412 commit a56d16b

10 files changed

+174
-18
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Changes and additions to the library will be listed here.
55
## Unreleased
66

77
- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
8+
- Add support for `murmur2` based partitioning.
89

910
## 1.3.0
1011

README.md

+10
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,16 @@ partitioner = -> (partition_count, message) { ... }
382382
Kafka.new(partitioner: partitioner, ...)
383383
```
384384

385+
##### Supported partitioning schemes
386+
387+
In order for semantic partitioning to work, a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and take the result modulo the number of partitions. There are many different algorithms that can be used to calculate a hash. By default, `crc32` is used. `murmur2` is also supported for compatibility with Java-based Kafka producers.
388+
389+
To use `murmur2` hashing, pass `hash_function: :murmur2` to `Kafka::Partitioner.new`. This requires adding the `digest-murmurhash` gem to your Gemfile. For example:
390+
391+
```ruby
392+
Kafka.new(partitioner: Kafka::Partitioner.new(hash_function: :murmur2))
393+
```
394+
385395
#### Buffering and Error Handling
386396

387397
The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer.

lib/kafka/crc32_hash.rb

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# frozen_string_literal: true
2+
3+
require "zlib"
4+
5+
module Kafka
6+
# Hash function object that computes CRC32 checksums using Zlib.
#
# It responds to `load` and `hash(value)`, the two methods that
# Kafka::Digest.find_digest and Kafka::Partitioner#call invoke on a digest.
#
# NOTE(review): `hash(value)` shadows Object#hash with a one-argument
# signature, so instances cannot be used where Ruby itself calls `#hash`
# with no arguments (e.g. as Hash keys) — confirm that never happens.
class Crc32Hash
7+
8+
# crc32 is supported natively
# Deliberately a no-op: Kafka::Digest.find_digest calls `load` on every
# digest before returning it, and there is nothing to load here.
9+
def load; end
10+
11+
# Computes the CRC32 checksum of the given value.
#
# @param value [String] the data to hash (Kafka::Partitioner passes the
#   partition key or message key)
# @return [Integer] the result of Zlib.crc32 for the value
def hash(value)
12+
Zlib.crc32(value)
13+
end
14+
end
15+
end

lib/kafka/digest.rb

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# frozen_string_literal: true
2+
3+
require "kafka/crc32_hash"
4+
require "kafka/murmur2_hash"
5+
6+
module Kafka
7+
# Registry of the hash functions that can be used for partitioning.
#
# Because this module is named `Digest` inside the `Kafka` namespace, code
# within `Kafka` that needs Ruby's top-level Digest module must refer to it
# as `::Digest`.
module Digest
8+
# Maps a hash function name to the single, shared object implementing it.
# The objects are instantiated once, when this file is loaded; any
# dependency a function needs is only required later, via `load`.
FUNCTIONS_BY_NAME = {
9+
:crc32 => Crc32Hash.new,
10+
:murmur2 => Murmur2Hash.new
11+
}.freeze
12+
13+
# Looks up a hash function by name and makes sure it is ready to use.
#
# @param name [Symbol] a key of FUNCTIONS_BY_NAME (:crc32 or :murmur2)
# @return [Crc32Hash, Murmur2Hash] the shared digest object; it responds to
#   `hash(value)`
# @raise [LoadError] if the name is not registered, or if the digest's own
#   `load` fails (Murmur2Hash raises when its gem cannot be required)
def self.find_digest(name)
14+
digest = FUNCTIONS_BY_NAME.fetch(name) do
15+
raise LoadError, "Unknown hash function #{name}"
16+
end
17+
18+
# Give the digest a chance to require its backing library before first use.
digest.load
19+
digest
20+
end
21+
end
22+
end

lib/kafka/murmur2_hash.rb

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
module Kafka
4+
# Hash function object that computes murmur2 hashes through the
# `digest-murmurhash` gem. The gem is required lazily in `load`, so merely
# instantiating this class does not need it to be installed.
#
# NOTE(review): `hash(value)` shadows Object#hash with a one-argument
# signature — confirm instances are never used as Hash keys.
class Murmur2Hash
5+
# The seed 0x9747b28c packed as a 32-bit unsigned integer in native byte
# order ('L'); this byte string is passed as the seed argument to
# MurmurHash2.rawdigest.
# NOTE(review): presumably the seed used by the Java Kafka client — confirm.
SEED = [0x9747b28c].pack('L')
6+
7+
# Requires the library that provides ::Digest::MurmurHash2.
#
# @raise [LoadError] with installation instructions when
#   'digest/murmurhash' cannot be required
def load
8+
require 'digest/murmurhash'
9+
rescue LoadError
10+
raise LoadError, "using murmur2 hashing requires adding a dependency on the `digest-murmurhash` gem to your Gemfile."
11+
end
12+
13+
# Computes the murmur2 hash of the given value using SEED.
#
# `load` must have succeeded beforehand (Kafka::Digest.find_digest takes
# care of that), otherwise ::Digest::MurmurHash2 is not defined.
#
# @param value [String] the data to hash
# @return [Integer] the raw digest with its top bit cleared
def hash(value)
14+
# The leading `::` bypasses Kafka::Digest and reaches the top-level Digest
# namespace. Masking with 0x7fffffff keeps only the low 31 bits, so the
# result is never negative.
::Digest::MurmurHash2.rawdigest(value, SEED) & 0x7fffffff
15+
end
16+
end
17+
end

lib/kafka/partitioner.rb

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
# frozen_string_literal: true
22

3-
require "zlib"
3+
require "kafka/digest"
44

55
module Kafka
66

77
# Assigns partitions to messages.
88
class Partitioner
9+
# @param hash_function [Symbol, nil] the algorithm used to compute a message's
10+
# destination partition. Default is :crc32
11+
# Supported values are the keys of Kafka::Digest::FUNCTIONS_BY_NAME
# (:crc32 and :murmur2).
# @raise [LoadError] if hash_function is not a known name, or if the library
#   backing the chosen function cannot be loaded
def initialize(hash_function: nil)
12+
# A nil hash_function (the default) selects :crc32.
@digest = Digest.find_digest(hash_function || :crc32)
13+
end
914

1015
# Assigns a partition number based on a partition key. If no explicit
1116
# partition key is provided, the message key will be used instead.
@@ -28,7 +33,7 @@ def call(partition_count, message)
2833
if key.nil?
2934
rand(partition_count)
3035
else
31-
Zlib.crc32(key) % partition_count
36+
@digest.hash(key) % partition_count
3237
end
3338
end
3439
end

lib/kafka/protocol/record_batch.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def encode(encoder)
7777
record_batch_encoder.write_int8(MAGIC_BYTE)
7878

7979
body = encode_record_batch_body
80-
crc = Digest::CRC32c.checksum(body)
80+
crc = ::Digest::CRC32c.checksum(body)
8181

8282
record_batch_encoder.write_int32(crc)
8383
record_batch_encoder.write(body)

ruby-kafka.gemspec

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ Gem::Specification.new do |spec|
3333
spec.add_development_dependency "rake", "~> 10.0"
3434
spec.add_development_dependency "rspec"
3535
spec.add_development_dependency "pry"
36+
spec.add_development_dependency "digest-murmurhash"
3637
spec.add_development_dependency "dotenv"
3738
spec.add_development_dependency "docker-api"
3839
spec.add_development_dependency "rspec-benchmark"

spec/digest_spec.rb

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# frozen_string_literal: true
2+
3+
describe Kafka::Digest do
4+
describe "crc32" do
5+
let(:digest) { Kafka::Digest.find_digest(:crc32) }
6+
7+
it "is supported" do
8+
expect(digest).to be_truthy
9+
end
10+
11+
it "produces hash for value" do
12+
expect(digest.hash("yolo")).to eq(1623057525)
13+
end
14+
end
15+
16+
describe "murmur2" do
17+
let(:digest) { Kafka::Digest.find_digest(:murmur2) }
18+
19+
it "is supported" do
20+
expect(digest).to be_truthy
21+
end
22+
23+
it "produces hash for value" do
24+
expect(digest.hash("yolo")).to eq(1633766415)
25+
end
26+
end
27+
28+
describe "unknown hash function" do
29+
it "raises" do
30+
expect { Kafka::Digest.find_digest(:yolo) }.to raise_error
31+
end
32+
end
33+
end

spec/partitioner_spec.rb

+67-15
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,81 @@
11
# frozen_string_literal: true
22

33
describe Kafka::Partitioner, "#call" do
4-
let(:partitioner) { Kafka::Partitioner.new }
54
let(:message) { double(:message, key: nil, partition_key: "yolo") }
65

7-
it "deterministically returns a partition number for a partition key and partition count" do
8-
partition = partitioner.call(3, message)
9-
expect(partition).to eq 0
10-
end
6+
describe "default partitioner" do
7+
let(:partitioner) { Kafka::Partitioner.new }
8+
9+
it "deterministically returns a partition number for a partition key and partition count" do
10+
partition = partitioner.call(3, message)
11+
expect(partition).to eq 0
12+
end
13+
14+
it "falls back to the message key if no partition key is available" do
15+
allow(message).to receive(:partition_key) { nil }
16+
allow(message).to receive(:key) { "hey" }
1117

12-
it "falls back to the message key if no partition key is available" do
13-
allow(message).to receive(:partition_key) { nil }
14-
allow(message).to receive(:key) { "hey" }
18+
partition = partitioner.call(3, message)
1519

16-
partition = partitioner.call(3, message)
20+
expect(partition).to eq 2
21+
end
1722

18-
expect(partition).to eq 2
23+
it "randomly picks a partition if the key is nil" do
24+
allow(message).to receive(:key) { nil }
25+
allow(message).to receive(:partition_key) { nil }
26+
27+
partitions = 30.times.map { partitioner.call(3, message) }
28+
29+
expect(partitions.uniq).to contain_exactly(0, 1, 2)
30+
end
1931
end
2032

21-
it "randomly picks a partition if the key is nil" do
22-
allow(message).to receive(:key) { nil }
23-
allow(message).to receive(:partition_key) { nil }
33+
describe "murmur2 partitioner" do
34+
let(:partitioner) { Kafka::Partitioner.new(hash_function: :murmur2) }
35+
let(:message) { double(:message, key: nil, partition_key: "yolo") }
36+
37+
it "deterministically returns a partition number for a partition key and partition count" do
38+
partition = partitioner.call(3, message)
39+
expect(partition).to eq 0
40+
end
41+
42+
it "falls back to the message key if no partition key is available" do
43+
allow(message).to receive(:partition_key) { nil }
44+
allow(message).to receive(:key) { "hey" }
45+
46+
partition = partitioner.call(3, message)
47+
48+
expect(partition).to eq 1
49+
end
50+
51+
it "randomly picks a partition if the key is nil" do
52+
allow(message).to receive(:key) { nil }
53+
allow(message).to receive(:partition_key) { nil }
54+
55+
partitions = 30.times.map { partitioner.call(3, message) }
2456

25-
partitions = 30.times.map { partitioner.call(3, message) }
57+
expect(partitions.uniq).to contain_exactly(0, 1, 2)
58+
end
2659

27-
expect(partitions.uniq).to contain_exactly(0, 1, 2)
60+
it "picks a Java Kafka compatible partition" do
61+
partition_count = 100
62+
{
63+
# librdkafka test cases taken from tests/0048-partitioner.c
64+
"" => 0x106e08d9 % partition_count,
65+
"this is another string with more length to it perhaps" => 0x4f7703da % partition_count,
66+
"hejsan" => 0x5ec19395 % partition_count,
67+
# Java Kafka test cases taken from UtilsTest.java.
68+
# The Java tests check the result of murmur2 directly,
69+
# so the expected values have been ANDed with 0x7fffffff to work here
70+
"21" => (-973932308 & 0x7fffffff) % partition_count,
71+
"foobar" => (-790332482 & 0x7fffffff) % partition_count,
72+
"a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count,
73+
"a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count,
74+
"lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count
75+
}.each do |key, partition|
76+
allow(message).to receive(:partition_key) { key }
77+
expect(partitioner.call(partition_count, message)).to eq partition
78+
end
79+
end
2880
end
2981
end

0 commit comments

Comments
 (0)