43
43
import org .springframework .kafka .core .ProducerFactory ;
44
44
import org .springframework .kafka .event .ContainerStoppedEvent ;
45
45
import org .springframework .kafka .test .EmbeddedKafkaBroker ;
46
+ import org .springframework .kafka .test .condition .LogLevels ;
46
47
import org .springframework .kafka .test .context .EmbeddedKafka ;
47
48
import org .springframework .kafka .test .utils .KafkaTestUtils ;
48
49
import org .springframework .test .context .junit .jupiter .SpringJUnitConfig ;
@@ -59,25 +60,32 @@ public class ContainerGroupSequencerTests {
59
60
private static final LogAccessor LOGGER = new LogAccessor (LogFactory .getLog (ContainerGroupSequencerTests .class ));
60
61
61
62
@ Test
62
- void sequenceCompletes (@ Autowired Config config , @ Autowired KafkaTemplate <Integer , String > template )
63
+ @ LogLevels (classes = { ContainerGroupSequencerTests .class , ContainerGroupSequencer .class }, level = "DEBUG" )
64
+ void sequenceCompletes (@ Autowired Config config , @ Autowired KafkaTemplate <Integer , String > template ,
65
+ @ Autowired ContainerGroupSequencer sequencer )
63
66
throws InterruptedException {
64
67
68
+ sequencer .start ();
65
69
template .send ("ContainerGroupSequencerTests" , "test" );
66
70
assertThat (config .stopped .await (10 , TimeUnit .SECONDS ))
67
71
.as ("stopped latch still has a count of %d" , config .stopped .getCount ())
68
72
.isTrue ();
69
73
List <String > order = config .order ;
70
- assertThat (order .get (0 ))
71
- .as ("out of order %s" )
74
+ String expected = order .get (0 );
75
+ assertThat (expected )
76
+ .as ("out of order %s" , expected )
72
77
.isIn ("one" , "two" );
73
- assertThat (order .get (1 ))
74
- .as ("out of order %s" )
78
+ expected = order .get (1 );
79
+ assertThat (expected )
80
+ .as ("out of order %s" , expected )
75
81
.isIn ("one" , "two" );
76
- assertThat (order .get (2 ))
77
- .as ("out of order %s" )
82
+ expected = order .get (2 );
83
+ assertThat (expected )
84
+ .as ("out of order %s" , expected )
78
85
.isIn ("three" , "four" );
79
- assertThat (order .get (3 ))
80
- .as ("out of order %s" )
86
+ expected = order .get (3 );
87
+ assertThat (expected )
88
+ .as ("out of order %s" , expected )
81
89
.isIn ("three" , "four" );
82
90
assertThat (config .receivedAt .get (3 ) - config .receivedAt .get (0 )).isGreaterThanOrEqualTo (1000 );
83
91
}
@@ -95,24 +103,28 @@ public static class Config {
95
103
96
104
@ KafkaListener (id = "one" , topics = "ContainerGroupSequencerTests" , containerGroup = "g1" , concurrency = "2" )
97
105
public void listen1 (String in ) {
106
+ LOGGER .debug (in );
98
107
this .order .add ("one" );
99
108
this .receivedAt .add (System .currentTimeMillis ());
100
109
}
101
110
102
111
@ KafkaListener (id = "two" , topics = "ContainerGroupSequencerTests" , containerGroup = "g1" , concurrency = "2" )
103
112
public void listen2 (String in ) {
113
+ LOGGER .debug (in );
104
114
this .order .add ("two" );
105
115
this .receivedAt .add (System .currentTimeMillis ());
106
116
}
107
117
108
118
@ KafkaListener (id = "three" , topics = "ContainerGroupSequencerTests" , containerGroup = "g2" , concurrency = "2" )
109
119
public void listen3 (String in ) {
120
+ LOGGER .debug (in );
110
121
this .order .add ("three" );
111
122
this .receivedAt .add (System .currentTimeMillis ());
112
123
}
113
124
114
125
@ KafkaListener (id = "four" , topics = "ContainerGroupSequencerTests" , containerGroup = "g2" , concurrency = "2" )
115
126
public void listen4 (String in ) {
127
+ LOGGER .debug (in );
116
128
this .order .add ("four" );
117
129
this .receivedAt .add (System .currentTimeMillis ());
118
130
}
@@ -127,6 +139,7 @@ public void stopped(ContainerStoppedEvent event) {
127
139
ContainerGroupSequencer sequencer (KafkaListenerEndpointRegistry registry ) {
128
140
ContainerGroupSequencer sequencer = new ContainerGroupSequencer (registry , 1000 , "g1" , "g2" );
129
141
sequencer .setStopLastGroupWhenIdle (true );
142
+ sequencer .setAutoStartup (false );
130
143
return sequencer ;
131
144
}
132
145
0 commit comments