Skip to content

Commit 6fbc6cb

Browse files
authored
GH-2284: Publish Auth Error Events
Fixes #2284 * Added Events for auth failed and successful retry of auth * publish of these events * unit testing * new files only 2022 * copy paste error * javadoc wrapping * remove `@since` tags not at top level * event documentation * `@author` on changed classes * corrected javadoc
1 parent f3215c8 commit 6fbc6cb

File tree

5 files changed

+188
-3
lines changed

5 files changed

+188
-3
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+11-1
Original file line numberDiff line numberDiff line change
@@ -2778,6 +2778,8 @@ An error message is also logged when this condition occurs.
27782778
* `ConsumerStoppingEvent`: published by each consumer just before stopping.
27792779
* `ConsumerStoppedEvent`: published after the consumer is closed.
27802780
See <<thread-safety>>.
2781+
* `ConsumerRetryAuthEvent`: published when authentication or authorization of a consumer fails and is being retried.
2782+
* `ConsumerRetryAuthSuccessfulEvent`: published when authentication or authorization has been retried successfully. Can only occur when there has been a `ConsumerRetryAuthEvent` before.
27812783
* `ContainerStoppedEvent`: published when all consumers have stopped.
27822784

27832785
IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
@@ -2837,7 +2839,15 @@ The `ConsumerPartitionPausedEvent`, `ConsumerPartitionResumedEvent` events have
28372839
* `container`: The listener container or the parent listener container, if the source container is a child.
28382840
* `partition`: The `TopicPartition` instance involved.
28392841

2840-
The `ConsumerStartingEvent`, `ConsumerStartingEvent`, `ConsumerFailedToStartEvent`, `ConsumerStoppedEvent` and `ContainerStoppedEvent` events have the following properties:
2842+
The `ConsumerRetryAuthEvent` event has the following properties:
2843+
2844+
* `source`: The listener container instance that published the event.
2845+
* `container`: The listener container or the parent listener container, if the source container is a child.
2846+
* `reason`
2847+
** `AUTHENTICATION` - the event was published because of an authentication exception.
2848+
** `AUTHORIZATION` - the event was published because of an authorization exception.
2849+
2850+
The `ConsumerStartingEvent`, `ConsumerStartingEvent`, `ConsumerFailedToStartEvent`, `ConsumerStoppedEvent`, `ConsumerRetryAuthSuccessfulEvent` and `ContainerStoppedEvent` events have the following properties:
28412851

