
onPartitionsAssigned fired without SeekPosition.BEGINNING/END #2178


Closed
dcheung2 opened this issue Mar 21, 2022 · 0 comments · Fixed by #2181


dcheung2 commented Mar 21, 2022

In what version(s) of Spring for Apache Kafka are you seeing this issue?

2.8.2

Describe the bug

When using a custom ContainerFactory that creates ContainerProperties(TopicPartitionOffset) with SeekPosition.END, ConsumerSeekAware.onPartitionsAssigned receives an empty map.

To Reproduce

A custom ContainerFactory that does not use a consumer group:

    class BroadcastContainerFactory<K, V> extends ConcurrentKafkaListenerContainerFactory<K, V> {

        // Assign partition 0 of "foo" explicitly (no consumer group) and seek to the end
        @Override
        protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
            var properties = new ContainerProperties(new TopicPartitionOffset("foo", 0, null, SeekPosition.END));
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
    }

Expected behavior

onPartitionsAssigned() should receive an assignments map with a single entry whose value is either:

  • null, or
  • the actual offset

Study

I inspected the code at https://github.com/spring-projects/spring-kafka/blob/2.8.x/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java#L2933.

It passes the same partitions map to both doInitialSeeks and onPartitionsAssigned.

A likely workaround is to use SeekPosition.TIMESTAMP instead:

  • for END: new TopicPartitionOffset("foo", 0, System.currentTimeMillis(), SeekPosition.TIMESTAMP)
  • for BEGINNING: new TopicPartitionOffset("foo", 0, 0L, SeekPosition.TIMESTAMP)
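
The mechanism described in the study can be illustrated with a small, self-contained model. This is only a sketch under assumptions: it uses no spring-kafka classes, and every name in it (AssignmentsModel, Requested, toAssignments, and so on) is hypothetical. It assumes, as the fix commit referenced in this issue describes, that the initial seeks remove BEGINNING and END entries from the map they are given, so building the callback map from that same map yields nothing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of the reported behavior.
// It is NOT the spring-kafka implementation; all names are illustrative.
public class AssignmentsModel {

    public enum SeekPosition { BEGINNING, END, TIMESTAMP }

    // Stand-in for a requested partition: an optional offset plus an optional seek position.
    public static final class Requested {
        final Long offset;
        final SeekPosition position;

        public Requested(Long offset, SeekPosition position) {
            this.offset = offset;
            this.position = position;
        }
    }

    // Models the initial seeks: BEGINNING/END entries are handled and then
    // removed from the working map that was passed in.
    static void doInitialSeeks(Map<String, Requested> working) {
        working.values().removeIf(r ->
                r.position == SeekPosition.BEGINNING || r.position == SeekPosition.END);
    }

    // Builds the map handed to the listener callback from the given source map.
    static Map<String, Long> toAssignments(Map<String, Requested> source) {
        Map<String, Long> assignments = new LinkedHashMap<>();
        source.forEach((tp, r) -> assignments.put(tp, r.offset));
        return assignments;
    }

    // Reported behavior: the callback map is built from the map the seeks already pruned.
    public static Map<String, Long> assignmentsFromWorkingCopy(Map<String, Requested> defined) {
        Map<String, Requested> working = new LinkedHashMap<>(defined);
        doInitialSeeks(working);
        return toAssignments(working);
    }

    // Assumed fixed behavior: the callback map is built from the originally defined partitions.
    public static Map<String, Long> assignmentsFromDefined(Map<String, Requested> defined) {
        Map<String, Requested> working = new LinkedHashMap<>(defined);
        doInitialSeeks(working);
        return toAssignments(defined);
    }

    public static void main(String[] args) {
        Map<String, Requested> defined = new LinkedHashMap<>();
        defined.put("foo-0", new Requested(null, SeekPosition.END));
        System.out.println(assignmentsFromWorkingCopy(defined)); // prints {}
        System.out.println(assignmentsFromDefined(defined));     // prints {foo-0=null}
    }
}
```

In this model the second result matches the first option under "Expected behavior": a single entry whose value is null.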

garyrussell added this to the 3.0.0-M3 milestone Mar 21, 2022
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 21, 2022
Resolves spring-projects#2178

The wrong collection was used as the source for the `assignments` map,

`SeekPosition.BEGINNING` and `.END` entries are removed; use the
`definedPartitions` field instead.

**cherry-pick to 2.8.x, 2.7.x**
artembilan pushed a commit that referenced this issue Mar 21, 2022
Resolves #2178

The wrong collection was used as the source for the `assignments` map,

`SeekPosition.BEGINNING` and `.END` entries are removed; use the
`definedPartitions` field instead.

**cherry-pick to 2.8.x, 2.7.x**
artembilan pushed a commit that referenced this issue Mar 21, 2022
Resolves #2178

The wrong collection was used as the source for the `assignments` map,

`SeekPosition.BEGINNING` and `.END` entries are removed; use the
`definedPartitions` field instead.

**cherry-pick to 2.8.x, 2.7.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java