Skip to content

Commit 24ff85d

Browse files
garyrussellartembilan
authored andcommitted
GH-1876: Seek To Timestamp with Manual Assignment
Resolves #1876 Support initial seek to timestamp when manually assigning partitions. **cherry-pick to 2.7.x**
1 parent 3fb3bb8 commit 24ff85d

File tree

3 files changed

+37
-10
lines changed

3 files changed

+37
-10
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -2629,7 +2629,11 @@ else if (position.equals(SeekPosition.TIMESTAMP)) {
26292629
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
26302630
.offsetsForTimes(
26312631
Collections.singletonMap(offset.getTopicPartition(), offset.getOffset()));
2632-
offsetsForTimes.forEach((tp, ot) -> this.consumer.seek(tp, ot.offset()));
2632+
offsetsForTimes.forEach((tp, ot) -> {
2633+
if (ot != null) {
2634+
this.consumer.seek(tp, ot.offset());
2635+
}
2636+
});
26332637
}
26342638
else {
26352639
this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition()));
@@ -2693,6 +2697,18 @@ private void initPartitionsIfNeeded() {
26932697
.map(Entry::getKey)
26942698
.collect(Collectors.toSet());
26952699
ends.forEach(partitions::remove);
2700+
Map<TopicPartition, Long> times = partitions.entrySet().stream()
2701+
.filter(e -> SeekPosition.TIMESTAMP.equals(e.getValue().seekPosition))
2702+
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().offset));
2703+
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(times);
2704+
offsetsForTimes.forEach((tp, off) -> {
2705+
if (off == null) {
2706+
ends.add(tp);
2707+
}
2708+
else {
2709+
partitions.put(tp, new OffsetMetadata(off.offset(), false, SeekPosition.TIMESTAMP));
2710+
}
2711+
});
26962712
if (beginnings.size() > 0) {
26972713
this.consumer.seekToBeginning(beginnings);
26982714
}
@@ -3279,11 +3295,11 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti
32793295

32803296
private static final class OffsetMetadata {
32813297

3282-
private final Long offset;
3298+
final Long offset; // NOSONAR
32833299

3284-
private final boolean relativeToCurrent;
3300+
final boolean relativeToCurrent; // NOSONAR
32853301

3286-
private final SeekPosition seekPosition;
3302+
final SeekPosition seekPosition; // NOSONAR
32873303

32883304
OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
32893305
this.offset = offset;

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public enum SeekPosition {
6262
END,
6363

6464
/**
65-
* Seek to the time stamp.
65+
* Seek to the time stamp; if no records exist with a timestamp greater than or
66+
* equal to the timestamp seek to the end.
6667
*/
6768
TIMESTAMP
6869

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.apache.kafka.clients.consumer.ConsumerRecords;
7070
import org.apache.kafka.clients.consumer.KafkaConsumer;
7171
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
72+
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
7273
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
7374
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
7475
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -138,7 +139,7 @@
138139
KafkaMessageListenerContainerTests.topic17, KafkaMessageListenerContainerTests.topic18,
139140
KafkaMessageListenerContainerTests.topic19, KafkaMessageListenerContainerTests.topic20,
140141
KafkaMessageListenerContainerTests.topic21, KafkaMessageListenerContainerTests.topic22,
141-
KafkaMessageListenerContainerTests.topic23 })
142+
KafkaMessageListenerContainerTests.topic23, KafkaMessageListenerContainerTests.topic24 })
142143
public class KafkaMessageListenerContainerTests {
143144

144145
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
@@ -191,6 +192,8 @@ public class KafkaMessageListenerContainerTests {
191192

192193
public static final String topic24 = "testTopic24";
193194

195+
public static final String topic25 = "testTopic24";
196+
194197
private static EmbeddedKafkaBroker embeddedKafka;
195198

196199
@BeforeAll
@@ -2758,15 +2761,20 @@ public void testInitialSeek() throws Exception {
27582761
Thread.sleep(50);
27592762
return emptyRecords;
27602763
});
2761-
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
2764+
2765+
Map<TopicPartition, OffsetAndTimestamp> offsets = new HashMap<>();
2766+
offsets.put(new TopicPartition("foo", 6), new OffsetAndTimestamp(42L, 1234L));
2767+
offsets.put(new TopicPartition("foo", 7), null);
2768+
given(consumer.offsetsForTimes(any())).willReturn(offsets);
2769+
ContainerProperties containerProps = new ContainerProperties(
27622770
new TopicPartitionOffset("foo", 0, SeekPosition.BEGINNING),
27632771
new TopicPartitionOffset("foo", 1, SeekPosition.END),
27642772
new TopicPartitionOffset("foo", 2, 0L),
27652773
new TopicPartitionOffset("foo", 3, Long.MAX_VALUE),
27662774
new TopicPartitionOffset("foo", 4, SeekPosition.BEGINNING),
27672775
new TopicPartitionOffset("foo", 5, SeekPosition.END),
2768-
};
2769-
ContainerProperties containerProps = new ContainerProperties(topicPartition);
2776+
new TopicPartitionOffset("foo", 6, 1234L, SeekPosition.TIMESTAMP),
2777+
new TopicPartitionOffset("foo", 7, 1234L, SeekPosition.TIMESTAMP));
27702778
containerProps.setGroupId("grp");
27712779
containerProps.setAckMode(AckMode.RECORD);
27722780
containerProps.setClientId("clientId");
@@ -2782,9 +2790,11 @@ public void testInitialSeek() throws Exception {
27822790
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 4))));
27832791
verify(consumer).seekToEnd(captor.capture());
27842792
assertThat(captor.getValue())
2785-
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5))));
2793+
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5),
2794+
new TopicPartition("foo", 7))));
27862795
verify(consumer).seek(new TopicPartition("foo", 2), 0L);
27872796
verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
2797+
verify(consumer).seek(new TopicPartition("foo", 6), 42L);
27882798
container.stop();
27892799
}
27902800

0 commit comments

Comments
 (0)