|
16 | 16 |
|
17 | 17 | package org.springframework.http.client.reactive;
|
18 | 18 |
|
| 19 | +import java.util.Objects; |
19 | 20 | import java.util.concurrent.atomic.AtomicBoolean;
|
20 | 21 |
|
| 22 | +import org.reactivestreams.Publisher; |
| 23 | +import org.reactivestreams.Subscriber; |
| 24 | +import org.reactivestreams.Subscription; |
21 | 25 | import reactor.core.publisher.Flux;
|
22 | 26 |
|
23 | 27 | import org.springframework.core.io.buffer.DataBuffer;
|
@@ -54,16 +58,7 @@ protected AbstractClientHttpResponse(int statusCode, HttpHeaders headers,
|
54 | 58 | this.statusCode = statusCode;
|
55 | 59 | this.headers = headers;
|
56 | 60 | this.cookies = cookies;
|
57 |
| - this.body = singleSubscription(body); |
58 |
| - } |
59 |
| - |
60 |
| - private static Flux<DataBuffer> singleSubscription(Flux<DataBuffer> body) { |
61 |
| - AtomicBoolean subscribed = new AtomicBoolean(); |
62 |
| - return body.doOnSubscribe(s -> { |
63 |
| - if (!subscribed.compareAndSet(false, true)) { |
64 |
| - throw new IllegalStateException("The client response body can only be consumed once"); |
65 |
| - } |
66 |
| - }); |
| 61 | + this.body = Flux.from(new SingleSubscriberPublisher<>(body)); |
67 | 62 | }
|
68 | 63 |
|
69 | 64 |
|
@@ -91,4 +86,39 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
|
91 | 86 | public Flux<DataBuffer> getBody() {
|
92 | 87 | return this.body;
|
93 | 88 | }
|
| 89 | + |
| 90 | + |
| 91 | + private static final class SingleSubscriberPublisher<T> implements Publisher<T> { |
| 92 | + |
| 93 | + private static final Subscription NO_OP_SUBSCRIPTION = new Subscription() { |
| 94 | + @Override |
| 95 | + public void request(long l) { |
| 96 | + } |
| 97 | + |
| 98 | + @Override |
| 99 | + public void cancel() { |
| 100 | + } |
| 101 | + }; |
| 102 | + |
| 103 | + private final Publisher<T> delegate; |
| 104 | + |
| 105 | + private final AtomicBoolean subscribed = new AtomicBoolean(); |
| 106 | + |
| 107 | + |
| 108 | + public SingleSubscriberPublisher(Publisher<T> delegate) { |
| 109 | + this.delegate = delegate; |
| 110 | + } |
| 111 | + |
| 112 | + @Override |
| 113 | + public void subscribe(Subscriber<? super T> subscriber) { |
| 114 | + Objects.requireNonNull(subscriber, "Subscriber must not be null"); |
| 115 | + if (this.subscribed.compareAndSet(false, true)) { |
| 116 | + this.delegate.subscribe(subscriber); |
| 117 | + } |
| 118 | + else { |
| 119 | + subscriber.onSubscribe(NO_OP_SUBSCRIPTION); |
| 120 | + subscriber.onError(new IllegalStateException("The client response body can only be consumed once")); |
| 121 | + } |
| 122 | + } |
| 123 | + } |
94 | 124 | }
|
0 commit comments