28422852
* `source`: The listener container instance that published the event.
28432853
* `container`: The listener container or the parent listener container, if the source container is a child.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
/**
20+
* An event published when authentication or authorization of a consumer fails and
21+
* is being retried. Contains the reason for this event.
22+
*
23+
* @author Daniel Gentes
24+
* @since 3.0
25+
*
26+
*/
27+
public class ConsumerRetryAuthEvent extends KafkaEvent {
28+
29+
private static final long serialVersionUID = 1L;
30+
31+
/**
32+
* Reasons for retrying auth a consumer.
33+
*/
34+
public enum Reason {
35+
/**
36+
* An authentication exception occurred.
37+
*/
38+
AUTHENTICATION,
39+
40+
/**
41+
* An authorization exception occurred.
42+
*/
43+
AUTHORIZATION
44+
}
45+
46+
private final Reason reason;
47+
48+
/**
49+
* Construct an instance with the provided source and container.
50+
* @param source the container instance that generated the event.
51+
* @param container the container or the parent container
52+
* if the container is a child.
53+
* @param reason the reason.
54+
*/
55+
public ConsumerRetryAuthEvent(Object source, Object container, Reason reason) {
56+
super(source, container);
57+
this.reason = reason;
58+
}
59+
60+
/**
61+
* Return the reason for the auth failure.
62+
* @return the reason.
63+
*/
64+
public Reason getReason() {
65+
return this.reason;
66+
}
67+
68+
@Override
69+
public String toString() {
70+
return "ConsumerRetryAuthEvent [source=" + getSource() + ", reason=" + this.reason + "]";
71+
}
72+
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
/**
20+
* An event published when authentication or authorization has been retried successfully.
21+
*
22+
* @author Daniel Gentes
23+
* @since 3.0
24+
*
25+
*/
26+
public class ConsumerRetryAuthSuccessfulEvent extends KafkaEvent {
27+
28+
private static final long serialVersionUID = 1L;
29+
30+
/**
31+
* Construct an instance with the provided source and container.
32+
* @param source the container instance that generated the event.
33+
* @param container the container or the parent container
34+
* if the container is a child.
35+
*/
36+
public ConsumerRetryAuthSuccessfulEvent(Object source, Object container) {
37+
super(source, container);
38+
}
39+
40+
@Override
41+
public String toString() {
42+
return "ConsumerRetryAuthSuccessfulEvent [source=" + getSource() + "]";
43+
}
44+
45+
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+34
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@
8282
import org.springframework.kafka.event.ConsumerPartitionResumedEvent;
8383
import org.springframework.kafka.event.ConsumerPausedEvent;
8484
import org.springframework.kafka.event.ConsumerResumedEvent;
85+
import org.springframework.kafka.event.ConsumerRetryAuthEvent;
86+
import org.springframework.kafka.event.ConsumerRetryAuthSuccessfulEvent;
8587
import org.springframework.kafka.event.ConsumerStartedEvent;
8688
import org.springframework.kafka.event.ConsumerStartingEvent;
8789
import org.springframework.kafka.event.ConsumerStoppedEvent;
@@ -146,6 +148,7 @@
146148
* @author Lukasz Kaminski
147149
* @author Tomaz Fernandes
148150
* @author Francois Rosiere
151+
* @author Daniel Gentes
149152
*/
150153
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
151154
extends AbstractMessageListenerContainer<K, V> {
@@ -529,6 +532,30 @@ private void publishConsumerFailedToStart() {
529532
}
530533
}
531534

535+
private void publishRetryAuthEvent(Throwable throwable) {
536+
ApplicationEventPublisher publisher = getApplicationEventPublisher();
537+
if (publisher != null) {
538+
ConsumerRetryAuthEvent.Reason reason;
539+
if (throwable instanceof AuthenticationException) {
540+
reason = ConsumerRetryAuthEvent.Reason.AUTHENTICATION;
541+
}
542+
else if (throwable instanceof AuthorizationException) {
543+
reason = ConsumerRetryAuthEvent.Reason.AUTHORIZATION;
544+
}
545+
else {
546+
throw new IllegalArgumentException("Only Authentication or Authorization Excetions are allowed", throwable);
547+
}
548+
publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason));
549+
}
550+
}
551+
552+
private void publishRetryAuthSuccessfulEvent() {
553+
ApplicationEventPublisher publisher = getApplicationEventPublisher();
554+
if (publisher != null) {
555+
publisher.publishEvent(new ConsumerRetryAuthSuccessfulEvent(this, this.thisOrParentContainer));
556+
}
557+
}
558+
532559
@Override
533560
protected AbstractMessageListenerContainer<?, ?> parentOrThis() {
534561
return this.thisOrParentContainer;
@@ -1237,9 +1264,14 @@ public void run() {
12371264
initAssignedPartitions();
12381265
publishConsumerStartedEvent();
12391266
Throwable exitThrowable = null;
1267+
boolean failedAuthRetry = false;
12401268
while (isRunning()) {
12411269
try {
12421270
pollAndInvoke();
1271+
if (failedAuthRetry) {
1272+
publishRetryAuthSuccessfulEvent();
1273+
failedAuthRetry = false;
1274+
}
12431275
}
12441276
catch (NoOffsetForPartitionException nofpe) {
12451277
this.fatalError = true;
@@ -1259,6 +1291,8 @@ public void run() {
12591291
ListenerConsumer.this.logger.error(ae,
12601292
"Authentication/Authorization Exception, retrying in "
12611293
+ this.authExceptionRetryInterval.toMillis() + " ms");
1294+
publishRetryAuthEvent(ae);
1295+
failedAuthRetry = true;
12621296
// We can't pause/resume here, as KafkaConsumer doesn't take pausing
12631297
// into account when committing, hence risk of being flooded with
12641298
// GroupAuthorizationExceptions.

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@
102102
import org.springframework.kafka.core.ProducerFactory;
103103
import org.springframework.kafka.event.ConsumerPausedEvent;
104104
import org.springframework.kafka.event.ConsumerResumedEvent;
105+
import org.springframework.kafka.event.ConsumerRetryAuthEvent;
106+
import org.springframework.kafka.event.ConsumerRetryAuthSuccessfulEvent;
105107
import org.springframework.kafka.event.ConsumerStoppedEvent;
106108
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
107109
import org.springframework.kafka.event.ConsumerStoppingEvent;
@@ -135,6 +137,7 @@
135137
* @author Loic Talhouarne
136138
* @author Lukasz Kaminski
137139
* @author Ray Chuan Tay
140+
* @author Daniel Gentes
138141
*/
139142
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
140143
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
@@ -3224,9 +3227,17 @@ void testNotFatalErrorOnAuthorizationException() throws Exception {
32243227
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
32253228
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
32263229
CountDownLatch latch = new CountDownLatch(2);
3230+
CountDownLatch retryEvent = new CountDownLatch(2);
3231+
CountDownLatch retrySuccessfulEventFired = new CountDownLatch(1);
3232+
AtomicReference<ConsumerRetryAuthEvent.Reason> reason = new AtomicReference<>();
32273233
willAnswer(invoc -> {
3228-
latch.countDown();
3229-
throw new TopicAuthorizationException("test");
3234+
if (latch.getCount() > 0) {
3235+
latch.countDown();
3236+
throw new TopicAuthorizationException("test");
3237+
}
3238+
else {
3239+
return new ConsumerRecords<>(Collections.emptyMap());
3240+
}
32303241
}).given(consumer).poll(any());
32313242

32323243
ContainerProperties containerProps = new ContainerProperties(topic1);
@@ -3236,8 +3247,20 @@ void testNotFatalErrorOnAuthorizationException() throws Exception {
32363247
containerProps.setAuthExceptionRetryInterval(Duration.ofMillis(100));
32373248
KafkaMessageListenerContainer<Integer, String> container =
32383249
new KafkaMessageListenerContainer<>(cf, containerProps);
3250+
container.setApplicationEventPublisher(e -> {
3251+
if (e instanceof ConsumerRetryAuthEvent) {
3252+
reason.set(((ConsumerRetryAuthEvent) e).getReason());
3253+
retryEvent.countDown();
3254+
}
3255+
else if (e instanceof ConsumerRetryAuthSuccessfulEvent) {
3256+
retrySuccessfulEventFired.countDown();
3257+
}
3258+
});
32393259
container.start();
32403260
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3261+
assertThat(retryEvent.await(10, TimeUnit.SECONDS)).isTrue();
3262+
assertThat(reason.get()).isEqualTo(ConsumerRetryAuthEvent.Reason.AUTHORIZATION);
3263+
assertThat(retrySuccessfulEventFired.await(10, TimeUnit.SECONDS)).isTrue();
32413264
container.stop();
32423265
assertThat(container.isInExpectedState()).isTrue();
32433266
}

0 commit comments

Comments
 (0)