Skip to content

Commit f7fc362

Browse files
committed
Properly handle completion while in READING state
Closes gh-26834
1 parent ae7d520 commit f7fc362

File tree

2 files changed

+42
-23
lines changed

2 files changed

+42
-23
lines changed

Diff for: spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

+39-21
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
6868
@Nullable
6969
private volatile Subscriber<? super T> subscriber;
7070

71-
private volatile boolean completionBeforeDemand;
71+
private volatile boolean completionPending;
7272

7373
@Nullable
74-
private volatile Throwable errorBeforeDemand;
74+
private volatile Throwable errorPending;
7575

7676
private final String logPrefix;
7777

@@ -228,21 +228,24 @@ private void changeToDemandState(State oldState) {
228228
}
229229
}
230230

231-
private void handleCompletionOrErrorBeforeDemand() {
231+
private boolean handlePendingCompletionOrError() {
232232
State state = this.state.get();
233-
if (!state.equals(State.UNSUBSCRIBED) && !state.equals(State.SUBSCRIBING)) {
234-
if (this.completionBeforeDemand) {
235-
rsReadLogger.trace(getLogPrefix() + "Completed before demand");
233+
if (state.equals(State.DEMAND) || state.equals(State.NO_DEMAND)) {
234+
if (this.completionPending) {
235+
rsReadLogger.trace(getLogPrefix() + "Processing pending completion");
236236
this.state.get().onAllDataRead(this);
237+
return true;
237238
}
238-
Throwable ex = this.errorBeforeDemand;
239+
Throwable ex = this.errorPending;
239240
if (ex != null) {
240241
if (rsReadLogger.isTraceEnabled()) {
241-
rsReadLogger.trace(getLogPrefix() + "Completed with error before demand: " + ex);
242+
rsReadLogger.trace(getLogPrefix() + "Processing pending completion with error: " + ex);
242243
}
243244
this.state.get().onError(this, ex);
245+
return true;
244246
}
245247
}
248+
return false;
246249
}
247250

248251
private Subscription createSubscription() {
@@ -305,7 +308,7 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe
305308
publisher.subscriber = subscriber;
306309
subscriber.onSubscribe(subscription);
307310
publisher.changeState(SUBSCRIBING, NO_DEMAND);
308-
publisher.handleCompletionOrErrorBeforeDemand();
311+
publisher.handlePendingCompletionOrError();
309312
}
310313
else {
311314
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
@@ -315,14 +318,14 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe
315318

316319
@Override
317320
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
318-
publisher.completionBeforeDemand = true;
319-
publisher.handleCompletionOrErrorBeforeDemand();
321+
publisher.completionPending = true;
322+
publisher.handlePendingCompletionOrError();
320323
}
321324

322325
@Override
323326
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
324-
publisher.errorBeforeDemand = ex;
325-
publisher.handleCompletionOrErrorBeforeDemand();
327+
publisher.errorPending = ex;
328+
publisher.handlePendingCompletionOrError();
326329
}
327330
},
328331

@@ -341,14 +344,14 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
341344

342345
@Override
343346
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
344-
publisher.completionBeforeDemand = true;
345-
publisher.handleCompletionOrErrorBeforeDemand();
347+
publisher.completionPending = true;
348+
publisher.handlePendingCompletionOrError();
346349
}
347350

348351
@Override
349352
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
350-
publisher.errorBeforeDemand = ex;
351-
publisher.handleCompletionOrErrorBeforeDemand();
353+
publisher.errorPending = ex;
354+
publisher.handlePendingCompletionOrError();
352355
}
353356
},
354357

@@ -379,14 +382,17 @@ <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
379382
boolean demandAvailable = publisher.readAndPublish();
380383
if (demandAvailable) {
381384
publisher.changeToDemandState(READING);
385+
publisher.handlePendingCompletionOrError();
382386
}
383387
else {
384388
publisher.readingPaused();
385389
if (publisher.changeState(READING, NO_DEMAND)) {
386-
// Demand may have arrived since readAndPublish returned
387-
long r = publisher.demand;
388-
if (r > 0) {
389-
publisher.changeToDemandState(NO_DEMAND);
390+
if (!publisher.handlePendingCompletionOrError()) {
391+
// Demand may have arrived since readAndPublish returned
392+
long r = publisher.demand;
393+
if (r > 0) {
394+
publisher.changeToDemandState(NO_DEMAND);
395+
}
390396
}
391397
}
392398
}
@@ -408,6 +414,18 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
408414
publisher.changeToDemandState(NO_DEMAND);
409415
}
410416
}
417+
418+
@Override
419+
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
420+
publisher.completionPending = true;
421+
publisher.handlePendingCompletionOrError();
422+
}
423+
424+
@Override
425+
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
426+
publisher.errorPending = ex;
427+
publisher.handlePendingCompletionOrError();
428+
}
411429
},
412430

413431
COMPLETED {

Diff for: spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,11 @@ public final void onComplete() {
151151
* container.
152152
*/
153153
public final void onWritePossible() {
154+
State state = this.state.get();
154155
if (rsWriteLogger.isTraceEnabled()) {
155-
rsWriteLogger.trace(getLogPrefix() + "onWritePossible");
156+
rsWriteLogger.trace(getLogPrefix() + "onWritePossible [" + state + "]");
156157
}
157-
this.state.get().onWritePossible(this);
158+
state.onWritePossible(this);
158159
}
159160

160161
/**

0 commit comments

Comments
 (0)