Skip to content

Commit 55a7c88

Browse files
authored
Merge pull request #824 from dollarshaveclub/offset-checkpoint-safeguard
Critical bug fix: Extra sanity checking when marking offsets as processed
2 parents 2757e27 + a23195c commit 55a7c88

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

lib/kafka/offset_manager.rb

+12 -1
Original file line number | Diff line number | Diff line change
@@ -50,9 +50,20 @@ def set_default_offset(topic, default_offset)
5050
# @param offset [Integer] the offset of the message that should be marked as processed.
5151
# @return [nil]
5252
def mark_as_processed(topic, partition, offset)
53-
@uncommitted_offsets += 1
53+
unless @group.assigned_to?(topic, partition)
54+
@logger.debug "Not marking #{topic}/#{partition}:#{offset} as processed for partition not assigned to this consumer."
55+
return
56+
end
5457
@processed_offsets[topic] ||= {}
5558

59+
last_processed_offset = @processed_offsets[topic][partition] || -1
60+
if last_processed_offset > offset + 1
61+
@logger.debug "Not overwriting newer offset #{topic}/#{partition}:#{last_processed_offset - 1} with older #{offset}"
62+
return
63+
end
64+
65+
@uncommitted_offsets += 1
66+
5667
# The committed offset should always be the offset of the next message that the
5768
# application will read, thus adding one to the last message processed.
5869
@processed_offsets[topic][partition] = offset + 1

spec/offset_manager_spec.rb

+45
Original file line number | Diff line number | Diff line change
@@ -21,9 +21,13 @@
2121
}
2222
let(:offset_retention_time) { nil }
2323
let(:commit_interval) { 0 }
24+
let(:partition_assignments) { { 'greetings' => [0, 1, 2] } }
2425

2526
before do
2627
allow(group).to receive(:commit_offsets)
28+
allow(group).to receive(:assigned_to?) do |topic, partition|
29+
(partition_assignments[topic] || []).include?(partition)
30+
end
2731
allow(fetcher).to receive(:seek)
2832
end
2933

@@ -43,6 +47,46 @@
4347

4448
expect(group).to have_received(:commit_offsets).with(expected_offsets)
4549
end
50+
51+
context "after calling #mark_as_processed with offsets from non-assigned partitions" do
52+
it "only commits offsets from assigned partitions" do
53+
offset_manager.mark_as_processed("greetings", 0, 42)
54+
offset_manager.mark_as_processed("greetings", 1, 13)
55+
offset_manager.mark_as_processed("greetings", 5, 75)
56+
offset_manager.mark_as_processed("seasons-greetings", 3, 15)
57+
58+
offset_manager.commit_offsets
59+
60+
expected_offsets = {
61+
"greetings" => {
62+
0 => 43,
63+
1 => 14,
64+
}
65+
}
66+
67+
expect(group).to have_received(:commit_offsets).with(expected_offsets)
68+
end
69+
end
70+
71+
context "after marking offsets as processed for the same partition but out of order" do
72+
it "committs the newest offset" do
73+
offset_manager.mark_as_processed("greetings", 0, 42)
74+
offset_manager.mark_as_processed("greetings", 1, 579)
75+
offset_manager.mark_as_processed("greetings", 0, 5)
76+
offset_manager.mark_as_processed("greetings", 1, 95)
77+
78+
offset_manager.commit_offsets
79+
80+
expected_offsets = {
81+
"greetings" => {
82+
0 => 43,
83+
1 => 580
84+
}
85+
}
86+
87+
expect(group).to have_received(:commit_offsets).with(expected_offsets)
88+
end
89+
end
4690
end
4791

4892
describe "#commit_offsets_if_necessary" do
@@ -192,6 +236,7 @@ def partition_offset_info(offset)
192236
end
193237

194238
describe "#clear_offsets_excluding" do
239+
let(:partition_assignments) { { 'x' => [0, 1] } }
195240
it "clears offsets except for the partitions in the exclusion list" do
196241
offset_manager.mark_as_processed("x", 0, 42)
197242
offset_manager.mark_as_processed("x", 1, 13)

0 commit comments

Comments
 (0)