Skip to content

Commit 2c878e9

Browse files
committed
Support for setupPayload in RSocketRequester
Closes gh-23368
1 parent 55946bf commit 2c878e9

File tree

5 files changed

+141
-14
lines changed

5 files changed

+141
-14
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,29 @@
1919
import java.net.URI;
2020
import java.util.ArrayList;
2121
import java.util.Collections;
22+
import java.util.LinkedHashMap;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.function.Consumer;
2426

27+
import io.rsocket.Payload;
2528
import io.rsocket.RSocketFactory;
2629
import io.rsocket.frame.decoder.PayloadDecoder;
2730
import io.rsocket.transport.ClientTransport;
2831
import io.rsocket.transport.netty.client.TcpClientTransport;
2932
import io.rsocket.transport.netty.client.WebsocketClientTransport;
3033
import reactor.core.publisher.Mono;
3134

35+
import org.springframework.core.ResolvableType;
3236
import org.springframework.core.codec.Decoder;
37+
import org.springframework.core.codec.Encoder;
3338
import org.springframework.core.codec.StringDecoder;
39+
import org.springframework.core.io.buffer.DataBuffer;
40+
import org.springframework.core.io.buffer.DataBufferUtils;
3441
import org.springframework.core.io.buffer.NettyDataBufferFactory;
3542
import org.springframework.lang.Nullable;
3643
import org.springframework.util.Assert;
44+
import org.springframework.util.CollectionUtils;
3745
import org.springframework.util.MimeType;
3846

3947
/**
@@ -45,11 +53,26 @@
4553
*/
4654
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
4755

56+
private static final Map<String, Object> HINTS = Collections.emptyMap();
57+
58+
4859
@Nullable
4960
private MimeType dataMimeType;
5061

5162
private MimeType metadataMimeType = MetadataExtractor.COMPOSITE_METADATA;
5263

64+
@Nullable
65+
private Object setupData;
66+
67+
@Nullable
68+
private String setupRoute;
69+
70+
@Nullable
71+
private Object[] setupRouteVars;
72+
73+
@Nullable
74+
private Map<Object, MimeType> setupMetadata;
75+
5376
@Nullable
5477
private RSocketStrategies strategies;
5578

@@ -71,6 +94,26 @@ public RSocketRequester.Builder metadataMimeType(MimeType mimeType) {
7194
return this;
7295
}
7396

97+
@Override
98+
public RSocketRequester.Builder setupData(Object data) {
99+
this.setupData = data;
100+
return this;
101+
}
102+
103+
@Override
104+
public RSocketRequester.Builder setupRoute(String route, Object... routeVars) {
105+
this.setupRoute = route;
106+
this.setupRouteVars = routeVars;
107+
return this;
108+
}
109+
110+
@Override
111+
public RSocketRequester.Builder setupMetadata(Object metadata, @Nullable MimeType mimeType) {
112+
this.setupMetadata = (this.setupMetadata == null ? new LinkedHashMap<>(4) : this.setupMetadata);
113+
this.setupMetadata.put(metadata, mimeType);
114+
return this;
115+
}
116+
74117
@Override
75118
public RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies) {
76119
this.strategies = strategies;
@@ -120,12 +163,52 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
120163
factory.dataMimeType(dataMimeType.toString());
121164
factory.metadataMimeType(this.metadataMimeType.toString());
122165

166+
Payload setupPayload = getSetupPayload(dataMimeType, rsocketStrategies);
167+
if (setupPayload != null) {
168+
factory.setupPayload(setupPayload);
169+
}
170+
123171
return factory.transport(transport)
124172
.start()
125173
.map(rsocket -> new DefaultRSocketRequester(
126174
rsocket, dataMimeType, this.metadataMimeType, rsocketStrategies));
127175
}
128176

