File tree 3 files changed +43
-12
lines changed
3 files changed +43
-12
lines changed Original file line number Diff line number Diff line change 4
4
5
5
module Kafka
6
6
module Compression
7
+ CODEC_NAMES = {
8
+ 1 => :gzip ,
9
+ 2 => :snappy ,
10
+ 3 => :lz4 ,
11
+ } . freeze
12
+
13
+ CODECS = {
14
+ :gzip => GzipCodec . new ,
15
+ :snappy => SnappyCodec . new ,
16
+ :lz4 => LZ4Codec . new ,
17
+ } . freeze
18
+
19
+ def self . codecs
20
+ CODECS . keys
21
+ end
22
+
7
23
def self . find_codec ( name )
8
- case name
9
- when nil then nil
10
- when :snappy then SnappyCodec . new
11
- when :gzip then GzipCodec . new
12
- when :lz4 then LZ4Codec . new
13
- else raise "Unknown compression codec #{ name } "
24
+ CODECS . fetch ( name ) do
25
+ raise "Unknown compression codec #{ name } "
14
26
end
15
27
end
16
28
17
29
def self . find_codec_by_id ( codec_id )
18
- case codec_id
19
- when 1 then GzipCodec . new
20
- when 2 then SnappyCodec . new
21
- when 3 then LZ4Codec . new
22
- else raise "Unknown codec id #{ codec_id } "
30
+ codec_name = CODEC_NAMES . fetch ( codec_id ) do
31
+ raise "Unknown codec id #{ codec_id } "
23
32
end
33
+
34
+ find_codec ( codec_name )
24
35
end
25
36
end
26
37
end
Original file line number Diff line number Diff line change @@ -21,7 +21,9 @@ class Compressor
21
21
# @param threshold [Integer] the minimum number of messages in a message set
22
22
# that will trigger compression.
23
23
def initialize ( codec_name : nil , threshold : 1 , instrumenter :)
24
- @codec = Compression . find_codec ( codec_name )
24
+ # Codec may be nil, in which case we won't compress.
25
+ @codec = codec_name && Compression . find_codec ( codec_name )
26
+
25
27
@threshold = threshold
26
28
@instrumenter = instrumenter
27
29
end
Original file line number Diff line number Diff line change
1
+ describe Kafka ::Compression do
2
+ Kafka ::Compression . codecs . each do |codec_name |
3
+ describe codec_name . to_s do
4
+ it "encodes and decodes data" do
5
+ data = "yolo"
6
+ codec = Kafka ::Compression . find_codec ( codec_name )
7
+
8
+ expect ( codec . decompress ( codec . compress ( data ) ) ) . to eq data
9
+ end
10
+
11
+ it "has a consistent codec id" do
12
+ codec = Kafka ::Compression . find_codec ( codec_name )
13
+
14
+ expect ( Kafka ::Compression . find_codec_by_id ( codec . codec_id ) ) . to eq codec
15
+ end
16
+ end
17
+ end
18
+ end
You can’t perform that action at this time.
0 commit comments