Skip to content

Commit 7dce0e5

Browse files
committed
* added support for streaming response modification
Fixes spring-cloudgh-2275
1 parent 1e7f83f commit 7dce0e5

File tree

3 files changed

+200
-30
lines changed

3 files changed

+200
-30
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2013-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.gateway.filter.factory.rewrite;
18+
19+
import java.util.function.BiFunction;
20+
21+
import reactor.core.publisher.Flux;
22+
23+
import org.springframework.web.server.ServerWebExchange;
24+
25+
/**
26+
* This interface is BETA and may be subject to change in a future release.
27+
*
28+
* @param <T> the type of the first argument to the function
29+
* @param <R> the type of element signaled by the {@link Flux}
30+
*/
31+
public interface FluxRewriteFunction<T, R> extends BiFunction<ServerWebExchange, Flux<T>, Flux<R>> {
32+
33+
}

spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/ModifyResponseBodyGatewayFilterFactory.java

+133-30
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,19 @@ public static class Config {
9191

9292
private String newContentType;
9393

94+
/**
95+
* Deprecated in favour of {@link FluxRewriteFunction} &
96+
* {@link MonoRewriteFunction} Use {@link MonoRewriteFunction} for modifying
97+
* non-streaming response body Use {@link FluxRewriteFunction} for modifying
98+
* streaming response body
99+
*/
100+
@Deprecated
94101
private RewriteFunction rewriteFunction;
95102

103+
private FluxRewriteFunction fluxRewriteFunction;
104+
105+
private MonoRewriteFunction monoRewriteFunction;
106+
96107
public Class getInClass() {
97108
return inClass;
98109
}
@@ -138,15 +149,42 @@ public Config setNewContentType(String newContentType) {
138149
return this;
139150
}
140151

152+
@Deprecated
141153
public RewriteFunction getRewriteFunction() {
142154
return rewriteFunction;
143155
}
144156

157+
public <T, R> MonoRewriteFunction<Mono<T>, Mono<R>> getMonoRewriteFunction() {
158+
return monoRewriteFunction;
159+
}
160+
161+
public <T, R> FluxRewriteFunction<Flux<T>, Flux<R>> getFluxRewriteFunction() {
162+
return fluxRewriteFunction;
163+
}
164+
165+
/**
166+
* Deprecated in favour of {@link Config#setMonoRewriteFunction} &
167+
* {@link Config#setFluxRewriteFunction} Use {@link Config#setMonoRewriteFunction}
168+
* for modifying non-streaming response body Use
169+
* {@link Config#setFluxRewriteFunction} for modifying streaming response body
170+
*/
171+
@Deprecated
145172
public Config setRewriteFunction(RewriteFunction rewriteFunction) {
146173
this.rewriteFunction = rewriteFunction;
147174
return this;
148175
}
149176

177+
public Config setMonoRewriteFunction(MonoRewriteFunction monoRewriteFunction) {
178+
this.monoRewriteFunction = monoRewriteFunction;
179+
return this;
180+
}
181+
182+
public Config setFluxRewriteFunction(FluxRewriteFunction fluxRewriteFunction) {
183+
this.fluxRewriteFunction = fluxRewriteFunction;
184+
return this;
185+
}
186+
187+
@Deprecated
150188
public <T, R> Config setRewriteFunction(Class<T> inClass, Class<R> outClass,
151189
RewriteFunction<T, R> rewriteFunction) {
152190
setInClass(inClass);
@@ -155,6 +193,21 @@ public <T, R> Config setRewriteFunction(Class<T> inClass, Class<R> outClass,
155193
return this;
156194
}
157195

196+
public <T, R> Config setFluxRewriteFunction(Class<T> inClass, Class<R> outClass,
197+
FluxRewriteFunction<T, R> fluxRewriteFunction) {
198+
setInClass(inClass);
199+
setOutClass(outClass);
200+
setFluxRewriteFunction(fluxRewriteFunction);
201+
return this;
202+
}
203+
204+
public <T, R> Config setMonoRewriteFunction(Class<T> inClass, Class<R> outClass,
205+
MonoRewriteFunction<T, R> monoRewriteFunction) {
206+
setInClass(inClass);
207+
setOutClass(outClass);
208+
setMonoRewriteFunction(monoRewriteFunction);
209+
return this;
210+
}
158211
}
159212

160213
public class ModifyResponseGatewayFilter implements GatewayFilter, Ordered {
@@ -204,51 +257,101 @@ public ModifiedServerHttpResponse(ServerWebExchange exchange, Config config) {
204257
this.config = config;
205258
}
206259

207-
@SuppressWarnings("unchecked")
208260
@Override
261+
@SuppressWarnings({ "unchecked", "rawtypes" })
209262
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
210263

211264
Class inClass = config.getInClass();
212265
Class outClass = config.getOutClass();
213266

214-
String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
215-
HttpHeaders httpHeaders = new HttpHeaders();
216-
// explicitly add it in this way instead of
217-
// 'httpHeaders.setContentType(originalResponseContentType)'
218-
// this will prevent exception in case of using non-standard media
219-
// types like "Content-Type: image"
220-
httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);
221-
267+
HttpHeaders httpHeaders = prepareHttpHeaders();
222268
ClientResponse clientResponse = prepareClientResponse(body, httpHeaders);
223269

224-
// TODO: flux or mono
225-
Mono modifiedBody = extractBody(exchange, clientResponse, inClass)
226-
.flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody))
227-
.switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction().apply(exchange, null)));
270+
var modifiedBody = extractBody(exchange, clientResponse, inClass);
271+
if (config.getRewriteFunction() != null) {
272+
// TODO: to be removed with removal of rewriteFunction
273+
modifiedBody = modifiedBody
274+
.flatMap(originalBody -> config.getRewriteFunction()
275+
.apply(exchange, originalBody))
276+
.switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction()
277+
.apply(exchange, null)));
278+
}
279+
if (config.getMonoRewriteFunction() != null) {
280+
modifiedBody = config.getMonoRewriteFunction().apply(exchange,
281+
modifiedBody);
282+
}
228283