177+
@Nullable
178+
private Payload getSetupPayload(MimeType dataMimeType, RSocketStrategies strategies) {
179+
DataBuffer metadata = null;
180+
if (this.setupRoute != null || !CollectionUtils.isEmpty(this.setupMetadata)) {
181+
metadata = new MetadataEncoder(this.metadataMimeType, strategies)
182+
.metadataAndOrRoute(this.setupMetadata, this.setupRoute, this.setupRouteVars)
183+
.encode();
184+
}
185+
DataBuffer data = null;
186+
if (this.setupData != null) {
187+
try {
188+
ResolvableType type = ResolvableType.forClass(this.setupData.getClass());
189+
Encoder<Object> encoder = strategies.encoder(type, dataMimeType);
190+
Assert.notNull(encoder, () -> "No encoder for " + dataMimeType + ", " + type);
191+
data = encoder.encodeValue(this.setupData, strategies.dataBufferFactory(), type, dataMimeType, HINTS);
192+
}
193+
catch (Throwable ex) {
194+
if (metadata != null) {
195+
DataBufferUtils.release(metadata);
196+
}
197+
throw ex;
198+
}
199+
}
200+
if (metadata == null && data == null) {
201+
return null;
202+
}
203+
metadata = metadata != null ? metadata : emptyBuffer(strategies);
204+
data = data != null ? data : emptyBuffer(strategies);
205+
return PayloadUtils.createPayload(metadata, data);
206+
}
207+
208+
private DataBuffer emptyBuffer(RSocketStrategies strategies) {
209+
return strategies.dataBufferFactory().wrap(new byte[0]);
210+
}
211+
129212
private RSocketStrategies getRSocketStrategies() {
130213
if (!this.strategiesConfigurers.isEmpty()) {
131214
RSocketStrategies.Builder builder =

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.function.Consumer;
2121

2222
import io.rsocket.ConnectionSetupPayload;
23+
import io.rsocket.Payload;
2324
import io.rsocket.RSocket;
2425
import io.rsocket.transport.ClientTransport;
2526
import io.rsocket.transport.netty.client.TcpClientTransport;
@@ -139,6 +140,28 @@ interface Builder {
139140
*/
140141
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
141142

143+
/**
144+
* Set the data for the setup payload. The data will be encoded
145+
* according to the configured {@link #dataMimeType(MimeType)}.
146+
* <p>By default this is not set.
147+
*/
148+
RSocketRequester.Builder setupData(Object data);
149+
150+
/**
151+
* Set the route for the setup payload. The rules for formatting and
152+
* encoding the route are the same as those for a request route as
153+
* described in {@link #route(String, Object...)}.
154+
* <p>By default this is not set.
155+
*/
156+
RSocketRequester.Builder setupRoute(String route, Object... routeVars);
157+
158+
/**
159+
* Add metadata entry to the setup payload. Composite metadata must be
160+
* in use if this is called more than once or in addition to
161+
* {@link #setupRoute(String, Object...)}.
162+
*/
163+
RSocketRequester.Builder setupMetadata(Object value, @Nullable MimeType mimeType);
164+
142165
/**
143166
* Provide {@link RSocketStrategies} to use.
144167
* <p>By default this is based on default settings of
@@ -157,12 +180,20 @@ interface Builder {
157180

158181
/**
159182
* Callback to configure the {@code ClientRSocketFactory} directly.
160-
* <p>Note that the data and metadata mime types cannot be set directly
161-
* on the {@code ClientRSocketFactory}. Use shortcuts on this builder
162-
* {@link #dataMimeType(MimeType)} and {@link #metadataMimeType(MimeType)}
163-
* instead.
164-
* <p>To configure client side responding, see
183+
* <ul>
184+
* <li>The data and metadata mime types cannot be set directly
185+
* on the {@code ClientRSocketFactory} and will be overridden. Use the
186+
* shortcuts {@link #dataMimeType(MimeType)} and
187+
* {@link #metadataMimeType(MimeType)} on this builder instead.
188+
* <li>The frame decoder also cannot be set directly and instead is set
189+
* to match the configured {@code DataBufferFactory}.
190+
* <li>For the
191+
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#setupPayload(Payload)
192+
* setupPayload}, consider using methods on this builder to specify the
193+
* route, other metadata, and data as Object values to be encoded.
194+
* <li>To configure client side responding, see
165195
* {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}.
196+
* </ul>
166197
*/
167198
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);
168199

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ interface Builder {
188188
* Configure a {@link MetadataExtractor} to extract the route along with
189189
* other metadata. This option is applicable to client or server
190190
* responders.
191-
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
191+
* <p>By default this is {@link DefaultMetadataExtractor} created with
192+
* the {@link #decoder(Decoder[]) configured} decoders and extracting a
192193
* route from {@code "message/x.rsocket.routing.v0"} metadata.
193194
*/
194195
Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor);

spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,24 @@ public void mimeTypesCannotBeChangedAtRSocketFactoryLevel() {
172172
assertThat(requester.metadataMimeType()).isEqualTo(metaMimeType);
173173
}
174174

175+
@Test
176+
public void setupRoute() {
177+
RSocketRequester.builder()
178+
.dataMimeType(MimeTypeUtils.TEXT_PLAIN)
179+
.metadataMimeType(MimeTypeUtils.TEXT_PLAIN)
180+
.setupRoute("toA")
181+
.setupData("My data")
182+
.connect(this.transport)
183+
.block();
184+
185+
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
186+
.map(ConnectionSetupPayload::create)
187+
.block();
188+
189+
assertThat(setupPayload.getMetadataUtf8()).isEqualTo("toA");
190+
assertThat(setupPayload.getDataUtf8()).isEqualTo("My data");
191+
}
192+
175193
@Test
176194
public void frameDecoderMatchesDataBufferFactory() throws Exception {
177195
testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY);

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.rsocket.frame.decoder.PayloadDecoder;
2424
import io.rsocket.transport.netty.server.CloseableChannel;
2525
import io.rsocket.transport.netty.server.TcpServerTransport;
26-
import io.rsocket.util.ByteBufPayload;
2726
import org.junit.AfterClass;
2827
import org.junit.BeforeClass;
2928
import org.junit.Test;
@@ -37,12 +36,10 @@
3736
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3837
import org.springframework.context.annotation.Bean;
3938
import org.springframework.context.annotation.Configuration;
40-
import org.springframework.core.codec.StringDecoder;
4139
import org.springframework.messaging.handler.annotation.MessageMapping;
4240
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
4341
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
4442
import org.springframework.stereotype.Controller;
45-
import org.springframework.util.MimeTypeUtils;
4643

4744
/**
4845
* Client-side handling of requests initiated from the server side.
@@ -112,10 +109,9 @@ private static void connectAndRunTest(String connectionRoute) {
112109
RSocketRequester requester = null;
113110
try {
114111
requester = RSocketRequester.builder()
115-
.metadataMimeType(MimeTypeUtils.TEXT_PLAIN)
112+
.setupRoute(connectionRoute)
116113
.rsocketStrategies(strategies)
117114
.rsocketFactory(clientResponderConfigurer)
118-
.rsocketFactory(factory -> factory.setupPayload(ByteBufPayload.create("", connectionRoute)))
119115
.connectTcp("localhost", server.address().getPort())
120116
.block();
121117

@@ -266,9 +262,7 @@ public RSocketMessageHandler serverMessageHandler() {
266262

267263
@Bean
268264
public RSocketStrategies rsocketStrategies() {
269-
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(StringDecoder.allMimeTypes());
270-
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
271-
return RSocketStrategies.builder().metadataExtractor(extractor).build();
265+
return RSocketStrategies.create();
272266
}
273267
}
274268

0 commit comments

Comments
 (0)