diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dcb0e7e8..04ff2d26e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Changes and additions to the library will be listed here. - Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866). - Add support for `murmur2` based partitioning. - Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877). +- Handle SyncGroup responses with a non-zero error and no assignments (#896). ## 1.3.0 diff --git a/lib/kafka/protocol/sync_group_response.rb b/lib/kafka/protocol/sync_group_response.rb index a1e4aab83..148945095 100644 --- a/lib/kafka/protocol/sync_group_response.rb +++ b/lib/kafka/protocol/sync_group_response.rb @@ -13,9 +13,12 @@ def initialize(error_code:, member_assignment:) end def self.decode(decoder) + error_code = decoder.int16 + member_assignment_bytes = decoder.bytes + new( - error_code: decoder.int16, - member_assignment: MemberAssignment.decode(Decoder.from_string(decoder.bytes)), + error_code: error_code, + member_assignment: member_assignment_bytes ? MemberAssignment.decode(Decoder.from_string(member_assignment_bytes)) : nil ) end end diff --git a/spec/protocol/sync_group_response_spec.rb b/spec/protocol/sync_group_response_spec.rb new file mode 100644 index 000000000..8979f244c --- /dev/null +++ b/spec/protocol/sync_group_response_spec.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +describe Kafka::Protocol::SyncGroupResponse do + describe ".decode" do + subject(:response) { Kafka::Protocol::SyncGroupResponse.decode(decoder) } + + let(:decoder) { Kafka::Protocol::Decoder.new(buffer) } + let(:buffer) { StringIO.new(response_bytes) } + + context "the response is successful" do + let(:response_bytes) { "\x00\x00\x00\x00\x007\x00\x00\x00\x00\x00\x01\x00\x1Fsome-topic-f064d6897583eb395896\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\xFF\xFF\xFF\xFF" } + + it "decodes the response including the member assignment" do + expect(response.error_code).to eq 0 + expect(response.member_assignment.topics).to eq({ "some-topic-f064d6897583eb395896" => [0, 1] }) + end + end + + context "the response is not successful" do + let(:response_bytes) { "\x00\x19\xFF\xFF\xFF\xFF" } + + it "decodes the response including the member assignment" do + expect(response.error_code).to eq 25 + expect(response.member_assignment).to be_nil + end + end + end +end