182
182
"annotated25" , "annotated25reply1" , "annotated25reply2" , "annotated26" , "annotated27" , "annotated28" ,
183
183
"annotated29" , "annotated30" , "annotated30reply" , "annotated31" , "annotated32" , "annotated33" ,
184
184
"annotated34" , "annotated35" , "annotated36" , "annotated37" , "foo" , "manualStart" , "seekOnIdle" ,
185
- "annotated38" , "annotated38reply" , "annotated39" , "annotated40" , "annotated41" , "annotated42" })
185
+ "annotated38" , "annotated38reply" , "annotated39" , "annotated40" , "annotated41" , "annotated42" ,
186
+ "annotated43" , "annotated43reply" })
186
187
@ TestPropertySource (properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10" )
187
188
public class EnableKafkaIntegrationTests {
188
189
@@ -431,6 +432,15 @@ public void testInterface() throws Exception {
431
432
template .send ("annotated7" , 0 , "foo" );
432
433
template .flush ();
433
434
assertThat (this .ifaceListener .getLatch1 ().await (60 , TimeUnit .SECONDS )).isTrue ();
435
+ Map <String , Object > consumerProps = new HashMap <>(this .consumerFactory .getConfigurationProperties ());
436
+ consumerProps .put (ConsumerConfig .GROUP_ID_CONFIG , "testInterface" );
437
+ ConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(consumerProps );
438
+ Consumer <Integer , String > consumer = cf .createConsumer ();
439
+ this .embeddedKafka .consumeFromAnEmbeddedTopic (consumer , "annotated43reply" );
440
+ template .send ("annotated43" , 0 , "foo" );
441
+ ConsumerRecord <Integer , String > reply = KafkaTestUtils .getSingleRecord (consumer , "annotated43reply" );
442
+ assertThat (reply ).extracting (rec -> rec .value ()).isEqualTo ("FOO" );
443
+ consumer .close ();
434
444
}
435
445
436
446
@ Test
@@ -1373,8 +1383,8 @@ public MultiListenerNoDefault multiNoDefault() {
1373
1383
}
1374
1384
1375
1385
@ Bean
1376
- public MultiListenerSendTo multiListenerSendTo () {
1377
- return new MultiListenerSendTo ();
1386
+ public MultiListenerSendToImpl multiListenerSendTo () {
1387
+ return new MultiListenerSendToImpl ();
1378
1388
}
1379
1389
1380
1390
@ Bean
@@ -2296,6 +2306,10 @@ interface IfaceListener<T> {
2296
2306
2297
2307
void listen (T foo );
2298
2308
2309
+ @ SendTo ("annotated43reply" )
2310
+ @ KafkaListener (id = "ifcR" , topics = "annotated43" )
2311
+ String reply (String in );
2312
+
2299
2313
}
2300
2314
2301
2315
static class IfaceListenerImpl implements IfaceListener <String > {
@@ -2316,6 +2330,11 @@ public void listenTx(String foo) {
2316
2330
latch2 .countDown ();
2317
2331
}
2318
2332
2333
+ @ Override
2334
+ public String reply (String in ) {
2335
+ return in .toUpperCase ();
2336
+ }
2337
+
2319
2338
public CountDownLatch getLatch1 () {
2320
2339
return latch1 ;
2321
2340
}
@@ -2422,15 +2441,25 @@ public void bar(@Valid ValidatedClass val) {
2422
2441
2423
2442
@ KafkaListener (id = "multiSendTo" , topics = "annotated25" )
2424
2443
@ SendTo ("annotated25reply1" )
2425
- static class MultiListenerSendTo {
2444
+ interface MultiListenerSendTo {
2445
+
2446
+ @ KafkaHandler
2447
+ String foo (String in );
2426
2448
2427
2449
@ KafkaHandler
2450
+ @ SendTo ("!{'annotated25reply2'}" )
2451
+ String bar (KafkaNull nul , int key );
2452
+
2453
+ }
2454
+
2455
+ static class MultiListenerSendToImpl implements MultiListenerSendTo {
2456
+
2457
+ @ Override
2428
2458
public String foo (String in ) {
2429
2459
return in .toUpperCase ();
2430
2460
}
2431
2461
2432
- @ KafkaHandler
2433
- @ SendTo ("!{'annotated25reply2'}" )
2462
+ @ Override
2434
2463
public String bar (@ Payload (required = false ) KafkaNull nul ,
2435
2464
@ Header (KafkaHeaders .RECEIVED_KEY ) int key ) {
2436
2465
return "BAR" ;
0 commit comments