1
1
/*
2
- * Copyright 2019-2023 the original author or authors.
2
+ * Copyright 2019-2024 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
54
54
* @author Gary Russell
55
55
* @author Francois Rosiere
56
56
* @author Wang Zhiyang
57
+ *
57
58
* @since 2.3.1
58
59
*
59
60
*/
60
61
public class DefaultAfterRollbackProcessorTests {
61
62
62
- @ SuppressWarnings ("deprecation" )
63
63
@ Test
64
64
void testClassifier () {
65
65
AtomicReference <ConsumerRecord <?, ?>> recovered = new AtomicReference <>();
@@ -142,10 +142,27 @@ void testBackOffNoBatchRecover() {
142
142
verify (backOff , times (3 )).start ();
143
143
}
144
144
145
+ @ Test
146
+ void testEarlyExitBackOff () {
147
+ DefaultAfterRollbackProcessor <String , String > processor = new DefaultAfterRollbackProcessor <>(
148
+ new FixedBackOff (10_000 , 1 ));
149
+ @ SuppressWarnings ("unchecked" )
150
+ Consumer <String , String > consumer = mock (Consumer .class );
151
+ ConsumerRecord <String , String > record1 = new ConsumerRecord <>("foo" , 0 , 0L , "foo" , "bar" );
152
+ ConsumerRecord <String , String > record2 = new ConsumerRecord <>("foo" , 1 , 1L , "foo" , "bar" );
153
+ List <ConsumerRecord <String , String >> records = Arrays .asList (record1 , record2 );
154
+ IllegalStateException illegalState = new IllegalStateException ();
155
+ MessageListenerContainer container = mock (MessageListenerContainer .class );
156
+ given (container .isRunning ()).willReturn (false );
157
+ long t1 = System .currentTimeMillis ();
158
+ processor .process (records , consumer , container , illegalState , true , EOSMode .V2 );
159
+ assertThat (System .currentTimeMillis () < t1 + 5_000 ).isTrue ();
160
+ }
161
+
145
162
@ Test
146
163
void testNoEarlyExitBackOff () {
147
164
DefaultAfterRollbackProcessor <String , String > processor = new DefaultAfterRollbackProcessor <>(
148
- new FixedBackOff (1 , 200 ));
165
+ new FixedBackOff (200 , 1 ));
149
166
@ SuppressWarnings ("unchecked" )
150
167
Consumer <String , String > consumer = mock (Consumer .class );
151
168
ConsumerRecord <String , String > record1 = new ConsumerRecord <>("foo" , 0 , 0L , "foo" , "bar" );
@@ -156,7 +173,7 @@ void testNoEarlyExitBackOff() {
156
173
given (container .isRunning ()).willReturn (true );
157
174
long t1 = System .currentTimeMillis ();
158
175
processor .process (records , consumer , container , illegalState , true , EOSMode .V2 );
159
- assertThat (System .currentTimeMillis () >= t1 + 200 );
176
+ assertThat (System .currentTimeMillis () >= t1 + 200 ). isTrue () ;
160
177
}
161
178
162
179
}
0 commit comments