229-
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, outClass);
284+
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody,
285+
outClass);
230286
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
231287
exchange.getResponse().getHeaders());
232-
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
233-
Mono<DataBuffer> messageBody = writeBody(getDelegate(), outputMessage, outClass);
234-
HttpHeaders headers = getDelegate().getHeaders();
235-
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
236-
|| headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
237-
messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
238-
}
288+
return bodyInserter.insert(outputMessage, new BodyInserterContext())
289+
.then(Mono.defer(() -> {
290+
Mono<DataBuffer> messageBody = writeBody(getDelegate(),
291+
outputMessage, outClass);
292+
HttpHeaders headers = getDelegate().getHeaders();
293+
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
294+
|| headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
295+
messageBody = messageBody.doOnNext(data -> headers
296+
.setContentLength(data.readableByteCount()));
297+
}
298+
299+
if (StringUtils.hasText(config.newContentType)) {
300+
headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType);
301+
}
302+
303+
// TODO: fail if isStreamingMediaType?
304+
return getDelegate().writeWith(messageBody);
305+
}));
306+
}
239307

240-
if (StringUtils.hasText(config.newContentType)) {
241-
headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType);
242-
}
308+
@Override
309+
@SuppressWarnings({ "unchecked", "rawtypes" })
310+
public Mono<Void> writeAndFlushWith(
311+
Publisher<? extends Publisher<? extends DataBuffer>> body) {
312+
final var httpHeaders = prepareHttpHeaders();
313+
final var fluxRewriteConfig = config.getFluxRewriteFunction();
314+
final var publisher = Flux.from(body).flatMapSequential(r -> r);
315+
final var clientResponse = prepareClientResponse(publisher, httpHeaders);
316+
var modifiedBody = clientResponse.bodyToFlux(config.inClass);
317+
if (config.getRewriteFunction() != null) {
318+
// TODO: to be removed with removal of rewriteFunction
319+
modifiedBody = modifiedBody
320+
.flatMap(originalBody -> config.getRewriteFunction()
321+
.apply(exchange, originalBody))
322+
.switchIfEmpty(Flux.defer(() -> (Flux) config.getRewriteFunction()
323+
.apply(exchange, null)));
324+
}
325+
if (config.getFluxRewriteFunction() != null) {
326+
modifiedBody = fluxRewriteConfig.apply(exchange, modifiedBody);
327+
}
328+
final var bodyInserter = BodyInserters.fromPublisher(modifiedBody,
329+
config.outClass);
330+
final var outputMessage = new CachedBodyOutputMessage(exchange,
331+
exchange.getResponse().getHeaders());
243332

244-
// TODO: fail if isStreamingMediaType?
245-
return getDelegate().writeWith(messageBody);
246-
}));
333+
return bodyInserter.insert(outputMessage, new BodyInserterContext())
334+
.then(Mono.defer(() -> {
335+
final var messageBody = outputMessage.getBody();
336+
HttpHeaders headers = getDelegate().getHeaders();
337+
if (StringUtils.hasText(config.newContentType)) {
338+
headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType);
339+
}
340+
return getDelegate()
341+
.writeAndFlushWith(messageBody.map(Flux::just));
342+
}));
247343
}
248344

249-
@Override
250-
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
251-
return writeWith(Flux.from(body).flatMapSequential(p -> p));
345+
private HttpHeaders prepareHttpHeaders() {
346+
String originalResponseContentType = exchange
347+
.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
348+
HttpHeaders httpHeaders = new HttpHeaders();
349+
// explicitly add it in this way instead of
350+
// 'httpHeaders.setContentType(originalResponseContentType)'
351+
// this will prevent exception in case of using non-standard media
352+
// types like "Content-Type: image"
353+
httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);
354+
return httpHeaders;
252355
}
253356

254357
private ClientResponse prepareClientResponse(Publisher<? extends DataBuffer> body, HttpHeaders httpHeaders) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2013-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.gateway.filter.factory.rewrite;
18+
19+
import java.util.function.BiFunction;
20+
21+
import reactor.core.publisher.Mono;
22+
23+
import org.springframework.web.server.ServerWebExchange;
24+
25+
26+
/**
27+
* This interface is BETA and may be subject to change in a future release.
28+
*
29+
* @param <T> the type of the first argument to the function
30+
* @param <R> the type of element signaled by the {@link Mono}
31+
*/
32+
public interface MonoRewriteFunction<T, R> extends BiFunction<ServerWebExchange, Mono<T>, Mono<R>> {
33+
34+
}

0 commit comments

Comments
 (0)