Skip to content

Commit 2c59b51

Browse files
committed
spring-projectsGH-2554: Fix DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME value
Fixes spring-projects#2554
1 parent 0d1727d commit 2c59b51

File tree

2 files changed

+251
-2
lines changed

2 files changed

+251
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,6 +62,6 @@ private RetryTopicBeanNames() {
6262
* The bean name of the internally registered scheduler wrapper, if needed.
6363
*/
6464
public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME =
65-
"defaultRetryTopicKafkaTemplate";
65+
"defaultRetryTopicSchedulerWrapper";
6666

6767
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Copyright 2021-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerConfig;
20+
import org.apache.kafka.clients.producer.ProducerConfig;
21+
import org.apache.kafka.common.serialization.StringDeserializer;
22+
import org.apache.kafka.common.serialization.StringSerializer;
23+
import org.junit.jupiter.api.Test;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.kafka.annotation.DltHandler;
30+
import org.springframework.kafka.annotation.EnableKafka;
31+
import org.springframework.kafka.annotation.KafkaListener;
32+
import org.springframework.kafka.annotation.RetryableTopic;
33+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
34+
import org.springframework.kafka.core.*;
35+
import org.springframework.kafka.listener.ContainerProperties;
36+
import org.springframework.kafka.listener.DefaultErrorHandler;
37+
import org.springframework.kafka.support.KafkaHeaders;
38+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
39+
import org.springframework.kafka.test.context.EmbeddedKafka;
40+
import org.springframework.messaging.handler.annotation.Header;
41+
import org.springframework.retry.annotation.Backoff;
42+
import org.springframework.stereotype.Component;
43+
import org.springframework.test.annotation.DirtiesContext;
44+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
45+
import org.springframework.util.backoff.FixedBackOff;
46+
47+
import java.util.HashMap;
48+
import java.util.Map;
49+
import java.util.concurrent.CountDownLatch;
50+
import java.util.concurrent.TimeUnit;
51+
52+
import static org.assertj.core.api.Assertions.assertThat;
53+
import static org.assertj.core.api.Assertions.fail;
54+
import static org.mockito.Mockito.spy;
55+
import static org.mockito.Mockito.verify;
56+
57+
/**
58+
* @author Cenk Akin
59+
* @since 3.0.3
60+
*/
61+
@SpringJUnitConfig
62+
@DirtiesContext
63+
@EmbeddedKafka(topics = {RetryTopicMultipleListenerIntegrationTests.FIRST_TOPIC,
64+
RetryTopicMultipleListenerIntegrationTests.SECOND_TOPIC}, partitions = 1)
65+
public class RetryTopicMultipleListenerIntegrationTests {
66+
67+
private static final Logger logger = LoggerFactory.getLogger(RetryTopicMultipleListenerIntegrationTests.class);
68+
69+
public final static String FIRST_TOPIC = "myRetryTopic1";
70+
71+
public final static String SECOND_TOPIC = "myRetryTopic2";
72+
73+
@Autowired
74+
private KafkaTemplate<String, String> sendKafkaTemplate;
75+
76+
@Autowired
77+
private CountDownLatchContainer latchContainer;
78+
79+
@Test
80+
void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory componentFactory) {
81+
logger.debug("Sending message to topic " + FIRST_TOPIC);
82+
sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1");
83+
logger.debug("Sending message to topic " + SECOND_TOPIC);
84+
sendKafkaTemplate.send(SECOND_TOPIC, "Testing topic 2");
85+
assertThat(awaitLatch(latchContainer.firstCountDownLatch)).isTrue();
86+
assertThat(awaitLatch(latchContainer.firstCountDownLatchDlt)).isTrue();
87+
assertThat(awaitLatch(latchContainer.secondCountDownLatch)).isTrue();
88+
assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue();
89+
verify(componentFactory).destinationTopicResolver();
90+
}
91+
92+
private boolean awaitLatch(CountDownLatch latch) {
93+
try {
94+
return latch.await(150, TimeUnit.SECONDS);
95+
}
96+
catch (Exception e) {
97+
fail(e.getMessage());
98+
throw new RuntimeException(e);
99+
}
100+
}
101+
102+
@Component
103+
static class FirstKafkaListener {
104+
105+
@Autowired
106+
CountDownLatchContainer countDownLatchContainer;
107+
108+
@RetryableTopic(
109+
attempts = "4",
110+
backoff = @Backoff(delay = 10, multiplier = 2.0),
111+
autoCreateTopics = "false",
112+
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
113+
@KafkaListener(topics = RetryTopicMultipleListenerIntegrationTests.FIRST_TOPIC)
114+
public void firstListener(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
115+
countDownLatchContainer.firstCountDownLatch.countDown();
116+
logger.warn(in + " from " + topic);
117+
throw new RuntimeException("test");
118+
}
119+
120+
@DltHandler
121+
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
122+
countDownLatchContainer.firstCountDownLatchDlt.countDown();
123+
logger.warn(in + " from " + topic);
124+
}
125+
}
126+
127+
@Component
128+
static class SecondKafkaListener {
129+
130+
@Autowired
131+
CountDownLatchContainer countDownLatchContainer;
132+
133+
@RetryableTopic()
134+
@KafkaListener(topics = RetryTopicMultipleListenerIntegrationTests.SECOND_TOPIC)
135+
public void secondListener(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
136+
countDownLatchContainer.secondCountDownLatch.countDown();
137+
logger.info(in + " from " + topic);
138+
throw new RuntimeException("another test");
139+
}
140+
141+
@DltHandler
142+
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
143+
countDownLatchContainer.secondCountDownLatchDlt.countDown();
144+
logger.warn(in + " from " + topic);
145+
}
146+
}
147+
148+
@Component
149+
static class CountDownLatchContainer {
150+
151+
CountDownLatch firstCountDownLatch = new CountDownLatch(4);
152+
CountDownLatch secondCountDownLatch = new CountDownLatch(3);
153+
CountDownLatch firstCountDownLatchDlt = new CountDownLatch(1);
154+
155+
CountDownLatch secondCountDownLatchDlt = new CountDownLatch(1);
156+
CountDownLatch customizerLatch = new CountDownLatch(6);
157+
}
158+
159+
@EnableKafka
160+
@Configuration
161+
static class Config {
162+
163+
@Autowired
164+
EmbeddedKafkaBroker broker;
165+
166+
@Bean
167+
CountDownLatchContainer latchContainer() {
168+
return new CountDownLatchContainer();
169+
}
170+
171+
@Bean
172+
FirstKafkaListener firstKafkaListener() {
173+
return new FirstKafkaListener();
174+
}
175+
176+
@Bean
177+
SecondKafkaListener secondKafkaListener() {
178+
return new SecondKafkaListener();
179+
}
180+
181+
@Bean
182+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
183+
ConsumerFactory<String, String> consumerFactory, CountDownLatchContainer latchContainer) {
184+
185+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
186+
factory.setConsumerFactory(consumerFactory);
187+
ContainerProperties props = factory.getContainerProperties();
188+
props.setIdleEventInterval(100L);
189+
props.setPollTimeout(50L);
190+
props.setIdlePartitionEventInterval(100L);
191+
factory.setConsumerFactory(consumerFactory);
192+
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
193+
(cr, ex) -> latchContainer.secondCountDownLatch.countDown(),
194+
new FixedBackOff(0, 2));
195+
factory.setCommonErrorHandler(errorHandler);
196+
factory.setConcurrency(1);
197+
factory.setContainerCustomizer(
198+
container -> latchContainer.customizerLatch.countDown());
199+
return factory;
200+
}
201+
202+
@Bean
203+
public ProducerFactory<String, String> producerFactory() {
204+
Map<String, Object> configProps = new HashMap<>();
205+
configProps.put(
206+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
207+
this.broker.getBrokersAsString());
208+
configProps.put(
209+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
210+
StringSerializer.class);
211+
configProps.put(
212+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
213+
StringSerializer.class);
214+
return new DefaultKafkaProducerFactory<>(configProps);
215+
}
216+
217+
@Bean
218+
public KafkaTemplate<String, String> kafkaTemplate() {
219+
return new KafkaTemplate<>(producerFactory());
220+
}
221+
222+
@Bean
223+
public ConsumerFactory<String, String> consumerFactory() {
224+
Map<String, Object> props = new HashMap<>();
225+
props.put(
226+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
227+
this.broker.getBrokersAsString());
228+
props.put(
229+
ConsumerConfig.GROUP_ID_CONFIG,
230+
"groupId");
231+
props.put(
232+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
233+
StringDeserializer.class);
234+
props.put(
235+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
236+
StringDeserializer.class);
237+
props.put(
238+
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
239+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
240+
241+
return new DefaultKafkaConsumerFactory<>(props);
242+
}
243+
244+
@Bean
245+
RetryTopicComponentFactory componentFactory() {
246+
return spy(new RetryTopicComponentFactory());
247+
}
248+
}
249+
}

0 commit comments

Comments
 (0)