Skip to content

Commit 55946bf

Browse files
committed
Factor out MetadataEncoder from RSocketRequester
To be re-used also for creating metadata for the setup payload. See: gh-23368
1 parent c76370d commit 55946bf

File tree

6 files changed

+475
-230
lines changed

6 files changed

+475
-230
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.function.BiConsumer;
2626

2727
import io.netty.buffer.ByteBuf;
28-
import io.netty.buffer.PooledByteBufAllocator;
28+
import io.netty.buffer.ByteBufAllocator;
2929
import io.rsocket.Payload;
3030
import io.rsocket.metadata.CompositeMetadata;
3131

@@ -179,8 +179,9 @@ private void extractEntry(ByteBuf content, @Nullable String mimeType, Map<String
179179

180180
private static class EntryExtractor<T> {
181181

182+
// We only need this to wrap ByteBufs
182183
private final static NettyDataBufferFactory bufferFactory =
183-
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
184+
new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
184185

185186

186187
private final Decoder<T> decoder;

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

Lines changed: 14 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,10 @@
1717
package org.springframework.messaging.rsocket;
1818

1919
import java.util.Collections;
20-
import java.util.LinkedHashMap;
2120
import java.util.Map;
22-
import java.util.regex.Matcher;
23-
import java.util.regex.Pattern;
2421

25-
import io.netty.buffer.ByteBuf;
26-
import io.netty.buffer.ByteBufAllocator;
27-
import io.netty.buffer.CompositeByteBuf;
28-
import io.netty.buffer.Unpooled;
2922
import io.rsocket.Payload;
3023
import io.rsocket.RSocket;
31-
import io.rsocket.metadata.CompositeMetadataFlyweight;
3224
import org.reactivestreams.Publisher;
3325
import reactor.core.publisher.Flux;
3426
import reactor.core.publisher.Mono;
@@ -41,12 +33,9 @@
4133
import org.springframework.core.io.buffer.DataBuffer;
4234
import org.springframework.core.io.buffer.DataBufferFactory;
4335
import org.springframework.core.io.buffer.DataBufferUtils;
44-
import org.springframework.core.io.buffer.NettyDataBuffer;
45-
import org.springframework.core.io.buffer.NettyDataBufferFactory;
4636
import org.springframework.lang.Nullable;
4737
import org.springframework.util.Assert;
4838
import org.springframework.util.MimeType;
49-
import org.springframework.util.ObjectUtils;
5039

5140
/**
5241
* Default implementation of {@link RSocketRequester}.
@@ -56,9 +45,6 @@
5645
*/
5746
final class DefaultRSocketRequester implements RSocketRequester {
5847

59-
/** For route variable replacement. */
60-
private static final Pattern VARS_PATTERN = Pattern.compile("\\{([^/]+?)\\}");
61-
6248
private static final Map<String, Object> EMPTY_HINTS = Collections.emptyMap();
6349

6450

@@ -107,30 +93,7 @@ public MimeType metadataMimeType() {
10793

10894
@Override
10995
public RequestSpec route(String route, Object... vars) {
110-
Assert.notNull(route, "'route' is required");
111-
route = expand(route, vars);
112-
return new DefaultRequestSpec(route, isCompositeMetadata() ? MetadataExtractor.ROUTING : null);
113-
}
114-
115-
private static String expand(String route, Object... vars) {
116-
if (ObjectUtils.isEmpty(vars)) {
117-
return route;
118-
}
119-
StringBuffer sb = new StringBuffer();
120-
int index = 0;
121-
Matcher matcher = VARS_PATTERN.matcher(route);
122-
while (matcher.find()) {
123-
Assert.isTrue(index < vars.length, () -> "No value for variable '" + matcher.group(1) + "'");
124-
String value = vars[index].toString();
125-
value = value.contains(".") ? value.replaceAll("\\.", "%2E") : value;
126-
matcher.appendReplacement(sb, value);
127-
index++;
128-
}
129-
return sb.toString();
130-
}
131-
132-
private boolean isCompositeMetadata() {
133-
return metadataMimeType().equals(MetadataExtractor.COMPOSITE_METADATA);
96+
return new DefaultRequestSpec(route, vars);
13497
}
13598

13699
@Override
@@ -150,22 +113,23 @@ private DataBufferFactory bufferFactory() {
150113

151114
private class DefaultRequestSpec implements RequestSpec {
152115

153-
private final Map<Object, MimeType> metadata = new LinkedHashMap<>(4);
116+
private final MetadataEncoder metadataEncoder;
154117

155118

156-
DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
157-
mimeType = (mimeType == null && !isCompositeMetadata() ? metadataMimeType() : mimeType);
158-
Assert.notNull(mimeType, "MimeType is required for composite metadata");
159-
metadata(metadata, mimeType);
119+
public DefaultRequestSpec(String route, Object... vars) {
120+
this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
121+
this.metadataEncoder.route(route, vars);
160122
}
161123

124+
public DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
125+
this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
126+
this.metadataEncoder.metadata(metadata, mimeType);
127+
}
128+
129+
162130
@Override
163131
public RequestSpec metadata(Object metadata, MimeType mimeType) {
164-
Assert.notNull(metadata, "Metadata content is required");
165-
Assert.notNull(mimeType, "MimeType is required");
166-
Assert.isTrue(this.metadata.isEmpty() || isCompositeMetadata(),
167-
"Composite metadata required for multiple metadata entries.");
168-
this.metadata.put(metadata, mimeType);
132+
this.metadataEncoder.metadata(metadata, mimeType);
169133
return this;
170134
}
171135

@@ -265,70 +229,18 @@ private <T> DataBuffer encodeData(T value, ResolvableType elementType, @Nullable
265229
private Payload firstPayload(DataBuffer data) {
266230
DataBuffer metadata;
267231
try {
268-
metadata = getMetadata();
269-
return PayloadUtils.createPayload(metadata, data);
232+
metadata = this.metadataEncoder.encode();
270233
}
271234
catch (Throwable ex) {
272235
DataBufferUtils.release(data);
273236
throw ex;
274237
}
238+
return PayloadUtils.createPayload(metadata, data);
275239
}
276240

277241
private Mono<Payload> emptyPayload() {
278242
return Mono.fromCallable(() -> firstPayload(emptyDataBuffer));
279243
}
280-
281-
private DataBuffer getMetadata() {
282-
if (isCompositeMetadata()) {
283-
CompositeByteBuf metadata = getAllocator().compositeBuffer();
284-
this.metadata.forEach((value, mimeType) -> {
285-
DataBuffer dataBuffer = encodeMetadata(value, mimeType);
286-
CompositeMetadataFlyweight.encodeAndAddMetadata(metadata, getAllocator(), mimeType.toString(),
287-
dataBuffer instanceof NettyDataBuffer ?
288-
((NettyDataBuffer) dataBuffer).getNativeBuffer() :
289-
Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()));
290-
});
291-
return asDataBuffer(metadata);
292-
}
293-
else {
294-
Assert.isTrue(this.metadata.size() == 1, "Composite metadata required for multiple entries");
295-
Map.Entry<Object, MimeType> entry = this.metadata.entrySet().iterator().next();
296-
if (!metadataMimeType().equals(entry.getValue())) {
297-
throw new IllegalArgumentException(
298-
"Connection configured for metadata mime type " +
299-
"'" + metadataMimeType() + "', but actual is `" + this.metadata + "`");
300-
}
301-
return encodeMetadata(entry.getKey(), entry.getValue());
302-
}
303-
}
304-
305-
@SuppressWarnings("unchecked")
306-
private <T> DataBuffer encodeMetadata(Object metadata, MimeType mimeType) {
307-
if (metadata instanceof DataBuffer) {
308-
return (DataBuffer) metadata;
309-
}
310-
ResolvableType type = ResolvableType.forInstance(metadata);
311-
Encoder<T> encoder = strategies.encoder(type, mimeType);
312-
Assert.notNull(encoder, () -> "No encoder for metadata " + metadata + ", mimeType '" + mimeType + "'");
313-
return encoder.encodeValue((T) metadata, bufferFactory(), type, mimeType, EMPTY_HINTS);
314-
}
315-
316-
private ByteBufAllocator getAllocator() {
317-
return bufferFactory() instanceof NettyDataBufferFactory ?
318-
((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator() :
319-
ByteBufAllocator.DEFAULT;
320-
}
321-
322-
private DataBuffer asDataBuffer(ByteBuf byteBuf) {
323-
if (bufferFactory() instanceof NettyDataBufferFactory) {
324-
return ((NettyDataBufferFactory) bufferFactory()).wrap(byteBuf);
325-
}
326-
else {
327-
DataBuffer dataBuffer = bufferFactory().wrap(byteBuf.nioBuffer());
328-
byteBuf.release();
329-
return dataBuffer;
330-
}
331-
}
332244
}
333245

334246

0 commit comments

Comments
 (0)