|
21 | 21 | }
|
22 | 22 | let(:offset_retention_time) { nil }
|
23 | 23 | let(:commit_interval) { 0 }
|
| 24 | + let(:partition_assignments) { { 'greetings' => [0, 1, 2] } } |
24 | 25 |
|
25 | 26 | before do
|
26 | 27 | allow(group).to receive(:commit_offsets)
|
| 28 | + allow(group).to receive(:assigned_to?) do |topic, partition| |
| 29 | + (partition_assignments[topic] || []).include?(partition) |
| 30 | + end |
27 | 31 | allow(fetcher).to receive(:seek)
|
28 | 32 | end
|
29 | 33 |
|
|
43 | 47 |
|
44 | 48 | expect(group).to have_received(:commit_offsets).with(expected_offsets)
|
45 | 49 | end
|
| 50 | + |
| 51 | + context "after calling #mark_as_processed with offsets from non-assigned partitions" do |
| 52 | + it "only commits offsets from assigned partitions" do |
| 53 | + offset_manager.mark_as_processed("greetings", 0, 42) |
| 54 | + offset_manager.mark_as_processed("greetings", 1, 13) |
| 55 | + offset_manager.mark_as_processed("greetings", 5, 75) |
| 56 | + offset_manager.mark_as_processed("seasons-greetings", 3, 15) |
| 57 | + |
| 58 | + offset_manager.commit_offsets |
| 59 | + |
| 60 | + expected_offsets = { |
| 61 | + "greetings" => { |
| 62 | + 0 => 43, |
| 63 | + 1 => 14, |
| 64 | + } |
| 65 | + } |
| 66 | + |
| 67 | + expect(group).to have_received(:commit_offsets).with(expected_offsets) |
| 68 | + end |
| 69 | + end |
| 70 | + |
| 71 | + context "after committing offsets for the same partition out of order" do |
| 72 | + it "committs the newest offset" do |
| 73 | + offset_manager.mark_as_processed("greetings", 0, 42) |
| 74 | + offset_manager.mark_as_processed("greetings", 1, 579) |
| 75 | + offset_manager.mark_as_processed("greetings", 0, 5) |
| 76 | + offset_manager.mark_as_processed("greetings", 1, 95) |
| 77 | + |
| 78 | + offset_manager.commit_offsets |
| 79 | + |
| 80 | + expected_offsets = { |
| 81 | + "greetings" => { |
| 82 | + 0 => 43, |
| 83 | + 1 => 580 |
| 84 | + } |
| 85 | + } |
| 86 | + |
| 87 | + expect(group).to have_received(:commit_offsets).with(expected_offsets) |
| 88 | + end |
| 89 | + end |
46 | 90 | end
|
47 | 91 |
|
48 | 92 | describe "#commit_offsets_if_necessary" do
|
@@ -192,6 +236,7 @@ def partition_offset_info(offset)
|
192 | 236 | end
|
193 | 237 |
|
194 | 238 | describe "#clear_offsets_excluding" do
|
| 239 | + let(:partition_assignments) { { 'x' => [0, 1] } } |
195 | 240 | it "clears offsets except for the partitions in the exclusion list" do
|
196 | 241 | offset_manager.mark_as_processed("x", 0, 42)
|
197 | 242 | offset_manager.mark_as_processed("x", 1, 13)
|
|
0 commit comments