diff --git a/lib/kafka/compression.rb b/lib/kafka/compression.rb index 8e7848192..08a04af5c 100644 --- a/lib/kafka/compression.rb +++ b/lib/kafka/compression.rb @@ -4,23 +4,34 @@ module Kafka module Compression + CODEC_NAMES = { + 1 => :gzip, + 2 => :snappy, + 3 => :lz4, + }.freeze + + CODECS = { + :gzip => GzipCodec.new, + :snappy => SnappyCodec.new, + :lz4 => LZ4Codec.new, + }.freeze + + def self.codecs + CODECS.keys + end + def self.find_codec(name) - case name - when nil then nil - when :snappy then SnappyCodec.new - when :gzip then GzipCodec.new - when :lz4 then LZ4Codec.new - else raise "Unknown compression codec #{name}" + CODECS.fetch(name) do + raise "Unknown compression codec #{name}" end end def self.find_codec_by_id(codec_id) - case codec_id - when 1 then GzipCodec.new - when 2 then SnappyCodec.new - when 3 then LZ4Codec.new - else raise "Unknown codec id #{codec_id}" + codec_name = CODEC_NAMES.fetch(codec_id) do + raise "Unknown codec id #{codec_id}" end + + find_codec(codec_name) end end end diff --git a/lib/kafka/compressor.rb b/lib/kafka/compressor.rb index 27699f8af..756c270b8 100644 --- a/lib/kafka/compressor.rb +++ b/lib/kafka/compressor.rb @@ -21,7 +21,9 @@ class Compressor # @param threshold [Integer] the minimum number of messages in a message set # that will trigger compression. def initialize(codec_name: nil, threshold: 1, instrumenter:) - @codec = Compression.find_codec(codec_name) + # Codec may be nil, in which case we won't compress. + @codec = codec_name && Compression.find_codec(codec_name) + @threshold = threshold @instrumenter = instrumenter end diff --git a/spec/compression_spec.rb b/spec/compression_spec.rb new file mode 100644 index 000000000..8485486d2 --- /dev/null +++ b/spec/compression_spec.rb @@ -0,0 +1,18 @@ +describe Kafka::Compression do + Kafka::Compression.codecs.each do |codec_name| + describe codec_name.to_s do + it "encodes and decodes data" do + data = "yolo" + codec = Kafka::Compression.find_codec(codec_name) + + expect(codec.decompress(codec.compress(data))).to eq data + end + + it "has a consistent codec id" do + codec = Kafka::Compression.find_codec(codec_name) + + expect(Kafka::Compression.find_codec_by_id(codec.codec_id)).to eq codec + end + end + end +end