34
34
import org .apache .kafka .streams .StreamsBuilder ;
35
35
import org .apache .kafka .streams .StreamsConfig ;
36
36
import org .apache .kafka .streams .errors .DeserializationExceptionHandler .DeserializationHandlerResponse ;
37
+ import org .apache .kafka .streams .errors .ErrorHandlerContext ;
37
38
import org .apache .kafka .streams .kstream .KStream ;
38
- import org .apache .kafka .streams .processor .ProcessorContext ;
39
39
import org .apache .kafka .streams .processor .WallclockTimestampExtractor ;
40
40
import org .junit .jupiter .api .Test ;
41
41
@@ -94,9 +94,9 @@ void viaStringProperty() {
94
94
Recoverer .class .getName ());
95
95
handler .configure (configs );
96
96
assertThat (KafkaTestUtils .getPropertyValue (handler , "recoverer" )).isInstanceOf (Recoverer .class );
97
- assertThat (handler .handle ((ProcessorContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
97
+ assertThat (handler .handle ((ErrorHandlerContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
98
98
new IllegalArgumentException ())).isEqualTo (DeserializationHandlerResponse .CONTINUE );
99
- assertThat (handler .handle ((ProcessorContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
99
+ assertThat (handler .handle ((ErrorHandlerContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
100
100
new IllegalStateException ())).isEqualTo (DeserializationHandlerResponse .FAIL );
101
101
}
102
102
@@ -107,9 +107,9 @@ void viaClassProperty() {
107
107
configs .put (RecoveringDeserializationExceptionHandler .KSTREAM_DESERIALIZATION_RECOVERER , Recoverer .class );
108
108
handler .configure (configs );
109
109
assertThat (KafkaTestUtils .getPropertyValue (handler , "recoverer" )).isInstanceOf (Recoverer .class );
110
- assertThat (handler .handle ((ProcessorContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
110
+ assertThat (handler .handle ((ErrorHandlerContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
111
111
new IllegalArgumentException ())).isEqualTo (DeserializationHandlerResponse .CONTINUE );
112
- assertThat (handler .handle ((ProcessorContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
112
+ assertThat (handler .handle ((ErrorHandlerContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
113
113
new IllegalStateException ())).isEqualTo (DeserializationHandlerResponse .FAIL );
114
114
}
115
115
@@ -121,16 +121,16 @@ void viaObjectProperty() {
121
121
configs .put (RecoveringDeserializationExceptionHandler .KSTREAM_DESERIALIZATION_RECOVERER , rec );
122
122
handler .configure (configs );
123
123
assertThat (KafkaTestUtils .getPropertyValue (handler , "recoverer" )).isSameAs (rec );
124
- assertThat (handler .handle ((ProcessorContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
124
+ assertThat (handler .handle ((ErrorHandlerContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
125
125
new IllegalArgumentException ())).isEqualTo (DeserializationHandlerResponse .CONTINUE );
126
- assertThat (handler .handle ((ProcessorContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
126
+ assertThat (handler .handle ((ErrorHandlerContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
127
127
new IllegalStateException ())).isEqualTo (DeserializationHandlerResponse .FAIL );
128
128
}
129
129
130
130
@ Test
131
131
void withNoRecoverer () {
132
132
RecoveringDeserializationExceptionHandler handler = new RecoveringDeserializationExceptionHandler ();
133
- assertThat (handler .handle ((ProcessorContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
133
+ assertThat (handler .handle ((ErrorHandlerContext ) null , new ConsumerRecord <>("foo" , 0 , 0 , null , null ),
134
134
new IllegalArgumentException ())).isEqualTo (DeserializationHandlerResponse .FAIL );
135
135
}
136
136
0 commit comments