Skip to content

Commit 761fb8f

Browse files
committed
Allow multiple listeners on ResponseBodyEmitter
Prior to this commit, `ResponseBodyEmitter` woud accept a single `Runnable` callback on each of its `onTimeout`, `onError` or `onCompletion` methods. This would limit the developers' ability to register multiple sets of callbacks: one for managing the publication of streaming values, another one for managing other concerns like keep-alive signals to maintain the connection. This commit now allows multiple calls to `onTimeout`, `onError` and `onCompletion` and will register all callbacks accordingly. Closes gh-33356
1 parent 2b6639e commit 761fb8f

File tree

2 files changed

+80
-16
lines changed

2 files changed

+80
-16
lines changed

Diff for: spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java

+20-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,9 @@
1717
package org.springframework.web.servlet.mvc.method.annotation;
1818

1919
import java.io.IOException;
20+
import java.util.ArrayList;
2021
import java.util.LinkedHashSet;
22+
import java.util.List;
2123
import java.util.Set;
2224
import java.util.function.Consumer;
2325

@@ -59,6 +61,7 @@
5961
*
6062
* @author Rossen Stoyanchev
6163
* @author Juergen Hoeller
64+
* @author Brian Clozel
6265
* @since 4.2
6366
*/
6467
public class ResponseBodyEmitter {
@@ -271,29 +274,32 @@ public synchronized void completeWithError(Throwable ex) {
271274
/**
272275
* Register code to invoke when the async request times out. This method is
273276
* called from a container thread when an async request times out.
277+
* <p>As of 6.2, one can register multiple callbacks for this event.
274278
*/
275279
public synchronized void onTimeout(Runnable callback) {
276-
this.timeoutCallback.setDelegate(callback);
280+
this.timeoutCallback.addDelegate(callback);
277281
}
278282

279283
/**
280284
* Register code to invoke for an error during async request processing.
281285
* This method is called from a container thread when an error occurred
282286
* while processing an async request.
287+
* <p>As of 6.2, one can register multiple callbacks for this event.
283288
* @since 5.0
284289
*/
285290
public synchronized void onError(Consumer<Throwable> callback) {
286-
this.errorCallback.setDelegate(callback);
291+
this.errorCallback.addDelegate(callback);
287292
}
288293

289294
/**
290295
* Register code to invoke when the async request completes. This method is
291296
* called from a container thread when an async request completed for any
292297
* reason including timeout and network error. This method is useful for
293298
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
299+
* <p>As of 6.2, one can register multiple callbacks for this event.
294300
*/
295301
public synchronized void onCompletion(Runnable callback) {
296-
this.completionCallback.setDelegate(callback);
302+
this.completionCallback.addDelegate(callback);
297303
}
298304

299305

@@ -363,37 +369,35 @@ public MediaType getMediaType() {
363369

364370
private class DefaultCallback implements Runnable {
365371

366-
@Nullable
367-
private Runnable delegate;
372+
private List<Runnable> delegates = new ArrayList<>(1);
368373

369-
public void setDelegate(Runnable delegate) {
370-
this.delegate = delegate;
374+
public void addDelegate(Runnable delegate) {
375+
this.delegates.add(delegate);
371376
}
372377

373378
@Override
374379
public void run() {
375380
ResponseBodyEmitter.this.complete = true;
376-
if (this.delegate != null) {
377-
this.delegate.run();
381+
for (Runnable delegate : this.delegates) {
382+
delegate.run();
378383
}
379384
}
380385
}
381386

382387

383388
private class ErrorCallback implements Consumer<Throwable> {
384389

385-
@Nullable
386-
private Consumer<Throwable> delegate;
390+
private List<Consumer<Throwable>> delegates = new ArrayList<>(1);
387391

388-
public void setDelegate(Consumer<Throwable> callback) {
389-
this.delegate = callback;
392+
public void addDelegate(Consumer<Throwable> callback) {
393+
this.delegates.add(callback);
390394
}
391395

392396
@Override
393397
public void accept(Throwable t) {
394398
ResponseBodyEmitter.this.complete = true;
395-
if (this.delegate != null) {
396-
this.delegate.accept(t);
399+
for(Consumer<Throwable> delegate : this.delegates) {
400+
delegate.accept(t);
397401
}
398402
}
399403
}

Diff for: spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java

+60
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.web.servlet.mvc.method.annotation;
1818

1919
import java.io.IOException;
20+
import java.util.function.Consumer;
2021

2122
import org.junit.jupiter.api.Test;
2223
import org.junit.jupiter.api.extension.ExtendWith;
@@ -31,6 +32,7 @@
3132
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
3233
import static org.mockito.ArgumentMatchers.any;
3334
import static org.mockito.ArgumentMatchers.anySet;
35+
import static org.mockito.ArgumentMatchers.eq;
3436
import static org.mockito.BDDMockito.willThrow;
3537
import static org.mockito.Mockito.mock;
3638
import static org.mockito.Mockito.verify;
@@ -41,6 +43,7 @@
4143
*
4244
* @author Rossen Stoyanchev
4345
* @author Tomasz Nurkiewicz
46+
* @author Brian Clozel
4447
*/
4548
@ExtendWith(MockitoExtension.class)
4649
public class ResponseBodyEmitterTests {
@@ -197,6 +200,25 @@ void onTimeoutAfterHandlerInitialized() throws Exception {
197200
verify(runnable).run();
198201
}
199202

203+
@Test
204+
void multipleOnTimeoutCallbacks() throws Exception {
205+
this.emitter.initialize(this.handler);
206+
207+
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
208+
verify(this.handler).onTimeout(captor.capture());
209+
verify(this.handler).onCompletion(any());
210+
211+
Runnable first = mock();
212+
Runnable second = mock();
213+
this.emitter.onTimeout(first);
214+
this.emitter.onTimeout(second);
215+
216+
assertThat(captor.getValue()).isNotNull();
217+
captor.getValue().run();
218+
verify(first).run();
219+
verify(second).run();
220+
}
221+
200222
@Test
201223
void onCompletionBeforeHandlerInitialized() throws Exception {
202224
Runnable runnable = mock();
@@ -228,4 +250,42 @@ void onCompletionAfterHandlerInitialized() throws Exception {
228250
verify(runnable).run();
229251
}
230252

253+
@Test
254+
void multipleOnCompletionCallbacks() throws Exception {
255+
this.emitter.initialize(this.handler);
256+
257+
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
258+
verify(this.handler).onTimeout(any());
259+
verify(this.handler).onCompletion(captor.capture());
260+
261+
Runnable first = mock();
262+
Runnable second = mock();
263+
this.emitter.onCompletion(first);
264+
this.emitter.onCompletion(second);
265+
266+
assertThat(captor.getValue()).isNotNull();
267+
captor.getValue().run();
268+
verify(first).run();
269+
verify(second).run();
270+
}
271+
272+
@Test
273+
void multipleOnErrorCallbacks() throws Exception {
274+
this.emitter.initialize(this.handler);
275+
276+
ArgumentCaptor<Consumer<Throwable>> captor = ArgumentCaptor.<Consumer<Throwable>, Consumer>forClass(Consumer.class);
277+
verify(this.handler).onError(captor.capture());
278+
279+
Consumer<Throwable> first = mock();
280+
Consumer<Throwable> second = mock();
281+
this.emitter.onError(first);
282+
this.emitter.onError(second);
283+
284+
assertThat(captor.getValue()).isNotNull();
285+
IllegalStateException illegalStateException = new IllegalStateException();
286+
captor.getValue().accept(illegalStateException);
287+
verify(first).accept(eq(illegalStateException));
288+
verify(second).accept(eq(illegalStateException));
289+
}
290+
231291
}

0 commit comments

Comments
 (0)