1
1
/*
2
- * Copyright 2021-2022 the original author or authors.
2
+ * Copyright 2021-2023 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
18
18
19
19
import static org .assertj .core .api .Assertions .assertThat ;
20
20
import static org .assertj .core .api .Assertions .fail ;
21
+ import static org .mockito .Mockito .spy ;
21
22
22
23
import java .util .HashMap ;
23
24
import java .util .Map ;
52
53
import org .springframework .kafka .test .context .EmbeddedKafka ;
53
54
import org .springframework .messaging .handler .annotation .Header ;
54
55
import org .springframework .retry .annotation .Backoff ;
55
- import org .springframework .scheduling .TaskScheduler ;
56
- import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
57
56
import org .springframework .stereotype .Component ;
58
57
import org .springframework .test .annotation .DirtiesContext ;
59
58
import org .springframework .test .context .junit .jupiter .SpringJUnitConfig ;
60
59
import org .springframework .util .backoff .FixedBackOff ;
61
60
62
61
/**
63
62
* @author Tomaz Fernandes
63
+ * @author Cenk Akin
64
64
* @since 2.8.3
65
65
*/
66
66
@ SpringJUnitConfig
67
67
@ DirtiesContext
68
68
@ EmbeddedKafka (topics = { RetryTopicSameContainerFactoryIntegrationTests .FIRST_TOPIC ,
69
- RetryTopicSameContainerFactoryIntegrationTests .SECOND_TOPIC }, partitions = 1 )
69
+ RetryTopicSameContainerFactoryIntegrationTests .SECOND_TOPIC , RetryTopicSameContainerFactoryIntegrationTests . THIRD_TOPIC }, partitions = 1 )
70
70
public class RetryTopicSameContainerFactoryIntegrationTests {
71
71
72
72
private static final Logger logger = LoggerFactory .getLogger (RetryTopicSameContainerFactoryIntegrationTests .class );
@@ -75,6 +75,8 @@ public class RetryTopicSameContainerFactoryIntegrationTests {
75
75
76
76
public final static String SECOND_TOPIC = "myRetryTopic2" ;
77
77
78
+ public final static String THIRD_TOPIC = "myRetryTopic3" ;
79
+
78
80
@ Autowired
79
81
private KafkaTemplate <String , String > sendKafkaTemplate ;
80
82
@@ -87,9 +89,13 @@ void shouldRetryFirstAndSecondTopics() {
87
89
sendKafkaTemplate .send (FIRST_TOPIC , "Testing topic 1" );
88
90
logger .debug ("Sending message to topic " + SECOND_TOPIC );
89
91
sendKafkaTemplate .send (SECOND_TOPIC , "Testing topic 2" );
90
- assertThat (awaitLatch (latchContainer .countDownLatch1 )).isTrue ();
92
+ logger .debug ("Sending message to topic " + THIRD_TOPIC );
93
+ sendKafkaTemplate .send (THIRD_TOPIC , "Testing topic 3" );
94
+ assertThat (awaitLatch (latchContainer .countDownLatchFirstRetryable )).isTrue ();
91
95
assertThat (awaitLatch (latchContainer .countDownLatchDltOne )).isTrue ();
92
- assertThat (awaitLatch (latchContainer .countDownLatch2 )).isTrue ();
96
+ assertThat (awaitLatch (latchContainer .countDownLatchSecondRetryable )).isTrue ();
97
+ assertThat (awaitLatch (latchContainer .countDownLatchDltSecond )).isTrue ();
98
+ assertThat (awaitLatch (latchContainer .countDownLatchBasic )).isTrue ();
93
99
assertThat (awaitLatch (latchContainer .customizerLatch )).isTrue ();
94
100
}
95
101
@@ -104,7 +110,7 @@ private boolean awaitLatch(CountDownLatch latch) {
104
110
}
105
111
106
112
@ Component
107
- static class RetryableKafkaListener {
113
+ static class FirstRetryableKafkaListener {
108
114
109
115
@ Autowired
110
116
CountDownLatchContainer countDownLatchContainer ;
@@ -116,9 +122,9 @@ static class RetryableKafkaListener {
116
122
topicSuffixingStrategy = TopicSuffixingStrategy .SUFFIX_WITH_INDEX_VALUE )
117
123
@ KafkaListener (topics = RetryTopicSameContainerFactoryIntegrationTests .FIRST_TOPIC )
118
124
public void listen (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
119
- countDownLatchContainer .countDownLatch1 .countDown ();
125
+ countDownLatchContainer .countDownLatchFirstRetryable .countDown ();
120
126
logger .warn (in + " from " + topic );
121
- throw new RuntimeException ("test " );
127
+ throw new RuntimeException ("from FirstRetryableKafkaListener " );
122
128
}
123
129
124
130
@ DltHandler
@@ -129,30 +135,52 @@ public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
129
135
}
130
136
131
137
@ Component
132
- static class BasicKafkaListener {
138
+ static class SecondRetryableKafkaListener {
133
139
134
140
@ Autowired
135
141
CountDownLatchContainer countDownLatchContainer ;
136
142
143
+ @ RetryableTopic
137
144
@ KafkaListener (topics = RetryTopicSameContainerFactoryIntegrationTests .SECOND_TOPIC )
145
+ public void listen (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
146
+ countDownLatchContainer .countDownLatchSecondRetryable .countDown ();
147
+ logger .info (in + " from " + topic );
148
+ throw new RuntimeException ("from SecondRetryableKafkaListener" );
149
+ }
150
+
151
+ @ DltHandler
152
+ public void dlt (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
153
+ countDownLatchContainer .countDownLatchDltSecond .countDown ();
154
+ logger .warn (in + " from " + topic );
155
+ }
156
+ }
157
+
158
+
159
+ @ Component
160
+ static class BasicKafkaListener {
161
+
162
+ @ KafkaListener (topics = RetryTopicSameContainerFactoryIntegrationTests .THIRD_TOPIC )
138
163
public void listen (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
139
164
logger .info (in + " from " + topic );
140
- throw new RuntimeException ("another test " );
165
+ throw new RuntimeException ("from BasicKafkaListener " );
141
166
}
142
167
}
143
168
144
169
@ Component
145
170
static class CountDownLatchContainer {
146
171
147
- CountDownLatch countDownLatch1 = new CountDownLatch (4 );
148
- CountDownLatch countDownLatch2 = new CountDownLatch (1 );
172
+ CountDownLatch countDownLatchFirstRetryable = new CountDownLatch (4 );
173
+ CountDownLatch countDownLatchSecondRetryable = new CountDownLatch (3 );
149
174
CountDownLatch countDownLatchDltOne = new CountDownLatch (1 );
150
- CountDownLatch customizerLatch = new CountDownLatch (6 );
175
+ CountDownLatch countDownLatchDltSecond = new CountDownLatch (1 );
176
+
177
+ CountDownLatch countDownLatchBasic = new CountDownLatch (1 );
178
+ CountDownLatch customizerLatch = new CountDownLatch (10 );
151
179
}
152
180
153
181
@ EnableKafka
154
182
@ Configuration
155
- static class Config extends RetryTopicConfigurationSupport {
183
+ static class Config {
156
184
157
185
@ Autowired
158
186
EmbeddedKafkaBroker broker ;
@@ -163,8 +191,13 @@ CountDownLatchContainer latchContainer() {
163
191
}
164
192
165
193
@ Bean
166
- RetryableKafkaListener retryableKafkaListener () {
167
- return new RetryableKafkaListener ();
194
+ FirstRetryableKafkaListener firstRetryableKafkaListener () {
195
+ return new FirstRetryableKafkaListener ();
196
+ }
197
+
198
+ @ Bean
199
+ SecondRetryableKafkaListener secondRetryableKafkaListener () {
200
+ return new SecondRetryableKafkaListener ();
168
201
}
169
202
170
203
@ Bean
@@ -184,7 +217,7 @@ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerCont
184
217
props .setIdlePartitionEventInterval (100L );
185
218
factory .setConsumerFactory (consumerFactory );
186
219
DefaultErrorHandler errorHandler = new DefaultErrorHandler (
187
- (cr , ex ) -> latchContainer .countDownLatch2 .countDown (),
220
+ (cr , ex ) -> latchContainer .countDownLatchBasic .countDown (),
188
221
new FixedBackOff (0 , 2 ));
189
222
factory .setCommonErrorHandler (errorHandler );
190
223
factory .setConcurrency (1 );
@@ -236,8 +269,8 @@ public ConsumerFactory<String, String> consumerFactory() {
236
269
}
237
270
238
271
@ Bean
239
- TaskScheduler sched () {
240
- return new ThreadPoolTaskScheduler ( );
272
+ RetryTopicComponentFactory componentFactory () {
273
+ return spy ( new RetryTopicComponentFactory () );
241
274
}
242
275
243
276
}
0 commit comments