Skip to content

Commit 877e0b1

Browse files
committed
Improve concurrent handling of result in WebAsyncManager
1. Use state transitions 2. Increase synchronized scope in setConcurrentResultAndDispatch See spring-projectsgh-32340
1 parent 379ffac commit 877e0b1

File tree

1 file changed

+87
-51
lines changed

1 file changed

+87
-51
lines changed

spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java

+87-51
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.Callable;
2424
import java.util.concurrent.Future;
25+
import java.util.concurrent.atomic.AtomicReference;
2526

2627
import jakarta.servlet.http.HttpServletRequest;
2728
import org.apache.commons.logging.Log;
@@ -33,7 +34,6 @@
3334
import org.springframework.util.Assert;
3435
import org.springframework.web.context.request.RequestAttributes;
3536
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
36-
import org.springframework.web.util.DisconnectedClientHelper;
3737

3838
/**
3939
* The central class for managing asynchronous request processing, mainly intended
@@ -67,16 +67,6 @@ public final class WebAsyncManager {
6767

6868
private static final Log logger = LogFactory.getLog(WebAsyncManager.class);
6969

70-
/**
71-
* Log category to use for network failure after a client has gone away.
72-
* @see DisconnectedClientHelper
73-
*/
74-
private static final String DISCONNECTED_CLIENT_LOG_CATEGORY =
75-
"org.springframework.web.server.DisconnectedClient";
76-
77-
private static final DisconnectedClientHelper disconnectedClientHelper =
78-
new DisconnectedClientHelper(DISCONNECTED_CLIENT_LOG_CATEGORY);
79-
8070
private static final CallableProcessingInterceptor timeoutCallableInterceptor =
8171
new TimeoutCallableProcessingInterceptor();
8272

@@ -95,12 +85,7 @@ public final class WebAsyncManager {
9585
@Nullable
9686
private volatile Object[] concurrentResultContext;
9787

98-
/*
99-
* Whether the concurrentResult is an error. If such errors remain unhandled, some
100-
* Servlet containers will call AsyncListener#onError at the end, after the ASYNC
101-
* and/or the ERROR dispatch (Boot's case), and we need to ignore those.
102-
*/
103-
private volatile boolean errorHandlingInProgress;
88+
private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
10489

10590
private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<>();
10691

@@ -262,6 +247,12 @@ public void registerDeferredResultInterceptors(DeferredResultProcessingIntercept
262247
* {@linkplain #getConcurrentResultContext() concurrentResultContext}.
263248
*/
264249
public void clearConcurrentResult() {
250+
if (!this.state.compareAndSet(State.RESULT_SET, State.NOT_STARTED)) {
251+
if (logger.isDebugEnabled()) {
252+
logger.debug("Unexpected call to clear: [" + this.state.get() + "]");
253+
}
254+
return;
255+
}
265256
synchronized (WebAsyncManager.this) {
266257
this.concurrentResult = RESULT_NONE;
267258
this.concurrentResultContext = null;
@@ -302,6 +293,11 @@ public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object..
302293
Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
303294
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
304295

296+
if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) {
297+
throw new IllegalStateException(
298+
"Unexpected call to startCallableProcessing: [" + this.state.get() + "]");
299+
}
300+
305301
Long timeout = webAsyncTask.getTimeout();
306302
if (timeout != null) {
307303
this.asyncWebRequest.setTimeout(timeout);
@@ -322,7 +318,7 @@ public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object..
322318

323319
this.asyncWebRequest.addTimeoutHandler(() -> {
324320
if (logger.isDebugEnabled()) {
325-
logger.debug("Async request timeout for " + formatUri(this.asyncWebRequest));
321+
logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest));
326322
}
327323
Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
328324
if (result != CallableProcessingInterceptor.RESULT_NONE) {
@@ -331,14 +327,12 @@ public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object..
331327
});
332328

333329
this.asyncWebRequest.addErrorHandler(ex -> {
334-
if (!this.errorHandlingInProgress) {
335-
if (logger.isDebugEnabled()) {
336-
logger.debug("Async request error for " + formatUri(this.asyncWebRequest) + ": " + ex);
337-
}
338-
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
339-
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
340-
setConcurrentResultAndDispatch(result);
330+
if (logger.isDebugEnabled()) {
331+
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest) + ": " + ex);
341332
}
333+
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
334+
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
335+
setConcurrentResultAndDispatch(result);
342336
});
343337

344338
this.asyncWebRequest.addCompletionHandler(() ->
@@ -370,31 +364,34 @@ public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object..
370364
}
371365

372366
private void setConcurrentResultAndDispatch(@Nullable Object result) {
367+
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
373368
synchronized (WebAsyncManager.this) {
374-
if (this.concurrentResult != RESULT_NONE) {
369+
if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) {
370+
if (logger.isDebugEnabled()) {
371+
logger.debug("Async result already set: " +
372+
"[" + this.state.get() + "], ignored result: " + result +
373+
" for " + formatUri(this.asyncWebRequest));
374+
}
375375
return;
376376
}
377-
this.concurrentResult = result;
378-
this.errorHandlingInProgress = (result instanceof Throwable);
379-
}
380377

381-
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
382-
if (this.asyncWebRequest.isAsyncComplete()) {
378+
this.concurrentResult = result;
383379
if (logger.isDebugEnabled()) {
384-
logger.debug("Async result set but request already complete: " + formatUri(this.asyncWebRequest));
380+
logger.debug("Async result set to: " + result + " for " + formatUri(this.asyncWebRequest));
385381
}
386-
return;
387-
}
388382

389-
if (result instanceof Exception ex && disconnectedClientHelper.checkAndLogClientDisconnectedException(ex)) {
390-
return;
391-
}
383+
if (this.asyncWebRequest.isAsyncComplete()) {
384+
if (logger.isDebugEnabled()) {
385+
logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest));
386+
}
387+
return;
388+
}
392389

393-
if (logger.isDebugEnabled()) {
394-
logger.debug("Async " + (this.errorHandlingInProgress ? "error" : "result set") +
395-
", dispatch to " + formatUri(this.asyncWebRequest));
390+
if (logger.isDebugEnabled()) {
391+
logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest));
392+
}
393+
this.asyncWebRequest.dispatch();
396394
}
397-
this.asyncWebRequest.dispatch();
398395
}
399396

