Skip to content

Commit 5fc1806

Browse files
committed
Use encode with an Object value where feasible
Closes gh-22782
1 parent 181482f commit 5fc1806

File tree

7 files changed

+69
-106
lines changed

7 files changed

+69
-106
lines changed

Diff for: spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,14 @@ static DefaultDataBuffer fromEmptyByteBuffer(DefaultDataBufferFactory dataBuffer
8080

8181

8282
/**
83-
* Directly exposes the native {@code ByteBuffer} that this buffer is based on.
83+
* Directly exposes the native {@code ByteBuffer} that this buffer is based
84+
* on also updating the {@code ByteBuffer's} position and limit to match
85+
* the current {@link #readPosition()} and {@link #readableByteCount()}.
8486
* @return the wrapped byte buffer
8587
*/
8688
public ByteBuffer getNativeBuffer() {
89+
this.byteBuffer.position(this.readPosition);
90+
this.byteBuffer.limit(readableByteCount());
8791
return this.byteBuffer;
8892
}
8993

Diff for: spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractEncoderMethodReturnValueHandler.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.springframework.core.codec.Encoder;
3434
import org.springframework.core.io.buffer.DataBuffer;
3535
import org.springframework.core.io.buffer.DataBufferFactory;
36-
import org.springframework.core.io.buffer.DataBufferUtils;
3736
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
3837
import org.springframework.lang.Nullable;
3938
import org.springframework.messaging.Message;
@@ -148,7 +147,7 @@ private Flux<DataBuffer> encodeContent(
148147

149148
Encoder<?> encoder = getEncoder(elementType, mimeType);
150149

151-
return Flux.from((Publisher) publisher).concatMap(value ->
150+
return Flux.from((Publisher) publisher).map(value ->
152151
encodeValue(value, elementType, encoder, bufferFactory, mimeType, hints));
153152
}
154153

@@ -176,21 +175,19 @@ private <T> Encoder<T> getEncoder(ResolvableType elementType, @Nullable MimeType
176175
}
177176

178177
@SuppressWarnings("unchecked")
179-
private <T> Mono<DataBuffer> encodeValue(
178+
private <T> DataBuffer encodeValue(
180179
Object element, ResolvableType elementType, @Nullable Encoder<T> encoder,
181180
DataBufferFactory bufferFactory, @Nullable MimeType mimeType,
182181
@Nullable Map<String, Object> hints) {
183182

184183
if (encoder == null) {
185184
encoder = getEncoder(ResolvableType.forInstance(element), mimeType);
186185
if (encoder == null) {
187-
return Mono.error(new MessagingException(
188-
"No encoder for " + elementType + ", current value type is " + element.getClass()));
186+
throw new MessagingException(
187+
"No encoder for " + elementType + ", current value type is " + element.getClass());
189188
}
190189
}
191-
Mono<T> mono = Mono.just((T) element);
192-
Flux<DataBuffer> dataBuffers = encoder.encode(mono, bufferFactory, elementType, mimeType, hints);
193-
return DataBufferUtils.join(dataBuffers);
190+
return encoder.encodeValue((T) element, bufferFactory, elementType, mimeType, hints);
194191
}
195192

196193
/**

Diff for: spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.springframework.core.codec.Decoder;
3333
import org.springframework.core.codec.Encoder;
3434
import org.springframework.core.io.buffer.DataBuffer;
35-
import org.springframework.core.io.buffer.DataBufferUtils;
3635
import org.springframework.lang.Nullable;
3736
import org.springframework.util.Assert;
3837
import org.springframework.util.MimeType;
@@ -124,8 +123,10 @@ else if (adapter != null) {
124123
publisher = adapter.toPublisher(input);
125124
}
126125
else {
127-
Mono<Payload> payloadMono = encodeValue(input, ResolvableType.forInstance(input), null)
126+
Mono<Payload> payloadMono = Mono
127+
.fromCallable(() -> encodeValue(input, ResolvableType.forInstance(input), null))
128128
.map(this::firstPayload)
129+
.doOnDiscard(Payload.class, Payload::release)
129130
.switchIfEmpty(emptyPayload());
130131
return new DefaultResponseSpec(payloadMono);
131132
}
@@ -140,36 +141,36 @@ else if (adapter != null) {
140141

141142
if (adapter != null && !adapter.isMultiValue()) {
142143
Mono<Payload> payloadMono = Mono.from(publisher)
143-
.flatMap(value -> encodeValue(value, dataType, encoder))
144+
.map(value -> encodeValue(value, dataType, encoder))
144145
.map(this::firstPayload)
145146
.switchIfEmpty(emptyPayload());
146147
return new DefaultResponseSpec(payloadMono);
147148
}
148149

149150
Flux<Payload> payloadFlux = Flux.from(publisher)
150-
.concatMap(value -> encodeValue(value, dataType, encoder))
151+
.map(value -> encodeValue(value, dataType, encoder))
151152
.switchOnFirst((signal, inner) -> {
152153
DataBuffer data = signal.get();
153154
if (data != null) {
154-
return Flux.concat(
155-
Mono.just(firstPayload(data)),
156-
inner.skip(1).map(PayloadUtils::createPayload));
155+
return Mono.fromCallable(() -> firstPayload(data))
156+
.concatWith(inner.skip(1).map(PayloadUtils::createPayload));
157157
}
158158
else {
159159
return inner.map(PayloadUtils::createPayload);
160160
}
161161
})
162+
.doOnDiscard(Payload.class, Payload::release)
162163
.switchIfEmpty(emptyPayload());
163164
return new DefaultResponseSpec(payloadFlux);
164165
}
165166

166167
@SuppressWarnings("unchecked")
167-
private <T> Mono<DataBuffer> encodeValue(T value, ResolvableType valueType, @Nullable Encoder<?> encoder) {
168+
private <T> DataBuffer encodeValue(T value, ResolvableType valueType, @Nullable Encoder<?> encoder) {
168169
if (encoder == null) {
169170
encoder = strategies.encoder(ResolvableType.forInstance(value), dataMimeType);
170171
}
171-
return DataBufferUtils.join(((Encoder<T>) encoder).encode(
172-
Mono.just(value), strategies.dataBufferFactory(), valueType, dataMimeType, EMPTY_HINTS));
172+
return ((Encoder<T>) encoder).encodeValue(
173+
value, strategies.dataBufferFactory(), valueType, dataMimeType, EMPTY_HINTS);
173174
}
174175

175176
private Payload firstPayload(DataBuffer data) {

Diff for: spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void handleMonoString() {
8181
@Test
8282
public void handleFluxString() {
8383
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
84-
messsageHandler.handleMessage(message("fluxString", "abc\ndef\nghi")).block(Duration.ofSeconds(5));
84+
messsageHandler.handleMessage(message("fluxString", "abc", "def", "ghi")).block(Duration.ofSeconds(5));
8585
verifyOutputContent(Arrays.asList("abc::response", "def::response", "ghi::response"));
8686
}
8787

Diff for: spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolverTests.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -129,19 +129,22 @@ public void string() {
129129

130130
@Test
131131
public void validateStringMono() {
132+
TestValidator validator = new TestValidator();
132133
ResolvableType type = ResolvableType.forClassWithGenerics(Mono.class, String.class);
133134
MethodParameter param = this.testMethod.arg(type);
134-
Mono<Object> mono = resolveValue(param, Mono.just(toDataBuffer("12345")), new TestValidator());
135+
Mono<Object> mono = resolveValue(param, Mono.just(toDataBuffer("12345")), validator);
135136

136137
StepVerifier.create(mono).expectNextCount(0)
137138
.expectError(MethodArgumentNotValidException.class).verify();
138139
}
139140

140141
@Test
141142
public void validateStringFlux() {
143+
TestValidator validator = new TestValidator();
142144
ResolvableType type = ResolvableType.forClassWithGenerics(Flux.class, String.class);
143145
MethodParameter param = this.testMethod.arg(type);
144-
Flux<Object> flux = resolveValue(param, Mono.just(toDataBuffer("12345678\n12345")), new TestValidator());
146+
Flux<DataBuffer> content = Flux.just(toDataBuffer("12345678"), toDataBuffer("12345"));
147+
Flux<Object> flux = resolveValue(param, content, validator);
145148

146149
StepVerifier.create(flux)
147150
.expectNext("12345678")

Diff for: spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java

+25-25
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.nio.charset.StandardCharsets;
2020
import java.time.Duration;
21+
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.List;
2324
import java.util.Map;
@@ -111,9 +112,9 @@ public Mono<Void> write(Publisher<?> input, ResolvableType elementType, @Nullabl
111112
}
112113

113114
private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType elementType,
114-
MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
115+
MediaType mediaType, DataBufferFactory bufferFactory, Map<String, Object> hints) {
115116

116-
ResolvableType valueType = (ServerSentEvent.class.isAssignableFrom(elementType.toClass()) ?
117+
ResolvableType dataType = (ServerSentEvent.class.isAssignableFrom(elementType.toClass()) ?
117118
elementType.getGeneric() : elementType);
118119

119120
return Flux.from(input).map(element -> {
@@ -143,12 +144,10 @@ private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType el
143144
sb.append("data:");
144145
}
145146

146-
Flux<DataBuffer> flux = Flux.concat(
147-
encodeText(sb, mediaType, factory),
148-
encodeData(data, valueType, mediaType, factory, hints),
149-
encodeText("\n", mediaType, factory));
147+
Mono<DataBuffer> bufferMono = Mono.fromCallable(() ->
148+
bufferFactory.join(encodeEvent(sb, data, dataType, mediaType, bufferFactory, hints)));
150149

151-
return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
150+
return bufferMono.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
152151
});
153152
}
154153

@@ -160,31 +159,32 @@ private void writeField(String fieldName, Object fieldValue, StringBuilder sb) {
160159
}
161160

162161
@SuppressWarnings("unchecked")
163-
private <T> Flux<DataBuffer> encodeData(@Nullable T dataValue, ResolvableType valueType,
162+
private <T> List<DataBuffer> encodeEvent(CharSequence markup, @Nullable T data, ResolvableType dataType,
164163
MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
165164

166-
if (dataValue == null) {
167-
return Flux.empty();
168-
}
169-
170-
if (dataValue instanceof String) {
171-
String text = (String) dataValue;
172-
return Flux.from(encodeText(StringUtils.replace(text, "\n", "\ndata:") + "\n", mediaType, factory));
173-
}
174-
175-
if (this.encoder == null) {
176-
return Flux.error(new CodecException("No SSE encoder configured and the data is not String."));
165+
List<DataBuffer> result = new ArrayList<>(4);
166+
result.add(encodeText(markup, mediaType, factory));
167+
if (data != null) {
168+
if (data instanceof String) {
169+
String dataLine = StringUtils.replace((String) data, "\n", "\ndata:") + "\n";
170+
result.add(encodeText(dataLine, mediaType, factory));
171+
}
172+
else if (this.encoder == null) {
173+
throw new CodecException("No SSE encoder configured and the data is not String.");
174+
}
175+
else {
176+
result.add(((Encoder<T>) this.encoder).encodeValue(data, factory, dataType, mediaType, hints));
177+
result.add(encodeText("\n", mediaType, factory));
178+
}
177179
}
178-
179-
return ((Encoder<T>) this.encoder)
180-
.encode(Mono.just(dataValue), factory, valueType, mediaType, hints)
181-
.concatWith(encodeText("\n", mediaType, factory));
180+
result.add(encodeText("\n", mediaType, factory));
181+
return result;
182182
}
183183

184-
private Mono<DataBuffer> encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
184+
private DataBuffer encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
185185
Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
186186
byte[] bytes = text.toString().getBytes(mediaType.getCharset());
187-
return Mono.just(bufferFactory.wrap(bytes)); // wrapping, not allocating
187+
return bufferFactory.wrap(bytes); // wrapping, not allocating
188188
}
189189

190190
@Override

Diff for: spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java

+17-59
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,7 +40,7 @@
4040
import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse;
4141

4242
import static org.junit.Assert.*;
43-
import static org.springframework.core.ResolvableType.forClass;
43+
import static org.springframework.core.ResolvableType.*;
4444

4545
/**
4646
* Unit tests for {@link ServerSentEventHttpMessageWriter}.
@@ -88,9 +88,8 @@ public void writeServerSentEvent() {
8888
testWrite(source, outputMessage, ServerSentEvent.class);
8989

9090
StepVerifier.create(outputMessage.getBody())
91-
.consumeNextWith(stringConsumer("id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:"))
92-
.consumeNextWith(stringConsumer("bar\n"))
93-
.consumeNextWith(stringConsumer("\n"))
91+
.consumeNextWith(stringConsumer(
92+
"id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n"))
9493
.expectComplete()
9594
.verify();
9695
}
@@ -101,12 +100,8 @@ public void writeString() {
101100
testWrite(source, outputMessage, String.class);
102101

103102
StepVerifier.create(outputMessage.getBody())
104-
.consumeNextWith(stringConsumer("data:"))
105-
.consumeNextWith(stringConsumer("foo\n"))
106-
.consumeNextWith(stringConsumer("\n"))
107-
.consumeNextWith(stringConsumer("data:"))
108-
.consumeNextWith(stringConsumer("bar\n"))
109-
.consumeNextWith(stringConsumer("\n"))
103+
.consumeNextWith(stringConsumer("data:foo\n\n"))
104+
.consumeNextWith(stringConsumer("data:bar\n\n"))
110105
.expectComplete()
111106
.verify();
112107
}
@@ -117,12 +112,8 @@ public void writeMultiLineString() {
117112
testWrite(source, outputMessage, String.class);
118113

119114
StepVerifier.create(outputMessage.getBody())
120-
.consumeNextWith(stringConsumer("data:"))
121-
.consumeNextWith(stringConsumer("foo\ndata:bar\n"))
122-
.consumeNextWith(stringConsumer("\n"))
123-
.consumeNextWith(stringConsumer("data:"))
124-
.consumeNextWith(stringConsumer("foo\ndata:baz\n"))
125-
.consumeNextWith(stringConsumer("\n"))
115+
.consumeNextWith(stringConsumer("data:foo\ndata:bar\n\n"))
116+
.consumeNextWith(stringConsumer("data:foo\ndata:baz\n\n"))
126117
.expectComplete()
127118
.verify();
128119
}
@@ -136,14 +127,11 @@ public void writeStringWithCustomCharset() {
136127

137128
assertEquals(mediaType, outputMessage.getHeaders().getContentType());
138129
StepVerifier.create(outputMessage.getBody())
139-
.consumeNextWith(stringConsumer("data:"))
140130
.consumeNextWith(dataBuffer -> {
141-
String value =
142-
DataBufferTestUtils.dumpString(dataBuffer, charset);
131+
String value = DataBufferTestUtils.dumpString(dataBuffer, charset);
143132
DataBufferUtils.release(dataBuffer);
144-
assertEquals("\u00A3\n", value);
133+
assertEquals("data:\u00A3\n\n", value);
145134
})
146-
.consumeNextWith(stringConsumer("\n"))
147135
.expectComplete()
148136
.verify();
149137
}
@@ -154,14 +142,8 @@ public void writePojo() {
154142
testWrite(source, outputMessage, Pojo.class);
155143

156144
StepVerifier.create(outputMessage.getBody())
157-
.consumeNextWith(stringConsumer("data:"))
158-
.consumeNextWith(stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"))
159-
.consumeNextWith(stringConsumer("\n"))
160-
.consumeNextWith(stringConsumer("\n"))
161-
.consumeNextWith(stringConsumer("data:"))
162-
.consumeNextWith(stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"))
163-
.consumeNextWith(stringConsumer("\n"))
164-
.consumeNextWith(stringConsumer("\n"))
145+
.consumeNextWith(stringConsumer("data:{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n\n"))
146+
.consumeNextWith(stringConsumer("data:{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n\n"))
165147
.expectComplete()
166148
.verify();
167149
}
@@ -175,18 +157,12 @@ public void writePojoWithPrettyPrint() {
175157
testWrite(source, outputMessage, Pojo.class);
176158

177159
StepVerifier.create(outputMessage.getBody())
178-
.consumeNextWith(stringConsumer("data:"))
179-
.consumeNextWith(stringConsumer("{\n" +
160+
.consumeNextWith(stringConsumer("data:{\n" +
180161
"data: \"foo\" : \"foofoo\",\n" +
181-
"data: \"bar\" : \"barbar\"\n" + "data:}"))
182-
.consumeNextWith(stringConsumer("\n"))
183-
.consumeNextWith(stringConsumer("\n"))
184-
.consumeNextWith(stringConsumer("data:"))
185-
.consumeNextWith(stringConsumer("{\n" +
162+
"data: \"bar\" : \"barbar\"\n" + "data:}\n\n"))
163+
.consumeNextWith(stringConsumer("data:{\n" +
186164
"data: \"foo\" : \"foofoofoo\",\n" +
187-
"data: \"bar\" : \"barbarbar\"\n" + "data:}"))
188-
.consumeNextWith(stringConsumer("\n"))
189-
.consumeNextWith(stringConsumer("\n"))
165+
"data: \"bar\" : \"barbarbar\"\n" + "data:}\n\n"))
190166
.expectComplete()
191167
.verify();
192168
}
@@ -200,28 +176,10 @@ public void writePojoWithCustomEncoding() {
200176

201177
assertEquals(mediaType, outputMessage.getHeaders().getContentType());
202178
StepVerifier.create(outputMessage.getBody())
203-
.consumeNextWith(dataBuffer1 -> {
204-
String value1 =
205-
DataBufferTestUtils.dumpString(dataBuffer1, charset);
206-
DataBufferUtils.release(dataBuffer1);
207-
assertEquals("data:", value1);
208-
})
209179
.consumeNextWith(dataBuffer -> {
210180
String value = DataBufferTestUtils.dumpString(dataBuffer, charset);
211181
DataBufferUtils.release(dataBuffer);
212-
assertEquals("{\"foo\":\"foo\uD834\uDD1E\",\"bar\":\"bar\uD834\uDD1E\"}", value);
213-
})
214-
.consumeNextWith(dataBuffer2 -> {
215-
String value2 =
216-
DataBufferTestUtils.dumpString(dataBuffer2, charset);
217-
DataBufferUtils.release(dataBuffer2);
218-
assertEquals("\n", value2);
219-
})
220-
.consumeNextWith(dataBuffer3 -> {
221-
String value3 =
222-
DataBufferTestUtils.dumpString(dataBuffer3, charset);
223-
DataBufferUtils.release(dataBuffer3);
224-
assertEquals("\n", value3);
182+
assertEquals("data:{\"foo\":\"foo\uD834\uDD1E\",\"bar\":\"bar\uD834\uDD1E\"}\n\n", value);
225183
})
226184
.expectComplete()
227185
.verify();

0 commit comments

Comments
 (0)