185
185
"annotated25" , "annotated25reply1" , "annotated25reply2" , "annotated26" , "annotated27" , "annotated28" ,
186
186
"annotated29" , "annotated30" , "annotated30reply" , "annotated31" , "annotated32" , "annotated33" ,
187
187
"annotated34" , "annotated35" , "annotated36" , "annotated37" , "foo" , "manualStart" , "seekOnIdle" ,
188
- "annotated38" , "annotated38reply" , "annotated39" , "annotated40" , "annotated41" , "annotated42" })
188
+ "annotated38" , "annotated38reply" , "annotated39" , "annotated40" , "annotated41" , "annotated42" ,
189
+ "annotated43" , "annotated43reply" })
189
190
@ TestPropertySource (properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10" )
190
191
public class EnableKafkaIntegrationTests {
191
192
@@ -439,6 +440,15 @@ public void testInterface() throws Exception {
439
440
template .send ("annotated7" , 0 , "foo" );
440
441
template .flush ();
441
442
assertThat (this .ifaceListener .getLatch1 ().await (60 , TimeUnit .SECONDS )).isTrue ();
443
+ Map <String , Object > consumerProps = new HashMap <>(this .consumerFactory .getConfigurationProperties ());
444
+ consumerProps .put (ConsumerConfig .GROUP_ID_CONFIG , "testInterface" );
445
+ ConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(consumerProps );
446
+ Consumer <Integer , String > consumer = cf .createConsumer ();
447
+ this .embeddedKafka .consumeFromAnEmbeddedTopic (consumer , "annotated43reply" );
448
+ template .send ("annotated43" , 0 , "foo" );
449
+ ConsumerRecord <Integer , String > reply = KafkaTestUtils .getSingleRecord (consumer , "annotated43reply" );
450
+ assertThat (reply ).extracting (rec -> rec .value ()).isEqualTo ("FOO" );
451
+ consumer .close ();
442
452
}
443
453
444
454
@ Test
@@ -1376,8 +1386,8 @@ public MultiListenerNoDefault multiNoDefault() {
1376
1386
}
1377
1387
1378
1388
@ Bean
1379
- public MultiListenerSendTo multiListenerSendTo () {
1380
- return new MultiListenerSendTo ();
1389
+ public MultiListenerSendToImpl multiListenerSendTo () {
1390
+ return new MultiListenerSendToImpl ();
1381
1391
}
1382
1392
1383
1393
@ Bean
@@ -2299,6 +2309,10 @@ interface IfaceListener<T> {
2299
2309
2300
2310
void listen (T foo );
2301
2311
2312
+ @ SendTo ("annotated43reply" )
2313
+ @ KafkaListener (id = "ifcR" , topics = "annotated43" )
2314
+ String reply (String in );
2315
+
2302
2316
}
2303
2317
2304
2318
static class IfaceListenerImpl implements IfaceListener <String > {
@@ -2319,6 +2333,11 @@ public void listenTx(String foo) {
2319
2333
latch2 .countDown ();
2320
2334
}
2321
2335
2336
+ @ Override
2337
+ public String reply (String in ) {
2338
+ return in .toUpperCase ();
2339
+ }
2340
+
2322
2341
public CountDownLatch getLatch1 () {
2323
2342
return latch1 ;
2324
2343
}
@@ -2425,15 +2444,25 @@ public void bar(@Valid ValidatedClass val) {
2425
2444
2426
2445
@ KafkaListener (id = "multiSendTo" , topics = "annotated25" )
2427
2446
@ SendTo ("annotated25reply1" )
2428
- static class MultiListenerSendTo {
2447
+ interface MultiListenerSendTo {
2448
+
2449
+ @ KafkaHandler
2450
+ String foo (String in );
2429
2451
2430
2452
@ KafkaHandler
2453
+ @ SendTo ("!{'annotated25reply2'}" )
2454
+ String bar (KafkaNull nul , int key );
2455
+
2456
+ }
2457
+
2458
+ static class MultiListenerSendToImpl implements MultiListenerSendTo {
2459
+
2460
+ @ Override
2431
2461
public String foo (String in ) {
2432
2462
return in .toUpperCase ();
2433
2463
}
2434
2464
2435
- @ KafkaHandler
2436
- @ SendTo ("!{'annotated25reply2'}" )
2465
+ @ Override
2437
2466
public String bar (@ Payload (required = false ) KafkaNull nul ,
2438
2467
@ Header (KafkaHeaders .RECEIVED_KEY ) int key ) {
2439
2468
return "BAR" ;
0 commit comments