400397
/**
@@ -417,6 +414,11 @@ public void startDeferredResultProcessing(
417414
Assert.notNull(deferredResult, "DeferredResult must not be null");
418415
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
419416

417+
if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) {
418+
throw new IllegalStateException(
419+
"Unexpected call to startDeferredResultProcessing: [" + this.state.get() + "]");
420+
}
421+
420422
Long timeout = deferredResult.getTimeoutValue();
421423
if (timeout != null) {
422424
this.asyncWebRequest.setTimeout(timeout);
@@ -430,6 +432,9 @@ public void startDeferredResultProcessing(
430432
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
431433

432434
this.asyncWebRequest.addTimeoutHandler(() -> {
435+
if (logger.isDebugEnabled()) {
436+
logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest));
437+
}
433438
try {
434439
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
435440
}
@@ -439,16 +444,17 @@ public void startDeferredResultProcessing(
439444
});
440445

441446
this.asyncWebRequest.addErrorHandler(ex -> {
442-
if (!this.errorHandlingInProgress) {
443-
try {
444-
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
445-
return;
446-
}
447-
deferredResult.setErrorResult(ex);
448-
}
449-
catch (Throwable interceptorEx) {
450-
setConcurrentResultAndDispatch(interceptorEx);
447+
if (logger.isDebugEnabled()) {
448+
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest));
449+
}
450+
try {
451+
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
452+
return;
451453
}
454+
deferredResult.setErrorResult(ex);
455+
}
456+
catch (Throwable interceptorEx) {
457+
setConcurrentResultAndDispatch(interceptorEx);
452458
}
453459
});
454460

@@ -474,10 +480,13 @@ private void startAsyncProcessing(Object[] processingContext) {
474480
synchronized (WebAsyncManager.this) {
475481
this.concurrentResult = RESULT_NONE;
476482
this.concurrentResultContext = processingContext;
477-
this.errorHandlingInProgress = false;
478483
}
479484

480485
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
486+
if (logger.isDebugEnabled()) {
487+
logger.debug("Started async request for " + formatUri(this.asyncWebRequest));
488+
}
489+
481490
this.asyncWebRequest.startAsync();
482491
if (logger.isDebugEnabled()) {
483492
logger.debug("Started async request");
@@ -489,4 +498,31 @@ private static String formatUri(AsyncWebRequest asyncWebRequest) {
489498
return (request != null ? request.getRequestURI() : "servlet container");
490499
}
491500

501+
502+
/**
503+
* Represents a state for {@link WebAsyncManager} to be in.
504+
* <p><pre>
505+
* NOT_STARTED <------+
506+
* | |
507+
* v |
508+
* ASYNC_PROCESSING |
509+
* | |
510+
* v |
511+
* RESULT_SET -------+
512+
* </pre>
513+
* @since 5.3.33
514+
*/
515+
private enum State {
516+
517+
/** No async processing in progress. */
518+
NOT_STARTED,
519+
520+
/** Async handling has started, but the result hasn't been set yet. */
521+
ASYNC_PROCESSING,
522+
523+
/** The result is set, and an async dispatch was performed, unless there is a network error. */
524+
RESULT_SET
525+
526+
}
527+
492528
}

0 commit comments

Comments
 (0)