Skip to content

Commit 3bc00a9

Browse files
garyrussellartembilan
authored andcommitted
GH-2178: Fix CSA.onPartitionsAssigned (Manual)
Resolves #2178 The wrong collection was used as the source for the `assignments` map, `SeekPosition.BEGINNING` and `.END` entries are removed; use the `definedPartitions` field instead. **cherry-pick to 2.8.x, 2.7.x**
1 parent e4d9641 commit 3bc00a9

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2885,7 +2885,7 @@ private void initPartitionsIfNeeded() {
28852885
});
28862886
doInitialSeeks(partitions, beginnings, ends);
28872887
if (this.consumerSeekAwareListener != null) {
2888-
this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream()
2888+
this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream()
28892889
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
28902890
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
28912891
this.seekCallback);

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -2790,7 +2790,21 @@ public void testInitialSeek() throws Exception {
27902790
containerProps.setGroupId("grp");
27912791
containerProps.setAckMode(AckMode.RECORD);
27922792
containerProps.setClientId("clientId");
2793-
containerProps.setMessageListener((MessageListener) r -> { });
2793+
2794+
Map<TopicPartition, Long> assigned = new HashMap<>();
2795+
class Listener extends AbstractConsumerSeekAware implements MessageListener {
2796+
2797+
@Override
2798+
public void onMessage(Object data) {
2799+
}
2800+
2801+
@Override
2802+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
2803+
assigned.putAll(assignments);
2804+
}
2805+
2806+
}
2807+
containerProps.setMessageListener(new Listener());
27942808
containerProps.setMissingTopicsFatal(false);
27952809
KafkaMessageListenerContainer<Integer, String> container =
27962810
new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2808,6 +2822,7 @@ public void testInitialSeek() throws Exception {
28082822
verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
28092823
verify(consumer).seek(new TopicPartition("foo", 6), 42L);
28102824
container.stop();
2825+
assertThat(assigned).hasSize(8);
28112826
}
28122827

28132828
@SuppressWarnings({ "unchecked", "rawtypes" })

0 commit comments

Comments
 (0)