Skip to content
This repository was archived by the owner on Mar 30, 2023. It is now read-only.

Commit cdfcc25

Browse files
garyrussellartembilan
authored andcommitted
Upgrade to SK 2.5.0
- add deliveryAttempt header when retry is not configured - temporary work around for gradle bug, jar with test classifier missing from CP
1 parent 3272861 commit cdfcc25

File tree

5 files changed

+46
-52
lines changed

5 files changed

+46
-52
lines changed

build.gradle

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ ext {
6161
log4jVersion = '2.13.0'
6262
springIntegrationVersion = '5.3.0.BUILD-SNAPSHOT'
6363
springIntegrationKotlinVersion = '0.0.3.BUILD-SNAPSHOT'
64-
springKafkaVersion = '2.4.3.BUILD-SNAPSHOT'
64+
springKafkaVersion = '2.5.0.BUILD-SNAPSHOT'
65+
66+
kafkaVersion = '2.4.1'
67+
scalaVersion = '2.12'
6568

6669
idPrefix = 'kafka'
6770

@@ -111,6 +114,9 @@ dependencies {
111114
testCompile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
112115
testCompile 'org.junit.jupiter:junit-jupiter-api'
113116

117+
// temporary - gradle bug prevents getting this transitively
118+
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
119+
114120
// To avoid compiler warnings about @API annotations in JUnit code
115121
testCompileOnly 'org.apiguardian:apiguardian-api:1.0.0'
116122

src/main/java/org/springframework/integration/kafka/dsl/Kafka.java

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.integration.kafka.dsl;
1818

19-
import java.util.Arrays;
2019
import java.util.regex.Pattern;
2120

2221
import org.apache.kafka.common.TopicPartition;
@@ -286,26 +285,6 @@ KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerCon
286285
containerProperties), listenerMode);
287286
}
288287

289-
/**
290-
* Create an initial
291-
* {@link KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec}.
292-
* @param consumerFactory the {@link ConsumerFactory}.
293-
* @param topicPartitions the {@link TopicPartition} vararg.
294-
* @param <K> the Kafka message key type.
295-
* @param <V> the Kafka message value type.
296-
* @return the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
297-
* @deprecated in favor of {@link #messageDrivenChannelAdapter(ConsumerFactory, TopicPartitionOffset...)}.
298-
*/
299-
@Deprecated
300-
public static <K, V>
301-
KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(
302-
ConsumerFactory<K, V> consumerFactory,
303-
org.springframework.kafka.support.TopicPartitionInitialOffset... topicPartitions) {
304-
305-
return messageDrivenChannelAdapter(consumerFactory, KafkaMessageDrivenChannelAdapter.ListenerMode.record,
306-
topicPartitions);
307-
}
308-
309288
/**
310289
* Create an initial
311290
* {@link KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec}.
@@ -324,33 +303,6 @@ KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerCon
324303
topicPartitions);
325304
}
326305

327-
/**
328-
* Create an initial
329-
* {@link KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec}.
330-
* @param consumerFactory the {@link ConsumerFactory}.
331-
* @param listenerMode the {@link KafkaMessageDrivenChannelAdapter.ListenerMode}.
332-
* @param topicPartitions the {@link TopicPartition} vararg.
333-
* @param <K> the Kafka message key type.
334-
* @param <V> the Kafka message value type.
335-
* @return the
336-
* KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
337-
* @deprecated in favor of
338-
* {@link #messageDrivenChannelAdapter(ConsumerFactory, org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.ListenerMode, TopicPartitionOffset...)}
339-
*/
340-
@Deprecated
341-
public static <K, V>
342-
KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(
343-
ConsumerFactory<K, V> consumerFactory,
344-
KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode,
345-
org.springframework.kafka.support.TopicPartitionInitialOffset... topicPartitions) {
346-
347-
return messageDrivenChannelAdapter(
348-
new KafkaMessageListenerContainerSpec<>(consumerFactory,
349-
Arrays.stream(topicPartitions)
350-
.map(org.springframework.kafka.support.TopicPartitionInitialOffset::toTPO)
351-
.toArray(TopicPartitionOffset[]::new)), listenerMode);
352-
}
353-
354306
/**
355307
* Create an initial
356308
* {@link KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec}.

src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,13 +16,15 @@
1616

1717
package org.springframework.integration.kafka.inbound;
1818

19+
import java.nio.ByteBuffer;
1920
import java.util.Map;
2021
import java.util.concurrent.atomic.AtomicInteger;
2122
import java.util.function.BiConsumer;
2223

2324
import org.apache.kafka.clients.consumer.Consumer;
2425
import org.apache.kafka.clients.consumer.ConsumerRecord;
2526
import org.apache.kafka.common.TopicPartition;
27+
import org.apache.kafka.common.header.Header;
2628

2729
import org.springframework.core.AttributeAccessor;
2830
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -86,6 +88,8 @@ public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport implem
8688

8789
private boolean bindSourceRecord;
8890

91+
private boolean containerDeliveryAttemptPresent;
92+
8993
/**
9094
* Construct an instance with the provided container.
9195
* @param messageListenerContainer the container.
@@ -178,6 +182,8 @@ protected void onInit() {
178182
this.retryTemplate.registerListener(this.listener);
179183
}
180184
this.messageListenerContainer.getContainerProperties().setMessageListener(kafkaListener);
185+
this.containerDeliveryAttemptPresent = this.messageListenerContainer.getContainerProperties()
186+
.isDeliveryAttemptHeader();
181187
}
182188

183189
@Override
@@ -295,6 +301,11 @@ private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> recor
295301
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
296302
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
297303
}
304+
else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
305+
Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
306+
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,
307+
new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
308+
}
298309
if (KafkaInboundGateway.this.bindSourceRecord) {
299310
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
300311
}
@@ -306,6 +317,11 @@ private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> recor
306317
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
307318
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
308319
}
320+
else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
321+
Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
322+
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,
323+
new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
324+
}
309325
if (KafkaInboundGateway.this.bindSourceRecord) {
310326
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
311327
}

src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.kafka.inbound;
1818

19+
import java.nio.ByteBuffer;
1920
import java.util.List;
2021
import java.util.Map;
2122
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +25,7 @@
2425
import org.apache.kafka.clients.consumer.Consumer;
2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
2627
import org.apache.kafka.common.TopicPartition;
28+
import org.apache.kafka.common.header.Header;
2729

2830
import org.springframework.core.AttributeAccessor;
2931
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -99,6 +101,8 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo
99101

100102
private boolean bindSourceRecord;
101103

104+
private boolean containerDeliveryAttemptPresent;
105+
102106
/**
103107
* Construct an instance with mode {@link ListenerMode#record}.
104108
* @param messageListenerContainer the container.
@@ -310,6 +314,8 @@ protected void onInit() {
310314
}
311315
this.messageListenerContainer.getContainerProperties().setMessageListener(listener);
312316
}
317+
this.containerDeliveryAttemptPresent = this.messageListenerContainer.getContainerProperties()
318+
.isDeliveryAttemptHeader();
313319
}
314320

315321
@Override
@@ -452,6 +458,11 @@ private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> recor
452458
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
453459
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
454460
}
461+
else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
462+
Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
463+
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,
464+
new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
465+
}
455466
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
456467
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
457468
}
@@ -463,6 +474,11 @@ private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> recor
463474
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
464475
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
465476
}
477+
else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
478+
Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
479+
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,
480+
new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
481+
}
466482
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
467483
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
468484
}

src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,6 +66,7 @@
6666
import org.springframework.kafka.core.ProducerFactory;
6767
import org.springframework.kafka.listener.ContainerProperties;
6868
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
69+
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
6970
import org.springframework.kafka.support.Acknowledgment;
7071
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
7172
import org.springframework.kafka.support.KafkaHeaders;
@@ -329,8 +330,10 @@ void testInboundRecordNoRetryRecover() {
329330
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
330331
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
331332
ContainerProperties containerProps = new ContainerProperties(topic5);
333+
containerProps.setDeliveryAttemptHeader(true);
332334
KafkaMessageListenerContainer<Integer, String> container =
333335
new KafkaMessageListenerContainer<>(cf, containerProps);
336+
container.setErrorHandler(new SeekToCurrentErrorHandler());
334337
KafkaMessageDrivenChannelAdapter<Integer, String> adapter = new KafkaMessageDrivenChannelAdapter<>(container);
335338
MessageChannel out = new DirectChannel() {
336339

@@ -371,6 +374,7 @@ protected boolean doSend(Message<?> message, long timeout) {
371374
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic5);
372375
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
373376
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
377+
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(1);
374378

375379
adapter.stop();
376380
}

0 commit comments

Comments
 (0)